pykafka.producer
-
class pykafka.producer.Producer(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)
Bases: object
Implements asynchronous producer logic similar to the JVM driver.
It creates a thread of execution for each broker that is the leader of one or more of its topic’s partitions. Each of these threads (which may use threading or some other parallelism implementation like gevent) is associated with a queue that holds the messages that are waiting to be sent to that queue’s broker.
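The one-thread-and-one-queue-per-leader arrangement described above can be pictured with a small standard-library sketch. This is not pykafka's implementation: the broker names, the `PARTITION_LEADERS` mapping, and the `start_workers` and `run_demo` helpers are hypothetical, and appending to a list stands in for a network send.

```python
import queue
import threading

# Hypothetical partition -> leader broker mapping (not read from a real cluster).
PARTITION_LEADERS = {0: "broker-a", 1: "broker-b", 2: "broker-a"}


def start_workers(partition_leaders, sent):
    """Start one worker thread and one queue per distinct leader broker."""
    queues = {}
    threads = []
    for broker in sorted(set(partition_leaders.values())):
        q = queue.Queue()
        queues[broker] = q

        def worker(broker=broker, q=q):
            while True:
                item = q.get()
                if item is None:  # sentinel: stop this worker
                    break
                sent.append((broker, item))  # stand-in for a network send

        t = threading.Thread(target=worker, daemon=True)
        t.start()
        threads.append(t)
    return queues, threads


def run_demo():
    """Route three messages to the queue of each partition's leader."""
    sent = []
    queues, threads = start_workers(PARTITION_LEADERS, sent)
    for partition_id, message in [(0, b"a"), (1, b"b"), (2, b"c")]:
        queues[PARTITION_LEADERS[partition_id]].put(message)
    for q in queues.values():
        q.put(None)
    for t in threads:
        t.join()
    return sorted(sent)
```

Partitions 0 and 2 share a leader here, so only two threads and two queues are created for three partitions.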
-
__enter__()
Context manager entry point; starts the producer.
-
__exit__(exc_type, exc_value, traceback)
Context manager exit point; stops the producer.
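Because the producer is a context manager, it can be started and stopped with a `with` statement. The sketch below assumes a broker reachable at a placeholder address and a placeholder topic name, and it obtains the producer through `KafkaClient` and `Topic.get_producer`, neither of which is documented on this page. The function name `produce_greetings` is hypothetical.

```python
def produce_greetings(hosts="127.0.0.1:9092", topic_name=b"test.topic"):
    """Send a few messages, letting the with-block start and stop the producer."""
    # Imported lazily so that defining this function does not require pykafka.
    from pykafka import KafkaClient

    client = KafkaClient(hosts=hosts)  # placeholder broker address
    topic = client.topics[topic_name]  # placeholder topic name
    with topic.get_producer() as producer:
        for i in range(3):
            producer.produce(("greeting %d" % i).encode("utf-8"))
```

Calling `produce_greetings()` requires a running Kafka broker; nothing is sent until the function is invoked.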
-
__init__(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, sync=False)
Instantiate a new Producer.
Parameters:
- cluster (pykafka.cluster.Cluster) – The cluster to which to connect.
- topic (pykafka.topic.Topic) – The topic to which to produce messages.
- partitioner (pykafka.partitioners.BasePartitioner) – The partitioner to use during message production.
- compression (pykafka.common.CompressionType) – The type of compression to use.
- max_retries (int) – How many times to attempt to produce messages before raising an error.
- retry_backoff_ms (int) – The amount of time (in milliseconds) to back off during produce request retries.
- required_acks (int) – The number of other brokers that must have committed the data to their log and acknowledged this to the leader before a request is considered complete.
- ack_timeout_ms (int) – The amount of time (in milliseconds) to wait for acknowledgment of a produce request.
- max_queued_messages (int) – The maximum number of messages the producer can have waiting to be sent to the broker. If messages are produced faster than they can be delivered to the broker, the producer will either block or raise an error, depending on block_on_queue_full.
- min_queued_messages (int) – The minimum number of messages the producer can have waiting in a queue before it flushes that queue to its broker.
- linger_ms (int) – The upper bound on the batching delay. Once the producer has accumulated min_queued_messages messages for a broker, they are sent immediately regardless of this setting. If fewer messages have accumulated, the producer "lingers" for up to the specified time waiting for more to arrive. linger_ms=0 indicates no lingering.
- block_on_queue_full (bool) – When the producer’s message queue for a broker contains max_queued_messages, the producer must either stop accepting new messages (block) or raise an error. If True, block until space is available in the queue. If False, raise an error immediately.
- sync (bool) – Whether calls to produce should wait for the message to send before returning.
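How min_queued_messages, linger_ms, and block_on_queue_full interact can be sketched with the standard library. This illustrates the rules as described in the parameter list and is not pykafka's code: `should_flush` and `enqueue` are hypothetical helpers, `ms_since_last_flush` is an assumed input, and `queue.Full` stands in for whatever error the producer actually raises.

```python
import queue


def should_flush(queued, ms_since_last_flush,
                 min_queued_messages=70000, linger_ms=5000):
    """Return True when a broker queue should be flushed.

    Flush as soon as the batch is large enough; otherwise flush once the
    linger time has elapsed and there is at least one message to send.
    """
    if queued >= min_queued_messages:
        return True
    return queued > 0 and ms_since_last_flush >= linger_ms


def enqueue(q, message, block_on_queue_full=True):
    """Put a message on a bounded queue, blocking or raising when it is full."""
    # With block=False, queue.Queue.put raises queue.Full immediately.
    q.put(message, block=block_on_queue_full)
```

In this sketch a bounded `queue.Queue(maxsize=max_queued_messages)` plays the role of a broker's message queue, and linger_ms=0 makes any non-empty queue eligible for flushing at once.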
-
__weakref__
List of weak references to the object (if defined).
-
_prepare_request(message_batch, owned_broker)
Prepare a request and send it to the broker.
Parameters:
- message_batch (iterable of ((key, value), partition_id) tuples) – An iterable of messages to send.
- owned_broker (pykafka.producer.OwnedBroker) – The OwnedBroker to which to send the request.
-
_produce(message_partition_tup)
Enqueue a message for the relevant broker.
Parameters:
- message_partition_tup (((str, str), int) tuple) – Message with partition assigned.
-
_send_request(req, attempt, owned_broker)
Send the produce request to the broker and handle the response.
Parameters:
- req (pykafka.protocol.ProduceRequest) – The produce request to send.
- attempt (int) – The current attempt count. Used for retry logic.
- owned_broker (pykafka.producer.OwnedBroker) – The broker to which to send the request.
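The attempt counter, together with the max_retries and retry_backoff_ms constructor parameters, suggests a retry loop along the following lines. This is a generic sketch and not the body of _send_request: `send_with_retries`, the `send` callable, and the choice of `IOError` as the retriable failure are all assumptions made for illustration.

```python
import time


def send_with_retries(send, max_retries=3, retry_backoff_ms=100,
                      sleep=time.sleep):
    """Call send() until it succeeds or max_retries attempts have failed."""
    attempt = 0
    while True:
        try:
            return send()
        except IOError:  # assumed retriable error type
            attempt += 1
            if attempt >= max_retries:
                raise  # give up after max_retries failed attempts
            sleep(retry_backoff_ms / 1000.0)  # back off before retrying
```

The `sleep` argument is injectable only so the backoff can be observed or skipped in tests.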
-
_update_leaders()
Ensure each message in each queue is in the queue owned by its partition’s leader.
This function empties all broker queues, maps their messages to the current leaders for their partitions, and enqueues the messages in the appropriate queues.
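The drain-and-reassign step can be sketched as follows, using plain dictionaries and deques in place of the producer's internal structures. `reassign_to_leaders`, the integer broker ids, and the shape of `partition_leaders` are assumptions for illustration only.

```python
from collections import deque


def reassign_to_leaders(broker_queues, partition_leaders):
    """Drain every broker queue, then re-enqueue each message under the
    current leader of its partition.

    broker_queues: dict of broker_id -> deque of ((key, value), partition_id)
    partition_leaders: dict of partition_id -> broker_id
    """
    pending = []
    for q in broker_queues.values():
        while q:
            pending.append(q.popleft())
    for message, partition_id in pending:
        leader = partition_leaders[partition_id]
        broker_queues.setdefault(leader, deque()).append((message, partition_id))
    return broker_queues
```

For example, if partition 1 moves from broker 1 to broker 2, a message queued for partition 1 ends up in broker 2's queue after the call.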
-
_wait_all()
Block until all pending messages are sent.
“Pending” messages are those that have been passed to produce but have not yet been dequeued and sent to the broker.
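The idea of blocking until nothing is pending resembles `queue.Queue.join()` in the standard library, which returns only after every queued item has been marked done. The sketch below shows that analogy; it makes no claim about how _wait_all is implemented, and `drain_and_wait` is a hypothetical helper.

```python
import queue
import threading


def drain_and_wait(messages):
    """Queue messages for a worker and block until all have been handled."""
    q = queue.Queue()
    delivered = []

    def worker():
        while True:
            msg = q.get()
            delivered.append(msg)  # stand-in for sending to the broker
            q.task_done()  # this message is no longer pending

    threading.Thread(target=worker, daemon=True).start()
    for m in messages:
        q.put(m)
    q.join()  # blocks until every queued message has been marked done
    return delivered
```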
-
produce(message, partition_key=None)
Produce a message.
Parameters:
- message (str) – The message to produce.
- partition_key (str) – The key to use when deciding which partition to send this message to.
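One way a partition key can decide the destination partition is by hashing it, so that equal keys always map to the same partition. The sketch below illustrates that idea only; it is not the default random_partitioner named in the constructor signature, and `choose_partition` and the CRC32 hash are assumptions.

```python
import random
import zlib


def choose_partition(partitions, partition_key=None):
    """Pick a partition: random when no key is given, hashed otherwise."""
    if partition_key is None:
        return random.choice(partitions)
    # Equal keys hash to the same index, so they share a partition.
    return partitions[zlib.crc32(partition_key) % len(partitions)]
```

With a keyed scheme like this, all messages for one key (for instance a user id encoded as bytes) stay in order within a single partition.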
-
raise_worker_exceptions()
Raises exceptions encountered on worker threads.
-
start()
Set up data structures and start worker threads.
-
stop()
Mark the producer as stopped.
-