pykafka.broker

Author: Keith Bourgoin, Emmett Butler

class pykafka.broker.Broker(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576)

A Broker is an abstraction over a real Kafka server instance. It is used to send requests to that server.

__init__(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576)

Create a Broker instance.

Parameters:
  • id_ (int) – The id number of this broker
  • host (str) – The host address to which to connect, given as an IP address or a DNS name
  • port (int) – The port on which to connect
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
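
As a rough sketch of how these arguments fit together, the snippet below gathers them into one dictionary and passes them to the constructor. The helper names default_broker_settings and make_broker are hypothetical, the timeout values are illustrative rather than recommended, and ThreadingHandler is assumed to be the handler implementation available in pykafka.handlers.

```python
def default_broker_settings(host, port, broker_id=0):
    """Collect the Broker constructor arguments (except handler) in one place."""
    return {
        "id_": broker_id,
        "host": host,
        "port": port,
        "socket_timeout_ms": 30 * 1000,  # illustrative value
        "offsets_channel_socket_timeout_ms": 10 * 1000,  # illustrative value
        "buffer_size": 1024 * 1024,  # 1048576 bytes, the documented default
    }


def make_broker(host, port, broker_id=0):
    """Build a Broker from the settings above (hypothetical helper)."""
    # Imported lazily so this module loads even where pykafka is not installed.
    from pykafka.broker import Broker
    from pykafka.handlers import ThreadingHandler  # assumed handler class

    settings = default_broker_settings(host, port, broker_id)
    return Broker(handler=ThreadingHandler(), **settings)
```

Calling make_broker("localhost", 9092) requires pykafka to be installed and a broker reachable at that address.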

commit_consumer_group_offsets(consumer_group, consumer_group_generation_id, consumer_id, preqs)

Commit offsets to Kafka using the Offset Commit/Fetch API

Commit the offsets of all messages consumed so far by this consumer group with the Offset Commit/Fetch API

Based on Step 2 here https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters:
  • consumer_group (str) – the name of the consumer group for which to commit offsets
  • consumer_group_generation_id (int) – The generation ID for this consumer group
  • consumer_id (str) – The identifier of this consumer within the consumer group
  • preqs (Iterable of pykafka.protocol.PartitionOffsetCommitRequest) – Requests indicating the partitions for which offsets should be committed
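
A minimal sketch of assembling the preqs argument from a mapping of partition ids to offsets. CommitRequest is a hypothetical stand-in for pykafka.protocol.PartitionOffsetCommitRequest, and its field names are assumptions, so check the real class before relying on them. The helper name build_commit_requests is also made up for this example.

```python
import time
from collections import namedtuple

# Hypothetical stand-in for pykafka.protocol.PartitionOffsetCommitRequest.
# The field names below are assumptions made for illustration.
CommitRequest = namedtuple(
    "CommitRequest",
    ["topic_name", "partition_id", "offset", "timestamp", "metadata"],
)


def build_commit_requests(topic_name, offsets_by_partition, now_ms=None):
    """Return one commit request per partition, ordered by partition id."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return [
        CommitRequest(topic_name, partition_id, offset, now_ms, b"")
        for partition_id, offset in sorted(offsets_by_partition.items())
    ]
```

The resulting list plays the role of preqs; with the real request class it would be passed along with the consumer group name, generation id, and consumer id.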

connect()

Establish a connection to the broker server.

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker

connect_offsets_channel()

Establish a connection to the Broker for the offsets channel

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker’s offsets channel

connected

Returns True if this object’s main connection to the Kafka broker is active
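
The connection properties and methods can be combined into a small guard that opens only the connections that are not yet active. This is a sketch: ensure_connected is a hypothetical helper, and FakeBroker is a stand-in that exists only so the example runs without a Kafka server. It relies solely on connected, offsets_channel_connected, connect() and connect_offsets_channel() as documented in this section.

```python
def ensure_connected(broker):
    """Open whichever of the broker's two connections is not active."""
    if not broker.connected:
        broker.connect()
    if not broker.offsets_channel_connected:
        broker.connect_offsets_channel()
    return broker


class FakeBroker:
    """Hypothetical stand-in exposing the same four members as Broker."""

    def __init__(self, connected=False, offsets_channel_connected=False):
        self.connected = connected
        self.offsets_channel_connected = offsets_channel_connected
        self.calls = []  # records which connect methods were invoked

    def connect(self):
        self.calls.append("connect")
        self.connected = True

    def connect_offsets_channel(self):
        self.calls.append("connect_offsets_channel")
        self.offsets_channel_connected = True
```

With a real Broker instance, ensure_connected(broker) would be called before issuing requests on either channel.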

fetch_consumer_group_offsets(consumer_group, preqs)

Fetch the offsets stored in Kafka with the Offset Commit/Fetch API

Based on Step 2 here https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters:
  • consumer_group (str) – the name of the consumer group for which to fetch offsets
  • preqs (Iterable of pykafka.protocol.PartitionOffsetFetchRequest) – Requests indicating the partitions for which offsets should be fetched

fetch_messages(partition_requests, timeout=30000, min_bytes=1)

Fetch messages from a set of partitions.

Parameters:
  • partition_requests (Iterable of pykafka.protocol.PartitionFetchRequest) – Requests specifying the messages to fetch
  • timeout (int) – the maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy min_bytes
  • min_bytes (int) – the minimum amount of data (in bytes) the server should return. If insufficient data is available the request will block for up to timeout milliseconds.
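
The interaction between timeout and min_bytes can be pictured with a toy model of the server's decision. This is an illustration of the rule described above, not pykafka or Kafka code, and the function name is made up for the example.

```python
def server_would_respond(available_bytes, waited_ms, min_bytes=1, timeout=30000):
    """Toy model: answer once enough data exists or the timeout has elapsed."""
    enough_data = available_bytes >= min_bytes
    timed_out = waited_ms >= timeout
    return enough_data or timed_out
```

With the defaults, a single available byte is enough for an immediate answer; raising min_bytes trades latency for larger responses, bounded by timeout.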

classmethod from_metadata(metadata, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=65536)

Create a Broker using BrokerMetadata

Parameters:
  • metadata (pykafka.protocol.BrokerMetadata) – Metadata that describes the broker
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
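
The alternate-constructor pattern behind from_metadata can be sketched as follows. SketchBroker is a hypothetical class that only stores its arguments, and the BrokerMetadata stand-in assumes the metadata carries id, host and port fields; neither is the library's actual implementation. The defaults mirror the two signatures documented in this section.

```python
from collections import namedtuple

# Hypothetical stand-in for pykafka.protocol.BrokerMetadata (fields assumed).
BrokerMetadata = namedtuple("BrokerMetadata", ["id", "host", "port"])


class SketchBroker:
    """Illustrative class that stores its arguments and never connects."""

    def __init__(self, id_, host, port, handler, socket_timeout_ms,
                 offsets_channel_socket_timeout_ms, buffer_size=1048576):
        self.id = id_
        self.host = host
        self.port = port
        self.handler = handler
        self.socket_timeout_ms = socket_timeout_ms
        self.offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
        self.buffer_size = buffer_size

    @classmethod
    def from_metadata(cls, metadata, handler, socket_timeout_ms,
                      offsets_channel_socket_timeout_ms, buffer_size=65536):
        # Unpack the metadata fields into the regular constructor.
        return cls(metadata.id, metadata.host, metadata.port, handler,
                   socket_timeout_ms, offsets_channel_socket_timeout_ms,
                   buffer_size=buffer_size)
```

Note that in the signatures shown here the default buffer_size differs between the two entry points (65536 for from_metadata, 1048576 for __init__), so passing it explicitly avoids surprises.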

handler

The primary pykafka.handlers.RequestHandler for this broker

This handler services all requests outside of the Offset Commit/Fetch API

host

The host to which this broker is connected

id

The broker’s ID within the Kafka cluster

offsets_channel_connected

Returns True if this object’s offsets channel connection to the Kafka broker is active

offsets_channel_handler

The offsets channel pykafka.handlers.RequestHandler for this broker

This handler services all requests that use the Offset Commit/Fetch API

port

The port where the broker is available

produce_messages(produce_request)

Produce messages to a set of partitions.

Parameters:
  • produce_request (pykafka.protocol.ProduceRequest) – A request object indicating the messages to produce

request_metadata(topics=None)

Request cluster metadata

Parameters:
  • topics (Iterable of bytes) – The names of the topics for which to request metadata

request_offset_limits(partition_requests)

Request offset information for a set of topic/partitions

Parameters:
  • partition_requests (Iterable of pykafka.protocol.PartitionOffsetRequest) – Requests specifying the partitions for which to fetch offsets
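
In the Kafka offset API, the time values -1 and -2 ask for the latest and earliest available offsets respectively. The sketch below builds one request per partition using those sentinels. OffsetLimitRequest is a hypothetical stand-in for pykafka.protocol.PartitionOffsetRequest with assumed field names, and limit_requests is a made-up helper.

```python
from collections import namedtuple

# Hypothetical stand-in for pykafka.protocol.PartitionOffsetRequest.
# The field names below are assumptions made for illustration.
OffsetLimitRequest = namedtuple(
    "OffsetLimitRequest",
    ["topic_name", "partition_id", "offsets_before", "max_offsets"],
)

LATEST = -1    # Kafka protocol sentinel: latest available offset
EARLIEST = -2  # Kafka protocol sentinel: earliest available offset


def limit_requests(topic_name, partition_ids, which=LATEST):
    """Build one offset-limit request per partition, asking for a single offset."""
    return [
        OffsetLimitRequest(topic_name, partition_id, which, 1)
        for partition_id in partition_ids
    ]
```

The returned list plays the role of partition_requests; with the real request class it would be passed to request_offset_limits.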