Files
TG_autoposter/app/celery_tasks.py
2025-12-18 05:55:32 +09:00

260 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery задачи для асинхронной обработки
"""
import logging
from celery import shared_task
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.settings import Config
from app.database.repository import GroupRepository, MessageRepository, MessageGroupRepository
from app.database.member_repository import GroupMemberRepository, GroupStatisticsRepository
from app.handlers.telethon_client import telethon_manager
from app.handlers.group_parser import GroupParser
from app.models import Base
logger = logging.getLogger(__name__)
async def get_db_session():
"""Получить сессию БД для Celery задач"""
engine = create_async_engine(Config.DATABASE_URL, echo=False)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with SessionLocal() as session:
yield session
@shared_task(name='app.celery_tasks.send_message_task')
def send_message_task(message_id: int, group_id: int, chat_id: str, message_text: str):
"""
Задача для отправки сообщения в группу
Args:
message_id: ID сообщения в БД
group_id: ID группы в БД
chat_id: ID чата в Telegram
message_text: Текст сообщения
"""
import asyncio
async def _send():
# Инициализировать Telethon если необходимо
if Config.USE_TELETHON and not telethon_manager.is_connected():
await telethon_manager.initialize()
engine = create_async_engine(Config.DATABASE_URL, echo=False)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with SessionLocal() as session:
from app.handlers.hybrid_sender import HybridMessageSender
from telegram.ext import Application
# Получить Application (нужен для гибридного отправителя)
# В Celery контексте создаем минималистичный объект
app = type('obj', (object,), {'bot': type('obj', (object,), {})()})()
sender = HybridMessageSender(app.bot, session)
try:
success, method = await sender.send_message_with_retry(
chat_id=chat_id,
message_text=message_text,
group_id=group_id,
max_retries=Config.MAX_RETRIES
)
if success:
# Обновить статус в БД
message_group_repo = MessageGroupRepository(session)
await message_group_repo.mark_as_sent(message_id, group_id)
logger.info(f"✅ Задача отправки выполнена: сообщение {message_id} в группу {group_id} (способ: {method})")
return {'status': 'success', 'method': method}
else:
logger.error(f"❌ Ошибка отправки сообщения {message_id} в группу {group_id}")
return {'status': 'failed'}
except Exception as e:
logger.error(f"❌ Ошибка в задаче отправки: {e}")
return {'status': 'error', 'error': str(e)}
finally:
await engine.dispose()
return asyncio.run(_send())
@shared_task(name='app.celery_tasks.parse_group_members_task')
def parse_group_members_task(group_id: int, chat_id: str, limit: int = 1000):
"""
Задача для загрузки участников группы
Args:
group_id: ID группы в БД
chat_id: ID чата в Telegram
limit: Максимум участников для загрузки
"""
import asyncio
async def _parse():
if Config.USE_TELETHON and not telethon_manager.is_connected():
await telethon_manager.initialize()
engine = create_async_engine(Config.DATABASE_URL, echo=False)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with SessionLocal() as session:
member_repo = GroupMemberRepository(session)
parser = GroupParser(session)
try:
result = await parser.parse_group_members(
chat_id=int(chat_id),
member_repo=member_repo,
limit=limit
)
logger.info(f"✅ Задача парсинга завершена: группа {group_id} - {result}")
# Коммитить изменения
await session.commit()
return result
except Exception as e:
logger.error(f"❌ Ошибка в задаче парсинга: {e}")
await session.rollback()
return {'success': False, 'error': str(e)}
finally:
await engine.dispose()
return asyncio.run(_parse())
@shared_task(name='app.celery_tasks.cleanup_old_messages_task')
def cleanup_old_messages_task(days: int = 30):
"""
Задача для очистки старых сообщений из БД
Args:
days: Удалить сообщения старше N дней
"""
import asyncio
async def _cleanup():
engine = create_async_engine(Config.DATABASE_URL, echo=False)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with SessionLocal() as session:
message_repo = MessageRepository(session)
try:
cutoff_date = datetime.utcnow() - timedelta(days=days)
count = await message_repo.delete_before_date(cutoff_date)
logger.info(f"✅ Очистка завершена: удалено {count} сообщений старше {days} дней")
await session.commit()
return {'status': 'success', 'deleted_count': count}
except Exception as e:
logger.error(f"❌ Ошибка в задаче очистки: {e}")
await session.rollback()
return {'status': 'error', 'error': str(e)}
finally:
await engine.dispose()
return asyncio.run(_cleanup())
@shared_task(name='app.celery_tasks.broadcast_message_task')
def broadcast_message_task(message_id: int, group_ids: list):
"""
Задача для рассылки сообщения в несколько групп
Args:
message_id: ID сообщения в БД
group_ids: Список ID групп в БД
"""
import asyncio
from app.handlers.hybrid_sender import HybridMessageSender
async def _broadcast():
if Config.USE_TELETHON and not telethon_manager.is_connected():
await telethon_manager.initialize()
engine = create_async_engine(Config.DATABASE_URL, echo=False)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with SessionLocal() as session:
message_repo = MessageRepository(session)
group_repo = GroupRepository(session)
message_group_repo = MessageGroupRepository(session)
try:
# Получить сообщение
message = await message_repo.get_by_id(message_id)
if not message:
logger.error(f"Сообщение {message_id} не найдено")
return {'status': 'error', 'error': 'Message not found'}
# Получить группы
groups = []
for gid in group_ids:
group = await group_repo.get_by_id(gid)
if group:
groups.append(group)
# Отправить во все группы
app = type('obj', (object,), {'bot': type('obj', (object,), {})()})()
sender = HybridMessageSender(app.bot, session)
results = {
'total': len(groups),
'success': 0,
'failed': 0,
'via_bot': 0,
'via_client': 0
}
for group in groups:
success, method = await sender.send_message_with_retry(
chat_id=group.chat_id,
message_text=message.text,
group_id=group.id,
max_retries=Config.MAX_RETRIES
)
if success:
results['success'] += 1
if method == 'bot':
results['via_bot'] += 1
else:
results['via_client'] += 1
await message_group_repo.mark_as_sent(message_id, group.id)
else:
results['failed'] += 1
await session.commit()
logger.info(f"✅ Рассылка завершена: {results}")
return results
except Exception as e:
logger.error(f"❌ Ошибка в задаче рассылки: {e}")
await session.rollback()
return {'status': 'error', 'error': str(e)}
finally:
await engine.dispose()
return asyncio.run(_broadcast())
# Периодические задачи
@shared_task(name='app.celery_tasks.health_check_task')
def health_check_task():
"""Проверка здоровья системы"""
logger.info("✅ Health check выполнен")
return {'status': 'healthy'}