Author: Keith Bourgoin, Emmett Butler

class pykafka.topic.Topic(cluster, topic_metadata)

Bases: object

A Topic is an abstraction over the kafka concept of a topic. It contains a dictionary of partitions that comprise it.

__init__(cluster, topic_metadata)

Create the Topic from metadata.

  • cluster (pykafka.cluster.Cluster) – The Cluster to use
  • topic_metadata (pykafka.protocol.TopicMetadata) – Metadata for all topics.
__repr__() <==> repr(x)

list of weak references to the object (if defined)


Get the earliest offset for each partition of this topic.

fetch_offset_limits(offsets_before, max_offsets=1)

Get information about the offsets of log segments for this topic

The ListOffsets API, which this function relies on, primarily deals with topics in terms of their log segments. Its behavior can be summed up as follows: it returns some subset of the starting message offsets for the log segments of each partition. The particular subset depends on this function’s two arguments: it is filtered by timestamp and, in certain cases, by count. The documentation for this API is notoriously imprecise, so here is a short example to illustrate how it works.

Take a topic with three partitions: 0, 1, and 2. 2665 messages have been produced to this topic, and the brokers’ log.segment.bytes settings are configured such that each log segment contains roughly 530 messages. The two oldest log segments have been deleted due to log retention settings such as log.retention.hours. Thus, the log.dirs directory currently contains these files for partition 0:

/var/local/kafka/data/test2-0/00000000000000001059.log
/var/local/kafka/data/test2-0/00000000000000002119.log
/var/local/kafka/data/test2-0/00000000000000001589.log
/var/local/kafka/data/test2-0/00000000000000002649.log

The numbers on these filenames indicate the offset of the earliest message contained within. The most recent message was written at 1523572215.69.

Given this log state, a call to this function with offsets_before=OffsetType.LATEST and max_offsets=100 will result in a return value of [2665,2649,2119,1589,1059] for partition 0. The first value (2665) is the offset of the latest available message from the latest log segment. The other four offsets are those of the earliest messages from each log segment for the partition. Changing max_offsets to 3 will result in only the first three elements of this list being returned.

A call to this function with offsets_before=OffsetType.EARLIEST will result in a value of [1059]: only the offset of the earliest message present in log segments for partition 0. In this case, the return value is not affected by max_offsets.

A call to this function with offsets_before=(1523572215.69 * 1000) (the timestamp in milliseconds of the very last message written to the partition) will result in a value of [2649,2119,1589,1059]. This is the same list as with OffsetType.LATEST, but with the first element removed. This is because unlike the other elements, the message with this offset (2665) was not written before the given timestamp.
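The millisecond timestamp used above can also be derived from a datetime. The following is a minimal sketch using only the standard library; the helper name to_epoch_ms is hypothetical and not part of pykafka, and it assumes a timezone-aware datetime. It uses integer timedelta division so that no floating-point rounding creeps into the result.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch timestamps (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the epoch.

    Hypothetical helper for illustration; not a pykafka function.
    """
    # timedelta // timedelta yields an exact int, avoiding float multiplication.
    return (moment - EPOCH) // timedelta(milliseconds=1)


# The write time of the most recent message in the example, as a datetime.
last_write = EPOCH + timedelta(milliseconds=1523572215690)
last_write_ms = to_epoch_ms(last_write)
print(last_write_ms)
```

The resulting integer matches the value written as 1523572215.69 * 1000 in the example.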

In cases where there are no log segments fitting the given criteria for a partition, an empty list is returned. This applies if the given timestamp is before the write time of the oldest message in the partition, as well as if there are no log segments for the partition.

Thanks to Andras Beni from the Kafka users mailing list for providing this example.

  • offsets_before (datetime.datetime or int) – Epoch timestamp in milliseconds or datetime indicating the latest write time for returned offsets. Only offsets of messages written before this timestamp will be returned. Permissible special values are common.OffsetType.LATEST, indicating that offsets from all available log segments should be returned, and common.OffsetType.EARLIEST, indicating that only the offset of the earliest available message should be returned. Deprecated (2.7, 3.6): do not use int.
  • max_offsets (int) – The maximum number of offsets to return when more than one is available. In the case where offsets_before == OffsetType.EARLIEST, this parameter is meaningless since there is always either one or zero earliest offsets. In other cases, this parameter slices off the earliest end of the list, leaving the latest max_offsets offsets.
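
The worked example for fetch_offset_limits can be reproduced with a small toy model. This is only a sketch of the behavior as described in the example, not pykafka’s or Kafka’s implementation, and it does not contact a broker. The function name simulate_offset_limits, the LATEST and EARLIEST stand-in constants, and the per-segment write times are all assumptions made for illustration; only the segment start offsets and the final timestamp come from the example.

```python
# Toy model of the behavior described above; it does not call pykafka or Kafka.
LATEST = -1    # stand-in for OffsetType.LATEST (assumed for this sketch)
EARLIEST = -2  # stand-in for OffsetType.EARLIEST (assumed for this sketch)


def simulate_offset_limits(segments, next_offset, offsets_before, max_offsets=1):
    """Return the offsets for a single partition, newest first.

    segments: (first_offset, first_write_ms) pairs, one per log segment.
    next_offset: the value listed first for LATEST (2665 in the example).
    """
    ordered = sorted(segments)
    if not ordered:
        return []  # no log segments for the partition
    if offsets_before == EARLIEST:
        return [ordered[0][0]]  # max_offsets has no effect here
    if offsets_before == LATEST:
        offsets = [first for first, _ in ordered] + [next_offset]
    else:
        # Keep only segments whose first message was written by the timestamp.
        offsets = [first for first, written in ordered if written <= offsets_before]
    offsets.reverse()  # newest first
    return offsets[:max_offsets]  # keep the latest max_offsets entries


# Start offsets are taken from the example; the write times are invented.
segments = [
    (1059, 1523570000000),
    (1589, 1523571000000),
    (2119, 1523572000000),
    (2649, 1523572200000),
]
last_write_ms = 1523572215690  # 1523572215.69 seconds, in milliseconds

print(simulate_offset_limits(segments, 2665, LATEST, max_offsets=100))
print(simulate_offset_limits(segments, 2665, LATEST, max_offsets=3))
print(simulate_offset_limits(segments, 2665, EARLIEST, max_offsets=100))
print(simulate_offset_limits(segments, 2665, last_write_ms, max_offsets=100))
```

Under these assumptions the four calls print the same lists as the example: all five offsets, the first three of them, [1059], and the list without 2665. A timestamp earlier than every invented write time yields an empty list.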
get_balanced_consumer(consumer_group, managed=False, **kwargs)

Return a BalancedConsumer of this topic

  • consumer_group (bytes) – The name of the consumer group to join
  • managed (bool) – If True, manage the consumer group with Kafka using the 0.9 group management API (requires Kafka >=0.9)
get_producer(use_rdkafka=False, **kwargs)

Create a pykafka.producer.Producer for this topic.

For a description of all available kwargs, see the Producer docstring.

get_simple_consumer(consumer_group=None, use_rdkafka=False, **kwargs)

Return a SimpleConsumer of this topic

  • consumer_group (bytes) – The name of the consumer group to join
  • use_rdkafka (bool) – Use librdkafka-backed consumer if available

Create a pykafka.producer.Producer for this topic.

The created Producer instance will have sync=True.

For a description of all available kwargs, see the Producer docstring.


Fetch the next available offset

Get the offset of the next message that would be appended to each partition of this topic.

The name of this topic


A dictionary containing all known partitions for this topic


Update the Partitions with metadata about the cluster.

  • metadata (pykafka.protocol.TopicMetadata) – Metadata for all topics