Files
new_lottery_bot/task_manager.py
2025-11-12 20:57:36 +09:00

268 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Система управления многопоточностью и очередями для обработки запросов
"""
import asyncio
import time
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class TaskPriority(Enum):
"""Приоритеты задач"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class Task:
"""Задача для выполнения"""
id: str
user_id: int
priority: TaskPriority
func: Callable
args: tuple
kwargs: dict
created_at: float
timeout: float = 30.0
def __lt__(self, other):
"""Сравнение для приоритетной очереди"""
if self.priority.value != other.priority.value:
return self.priority.value > other.priority.value
return self.created_at < other.created_at
class AsyncTaskManager:
"""Менеджер асинхронных задач с поддержкой приоритетов и ограничений"""
def __init__(self, max_workers: int = 10, max_user_concurrent: int = 3):
self.max_workers = max_workers
self.max_user_concurrent = max_user_concurrent
# Очереди и семафоры - будут созданы при запуске
self.task_queue: Optional[asyncio.PriorityQueue] = None
self.worker_semaphore: Optional[asyncio.Semaphore] = None
self.user_semaphores: Dict[int, asyncio.Semaphore] = {}
# Статистика
self.active_tasks: Dict[str, Task] = {}
self.user_task_counts: Dict[int, int] = {}
self.completed_tasks = 0
self.failed_tasks = 0
# Воркеры
self.workers = []
self.running = False
async def start(self):
"""Запуск менеджера задач"""
if self.running:
return
# Создаём asyncio объекты в правильном event loop
self.task_queue = asyncio.PriorityQueue()
self.worker_semaphore = asyncio.Semaphore(self.max_workers)
self.user_semaphores.clear() # Очищаем старые семафоры
self.running = True
logger.info(f"Запуск {self.max_workers} воркеров для обработки задач")
# Создаём воркеры
for i in range(self.max_workers):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self.workers.append(worker)
async def stop(self):
"""Остановка менеджера задач"""
if not self.running:
return
self.running = False
logger.info("Остановка менеджера задач...")
# Отменяем всех воркеров
for worker in self.workers:
worker.cancel()
# Ждём завершения
await asyncio.gather(*self.workers, return_exceptions=True)
self.workers.clear()
# Очищаем asyncio объекты
self.task_queue = None
self.worker_semaphore = None
self.user_semaphores.clear()
logger.info("Менеджер задач остановлен")
async def add_task(self,
task_id: str,
user_id: int,
func: Callable,
*args,
priority: TaskPriority = TaskPriority.NORMAL,
timeout: float = 30.0,
**kwargs) -> str:
"""Добавить задачу в очередь"""
if not self.running or self.task_queue is None:
raise RuntimeError("TaskManager не запущен")
# Проверяем лимиты пользователя
user_count = self.user_task_counts.get(user_id, 0)
if user_count >= self.max_user_concurrent:
raise ValueError(f"Пользователь {user_id} превысил лимит одновременных задач ({self.max_user_concurrent})")
# Создаём задачу
task = Task(
id=task_id,
user_id=user_id,
priority=priority,
func=func,
args=args,
kwargs=kwargs,
created_at=time.time(),
timeout=timeout
)
# Добавляем в очередь
await self.task_queue.put(task)
# Обновляем статистику
self.user_task_counts[user_id] = user_count + 1
logger.debug(f"Задача {task_id} добавлена в очередь (пользователь: {user_id}, приоритет: {priority.name})")
return task_id
async def _worker(self, worker_name: str):
"""Воркер для выполнения задач"""
logger.debug(f"Воркер {worker_name} запущен")
while self.running and self.task_queue is not None and self.worker_semaphore is not None:
try:
# Получаем задачу из очереди (с таймаутом)
try:
task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
# Получаем семафоры
async with self.worker_semaphore:
user_semaphore = self._get_user_semaphore(task.user_id)
if user_semaphore is not None:
async with user_semaphore:
await self._execute_task(worker_name, task)
else:
await self._execute_task(worker_name, task)
# Отмечаем задачу как выполненную
self.task_queue.task_done()
except asyncio.CancelledError:
logger.debug(f"Воркер {worker_name} отменён")
break
except Exception as e:
logger.error(f"Ошибка в воркере {worker_name}: {e}", exc_info=True)
logger.debug(f"Воркер {worker_name} завершён")
def _get_user_semaphore(self, user_id: int) -> Optional[asyncio.Semaphore]:
"""Получить семафор пользователя"""
if not self.running:
return None
if user_id not in self.user_semaphores:
self.user_semaphores[user_id] = asyncio.Semaphore(self.max_user_concurrent)
return self.user_semaphores[user_id]
async def _execute_task(self, worker_name: str, task: Task):
"""Выполнить задачу"""
task_start = time.time()
try:
# Регистрируем активную задачу
self.active_tasks[task.id] = task
logger.debug(f"Воркер {worker_name} выполняет задачу {task.id}")
# Выполняем с таймаутом
try:
if asyncio.iscoroutinefunction(task.func):
result = await asyncio.wait_for(
task.func(*task.args, **task.kwargs),
timeout=task.timeout
)
else:
# Для синхронных функций
result = await asyncio.wait_for(
asyncio.to_thread(task.func, *task.args, **task.kwargs),
timeout=task.timeout
)
self.completed_tasks += 1
execution_time = time.time() - task_start
logger.debug(f"Задача {task.id} выполнена за {execution_time:.2f}с")
except asyncio.TimeoutError:
logger.warning(f"Задача {task.id} превысила таймаут {task.timeout}с")
self.failed_tasks += 1
raise
except Exception as e:
logger.error(f"Ошибка выполнения задачи {task.id}: {e}")
self.failed_tasks += 1
raise
finally:
# Убираем из активных и обновляем счётчики
self.active_tasks.pop(task.id, None)
user_count = self.user_task_counts.get(task.user_id, 0)
if user_count > 0:
self.user_task_counts[task.user_id] = user_count - 1
def get_stats(self) -> Dict[str, Any]:
"""Получить статистику менеджера"""
return {
'running': self.running,
'workers_count': len(self.workers),
'active_tasks': len(self.active_tasks),
'queue_size': self.task_queue.qsize() if self.task_queue is not None else 0,
'completed_tasks': self.completed_tasks,
'failed_tasks': self.failed_tasks,
'user_tasks': dict(self.user_task_counts)
}
def get_user_stats(self, user_id: int) -> Dict[str, Any]:
"""Получить статистику пользователя"""
active_user_tasks = [
task for task in self.active_tasks.values()
if task.user_id == user_id
]
return {
'active_tasks': len(active_user_tasks),
'max_concurrent': self.max_user_concurrent,
'can_add_task': len(active_user_tasks) < self.max_user_concurrent,
'task_details': [
{
'id': task.id,
'priority': task.priority.name,
'created_at': task.created_at,
'running_time': time.time() - task.created_at
}
for task in active_user_tasks
]
}
# Глобальный экземпляр менеджера задач
task_manager = AsyncTaskManager(
max_workers=15, # Максимум воркеров
max_user_concurrent=5 # Максимум задач на пользователя
)