pykafka.simpleconsumer

class pykafka.simpleconsumer.SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='', deserializer=None, reset_offset_on_fetch=True)

Bases: object

A non-balancing consumer for Kafka

__del__()

Stop consumption and workers when object is deleted

__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='', deserializer=None, reset_offset_on_fetch=True)

Create a SimpleConsumer.

Settings and default values are taken from the Scala consumer implementation. Consumer group is included because it’s necessary for offset management, but doesn’t imply that this is a balancing consumer. Use a BalancedConsumer for that.

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (str) – The name of the consumer group this consumer should use for offset committing and fetching.
  • partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
  • fetch_message_max_bytes (int) – The maximum number of bytes of messages to attempt to fetch for each partition in each fetch request
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If True, periodically commit to Kafka the offsets of messages already returned from consume() calls. Requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The interval (in milliseconds) at which the consumer’s offsets are committed to Kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – Maximum number of messages buffered for consumption per partition
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available or fetch_wait_max_ms elapses.
  • fetch_error_backoff_ms (int) – The amount of time (in milliseconds) that the consumer should wait before retrying after an error. Errors include absence of data (RD_KAFKA_RESP_ERR__PARTITION_EOF), so a large value can slow down otherwise normal fetching. Only used by the native consumer (RdKafkaSimpleConsumer).
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to wait before retrying a failed offset commit or fetch
  • offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • auto_start (bool) – Whether the consumer should begin communicating with Kafka after __init__ is complete. If False, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter according to auto_offset_reset and commit that offset immediately upon starting up.
  • compacted_topic (bool) – Set to True when reading from a compacted topic. This makes the consumer use less stringent message-ordering logic, because compacted topics do not provide offsets in strictly incrementing (gap-free) order.
  • generation_id (int) – Deprecated since 2.7. Do not set this when instantiating SimpleConsumer directly. The generation id with which to make group requests.
  • consumer_id (bytes) – Deprecated since 2.7. Do not set this when instantiating SimpleConsumer directly. The identifying string to use for this consumer on group requests.
  • deserializer (function) – A function defining how to deserialize messages returned from Kafka, with the signature d(value, partition_key) returning a tuple of (deserialized_value, deserialized_partition_key). The arguments passed to this function are the bytes representations of a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implementations.
  • reset_offset_on_fetch (bool) – Whether to update offsets during fetch_offsets. Disable for read-only use cases to prevent side-effects.
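
A deserializer matching the d(value, partition_key) signature described for the deserializer parameter might look like the minimal sketch below. It assumes message values are UTF-8 JSON and partition keys are UTF-8 text; json_deserializer is a hypothetical name, not one of the stock implementations in pykafka.utils.

```python
import json
from typing import Any, Optional, Tuple


def json_deserializer(value: Optional[bytes],
                      partition_key: Optional[bytes]) -> Tuple[Any, Optional[str]]:
    """Hypothetical deserializer: JSON-decode the value, UTF-8-decode the key.

    Both arguments arrive as bytes; either may be None (for example, a message
    produced without a partition key), in which case None is passed through.
    """
    decoded_value = json.loads(value.decode("utf-8")) if value is not None else None
    decoded_key = partition_key.decode("utf-8") if partition_key is not None else None
    return decoded_value, decoded_key


# Example call with raw bytes as they would arrive from Kafka.
print(json_deserializer(b'{"user": 42}', b"user-42"))  # ({'user': 42}, 'user-42')
```

The function would then be supplied through the deserializer parameter, e.g. deserializer=json_deserializer.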

__iter__()

Yield an infinite stream of messages until the consumer times out
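
The iteration contract (keep yielding until a consume call times out) can be sketched as a generator around any zero-argument function that returns the next message, or None on timeout. The consume_fn parameter and the list-backed fake below are assumptions for illustration, not pykafka internals.

```python
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def iterate_until_timeout(consume_fn: Callable[[], Optional[T]]) -> Iterator[T]:
    """Yield messages from consume_fn until it returns None (a timeout)."""
    while True:
        message = consume_fn()
        if message is None:
            # A None result stands for "consumer_timeout_ms elapsed with no message".
            return
        yield message


backlog = ["m0", "m1", "m2"]


def fake_consume():
    """Pop the next backlog item, or return None once the backlog is empty."""
    return backlog.pop(0) if backlog else None


print(list(iterate_until_timeout(fake_consume)))  # ['m0', 'm1', 'm2']
```

With the default consumer_timeout_ms=-1, which presumably means "never time out", such a loop would never end on its own, matching the "infinite stream" wording.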

__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_auto_commit()

Commit offsets only if it’s time to do so
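
The "only if it’s time" check, driven by auto_commit_enable and auto_commit_interval_ms, amounts to comparing elapsed time against the interval. The sketch below is a hypothetical helper with an injectable clock so it can be exercised without a broker; it is not pykafka’s implementation.

```python
import time
from typing import Callable, Optional


class AutoCommitTimer:
    """Hypothetical helper deciding when a periodic offset commit is due."""

    def __init__(self, enabled: bool, interval_ms: int,
                 clock_ms: Optional[Callable[[], float]] = None) -> None:
        self.enabled = enabled
        self.interval_ms = interval_ms
        # Default clock: monotonic milliseconds, unaffected by wall-clock jumps.
        self.clock_ms = clock_ms or (lambda: time.monotonic() * 1000)
        self.last_commit_ms = self.clock_ms()

    def maybe_commit(self, commit: Callable[[], None]) -> bool:
        """Call commit() if enabled and the interval has elapsed; report whether it ran."""
        if not self.enabled:
            return False  # the interval is ignored when auto-commit is disabled
        now = self.clock_ms()
        if now - self.last_commit_ms < self.interval_ms:
            return False
        commit()
        self.last_commit_ms = now
        return True


# Simulated clock so the example does not depend on real time.
fake_now = [0.0]
timer = AutoCommitTimer(True, 60000, clock_ms=lambda: fake_now[0])
commits = []
fake_now[0] = 59999
print(timer.maybe_commit(lambda: commits.append(fake_now[0])))  # False
fake_now[0] = 60000
print(timer.maybe_commit(lambda: commits.append(fake_now[0])))  # True
```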

_build_default_error_handlers()

Set up the error handlers to use for partition errors.

_discover_group_coordinator()

Set the group coordinator for this consumer.

If a consumer group is not supplied to __init__, this method does nothing

_raise_worker_exceptions()

Raises exceptions encountered on worker threads
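
One common way to surface worker-thread failures, as _raise_worker_exceptions describes, is to have each worker record what it caught and let the calling thread re-raise it later. The WorkerErrors class below is a hypothetical sketch of that pattern, not pykafka’s code.

```python
import threading
from typing import Callable, List


class WorkerErrors:
    """Collect exceptions raised on worker threads and re-raise them on demand."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._errors: List[BaseException] = []

    def run_guarded(self, target: Callable[[], None]) -> None:
        """Run target(), recording any exception instead of losing it with the thread."""
        try:
            target()
        except Exception as exc:
            with self._lock:
                self._errors.append(exc)

    def raise_worker_exceptions(self) -> None:
        """Re-raise the first recorded worker exception in the calling thread."""
        with self._lock:
            if self._errors:
                raise self._errors[0]


def failing_fetch() -> None:
    raise RuntimeError("broker unavailable")


errors = WorkerErrors()
worker = threading.Thread(target=errors.run_guarded, args=(failing_fetch,))
worker.start()
worker.join()
try:
    errors.raise_worker_exceptions()
except RuntimeError as exc:
    print("surfaced:", exc)  # surfaced: broker unavailable
```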

_setup_autocommit_worker()

Start the autocommitter thread

_setup_fetch_workers()

Start the fetcher threads

_update()

Update the consumer and cluster after a Kafka error code is received

_wait_for_slot_available()

Block until at least one queue holds fewer than _queued_max_messages messages
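
Blocking until some per-partition queue has room maps naturally onto threading.Condition.wait_for. This is a minimal sketch assuming the queues are plain deques guarded by one shared condition variable; the function and parameter names are hypothetical.

```python
import threading
from collections import deque
from typing import Deque, Optional, Sequence


def wait_for_slot_available(queues: Sequence[Deque], max_messages: int,
                            cond: threading.Condition,
                            timeout: Optional[float] = None) -> bool:
    """Block until at least one queue holds fewer than max_messages items.

    Returns True once a slot is free, or False if timeout (seconds) expires first.
    Code that removes items must call cond.notify_all() while holding cond.
    """
    with cond:
        return cond.wait_for(
            lambda: any(len(q) < max_messages for q in queues), timeout)


cond = threading.Condition()
buffers = [deque([1, 2]), deque([3])]
print(wait_for_slot_available(buffers, 2, cond, timeout=0.1))  # True: second queue has room
```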

commit_offsets(partition_offsets=None)

Commit offsets for this consumer’s partitions

Uses the offset commit/fetch API

Parameters:
  • partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int)) – (partition, offset) pairs to commit, where partition is the partition for which to commit the offset and offset is the offset to commit for that partition. Note that using this argument while auto_commit_enable is True can cause inconsistencies in committed offsets. For best results, use either this argument or auto_commit_enable, not both.
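
The partition_offsets argument is simply a sequence of (partition, offset) pairs. As a hypothetical sketch, the pairs could be assembled from a {partition_id: offset} map plus a {partition_id: partition} lookup. Plain strings stand in for Partition objects so the example runs without a cluster, and the helper name is invented for illustration.

```python
from typing import Dict, Iterable, List, Optional, Tuple, TypeVar

P = TypeVar("P")  # stands in for pykafka.partition.Partition


def build_partition_offsets(partitions_by_id: Dict[int, P],
                            offsets_by_id: Dict[int, int],
                            only_ids: Optional[Iterable[int]] = None
                            ) -> List[Tuple[P, int]]:
    """Return (partition, offset) pairs, optionally restricted to only_ids."""
    ids = sorted(offsets_by_id) if only_ids is None else sorted(only_ids)
    return [(partitions_by_id[pid], offsets_by_id[pid]) for pid in ids]


pairs = build_partition_offsets({0: "partition-0", 1: "partition-1"},
                                {0: 41, 1: 7}, only_ids=[1])
print(pairs)  # [('partition-1', 7)]
```

With a live consumer, such a list would be passed as consumer.commit_offsets(partition_offsets=pairs), ideally with auto_commit_enable left off.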

consume(block=True, unblock_event=None)

Get one message from the consumer.

Parameters:
  • block (bool) – Whether to block while waiting for a message
  • unblock_event (threading.Event) – Stop blocking and return when this event is set
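
The interplay of block, unblock_event, and consumer_timeout_ms can be modelled with a standard queue.Queue standing in for the consumer’s internal message buffer. This is a hypothetical sketch: the polling interval, the timeout_ms parameter, and the function itself are assumptions, not how pykafka implements consume().

```python
import queue
import threading
import time
from typing import Any, Optional


def consume_one(buffer: "queue.Queue[Any]", block: bool = True,
                unblock_event: Optional[threading.Event] = None,
                timeout_ms: int = -1, poll_s: float = 0.01) -> Optional[Any]:
    """Return one buffered message, or None on timeout, unblock, or empty non-blocking call.

    timeout_ms=-1 models consumer_timeout_ms=-1 as "wait indefinitely".
    """
    deadline = None if timeout_ms < 0 else time.monotonic() + timeout_ms / 1000
    while True:
        try:
            # Non-blocking calls check the buffer exactly once.
            return buffer.get(block=block, timeout=poll_s if block else None)
        except queue.Empty:
            if not block:
                return None
            if unblock_event is not None and unblock_event.is_set():
                return None
            if deadline is not None and time.monotonic() >= deadline:
                return None
```

Polling in short slices is what lets a blocked call notice the unblock event or the deadline without a dedicated wake-up mechanism.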

fetch()

Fetch new messages for all partitions

Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.
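
Sending one FetchRequest per broker starts by grouping the consumer’s partitions by their current leader. The sketch below shows only that grouping step, with a hypothetical FakePartition record (partition id plus leader broker id) standing in for pykafka’s partition objects; building and sending the requests is omitted.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class FakePartition(NamedTuple):
    """Hypothetical stand-in: a partition id and the id of its leader broker."""
    id: int
    leader_id: int


def group_by_leader(partitions: List[FakePartition]) -> Dict[int, List[int]]:
    """Map each leader broker id to the partition ids one FetchRequest would cover."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for p in partitions:
        groups[p.leader_id].append(p.id)
    return dict(groups)


print(group_by_leader([FakePartition(0, 1), FakePartition(1, 2), FakePartition(2, 1)]))
# {1: [0, 2], 2: [1]}
```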

fetch_offsets()

Fetch offsets for this consumer’s topic

Uses the offset commit/fetch API

Returns: List of (partition id, pykafka.protocol.OffsetFetchPartitionResponse) tuples
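
Callers often want the fetch_offsets result as a plain {partition_id: offset} map. The sketch below assumes each response exposes offset and err fields (modelled with a hypothetical FakeOffsetResponse) and that an err of 0 means success, as in the Kafka protocol; the real OffsetFetchPartitionResponse fields should be checked before relying on this.

```python
from typing import Dict, List, NamedTuple, Tuple


class FakeOffsetResponse(NamedTuple):
    """Hypothetical stand-in for pykafka.protocol.OffsetFetchPartitionResponse."""
    offset: int
    err: int


def offsets_by_partition(results: List[Tuple[int, FakeOffsetResponse]]) -> Dict[int, int]:
    """Keep the committed offset of every partition whose response had no error."""
    return {pid: resp.offset for pid, resp in results if resp.err == 0}


results = [(0, FakeOffsetResponse(120, 0)), (1, FakeOffsetResponse(-1, 3))]
print(offsets_by_partition(results))  # {0: 120}
```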

held_offsets

Return a map from partition id to held offset for each partition

partitions

A list of the partitions that this consumer consumes

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions

For each value provided in partition_offsets: if the value is an integer, immediately reset the partition’s internal offset counter to that value. If it’s a datetime.datetime instance or a valid OffsetType, issue a ListOffsetRequest using that timestamp value to discover the latest offset in the latest log segment before that timestamp, then set the partition’s internal counter to the discovered offset.

Parameters:
  • partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int OR datetime.datetime)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp before which to find a valid offset to set the partition’s counter to OR the new offset the partition’s counter should be set to.
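
A common use of reset_offsets is replaying the last few messages of each partition. The sketch below builds (partition, offset) pairs from a {partition_id: held_offset} map like the one held_offsets returns. It assumes, following the default auto_offset_reset=-2, that -2 is OffsetType.EARLIEST, and falls back to it whenever rewinding would go negative, because a negative integer could collide with the OffsetType sentinels (LATEST is -1). Partitions are plain strings and the helper name is hypothetical.

```python
from typing import Dict, List, Tuple, TypeVar

P = TypeVar("P")  # stands in for pykafka.partition.Partition
EARLIEST = -2  # assumed to mirror pykafka.common.OffsetType.EARLIEST


def rewind_pairs(partitions_by_id: Dict[int, P], held_offsets: Dict[int, int],
                 n: int) -> List[Tuple[P, int]]:
    """Move each partition's counter back by n, falling back to EARLIEST below zero."""
    pairs: List[Tuple[P, int]] = []
    for pid, held in sorted(held_offsets.items()):
        target = held - n
        # Never emit negative integers: they could be read as OffsetType values.
        pairs.append((partitions_by_id[pid], target if target >= 0 else EARLIEST))
    return pairs


print(rewind_pairs({0: "partition-0", 1: "partition-1"}, {0: 100, 1: 3}, n=10))
# [('partition-0', 90), ('partition-1', -2)]
```

With a live consumer, the result would be passed as consumer.reset_offsets(partition_offsets=pairs).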

start()

Begin communicating with Kafka, including setting up worker threads

Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.

stop()

Flag all running workers for deletion.
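
The start()/stop() lifecycle, in which worker threads loop until flagged to stop, can be sketched with a shared threading.Event. The WorkerPool class below is hypothetical: it starts num_workers daemon threads (compare num_consumer_fetchers) that repeat a unit of work until stop() sets the flag. Unlike the documented stop(), which only flags workers, this version also joins them so the example can confirm they exited.

```python
import threading
from typing import Callable, List


class WorkerPool:
    """Hypothetical pool of looping worker threads that stop when flagged."""

    def __init__(self, work: Callable[[], None], num_workers: int = 1,
                 idle_s: float = 0.01) -> None:
        self._work = work
        self._num_workers = num_workers
        self._idle_s = idle_s
        self._stop_flag = threading.Event()
        self._threads: List[threading.Thread] = []

    def _loop(self) -> None:
        # Each worker repeats its unit of work until stop() sets the flag.
        while not self._stop_flag.is_set():
            self._work()
            self._stop_flag.wait(self._idle_s)  # sleep, but wake early on stop()

    def start(self) -> None:
        for _ in range(self._num_workers):
            t = threading.Thread(target=self._loop, daemon=True)
            t.start()
            self._threads.append(t)

    def stop(self, join_timeout: float = 1.0) -> None:
        """Flag all workers to exit, then wait briefly for them to finish."""
        self._stop_flag.set()
        for t in self._threads:
            t.join(join_timeout)

    @property
    def alive(self) -> bool:
        return any(t.is_alive() for t in self._threads)
```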

topic

The topic this consumer consumes