pykafka.balancedconsumer

class pykafka.balancedconsumer.BalancedConsumer(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect=None, zookeeper_hosts='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)

Bases: object

A self-balancing consumer for Kafka that uses ZooKeeper to communicate with other balancing consumers.

Maintains a single instance of SimpleConsumer, periodically using the consumer rebalancing algorithm to reassign partitions to this SimpleConsumer.

__init__(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect=None, zookeeper_hosts='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)

Create a BalancedConsumer instance

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (str) – The name of the consumer group this consumer should join. Consumer group names are namespaced at the cluster level, meaning that two consumers consuming different topics with the same group name will be treated as part of the same group.
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch with each fetch request
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If true, periodically commit to kafka the offset of messages already returned from consume() calls. Requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer’s offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – The maximum number of messages buffered for consumption in the internal pykafka.simpleconsumer.SimpleConsumer
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) that the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available.
  • fetch_error_backoff_ms (int) – UNUSED. See pykafka.simpleconsumer.SimpleConsumer.
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) that the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time to retry failed offset commits and fetches.
  • offsets_commit_max_retries (int) – The number of times the offset commit worker should retry before raising an error.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • rebalance_max_retries (int) – The number of times the rebalance should retry before raising an error.
  • rebalance_backoff_ms (int) – Backoff time (in milliseconds) between retries during rebalance.
  • zookeeper_connection_timeout_ms (int) – The maximum time (in milliseconds) that the consumer waits while establishing a connection to zookeeper.
  • zookeeper_connect (str) – Deprecated (2.7, 3.6). Comma-separated string of the zookeeper nodes to which to connect, in the form ip1:port1,ip2:port2.
  • zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect.
  • zookeeper (kazoo.client.KazooClient) – A KazooClient connected to a Zookeeper instance. If provided, zookeeper_connect is ignored.
  • auto_start (bool) – Whether the consumer should begin communicating with zookeeper after __init__ is complete. If false, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
  • post_rebalance_callback (function) – A function to be called when a rebalance is in progress. This function should accept three arguments: the pykafka.balancedconsumer.BalancedConsumer instance that just completed its rebalance, a dict of partitions that it owned before the rebalance, and a dict of partitions it owns after the rebalance. These dicts map partition ids to the most recently known offsets for those partitions. This function can optionally return a dictionary mapping partition ids to offsets. If it does, the consumer will reset its offsets to the supplied values before continuing consumption. Note that the BalancedConsumer is in a poorly defined state at the time this callback runs, so accessing its properties (such as held_offsets or partitions) might yield confusing results. The callback should instead rely on the provided partition-id dicts, which are well-defined.
  • use_rdkafka (bool) – Use librdkafka-backed consumer if available
  • compacted_topic (bool) – Set to read from a compacted topic. Forces consumer to use less stringent message ordering logic because compacted topics do not provide offsets in strict incrementing order.
  • membership_protocol (pykafka.membershipprotocol.GroupMembershipProtocol) – The group membership protocol to which this consumer should adhere
  • deserializer (function) – A function defining how to deserialize messages returned from Kafka. It must have the signature d(value, partition_key) and return a tuple of (deserialized_value, deserialized_partition_key). The arguments passed to this function are the bytes representations of a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implementations.
  • reset_offset_on_fetch (bool) – Whether to update offsets during fetch_offsets. Disable for read-only use cases to prevent side-effects.
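
The deserializer and post_rebalance_callback parameters both take plain functions. The following is a minimal sketch of each, assuming message values and keys are UTF-8 text and that either may be None. The names utf8_deserializer and log_rebalance are hypothetical and not part of pykafka.

```python
import logging

log = logging.getLogger(__name__)


def utf8_deserializer(value, partition_key):
    """Decode the raw bytes of a message's value and partition key.

    Follows the d(value, partition_key) signature described above and
    returns a (deserialized_value, deserialized_partition_key) tuple.
    Assumption: either field may be None, so None is passed through.
    """
    decoded_value = value.decode("utf-8") if value is not None else None
    decoded_key = (
        partition_key.decode("utf-8") if partition_key is not None else None
    )
    return decoded_value, decoded_key


def log_rebalance(consumer, old_partition_offsets, new_partition_offsets):
    """Log which partition ids were gained and lost in a rebalance.

    Only the two dicts are inspected; the consumer's own properties are
    left alone, as the parameter description recommends. Returning None
    means no offsets are reset.
    """
    gained = sorted(set(new_partition_offsets) - set(old_partition_offsets))
    lost = sorted(set(old_partition_offsets) - set(new_partition_offsets))
    log.info("rebalance: gained=%s lost=%s", gained, lost)
    return None
```

These would be passed as deserializer=utf8_deserializer and post_rebalance_callback=log_rebalance when the consumer is created.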
__iter__()

Yield an infinite stream of messages until the consumer times out
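
Because the consumer is iterable, a bounded read can be written as an ordinary loop. This sketch assumes each message exposes partition_id, offset, and value attributes, and that the default consumer_timeout_ms of -1 means the iterator waits indefinitely. The helper name drain is hypothetical.

```python
def drain(consumer, limit):
    """Collect up to `limit` (partition_id, offset, value) tuples.

    The loop also ends if the consumer times out before `limit`
    messages arrive, because iteration stops at that point.
    """
    seen = []
    for message in consumer:
        if message is None:
            continue  # defensive: skip empty results
        seen.append((message.partition_id, message.offset, message.value))
        if len(seen) >= limit:
            break
    return seen
```

Setting a non-negative consumer_timeout_ms at construction is what lets a loop like this terminate on an idle topic.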

__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_add_partitions(partitions)

Add partitions to the zookeeper registry for this consumer.

Parameters: partitions (Iterable of pykafka.partition.Partition) – The partitions to add.
_add_self()

Register this consumer in zookeeper.

_build_watch_callback(fn, proxy)

Return a function that’s safe to use as a ChildrenWatch callback

Fixes the issue from https://github.com/Parsely/pykafka/issues/345

_get_held_partitions()

Build a set of partitions zookeeper says we own

_get_internal_consumer(partitions=None, start=True)

Instantiate a SimpleConsumer for internal use.

If there is already a SimpleConsumer instance held by this object, disable its workers and mark it for garbage collection before creating a new one.

_get_participants()

Use zookeeper to get the other consumers of this topic.

Returns: A sorted list of the ids of other consumers of this consumer’s topic
_partitions

Convenient shorthand for set of partitions internally held

_path_from_partition(p)

Given a partition, return its path in zookeeper.

_path_self

Path where this consumer should be registered in zookeeper

_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_rebalance()

Start the rebalancing process for this consumer

This method is called whenever a zookeeper watch is triggered.

_remove_partitions(partitions)

Remove partitions from the zookeeper registry for this consumer.

Parameters: partitions (Iterable of pykafka.partition.Partition) – The partitions to remove.
_set_watches()

Set watches in zookeeper that will trigger rebalances.

Rebalances should be triggered whenever a broker, topic, or consumer znode is changed in zookeeper. This ensures that the balance of the consumer group remains up-to-date with the current state of the cluster.

_setup_internal_consumer(partitions=None, start=True)

Instantiate an internal SimpleConsumer instance

_setup_zookeeper(zookeeper_connect, timeout)

Open a connection to a ZooKeeper host.

Parameters:
  • zookeeper_connect (str) – The ‘ip:port’ address of the zookeeper node to which to connect.
  • timeout (int) – Connection timeout (in milliseconds)
_update_member_assignment()

Decide and assign new partitions for this consumer
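
The default membership_protocol is named 'range'. As an illustration of what a range-style assignment looks like in general, here is a sketch that splits sorted partition ids into contiguous blocks across sorted consumer ids. It is not taken from pykafka's decide_partitions_range, and the function name range_assignment is hypothetical.

```python
def range_assignment(consumer_id, participants, partition_ids):
    """Return the partition ids that `consumer_id` would own.

    Every consumer computes the same split independently, so both
    inputs are sorted first to give all of them the same view.
    """
    members = sorted(participants)
    parts = sorted(partition_ids)
    idx = members.index(consumer_id)
    per_member, extra = divmod(len(parts), len(members))
    # The first `extra` members each take one additional partition.
    start = per_member * idx + min(idx, extra)
    count = per_member + (1 if idx < extra else 0)
    return parts[start:start + count]
```

With 8 partitions and 3 consumers, the first two consumers receive three partitions each and the last receives two.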

commit_offsets(partition_offsets=None)

Commit offsets for this consumer’s partitions

Uses the offset commit/fetch API

Parameters: partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int)) – (partition, offset) pairs to commit where partition is the partition for which to commit the offset and offset is the offset to commit for the partition. Note that using this argument when auto_commit_enable is enabled can cause inconsistencies in committed offsets. For best results, use either this argument or auto_commit_enable, not both.
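
When auto_commit_enable is False, the caller decides when offsets are committed. A minimal sketch of committing every few messages follows, assuming that calling commit_offsets() with no arguments commits the consumer's current positions. The helper name consume_with_manual_commits and its commit_every parameter are hypothetical.

```python
def consume_with_manual_commits(consumer, handle, commit_every=100):
    """Pass each message to `handle`, committing every `commit_every`.

    Returns the number of messages handled. A final commit covers any
    messages handled since the last periodic commit.
    """
    total = 0
    uncommitted = 0
    for message in consumer:
        if message is None:
            continue  # defensive: skip empty results
        handle(message)
        total += 1
        uncommitted += 1
        if uncommitted >= commit_every:
            consumer.commit_offsets()
            uncommitted = 0
    if uncommitted:
        consumer.commit_offsets()
    return total
```

Committing only after handle() returns means a crash can cause messages to be handled again, but not skipped.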
consume(block=True)

Get one message from the consumer

Parameters: block (bool) – Whether to block while waiting for a message
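
A non-blocking read is useful when the caller has other work to interleave. This sketch assumes consume(block=False) returns None when no message is currently available. The helper name poll and its parameters are hypothetical.

```python
import time


def poll(consumer, attempts, idle_sleep=0.0):
    """Make `attempts` non-blocking reads and return the messages found."""
    found = []
    for _ in range(attempts):
        message = consumer.consume(block=False)
        if message is None:
            # Nothing available right now; pause briefly before retrying.
            time.sleep(idle_sleep)
            continue
        found.append(message)
    return found
```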
held_offsets

Return a map from partition id to held offset for each partition

partitions

A list of the partitions that this consumer consumes

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions

For each value provided in partition_offsets: if the value is an integer, immediately reset the partition’s internal offset counter to that value. If it’s a datetime.datetime instance or a valid OffsetType, issue a ListOffsetRequest using that timestamp value to discover the latest offset in the latest log segment before that timestamp, then set the partition’s internal counter to that value.

Parameters: partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int OR datetime.datetime)) – (partition, timestamp_or_offset) pairs to reset where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp before which to find a valid offset to set the partition’s counter to OR the new offset the partition’s counter should be set to
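
The partition_offsets argument is a sequence of (partition, timestamp_or_offset) pairs. A small sketch of building that sequence for a set of partitions is shown here. The helper name rewind_pairs is hypothetical, and the type check is an added safeguard rather than something pykafka requires.

```python
import datetime


def rewind_pairs(partitions, timestamp_or_offset):
    """Build (partition, timestamp_or_offset) pairs for reset_offsets().

    The same integer offset or datetime is applied to every partition.
    """
    is_valid = isinstance(
        timestamp_or_offset, (int, datetime.datetime)
    ) and not isinstance(timestamp_or_offset, bool)
    if not is_valid:
        raise TypeError("expected an int offset or a datetime.datetime")
    return [(partition, timestamp_or_offset) for partition in partitions]
```

Assuming the topic's partitions are available as a dict of id to Partition, a call might look like consumer.reset_offsets(rewind_pairs(topic.partitions.values(), datetime.datetime(2020, 1, 1))).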
start()

Open connections and join a consumer group.

stop()

Close the zookeeper connection and stop consuming.

This method should be called as part of a graceful shutdown process.
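
One way to make sure stop() always runs is to wrap start() and stop() in a context manager. This is a sketch, assuming the consumer was created with auto_start=False. The name running is hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def running(consumer):
    """Start `consumer` on entry and always stop it on exit."""
    consumer.start()
    try:
        yield consumer
    finally:
        # Runs on normal exit and when the body raises.
        consumer.stop()
```

The body of a with running(consumer): block can then consume messages, and the consumer is stopped even if the body raises.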

topic

The topic this consumer consumes