Producer(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)
Implements asynchronous producer logic similar to the JVM driver.
It creates a thread of execution for each broker that is the leader of one or more of its topic’s partitions. Each of these threads (which may use threading or some other parallelism implementation like gevent) is associated with a queue that holds the messages that are waiting to be sent to that queue’s broker.
__enter__()
Context manager entry point - start the producer
__exit__(exc_type, exc_value, traceback)
Context manager exit point - stop the producer
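A minimal usage sketch of the context manager protocol, assuming a broker reachable at 127.0.0.1:9092 and an existing topic named test (both placeholders). KafkaClient, client.topics, and client.cluster are assumed from the wider pykafka package and are not described in this section; the Producer arguments follow the signature above.

```python
def produce_greetings(hosts="127.0.0.1:9092", topic_name=b"test"):
    """Send three messages through a Producer used as a context manager."""
    # Imported inside the function so the sketch can be read (and imported)
    # without pykafka installed or a broker running.
    from pykafka import KafkaClient
    from pykafka.producer import Producer

    client = KafkaClient(hosts=hosts)   # assumption: standard pykafka entry point
    topic = client.topics[topic_name]   # assumption: topics are looked up by name

    # Entering the block starts the producer; leaving it stops the producer.
    # sync=True makes each produce() call wait until the message is sent.
    with Producer(client.cluster, topic, sync=True) as producer:
        for i in range(3):
            producer.produce(("greeting %d" % i).encode("utf-8"))
```

Calling produce_greetings() requires a running Kafka broker, so the function is only defined here and not invoked.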
__init__(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)
Instantiate a new Producer
- cluster (pykafka.cluster.Cluster) – The cluster to which to connect
- topic (pykafka.topic.Topic) – The topic to which to produce messages
- partitioner (pykafka.partitioners.BasePartitioner) – The partitioner to use during message production
- compression (pykafka.common.CompressionType) – The type of compression to use.
- max_retries (int) – How many times to attempt to produce a given batch of messages before raising an error.
- retry_backoff_ms (int) – The amount of time (in milliseconds) to back off during produce request retries.
- required_acks (int) – The number of other brokers that must have committed the data to their log and acknowledged this to the leader before a request is considered complete
- ack_timeout_ms (int) – The amount of time (in milliseconds) to wait for acknowledgment of a produce request.
- max_queued_messages (int) – The maximum number of messages the producer can have waiting to be sent to the broker. If messages are sent faster than they can be delivered to the broker, the producer will either block or throw an exception based on the preference specified with block_on_queue_full.
- min_queued_messages (int) – The minimum number of messages the producer can have waiting in a queue before it flushes that queue to its broker (must be greater than 0).
- linger_ms (int) – The upper bound on the batching delay. Once the producer has accumulated min_queued_messages messages for a broker, they are sent immediately regardless of this setting. If fewer messages have accumulated, the producer ‘lingers’ for up to the specified time (in milliseconds) waiting for more to arrive. linger_ms=0 indicates no lingering.
- block_on_queue_full (bool) – When the producer’s message queue for a broker contains max_queued_messages, we must either stop accepting new messages (block) or throw an error. If True, this setting indicates we should block until space is available in the queue. If False, we should throw an error immediately.
- sync (bool) – Whether calls to produce should wait for the message to be sent before returning
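The interaction between max_queued_messages, block_on_queue_full, min_queued_messages, and linger_ms can be modelled with the standard library alone. The sketch below is an illustrative model of the rules as described in the parameter list, not pykafka's implementation; should_flush, enqueue, and QueueFull are hypothetical names introduced for this example.

```python
import queue


class QueueFull(Exception):
    """Hypothetical error for a full queue when blocking is disabled."""


def should_flush(queued, oldest_age_ms, min_queued_messages=70000, linger_ms=5000):
    """Decide whether a broker's queue should be flushed now.

    queued        -- number of messages currently waiting for the broker
    oldest_age_ms -- how long the oldest waiting message has been queued
    """
    if queued == 0:
        return False                    # nothing to send
    if queued >= min_queued_messages:
        return True                     # batch is big enough: send immediately
    return oldest_age_ms >= linger_ms   # otherwise wait out the linger period


def enqueue(q, message, block_on_queue_full=True):
    """Put a message on a bounded queue (maxsize models max_queued_messages)."""
    try:
        # block=True waits for free space; block=False fails immediately.
        q.put(message, block=block_on_queue_full)
    except queue.Full:
        raise QueueFull("queue already holds %d messages" % q.maxsize)
```

With linger_ms=0 the last rule is always satisfied for a non-empty queue, which matches the "no lingering" behaviour described above.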
list of weak references to the object (if defined)
Enqueue a message for the relevant broker
Parameters: message_partition_tup (((bytes, bytes), int) tuple) – Message with partition assigned.
Raises exceptions encountered on worker threads
Send the produce request to the broker and handle the response.
- message_batch (iterable of ((key, value), partition_id) tuples) – An iterable of messages to send
- owned_broker (pykafka.producer.OwnedBroker) – The broker to which to send the request
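The max_retries and retry_backoff_ms settings describe a bounded retry loop around each produce request. A minimal sketch of such a loop follows, assuming max_retries counts the total number of attempts and that failures surface as IOError; send_with_retries and its send and sleep arguments are hypothetical and do not reflect pykafka's internal code.

```python
import time


def send_with_retries(send, batch, max_retries=3, retry_backoff_ms=100,
                      sleep=time.sleep):
    """Call send(batch) up to max_retries times, backing off between attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_error = None
    for attempt in range(max_retries):
        try:
            return send(batch)
        except IOError as exc:          # assumption: transient failures are IOErrors
            last_error = exc
            if attempt + 1 < max_retries:
                # Wait before the next attempt; no wait after the final failure.
                sleep(retry_backoff_ms / 1000.0)
    raise last_error
```

Passing sleep as an argument keeps the loop easy to exercise without real delays.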
Instantiate one OwnedBroker per broker
If there are already OwnedBrokers instantiated, safely stop and flush them before creating new ones.
Update the producer and cluster after an ERROR_CODE
Also re-produces messages that were in queues at the time the update was triggered
Block until all pending messages are sent
“Pending” messages are those that have been passed to produce but have not yet been dequeued and sent to the broker.
Produce a message.
- message (bytes) – The message to produce (use None to send null)
- partition_key (bytes) – The key to use when deciding which partition to send this message to
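How partition_key influences routing depends on the partitioner passed to the constructor. The sketch below assumes a partitioner is a callable that receives the list of available partitions and the key and returns one partition; that calling convention is an assumption not stated in this section, and both functions are hypothetical stand-ins rather than pykafka's own partitioners.

```python
import random
import zlib


def random_choice_partitioner(partitions, key):
    """Ignore the key and pick any partition (spreads load evenly)."""
    return random.choice(partitions)


def crc32_partitioner(partitions, key):
    """Route equal keys to the same partition using a CRC32 checksum."""
    if key is None:
        # Without a key there is nothing to hash, so fall back to random choice.
        return random.choice(partitions)
    return partitions[zlib.crc32(key) % len(partitions)]
```

A key-based partitioner keeps all messages sharing a partition_key on one partition, which preserves their relative order; a random partitioner gives up that guarantee in exchange for even distribution.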
Set up data structures and start worker threads
Mark the producer as stopped