pykafka.protocol

class pykafka.protocol.MetadataRequest(topics=None)

Bases: pykafka.protocol.Request

Metadata Request

Specification:

MetadataRequest => [TopicName]
    TopicName => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(topics=None)

Create a new MetadataRequest

Parameters:topics – Topics to query. Leave empty for all available topics.
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
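
As an illustration of the specification above, the request body can be sketched with the standard struct module. This is not pykafka's implementation: encode_metadata_body is a hypothetical helper, the request header is left out, and the sketch assumes the Kafka wire conventions that an array carries an int32 element count and a string carries an int16 byte length.

```python
import struct

def encode_metadata_body(topics=None):
    """Hypothetical helper: encode [TopicName] as an int32 count plus int16-prefixed strings."""
    topics = topics or []          # an empty list asks for all available topics
    out = bytearray(struct.pack("!i", len(topics)))
    for name in topics:
        raw = name.encode("utf-8")
        out += struct.pack("!h", len(raw)) + raw
    return out

body = encode_metadata_body(["test"])
```

Calling the helper with no arguments produces only the four-byte zero count, which matches the "leave empty for all available topics" behavior described above.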
class pykafka.protocol.MetadataResponse(buff)

Bases: pykafka.protocol.Response

Response from MetadataRequest

Specification:

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
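
To make the layout above concrete, here is a minimal sketch that decodes only the [Broker] array with struct. decode_brokers is a hypothetical helper, not part of pykafka, and it assumes that buff starts at the broker array (any response header has already been consumed) and that arrays and strings use int32 and int16 length prefixes respectively.

```python
import struct

def decode_brokers(buff, pos=0):
    """Hypothetical helper: decode the [Broker] array; returns (brokers, next position)."""
    (count,) = struct.unpack_from("!i", buff, pos)
    pos += 4
    brokers = []
    for _ in range(count):
        # NodeId (int32) followed by the int16 length of Host
        node_id, host_len = struct.unpack_from("!ih", buff, pos)
        pos += 6
        host = bytes(buff[pos:pos + host_len]).decode("utf-8")
        pos += host_len
        (port,) = struct.unpack_from("!i", buff, pos)
        pos += 4
        brokers.append((node_id, host, port))
    return brokers, pos

# Hand-built sample: one broker, id 0, at localhost:9092
sample = bytearray(struct.pack("!iih", 1, 0, 9) + b"localhost" + struct.pack("!i", 9092))
brokers, end = decode_brokers(sample)
```

The returned position is where a decoder for the [TopicMetadata] array would continue.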
class pykafka.protocol.ProduceRequest(compression_type=0, required_acks=1, timeout=10000)

Bases: pykafka.protocol.Request

Produce Request

Specification:

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
API_KEY

API_KEY for this request, from the Kafka docs

__init__(compression_type=0, required_acks=1, timeout=10000)

Create a new ProduceRequest

required_acks determines how many acknowledgements the server waits for before returning. This is useful for ensuring that published messages have been replicated to the desired number of servers. The behavior is:

-1: Block until all servers acknowledge
0: No waiting -- server doesn't even respond to the Produce request
1: Wait for this server to write to the local log and then return
2+: Wait for N servers to acknowledge
Parameters:
  • partition_requests – Iterable of kafka.pykafka.protocol.PartitionProduceRequest for this request
  • compression_type – Compression to use for messages
  • required_acks – see the list of values above
  • timeout – timeout (in ms) to wait for the required acks
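
The two fixed fields at the start of the specification can be sketched as follows. Both helpers are hypothetical and exist only to illustrate the required_acks values listed above; they are not pykafka functions.

```python
import struct

def encode_produce_prefix(required_acks=1, timeout=10000):
    """Hypothetical helper: pack RequiredAcks (int16) and Timeout (int32, in ms)."""
    return struct.pack("!hi", required_acks, timeout)

def expects_response(required_acks):
    """With required_acks=0 the server does not respond, so a caller should not wait."""
    return required_acks != 0

prefix = encode_produce_prefix()
```

A client built on this sketch would skip reading from the socket whenever expects_response returns False.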
__len__()

Length of the serialized message, in bytes

add_message(message, topic_name, partition_id)

Add a pykafka.common.Message to the waiting request

Parameters:
  • message – the pykafka.common.Message to add
  • topic_name – the name of the topic to publish to
  • partition_id – the partition to publish to
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
message_count()

Get the number of messages across all MessageSets in the request.

messages

Iterable of all messages in the Request

class pykafka.protocol.ProduceResponse(buff)

Bases: pykafka.protocol.Response

Produce Response. Checks to make sure everything went okay.

Specification:

ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.OffsetRequest(partition_requests)

Bases: pykafka.protocol.Request

An offset request

Specification:

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32
API_KEY

API_KEY for this request, from the Kafka docs

__init__(partition_requests)

Create a new offset request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.OffsetResponse(buff)

Bases: pykafka.protocol.Response

An offset response

Specification:

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
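
The nested arrays in the specification above can be walked as in the sketch below. decode_offset_response is a hypothetical helper rather than the library's decoder, and it assumes buff starts at the topic array with any response header already removed.

```python
import struct

def decode_offset_response(buff, pos=0):
    """Hypothetical helper: map (topic, partition) to (error_code, [offsets])."""
    result = {}
    (n_topics,) = struct.unpack_from("!i", buff, pos)
    pos += 4
    for _ in range(n_topics):
        (name_len,) = struct.unpack_from("!h", buff, pos)
        pos += 2
        topic = bytes(buff[pos:pos + name_len]).decode("utf-8")
        pos += name_len
        (n_parts,) = struct.unpack_from("!i", buff, pos)
        pos += 4
        for _ in range(n_parts):
            # Partition (int32), ErrorCode (int16), then the int32 count of the [Offset] array
            partition, err, n_offsets = struct.unpack_from("!ihi", buff, pos)
            pos += 10
            offsets = list(struct.unpack_from("!%dq" % n_offsets, buff, pos))
            pos += 8 * n_offsets
            result[(topic, partition)] = (err, offsets)
    return result

# Hand-built sample: topic "test", partition 0, no error, offsets [100, 0]
sample = (struct.pack("!ih", 1, 4) + b"test" + struct.pack("!i", 1)
          + struct.pack("!ihi", 0, 0, 2) + struct.pack("!2q", 100, 0))
decoded = decode_offset_response(sample)
```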
class pykafka.protocol.OffsetCommitRequest(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Bases: pykafka.protocol.Request

An offset commit request

Specification:

OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
    ConsumerGroupId => string
    ConsumerGroupGenerationId => int32
    ConsumerId => string
    TopicName => string
    Partition => int32
    Offset => int64
    TimeStamp => int64
    Metadata => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Create a new offset commit request

Parameters:partition_requests – Iterable of pykafka.protocol.PartitionOffsetCommitRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.FetchRequest(partition_requests=[], timeout=1000, min_bytes=1024)

Bases: pykafka.protocol.Request

A Fetch request sent to Kafka

Specification:

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
API_KEY

API_KEY for this request, from the Kafka docs

__init__(partition_requests=[], timeout=1000, min_bytes=1024)

Create a new fetch request

Kafka 0.8 uses long polling for fetch requests, unlike 0.7.x. Instead of polling and waiting, we can now set a timeout to wait and a minimum number of bytes to be collected before the request returns. This lets us block efficiently and also keeps network throughput good by making fewer, larger transfers instead of many small ones every time a byte is written to the log.

Parameters:
  • partition_requests – Iterable of pykafka.protocol.PartitionFetchRequest for this request
  • timeout – Max time to wait (in ms) for a response from the server
  • min_bytes – Minimum bytes to collect before returning
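
A minimal sketch of how the fields in the specification line up on the wire is shown below. encode_fetch_body is a hypothetical helper that takes plain (topic, partition, offset, max_bytes) tuples instead of pykafka objects, omits the request header, and assumes a ReplicaId of -1, the value the Kafka protocol reserves for ordinary clients.

```python
import struct
from collections import defaultdict

def encode_fetch_body(partition_requests, timeout=1000, min_bytes=1024):
    """Hypothetical helper: encode the FetchRequest body from simple tuples."""
    by_topic = defaultdict(list)
    for topic, partition, offset, max_bytes in partition_requests:
        by_topic[topic].append((partition, offset, max_bytes))
    # ReplicaId (-1, assumed), MaxWaitTime, MinBytes, then the topic array count
    out = bytearray(struct.pack("!iiii", -1, timeout, min_bytes, len(by_topic)))
    for topic, parts in by_topic.items():
        raw = topic.encode("utf-8")
        out += struct.pack("!h", len(raw)) + raw
        out += struct.pack("!i", len(parts))
        for partition, offset, max_bytes in parts:
            out += struct.pack("!iqi", partition, offset, max_bytes)
    return out

body = encode_fetch_body([("test", 0, 42, 300 * 1024)])
```

Grouping by topic first means each topic name is written once, followed by all of its partitions, which is what the nested brackets in the specification describe.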
__len__()

Length of the serialized message, in bytes

add_request(partition_request)

Add a topic/partition/offset to the requests

Parameters:
  • partition_request – The pykafka.protocol.PartitionFetchRequest to add, which carries the topic_name, partition_id, offset, and max_bytes of the fetch
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.FetchResponse(buff)

Bases: pykafka.protocol.Response

Unpack a fetch response from the server

Specification:

FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
_unpack_message_set(buff, partition_id=-1)

MessageSets can be nested. Get just the Messages out of it.

class pykafka.protocol.PartitionFetchRequest

Bases: pykafka.protocol.PartitionFetchRequest

Fetch request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to fetch from
  • partition_id – Id of the partition to fetch from
  • offset – Offset at which to start reading
  • max_bytes – Max bytes to read from this partition (default: 300kb)
class pykafka.protocol.OffsetCommitResponse(buff)

Bases: pykafka.protocol.Response

An offset commit response

Specification:

OffsetCommitResponse => [TopicName [Partition ErrorCode]]
    TopicName => string
    Partition => int32
    ErrorCode => int16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.OffsetFetchRequest(consumer_group, partition_requests=[])

Bases: pykafka.protocol.Request

An offset fetch request

Specification:

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
    ConsumerGroup => string
    TopicName => string
    Partition => int32
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group, partition_requests=[])

Create a new offset fetch request

Parameters:partition_requests – Iterable of pykafka.protocol.PartitionOffsetFetchRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.OffsetFetchResponse(buff)

Bases: pykafka.protocol.Response

An offset fetch response

Specification:

OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
    TopicName => string
    Partition => int32
    Offset => int64
    Metadata => string
    ErrorCode => int16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetRequest

Bases: pykafka.protocol.PartitionOffsetRequest

Offset request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
  • offsets_before – Retrieve offset information for messages before this timestamp (ms). -1 will retrieve the latest offsets and -2 will retrieve the earliest available offset. If -2, only one offset is returned.
  • max_offsets – How many offsets to return
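
The two sentinel values of offsets_before can be wrapped in small constructors, as sketched below. OffsetQuery is an illustrative stand-in with the same four fields as the class documented above, and LATEST, EARLIEST, latest, and earliest are hypothetical names introduced only for this example.

```python
from collections import namedtuple

# Stand-in with the same fields as the class documented above (illustrative only)
OffsetQuery = namedtuple(
    "OffsetQuery", ["topic_name", "partition_id", "offsets_before", "max_offsets"]
)

LATEST = -1    # sentinel: retrieve the latest offsets
EARLIEST = -2  # sentinel: retrieve the earliest available offset

def earliest(topic_name, partition_id):
    # Only one offset is returned for -2, so max_offsets=1 is sufficient
    return OffsetQuery(topic_name, partition_id, EARLIEST, 1)

def latest(topic_name, partition_id, max_offsets=1):
    return OffsetQuery(topic_name, partition_id, LATEST, max_offsets)

query = earliest("test", 0)
```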
class pykafka.protocol.GroupCoordinatorRequest(consumer_group)

Bases: pykafka.protocol.Request

A group coordinator request (formerly known as a consumer metadata request)

Specification:

GroupCoordinatorRequest => ConsumerGroup
    ConsumerGroup => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group)

Create a new group coordinator request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.GroupCoordinatorResponse(buff)

Bases: pykafka.protocol.Response

A group coordinator response

Specification:

GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
    ErrorCode => int16
    CoordinatorId => int32
    CoordinatorHost => string
    CoordinatorPort => int32
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
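
Because this response is flat, it makes a compact decoding example. The sketch below uses a hypothetical helper, decode_group_coordinator, and assumes buff begins at ErrorCode with any response header already stripped.

```python
import struct

def decode_group_coordinator(buff, pos=0):
    """Hypothetical helper: returns (error_code, coordinator_id, host, port)."""
    # ErrorCode (int16), CoordinatorId (int32), then the int16 length of CoordinatorHost
    err, coord_id, host_len = struct.unpack_from("!hih", buff, pos)
    pos += 8
    host = bytes(buff[pos:pos + host_len]).decode("utf-8")
    pos += host_len
    (port,) = struct.unpack_from("!i", buff, pos)
    return err, coord_id, host, port

# Hand-built sample: no error, coordinator 2 at kafka1:9092
sample = struct.pack("!hih", 0, 2, 6) + b"kafka1" + struct.pack("!i", 9092)
coordinator = decode_group_coordinator(sample)
```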
class pykafka.protocol.PartitionOffsetCommitRequest

Bases: pykafka.protocol.PartitionOffsetCommitRequest

Offset commit request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
  • offset
  • timestamp
  • metadata – arbitrary metadata that should be committed with this offset commit
class pykafka.protocol.PartitionOffsetFetchRequest

Bases: pykafka.protocol.PartitionOffsetFetchRequest

Offset fetch request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
class pykafka.protocol.Request

Bases: pykafka.utils.Serializable

Base class for all Requests. Handles writing header information

API_KEY()

API key for this request, from the Kafka docs

__weakref__

list of weak references to the object (if defined)

_write_header(buff, api_version=0, correlation_id=0)

Write the header for an outgoing message.

Parameters:
  • buff (buffer) – The buffer into which to write the header
  • api_version (int) – The “kafka api version id”, used for feature flagging
  • correlation_id (int) – This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server.
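
For orientation, the sketch below writes a header in the layout given by the Kafka protocol guide: a Size field, then ApiKey, ApiVersion, CorrelationId, and a length-prefixed ClientId. write_header is a hypothetical stand-in, not the method documented here, and the client_id value is made up for the example.

```python
import struct

def write_header(buff, api_key, api_version=0, correlation_id=0,
                 client_id=b"example-client"):
    """Hypothetical stand-in for a header writer; returns the header length in bytes."""
    fmt = "!ihhih%ds" % len(client_id)
    struct.pack_into(
        fmt, buff, 0,
        len(buff) - 4,            # Size counts everything after the size field itself
        api_key, api_version, correlation_id,
        len(client_id), client_id,
    )
    return struct.calcsize(fmt)

# Header followed by an empty topic array, as a metadata request (API key 3) would send
client_id = b"example-client"
buff = bytearray(14 + len(client_id) + 4)
header_len = write_header(buff, api_key=3, correlation_id=7, client_id=client_id)
struct.pack_into("!i", buff, header_len, 0)
```

The correlation_id of 7 would come back unchanged in the matching response, which is how a client pairs responses with outstanding requests.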
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.Response

Bases: object

Base class for Response objects.

__weakref__

list of weak references to the object (if defined)

raise_error(err_code, response)

Raise an error based on the Kafka error code

Parameters:
  • err_code – The error code from Kafka
  • response – The unpacked raw data from the response
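
One common way to implement this kind of dispatch is a lookup table from error code to exception class, sketched below. The exception classes and the ERRORS table are hypothetical and are not pykafka's own; the three numeric codes are the values the Kafka protocol assigns to these errors.

```python
class KafkaError(Exception):
    """Hypothetical base class for this sketch."""

class OffsetOutOfRange(KafkaError):
    pass

class UnknownTopicOrPartition(KafkaError):
    pass

class NotLeaderForPartition(KafkaError):
    pass

# Kafka protocol error codes mapped to the illustrative classes above
ERRORS = {
    1: OffsetOutOfRange,
    3: UnknownTopicOrPartition,
    6: NotLeaderForPartition,
}

def exception_for(err_code):
    """Return the exception class for a code, falling back to the base class."""
    return ERRORS.get(err_code, KafkaError)

def raise_error(err_code, response):
    raise exception_for(err_code)("Kafka error %d: %r" % (err_code, response))
```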
class pykafka.protocol.Message(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0, delivery_report_q=None)

Bases: pykafka.common.Message, pykafka.utils.Serializable

Representation of a Kafka Message

NOTE: Compression is handled in the protocol because of the way Kafka embeds compressed MessageSets within Messages

Specification:

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

pykafka.protocol.Message also contains partition and partition_id fields. Both default to meaningless placeholder values. When a pykafka.protocol.Message is used by the producer, partition_id identifies the Message’s destination partition. When a message is received in response to a pykafka.protocol.FetchRequest, partition_id is set to the id of the partition from which the message was fetched. In pykafka.simpleconsumer.SimpleConsumer, partition is set to the pykafka.partition.Partition instance from which the message was fetched.

Variables:
  • compression_type – Type of compression to use for the message
  • partition_key – Value used to assign this message to a particular partition.
  • value – The payload associated with this message
  • offset – The offset of the message
  • partition_id – The id of the partition to which this message belongs
  • delivery_report_q – For use by pykafka.producer.Producer
pack_into(buff, offset)

Serialize and write to buff, starting at the position given by offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at
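
The Message specification above can be sketched with struct and zlib. pack_message is a hypothetical helper, not the method documented here; it assumes magic byte 0, writes a null key as a length of -1, and computes the CRC over every byte that follows the Crc field, as the Kafka protocol guide describes for this message format. Compression of the value is not attempted.

```python
import struct
import zlib

def pack_message(value, partition_key=None, compression_type=0):
    """Hypothetical helper: encode Crc MagicByte Attributes Key Value."""
    def pack_bytes(data):
        # A bytes field carries an int32 length; -1 marks a null field
        if data is None:
            return struct.pack("!i", -1)
        return struct.pack("!i", len(data)) + data

    # MagicByte 0 (assumed), Attributes holding the compression type
    body = struct.pack("!bb", 0, compression_type)
    body += pack_bytes(partition_key) + pack_bytes(value)
    # Same four bytes as the int32 in the specification, packed unsigned for convenience
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack("!I", crc) + body

msg = pack_message(b"hello", partition_key=b"k")
```

A receiver can verify integrity by recomputing the CRC over msg[4:] and comparing it with the first four bytes.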
class pykafka.protocol.MessageSet(compression_type=0, messages=None)

Bases: pykafka.utils.Serializable

Representation of a set of messages in Kafka

This isn’t useful outside of direct communications with Kafka, so we keep it hidden away here.

N.B.: MessageSets are not preceded by an int32 like other array elements in the protocol.

Specification:

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
Variables:
  • messages – The list of messages currently in the MessageSet
  • compression_type – compression to use for the messages
__init__(compression_type=0, messages=None)

Create a new MessageSet

Parameters:
  • compression_type – Compression to use on the messages
  • messages – An initial list of messages for the set
__len__()

Length of the serialized message, in bytes

We don’t put the MessageSetSize in front of the serialization because that’s technically not part of the MessageSet. Most requests/responses using MessageSets need that size, though, so be careful when using this.
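
The framing described above can be sketched as follows. Both helpers are hypothetical and take already-serialized message bytes; the placeholder offset of -1 mirrors the default offset in the Message signature on this page.

```python
import struct

def pack_message_set(messages):
    """Hypothetical helper: frame each serialized message as Offset MessageSize Message."""
    out = bytearray()
    for raw in messages:
        # No int32 element count is written before the entries
        out += struct.pack("!qi", -1, len(raw)) + raw
    return out

def with_size_prefix(message_set_bytes):
    """Prepend the MessageSetSize that produce and fetch payloads expect."""
    return struct.pack("!i", len(message_set_bytes)) + bytes(message_set_bytes)

message_set = pack_message_set([b"abc", b"de"])
framed = with_size_prefix(message_set)
```

Keeping the size prefix in a separate step reflects the note above: the size belongs to the enclosing request or response, not to the MessageSet itself.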

__weakref__

list of weak references to the object (if defined)

_get_compressed()

Get a compressed representation of all current messages.

Returns a Message object with correct headers set and compressed data in the value field.

classmethod decode(buff, partition_id=-1)

Decode a serialized MessageSet.

pack_into(buff, offset)

Serialize and write to buff, starting at the position given by offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at