pykafka.balancedconsumer

class pykafka.balancedconsumer.BalancedConsumer(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-1, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False)

A self-balancing consumer for Kafka that uses ZooKeeper to communicate with other balancing consumers.

Maintains a single instance of SimpleConsumer, periodically using the consumer rebalancing algorithm to reassign partitions to this SimpleConsumer.

__init__(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-1, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False)

Create a BalancedConsumer instance

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (str) – The name of the consumer group this consumer should join.
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch with each fetch request
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If true, periodically commit to Kafka the offsets of messages already fetched by this consumer. This requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The interval (in milliseconds) at which the consumer’s offsets are committed to Kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – The maximum number of messages buffered for consumption in the internal pykafka.simpleconsumer.SimpleConsumer
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) that the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available.
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) that the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) before retrying failed offset commits and fetches.
  • offsets_commit_max_retries (int) – The number of times the offset commit worker should retry before raising an error.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • rebalance_max_retries (int) – The number of times the rebalance should retry before raising an error.
  • rebalance_backoff_ms (int) – Backoff time (in milliseconds) between retries during rebalance.
  • zookeeper_connection_timeout_ms (int) – The maximum time (in milliseconds) that the consumer waits while establishing a connection to zookeeper.
  • zookeeper_connect (str) – Comma-separated (ip1:port1,ip2:port2) strings indicating the zookeeper nodes to which to connect.
  • zookeeper (kazoo.client.KazooClient) – A KazooClient connected to a Zookeeper instance. If provided, zookeeper_connect is ignored.
  • auto_start (bool) – Whether the consumer should begin communicating with zookeeper after __init__ is complete. If false, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to the value given by auto_offset_reset (stored as self._auto_offset_reset) and commit that offset immediately upon starting up.
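As a rough illustration of how these parameters fit together, the sketch below collects a few of them in a dict and passes them to the constructor. The helper name build_consumer, the host strings, and the chosen values are hypothetical; the snippet assumes pykafka is installed and that KafkaClient exposes topics and cluster attributes, as it does in pykafka 2.x.

```python
# Keyword arguments taken from the parameter list above; values are illustrative.
CONSUMER_SETTINGS = {
    "auto_commit_enable": True,        # commit fetched offsets periodically
    "auto_commit_interval_ms": 30000,  # only used because auto_commit_enable is True
    "consumer_timeout_ms": 5000,       # consume() returns None after 5 s without messages
    "zookeeper_connect": "127.0.0.1:2181,127.0.0.2:2181",  # comma-separated ip:port pairs
    "auto_start": False,               # defer ZooKeeper communication until start()
}


def build_consumer(hosts, topic_name, consumer_group, settings=CONSUMER_SETTINGS):
    """Hypothetical helper: construct a BalancedConsumer without starting it."""
    # Imported lazily so the settings above can be inspected without pykafka.
    from pykafka import KafkaClient
    from pykafka.balancedconsumer import BalancedConsumer

    client = KafkaClient(hosts=hosts)  # placeholder host string, e.g. "127.0.0.1:9092"
    topic = client.topics[topic_name]  # some pykafka versions expect bytes for names
    return BalancedConsumer(topic, client.cluster, consumer_group, **settings)
```

Because auto_start is False in this sketch, nothing is sent to ZooKeeper until start() is called on the returned object.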
_add_partitions(partitions)

Add partitions to the zookeeper registry for this consumer.

Also add these partitions to the consumer’s internal partition registry.

Parameters: partitions (Iterable of pykafka.partition.Partition) – The partitions to add.
_add_self()

Register this consumer in zookeeper.

This method ensures that the number of participants is at most the number of partitions.

_check_held_partitions()

Double-check held partitions against zookeeper.

Ensure that the partitions held by this consumer are the ones that zookeeper thinks it’s holding. If not, rebalance.

_decide_partitions(participants)

Decide which partitions belong to this consumer.

Uses the consumer rebalancing algorithm described in the Kafka documentation: http://kafka.apache.org/documentation.html

It is very important that the participants array is sorted, since this algorithm runs on each consumer and indexes into the same array. The same array index operation must return the same result on each consumer.

Parameters: participants (Iterable of str) – Sorted list of ids of all other consumers in this consumer group.
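To show why the sorted order matters, here is a minimal sketch of a range-style assignment in which every consumer runs the same computation over the same sorted inputs. It is not the library's code: the function name and arguments are hypothetical, partitions are represented by plain sortable values, and the sketch assumes the consumer's own id appears in the participants list.

```python
def decide_partitions(partitions, participants, consumer_id):
    """Return the contiguous slice of partitions owned by consumer_id."""
    # Sort both inputs so every consumer indexes into identical sequences.
    partitions = sorted(partitions)
    participants = sorted(participants)
    idx = participants.index(consumer_id)

    # Each consumer gets per_consumer partitions; the first `remainder`
    # consumers get one extra so that every partition is assigned.
    per_consumer, remainder = divmod(len(partitions), len(participants))
    start = per_consumer * idx + min(idx, remainder)
    count = per_consumer + (1 if idx < remainder else 0)
    return partitions[start:start + count]


# Five partitions split between two consumers.
print(decide_partitions(range(5), ["c2", "c1"], "c1"))  # [0, 1, 2]
print(decide_partitions(range(5), ["c2", "c1"], "c2"))  # [3, 4]
```

If two consumers sorted the participants differently, their slices could overlap or leave partitions unowned, which is the failure the note above warns about.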
_get_participants()

Use zookeeper to get the other consumers of this topic.

Returns: A sorted list of the ids of the other consumers of this consumer’s topic
_path_from_partition(p)

Given a partition, return its path in zookeeper.
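For orientation only, the sketch below builds an ownership path of the form /consumers/<group>/owners/<topic>/<leader id>-<partition id>. That layout is an assumption made for illustration and is not confirmed by this page; the function name and its arguments are hypothetical.

```python
def path_from_partition(group, topic, leader_id, partition_id):
    """Build an ownership znode path under an ASSUMED ZooKeeper layout."""
    # Assumed layout: one znode per partition beneath the group's owners node.
    return "/consumers/{}/owners/{}/{}-{}".format(
        group, topic, leader_id, partition_id
    )
```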

_rebalance()

Claim partitions for this consumer.

This method is called whenever a zookeeper watch is triggered.

_remove_partitions(partitions)

Remove partitions from the zookeeper registry for this consumer.

Also remove these partitions from the consumer’s internal partition registry.

Parameters: partitions (Iterable of pykafka.partition.Partition) – The partitions to remove.
_set_watches()

Set watches in zookeeper that will trigger rebalances.

Rebalances should be triggered whenever a broker, topic, or consumer znode is changed in zookeeper. This ensures that the balance of the consumer group remains up-to-date with the current state of the cluster.
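The idea can be sketched with kazoo's ChildrenWatch, which calls a function whenever the children of a znode change. The helper name, the callback, and the three paths below are assumptions chosen for illustration (they follow the conventional Kafka ZooKeeper layout); zk is expected to be a connected kazoo.client.KazooClient.

```python
def set_rebalance_watches(zk, consumer_group, on_change):
    """Register child watches on zk and return the watched paths."""
    paths = [
        "/brokers/ids",                              # assumed broker registry
        "/brokers/topics",                           # assumed topic registry
        "/consumers/{}/ids".format(consumer_group),  # assumed consumer registry
    ]
    for path in paths:
        # on_change receives the current list of child znode names.
        zk.ChildrenWatch(path, on_change)
    return paths
```

In a real consumer the callback would schedule a rebalance; here it is left to the caller.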

_setup_checker_worker()

Start the zookeeper partition checker thread.

_setup_internal_consumer(start=True)

Instantiate an internal SimpleConsumer.

If there is already a SimpleConsumer instance held by this object, disable its workers and mark it for garbage collection before creating a new one.

_setup_zookeeper(zookeeper_connect, timeout)

Open a connection to a ZooKeeper host.

Parameters:
  • zookeeper_connect (str) – The ‘ip:port’ address of the zookeeper node to which to connect.
  • timeout (int) – Connection timeout (in milliseconds)
commit_offsets()

Commit offsets for this consumer’s partitions.

Uses the offset commit/fetch API.

consume(block=True)

Get one message from the consumer.

Parameters: block (bool) – Whether to block while waiting for a message
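A minimal sketch of a consume loop follows. The helper name drain and its limit argument are hypothetical; the loop relies on consume() returning None once consumer_timeout_ms elapses without a message, so it assumes the consumer was created with a non-negative consumer_timeout_ms.

```python
def drain(consumer, limit=100):
    """Collect up to `limit` messages, stopping when consume() returns None."""
    messages = []
    while len(messages) < limit:
        message = consumer.consume(block=True)
        if message is None:  # timed out with nothing available
            break
        messages.append(message)
    return messages
```

With the default consumer_timeout_ms of -1, a blocking consume() call would be expected to wait indefinitely, so the limit is the only exit in that case.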
held_offsets

Return a map from partition id to held offset for each partition.

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions.

Issue an OffsetRequest for each partition and set the appropriate returned offset in the OwnedPartition.

Parameters: partition_offsets (Iterable of (pykafka.partition.Partition, int)) – (partition, offset) pairs to reset where partition is the partition for which to reset the offset and offset is the new offset the partition should have
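The expected argument shape can be sketched as follows. The helper rewind_pairs is hypothetical; it assumes a mapping from partition id to partition object (for example, a topic's partitions mapping) and a mapping from partition id to the desired offset.

```python
def rewind_pairs(partitions_by_id, offsets_by_id):
    """Build (partition, offset) pairs for ids present in both mappings."""
    return [
        (partitions_by_id[pid], offset)
        for pid, offset in sorted(offsets_by_id.items())
        if pid in partitions_by_id  # skip ids this topic does not have
    ]
```

The result could then be passed along the lines of consumer.reset_offsets(rewind_pairs(topic.partitions, {0: 100, 1: 250})), where the offsets shown are placeholders.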
start()

Open connections and join a cluster.

stop()

Close the zookeeper connection and stop consuming.

This method should be called as part of a graceful shutdown process.
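One way to make sure stop() always runs is to wrap the consumer's lifetime in a context manager, sketched below. The name running is hypothetical, and the sketch assumes the consumer was created with auto_start=False so that start() has not already been called.

```python
from contextlib import contextmanager


@contextmanager
def running(consumer):
    """Start the consumer on entry and always stop it on exit."""
    consumer.start()
    try:
        yield consumer
    finally:
        # Runs on normal exit and when the body raises.
        consumer.stop()
```

A caller would then write `with running(consumer) as c:` around its consume loop.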