SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-1, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)
A non-balancing consumer for Kafka.
Stop consumption and workers when the object is deleted.
__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-1, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)
Create a SimpleConsumer.
Settings and default values are taken from the Scala consumer implementation. The consumer group is included because it is necessary for offset management; it does not imply that this is a balancing consumer. Use a BalancedConsumer for balanced consumption.
- topic (pykafka.topic.Topic) – The topic this consumer should consume
- cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
- consumer_group (str) – The name of the consumer group this consumer should use for offset committing and fetching.
- partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
- fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch
- num_consumer_fetchers (int) – The number of workers used to make FetchRequests
- auto_commit_enable (bool) – If True, periodically commit to Kafka the offsets of messages already fetched by this consumer. This requires that consumer_group is not None.
- auto_commit_interval_ms (int) – The interval (in milliseconds) at which the consumer offsets are committed to Kafka. This setting is ignored if auto_commit_enable is False.
- queued_max_messages (int) – Maximum number of messages buffered for consumption
- fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available, the request blocks until enough data accumulates or fetch_wait_max_ms elapses.
- fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
- offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to wait before retrying a failed offset commit or fetch.
- offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
- auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
- consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
- auto_start (bool) – Whether the consumer should begin communicating with Kafka after __init__ is complete. If False, communication can be started with start().
- reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up.
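As a rough illustration of how these settings fit together, the sketch below collects the keyword defaults from the signature above in a dataclass and enforces the two auto-commit rules stated for auto_commit_enable and auto_commit_interval_ms. ConsumerSettings and effective_commit_interval_ms are hypothetical names, not part of pykafka, and raising ValueError is an assumption about how an invalid combination might be rejected.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConsumerSettings:
    """Hypothetical container mirroring the SimpleConsumer keyword defaults."""
    consumer_group: Optional[str] = None
    fetch_message_max_bytes: int = 1048576
    num_consumer_fetchers: int = 1
    auto_commit_enable: bool = False
    auto_commit_interval_ms: int = 60000
    queued_max_messages: int = 2000
    fetch_min_bytes: int = 1
    fetch_wait_max_ms: int = 100
    offsets_channel_backoff_ms: int = 1000
    offsets_commit_max_retries: int = 5
    auto_offset_reset: int = -1
    consumer_timeout_ms: int = -1
    auto_start: bool = True
    reset_offset_on_start: bool = False

    def __post_init__(self) -> None:
        # Assumption: reject auto-commit without a group to commit offsets under.
        if self.auto_commit_enable and self.consumer_group is None:
            raise ValueError("auto_commit_enable requires a consumer_group")

    def effective_commit_interval_ms(self) -> Optional[int]:
        # auto_commit_interval_ms is ignored when auto-commit is disabled.
        return self.auto_commit_interval_ms if self.auto_commit_enable else None


settings = ConsumerSettings(consumer_group="my-group", auto_commit_enable=True)
print(settings.effective_commit_interval_ms())  # 60000
```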
Yield an infinite stream of messages until the consumer times out
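The iteration behaviour can be sketched as a generator that keeps calling a get-one-message function until it returns None. Here iterate_messages and the consume callable are hypothetical stand-ins. With consumer_timeout_ms=-1, the assumption is that consume never returns None, so the stream is effectively infinite.

```python
from typing import Callable, Iterator, Optional


def iterate_messages(consume: Callable[[], Optional[bytes]]) -> Iterator[bytes]:
    """Yield messages until `consume` returns None (treated as a timeout)."""
    while True:
        message = consume()
        if message is None:
            # None signals that the timeout elapsed with no message available.
            return
        yield message


pending = [b"a", b"b"]
print(list(iterate_messages(lambda: pending.pop(0) if pending else None)))  # [b'a', b'b']
```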
Commit offsets only if auto-commit is enabled and auto_commit_interval_ms has elapsed since the last commit.
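A minimal sketch of the timing check, assuming the decision depends only on the time elapsed since the last commit compared with auto_commit_interval_ms. AutoCommitter and its injectable clock are hypothetical and exist only to make the timing logic easy to test.

```python
import time
from typing import Callable


class AutoCommitter:
    """Hypothetical helper: commit only once the interval has elapsed."""

    def __init__(self, commit: Callable[[], None], interval_ms: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._commit = commit
        self._interval_ms = interval_ms
        self._clock = clock
        self._last_commit = clock()

    def maybe_commit(self) -> bool:
        elapsed_ms = (self._clock() - self._last_commit) * 1000.0
        if elapsed_ms < self._interval_ms:
            return False  # not time to commit yet
        self._commit()
        self._last_commit = self._clock()
        return True
```

Injecting the clock keeps the sketch deterministic; a real consumer would simply use wall-clock or monotonic time.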
Set up the error handlers to use for partition errors.
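Partition-level errors can be dispatched through a table keyed by Kafka error code. The sketch below is hypothetical: the handler names and the actions they take are assumptions, although the numeric codes (1 for OffsetOutOfRange, 3 for UnknownTopicOrPartition, 6 for NotLeaderForPartition) come from the Kafka protocol.

```python
from typing import Callable, Dict, List

# Error codes from the Kafka protocol; 0 means "no error".
OFFSET_OUT_OF_RANGE = 1
UNKNOWN_TOPIC_OR_PARTITION = 3
NOT_LEADER_FOR_PARTITION = 6

Handler = Callable[[int], None]


def build_error_handlers(log: List[str]) -> Dict[int, Handler]:
    """Hypothetical: map error codes to per-partition recovery actions."""
    def reset_offset(partition_id: int) -> None:
        # e.g. fall back to the auto_offset_reset policy
        log.append(f"reset offset for partition {partition_id}")

    def refresh_metadata(partition_id: int) -> None:
        # e.g. look up the partition's current leader again
        log.append(f"refresh leader for partition {partition_id}")

    return {
        OFFSET_OUT_OF_RANGE: reset_offset,
        UNKNOWN_TOPIC_OR_PARTITION: refresh_metadata,
        NOT_LEADER_FOR_PARTITION: refresh_metadata,
    }


def handle_partition_errors(errors: Dict[int, int], handlers: Dict[int, Handler]) -> None:
    """`errors` maps partition id -> error code returned by the broker."""
    for partition_id, code in errors.items():
        if code == 0:
            continue
        handler = handlers.get(code)
        if handler is None:
            raise RuntimeError(f"unhandled error code {code} on partition {partition_id}")
        handler(partition_id)
```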
Set the offset manager for this consumer.
If no consumer group was supplied to __init__, this method does nothing.
Start the autocommitter thread
Start the fetcher threads
Commit offsets for this consumer’s partitions
Uses the offset commit/fetch API
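The retry behaviour implied by offsets_commit_max_retries and offsets_channel_backoff_ms can be sketched as follows, assuming a failed commit surfaces as a ConnectionError and that the backoff is a fixed sleep between attempts. commit_with_retries and the commit callable are hypothetical. The sketch makes one initial attempt plus up to max_retries retries before re-raising the error.

```python
import time
from typing import Callable, Dict


def commit_with_retries(
    commit: Callable[[Dict[int, int]], None],
    offsets: Dict[int, int],        # partition id -> offset to commit
    max_retries: int = 5,           # mirrors offsets_commit_max_retries
    backoff_ms: int = 1000,         # mirrors offsets_channel_backoff_ms
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Commit `offsets`, retrying on ConnectionError; return attempts used."""
    attempts = 0
    while True:
        attempts += 1
        try:
            commit(offsets)
            return attempts
        except ConnectionError:
            if attempts > max_retries:
                raise  # retries exhausted
            sleep(backoff_ms / 1000.0)
```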
Get one message from the consumer.
Parameters: block (bool) – Whether to block while waiting for a message
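Getting one message can be pictured as a read from a bounded in-memory queue (sized by queued_max_messages) that gives up after consumer_timeout_ms and returns None. MessageBuffer is hypothetical, and treating a negative timeout as "wait forever" is an assumption made for this sketch.

```python
import queue
from typing import Optional


class MessageBuffer:
    """Hypothetical sketch of a blocking/non-blocking get-one-message call."""

    def __init__(self, queued_max_messages: int = 2000,
                 consumer_timeout_ms: int = -1) -> None:
        self._queue = queue.Queue(maxsize=queued_max_messages)
        self._timeout_ms = consumer_timeout_ms

    def put(self, message: bytes) -> None:
        self._queue.put(message)

    def consume(self, block: bool = True) -> Optional[bytes]:
        # Assumption: a negative timeout means "wait forever".
        timeout = None if self._timeout_ms < 0 else self._timeout_ms / 1000.0
        try:
            # When block is False, queue.Queue.get ignores the timeout.
            return self._queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None
```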
Fetch new messages for all partitions
Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.
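The per-broker fetch can be sketched by grouping partitions by their leader broker, sending one request per broker, and appending each returned message to the owning partition's buffer. All names here (fetch_all, fetch_from_broker, partition_queues) are hypothetical, and advancing the next fetch offset at enqueue time is an assumption of the sketch.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Message = Tuple[int, bytes]  # (offset, value)
# Hypothetical broker call: (broker_id, {partition_id: offset}) -> {partition_id: messages}
FetchFn = Callable[[int, Dict[int, int]], Dict[int, List[Message]]]


def fetch_all(leaders: Dict[int, int],
              next_offsets: Dict[int, int],
              fetch_from_broker: FetchFn,
              partition_queues: Dict[int, List[Message]]) -> None:
    # Group partitions by leader broker so each broker gets a single request.
    by_broker: Dict[int, Dict[int, int]] = defaultdict(dict)
    for partition_id, broker_id in leaders.items():
        by_broker[broker_id][partition_id] = next_offsets[partition_id]

    for broker_id, requested in by_broker.items():
        response = fetch_from_broker(broker_id, requested)
        for partition_id, messages in response.items():
            # Enqueue into the owning partition's buffer.
            partition_queues[partition_id].extend(messages)
            if messages:
                # Next fetch starts after the last offset received.
                next_offsets[partition_id] = messages[-1][0] + 1
```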
Fetch offsets for this consumer’s topic
Uses the offset commit/fetch API
Returns: List of (id,
Return a map from partition id to held offset for each partition
A list of the partitions that this consumer consumes
Reset offsets for the specified partitions
Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.
Parameters: partition_offsets (Iterable of (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition whose offset should be reset and timestamp_or_offset is EITHER the timestamp of the message whose offset the partition should have OR the new offset the partition should have.
NOTE: If Kafka treats a given timestamp_or_offset as an invalid offset timestamp, this function sets the consumer’s internal offset counter for that partition directly to that value. On the next fetch request, the consumer attempts to fetch messages starting from that offset. See the following link for more information on what Kafka treats as a valid offset timestamp: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
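The timestamp_or_offset rule can be sketched as follows: ask Kafka to resolve the value as a timestamp, and fall back to using the value itself as the offset when Kafka does not accept it. The lookup callable stands in for an OffsetRequest and is hypothetical; -1 (latest) and -2 (earliest) are the special time values defined by the Kafka OffsetRequest API.

```python
from typing import Callable, Dict, Iterable, Optional, Tuple

# Special time values defined by the Kafka OffsetRequest API.
LATEST = -1
EARLIEST = -2


def reset_offsets(partition_offsets: Iterable[Tuple[int, int]],
                  lookup: Callable[[int, int], Optional[int]],
                  counters: Dict[int, int]) -> None:
    """Hypothetical sketch of the reset rule.

    lookup(partition_id, timestamp) stands in for an OffsetRequest and
    returns None when Kafka does not accept the value as an offset timestamp.
    """
    for partition_id, timestamp_or_offset in partition_offsets:
        resolved = lookup(partition_id, timestamp_or_offset)
        if resolved is None:
            # Not a valid timestamp: use the value directly as the offset.
            counters[partition_id] = timestamp_or_offset
        else:
            counters[partition_id] = resolved
```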
Begin communicating with Kafka, including setting up worker threads
Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.
Flag all running workers for deletion.
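The start/stop lifecycle described above (fetch offsets, start an autocommit worker and num_consumer_fetchers fetch workers, and later flag them all to stop) can be sketched with a shared threading.Event. WorkerPool and its callables are hypothetical. This version also joins the threads in stop(), which is a design choice of the sketch rather than something the description guarantees.

```python
import threading
import time
from typing import Callable, List


class WorkerPool:
    """Hypothetical sketch of a consumer's start()/stop() worker lifecycle."""

    def __init__(self, fetch_offsets: Callable[[], None],
                 fetch_once: Callable[[], None],
                 commit_once: Callable[[], None],
                 num_consumer_fetchers: int = 1,
                 poll_interval_s: float = 0.01) -> None:
        self._fetch_offsets = fetch_offsets
        self._fetch_once = fetch_once
        self._commit_once = commit_once  # expected to check the commit interval itself
        self._num_fetchers = num_consumer_fetchers
        self._poll_interval_s = poll_interval_s
        self._running = threading.Event()
        self._threads: List[threading.Thread] = []

    def _loop(self, work: Callable[[], None]) -> None:
        # Each worker keeps running until the shared flag is cleared.
        while self._running.is_set():
            work()
            time.sleep(self._poll_interval_s)

    def start(self) -> None:
        self._fetch_offsets()  # load starting offsets before fetching
        self._running.set()
        # One autocommit worker plus num_consumer_fetchers fetch workers.
        targets = [self._commit_once] + [self._fetch_once] * self._num_fetchers
        for work in targets:
            thread = threading.Thread(target=self._loop, args=(work,), daemon=True)
            thread.start()
            self._threads.append(thread)

    def stop(self) -> None:
        # Flag every worker to exit after its current iteration, then wait.
        self._running.clear()
        for thread in self._threads:
            thread.join()
        self._threads.clear()
```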
The topic this consumer consumes