pykafka.broker

Author: Keith Bourgoin, Emmett Butler

class pykafka.broker.Broker(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0)

Bases: object

A Broker is an abstraction over a real Kafka server instance. It is used to perform requests to that server.

__init__(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0)

Create a Broker instance.

Parameters:
  • id_ (int) – The id number of this broker
  • host (str) – The host address to which to connect. An IP address or a DNS name
  • port (int) – The port on which to connect
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
  • source_host (str) – The host portion of the source address for socket connections
  • source_port (int) – The port portion of the source address for socket connections
__weakref__

list of weak references to the object (if defined)

_get_unique_req_handler(connection_id)

Return a RequestHandler instance unique to the given connection_id

In some applications, such as the Group Membership API, requests running in the same process must be interleaved. When such requests share a single RequestHandler instance, they are queued and the interleaving semantics are not upheld. This method behaves identically to self._req_handler if there is only one connection_id per KafkaClient. If a single KafkaClient needs to use more than one connection_id, this method maintains a dictionary of connections unique to those ids.

Parameters:connection_id (str) – The unique identifier of the connection to return
commit_consumer_group_offsets(consumer_group, consumer_group_generation_id, consumer_id, preqs)

Commit offsets to Kafka using the Offset Commit/Fetch API

Commits the offsets of all messages consumed so far by this consumer group

Based on Step 2 here https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters:
  • consumer_group (str) – the name of the consumer group for which to commit offsets
  • consumer_group_generation_id (int) – The generation ID for this consumer group
  • consumer_id (str) – The identifier for this consumer
  • preqs (Iterable of pykafka.protocol.PartitionOffsetCommitRequest) – Requests indicating the partitions for which offsets should be committed
connect()

Establish a connection to the broker server.

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker

connect_offsets_channel()

Establish a connection to the Broker for the offsets channel

Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker’s offsets channel

connected

Returns True if this object’s main connection to the Kafka broker is active

fetch_consumer_group_offsets(consumer_group, preqs)

Fetch the offsets stored in Kafka with the Offset Commit/Fetch API

Based on Step 2 here https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters:
  • consumer_group (str) – the name of the consumer group for which to fetch offsets
  • preqs (Iterable of pykafka.protocol.PartitionOffsetFetchRequest) – Requests indicating the partitions for which offsets should be fetched
fetch_messages(partition_requests, timeout=30000, min_bytes=1)

Fetch messages from a set of partitions.

Parameters:
  • partition_requests (Iterable of pykafka.protocol.PartitionFetchRequest) – Requests of messages to fetch.
  • timeout (int) – the maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy min_bytes
  • min_bytes (int) – the minimum amount of data (in bytes) the server should return. If insufficient data is available the request will block for up to timeout milliseconds.
classmethod from_metadata(metadata, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=65536, source_host='', source_port=0)

Create a Broker using BrokerMetadata

Parameters:
  • metadata (pykafka.protocol.BrokerMetadata) – Metadata that describes the broker.
  • handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
  • socket_timeout_ms (int) – The socket timeout for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel
  • buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
  • source_host (str) – The host portion of the source address for socket connections
  • source_port (int) – The port portion of the source address for socket connections
handler

The primary pykafka.handlers.RequestHandler for this broker

This handler services all requests outside of the commit/fetch API

heartbeat(connection_id, consumer_group, generation_id, member_id)

Send a HeartbeatRequest

Parameters:
  • connection_id (str) – The unique identifier of the connection on which to make this request
  • consumer_group (bytes) – The name of the consumer group to which this consumer belongs
  • generation_id (int) – The current generation for the consumer group
  • member_id (bytes) – The ID of the consumer sending this heartbeat
host

The host to which this broker is connected

id

The broker’s ID within the Kafka cluster

join_group(connection_id, consumer_group, member_id)

Send a JoinGroupRequest

Parameters:
  • connection_id (str) – The unique identifier of the connection on which to make this request
  • consumer_group (bytes) – The name of the consumer group to join
  • member_id (bytes) – The ID of the consumer joining the group
leave_group(connection_id, consumer_group, member_id)

Send a LeaveGroupRequest

Parameters:
  • connection_id (str) – The unique identifier of the connection on which to make this request
  • consumer_group (bytes) – The name of the consumer group to leave
  • member_id (bytes) – The ID of the consumer leaving the group
offsets_channel_connected

Returns True if this object’s offsets channel connection to the Kafka broker is active

offsets_channel_handler

The offsets channel pykafka.handlers.RequestHandler for this broker

This handler services all requests that use the commit/fetch API

port

The port where the broker is available

produce_messages(produce_request)

Produce messages to a set of partitions.

Parameters:produce_request (pykafka.protocol.ProduceRequest) – a request object indicating the messages to produce
request_metadata(topics=None)

Request cluster metadata

Parameters:topics (Iterable of bytes) – The topic names for which to request metadata
request_offset_limits(partition_requests)

Request offset information for a set of topic/partitions

Parameters:partition_requests (Iterable of pykafka.protocol.PartitionOffsetRequest) – requests specifying the partitions for which to fetch offsets
sync_group(connection_id, consumer_group, generation_id, member_id, group_assignment)

Send a SyncGroupRequest

Parameters:
  • connection_id (str) – The unique identifier of the connection on which to make this request
  • consumer_group (bytes) – The name of the consumer group to which this consumer belongs
  • generation_id (int) – The current generation for the consumer group
  • member_id (bytes) – The ID of the consumer syncing
  • group_assignment (iterable of pykafka.protocol.MemberAssignment) – A sequence of pykafka.protocol.MemberAssignment instances indicating the partition assignments for each member of the group. When sync_group is called by a member other than the leader of the group, group_assignment should be an empty sequence.
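
Taken together, the group membership methods above follow a join, sync, heartbeat, leave sequence. A sketch under the assumption of a connected Broker and a live Kafka (0.9+) cluster; the live calls are shown commented, and the response attribute names are assumptions:

```python
conn_id = b"conn-0"  # any identifier unique to this connection

# All of the following require a live broker connection:
# res = broker.join_group(conn_id, b"mygroup", b"")  # b"" asks the
#     coordinator to assign a fresh member id
# assignment = []  # non-leaders send an empty assignment sequence
# broker.sync_group(conn_id, b"mygroup", res.generation_id,
#                   res.member_id, assignment)
# broker.heartbeat(conn_id, b"mygroup", res.generation_id, res.member_id)
# broker.leave_group(conn_id, b"mygroup", res.member_id)
```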