pykafka.cluster

class pykafka.cluster.Cluster(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')

Bases: object

A Cluster is a high-level abstraction of the collection of brokers and topics that makes up a real kafka cluster.

__init__(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')

Create a new Cluster instance.

Parameters:
  • hosts (str) – Comma-separated list of kafka hosts to which to connect.
  • zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect. If not None, this argument takes precedence over hosts
  • handler (pykafka.handlers.Handler) – The concurrency handler for network requests.
  • socket_timeout_ms (int) – The socket timeout (in milliseconds) for network requests
  • offsets_channel_socket_timeout_ms (int) – The socket timeout (in milliseconds) when reading responses for offset commit and offset fetch requests.
  • exclude_internal_topics (bool) – Whether messages from internal topics (specifically, the offsets topic) should be exposed to consumers.
  • source_address (str ‘host:port’) – The source address for socket connections
  • ssl_config (pykafka.connection.SslConfig) – Config object for SSL connection
  • broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
__weakref__

list of weak references to the object (if defined)

_get_broker_connection_info()

Get a list of host:port pairs representing possible broker connections

For use only when self.brokers is not populated (i.e. at startup)
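The splitting of a comma-separated hosts string into connection pairs can be sketched in plain Python; the function name and details below are illustrative, not pykafka's actual implementation.

```python
# Hypothetical sketch: split a comma-separated hosts string such as
# "kafka1:9092,kafka2:9093" into (host, port) connection pairs, the
# kind of list usable before self.brokers has been populated.
def parse_broker_connects(hosts):
    """Return a list of (host, port) tuples parsed from `hosts`."""
    pairs = []
    for hostport in hosts.split(","):
        host, _, port = hostport.strip().partition(":")
        pairs.append((host, int(port)))
    return pairs
```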

_get_brokers_from_zookeeper(zk_connect)

Build a list of broker connection pairs from a ZooKeeper host

Parameters: zk_connect (str) – The ZooKeeper connect string of the instance to which to connect
_get_metadata(topics=None)

Get fresh cluster metadata from a broker.

_request_random_broker(broker_connects, req_fn)

Make a request to any broker in broker_connects

Returns the result of the first successful request

Parameters:
  • broker_connects (Iterable of two-element sequences of the format (broker_host, broker_port)) – The set of brokers to which to attempt to connect
  • req_fn (function) – A function accepting a pykafka.broker.Broker as its sole argument that returns a pykafka.protocol.Response. The argument to this function will be each of the brokers discoverable via broker_connects, in turn.
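The "try any broker until one succeeds" pattern behind this method can be sketched as follows; the names and the (host, port) argument shape are assumptions for illustration, and the real method passes a pykafka.broker.Broker to req_fn.

```python
import random

# Hedged sketch of trying brokers in random order and returning the
# result of the first successful request; not pykafka's internals.
def request_random_broker(broker_connects, req_fn):
    """Call req_fn against brokers in random order; return the first
    successful result, or re-raise the last error if all fail."""
    connects = list(broker_connects)
    random.shuffle(connects)
    last_error = None
    for host, port in connects:
        try:
            return req_fn((host, port))
        except Exception as exc:  # a real client would catch only socket/request errors
            last_error = exc
    raise last_error
```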
_update_brokers(broker_metadata, controller_id)

Update brokers with fresh metadata.

Parameters:
  • broker_metadata (Dict of {name: metadata} where metadata is pykafka.protocol.BrokerMetadata and name is str.) – Metadata for all brokers.
  • controller_id (int) – The ID of the cluster’s controller broker, if applicable
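A merge of fresh broker metadata into the known-brokers dict can be sketched like this; the function name and the simplified (host, port) metadata shape are assumptions, and pykafka's real logic additionally manages connections and the controller broker.

```python
# Illustrative sketch: reconcile a known-brokers dict with fresh
# metadata, reusing entries whose connection info is unchanged.
def update_brokers(known, fresh):
    """known and fresh both map broker id -> (host, port)."""
    updated = {}
    for broker_id, (host, port) in fresh.items():
        if known.get(broker_id) == (host, port):
            # unchanged broker: keep the existing entry (and, in a real
            # client, its open connection)
            updated[broker_id] = known[broker_id]
        else:
            # new broker, or one that moved to a different host/port
            updated[broker_id] = (host, port)
    return updated
```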
brokers

The dict of known brokers for this cluster

fetch_api_versions()

Get API version info from an available broker and save it

get_group_coordinator(consumer_group)

Get the broker designated as the group coordinator for this consumer group.

Based on Step 1 at https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters: consumer_group (str) – The name of the consumer group for which to find the offset manager.
get_managed_group_descriptions()

Return detailed descriptions of all managed consumer groups on this cluster

This function only returns descriptions for consumer groups created via the Group Management API, which pykafka refers to as ManagedBalancedConsumer instances

handler

The concurrency handler for network requests

topics

The dict of known topics for this cluster

NOTE: This dict is an instance of pykafka.cluster.TopicDict, which uses weak references and lazy evaluation to avoid instantiating unnecessary pykafka.Topic objects. Thus, the values displayed when printing client.topics on a freshly created pykafka.KafkaClient will be None. This simply means that the topic instances have not yet been created, but they will be when __getitem__ is called on the dictionary.
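The lazy, weakly-referenced behavior described above can be sketched in plain Python; this is an illustrative stand-in, not pykafka's actual TopicDict implementation.

```python
import weakref

class Topic:
    """Stand-in for pykafka.Topic in this sketch."""
    def __init__(self, name):
        self.name = name

class LazyTopicDict(dict):
    """Dict whose raw values are None (or weak references), so that
    Topic objects are only instantiated when __getitem__ is called."""
    def _create(self, name):
        topic = Topic(name)
        # store only a weak reference so unused Topic objects
        # can be garbage collected
        super().__setitem__(name, weakref.ref(topic))
        return topic

    def __getitem__(self, name):
        ref = super().__getitem__(name)   # KeyError if topic unknown
        if ref is None:                   # never instantiated yet
            return self._create(name)
        topic = ref()
        if topic is None:                 # instantiated, then collected
            return self._create(name)
        return topic

# A metadata update would register topic names as None placeholders:
topics = LazyTopicDict()
dict.__setitem__(topics, "events", None)
# Printing the raw dict at this point would show None for "events";
# the Topic object is built on first access:
topic = topics["events"]
```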

update()

Update known brokers and topics.