import asyncio from typing import Dict, Any, List from app.db.session import get_session from app.db.models import ModerationLog, MessageEvent async def moderation_writer( queue: asyncio.Queue, flush_interval: float = 0.3, max_batch: int = 200, ): """ Фоновый воркер: получает события из очереди и записывает в БД пачками. Ожидаемые элементы очереди — dict со схемой: - {'type':'log', 'chat_id':..., 'user_id':..., 'message_id':..., 'reason':..., 'action':...} - {'type':'event', 'chat_id':..., 'user_id':..., 'message_id':..., 'content_hash':...} """ buf: List[Dict[str, Any]] = [] while True: try: item = await asyncio.wait_for(queue.get(), timeout=flush_interval) buf.append(item) except asyncio.TimeoutError: pass if not buf: continue # ограничим размер пачки batch, buf = (buf[:max_batch], buf[max_batch:]) if len(buf) > max_batch else (buf[:], []) try: with get_session() as s: for ev in batch: t = ev.get("type") if t == "log": s.add( ModerationLog( chat_id=ev.get("chat_id", 0), tg_user_id=ev.get("user_id", 0), message_id=ev.get("message_id"), reason=ev.get("reason", "")[:4000], action=ev.get("action", "")[:32], ) ) elif t == "event": s.add( MessageEvent( chat_id=ev.get("chat_id", 0), tg_user_id=ev.get("user_id", 0), message_id=ev.get("message_id"), content_hash=ev.get("content_hash"), ) ) s.commit() except Exception: # на ошибке просто пропускаем пачку, чтобы не зациклиться pass