pykafka.simpleconsumer

class pykafka.simpleconsumer.SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='')

Bases: object

A non-balancing consumer for Kafka

__del__()

Stop consumption and workers when object is deleted

__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, consumer_id='')

Create a SimpleConsumer.

Settings and default values are taken from the Scala consumer implementation. The consumer_group parameter is included because it is necessary for offset management; it does not make this a balancing consumer. For balanced consumption, use a BalancedConsumer.

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (bytes) – The name of the consumer group this consumer should use for offset committing and fetching.
  • partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If true, periodically commit to Kafka the offsets of messages already fetched by this consumer. This requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – Maximum number of messages buffered for consumption per partition
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available, the request blocks until sufficient data is available or fetch_wait_max_ms elapses.
  • fetch_error_backoff_ms (int) – The amount of time (in milliseconds) that the consumer should wait before retrying after an error. Errors include absence of data (RD_KAFKA_RESP_ERR__PARTITION_EOF), so this can slow a normal fetch scenario. Only used by the native consumer (RdKafkaSimpleConsumer).
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to retry offset commits/fetches
  • offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • auto_start (bool) – Whether the consumer should begin communicating with Kafka after __init__ is complete. If false, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset (the auto_offset_reset setting) and commit that offset immediately upon starting up.
  • compacted_topic (bool) – Set to read from a compacted topic. Forces the consumer to use less stringent message ordering logic, because compacted topics do not provide offsets in strict incrementing order.
  • generation_id (int) – The generation id with which to make group requests
  • consumer_id (bytes) – The identifying string to use for this consumer on group requests
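To make the interplay between fetch_min_bytes and fetch_wait_max_ms concrete, here is a minimal model of the broker-side decision exactly as the two parameter descriptions state it: answer once at least fetch_min_bytes are available, or once the broker has blocked for fetch_wait_max_ms. The function broker_should_respond is hypothetical and is neither pykafka nor Kafka code.

```python
def broker_should_respond(available_bytes: int, waited_ms: int,
                          fetch_min_bytes: int = 1,
                          fetch_wait_max_ms: int = 100) -> bool:
    """Hypothetical model: return True when a broker would answer a fetch.

    The broker answers as soon as enough data has accumulated to satisfy
    fetch_min_bytes, or when it has already blocked for fetch_wait_max_ms.
    """
    if available_bytes >= fetch_min_bytes:
        return True
    return waited_ms >= fetch_wait_max_ms


# With the default fetch_min_bytes=1, any data at all triggers a response.
print(broker_should_respond(available_bytes=10, waited_ms=0))   # True
# With no data, the broker holds the request until the wait limit is reached.
print(broker_should_respond(available_bytes=0, waited_ms=50))   # False
print(broker_should_respond(available_bytes=0, waited_ms=100))  # True
```

Raising fetch_min_bytes tends to produce fewer, larger responses at the cost of latency; fetch_wait_max_ms puts an upper bound on that added latency.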

__iter__()

Yield messages indefinitely, stopping only when the consumer times out (see consumer_timeout_ms)
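The relationship between consume(), consumer_timeout_ms, and iteration can be sketched with an in-memory stand-in. ToyConsumer is hypothetical, not pykafka code, and it assumes that consumer_timeout_ms=-1 means "block forever": consume() returns None when no message arrives within the timeout, and __iter__ stops at the first None.

```python
import queue
from typing import Iterable, Iterator, Optional


class ToyConsumer:
    """Hypothetical in-memory stand-in for a consumer; not pykafka code."""

    def __init__(self, messages: Iterable[str], consumer_timeout_ms: int = -1):
        self._queue: "queue.Queue[str]" = queue.Queue()
        for message in messages:
            self._queue.put(message)
        self._consumer_timeout_ms = consumer_timeout_ms

    def consume(self, block: bool = True) -> Optional[str]:
        # Assumption: -1 means "wait forever"; otherwise convert ms to seconds.
        if self._consumer_timeout_ms == -1:
            timeout = None
        else:
            timeout = self._consumer_timeout_ms / 1000
        try:
            return self._queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None  # timed out, or nothing ready when block=False

    def __iter__(self) -> Iterator[str]:
        # Keep yielding until consume() signals a timeout by returning None.
        while True:
            message = self.consume(block=True)
            if message is None:
                return
            yield message


consumer = ToyConsumer(["m1", "m2"], consumer_timeout_ms=50)
print(list(consumer))                 # ['m1', 'm2'] after a ~50 ms timeout
print(consumer.consume(block=False))  # None: the queue is now empty
```

With the default consumer_timeout_ms=-1, iterating this sketch never ends once the queue is drained, which matches the "infinite stream" behavior described above.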

__weakref__

list of weak references to the object (if defined)

_auto_commit()

Commit offsets only if it’s time to do so
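"Commit only if it's time to do so" amounts to an interval check against the last commit time. The sketch below is a hypothetical helper (AutoCommitter is not a pykafka class) that takes auto_commit_interval_ms and an injectable clock so it can be exercised without waiting.

```python
import time
from typing import Callable


class AutoCommitter:
    """Hypothetical helper: commit only when auto_commit_interval_ms has passed."""

    def __init__(self, commit: Callable[[], None],
                 auto_commit_interval_ms: int = 60000,
                 clock: Callable[[], float] = time.monotonic):
        self._commit = commit
        self._interval_s = auto_commit_interval_ms / 1000
        self._clock = clock
        self._last_commit = clock()

    def maybe_commit(self) -> bool:
        """Commit if the interval has elapsed; return whether a commit happened."""
        now = self._clock()
        if now - self._last_commit < self._interval_s:
            return False
        self._commit()
        self._last_commit = now
        return True


# Drive the helper with a fake clock so the example runs instantly.
fake_now = [0.0]
commits = []
committer = AutoCommitter(lambda: commits.append(fake_now[0]),
                          auto_commit_interval_ms=1000,
                          clock=lambda: fake_now[0])
for t in (0.5, 1.0, 1.5, 2.2):
    fake_now[0] = t
    committer.maybe_commit()
print(commits)  # [1.0, 2.2]
```

Using a monotonic clock avoids spurious or skipped commits when the wall clock is adjusted.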

_build_default_error_handlers()

Set up the error handlers to use for partition errors.

_discover_group_coordinator()

Set the group coordinator for this consumer.

If a consumer group is not supplied to __init__, this method does nothing

_raise_worker_exceptions()

Raises exceptions encountered on worker threads
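An exception raised inside a worker thread does not propagate to the thread that started it, so it has to be recorded and re-raised explicitly. This is a minimal sketch of that pattern under the assumption that only the most recent error is kept; WorkerPool and its methods are hypothetical names, not pykafka's internals.

```python
import threading
from typing import Callable, List, Optional


class WorkerPool:
    """Hypothetical sketch: record worker-thread errors and re-raise them later."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._worker_exception: Optional[BaseException] = None
        self._threads: List[threading.Thread] = []

    def start_worker(self, target: Callable[[], None]) -> threading.Thread:
        def run() -> None:
            try:
                target()
            except Exception as exc:  # otherwise the error would be lost
                with self._lock:
                    self._worker_exception = exc
        thread = threading.Thread(target=run, daemon=True)
        thread.start()
        self._threads.append(thread)
        return thread

    def raise_worker_exceptions(self) -> None:
        """Call from the owning thread so worker failures surface there."""
        with self._lock:
            exc = self._worker_exception
        if exc is not None:
            raise exc


def boom() -> None:
    raise ZeroDivisionError("worker failed")


pool = WorkerPool()
pool.start_worker(boom).join()
try:
    pool.raise_worker_exceptions()
except ZeroDivisionError as exc:
    print("re-raised:", type(exc).__name__)  # re-raised: ZeroDivisionError
```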

_setup_autocommit_worker()

Start the autocommitter thread

_setup_fetch_workers()

Start the fetcher threads

_update()

Update the consumer and cluster after an error code is received

_wait_for_slot_available()

Block until at least one queue has fewer than _queued_max_messages messages
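The backpressure described by _wait_for_slot_available and queued_max_messages can be modeled with a condition variable: fetchers wait until some partition buffer has room, and consumers notify them after removing a message. PartitionQueues is a hypothetical illustration, not pykafka's OwnedPartition implementation.

```python
import threading
from collections import deque
from typing import Any, Deque, Dict, Iterable, Optional


class PartitionQueues:
    """Hypothetical per-partition buffers bounded by queued_max_messages."""

    def __init__(self, partition_ids: Iterable[int], queued_max_messages: int = 2000):
        self._max = queued_max_messages
        self._queues: Dict[int, Deque[Any]] = {pid: deque() for pid in partition_ids}
        self._cond = threading.Condition()

    def _has_slot(self) -> bool:
        # True when at least one partition buffer is below the limit.
        return any(len(q) < self._max for q in self._queues.values())

    def wait_for_slot_available(self, timeout: Optional[float] = None) -> bool:
        """Block until some buffer has room; return False if the timeout expires."""
        with self._cond:
            return self._cond.wait_for(self._has_slot, timeout=timeout)

    def enqueue(self, pid: int, messages: Iterable[Any]) -> None:
        with self._cond:
            self._queues[pid].extend(messages)

    def pop(self, pid: int) -> Any:
        with self._cond:
            message = self._queues[pid].popleft()
            self._cond.notify_all()  # wake fetchers waiting for a free slot
            return message


buffers = PartitionQueues([0, 1], queued_max_messages=2)
buffers.enqueue(0, ["a", "b"])
buffers.enqueue(1, ["c", "d"])
print(buffers.wait_for_slot_available(timeout=0.01))  # False: both buffers full
buffers.pop(0)
print(buffers.wait_for_slot_available(timeout=0.01))  # True
```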

commit_offsets()

Commit offsets for this consumer’s partitions

Uses the offset commit/fetch API
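The constructor parameters offsets_commit_max_retries and offsets_channel_backoff_ms suggest a bounded retry loop around each commit. The sketch below assumes one initial attempt plus up to offsets_commit_max_retries retries, with a fixed backoff between them; commit_with_retries and RetriableCommitError are hypothetical names, and pykafka's own loop may count attempts differently.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetriableCommitError(Exception):
    """Hypothetical stand-in for a transient offset-commit failure."""


def commit_with_retries(send_commit: Callable[[], T],
                        offsets_commit_max_retries: int = 5,
                        offsets_channel_backoff_ms: int = 1000,
                        sleep: Callable[[float], None] = time.sleep) -> T:
    """Try once, then retry up to offsets_commit_max_retries times with a fixed backoff."""
    for attempt in range(offsets_commit_max_retries + 1):
        try:
            return send_commit()
        except RetriableCommitError:
            if attempt == offsets_commit_max_retries:
                raise  # out of retries: surface the last error
            sleep(offsets_channel_backoff_ms / 1000)
    raise AssertionError("unreachable for non-negative retry counts")


# Fail twice, then succeed; record the sleeps instead of actually waiting.
attempts = []
sleeps = []


def flaky_commit() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RetriableCommitError("coordinator not available")
    return "committed"


print(commit_with_retries(flaky_commit, sleep=sleeps.append))  # committed
print(len(attempts), sleeps)                                   # 3 [1.0, 1.0]
```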

consume(block=True)

Get one message from the consumer.

Parameters: block (bool) – Whether to block while waiting for a message

fetch()

Fetch new messages for all partitions

Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.
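"One FetchRequest per broker" means grouping the consumer's partitions by their leader broker. This sketch assumes each partition is given as a (partition_id, leader_broker_id) pair and, following the reset_offsets note below, that the fetch offset is the held ("most recently consumed") offset plus one. build_fetch_requests is hypothetical and returns plain tuples rather than pykafka protocol objects.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def build_fetch_requests(
    partition_leaders: Iterable[Tuple[int, int]],
    held_offsets: Dict[int, int],
    fetch_message_max_bytes: int = 1048576,
) -> Dict[int, List[Tuple[int, int, int]]]:
    """Group (partition_id, leader_broker_id) pairs into one request per broker.

    Each entry is (partition_id, fetch_offset, max_bytes), where fetch_offset
    is the held offset plus one.
    """
    requests: Dict[int, List[Tuple[int, int, int]]] = defaultdict(list)
    for partition_id, leader_id in partition_leaders:
        fetch_offset = held_offsets[partition_id] + 1
        requests[leader_id].append((partition_id, fetch_offset, fetch_message_max_bytes))
    return dict(requests)


print(build_fetch_requests([(0, 1), (1, 2), (2, 1)], {0: 9, 1: 4, 2: 6}, 1024))
# {1: [(0, 10, 1024), (2, 7, 1024)], 2: [(1, 5, 1024)]}
```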

fetch_offsets()

Fetch offsets for this consumer’s topic

Uses the offset commit/fetch API

Returns: List of (id, pykafka.protocol.OffsetFetchPartitionResponse) tuples

held_offsets

Return a map from partition id to held offset for each partition

partitions

A list of the partitions that this consumer consumes

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions

Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.

Parameters: partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp of the message whose offset the partition should have OR the new “most recently consumed” offset the partition should have

NOTE: If Kafka treats a timestamp_or_offset value as an invalid offset timestamp, this function directly sets the consumer’s internal offset counter for that partition to that value. This counter represents the offset most recently consumed. On the next fetch request, the consumer attempts to fetch messages starting from that offset plus one. See the following link for more information on what Kafka treats as a valid offset timestamp: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
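The two branches of this note can be modeled as follows. apply_reset and fake_lookup are hypothetical: fake_lookup stands in for the OffsetRequest and returns a counter value when it recognizes a valid timestamp, or None when Kafka would treat the value as invalid, in which case the value is used directly. The timestamp constant and the resulting counter of 99 are made up for illustration.

```python
from typing import Callable, Dict, Iterable, Optional, Tuple


def apply_reset(
    partition_offsets: Iterable[Tuple[int, int]],
    lookup_offset: Callable[[int, int], Optional[int]],
) -> Dict[int, int]:
    """Return the new "most recently consumed" counter for each partition.

    lookup_offset(partition_id, value) is a hypothetical stand-in for the
    OffsetRequest: it returns a counter derived from Kafka's answer for a
    valid offset timestamp, or None when the value is treated as invalid.
    """
    counters: Dict[int, int] = {}
    for partition_id, timestamp_or_offset in partition_offsets:
        resolved = lookup_offset(partition_id, timestamp_or_offset)
        # Invalid timestamp: store the value itself as the counter.
        counters[partition_id] = timestamp_or_offset if resolved is None else resolved
    return counters


VALID_TS = 1_600_000_000_000  # hypothetical millisecond timestamp


def fake_lookup(partition_id: int, value: int) -> Optional[int]:
    # Pretend Kafka recognizes only VALID_TS, yielding a counter of 99.
    return 99 if value == VALID_TS else None


counters = apply_reset([(0, VALID_TS), (1, 41)], fake_lookup)
print(counters)                                 # {0: 99, 1: 41}
print({p: c + 1 for p, c in counters.items()})  # next fetch offsets: {0: 100, 1: 42}
```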

start()

Begin communicating with Kafka, including setting up worker threads

Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.

stop()

Flag all running workers for deletion.
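Because stop() only flags workers, each worker must check the flag between units of work and exit on its own. This sketch models start() and stop() with a threading.Event; WorkerLifecycle and fake_fetch are hypothetical, and num_consumer_fetchers is borrowed from the constructor only to name the worker count.

```python
import threading
import time
from typing import Callable, List


class WorkerLifecycle:
    """Hypothetical sketch of start()/stop(): stop only sets a flag workers poll."""

    def __init__(self, work: Callable[[], None], num_consumer_fetchers: int = 1):
        self._work = work
        self._num_workers = num_consumer_fetchers
        self._running = threading.Event()
        self._threads: List[threading.Thread] = []

    def _loop(self) -> None:
        # Each worker re-checks the flag before every unit of work.
        while self._running.is_set():
            self._work()

    def start(self) -> None:
        self._running.set()
        for _ in range(self._num_workers):
            thread = threading.Thread(target=self._loop, daemon=True)
            thread.start()
            self._threads.append(thread)

    def stop(self) -> None:
        # Flag workers to exit; each finishes its current iteration first.
        self._running.clear()

    def join(self, timeout: float = 1.0) -> bool:
        for thread in self._threads:
            thread.join(timeout)
        return not any(t.is_alive() for t in self._threads)


calls = []


def fake_fetch() -> None:
    calls.append(1)
    time.sleep(0.001)


lifecycle = WorkerLifecycle(fake_fetch, num_consumer_fetchers=2)
lifecycle.start()
time.sleep(0.05)
lifecycle.stop()
print(lifecycle.join())  # True: both workers saw the cleared flag and exited
print(len(calls) > 0)    # True
```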

topic

The topic this consumer consumes