pykafka.cluster
class pykafka.cluster.Cluster(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')

Bases: object

A Cluster is a high-level abstraction of the collection of brokers and topics that makes up a real Kafka cluster.
__init__(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')
Create a new Cluster instance.
Parameters:
- hosts (str) – Comma-separated list of Kafka hosts to which to connect.
- zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect. If not None, this argument takes precedence over hosts.
- handler (pykafka.handlers.Handler) – The concurrency handler for network requests.
- socket_timeout_ms (int) – The socket timeout (in milliseconds) for network requests.
- offsets_channel_socket_timeout_ms (int) – The socket timeout (in milliseconds) when reading responses for offset commit and offset fetch requests.
- exclude_internal_topics (bool) – Whether messages from internal topics (specifically, the offsets topic) should be exposed to consumers.
- source_address (str 'host:port') – The source address for socket connections.
- ssl_config (pykafka.connection.SslConfig) – Config object for SSL connection.
- broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly.
__repr__() <==> repr(x)
__weakref__
List of weak references to the object (if defined).
_get_broker_connection_info()
Get a list of host:port pairs representing possible broker connections.
For use only when self.brokers is not populated (i.e. at startup).
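A minimal sketch of how such host:port pairs could be derived from the comma-separated hosts string; the function name and the default-port fallback here are assumptions for illustration, not pykafka's exact code:

```python
def parse_broker_connections(hosts, default_port=9092):
    """Turn a comma-separated hosts string (e.g. "broker1:9093,broker2")
    into a list of (host, port) pairs, falling back to a default port."""
    pairs = []
    for entry in hosts.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        host, _, port = entry.partition(":")
        pairs.append((host, int(port) if port else default_port))
    return pairs
```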
_get_brokers_from_zookeeper(zk_connect)
Build a list of broker connection pairs from a ZooKeeper host.
Parameters: zk_connect (str) – The ZooKeeper connect string of the instance to which to connect.
_get_metadata(topics=None)
Get fresh cluster metadata from a broker.
_request_random_broker(broker_connects, req_fn)
Make a request to any broker in broker_connects.
Returns the result of the first successful request.
Parameters:
- broker_connects (Iterable of two-element sequences of the format (broker_host, broker_port)) – The set of brokers to which to attempt to connect.
- req_fn (function) – A function accepting a pykafka.broker.Broker as its sole argument that returns a pykafka.protocol.Response. The argument to this function will be each of the brokers discoverable via broker_connects in turn.
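The try-each-broker-until-one-succeeds behavior can be sketched in plain Python. This is a simplification, not pykafka's implementation: the real req_fn receives a constructed pykafka.broker.Broker, while here it receives the raw (host, port) pair:

```python
import random

def request_random_broker(broker_connects, req_fn):
    """Try brokers in random order; return the first successful
    response, or raise if every broker fails."""
    connects = list(broker_connects)
    random.shuffle(connects)  # avoid hammering the same broker first
    last_error = None
    for host, port in connects:
        try:
            return req_fn((host, port))  # real code would build a Broker here
        except Exception as exc:
            last_error = exc  # remember the failure, move to the next broker
    raise RuntimeError("no broker responded") from last_error
```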
_update_brokers(broker_metadata, controller_id)
Update brokers with fresh metadata.
Parameters:
- broker_metadata (dict of {name: metadata} where metadata is pykafka.protocol.BrokerMetadata and name is str) – Metadata for all brokers.
- controller_id (int) – The ID of the cluster's controller broker, if applicable.
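A simplified sketch of such an in-place refresh. The Broker class and the {id: (host, port)} metadata shape below are stand-ins for pykafka.broker.Broker and pykafka.protocol.BrokerMetadata, chosen for illustration only:

```python
class Broker:
    """Hypothetical stand-in for pykafka.broker.Broker."""
    def __init__(self, id_, host, port):
        self.id, self.host, self.port = id_, host, port

def update_brokers(brokers, broker_metadata):
    """Refresh a {broker_id: Broker} dict from fresh metadata:
    add newly discovered brokers, update existing ones in place."""
    for broker_id, (host, port) in broker_metadata.items():
        existing = brokers.get(broker_id)
        if existing is None:
            brokers[broker_id] = Broker(broker_id, host, port)  # new broker
        else:
            existing.host, existing.port = host, port  # keep the same object
    return brokers
```

Updating existing entries in place (rather than replacing them) preserves any state, such as open connections, that other components hold through the Broker object.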
brokers
The dict of known brokers for this cluster.
fetch_api_versions()
Get API version info from an available broker and save it.
get_group_coordinator(consumer_group)
Get the broker designated as the group coordinator for this consumer group.
Based on Step 1 at https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
Parameters: consumer_group (str) – The name of the consumer group for which to find the offset manager.
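The coordinator is chosen by the brokers themselves; the client only asks a broker for the answer. As background, the broker maps each group to a partition of the internal __consumer_offsets topic using the Java String.hashCode of the group name modulo the partition count (50 by default). A sketch of that mapping, with a Python emulation of Java's hashCode, shown for illustration only (this function is not part of pykafka's API):

```python
def java_string_hashcode(s):
    """Emulate Java's String.hashCode: h = 31*h + ord(ch), as signed 32-bit."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def offsets_partition_for(group, num_partitions=50):
    """Pick the __consumer_offsets partition (and hence the coordinator's
    partition leader) for a consumer group."""
    return abs(java_string_hashcode(group)) % num_partitions
```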
get_managed_group_descriptions()
Return detailed descriptions of all managed consumer groups on this cluster.
This function only returns descriptions for consumer groups created via the Group Management API, which pykafka refers to as ManagedBalancedConsumer instances.
handler
The concurrency handler for network requests.
topics
The dict of known topics for this cluster.
NOTE: This dict is an instance of pykafka.cluster.TopicDict, which uses weak references and lazy evaluation to avoid instantiating unnecessary pykafka.Topic objects. Thus, the values displayed when printing client.topics on a freshly created pykafka.KafkaClient will be None. This simply means that the topic instances have not yet been created, but they will be when __getitem__ is called on the dictionary.
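The lazy, weak-reference behavior described above can be illustrated with a toy mapping; this is a sketch of the idea, not pykafka's TopicDict implementation:

```python
import weakref

class Topic:
    """Stand-in for pykafka.Topic (simplified, hypothetical)."""
    def __init__(self, name):
        self.name = name

class LazyTopicDict(dict):
    """Values stay None until a key is accessed; real objects are
    instantiated on first __getitem__ and held only via weak references."""
    def __init__(self, names):
        super().__init__((name, None) for name in names)
        self._refs = {}

    def __getitem__(self, name):
        ref = self._refs.get(name)
        topic = ref() if ref is not None else None
        if topic is None:
            topic = Topic(name)                     # instantiate lazily
            self._refs[name] = weakref.ref(topic)   # hold only a weak ref
        return topic

topics = LazyTopicDict(["events", "logs"])
# printing the mapping still shows None values; live Topic objects
# exist only behind the weak references created on access
t = topics["events"]
```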
update()
Update known brokers and topics.