main commit
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-10-16 16:30:25 +09:00
parent 91c7e04474
commit 537e7b363f
1146 changed files with 45926 additions and 77196 deletions

View File

@@ -1,40 +1,18 @@
from __future__ import absolute_import
import abc
from collections import OrderedDict
try:
from collections.abc import Sequence
except ImportError:
from collections import Sequence
try:
# enum in stdlib as of py3.4
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
import logging
import random
import re
import threading
import time
from kafka.vendor import six
import kafka.errors as Errors
from kafka.protocol.list_offsets import OffsetResetStrategy
from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata
from kafka.util import ensure_valid_topic_name, synchronized
log = logging.getLogger(__name__)
class SubscriptionType(IntEnum):
NONE = 0
AUTO_TOPICS = 1
AUTO_PATTERN = 2
USER_ASSIGNED = 3
class SubscriptionState(object):
"""
A class for tracking the topics, partitions, and offsets for the consumer.
@@ -54,6 +32,10 @@ class SubscriptionState(object):
Note that pause state as well as fetch/consumed positions are not preserved
when partition assignment is changed whether directly by the user or
through a group rebalance.
This class also maintains a cache of the latest commit position for each of
the assigned partitions. This is updated through committed() and can be used
to set the initial fetch position (e.g. Fetcher._reset_offset() ).
"""
_SUBSCRIPTION_EXCEPTION_MESSAGE = (
"You must choose only one way to configure your consumer:"
@@ -61,6 +43,10 @@ class SubscriptionState(object):
" (2) subscribe to topics matching a regex pattern,"
" (3) assign itself specific topic-partitions.")
# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
_MAX_NAME_LENGTH = 249
_TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$')
def __init__(self, offset_reset_strategy='earliest'):
"""Initialize a SubscriptionState instance
@@ -78,24 +64,15 @@ class SubscriptionState(object):
self._default_offset_reset_strategy = offset_reset_strategy
self.subscription = None # set() or None
self.subscription_type = SubscriptionType.NONE
self.subscribed_pattern = None # regex str or None
self._group_subscription = set()
self._user_assignment = set()
self.assignment = OrderedDict()
self.rebalance_listener = None
self.listeners = []
self._lock = threading.RLock()
self.assignment = dict()
self.listener = None
def _set_subscription_type(self, subscription_type):
if not isinstance(subscription_type, SubscriptionType):
raise ValueError('SubscriptionType enum required')
if self.subscription_type == SubscriptionType.NONE:
self.subscription_type = subscription_type
elif self.subscription_type != subscription_type:
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
# initialize to true for the consumers to fetch offset upon starting up
self.needs_fetch_committed_offsets = True
@synchronized
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern.
@@ -131,26 +108,39 @@ class SubscriptionState(object):
guaranteed, however, that the partitions revoked/assigned
through this interface are from topics subscribed in this call.
"""
if self._user_assignment or (topics and pattern):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
assert topics or pattern, 'Must provide topics or pattern'
if (topics and pattern):
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
elif pattern:
self._set_subscription_type(SubscriptionType.AUTO_PATTERN)
if pattern:
log.info('Subscribing to pattern: /%s/', pattern)
self.subscription = set()
self.subscribed_pattern = re.compile(pattern)
else:
if isinstance(topics, str) or not isinstance(topics, Sequence):
raise TypeError('Topics must be a list (or non-str sequence)')
self._set_subscription_type(SubscriptionType.AUTO_TOPICS)
self.change_subscription(topics)
if listener and not isinstance(listener, ConsumerRebalanceListener):
raise TypeError('listener must be a ConsumerRebalanceListener')
self.rebalance_listener = listener
self.listener = listener
def _ensure_valid_topic_name(self, topic):
""" Ensures that the topic name is valid according to the kafka source. """
# See Kafka Source:
# https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
if topic is None:
raise TypeError('All topics must not be None')
if not isinstance(topic, six.string_types):
raise TypeError('All topics must be strings')
if len(topic) == 0:
raise ValueError('All topics must be non-empty strings')
if topic == '.' or topic == '..':
raise ValueError('Topic name cannot be "." or ".."')
if len(topic) > self._MAX_NAME_LENGTH:
raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic))
if not self._TOPIC_LEGAL_CHARS.match(topic):
raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))
@synchronized
def change_subscription(self, topics):
"""Change the topic subscription.
@@ -164,8 +154,8 @@ class SubscriptionState(object):
- a topic name is '.' or '..' or
- a topic name does not consist of ASCII-characters/'-'/'_'/'.'
"""
if not self.partitions_auto_assigned():
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
if isinstance(topics, six.string_types):
topics = [topics]
@@ -176,13 +166,17 @@ class SubscriptionState(object):
return
for t in topics:
ensure_valid_topic_name(t)
self._ensure_valid_topic_name(t)
log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
self._group_subscription.update(topics)
@synchronized
# Remove any assigned partitions which are no longer subscribed to
for tp in set(self.assignment.keys()):
if tp.topic not in self.subscription:
del self.assignment[tp]
def group_subscribe(self, topics):
"""Add topics to the current group subscription.
@@ -192,19 +186,17 @@ class SubscriptionState(object):
Arguments:
topics (list of str): topics to add to the group subscription
"""
if not self.partitions_auto_assigned():
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
self._group_subscription.update(topics)
@synchronized
def reset_group_subscription(self):
"""Reset the group's subscription to only contain topics subscribed by this consumer."""
if not self.partitions_auto_assigned():
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
assert self.subscription is not None, 'Subscription required'
self._group_subscription.intersection_update(self.subscription)
@synchronized
def assign_from_user(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
@@ -223,13 +215,21 @@ class SubscriptionState(object):
Raises:
IllegalStateError: if consumer has already called subscribe()
"""
self._set_subscription_type(SubscriptionType.USER_ASSIGNED)
if self.subscription is not None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
if self._user_assignment != set(partitions):
self._user_assignment = set(partitions)
self._set_assignment({partition: self.assignment.get(partition, TopicPartitionState())
for partition in partitions})
@synchronized
for partition in partitions:
if partition not in self.assignment:
self._add_assigned_partition(partition)
for tp in set(self.assignment.keys()) - self._user_assignment:
del self.assignment[tp]
self.needs_fetch_committed_offsets = True
def assign_from_subscribed(self, assignments):
"""Update the assignment to the specified partitions
@@ -243,39 +243,26 @@ class SubscriptionState(object):
consumer instance.
"""
if not self.partitions_auto_assigned():
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
for tp in assignments:
if tp.topic not in self.subscription:
raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))
# randomized ordering should improve balance for short-lived consumers
self._set_assignment({partition: TopicPartitionState() for partition in assignments}, randomize=True)
# after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
for tp in assignments:
self._add_assigned_partition(tp)
self.needs_fetch_committed_offsets = True
log.info("Updated partition assignment: %s", assignments)
def _set_assignment(self, partition_states, randomize=False):
"""Batch partition assignment by topic (self.assignment is OrderedDict)"""
self.assignment.clear()
topics = [tp.topic for tp in six.iterkeys(partition_states)]
if randomize:
random.shuffle(topics)
topic_partitions = OrderedDict({topic: [] for topic in topics})
for tp in six.iterkeys(partition_states):
topic_partitions[tp.topic].append(tp)
for topic in six.iterkeys(topic_partitions):
for tp in topic_partitions[topic]:
self.assignment[tp] = partition_states[tp]
@synchronized
def unsubscribe(self):
"""Clear all topic subscriptions and partition assignments"""
self.subscription = None
self._user_assignment.clear()
self.assignment.clear()
self.subscribed_pattern = None
self.subscription_type = SubscriptionType.NONE
@synchronized
def group_subscription(self):
"""Get the topic subscription for the group.
@@ -291,7 +278,6 @@ class SubscriptionState(object):
"""
return self._group_subscription
@synchronized
def seek(self, partition, offset):
"""Manually specify the fetch offset for a TopicPartition.
@@ -303,48 +289,40 @@ class SubscriptionState(object):
Arguments:
partition (TopicPartition): partition for seek operation
offset (int or OffsetAndMetadata): message offset in partition
offset (int): message offset in partition
"""
if not isinstance(offset, (int, OffsetAndMetadata)):
raise TypeError("offset must be type in or OffsetAndMetadata")
self.assignment[partition].seek(offset)
@synchronized
def assigned_partitions(self):
"""Return set of TopicPartitions in current assignment."""
return set(self.assignment.keys())
@synchronized
def paused_partitions(self):
"""Return current set of paused TopicPartitions."""
return set(partition for partition in self.assignment
if self.is_paused(partition))
@synchronized
def fetchable_partitions(self):
"""Return ordered list of TopicPartitions that should be Fetched."""
fetchable = list()
"""Return set of TopicPartitions that should be Fetched."""
fetchable = set()
for partition, state in six.iteritems(self.assignment):
if state.is_fetchable():
fetchable.append(partition)
fetchable.add(partition)
return fetchable
@synchronized
def partitions_auto_assigned(self):
"""Return True unless user supplied partitions manually."""
return self.subscription_type in (SubscriptionType.AUTO_TOPICS, SubscriptionType.AUTO_PATTERN)
return self.subscription is not None
@synchronized
def all_consumed_offsets(self):
"""Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
all_consumed = {}
for partition, state in six.iteritems(self.assignment):
if state.has_valid_position:
all_consumed[partition] = state.position
all_consumed[partition] = OffsetAndMetadata(state.position, '')
return all_consumed
@synchronized
def request_offset_reset(self, partition, offset_reset_strategy=None):
def need_offset_reset(self, partition, offset_reset_strategy=None):
"""Mark partition for offset reset using specified or default strategy.
Arguments:
@@ -353,113 +331,63 @@ class SubscriptionState(object):
"""
if offset_reset_strategy is None:
offset_reset_strategy = self._default_offset_reset_strategy
self.assignment[partition].reset(offset_reset_strategy)
self.assignment[partition].await_reset(offset_reset_strategy)
@synchronized
def set_reset_pending(self, partitions, next_allowed_reset_time):
for partition in partitions:
self.assignment[partition].set_reset_pending(next_allowed_reset_time)
@synchronized
def has_default_offset_reset_policy(self):
"""Return True if default offset reset policy is Earliest or Latest"""
return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
@synchronized
def is_offset_reset_needed(self, partition):
return self.assignment[partition].awaiting_reset
@synchronized
def has_all_fetch_positions(self):
for state in six.itervalues(self.assignment):
for state in self.assignment.values():
if not state.has_valid_position:
return False
return True
@synchronized
def missing_fetch_positions(self):
missing = set()
for partition, state in six.iteritems(self.assignment):
if state.is_missing_position():
if not state.has_valid_position:
missing.add(partition)
return missing
@synchronized
def has_valid_position(self, partition):
return partition in self.assignment and self.assignment[partition].has_valid_position
@synchronized
def reset_missing_positions(self):
partitions_with_no_offsets = set()
for tp, state in six.iteritems(self.assignment):
if state.is_missing_position():
if self._default_offset_reset_strategy == OffsetResetStrategy.NONE:
partitions_with_no_offsets.add(tp)
else:
state.reset(self._default_offset_reset_strategy)
if partitions_with_no_offsets:
raise Errors.NoOffsetForPartitionError(partitions_with_no_offsets)
@synchronized
def partitions_needing_reset(self):
partitions = set()
for tp, state in six.iteritems(self.assignment):
if state.awaiting_reset and state.is_reset_allowed():
partitions.add(tp)
return partitions
@synchronized
def is_assigned(self, partition):
return partition in self.assignment
@synchronized
def is_paused(self, partition):
return partition in self.assignment and self.assignment[partition].paused
@synchronized
def is_fetchable(self, partition):
return partition in self.assignment and self.assignment[partition].is_fetchable()
@synchronized
def pause(self, partition):
self.assignment[partition].pause()
@synchronized
def resume(self, partition):
self.assignment[partition].resume()
@synchronized
def reset_failed(self, partitions, next_retry_time):
for partition in partitions:
self.assignment[partition].reset_failed(next_retry_time)
@synchronized
def move_partition_to_end(self, partition):
if partition in self.assignment:
try:
self.assignment.move_to_end(partition)
except AttributeError:
state = self.assignment.pop(partition)
self.assignment[partition] = state
@synchronized
def position(self, partition):
return self.assignment[partition].position
def _add_assigned_partition(self, partition):
self.assignment[partition] = TopicPartitionState()
class TopicPartitionState(object):
def __init__(self):
self.committed = None # last committed OffsetAndMetadata
self.has_valid_position = False # whether we have valid position
self.paused = False # whether this partition has been paused by the user
self.reset_strategy = None # the reset strategy if awaiting_reset is set
self._position = None # OffsetAndMetadata exposed to the user
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
self.highwater = None
self.drop_pending_record_batch = False
self.next_allowed_retry_time = None
self.drop_pending_message_set = False
# The last message offset hint available from a message batch with
# magic=2 which includes deleted compacted messages
self.last_offset_from_message_batch = None
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
assert isinstance(offset, OffsetAndMetadata)
self._position = offset
def _get_position(self):
@@ -467,37 +395,20 @@ class TopicPartitionState(object):
position = property(_get_position, _set_position, None, "last position")
def reset(self, strategy):
assert strategy is not None
def await_reset(self, strategy):
self.awaiting_reset = True
self.reset_strategy = strategy
self._position = None
self.next_allowed_retry_time = None
def is_reset_allowed(self):
return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time()
@property
def awaiting_reset(self):
return self.reset_strategy is not None
def set_reset_pending(self, next_allowed_retry_time):
self.next_allowed_retry_time = next_allowed_retry_time
def reset_failed(self, next_allowed_retry_time):
self.next_allowed_retry_time = next_allowed_retry_time
@property
def has_valid_position(self):
return self._position is not None
def is_missing_position(self):
return not self.has_valid_position and not self.awaiting_reset
self.last_offset_from_message_batch = None
self.has_valid_position = False
def seek(self, offset):
self._position = offset if isinstance(offset, OffsetAndMetadata) else OffsetAndMetadata(offset, '', -1)
self._position = offset
self.awaiting_reset = False
self.reset_strategy = None
self.drop_pending_record_batch = True
self.next_allowed_retry_time = None
self.has_valid_position = True
self.drop_pending_message_set = True
self.last_offset_from_message_batch = None
def pause(self):
self.paused = True
@@ -509,7 +420,6 @@ class TopicPartitionState(object):
return not self.paused and self.has_valid_position
@six.add_metaclass(abc.ABCMeta)
class ConsumerRebalanceListener(object):
"""
A callback interface that the user can implement to trigger custom actions
@@ -551,6 +461,8 @@ class ConsumerRebalanceListener(object):
taking over that partition has their on_partitions_assigned() callback
called to load the state.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def on_partitions_revoked(self, revoked):
"""