""" Система управления многопоточностью и очередями для обработки запросов """ import asyncio import time from typing import Dict, Any, Optional, Callable from dataclasses import dataclass from enum import Enum import logging logger = logging.getLogger(__name__) class TaskPriority(Enum): """Приоритеты задач""" LOW = 1 NORMAL = 2 HIGH = 3 CRITICAL = 4 @dataclass class Task: """Задача для выполнения""" id: str user_id: int priority: TaskPriority func: Callable args: tuple kwargs: dict created_at: float timeout: float = 30.0 def __lt__(self, other): """Сравнение для приоритетной очереди""" if self.priority.value != other.priority.value: return self.priority.value > other.priority.value return self.created_at < other.created_at class AsyncTaskManager: """Менеджер асинхронных задач с поддержкой приоритетов и ограничений""" def __init__(self, max_workers: int = 10, max_user_concurrent: int = 3): self.max_workers = max_workers self.max_user_concurrent = max_user_concurrent # Очереди и семафоры - будут созданы при запуске self.task_queue: Optional[asyncio.PriorityQueue] = None self.worker_semaphore: Optional[asyncio.Semaphore] = None self.user_semaphores: Dict[int, asyncio.Semaphore] = {} # Статистика self.active_tasks: Dict[str, Task] = {} self.user_task_counts: Dict[int, int] = {} self.completed_tasks = 0 self.failed_tasks = 0 # Воркеры self.workers = [] self.running = False async def start(self): """Запуск менеджера задач""" if self.running: return # Создаём asyncio объекты в правильном event loop self.task_queue = asyncio.PriorityQueue() self.worker_semaphore = asyncio.Semaphore(self.max_workers) self.user_semaphores.clear() # Очищаем старые семафоры self.running = True logger.info(f"Запуск {self.max_workers} воркеров для обработки задач") # Создаём воркеры for i in range(self.max_workers): worker = asyncio.create_task(self._worker(f"worker-{i}")) self.workers.append(worker) async def stop(self): """Остановка менеджера задач""" if not self.running: return self.running = False logger.info("Остановка менеджера задач...") # Отменяем всех воркеров for worker in self.workers: worker.cancel() # Ждём завершения await asyncio.gather(*self.workers, return_exceptions=True) self.workers.clear() # Очищаем asyncio объекты self.task_queue = None self.worker_semaphore = None self.user_semaphores.clear() logger.info("Менеджер задач остановлен") async def add_task(self, task_id: str, user_id: int, func: Callable, *args, priority: TaskPriority = TaskPriority.NORMAL, timeout: float = 30.0, **kwargs) -> str: """Добавить задачу в очередь""" if not self.running or self.task_queue is None: raise RuntimeError("TaskManager не запущен") # Проверяем лимиты пользователя user_count = self.user_task_counts.get(user_id, 0) if user_count >= self.max_user_concurrent: raise ValueError(f"Пользователь {user_id} превысил лимит одновременных задач ({self.max_user_concurrent})") # Создаём задачу task = Task( id=task_id, user_id=user_id, priority=priority, func=func, args=args, kwargs=kwargs, created_at=time.time(), timeout=timeout ) # Добавляем в очередь await self.task_queue.put(task) # Обновляем статистику self.user_task_counts[user_id] = user_count + 1 logger.debug(f"Задача {task_id} добавлена в очередь (пользователь: {user_id}, приоритет: {priority.name})") return task_id async def _worker(self, worker_name: str): """Воркер для выполнения задач""" logger.debug(f"Воркер {worker_name} запущен") while self.running and self.task_queue is not None and self.worker_semaphore is not None: try: # Получаем задачу из очереди (с таймаутом) try: task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0) except asyncio.TimeoutError: continue # Получаем семафоры async with self.worker_semaphore: user_semaphore = self._get_user_semaphore(task.user_id) if user_semaphore is not None: async with user_semaphore: await self._execute_task(worker_name, task) else: await self._execute_task(worker_name, task) # Отмечаем задачу как выполненную self.task_queue.task_done() except asyncio.CancelledError: logger.debug(f"Воркер {worker_name} отменён") break except Exception as e: logger.error(f"Ошибка в воркере {worker_name}: {e}", exc_info=True) logger.debug(f"Воркер {worker_name} завершён") def _get_user_semaphore(self, user_id: int) -> Optional[asyncio.Semaphore]: """Получить семафор пользователя""" if not self.running: return None if user_id not in self.user_semaphores: self.user_semaphores[user_id] = asyncio.Semaphore(self.max_user_concurrent) return self.user_semaphores[user_id] async def _execute_task(self, worker_name: str, task: Task): """Выполнить задачу""" task_start = time.time() try: # Регистрируем активную задачу self.active_tasks[task.id] = task logger.debug(f"Воркер {worker_name} выполняет задачу {task.id}") # Выполняем с таймаутом try: if asyncio.iscoroutinefunction(task.func): result = await asyncio.wait_for( task.func(*task.args, **task.kwargs), timeout=task.timeout ) else: # Для синхронных функций result = await asyncio.wait_for( asyncio.to_thread(task.func, *task.args, **task.kwargs), timeout=task.timeout ) self.completed_tasks += 1 execution_time = time.time() - task_start logger.debug(f"Задача {task.id} выполнена за {execution_time:.2f}с") except asyncio.TimeoutError: logger.warning(f"Задача {task.id} превысила таймаут {task.timeout}с") self.failed_tasks += 1 raise except Exception as e: logger.error(f"Ошибка выполнения задачи {task.id}: {e}") self.failed_tasks += 1 raise finally: # Убираем из активных и обновляем счётчики self.active_tasks.pop(task.id, None) user_count = self.user_task_counts.get(task.user_id, 0) if user_count > 0: self.user_task_counts[task.user_id] = user_count - 1 def get_stats(self) -> Dict[str, Any]: """Получить статистику менеджера""" return { 'running': self.running, 'workers_count': len(self.workers), 'active_tasks': len(self.active_tasks), 'queue_size': self.task_queue.qsize() if self.task_queue is not None else 0, 'completed_tasks': self.completed_tasks, 'failed_tasks': self.failed_tasks, 'user_tasks': dict(self.user_task_counts) } def get_user_stats(self, user_id: int) -> Dict[str, Any]: """Получить статистику пользователя""" active_user_tasks = [ task for task in self.active_tasks.values() if task.user_id == user_id ] return { 'active_tasks': len(active_user_tasks), 'max_concurrent': self.max_user_concurrent, 'can_add_task': len(active_user_tasks) < self.max_user_concurrent, 'task_details': [ { 'id': task.id, 'priority': task.priority.name, 'created_at': task.created_at, 'running_time': time.time() - task.created_at } for task in active_user_tasks ] } # Глобальный экземпляр менеджера задач task_manager = AsyncTaskManager( max_workers=15, # Максимум воркеров max_user_concurrent=5 # Максимум задач на пользователя )