diff --git a/.env.example b/.env.example
index 5d8d1c4..98bbc09 100644
--- a/.env.example
+++ b/.env.example
@@ -22,3 +22,8 @@ OCR_PROVIDER=tesseract
OCR_LANGUAGES=eng+rus+kor
ADMIN_TELEGRAM_IDS=
ADMIN_BOOTSTRAP_TOKEN=
+ADMIN_NOTIFICATION_CHAT_ID=
+ADMIN_NOTIFY_NEW_USERS=true
+ADMIN_NOTIFY_STO_APPLICATIONS=true
+ADMIN_NOTIFY_SECURITY_EVENTS=true
+ADMIN_NOTIFY_SYSTEM_ERRORS=true
diff --git a/alembic/versions/202605170001_admin_notifications_data_explorer.py b/alembic/versions/202605170001_admin_notifications_data_explorer.py
new file mode 100644
index 0000000..9a48616
--- /dev/null
+++ b/alembic/versions/202605170001_admin_notifications_data_explorer.py
@@ -0,0 +1,85 @@
+"""admin notifications and data explorer jobs
+
+Revision ID: 202605170001
+Revises: 202605160002
+Create Date: 2026-05-17 00:00:00.000000
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+revision: str = "202605170001"
+down_revision: str | None = "202605160002"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "admin_notifications",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("event_type", sa.String(length=80), nullable=False),
+ sa.Column("severity", sa.String(length=24), server_default="info", nullable=False),
+ sa.Column("title", sa.String(length=180), nullable=False),
+ sa.Column("body", sa.Text(), nullable=True),
+ sa.Column("entity_type", sa.String(length=80), nullable=True),
+ sa.Column("entity_id", sa.String(length=80), nullable=True),
+ sa.Column("status", sa.String(length=24), server_default="unread", nullable=False),
+ sa.Column("idempotency_key", sa.String(length=180), nullable=False),
+ sa.Column("metadata_json", sa.JSON(), nullable=True),
+ sa.Column("telegram_status", sa.String(length=24), server_default="pending", nullable=False),
+ sa.Column("telegram_error", sa.Text(), nullable=True),
+ sa.Column("read_at", sa.DateTime(timezone=True), nullable=True),
+ sa.Column("dismissed_at", sa.DateTime(timezone=True), nullable=True),
+ sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
+ sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_index("ix_admin_notifications_created_at", "admin_notifications", ["created_at"])
+ op.create_index("ix_admin_notifications_entity_id", "admin_notifications", ["entity_id"])
+ op.create_index("ix_admin_notifications_entity_type", "admin_notifications", ["entity_type"])
+ op.create_index("ix_admin_notifications_event_type", "admin_notifications", ["event_type"])
+ op.create_index("ix_admin_notifications_idempotency_key", "admin_notifications", ["idempotency_key"], unique=True)
+ op.create_index("ix_admin_notifications_severity", "admin_notifications", ["severity"])
+ op.create_index("ix_admin_notifications_status", "admin_notifications", ["status"])
+ op.create_index("ix_admin_notifications_telegram_status", "admin_notifications", ["telegram_status"])
+
+ op.create_table(
+ "admin_export_jobs",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("requested_by_user_id", sa.Integer(), nullable=True),
+ sa.Column("source", sa.String(length=80), nullable=False),
+ sa.Column("export_format", sa.String(length=16), server_default="json", nullable=False),
+ sa.Column("status", sa.String(length=24), server_default="ready", nullable=False),
+ sa.Column("reason", sa.Text(), nullable=True),
+ sa.Column("filters_json", sa.JSON(), nullable=True),
+ sa.Column("result_text", sa.Text(), nullable=True),
+ sa.Column("row_count", sa.Integer(), server_default="0", nullable=False),
+ sa.Column("expires_at", sa.DateTime(timezone=True), nullable=True),
+ sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
+ sa.ForeignKeyConstraint(["requested_by_user_id"], ["users.id"], ondelete="SET NULL"),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_index("ix_admin_export_jobs_created_at", "admin_export_jobs", ["created_at"])
+ op.create_index("ix_admin_export_jobs_requested_by_user_id", "admin_export_jobs", ["requested_by_user_id"])
+ op.create_index("ix_admin_export_jobs_source", "admin_export_jobs", ["source"])
+ op.create_index("ix_admin_export_jobs_status", "admin_export_jobs", ["status"])
+
+
+def downgrade() -> None:
+ op.drop_index("ix_admin_export_jobs_status", table_name="admin_export_jobs")
+ op.drop_index("ix_admin_export_jobs_source", table_name="admin_export_jobs")
+ op.drop_index("ix_admin_export_jobs_requested_by_user_id", table_name="admin_export_jobs")
+ op.drop_index("ix_admin_export_jobs_created_at", table_name="admin_export_jobs")
+ op.drop_table("admin_export_jobs")
+ op.drop_index("ix_admin_notifications_telegram_status", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_status", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_severity", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_idempotency_key", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_event_type", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_entity_type", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_entity_id", table_name="admin_notifications")
+ op.drop_index("ix_admin_notifications_created_at", table_name="admin_notifications")
+ op.drop_table("admin_notifications")
diff --git a/app/api/admin.py b/app/api/admin.py
index 8bb15ba..96c696b 100644
--- a/app/api/admin.py
+++ b/app/api/admin.py
@@ -1,27 +1,826 @@
-from datetime import UTC, datetime
+import csv
+import io
+import json
+from datetime import UTC, date, datetime, timedelta
+from decimal import Decimal
+from typing import Any
from fastapi import APIRouter, Depends, HTTPException
-from sqlalchemy import select
+from fastapi.encoders import jsonable_encoder
+from pydantic import BaseModel, ConfigDict, Field
+from sqlalchemy import Select, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_telegram_user, log_audit, require_platform_role
from app.db.session import get_session
from app.models.car import (
+ AdminExportJob,
+ AdminNotification,
AuditLog,
+ Car,
+ CarServiceLink,
+ ServiceAppointment,
ServiceCenter,
+ ServiceCenterReview,
ServiceCenterVerification,
ServiceEmployee,
+ ServiceNotification,
+ ServiceProductItem,
ServiceVisit,
+ ServiceWorkItem,
)
+from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry
from app.models.user import User
from app.schemas.service_center import AdminModerationDecision, ServiceCenterRead, ServiceVisitRead
+from app.services.admin_notifications import (
+ create_admin_notification,
+ dismiss_admin_notification,
+ mark_admin_notification_read,
+)
from app.services.notifications import notify_user
router = APIRouter(prefix="/admin", tags=["admin"])
+ADMIN_ROLES = {"admin", "super_admin", "moderator", "support", "analyst"}
+FULL_ADMIN_ROLES = {"admin", "super_admin"}
+MODERATION_ROLES = {"admin", "super_admin", "moderator"}
+DATA_EXPORT_ROLES = {"admin", "super_admin", "analyst"}
+
+
+class AdminDataQuery(BaseModel):
+ model_config = ConfigDict(extra="forbid")
+
+ source: str
+ date_from: date | None = None
+ date_to: date | None = None
+ status: str | None = None
+ user_id: int | None = None
+ telegram_id: int | None = None
+ vehicle_id: int | None = None
+ sto_id: int | None = None
+ city: str | None = None
+ role: str | None = None
+ category: str | None = None
+ amount_min: Decimal | None = None
+ amount_max: Decimal | None = None
+ has_errors: bool | None = None
+ is_pending: bool | None = None
+ is_completed: bool | None = None
+ search: str | None = None
+ sort: str = "created_at_desc"
+ limit: int = Field(default=50, ge=1, le=500)
+ offset: int = Field(default=0, ge=0)
+ include_sensitive: bool = False
+ reason: str | None = None
+
+
+class AdminExportRequest(AdminDataQuery):
+ export_format: str = "json"
+
+
+class AdminUserNote(BaseModel):
+ note: str
+
+
+DATA_SOURCES: dict[str, dict[str, Any]] = {
+ "users": {
+ "model": User,
+ "roles": ADMIN_ROLES,
+ "search": ["username", "first_name", "last_name"],
+ "filters": {"user_id": "id", "telegram_id": "telegram_id", "role": "platform_role"},
+ "sensitive": {"telegram_id"},
+ "columns": ["id", "telegram_id", "username", "first_name", "last_name", "platform_role", "created_at", "updated_at"],
+ },
+ "vehicles": {
+ "model": Car,
+ "roles": ADMIN_ROLES,
+ "search": ["name", "make", "model", "vin", "plate_number", "license_plate_display"],
+ "filters": {"user_id": "owner_id", "vehicle_id": "id"},
+ "sensitive": {"vin", "plate_number", "license_plate_display", "vin_normalized", "license_plate_normalized"},
+ "columns": ["id", "owner_id", "name", "make", "model", "year", "vin", "plate_number", "license_plate_display", "current_odometer", "created_at"],
+ },
+ "fuel_entries": {
+ "model": FuelEntry,
+ "roles": FULL_ADMIN_ROLES | {"analyst"},
+ "filters": {"vehicle_id": "car_id"},
+ "amount": "total_cost",
+ "date": "entry_date",
+ "columns": ["id", "car_id", "entry_date", "odometer", "liters", "total_cost", "created_at"],
+ },
+ "service_entries": {
+ "model": ServiceEntry,
+ "roles": FULL_ADMIN_ROLES | {"analyst", "support"},
+ "search": ["title", "vendor", "category"],
+ "filters": {"vehicle_id": "car_id", "category": "category"},
+ "amount": "total_cost",
+ "date": "entry_date",
+ "columns": ["id", "car_id", "entry_date", "service_type", "title", "total_cost", "created_at"],
+ },
+ "expense_entries": {
+ "model": ExpenseEntry,
+ "roles": FULL_ADMIN_ROLES | {"analyst"},
+ "search": ["title", "vendor"],
+ "filters": {"vehicle_id": "car_id", "category": "category"},
+ "amount": "total_cost",
+ "date": "entry_date",
+ "columns": ["id", "car_id", "entry_date", "category", "title", "total_cost", "currency", "created_at"],
+ },
+ "sto_profiles": {
+ "model": ServiceCenter,
+ "roles": ADMIN_ROLES,
+ "search": ["name", "display_name", "legal_name", "city"],
+ "filters": {"sto_id": "id", "city": "city", "status": "verification_status", "user_id": "owner_user_id"},
+ "sensitive": {"phone", "contact_phone", "business_registration_number"},
+ "columns": ["id", "display_name", "legal_name", "city", "phone", "verification_status", "owner_user_id", "created_at", "verified_at"],
+ },
+ "sto_applications": {
+ "model": ServiceCenterVerification,
+ "roles": MODERATION_ROLES,
+ "filters": {"sto_id": "service_center_id", "status": "status", "user_id": "reviewed_by"},
+ "columns": ["id", "service_center_id", "status", "reviewed_by", "reviewed_at", "created_at", "comment"],
+ },
+ "sto_employees": {
+ "model": ServiceEmployee,
+ "roles": MODERATION_ROLES | {"support"},
+ "filters": {"sto_id": "service_center_id", "user_id": "user_id", "role": "role", "status": "status"},
+ "columns": ["id", "service_center_id", "user_id", "role", "status", "created_at"],
+ },
+ "vehicle_sto_links": {
+ "model": CarServiceLink,
+ "roles": MODERATION_ROLES | {"support"},
+ "filters": {"vehicle_id": "car_id", "sto_id": "service_center_id", "status": "status"},
+ "columns": ["id", "car_id", "service_center_id", "access_level", "status", "created_at"],
+ },
+ "appointments": {
+ "model": ServiceAppointment,
+ "roles": ADMIN_ROLES,
+ "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"},
+ "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "service_type", "service_name", "status", "requested_start_at", "created_at"],
+ },
+ "work_orders": {
+ "model": ServiceVisit,
+ "roles": ADMIN_ROLES,
+ "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"},
+ "amount": "final_total",
+ "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "status", "final_total", "currency", "created_at", "completed_at"],
+ },
+ "work_order_items": {
+ "model": ServiceWorkItem,
+ "roles": FULL_ADMIN_ROLES | {"support"},
+ "filters": {"category": "category"},
+ "amount": "total",
+ "columns": ["id", "service_visit_id", "work_type", "title", "category", "quantity", "total", "created_at"],
+ },
+ "work_order_products": {
+ "model": ServiceProductItem,
+ "roles": FULL_ADMIN_ROLES | {"support"},
+ "filters": {"category": "category"},
+ "amount": "total",
+ "columns": ["id", "service_visit_id", "title", "category", "product_type", "quantity", "total", "created_at"],
+ },
+ "reviews": {
+ "model": ServiceCenterReview,
+ "roles": MODERATION_ROLES | {"support", "analyst"},
+ "filters": {"sto_id": "service_center_id", "user_id": "user_id", "status": "status"},
+ "columns": ["id", "service_center_id", "user_id", "rating", "status", "created_at"],
+ },
+ "notifications": {
+ "model": ServiceNotification,
+ "roles": FULL_ADMIN_ROLES | {"support"},
+ "filters": {"user_id": "recipient_user_id", "status": "status", "sto_id": "service_center_id"},
+ "columns": ["id", "recipient_user_id", "notification_type", "title", "status", "created_at", "sent_at"],
+ },
+ "admin_notifications": {
+ "model": AdminNotification,
+ "roles": ADMIN_ROLES,
+ "filters": {"status": "status"},
+ "columns": ["id", "event_type", "severity", "title", "entity_type", "entity_id", "status", "created_at"],
+ },
+ "audit_logs": {
+ "model": AuditLog,
+ "roles": FULL_ADMIN_ROLES | {"moderator", "support"},
+ "search": ["action", "target_type", "target_id"],
+ "filters": {"user_id": "actor_user_id", "role": "actor_role"},
+ "columns": ["id", "actor_user_id", "actor_role", "action", "target_type", "target_id", "ip", "created_at"],
+ },
+ "ocr_results": {"model": None, "roles": ADMIN_ROLES, "columns": []},
+ "imports_exports": {
+ "model": AdminExportJob,
+ "roles": DATA_EXPORT_ROLES,
+ "filters": {"user_id": "requested_by_user_id", "status": "status"},
+ "columns": ["id", "requested_by_user_id", "source", "export_format", "status", "row_count", "created_at"],
+ },
+}
+
def require_admin_or_verifier(user: User) -> None:
- require_platform_role(user, {"admin", "verifier", "moderator"})
+ require_platform_role(user, MODERATION_ROLES | {"verifier"})
+
+
+def require_admin_access(user: User, allowed: set[str] | None = None) -> None:
+ require_platform_role(user, allowed or ADMIN_ROLES)
+
+
+def mask_text(value: Any, visible: int = 4) -> Any:
+ if value is None:
+ return None
+ text = str(value)
+ if len(text) <= visible:
+ return "*" * len(text)
+ return f"{text[:2]}{'*' * max(len(text) - visible, 3)}{text[-2:]}"
+
+
+def can_view_sensitive(user: User, query: AdminDataQuery) -> bool:
+ if not query.include_sensitive:
+ return False
+ if user.platform_role not in FULL_ADMIN_ROLES:
+ raise HTTPException(status_code=403, detail="Sensitive data is restricted")
+ if not query.reason or len(query.reason.strip()) < 5:
+ raise HTTPException(status_code=400, detail="reason is required for sensitive data")
+ return True
+
+
+def public_columns(source: str) -> list[str]:
+ return list(DATA_SOURCES[source].get("columns") or [])
+
+
+def serialize_row(row: Any, source: str, *, include_sensitive: bool, role: str) -> dict[str, Any]:
+ config = DATA_SOURCES[source]
+ sensitive = set(config.get("sensitive") or set())
+ columns = public_columns(source)
+ payload: dict[str, Any] = {}
+ for column in columns:
+ value = getattr(row, column, None)
+ if role == "analyst" and column in {"telegram_id", "username", "first_name", "last_name", "owner_id", "user_id"}:
+ value = mask_text(value)
+ elif column in sensitive and not include_sensitive:
+ value = mask_text(value)
+ payload[column] = jsonable_encoder(value)
+ return payload
+
+
+def source_config(source: str, user: User) -> dict[str, Any]:
+ config = DATA_SOURCES.get(source)
+ if config is None or config.get("model") is None:
+ raise HTTPException(status_code=400, detail="Unsupported data source")
+ require_admin_access(user, set(config["roles"]))
+ return config
+
+
+def apply_data_filters(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select:
+ model = config["model"]
+ date_column = config.get("date", "created_at")
+ if hasattr(model, date_column):
+ column = getattr(model, date_column)
+ if query.date_from:
+ stmt = stmt.where(column >= query.date_from)
+ if query.date_to:
+ stmt = stmt.where(column <= query.date_to)
+ for query_field, model_field in (config.get("filters") or {}).items():
+ value = getattr(query, query_field)
+ if value is not None and value != "" and hasattr(model, model_field):
+ stmt = stmt.where(getattr(model, model_field) == value)
+ if query.is_pending is True and hasattr(model, "status"):
+ stmt = stmt.where(model.status.in_(["pending", "requested", "unread"]))
+ if query.is_completed is True and hasattr(model, "status"):
+ stmt = stmt.where(model.status.in_(["completed", "closed", "confirmed"]))
+ amount_column = config.get("amount")
+ if amount_column and hasattr(model, amount_column):
+ column = getattr(model, amount_column)
+ if query.amount_min is not None:
+ stmt = stmt.where(column >= query.amount_min)
+ if query.amount_max is not None:
+ stmt = stmt.where(column <= query.amount_max)
+ if query.has_errors is not None:
+ if hasattr(model, "last_error"):
+ stmt = stmt.where(model.last_error.is_not(None) if query.has_errors else model.last_error.is_(None))
+ elif hasattr(model, "telegram_error"):
+ stmt = stmt.where(model.telegram_error.is_not(None) if query.has_errors else model.telegram_error.is_(None))
+ if query.search:
+ search_fields = [field for field in config.get("search", []) if hasattr(model, field)]
+ if search_fields:
+ pattern = f"%{query.search}%"
+ stmt = stmt.where(or_(*(getattr(model, field).ilike(pattern) for field in search_fields)))
+ return stmt
+
+
+def apply_sort(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select:
+ model = config["model"]
+ sort_map = {
+ "created_at_desc": ("created_at", True),
+ "created_at_asc": ("created_at", False),
+ "updated_at_desc": ("updated_at", True),
+ "amount_desc": (config.get("amount") or "created_at", True),
+ "status": ("status", False),
+ "city": ("city", False),
+ }
+ field, desc = sort_map.get(query.sort, ("created_at", True))
+ if not hasattr(model, field):
+ field = "id"
+ column = getattr(model, field)
+ return stmt.order_by(column.desc() if desc else column.asc())
+
+
+async def run_data_query(
+ session: AsyncSession, current_user: User, query: AdminDataQuery
+) -> dict[str, Any]:
+ config = source_config(query.source, current_user)
+ include_sensitive = can_view_sensitive(current_user, query)
+ stmt = apply_sort(apply_data_filters(select(config["model"]), query, config), query, config)
+ result = await session.execute(stmt.limit(query.limit).offset(query.offset))
+ rows = [
+ serialize_row(item, query.source, include_sensitive=include_sensitive, role=current_user.platform_role)
+ for item in result.scalars()
+ ]
+ await log_audit(
+ session,
+ actor=current_user,
+ action="admin.data.query",
+ target_type=query.source,
+ metadata={
+ "filters": query.model_dump(mode="json", exclude={"reason"}),
+ "include_sensitive": include_sensitive,
+ "reason": query.reason if include_sensitive else None,
+ "rows": len(rows),
+ },
+ )
+ return {"source": query.source, "limit": query.limit, "offset": query.offset, "rows": rows}
+
+
+def csv_from_rows(rows: list[dict[str, Any]]) -> str:
+ output = io.StringIO()
+ if not rows:
+ return ""
+ writer = csv.DictWriter(output, fieldnames=list(rows[0].keys()))
+ writer.writeheader()
+ writer.writerows(rows)
+ return output.getvalue()
+
+
+@router.get("/dashboard")
+async def admin_dashboard(
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user)
+ today = datetime.now(UTC).date()
+ week_ago = today - timedelta(days=6)
+
+ async def count(stmt: Select) -> int:
+ return int((await session.execute(stmt)).scalar_one() or 0)
+
+ users_total = await count(select(func.count(User.id)))
+ users_today = await count(select(func.count(User.id)).where(func.date(User.created_at) == today))
+ users_7d = await count(select(func.count(User.id)).where(func.date(User.created_at) >= week_ago))
+ vehicles_total = await count(select(func.count(Car.id)))
+ pending_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status == "pending"))
+ approved_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status.in_(["approved", "verified"])))
+ suspended_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status == "suspended"))
+ appointments_today = await count(
+ select(func.count(ServiceAppointment.id)).where(func.date(ServiceAppointment.requested_start_at) == today)
+ )
+ active_work_orders = await count(
+ select(func.count(ServiceVisit.id)).where(ServiceVisit.status.in_(["draft", "awaiting_approval", "approved", "in_progress"]))
+ )
+ completed_work_orders = await count(select(func.count(ServiceVisit.id)).where(ServiceVisit.status == "completed"))
+ system_errors = await count(select(func.count(AdminNotification.id)).where(AdminNotification.severity.in_(["error", "critical"])))
+ security_events = await count(select(func.count(AdminNotification.id)).where(AdminNotification.event_type.in_(["security_event", "rate_limit_exceeded", "upload_blocked"])))
+ latest_alerts = (
+ await session.execute(select(AdminNotification).order_by(AdminNotification.created_at.desc()).limit(8))
+ ).scalars()
+ return {
+ "users_today": users_today,
+ "users_7d": users_7d,
+ "users_total": users_total,
+ "active_users": users_7d,
+ "vehicles_total": vehicles_total,
+ "new_sto_applications": pending_sto,
+ "pending_sto_applications": pending_sto,
+ "approved_sto": approved_sto,
+ "suspended_sto": suspended_sto,
+ "appointments_today": appointments_today,
+ "active_work_orders": active_work_orders,
+ "completed_work_orders": completed_work_orders,
+ "system_errors": system_errors,
+ "security_events": security_events,
+ "latest_alerts": [serialize_row(item, "admin_notifications", include_sensitive=False, role=current_user.platform_role) for item in latest_alerts],
+ }
+
+
+@router.get("/notifications")
+async def admin_notifications(
+ status: str | None = None,
+ limit: int = 50,
+ offset: int = 0,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user)
+ limit = min(max(limit, 1), 200)
+ stmt = select(AdminNotification)
+ if status:
+ stmt = stmt.where(AdminNotification.status == status)
+ rows = (
+ await session.execute(stmt.order_by(AdminNotification.created_at.desc()).limit(limit).offset(max(offset, 0)))
+ ).scalars()
+ return {"rows": [jsonable_encoder(item) for item in rows], "limit": limit, "offset": offset}
+
+
+@router.post("/notifications/{notification_id}/read")
+async def read_admin_notification(
+ notification_id: int,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user)
+ notification = await session.get(AdminNotification, notification_id)
+ if notification is None:
+ raise HTTPException(status_code=404, detail="Admin notification not found")
+ await mark_admin_notification_read(session, notification)
+ await log_audit(session, actor=current_user, action="admin_notification.read", target_type="admin_notification", target_id=notification_id)
+ await session.commit()
+ return jsonable_encoder(notification)
+
+
+@router.post("/notifications/read-all")
+async def read_all_admin_notifications(
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user)
+ rows = (await session.execute(select(AdminNotification).where(AdminNotification.status == "unread"))).scalars().all()
+ for notification in rows:
+ await mark_admin_notification_read(session, notification)
+ await log_audit(session, actor=current_user, action="admin_notification.read_all", target_type="admin_notification", metadata={"count": len(rows)})
+ await session.commit()
+ return {"updated": len(rows)}
+
+
+@router.post("/notifications/{notification_id}/dismiss")
+async def dismiss_notification(
+ notification_id: int,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user)
+ notification = await session.get(AdminNotification, notification_id)
+ if notification is None:
+ raise HTTPException(status_code=404, detail="Admin notification not found")
+ await dismiss_admin_notification(session, notification)
+ await log_audit(session, actor=current_user, action="admin_notification.dismiss", target_type="admin_notification", target_id=notification_id)
+ await session.commit()
+ return jsonable_encoder(notification)
+
+
+@router.get("/data/sources")
+async def admin_data_sources(current_user: User = Depends(get_current_telegram_user)) -> dict[str, Any]:
+ require_admin_access(current_user)
+ return {
+ "sources": [
+ {
+ "name": name,
+ "available": bool(config.get("model")),
+ "columns": config.get("columns") or [],
+ "sensitive": sorted(config.get("sensitive") or []),
+ "allowed": current_user.platform_role in set(config["roles"]),
+ }
+ for name, config in DATA_SOURCES.items()
+ ],
+ "sorts": ["created_at_desc", "created_at_asc", "updated_at_desc", "amount_desc", "status", "city"],
+ "limits": [25, 50, 100, 500],
+ }
+
+
+@router.post("/data/query")
+async def admin_data_query(
+ payload: AdminDataQuery,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ data = await run_data_query(session, current_user, payload)
+ await session.commit()
+ return data
+
+
+@router.post("/data/export")
+async def admin_data_export(
+ payload: AdminExportRequest,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, DATA_EXPORT_ROLES)
+ if payload.export_format not in {"json", "csv"}:
+ raise HTTPException(status_code=400, detail="Unsupported export format")
+ if payload.source in {"users", "vehicles", "sto_profiles"} and not payload.reason:
+ raise HTTPException(status_code=400, detail="Export reason is required")
+ data = await run_data_query(session, current_user, payload)
+ content = json.dumps(data["rows"], ensure_ascii=False, default=str, indent=2)
+ if payload.export_format == "csv":
+ content = csv_from_rows(data["rows"])
+ job = AdminExportJob(
+ requested_by_user_id=current_user.id,
+ source=payload.source,
+ export_format=payload.export_format,
+ status="ready",
+ reason=payload.reason,
+ filters_json=payload.model_dump(mode="json", exclude={"reason"}),
+ result_text=content,
+ row_count=len(data["rows"]),
+ expires_at=datetime.now(UTC) + timedelta(days=7),
+ )
+ session.add(job)
+ await log_audit(session, actor=current_user, action="admin.data.export", target_type=payload.source, metadata={"format": payload.export_format, "rows": len(data["rows"])})
+ await session.commit()
+ await session.refresh(job)
+ return {"id": job.id, "status": job.status, "row_count": job.row_count, "expires_at": job.expires_at}
+
+
+@router.get("/exports")
+async def admin_exports(
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, DATA_EXPORT_ROLES)
+ rows = (await session.execute(select(AdminExportJob).order_by(AdminExportJob.created_at.desc()).limit(50))).scalars()
+ return {
+ "rows": [
+ {
+ "id": item.id,
+ "requested_by_user_id": item.requested_by_user_id,
+ "source": item.source,
+ "export_format": item.export_format,
+ "status": item.status,
+ "row_count": item.row_count,
+ "reason": item.reason,
+ "expires_at": item.expires_at,
+ "created_at": item.created_at,
+ }
+ for item in rows
+ ]
+ }
+
+
+@router.get("/exports/{export_id}")
+async def admin_export_detail(
+ export_id: int,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, DATA_EXPORT_ROLES)
+ job = await session.get(AdminExportJob, export_id)
+ if job is None:
+ raise HTTPException(status_code=404, detail="Export not found")
+ await log_audit(session, actor=current_user, action="admin.export.view", target_type="admin_export", target_id=export_id)
+ await session.commit()
+ return jsonable_encoder(job)
+
+
+@router.get("/users")
+async def admin_users(
+ search: str | None = None,
+ limit: int = 50,
+ offset: int = 0,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ query = AdminDataQuery(source="users", search=search, limit=min(limit, 100), offset=offset)
+ data = await run_data_query(session, current_user, query)
+ await session.commit()
+ return data
+
+
+@router.get("/users/{user_id}")
+async def admin_user_detail(
+ user_id: int,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
+ user = await session.get(User, user_id)
+ if user is None:
+ raise HTTPException(status_code=404, detail="User not found")
+ cars_count = int((await session.execute(select(func.count(Car.id)).where(Car.owner_id == user.id))).scalar_one() or 0)
+ fuel_count = int(
+ (
+ await session.execute(
+ select(func.count(FuelEntry.id)).join(Car, FuelEntry.car_id == Car.id).where(Car.owner_id == user.id)
+ )
+ ).scalar_one()
+ or 0
+ )
+ appointments_count = int((await session.execute(select(func.count(ServiceAppointment.id)).where(ServiceAppointment.owner_user_id == user.id))).scalar_one() or 0)
+ await log_audit(session, actor=current_user, action="admin.user.view", target_type="user", target_id=user.id)
+ await session.commit()
+ return {
+ **serialize_row(user, "users", include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES, role=current_user.platform_role),
+ "cars_count": cars_count,
+ "records_count": fuel_count,
+ "appointments_count": appointments_count,
+ }
+
+
+@router.get("/users/{user_id}/activity")
+async def admin_user_activity(
+ user_id: int,
+ limit: int = 50,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
+ rows = (
+ await session.execute(
+ select(AuditLog)
+ .where(AuditLog.actor_user_id == user_id)
+ .order_by(AuditLog.created_at.desc())
+ .limit(min(max(limit, 1), 100))
+ )
+ ).scalars()
+ await log_audit(session, actor=current_user, action="admin.user.activity", target_type="user", target_id=user_id)
+ await session.commit()
+ return {"rows": [jsonable_encoder(item) for item in rows]}
+
+
+@router.post("/users/{user_id}/note")
+async def admin_user_note(
+ user_id: int,
+ payload: AdminUserNote,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, str]:
+ require_admin_access(current_user, {"admin", "super_admin", "support"})
+ await log_audit(session, actor=current_user, action="admin.user.note", target_type="user", target_id=user_id, metadata={"note": payload.note})
+ await session.commit()
+ return {"status": "saved"}
+
+
+@router.post("/users/{user_id}/block")
+async def admin_block_user(
+ user_id: int,
+ payload: AdminUserNote | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, FULL_ADMIN_ROLES)
+ user = await session.get(User, user_id)
+ if user is None:
+ raise HTTPException(status_code=404, detail="User not found")
+ user.platform_role = "blocked"
+ await log_audit(session, actor=current_user, action="admin.user.block", target_type="user", target_id=user_id, metadata={"reason": payload.note if payload else None})
+ await session.commit()
+ return {"id": user.id, "platform_role": user.platform_role}
+
+
+@router.post("/users/{user_id}/unblock")
+async def admin_unblock_user(
+ user_id: int,
+ payload: AdminUserNote | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, FULL_ADMIN_ROLES)
+ user = await session.get(User, user_id)
+ if user is None:
+ raise HTTPException(status_code=404, detail="User not found")
+ user.platform_role = "user"
+ await log_audit(session, actor=current_user, action="admin.user.unblock", target_type="user", target_id=user_id, metadata={"reason": payload.note if payload else None})
+ await session.commit()
+ return {"id": user.id, "platform_role": user.platform_role}
+
+
+@router.get("/sto")
+async def admin_sto(
+ status: str | None = None,
+ city: str | None = None,
+ limit: int = 50,
+ offset: int = 0,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ query = AdminDataQuery(source="sto_profiles", status=status, city=city, limit=min(limit, 100), offset=offset)
+ data = await run_data_query(session, current_user, query)
+ await session.commit()
+ return data
+
+
+@router.get("/sto/{service_center_id}")
+async def admin_sto_detail(
+ service_center_id: int,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
+ center = await session.get(ServiceCenter, service_center_id)
+ if center is None:
+ raise HTTPException(status_code=404, detail="Service center not found")
+ employees_count = int((await session.execute(select(func.count(ServiceEmployee.id)).where(ServiceEmployee.service_center_id == center.id))).scalar_one() or 0)
+ appointments_count = int((await session.execute(select(func.count(ServiceAppointment.id)).where(ServiceAppointment.service_center_id == center.id))).scalar_one() or 0)
+ work_orders_count = int((await session.execute(select(func.count(ServiceVisit.id)).where(ServiceVisit.service_center_id == center.id))).scalar_one() or 0)
+ await log_audit(session, actor=current_user, action="admin.sto.view", target_type="service_center", target_id=center.id)
+ await session.commit()
+ return {
+ **serialize_row(center, "sto_profiles", include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES, role=current_user.platform_role),
+ "employees_count": employees_count,
+ "appointments_count": appointments_count,
+ "work_orders_count": work_orders_count,
+ }
+
+
+@router.get("/sto-applications")
+async def admin_sto_applications(
+ status: str | None = None,
+ city: str | None = None,
+ limit: int = 50,
+ offset: int = 0,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> dict[str, Any]:
+ require_admin_access(current_user, MODERATION_ROLES)
+ stmt = select(ServiceCenter).where(ServiceCenter.verification_status.in_(["pending", "needs_changes"] if status is None else [status]))
+ if city:
+ stmt = stmt.where(ServiceCenter.city == city)
+ rows = (await session.execute(stmt.order_by(ServiceCenter.created_at.asc()).limit(min(limit, 100)).offset(offset))).scalars()
+ return {"rows": [serialize_row(item, "sto_profiles", include_sensitive=False, role=current_user.platform_role) for item in rows]}
+
+
+async def center_id_from_application(session: AsyncSession, application_id: int) -> int:
+ verification = await session.get(ServiceCenterVerification, application_id)
+ if verification is None:
+ center = await session.get(ServiceCenter, application_id)
+ if center is None:
+ raise HTTPException(status_code=404, detail="STO application not found")
+ return center.id
+ return verification.service_center_id
+
+
+@router.post("/sto-applications/{application_id}/approve", response_model=ServiceCenterRead)
+async def approve_sto_application(
+ application_id: int,
+ payload: AdminModerationDecision | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> ServiceCenter:
+ center_id = await center_id_from_application(session, application_id)
+ return await verify_service_center(center_id, payload, session, current_user)
+
+
+@router.post("/sto-applications/{application_id}/reject", response_model=ServiceCenterRead)
+async def reject_sto_application(
+ application_id: int,
+ payload: AdminModerationDecision | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> ServiceCenter:
+ center_id = await center_id_from_application(session, application_id)
+ return await reject_service_center(center_id, payload, session, current_user)
+
+
+@router.post("/sto-applications/{application_id}/request-changes", response_model=ServiceCenterRead)
+async def request_sto_application_changes(
+ application_id: int,
+ payload: AdminModerationDecision,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> ServiceCenter:
+ center_id = await center_id_from_application(session, application_id)
+ return await request_service_center_changes(center_id, payload, session, current_user)
+
+
+@router.post("/sto/{service_center_id}/suspend", response_model=ServiceCenterRead)
+async def suspend_sto(
+ service_center_id: int,
+ payload: AdminModerationDecision | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> ServiceCenter:
+ return await suspend_service_center(service_center_id, payload, session, current_user)
+
+
+@router.post("/sto/{service_center_id}/unsuspend", response_model=ServiceCenterRead)
+async def unsuspend_sto(
+ service_center_id: int,
+ payload: AdminModerationDecision | None = None,
+ session: AsyncSession = Depends(get_session),
+ current_user: User = Depends(get_current_telegram_user),
+) -> ServiceCenter:
+ require_admin_access(current_user, FULL_ADMIN_ROLES)
+ center = await session.get(ServiceCenter, service_center_id)
+ if center is None:
+ raise HTTPException(status_code=404, detail="Service center not found")
+ center.verification_status = "approved"
+ center.suspended_at = None
+ await create_admin_notification(
+ session,
+ event_type="sto_approved",
+ title="СТО разблокировано",
+ body=center.display_name or center.name,
+ entity_type="service_center",
+ entity_id=center.id,
+ idempotency_key=f"sto_unsuspended:{center.id}:{datetime.now(UTC).isoformat()}",
+ )
+ await log_audit(session, actor=current_user, action="service_center.unsuspend", target_type="service_center", target_id=center.id, metadata={"reason": payload.reason if payload else None})
+ await session.commit()
+ await session.refresh(center)
+ return center
@router.get("/service-centers/pending", response_model=list[ServiceCenterRead])
@@ -79,6 +878,15 @@ async def verify_service_center(
target_id=center.id,
metadata={"comment": payload.comment if payload else None},
)
+ await create_admin_notification(
+ session,
+ event_type="sto_approved",
+ title="СТО одобрено",
+ body=center.display_name or center.name,
+ entity_type="service_center",
+ entity_id=center.id,
+ idempotency_key=f"sto_approved:{center.id}:{center.verified_at.isoformat() if center.verified_at else 'now'}",
+ )
await session.commit()
await session.refresh(center)
return center
@@ -110,6 +918,15 @@ async def reject_service_center(
target_id=center.id,
metadata={"reason": payload.reason if payload else None, "comment": payload.comment if payload else None},
)
+ await create_admin_notification(
+ session,
+ event_type="sto_application_updated",
+ title="Заявка СТО отклонена",
+ body=center.display_name or center.name,
+ entity_type="service_center",
+ entity_id=center.id,
+ idempotency_key=f"sto_rejected:{center.id}:{datetime.now(UTC).isoformat()}",
+ )
await session.commit()
await session.refresh(center)
return center
@@ -122,7 +939,7 @@ async def suspend_service_center(
session: AsyncSession = Depends(get_session),
current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
- require_platform_role(current_user, {"admin"})
+ require_admin_access(current_user, FULL_ADMIN_ROLES)
center = await session.get(ServiceCenter, service_center_id)
if center is None:
raise HTTPException(status_code=404, detail="Service center not found")
@@ -141,6 +958,16 @@ async def suspend_service_center(
target_id=center.id,
metadata={"reason": payload.reason if payload else None, "comment": payload.comment if payload else None},
)
+ await create_admin_notification(
+ session,
+ event_type="sto_suspended",
+ title="СТО заблокировано",
+ body=center.display_name or center.name,
+ entity_type="service_center",
+ entity_id=center.id,
+ severity="warning",
+ idempotency_key=f"sto_suspended:{center.id}:{center.suspended_at.isoformat() if center.suspended_at else 'now'}",
+ )
await session.commit()
await session.refresh(center)
return center
@@ -172,6 +999,15 @@ async def request_service_center_changes(
target_id=center.id,
metadata={"reason": payload.reason, "comment": payload.comment},
)
+ await create_admin_notification(
+ session,
+ event_type="sto_application_updated",
+ title="По заявке СТО запрошены правки",
+ body=center.display_name or center.name,
+ entity_type="service_center",
+ entity_id=center.id,
+ idempotency_key=f"sto_changes:{center.id}:{datetime.now(UTC).isoformat()}",
+ )
await session.commit()
await session.refresh(center)
return center
@@ -179,16 +1015,53 @@ async def request_service_center_changes(
@router.get("/audit-log")
async def audit_log(
+ actor_id: int | None = None,
+ actor_role: str | None = None,
+ action: str | None = None,
+ entity_type: str | None = None,
+ entity_id: str | None = None,
+ date_from: date | None = None,
+ date_to: date | None = None,
+ severity: str | None = None,
+ ip: str | None = None,
+ user_agent: str | None = None,
limit: int = 100,
offset: int = 0,
session: AsyncSession = Depends(get_session),
current_user: User = Depends(get_current_telegram_user),
) -> list[dict]:
- require_platform_role(current_user, {"admin", "verifier", "moderator"})
+ require_admin_access(current_user, FULL_ADMIN_ROLES | {"moderator", "support"})
limit = min(max(limit, 1), 200)
- result = await session.execute(
- select(AuditLog).order_by(AuditLog.created_at.desc()).limit(limit).offset(max(offset, 0))
+ stmt = select(AuditLog)
+ if actor_id is not None:
+ stmt = stmt.where(AuditLog.actor_user_id == actor_id)
+ if actor_role:
+ stmt = stmt.where(AuditLog.actor_role == actor_role)
+ if action:
+ stmt = stmt.where(AuditLog.action.ilike(f"%{action}%"))
+ if entity_type:
+ stmt = stmt.where(AuditLog.target_type == entity_type)
+ if entity_id:
+ stmt = stmt.where(AuditLog.target_id == entity_id)
+ if date_from:
+ stmt = stmt.where(func.date(AuditLog.created_at) >= date_from)
+ if date_to:
+ stmt = stmt.where(func.date(AuditLog.created_at) <= date_to)
+ if ip:
+ stmt = stmt.where(AuditLog.ip == ip)
+ if user_agent:
+ stmt = stmt.where(AuditLog.user_agent.ilike(f"%{user_agent}%"))
+ if severity:
+ stmt = stmt.where(AuditLog.metadata_json["severity"].as_string() == severity)
+ result = await session.execute(stmt.order_by(AuditLog.created_at.desc()).limit(limit).offset(max(offset, 0)))
+ await log_audit(
+ session,
+ actor=current_user,
+ action="admin.audit_log.view",
+ target_type="audit_log",
+ metadata={"limit": limit, "offset": offset, "filter_action": action},
)
+ await session.commit()
return [
{
"id": item.id,
diff --git a/app/api/deps.py b/app/api/deps.py
index ff68f56..b94b500 100644
--- a/app/api/deps.py
+++ b/app/api/deps.py
@@ -8,6 +8,7 @@ from app.core.config import settings
from app.db.session import get_session
from app.models.car import AuditLog, Car, ServiceCenter, ServiceEmployee, VehicleAccess
from app.models.user import User
+from app.services.admin_notifications import create_admin_notification
from app.services.telegram_auth import verify_webapp_init_data
@@ -36,6 +37,24 @@ async def get_or_create_telegram_user(
if user is None:
user = User(**{key: value for key, value in payload.items() if value is not None})
session.add(user)
+ await session.flush()
+ await create_admin_notification(
+ session,
+ event_type="user_registered",
+ title="Новый пользователь",
+ body="\n".join(
+ item
+ for item in [
+ f"Имя: {' '.join(part for part in [first_name, last_name] if part) or '-'}",
+ f"Telegram ID: {telegram_id}",
+ f"Username: @{username}" if username else "Username: -",
+ ]
+ ),
+ entity_type="user",
+ entity_id=user.id,
+ idempotency_key=f"user_registered:{telegram_id}",
+ metadata={"telegram_id": telegram_id, "username": username},
+ )
else:
for field, value in payload.items():
if value is not None:
diff --git a/app/api/my.py b/app/api/my.py
index 0d56b3e..eb807a5 100644
--- a/app/api/my.py
+++ b/app/api/my.py
@@ -27,6 +27,7 @@ from app.schemas.service_center import (
VehicleUpdate,
)
from app.schemas.user import UserRead
+from app.services.admin_notifications import create_admin_notification
from app.services.odometer import (
add_odometer_history,
recalculate_current_odometer,
@@ -381,6 +382,20 @@ async def create_vehicle(
changed_by=current_user.id,
)
await log_audit(session, actor=current_user, action="vehicle.create", target_type="vehicle", target_id=car.id)
+ vehicle_count = (
+ await session.execute(select(Car.id).where(Car.owner_id == current_user.id).limit(2))
+ ).scalars().all()
+ if len(vehicle_count) == 1:
+ await create_admin_notification(
+ session,
+ event_type="vehicle_created",
+ title="Пользователь впервые добавил авто",
+ body=f"{current_user.first_name or current_user.username or current_user.telegram_id}: {car.name}",
+ entity_type="vehicle",
+ entity_id=car.id,
+ idempotency_key=f"first_vehicle:{current_user.id}",
+ metadata={"user_id": current_user.id, "vehicle_id": car.id},
+ )
await session.commit()
await session.refresh(car)
return car
diff --git a/app/api/service_centers.py b/app/api/service_centers.py
index 584c202..898290d 100644
--- a/app/api/service_centers.py
+++ b/app/api/service_centers.py
@@ -48,6 +48,7 @@ from app.schemas.service_center import (
VehicleSearchRequest,
VehicleSearchResult,
)
+from app.services.admin_notifications import create_admin_notification
from app.services.notifications import notify_platform_moderators
from app.services.odometer import validate_odometer_change
from app.services.rate_limit import check_rate_limit
@@ -147,6 +148,25 @@ async def create_service_center(
target_type="service_center",
target_id=center.id,
)
+ await create_admin_notification(
+ session,
+ event_type="sto_application_created",
+ title="Новая заявка СТО",
+ body="\n".join(
+ item
+ for item in [
+ f"Название: {center.display_name or center.name}",
+ f"Город: {center.city or '-'}",
+ f"Телефон: {center.phone or center.contact_phone or '-'}",
+ f"Документы: {len(center.document_photo_urls or [])}",
+ "Статус: pending",
+ ]
+ ),
+ entity_type="service_center",
+ entity_id=center.id,
+ idempotency_key=f"sto_application_created:{center.id}",
+ metadata={"city": center.city, "owner_user_id": current_user.id},
+ )
await session.commit()
await session.refresh(center)
await notify_platform_moderators(
diff --git a/app/core/config.py b/app/core/config.py
index 4e12116..7f8edec 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -24,6 +24,11 @@ class Settings(BaseSettings):
ocr_languages: str = "eng+rus+kor"
admin_telegram_ids: str = ""
admin_bootstrap_token: str = ""
+ admin_notification_chat_id: str = ""
+ admin_notify_new_users: bool = True
+ admin_notify_sto_applications: bool = True
+ admin_notify_security_events: bool = True
+ admin_notify_system_errors: bool = True
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
diff --git a/app/models/car.py b/app/models/car.py
index b4098d2..7e09f9b 100644
--- a/app/models/car.py
+++ b/app/models/car.py
@@ -432,6 +432,29 @@ class ServiceNotification(Base):
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True)
+class AdminNotification(Base):
+ __tablename__ = "admin_notifications"
+
+ id: Mapped[int] = mapped_column(primary_key=True)
+ event_type: Mapped[str] = mapped_column(String(80), index=True)
+ severity: Mapped[str] = mapped_column(String(24), default="info", server_default="info", index=True)
+ title: Mapped[str] = mapped_column(String(180))
+ body: Mapped[str | None] = mapped_column(Text)
+ entity_type: Mapped[str | None] = mapped_column(String(80), index=True)
+ entity_id: Mapped[str | None] = mapped_column(String(80), index=True)
+ status: Mapped[str] = mapped_column(String(24), default="unread", server_default="unread", index=True)
+ idempotency_key: Mapped[str] = mapped_column(String(180), unique=True, index=True)
+ metadata_json: Mapped[dict | None] = mapped_column(JSON)
+ telegram_status: Mapped[str] = mapped_column(String(24), default="pending", server_default="pending", index=True)
+ telegram_error: Mapped[str | None] = mapped_column(Text)
+ read_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+ dismissed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+ created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True)
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
+ )
+
+
class ServiceWorkItem(Base):
__tablename__ = "service_work_items"
@@ -625,3 +648,19 @@ class AuditLog(Base):
user_agent: Mapped[str | None] = mapped_column(String(256))
metadata_json: Mapped[dict | None] = mapped_column(JSON)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True)
+
+
+class AdminExportJob(Base):
+ __tablename__ = "admin_export_jobs"
+
+ id: Mapped[int] = mapped_column(primary_key=True)
+ requested_by_user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), index=True)
+ source: Mapped[str] = mapped_column(String(80), index=True)
+ export_format: Mapped[str] = mapped_column(String(16), default="json", server_default="json")
+ status: Mapped[str] = mapped_column(String(24), default="ready", server_default="ready", index=True)
+ reason: Mapped[str | None] = mapped_column(Text)
+ filters_json: Mapped[dict | None] = mapped_column(JSON)
+ result_text: Mapped[str | None] = mapped_column(Text)
+ row_count: Mapped[int] = mapped_column(Integer, default=0, server_default="0")
+ expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
+ created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True)
diff --git a/app/services/admin_notifications.py b/app/services/admin_notifications.py
new file mode 100644
index 0000000..35e6d94
--- /dev/null
+++ b/app/services/admin_notifications.py
@@ -0,0 +1,150 @@
+import logging
+from datetime import UTC, datetime
+from html import escape
+
+import httpx
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.config import settings
+from app.models.car import AdminNotification
+
+logger = logging.getLogger(__name__)
+
+
+ADMIN_EVENT_FLAGS = {
+ "user_registered": "admin_notify_new_users",
+ "vehicle_created": "admin_notify_new_users",
+ "first_record_created": "admin_notify_new_users",
+ "sto_application_created": "admin_notify_sto_applications",
+ "sto_application_updated": "admin_notify_sto_applications",
+ "sto_approved": "admin_notify_sto_applications",
+ "sto_suspended": "admin_notify_sto_applications",
+ "security_event": "admin_notify_security_events",
+ "rate_limit_exceeded": "admin_notify_security_events",
+ "upload_blocked": "admin_notify_security_events",
+ "system_error": "admin_notify_system_errors",
+ "ocr_failed": "admin_notify_system_errors",
+}
+
+
+def admin_event_enabled(event_type: str) -> bool:
+ flag = ADMIN_EVENT_FLAGS.get(event_type)
+ return bool(getattr(settings, flag, True)) if flag else True
+
+
+def admin_recipients() -> list[str]:
+ recipients: list[str] = []
+ if settings.admin_notification_chat_id:
+ recipients.append(settings.admin_notification_chat_id)
+ recipients.extend(str(item) for item in settings.admin_telegram_id_list)
+ return list(dict.fromkeys(recipients))
+
+
+def admin_notification_url(entity_type: str | None = None, entity_id: str | int | None = None) -> str:
+ base = settings.effective_webapp_url
+ if entity_type == "service_center" and entity_id:
+ return f"{base}/admin.html?section=sto-applications&entity_id={entity_id}"
+ if entity_type == "user" and entity_id:
+ return f"{base}/admin.html?section=users&entity_id={entity_id}"
+ return f"{base}/admin.html"
+
+
+async def create_admin_notification(
+ session: AsyncSession,
+ *,
+ event_type: str,
+ title: str,
+ body: str | None = None,
+ entity_type: str | None = None,
+ entity_id: int | str | None = None,
+ severity: str = "info",
+ idempotency_key: str | None = None,
+ metadata: dict | None = None,
+ send_telegram: bool = True,
+) -> AdminNotification:
+ key = idempotency_key or f"{event_type}:{entity_type or 'system'}:{entity_id or title}"
+ existing = (
+ await session.execute(select(AdminNotification).where(AdminNotification.idempotency_key == key))
+ ).scalar_one_or_none()
+ if existing:
+ return existing
+
+ notification = AdminNotification(
+ event_type=event_type,
+ title=title,
+ body=body,
+ entity_type=entity_type,
+ entity_id=str(entity_id) if entity_id is not None else None,
+ severity=severity,
+ idempotency_key=key,
+ metadata_json=metadata,
+ telegram_status="pending" if send_telegram else "skipped",
+ )
+ session.add(notification)
+ await session.flush()
+
+ if send_telegram and admin_event_enabled(event_type):
+ await send_admin_telegram_notification(notification)
+ elif send_telegram:
+ notification.telegram_status = "skipped"
+ return notification
+
+
+async def send_admin_telegram_notification(notification: AdminNotification) -> None:
+ recipients = admin_recipients()
+ if not recipients or not settings.bot_token:
+ notification.telegram_status = "skipped"
+ return
+
+ link = admin_notification_url(notification.entity_type, notification.entity_id)
+ text = "\n".join(
+ item
+ for item in [
+ f"{escape(notification.title)}",
+ escape(notification.body or ""),
+ f"Событие: {escape(notification.event_type)}",
+ f"Открыть: {escape(link)}",
+ ]
+ if item
+ )
+ errors: list[str] = []
+ async with httpx.AsyncClient(timeout=8) as client:
+ for chat_id in recipients:
+ try:
+ response = await client.post(
+ f"https://api.telegram.org/bot{settings.bot_token}/sendMessage",
+ json={
+ "chat_id": chat_id,
+ "text": text,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ },
+ )
+ response.raise_for_status()
+ except Exception as error: # noqa: BLE001 - notification delivery is best-effort
+ logger.warning("Admin Telegram notification failed: %s", error)
+ errors.append(str(error))
+ if errors:
+ notification.telegram_status = "failed"
+ notification.telegram_error = "; ".join(errors)[:2000]
+ else:
+ notification.telegram_status = "sent"
+
+
+async def mark_admin_notification_read(
+ session: AsyncSession, notification: AdminNotification
+) -> AdminNotification:
+ notification.status = "read"
+ notification.read_at = datetime.now(UTC)
+ await session.flush()
+ return notification
+
+
+async def dismiss_admin_notification(
+ session: AsyncSession, notification: AdminNotification
+) -> AdminNotification:
+ notification.status = "dismissed"
+ notification.dismissed_at = datetime.now(UTC)
+ await session.flush()
+ return notification