211 lines
8.4 KiB
Python
211 lines
8.4 KiB
Python
"""
|
||
Планировщик расписания для автоматических рассылок
|
||
"""
|
||
|
||
import logging
|
||
from datetime import datetime
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
from app.settings import Config
|
||
from app.database.repository import GroupRepository, MessageRepository, MessageGroupRepository
|
||
from app.celery_tasks import broadcast_message_task, parse_group_members_task, cleanup_old_messages_task
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class BroadcastScheduler:
|
||
"""Планировщик для расписания рассылок"""
|
||
|
||
def __init__(self):
|
||
self.scheduler = AsyncIOScheduler(timezone='UTC')
|
||
self.engine = None
|
||
self.SessionLocal = None
|
||
|
||
async def initialize(self):
|
||
"""Инициализировать планировщик"""
|
||
self.engine = create_async_engine(Config.DATABASE_URL, echo=False)
|
||
self.SessionLocal = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
|
||
logger.info("✅ Планировщик инициализирован")
|
||
|
||
async def shutdown(self):
|
||
"""Остановить планировщик"""
|
||
if self.scheduler.running:
|
||
self.scheduler.shutdown()
|
||
if self.engine:
|
||
await self.engine.dispose()
|
||
logger.info("✅ Планировщик остановлен")
|
||
|
||
def start(self):
|
||
"""Запустить планировщик"""
|
||
if not self.scheduler.running:
|
||
self.scheduler.start()
|
||
logger.info("🚀 Планировщик запущен")
|
||
|
||
async def add_broadcast_schedule(self, message_id: int, group_ids: list, cron_expr: str,
|
||
description: str = None):
|
||
"""
|
||
Добавить расписание для рассылки
|
||
|
||
Args:
|
||
message_id: ID сообщения
|
||
group_ids: Список ID групп
|
||
cron_expr: Cron выражение (например "0 9 * * *" - ежедневно в 9:00)
|
||
description: Описание задачи
|
||
"""
|
||
try:
|
||
job_id = f"broadcast_{message_id}_{datetime.utcnow().timestamp()}"
|
||
|
||
self.scheduler.add_job(
|
||
broadcast_message_task.delay,
|
||
trigger=CronTrigger.from_crontab(cron_expr),
|
||
args=(message_id, group_ids),
|
||
id=job_id,
|
||
name=description or f"Broadcast message {message_id}",
|
||
replace_existing=True
|
||
)
|
||
|
||
logger.info(f"✅ Расписание добавлено: {job_id}")
|
||
logger.info(f" Сообщение: {message_id}")
|
||
logger.info(f" Группы: {group_ids}")
|
||
logger.info(f" Расписание: {cron_expr}")
|
||
|
||
return job_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при добавлении расписания: {e}")
|
||
raise
|
||
|
||
async def remove_broadcast_schedule(self, job_id: str):
|
||
"""Удалить расписание"""
|
||
try:
|
||
self.scheduler.remove_job(job_id)
|
||
logger.info(f"✅ Расписание удалено: {job_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при удалении расписания: {e}")
|
||
return False
|
||
|
||
async def list_schedules(self) -> list:
|
||
"""Получить список всех расписаний"""
|
||
jobs = []
|
||
for job in self.scheduler.get_jobs():
|
||
jobs.append({
|
||
'id': job.id,
|
||
'name': job.name,
|
||
'trigger': str(job.trigger),
|
||
'next_run_time': job.next_run_time
|
||
})
|
||
return jobs
|
||
|
||
async def add_maintenance_schedules(self):
|
||
"""Добавить периодические задачи обслуживания"""
|
||
|
||
# Очистка старых сообщений каждый день в 3:00 UTC
|
||
self.scheduler.add_job(
|
||
cleanup_old_messages_task.delay,
|
||
trigger=CronTrigger.from_crontab('0 3 * * *'),
|
||
id='cleanup_old_messages',
|
||
name='Cleanup old messages',
|
||
args=(Config.MESSAGE_HISTORY_DAYS,),
|
||
replace_existing=True
|
||
)
|
||
logger.info("✅ Добавлена задача очистки старых сообщений (ежедневно 3:00 UTC)")
|
||
|
||
# Парсинг участников активных групп каждые 6 часов
|
||
if Config.ENABLE_KEYWORD_PARSING and Config.GROUP_PARSE_INTERVAL > 0:
|
||
self.scheduler.add_job(
|
||
self._parse_all_groups,
|
||
trigger=CronTrigger.from_crontab('0 */6 * * *'),
|
||
id='parse_all_groups',
|
||
name='Parse all group members',
|
||
replace_existing=True
|
||
)
|
||
logger.info("✅ Добавлена задача парсинга участников (каждые 6 часов)")
|
||
|
||
async def _parse_all_groups(self):
|
||
"""Парсить участников всех активных групп"""
|
||
try:
|
||
async with self.SessionLocal() as session:
|
||
group_repo = GroupRepository(session)
|
||
groups = await group_repo.get_active_groups()
|
||
|
||
for group in groups:
|
||
parse_group_members_task.delay(
|
||
group.id,
|
||
group.chat_id,
|
||
limit=Config.MAX_MEMBERS_TO_LOAD
|
||
)
|
||
|
||
logger.info(f"✅ Запущен парсинг {len(groups)} групп")
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при парсинге групп: {e}")
|
||
|
||
async def pause_schedule(self, job_id: str):
|
||
"""Приостановить расписание"""
|
||
try:
|
||
job = self.scheduler.get_job(job_id)
|
||
if job:
|
||
job.pause()
|
||
logger.info(f"⏸️ Расписание приостановлено: {job_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при паузе расписания: {e}")
|
||
return False
|
||
|
||
async def resume_schedule(self, job_id: str):
|
||
"""Возобновить расписание"""
|
||
try:
|
||
job = self.scheduler.get_job(job_id)
|
||
if job:
|
||
job.resume()
|
||
logger.info(f"▶️ Расписание возобновлено: {job_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при возобновлении расписания: {e}")
|
||
return False
|
||
|
||
|
||
# Глобальный экземпляр планировщика
|
||
broadcast_scheduler = BroadcastScheduler()
|
||
|
||
|
||
# Вспомогательные функции для работы с расписанием
|
||
async def schedule_broadcast(message_id: int, group_ids: list, cron_expr: str):
|
||
"""Расписать рассылку сообщения"""
|
||
return await broadcast_scheduler.add_broadcast_schedule(
|
||
message_id=message_id,
|
||
group_ids=group_ids,
|
||
cron_expr=cron_expr,
|
||
description=f"Broadcast message {message_id}"
|
||
)
|
||
|
||
|
||
async def cancel_broadcast(job_id: str):
|
||
"""Отменить расписанную рассылку"""
|
||
return await broadcast_scheduler.remove_broadcast_schedule(job_id)
|
||
|
||
|
||
async def list_broadcasts():
|
||
"""Получить список всех расписаний"""
|
||
return await broadcast_scheduler.list_schedules()
|
||
|
||
|
||
# Примеры cron выражений
|
||
"""
|
||
Cron формат: minute hour day month day_of_week
|
||
|
||
Примеры:
|
||
- '0 9 * * *' - ежедневно в 9:00 UTC
|
||
- '0 9 * * MON' - по понедельникам в 9:00 UTC
|
||
- '0 */6 * * *' - каждые 6 часов
|
||
- '0 9,14,18 * * *' - в 9:00, 14:00 и 18:00 UTC ежедневно
|
||
- '*/30 * * * *' - каждые 30 минут
|
||
- '0 0 * * *' - ежедневно в полночь UTC
|
||
- '0 0 1 * *' - первого числа каждого месяца в полночь UTC
|
||
- '0 0 * * 0' - по воскресеньям в полночь UTC
|
||
"""
|