Files
drivers_bot/app/services/notifications.py
VPN SaaS Dev 545f4d088d
Some checks failed
ci / test (push) Has been cancelled
Add owner work order approval page
2026-05-16 10:51:05 +09:00

84 lines
2.9 KiB
Python

import json
from datetime import UTC, datetime, timedelta
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.car import ServiceNotification
from app.models.user import User
MODERATOR_ROLES = {"admin", "verifier", "moderator"}
async def notify_user(
user: User,
text: str,
*,
web_app_url: str | None = None,
button_text: str = "Открыть",
) -> bool:
if not settings.bot_token or settings.app_env == "test":
return False
data: dict[str, str] = {"chat_id": str(user.telegram_id), "text": text}
if web_app_url:
data["reply_markup"] = json.dumps(
{"inline_keyboard": [[{"text": button_text, "web_app": {"url": web_app_url}}]]},
ensure_ascii=False,
)
try:
async with httpx.AsyncClient(timeout=5) as client:
response = await client.post(
f"https://api.telegram.org/bot{settings.bot_token}/sendMessage",
data=data,
)
return response.status_code < 400
except Exception:
return False
async def notify_platform_moderators(session: AsyncSession, text: str) -> None:
result = await session.execute(select(User).where(User.platform_role.in_(MODERATOR_ROLES)))
for user in result.scalars():
await notify_user(user, text)
async def retry_failed_notifications(session: AsyncSession, *, limit: int = 50) -> int:
return await process_notification_queue(session, limit=limit)
async def process_notification_queue(session: AsyncSession, *, limit: int = 50) -> int:
now = datetime.now(UTC)
result = await session.execute(
select(ServiceNotification)
.where(
ServiceNotification.status.in_(["pending", "failed", "retrying"]),
ServiceNotification.retry_count < 5,
)
.order_by(ServiceNotification.created_at.asc())
.limit(limit)
)
delivered = 0
for notification in result.scalars():
if notification.status == "retrying" and notification.created_at > now - timedelta(seconds=30):
continue
notification.status = "processing"
user = await session.get(User, notification.recipient_user_id)
if user is None:
notification.status = "abandoned"
notification.last_error = "recipient_not_found"
continue
ok = await notify_user(user, f"{notification.title}\n{notification.body}" if notification.body else notification.title)
notification.retry_count += 1
if ok:
notification.status = "sent"
notification.sent_at = datetime.now(UTC)
notification.last_error = None
delivered += 1
else:
notification.status = "abandoned" if notification.retry_count >= 5 else "retrying"
notification.last_error = "telegram_delivery_failed"
await session.commit()
return delivered