from apscheduler.schedulers.base import BaseScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler from importlib import import_module from scheduler.models import ScheduledTask import importlib from apscheduler.triggers.cron import CronTrigger def format_weekdays(weekdays): """Преобразует список дней недели в строку.""" if isinstance(weekdays, list): return ",".join(map(str, weekdays)) return str(weekdays) def run_task(task): """ Выполняет задачу, указанную в модели ScheduledTask. """ module_name, func_name = task.module_path.rsplit(".", 1) module = import_module(module_name) func = getattr(module, func_name) func() def setup_scheduler(): """Настройка планировщика задач из БД.""" print("Настройка планировщика задач...") scheduler = AsyncIOScheduler() tasks = ScheduledTask.objects.filter(active=True) for task in tasks: scheduler.add_job( run_task, "cron", id=task.name, minute=task.cron_minute, hour=task.cron_hour, day=task.cron_day, month=task.cron_month, day_of_week=task.cron_weekday, args=[task], ) scheduler.start() print("Планировщик запущен.") return scheduler def load_tasks_to_scheduler(scheduler: BaseScheduler): tasks = ScheduledTask.objects.filter(active=True) for task in tasks: try: module_name, func_name = task.function_path.rsplit('.', 1) module = import_module(module_name) func = getattr(module, func_name) scheduler.add_job( func, trigger="cron", minute=task.minutes, hour=task.hours, day=task.days or "*", month=task.months or "*", day_of_week=task.weekdays or "*", id=str(task.id), replace_existing=True, ) except Exception as e: print(f"Ошибка при добавлении задачи '{task.task_name}': {e}")