Notify moderators about service center applications
This commit is contained in:
27
app/services/notifications.py
Normal file
27
app/services/notifications.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.models.user import User
|
||||
|
||||
MODERATOR_ROLES = {"admin", "verifier", "moderator"}
|
||||
|
||||
|
||||
async def notify_user(user: User, text: str) -> None:
|
||||
if not settings.bot_token or settings.app_env == "test":
|
||||
return
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5) as client:
|
||||
await client.post(
|
||||
f"https://api.telegram.org/bot{settings.bot_token}/sendMessage",
|
||||
data={"chat_id": str(user.telegram_id), "text": text},
|
||||
)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
|
||||
async def notify_platform_moderators(session: AsyncSession, text: str) -> None:
|
||||
result = await session.execute(select(User).where(User.platform_role.in_(MODERATOR_ROLES)))
|
||||
for user in result.scalars():
|
||||
await notify_user(user, text)
|
||||
Reference in New Issue
Block a user