SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='', deserializer=None, reset_offset_on_fetch=True)
A non-balancing consumer for Kafka
Stop consumption and workers when object is deleted
__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='', deserializer=None, reset_offset_on_fetch=True)
Create a SimpleConsumer.
Settings and default values are taken from the Scala consumer implementation. Consumer group is included because it’s necessary for offset management, but doesn’t imply that this is a balancing consumer. Use a BalancedConsumer for that.
- topic (pykafka.topic.Topic) – The topic this consumer should consume
- cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
- consumer_group (str) – The name of the consumer group this consumer should use for offset committing and fetching.
- partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
- fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch
- num_consumer_fetchers (int) – The number of workers used to make FetchRequests
- auto_commit_enable (bool) – If true, periodically commit to kafka the offset of messages already fetched by this consumer. This also requires that consumer_group is not None.
- auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
- queued_max_messages (int) – Maximum number of messages buffered for consumption per partition
- fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available the request will block until sufficient data is available.
- fetch_error_backoff_ms (int) – The amount of time (in milliseconds) that the consumer should wait before retrying after an error. Errors include absence of data (RD_KAFKA_RESP_ERR__PARTITION_EOF), so this can slow a normal fetch scenario. Only used by the native consumer (RdKafkaSimpleConsumer).
- fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
- offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to retry offset commits/fetches
- offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
- auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
- consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
- auto_start (bool) – Whether the consumer should begin communicating with kafka after __init__ is complete. If false, communication can be started with start().
- reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
- compacted_topic (bool) – Set to read from a compacted topic. Forces consumer to use less stringent message ordering logic because compacted topics do not provide offsets in strict incrementing order.
- generation_id (int) – Deprecated since version 2.7: do not set if directly instantiating SimpleConsumer. The generation id with which to make group requests
- consumer_id (bytes) – Deprecated since version 2.7: do not set if directly instantiating SimpleConsumer. The identifying string to use for this consumer on group requests
- deserializer (function) – A function defining how to deserialize messages returned from Kafka. It must have the signature d(value, partition_key) and return a tuple of (deserialized_value, deserialized_partition_key). The arguments passed to this function are the bytes representations of a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implementations.
- reset_offset_on_fetch (bool) – Whether to update offsets during fetch_offsets. Disable for read-only use cases to prevent side-effects.
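The deserializer contract described above can be sketched as follows. This is an illustrative function, not one of the stock implementations: the name json_deserializer, the choice of JSON for values and UTF-8 for keys, and the defensive handling of None are all assumptions made for the example.

```python
import json


def json_deserializer(value, partition_key):
    """Hypothetical deserializer matching the d(value, partition_key) signature.

    Both arguments are expected as bytes. The None checks are a defensive
    assumption for messages that carry no value or no partition key.
    """
    decoded_value = json.loads(value.decode("utf-8")) if value is not None else None
    decoded_key = partition_key.decode("utf-8") if partition_key is not None else None
    # Return the pair in the order the parameter description requires.
    return decoded_value, decoded_key
```

A function like this would be passed as the deserializer argument when the consumer is created.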
Yield an infinite stream of messages until the consumer times out
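A minimal sketch of reading from that stream, assuming the description above refers to iterating over the consumer object itself and that each yielded message exposes a value attribute. The helper name collect_values and its limit parameter are hypothetical.

```python
def collect_values(consumer, limit):
    """Return up to `limit` message values by iterating the consumer.

    Iteration ends on its own if the consumer times out first
    (see consumer_timeout_ms), so fewer than `limit` values may come back.
    """
    values = []
    for message in consumer:
        values.append(message.value)
        if len(values) >= limit:
            break
    return values
```

With the default consumer_timeout_ms=-1 the loop only ends once limit is reached, so a positive timeout is the safer choice for bounded reads.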
list of weak references to the object (if defined)
Commit offsets only if it’s time to do so
Set up the error handlers to use for partition errors.
Set the group coordinator for this consumer.
If a consumer group is not supplied to __init__, this method does nothing
Raises exceptions encountered on worker threads
Start the autocommitter thread
Start the fetcher threads
Update the consumer and cluster after an ERROR_CODE
Block until at least one queue has less than _queued_max_messages
Commit offsets for this consumer’s partitions
Uses the offset commit/fetch API
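With auto_commit_enable left at its default of False, offsets have to be committed explicitly. The sketch below assumes the method documented here is exposed as commit_offsets() and that consume(block=False) returns None when nothing is queued; process_then_commit, handle, and batch_size are hypothetical names for illustration. It also assumes the consumer was created with a consumer_group.

```python
def process_then_commit(consumer, handle, batch_size=100):
    """Handle up to `batch_size` queued messages, then commit once.

    Returns the number of messages handled.
    """
    handled = 0
    while handled < batch_size:
        message = consumer.consume(block=False)
        if message is None:
            # Nothing buffered right now; stop and commit what we have.
            break
        handle(message)
        handled += 1
    if handled:
        # Commit only after the batch was processed, so a crash
        # mid-batch leads to re-delivery rather than message loss.
        consumer.commit_offsets()
    return handled
```

Committing after processing gives at-least-once behavior; committing before would trade that for at-most-once.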
Get one message from the consumer.
- block (bool) – Whether to block while waiting for a message
- unblock_event (threading.Event) – Return when the event is set()
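One way the two parameters above might be combined is a loop that blocks for messages but can be interrupted from another thread. This sketch assumes the method is called consume and returns None when it is unblocked or times out; consume_until_stopped and handle are hypothetical names.

```python
import threading


def consume_until_stopped(consumer, stop_event, handle):
    """Consume messages until `stop_event` is set; return how many were handled."""
    handled = 0
    while not stop_event.is_set():
        # Passing the same event lets a blocked call return early.
        message = consumer.consume(block=True, unblock_event=stop_event)
        if message is None:
            # Unblocked or timed out: re-check the stop condition.
            continue
        handle(message)
        handled += 1
    return handled


# Another thread would create the event and call stop_event.set()
# to end the loop, e.g. from a signal handler.
stop_event = threading.Event()
```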
Fetch new messages for all partitions
Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.
Fetch offsets for this consumer’s topic
Uses the offset commit/fetch API
Returns: List of (id,
Return a map from partition id to held offset for each partition
A list of the partitions that this consumer consumes
Reset offsets for the specified partitions
Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.
Parameters: partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp of the message whose offset the partition should have OR the new “most recently consumed” offset the partition should have.
NOTE: If an instance of timestamp_or_offset is treated by kafka as an invalid offset timestamp, this function directly sets the consumer’s internal offset counter for that partition to that instance of timestamp_or_offset. This counter represents the offset most recently consumed. On the next fetch request, the consumer attempts to fetch messages starting from that offset plus one. See the following link for more information on what kafka treats as a valid offset timestamp: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
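Because the counter holds the offset most recently consumed, resuming at a particular message means passing that message's offset minus one. The helper below is a sketch of that arithmetic only; pairs_to_resume_at is a hypothetical name, and it assumes the resulting values fall in the case the note describes (treated by Kafka as an invalid offset timestamp and therefore set directly). Results of -1 and -2 are rejected because the Kafka protocol reserves them for "latest" and "earliest".

```python
def pairs_to_resume_at(partitions, first_offset_to_read):
    """Build (partition, offset) pairs so the next fetch starts at
    `first_offset_to_read` on every given partition."""
    held_offset = first_offset_to_read - 1
    if held_offset < 0:
        # -1 and -2 are the special latest/earliest markers, not offsets.
        raise ValueError("first_offset_to_read must be at least 1")
    return [(partition, held_offset) for partition in partitions]
```

The returned list has the shape expected for partition_offsets, with partitions taken from the consumer's own partition objects.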
Begin communicating with Kafka, including setting up worker threads
Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.
Flag all running workers for deletion.
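When a consumer is created with auto_start=False, the start and stop steps can be paired so the workers are always flagged for shutdown. The sketch assumes the shutdown method described above is exposed as stop(); the running helper is a hypothetical wrapper, not part of the library.

```python
from contextlib import contextmanager


@contextmanager
def running(consumer):
    """Start the consumer on entry and stop it on exit, even on error."""
    consumer.start()
    try:
        yield consumer
    finally:
        consumer.stop()
```

Inside a with running(consumer) block the consumer can be used normally, and leaving the block stops its workers without relying on object deletion.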
The topic this consumer consumes