Author: Keith Bourgoin, Emmett Butler

class Broker(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0)

Bases: object

A Broker is an abstraction over a real Kafka server instance. It is used to send requests to that server.

__init__(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0)

Create a Broker instance.

  • id_ (int) – The ID number of this broker
  • host (str) – The host address to which to connect. An IP address or a DNS name
  • port (int) – The port on which to connect
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
  • source_host (str) – The host portion of the source address for socket connections
  • source_port (int) – The port portion of the source address for socket connections
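
The socket-level parameters above can be pictured with the standard library alone. The sketch below is not pykafka's BrokerConnection code. It assumes the connection behaves roughly like `socket.create_connection`: `socket_timeout_ms` becomes a timeout in seconds, and `source_host`/`source_port` become the local `source_address` (an empty host with port 0 lets the operating system choose both). The helper name `open_broker_socket` is hypothetical.

```python
import socket


def open_broker_socket(host, port, socket_timeout_ms,
                       source_host='', source_port=0):
    """Open a TCP socket to a broker (hypothetical helper, not pykafka's API).

    socket_timeout_ms is converted to seconds for the socket module, and
    (source_host, source_port) is the local address the socket binds to
    before connecting. An empty host with port 0 lets the OS pick both.
    """
    timeout_s = socket_timeout_ms / 1000.0
    return socket.create_connection(
        (host, port),
        timeout=timeout_s,
        source_address=(source_host, source_port),
    )
```

Binding a source address is mainly useful on multi-homed hosts, where the outgoing interface for broker traffic must be fixed.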


commit_consumer_group_offsets(consumer_group, consumer_group_generation_id, consumer_id, preqs)

Commit offsets to Kafka using the Offset Commit/Fetch API

Commit the offsets of all messages consumed so far by this consumer group with the Offset Commit/Fetch API

Based on Step 2 here

  • consumer_group (str) – the name of the consumer group for which to commit offsets
  • consumer_group_generation_id (int) – The generation ID for this consumer group
  • consumer_id (str) – The identifier of this consumer within the consumer group
  • preqs (Iterable of pykafka.protocol.PartitionOffsetCommitRequest) – Requests indicating the partitions for which offsets should be committed
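
Each element of `preqs` describes a single partition, while a Kafka OffsetCommit request nests partitions under their topic and carries the group name, generation ID and consumer ID once. The sketch below shows that regrouping with a stand-in namedtuple. `PartitionCommit` and `build_commit_payload` are hypothetical, and their fields only approximate those of `pykafka.protocol.PartitionOffsetCommitRequest`.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for one per-partition commit request.
PartitionCommit = namedtuple(
    'PartitionCommit', ['topic_name', 'partition_id', 'offset', 'metadata'])


def build_commit_payload(consumer_group, generation_id, consumer_id, preqs):
    """Group flat per-partition commit requests by topic.

    Returns a dict shaped like an OffsetCommit request: the group-level
    fields appear once, and each topic maps partition IDs to
    (offset, metadata) pairs.
    """
    topics = defaultdict(dict)
    for req in preqs:
        topics[req.topic_name][req.partition_id] = (req.offset, req.metadata)
    return {
        'consumer_group': consumer_group,
        'generation_id': generation_id,
        'consumer_id': consumer_id,
        'topics': dict(topics),
    }


payload = build_commit_payload(b'my-group', 3, b'consumer-1', [
    PartitionCommit(b'events', 0, 42, b''),
    PartitionCommit(b'events', 1, 17, b''),
])
```

Version-specific fields of the real request are left out; the point is only the per-topic nesting.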

Establish a connection to the broker server.

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker


Establish a connection to the Broker for the offsets channel

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker’s offsets channel


Returns True if this object’s main connection to the Kafka broker is active

fetch_consumer_group_offsets(consumer_group, preqs)

Fetch the offsets stored in Kafka with the Offset Commit/Fetch API

Based on Step 2 here

  • consumer_group (str) – the name of the consumer group for which to fetch offsets
  • preqs (Iterable of pykafka.protocol.PartitionOffsetFetchRequest) – Requests indicating the partitions for which offsets should be fetched
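
A caller typically uses the fetched offsets to decide where each partition should resume. In the Kafka protocol, an OffsetFetch response reports an offset of -1 for a partition on which the group has not committed. The sketch below falls back to another offset in that case. It uses plain dicts in place of pykafka's response objects, and `resolve_start_offsets` is a hypothetical helper.

```python
# Kafka's OffsetFetch response reports -1 for a partition that has
# no committed offset for the group.
NO_COMMITTED_OFFSET = -1


def resolve_start_offsets(committed, fallback):
    """Pick a starting offset per partition (hypothetical helper).

    committed: dict of partition_id -> offset as returned by an offset fetch.
    fallback:  dict of partition_id -> offset to use when nothing was
               committed, e.g. a log-start or log-end offset obtained
               separately.
    """
    return {
        partition: fallback[partition] if offset == NO_COMMITTED_OFFSET else offset
        for partition, offset in committed.items()
    }
```

For example, `resolve_start_offsets({0: 42, 1: -1}, {0: 0, 1: 100})` keeps the committed offset for partition 0 and uses the fallback for partition 1.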

fetch_messages(partition_requests, timeout=30000, min_bytes=1)

Fetch messages from a set of partitions.

  • partition_requests (Iterable of pykafka.protocol.PartitionFetchRequest) – Requests describing the messages to fetch.
  • timeout (int) – the maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy min_bytes
  • min_bytes (int) – the minimum amount of data (in bytes) the server should return. If insufficient data is available the request will block for up to timeout milliseconds.
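
Together, `timeout` and `min_bytes` make a fetch behave like a long poll. The model below is a simplification written for illustration, not broker code. It ignores per-partition size limits and assumes the times at which data arrives are known in advance. The function name `fetch_response_time_ms` is hypothetical.

```python
def fetch_response_time_ms(arrivals, min_bytes=1, timeout=30000):
    """Model when a broker answers a fetch (simplified, hypothetical).

    arrivals: list of (time_ms, size_bytes) pairs, in time order, giving
              when data becomes available for the requested partitions.
    Returns the time (ms after the request) at which the broker responds:
    as soon as at least min_bytes have accumulated, or at the timeout.
    """
    total = 0
    for time_ms, size in arrivals:
        if time_ms > timeout:
            break
        total += size
        if total >= min_bytes:
            return time_ms
    return timeout
```

With the defaults (`min_bytes=1`, `timeout=30000`), any available data produces an immediate response, while an idle partition holds the request open for up to 30 seconds.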

classmethod from_metadata(metadata, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=65536, source_host='', source_port=0)

Create a Broker using BrokerMetadata

  • metadata (pykafka.protocol.BrokerMetadata) – Metadata that describes the broker.
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
  • source_host (str) – The host portion of the source address for socket connections
  • source_port (int) – The port portion of the source address for socket connections
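
`from_metadata` is an alternate constructor: it takes the broker's identity from a metadata object and passes the remaining settings through to the normal constructor. The sketch below shows the pattern with stand-in classes. `BrokerSketch` and `MetadataSketch` are hypothetical, and the assumption that the metadata exposes `id`, `host` and `port` attributes is for illustration only.

```python
from collections import namedtuple

# Hypothetical stand-in for broker metadata (id, host, port).
MetadataSketch = namedtuple('MetadataSketch', ['id', 'host', 'port'])


class BrokerSketch(object):
    """Hypothetical stand-in mirroring the documented constructor signature."""

    def __init__(self, id_, host, port, handler, socket_timeout_ms,
                 offsets_channel_socket_timeout_ms, buffer_size=1048576,
                 source_host='', source_port=0):
        self.id = id_
        self.host = host
        self.port = port
        self.handler = handler
        self.socket_timeout_ms = socket_timeout_ms
        self.offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
        self.buffer_size = buffer_size
        self.source_host = source_host
        self.source_port = source_port

    @classmethod
    def from_metadata(cls, metadata, handler, socket_timeout_ms,
                      offsets_channel_socket_timeout_ms, buffer_size=65536,
                      source_host='', source_port=0):
        # Copy the broker's identity from the metadata and pass the
        # remaining settings through unchanged.
        return cls(metadata.id, metadata.host, metadata.port, handler,
                   socket_timeout_ms, offsets_channel_socket_timeout_ms,
                   buffer_size=buffer_size, source_host=source_host,
                   source_port=source_port)
```

Note that the documented defaults differ: `from_metadata` uses `buffer_size=65536`, while `__init__` uses `buffer_size=1048576`, so a broker built from metadata gets the smaller buffer unless one is passed explicitly.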

The primary pykafka.handlers.RequestHandler for this broker

This handler services all requests other than those of the Offset Commit/Fetch API


The host to which this broker is connected


The broker’s ID within the Kafka cluster


Returns True if this object’s offsets channel connection to the Kafka broker is active

The offsets channel pykafka.handlers.RequestHandler for this broker

This handler services all requests that use the Offset Commit/Fetch API
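
The split between the two handlers can be summarised as a routing rule keyed on the Kafka request type. The sketch below uses the Kafka protocol's numeric API keys. The `choose_channel` helper and its string return values are hypothetical and only mirror the division described above, not pykafka's internals.

```python
# Kafka protocol API keys for the request types documented on this page.
PRODUCE, FETCH, LIST_OFFSETS, METADATA = 0, 1, 2, 3
OFFSET_COMMIT, OFFSET_FETCH = 8, 9

OFFSETS_CHANNEL_APIS = frozenset({OFFSET_COMMIT, OFFSET_FETCH})


def choose_channel(api_key):
    """Return which handler a request would go to (hypothetical helper).

    Offset commit/fetch requests use the offsets channel; everything
    else uses the main handler.
    """
    return 'offsets_channel' if api_key in OFFSETS_CHANNEL_APIS else 'main'
```

One plausible benefit of the split is that offset commits get their own connection and their own `offsets_channel_socket_timeout_ms`, independent of long-polling fetches on the main connection.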


The port where the broker is available


Produce messages to a set of partitions.

  • produce_request (pykafka.protocol.ProduceRequest) – A request object indicating the messages to produce
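
A single produce request can carry messages for several partitions, organised by topic and partition. The sketch below shows only that grouping step using plain tuples. `group_for_produce` is hypothetical and does not construct a `pykafka.protocol.ProduceRequest`.

```python
from collections import defaultdict


def group_for_produce(messages):
    """Group (topic, partition_id, value) triples into per-partition batches.

    Hypothetical helper: messages bound for the same topic/partition pair
    end up in one list, in the order they were given.
    """
    batches = defaultdict(list)
    for topic, partition_id, value in messages:
        batches[(topic, partition_id)].append(value)
    return dict(batches)
```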

Request cluster metadata

  • topics (Iterable of bytes) – The topic names for which to request metadata
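
Because topic names are expected as bytes, Python 3 callers that hold `str` names need to encode them first. A minimal helper, assuming UTF-8 is an acceptable encoding for the names in use; `as_topic_bytes` is hypothetical.

```python
def as_topic_bytes(topics, encoding='utf-8'):
    """Normalise topic names to bytes (hypothetical helper).

    Accepts an iterable mixing str and bytes and returns a list of bytes,
    matching the "Iterable of bytes" type expected for topics.
    """
    result = []
    for topic in topics:
        if isinstance(topic, str):
            topic = topic.encode(encoding)
        elif not isinstance(topic, bytes):
            raise TypeError('topic names must be str or bytes, got %r' % (topic,))
        result.append(topic)
    return result
```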

Request offset information for a set of topic/partitions

  • partition_requests (Iterable of pykafka.protocol.PartitionOffsetRequest) – Requests specifying the partitions for which to fetch offsets
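
The Kafka Offsets (ListOffsets) API identifies the two ends of a partition with special timestamp values: -1 asks for the latest offset (log end) and -2 for the earliest (log start). The sketch below builds one such request per partition. `OffsetLimitRequest` and `offset_limit_requests` are hypothetical, and their field names only approximate those of `pykafka.protocol.PartitionOffsetRequest`.

```python
from collections import namedtuple

# Special timestamps defined by the Kafka Offsets (ListOffsets) API.
LATEST = -1    # ask for the log-end offset
EARLIEST = -2  # ask for the log-start offset

# Hypothetical stand-in for a per-partition offset request.
OffsetLimitRequest = namedtuple(
    'OffsetLimitRequest',
    ['topic_name', 'partition_id', 'offsets_before', 'max_offsets'])


def offset_limit_requests(topic_name, partition_ids, which=LATEST):
    """Build one request per partition asking for a single offset limit."""
    return [OffsetLimitRequest(topic_name, pid, which, 1)
            for pid in partition_ids]
```

Sending an EARLIEST and a LATEST request for the same partition gives the range of offsets currently held in its log.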