pykafka.protocol
class pykafka.protocol.MetadataRequest(topics=None)

    Bases: pykafka.protocol.Request

    Metadata Request

    Specification:

        MetadataRequest => [TopicName]
        TopicName => string

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(topics=None)
        Create a new MetadataRequest

        Parameters: topics – Topics to query. Leave empty for all available topics.

    __len__()
        Length of the serialized message, in bytes

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
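Per the specification above, the body of a metadata request is just an array of topic names. The following is a sketch of that wire encoding under the usual Kafka conventions (an int32 element count, then each string as an int16 length prefix plus bytes); `pack_metadata_request_body` is a hypothetical helper for illustration, not pykafka's own implementation:

```python
import struct

def pack_metadata_request_body(topics):
    """[TopicName]: int32 element count, then int16-length-prefixed strings."""
    body = struct.pack('>i', len(topics))
    for name in topics:
        encoded = name.encode('utf-8')
        body += struct.pack('>h', len(encoded)) + encoded
    return body

# one topic: 4-byte count + 2-byte length + 7 name bytes = 13 bytes
body = pack_metadata_request_body(['mytopic'])
```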
class pykafka.protocol.MetadataResponse(buff)

    Bases: pykafka.protocol.Response

    Response from MetadataRequest

    Specification:

        MetadataResponse => [Broker][TopicMetadata]
        Broker => NodeId Host Port
        NodeId => int32
        Host => string
        Port => int32
        TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16
        PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.ProduceRequest(compression_type=0, required_acks=1, timeout=10000)

    Bases: pykafka.protocol.Request

    Produce Request

    Specification:

        ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
        RequiredAcks => int16
        Timeout => int32
        Partition => int32
        MessageSetSize => int32

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(compression_type=0, required_acks=1, timeout=10000)
        Create a new ProduceRequest

        required_acks determines how many acknowledgements the server waits
        for before returning. This is useful for ensuring the replication
        factor of published messages. The behavior is:

            -1: Block until all servers acknowledge
            0: No waiting -- server doesn't even respond to the Produce request
            1: Wait for this server to write to the local log and then return
            2+: Wait for N servers to acknowledge

        Parameters:
            - partition_requests – Iterable of kafka.pykafka.protocol.PartitionProduceRequest for this request
            - compression_type – Compression to use for messages
            - required_acks – see docstring
            - timeout – timeout (in ms) to wait for the required acks
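The `required_acks` and `timeout` arguments map directly onto the RequiredAcks (int16) and Timeout (int32) fields at the head of the request body in the specification. A sketch of that wire layout (illustrative only, not pykafka's internal serialization code):

```python
import struct

# RequiredAcks => int16, Timeout => int32 (big-endian, per the Kafka protocol)
required_acks = 1    # wait for the leader's local write
timeout_ms = 10000   # give up waiting for acks after 10 seconds
head = struct.pack('>hi', required_acks, timeout_ms)  # 2 + 4 = 6 bytes
```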
    __len__()
        Length of the serialized message, in bytes

    add_message(message, topic_name, partition_id)
        Add a kafka.common.Message to the waiting request

        Parameters:
            - message – a kafka.common.Message to add
            - topic_name – the name of the topic to publish to
            - partition_id – the partition to publish to

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray

    message_count()
        Get the number of messages across all MessageSets in the request.

    messages
        Iterable of all messages in the Request
class pykafka.protocol.ProduceResponse(buff)

    Bases: pykafka.protocol.Response

    Produce Response. Checks to make sure everything went okay.

    Specification:

        ProduceResponse => [TopicName [Partition ErrorCode Offset]]
        TopicName => string
        Partition => int32
        ErrorCode => int16
        Offset => int64

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.OffsetRequest(partition_requests)

    Bases: pykafka.protocol.Request

    An offset request

    Specification:

        OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
        ReplicaId => int32
        TopicName => string
        Partition => int32
        Time => int64
        MaxNumberOfOffsets => int32

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(partition_requests)
        Create a new offset request

    __len__()
        Length of the serialized message, in bytes

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.OffsetResponse(buff)

    Bases: pykafka.protocol.Response

    An offset response

    Specification:

        OffsetResponse => [TopicName [PartitionOffsets]]
        PartitionOffsets => Partition ErrorCode [Offset]
        Partition => int32
        ErrorCode => int16
        Offset => int64

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.OffsetCommitRequest(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

    Bases: pykafka.protocol.Request

    An offset commit request

    Specification:

        OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
        ConsumerGroupId => string
        ConsumerGroupGenerationId => int32
        ConsumerId => string
        TopicName => string
        Partition => int32
        Offset => int64
        TimeStamp => int64
        Metadata => string

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])
        Create a new offset commit request

        Parameters: partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetCommitRequest for this request

    __len__()
        Length of the serialized message, in bytes

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.FetchRequest(partition_requests=[], timeout=1000, min_bytes=1024)

    Bases: pykafka.protocol.Request

    A Fetch request sent to Kafka

    Specification:

        FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
        ReplicaId => int32
        MaxWaitTime => int32
        MinBytes => int32
        TopicName => string
        Partition => int32
        FetchOffset => int64
        MaxBytes => int32

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(partition_requests=[], timeout=1000, min_bytes=1024)
        Create a new fetch request

        Kafka 0.8 uses long polling for fetch requests, which is different
        from 0.7.x. Instead of polling and waiting, we can now set a timeout
        to wait and a minimum number of bytes to be collected before the
        server returns. This way we can block effectively and also ensure
        good network throughput by having fewer, larger transfers instead of
        many small ones every time a byte is written to the log.

        Parameters:
            - partition_requests – Iterable of kafka.pykafka.protocol.PartitionFetchRequest for this request
            - timeout – Max time to wait (in ms) for a response from the server
            - min_bytes – Minimum bytes to collect before returning
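The long-polling knobs correspond to the MaxWaitTime and MinBytes fields in the specification above. A sketch of how the three leading fields of the request body could be laid out on the wire (illustrative only; the field order follows the spec):

```python
import struct

# ReplicaId => int32, MaxWaitTime => int32, MinBytes => int32
replica_id = -1      # ordinary clients always send -1
max_wait_ms = 1000   # block for at most one second...
min_bytes = 1024     # ...or return sooner once at least 1 KB is ready
head = struct.pack('>iii', replica_id, max_wait_ms, min_bytes)
```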
    __len__()
        Length of the serialized message, in bytes

    add_request(partition_request)
        Add a topic/partition/offset to the requests

        Parameters: partition_request – a PartitionFetchRequest carrying the
        topic to fetch from, the partition to fetch from, the offset to
        start reading data from, and the maximum number of bytes to return
        in the response

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.FetchResponse(buff)

    Bases: pykafka.protocol.Response

    Unpack a fetch response from the server

    Specification:

        FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
        TopicName => string
        Partition => int32
        ErrorCode => int16
        HighwaterMarkOffset => int64
        MessageSetSize => int32

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message

    _unpack_message_set(buff, partition_id=-1)
        MessageSets can be nested. Get just the Messages out of it.
class pykafka.protocol.PartitionFetchRequest

    Bases: pykafka.protocol.PartitionFetchRequest

    Fetch request for a specific topic/partition

    Variables:
        - topic_name – Name of the topic to fetch from
        - partition_id – Id of the partition to fetch from
        - offset – Offset at which to start reading
        - max_bytes – Max bytes to read from this partition (default: 300 KB)
class pykafka.protocol.OffsetCommitResponse(buff)

    Bases: pykafka.protocol.Response

    An offset commit response

    Specification:

        OffsetCommitResponse => [TopicName [Partition ErrorCode]]
        TopicName => string
        Partition => int32
        ErrorCode => int16

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.OffsetFetchRequest(consumer_group, partition_requests=[])

    Bases: pykafka.protocol.Request

    An offset fetch request

    Specification:

        OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
        ConsumerGroup => string
        TopicName => string
        Partition => int32

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(consumer_group, partition_requests=[])
        Create a new offset fetch request

        Parameters: partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetFetchRequest for this request

    __len__()
        Length of the serialized message, in bytes

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.OffsetFetchResponse(buff)

    Bases: pykafka.protocol.Response

    An offset fetch response

    Specification:

        OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
        TopicName => string
        Partition => int32
        Offset => int64
        Metadata => string
        ErrorCode => int16

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetRequest

    Bases: pykafka.protocol.PartitionOffsetRequest

    Offset request for a specific topic/partition

    Variables:
        - topic_name – Name of the topic to look up
        - partition_id – Id of the partition to look up
        - offsets_before – Retrieve offset information for messages before
          this timestamp (ms). -1 will retrieve the latest offsets and -2
          will retrieve the earliest available offset. If -2, only 1 offset
          is returned
        - max_offsets – How many offsets to return
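Per the OffsetRequest specification earlier, each per-partition entry carries Partition (int32), Time (int64), and MaxNumberOfOffsets (int32). A hypothetical sketch of packing one entry that asks for the earliest available offset:

```python
import struct

LATEST = -1    # special Time value: most recent offsets
EARLIEST = -2  # special Time value: earliest available offset (one result)

# Partition => int32, Time => int64, MaxNumberOfOffsets => int32
entry = struct.pack('>iqi', 0, EARLIEST, 1)  # partition 0, one offset back
```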
class pykafka.protocol.ConsumerMetadataRequest(consumer_group)

    Bases: pykafka.protocol.Request

    A consumer metadata request

    Specification:

        ConsumerMetadataRequest => ConsumerGroup
        ConsumerGroup => string

    API_KEY
        API_KEY for this request, from the Kafka docs

    __init__(consumer_group)
        Create a new consumer metadata request

    __len__()
        Length of the serialized message, in bytes

    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.ConsumerMetadataResponse(buff)

    Bases: pykafka.protocol.Response

    A consumer metadata response

    Specification:

        ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
        ErrorCode => int16
        CoordinatorId => int32
        CoordinatorHost => string
        CoordinatorPort => int32

    __init__(buff)
        Deserialize into a new Response

        Parameters: buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetCommitRequest

    Bases: pykafka.protocol.PartitionOffsetCommitRequest

    Offset commit request for a specific topic/partition

    Variables:
        - topic_name – Name of the topic to look up
        - partition_id – Id of the partition to look up
        - offset – The offset to commit
        - timestamp – The timestamp to record with the commit
        - metadata – arbitrary metadata that should be committed with this offset commit
class pykafka.protocol.PartitionOffsetFetchRequest

    Bases: pykafka.protocol.PartitionOffsetFetchRequest

    Offset fetch request for a specific topic/partition

    Variables:
        - topic_name – Name of the topic to look up
        - partition_id – Id of the partition to look up
class pykafka.protocol.Request

    Bases: pykafka.utils.Serializable

    Base class for all Requests. Handles writing header information.

    API_KEY()
        API key for this request, from the Kafka docs

    _write_header(buff, api_version=0, correlation_id=0)
        Write the header for an outgoing message.

        Parameters:
            - buff (buffer) – The buffer into which to write the header
            - api_version (int) – The "kafka api version id", used for feature flagging
            - correlation_id (int) – This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching requests and responses between the client and server.
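The Kafka 0.8 request header consists of ApiKey (int16), ApiVersion (int16), CorrelationId (int32), and a ClientId string. A sketch of that layout with a hypothetical client id (illustrative only, not pykafka's `_write_header` itself, which writes into a caller-supplied buffer):

```python
import struct

def write_header_sketch(api_key, api_version=0, correlation_id=0,
                        client_id=b'pykafka'):
    """Pack ApiKey, ApiVersion, CorrelationId, then ClientId as an
    int16-length-prefixed string."""
    return struct.pack(
        '>hhih%ds' % len(client_id),
        api_key, api_version, correlation_id, len(client_id), client_id,
    )

header = write_header_sketch(api_key=3)  # 3 is the metadata API key
```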
    get_bytes()
        Serialize the message

        Returns: Serialized message
        Return type: bytearray
class pykafka.protocol.Response

    Bases: object

    Base class for Response objects.

    __weakref__
        list of weak references to the object (if defined)

    raise_error(err_code, response)
        Raise an error based on the Kafka error code

        Parameters:
            - err_code – The error code from Kafka
            - response – The unpacked raw data from the response
class pykafka.protocol.Message(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0)

    Bases: pykafka.common.Message, pykafka.utils.Serializable

    Representation of a Kafka Message

    NOTE: Compression is handled in the protocol because of the way Kafka
    embeds compressed MessageSets within Messages

    Specification:

        Message => Crc MagicByte Attributes Key Value
        Crc => int32
        MagicByte => int8
        Attributes => int8
        Key => bytes
        Value => bytes
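Following the specification above, a message body is the magic byte, the attributes byte, and the length-prefixed key and value, with a CRC-32 of those bytes prepended. A hedged sketch of that layout (`encode_message_sketch` is hypothetical; pykafka's own pack_into writes into a caller-supplied buffer instead of returning bytes):

```python
import struct
import zlib

def encode_message_sketch(value, key=None):
    """Pack MagicByte, Attributes, Key, Value, then prepend the CRC."""
    magic, attributes = 0, 0
    # 'bytes' fields use an int32 length prefix; a length of -1 means null
    key_field = (struct.pack('>i', -1) if key is None
                 else struct.pack('>i', len(key)) + key)
    value_field = struct.pack('>i', len(value)) + value
    body = struct.pack('>bb', magic, attributes) + key_field + value_field
    crc = zlib.crc32(body) & 0xffffffff
    return struct.pack('>I', crc) + body

msg = encode_message_sketch(b'hello')
```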
    pykafka.protocol.Message also contains partition and partition_id
    fields. Both of these have meaningless default values when
    pykafka.protocol.Message is used by the producer. When used in a
    pykafka.protocol.FetchRequest, partition_id is set to the id of the
    partition from which the message was sent, on receipt of the message.
    In the pykafka.simpleconsumer.SimpleConsumer, partition is set to the
    pykafka.partition.Partition instance from which the message was sent.

    Variables:
        - compression_type – Type of compression to use for the message
        - partition_key – Value used to assign this message to a particular partition
        - value – The payload associated with this message
        - offset – The offset of the message
        - partition_id – The id of the partition to which this message belongs

    pack_into(buff, offset)
        Serialize and write to buff starting at offset offset.

        Intentionally follows the pattern of struct.pack_into

        Parameters:
            - buff – The buffer to write into
            - offset – The offset to start the write at
class pykafka.protocol.MessageSet(compression_type=0, messages=None)

    Bases: pykafka.utils.Serializable

    Representation of a set of messages in Kafka

    This isn't useful outside of direct communications with Kafka, so we
    keep it hidden away here.

    N.B.: MessageSets are not preceded by an int32 like other array
    elements in the protocol.

    Specification:

        MessageSet => [Offset MessageSize Message]
        Offset => int64
        MessageSize => int32

    Variables:
        - messages – The list of messages currently in the MessageSet
        - compression_type – compression to use for the messages
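Each entry in the set above is an Offset (int64), a MessageSize (int32), and the encoded message bytes, with no leading element count. A sketch of framing one already-encoded message (`frame_message_sketch` and the stand-in `encoded_message` bytes are hypothetical, not pykafka code):

```python
import struct

def frame_message_sketch(encoded_message, offset=0):
    """Prefix an encoded message with Offset (int64) and MessageSize (int32).
    Producers may send any offset; the broker assigns the real one."""
    return struct.pack('>qi', offset, len(encoded_message)) + encoded_message

encoded_message = b'\x00' * 14  # stand-in for a serialized Message
entry = frame_message_sketch(encoded_message)
```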
    __init__(compression_type=0, messages=None)
        Create a new MessageSet

        Parameters:
            - compression_type – Compression to use on the messages
            - messages – An initial list of messages for the set

    __len__()
        Length of the serialized message, in bytes

        We don't put the MessageSetSize in front of the serialization
        because that's technically not part of the MessageSet. Most
        requests/responses using MessageSets need that size, though, so
        be careful when using this.

    _get_compressed()
        Get a compressed representation of all current messages.

        Returns a Message object with correct headers set and compressed
        data in the value field.

    classmethod decode(buff, partition_id=-1)
        Decode a serialized MessageSet.

    pack_into(buff, offset)
        Serialize and write to buff starting at offset offset.

        Intentionally follows the pattern of struct.pack_into

        Parameters:
            - buff – The buffer to write into
            - offset – The offset to start the write at