pykafka.broker
Author: Keith Bourgoin, Emmett Butler
-
class pykafka.broker.Broker(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0, ssl_config=None, broker_version='0.9.0', api_versions=None)
Bases: object
A Broker is an abstraction over a real Kafka server instance. It is used to perform requests to that server.
-
__init__(id_, host, port, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=1048576, source_host='', source_port=0, ssl_config=None, broker_version='0.9.0', api_versions=None)
Create a Broker instance.
Parameters:
- id_ (int) – The ID number of this broker
- host (str) – The host address to which to connect: an IP address or a DNS name
- port (int) – The port on which to connect
- handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
- socket_timeout_ms (int) – The socket timeout for network requests, in milliseconds
- offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel, in milliseconds
- buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
- source_host (str) – The host portion of the source address for socket connections
- source_port (int) – The port portion of the source address for socket connections
- ssl_config (pykafka.connection.SslConfig) – Config object for SSL connections
- broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
- api_versions (Iterable of pykafka.protocol.ApiVersionsSpec) – A sequence of pykafka.protocol.ApiVersionsSpec objects indicating the API version compatibility of this broker
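The timeout and source-address parameters describe an ordinary TCP connection. The sketch below shows one plausible way to translate them into arguments for the standard library's `socket.create_connection`. It assumes the mapping is direct (milliseconds divided by 1000, `(source_host, source_port)` as the bind address); `connection_kwargs` and `open_socket` are hypothetical helpers, not pykafka functions, and this is not a description of how `pykafka.connection.BrokerConnection` opens its socket.

```python
import socket
from typing import Any, Dict


def connection_kwargs(host: str, port: int, socket_timeout_ms: int,
                      source_host: str = '', source_port: int = 0) -> Dict[str, Any]:
    """Translate Broker-style parameters into socket.create_connection() kwargs.

    Hypothetical helper: it illustrates the meaning of the documented
    parameters, not pykafka's internal code.
    """
    return {
        "address": (host, port),
        # Millisecond timeouts become the float seconds that sockets expect.
        "timeout": socket_timeout_ms / 1000.0,
        # ('', 0) binds to any local interface and lets the OS pick a port.
        "source_address": (source_host, source_port),
    }


def open_socket(host: str, port: int, socket_timeout_ms: int,
                source_host: str = '', source_port: int = 0) -> socket.socket:
    """Open a TCP connection using the translated arguments."""
    return socket.create_connection(**connection_kwargs(
        host, port, socket_timeout_ms, source_host, source_port))
```

For example, `open_socket("kafka1", 9092, 30000)` would attempt a connection with a 30-second timeout from an OS-chosen local port; the host name here is only an example.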
-
__repr__()
x.__repr__() <==> repr(x)
-
__weakref__
List of weak references to the object (if defined)
-
_get_unique_req_handler(connection_id)
Return a RequestHandler instance unique to the given connection_id.
In some applications, for example the Group Membership API, requests running in the same process must be interleaved. When two such requests share the same RequestHandler instance, they are queued one behind the other and the interleaving semantics are not upheld. If there is only one connection_id per KafkaClient, this method behaves identically to self._req_handler. If a single KafkaClient needs to use more than one connection_id, this method maintains a dictionary of connections keyed by those ids.
Parameters: connection_id (str) – The unique identifier of the connection to return
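The per-id dictionary described above is a lazily populated cache. The following is a minimal sketch of that idea, assuming a caller-supplied `factory` that creates a new connection plus handler for an id; `HandlerCache` and `factory` are hypothetical names, not pykafka's implementation.

```python
from typing import Callable, Dict, Generic, TypeVar

H = TypeVar("H")


class HandlerCache(Generic[H]):
    """Hypothetical sketch: one request handler per connection_id.

    `factory` stands in for whatever builds a fresh connection and handler;
    it is an illustrative assumption, not part of pykafka's API.
    """

    def __init__(self, factory: Callable[[str], H]) -> None:
        self._factory = factory
        self._handlers: Dict[str, H] = {}

    def get(self, connection_id: str) -> H:
        # Reuse the handler for a known id; otherwise create and remember one,
        # so requests on different ids never queue behind each other.
        if connection_id not in self._handlers:
            self._handlers[connection_id] = self._factory(connection_id)
        return self._handlers[connection_id]
```

With a single id, every call returns the same handler, which matches the "behaves identically to self._req_handler" case; a second id gets its own handler and therefore its own queue.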
-
commit_consumer_group_offsets(consumer_group, consumer_group_generation_id, consumer_id, preqs)
Commit offsets to Kafka using the Offset Commit/Fetch API.
This commits the offsets of all messages consumed so far by this consumer group.
Based on Step 2 of https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
Parameters:
- consumer_group (str) – The name of the consumer group for which to commit offsets
- consumer_group_generation_id (int) – The generation ID for this consumer group
- consumer_id (str) – The identifier of this consumer within the consumer group
- preqs (Iterable of pykafka.protocol.PartitionOffsetCommitRequest) – Requests indicating the partitions for which offsets should be committed
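`preqs` carries one request per partition. The sketch below builds such a sequence from a record of the last offset consumed in each partition. It uses a hypothetical `CommitRequest` namedtuple as a stand-in for `pykafka.protocol.PartitionOffsetCommitRequest` (the real class's fields may differ), and it follows the general Kafka convention that the committed offset is the next offset to read, i.e. the last consumed offset plus one.

```python
from collections import namedtuple
from typing import Dict, List, Tuple

# Hypothetical stand-in for pykafka.protocol.PartitionOffsetCommitRequest.
CommitRequest = namedtuple(
    "CommitRequest", ["topic_name", "partition_id", "offset", "timestamp", "metadata"])


def build_commit_requests(consumed: Dict[Tuple[bytes, int], int],
                          timestamp: int = -1,
                          metadata: bytes = b"") -> List[CommitRequest]:
    """Turn {(topic, partition): last consumed offset} into commit requests.

    The committed offset is last consumed + 1, so a restarted consumer resumes
    at the first message it has not yet processed. `timestamp` and `metadata`
    are placeholder values for this sketch.
    """
    return [
        CommitRequest(topic, partition, last + 1, timestamp, metadata)
        for (topic, partition), last in sorted(consumed.items())
    ]
```

Sorting the keys is only there to make the output order deterministic; the protocol does not require it.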
-
connect(attempts=3)
Establish a connection to the broker server.
Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker.
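The `attempts` parameter suggests a bounded retry loop. The sketch below shows that pattern in general form, assuming `attempts` caps the number of connection tries and the last socket error is re-raised once they are exhausted. `connect_with_retries` and `backoff_s` are hypothetical; this is not pykafka's connection code.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retries(connect_once: Callable[[], T], attempts: int = 3,
                         backoff_s: float = 0.0) -> T:
    """Call `connect_once` up to `attempts` times, re-raising the last error.

    Hypothetical sketch of what an `attempts` parameter usually means;
    `backoff_s` is an invented knob, not a pykafka option.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[OSError] = None
    for attempt in range(attempts):
        try:
            return connect_once()
        except OSError as exc:  # socket errors are OSError subclasses
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(backoff_s)
    assert last_error is not None
    raise last_error
```

Catching only `OSError` keeps programming errors (a `TypeError`, say) from being silently retried.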
-
connect_offsets_channel(attempts=3)
Establish a connection to the broker for the offsets channel.
Creates a new pykafka.connection.BrokerConnection and a new pykafka.handlers.RequestHandler for this broker’s offsets channel.
-
connected
Returns True if this object’s main connection to the Kafka broker is active
-
fetch_consumer_group_offsets(consumer_group, preqs)
Fetch the offsets stored in Kafka with the Offset Commit/Fetch API.
Based on Step 2 of https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
Parameters:
- consumer_group (str) – The name of the consumer group for which to fetch offsets
- preqs (Iterable of pykafka.protocol.PartitionOffsetFetchRequest) – Requests indicating the partitions for which offsets should be fetched
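In the Kafka offset fetch protocol, a partition with no committed offset for the group is reported with offset -1, so a caller has to choose a fallback starting point. The sketch below assumes the fetched offsets have already been collected into a dict keyed by `(topic, partition)`; `starting_offsets` and `reset_offsets` are hypothetical names, and the choice of fallback (earliest, latest, or something else) is left to the caller.

```python
from typing import Dict, Tuple

Key = Tuple[bytes, int]
NO_COMMITTED_OFFSET = -1  # reported by offset fetch when nothing was committed


def starting_offsets(fetched: Dict[Key, int],
                     reset_offsets: Dict[Key, int]) -> Dict[Key, int]:
    """Pick where to resume each partition.

    `fetched` maps (topic, partition) to the offset returned by an offset
    fetch; `reset_offsets` is a hypothetical fallback used only for
    partitions that have no committed offset yet.
    """
    return {
        key: reset_offsets[key] if offset == NO_COMMITTED_OFFSET else offset
        for key, offset in fetched.items()
    }
```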
-
classmethod from_metadata(metadata, handler, socket_timeout_ms, offsets_channel_socket_timeout_ms, buffer_size=65536, source_host='', source_port=0, ssl_config=None, broker_version='0.9.0', api_versions=None)
Create a Broker using BrokerMetadata.
Parameters:
- metadata (pykafka.protocol.BrokerMetadata) – Metadata that describes the broker
- handler (pykafka.handlers.Handler) – A Handler instance that will be used to service requests and responses
- socket_timeout_ms (int) – The socket timeout for network requests, in milliseconds
- offsets_channel_socket_timeout_ms (int) – The socket timeout for network requests on the offsets channel, in milliseconds
- buffer_size (int) – The size (bytes) of the internal buffer used to receive network responses
- source_host (str) – The host portion of the source address for socket connections
- source_port (int) – The port portion of the source address for socket connections
- ssl_config (pykafka.connection.SslConfig) – Config object for SSL connections
- broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
- api_versions (Iterable of pykafka.protocol.ApiVersionsSpec) – A sequence of pykafka.protocol.ApiVersionsSpec objects indicating the API version compatibility of this broker
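`from_metadata` is an alternate-constructor classmethod: it takes the identity fields (ID, host, port) from a metadata record and forwards the remaining settings to the regular constructor. The toy class below illustrates that pattern only. `ToyBroker` is hypothetical, and the `BrokerMetadata` namedtuple is a stand-in whose `id`/`host`/`port` fields are an assumption about the real `pykafka.protocol.BrokerMetadata`. It also mirrors the documented difference in `buffer_size` defaults (1048576 for the constructor, 65536 for `from_metadata`).

```python
from collections import namedtuple
from typing import Any

# Hypothetical stand-in for pykafka.protocol.BrokerMetadata.
BrokerMetadata = namedtuple("BrokerMetadata", ["id", "host", "port"])


class ToyBroker:
    """Illustrative class mirroring the shape of Broker.__init__/from_metadata."""

    def __init__(self, id_: int, host: str, port: int, handler: Any,
                 socket_timeout_ms: int, offsets_channel_socket_timeout_ms: int,
                 buffer_size: int = 1048576) -> None:
        self.id = id_
        self.host = host
        self.port = port
        self.handler = handler
        self.socket_timeout_ms = socket_timeout_ms
        self.offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
        self.buffer_size = buffer_size

    @classmethod
    def from_metadata(cls, metadata: BrokerMetadata, handler: Any,
                      socket_timeout_ms: int, offsets_channel_socket_timeout_ms: int,
                      buffer_size: int = 65536) -> "ToyBroker":
        # Identity fields come from the metadata; everything else is forwarded.
        return cls(metadata.id, metadata.host, metadata.port, handler,
                   socket_timeout_ms, offsets_channel_socket_timeout_ms,
                   buffer_size=buffer_size)
```

Using `cls(...)` rather than naming the class directly means a subclass calling `from_metadata` gets an instance of the subclass.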
-
handler
The primary pykafka.handlers.RequestHandler for this broker. This handler handles all requests outside of the commit/fetch API.
-
heartbeat(connection_id, consumer_group, generation_id, member_id)
Send a HeartbeatRequest.
Parameters: - connection_id (str) – The unique identifier of the connection on which to make this request
- consumer_group (bytes) – The name of the consumer group to which this consumer belongs
- generation_id (int) – The current generation for the consumer group
- member_id (bytes) – The ID of the consumer sending this heartbeat
-
host
The host to which this broker is connected
-
id
The broker’s ID within the Kafka cluster
-
join_group(connection_id, consumer_group, member_id, topic_name, membership_protocol)
Send a JoinGroupRequest.
Parameters:
- connection_id (str) – The unique identifier of the connection on which to make this request
- consumer_group (bytes) – The name of the consumer group to join
- member_id (bytes) – The ID of the consumer joining the group
- topic_name (str) – The name of the topic to which to connect, used in protocol metadata
- membership_protocol (pykafka.membershipprotocol.GroupMembershipProtocol) – The group membership protocol to which this request should adhere
-
leave_group(connection_id, consumer_group, member_id)
Send a LeaveGroupRequest.
Parameters: - connection_id (str) – The unique identifier of the connection on which to make this request
- consumer_group (bytes) – The name of the consumer group to leave
- member_id (bytes) – The ID of the consumer leaving the group
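`join_group`, `heartbeat`, and `leave_group` share state: joining yields a member ID and a generation ID, and later heartbeats must echo both. The sketch below is hypothetical client-side bookkeeping (`MemberState` is not a pykafka class). It follows the Kafka group protocol convention that a new member joins with an empty member ID and the coordinator assigns one.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class MemberState:
    """Hypothetical bookkeeping for one group member (not a pykafka class).

    Tracks the values returned on join that heartbeat, sync_group and
    leave_group must send back to the coordinator.
    """
    consumer_group: bytes
    member_id: bytes = b""          # empty until the coordinator assigns one
    generation_id: Optional[int] = None

    def on_joined(self, member_id: bytes, generation_id: int) -> None:
        self.member_id = member_id
        self.generation_id = generation_id

    def heartbeat_args(self) -> Tuple[bytes, int, bytes]:
        # Arguments for heartbeat(connection_id, consumer_group,
        # generation_id, member_id), minus the connection_id.
        if self.generation_id is None:
            raise RuntimeError("join the group before sending heartbeats")
        return (self.consumer_group, self.generation_id, self.member_id)

    def on_left(self) -> None:
        # After leave_group, the next join starts from scratch.
        self.member_id = b""
        self.generation_id = None
```

Each rebalance produces a new generation, so `on_joined` simply overwrites the previous values.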
-
offsets_channel_connected
Returns True if this object’s offsets channel connection to the Kafka broker is active
-
offsets_channel_handler
The offsets channel pykafka.handlers.RequestHandler for this broker. This handler handles all requests that use the commit/fetch API.
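Together, `handler` and `offsets_channel_handler` split traffic in two: commit/fetch requests go to the offsets channel and everything else goes to the primary handler. The sketch below routes on Kafka protocol API keys (OffsetCommit is 8, OffsetFetch is 9). `pick_handler` and the `handlers` mapping are hypothetical stand-ins for the two attributes, not a claim about how pykafka dispatches internally.

```python
from typing import Any, Dict

# Kafka protocol API keys for the offset commit/fetch requests.
OFFSET_COMMIT_KEY = 8
OFFSET_FETCH_KEY = 9
OFFSETS_CHANNEL_KEYS = frozenset({OFFSET_COMMIT_KEY, OFFSET_FETCH_KEY})


def pick_handler(api_key: int, handlers: Dict[str, Any]) -> Any:
    """Send commit/fetch requests to the offsets channel, all others to main.

    `handlers` is a hypothetical {"main": ..., "offsets": ...} mapping that
    stands in for Broker.handler and Broker.offsets_channel_handler.
    """
    key = "offsets" if api_key in OFFSETS_CHANNEL_KEYS else "main"
    return handlers[key]
```

Keeping offset traffic on its own connection (with its own timeout, `offsets_channel_socket_timeout_ms`) means a slow commit does not have to wait behind other queued requests on the main connection.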
-
port
The port where the broker is available
-
sync_group(connection_id, consumer_group, generation_id, member_id, group_assignment)
Send a SyncGroupRequest.
Parameters:
- connection_id (str) – The unique identifier of the connection on which to make this request
- consumer_group (bytes) – The name of the consumer group to which this consumer belongs
- generation_id (int) – The current generation for the consumer group
- member_id (bytes) – The ID of the consumer that is syncing
- group_assignment (Iterable of pykafka.protocol.MemberAssignment) – A sequence of pykafka.protocol.MemberAssignment instances indicating the partition assignments for each member of the group. When sync_group is called by a member other than the leader of the group, group_assignment should be an empty sequence.
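The leader/non-leader rule for `group_assignment` can be sketched as follows. Only the leader computes a plan, and every other member passes an empty sequence. Round-robin is an illustrative choice here, not necessarily what a given `GroupMembershipProtocol` does, and the plain `(member_id, partitions)` tuples are hypothetical stand-ins for `pykafka.protocol.MemberAssignment`.

```python
from typing import Dict, List, Sequence, Tuple


def build_group_assignment(member_id: bytes, leader_id: bytes,
                           members: Sequence[bytes],
                           partitions: Sequence[int]) -> List[Tuple[bytes, List[int]]]:
    """Return (member_id, partitions) pairs to pass as group_assignment.

    Non-leaders return an empty list, as sync_group requires. The leader
    deals partitions round-robin over the sorted member IDs.
    """
    if member_id != leader_id:
        return []
    ordered = sorted(members)
    if not ordered:
        return []
    plan: Dict[bytes, List[int]] = {m: [] for m in ordered}
    for i, partition in enumerate(sorted(partitions)):
        plan[ordered[i % len(ordered)]].append(partition)
    return [(m, plan[m]) for m in ordered]
```

Sorting members and partitions makes the plan deterministic, so rerunning the leader's computation with the same inputs yields the same assignment.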
-