pykafka.protocol¶
-
class
pykafka.protocol.
MetadataRequest
(topics=None, *kwargs)¶ Bases:
pykafka.protocol.base.Request
Metadata Request Specification:
MetadataRequest => [TopicName]
  TopicName => string
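As a rough illustration of the specification above, the request body can be sketched with the standard struct module. This is a minimal sketch, not pykafka's implementation: encode_string and encode_metadata_request_body are hypothetical helper names, the request header and size prefix are omitted, and the sketch assumes the usual Kafka wire encoding (an int32 element count before an array, an int16 length before a string).

```python
import struct


def encode_string(value: bytes) -> bytes:
    """Kafka protocol string: int16 length followed by the raw bytes."""
    return struct.pack("!h", len(value)) + value


def encode_metadata_request_body(topics=None) -> bytes:
    """Encode `[TopicName]`: an int32 element count, then each string.

    Hypothetical helper for illustration only; header fields are not included.
    """
    topics = topics or []
    body = struct.pack("!i", len(topics))
    for name in topics:
        body += encode_string(name)
    return body
```

Passing no topics yields an empty array, which matches the "leave empty for all available topics" convention described for this request.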
-
__init__
(topics=None, *kwargs)¶ Create a new MetadataRequest
Parameters: topics – Topics to query. Leave empty for all available topics.
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
MetadataResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
Response from MetadataRequest
Specification:
Metadata Response (Version: 0) => [brokers] [topic_metadata]
  brokers => node_id host port
    node_id => INT32
    host => STRING
    port => INT32
  topic_metadata => error_code topic [partition_metadata]
    error_code => INT16
    topic => STRING
    partition_metadata => error_code partition leader [replicas] [isr]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
ProduceRequest
(compression_type=0, required_acks=1, timeout=10000, broker_version='0.9.0')¶ Bases:
pykafka.protocol.base.Request
Produce Request Specification:
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
-
__init__
(compression_type=0, required_acks=1, timeout=10000, broker_version='0.9.0')¶ Create a new ProduceRequest
required_acks
determines how many acknowledgements the server waits for before returning. This is useful for ensuring the replication factor of published messages. The behavior is:
- -1: Block until all servers acknowledge
- 0: No waiting; the server doesn't even respond to the Produce request
- 1: Wait for this server to write to the local log and then return
- 2+: Wait for N servers to acknowledge
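The required_acks values described above can be summarized in a small helper. describe_required_acks is a hypothetical name used only for illustration; it is not part of pykafka and simply restates the documented behavior.

```python
def describe_required_acks(required_acks: int) -> str:
    """Return a human-readable summary of a required_acks setting."""
    if required_acks == -1:
        return "block until all servers acknowledge"
    if required_acks == 0:
        return "no waiting; the server sends no response"
    if required_acks == 1:
        return "wait for this server to write to its local log"
    if required_acks >= 2:
        return "wait for %d servers to acknowledge" % required_acks
    # Anything below -1 is not covered by the documented behavior.
    raise ValueError("unsupported required_acks value: %r" % required_acks)
```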
Parameters: - partition_requests – Iterable of kafka.pykafka.protocol.PartitionProduceRequest for this request
- compression_type – Compression to use for messages
- required_acks – see the description above
- timeout – timeout (in ms) to wait for the required acks
-
__len__
()¶ Length of the serialized message, in bytes
-
add_message
(message, topic_name, partition_id)¶ Add a kafka.common.Message to the waiting request
Parameters: - message – the kafka.common.Message to add
- topic_name – the name of the topic to publish to
- partition_id – the partition to publish to
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
message_count
()¶ Get the number of messages across all MessageSets in the request.
-
messages
¶ Iterable of all messages in the Request
-
-
class
pykafka.protocol.
ProduceResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
Produce Response. Checks to make sure everything went okay. Specification:
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
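To make the nested-array layout concrete, here is a minimal decoding sketch for a body shaped like the specification above. It is an assumption-laden illustration rather than pykafka's parser: decode_produce_response_body is a hypothetical name, the response header is assumed to have been stripped already, and arrays and strings are assumed to use int32 and int16 length prefixes respectively.

```python
import struct


def decode_produce_response_body(buff):
    """Return {topic: {partition: (error_code, offset)}} (hypothetical helper)."""
    pos = 0
    (topic_count,) = struct.unpack_from("!i", buff, pos)
    pos += 4
    result = {}
    for _ in range(topic_count):
        # TopicName => string (int16 length, then bytes)
        (name_len,) = struct.unpack_from("!h", buff, pos)
        pos += 2
        topic = bytes(buff[pos:pos + name_len])
        pos += name_len
        (partition_count,) = struct.unpack_from("!i", buff, pos)
        pos += 4
        partitions = {}
        for _ in range(partition_count):
            # Partition => int32, ErrorCode => int16, Offset => int64 (14 bytes)
            partition, error_code, offset = struct.unpack_from("!ihq", buff, pos)
            pos += 14
            partitions[partition] = (error_code, offset)
        result[topic] = partitions
    return result
```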
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
PartitionFetchRequest
¶ Bases:
pykafka.protocol.fetch.PartitionFetchRequest
Fetch request for a specific topic/partition
Variables: - topic_name – Name of the topic to fetch from
- partition_id – Id of the partition to fetch from
- offset – Offset at which to start reading
- max_bytes – Max bytes to read from this partition (default: 300kb)
-
class
pykafka.protocol.
FetchRequest
(partition_requests=[], timeout=1000, min_bytes=1024, api_version=0)¶ Bases:
pykafka.protocol.base.Request
A Fetch request sent to Kafka
Specification:
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
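The layout above can be sketched as follows. This is an illustrative encoding of the request body only, not pykafka's get_bytes: encode_fetch_request_body is a hypothetical helper, partition requests are represented as plain tuples, the request header is omitted, and ReplicaId is set to -1 on the assumption that the caller is an ordinary client rather than a broker.

```python
import struct


def encode_fetch_request_body(partition_requests, timeout=1000, min_bytes=1024):
    """Encode a fetch body from (topic_name, partition_id, offset, max_bytes) tuples."""
    by_topic = {}
    for topic, partition, offset, max_bytes in partition_requests:
        by_topic.setdefault(topic, []).append((partition, offset, max_bytes))

    # ReplicaId, MaxWaitTime, MinBytes
    out = struct.pack("!iii", -1, timeout, min_bytes)
    out += struct.pack("!i", len(by_topic))
    for topic, parts in by_topic.items():
        out += struct.pack("!h", len(topic)) + topic
        out += struct.pack("!i", len(parts))
        for partition, offset, max_bytes in parts:
            # Partition => int32, FetchOffset => int64, MaxBytes => int32
            out += struct.pack("!iqi", partition, offset, max_bytes)
    return out
```

Here timeout and min_bytes play the long-polling roles described for this request: the server may hold the request until min_bytes are available or timeout milliseconds have elapsed.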
-
__init__
(partition_requests=[], timeout=1000, min_bytes=1024, api_version=0)¶ Create a new fetch request
Kafka 0.8 uses long polling for fetch requests, which is different from 0.7x. Instead of polling and waiting, we can now set a timeout to wait and a minimum number of bytes to be collected before it returns. This way we can block effectively and also ensure good network throughput by having fewer, large transfers instead of many small ones every time a byte is written to the log.
Parameters: - partition_requests – Iterable of kafka.pykafka.protocol.PartitionFetchRequest for this request
- timeout – Max time to wait (in ms) for a response from the server
- min_bytes – Minimum bytes to collect before returning
-
__len__
()¶ Length of the serialized message, in bytes
-
add_request
(partition_request)¶ Add a topic/partition/offset to the requests
Parameters: - topic_name – The topic to fetch from
- partition_id – The partition to fetch from
- offset – The offset to start reading data from
- max_bytes – The maximum number of bytes to return in the response
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
FetchPartitionResponse
(max_offset, messages, err)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__
()¶ Exclude the OrderedDict from pickling
-
static
__new__
(_cls, max_offset, messages, err)¶ Create new instance of FetchPartitionResponse(max_offset, messages, err)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object>, len=<built-in function len>)¶ Make a new FetchPartitionResponse object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new FetchPartitionResponse object replacing specified fields with new values
-
err
¶ Alias for field number 2
-
max_offset
¶ Alias for field number 0
-
messages
¶ Alias for field number 1
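The methods listed for this class are the ones every namedtuple provides. The sketch below uses a stand-in defined with collections.namedtuple (same field names and order as documented here, but not imported from pykafka) to show how the fields and helpers behave.

```python
from collections import namedtuple

# Stand-in with the documented field order: max_offset, messages, err.
FetchPartitionResponse = namedtuple(
    "FetchPartitionResponse", ["max_offset", "messages", "err"]
)

resp = FetchPartitionResponse(max_offset=10, messages=[], err=0)

# Fields are reachable by name or by position (err is field number 2).
max_offset, messages, err = resp

# _replace returns a new tuple; the original is left unchanged.
updated = resp._replace(err=3)

# _asdict maps field names to values; _make builds from any iterable.
as_dict = resp._asdict()
rebuilt = FetchPartitionResponse._make([10, [], 0])
```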
-
-
class
pykafka.protocol.
FetchResponse
(buff, offset=0, broker_version='0.9.0')¶ Bases:
pykafka.protocol.base.Response
Unpack a fetch response from the server
Specification:
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
-
__init__
(buff, offset=0, broker_version='0.9.0')¶ Deserialize into a new Response
Parameters: - buff (bytearray) – Serialized message
- offset (int) – Offset into the message
-
_unpack_message_set
(buff, partition_id=-1, broker_version='0.9.0')¶ MessageSets can be nested. Get just the Messages out of it.
-
-
class
pykafka.protocol.
ListOffsetRequest
(partition_requests)¶ Bases:
pykafka.protocol.base.Request
An offset request Specification:
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32
-
__init__
(partition_requests)¶ Create a new offset request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
ListOffsetResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
An offset response Specification:
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
GroupCoordinatorRequest
(consumer_group)¶ Bases:
pykafka.protocol.base.Request
A consumer metadata request Specification:
GroupCoordinatorRequest => ConsumerGroup
  ConsumerGroup => string
-
__init__
(consumer_group)¶ Create a new group coordinator request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
GroupCoordinatorResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A group coordinator response Specification:
GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
  ErrorCode => int16
  CoordinatorId => int32
  CoordinatorHost => string
  CoordinatorPort => int32
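A flat response like this one is straightforward to unpack. The following is a minimal sketch under stated assumptions: decode_group_coordinator_body is a hypothetical helper, the buffer is assumed to start at ErrorCode (response header already removed), and the host is assumed to be an int16-length-prefixed string.

```python
import struct


def decode_group_coordinator_body(buff):
    """Return (error_code, coordinator_id, host, port) from a response body."""
    # ErrorCode => int16, CoordinatorId => int32, then the host's int16 length.
    error_code, coordinator_id, host_len = struct.unpack_from("!hih", buff, 0)
    pos = 8
    host = bytes(buff[pos:pos + host_len])
    pos += host_len
    (port,) = struct.unpack_from("!i", buff, pos)
    return error_code, coordinator_id, host, port
```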
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
PartitionOffsetCommitRequest
¶ Bases:
pykafka.protocol.offset_commit.PartitionOffsetCommitRequest
Offset commit request for a specific topic/partition
Variables: - topic_name – Name of the topic to look up
- partition_id – Id of the partition to look up
- offset
- timestamp
- metadata – arbitrary metadata that should be committed with this offset commit
-
class
pykafka.protocol.
OffsetCommitRequest
(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])¶ Bases:
pykafka.protocol.base.Request
An offset commit request Specification:
OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string
-
__init__
(consumer_group, consumer_group_generation_id, consumer_id, partition_requests=[])¶ Create a new offset commit request
Parameters: partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetCommitRequest for this request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
OffsetCommitPartitionResponse
(err)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__
()¶ Exclude the OrderedDict from pickling
-
static
__new__
(_cls, err)¶ Create new instance of OffsetCommitPartitionResponse(err,)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object>, len=<built-in function len>)¶ Make a new OffsetCommitPartitionResponse object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new OffsetCommitPartitionResponse object replacing specified fields with new values
-
err
¶ Alias for field number 0
-
-
class
pykafka.protocol.
OffsetCommitResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
An offset commit response Specification:
OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
PartitionOffsetFetchRequest
¶ Bases:
pykafka.protocol.offset_commit.PartitionOffsetFetchRequest
Offset fetch request for a specific topic/partition
Variables: - topic_name – Name of the topic to look up
- partition_id – Id of the partition to look up
-
class
pykafka.protocol.
OffsetFetchRequest
(consumer_group, partition_requests=[])¶ Bases:
pykafka.protocol.base.Request
An offset fetch request Specification:
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
-
__init__
(consumer_group, partition_requests=[])¶ Create a new offset fetch request
Parameters: partition_requests – Iterable of kafka.pykafka.protocol.PartitionOffsetFetchRequest for this request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
OffsetFetchPartitionResponse
(offset, metadata, err)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__
()¶ Exclude the OrderedDict from pickling
-
static
__new__
(_cls, offset, metadata, err)¶ Create new instance of OffsetFetchPartitionResponse(offset, metadata, err)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object>, len=<built-in function len>)¶ Make a new OffsetFetchPartitionResponse object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new OffsetFetchPartitionResponse object replacing specified fields with new values
-
err
¶ Alias for field number 2
-
metadata
¶ Alias for field number 1
-
offset
¶ Alias for field number 0
-
-
class
pykafka.protocol.
OffsetFetchResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
An offset fetch response v0
Specification:
OffsetFetch Response (Version: 0) => [responses]
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition offset metadata error_code
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
JoinGroupRequest
(group_id, member_id, topic_name, membership_protocol, session_timeout=30000)¶ Bases:
pykafka.protocol.base.Request
A group join request
Specification:
JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  GroupId => string
  SessionTimeout => int32
  MemberId => string
  ProtocolType => string
  GroupProtocols => [ProtocolName ProtocolMetadata]
    ProtocolName => string
    ProtocolMetadata => bytes
-
__init__
(group_id, member_id, topic_name, membership_protocol, session_timeout=30000)¶ Create a new group join request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
JoinGroupResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A group join response
Specification:
JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members
  ErrorCode => int16
  GenerationId => int32
  GroupProtocol => string
  LeaderId => string
  MemberId => string
  Members => [MemberId MemberMetadata]
    MemberId => string
    MemberMetadata => bytes
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
SyncGroupRequest
(group_id, generation_id, member_id, group_assignment)¶ Bases:
pykafka.protocol.base.Request
A group sync request
Specification:
SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment
  GroupId => string
  GenerationId => int32
  MemberId => string
  GroupAssignment => [MemberId MemberAssignment]
    MemberId => string
    MemberAssignment => bytes
-
__init__
(group_id, generation_id, member_id, group_assignment)¶ Create a new group sync request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
SyncGroupResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A group sync response
Specification:
SyncGroupResponse => ErrorCode MemberAssignment
  ErrorCode => int16
  MemberAssignment => bytes
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
HeartbeatRequest
(group_id, generation_id, member_id)¶ Bases:
pykafka.protocol.base.Request
A group heartbeat request
Specification:
HeartbeatRequest => GroupId GenerationId MemberId
  GroupId => string
  GenerationId => int32
  MemberId => string
-
__init__
(group_id, generation_id, member_id)¶ Create a new heartbeat request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
HeartbeatResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A group heartbeat response
Specification:
HeartbeatResponse => ErrorCode
  ErrorCode => int16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
LeaveGroupRequest
(group_id, member_id)¶ Bases:
pykafka.protocol.base.Request
A group exit request
Specification:
LeaveGroupRequest => GroupId MemberId
  GroupId => string
  MemberId => string
-
__init__
(group_id, member_id)¶ Create a new group leave request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
LeaveGroupResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A group exit response
Specification:
LeaveGroupResponse => ErrorCode
  ErrorCode => int16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
ListGroupsRequest
¶ Bases:
pykafka.protocol.base.Request
A list groups request
Specification:
ListGroupsRequest =>
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Create a new list group request
-
-
class
pykafka.protocol.
ListGroupsResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A list groups response
Specification:
ListGroupsResponse => ErrorCode Groups
  ErrorCode => int16
  Groups => [GroupId ProtocolType]
    GroupId => string
    ProtocolType => string
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
DescribeGroupsRequest
(group_ids)¶ Bases:
pykafka.protocol.base.Request
A describe groups request
Specification:
DescribeGroupsRequest => [GroupId]
  GroupId => string
-
__init__
(group_ids)¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Create a new describe groups request
-
class
pykafka.protocol.
DescribeGroupsResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A describe groups response
Specification:
DescribeGroupsResponse => [ErrorCode GroupId State ProtocolType Protocol Members]
  ErrorCode => int16
  GroupId => string
  State => string
  ProtocolType => string
  Protocol => string
  Members => [MemberId ClientId ClientHost MemberMetadata MemberAssignment]
    MemberId => string
    ClientId => string
    ClientHost => string
    MemberMetadata => bytes
    MemberAssignment => bytes
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
Message
(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0, protocol_version=0, timestamp=None, delivery_report_q=None)¶ Bases:
pykafka.common.Message
,pykafka.utils.Serializable
Representation of a Kafka Message
NOTE: Compression is handled in the protocol because of the way Kafka embeds compressed MessageSets within Messages
Specification:
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes
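The field order in this specification can be illustrated with a short sketch. It is not pykafka's pack_into: encode_bytes and encode_message_v0 are hypothetical helpers, compression is ignored, and the sketch assumes the conventional Kafka encoding in which bytes fields carry an int32 length (-1 for null) and the CRC is a CRC32 over everything after the Crc field. The CRC is packed as an unsigned 32-bit value, which occupies the same four bytes as the int32 in the specification.

```python
import struct
import zlib


def encode_bytes(value):
    """Kafka `bytes`: int32 length then data; a length of -1 encodes null."""
    if value is None:
        return struct.pack("!i", -1)
    return struct.pack("!i", len(value)) + value


def encode_message_v0(value, key=None, attributes=0):
    """Encode Crc MagicByte Attributes Key Value (hypothetical helper)."""
    magic = 0
    body = struct.pack("!bb", magic, attributes) + encode_bytes(key) + encode_bytes(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack("!I", crc) + body
```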
pykafka.protocol.Message also contains partition and partition_id fields. Both of these have meaningless default values. When pykafka.protocol.Message is used by the producer, partition_id identifies the Message’s destination partition. When used in a pykafka.protocol.FetchRequest, partition_id is set to the id of the partition from which the message was sent on receipt of the message. In the pykafka.simpleconsumer.SimpleConsumer, partition is set to the pykafka.partition.Partition instance from which the message was sent.
Variables: - compression_type – The compression algorithm used to generate the message’s current value. Internal use only: regardless of the algorithm used, this will be CompressionType.NONE in any publicly accessible Message instance.
- partition_key – Value used to assign this message to a particular partition.
- value – The payload associated with this message
- offset – The offset of the message
- partition_id – The id of the partition to which this message belongs
- delivery_report_q – For use by pykafka.producer.Producer
-
__init__
(value, partition_key=None, compression_type=0, offset=-1, partition_id=-1, produce_attempt=0, protocol_version=0, timestamp=None, delivery_report_q=None)¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__len__
()¶ Length of the bytes that will be sent to the Kafka server.
-
pack_into
(buff, offset)¶ Serialize and write to buff starting at offset offset.
Intentionally follows the pattern of struct.pack_into
Parameters: - buff – The buffer to write into
- offset – The offset to start the write at
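For readers unfamiliar with the struct.pack_into pattern mentioned above, here is a minimal sketch of an object that writes itself into a caller-supplied buffer. Int32Pair is a hypothetical class invented for this example; only struct.pack_into itself is a real standard-library call.

```python
import struct


class Int32Pair:
    """Hypothetical serializable object holding two int32 values."""

    def __init__(self, first, second):
        self.first = first
        self.second = second

    def __len__(self):
        # Two big-endian int32 fields.
        return 8

    def pack_into(self, buff, offset):
        # Write in place, starting at `offset`, without allocating a new buffer.
        struct.pack_into("!ii", buff, offset, self.first, self.second)


pair = Int32Pair(1, 2)
buff = bytearray(4 + len(pair))  # leave 4 bytes free for, say, a size prefix
pair.pack_into(buff, 4)
```

The caller sizes the buffer once using len() and each component writes at its own offset, which is why the length and pack_into methods appear together on the classes in this module.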
-
timestamp_dt
¶ Get the timestamp as a datetime, if valid
-
class
pykafka.protocol.
MessageSet
(compression_type=0, messages=None, broker_version='0.9.0')¶ Bases:
pykafka.utils.Serializable
Representation of a set of messages in Kafka
This isn’t useful outside of direct communications with Kafka, so we keep it hidden away here.
N.B.: MessageSets are not preceded by an int32 like other array elements in the protocol.
Specification:
MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
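The note that a MessageSet has no leading int32 element count can be seen in a small sketch. This is an illustration only: encode_message_set and with_size_prefix are hypothetical helpers, the messages are assumed to be already-encoded byte strings, and the sequential offsets are placeholders.

```python
import struct


def encode_message_set(encoded_messages, first_offset=0):
    """Concatenate Offset MessageSize Message entries with no element count."""
    out = bytearray()
    for i, message in enumerate(encoded_messages):
        # Offset => int64, MessageSize => int32, then the message bytes.
        out += struct.pack("!qi", first_offset + i, len(message)) + message
    return bytes(out)


def with_size_prefix(message_set):
    """Prepend MessageSetSize (int32, in bytes), as requests that embed a set expect."""
    return struct.pack("!i", len(message_set)) + message_set
```

The size prefix counts bytes rather than messages, so a reader has to walk the entries to learn how many messages the set holds.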
-
__init__
(compression_type=0, messages=None, broker_version='0.9.0')¶ Create a new MessageSet
Parameters: - compression_type – Compression to use on the messages
- messages – An initial list of messages for the set
- broker_version – A broker version with which this MessageSet is compatible
-
__len__
()¶ Length of the serialized message, in bytes
We don’t put the MessageSetSize in front of the serialization because that’s technically not part of the MessageSet. Most requests/responses using MessageSets need that size, though, so be careful when using this.
-
__weakref__
¶ list of weak references to the object (if defined)
-
_get_compressed
()¶ Get a compressed representation of all current messages.
Returns a Message object with correct headers set and compressed data in the value field.
-
classmethod
decode
(buff, partition_id=-1)¶ Decode a serialized MessageSet.
-
pack_into
(buff, offset)¶ Serialize and write to buff starting at offset offset.
Intentionally follows the pattern of struct.pack_into
Parameters: - buff – The buffer to write into
- offset – The offset to start the write at
-
-
class
pykafka.protocol.
ApiVersionsRequest
¶ Bases:
pykafka.protocol.base.Request
An api versions request
Specification:
ApiVersions Request (Version: 0) =>
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Create a new api versions request
-
-
class
pykafka.protocol.
ApiVersionsResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
Specification:
ApiVersions Response (Version: 0) => error_code [api_versions]
  error_code => INT16
  api_versions => api_key min_version max_version
    api_key => INT16
    min_version => INT16
    max_version => INT16
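A body with this shape could be unpacked roughly as shown below. The sketch is not pykafka's parser: decode_api_versions_body is a hypothetical helper, the response header is assumed to be stripped, and the array is assumed to carry an int32 element count.

```python
import struct


def decode_api_versions_body(buff):
    """Return (error_code, {api_key: (min_version, max_version)})."""
    # error_code => INT16, followed by the int32 array length.
    error_code, count = struct.unpack_from("!hi", buff, 0)
    pos = 6
    versions = {}
    for _ in range(count):
        api_key, min_version, max_version = struct.unpack_from("!hhh", buff, pos)
        pos += 6
        versions[api_key] = (min_version, max_version)
    return error_code, versions
```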
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
CreateTopicsRequest
(topic_requests, timeout=0)¶ Bases:
pykafka.protocol.base.Request
A create topics request
Specification:
CreateTopics Request (Version: 0) => [create_topic_requests] timeout
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition [replicas]
      partition => INT32
      replicas => INT32
    config_entries => config_name config_value
      config_name => STRING
      config_value => NULLABLE_STRING
  timeout => INT32
-
__init__
(topic_requests, timeout=0)¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Create a new create topics request
-
class
pykafka.protocol.
CreateTopicsResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A create topics response
Specification:
CreateTopics Response (Version: 0) => [topic_errors]
  topic_errors => topic error_code
    topic => STRING
    error_code => INT16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
DeleteTopicsRequest
(topics, timeout=0)¶ Bases:
pykafka.protocol.base.Request
A delete topics request
Specification:
DeleteTopics Request (Version: 0) => [topics] timeout
  topics => STRING
  timeout => INT32
-
__init__
(topics, timeout=0)¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Create a new delete topics request
-
class
pykafka.protocol.
DeleteTopicsResponse
(buff)¶ Bases:
pykafka.protocol.base.Response
A delete topics response
Specification:
DeleteTopics Response (Version: 0) => [topic_error_codes]
  topic_error_codes => topic error_code
    topic => STRING
    error_code => INT16
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
PartitionOffsetRequest
¶ Bases:
pykafka.protocol.offset.PartitionOffsetRequest
Offset request for a specific topic/partition
Variables: - topic_name – Name of the topic to look up
- partition_id – Id of the partition to look up
- offsets_before – Retrieve offset information for messages before this timestamp (ms). -1 will retrieve the latest offsets and -2 will retrieve the earliest available offset. If -2, only 1 offset is returned
- max_offsets – How many offsets to return
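The two sentinel timestamps can be illustrated with a short sketch of the per-partition portion of an offset request. The constant names LATEST and EARLIEST and the helper encode_partition_offset are hypothetical and used only for this example; the field widths follow the ListOffsetRequest specification (Partition int32, Time int64, MaxNumberOfOffsets int32).

```python
import struct

# Hypothetical names for the sentinel values described above.
LATEST = -1
EARLIEST = -2


def encode_partition_offset(partition_id, offsets_before, max_offsets=1):
    """Encode Partition, Time and MaxNumberOfOffsets for one partition."""
    return struct.pack("!iqi", partition_id, offsets_before, max_offsets)
```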
-
class
pykafka.protocol.
ConsumerGroupProtocolMetadata
(version=0, topic_names=None, user_data='testuserdata')¶ Bases:
object
Protocol specification:
ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => bytes
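The ProtocolMetadata layout can be sketched as below. encode_protocol_metadata is a hypothetical helper, not the class documented here; it assumes the usual int32 array count, int16 string length, and int32 bytes length prefixes, and it takes topic names as byte strings.

```python
import struct


def encode_protocol_metadata(topic_names, version=0, user_data=b""):
    """Encode Version, Subscription ([Topic]) and UserData."""
    out = struct.pack("!h", version)
    out += struct.pack("!i", len(topic_names))
    for name in topic_names:
        out += struct.pack("!h", len(name)) + name
    # UserData => bytes (int32 length, then the payload)
    out += struct.pack("!i", len(user_data)) + user_data
    return out
```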
-
__init__
(version=0, topic_names=None, user_data='testuserdata')¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__weakref__
¶ list of weak references to the object (if defined)
-
-
class
pykafka.protocol.
MemberAssignment
(partition_assignment, version=1)¶ Bases:
object
Protocol specification:
MemberAssignment => Version PartitionAssignment
  Version => int16
  PartitionAssignment => [Topic [Partition]]
    Topic => string
    Partition => int32
  UserData => bytes
-
__init__
(partition_assignment, version=1)¶ x.__init__(…) initializes x; see help(type(x)) for signature
-
__weakref__
¶ list of weak references to the object (if defined)
-
-
class
pykafka.protocol.
FetchResponseV1
(buff, offset=0, broker_version='0.9.0')¶ Bases:
pykafka.protocol.fetch.FetchResponse
-
__init__
(buff, offset=0, broker_version='0.9.0')¶ Deserialize into a new Response
Parameters: - buff (bytearray) – Serialized message
- offset (int) – Offset into the message
-
-
class
pykafka.protocol.
FetchResponseV2
(buff, offset=0, broker_version='0.9.0')¶ Bases:
pykafka.protocol.fetch.FetchResponseV1
-
class
pykafka.protocol.
MetadataResponseV1
(buff)¶ Bases:
pykafka.protocol.metadata.MetadataResponse
Response from MetadataRequest
Specification:
Metadata Response (Version: 1) => [brokers] controller_id [topic_metadata]
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING (new since v0)
  controller_id => INT32 (new since v0)
  topic_metadata => error_code topic is_internal [partition_metadata]
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN (new since v0)
    partition_metadata => error_code partition leader [replicas] [isr]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
MetadataRequestV1
(topics=None, *kwargs)¶ Bases:
pykafka.protocol.metadata.MetadataRequest
-
class
pykafka.protocol.
CreateTopicRequest
¶ Bases:
pykafka.protocol.admin.CreateTopicRequest
-
class
pykafka.protocol.
ProducePartitionResponse
(err, offset)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__
()¶ Exclude the OrderedDict from pickling
-
static
__new__
(_cls, err, offset)¶ Create new instance of ProducePartitionResponse(err, offset)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object>, len=<built-in function len>)¶ Make a new ProducePartitionResponse object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new ProducePartitionResponse object replacing specified fields with new values
-
err
¶ Alias for field number 0
-
offset
¶ Alias for field number 1
-
-
class
pykafka.protocol.
ListOffsetRequestV1
(partition_requests)¶ Bases:
pykafka.protocol.offset.ListOffsetRequest
Specification:
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
-
__init__
(partition_requests)¶ Create a new offset request
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
class
pykafka.protocol.
ListOffsetResponseV1
(buff)¶ Bases:
pykafka.protocol.offset.ListOffsetResponse
Specification:
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32
  ErrorCode => int16
  Timestamp => int64
  Offset => int64
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
OffsetFetchRequestV1
(consumer_group, partition_requests=[])¶ Bases:
pykafka.protocol.offset_commit.OffsetFetchRequest
-
class
pykafka.protocol.
OffsetFetchResponseV1
(buff)¶ Bases:
pykafka.protocol.offset_commit.OffsetFetchResponse
An offset fetch response v1 (all the same as v0)
Specification:
OffsetFetch Response (Version: 1) => [responses]
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition offset metadata error_code
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
-
class
pykafka.protocol.
OffsetFetchRequestV2
(consumer_group, partition_requests=[])¶ Bases:
pykafka.protocol.offset_commit.OffsetFetchRequestV1
-
class
pykafka.protocol.
OffsetFetchResponseV2
(buff)¶ Bases:
pykafka.protocol.offset_commit.OffsetFetchResponseV1
An offset fetch response v2
Specification:
OffsetFetch Response (Version: 2) => [responses] error_code
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition offset metadata error_code
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
  error_code => INT16 (new since v1)
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
MetadataRequestV2
(topics=None, *kwargs)¶ Bases:
pykafka.protocol.metadata.MetadataRequestV1
-
class
pykafka.protocol.
MetadataResponseV2
(buff)¶ Bases:
pykafka.protocol.metadata.MetadataResponseV1
Response from MetadataRequest
Specification:
Metadata Response (Version: 2) => [brokers] cluster_id controller_id [topic_metadata]
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING (new since v1)
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata]
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader [replicas] [isr]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
class
pykafka.protocol.
MetadataRequestV3
(topics=None, *kwargs)¶ Bases:
pykafka.protocol.metadata.MetadataRequestV2
-
class
pykafka.protocol.
MetadataResponseV3
(buff)¶ Bases:
pykafka.protocol.metadata.MetadataResponseV2
Response from MetadataRequest
Specification:
Metadata Response (Version: 3) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  throttle_time_ms => INT32 (new since v2)
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata]
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader [replicas] [isr]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-
-
class
pykafka.protocol.
MetadataRequestV4
(topics=None, allow_topic_autocreation=True)¶ Bases:
pykafka.protocol.metadata.MetadataRequestV3
Metadata Request
Specification:
Metadata Request (Version: 4) => [topics] allow_auto_topic_creation
  topics => STRING
  allow_auto_topic_creation => BOOLEAN
-
__init__
(topics=None, allow_topic_autocreation=True)¶ Create a new MetadataRequest
Parameters: - topics – Topics to query. Leave empty for all available topics.
- allow_topic_autocreation – If this and the broker config ‘auto.create.topics.enable’ are true, topics that don’t exist will be created by the broker. Otherwise, no topics will be created by the broker.
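As a sketch of how the version 4 body differs from earlier versions, the trailing boolean can be appended after the topic array. encode_metadata_v4_body is a hypothetical helper, not pykafka's get_bytes; it takes an explicit list of topic names as byte strings and does not attempt to model how "all topics" is expressed on the wire.

```python
import struct


def encode_metadata_v4_body(topics, allow_topic_autocreation=True):
    """Encode [topics] followed by allow_auto_topic_creation (one byte)."""
    out = struct.pack("!i", len(topics))
    for name in topics:
        out += struct.pack("!h", len(name)) + name
    # BOOLEAN is a single byte: 1 for true, 0 for false.
    out += struct.pack("!?", allow_topic_autocreation)
    return out
```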
-
__len__
()¶ Length of the serialized message, in bytes
-
get_bytes
()¶ Serialize the message
Returns: Serialized message Return type: bytearray
-
-
class
pykafka.protocol.
MetadataResponseV4
(buff)¶ Bases:
pykafka.protocol.metadata.MetadataResponseV3
Response from MetadataRequest
Specification:
Metadata Response (Version: 4) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  throttle_time_ms => INT32
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata]
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader [replicas] [isr]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
-
class
pykafka.protocol.
MetadataRequestV5
(topics=None, allow_topic_autocreation=True)¶ Bases:
pykafka.protocol.metadata.MetadataRequestV4
-
class
pykafka.protocol.
MetadataResponseV5
(buff)¶ Bases:
pykafka.protocol.metadata.MetadataResponseV4
Response from MetadataRequest
Specification:
Metadata Response (Version: 5) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  throttle_time_ms => INT32
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata]
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader [replicas] [isr] [offline_replicas]
      error_code => INT16
      partition => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
      offline_replicas => INT32 (new since v4)
-
__init__
(buff)¶ Deserialize into a new Response
Parameters: buff (bytearray) – Serialized message
-