pykafka.producer

class pykafka.producer.Producer(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)

Bases: object

Implements asynchronous producer logic similar to the JVM driver.

It creates a thread of execution for each broker that is the leader of one or more of its topic’s partitions. Each of these threads (which may use threading or some other parallelism implementation like gevent) is associated with a queue that holds the messages that are waiting to be sent to that queue’s broker.
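
The threading model described above can be sketched with the standard library alone. This is a minimal illustration, not pykafka's implementation: `BrokerWorker`, the `record` callback, and the `leaders` mapping are hypothetical names invented for the example, and no Kafka connection is involved.

```python
import queue
import threading

_STOP = object()  # private sentinel used to shut a worker down


class BrokerWorker:
    """Hypothetical stand-in: one queue plus one thread per leader broker."""

    def __init__(self, broker_id, send):
        self.broker_id = broker_id
        self.queue = queue.Queue()
        self._send = send  # callable(broker_id, message), assumed for the sketch
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Drain the queue until the stop sentinel arrives.
        while True:
            message = self.queue.get()
            if message is _STOP:
                return
            self._send(self.broker_id, message)

    def stop(self):
        self.queue.put(_STOP)
        self._thread.join()


sent = []
lock = threading.Lock()


def record(broker_id, message):
    # Stands in for a network send; only records what would be sent.
    with lock:
        sent.append((broker_id, message))


# Made-up leadership table: partition id -> id of the broker leading it.
leaders = {0: 1, 1: 1, 2: 2}

# One worker per broker that leads at least one partition.
workers = {b: BrokerWorker(b, record) for b in set(leaders.values())}
for worker in workers.values():
    worker.start()

# Each message goes to the queue of the broker leading its partition.
for partition_id, payload in [(0, b"a"), (2, b"b"), (1, c := b"c")]:
    workers[leaders[partition_id]].queue.put(payload)

for worker in workers.values():
    worker.stop()
```

Three partitions led by two brokers yield two workers, so messages for partitions 0 and 1 share a queue and a thread.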

__enter__()

Context manager entry point - start the producer

__exit__(exc_type, exc_value, traceback)

Context manager exit point - stop the producer
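
The entry and exit points above imply that a `with` block starts the producer on entry and stops it on exit. The sketch below shows that pattern with a hypothetical `StandInProducer`; it assumes `__enter__` returns the producer itself, which the text above does not state, and it only records messages instead of sending them.

```python
class StandInProducer:
    """Hypothetical stand-in that mirrors the documented method names."""

    def __init__(self):
        self.running = False
        self.produced = []

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

    def __enter__(self):
        self.start()
        return self  # assumption: the producer is bound by "as"

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()  # runs even when the block raises

    def produce(self, message, partition_key=None):
        if not self.running:
            raise RuntimeError("producer is not running")
        self.produced.append((partition_key, message))


producer = StandInProducer()
with producer as p:
    p.produce(b"hello")
    inside = p.running  # True while the block is active
```

Because `__exit__` always calls `stop()`, the producer is marked as stopped even if the body of the block raises.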

__init__(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)

Instantiate a new Producer

Parameters:
  • cluster (pykafka.cluster.Cluster) – The cluster to which to connect
  • topic (pykafka.topic.Topic) – The topic to which to produce messages
  • partitioner (pykafka.partitioners.BasePartitioner) – The partitioner to use during message production
  • compression (pykafka.common.CompressionType) – The type of compression to use.
  • max_retries (int) – How many times to attempt to produce a given batch of messages before raising an error.
  • retry_backoff_ms (int) – The amount of time (in milliseconds) to back off during produce request retries.
  • required_acks (int) – The number of other brokers that must have committed the data to their log and acknowledged this to the leader before a request is considered complete
  • ack_timeout_ms (int) – The amount of time (in milliseconds) to wait for acknowledgment of a produce request.
  • max_queued_messages (int) – The maximum number of messages the producer can have waiting to be sent to the broker. If messages are sent faster than they can be delivered to the broker, the producer will either block or throw an exception based on the preference specified with block_on_queue_full.
  • min_queued_messages (int) – The minimum number of messages the producer can have waiting in a queue before it flushes that queue to its broker (must be greater than 0).
  • linger_ms (int) – This setting gives the upper bound on the delay for batching: once the producer has min_queued_messages messages queued for a broker, they are sent immediately regardless of this setting. If fewer messages than that have accumulated for the broker, the producer will ‘linger’ for the specified time, waiting for more messages to arrive. linger_ms=0 indicates no lingering.
  • block_on_queue_full (bool) – When the producer’s message queue for a broker contains max_queued_messages, we must either stop accepting new messages (block) or throw an error. If True, this setting indicates we should block until space is available in the queue. If False, we should throw an error immediately.
  • sync (bool) – Whether calls to produce should wait for the message to send before returning
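
The interplay of min_queued_messages, linger_ms, max_queued_messages, and block_on_queue_full can be sketched as two small functions. This is one reading of the parameter descriptions above, not pykafka's code: `should_flush` and `enqueue` are hypothetical helpers, and the standard library's `queue.Full` stands in for whatever error the producer raises.

```python
import queue


def should_flush(queued, min_queued_messages, linger_ms, waited_ms):
    """Flush when the batch is large enough or the linger time has elapsed."""
    if queued == 0:
        return False  # nothing to send
    return queued >= min_queued_messages or waited_ms >= linger_ms


def enqueue(q, message, block_on_queue_full):
    """Put a message on a bounded queue, blocking or raising when it is full."""
    if block_on_queue_full:
        q.put(message)         # waits until space is available
    else:
        q.put_nowait(message)  # raises queue.Full immediately


q = queue.Queue(maxsize=2)  # plays the role of max_queued_messages=2
enqueue(q, b"a", block_on_queue_full=False)
enqueue(q, b"b", block_on_queue_full=False)
try:
    enqueue(q, b"c", block_on_queue_full=False)
    rejected = False
except queue.Full:
    rejected = True
```

With linger_ms=0 the second condition in `should_flush` is always true, so any non-empty queue is flushed at once, which matches "no lingering".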

__weakref__

list of weak references to the object (if defined)

_produce(message_partition_tup)

Enqueue a message for the relevant broker

Parameters:
  • message_partition_tup (((bytes, bytes), int) tuple) – Message with partition assigned.

_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_send_request(message_batch, owned_broker)

Send the produce request to the broker and handle the response.

Parameters:
  • message_batch (iterable of ((key, value), partition_id) tuples) – An iterable of messages to send
  • owned_broker (pykafka.producer.OwnedBroker) – The broker to which to send the request
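
The max_retries and retry_backoff_ms parameters of `__init__` suggest a retry loop around the produce request. The sketch below shows one plausible shape of such a loop. It is an assumption about the behavior, not the body of `_send_request`: `send_with_retries`, the injected `send` and `sleep` callables, and the use of `ConnectionError` as the retriable failure are all invented for the example.

```python
import time


def send_with_retries(send, batch, max_retries=3, retry_backoff_ms=100,
                      sleep=time.sleep):
    """Call send(batch) until it succeeds or max_retries attempts have failed."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return send(batch)
        except ConnectionError as exc:  # stand-in for a retriable failure
            last_error = exc
            if attempt < max_retries - 1:
                sleep(retry_backoff_ms / 1000.0)  # back off before retrying
    raise RuntimeError("gave up after %d attempts" % max_retries) from last_error


calls = []


def flaky(batch):
    # Fails on the first two attempts, then reports how many messages it sent.
    calls.append(batch)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return len(batch)


delays = []  # records the back-off durations instead of sleeping
result = send_with_retries(flaky, [b"a", b"b"], max_retries=3,
                           retry_backoff_ms=100, sleep=delays.append)
```

Injecting `sleep` keeps the example fast and makes the back-off schedule observable.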

_setup_owned_brokers()

Instantiate one OwnedBroker per broker

If there are already OwnedBrokers instantiated, safely stop and flush them before creating new ones.

_update()

Update the producer and cluster after an error code is received

Also re-produces messages that were in queues at the time the update was triggered

_wait_all()

Block until all pending messages are sent

“Pending” messages are those that have been passed to produce but have not yet been dequeued and sent to the broker
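
Blocking until no messages are pending resembles `queue.Queue.join()` in the standard library, which returns once every queued item has been marked done. The sketch below uses that analogy; it is not how `_wait_all` is implemented, and the `pending` queue and `worker` function are hypothetical.

```python
import queue
import threading

pending = queue.Queue()
delivered = []


def worker():
    while True:
        message = pending.get()
        delivered.append(message)  # stand-in for sending to the broker
        pending.task_done()        # the message is no longer pending


threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    pending.put(i)

pending.join()  # blocks until every queued message has been marked done
```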

produce(message, partition_key=None)

Produce a message.

Parameters:
  • message (bytes) – The message to produce (use None to send null)
  • partition_key (bytes) – The key to use when deciding which partition to send this message to
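
A partition key only matters when the configured partitioner uses it; the default shown in the signature is a random partitioner. As an illustration of how a key-based partitioner could map keys to partitions, here is a minimal sketch. `choose_partition` is a hypothetical helper and the CRC32 hash is an arbitrary choice made for the example, not pykafka's algorithm.

```python
import zlib


def choose_partition(partition_ids, partition_key):
    """Map a key to a partition id; equal keys always map to the same id."""
    if partition_key is None:
        raise ValueError("a key-based partitioner needs a partition_key")
    ordered = sorted(partition_ids)  # make the result independent of input order
    return ordered[zlib.crc32(partition_key) % len(ordered)]


first = choose_partition([0, 1, 2], b"user-42")
second = choose_partition([2, 1, 0], b"user-42")
```

Routing by key keeps all messages for one key on one partition, which preserves their relative order.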

start()

Set up data structures and start worker threads

stop()

Mark the producer as stopped