pykafka.protocol

class pykafka.protocol.MetadataRequest(topics=None)

Bases: pykafka.protocol.Request

Metadata Request

MetadataRequest => [TopicName]
  TopicName => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(topics=None)

Create a new MetadataRequest

Parameters:topics – Topics to query. Leave empty for all available topics.
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
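
As a quick illustration, a MetadataRequest can be built and serialized on its own (the topic name b'test' is a placeholder; in normal use pykafka’s broker machinery constructs and sends these):

    from pykafka.protocol import MetadataRequest

    # Query one topic; omit `topics` to ask for all available topics
    request = MetadataRequest(topics=[b'test'])
    payload = request.get_bytes()   # bytearray; len(payload) == len(request)
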
class pykafka.protocol.MetadataResponse(buff)

Bases: pykafka.protocol.Response

Response from MetadataRequest

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]

__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.ProduceRequest(compression_type=0, required_acks=1, timeout=10000)

Bases: pykafka.protocol.Request

Produce Request

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32

API_KEY

API_KEY for this request, from the Kafka docs

__init__(compression_type=0, required_acks=1, timeout=10000)

Create a new ProduceRequest

required_acks determines how many acknowledgements the server waits for before returning. This is useful for ensuring the replication factor of published messages. The behavior is:

  • -1: Block until all servers acknowledge
  • 0: No waiting – server doesn’t even respond to the Produce request
  • 1: Wait for this server to write to the local log and then return
  • 2+: Wait for N servers to acknowledge
Parameters:
  • partition_requests – Iterable of kafka.pykafka.protocol.PartitionProduceRequest for this request
  • compression_type – Compression to use for messages
  • required_acks – see docstring
  • timeout – timeout (in ms) to wait for the required acks
__len__()

Length of the serialized message, in bytes

add_message(message, topic_name, partition_id)

Add a kafka.common.Message to the waiting request

Parameters:
  • message – the kafka.common.Message to add
  • topic_name – the name of the topic to publish to
  • partition_id – the partition to publish to
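
Putting __init__’s required_acks and add_message() together, a minimal sketch of building a request (the topic name, key, and value are placeholders; bytes literals assumed for Python 3):

    from pykafka.protocol import Message, ProduceRequest

    # Wait for all in-sync replicas to acknowledge (required_acks=-1)
    request = ProduceRequest(required_acks=-1, timeout=5000)
    message = Message(b'hello kafka', partition_key=b'key-a')
    request.add_message(message, b'test', 0)  # topic b'test', partition 0
    payload = request.get_bytes()             # bytearray ready for the wire
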
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
message_count()

Get the number of messages across all MessageSets in the request.

messages

Iterable of all messages in the Request

class pykafka.protocol.ProduceResponse(buff)

Bases: pykafka.protocol.Response

Produce Response. Checks to make sure everything went okay.

ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64

__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.OffsetRequest(partition_requests)

Bases: pykafka.protocol.Request

An offset request

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32

API_KEY

API_KEY for this request, from the Kafka docs

__init__(partition_requests)

Create a new offset request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.OffsetResponse(buff)

Bases: pykafka.protocol.Response

An offset response

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.OffsetCommitRequest(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Bases: pykafka.protocol.Request

An offset commit request

OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Create a new offset commit request

Parameters:partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetCommitRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.FetchRequest(partition_requests=[], timeout=1000, min_bytes=1024)

Bases: pykafka.protocol.Request

A Fetch request sent to Kafka

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32

API_KEY

API_KEY for this request, from the Kafka docs

__init__(partition_requests=[], timeout=1000, min_bytes=1024)

Create a new fetch request

Kafka 0.8 uses long polling for fetch requests, which is different from 0.7.x. Instead of polling and waiting, we can now set a timeout to wait and a minimum number of bytes to be collected before the server returns. This way we can block effectively and also ensure good network throughput by having fewer, larger transfers instead of many small ones every time a byte is written to the log.

Parameters:
  • partition_requests – Iterable of kafka.pykafka.protocol.PartitionFetchRequest for this request
  • timeout – Max time to wait (in ms) for a response from the server
  • min_bytes – Minimum bytes to collect before returning
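
A minimal sketch of these long-polling knobs in use (PartitionFetchRequest is documented below; the topic name b'test' is a placeholder):

    from pykafka.protocol import FetchRequest, PartitionFetchRequest

    # Read partition 0 of b'test' starting at offset 0;
    # max_bytes is left at its documented default
    part = PartitionFetchRequest(b'test', 0, 0)
    # Block for up to 2s, or until at least 64KB is available
    request = FetchRequest(partition_requests=[part],
                           timeout=2000, min_bytes=65536)
    payload = request.get_bytes()
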
__len__()

Length of the serialized message, in bytes

add_request(partition_request)

Add a topic/partition/offset to the requests

Parameters:partition_request – a PartitionFetchRequest carrying the topic and partition to fetch from, the offset at which to start reading, and the maximum number of bytes to return in the response
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.FetchResponse(buff)

Bases: pykafka.protocol.Response

Unpack a fetch response from the server

FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
_unpack_message_set(buff, partition_id=-1)

MessageSets can be nested. Get just the Messages out of it.

class pykafka.protocol.PartitionFetchRequest

Bases: pykafka.protocol.PartitionFetchRequest

Fetch request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to fetch from
  • partition_id – Id of the partition to fetch from
  • offset – Offset at which to start reading
  • max_bytes – Max bytes to read from this partition (default: 300 KB)
class pykafka.protocol.OffsetCommitResponse(buff)

Bases: pykafka.protocol.Response

An offset commit response

OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.OffsetFetchRequest(consumer_group, partition_requests=[])

Bases: pykafka.protocol.Request

An offset fetch request

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group, partition_requests=[])

Create a new offset fetch request

Parameters:partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetFetchRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.OffsetFetchResponse(buff)

Bases: pykafka.protocol.Response

An offset fetch response

OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetRequest

Bases: pykafka.protocol.PartitionOffsetRequest

Offset request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
  • offsets_before – Retrieve offset information for messages before this timestamp (ms). -1 will retrieve the latest offsets and -2 will retrieve the earliest available offset. If -2, only one offset is returned.
  • max_offsets – How many offsets to return
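
For example, the two special timestamps map to “latest” and “earliest” lookups (a sketch; the topic name is a placeholder):

    from pykafka.protocol import OffsetRequest, PartitionOffsetRequest

    # offsets_before=-1 asks for the latest offset; max_offsets=1 returns
    # a single value. Using -2 instead would return the earliest offset.
    latest = PartitionOffsetRequest(b'test', 0, -1, 1)
    request = OffsetRequest([latest])
    payload = request.get_bytes()
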
class pykafka.protocol.ConsumerMetadataRequest(consumer_group)

Bases: pykafka.protocol.Request

A consumer metadata request

ConsumerMetadataRequest => ConsumerGroup
  ConsumerGroup => string
API_KEY

API_KEY for this request, from the Kafka docs

__init__(consumer_group)

Create a new consumer metadata request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.ConsumerMetadataResponse(buff)

Bases: pykafka.protocol.Response

A consumer metadata response

ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
  ErrorCode => int16
  CoordinatorId => int32
  CoordinatorHost => string
  CoordinatorPort => int32
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetCommitRequest

Bases: pykafka.protocol.PartitionOffsetCommitRequest

Offset commit request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
  • offset – The offset to commit for this partition
  • timestamp – The timestamp (ms) to record with this commit
  • metadata – arbitrary metadata that should be committed with this offset commit
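
A sketch tying this to OffsetCommitRequest above (the group, consumer id, and metadata values are placeholders):

    import time
    from pykafka.protocol import (OffsetCommitRequest,
                                  PartitionOffsetCommitRequest)

    part = PartitionOffsetCommitRequest(
        b'test', 0,                # topic and partition to commit for
        38,                        # offset to commit
        int(time.time() * 1000),   # commit timestamp, in ms
        b'example metadata')
    request = OffsetCommitRequest(b'mygroup', 1, b'myconsumer',
                                  partition_requests=[part])
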
class pykafka.protocol.PartitionOffsetFetchRequest

Bases: pykafka.protocol.PartitionOffsetFetchRequest

Offset fetch request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to look up
  • partition_id – Id of the partition to look up
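
And the matching fetch side, as a sketch (group and topic names are placeholders):

    from pykafka.protocol import (OffsetFetchRequest,
                                  PartitionOffsetFetchRequest)

    # Ask for the group's last committed offset on partition 0 of b'test'
    part = PartitionOffsetFetchRequest(b'test', 0)
    request = OffsetFetchRequest(b'mygroup', partition_requests=[part])
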
class pykafka.protocol.Request

Bases: pykafka.utils.Serializable

Base class for all Requests. Handles writing header information

API_KEY()

API key for this request, from the Kafka docs

_write_header(buff, api_version=0, correlation_id=0)

Write the header for an outgoing message.

Parameters:
  • buff (buffer) – The buffer into which to write the header
  • api_version (int) – The “kafka api version id”, used for feature flagging
  • correlation_id (int) – This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server.
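
The correlation id round trip can be illustrated with the standard Kafka wire format (a sketch using struct rather than pykafka internals: every response begins with a 4-byte size followed by the echoed 4-byte correlation id):

    import struct

    def read_correlation_id(response):
        """Pull the echoed correlation id from a raw response buffer."""
        _size, correlation_id = struct.unpack_from('!ii', response, 0)
        return correlation_id
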
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.Response

Bases: object

Base class for Response objects.

__weakref__

list of weak references to the object (if defined)

raise_error(err_code, response)

Raise an error based on the Kafka error code

Parameters:
  • err_code – The error code from Kafka
  • response – The unpacked raw data from the response
class pykafka.protocol.Message(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1)

Bases: pykafka.common.Message, pykafka.utils.Serializable

Representation of a Kafka Message

NOTE: Compression is handled in the protocol because of the way Kafka embeds compressed MessageSets within Messages

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

pykafka.protocol.Message also contains partition and partition_id fields. Both of these have meaningless default values. When pykafka.protocol.Message is used by the producer, partition_id identifies the Message’s destination partition. When used in a pykafka.protocol.FetchRequest, partition_id is set, on receipt of the message, to the id of the partition from which the message was sent. In the pykafka.simpleconsumer.SimpleConsumer, partition is set to the pykafka.partition.Partition instance from which the message was sent.

pack_into(buff, offset)

Serialize and write to buff starting at offset offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at
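
Following the struct.pack_into pattern, the caller preallocates the buffer and passes an offset (a minimal sketch; it assumes len(message) reports the serialized size, as it does for the other Serializable classes here):

    from pykafka.protocol import Message

    message = Message(b'hello', partition_key=b'k')
    buff = bytearray(len(message))  # preallocate exactly enough space
    message.pack_into(buff, 0)      # serialize starting at offset 0
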
class pykafka.protocol.MessageSet(compression_type=0, messages=None)

Bases: pykafka.utils.Serializable

Representation of a set of messages in Kafka

This isn’t useful outside of direct communications with Kafka, so we keep it hidden away here.

N.B.: MessageSets are not preceded by an int32 like other array elements in the protocol.

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

Variables:
  • messages – The list of messages currently in the MessageSet
  • compression_type – compression to use for the messages
__init__(compression_type=0, messages=None)

Create a new MessageSet

Parameters:
  • compression_type – Compression to use on the messages
  • messages – An initial list of messages for the set
__len__()

Length of the serialized message, in bytes

We don’t put the MessageSetSize in front of the serialization because that’s technically not part of the MessageSet. Most requests/responses using MessageSets need that size, though, so be careful when using this.
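
Because __len__ excludes the MessageSetSize, callers prepend it themselves, e.g. (a sketch):

    import struct
    from pykafka.protocol import Message, MessageSet

    mset = MessageSet(messages=[Message(b'a'), Message(b'b')])
    size = len(mset)                       # size of the set alone
    buff = bytearray(4 + size)
    struct.pack_into('!i', buff, 0, size)  # MessageSetSize goes in front
    mset.pack_into(buff, 4)                # ...followed by the set itself
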

_get_compressed()

Get a compressed representation of all current messages.

Returns a Message object with correct headers set and compressed data in the value field.

classmethod decode(buff, partition_id=-1)

Decode a serialized MessageSet.

pack_into(buff, offset)

Serialize and write to buff starting at offset offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at