From fa703acce12aeb0368a9e1df6bc28b8ce7b4a8d7 Mon Sep 17 00:00:00 2001 From: VPN SaaS Dev Date: Sun, 17 May 2026 21:16:22 +0900 Subject: [PATCH] admin notifications and data explorer backend --- .env.example | 5 + ...70001_admin_notifications_data_explorer.py | 85 ++ app/api/admin.py | 887 +++++++++++++++++- app/api/deps.py | 19 + app/api/my.py | 15 + app/api/service_centers.py | 20 + app/core/config.py | 5 + app/models/car.py | 39 + app/services/admin_notifications.py | 150 +++ 9 files changed, 1218 insertions(+), 7 deletions(-) create mode 100644 alembic/versions/202605170001_admin_notifications_data_explorer.py create mode 100644 app/services/admin_notifications.py diff --git a/.env.example b/.env.example index 5d8d1c4..98bbc09 100644 --- a/.env.example +++ b/.env.example @@ -22,3 +22,8 @@ OCR_PROVIDER=tesseract OCR_LANGUAGES=eng+rus+kor ADMIN_TELEGRAM_IDS= ADMIN_BOOTSTRAP_TOKEN= +ADMIN_NOTIFICATION_CHAT_ID= +ADMIN_NOTIFY_NEW_USERS=true +ADMIN_NOTIFY_STO_APPLICATIONS=true +ADMIN_NOTIFY_SECURITY_EVENTS=true +ADMIN_NOTIFY_SYSTEM_ERRORS=true diff --git a/alembic/versions/202605170001_admin_notifications_data_explorer.py b/alembic/versions/202605170001_admin_notifications_data_explorer.py new file mode 100644 index 0000000..9a48616 --- /dev/null +++ b/alembic/versions/202605170001_admin_notifications_data_explorer.py @@ -0,0 +1,85 @@ +"""admin notifications and data explorer jobs + +Revision ID: 202605170001 +Revises: 202605160002 +Create Date: 2026-05-17 00:00:00.000000 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +revision: str = "202605170001" +down_revision: str | None = "202605160002" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + "admin_notifications", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("event_type", sa.String(length=80), nullable=False), + sa.Column("severity", sa.String(length=24), server_default="info", nullable=False), + sa.Column("title", sa.String(length=180), nullable=False), + sa.Column("body", sa.Text(), nullable=True), + sa.Column("entity_type", sa.String(length=80), nullable=True), + sa.Column("entity_id", sa.String(length=80), nullable=True), + sa.Column("status", sa.String(length=24), server_default="unread", nullable=False), + sa.Column("idempotency_key", sa.String(length=180), nullable=False), + sa.Column("metadata_json", sa.JSON(), nullable=True), + sa.Column("telegram_status", sa.String(length=24), server_default="pending", nullable=False), + sa.Column("telegram_error", sa.Text(), nullable=True), + sa.Column("read_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("dismissed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_admin_notifications_created_at", "admin_notifications", ["created_at"]) + op.create_index("ix_admin_notifications_entity_id", "admin_notifications", ["entity_id"]) + op.create_index("ix_admin_notifications_entity_type", "admin_notifications", ["entity_type"]) + op.create_index("ix_admin_notifications_event_type", "admin_notifications", ["event_type"]) + op.create_index("ix_admin_notifications_idempotency_key", "admin_notifications", ["idempotency_key"], unique=True) + op.create_index("ix_admin_notifications_severity", "admin_notifications", ["severity"]) + op.create_index("ix_admin_notifications_status", "admin_notifications", ["status"]) + op.create_index("ix_admin_notifications_telegram_status", "admin_notifications", ["telegram_status"]) + + op.create_table( + "admin_export_jobs", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("requested_by_user_id", sa.Integer(), nullable=True), + sa.Column("source", sa.String(length=80), nullable=False), + sa.Column("export_format", sa.String(length=16), server_default="json", nullable=False), + sa.Column("status", sa.String(length=24), server_default="ready", nullable=False), + sa.Column("reason", sa.Text(), nullable=True), + sa.Column("filters_json", sa.JSON(), nullable=True), + sa.Column("result_text", sa.Text(), nullable=True), + sa.Column("row_count", sa.Integer(), server_default="0", nullable=False), + sa.Column("expires_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.ForeignKeyConstraint(["requested_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_admin_export_jobs_created_at", "admin_export_jobs", ["created_at"]) + op.create_index("ix_admin_export_jobs_requested_by_user_id", "admin_export_jobs", ["requested_by_user_id"]) + op.create_index("ix_admin_export_jobs_source", "admin_export_jobs", ["source"]) + op.create_index("ix_admin_export_jobs_status", "admin_export_jobs", ["status"]) + + +def downgrade() -> None: + op.drop_index("ix_admin_export_jobs_status", table_name="admin_export_jobs") + op.drop_index("ix_admin_export_jobs_source", table_name="admin_export_jobs") + op.drop_index("ix_admin_export_jobs_requested_by_user_id", table_name="admin_export_jobs") + op.drop_index("ix_admin_export_jobs_created_at", table_name="admin_export_jobs") + op.drop_table("admin_export_jobs") + op.drop_index("ix_admin_notifications_telegram_status", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_status", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_severity", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_idempotency_key", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_event_type", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_entity_type", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_entity_id", table_name="admin_notifications") + op.drop_index("ix_admin_notifications_created_at", table_name="admin_notifications") + op.drop_table("admin_notifications") diff --git a/app/api/admin.py b/app/api/admin.py index 8bb15ba..96c696b 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -1,27 +1,826 @@ -from datetime import UTC, datetime +import csv +import io +import json +from datetime import UTC, date, datetime, timedelta +from decimal import Decimal +from typing import Any from fastapi import APIRouter, Depends, HTTPException -from sqlalchemy import select +from fastapi.encoders import jsonable_encoder +from pydantic import BaseModel, ConfigDict, Field +from sqlalchemy import Select, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_telegram_user, log_audit, require_platform_role from app.db.session import get_session from app.models.car import ( + AdminExportJob, + AdminNotification, AuditLog, + Car, + CarServiceLink, + ServiceAppointment, ServiceCenter, + ServiceCenterReview, ServiceCenterVerification, ServiceEmployee, + ServiceNotification, + ServiceProductItem, ServiceVisit, + ServiceWorkItem, ) +from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry from app.models.user import User from app.schemas.service_center import AdminModerationDecision, ServiceCenterRead, ServiceVisitRead +from app.services.admin_notifications import ( + create_admin_notification, + dismiss_admin_notification, + mark_admin_notification_read, +) from app.services.notifications import notify_user router = APIRouter(prefix="/admin", tags=["admin"]) +ADMIN_ROLES = {"admin", "super_admin", "moderator", "support", "analyst"} +FULL_ADMIN_ROLES = {"admin", "super_admin"} +MODERATION_ROLES = {"admin", "super_admin", "moderator"} +DATA_EXPORT_ROLES = {"admin", "super_admin", "analyst"} + + +class AdminDataQuery(BaseModel): + model_config = ConfigDict(extra="forbid") + + source: str + date_from: date | None = None + date_to: date | None = None + status: str | None = None + user_id: int | None = None + telegram_id: int | None = None + vehicle_id: int | None = None + sto_id: int | None = None + city: str | None = None + role: str | None = None + category: str | None = None + amount_min: Decimal | None = None + amount_max: Decimal | None = None + has_errors: bool | None = None + is_pending: bool | None = None + is_completed: bool | None = None + search: str | None = None + sort: str = "created_at_desc" + limit: int = Field(default=50, ge=1, le=500) + offset: int = Field(default=0, ge=0) + include_sensitive: bool = False + reason: str | None = None + + +class AdminExportRequest(AdminDataQuery): + export_format: str = "json" + + +class AdminUserNote(BaseModel): + note: str + + +DATA_SOURCES: dict[str, dict[str, Any]] = { + "users": { + "model": User, + "roles": ADMIN_ROLES, + "search": ["username", "first_name", "last_name"], + "filters": {"user_id": "id", "telegram_id": "telegram_id", "role": "platform_role"}, + "sensitive": {"telegram_id"}, + "columns": ["id", "telegram_id", "username", "first_name", "last_name", "platform_role", "created_at", "updated_at"], + }, + "vehicles": { + "model": Car, + "roles": ADMIN_ROLES, + "search": ["name", "make", "model", "vin", "plate_number", "license_plate_display"], + "filters": {"user_id": "owner_id", "vehicle_id": "id"}, + "sensitive": {"vin", "plate_number", "license_plate_display", "vin_normalized", "license_plate_normalized"}, + "columns": ["id", "owner_id", "name", "make", "model", "year", "vin", "plate_number", "license_plate_display", "current_odometer", "created_at"], + }, + "fuel_entries": { + "model": FuelEntry, + "roles": FULL_ADMIN_ROLES | {"analyst"}, + "filters": {"vehicle_id": "car_id"}, + "amount": "total_cost", + "date": "entry_date", + "columns": ["id", "car_id", "entry_date", "odometer", "liters", "total_cost", "created_at"], + }, + "service_entries": { + "model": ServiceEntry, + "roles": FULL_ADMIN_ROLES | {"analyst", "support"}, + "search": ["title", "vendor", "category"], + "filters": {"vehicle_id": "car_id", "category": "category"}, + "amount": "total_cost", + "date": "entry_date", + "columns": ["id", "car_id", "entry_date", "service_type", "title", "total_cost", "created_at"], + }, + "expense_entries": { + "model": ExpenseEntry, + "roles": FULL_ADMIN_ROLES | {"analyst"}, + "search": ["title", "vendor"], + "filters": {"vehicle_id": "car_id", "category": "category"}, + "amount": "total_cost", + "date": "entry_date", + "columns": ["id", "car_id", "entry_date", "category", "title", "total_cost", "currency", "created_at"], + }, + "sto_profiles": { + "model": ServiceCenter, + "roles": ADMIN_ROLES, + "search": ["name", "display_name", "legal_name", "city"], + "filters": {"sto_id": "id", "city": "city", "status": "verification_status", "user_id": "owner_user_id"}, + "sensitive": {"phone", "contact_phone", "business_registration_number"}, + "columns": ["id", "display_name", "legal_name", "city", "phone", "verification_status", "owner_user_id", "created_at", "verified_at"], + }, + "sto_applications": { + "model": ServiceCenterVerification, + "roles": MODERATION_ROLES, + "filters": {"sto_id": "service_center_id", "status": "status", "user_id": "reviewed_by"}, + "columns": ["id", "service_center_id", "status", "reviewed_by", "reviewed_at", "created_at", "comment"], + }, + "sto_employees": { + "model": ServiceEmployee, + "roles": MODERATION_ROLES | {"support"}, + "filters": {"sto_id": "service_center_id", "user_id": "user_id", "role": "role", "status": "status"}, + "columns": ["id", "service_center_id", "user_id", "role", "status", "created_at"], + }, + "vehicle_sto_links": { + "model": CarServiceLink, + "roles": MODERATION_ROLES | {"support"}, + "filters": {"vehicle_id": "car_id", "sto_id": "service_center_id", "status": "status"}, + "columns": ["id", "car_id", "service_center_id", "access_level", "status", "created_at"], + }, + "appointments": { + "model": ServiceAppointment, + "roles": ADMIN_ROLES, + "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"}, + "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "service_type", "service_name", "status", "requested_start_at", "created_at"], + }, + "work_orders": { + "model": ServiceVisit, + "roles": ADMIN_ROLES, + "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"}, + "amount": "final_total", + "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "status", "final_total", "currency", "created_at", "completed_at"], + }, + "work_order_items": { + "model": ServiceWorkItem, + "roles": FULL_ADMIN_ROLES | {"support"}, + "filters": {"category": "category"}, + "amount": "total", + "columns": ["id", "service_visit_id", "work_type", "title", "category", "quantity", "total", "created_at"], + }, + "work_order_products": { + "model": ServiceProductItem, + "roles": FULL_ADMIN_ROLES | {"support"}, + "filters": {"category": "category"}, + "amount": "total", + "columns": ["id", "service_visit_id", "title", "category", "product_type", "quantity", "total", "created_at"], + }, + "reviews": { + "model": ServiceCenterReview, + "roles": MODERATION_ROLES | {"support", "analyst"}, + "filters": {"sto_id": "service_center_id", "user_id": "user_id", "status": "status"}, + "columns": ["id", "service_center_id", "user_id", "rating", "status", "created_at"], + }, + "notifications": { + "model": ServiceNotification, + "roles": FULL_ADMIN_ROLES | {"support"}, + "filters": {"user_id": "recipient_user_id", "status": "status", "sto_id": "service_center_id"}, + "columns": ["id", "recipient_user_id", "notification_type", "title", "status", "created_at", "sent_at"], + }, + "admin_notifications": { + "model": AdminNotification, + "roles": ADMIN_ROLES, + "filters": {"status": "status"}, + "columns": ["id", "event_type", "severity", "title", "entity_type", "entity_id", "status", "created_at"], + }, + "audit_logs": { + "model": AuditLog, + "roles": FULL_ADMIN_ROLES | {"moderator", "support"}, + "search": ["action", "target_type", "target_id"], + "filters": {"user_id": "actor_user_id", "role": "actor_role"}, + "columns": ["id", "actor_user_id", "actor_role", "action", "target_type", "target_id", "ip", "created_at"], + }, + "ocr_results": {"model": None, "roles": ADMIN_ROLES, "columns": []}, + "imports_exports": { + "model": AdminExportJob, + "roles": DATA_EXPORT_ROLES, + "filters": {"user_id": "requested_by_user_id", "status": "status"}, + "columns": ["id", "requested_by_user_id", "source", "export_format", "status", "row_count", "created_at"], + }, +} + def require_admin_or_verifier(user: User) -> None: - require_platform_role(user, {"admin", "verifier", "moderator"}) + require_platform_role(user, MODERATION_ROLES | {"verifier"}) + + +def require_admin_access(user: User, allowed: set[str] | None = None) -> None: + require_platform_role(user, allowed or ADMIN_ROLES) + + +def mask_text(value: Any, visible: int = 4) -> Any: + if value is None: + return None + text = str(value) + if len(text) <= visible: + return "*" * len(text) + return f"{text[:2]}{'*' * max(len(text) - visible, 3)}{text[-2:]}" + + +def can_view_sensitive(user: User, query: AdminDataQuery) -> bool: + if not query.include_sensitive: + return False + if user.platform_role not in FULL_ADMIN_ROLES: + raise HTTPException(status_code=403, detail="Sensitive data is restricted") + if not query.reason or len(query.reason.strip()) < 5: + raise HTTPException(status_code=400, detail="reason is required for sensitive data") + return True + + +def public_columns(source: str) -> list[str]: + return list(DATA_SOURCES[source].get("columns") or []) + + +def serialize_row(row: Any, source: str, *, include_sensitive: bool, role: str) -> dict[str, Any]: + config = DATA_SOURCES[source] + sensitive = set(config.get("sensitive") or set()) + columns = public_columns(source) + payload: dict[str, Any] = {} + for column in columns: + value = getattr(row, column, None) + if role == "analyst" and column in {"telegram_id", "username", "first_name", "last_name", "owner_id", "user_id"}: + value = mask_text(value) + elif column in sensitive and not include_sensitive: + value = mask_text(value) + payload[column] = jsonable_encoder(value) + return payload + + +def source_config(source: str, user: User) -> dict[str, Any]: + config = DATA_SOURCES.get(source) + if config is None or config.get("model") is None: + raise HTTPException(status_code=400, detail="Unsupported data source") + require_admin_access(user, set(config["roles"])) + return config + + +def apply_data_filters(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select: + model = config["model"] + date_column = config.get("date", "created_at") + if hasattr(model, date_column): + column = getattr(model, date_column) + if query.date_from: + stmt = stmt.where(column >= query.date_from) + if query.date_to: + stmt = stmt.where(column <= query.date_to) + for query_field, model_field in (config.get("filters") or {}).items(): + value = getattr(query, query_field) + if value is not None and value != "" and hasattr(model, model_field): + stmt = stmt.where(getattr(model, model_field) == value) + if query.is_pending is True and hasattr(model, "status"): + stmt = stmt.where(model.status.in_(["pending", "requested", "unread"])) + if query.is_completed is True and hasattr(model, "status"): + stmt = stmt.where(model.status.in_(["completed", "closed", "confirmed"])) + amount_column = config.get("amount") + if amount_column and hasattr(model, amount_column): + column = getattr(model, amount_column) + if query.amount_min is not None: + stmt = stmt.where(column >= query.amount_min) + if query.amount_max is not None: + stmt = stmt.where(column <= query.amount_max) + if query.has_errors is not None: + if hasattr(model, "last_error"): + stmt = stmt.where(model.last_error.is_not(None) if query.has_errors else model.last_error.is_(None)) + elif hasattr(model, "telegram_error"): + stmt = stmt.where(model.telegram_error.is_not(None) if query.has_errors else model.telegram_error.is_(None)) + if query.search: + search_fields = [field for field in config.get("search", []) if hasattr(model, field)] + if search_fields: + pattern = f"%{query.search}%" + stmt = stmt.where(or_(*(getattr(model, field).ilike(pattern) for field in search_fields))) + return stmt + + +def apply_sort(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select: + model = config["model"] + sort_map = { + "created_at_desc": ("created_at", True), + "created_at_asc": ("created_at", False), + "updated_at_desc": ("updated_at", True), + "amount_desc": (config.get("amount") or "created_at", True), + "status": ("status", False), + "city": ("city", False), + } + field, desc = sort_map.get(query.sort, ("created_at", True)) + if not hasattr(model, field): + field = "id" + column = getattr(model, field) + return stmt.order_by(column.desc() if desc else column.asc()) + + +async def run_data_query( + session: AsyncSession, current_user: User, query: AdminDataQuery +) -> dict[str, Any]: + config = source_config(query.source, current_user) + include_sensitive = can_view_sensitive(current_user, query) + stmt = apply_sort(apply_data_filters(select(config["model"]), query, config), query, config) + result = await session.execute(stmt.limit(query.limit).offset(query.offset)) + rows = [ + serialize_row(item, query.source, include_sensitive=include_sensitive, role=current_user.platform_role) + for item in result.scalars() + ] + await log_audit( + session, + actor=current_user, + action="admin.data.query", + target_type=query.source, + metadata={ + "filters": query.model_dump(mode="json", exclude={"reason"}), + "include_sensitive": include_sensitive, + "reason": query.reason if include_sensitive else None, + "rows": len(rows), + }, + ) + return {"source": query.source, "limit": query.limit, "offset": query.offset, "rows": rows} + + +def csv_from_rows(rows: list[dict[str, Any]]) -> str: + output = io.StringIO() + if not rows: + return "" + writer = csv.DictWriter(output, fieldnames=list(rows[0].keys())) + writer.writeheader() + writer.writerows(rows) + return output.getvalue() + + +@router.get("/dashboard") +async def admin_dashboard( + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user) + today = datetime.now(UTC).date() + week_ago = today - timedelta(days=6) + + async def count(stmt: Select) -> int: + return int((await session.execute(stmt)).scalar_one() or 0) + + users_total = await count(select(func.count(User.id))) + users_today = await count(select(func.count(User.id)).where(func.date(User.created_at) == today)) + users_7d = await count(select(func.count(User.id)).where(func.date(User.created_at) >= week_ago)) + vehicles_total = await count(select(func.count(Car.id))) + pending_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status == "pending")) + approved_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status.in_(["approved", "verified"]))) + suspended_sto = await count(select(func.count(ServiceCenter.id)).where(ServiceCenter.verification_status == "suspended")) + appointments_today = await count( + select(func.count(ServiceAppointment.id)).where(func.date(ServiceAppointment.requested_start_at) == today) + ) + active_work_orders = await count( + select(func.count(ServiceVisit.id)).where(ServiceVisit.status.in_(["draft", "awaiting_approval", "approved", "in_progress"])) + ) + completed_work_orders = await count(select(func.count(ServiceVisit.id)).where(ServiceVisit.status == "completed")) + system_errors = await count(select(func.count(AdminNotification.id)).where(AdminNotification.severity.in_(["error", "critical"]))) + security_events = await count(select(func.count(AdminNotification.id)).where(AdminNotification.event_type.in_(["security_event", "rate_limit_exceeded", "upload_blocked"]))) + latest_alerts = ( + await session.execute(select(AdminNotification).order_by(AdminNotification.created_at.desc()).limit(8)) + ).scalars() + return { + "users_today": users_today, + "users_7d": users_7d, + "users_total": users_total, + "active_users": users_7d, + "vehicles_total": vehicles_total, + "new_sto_applications": pending_sto, + "pending_sto_applications": pending_sto, + "approved_sto": approved_sto, + "suspended_sto": suspended_sto, + "appointments_today": appointments_today, + "active_work_orders": active_work_orders, + "completed_work_orders": completed_work_orders, + "system_errors": system_errors, + "security_events": security_events, + "latest_alerts": [serialize_row(item, "admin_notifications", include_sensitive=False, role=current_user.platform_role) for item in latest_alerts], + } + + +@router.get("/notifications") +async def admin_notifications( + status: str | None = None, + limit: int = 50, + offset: int = 0, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user) + limit = min(max(limit, 1), 200) + stmt = select(AdminNotification) + if status: + stmt = stmt.where(AdminNotification.status == status) + rows = ( + await session.execute(stmt.order_by(AdminNotification.created_at.desc()).limit(limit).offset(max(offset, 0))) + ).scalars() + return {"rows": [jsonable_encoder(item) for item in rows], "limit": limit, "offset": offset} + + +@router.post("/notifications/{notification_id}/read") +async def read_admin_notification( + notification_id: int, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user) + notification = await session.get(AdminNotification, notification_id) + if notification is None: + raise HTTPException(status_code=404, detail="Admin notification not found") + await mark_admin_notification_read(session, notification) + await log_audit(session, actor=current_user, action="admin_notification.read", target_type="admin_notification", target_id=notification_id) + await session.commit() + return jsonable_encoder(notification) + + +@router.post("/notifications/read-all") +async def read_all_admin_notifications( + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user) + rows = (await session.execute(select(AdminNotification).where(AdminNotification.status == "unread"))).scalars().all() + for notification in rows: + await mark_admin_notification_read(session, notification) + await log_audit(session, actor=current_user, action="admin_notification.read_all", target_type="admin_notification", metadata={"count": len(rows)}) + await session.commit() + return {"updated": len(rows)} + + +@router.post("/notifications/{notification_id}/dismiss") +async def dismiss_notification( + notification_id: int, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user) + notification = await session.get(AdminNotification, notification_id) + if notification is None: + raise HTTPException(status_code=404, detail="Admin notification not found") + await dismiss_admin_notification(session, notification) + await log_audit(session, actor=current_user, action="admin_notification.dismiss", target_type="admin_notification", target_id=notification_id) + await session.commit() + return jsonable_encoder(notification) + + +@router.get("/data/sources") +async def admin_data_sources(current_user: User = Depends(get_current_telegram_user)) -> dict[str, Any]: + require_admin_access(current_user) + return { + "sources": [ + { + "name": name, + "available": bool(config.get("model")), + "columns": config.get("columns") or [], + "sensitive": sorted(config.get("sensitive") or []), + "allowed": current_user.platform_role in set(config["roles"]), + } + for name, config in DATA_SOURCES.items() + ], + "sorts": ["created_at_desc", "created_at_asc", "updated_at_desc", "amount_desc", "status", "city"], + "limits": [25, 50, 100, 500], + } + + +@router.post("/data/query") +async def admin_data_query( + payload: AdminDataQuery, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + data = await run_data_query(session, current_user, payload) + await session.commit() + return data + + +@router.post("/data/export") +async def admin_data_export( + payload: AdminExportRequest, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, DATA_EXPORT_ROLES) + if payload.export_format not in {"json", "csv"}: + raise HTTPException(status_code=400, detail="Unsupported export format") + if payload.source in {"users", "vehicles", "sto_profiles"} and not payload.reason: + raise HTTPException(status_code=400, detail="Export reason is required") + data = await run_data_query(session, current_user, payload) + content = json.dumps(data["rows"], ensure_ascii=False, default=str, indent=2) + if payload.export_format == "csv": + content = csv_from_rows(data["rows"]) + job = AdminExportJob( + requested_by_user_id=current_user.id, + source=payload.source, + export_format=payload.export_format, + status="ready", + reason=payload.reason, + filters_json=payload.model_dump(mode="json", exclude={"reason"}), + result_text=content, + row_count=len(data["rows"]), + expires_at=datetime.now(UTC) + timedelta(days=7), + ) + session.add(job) + await log_audit(session, actor=current_user, action="admin.data.export", target_type=payload.source, metadata={"format": payload.export_format, "rows": len(data["rows"])}) + await session.commit() + await session.refresh(job) + return {"id": job.id, "status": job.status, "row_count": job.row_count, "expires_at": job.expires_at} + + +@router.get("/exports") +async def admin_exports( + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, DATA_EXPORT_ROLES) + rows = (await session.execute(select(AdminExportJob).order_by(AdminExportJob.created_at.desc()).limit(50))).scalars() + return { + "rows": [ + { + "id": item.id, + "requested_by_user_id": item.requested_by_user_id, + "source": item.source, + "export_format": item.export_format, + "status": item.status, + "row_count": item.row_count, + "reason": item.reason, + "expires_at": item.expires_at, + "created_at": item.created_at, + } + for item in rows + ] + } + + +@router.get("/exports/{export_id}") +async def admin_export_detail( + export_id: int, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, DATA_EXPORT_ROLES) + job = await session.get(AdminExportJob, export_id) + if job is None: + raise HTTPException(status_code=404, detail="Export not found") + await log_audit(session, actor=current_user, action="admin.export.view", target_type="admin_export", target_id=export_id) + await session.commit() + return jsonable_encoder(job) + + +@router.get("/users") +async def admin_users( + search: str | None = None, + limit: int = 50, + offset: int = 0, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + query = AdminDataQuery(source="users", search=search, limit=min(limit, 100), offset=offset) + data = await run_data_query(session, current_user, query) + await session.commit() + return data + + +@router.get("/users/{user_id}") +async def admin_user_detail( + user_id: int, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, ADMIN_ROLES - {"analyst"}) + user = await session.get(User, user_id) + if user is None: + raise HTTPException(status_code=404, detail="User not found") + cars_count = int((await session.execute(select(func.count(Car.id)).where(Car.owner_id == user.id))).scalar_one() or 0) + fuel_count = int( + ( + await session.execute( + select(func.count(FuelEntry.id)).join(Car, FuelEntry.car_id == Car.id).where(Car.owner_id == user.id) + ) + ).scalar_one() + or 0 + ) + appointments_count = int((await session.execute(select(func.count(ServiceAppointment.id)).where(ServiceAppointment.owner_user_id == user.id))).scalar_one() or 0) + await log_audit(session, actor=current_user, action="admin.user.view", target_type="user", target_id=user.id) + await session.commit() + return { + **serialize_row(user, "users", include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES, role=current_user.platform_role), + "cars_count": cars_count, + "records_count": fuel_count, + "appointments_count": appointments_count, + } + + +@router.get("/users/{user_id}/activity") +async def admin_user_activity( + user_id: int, + limit: int = 50, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, ADMIN_ROLES - {"analyst"}) + rows = ( + await session.execute( + select(AuditLog) + .where(AuditLog.actor_user_id == user_id) + .order_by(AuditLog.created_at.desc()) + .limit(min(max(limit, 1), 100)) + ) + ).scalars() + await log_audit(session, actor=current_user, action="admin.user.activity", target_type="user", target_id=user_id) + await session.commit() + return {"rows": [jsonable_encoder(item) for item in rows]} + + +@router.post("/users/{user_id}/note") +async def admin_user_note( + user_id: int, + payload: AdminUserNote, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, str]: + require_admin_access(current_user, {"admin", "super_admin", "support"}) + await log_audit(session, actor=current_user, action="admin.user.note", target_type="user", target_id=user_id, metadata={"note": payload.note}) + await session.commit() + return {"status": "saved"} + + +@router.post("/users/{user_id}/block") +async def admin_block_user( + user_id: int, + payload: AdminUserNote | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, FULL_ADMIN_ROLES) + user = await session.get(User, user_id) + if user is None: + raise HTTPException(status_code=404, detail="User not found") + user.platform_role = "blocked" + await log_audit(session, actor=current_user, action="admin.user.block", target_type="user", target_id=user_id, metadata={"reason": payload.note if payload else None}) + await session.commit() + return {"id": user.id, "platform_role": user.platform_role} + + +@router.post("/users/{user_id}/unblock") +async def admin_unblock_user( + user_id: int, + payload: AdminUserNote | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, FULL_ADMIN_ROLES) + user = await session.get(User, user_id) + if user is None: + raise HTTPException(status_code=404, detail="User not found") + user.platform_role = "user" + await log_audit(session, actor=current_user, action="admin.user.unblock", target_type="user", target_id=user_id, metadata={"reason": payload.note if payload else None}) + await session.commit() + return {"id": user.id, "platform_role": user.platform_role} + + +@router.get("/sto") +async def admin_sto( + status: str | None = None, + city: str | None = None, + limit: int = 50, + offset: int = 0, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + query = AdminDataQuery(source="sto_profiles", status=status, city=city, limit=min(limit, 100), offset=offset) + data = await run_data_query(session, current_user, query) + await session.commit() + return data + + +@router.get("/sto/{service_center_id}") +async def admin_sto_detail( + service_center_id: int, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, ADMIN_ROLES - {"analyst"}) + center = await session.get(ServiceCenter, service_center_id) + if center is None: + raise HTTPException(status_code=404, detail="Service center not found") + employees_count = int((await session.execute(select(func.count(ServiceEmployee.id)).where(ServiceEmployee.service_center_id == center.id))).scalar_one() or 0) + appointments_count = int((await session.execute(select(func.count(ServiceAppointment.id)).where(ServiceAppointment.service_center_id == center.id))).scalar_one() or 0) + work_orders_count = int((await session.execute(select(func.count(ServiceVisit.id)).where(ServiceVisit.service_center_id == center.id))).scalar_one() or 0) + await log_audit(session, actor=current_user, action="admin.sto.view", target_type="service_center", target_id=center.id) + await session.commit() + return { + **serialize_row(center, "sto_profiles", include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES, role=current_user.platform_role), + "employees_count": employees_count, + "appointments_count": appointments_count, + "work_orders_count": work_orders_count, + } + + +@router.get("/sto-applications") +async def admin_sto_applications( + status: str | None = None, + city: str | None = None, + limit: int = 50, + offset: int = 0, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> dict[str, Any]: + require_admin_access(current_user, MODERATION_ROLES) + stmt = select(ServiceCenter).where(ServiceCenter.verification_status.in_(["pending", "needs_changes"] if status is None else [status])) + if city: + stmt = stmt.where(ServiceCenter.city == city) + rows = (await session.execute(stmt.order_by(ServiceCenter.created_at.asc()).limit(min(limit, 100)).offset(offset))).scalars() + return {"rows": [serialize_row(item, "sto_profiles", include_sensitive=False, role=current_user.platform_role) for item in rows]} + + +async def center_id_from_application(session: AsyncSession, application_id: int) -> int: + verification = await session.get(ServiceCenterVerification, application_id) + if verification is None: + center = await session.get(ServiceCenter, application_id) + if center is None: + raise HTTPException(status_code=404, detail="STO application not found") + return center.id + return verification.service_center_id + + +@router.post("/sto-applications/{application_id}/approve", response_model=ServiceCenterRead) +async def approve_sto_application( + application_id: int, + payload: AdminModerationDecision | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> ServiceCenter: + center_id = await center_id_from_application(session, application_id) + return await verify_service_center(center_id, payload, session, current_user) + + +@router.post("/sto-applications/{application_id}/reject", response_model=ServiceCenterRead) +async def reject_sto_application( + application_id: int, + payload: AdminModerationDecision | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> ServiceCenter: + center_id = await center_id_from_application(session, application_id) + return await reject_service_center(center_id, payload, session, current_user) + + +@router.post("/sto-applications/{application_id}/request-changes", response_model=ServiceCenterRead) +async def request_sto_application_changes( + application_id: int, + payload: AdminModerationDecision, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> ServiceCenter: + center_id = await center_id_from_application(session, application_id) + return await request_service_center_changes(center_id, payload, session, current_user) + + +@router.post("/sto/{service_center_id}/suspend", response_model=ServiceCenterRead) +async def suspend_sto( + service_center_id: int, + payload: AdminModerationDecision | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> ServiceCenter: + return await suspend_service_center(service_center_id, payload, session, current_user) + + +@router.post("/sto/{service_center_id}/unsuspend", response_model=ServiceCenterRead) +async def unsuspend_sto( + service_center_id: int, + payload: AdminModerationDecision | None = None, + session: AsyncSession = Depends(get_session), + current_user: User = Depends(get_current_telegram_user), +) -> ServiceCenter: + require_admin_access(current_user, FULL_ADMIN_ROLES) + center = await session.get(ServiceCenter, service_center_id) + if center is None: + raise HTTPException(status_code=404, detail="Service center not found") + center.verification_status = "approved" + center.suspended_at = None + await create_admin_notification( + session, + event_type="sto_approved", + title="СТО разблокировано", + body=center.display_name or center.name, + entity_type="service_center", + entity_id=center.id, + idempotency_key=f"sto_unsuspended:{center.id}:{datetime.now(UTC).isoformat()}", + ) + await log_audit(session, actor=current_user, action="service_center.unsuspend", target_type="service_center", target_id=center.id, metadata={"reason": payload.reason if payload else None}) + await session.commit() + await session.refresh(center) + return center @router.get("/service-centers/pending", response_model=list[ServiceCenterRead]) @@ -79,6 +878,15 @@ async def verify_service_center( target_id=center.id, metadata={"comment": payload.comment if payload else None}, ) + await create_admin_notification( + session, + event_type="sto_approved", + title="СТО одобрено", + body=center.display_name or center.name, + entity_type="service_center", + entity_id=center.id, + idempotency_key=f"sto_approved:{center.id}:{center.verified_at.isoformat() if center.verified_at else 'now'}", + ) await session.commit() await session.refresh(center) return center @@ -110,6 +918,15 @@ async def reject_service_center( target_id=center.id, metadata={"reason": payload.reason if payload else None, "comment": payload.comment if payload else None}, ) + await create_admin_notification( + session, + event_type="sto_application_updated", + title="Заявка СТО отклонена", + body=center.display_name or center.name, + entity_type="service_center", + entity_id=center.id, + idempotency_key=f"sto_rejected:{center.id}:{datetime.now(UTC).isoformat()}", + ) await session.commit() await session.refresh(center) return center @@ -122,7 +939,7 @@ async def suspend_service_center( session: AsyncSession = Depends(get_session), current_user: User = Depends(get_current_telegram_user), ) -> ServiceCenter: - require_platform_role(current_user, {"admin"}) + require_admin_access(current_user, FULL_ADMIN_ROLES) center = await session.get(ServiceCenter, service_center_id) if center is None: raise HTTPException(status_code=404, detail="Service center not found") @@ -141,6 +958,16 @@ async def suspend_service_center( target_id=center.id, metadata={"reason": payload.reason if payload else None, "comment": payload.comment if payload else None}, ) + await create_admin_notification( + session, + event_type="sto_suspended", + title="СТО заблокировано", + body=center.display_name or center.name, + entity_type="service_center", + entity_id=center.id, + severity="warning", + idempotency_key=f"sto_suspended:{center.id}:{center.suspended_at.isoformat() if center.suspended_at else 'now'}", + ) await session.commit() await session.refresh(center) return center @@ -172,6 +999,15 @@ async def request_service_center_changes( target_id=center.id, metadata={"reason": payload.reason, "comment": payload.comment}, ) + await create_admin_notification( + session, + event_type="sto_application_updated", + title="По заявке СТО запрошены правки", + body=center.display_name or center.name, + entity_type="service_center", + entity_id=center.id, + idempotency_key=f"sto_changes:{center.id}:{datetime.now(UTC).isoformat()}", + ) await session.commit() await session.refresh(center) return center @@ -179,16 +1015,53 @@ async def request_service_center_changes( @router.get("/audit-log") async def audit_log( + actor_id: int | None = None, + actor_role: str | None = None, + action: str | None = None, + entity_type: str | None = None, + entity_id: str | None = None, + date_from: date | None = None, + date_to: date | None = None, + severity: str | None = None, + ip: str | None = None, + user_agent: str | None = None, limit: int = 100, offset: int = 0, session: AsyncSession = Depends(get_session), current_user: User = Depends(get_current_telegram_user), ) -> list[dict]: - require_platform_role(current_user, {"admin", "verifier", "moderator"}) + require_admin_access(current_user, FULL_ADMIN_ROLES | {"moderator", "support"}) limit = min(max(limit, 1), 200) - result = await session.execute( - select(AuditLog).order_by(AuditLog.created_at.desc()).limit(limit).offset(max(offset, 0)) + stmt = select(AuditLog) + if actor_id is not None: + stmt = stmt.where(AuditLog.actor_user_id == actor_id) + if actor_role: + stmt = stmt.where(AuditLog.actor_role == actor_role) + if action: + stmt = stmt.where(AuditLog.action.ilike(f"%{action}%")) + if entity_type: + stmt = stmt.where(AuditLog.target_type == entity_type) + if entity_id: + stmt = stmt.where(AuditLog.target_id == entity_id) + if date_from: + stmt = stmt.where(func.date(AuditLog.created_at) >= date_from) + if date_to: + stmt = stmt.where(func.date(AuditLog.created_at) <= date_to) + if ip: + stmt = stmt.where(AuditLog.ip == ip) + if user_agent: + stmt = stmt.where(AuditLog.user_agent.ilike(f"%{user_agent}%")) + if severity: + stmt = stmt.where(AuditLog.metadata_json["severity"].as_string() == severity) + result = await session.execute(stmt.order_by(AuditLog.created_at.desc()).limit(limit).offset(max(offset, 0))) + await log_audit( + session, + actor=current_user, + action="admin.audit_log.view", + target_type="audit_log", + metadata={"limit": limit, "offset": offset, "filter_action": action}, ) + await session.commit() return [ { "id": item.id, diff --git a/app/api/deps.py b/app/api/deps.py index ff68f56..b94b500 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -8,6 +8,7 @@ from app.core.config import settings from app.db.session import get_session from app.models.car import AuditLog, Car, ServiceCenter, ServiceEmployee, VehicleAccess from app.models.user import User +from app.services.admin_notifications import create_admin_notification from app.services.telegram_auth import verify_webapp_init_data @@ -36,6 +37,24 @@ async def get_or_create_telegram_user( if user is None: user = User(**{key: value for key, value in payload.items() if value is not None}) session.add(user) + await session.flush() + await create_admin_notification( + session, + event_type="user_registered", + title="Новый пользователь", + body="\n".join( + item + for item in [ + f"Имя: {' '.join(part for part in [first_name, last_name] if part) or '-'}", + f"Telegram ID: {telegram_id}", + f"Username: @{username}" if username else "Username: -", + ] + ), + entity_type="user", + entity_id=user.id, + idempotency_key=f"user_registered:{telegram_id}", + metadata={"telegram_id": telegram_id, "username": username}, + ) else: for field, value in payload.items(): if value is not None: diff --git a/app/api/my.py b/app/api/my.py index 0d56b3e..eb807a5 100644 --- a/app/api/my.py +++ b/app/api/my.py @@ -27,6 +27,7 @@ from app.schemas.service_center import ( VehicleUpdate, ) from app.schemas.user import UserRead +from app.services.admin_notifications import create_admin_notification from app.services.odometer import ( add_odometer_history, recalculate_current_odometer, @@ -381,6 +382,20 @@ async def create_vehicle( changed_by=current_user.id, ) await log_audit(session, actor=current_user, action="vehicle.create", target_type="vehicle", target_id=car.id) + vehicle_count = ( + await session.execute(select(Car.id).where(Car.owner_id == current_user.id).limit(2)) + ).scalars().all() + if len(vehicle_count) == 1: + await create_admin_notification( + session, + event_type="vehicle_created", + title="Пользователь впервые добавил авто", + body=f"{current_user.first_name or current_user.username or current_user.telegram_id}: {car.name}", + entity_type="vehicle", + entity_id=car.id, + idempotency_key=f"first_vehicle:{current_user.id}", + metadata={"user_id": current_user.id, "vehicle_id": car.id}, + ) await session.commit() await session.refresh(car) return car diff --git a/app/api/service_centers.py b/app/api/service_centers.py index 584c202..898290d 100644 --- a/app/api/service_centers.py +++ b/app/api/service_centers.py @@ -48,6 +48,7 @@ from app.schemas.service_center import ( VehicleSearchRequest, VehicleSearchResult, ) +from app.services.admin_notifications import create_admin_notification from app.services.notifications import notify_platform_moderators from app.services.odometer import validate_odometer_change from app.services.rate_limit import check_rate_limit @@ -147,6 +148,25 @@ async def create_service_center( target_type="service_center", target_id=center.id, ) + await create_admin_notification( + session, + event_type="sto_application_created", + title="Новая заявка СТО", + body="\n".join( + item + for item in [ + f"Название: {center.display_name or center.name}", + f"Город: {center.city or '-'}", + f"Телефон: {center.phone or center.contact_phone or '-'}", + f"Документы: {len(center.document_photo_urls or [])}", + "Статус: pending", + ] + ), + entity_type="service_center", + entity_id=center.id, + idempotency_key=f"sto_application_created:{center.id}", + metadata={"city": center.city, "owner_user_id": current_user.id}, + ) await session.commit() await session.refresh(center) await notify_platform_moderators( diff --git a/app/core/config.py b/app/core/config.py index 4e12116..7f8edec 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -24,6 +24,11 @@ class Settings(BaseSettings): ocr_languages: str = "eng+rus+kor" admin_telegram_ids: str = "" admin_bootstrap_token: str = "" + admin_notification_chat_id: str = "" + admin_notify_new_users: bool = True + admin_notify_sto_applications: bool = True + admin_notify_security_events: bool = True + admin_notify_system_errors: bool = True model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") diff --git a/app/models/car.py b/app/models/car.py index b4098d2..7e09f9b 100644 --- a/app/models/car.py +++ b/app/models/car.py @@ -432,6 +432,29 @@ class ServiceNotification(Base): created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True) +class AdminNotification(Base): + __tablename__ = "admin_notifications" + + id: Mapped[int] = mapped_column(primary_key=True) + event_type: Mapped[str] = mapped_column(String(80), index=True) + severity: Mapped[str] = mapped_column(String(24), default="info", server_default="info", index=True) + title: Mapped[str] = mapped_column(String(180)) + body: Mapped[str | None] = mapped_column(Text) + entity_type: Mapped[str | None] = mapped_column(String(80), index=True) + entity_id: Mapped[str | None] = mapped_column(String(80), index=True) + status: Mapped[str] = mapped_column(String(24), default="unread", server_default="unread", index=True) + idempotency_key: Mapped[str] = mapped_column(String(180), unique=True, index=True) + metadata_json: Mapped[dict | None] = mapped_column(JSON) + telegram_status: Mapped[str] = mapped_column(String(24), default="pending", server_default="pending", index=True) + telegram_error: Mapped[str | None] = mapped_column(Text) + read_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + dismissed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) + + class ServiceWorkItem(Base): __tablename__ = "service_work_items" @@ -625,3 +648,19 @@ class AuditLog(Base): user_agent: Mapped[str | None] = mapped_column(String(256)) metadata_json: Mapped[dict | None] = mapped_column(JSON) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True) + + +class AdminExportJob(Base): + __tablename__ = "admin_export_jobs" + + id: Mapped[int] = mapped_column(primary_key=True) + requested_by_user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), index=True) + source: Mapped[str] = mapped_column(String(80), index=True) + export_format: Mapped[str] = mapped_column(String(16), default="json", server_default="json") + status: Mapped[str] = mapped_column(String(24), default="ready", server_default="ready", index=True) + reason: Mapped[str | None] = mapped_column(Text) + filters_json: Mapped[dict | None] = mapped_column(JSON) + result_text: Mapped[str | None] = mapped_column(Text) + row_count: Mapped[int] = mapped_column(Integer, default=0, server_default="0") + expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True) diff --git a/app/services/admin_notifications.py b/app/services/admin_notifications.py new file mode 100644 index 0000000..35e6d94 --- /dev/null +++ b/app/services/admin_notifications.py @@ -0,0 +1,150 @@ +import logging +from datetime import UTC, datetime +from html import escape + +import httpx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.config import settings +from app.models.car import AdminNotification + +logger = logging.getLogger(__name__) + + +ADMIN_EVENT_FLAGS = { + "user_registered": "admin_notify_new_users", + "vehicle_created": "admin_notify_new_users", + "first_record_created": "admin_notify_new_users", + "sto_application_created": "admin_notify_sto_applications", + "sto_application_updated": "admin_notify_sto_applications", + "sto_approved": "admin_notify_sto_applications", + "sto_suspended": "admin_notify_sto_applications", + "security_event": "admin_notify_security_events", + "rate_limit_exceeded": "admin_notify_security_events", + "upload_blocked": "admin_notify_security_events", + "system_error": "admin_notify_system_errors", + "ocr_failed": "admin_notify_system_errors", +} + + +def admin_event_enabled(event_type: str) -> bool: + flag = ADMIN_EVENT_FLAGS.get(event_type) + return bool(getattr(settings, flag, True)) if flag else True + + +def admin_recipients() -> list[str]: + recipients: list[str] = [] + if settings.admin_notification_chat_id: + recipients.append(settings.admin_notification_chat_id) + recipients.extend(str(item) for item in settings.admin_telegram_id_list) + return list(dict.fromkeys(recipients)) + + +def admin_notification_url(entity_type: str | None = None, entity_id: str | int | None = None) -> str: + base = settings.effective_webapp_url + if entity_type == "service_center" and entity_id: + return f"{base}/admin.html?section=sto-applications&entity_id={entity_id}" + if entity_type == "user" and entity_id: + return f"{base}/admin.html?section=users&entity_id={entity_id}" + return f"{base}/admin.html" + + +async def create_admin_notification( + session: AsyncSession, + *, + event_type: str, + title: str, + body: str | None = None, + entity_type: str | None = None, + entity_id: int | str | None = None, + severity: str = "info", + idempotency_key: str | None = None, + metadata: dict | None = None, + send_telegram: bool = True, +) -> AdminNotification: + key = idempotency_key or f"{event_type}:{entity_type or 'system'}:{entity_id or title}" + existing = ( + await session.execute(select(AdminNotification).where(AdminNotification.idempotency_key == key)) + ).scalar_one_or_none() + if existing: + return existing + + notification = AdminNotification( + event_type=event_type, + title=title, + body=body, + entity_type=entity_type, + entity_id=str(entity_id) if entity_id is not None else None, + severity=severity, + idempotency_key=key, + metadata_json=metadata, + telegram_status="pending" if send_telegram else "skipped", + ) + session.add(notification) + await session.flush() + + if send_telegram and admin_event_enabled(event_type): + await send_admin_telegram_notification(notification) + elif send_telegram: + notification.telegram_status = "skipped" + return notification + + +async def send_admin_telegram_notification(notification: AdminNotification) -> None: + recipients = admin_recipients() + if not recipients or not settings.bot_token: + notification.telegram_status = "skipped" + return + + link = admin_notification_url(notification.entity_type, notification.entity_id) + text = "\n".join( + item + for item in [ + f"{escape(notification.title)}", + escape(notification.body or ""), + f"Событие: {escape(notification.event_type)}", + f"Открыть: {escape(link)}", + ] + if item + ) + errors: list[str] = [] + async with httpx.AsyncClient(timeout=8) as client: + for chat_id in recipients: + try: + response = await client.post( + f"https://api.telegram.org/bot{settings.bot_token}/sendMessage", + json={ + "chat_id": chat_id, + "text": text, + "parse_mode": "HTML", + "disable_web_page_preview": True, + }, + ) + response.raise_for_status() + except Exception as error: # noqa: BLE001 - notification delivery is best-effort + logger.warning("Admin Telegram notification failed: %s", error) + errors.append(str(error)) + if errors: + notification.telegram_status = "failed" + notification.telegram_error = "; ".join(errors)[:2000] + else: + notification.telegram_status = "sent" + + +async def mark_admin_notification_read( + session: AsyncSession, notification: AdminNotification +) -> AdminNotification: + notification.status = "read" + notification.read_at = datetime.now(UTC) + await session.flush() + return notification + + +async def dismiss_admin_notification( + session: AsyncSession, notification: AdminNotification +) -> AdminNotification: + notification.status = "dismissed" + notification.dismissed_at = datetime.now(UTC) + await session.flush() + return notification