""" Планировщик расписания для автоматических рассылок """ import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from app.settings import Config from app.database.repository import GroupRepository, MessageRepository, MessageGroupRepository from app.celery_tasks import broadcast_message_task, parse_group_members_task, cleanup_old_messages_task logger = logging.getLogger(__name__) class BroadcastScheduler: """Планировщик для расписания рассылок""" def __init__(self): self.scheduler = AsyncIOScheduler(timezone='UTC') self.engine = None self.SessionLocal = None async def initialize(self): """Инициализировать планировщик""" self.engine = create_async_engine(Config.DATABASE_URL, echo=False) self.SessionLocal = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False) logger.info("✅ Планировщик инициализирован") async def shutdown(self): """Остановить планировщик""" if self.scheduler.running: self.scheduler.shutdown() if self.engine: await self.engine.dispose() logger.info("✅ Планировщик остановлен") def start(self): """Запустить планировщик""" if not self.scheduler.running: self.scheduler.start() logger.info("🚀 Планировщик запущен") async def add_broadcast_schedule(self, message_id: int, group_ids: list, cron_expr: str, description: str = None): """ Добавить расписание для рассылки Args: message_id: ID сообщения group_ids: Список ID групп cron_expr: Cron выражение (например "0 9 * * *" - ежедневно в 9:00) description: Описание задачи """ try: job_id = f"broadcast_{message_id}_{datetime.utcnow().timestamp()}" self.scheduler.add_job( broadcast_message_task.delay, trigger=CronTrigger.from_crontab(cron_expr), args=(message_id, group_ids), id=job_id, name=description or f"Broadcast message {message_id}", replace_existing=True ) logger.info(f"✅ Расписание добавлено: {job_id}") logger.info(f" Сообщение: {message_id}") logger.info(f" Группы: {group_ids}") logger.info(f" Расписание: {cron_expr}") return job_id except Exception as e: logger.error(f"❌ Ошибка при добавлении расписания: {e}") raise async def remove_broadcast_schedule(self, job_id: str): """Удалить расписание""" try: self.scheduler.remove_job(job_id) logger.info(f"✅ Расписание удалено: {job_id}") return True except Exception as e: logger.error(f"❌ Ошибка при удалении расписания: {e}") return False async def list_schedules(self) -> list: """Получить список всех расписаний""" jobs = [] for job in self.scheduler.get_jobs(): jobs.append({ 'id': job.id, 'name': job.name, 'trigger': str(job.trigger), 'next_run_time': job.next_run_time }) return jobs async def add_maintenance_schedules(self): """Добавить периодические задачи обслуживания""" # Очистка старых сообщений каждый день в 3:00 UTC self.scheduler.add_job( cleanup_old_messages_task.delay, trigger=CronTrigger.from_crontab('0 3 * * *'), id='cleanup_old_messages', name='Cleanup old messages', args=(Config.MESSAGE_HISTORY_DAYS,), replace_existing=True ) logger.info("✅ Добавлена задача очистки старых сообщений (ежедневно 3:00 UTC)") # Парсинг участников активных групп каждые 6 часов if Config.ENABLE_KEYWORD_PARSING and Config.GROUP_PARSE_INTERVAL > 0: self.scheduler.add_job( self._parse_all_groups, trigger=CronTrigger.from_crontab('0 */6 * * *'), id='parse_all_groups', name='Parse all group members', replace_existing=True ) logger.info("✅ Добавлена задача парсинга участников (каждые 6 часов)") async def _parse_all_groups(self): """Парсить участников всех активных групп""" try: async with self.SessionLocal() as session: group_repo = GroupRepository(session) groups = await group_repo.get_active_groups() for group in groups: parse_group_members_task.delay( group.id, group.chat_id, limit=Config.MAX_MEMBERS_TO_LOAD ) logger.info(f"✅ Запущен парсинг {len(groups)} групп") except Exception as e: logger.error(f"❌ Ошибка при парсинге групп: {e}") async def pause_schedule(self, job_id: str): """Приостановить расписание""" try: job = self.scheduler.get_job(job_id) if job: job.pause() logger.info(f"⏸️ Расписание приостановлено: {job_id}") return True except Exception as e: logger.error(f"❌ Ошибка при паузе расписания: {e}") return False async def resume_schedule(self, job_id: str): """Возобновить расписание""" try: job = self.scheduler.get_job(job_id) if job: job.resume() logger.info(f"▶️ Расписание возобновлено: {job_id}") return True except Exception as e: logger.error(f"❌ Ошибка при возобновлении расписания: {e}") return False # Глобальный экземпляр планировщика broadcast_scheduler = BroadcastScheduler() # Вспомогательные функции для работы с расписанием async def schedule_broadcast(message_id: int, group_ids: list, cron_expr: str): """Расписать рассылку сообщения""" return await broadcast_scheduler.add_broadcast_schedule( message_id=message_id, group_ids=group_ids, cron_expr=cron_expr, description=f"Broadcast message {message_id}" ) async def cancel_broadcast(job_id: str): """Отменить расписанную рассылку""" return await broadcast_scheduler.remove_broadcast_schedule(job_id) async def list_broadcasts(): """Получить список всех расписаний""" return await broadcast_scheduler.list_schedules() # Примеры cron выражений """ Cron формат: minute hour day month day_of_week Примеры: - '0 9 * * *' - ежедневно в 9:00 UTC - '0 9 * * MON' - по понедельникам в 9:00 UTC - '0 */6 * * *' - каждые 6 часов - '0 9,14,18 * * *' - в 9:00, 14:00 и 18:00 UTC ежедневно - '*/30 * * * *' - каждые 30 минут - '0 0 * * *' - ежедневно в полночь UTC - '0 0 1 * *' - первого числа каждого месяца в полночь UTC - '0 0 * * 0' - по воскресеньям в полночь UTC """