pykafka.producer

class pykafka.producer.Producer(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, max_request_size=1000012, sync=False, delivery_reports=False, auto_start=True)

Bases: object

Implements asynchronous producer logic similar to the JVM driver.

It creates a thread of execution for each broker that is the leader of one or more of its topic’s partitions. Each of these threads (which may use threading or some other parallelism implementation like gevent) is associated with a queue that holds the messages that are waiting to be sent to that queue’s broker.

__enter__()

Context manager entry point - start the producer

__exit__(exc_type, exc_value, traceback)

Context manager exit point - stop the producer

__init__(cluster, topic, partitioner=<function random_partitioner>, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, block_on_queue_full=True, max_request_size=1000012, sync=False, delivery_reports=False, auto_start=True)

Instantiate a new AsyncProducer

Parameters:
  • cluster (pykafka.cluster.Cluster) – The cluster to which to connect
  • topic (pykafka.topic.Topic) – The topic to which to produce messages
  • partitioner (pykafka.partitioners.BasePartitioner) – The partitioner to use during message production
  • compression (pykafka.common.CompressionType) – The type of compression to use.
  • max_retries (int) – How many times to attempt to produce a given batch of messages before raising an error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first. If you want to completely disallow message reordering, use sync=True.
  • retry_backoff_ms (int) – The amount of time (in milliseconds) to back off during produce request retries. This does not equal the total time spent between message send attempts, since that number can be influenced by other kwargs, including linger_ms and socket_timeout_ms.
  • required_acks (int) – The number of other brokers that must have committed the data to their log and acknowledged this to the leader before a request is considered complete
  • ack_timeout_ms (int) – The amount of time (in milliseconds) to wait for acknowledgment of a produce request.
  • max_queued_messages (int) – The maximum number of messages the producer can have waiting to be sent to the broker. If messages are sent faster than they can be delivered to the broker, the producer will either block or throw an exception based on the preference specified with block_on_queue_full.
  • min_queued_messages (int) – The minimum number of messages the producer can have waiting in a queue before it flushes that queue to its broker (must be greater than 0). This paramater can be used to control the number of messages sent in one batch during async production. This parameter is automatically overridden to 1 when sync=True.
  • linger_ms (int) – This setting gives the upper bound on the delay for batching: once the producer gets min_queued_messages worth of messages for a broker, it will be sent immediately regardless of this setting. However, if we have fewer than this many messages accumulated for this partition we will ‘linger’ for the specified time waiting for more records to show up. linger_ms=0 indicates no lingering.
  • block_on_queue_full (bool) – When the producer’s message queue for a broker contains max_queued_messages, we must either stop accepting new messages (block) or throw an error. If True, this setting indicates we should block until space is available in the queue. If False, we should throw an error immediately.
  • max_request_size (int) – The maximum size of a request in bytes. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
  • sync (bool) – Whether calls to produce should wait for the message to send before returning. If True, an exception will be raised from produce() if delivery to kafka failed.
  • delivery_reports (bool) – If set to True, the producer will maintain a thread-local queue on which delivery reports are posted for each message produced. These must regularly be retrieved through get_delivery_report(), which returns a 2-tuple of pykafka.protocol.Message and either None (for success) or an Exception in case of failed delivery to kafka. If get_delivery_report() is not called regularly with this setting enabled, memory usage will grow unbounded. This setting is ignored when sync=True.
  • auto_start (bool) – Whether the producer should begin communicating with kafka after __init__ is complete. If false, communication can be started with start().
__weakref__

list of weak references to the object (if defined)

_produce(message)

Enqueue a message for the relevant broker

Parameters:message (pykafka.protocol.Message) – Message with valid partition_id, ready to be sent
_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_send_request(message_batch, owned_broker)

Send the produce request to the broker and handle the response.

Parameters:
  • message_batch (iterable of pykafka.protocol.Message) – An iterable of messages to send
  • owned_broker (pykafka.producer.OwnedBroker) – The broker to which to send the request
_setup_owned_brokers()

Instantiate one OwnedBroker per broker

If there are already OwnedBrokers instantiated, safely stop and flush them before creating new ones.

_update()

Update the producer and cluster after an ERROR_CODE

Also re-produces messages that were in queues at the time the update was triggered

_wait_all()

Block until all pending messages are sent

“Pending” messages are those that have been used in calls to produce and have not yet been dequeued and sent to the broker

get_delivery_report(block=True, timeout=None)

Fetch delivery reports for messages produced on the current thread

Returns 2-tuples of a pykafka.protocol.Message and either None (for successful deliveries) or Exception (for failed deliveries). This interface is only available if you enabled delivery_reports on init (and you did not use sync=True)

Parameters:
  • block (bool) – Whether to block on dequeueing a delivery report
  • timeout – How long (in seconds) to block before returning None

;type timeout: int

produce(message, partition_key=None, timestamp=None)

Produce a message.

Parameters:
  • message (bytes) – The message to produce (use None to send null)
  • partition_key (bytes) – The key to use when deciding which partition to send this message to
  • timestamp (datetime.datetime) – The timestamp at which the message is produced (requires broker_version >= 0.10.0)
Returns:

The pykafka.protocol.Message instance that was added to the internal message queue

start()

Set up data structures and start worker threads

stop()

Mark the producer as stopped, and wait until all messages to be sent