pykafka.protocol

class pykafka.protocol.MetadataRequest(topics=None, *kwargs)

Bases: pykafka.protocol.base.Request

Metadata Request Specification:

MetadataRequest => [TopicName]
    TopicName => string
__init__(topics=None, *kwargs)

Create a new MetadataRequest :param topics: Topics to query. Leave empty for all available topics.

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.MetadataResponse(buff)

Bases: pykafka.protocol.base.Response

Response from MetadataRequest Specification:: Metadata Response (Version: 0) => [brokers] [topic_metadata]

brokers => node_id host port
node_id => INT32 host => STRING port => INT32
topic_metadata => error_code topic [partition_metadata]

error_code => INT16 topic => STRING partition_metadata => error_code partition leader [replicas] [isr]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.ProduceRequest(compression_type=0, required_acks=1, timeout=10000, broker_version='0.9.0')

Bases: pykafka.protocol.base.Request

Produce Request Specification:

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
__init__(compression_type=0, required_acks=1, timeout=10000, broker_version='0.9.0')

Create a new ProduceRequest required_acks determines how many acknowledgement the server waits for before returning. This is useful for ensuring the replication factor of published messages. The behavior is:

-1: Block until all servers acknowledge
0: No waiting -- server doesn't even respond to the Produce request
1: Wait for this server to write to the local log and then return
2+: Wait for N servers to acknowledge
Parameters:
  • partition_requests – Iterable of kafka.pykafka.protocol.PartitionProduceRequest for this request
  • compression_type – Compression to use for messages
  • required_acks – see docstring
  • timeout – timeout (in ms) to wait for the required acks
__len__()

Length of the serialized message, in bytes

add_message(message, topic_name, partition_id)

Add a list of kafka.common.Message to the waiting request :param messages: an iterable of kafka.common.Message to add :param topic_name: the name of the topic to publish to :param partition_id: the partition to publish to

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

message_count()

Get the number of messages across all MessageSets in the request.

messages

Iterable of all messages in the Request

class pykafka.protocol.ProduceResponse(buff)

Bases: pykafka.protocol.base.Response

Produce Response. Checks to make sure everything went okay. Specification:

ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.PartitionFetchRequest

Bases: pykafka.protocol.fetch.PartitionFetchRequest

Fetch request for a specific topic/partition

Variables:
  • topic_name – Name of the topic to fetch from
  • partition_id – Id of the partition to fetch from
  • offset – Offset at which to start reading
  • max_bytes – Max bytes to read from this partition (default: 300kb)
class pykafka.protocol.FetchRequest(partition_requests=[], timeout=1000, min_bytes=1024, api_version=0)

Bases: pykafka.protocol.base.Request

A Fetch request sent to Kafka

Specification:

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
__init__(partition_requests=[], timeout=1000, min_bytes=1024, api_version=0)

Create a new fetch request

Kafka 0.8 uses long polling for fetch requests, which is different from 0.7x. Instead of polling and waiting, we can now set a timeout to wait and a minimum number of bytes to be collected before it returns. This way we can block effectively and also ensure good network throughput by having fewer, large transfers instead of many small ones every time a byte is written to the log.

Parameters:
  • partition_requests – Iterable of kafka.pykafka..protocol.PartitionFetchRequest for this request
  • timeout – Max time to wait (in ms) for a response from the server
  • min_bytes – Minimum bytes to collect before returning
__len__()

Length of the serialized message, in bytes

add_request(partition_request)

Add a topic/partition/offset to the requests

Parameters:
  • topic_name – The topic to fetch from
  • partition_id – The partition to fetch from
  • offset – The offset to start reading data from
  • max_bytes – The maximum number of bytes to return in the response
get_bytes()

Serialize the message

Returns:Serialized message
Return type:bytearray
class pykafka.protocol.FetchPartitionResponse(max_offset, messages, err)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

static __new__(_cls, max_offset, messages, err)

Create new instance of FetchPartitionResponse(max_offset, messages, err)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values

classmethod _make(iterable, new=<built-in method __new__ of type object at 0x906d60>, len=<built-in function len>)

Make a new FetchPartitionResponse object from a sequence or iterable

_replace(**kwds)

Return a new FetchPartitionResponse object replacing specified fields with new values

err

Alias for field number 2

max_offset

Alias for field number 0

messages

Alias for field number 1

class pykafka.protocol.FetchResponse(buff, offset=0, broker_version='0.9.0')

Bases: pykafka.protocol.base.Response

Unpack a fetch response from the server

Specification:

FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
__init__(buff, offset=0, broker_version='0.9.0')

Deserialize into a new Response

Parameters:
  • buff (bytearray) – Serialized message
  • offset (int) – Offset into the message
_unpack_message_set(buff, partition_id=-1, broker_version='0.9.0')

MessageSets can be nested. Get just the Messages out of it.

class pykafka.protocol.ListOffsetRequest(partition_requests)

Bases: pykafka.protocol.base.Request

An offset request Specification:

ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32
__init__(partition_requests)

Create a new offset request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.ListOffsetResponse(buff)

Bases: pykafka.protocol.base.Response

An offset response Specification:

ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.GroupCoordinatorRequest(consumer_group)

Bases: pykafka.protocol.base.Request

A consumer metadata request Specification:

GroupCoordinatorRequest => ConsumerGroup
    ConsumerGroup => string
__init__(consumer_group)

Create a new group coordinator request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.GroupCoordinatorResponse(buff)

Bases: pykafka.protocol.base.Response

A group coordinator response Specification:

GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
    ErrorCode => int16
    CoordinatorId => int32
    CoordinatorHost => string
    CoordinatorPort => int32
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.PartitionOffsetCommitRequest

Bases: pykafka.protocol.offset_commit.PartitionOffsetCommitRequest

Offset commit request for a specific topic/partition :ivar topic_name: Name of the topic to look up :ivar partition_id: Id of the partition to look up :ivar offset: :ivar timestamp: :ivar metadata: arbitrary metadata that should be committed with this offset commit

class pykafka.protocol.OffsetCommitRequest(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Bases: pykafka.protocol.base.Request

An offset commit request Specification:

OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
    ConsumerGroupId => string
    ConsumerGroupGenerationId => int32
    ConsumerId => string
    TopicName => string
    Partition => int32
    Offset => int64
    TimeStamp => int64
    Metadata => string
__init__(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])

Create a new offset commit request :param partition_requests: Iterable of

kafka.pykafka.protocol.PartitionOffsetCommitRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.OffsetCommitPartitionResponse(err)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

static __new__(_cls, err)

Create new instance of OffsetCommitPartitionResponse(err,)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values

classmethod _make(iterable, new=<built-in method __new__ of type object at 0x906d60>, len=<built-in function len>)

Make a new OffsetCommitPartitionResponse object from a sequence or iterable

_replace(**kwds)

Return a new OffsetCommitPartitionResponse object replacing specified fields with new values

err

Alias for field number 0

class pykafka.protocol.OffsetCommitResponse(buff)

Bases: pykafka.protocol.base.Response

An offset commit response Specification:

OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
    TopicName => string
    Partition => int32
    ErrorCode => int16
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.PartitionOffsetFetchRequest

Bases: pykafka.protocol.offset_commit.PartitionOffsetFetchRequest

Offset fetch request for a specific topic/partition :ivar topic_name: Name of the topic to look up :ivar partition_id: Id of the partition to look up

class pykafka.protocol.OffsetFetchRequest(consumer_group, partition_requests=[])

Bases: pykafka.protocol.base.Request

An offset fetch request Specification:

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
    ConsumerGroup => string
    TopicName => string
    Partition => int32
__init__(consumer_group, partition_requests=[])

Create a new offset fetch request :param partition_requests: Iterable of

kafka.pykafka.protocol.PartitionOffsetFetchRequest for this request
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.OffsetFetchPartitionResponse(offset, metadata, err)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

static __new__(_cls, offset, metadata, err)

Create new instance of OffsetFetchPartitionResponse(offset, metadata, err)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values

classmethod _make(iterable, new=<built-in method __new__ of type object at 0x906d60>, len=<built-in function len>)

Make a new OffsetFetchPartitionResponse object from a sequence or iterable

_replace(**kwds)

Return a new OffsetFetchPartitionResponse object replacing specified fields with new values

err

Alias for field number 2

metadata

Alias for field number 1

offset

Alias for field number 0

class pykafka.protocol.OffsetFetchResponse(buff)

Bases: pykafka.protocol.base.Response

An offset fetch response v0 Specification:: OffsetFetch Response (Version: 0) => [responses]

responses => topic [partition_responses]

topic => STRING partition_responses => partition offset metadata error_code

partition => INT32 offset => INT64 metadata => NULLABLE_STRING error_code => INT16
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.JoinGroupRequest(group_id, member_id, topic_name, membership_protocol, session_timeout=30000)

Bases: pykafka.protocol.base.Request

A group join request Specification:: JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols

GroupId => string SessionTimeout => int32 MemberId => string ProtocolType => string GroupProtocols => [ProtocolName ProtocolMetadata]

ProtocolName => string ProtocolMetadata => bytes
__init__(group_id, member_id, topic_name, membership_protocol, session_timeout=30000)

Create a new group join request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.JoinGroupResponse(buff)

Bases: pykafka.protocol.base.Response

A group join response Specification:: JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members

ErrorCode => int16 GenerationId => int32 GroupProtocol => string LeaderId => string MemberId => string Members => [MemberId MemberMetadata]

MemberId => string MemberMetadata => bytes
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.SyncGroupRequest(group_id, generation_id, member_id, group_assignment)

Bases: pykafka.protocol.base.Request

A group sync request Specification:: SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment

GroupId => string GenerationId => int32 MemberId => string GroupAssignment => [MemberId MemberAssignment]

MemberId => string MemberAssignment => bytes
__init__(group_id, generation_id, member_id, group_assignment)

Create a new group join request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.SyncGroupResponse(buff)

Bases: pykafka.protocol.base.Response

A group sync response Specification:: SyncGroupResponse => ErrorCode MemberAssignment

ErrorCode => int16 MemberAssignment => bytes
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.HeartbeatRequest(group_id, generation_id, member_id)

Bases: pykafka.protocol.base.Request

A group heartbeat request Specification:: HeartbeatRequest => GroupId GenerationId MemberId

GroupId => string GenerationId => int32 MemberId => string
__init__(group_id, generation_id, member_id)

Create a new heartbeat request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.HeartbeatResponse(buff)

Bases: pykafka.protocol.base.Response

A group heartbeat response Specification:: HeartbeatResponse => ErrorCode

ErrorCode => int16
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.LeaveGroupRequest(group_id, member_id)

Bases: pykafka.protocol.base.Request

A group exit request Specification:: LeaveGroupRequest => GroupId MemberId

GroupId => string MemberId => string
__init__(group_id, member_id)

Create a new group join request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.LeaveGroupResponse(buff)

Bases: pykafka.protocol.base.Response

A group exit response Specification:: LeaveGroupResponse => ErrorCode

ErrorCode => int16
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.ListGroupsRequest

Bases: pykafka.protocol.base.Request

A list groups request

Specification:

ListGroupsRequest =>

__len__()

Length of the serialized message, in bytes

get_bytes()

Create a new list group request

class pykafka.protocol.ListGroupsResponse(buff)

Bases: pykafka.protocol.base.Response

A list groups response

Specification:

ListGroupsResponse => ErrorCode Groups

ErrorCode => int16 Groups => [GroupId ProtocolType]

GroupId => string ProtocolType => string
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.DescribeGroupsRequest(group_ids)

Bases: pykafka.protocol.base.Request

A describe groups request

Specification:

DescribeGroupsRequest => [GroupId]
GroupId => string
__init__(group_ids)

x.__init__(…) initializes x; see help(type(x)) for signature

__len__()

Length of the serialized message, in bytes

get_bytes()

Create a new list group request

class pykafka.protocol.DescribeGroupsResponse(buff)

Bases: pykafka.protocol.base.Response

A describe groups response

Specification:

DescribeGroupsResponse => [ErrorCode GroupId State ProtocolType Protocol Members]

ErrorCode => int16 GroupId => string State => string ProtocolType => string Protocol => string Members => [MemberId ClientId ClientHost MemberMetadata MemberAssignment]

MemberId => string ClientId => string ClientHost => string MemberMetadata => bytes MemberAssignment => bytes
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.Message(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0, protocol_version=0, timestamp=None, delivery_report_q=None)

Bases: pykafka.common.Message, pykafka.utils.Serializable

Representation of a Kafka Message

NOTE: Compression is handled in the protocol because of the way Kafka embeds compressed MessageSets within Messages

Specification:

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

pykafka.protocol.Message also contains partition and partition_id fields. Both of these have meaningless default values. When pykafka.protocol.Message is used by the producer, partition_id identifies the Message’s destination partition. When used in a pykafka.protocol.FetchRequest, partition_id is set to the id of the partition from which the message was sent on receipt of the message. In the pykafka.simpleconsumer.SimpleConsumer, partition is set to the pykafka.partition.Partition instance from which the message was sent.

Variables:
  • compression_type – The compression algorithm used to generate the message’s current value. Internal use only - regardless of the algorithm used, this will be CompressionType.NONE in any publicly accessible `Message`s.
  • partition_key – Value used to assign this message to a particular partition.
  • value – The payload associated with this message
  • offset – The offset of the message
  • partition_id – The id of the partition to which this message belongs
  • delivery_report_q – For use by pykafka.producer.Producer
__init__(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0, protocol_version=0, timestamp=None, delivery_report_q=None)

x.__init__(…) initializes x; see help(type(x)) for signature

__len__()

Length of the bytes that will be sent to the Kafka server.

pack_into(buff, offset)

Serialize and write to buff starting at offset offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at
timestamp_dt

Get the timestamp as a datetime, if valid

class pykafka.protocol.MessageSet(compression_type=0, messages=None, broker_version='0.9.0')

Bases: pykafka.utils.Serializable

Representation of a set of messages in Kafka

This isn’t useful outside of direct communications with Kafka, so we keep it hidden away here.

N.B.: MessageSets are not preceded by an int32 like other array elements in the protocol.

Specification:

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
__init__(compression_type=0, messages=None, broker_version='0.9.0')

Create a new MessageSet

Parameters:
  • compression_type – Compression to use on the messages
  • messages – An initial list of messages for the set
  • broker_version – A broker version with which this MessageSet is compatible
__len__()

Length of the serialized message, in bytes

We don’t put the MessageSetSize in front of the serialization because that’s technically not part of the MessageSet. Most requests/responses using MessageSets need that size, though, so be careful when using this.

__weakref__

list of weak references to the object (if defined)

_get_compressed()

Get a compressed representation of all current messages.

Returns a Message object with correct headers set and compressed data in the value field.

classmethod decode(buff, partition_id=-1)

Decode a serialized MessageSet.

pack_into(buff, offset)

Serialize and write to buff starting at offset offset.

Intentionally follows the pattern of struct.pack_into

Parameters:
  • buff – The buffer to write into
  • offset – The offset to start the write at
class pykafka.protocol.ApiVersionsRequest

Bases: pykafka.protocol.base.Request

An api versions request

Specification:

ApiVersions Request (Version: 0) =>
__len__()

Length of the serialized message, in bytes

get_bytes()

Create a new api versions request

class pykafka.protocol.ApiVersionsResponse(buff)

Bases: pykafka.protocol.base.Response

Specification:

ApiVersions Response (Version: 0) => error_code [api_versions]

error_code => INT16 api_versions => api_key min_version max_version

api_key => INT16 min_version => INT16 max_version => INT16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.CreateTopicsRequest(topic_requests, timeout=0)

Bases: pykafka.protocol.base.Request

A create topics request

Specification:

CreateTopics Request (Version: 0) => [create_topic_requests] timeout
create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]

topic => STRING num_partitions => INT32 replication_factor => INT16 replica_assignment => partition [replicas]

partition => INT32 replicas => INT32
config_entries => config_name config_value
config_name => STRING config_value => NULLABLE_STRING

timeout => INT32

__init__(topic_requests, timeout=0)

x.__init__(…) initializes x; see help(type(x)) for signature

__len__()

Length of the serialized message, in bytes

get_bytes()

Create a new create topics request

class pykafka.protocol.CreateTopicsResponse(buff)

Bases: pykafka.protocol.base.Response

A create topics response

Specification:

CreateTopics Response (Version: 0) => [topic_errors]
topic_errors => topic error_code
topic => STRING error_code => INT16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.DeleteTopicsRequest(topics, timeout=0)

Bases: pykafka.protocol.base.Request

A delete topics request

Specification:

DeleteTopics Request (Version: 0) => [topics] timeout
topics => STRING timeout => INT32
__init__(topics, timeout=0)

x.__init__(…) initializes x; see help(type(x)) for signature

__len__()

Length of the serialized message, in bytes

get_bytes()

Create a new delete topics request

class pykafka.protocol.DeleteTopicsResponse(buff)

Bases: pykafka.protocol.base.Response

A delete topics response

Specification:

DeleteTopics Response (Version: 0) => [topic_error_codes]
topic_error_codes => topic error_code
topic => STRING error_code => INT16
__init__(buff)

Deserialize into a new Response

Parameters:buff (bytearray) – Serialized message
class pykafka.protocol.PartitionOffsetRequest

Bases: pykafka.protocol.offset.PartitionOffsetRequest

Offset request for a specific topic/partition :ivar topic_name: Name of the topic to look up :ivar partition_id: Id of the partition to look up :ivar offsets_before: Retrieve offset information for messages before

this timestamp (ms). -1 will retrieve the latest offsets and -2 will retrieve the earliest available offset. If -2,only 1 offset is returned
Variables:max_offsets – How many offsets to return
class pykafka.protocol.ConsumerGroupProtocolMetadata(version=0, topic_names=None, user_data='testuserdata')

Bases: object

Protocol specification:: ProtocolMetadata => Version Subscription UserData

Version => int16 Subscription => [Topic]

Topic => string

UserData => bytes

__init__(version=0, topic_names=None, user_data='testuserdata')

x.__init__(…) initializes x; see help(type(x)) for signature

__weakref__

list of weak references to the object (if defined)

class pykafka.protocol.MemberAssignment(partition_assignment, version=1)

Bases: object

Protocol specification:: MemberAssignment => Version PartitionAssignment

Version => int16 PartitionAssignment => [Topic [Partition]]

Topic => string Partition => int32

UserData => bytes

__init__(partition_assignment, version=1)

x.__init__(…) initializes x; see help(type(x)) for signature

__weakref__

list of weak references to the object (if defined)

class pykafka.protocol.FetchResponseV1(buff, offset=0, broker_version='0.9.0')

Bases: pykafka.protocol.fetch.FetchResponse

__init__(buff, offset=0, broker_version='0.9.0')

Deserialize into a new Response

Parameters:
  • buff (bytearray) – Serialized message
  • offset (int) – Offset into the message
class pykafka.protocol.FetchResponseV2(buff, offset=0, broker_version='0.9.0')

Bases: pykafka.protocol.fetch.FetchResponseV1

class pykafka.protocol.MetadataResponseV1(buff)

Bases: pykafka.protocol.metadata.MetadataResponse

Response from MetadataRequest Specification:: Metadata Response (Version: 1) => [brokers] controller_id [topic_metadata]

brokers => node_id host port rack
node_id => INT32 host => STRING port => INT32 rack => NULLABLE_STRING (new since v0)

controller_id => INT32 (new since v0) topic_metadata => error_code topic is_internal [partition_metadata]

error_code => INT16 topic => STRING is_internal => BOOLEAN (new since v0) partition_metadata => error_code partition leader [replicas] [isr]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.MetadataRequestV1(topics=None, *kwargs)

Bases: pykafka.protocol.metadata.MetadataRequest

class pykafka.protocol.CreateTopicRequest

Bases: pykafka.protocol.admin.CreateTopicRequest

class pykafka.protocol.ProducePartitionResponse(err, offset)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

static __new__(_cls, err, offset)

Create new instance of ProducePartitionResponse(err, offset)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values

classmethod _make(iterable, new=<built-in method __new__ of type object at 0x906d60>, len=<built-in function len>)

Make a new ProducePartitionResponse object from a sequence or iterable

_replace(**kwds)

Return a new ProducePartitionResponse object replacing specified fields with new values

err

Alias for field number 0

offset

Alias for field number 1

class pykafka.protocol.ListOffsetRequestV1(partition_requests)

Bases: pykafka.protocol.offset.ListOffsetRequest

Specification::
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
ReplicaId => int32 TopicName => string Partition => int32 Time => int64
__init__(partition_requests)

Create a new offset request

__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.ListOffsetResponseV1(buff)

Bases: pykafka.protocol.offset.ListOffsetResponse

Specification::
ListOffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode Timestamp [Offset] Partition => int32 ErrorCode => int16 Timestamp => int64 Offset => int64
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.OffsetFetchRequestV1(consumer_group, partition_requests=[])

Bases: pykafka.protocol.offset_commit.OffsetFetchRequest

class pykafka.protocol.OffsetFetchResponseV1(buff)

Bases: pykafka.protocol.offset_commit.OffsetFetchResponse

An offset fetch response v1 (all the same as v0) Specification:: OffsetFetch Response (Version: 1) => [responses]

responses => topic [partition_responses]

topic => STRING partition_responses => partition offset metadata error_code

partition => INT32 offset => INT64 metadata => NULLABLE_STRING error_code => INT16
class pykafka.protocol.OffsetFetchRequestV2(consumer_group, partition_requests=[])

Bases: pykafka.protocol.offset_commit.OffsetFetchRequestV1

class pykafka.protocol.OffsetFetchResponseV2(buff)

Bases: pykafka.protocol.offset_commit.OffsetFetchResponseV1

An offset fetch response v2 Specification:: OffsetFetch Response (Version: 2) => [responses] error_code

responses => topic [partition_responses]

topic => STRING partition_responses => partition offset metadata error_code

partition => INT32 offset => INT64 metadata => NULLABLE_STRING error_code => INT16

error_code => INT16 (new since v1)

__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.MetadataRequestV2(topics=None, *kwargs)

Bases: pykafka.protocol.metadata.MetadataRequestV1

class pykafka.protocol.MetadataResponseV2(buff)

Bases: pykafka.protocol.metadata.MetadataResponseV1

Response from MetadataRequest Specification:: Metadata Response (Version: 2) => [brokers] cluster_id controller_id [topic_metadata]

brokers => node_id host port rack
node_id => INT32 host => STRING port => INT32 rack => NULLABLE_STRING

cluster_id => NULLABLE_STRING (new since v1) controller_id => INT32 topic_metadata => error_code topic is_internal [partition_metadata]

error_code => INT16 topic => STRING is_internal => BOOLEAN partition_metadata => error_code partition leader [replicas] [isr]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.MetadataRequestV3(topics=None, *kwargs)

Bases: pykafka.protocol.metadata.MetadataRequestV2

class pykafka.protocol.MetadataResponseV3(buff)

Bases: pykafka.protocol.metadata.MetadataResponseV2

Response from MetadataRequest Specification:: Metadata Response (Version: 3) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]

throttle_time_ms => INT32 (new since v2) brokers => node_id host port rack

node_id => INT32 host => STRING port => INT32 rack => NULLABLE_STRING

cluster_id => NULLABLE_STRING controller_id => INT32 topic_metadata => error_code topic is_internal [partition_metadata]

error_code => INT16 topic => STRING is_internal => BOOLEAN partition_metadata => error_code partition leader [replicas] [isr]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray

class pykafka.protocol.MetadataRequestV4(topics=None, allow_topic_autocreation=True)

Bases: pykafka.protocol.metadata.MetadataRequestV3

Metadata Request Specification:: Metadata Request (Version: 4) => [topics] allow_auto_topic_creation

topics => STRING allow_auto_topic_creation => BOOLEAN
__init__(topics=None, allow_topic_autocreation=True)

Create a new MetadataRequest :param topics: Topics to query. Leave empty for all available topics. :param allow_topic_autocreation: If this and the broker config

‘auto.create.topics.enable’ are true, topics that don’t exist will be created by the broker. Otherwise, no topics will be created by the broker.
__len__()

Length of the serialized message, in bytes

get_bytes()

Serialize the message :returns: Serialized message :rtype: bytearray

class pykafka.protocol.MetadataResponseV4(buff)

Bases: pykafka.protocol.metadata.MetadataResponseV3

Response from MetadataRequest Specification:: Metadata Response (Version: 4) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]

throttle_time_ms => INT32 brokers => node_id host port rack

node_id => INT32 host => STRING port => INT32 rack => NULLABLE_STRING

cluster_id => NULLABLE_STRING controller_id => INT32 topic_metadata => error_code topic is_internal [partition_metadata]

error_code => INT16 topic => STRING is_internal => BOOLEAN partition_metadata => error_code partition leader [replicas] [isr]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32
class pykafka.protocol.MetadataRequestV5(topics=None, allow_topic_autocreation=True)

Bases: pykafka.protocol.metadata.MetadataRequestV4

class pykafka.protocol.MetadataResponseV5(buff)

Bases: pykafka.protocol.metadata.MetadataResponseV4

Response from MetadataRequest Specification:: Metadata Response (Version: 5) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]

throttle_time_ms => INT32 brokers => node_id host port rack

node_id => INT32 host => STRING port => INT32 rack => NULLABLE_STRING

cluster_id => NULLABLE_STRING controller_id => INT32 topic_metadata => error_code topic is_internal [partition_metadata]

error_code => INT16 topic => STRING is_internal => BOOLEAN partition_metadata => error_code partition leader [replicas] [isr] [offline_replicas]

error_code => INT16 partition => INT32 leader => INT32 replicas => INT32 isr => INT32 offline_replicas => INT32 (new since v4)
__init__(buff)

Deserialize into a new Response :param buff: Serialized message :type buff: bytearray