pykafka.managedbalancedconsumer

class pykafka.managedbalancedconsumer.ManagedBalancedConsumer(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, heartbeat_interval_ms=3000, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)

Bases: pykafka.balancedconsumer.BalancedConsumer

A self-balancing consumer that uses Kafka 0.9’s Group Membership API

Implements the Group Management API semantics for Kafka 0.9 compatibility

Maintains a single instance of SimpleConsumer, periodically using the consumer rebalancing algorithm to reassign partitions to this SimpleConsumer.

This class overrides the functionality of pykafka.balancedconsumer.BalancedConsumer that deals with ZooKeeper and inherits other functionality directly.
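The partition reassignment step of a rebalance can be pictured with a simplified range-style assignment, the strategy named by the default protocol_name='range' in the signature above. This is a sketch in the spirit of Kafka's range assignor, not pykafka's decide_partitions_range; the function assign_range is hypothetical.

```python
from typing import Dict, List


def assign_range(member_ids: List[str], partition_ids: List[int]) -> Dict[str, List[int]]:
    """Hypothetical range-style assignment: each member gets one contiguous block."""
    if not member_ids:
        return {}
    # Every member sorts the same way, so all members agree on the result.
    members = sorted(member_ids)
    partitions = sorted(partition_ids)
    per_member, extra = divmod(len(partitions), len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for index, member in enumerate(members):
        # The first `extra` members each take one additional partition.
        count = per_member + (1 if index < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


print(assign_range(["consumer-b", "consumer-a"], [0, 1, 2, 3, 4]))
# {'consumer-a': [0, 1, 2], 'consumer-b': [3, 4]}
```

Contiguous blocks keep the logic trivial to reproduce on every member, at the cost of uneven load when partitions do not divide evenly.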

__init__(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False, heartbeat_interval_ms=3000, membership_protocol=GroupMembershipProtocol(protocol_type='consumer', protocol_name='range', metadata=<pykafka.protocol.group_membership.ConsumerGroupProtocolMetadata object>, decide_partitions=<function decide_partitions_range>), deserializer=None, reset_offset_on_fetch=True)

Create a ManagedBalancedConsumer instance

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (str) – The name of the consumer group this consumer should join. Consumer group names are namespaced at the cluster level, meaning that two consumers consuming different topics with the same group name will be treated as part of the same group.
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch with each fetch request
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If True, periodically commit to Kafka the offsets of messages already fetched by this consumer. This also requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer’s offsets are committed to Kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – The maximum number of messages buffered for consumption in the internal pykafka.simpleconsumer.SimpleConsumer
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) that the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available.
  • fetch_error_backoff_ms (int) – UNUSED. See pykafka.simpleconsumer.SimpleConsumer.
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) that the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to retry failed offset commits and fetches.
  • offsets_commit_max_retries (int) – The number of times the offset commit worker should retry before raising an error.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • rebalance_max_retries (int) – The number of times the rebalance should retry before raising an error.
  • rebalance_backoff_ms (int) – Backoff time (in milliseconds) between retries during rebalance.
  • auto_start (bool) – Whether the consumer should start after __init__ is complete. If False, it can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up.
  • post_rebalance_callback (function) – A function to be called when a rebalance is in progress. This function should accept three arguments: the pykafka.balancedconsumer.BalancedConsumer instance that just completed its rebalance, a dict of partitions that it owned before the rebalance, and a dict of partitions it owns after the rebalance. These dicts map partition ids to the most recently known offsets for those partitions. This function can optionally return a dictionary mapping partition ids to offsets; if it does, the consumer will reset its offsets to the supplied values before continuing consumption. Note that the BalancedConsumer is in a poorly defined state at the time this callback runs, so accessing its properties (such as held_offsets or partitions) might yield confusing results. The callback should instead rely on the provided partition-id dicts, which are well-defined.
  • use_rdkafka (bool) – Use librdkafka-backed consumer if available
  • compacted_topic (bool) – Set to read from a compacted topic. Forces consumer to use less stringent message ordering logic because compacted topics do not provide offsets in strict incrementing order.
  • heartbeat_interval_ms (int) – The amount of time in milliseconds to wait between heartbeat requests
  • membership_protocol (pykafka.membershipprotocol.GroupMembershipProtocol) – The group membership protocol to which this consumer should adhere
  • deserializer (function) – A function defining how to deserialize messages returned from Kafka, with the signature d(value, partition_key), returning a tuple of (deserialized_value, deserialized_partition_key). The arguments passed to this function are the bytes representations of a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implementations.
  • reset_offset_on_fetch (bool) – Whether to update offsets during fetch_offsets. Disable for read-only use cases to prevent side-effects.
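A post_rebalance_callback following the three-argument contract described for that parameter might look like the sketch below, which resets only newly acquired partitions to offsets held in an external store. EXTERNAL_CHECKPOINTS and on_rebalance are hypothetical, and the sketch assumes, as the parameter description states, that both dicts are keyed by partition id.

```python
from typing import Dict, Optional

# Hypothetical checkpoint store (e.g. loaded from a database): partition id -> offset.
EXTERNAL_CHECKPOINTS: Dict[int, int] = {0: 120, 3: 45}


def on_rebalance(consumer,
                 old_partition_offsets: Dict[int, int],
                 new_partition_offsets: Dict[int, int]) -> Optional[Dict[int, int]]:
    # Use only the dicts passed in; the consumer object is in a poorly defined
    # state while the callback runs, so it is deliberately not inspected here.
    newly_assigned = set(new_partition_offsets) - set(old_partition_offsets)
    overrides = {pid: EXTERNAL_CHECKPOINTS[pid]
                 for pid in newly_assigned if pid in EXTERNAL_CHECKPOINTS}
    # Return None when there is nothing to override, leaving offsets untouched.
    return overrides or None
```

The function would be supplied as post_rebalance_callback=on_rebalance when constructing the consumer.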
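The deserializer contract can be illustrated with a JSON decoder. This is a minimal sketch assuming message values are UTF-8 JSON and partition keys are UTF-8 text; json_deserializer is a hypothetical name, not one of the stock implementations in pykafka.utils.

```python
import json
from typing import Any, Optional, Tuple


def json_deserializer(value: Optional[bytes],
                      partition_key: Optional[bytes]) -> Tuple[Any, Optional[str]]:
    # Values can be None (for example, tombstones on compacted topics), so guard both fields.
    decoded_value = json.loads(value.decode("utf-8")) if value is not None else None
    decoded_key = partition_key.decode("utf-8") if partition_key is not None else None
    return decoded_value, decoded_key
```

It would be passed as deserializer=json_deserializer; each consumed message would then carry the decoded fields in place of the raw bytes.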

_build_default_error_handlers()

Set up default responses to common error codes

_handle_error(error_code)

Call the appropriate handler function for the given error code

Parameters:
  • error_code (int) – The error code returned from a Group Membership API request
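The split between _build_default_error_handlers() and _handle_error() amounts to a lookup table from error code to handler. The sketch below is hypothetical: the class, the recorded actions, and the choice of codes are illustrative rather than pykafka's actual handler table, though 25 (UNKNOWN_MEMBER_ID) and 27 (REBALANCE_IN_PROGRESS) are real Kafka protocol error codes.

```python
from typing import Callable, Dict, List


class GroupErrorHandler:
    """Hypothetical dispatcher illustrating the error-code-to-handler table."""

    # Kafka protocol error codes relevant to group membership.
    NO_ERROR = 0
    UNKNOWN_MEMBER_ID = 25
    REBALANCE_IN_PROGRESS = 27

    def __init__(self) -> None:
        self.actions: List[str] = []
        self._handlers: Dict[int, Callable[[], None]] = self._build_default_error_handlers()

    def _build_default_error_handlers(self) -> Dict[int, Callable[[], None]]:
        def reset_member_id() -> None:
            # Forget the stale member id so the next join obtains a fresh one.
            self.actions.append("reset member id")

        def rejoin() -> None:
            # The coordinator is reassigning partitions; rejoin the group.
            self.actions.append("rejoin group")

        return {
            self.UNKNOWN_MEMBER_ID: reset_member_id,
            self.REBALANCE_IN_PROGRESS: rejoin,
        }

    def _handle_error(self, error_code: int) -> None:
        if error_code == self.NO_ERROR:
            return
        handler = self._handlers.get(error_code)
        if handler is None:
            raise RuntimeError(f"unhandled group error code {error_code}")
        handler()
```

Keeping the table in one builder method means a subclass can add or override responses without touching the dispatch logic.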

_join_group()

Send a JoinGroupRequest.

Assigns a member id and tells the coordinator about this consumer.

_setup_heartbeat_worker()

Start the heartbeat worker
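A heartbeat worker is essentially a background thread that pings the coordinator every heartbeat_interval_ms until the consumer stops. In this sketch, send_heartbeat stands in for whatever actually issues the heartbeat request; it and setup_heartbeat_worker are hypothetical, not pykafka internals.

```python
import threading
from typing import Callable


def setup_heartbeat_worker(send_heartbeat: Callable[[], None],
                           heartbeat_interval_ms: int,
                           stop_event: threading.Event) -> threading.Thread:
    """Hypothetical helper: call send_heartbeat every heartbeat_interval_ms until stopped."""
    interval_s = heartbeat_interval_ms / 1000.0

    def run() -> None:
        # Event.wait returns True as soon as the event is set, so the loop
        # exits promptly on shutdown instead of sleeping out a full interval.
        while not stop_event.wait(interval_s):
            send_heartbeat()

    worker = threading.Thread(target=run, name="heartbeat-worker", daemon=True)
    worker.start()
    return worker
```

Waiting on a threading.Event rather than calling time.sleep is the design choice that lets a stop() call end the worker without delay.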

_sync_group(group_assignments)

Send a SyncGroupRequest.

If this consumer is the group leader, this call informs the other consumers of their partition assignments. For all consumers including the leader, this call is used to fetch partition assignments.

The group leader could tell itself its own assignment instead of using the result of this request, but it does the latter to ensure consistency.
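The JoinGroup/SyncGroup exchange can be simulated in memory to show who sends what. Everything here is hypothetical and simplified: FakeCoordinator stands in for the broker-side group coordinator, the first member to join is treated as leader, and the leader's plan uses round-robin purely for brevity.

```python
import itertools
from typing import Dict, List


class FakeCoordinator:
    """Hypothetical in-memory stand-in for the group coordinator (no network)."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.members: List[str] = []
        self.assignments: Dict[str, List[int]] = {}

    def join_group(self) -> str:
        # JoinGroup: hand out a member id; here the first joiner becomes leader.
        member_id = f"member-{next(self._ids)}"
        self.members.append(member_id)
        return member_id

    @property
    def leader(self) -> str:
        return self.members[0]

    def sync_group(self, member_id: str,
                   group_assignments: Dict[str, List[int]]) -> List[int]:
        # SyncGroup: only the leader's request carries assignments; every member,
        # leader included, reads its own assignment back from the response.
        if member_id == self.leader and group_assignments:
            self.assignments = group_assignments
        return self.assignments.get(member_id, [])


coordinator = FakeCoordinator()
members = [coordinator.join_group() for _ in range(2)]
partitions = [0, 1, 2, 3]
# The leader computes a plan for the whole group (round-robin for brevity).
plan = {m: partitions[i::len(members)] for i, m in enumerate(members)}
leader_view = coordinator.sync_group(coordinator.leader, plan)
follower_view = coordinator.sync_group(members[1], {})
print(leader_view, follower_view)  # [0, 2] [1, 3]
```

Note that the leader reads its own partitions from the sync response rather than from plan, mirroring the consistency choice described above.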

_update_member_assignment()

Join a managed consumer group and start consuming assigned partitions

Equivalent to pykafka.balancedconsumer.BalancedConsumer._update_member_assignment, but uses the Kafka 0.9 Group Membership API instead of ZooKeeper to manage group state
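The rebalance_max_retries and rebalance_backoff_ms parameters suggest a retry loop around each join-and-sync attempt. This sketch assumes rebalance_max_retries counts total attempts and that the backoff is fixed; retry_rebalance and RebalanceError are hypothetical names, and pykafka's own loop may differ in both respects.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RebalanceError(Exception):
    """Hypothetical error raised when every rebalance attempt fails."""


def retry_rebalance(attempt: Callable[[], T],
                    rebalance_max_retries: int = 5,
                    rebalance_backoff_ms: int = 2000) -> T:
    """Hypothetical loop: run one join+sync attempt, retrying with a fixed backoff."""
    last_error: Exception = RebalanceError("no attempts made")
    for attempt_number in range(rebalance_max_retries):
        try:
            return attempt()
        except Exception as exc:  # a real client would catch specific Kafka errors
            last_error = exc
            # Back off before the next attempt, but not after the final one.
            if attempt_number < rebalance_max_retries - 1:
                time.sleep(rebalance_backoff_ms / 1000.0)
    raise RebalanceError(
        f"rebalance failed after {rebalance_max_retries} attempts") from last_error
```

Chaining the final failure with "from last_error" keeps the underlying cause visible in the traceback when the retries are exhausted.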

start()

Start this consumer.

Must be called before consume() if auto_start=False.

stop()

Stop this consumer

Should be called as part of a graceful shutdown
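A typical lifecycle with auto_start=False is start(), consume, then stop() during shutdown. This sketch assumes pykafka's KafkaClient and Topic.get_balanced_consumer(..., managed=True), which selects this class; the function name, host string, and message limit are hypothetical, and a reachable Kafka 0.9+ broker is needed to actually run it.

```python
def consume_with_managed_consumer(hosts: str, topic_name: bytes, group: bytes,
                                  limit: int = 10) -> list:
    # Imported lazily so the sketch can be loaded without pykafka installed.
    from pykafka import KafkaClient

    client = KafkaClient(hosts=hosts)  # e.g. "127.0.0.1:9092" (hypothetical)
    topic = client.topics[topic_name]
    # managed=True selects ManagedBalancedConsumer (Kafka 0.9+ group management).
    consumer = topic.get_balanced_consumer(
        consumer_group=group,
        managed=True,
        auto_start=False,
        consumer_timeout_ms=1000,
    )
    consumer.start()  # required because auto_start=False
    values = []
    try:
        while len(values) < limit:
            message = consumer.consume()
            if message is None:  # consumer_timeout_ms elapsed with nothing to read
                break
            values.append(message.value)
    finally:
        consumer.stop()  # leave the group as part of a graceful shutdown
    return values
```

Putting stop() in a finally block ensures the consumer leaves the group even if processing raises.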