Files
TG_autoposter/app/scheduler.py
2025-12-18 05:55:32 +09:00

211 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Планировщик расписания для автоматических рассылок
"""
import logging
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.settings import Config
from app.database.repository import GroupRepository, MessageRepository, MessageGroupRepository
from app.celery_tasks import broadcast_message_task, parse_group_members_task, cleanup_old_messages_task
logger = logging.getLogger(__name__)
class BroadcastScheduler:
"""Планировщик для расписания рассылок"""
def __init__(self):
self.scheduler = AsyncIOScheduler(timezone='UTC')
self.engine = None
self.SessionLocal = None
async def initialize(self):
"""Инициализировать планировщик"""
self.engine = create_async_engine(Config.DATABASE_URL, echo=False)
self.SessionLocal = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
logger.info("✅ Планировщик инициализирован")
async def shutdown(self):
"""Остановить планировщик"""
if self.scheduler.running:
self.scheduler.shutdown()
if self.engine:
await self.engine.dispose()
logger.info("✅ Планировщик остановлен")
def start(self):
"""Запустить планировщик"""
if not self.scheduler.running:
self.scheduler.start()
logger.info("🚀 Планировщик запущен")
async def add_broadcast_schedule(self, message_id: int, group_ids: list, cron_expr: str,
description: str = None):
"""
Добавить расписание для рассылки
Args:
message_id: ID сообщения
group_ids: Список ID групп
cron_expr: Cron выражение (например "0 9 * * *" - ежедневно в 9:00)
description: Описание задачи
"""
try:
job_id = f"broadcast_{message_id}_{datetime.utcnow().timestamp()}"
self.scheduler.add_job(
broadcast_message_task.delay,
trigger=CronTrigger.from_crontab(cron_expr),
args=(message_id, group_ids),
id=job_id,
name=description or f"Broadcast message {message_id}",
replace_existing=True
)
logger.info(f"✅ Расписание добавлено: {job_id}")
logger.info(f" Сообщение: {message_id}")
logger.info(f" Группы: {group_ids}")
logger.info(f" Расписание: {cron_expr}")
return job_id
except Exception as e:
logger.error(f"❌ Ошибка при добавлении расписания: {e}")
raise
async def remove_broadcast_schedule(self, job_id: str):
"""Удалить расписание"""
try:
self.scheduler.remove_job(job_id)
logger.info(f"✅ Расписание удалено: {job_id}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при удалении расписания: {e}")
return False
async def list_schedules(self) -> list:
"""Получить список всех расписаний"""
jobs = []
for job in self.scheduler.get_jobs():
jobs.append({
'id': job.id,
'name': job.name,
'trigger': str(job.trigger),
'next_run_time': job.next_run_time
})
return jobs
async def add_maintenance_schedules(self):
"""Добавить периодические задачи обслуживания"""
# Очистка старых сообщений каждый день в 3:00 UTC
self.scheduler.add_job(
cleanup_old_messages_task.delay,
trigger=CronTrigger.from_crontab('0 3 * * *'),
id='cleanup_old_messages',
name='Cleanup old messages',
args=(Config.MESSAGE_HISTORY_DAYS,),
replace_existing=True
)
logger.info("✅ Добавлена задача очистки старых сообщений (ежедневно 3:00 UTC)")
# Парсинг участников активных групп каждые 6 часов
if Config.ENABLE_KEYWORD_PARSING and Config.GROUP_PARSE_INTERVAL > 0:
self.scheduler.add_job(
self._parse_all_groups,
trigger=CronTrigger.from_crontab('0 */6 * * *'),
id='parse_all_groups',
name='Parse all group members',
replace_existing=True
)
logger.info("✅ Добавлена задача парсинга участников (каждые 6 часов)")
async def _parse_all_groups(self):
"""Парсить участников всех активных групп"""
try:
async with self.SessionLocal() as session:
group_repo = GroupRepository(session)
groups = await group_repo.get_active_groups()
for group in groups:
parse_group_members_task.delay(
group.id,
group.chat_id,
limit=Config.MAX_MEMBERS_TO_LOAD
)
logger.info(f"✅ Запущен парсинг {len(groups)} групп")
except Exception as e:
logger.error(f"❌ Ошибка при парсинге групп: {e}")
async def pause_schedule(self, job_id: str):
"""Приостановить расписание"""
try:
job = self.scheduler.get_job(job_id)
if job:
job.pause()
logger.info(f"⏸️ Расписание приостановлено: {job_id}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при паузе расписания: {e}")
return False
async def resume_schedule(self, job_id: str):
"""Возобновить расписание"""
try:
job = self.scheduler.get_job(job_id)
if job:
job.resume()
logger.info(f"▶️ Расписание возобновлено: {job_id}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при возобновлении расписания: {e}")
return False
# Глобальный экземпляр планировщика
broadcast_scheduler = BroadcastScheduler()
# Вспомогательные функции для работы с расписанием
async def schedule_broadcast(message_id: int, group_ids: list, cron_expr: str):
"""Расписать рассылку сообщения"""
return await broadcast_scheduler.add_broadcast_schedule(
message_id=message_id,
group_ids=group_ids,
cron_expr=cron_expr,
description=f"Broadcast message {message_id}"
)
async def cancel_broadcast(job_id: str):
"""Отменить расписанную рассылку"""
return await broadcast_scheduler.remove_broadcast_schedule(job_id)
async def list_broadcasts():
"""Получить список всех расписаний"""
return await broadcast_scheduler.list_schedules()
# Примеры cron выражений
"""
Cron формат: minute hour day month day_of_week
Примеры:
- '0 9 * * *' - ежедневно в 9:00 UTC
- '0 9 * * MON' - по понедельникам в 9:00 UTC
- '0 */6 * * *' - каждые 6 часов
- '0 9,14,18 * * *' - в 9:00, 14:00 и 18:00 UTC ежедневно
- '*/30 * * * *' - каждые 30 минут
- '0 0 * * *' - ежедневно в полночь UTC
- '0 0 1 * *' - первого числа каждого месяца в полночь UTC
- '0 0 * * 0' - по воскресеньям в полночь UTC
"""