Files
Touchh/scheduler/tasks.py
2024-12-24 15:43:43 +09:00

68 lines
2.3 KiB
Python

from apscheduler.schedulers.base import BaseScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from importlib import import_module
from scheduler.models import ScheduledTask
import importlib
from apscheduler.triggers.cron import CronTrigger
def format_weekdays(weekdays):
"""Преобразует список дней недели в строку."""
if isinstance(weekdays, list):
return ",".join(map(str, weekdays))
return str(weekdays)
def run_task(task):
"""
Выполняет задачу, указанную в модели ScheduledTask.
"""
module_name, func_name = task.module_path.rsplit(".", 1)
module = import_module(module_name)
func = getattr(module, func_name)
func()
def setup_scheduler():
"""Настройка планировщика задач из БД."""
print("Настройка планировщика задач...")
scheduler = AsyncIOScheduler()
tasks = ScheduledTask.objects.filter(active=True)
for task in tasks:
scheduler.add_job(
run_task,
"cron",
id=task.name,
minute=task.cron_minute,
hour=task.cron_hour,
day=task.cron_day,
month=task.cron_month,
day_of_week=task.cron_weekday,
args=[task],
)
scheduler.start()
print("Планировщик запущен.")
return scheduler
def load_tasks_to_scheduler(scheduler):
"""
Загружает активные задачи в планировщик.
"""
tasks = ScheduledTask.objects.filter(active=True)
for task in tasks:
try:
module_name, func_name = task.function_path.rsplit('.', 1)
module = import_module(module_name)
func = getattr(module, func_name)
scheduler.add_job(
func,
trigger="cron",
minute=task.minutes,
hour=task.hours,
day=task.days or "*",
month=task.months or "*",
day_of_week=task.weekdays or "*",
id=str(task.id),
replace_existing=True,
)
except Exception as e:
print(f"Ошибка при добавлении задачи '{task.task_name}': {e}")