""" Celery задачи для асинхронной обработки """ import logging from celery import shared_task from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from app.settings import Config from app.database.repository import GroupRepository, MessageRepository, MessageGroupRepository from app.database.member_repository import GroupMemberRepository, GroupStatisticsRepository from app.handlers.telethon_client import telethon_manager from app.handlers.group_parser import GroupParser from app.models import Base logger = logging.getLogger(__name__) async def get_db_session(): """Получить сессию БД для Celery задач""" engine = create_async_engine(Config.DATABASE_URL, echo=False) SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with SessionLocal() as session: yield session @shared_task(name='app.celery_tasks.send_message_task') def send_message_task(message_id: int, group_id: int, chat_id: str, message_text: str): """ Задача для отправки сообщения в группу Args: message_id: ID сообщения в БД group_id: ID группы в БД chat_id: ID чата в Telegram message_text: Текст сообщения """ import asyncio async def _send(): # Инициализировать Telethon если необходимо if Config.USE_TELETHON and not telethon_manager.is_connected(): await telethon_manager.initialize() engine = create_async_engine(Config.DATABASE_URL, echo=False) SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with SessionLocal() as session: from app.handlers.hybrid_sender import HybridMessageSender from telegram.ext import Application # Получить Application (нужен для гибридного отправителя) # В Celery контексте создаем минималистичный объект app = type('obj', (object,), {'bot': type('obj', (object,), {})()})() sender = HybridMessageSender(app.bot, session) try: success, method = await sender.send_message_with_retry( chat_id=chat_id, message_text=message_text, group_id=group_id, max_retries=Config.MAX_RETRIES ) if success: # Обновить статус в БД message_group_repo = MessageGroupRepository(session) await message_group_repo.mark_as_sent(message_id, group_id) logger.info(f"✅ Задача отправки выполнена: сообщение {message_id} в группу {group_id} (способ: {method})") return {'status': 'success', 'method': method} else: logger.error(f"❌ Ошибка отправки сообщения {message_id} в группу {group_id}") return {'status': 'failed'} except Exception as e: logger.error(f"❌ Ошибка в задаче отправки: {e}") return {'status': 'error', 'error': str(e)} finally: await engine.dispose() return asyncio.run(_send()) @shared_task(name='app.celery_tasks.parse_group_members_task') def parse_group_members_task(group_id: int, chat_id: str, limit: int = 1000): """ Задача для загрузки участников группы Args: group_id: ID группы в БД chat_id: ID чата в Telegram limit: Максимум участников для загрузки """ import asyncio async def _parse(): if Config.USE_TELETHON and not telethon_manager.is_connected(): await telethon_manager.initialize() engine = create_async_engine(Config.DATABASE_URL, echo=False) SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with SessionLocal() as session: member_repo = GroupMemberRepository(session) parser = GroupParser(session) try: result = await parser.parse_group_members( chat_id=int(chat_id), member_repo=member_repo, limit=limit ) logger.info(f"✅ Задача парсинга завершена: группа {group_id} - {result}") # Коммитить изменения await session.commit() return result except Exception as e: logger.error(f"❌ Ошибка в задаче парсинга: {e}") await session.rollback() return {'success': False, 'error': str(e)} finally: await engine.dispose() return asyncio.run(_parse()) @shared_task(name='app.celery_tasks.cleanup_old_messages_task') def cleanup_old_messages_task(days: int = 30): """ Задача для очистки старых сообщений из БД Args: days: Удалить сообщения старше N дней """ import asyncio async def _cleanup(): engine = create_async_engine(Config.DATABASE_URL, echo=False) SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with SessionLocal() as session: message_repo = MessageRepository(session) try: cutoff_date = datetime.utcnow() - timedelta(days=days) count = await message_repo.delete_before_date(cutoff_date) logger.info(f"✅ Очистка завершена: удалено {count} сообщений старше {days} дней") await session.commit() return {'status': 'success', 'deleted_count': count} except Exception as e: logger.error(f"❌ Ошибка в задаче очистки: {e}") await session.rollback() return {'status': 'error', 'error': str(e)} finally: await engine.dispose() return asyncio.run(_cleanup()) @shared_task(name='app.celery_tasks.broadcast_message_task') def broadcast_message_task(message_id: int, group_ids: list): """ Задача для рассылки сообщения в несколько групп Args: message_id: ID сообщения в БД group_ids: Список ID групп в БД """ import asyncio from app.handlers.hybrid_sender import HybridMessageSender async def _broadcast(): if Config.USE_TELETHON and not telethon_manager.is_connected(): await telethon_manager.initialize() engine = create_async_engine(Config.DATABASE_URL, echo=False) SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with SessionLocal() as session: message_repo = MessageRepository(session) group_repo = GroupRepository(session) message_group_repo = MessageGroupRepository(session) try: # Получить сообщение message = await message_repo.get_by_id(message_id) if not message: logger.error(f"Сообщение {message_id} не найдено") return {'status': 'error', 'error': 'Message not found'} # Получить группы groups = [] for gid in group_ids: group = await group_repo.get_by_id(gid) if group: groups.append(group) # Отправить во все группы app = type('obj', (object,), {'bot': type('obj', (object,), {})()})() sender = HybridMessageSender(app.bot, session) results = { 'total': len(groups), 'success': 0, 'failed': 0, 'via_bot': 0, 'via_client': 0 } for group in groups: success, method = await sender.send_message_with_retry( chat_id=group.chat_id, message_text=message.text, group_id=group.id, max_retries=Config.MAX_RETRIES ) if success: results['success'] += 1 if method == 'bot': results['via_bot'] += 1 else: results['via_client'] += 1 await message_group_repo.mark_as_sent(message_id, group.id) else: results['failed'] += 1 await session.commit() logger.info(f"✅ Рассылка завершена: {results}") return results except Exception as e: logger.error(f"❌ Ошибка в задаче рассылки: {e}") await session.rollback() return {'status': 'error', 'error': str(e)} finally: await engine.dispose() return asyncio.run(_broadcast()) # Периодические задачи @shared_task(name='app.celery_tasks.health_check_task') def health_check_task(): """Проверка здоровья системы""" logger.info("✅ Health check выполнен") return {'status': 'healthy'}