71 lines
2.6 KiB
Python
71 lines
2.6 KiB
Python
from datetime import UTC, datetime, timedelta
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.models.car import ServiceNotification
|
|
from app.models.user import User
|
|
|
|
# Values of ``User.platform_role`` whose holders receive platform-wide
# moderator broadcasts (see ``notify_platform_moderators``).
MODERATOR_ROLES = {
    "admin",
    "verifier",
    "moderator",
}
|
|
|
|
|
|
async def notify_user(user: User, text: str) -> bool:
    """Send *text* to *user* through the Telegram Bot API ``sendMessage`` call.

    Delivery is best-effort: the function never raises once sending starts.

    Args:
        user: Recipient; ``user.telegram_id`` is stringified and used as the
            ``chat_id``.
        text: Message text to send.

    Returns:
        ``True`` when the HTTP response status code is below 400. ``False``
        when no bot token is configured, when ``settings.app_env`` is
        ``"test"``, or when anything raises while building or sending the
        request.
    """
    delivery_enabled = bool(settings.bot_token) and settings.app_env != "test"
    if not delivery_enabled:
        return False

    try:
        # Everything below stays inside the ``try`` on purpose: any failure
        # (attribute access, client setup, network) is reported as ``False``.
        endpoint = f"https://api.telegram.org/bot{settings.bot_token}/sendMessage"
        payload = {"chat_id": str(user.telegram_id), "text": text}
        async with httpx.AsyncClient(timeout=5) as http:
            reply = await http.post(endpoint, data=payload)
            accepted = reply.status_code < 400
    except Exception:
        return False
    return accepted
|
|
|
|
|
|
async def notify_platform_moderators(session: AsyncSession, text: str) -> None:
    """Send *text* to every user whose platform role is a moderator role.

    Recipients are the ``User`` rows whose ``platform_role`` is one of
    ``MODERATOR_ROLES``. Messages are sent one after another via
    ``notify_user``; its boolean result is ignored, so failed deliveries
    are not reported to the caller.

    Args:
        session: Async database session used to look up the recipients.
        text: Message text forwarded unchanged to each recipient.
    """
    moderators_query = select(User).where(User.platform_role.in_(MODERATOR_ROLES))
    rows = await session.execute(moderators_query)
    for moderator in rows.scalars():
        await notify_user(moderator, text)
|
|
|
|
|
|
async def retry_failed_notifications(session: AsyncSession, *, limit: int = 50) -> int:
    """Run one pass over the notification queue.

    This is a direct delegate to ``process_notification_queue``; it adds no
    filtering of its own, so the pass is not restricted to previously failed
    notifications.

    Args:
        session: Async database session handed to the queue processor.
        limit: Maximum number of queued notifications to load in this pass.

    Returns:
        The count returned by ``process_notification_queue``.
    """
    delivered_count = await process_notification_queue(session, limit=limit)
    return delivered_count
|
|
|
|
|
|
async def process_notification_queue(session: AsyncSession, *, limit: int = 50) -> int:
    """Attempt delivery of queued ``ServiceNotification`` rows.

    Loads up to *limit* notifications whose status is ``"pending"``,
    ``"failed"`` or ``"retrying"`` and whose ``retry_count`` is below 5,
    oldest ``created_at`` first, and tries to send each one with
    ``notify_user``. All changes are committed once, after the loop.

    Per-notification outcome:
        * recipient missing -> status ``"abandoned"``, ``last_error``
          ``"recipient_not_found"`` (``retry_count`` is not touched);
        * delivered -> status ``"sent"``, ``sent_at`` set, ``last_error``
          cleared;
        * not delivered -> status ``"retrying"``, or ``"abandoned"`` once
          ``retry_count`` reaches 5, with ``last_error``
          ``"telegram_delivery_failed"``.

    ``retry_count`` is incremented on every delivery attempt, successful or
    not.

    Args:
        session: Async database session used for the query, the recipient
            lookups and the final commit.
        limit: Maximum number of notifications loaded in this pass. Rows
            skipped by the retry window still count against it.

    Returns:
        Number of notifications delivered successfully in this pass.
    """
    started_at = datetime.now(UTC)
    # NOTE(review): the 30-second retry window is measured from ``created_at``,
    # not from the last attempt, so it only defers "retrying" rows that were
    # created less than 30 seconds ago -- confirm this is the intended back-off.
    retry_cutoff = started_at - timedelta(seconds=30)

    queue_query = (
        select(ServiceNotification)
        .where(
            ServiceNotification.status.in_(["pending", "failed", "retrying"]),
            ServiceNotification.retry_count < 5,
        )
        .order_by(ServiceNotification.created_at.asc())
        .limit(limit)
    )
    rows = await session.execute(queue_query)

    sent_count = 0
    for item in rows.scalars():
        too_recent = item.status == "retrying" and item.created_at > retry_cutoff
        if too_recent:
            continue

        item.status = "processing"
        recipient = await session.get(User, item.recipient_user_id)
        if recipient is None:
            item.status = "abandoned"
            item.last_error = "recipient_not_found"
            continue

        # The body is optional; without one only the title is sent.
        message = f"{item.title}\n{item.body}" if item.body else item.title
        delivered_ok = await notify_user(recipient, message)
        item.retry_count += 1

        if not delivered_ok:
            if item.retry_count >= 5:
                item.status = "abandoned"
            else:
                item.status = "retrying"
            item.last_error = "telegram_delivery_failed"
            continue

        item.status = "sent"
        item.sent_at = datetime.now(UTC)
        item.last_error = None
        sent_count += 1

    await session.commit()
    return sent_count
|