"""Telegram delivery of user notifications and processing of the notification queue."""

import json
import logging
from datetime import UTC, datetime, timedelta

import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.models.car import ServiceNotification
from app.models.user import User

logger = logging.getLogger(__name__)

# Platform roles that receive moderator broadcasts.
MODERATOR_ROLES = {"admin", "verifier", "moderator"}


async def notify_user(
    user: User,
    text: str,
    *,
    web_app_url: str | None = None,
    button_text: str = "Открыть",
) -> bool:
    """Send a Telegram message to *user* via the Bot API ``sendMessage`` method.

    Delivery is best-effort: this function never raises for transport or API
    failures; it logs them and reports the outcome through the return value.

    Args:
        user: Recipient; ``user.telegram_id`` is used as the chat id.
        text: Message text.
        web_app_url: When set, a single inline button opening this URL as a
            web app is attached to the message.
        button_text: Caption of that inline button.

    Returns:
        ``True`` if the API answered with a status code below 400. ``False``
        if no bot token is configured, the app runs in the ``test``
        environment, the request raised, or the API answered with >= 400.
    """
    if not settings.bot_token or settings.app_env == "test":
        return False

    data: dict[str, str] = {"chat_id": str(user.telegram_id), "text": text}
    if web_app_url:
        data["reply_markup"] = json.dumps(
            {"inline_keyboard": [[{"text": button_text, "web_app": {"url": web_app_url}}]]},
            ensure_ascii=False,
        )

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            response = await client.post(
                f"https://api.telegram.org/bot{settings.bot_token}/sendMessage",
                data=data,
            )
    except Exception as exc:
        # Log only the exception type: the request URL embeds the bot token,
        # so neither the message nor the traceback should reach the logs.
        logger.warning(
            "Telegram delivery to chat %s failed: %s",
            user.telegram_id,
            type(exc).__name__,
        )
        return False

    if response.status_code >= 400:
        logger.warning(
            "Telegram delivery to chat %s rejected with HTTP %s",
            user.telegram_id,
            response.status_code,
        )
        return False
    return True


async def notify_platform_moderators(session: AsyncSession, text: str) -> None:
    """Send *text* to every user whose ``platform_role`` is in ``MODERATOR_ROLES``.

    Messages are sent one after another; individual delivery failures are
    ignored (see ``notify_user``).
    """
    result = await session.execute(
        select(User).where(User.platform_role.in_(MODERATOR_ROLES))
    )
    for user in result.scalars():
        await notify_user(user, text)


async def retry_failed_notifications(session: AsyncSession, *, limit: int = 50) -> int:
    """Alias for ``process_notification_queue``; returns its delivered count."""
    return await process_notification_queue(session, limit=limit)


async def process_notification_queue(session: AsyncSession, *, limit: int = 50) -> int:
    """Attempt delivery of queued ``ServiceNotification`` rows.

    Selects up to *limit* notifications, oldest first, whose status is
    ``pending``, ``failed`` or ``retrying`` and whose ``retry_count`` is
    below 5, then tries to deliver each one through ``notify_user``.

    Resulting status per notification:
        * ``sent`` - delivered; ``sent_at`` is set and ``last_error`` cleared.
        * ``retrying`` - delivery failed and fewer than 5 attempts were made.
        * ``abandoned`` - the recipient no longer exists, or the fifth
          attempt failed.

    Args:
        session: Async database session; committed once after the batch.
        limit: Maximum number of notifications fetched in this run.

    Returns:
        The number of notifications delivered in this run.
    """
    now = datetime.now(UTC)
    result = await session.execute(
        select(ServiceNotification)
        .where(
            ServiceNotification.status.in_(["pending", "failed", "retrying"]),
            ServiceNotification.retry_count < 5,
        )
        .order_by(ServiceNotification.created_at.asc())
        .limit(limit)
    )

    delivered = 0
    for notification in result.scalars():
        # Skip retries of very recent notifications.
        # NOTE(review): the 30-second window is measured from created_at, not
        # from the last attempt - confirm that is the intended back-off.
        if (
            notification.status == "retrying"
            and notification.created_at > now - timedelta(seconds=30)
        ):
            continue

        notification.status = "processing"
        user = await session.get(User, notification.recipient_user_id)
        if user is None:
            notification.status = "abandoned"
            notification.last_error = "recipient_not_found"
            continue

        ok = await notify_user(
            user,
            f"{notification.title}\n{notification.body}"
            if notification.body
            else notification.title,
        )
        # Every attempt counts towards the limit, successful or not.
        notification.retry_count += 1
        if ok:
            notification.status = "sent"
            notification.sent_at = datetime.now(UTC)
            notification.last_error = None
            delivered += 1
        else:
            notification.status = (
                "abandoned" if notification.retry_count >= 5 else "retrying"
            )
            notification.last_error = "telegram_delivery_failed"

    # NOTE(review): a single commit persists the whole batch - confirm this is
    # the intended transaction scope.
    await session.commit()
    return delivered