Major fixes and new features
All checks were successful
continuous-integration/drone/push Build is passing

2025-09-25 15:51:48 +09:00
parent dd7349bb4c
commit ddce9f5125
5586 changed files with 1470941 additions and 0 deletions


@@ -0,0 +1,7 @@
from __future__ import absolute_import
from kafka.producer.kafka import KafkaProducer
__all__ = [
'KafkaProducer'
]
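
The package module above only re-exports KafkaProducer. A minimal usage sketch, assuming kafka-python is installed and a broker is reachable at the illustrative address localhost:9092:

from kafka.producer import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', b'raw payload bytes')   # returns a FutureRecordMetadata
metadata = future.get(timeout=10)                          # block up to 10 secs for the ack
print(metadata.topic, metadata.partition, metadata.offset)
producer.close()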


@@ -0,0 +1,115 @@
from __future__ import absolute_import, division
import collections
import io
import threading
import time
from kafka.metrics.stats import Rate
import kafka.errors as Errors
class SimpleBufferPool(object):
"""A simple pool of BytesIO objects with a weak memory ceiling."""
def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
"""Create a new buffer pool.
Arguments:
memory (int): maximum memory that this buffer pool can allocate
poolable_size (int): memory size per buffer to cache in the free
list rather than deallocating
"""
self._poolable_size = poolable_size
self._lock = threading.RLock()
buffers = int(memory / poolable_size) if poolable_size else 0
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
self._waiters = collections.deque()
self.wait_time = None
if metrics:
self.wait_time = metrics.sensor('bufferpool-wait-time')
self.wait_time.add(metrics.metric_name(
'bufferpool-wait-ratio', metric_group_prefix,
'The fraction of time an appender waits for space allocation.'),
Rate())
def allocate(self, size, max_time_to_block_ms):
"""
Allocate a buffer of the given size. This method blocks if there is not
enough memory and the buffer pool is configured with blocking mode.
Arguments:
size (int): The buffer size to allocate in bytes [ignored]
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
Returns:
io.BytesIO
"""
with self._lock:
# check if we have a free buffer of the right size pooled
if self._free:
return self._free.popleft()
elif self._poolable_size == 0:
return io.BytesIO()
else:
# we are out of buffers and will have to block
buf = None
more_memory = threading.Condition(self._lock)
self._waiters.append(more_memory)
# loop over and over until we have a buffer or have reserved
# enough memory to allocate one
while buf is None:
start_wait = time.time()
more_memory.wait(max_time_to_block_ms / 1000.0)
end_wait = time.time()
if self.wait_time:
self.wait_time.record(end_wait - start_wait)
if self._free:
buf = self._free.popleft()
else:
self._waiters.remove(more_memory)
raise Errors.KafkaTimeoutError(
"Failed to allocate memory within the configured"
" max blocking time")
# remove the condition for this thread to let the next thread
# in line start getting memory
removed = self._waiters.popleft()
assert removed is more_memory, 'Wrong condition'
# signal any additional waiters if there is more memory left
# over for them
if self._free and self._waiters:
self._waiters[0].notify()
# unlock and return the buffer
return buf
def deallocate(self, buf):
"""
Return buffers to the pool. If they are of the poolable size add them
to the free list, otherwise just mark the memory as free.
Arguments:
buf (io.BytesIO): The buffer to return
"""
with self._lock:
# BytesIO.truncate here makes the pool somewhat pointless
# but we stick with the BufferPool API until migrating to
# bytearray / memoryview. The buffer we return must not
# expose any prior data on read().
buf.truncate(0)
self._free.append(buf)
if self._waiters:
self._waiters[0].notify()
def queued(self):
"""The number of threads blocked waiting on memory."""
with self._lock:
return len(self._waiters)
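
A short sketch of the allocate/deallocate cycle described above, constructing the pool directly without metrics (the sizes below are illustrative):

from kafka.producer.buffer import SimpleBufferPool

pool = SimpleBufferPool(memory=2 * 16384, poolable_size=16384)   # free list of two BytesIO buffers
buf = pool.allocate(16384, max_time_to_block_ms=1000)            # popped from the free list
buf.write(b'record bytes')
pool.deallocate(buf)         # truncated to length 0 and returned to the free list
print(pool.queued())         # 0 -- no threads are blocked waiting on memory

When the free list is empty and poolable_size is non-zero, allocate() blocks on a condition variable and raises KafkaTimeoutError if no buffer is freed within max_time_to_block_ms.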


@@ -0,0 +1,71 @@
from __future__ import absolute_import
import collections
import threading
from kafka import errors as Errors
from kafka.future import Future
class FutureProduceResult(Future):
def __init__(self, topic_partition):
super(FutureProduceResult, self).__init__()
self.topic_partition = topic_partition
self._latch = threading.Event()
def success(self, value):
ret = super(FutureProduceResult, self).success(value)
self._latch.set()
return ret
def failure(self, error):
ret = super(FutureProduceResult, self).failure(error)
self._latch.set()
return ret
def wait(self, timeout=None):
# wait() on python2.6 returns None instead of the flag value
return self._latch.wait(timeout) or self._latch.is_set()
class FutureRecordMetadata(Future):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
def _produce_success(self, offset_and_timestamp):
offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size, serialized_header_size) = self.args
# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
timestamp_ms = produce_timestamp_ms
if offset != -1 and relative_offset is not None:
offset += relative_offset
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
checksum, serialized_key_size,
serialized_value_size, serialized_header_size)
self.success(metadata)
def get(self, timeout=None):
if not self.is_done and not self._produce_future.wait(timeout):
raise Errors.KafkaTimeoutError(
"Timeout after waiting for %s secs." % (timeout,))
assert self.is_done
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
return self.value
RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
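
A sketch of how the two futures chain together, driving the produce-side future by hand the way the sender thread normally would (all values below are made up):

from kafka.producer.future import FutureProduceResult, FutureRecordMetadata
from kafka.structs import TopicPartition

tp = TopicPartition('my-topic', 0)
produce_future = FutureProduceResult(tp)
record_future = FutureRecordMetadata(
    produce_future, relative_offset=0, timestamp_ms=1234567890000, checksum=None,
    serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1)

# Simulate the broker response: (base_offset, broker_timestamp_ms, log_start_offset).
# A broker timestamp of -1 means CREATE_TIME, so the client-side timestamp is kept.
produce_future.success((42, -1, 0))

metadata = record_future.get(timeout=1)
print(metadata.offset, metadata.timestamp)   # 42 1234567890000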


@@ -0,0 +1,749 @@
from __future__ import absolute_import
import atexit
import copy
import logging
import socket
import threading
import time
import weakref
from kafka.vendor import six
import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
from kafka.producer.sender import Sender
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
class KafkaProducer(object):
"""A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across
threads will generally be faster than having multiple instances.
The producer consists of a pool of buffer space that holds records that
haven't yet been transmitted to the server as well as a background I/O
thread that is responsible for turning these records into requests and
transmitting them to the cluster.
:meth:`~kafka.KafkaProducer.send` is asynchronous. When called it adds the
record to a buffer of pending record sends and immediately returns. This
allows the producer to batch together individual records for efficiency.
The 'acks' config controls the criteria under which requests are considered
complete. The "all" setting will result in blocking on the full commit of
the record, the slowest but most durable setting.
If the request fails, the producer can automatically retry, unless
'retries' is configured to 0. Enabling retries also opens up the
possibility of duplicates (see the documentation on message
delivery semantics for details:
https://kafka.apache.org/documentation.html#semantics
).
The producer maintains buffers of unsent records for each partition. These
buffers are of a size specified by the 'batch_size' config. Making this
larger can result in more batching, but requires more memory (since we will
generally have one of these buffers for each active partition).
By default a buffer is available to send immediately even if there is
additional unused space in the buffer. However if you want to reduce the
number of requests you can set 'linger_ms' to something greater than 0.
This will instruct the producer to wait up to that number of milliseconds
before sending a request in hope that more records will arrive to fill up
the same batch. This is analogous to Nagle's algorithm in TCP. Note that
records that arrive close together in time will generally batch together
even with linger_ms=0 so under heavy load batching will occur regardless of
the linger configuration; however setting this to something larger than 0
can lead to fewer, more efficient requests when not under maximal load at
the cost of a small amount of latency.
The buffer_memory controls the total amount of memory available to the
producer for buffering. If records are sent faster than they can be
transmitted to the server then this buffer space will be exhausted. When
the buffer space is exhausted additional send calls will block.
The key_serializer and value_serializer instruct how to turn the key and
value objects the user provides into bytes.
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
strings) that the producer should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
specified, will default to localhost:9092.
client_id (str): a name for this client. This string is passed in
each request to servers and can be used to identify specific
server-side log entries that correspond to this client.
Default: 'kafka-python-producer-#' (appended with a unique number
per instance)
key_serializer (callable): used to convert user-supplied keys to bytes
If not None, called as f(key), should return bytes. Default: None.
value_serializer (callable): used to convert user-supplied message
values to bytes. If not None, called as f(value), should return
bytes. Default: None.
acks (0, 1, 'all'): The number of acknowledgments the producer requires
the leader to have received before considering a request complete.
This controls the durability of records that are sent. The
following settings are common:
0: Producer will not wait for any acknowledgment from the server.
The message will immediately be added to the socket
buffer and considered sent. No guarantee can be made that the
server has received the record in this case, and the retries
configuration will not take effect (as the client won't
generally know of any failures). The offset given back for each
record will always be set to -1.
1: Wait for leader to write the record to its local log only.
Broker will respond without awaiting full acknowledgement from
all followers. In this case should the leader fail immediately
after acknowledging the record but before the followers have
replicated it then the record will be lost.
all: Wait for the full set of in-sync replicas to write the record.
This guarantees that the record will not be lost as long as at
least one in-sync replica remains alive. This is the strongest
available guarantee.
If unset, defaults to acks=1.
compression_type (str): The compression type for all data generated by
the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
retries (int): Setting a value greater than zero will cause the client
to resend any record whose send fails with a potentially transient
error. Note that this retry is no different than if the client
resent the record upon receiving the error. Allowing retries
without setting max_in_flight_requests_per_connection to 1 will
potentially change the ordering of records because if two batches
are sent to a single partition, and the first fails and is retried
but the second succeeds, then the records in the second batch may
appear first.
Default: 0.
batch_size (int): Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce
throughput (a batch size of zero will disable batching entirely).
Default: 16384
linger_ms (int): The producer groups together any records that arrive
in between request transmissions into a single batched request.
Normally this occurs only under load when records arrive faster
than they can be sent out. However in some circumstances the client
may want to reduce the number of requests even under moderate load.
This setting accomplishes this by adding a small amount of
artificial delay; that is, rather than immediately sending out a
record the producer will wait for up to the given delay to allow
other records to be sent so that the sends can be batched together.
This can be thought of as analogous to Nagle's algorithm in TCP.
This setting gives the upper bound on the delay for batching: once
we get batch_size worth of records for a partition it will be sent
immediately regardless of this setting, however if we have fewer
than this many bytes accumulated for this partition we will
'linger' for the specified time waiting for more records to show
up. This setting defaults to 0 (i.e. no delay). Setting linger_ms=5
would have the effect of reducing the number of requests sent but
would add up to 5ms of latency to records sent in the absence of
load. Default: 0.
partitioner (callable): Callable used to determine which partition
each message is assigned to. Called (after key serialization):
partitioner(key_bytes, all_partitions, available_partitions).
The default partitioner implementation hashes each non-None key
using the same murmur2 algorithm as the java client so that
messages with the same key are assigned to the same partition.
When a key is None, the message is delivered to a random partition
(filtered to partitions with available leaders only, if possible).
buffer_memory (int): The total bytes of memory the producer should use
to buffer records waiting to be sent to the server. If records are
sent faster than they can be delivered to the server the producer
will block up to max_block_ms, raising an exception on timeout.
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
connections_max_idle_ms: Close idle connections after the number of
milliseconds specified by this config. The broker closes idle
connections after connections.max.idle.ms, so this avoids hitting
unexpected socket disconnected errors on the client.
Default: 540000
max_block_ms (int): Number of milliseconds to block during
:meth:`~kafka.KafkaProducer.send` and
:meth:`~kafka.KafkaProducer.partitions_for`. These methods can be
blocked either because the buffer is full or metadata unavailable.
Blocking in the user-supplied serializers or partitioner will not be
counted against this timeout. Default: 60000.
max_request_size (int): The maximum size of a request. This is also
effectively a cap on the maximum record size. Note that the server
has its own cap on record size which may be different from this.
This setting will limit the number of record batches the producer
will send in a single request to avoid sending huge requests.
Default: 1048576.
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). Java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). Java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to backoff/wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. Once the maximum is reached,
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Note that if this setting is set to be greater
than 1 and there are failed sends, there is a risk of message
re-ordering due to retries (i.e., if retries are enabled).
Default: 5.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the broker's hostname.
default: true.
ssl_cafile (str): optional filename of ca file to use in certificate
verification. default: none.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
establish the certificate's authenticity. default: none.
ssl_keyfile (str): optional filename containing the client private key.
default: none.
ssl_password (str): optional password to be used when loading the
certificate chain. default: none.
ssl_crlfile (str): optional filename containing the CRL to check for
certificate expiration. By default, no CRL check is done. When
providing a file, only the leaf certificate will be checked against
this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
default: none.
ssl_ciphers (str): optionally set the available ciphers for ssl
connections. It should be a string in the OpenSSL cipher list
format. If no cipher can be selected (because compile-time options
or other configuration forbids use of all the specified ciphers),
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will attempt to infer the broker version by probing
various APIs. Example: (0, 10, 2). Default: None
api_version_auto_timeout_ms (int): number of milliseconds to wait
before raising a timeout error from the constructor when checking the
broker api version. Only applies if api_version is None.
metric_reporters (list): A list of classes to use as metrics reporters.
Implementing the AbstractMetricsReporter interface allows plugging
in classes that will be notified of new metric creation. Default: []
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
samples used to compute metrics. Default: 30000
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
Note:
Configuration parameters are described in more detail at
https://kafka.apache.org/0100/configuration.html#producerconfigs
"""
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': None,
'key_serializer': None,
'value_serializer': None,
'acks': 1,
'bootstrap_topics_filter': set(),
'compression_type': None,
'retries': 0,
'batch_size': 16384,
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_crlfile': None,
'ssl_password': None,
'ssl_ciphers': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}
_COMPRESSORS = {
'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
}
def __init__(self, **configs):
log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
# Only check for extra config keys in top-level class
assert not configs, 'Unrecognized configs: %s' % (configs,)
if self.config['client_id'] is None:
self.config['client_id'] = 'kafka-python-producer-%s' % \
(PRODUCER_CLIENT_ID_SEQUENCE.increment(),)
if self.config['acks'] == 'all':
self.config['acks'] = -1
# api_version was previously a str. accept old format for now
if isinstance(self.config['api_version'], str):
deprecated = self.config['api_version']
if deprecated == 'auto':
self.config['api_version'] = None
else:
self.config['api_version'] = tuple(map(int, deprecated.split('.')))
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
str(self.config['api_version']), deprecated)
# Configure metrics
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
wakeup_timeout_ms=self.config['max_block_ms'],
**self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
self.config['api_version'] = client.config['api_version']
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
if self.config['compression_type'] == 'zstd':
assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
# Check compression_type for library support
ct = self.config['compression_type']
if ct not in self._COMPRESSORS:
raise ValueError("Not supported codec: {}".format(ct))
else:
checker, compression_attrs = self._COMPRESSORS[ct]
assert checker(), "Libraries for {} compression codec not found".format(ct)
self.config['compression_attrs'] = compression_attrs
message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
self._sender.start()
self._closed = False
self._cleanup = self._cleanup_factory()
atexit.register(self._cleanup)
log.debug("Kafka producer started")
def bootstrap_connected(self):
"""Return True if the bootstrap is connected."""
return self._sender.bootstrap_connected()
def _cleanup_factory(self):
"""Build a cleanup clojure that doesn't increase our ref count"""
_self = weakref.proxy(self)
def wrapper():
try:
_self.close(timeout=0)
except (ReferenceError, AttributeError):
pass
return wrapper
def _unregister_cleanup(self):
if getattr(self, '_cleanup', None):
if hasattr(atexit, 'unregister'):
atexit.unregister(self._cleanup) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:
# ValueError on list.remove() if the exithandler no longer exists
# but that is fine here
try:
atexit._exithandlers.remove( # pylint: disable=no-member
(self._cleanup, (), {}))
except ValueError:
pass
self._cleanup = None
def __del__(self):
# Disable logger during destruction to avoid touching dangling references
class NullLogger(object):
def __getattr__(self, name):
return lambda *args: None
global log
log = NullLogger()
self.close()
def close(self, timeout=None):
"""Close this producer.
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
"""
# drop our atexit handler now to avoid leaks
self._unregister_cleanup()
if not hasattr(self, '_closed') or self._closed:
log.info('Kafka producer closed')
return
if timeout is None:
# threading.TIMEOUT_MAX is available in Python3.3+
timeout = getattr(threading, 'TIMEOUT_MAX', float('inf'))
if getattr(threading, 'TIMEOUT_MAX', False):
assert 0 <= timeout <= getattr(threading, 'TIMEOUT_MAX')
else:
assert timeout >= 0
log.info("Closing the Kafka producer with %s secs timeout.", timeout)
invoked_from_callback = bool(threading.current_thread() is self._sender)
if timeout > 0:
if invoked_from_callback:
log.warning("Overriding close timeout %s secs to 0 in order to"
" prevent useless blocking due to self-join. This"
" means you have incorrectly invoked close with a"
" non-zero timeout from the producer call-back.",
timeout)
else:
# Try to close gracefully.
if self._sender is not None:
self._sender.initiate_close()
self._sender.join(timeout)
if self._sender is not None and self._sender.is_alive():
log.info("Proceeding to force close the producer since pending"
" requests could not be completed within timeout %s.",
timeout)
self._sender.force_close()
self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:
pass
try:
self.config['value_serializer'].close()
except AttributeError:
pass
self._closed = True
log.debug("The Kafka producer has closed.")
def partitions_for(self, topic):
"""Returns set of all known partitions for the topic."""
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
if self.config['api_version'] >= (0, 11):
return 2
elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
if magic == 2:
return DefaultRecordBatchBuilder.estimate_size_in_bytes(
key, value, headers)
else:
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
topic (str): topic where the message will be published
value (optional): message value. Must be type bytes, or be
serializable to bytes via configured value_serializer. If value
is None, key is required and message acts as a 'delete'.
See kafka compaction documentation for more details:
https://kafka.apache.org/documentation.html#compaction
(compaction requires kafka >= 0.8.1)
partition (int, optional): optionally specify a partition. If not
set, the partition will be selected using the configured
'partitioner'.
key (optional): a key to associate with the message. Can be used to
determine which partition to send the message to. If partition
is None (and producer's partitioner config is left as default),
then messages with the same key will be delivered to the same
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
headers (optional): a list of header key value pairs. List items
are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
Returns:
FutureRecordMetadata: resolves to RecordMetadata
Raises:
KafkaTimeoutError: if unable to fetch topic metadata, or unable
to obtain memory buffer prior to configured max_block_ms
"""
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
'Null messages require kafka >= 0.8.1')
assert not (value is None and key is None), 'Need at least one: key or value'
key_bytes = value_bytes = None
try:
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
key_bytes = self._serialize(
self.config['key_serializer'],
topic, key)
value_bytes = self._serialize(
self.config['value_serializer'],
topic, value)
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
if headers is None:
headers = []
assert type(headers) == list
assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
" getting a new batch", tp)
self._sender.wakeup()
return future
# handle exceptions and record errors:
# API exceptions are returned in the future,
# other exceptions are raised directly
except Errors.BrokerResponseError as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
len(value_bytes) if value_bytes is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)
def flush(self, timeout=None):
"""
Invoking this method makes all buffered records immediately available
to send (even if linger_ms is greater than 0) and blocks on the
completion of the requests associated with these records. The
post-condition of :meth:`~kafka.KafkaProducer.flush` is that any
previously sent record will have completed
(e.g. Future.is_done == True). A request is considered completed when
either it is successfully acknowledged according to the 'acks'
configuration for the producer, or it results in an error.
Other threads can continue sending messages while one thread is blocked
waiting for a flush call to complete; however, no guarantee is made
about the completion of messages sent after the flush call begins.
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
Raises:
KafkaTimeoutError: failure to flush buffered records within the
provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
self._sender.wakeup()
self._accumulator.await_flush_completion(timeout=timeout)
def _ensure_valid_record_size(self, size):
"""Validate that the record size isn't too large."""
if size > self.config['max_request_size']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the maximum request size you have configured with the"
" max_request_size configuration" % (size,))
if size > self.config['buffer_memory']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the total memory buffer you have configured with the"
" buffer_memory configuration." % (size,))
def _wait_on_metadata(self, topic, max_wait):
"""
Wait for cluster metadata including partitions for the given topic to
be available.
Arguments:
topic (str): topic we want metadata for
max_wait (float): maximum time in secs for waiting on the metadata
Returns:
set: partition ids for the topic
Raises:
KafkaTimeoutError: if partitions for topic were not obtained before
specified max_wait timeout
"""
# add topic to metadata topic list if it is not there already.
self._sender.add_topic(topic)
begin = time.time()
elapsed = 0.0
metadata_event = None
while True:
partitions = self._metadata.partitions_for_topic(topic)
if partitions is not None:
return partitions
if not metadata_event:
metadata_event = threading.Event()
log.debug("Requesting metadata update for topic %s", topic)
metadata_event.clear()
future = self._metadata.request_update()
future.add_both(lambda e, *args: e.set(), metadata_event)
self._sender.wakeup()
metadata_event.wait(max_wait - elapsed)
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
def _serialize(self, f, topic, data):
if not f:
return data
if isinstance(f, Serializer):
return f.serialize(topic, data)
return f(data)
def _partition(self, topic, partition, key, value,
serialized_key, serialized_value):
if partition is not None:
assert partition >= 0
assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
return partition
all_partitions = sorted(self._metadata.partitions_for_topic(topic))
available = list(self._metadata.available_partitions_for_topic(topic))
return self.config['partitioner'](serialized_key,
all_partitions,
available)
def metrics(self, raw=False):
"""Get metrics on producer performance.
This is ported from the Java Producer, for details see:
https://kafka.apache.org/documentation/#producer_monitoring
Warning:
This is an unstable interface. It may change in future
releases without warning.
"""
if raw:
return self._metrics.metrics.copy()
metrics = {}
for k, v in six.iteritems(self._metrics.metrics.copy()):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]:
metrics[k.group][k.name] = {}
metrics[k.group][k.name] = v.value()
return metrics
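
Putting the configuration options and the send() API above together, a hedged usage sketch (broker address, topic name, and serializers are illustrative choices, not defaults):

import json
from kafka.producer import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all',                     # wait for the full set of in-sync replicas
    linger_ms=5,                    # trade up to 5ms of latency for larger batches
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

future = producer.send('events', key='user-1', value={'action': 'login'})
future.add_callback(lambda md: print('delivered:', md.topic, md.partition, md.offset))
future.add_errback(lambda exc: print('delivery failed:', exc))

producer.flush()    # block until every buffered record is acknowledged or errored
producer.close()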


@@ -0,0 +1,590 @@
from __future__ import absolute_import
import collections
import copy
import logging
import threading
import time
import kafka.errors as Errors
from kafka.producer.buffer import SimpleBufferPool
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
class AtomicInteger(object):
def __init__(self, val=0):
self._lock = threading.Lock()
self._val = val
def increment(self):
with self._lock:
self._val += 1
return self._val
def decrement(self):
with self._lock:
self._val -= 1
return self._val
def get(self):
return self._val
class ProducerBatch(object):
def __init__(self, tp, records, buffer):
self.max_record_size = 0
now = time.time()
self.created = now
self.drained = None
self.attempts = 0
self.last_attempt = now
self.last_append = now
self.records = records
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._retry = False
self._buffer = buffer # We only save it, we don't write to it
@property
def record_count(self):
return self.records.next_offset()
def try_append(self, timestamp_ms, key, value, headers):
metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
return None
self.max_record_size = max(self.max_record_size, metadata.size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future
def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
level = logging.DEBUG if exception is None else logging.WARNING
log.log(level, "Produced messages to topic-partition %s with base offset"
" %s log start offset %s and error %s.", self.topic_partition, base_offset,
log_start_offset, global_error) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
else:
self.produce_future.failure(exception)
def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
"""Expire batches if metadata is not available
A batch whose metadata is not available should be expired if one
of the following is true:
* the batch is not in retry AND request timeout has elapsed after
it was ready (full or linger.ms has elapsed).
* the batch is in retry AND request timeout has elapsed after the
backoff period ended.
"""
now = time.time()
since_append = now - self.last_append
since_ready = now - (self.created + linger_ms / 1000.0)
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
timeout = request_timeout_ms / 1000.0
error = None
if not self.in_retry() and is_full and timeout < since_append:
error = "%d seconds have passed since last append" % (since_append,)
elif not self.in_retry() and timeout < since_ready:
error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
elif self.in_retry() and timeout < since_backoff:
error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)
if error:
self.records.close()
self.done(-1, None, Errors.KafkaTimeoutError(
"Batch for %s containing %s record(s) expired: %s" % (
self.topic_partition, self.records.next_offset(), error)))
return True
return False
def in_retry(self):
return self._retry
def set_retry(self):
self._retry = True
def buffer(self):
return self._buffer
def __str__(self):
return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
self.topic_partition, self.records.next_offset())
class RecordAccumulator(object):
"""
This class maintains a deque per TopicPartition that accumulates messages
into MessageSets to be sent to the server.
The accumulator attempts to bound memory use, and append calls will block
when that memory is exhausted.
Keyword Arguments:
batch_size (int): Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce
throughput (a batch size of zero will disable batching entirely).
Default: 16384
buffer_memory (int): The total bytes of memory the producer should use
to buffer records waiting to be sent to the server. If records are
sent faster than they can be delivered to the server the producer
will block up to max_block_ms, raising an exception on timeout.
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
compression_attrs (int): The compression type for all data generated by
the producer. Valid values are gzip(1), snappy(2), lz4(3), or
none(0).
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
linger_ms (int): An artificial delay time to add before declaring a
messageset (that isn't full) ready for sending. This allows
time for more records to arrive. Setting a non-zero linger_ms
will trade off some latency for potentially better throughput
due to more batching (and hence fewer, larger requests).
Default: 0
retry_backoff_ms (int): An artificial delay time to retry the
produce request upon receiving an error. This avoids exhausting
all retries in a short period of time. Default: 100
"""
DEFAULT_CONFIG = {
'buffer_memory': 33554432,
'batch_size': 16384,
'compression_attrs': 0,
'linger_ms': 0,
'retry_backoff_ms': 100,
'message_version': 0,
'metrics': None,
'metric_group_prefix': 'producer-metrics',
}
def __init__(self, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
self._closed = False
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
self._free = SimpleBufferPool(self.config['buffer_memory'],
self.config['batch_size'],
metrics=self.config['metrics'],
metric_group_prefix=self.config['metric_group_prefix'])
self._incomplete = IncompleteProducerBatches()
# The following variables should only be accessed by the sender thread,
# so we don't need to protect them w/ locking.
self.muted = set()
self._drain_index = 0
def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
whether the appended batch is full or a new batch is created
Arguments:
tp (TopicPartition): The topic/partition to which this record is
being sent
timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
Returns:
tuple: (future, batch_is_full, new_batch_created)
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
# We keep track of the number of appending threads to make sure we do
# not miss batches in abort_incomplete_batches().
self._appends_in_progress.increment()
try:
if tp not in self._tp_locks:
with self._tp_locks[None]:
if tp not in self._tp_locks:
self._tp_locks[tp] = threading.Lock()
with self._tp_locks[tp]:
# check if we have an in-progress batch
dq = self._batches[tp]
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
size = max(self.config['batch_size'], estimated_size)
log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
buf = self._free.allocate(size, max_time_to_block_ms)
with self._tp_locks[tp]:
# Need to check if producer is closed again after grabbing the
# deque lock.
assert not self._closed, 'RecordAccumulator is closed'
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
self._free.deallocate(buf)
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
records = MemoryRecordsBuilder(
self.config['message_version'],
self.config['compression_attrs'],
self.config['batch_size']
)
batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value, headers)
if not future:
raise Exception()
dq.append(batch)
self._incomplete.add(batch)
batch_is_full = len(dq) > 1 or batch.records.is_full()
return future, batch_is_full, True
finally:
self._appends_in_progress.decrement()
def abort_expired_batches(self, request_timeout_ms, cluster):
"""Abort the batches that have been sitting in RecordAccumulator for
more than the configured request_timeout due to metadata being
unavailable.
Arguments:
request_timeout_ms (int): milliseconds to timeout
cluster (ClusterMetadata): current metadata for kafka cluster
Returns:
list of ProducerBatch that were expired
"""
expired_batches = []
to_remove = []
count = 0
for tp in list(self._batches.keys()):
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
# We only check whether a batch should be expired if the partition
# does not have a batch in flight. This is to avoid later batches
# getting expired while an earlier batch is still in progress.
# This protection only takes effect when the user sets
# max_in_flight_requests_per_connection=1. Otherwise the expiration
# order is not guaranteed.
if tp in self.muted:
continue
with self._tp_locks[tp]:
# iterate over the batches and expire them if they have stayed
# in accumulator for more than request_timeout_ms
dq = self._batches[tp]
for batch in dq:
is_full = bool(bool(batch != dq[-1]) or batch.records.is_full())
# check if the batch is expired
if batch.maybe_expire(request_timeout_ms,
self.config['retry_backoff_ms'],
self.config['linger_ms'],
is_full):
expired_batches.append(batch)
to_remove.append(batch)
count += 1
self.deallocate(batch)
else:
# Stop at the first batch that has not expired.
break
# Python does not allow us to mutate the dq during iteration
# Assuming expired batches are infrequent, this is better than
# creating a new copy of the deque for iteration on every loop
if to_remove:
for batch in to_remove:
dq.remove(batch)
to_remove = []
if expired_batches:
log.warning("Expired %d batches in accumulator", count) # trace
return expired_batches
def reenqueue(self, batch):
"""Re-enqueue the given record batch in the accumulator to retry."""
now = time.time()
batch.attempts += 1
batch.last_attempt = now
batch.last_append = now
batch.set_retry()
assert batch.topic_partition in self._tp_locks, 'TopicPartition not in locks dict'
assert batch.topic_partition in self._batches, 'TopicPartition not in batches'
dq = self._batches[batch.topic_partition]
with self._tp_locks[batch.topic_partition]:
dq.appendleft(batch)
def ready(self, cluster):
"""
Get a list of nodes whose partitions are ready to be sent, and the
earliest time at which any non-sendable partition will be ready.
Also return a flag for whether there are any unknown leaders for the
accumulated partition batches.
A destination node is ready to send if:
* There is at least one partition that is not backing off its send
* and those partitions are not muted (to prevent reordering if
max_in_flight_requests_per_connection is set to 1)
* and any of the following are true:
* The record set is full
* The record set has sat in the accumulator for at least linger_ms
milliseconds
* The accumulator is out of memory and threads are blocking waiting
for data (in this case all partitions are immediately considered
ready).
* The accumulator has been closed
Arguments:
cluster (ClusterMetadata):
Returns:
tuple:
ready_nodes (set): node_ids that have ready batches
next_ready_check (float): secs until next ready after backoff
unknown_leaders_exist (bool): True if metadata refresh needed
"""
ready_nodes = set()
next_ready_check = 9999999.99
unknown_leaders_exist = False
now = time.time()
exhausted = bool(self._free.queued() > 0)
# several threads are accessing self._batches -- to simplify
# concurrent access, we iterate over a snapshot of partitions
# and lock each partition separately as needed
partitions = list(self._batches.keys())
for tp in partitions:
leader = cluster.leader_for_partition(tp)
if leader is None or leader == -1:
unknown_leaders_exist = True
continue
elif leader in ready_nodes:
continue
elif tp in self.muted:
continue
with self._tp_locks[tp]:
dq = self._batches[tp]
if not dq:
continue
batch = dq[0]
retry_backoff = self.config['retry_backoff_ms'] / 1000.0
linger = self.config['linger_ms'] / 1000.0
backing_off = bool(batch.attempts > 0 and
batch.last_attempt + retry_backoff > now)
waited_time = now - batch.last_attempt
time_to_wait = retry_backoff if backing_off else linger
time_left = max(time_to_wait - waited_time, 0)
full = bool(len(dq) > 1 or batch.records.is_full())
expired = bool(waited_time >= time_to_wait)
sendable = (full or expired or exhausted or self._closed or
self._flush_in_progress())
if sendable and not backing_off:
ready_nodes.add(leader)
else:
# Note that this results in a conservative estimate since
# an un-sendable partition may have a leader that will
# later be found to have sendable data. However, this is
# good enough since we'll just wake up and then sleep again
# for the remaining time.
next_ready_check = min(time_left, next_ready_check)
return ready_nodes, next_ready_check, unknown_leaders_exist
def has_unsent(self):
"""Return whether there is any unsent record in the accumulator."""
for tp in list(self._batches.keys()):
with self._tp_locks[tp]:
dq = self._batches[tp]
if len(dq):
return True
return False
def drain(self, cluster, nodes, max_size):
"""
Drain all the data for the given nodes and collate them into a list of
batches that will fit within the specified size on a per-node basis.
This method attempts to avoid choosing the same topic-node repeatedly.
Arguments:
cluster (ClusterMetadata): The current cluster metadata
nodes (list): list of node_ids to drain
max_size (int): maximum number of bytes to drain
Returns:
dict: {node_id: list of ProducerBatch} with total size less than the
requested max_size.
"""
if not nodes:
return {}
now = time.time()
batches = {}
for node_id in nodes:
size = 0
partitions = list(cluster.partitions_for_broker(node_id))
ready = []
# to make starvation less likely this loop doesn't start at 0
self._drain_index %= len(partitions)
start = self._drain_index
while True:
tp = partitions[self._drain_index]
if tp in self._batches and tp not in self.muted:
with self._tp_locks[tp]:
dq = self._batches[tp]
if dq:
first = dq[0]
backoff = (
bool(first.attempts > 0) and
bool(first.last_attempt +
self.config['retry_backoff_ms'] / 1000.0
> now)
)
# Only drain the batch if it is not during backoff
if not backoff:
if (size + first.records.size_in_bytes() > max_size
and len(ready) > 0):
# there is a rare case that a single batch
# size is larger than the request size due
# to compression; in this case we will
# still eventually send this batch in a
# single request
break
else:
batch = dq.popleft()
batch.records.close()
size += batch.records.size_in_bytes()
ready.append(batch)
batch.drained = now
self._drain_index += 1
self._drain_index %= len(partitions)
if start == self._drain_index:
break
batches[node_id] = ready
return batches
def deallocate(self, batch):
"""Deallocate the record batch."""
self._incomplete.remove(batch)
self._free.deallocate(batch.buffer())
def _flush_in_progress(self):
"""Are there any threads currently waiting on a flush?"""
return self._flushes_in_progress.get() > 0
def begin_flush(self):
"""
Initiate the flushing of data from the accumulator...this makes all
requests immediately ready
"""
self._flushes_in_progress.increment()
def await_flush_completion(self, timeout=None):
"""
Mark all partitions as ready to send and block until the send is complete
"""
try:
for batch in self._incomplete.all():
log.debug('Waiting on produce to %s',
batch.produce_future.topic_partition)
if not batch.produce_future.wait(timeout=timeout):
raise Errors.KafkaTimeoutError('Timeout waiting for future')
if not batch.produce_future.is_done:
raise Errors.UnknownError('Future not done')
if batch.produce_future.failed():
log.warning(batch.produce_future.exception)
finally:
self._flushes_in_progress.decrement()
def abort_incomplete_batches(self):
"""
This function is only called when sender is closed forcefully. It will fail all the
incomplete batches and return.
"""
# We need to keep aborting the incomplete batches until no thread is trying to append, in order to:
# 1. Avoid losing batches.
# 2. Free up memory in case appending threads are blocked on buffer full.
# This is a tight loop but should be able to get through very quickly.
while True:
self._abort_batches()
if not self._appends_in_progress.get():
break
# After this point, no thread will append any messages because they will see the close
# flag set. We need to do the last abort after no thread was appending in case there was a new
# batch appended by the last appending thread.
self._abort_batches()
self._batches.clear()
def _abort_batches(self):
"""Go through incomplete batches and abort them."""
error = Errors.IllegalStateError("Producer is closed forcefully.")
for batch in self._incomplete.all():
tp = batch.topic_partition
# Close the batch before aborting
with self._tp_locks[tp]:
batch.records.close()
batch.done(exception=error)
self.deallocate(batch)
def close(self):
"""Close this accumulator and force all the record buffers to be drained."""
self._closed = True
class IncompleteProducerBatches(object):
"""A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet"""
def __init__(self):
self._incomplete = set()
self._lock = threading.Lock()
def add(self, batch):
with self._lock:
return self._incomplete.add(batch)
def remove(self, batch):
with self._lock:
return self._incomplete.remove(batch)
def all(self):
with self._lock:
return list(self._incomplete)
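
RecordAccumulator.append() above creates per-TopicPartition locks lazily: a shared lock (keyed by None) guards creation of the per-partition lock, and the in-progress batch is re-checked after the buffer allocation because another thread may have created a batch in the meantime. A generic, self-contained sketch of that double-checked pattern (not the library API itself):

import threading

_locks = {None: threading.Lock()}   # the None entry guards additions to the dict
_queues = {}

def _get_lock(key):
    if key not in _locks:                 # fast path without the global lock
        with _locks[None]:
            if key not in _locks:         # re-check once the global lock is held
                _locks[key] = threading.Lock()
    return _locks[key]

def append(key, item):
    with _get_lock(key):
        _queues.setdefault(key, []).append(item)

append(('my-topic', 0), b'record')
print(_queues)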


@@ -0,0 +1,517 @@
from __future__ import absolute_import, division
import collections
import copy
import logging
import threading
import time
from kafka.vendor import six
from kafka import errors as Errors
from kafka.metrics.measurable import AnonMeasurable
from kafka.metrics.stats import Avg, Max, Rate
from kafka.protocol.produce import ProduceRequest
from kafka.structs import TopicPartition
from kafka.version import __version__
log = logging.getLogger(__name__)
class Sender(threading.Thread):
"""
The background thread that handles the sending of produce requests to the
Kafka cluster. This thread makes metadata requests to renew its view of the
cluster and then sends produce requests to the appropriate nodes.
"""
DEFAULT_CONFIG = {
'max_request_size': 1048576,
'acks': 1,
'retries': 0,
'request_timeout_ms': 30000,
'guarantee_message_order': False,
'client_id': 'kafka-python-' + __version__,
'api_version': (0, 8, 0),
}
def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
self.name = self.config['client_id'] + '-network-thread'
self._client = client
self._accumulator = accumulator
self._metadata = client.cluster
self._running = True
self._force_close = False
self._topics_to_add = set()
self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
log.debug("Starting Kafka producer I/O thread.")
# main loop, runs until close is called
while self._running:
try:
self.run_once()
except Exception:
log.exception("Uncaught error in kafka producer I/O thread")
log.debug("Beginning shutdown of Kafka producer I/O thread, sending"
" remaining records.")
# okay we stopped accepting requests but there may still be
# requests in the accumulator or waiting for acknowledgment,
# wait until these are completed.
while (not self._force_close
and (self._accumulator.has_unsent()
or self._client.in_flight_request_count() > 0)):
try:
self.run_once()
except Exception:
log.exception("Uncaught error in kafka producer I/O thread")
if self._force_close:
# We need to fail all the incomplete batches and wake up the
# threads waiting on the futures.
self._accumulator.abort_incomplete_batches()
try:
self._client.close()
except Exception:
log.exception("Failed to close network client")
log.debug("Shutdown of Kafka producer I/O thread has completed.")
def run_once(self):
"""Run a single iteration of sending."""
while self._topics_to_add:
self._client.add_topic(self._topics_to_add.pop())
# get the list of partitions with data ready to send
result = self._accumulator.ready(self._metadata)
ready_nodes, next_ready_check_delay, unknown_leaders_exist = result
# if there are any partitions whose leaders are not known yet, force
# metadata update
if unknown_leaders_exist:
log.debug('Unknown leaders exist, requesting metadata update')
self._metadata.request_update()
# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
for node in list(ready_nodes):
if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
# create produce requests
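        # drain() collates the ready batches per destination node, packing batches for
        # each node up to roughly max_request_size bytes per iteration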
batches_by_node = self._accumulator.drain(
self._metadata, ready_nodes, self.config['max_request_size'])
if self.config['guarantee_message_order']:
# Mute all the partitions drained
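            # (while a partition is muted the accumulator will not drain further
            # batches for it, so at most one request per partition is in flight,
            # which preserves per-partition ordering)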
for batch_list in six.itervalues(batches_by_node):
for batch in batch_list:
self._accumulator.muted.add(batch.topic_partition)
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
for expired_batch in expired_batches:
self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
# data. Otherwise, the timeout is determined by nodes that have
# partitions with data that isn't yet sendable (e.g. lingering, backing
# off). Note that this specifically does not include nodes with
# sendable data that aren't ready to send since they would cause busy
# looping.
poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout)
if ready_nodes:
log.debug("Nodes with data ready to send: %s", ready_nodes) # trace
log.debug("Created %d produce requests: %s", len(requests), requests) # trace
poll_timeout_ms = 0
for node_id, request in six.iteritems(requests):
batches = batches_by_node[node_id]
log.debug('Sending Produce Request: %r', request)
(self._client.send(node_id, request, wakeup=False)
.add_callback(
self._handle_produce_response, node_id, time.time(), batches)
.add_errback(
self._failed_produce, batches, node_id))
# if some partitions are already ready to be sent, the select time
# would be 0; otherwise if some partition already has some data
# accumulated but not ready yet, the select time will be the time
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
self._client.poll(timeout_ms=poll_timeout_ms)
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
self._running = False
self._accumulator.close()
self.wakeup()
def force_close(self):
"""Closes the sender without sending out any pending messages."""
self._force_close = True
self.initiate_close()
def add_topic(self, topic):
# This is generally called from a separate thread
# so this needs to be a thread-safe operation
# we assume that checking set membership across threads
        # is ok because self._client._topics should never
# remove topics for a producer instance, only add them.
if topic not in self._client._topics:
self._topics_to_add.add(topic)
self.wakeup()
def _failed_produce(self, batches, node_id, error):
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
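        # a base_offset of -1 indicates that no offset was assigned by the broker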
for batch in batches:
self._complete_batch(batch, error, -1, None)
def _handle_produce_response(self, node_id, send_time, batches, response):
"""Handle a produce response."""
# if we have a response, parse it
log.debug('Parsing produce response: %r', response)
if response:
batches_by_partition = dict([(batch.topic_partition, batch)
for batch in batches])
for topic, partitions in response.topics:
for partition_info in partitions:
global_error = None
log_start_offset = None
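                    # the layout of partition_info depends on the ProduceResponse
                    # API version; newer versions append the timestamp, log start
                    # offset, and per-record error details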
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
ts = None
elif 2 <= response.API_VERSION <= 4:
partition, error_code, offset, ts = partition_info
elif 5 <= response.API_VERSION <= 7:
partition, error_code, offset, ts, log_start_offset = partition_info
else:
# the ignored parameter is record_error of type list[(batch_index: int, error_message: str)]
partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
if response.API_VERSION > 0:
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
else:
# this is the acks = 0 case, just complete all requests
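            # the broker sends no response when acks=0, so every batch is completed
            # immediately with no offset, timestamp, or error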
for batch in batches:
self._complete_batch(batch, None, -1, None)
def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
"""Complete or retry the given batch of records.
Arguments:
batch (RecordBatch): The record batch
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
log_start_offset (int): The start offset of the log at the time this produce response was created
global_error (str): The summarising error message
"""
# Standardize no-error to None
if error is Errors.NoError:
error = None
if error is not None and self._can_retry(batch, error):
# retry
log.warning("Got error produce response on topic-partition %s,"
" retrying (%d attempts left). Error: %s",
batch.topic_partition,
self.config['retries'] - batch.attempts - 1,
global_error or error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
self._accumulator.deallocate(batch)
if error is not None:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
# Unmute the completed partition.
if self.config['guarantee_message_order']:
self._accumulator.muted.remove(batch.topic_partition)
def _can_retry(self, batch, error):
"""
We can retry a send if the error is transient and the number of
attempts taken is fewer than the maximum allowed
"""
return (batch.attempts < self.config['retries']
and getattr(error, 'retriable', False))
def _create_produce_requests(self, collated):
"""
Transfer the record batches into a list of produce requests on a
per-node basis.
Arguments:
collated: {node_id: [RecordBatch]}
Returns:
dict: {node_id: ProduceRequest} (version depends on api_version)
"""
requests = {}
for node_id, batches in six.iteritems(collated):
requests[node_id] = self._produce_request(
node_id, self.config['acks'],
self.config['request_timeout_ms'], batches)
return requests
def _produce_request(self, node_id, acks, timeout, batches):
"""Create a produce request from the given record batches.
Returns:
ProduceRequest (version depends on api_version)
"""
produce_records_by_partition = collections.defaultdict(dict)
for batch in batches:
topic = batch.topic_partition.topic
partition = batch.topic_partition.partition
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
kwargs = {}
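        # select the ProduceRequest version that matches the broker api_version;
        # the v3 request (brokers 0.11+) adds a transactional_id field, sent as
        # None because this producer does not use transactions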
if self.config['api_version'] >= (2, 1):
version = 7
elif self.config['api_version'] >= (2, 0):
version = 6
elif self.config['api_version'] >= (1, 1):
version = 5
elif self.config['api_version'] >= (1, 0):
version = 4
elif self.config['api_version'] >= (0, 11):
version = 3
kwargs = dict(transactional_id=None)
elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
else:
version = 0
return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
topics=[(topic, list(partition_info.items()))
for topic, partition_info
in six.iteritems(produce_records_by_partition)],
**kwargs
)
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
def bootstrap_connected(self):
return self._client.bootstrap_connected()
class SenderMetrics(object):
def __init__(self, metrics, client, metadata):
self.metrics = metrics
self._client = client
self._metadata = metadata
sensor_name = 'batch-size'
self.batch_size_sensor = self.metrics.sensor(sensor_name)
self.add_metric('batch-size-avg', Avg(),
sensor_name=sensor_name,
description='The average number of bytes sent per partition per-request.')
self.add_metric('batch-size-max', Max(),
sensor_name=sensor_name,
description='The max number of bytes sent per partition per-request.')
sensor_name = 'compression-rate'
self.compression_rate_sensor = self.metrics.sensor(sensor_name)
self.add_metric('compression-rate-avg', Avg(),
sensor_name=sensor_name,
description='The average compression rate of record batches.')
sensor_name = 'queue-time'
self.queue_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-queue-time-avg', Avg(),
sensor_name=sensor_name,
description='The average time in ms record batches spent in the record accumulator.')
self.add_metric('record-queue-time-max', Max(),
sensor_name=sensor_name,
description='The maximum time in ms record batches spent in the record accumulator.')
sensor_name = 'produce-throttle-time'
self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('produce-throttle-time-avg', Avg(),
sensor_name=sensor_name,
description='The average throttle time in ms')
self.add_metric('produce-throttle-time-max', Max(),
sensor_name=sensor_name,
description='The maximum throttle time in ms')
sensor_name = 'records-per-request'
self.records_per_request_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-send-rate', Rate(),
sensor_name=sensor_name,
description='The average number of records sent per second.')
self.add_metric('records-per-request-avg', Avg(),
sensor_name=sensor_name,
description='The average number of records per request.')
sensor_name = 'bytes'
self.byte_rate_sensor = self.metrics.sensor(sensor_name)
self.add_metric('byte-rate', Rate(),
sensor_name=sensor_name,
description='The average number of bytes sent per second.')
sensor_name = 'record-retries'
self.retry_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-retry-rate', Rate(),
sensor_name=sensor_name,
description='The average per-second number of retried record sends')
sensor_name = 'errors'
self.error_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-error-rate', Rate(),
sensor_name=sensor_name,
description='The average per-second number of record sends that resulted in errors')
sensor_name = 'record-size-max'
self.max_record_size_sensor = self.metrics.sensor(sensor_name)
self.add_metric('record-size-max', Max(),
sensor_name=sensor_name,
description='The maximum record size across all batches')
self.add_metric('record-size-avg', Avg(),
sensor_name=sensor_name,
description='The average maximum record size per batch')
self.add_metric('requests-in-flight',
AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
description='The current number of in-flight requests awaiting a response.')
self.add_metric('metadata-age',
AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
description='The age in seconds of the current producer metadata being used.')
def add_metric(self, metric_name, measurable, group_name='producer-metrics',
description=None, tags=None,
sensor_name=None):
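        # when sensor_name is given, the metric is recorded through that (get-or-create)
        # sensor; otherwise it is registered as a standalone measurable metric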
m = self.metrics
metric = m.metric_name(metric_name, group_name, description, tags)
if sensor_name:
sensor = m.sensor(sensor_name)
sensor.add(metric, measurable)
else:
m.add_metric(metric, measurable)
def maybe_register_topic_metrics(self, topic):
def sensor_name(name):
return 'topic.{0}.{1}'.format(topic, name)
# if one sensor of the metrics has been registered for the topic,
# then all other sensors should have been registered; and vice versa
if not self.metrics.get_sensor(sensor_name('records-per-batch')):
self.add_metric('record-send-rate', Rate(),
sensor_name=sensor_name('records-per-batch'),
group_name='producer-topic-metrics.' + topic,
                            description='Records sent per second for topic ' + topic)
self.add_metric('byte-rate', Rate(),
sensor_name=sensor_name('bytes'),
group_name='producer-topic-metrics.' + topic,
description='Bytes per second for topic ' + topic)
self.add_metric('compression-rate', Avg(),
sensor_name=sensor_name('compression-rate'),
group_name='producer-topic-metrics.' + topic,
                            description='Average compression ratio for topic ' + topic)
self.add_metric('record-retry-rate', Rate(),
sensor_name=sensor_name('record-retries'),
group_name='producer-topic-metrics.' + topic,
description='Record retries per second for topic ' + topic)
self.add_metric('record-error-rate', Rate(),
sensor_name=sensor_name('record-errors'),
group_name='producer-topic-metrics.' + topic,
description='Record errors per second for topic ' + topic)
def update_produce_request_metrics(self, batches_map):
for node_batch in batches_map.values():
records = 0
total_bytes = 0
for batch in node_batch:
# register all per-topic metrics at once
topic = batch.topic_partition.topic
self.maybe_register_topic_metrics(topic)
# per-topic record send rate
topic_records_count = self.metrics.get_sensor(
'topic.' + topic + '.records-per-batch')
topic_records_count.record(batch.record_count)
# per-topic bytes send rate
topic_byte_rate = self.metrics.get_sensor(
'topic.' + topic + '.bytes')
topic_byte_rate.record(batch.records.size_in_bytes())
# per-topic compression rate
topic_compression_rate = self.metrics.get_sensor(
'topic.' + topic + '.compression-rate')
topic_compression_rate.record(batch.records.compression_rate())
# global metrics
self.batch_size_sensor.record(batch.records.size_in_bytes())
if batch.drained:
self.queue_time_sensor.record(batch.drained - batch.created)
self.compression_rate_sensor.record(batch.records.compression_rate())
self.max_record_size_sensor.record(batch.max_record_size)
records += batch.record_count
total_bytes += batch.records.size_in_bytes()
self.records_per_request_sensor.record(records)
self.byte_rate_sensor.record(total_bytes)
def record_retries(self, topic, count):
self.retry_sensor.record(count)
sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
if sensor:
sensor.record(count)
def record_errors(self, topic, count):
self.error_sensor.record(count)
sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
if sensor:
sensor.record(count)
def record_throttle_time(self, throttle_time_ms, node=None):
self.produce_throttle_time_sensor.record(throttle_time_ms)