pykafka.producer

class pykafka.producer.Producer(cluster, topic, partitioner=None, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, queue_empty_timeout_ms=0, block_on_queue_full=True, max_request_size=1000012, sync=False, delivery_reports=False, pending_timeout_ms=5000, auto_start=True, serializer=None)

Bases: object

Implements asynchronous producer logic similar to the JVM driver.

It creates a thread of execution for each broker that is the leader of one or more of its topic’s partitions. Each of these threads (which may use threading or some other parallelism implementation like gevent) is associated with a queue that holds the messages that are waiting to be sent to that queue’s broker.
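The queue-per-leader design described above can be pictured with a toy model. This is only a sketch built on the standard library, not pykafka's implementation: the function name run_toy_producer, the leader_for_partition mapping, and the in-memory "send" are all hypothetical stand-ins.

```python
import queue
import threading
from collections import defaultdict


def run_toy_producer(messages, leader_for_partition):
    """Route (partition_id, payload) pairs to one queue per leader broker,
    then drain each queue on its own worker thread.

    Hypothetical illustration only; real producers send over the network.
    """
    # One queue per broker that leads at least one of the partitions used.
    queues = defaultdict(queue.Queue)
    for partition_id, payload in messages:
        queues[leader_for_partition[partition_id]].put(payload)

    # Each worker appends only to its own broker's list.
    sent = {broker: [] for broker in queues}

    def worker(broker, broker_queue):
        while True:
            try:
                payload = broker_queue.get_nowait()
            except queue.Empty:
                return
            sent[broker].append(payload)  # stand-in for a produce request

    threads = [
        threading.Thread(target=worker, args=(broker, broker_queue))
        for broker, broker_queue in queues.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sent
```

Because every queue has exactly one consumer thread, messages bound for the same broker keep their enqueue order in this model.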

__enter__()

Context manager entry point - start the producer

__exit__(exc_type, exc_value, traceback)

Context manager exit point - stop the producer

__init__(cluster, topic, partitioner=None, compression=0, max_retries=3, retry_backoff_ms=100, required_acks=1, ack_timeout_ms=10000, max_queued_messages=100000, min_queued_messages=70000, linger_ms=5000, queue_empty_timeout_ms=0, block_on_queue_full=True, max_request_size=1000012, sync=False, delivery_reports=False, pending_timeout_ms=5000, auto_start=True, serializer=None)

Instantiate a new Producer

Parameters:
  • cluster (pykafka.cluster.Cluster) – The cluster to which to connect
  • topic (pykafka.topic.Topic) – The topic to which to produce messages
  • partitioner (pykafka.partitioners.BasePartitioner) – The partitioner to use during message production
  • compression (pykafka.common.CompressionType) – The type of compression to use.
  • max_retries (int) – How many times to attempt to produce a given batch of messages before raising an error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first. If you want to completely disallow message reordering, use sync=True.
  • retry_backoff_ms (int) – The amount of time (in milliseconds) to back off during produce request retries. This does not equal the total time spent between message send attempts, since that number can be influenced by other kwargs, including linger_ms and socket_timeout_ms.
  • required_acks (int) – The number of other brokers that must have committed the data to their log and acknowledged this to the leader before a request is considered complete
  • ack_timeout_ms (int) – The amount of time (in milliseconds) to wait for acknowledgment of a produce request on the server.
  • max_queued_messages (int) – The maximum number of messages the producer can have waiting to be sent to the broker. If messages are sent faster than they can be delivered to the broker, the producer will either block or throw an exception based on the preference specified with block_on_queue_full.
  • min_queued_messages (int) – The minimum number of messages the producer can have waiting in a queue before it flushes that queue to its broker (must be greater than 0). This parameter can be used to control the number of messages sent in one batch during async production. This parameter is automatically overridden to 1 when sync=True.
  • linger_ms (int) – The upper bound on the batching delay. Once the producer has accumulated min_queued_messages messages for a broker, they are sent immediately regardless of this setting. If fewer messages than that have accumulated, the producer will ‘linger’ for the specified time waiting for more records to show up. linger_ms=0 indicates no lingering: messages are sent as fast as possible after they are produce()d.
  • queue_empty_timeout_ms (int) – The amount of time in milliseconds for which the producer’s worker threads should block when no messages are available to flush to brokers. After each linger_ms interval, the worker thread checks for the presence of at least one message in its queue. If there is not at least one, it enters an “empty wait” period for queue_empty_timeout_ms before starting a new linger_ms wait loop. If queue_empty_timeout_ms is 0, this “empty wait” period is a noop, and flushes will continue to be attempted at intervals of linger_ms, even when the queue is empty. If queue_empty_timeout_ms is a positive integer, this “empty wait” period will last for at most that long, but it ends earlier if a message is produce()d before that time. If queue_empty_timeout_ms is -1, the “empty wait” period can only be stopped (and the worker thread killed) by a call to either produce() or stop(); it will never time out.
  • block_on_queue_full (bool) – When the producer’s message queue for a broker contains max_queued_messages, we must either stop accepting new messages (block) or throw an error. If True, this setting indicates we should block until space is available in the queue. If False, we should throw an error immediately.
  • max_request_size (int) – The maximum size of a request in bytes. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
  • sync (bool) – Whether calls to produce should wait for the message to send before returning. If True, an exception will be raised from produce() if delivery to kafka failed.
  • delivery_reports (bool) – If set to True, the producer will maintain a thread-local queue on which delivery reports are posted for each message produced. These must regularly be retrieved through get_delivery_report(), which returns a 2-tuple of pykafka.protocol.Message and either None (for success) or an Exception in case of failed delivery to kafka. If get_delivery_report() is not called regularly with this setting enabled, memory usage will grow unbounded. This setting is ignored when sync=True.
  • pending_timeout_ms (int) – The amount of time (in milliseconds) to wait for delivery reports to be returned from the broker during a produce() call. Also, the time in ms to wait during a stop() call for all messages to be marked as delivered. -1 indicates that these calls should block indefinitely. Differs from ack_timeout_ms in that ack_timeout_ms is a value sent to the broker to control the broker-side timeout, while pending_timeout_ms is used internally by pykafka and not sent to the broker.
  • auto_start (bool) – Whether the producer should begin communicating with kafka after __init__ is complete. If False, communication can be started with start().
  • serializer (function) – A function defining how to serialize messages to be sent to Kafka. It must have the signature d(value, partition_key) and return a tuple of (serialized_value, serialized_partition_key). The arguments passed to this function are a message’s value and partition key, and the returned data should be these fields transformed according to the client code’s serialization logic. See pykafka.utils.__init__ for stock implementations.
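As an illustration of the serializer contract described above, here is a minimal sketch of a JSON-based serializer. The name json_serializer and the choice of JSON and UTF-8 are assumptions made for the example; only the (value, partition_key) -> (serialized_value, serialized_partition_key) shape comes from the parameter description.

```python
import json


def json_serializer(value, partition_key):
    """Hypothetical serializer: JSON-encode the value, stringify the key.

    None is passed through unchanged so that null values and missing
    partition keys stay null.
    """
    serialized_value = (
        None if value is None
        else json.dumps(value, sort_keys=True).encode("utf-8")
    )
    serialized_key = (
        None if partition_key is None
        else str(partition_key).encode("utf-8")
    )
    return serialized_value, serialized_key
```

A function like this would be supplied through the serializer keyword argument, after which produce() could presumably be called with plain Python objects instead of bytes.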
__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_produce(message)

Enqueue a message for the relevant broker. Attempts to update metadata in response to missing brokers.

Parameters:
  • message (pykafka.protocol.Message) – Message with valid partition_id, ready to be sent

_produce_has_timed_out(start_time)

Indicates whether enough time has passed since start_time for a produce() call to time out
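The check can be sketched as a standalone function. This is an assumption-laden illustration rather than the library's code: it assumes start_time is expressed in milliseconds, treats pending_timeout_ms=-1 as "never time out" (as documented for that parameter), and adds a hypothetical now_ms argument so the logic can be exercised without a real clock.

```python
import time


def produce_has_timed_out(start_time_ms, pending_timeout_ms, now_ms=None):
    """Return True once more than pending_timeout_ms has elapsed.

    Hypothetical helper; all times are in milliseconds.
    """
    if pending_timeout_ms == -1:
        # -1 means block indefinitely, so the call never times out.
        return False
    if now_ms is None:
        now_ms = time.time() * 1000
    return now_ms - start_time_ms > pending_timeout_ms
```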

_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_send_request(message_batch, owned_broker)

Send the produce request to the broker and handle the response.

Parameters:
  • message_batch (iterable of pykafka.protocol.Message) – An iterable of messages to send
  • owned_broker (pykafka.producer.OwnedBroker) – The broker to which to send the request
_setup_owned_brokers()

Instantiate one OwnedBroker per broker

If there are already OwnedBrokers instantiated, safely stop and flush them before creating new ones.

_update()

Update the producer and cluster after an ERROR_CODE

Also re-produces messages that were in queues at the time the update was triggered

_wait_all()

Block until all pending messages are sent or until pending_timeout_ms elapses

“Pending” messages are those that have been used in calls to produce and have not yet been acknowledged in a response from the broker

get_delivery_report(block=True, timeout=None)

Fetch delivery reports for messages produced on the current thread

Returns 2-tuples of a pykafka.protocol.Message and either None (for successful deliveries) or Exception (for failed deliveries). This interface is only available if you enabled delivery_reports on init (and you did not use sync=True)

Parameters:
  • block (bool) – Whether to block on dequeueing a delivery report
  • timeout (int) – How long (in seconds) to block before returning None
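Since delivery reports must be retrieved regularly to keep memory bounded, a periodic drain loop is a natural pattern. The sketch below is a hedged example: drain_delivery_reports is a hypothetical helper, and it assumes that an exhausted report queue shows up either as a None return (as the timeout description states) or as a queue.Empty exception (as standard-library queues behave), handling both.

```python
import queue


def drain_delivery_reports(producer):
    """Collect every delivery report currently available without blocking.

    Returns (delivered, failed): a list of messages and a list of
    (message, exception) pairs.
    """
    delivered, failed = [], []
    while True:
        try:
            report = producer.get_delivery_report(block=False)
        except queue.Empty:
            break  # assumption: an empty report queue may raise
        if report is None:
            break  # assumption: or it may return None
        message, exc = report
        if exc is None:
            delivered.append(message)
        else:
            failed.append((message, exc))
    return delivered, failed
```

Because reports are fetched for messages produced on the current thread, a helper like this would need to run on the same thread that called produce().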

produce(message, partition_key=None, timestamp=None)

Produce a message.

Parameters:
  • message (bytes) – The message to produce (use None to send null)
  • partition_key (bytes) – The key to use when deciding which partition to send this message to. This key is passed to the partitioner, which may or may not use it in deciding the partition. The default RandomPartitioner does not use this key, but the optional HashingPartitioner does.
  • timestamp (datetime.datetime) – The timestamp at which the message is produced (requires broker_version >= 0.10.0)
Returns:

The pykafka.protocol.Message instance that was added to the internal message queue
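A minimal usage sketch, assuming the caller has already constructed a Producer (or any object exposing the same context-manager and produce() interface). The helper name send_lines and the use of the line number as partition key are hypothetical choices made for the example.

```python
def send_lines(producer, lines):
    """Produce each text line as UTF-8 bytes, keyed by its line number.

    The with-block relies on __enter__/__exit__ to start the producer
    and to stop it once all lines have been handed over.
    """
    produced = []
    with producer:
        for number, line in enumerate(lines):
            message = producer.produce(
                line.encode("utf-8"),
                partition_key=str(number).encode("utf-8"),
            )
            produced.append(message)
    return produced
```

Whether the partition key influences routing depends on the configured partitioner, as noted for the partition_key parameter.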

start()

Set up data structures and start worker threads

stop()

Mark the producer as stopped, and wait until all pending messages have been sent