pykafka.simpleconsumer

class pykafka.simpleconsumer.SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)

A non-balancing consumer for Kafka

__del__()

Stop consumption and workers when object is deleted

__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)

Create a SimpleConsumer.

Settings and default values are taken from the Scala consumer implementation. The consumer group is included because offset management requires it; it does not make this a balancing consumer. Use a BalancedConsumer for balanced consumption.

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (bytes) – The name of the consumer group this consumer should use for offset committing and fetching.
  • partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If True, periodically commit to Kafka the offset of messages already fetched by this consumer. This also requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The interval (in milliseconds) at which consumer offsets are committed to Kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – Maximum number of messages buffered for consumption
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available the request will block until sufficient data is available.
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to retry offset commits/fetches
  • offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • auto_start (bool) – Whether the consumer should begin communicating with Kafka after __init__ is complete. If False, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
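
As a rough illustration of how these settings combine, the sketch below assembles keyword arguments for the constructor and enforces the one dependency stated above: auto_commit_enable requires a consumer group. The helper consumer_settings and the constants EARLIEST and LATEST are hypothetical names, not part of pykafka; the values -2 and -1 are assumed to correspond to pykafka.common.OffsetType, consistent with the default auto_offset_reset=-2 in the signature.

```python
# Hypothetical helper; not part of pykafka.
EARLIEST = -2  # assumed OffsetType value (the documented default)
LATEST = -1    # assumed OffsetType value


def consumer_settings(consumer_group=None, auto_commit_enable=False,
                      auto_commit_interval_ms=60000,
                      auto_offset_reset=EARLIEST, consumer_timeout_ms=-1):
    """Build keyword arguments for SimpleConsumer.__init__."""
    # Documented constraint: auto-commit needs a consumer group.
    if auto_commit_enable and consumer_group is None:
        raise ValueError("auto_commit_enable requires a consumer_group")
    return {
        "consumer_group": consumer_group,
        "auto_commit_enable": auto_commit_enable,
        "auto_commit_interval_ms": auto_commit_interval_ms,
        "auto_offset_reset": auto_offset_reset,
        "consumer_timeout_ms": consumer_timeout_ms,
    }


settings = consumer_settings(consumer_group=b"example-group",
                             auto_commit_enable=True,
                             consumer_timeout_ms=5000)
# With a real topic and cluster in hand:
# consumer = SimpleConsumer(topic, cluster, **settings)
```

Validating before construction keeps a misconfiguration from surfacing only after worker threads have started.
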
__iter__()

Yield an infinite stream of messages until the consumer times out
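
A minimal sketch of iterating over a consumer follows. Because the stream only ends when the consumer times out, the loop caps the number of messages itself. The helper collect_values and the FakeMessage type are hypothetical; real messages are assumed to expose a value attribute, and a plain list stands in for a consumer here.

```python
from collections import namedtuple

# Stand-in for a consumed message; `offset` and `value` are assumed attributes.
FakeMessage = namedtuple("FakeMessage", ["offset", "value"])


def collect_values(consumer, limit):
    """Iterate over a consumer and return up to `limit` message values."""
    values = []
    if limit <= 0:
        return values
    for message in consumer:
        values.append(message.value)
        if len(values) >= limit:
            break  # stop early rather than wait for a timeout
    return values


# A list stands in for a consumer whose iteration ends on timeout.
fake_consumer = [FakeMessage(0, b"a"), FakeMessage(1, b"b"), FakeMessage(2, b"c")]
first_two = collect_values(fake_consumer, 2)
```
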

_auto_commit()

Commit offsets only if it’s time to do so
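
The idea of committing only when an interval has elapsed can be sketched as below. This is an illustration of the general technique, not pykafka's implementation; the CommitGate class and its methods are hypothetical, and the clock is injected so the behavior is deterministic.

```python
class CommitGate:
    """Hypothetical illustration of interval-gated commits."""

    def __init__(self, interval_ms, clock):
        self.interval_ms = interval_ms
        self._clock = clock              # callable returning seconds
        self._last_commit = clock()

    def due(self):
        """Return True once interval_ms has passed since the last commit."""
        elapsed_ms = (self._clock() - self._last_commit) * 1000
        return elapsed_ms >= self.interval_ms

    def mark_committed(self):
        self._last_commit = self._clock()


now = [0.0]                              # mutable fake clock, in seconds
gate = CommitGate(interval_ms=60000, clock=lambda: now[0])
early = gate.due()                       # no time has passed yet
now[0] = 61.0
late = gate.due()                        # 61 s elapsed, past the interval
gate.mark_committed()
after_commit = gate.due()                # interval restarts after a commit
```

In real use the clock would be something like time.monotonic.
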

_build_default_error_handlers()

Set up the error handlers to use for partition errors.

_discover_offset_manager()

Set the offset manager for this consumer.

If a consumer group is not supplied to __init__, this method does nothing

_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_setup_autocommit_worker()

Start the autocommitter thread

_setup_fetch_workers()

Start the fetcher threads

_update()

Update the consumer and cluster after an error code is encountered

commit_offsets()

Commit offsets for this consumer’s partitions

Uses the offset commit/fetch API
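
When auto_commit_enable is False, offsets can be committed by hand. The sketch below handles messages and calls commit_offsets() after every few of them, plus once more for any remainder. The helper consume_with_commits and the RecordingConsumer stand-in are hypothetical; only iteration and commit_offsets() are taken from this page.

```python
class RecordingConsumer:
    """Hypothetical stand-in exposing iteration and commit_offsets()."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self._messages)

    def commit_offsets(self):
        self.commits += 1


def consume_with_commits(consumer, handle, commit_every):
    """Handle each message, committing after every `commit_every` messages."""
    pending = 0
    for message in consumer:
        handle(message)
        pending += 1
        if pending >= commit_every:
            consumer.commit_offsets()
            pending = 0
    if pending:
        consumer.commit_offsets()  # flush the final partial batch


seen = []
demo = RecordingConsumer(["m1", "m2", "m3", "m4", "m5"])
consume_with_commits(demo, seen.append, commit_every=2)
```

Committing after handling, rather than before, means a crash may cause some messages to be handled again but should not cause them to be skipped.
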

consume(block=True)

Get one message from the consumer.

Parameters:
  • block (bool) – Whether to block while waiting for a message
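
A short sketch of non-blocking use follows. It assumes that consume(block=False) returns None when no message is available, in line with the description of consumer_timeout_ms above. The helper drain_available and the QueueConsumer stand-in are hypothetical.

```python
class QueueConsumer:
    """Hypothetical stand-in: consume() returns None when nothing is queued."""

    def __init__(self, messages):
        self._messages = list(messages)

    def consume(self, block=True):
        return self._messages.pop(0) if self._messages else None


def drain_available(consumer):
    """Yield messages until consume(block=False) reports none available."""
    while True:
        message = consumer.consume(block=False)
        if message is None:
            return
        yield message


available = list(drain_available(QueueConsumer(["m1", "m2"])))
```
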
fetch()

Fetch new messages for all partitions

Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.

fetch_offsets()

Fetch offsets for this consumer’s topic

Uses the offset commit/fetch API

Returns: List of (id, pykafka.protocol.OffsetFetchPartitionResponse) tuples
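
The returned list of tuples can be reshaped into a mapping, as sketched below. The offset attribute on the response object is an assumption, and both committed_offsets and the stand-in classes are hypothetical.

```python
from collections import namedtuple

# Stand-in for OffsetFetchPartitionResponse; the `offset` field is assumed.
FakeOffsetResponse = namedtuple("FakeOffsetResponse", ["offset"])


class OffsetConsumer:
    """Hypothetical stand-in returning (id, response) tuples."""

    def fetch_offsets(self):
        return [(0, FakeOffsetResponse(42)), (1, FakeOffsetResponse(7))]


def committed_offsets(consumer):
    """Map partition id to the offset reported by fetch_offsets()."""
    return {partition_id: response.offset
            for partition_id, response in consumer.fetch_offsets()}


committed = committed_offsets(OffsetConsumer())
```
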
held_offsets

Return a map from partition id to held offset for each partition
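
Two snapshots of this map can be compared to see which partitions made progress. The helper advanced_partitions is hypothetical and works on plain dicts, so the snapshots are assumed to have been copied first, for example with dict(consumer.held_offsets).

```python
def advanced_partitions(before, after):
    """Return {partition_id: delta} for offsets that moved forward."""
    return {pid: after[pid] - before[pid]
            for pid in after
            if pid in before and after[pid] > before[pid]}


before = {0: 10, 1: 5}   # earlier snapshot of held offsets
after = {0: 14, 1: 5}    # later snapshot
progress = advanced_partitions(before, after)
```
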

partitions

A list of the partitions that this consumer consumes

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions

Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.

Parameters:
  • partition_offsets (Iterable of (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition whose offset should be reset and timestamp_or_offset is either the timestamp of the message whose offset the partition should have or the new offset the partition should have

NOTE: If Kafka treats a given timestamp_or_offset as an invalid offset timestamp, this function sets the consumer’s internal offset counter for that partition directly to that value. On the next fetch request, the consumer attempts to fetch messages starting from that offset. For details on what Kafka treats as a valid offset timestamp, see: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
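
Building the (partition, timestamp_or_offset) pairs might look like the sketch below, which applies one value to every partition the consumer holds. The helper reset_all_to and the ResettableConsumer stand-in are hypothetical. The partitions attribute is documented on this page as a list; the dict branch is a defensive assumption in case a given version returns a mapping of id to partition.

```python
class ResettableConsumer:
    """Hypothetical stand-in exposing partitions and reset_offsets()."""

    def __init__(self, partitions):
        self.partitions = partitions
        self.last_reset = None

    def reset_offsets(self, partition_offsets=None):
        self.last_reset = list(partition_offsets or [])


def reset_all_to(consumer, timestamp_or_offset):
    """Reset every partition of `consumer` to the same timestamp or offset."""
    partitions = consumer.partitions
    if isinstance(partitions, dict):      # assumed alternative shape
        partitions = partitions.values()
    pairs = [(partition, timestamp_or_offset) for partition in partitions]
    consumer.reset_offsets(pairs)
    return pairs


demo = ResettableConsumer(["p0", "p1"])
pairs = reset_all_to(demo, 100)
```
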

start()

Begin communicating with Kafka, including setting up worker threads

Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.

stop()

Flag all running workers for deletion.
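
For a consumer created with auto_start=False, start() and stop() can be paired so that workers are flagged for shutdown even if consumption fails. The context manager running and the LifecycleConsumer stand-in below are hypothetical; only start() and stop() come from this page.

```python
from contextlib import contextmanager


class LifecycleConsumer:
    """Hypothetical stand-in that records start() and stop() calls."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")


@contextmanager
def running(consumer):
    """Start the consumer on entry and always stop it on exit."""
    consumer.start()
    try:
        yield consumer
    finally:
        consumer.stop()


demo = LifecycleConsumer()
try:
    with running(demo):
        raise RuntimeError("simulated failure while consuming")
except RuntimeError:
    pass  # stop() has already been called by the context manager
```
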

topic

The topic this consumer consumes