pykafka.balancedconsumer¶
-
class
pykafka.balancedconsumer.
BalancedConsumer
(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect=None, zookeeper_hosts='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)¶ Bases:
object
A self-balancing consumer for Kafka that uses ZooKeeper to communicate with other balancing consumers.
Maintains a single instance of SimpleConsumer, periodically using the consumer rebalancing algorithm to reassign partitions to this SimpleConsumer.
-
__init__
(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect=None, zookeeper_hosts='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)¶ Create a BalancedConsumer instance
Parameters: - topic (
pykafka.topic.Topic
) – The topic this consumer should consume - cluster (
pykafka.cluster.Cluster
) – The cluster to which this consumer should connect - consumer_group (str) – The name of the consumer group this consumer should join. Consumer group names are namespaced at the cluster level, meaning that two consumers consuming different topics with the same group name will be treated as part of the same group.
- fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch with each fetch request
- num_consumer_fetchers (int) – The number of workers used to make FetchRequests
- auto_commit_enable (bool) – If true, periodically commit to kafka the offset of messages already returned from consume() calls. Requires that consumer_group is not None.
- auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer’s offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
- queued_max_messages (int) – The maximum number of messages buffered for
consumption in the internal
pykafka.simpleconsumer.SimpleConsumer
- fetch_min_bytes (int) – The minimum amount of data (in bytes) that the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available.
- fetch_error_backoff_ms (int) – UNUSED.
See
pykafka.simpleconsumer.SimpleConsumer
. - fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) that the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
- offsets_channel_backoff_ms (int) – Backoff time to retry failed offset commits and fetches.
- offsets_commit_max_retries (int) – The number of times the offset commit worker should retry before raising an error.
- auto_offset_reset (
pykafka.common.OffsetType
) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered. - consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
- rebalance_max_retries (int) – The number of times the rebalance should retry before raising an error.
- rebalance_backoff_ms (int) – Backoff time (in milliseconds) between retries during rebalance.
- zookeeper_connection_timeout_ms (int) – The maximum time (in milliseconds) that the consumer waits while establishing a connection to zookeeper.
- zookeeper_connect (str) – Deprecated::2.7,3.6 Comma-Separated (ip1:port1,ip2:port2) strings indicating the zookeeper nodes to which to connect.
- zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect.
- zookeeper (
kazoo.client.KazooClient
) – A KazooClient connected to a Zookeeper instance. If provided, zookeeper_connect is ignored. - auto_start (bool) – Whether the consumer should begin communicating with zookeeper after __init__ is complete. If false, communication can be started with start().
- reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
- post_rebalance_callback (function) – A function to be called when a rebalance is
in progress. This function should accept three arguments: the
pykafka.balancedconsumer.BalancedConsumer
instance that just completed its rebalance, a dict of partitions that it owned before the rebalance, and a dict of partitions it owns after the rebalance. These dicts map partition ids to the most recently known offsets for those partitions. This function can optionally return a dictionary mapping partition ids to offsets. If it does, the consumer will reset its offsets to the supplied values before continuing consumption. Note that the BalancedConsumer is in a poorly defined state at the time this callback runs, so that accessing its properties (such as held_offsets or partitions) might yield confusing results. Instead, the callback should really rely on the provided partition-id dicts, which are well-defined. - use_rdkafka (bool) – Use librdkafka-backed consumer if available
- compacted_topic (bool) – Set to read from a compacted topic. Forces consumer to use less stringent message ordering logic because compacted topics do not provide offsets in strict incrementing order.
- membership_protocol (
pykafka.membershipprotocol.GroupMembershipProtocol
) – The group membership protocol to which this consumer should adhere - deserializer (function) – A function defining how to deserialize messages returned from Kafka. A function with the signature d(value, partition_key) that returns a tuple of (deserialized_value, deserialized_partition_key). The arguments passed to this function are the bytes representations of a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implemtations.
- reset_offset_on_fetch (bool) – Whether to update offsets during fetch_offsets. Disable for read-only use cases to prevent side-effects.
- topic (
-
__iter__
()¶ Yield an infinite stream of messages until the consumer times out
-
__repr__
() <==> repr(x)¶
-
__weakref__
¶ list of weak references to the object (if defined)
-
_add_partitions
(partitions)¶ Add partitions to the zookeeper registry for this consumer.
Parameters: partitions (Iterable of pykafka.partition.Partition
) – The partitions to add.
-
_add_self
()¶ Register this consumer in zookeeper.
-
_build_watch_callback
(fn, proxy)¶ Return a function that’s safe to use as a ChildrenWatch callback
Fixes the issue from https://github.com/Parsely/pykafka/issues/345
-
_get_held_partitions
()¶ Build a set of partitions zookeeper says we own
-
_get_internal_consumer
(partitions=None, start=True)¶ Instantiate a SimpleConsumer for internal use.
If there is already a SimpleConsumer instance held by this object, disable its workers and mark it for garbage collection before creating a new one.
-
_get_participants
()¶ Use zookeeper to get the other consumers of this topic.
Returns: A sorted list of the ids of other consumers of this consumer’s topic
-
_partitions
¶ Convenient shorthand for set of partitions internally held
-
_path_from_partition
(p)¶ Given a partition, return its path in zookeeper.
-
_path_self
¶ Path where this consumer should be registered in zookeeper
-
_raise_worker_exceptions
()¶ Raises exceptions encountered on worker threads
-
_rebalance
()¶ Start the rebalancing process for this consumer
This method is called whenever a zookeeper watch is triggered.
-
_remove_partitions
(partitions)¶ Remove partitions from the zookeeper registry for this consumer.
Parameters: partitions (Iterable of pykafka.partition.Partition
) – The partitions to remove.
-
_set_watches
()¶ Set watches in zookeeper that will trigger rebalances.
Rebalances should be triggered whenever a broker, topic, or consumer znode is changed in zookeeper. This ensures that the balance of the consumer group remains up-to-date with the current state of the cluster.
-
_setup_internal_consumer
(partitions=None, start=True)¶ Instantiate an internal SimpleConsumer instance
-
_setup_zookeeper
(zookeeper_connect, timeout)¶ Open a connection to a ZooKeeper host.
Parameters: - zookeeper_connect (str) – The ‘ip:port’ address of the zookeeper node to which to connect.
- timeout (int) – Connection timeout (in milliseconds)
-
_update_member_assignment
()¶ Decide and assign new partitions for this consumer
-
commit_offsets
(partition_offsets=None)¶ Commit offsets for this consumer’s partitions
Uses the offset commit/fetch API
Parameters: partition_offsets (Sequence of tuples of the form ( pykafka.partition.Partition
, int)) – (partition, offset) pairs to commit where partition is the partition for which to commit the offset and offset is the offset to commit for the partition. Note that using this argument when auto_commit_enable is enabled can cause inconsistencies in committed offsets. For best results, use either this argument or auto_commit_enable.
-
consume
(block=True)¶ Get one message from the consumer
Parameters: block (bool) – Whether to block while waiting for a message
-
held_offsets
¶ Return a map from partition id to held offset for each partition
-
partitions
¶ A list of the partitions that this consumer consumes
-
reset_offsets
(partition_offsets=None)¶ Reset offsets for the specified partitions
For each value provided in partition_offsets: if the value is an integer, immediately reset the partition’s internal offset counter to that value. If it’s a datetime.datetime instance or a valid OffsetType, issue a ListOffsetRequest using that timestamp value to discover the latest offset in the latest log segment before that timestamp, then set the partition’s internal counter to that value.
Parameters: partition_offsets (Sequence of tuples of the form ( pykafka.partition.Partition
, int OR datetime.datetime)) – (partition, timestamp_or_offset) pairs to reset where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp before which to find a valid offset to set the partition’s counter to OR the new offset the partition’s counter should be set to
-
start
()¶ Open connections and join a consumer group.
-
stop
()¶ Close the zookeeper connection and stop consuming.
This method should be called as part of a graceful shutdown process.
-
topic
¶ The topic this consumer consumes
-