CarPass Admin
+Control Center
+Пилотный контур
+Операционный обзор
+ Загружаю доступ и источники данных... +Сервис
+{escape(notification.event_type)}",
+ f"Открыть: {escape(link)}",
+ ]
+ if item
+ )
+ errors: list[str] = []
+ async with httpx.AsyncClient(timeout=8) as client:
+ for chat_id in recipients:
+ try:
+ response = await client.post(
+ f"https://api.telegram.org/bot{settings.bot_token}/sendMessage",
+ json={
+ "chat_id": chat_id,
+ "text": text,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ },
+ )
+ response.raise_for_status()
+ except Exception as error: # noqa: BLE001 - notification delivery is best-effort
+ logger.warning("Admin Telegram notification failed: %s", error)
+ errors.append(str(error))
+ if errors:
+ notification.telegram_status = "failed"
+ notification.telegram_error = "; ".join(errors)[:2000]
+ else:
+ notification.telegram_status = "sent"
+
+
+async def retry_admin_telegram_notifications(session: AsyncSession, *, limit: int = 50) -> int:
+ result = await session.execute(
+ select(AdminNotification)
+ .where(AdminNotification.telegram_status.in_(["pending", "failed"]))
+ .order_by(AdminNotification.created_at.asc())
+ .limit(limit)
+ )
+ delivered = 0
+ for notification in result.scalars():
+ await send_admin_telegram_notification(notification)
+ if notification.telegram_status == "sent":
+ delivered += 1
+ await session.commit()
+ return delivered
+
+
+async def mark_admin_notification_read(
+ session: AsyncSession, notification: AdminNotification
+) -> AdminNotification:
+ notification.status = "read"
+ notification.read_at = datetime.now(UTC)
+ await session.flush()
+ return notification
+
+
+async def dismiss_admin_notification(
+ session: AsyncSession, notification: AdminNotification
+) -> AdminNotification:
+ notification.status = "dismissed"
+ notification.dismissed_at = datetime.now(UTC)
+ await session.flush()
+ return notification
diff --git a/app/services/rate_limit.py b/app/services/rate_limit.py
index eb02fec..83ca023 100644
--- a/app/services/rate_limit.py
+++ b/app/services/rate_limit.py
@@ -41,7 +41,13 @@ async def check_rate_limit(
if settings.redis_url:
allowed = await check_redis_rate_limit(scope, identifiers, limit, window_seconds)
if not allowed:
- await log_rate_limit_event(session, scope=scope, identifier="redis")
+ await log_rate_limit_event(
+ session,
+ scope=scope,
+ identifier="redis",
+ user=user,
+ request=request,
+ )
raise_rate_limit(scope, window_seconds)
return
@@ -52,7 +58,13 @@ async def check_rate_limit(
while bucket and now - bucket[0] > window_seconds:
bucket.popleft()
if len(bucket) >= limit:
- await log_rate_limit_event(session, scope=scope, identifier=str(identifier))
+ await log_rate_limit_event(
+ session,
+ scope=scope,
+ identifier=str(identifier),
+ user=user,
+ request=request,
+ )
raise_rate_limit(scope, window_seconds)
for identifier in identifiers:
_buckets[(scope, identifier)].append(now)
@@ -107,18 +119,82 @@ async def log_rate_limit_event(
*,
scope: str,
identifier: str,
+ user: User | None = None,
+ request: Request | None = None,
) -> None:
- if session is None:
- return
- from app.models.car import AuditLog
+ client_host = request.client.host if request and request.client else None
+ user_agent = request.headers.get("user-agent") if request else None
+ metadata = {
+ "scope": scope,
+ "identifier": identifier,
+ "telegram_id": user.telegram_id if user else None,
+ "user_id": user.id if user else None,
+ "ip": client_host,
+ }
- session.add(
- AuditLog(
- actor_user_id=None,
- actor_role="system",
- action="rate_limit.exceeded",
- target_type=scope,
- target_id=identifier[:80],
- metadata_json={"scope": scope, "identifier": identifier},
- )
+ if session is None:
+ from app.db.session import async_session_factory
+
+ async with async_session_factory() as event_session:
+ await persist_rate_limit_event(
+ event_session,
+ scope=scope,
+ identifier=identifier,
+ user=user,
+ client_host=client_host,
+ user_agent=user_agent,
+ metadata=metadata,
+ )
+ return
+
+ await persist_rate_limit_event(
+ session,
+ scope=scope,
+ identifier=identifier,
+ user=user,
+ client_host=client_host,
+ user_agent=user_agent,
+ metadata=metadata,
)
+
+
+async def persist_rate_limit_event(
+ event_session: AsyncSession,
+ *,
+ scope: str,
+ identifier: str,
+ user: User | None,
+ client_host: str | None,
+ user_agent: str | None,
+ metadata: dict,
+) -> None:
+ from app.models.car import AuditLog
+ from app.services.admin_notifications import create_admin_notification
+
+ try:
+ event_session.add(
+ AuditLog(
+ actor_user_id=user.id if user else None,
+ actor_role=user.platform_role if user else "system",
+ action="rate_limit.exceeded",
+ target_type=scope,
+ target_id=identifier[:80],
+ metadata_json=metadata,
+ ip=client_host,
+ user_agent=user_agent[:256] if user_agent else None,
+ )
+ )
+ await create_admin_notification(
+ event_session,
+ event_type="rate_limit_exceeded",
+ title="Rate limit exceeded",
+ body=f"Scope: {scope}\nIdentifier: {identifier}",
+ entity_type="user" if user else "system",
+ entity_id=user.id if user else scope,
+ severity="warning",
+ idempotency_key=f"rate_limit:{scope}:{identifier}:{int(time.time() // max(60, 1))}",
+ metadata=metadata,
+ )
+ await event_session.commit()
+ except Exception:
+ await event_session.rollback()
diff --git a/bot/api_client.py b/bot/api_client.py
index 356fcb4..7ef9126 100644
--- a/bot/api_client.py
+++ b/bot/api_client.py
@@ -8,6 +8,21 @@ from app.core.config import settings
class ApiClient:
def __init__(self) -> None:
self.base_url = settings.api_base_url.rstrip("/")
+ self._client: httpx.AsyncClient | None = None
+
+ @property
+ def client(self) -> httpx.AsyncClient:
+ if self._client is None or self._client.is_closed:
+ self._client = httpx.AsyncClient(
+ base_url=self.base_url,
+ timeout=httpx.Timeout(15.0, connect=5.0),
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
+ )
+ return self._client
+
+ async def close(self) -> None:
+ if self._client is not None:
+ await self._client.aclose()
def headers(self, telegram_id: int | None = None) -> dict[str, str]:
headers = {"X-Internal-API-Token": settings.internal_api_token}
@@ -24,18 +39,17 @@ class ApiClient:
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> Any:
- async with httpx.AsyncClient(base_url=self.base_url, timeout=15) as client:
- response = await client.request(
- method,
- path,
- json=json,
- params=params,
- headers=self.headers(telegram_id),
- )
- response.raise_for_status()
- if response.status_code == 204:
- return None
- return response.json()
+ response = await self.client.request(
+ method,
+ path,
+ json=json,
+ params=params,
+ headers=self.headers(telegram_id),
+ )
+ response.raise_for_status()
+ if response.status_code == 204:
+ return None
+ return response.json()
async def upsert_user(self, telegram_user: Any) -> dict[str, Any]:
payload = {
@@ -44,34 +58,30 @@ class ApiClient:
"first_name": telegram_user.first_name,
"last_name": telegram_user.last_name,
}
- async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
- response = await client.post("/api/users", json=payload, headers=self.headers())
- response.raise_for_status()
- return response.json()
+ response = await self.client.post("/api/users", json=payload, headers=self.headers())
+ response.raise_for_status()
+ return response.json()
async def list_cars(self, owner_id: int, telegram_id: int) -> list[dict[str, Any]]:
- async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
- response = await client.get(
- "/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
- )
- response.raise_for_status()
- return response.json()
+ response = await self.client.get(
+ "/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
+ )
+ response.raise_for_status()
+ return response.json()
async def create_car(self, owner_id: int, name: str, telegram_id: int) -> dict[str, Any]:
- async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
- response = await client.post(
- "/api/cars",
- json={"owner_id": owner_id, "name": name},
- headers=self.headers(telegram_id),
- )
- response.raise_for_status()
- return response.json()
+ response = await self.client.post(
+ "/api/cars",
+ json={"owner_id": owner_id, "name": name},
+ headers=self.headers(telegram_id),
+ )
+ response.raise_for_status()
+ return response.json()
async def stats(self, car_id: int, telegram_id: int) -> dict[str, Any]:
- async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
- response = await client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
- response.raise_for_status()
- return response.json()
+ response = await self.client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
+ response.raise_for_status()
+ return response.json()
async def create_fuel(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/fuel", telegram_id=telegram_id, json=payload)
@@ -126,6 +136,15 @@ class ApiClient:
async def pending_service_centers(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/admin/service-centers/pending", telegram_id=telegram_id)
+ async def admin_dashboard(self, telegram_id: int) -> dict[str, Any]:
+ return await self.request("GET", "/api/admin/dashboard", telegram_id=telegram_id)
+
+ async def admin_users(self, telegram_id: int) -> dict[str, Any]:
+ return await self.request("GET", "/api/admin/users", telegram_id=telegram_id, params={"limit": 10})
+
+ async def admin_alerts(self, telegram_id: int) -> dict[str, Any]:
+ return await self.request("GET", "/api/admin/notifications", telegram_id=telegram_id, params={"limit": 10})
+
async def moderate_service_center(
self,
telegram_id: int,
diff --git a/bot/main.py b/bot/main.py
index 571b713..af767b1 100644
--- a/bot/main.py
+++ b/bot/main.py
@@ -433,6 +433,7 @@ async def register_sto(message: Message, command: CommandObject) -> None:
@dp.message(Command("admin_sto_pending"))
+@dp.message(Command("admin_pending_sto"))
async def admin_sto_pending(message: Message) -> None:
await upsert(message)
try:
@@ -459,6 +460,77 @@ async def admin_sto_pending(message: Message) -> None:
await message.answer(text, reply_markup=admin_card_keyboard(center["id"]))
+@dp.message(Command("admin"))
+async def admin_home(message: Message) -> None:
+ await upsert(message)
+ try:
+ await api.admin_dashboard(message.from_user.id)
+ except httpx.HTTPStatusError as error:
+ await message.answer(f"Админка недоступна: {error.response.text}")
+ return
+ await message.answer(
+ "Admin Control Center: уведомления, пользователи, СТО, заявки, Data Explorer и Audit Log.",
+ reply_markup=webapp_inline_keyboard("Открыть админку", "admin.html"),
+ )
+
+
+@dp.message(Command("admin_stats"))
+async def admin_stats(message: Message) -> None:
+ await upsert(message)
+ try:
+ dashboard = await api.admin_dashboard(message.from_user.id)
+ except httpx.HTTPStatusError as error:
+ await message.answer(f"Нет доступа к admin stats: {error.response.text}")
+ return
+ await message.answer(
+ "\n".join(
+ [
+ "Admin stats",
+ f"Users today: {dashboard['users_today']}",
+ f"Users total: {dashboard['users_total']}",
+ f"STO pending: {dashboard['pending_sto_applications']}",
+ f"Appointments today: {dashboard['appointments_today']}",
+ f"Work orders active: {dashboard['active_work_orders']}",
+ f"Errors/security: {dashboard['system_errors']} / {dashboard['security_events']}",
+ ]
+ ),
+ reply_markup=webapp_inline_keyboard("Admin dashboard", "admin.html"),
+ )
+
+
+@dp.message(Command("admin_users"))
+async def admin_users(message: Message) -> None:
+ await upsert(message)
+ try:
+ data = await api.admin_users(message.from_user.id)
+ except httpx.HTTPStatusError as error:
+ await message.answer(f"Нет доступа к admin users: {error.response.text}")
+ return
+ lines = ["Последние пользователи:"]
+ for row in data.get("rows", [])[:10]:
+ lines.append(f"#{row.get('id')} {row.get('username') or '-'} · {row.get('platform_role')} · {row.get('created_at')}")
+ await message.answer("\n".join(lines), reply_markup=webapp_inline_keyboard("Users", "admin.html?section=users"))
+
+
+@dp.message(Command("admin_sto"))
+async def admin_sto(message: Message) -> None:
+ await admin_sto_pending(message)
+
+
+@dp.message(Command("admin_alerts"))
+async def admin_alerts(message: Message) -> None:
+ await upsert(message)
+ try:
+ data = await api.admin_alerts(message.from_user.id)
+ except httpx.HTTPStatusError as error:
+ await message.answer(f"Нет доступа к admin alerts: {error.response.text}")
+ return
+ lines = ["Admin alerts:"]
+ for row in data.get("rows", [])[:10]:
+ lines.append(f"#{row.get('id')} {row.get('severity')} · {row.get('title')} · {row.get('status')}")
+ await message.answer("\n".join(lines), reply_markup=webapp_inline_keyboard("Alerts", "admin.html?section=notifications"))
+
+
async def admin_action(message: Message, command: CommandObject, action: str) -> None:
args = (command.args or "").split(maxsplit=1)
if not args:
@@ -577,7 +649,14 @@ async def admin_callback(callback: CallbackQuery) -> None:
@dp.message(F.text == "Помощь")
@dp.message(Command("help"))
async def help_message(message: Message) -> None:
+ user = await api.upsert_user(message.from_user)
centers = await sto_workplace_centers(message.from_user.id)
+ admin_help = (
+ "Админ: /admin — панель, /admin_stats — метрики, /admin_users — последние пользователи, "
+ "/admin_pending_sto — заявки СТО, /admin_alerts — события.\n"
+ if user.get("platform_role") in {"admin", "super_admin", "moderator", "support", "analyst"}
+ else ""
+ )
sto_workplace_help = (
"• /sto_bookings или /sto_workplace — панель подтвержденного СТО;\n"
"• /accept_sto_invite CarPass Admin
+Пилотный контур
+Сервис
+| ${escapeHtml(column)} | `).join("")} + ${hasActions ? "Действия | " : ""} +
|---|---|
| ${valueOrDash(row[column])} | `).join("")} + ${ + hasActions + ? `+ ${ + config.editable?.length + ? `` + : "" + } + ${ + config.deletable + ? `` + : "" + } + | ` + : "" + } +
${escapeHtml(item.body || "")}
+