pykafka.cluster¶
-
class
pykafka.cluster.Cluster(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')¶ Bases: object
A Cluster is a high-level abstraction of the collection of brokers and topics that makes up a real Kafka cluster.
-
__init__(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')¶ Create a new Cluster instance.
Parameters: - hosts (str) – Comma-separated list of Kafka hosts to which to connect.
- zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect. If not None, this argument takes precedence over hosts.
- handler (pykafka.handlers.Handler) – The concurrency handler for network requests.
- socket_timeout_ms (int) – The socket timeout (in milliseconds) for network requests.
- offsets_channel_socket_timeout_ms (int) – The socket timeout (in milliseconds) when reading responses for offset commit and offset fetch requests.
- exclude_internal_topics (bool) – Whether messages from internal topics (specifically, the offsets topic) should be exposed to consumers.
- source_address (str ‘host:port’) – The source address for socket connections.
- ssl_config (pykafka.connection.SslConfig) – Config object for SSL connection.
- broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
-
__repr__() <==> repr(x)¶
-
__weakref__¶ list of weak references to the object (if defined)
-
_get_broker_connection_info()¶ Get a list of host:port pairs representing possible broker connections
For use only when self.brokers is not populated (i.e., at startup).
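As an illustration of the host:port pairs mentioned above, here is a minimal sketch of how a comma-separated hosts string might be split into connection pairs. It is not pykafka's implementation: the function name parse_hosts and the fallback port of 9092 (Kafka's conventional default) are assumptions made for this example.

```python
def parse_hosts(hosts, default_port=9092):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples.

    Hypothetical helper for illustration; default_port is an assumption.
    """
    pairs = []
    for entry in hosts.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty segments such as a trailing comma
        host, sep, port = entry.rpartition(":")
        if not sep:
            # No ':' present, so the whole entry is the host name.
            host, port = entry, default_port
        pairs.append((host, int(port)))
    return pairs


pairs = parse_hosts("kafka1:9092, kafka2:9093,kafka3")
```

Here pairs holds one tuple per host, with the assumed default port filled in for kafka3.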
-
_get_brokers_from_zookeeper(zk_connect)¶ Build a list of broker connection pairs from a ZooKeeper host
Parameters: zk_connect (str) – The ZooKeeper connect string of the instance to which to connect
-
_get_metadata(topics=None)¶ Get fresh cluster metadata from a broker.
-
_request_random_broker(broker_connects, req_fn)¶ Make a request to any broker in broker_connects
Returns the result of the first successful request
Parameters: - broker_connects (Iterable of two-element sequences of the format (broker_host, broker_port)) – The set of brokers to which to attempt to connect
- req_fn (function) – A function accepting a pykafka.broker.Broker as its sole argument that returns a pykafka.protocol.Response. The argument to this function will be each of the brokers discoverable via broker_connects in turn.
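The "first successful request" behaviour described above can be sketched with the standard library alone. This is a hedged illustration of the pattern, not pykafka's code: connect_fn is a hypothetical stand-in for whatever turns a (host, port) pair into a broker object, and the random ordering and broad exception handling are assumptions.

```python
import random


def request_random_broker(broker_connects, req_fn, connect_fn):
    """Try brokers in random order and return the first successful result.

    connect_fn is a hypothetical factory taking (host, port); it is not
    part of pykafka's API.
    """
    candidates = list(broker_connects)
    random.shuffle(candidates)
    last_error = None
    for host, port in candidates:
        try:
            broker = connect_fn(host, port)
            return req_fn(broker)
        except Exception as exc:  # a real client would catch narrower errors
            last_error = exc
    raise RuntimeError("no broker responded") from last_error


def fake_connect(host, port):
    # Simulated connection: the host named "down" is unreachable.
    if host == "down":
        raise ConnectionError("unreachable")
    return (host, port)


result = request_random_broker(
    [("down", 9092), ("up", 9092)],
    lambda broker: "metadata from %s:%d" % broker,
    fake_connect,
)
```

Whichever order the shuffle produces, the failing host is skipped and the result comes from the reachable one.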
-
_update_brokers(broker_metadata, controller_id)¶ Update brokers with fresh metadata.
Parameters: - broker_metadata (Dict of {name: metadata} where metadata is pykafka.protocol.BrokerMetadata and name is str) – Metadata for all brokers.
- controller_id (int) – The ID of the cluster’s controller broker, if applicable.
-
brokers¶ The dict of known brokers for this cluster
-
fetch_api_versions()¶ Get API version info from an available broker and save it
-
get_group_coordinator(consumer_group)¶ Get the broker designated as the group coordinator for this consumer group.
Based on Step 1 at https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
Parameters: consumer_group (str) – The name of the consumer group for which to find the offset manager.
-
get_managed_group_descriptions()¶ Return detailed descriptions of all managed consumer groups on this cluster
This function only returns descriptions for consumer groups created via the Group Management API, which pykafka refers to as ManagedBalancedConsumers.
-
handler¶ The concurrency handler for network requests
-
topics¶ The dict of known topics for this cluster
NOTE: This dict is an instance of pykafka.cluster.TopicDict, which uses weak references and lazy evaluation to avoid instantiating unnecessary pykafka.Topic objects. Thus, the values displayed when printing client.topics on a freshly created pykafka.KafkaClient will be None. This simply means that the topic instances have not yet been created, but they will be when __getitem__ is called on the dictionary.
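The lazy, weakly referenced behaviour described in the note can be reproduced in a few lines of standard-library Python. The sketch below is only an analogy for TopicDict, not its implementation: the Topic and LazyTopicDict classes are hypothetical stand-ins defined for this example.

```python
import weakref


class Topic:
    """Hypothetical stand-in for a topic object."""

    def __init__(self, name):
        self.name = name


class LazyTopicDict(dict):
    """Holds None placeholders, then weak references created on lookup."""

    def __init__(self, names):
        # Every known name starts out with a None placeholder.
        super().__init__((name, None) for name in names)

    def __getitem__(self, name):
        ref = super().__getitem__(name)  # raises KeyError for unknown names
        topic = ref() if ref is not None else None
        if topic is None:
            # First access, or the earlier instance was garbage collected.
            topic = Topic(name)
            super().__setitem__(name, weakref.ref(topic))
        return topic


topics = LazyTopicDict(["events", "logs"])
snapshot = repr(topics)    # taken before any lookup: all values are None
events = topics["events"]  # the Topic instance is created here
```

Printing the dict before any lookup shows None for every value, which mirrors what the note says about a freshly created client; the instance only exists once it has been requested by key.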
-
update()¶ Update known brokers and topics.
-