Files
drivers_bot/app/api/admin.py
VPN SaaS Dev 8982299e71
Some checks failed
ci / test (pull_request) Has been cancelled
add admin data mutations and load check
2026-05-18 18:37:19 +09:00

1381 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import enum
import io
import json
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Select, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_telegram_user, log_audit, require_platform_role
from app.db.session import get_session
from app.models.car import (
AdminExportJob,
AdminNotification,
AuditLog,
Car,
CarServiceLink,
ServiceAppointment,
ServiceCenter,
ServiceCenterReview,
ServiceCenterVerification,
ServiceEmployee,
ServiceNotification,
ServiceProductItem,
ServiceVisit,
ServiceWorkItem,
)
from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry
from app.models.user import User
from app.schemas.service_center import AdminModerationDecision, ServiceCenterRead, ServiceVisitRead
from app.services.admin_notifications import (
create_admin_notification,
dismiss_admin_notification,
mark_admin_notification_read,
retry_admin_telegram_notifications,
)
from app.services.notifications import notify_user, process_notification_queue
# Router carrying every endpoint in this module; all paths live under /admin.
router = APIRouter(tags=["admin"], prefix="/admin")

# Roles required for unmasked sensitive data and for data mutations.
FULL_ADMIN_ROLES = {"admin", "super_admin"}
# Roles used for moderation-gated sources (e.g. STO applications).
MODERATION_ROLES = FULL_ADMIN_ROLES | {"moderator"}
# Roles allowed to create and read data exports.
DATA_EXPORT_ROLES = FULL_ADMIN_ROLES | {"analyst"}
# Default role set for admin endpoints that pass no explicit restriction.
ADMIN_ROLES = MODERATION_ROLES | DATA_EXPORT_ROLES | {"support"}
# Request body for the generic admin data browser (POST /admin/data/query and,
# via AdminExportRequest, POST /admin/data/export). Unknown keys are rejected.
# Documented with comments rather than a docstring so the generated schema
# description is left untouched.
class AdminDataQuery(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # Key into DATA_SOURCES selecting which table is queried.
    source: str
    # Inclusive date window applied to the source's "date" column
    # (falls back to "created_at" when the source declares none).
    date_from: date | None = None
    date_to: date | None = None
    # Equality filters; each one only applies when the source's "filters"
    # mapping names a model attribute for it.
    status: str | None = None
    user_id: int | None = None
    telegram_id: int | None = None
    vehicle_id: int | None = None
    sto_id: int | None = None
    city: str | None = None
    role: str | None = None
    category: str | None = None
    # Bounds applied to the source's "amount" column, when it declares one.
    amount_min: Decimal | None = None
    amount_max: Decimal | None = None
    # True/False filters on whether last_error (or telegram_error) is set.
    has_errors: bool | None = None
    # Only the value True has an effect: restricts "status" to a fixed list.
    is_pending: bool | None = None
    is_completed: bool | None = None
    # Case-insensitive substring match over the source's "search" fields.
    search: str | None = None
    # One of the keys understood by apply_sort; unknown values fall back to
    # newest-first by created_at.
    sort: str = "created_at_desc"
    limit: int = Field(default=50, ge=1, le=500)
    offset: int = Field(default=0, ge=0)
    # Ask for unmasked sensitive columns; requires a full-admin role and a
    # reason of at least five non-blank characters (see can_view_sensitive).
    include_sensitive: bool = False
    reason: str | None = None
# Export request: the same filters as AdminDataQuery plus the output format.
class AdminExportRequest(AdminDataQuery):
    # admin_data_export accepts only "json" or "csv" and rejects anything else.
    export_format: str = "json"
# Body carrying a free-text note about a user.
# NOTE(review): not referenced in the visible part of this file — confirm its
# consumer before changing it.
class AdminUserNote(BaseModel):
    note: str
# Request body for PATCH /admin/data/{source}/{item_id}. Unknown keys are rejected.
class AdminDataMutation(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # Field name -> new value; every key must be listed in the source's "editable".
    values: dict[str, Any] = Field(default_factory=dict)
    # Justification written to the audit log; must have at least five
    # non-blank characters (see require_mutation_reason).
    reason: str
# Request body for DELETE /admin/data/{source}/{item_id}. Unknown keys are rejected.
class AdminDataDelete(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # Justification written to the audit log (minimum five non-blank characters).
    reason: str
    # Setting this on a source whose delete type is not "hard" yields HTTP 400.
    # The delete mode actually used always comes from the source configuration.
    hard: bool = False
# Registry of tables exposed through the generic admin data endpoints.
# Keys read by the helpers in this module:
#   "model"     ORM class to query; None makes the source "Unsupported" (source_config).
#   "roles"     platform roles allowed to read the source.
#   "search"    attributes matched case-insensitively against AdminDataQuery.search.
#   "filters"   AdminDataQuery field name -> model attribute compared for equality.
#   "sensitive" columns masked in output unless sensitive access was granted.
#   "columns"   attributes serialized for each row, in this order.
#   "editable"  attributes PATCH may change; a missing/empty list means read-only.
#   "delete"    delete policy: "type" is "hard" or "soft"; soft deletes write
#               "value" into "field" and optionally stamp "timestamp_field";
#               "requires_super_admin" restricts the delete to super_admin.
#   "amount"    column used by amount_min/amount_max and the "amount_desc" sort.
#   "date"      column used by date_from/date_to (default: "created_at").
DATA_SOURCES: dict[str, dict[str, Any]] = {
    "users": {
        "model": User,
        "roles": ADMIN_ROLES,
        "search": ["username", "first_name", "last_name"],
        "filters": {"user_id": "id", "telegram_id": "telegram_id", "role": "platform_role"},
        "sensitive": {"telegram_id"},
        "columns": ["id", "telegram_id", "username", "first_name", "last_name", "platform_role", "created_at", "updated_at"],
        "editable": ["username", "first_name", "last_name", "platform_role", "locale", "currency"],
        # "Deleting" a user sets platform_role to "blocked".
        "delete": {"type": "soft", "field": "platform_role", "value": "blocked"},
    },
    "vehicles": {
        "model": Car,
        "roles": ADMIN_ROLES,
        "search": ["name", "make", "model", "vin", "plate_number", "license_plate_display"],
        "filters": {"user_id": "owner_id", "vehicle_id": "id"},
        "sensitive": {"vin", "plate_number", "license_plate_display", "vin_normalized", "license_plate_normalized"},
        "columns": ["id", "owner_id", "name", "make", "model", "year", "vin", "plate_number", "license_plate_display", "current_odometer", "created_at"],
        "editable": ["name", "make", "model", "year", "vin", "plate_number", "license_plate_display", "current_odometer", "notes"],
        "delete": {"type": "hard", "requires_super_admin": True},
    },
    "fuel_entries": {
        "model": FuelEntry,
        "roles": FULL_ADMIN_ROLES | {"analyst"},
        "filters": {"vehicle_id": "car_id"},
        "amount": "total_cost",
        "date": "entry_date",
        "columns": ["id", "car_id", "entry_date", "odometer", "liters", "total_cost", "created_at"],
        "editable": ["entry_date", "odometer", "liters", "price_per_liter", "total_cost", "station", "fuel_brand", "is_full_tank", "notes"],
        "delete": {"type": "hard"},
    },
    "service_entries": {
        "model": ServiceEntry,
        "roles": FULL_ADMIN_ROLES | {"analyst", "support"},
        "search": ["title", "vendor", "category"],
        "filters": {"vehicle_id": "car_id", "category": "category"},
        "amount": "total_cost",
        "date": "entry_date",
        "columns": ["id", "car_id", "entry_date", "service_type", "title", "total_cost", "created_at"],
        "editable": ["entry_date", "odometer", "service_type", "title", "category", "vendor", "total_cost", "next_due_date", "next_due_odometer", "notes"],
        "delete": {"type": "hard"},
    },
    "expense_entries": {
        "model": ExpenseEntry,
        "roles": FULL_ADMIN_ROLES | {"analyst"},
        "search": ["title", "vendor"],
        "filters": {"vehicle_id": "car_id", "category": "category"},
        "amount": "total_cost",
        "date": "entry_date",
        "columns": ["id", "car_id", "entry_date", "category", "title", "total_cost", "currency", "created_at"],
        "editable": ["entry_date", "category", "title", "vendor", "total_cost", "currency", "odometer", "period_start", "period_end", "period_months", "is_recurring", "notes"],
        "delete": {"type": "hard"},
    },
    "sto_profiles": {
        "model": ServiceCenter,
        "roles": ADMIN_ROLES,
        "search": ["name", "display_name", "legal_name", "city"],
        "filters": {"sto_id": "id", "city": "city", "status": "verification_status", "user_id": "owner_user_id"},
        "sensitive": {"phone", "contact_phone", "business_registration_number"},
        "columns": ["id", "display_name", "legal_name", "city", "phone", "verification_status", "owner_user_id", "created_at", "verified_at"],
        "editable": ["display_name", "legal_name", "country", "city", "address", "phone", "contact_phone", "description", "working_hours", "contact_person", "verification_status"],
        "delete": {"type": "soft", "field": "verification_status", "value": "suspended", "timestamp_field": "suspended_at"},
    },
    "sto_applications": {
        "model": ServiceCenterVerification,
        "roles": MODERATION_ROLES,
        "filters": {"sto_id": "service_center_id", "status": "status", "user_id": "reviewed_by"},
        "columns": ["id", "service_center_id", "status", "reviewed_by", "reviewed_at", "created_at", "comment"],
        "editable": ["status", "comment", "reviewed_by", "reviewed_at"],
        "delete": {"type": "soft", "field": "status", "value": "rejected"},
    },
    "sto_employees": {
        "model": ServiceEmployee,
        "roles": MODERATION_ROLES | {"support"},
        "filters": {"sto_id": "service_center_id", "user_id": "user_id", "role": "role", "status": "status"},
        "columns": ["id", "service_center_id", "user_id", "role", "status", "created_at"],
        "editable": ["role", "permissions", "status"],
        "delete": {"type": "soft", "field": "status", "value": "revoked", "timestamp_field": "invite_revoked_at"},
    },
    "vehicle_sto_links": {
        "model": CarServiceLink,
        "roles": MODERATION_ROLES | {"support"},
        "filters": {"vehicle_id": "car_id", "sto_id": "service_center_id", "status": "status"},
        "columns": ["id", "car_id", "service_center_id", "access_level", "status", "created_at"],
        "editable": ["access_level", "status", "external_vehicle_ref"],
        "delete": {"type": "soft", "field": "status", "value": "revoked", "timestamp_field": "revoked_at"},
    },
    "appointments": {
        "model": ServiceAppointment,
        "roles": ADMIN_ROLES,
        "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"},
        "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "service_type", "service_name", "status", "requested_start_at", "created_at"],
        "editable": ["service_type", "service_name", "requested_start_at", "requested_end_at", "confirmed_start_at", "confirmed_end_at", "estimated_duration_minutes", "status", "customer_comment", "service_center_comment"],
        "delete": {"type": "soft", "field": "status", "value": "cancelled"},
    },
    "work_orders": {
        "model": ServiceVisit,
        "roles": ADMIN_ROLES,
        "filters": {"vehicle_id": "vehicle_id", "sto_id": "service_center_id", "user_id": "owner_user_id", "status": "status"},
        "amount": "final_total",
        "columns": ["id", "service_center_id", "vehicle_id", "owner_user_id", "status", "final_total", "currency", "created_at", "completed_at"],
        "editable": ["visit_date", "odometer", "status", "customer_complaint", "diagnosis", "recommendations", "internal_notes", "labor_total", "product_total", "discount_total", "final_total", "currency"],
        "delete": {"type": "soft", "field": "status", "value": "archived"},
    },
    "work_order_items": {
        "model": ServiceWorkItem,
        "roles": FULL_ADMIN_ROLES | {"support"},
        "filters": {"category": "category"},
        "amount": "total",
        "columns": ["id", "service_visit_id", "work_type", "title", "category", "quantity", "total", "created_at"],
        "editable": ["work_type", "title", "category", "description", "quantity", "unit", "unit_price", "discount", "total", "parts", "oil_brand", "oil_viscosity", "oil_volume"],
        "delete": {"type": "hard"},
    },
    "work_order_products": {
        "model": ServiceProductItem,
        "roles": FULL_ADMIN_ROLES | {"support"},
        "filters": {"category": "category"},
        "amount": "total",
        "columns": ["id", "service_visit_id", "title", "category", "product_type", "quantity", "total", "created_at"],
        "editable": ["title", "category", "product_type", "brand", "sku", "quantity", "unit", "unit_price", "discount", "total"],
        "delete": {"type": "hard"},
    },
    "reviews": {
        "model": ServiceCenterReview,
        "roles": MODERATION_ROLES | {"support", "analyst"},
        "filters": {"sto_id": "service_center_id", "user_id": "user_id", "status": "status"},
        "columns": ["id", "service_center_id", "user_id", "rating", "status", "created_at"],
        "editable": ["rating", "text", "status", "service_response", "service_responded_at"],
        "delete": {"type": "soft", "field": "status", "value": "hidden"},
    },
    "notifications": {
        "model": ServiceNotification,
        "roles": FULL_ADMIN_ROLES | {"support"},
        "filters": {"user_id": "recipient_user_id", "status": "status", "sto_id": "service_center_id"},
        "columns": ["id", "recipient_user_id", "notification_type", "title", "status", "created_at", "sent_at"],
        "editable": ["title", "body", "status", "retry_count", "last_error"],
        "delete": {"type": "soft", "field": "status", "value": "dismissed"},
    },
    "admin_notifications": {
        "model": AdminNotification,
        "roles": ADMIN_ROLES,
        "filters": {"status": "status"},
        "columns": ["id", "event_type", "severity", "title", "entity_type", "entity_id", "status", "created_at"],
        "editable": ["severity", "title", "body", "status", "telegram_status", "telegram_error"],
        "delete": {"type": "soft", "field": "status", "value": "dismissed", "timestamp_field": "dismissed_at"},
    },
    # No "editable" and no "delete": audit logs are read-only through this API.
    "audit_logs": {
        "model": AuditLog,
        "roles": FULL_ADMIN_ROLES | {"moderator", "support"},
        "search": ["action", "target_type", "target_id"],
        "filters": {"user_id": "actor_user_id", "role": "actor_role"},
        "columns": ["id", "actor_user_id", "actor_role", "action", "target_type", "target_id", "ip", "created_at"],
    },
    # Placeholder: with model None, source_config rejects it as unsupported.
    "ocr_results": {"model": None, "roles": ADMIN_ROLES, "columns": []},
    "imports_exports": {
        "model": AdminExportJob,
        "roles": DATA_EXPORT_ROLES,
        "filters": {"user_id": "requested_by_user_id", "status": "status"},
        "columns": ["id", "requested_by_user_id", "source", "export_format", "status", "row_count", "created_at"],
        "editable": ["status", "reason", "expires_at"],
        "delete": {"type": "soft", "field": "status", "value": "expired"},
    },
}
def require_admin_or_verifier(user: User) -> None:
    """Allow only moderation roles or the "verifier" role.

    The decision is delegated to ``require_platform_role``.
    """
    permitted_roles = MODERATION_ROLES | {"verifier"}
    require_platform_role(user, permitted_roles)
def require_admin_access(user: User, allowed: set[str] | None = None) -> None:
    """Check that ``user`` holds one of the permitted platform roles.

    Args:
        user: The authenticated caller.
        allowed: Roles permitted for this operation. ``None`` means "any
            admin role" (``ADMIN_ROLES``).

    The check itself is delegated to ``require_platform_role``.
    """
    # Distinguish "not given" from "explicitly empty": falling back on a falsy
    # value would silently widen an empty allow-list to every admin role.
    roles = ADMIN_ROLES if allowed is None else allowed
    require_platform_role(user, roles)
def mask_text(value: Any, visible: int = 4) -> Any:
    """Mask a value for display, keeping two characters at each end.

    ``None`` is passed through. Values whose text is at most ``visible``
    characters long are replaced entirely by asterisks. Longer values keep
    their first and last two characters around a run of at least three
    asterisks.
    """
    if value is None:
        return value
    raw = str(value)
    length = len(raw)
    if length <= visible:
        return "*" * length
    stars = "*" * max(length - visible, 3)
    return raw[:2] + stars + raw[-2:]
def can_view_sensitive(user: User, query: AdminDataQuery) -> bool:
    """Decide whether this query may return unmasked sensitive columns.

    Returns:
        ``False`` when the query did not ask for sensitive data, ``True`` when
        it asked and every requirement is met.

    Raises:
        HTTPException: 403 when the caller is not a full admin; 400 when the
            reason is missing or shorter than five non-blank characters.
    """
    if query.include_sensitive:
        if user.platform_role not in FULL_ADMIN_ROLES:
            raise HTTPException(status_code=403, detail="Sensitive data is restricted")
        justification = (query.reason or "").strip()
        if len(justification) < 5:
            raise HTTPException(status_code=400, detail="reason is required for sensitive data")
        return True
    return False
def public_columns(source: str) -> list[str]:
    """Return a fresh list of the columns a source exposes in serialized rows.

    Raises ``KeyError`` when ``source`` is not registered in ``DATA_SOURCES``.
    """
    declared = DATA_SOURCES[source].get("columns")
    return list(declared) if declared else []
def serialize_row(row: Any, source: str, *, include_sensitive: bool, role: str) -> dict[str, Any]:
    """Turn an ORM row into a JSON-ready dict of the source's public columns.

    Masking rules:
      * for the "analyst" role, identity columns are always masked;
      * columns the source marks as sensitive are masked unless
        ``include_sensitive`` is true.

    Attributes missing on ``row`` are serialized as ``None``.
    """
    sensitive_columns = set(DATA_SOURCES[source].get("sensitive") or set())
    analyst_hidden = {"telegram_id", "username", "first_name", "last_name", "owner_id", "user_id"}
    is_analyst = role == "analyst"

    serialized: dict[str, Any] = {}
    for name in public_columns(source):
        raw = getattr(row, name, None)
        must_mask = (is_analyst and name in analyst_hidden) or (
            name in sensitive_columns and not include_sensitive
        )
        serialized[name] = jsonable_encoder(mask_text(raw) if must_mask else raw)
    return serialized
def source_config(source: str, user: User) -> dict[str, Any]:
    """Look up a data source and verify the caller may read it.

    Raises:
        HTTPException: 400 when the source is unknown or has no model.
            Role failures come from ``require_admin_access``.
    """
    entry = DATA_SOURCES.get(source)
    supported = entry is not None and entry.get("model") is not None
    if not supported:
        raise HTTPException(status_code=400, detail="Unsupported data source")
    require_admin_access(user, set(entry["roles"]))
    return entry
def mutation_source_config(source: str, user: User, *, action: str) -> dict[str, Any]:
    """Resolve a source for a write operation and enforce mutation rules.

    Args:
        source: Key into ``DATA_SOURCES``.
        user: The authenticated caller.
        action: ``"update"`` or ``"delete"``; other values get only the
            read-access and full-admin checks.

    Raises:
        HTTPException: 403 when the caller is not a full admin, or when the
            source's delete policy requires ``super_admin``; 400 when the
            source is read-only or not deletable.
    """
    config = source_config(source, user)
    role = user.platform_role
    if role not in FULL_ADMIN_ROLES:
        raise HTTPException(status_code=403, detail="Only admin roles can modify data")

    if action == "update":
        if not config.get("editable"):
            raise HTTPException(status_code=400, detail="Data source is read-only")
    elif action == "delete":
        rules = config.get("delete")
        if not rules:
            raise HTTPException(status_code=400, detail="Data source cannot be deleted from admin")
        if rules.get("requires_super_admin") and role != "super_admin":
            raise HTTPException(status_code=403, detail="Super admin role is required")
    return config
def require_mutation_reason(reason: str) -> str:
    """Return the trimmed reason, insisting on at least five characters.

    Raises:
        HTTPException: 400 when the trimmed reason is too short.
    """
    trimmed = reason.strip()
    if len(trimmed) >= 5:
        return trimmed
    raise HTTPException(status_code=400, detail="reason is required")
def model_column_python_type(model: Any, field: str) -> type | None:
column = getattr(model, "__mapper__", None).columns.get(field) if getattr(model, "__mapper__", None) else None
if column is None:
return None
try:
return column.type.python_type
except NotImplementedError:
return None
def coerce_admin_value(model: Any, field: str, value: Any, current_value: Any) -> Any:
    """Convert a JSON-supplied value to the type the target field holds.

    The target type is the type of ``current_value`` when there is one,
    otherwise the mapped column's Python type. Empty strings and ``None``
    become ``None``. Enums, ``Decimal``, ``datetime``, ``date``, ``bool``,
    ``int``, ``float`` and ``str`` are converted; any other target type gets
    the value back unchanged. Conversion errors from the underlying
    constructors propagate to the caller.
    """
    if value == "" or value is None:
        return None

    if current_value is None:
        target_type = model_column_python_type(model, field)
    else:
        target_type = type(current_value)
    if target_type is None:
        return value

    if isinstance(current_value, enum.Enum):
        return type(current_value)(value)
    if isinstance(target_type, type) and issubclass(target_type, enum.Enum):
        return target_type(value)
    if target_type is Decimal:
        # Go through str so floats do not drag in binary rounding noise.
        return Decimal(str(value))
    if target_type is datetime:
        if not isinstance(value, datetime):
            # fromisoformat does not take a trailing "Z" on older Pythons.
            value = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        return value
    if target_type is date:
        already_date = isinstance(value, date) and not isinstance(value, datetime)
        # Only the leading YYYY-MM-DD of the text is parsed.
        return value if already_date else date.fromisoformat(str(value)[:10])
    if target_type is bool:
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in {"1", "true", "yes", "on", "да"}
    if target_type is int or target_type is float or target_type is str:
        return target_type(value)
    return value
async def get_mutation_item(session: AsyncSession, config: dict[str, Any], item_id: int) -> Any:
    """Load the record to mutate by primary key.

    Raises:
        HTTPException: 404 when no row with ``item_id`` exists.
    """
    record = await session.get(config["model"], item_id)
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail="Record not found")
def mutation_snapshot(item: Any, fields: list[str]) -> dict[str, Any]:
    """Capture JSON-ready values of ``fields`` on ``item`` for the audit log.

    Attributes missing on ``item`` are recorded as ``None``.
    """
    snapshot: dict[str, Any] = {}
    for name in fields:
        snapshot[name] = jsonable_encoder(getattr(item, name, None))
    return snapshot
def apply_data_filters(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select:
    """Add WHERE clauses to ``stmt`` for every filter present in ``query``.

    Args:
        stmt: Base select over ``config["model"]``.
        query: Filter values supplied by the admin client.
        config: The ``DATA_SOURCES`` entry for the queried source.

    Returns:
        The narrowed statement. Filters that name an attribute the model does
        not have are ignored.
    """
    model = config["model"]

    date_column = config.get("date", "created_at")
    if hasattr(model, date_column):
        column = getattr(model, date_column)
        if query.date_from:
            stmt = stmt.where(column >= query.date_from)
        if query.date_to:
            # Use an exclusive bound on the following day. `<= date_to` compares
            # a datetime column with midnight and loses the whole final day;
            # for plain date columns the two forms are equivalent.
            try:
                upper_bound = query.date_to + timedelta(days=1)
            except OverflowError:
                # date_to is date.max: nothing can be later, so no bound is needed.
                upper_bound = None
            if upper_bound is not None:
                stmt = stmt.where(column < upper_bound)

    for query_field, model_field in (config.get("filters") or {}).items():
        value = getattr(query, query_field)
        if value is not None and value != "" and hasattr(model, model_field):
            stmt = stmt.where(getattr(model, model_field) == value)

    # Only an explicit True narrows the result; False and None are no-ops.
    if query.is_pending is True and hasattr(model, "status"):
        stmt = stmt.where(model.status.in_(["pending", "requested", "unread"]))
    if query.is_completed is True and hasattr(model, "status"):
        stmt = stmt.where(model.status.in_(["completed", "closed", "confirmed"]))

    amount_column = config.get("amount")
    if amount_column and hasattr(model, amount_column):
        column = getattr(model, amount_column)
        if query.amount_min is not None:
            stmt = stmt.where(column >= query.amount_min)
        if query.amount_max is not None:
            stmt = stmt.where(column <= query.amount_max)

    if query.has_errors is not None:
        if hasattr(model, "last_error"):
            stmt = stmt.where(model.last_error.is_not(None) if query.has_errors else model.last_error.is_(None))
        elif hasattr(model, "telegram_error"):
            stmt = stmt.where(model.telegram_error.is_not(None) if query.has_errors else model.telegram_error.is_(None))

    if query.search:
        search_fields = [field for field in config.get("search", []) if hasattr(model, field)]
        if search_fields:
            # Escape LIKE metacharacters so "%" and "_" in the search text match
            # literally instead of acting as wildcards.
            escaped = query.search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            pattern = f"%{escaped}%"
            stmt = stmt.where(
                or_(*(getattr(model, field).ilike(pattern, escape="\\") for field in search_fields))
            )
    return stmt
def apply_sort(stmt: Select, query: AdminDataQuery, config: dict[str, Any]) -> Select:
    """Append the ORDER BY clause selected by ``query.sort``.

    Unknown sort keys mean newest-first by ``created_at``. When the model
    lacks the chosen column the ordering falls back to ``id``, keeping the
    direction of the requested sort.
    """
    model = config["model"]
    amount_field = config.get("amount") or "created_at"
    # sort key -> (column name, descending?)
    orderings = {
        "created_at_desc": ("created_at", True),
        "created_at_asc": ("created_at", False),
        "updated_at_desc": ("updated_at", True),
        "amount_desc": (amount_field, True),
        "status": ("status", False),
        "city": ("city", False),
    }
    field, descending = orderings.get(query.sort, ("created_at", True))
    if not hasattr(model, field):
        field = "id"
    column = getattr(model, field)
    ordering = column.desc() if descending else column.asc()
    return stmt.order_by(ordering)
async def run_data_query(
    session: AsyncSession, current_user: User, query: AdminDataQuery
) -> dict[str, Any]:
    """Run a filtered, sorted, paginated query and write an audit record.

    Access to the source and to sensitive columns is checked first. The audit
    entry is added through ``log_audit``; this function does not commit, so
    committing is left to the caller.

    Returns:
        ``{"source", "limit", "offset", "rows"}`` with rows serialized by
        ``serialize_row``.
    """
    config = source_config(query.source, current_user)
    include_sensitive = can_view_sensitive(current_user, query)

    filtered = apply_data_filters(select(config["model"]), query, config)
    page = apply_sort(filtered, query, config).limit(query.limit).offset(query.offset)
    result = await session.execute(page)

    role = current_user.platform_role
    rows = []
    for item in result.scalars():
        rows.append(serialize_row(item, query.source, include_sensitive=include_sensitive, role=role))

    audit_metadata = {
        "filters": query.model_dump(mode="json", exclude={"reason"}),
        "include_sensitive": include_sensitive,
        # The reason is recorded only when it actually unlocked sensitive data.
        "reason": query.reason if include_sensitive else None,
        "rows": len(rows),
    }
    await log_audit(
        session,
        actor=current_user,
        action="admin.data.query",
        target_type=query.source,
        metadata=audit_metadata,
    )
    return {"source": query.source, "limit": query.limit, "offset": query.offset, "rows": rows}
def _neutralize_csv_cell(value: Any) -> Any:
    """Prefix formula-looking strings with a quote so spreadsheets keep them as text."""
    if isinstance(value, str) and value.startswith(("=", "+", "-", "@", "\t", "\r")):
        return "'" + value
    return value


def csv_from_rows(rows: list[dict[str, Any]]) -> str:
    """Render serialized rows as CSV text with a header line.

    Args:
        rows: Dicts sharing the same keys; the first row's keys define the
            header and the column order.

    Returns:
        The CSV document, or ``""`` when there are no rows.

    Raises:
        ValueError: From ``csv.DictWriter`` when a later row has a key the
            first row lacks.

    String cells that start with ``=``, ``+``, ``-``, ``@``, tab or carriage
    return are prefixed with ``'``. Exports contain user-supplied text, and
    such cells would otherwise be evaluated as formulas when the file is
    opened in a spreadsheet. Non-string cells (numbers, ``None``) are written
    unchanged.
    """
    if not rows:
        return ""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(
        {key: _neutralize_csv_cell(value) for key, value in row.items()} for row in rows
    )
    return output.getvalue()
# GET /admin/dashboard: headline counters plus the eight newest admin alerts.
# Available to every role in ADMIN_ROLES.
@router.get("/dashboard")
async def admin_dashboard(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user)
    today = datetime.now(UTC).date()
    # Six days back plus today gives a seven-day window.
    week_ago = today - timedelta(days=6)

    async def count(stmt: Select) -> int:
        # Execute a COUNT statement; a NULL result is reported as zero.
        result = await session.execute(stmt)
        return int(result.scalar_one() or 0)

    # Base COUNT statements; each metric narrows one of them with .where().
    user_count = select(func.count(User.id))
    sto_count = select(func.count(ServiceCenter.id))
    visit_count = select(func.count(ServiceVisit.id))
    alert_count = select(func.count(AdminNotification.id))
    user_created_on = func.date(User.created_at)

    users_total = await count(user_count)
    users_today = await count(user_count.where(user_created_on == today))
    users_7d = await count(user_count.where(user_created_on >= week_ago))
    vehicles_total = await count(select(func.count(Car.id)))
    pending_sto = await count(sto_count.where(ServiceCenter.verification_status == "pending"))
    approved_sto = await count(
        sto_count.where(ServiceCenter.verification_status.in_(["approved", "verified"]))
    )
    suspended_sto = await count(sto_count.where(ServiceCenter.verification_status == "suspended"))
    appointments_today = await count(
        select(func.count(ServiceAppointment.id)).where(
            func.date(ServiceAppointment.requested_start_at) == today
        )
    )
    active_work_orders = await count(
        visit_count.where(
            ServiceVisit.status.in_(["draft", "awaiting_approval", "approved", "in_progress"])
        )
    )
    completed_work_orders = await count(visit_count.where(ServiceVisit.status == "completed"))
    system_errors = await count(
        alert_count.where(AdminNotification.severity.in_(["error", "critical"]))
    )
    security_events = await count(
        alert_count.where(
            AdminNotification.event_type.in_(["security_event", "rate_limit_exceeded", "upload_blocked"])
        )
    )

    newest_alerts = select(AdminNotification).order_by(AdminNotification.created_at.desc()).limit(8)
    latest_alerts = (await session.execute(newest_alerts)).scalars()
    role = current_user.platform_role

    return {
        "users_today": users_today,
        "users_7d": users_7d,
        "users_total": users_total,
        # Reported as the number of users created within the last seven days.
        "active_users": users_7d,
        "vehicles_total": vehicles_total,
        # Both keys carry the pending-verification count.
        "new_sto_applications": pending_sto,
        "pending_sto_applications": pending_sto,
        "approved_sto": approved_sto,
        "suspended_sto": suspended_sto,
        "appointments_today": appointments_today,
        "active_work_orders": active_work_orders,
        "completed_work_orders": completed_work_orders,
        "system_errors": system_errors,
        "security_events": security_events,
        "latest_alerts": [
            serialize_row(alert, "admin_notifications", include_sensitive=False, role=role)
            for alert in latest_alerts
        ],
    }
# GET /admin/notifications: newest-first page of admin notifications,
# optionally filtered by exact status. `limit` is clamped to 1..200 and
# `offset` to >= 0.
@router.get("/notifications")
async def admin_notifications(
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user)
    limit = min(max(limit, 1), 200)
    # Clamp once so the response reports the offset that was actually applied,
    # instead of echoing a negative value back to the client.
    offset = max(offset, 0)
    stmt = select(AdminNotification)
    if status:
        stmt = stmt.where(AdminNotification.status == status)
    page = stmt.order_by(AdminNotification.created_at.desc()).limit(limit).offset(offset)
    rows = (await session.execute(page)).scalars()
    return {"rows": [jsonable_encoder(item) for item in rows], "limit": limit, "offset": offset}
# POST /admin/notifications/{notification_id}/read: mark one admin
# notification as read, audit the action and return the notification.
@router.post("/notifications/{notification_id}/read")
async def read_admin_notification(
    notification_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user)
    target = await session.get(AdminNotification, notification_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Admin notification not found")

    await mark_admin_notification_read(session, target)
    await log_audit(
        session,
        actor=current_user,
        action="admin_notification.read",
        target_type="admin_notification",
        target_id=notification_id,
    )
    await session.commit()
    return jsonable_encoder(target)
# POST /admin/notifications/read-all: mark every unread admin notification as
# read and report how many were touched.
@router.post("/notifications/read-all")
async def read_all_admin_notifications(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user)
    unread_stmt = select(AdminNotification).where(AdminNotification.status == "unread")
    unread = (await session.execute(unread_stmt)).scalars().all()
    updated = len(unread)
    for entry in unread:
        await mark_admin_notification_read(session, entry)
    await log_audit(
        session,
        actor=current_user,
        action="admin_notification.read_all",
        target_type="admin_notification",
        metadata={"count": updated},
    )
    await session.commit()
    return {"updated": updated}
# POST /admin/notifications/retry: re-run delivery for the service
# notification queue and for admin Telegram notifications. Restricted to full
# admins and support; `limit` is clamped to 1..200 and applied to each queue.
@router.post("/notifications/retry")
async def retry_notifications(
    limit: int = 50,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user, FULL_ADMIN_ROLES | {"support"})
    limit = min(max(limit, 1), 200)

    service_delivered = await process_notification_queue(session, limit=limit)
    admin_delivered = await retry_admin_telegram_notifications(session, limit=limit)
    delivered = {
        "service_delivered": service_delivered,
        "admin_delivered": admin_delivered,
    }

    await log_audit(
        session,
        actor=current_user,
        action="admin.notifications.retry",
        target_type="notifications",
        metadata={"limit": limit, **delivered},
    )
    await session.commit()
    return {**delivered, "limit": limit}
# POST /admin/notifications/{notification_id}/dismiss: dismiss one admin
# notification, audit the action and return the notification.
@router.post("/notifications/{notification_id}/dismiss")
async def dismiss_notification(
    notification_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user)
    target = await session.get(AdminNotification, notification_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Admin notification not found")

    await dismiss_admin_notification(session, target)
    await log_audit(
        session,
        actor=current_user,
        action="admin_notification.dismiss",
        target_type="admin_notification",
        target_id=notification_id,
    )
    await session.commit()
    return jsonable_encoder(target)
# GET /admin/data/sources: describe every registered data source (columns,
# sensitivity, edit/delete capabilities and whether the caller's role may read
# it), together with the supported sort keys and the suggested page sizes.
@router.get("/data/sources")
async def admin_data_sources(current_user: User = Depends(get_current_telegram_user)) -> dict[str, Any]:
    require_admin_access(current_user)
    role = current_user.platform_role

    described = []
    for name, config in DATA_SOURCES.items():
        delete_rules = config.get("delete")
        described.append(
            {
                "name": name,
                "available": bool(config.get("model")),
                "columns": config.get("columns") or [],
                "sensitive": sorted(config.get("sensitive") or []),
                "allowed": role in set(config["roles"]),
                "editable": sorted(config.get("editable") or []),
                "deletable": bool(delete_rules),
                "delete_mode": (delete_rules or {}).get("type"),
            }
        )

    return {
        "sources": described,
        "sorts": ["created_at_desc", "created_at_asc", "updated_at_desc", "amount_desc", "status", "city"],
        "limits": [25, 50, 100, 500],
    }
# POST /admin/data/query: run a generic data query. Access checks and the
# audit record happen inside run_data_query; the commit here persists that
# audit record.
@router.post("/data/query")
async def admin_data_query(
    payload: AdminDataQuery,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    result = await run_data_query(session, current_user, payload)
    await session.commit()
    return result
# PATCH /admin/data/{source}/{item_id}: change editable fields of one record.
# Requires a full-admin role and a reason. Responds 400 for an empty payload,
# for fields outside the source's "editable" list and for values that cannot
# be converted to the field's type; 404 when the record does not exist.
# Old and new values are written to the audit log.
# NOTE(review): "users" lists "platform_role" as editable, so any full admin
# can grant or revoke admin/super_admin through this endpoint. Confirm that
# this is the intended policy.
@router.patch("/data/{source}/{item_id}")
async def admin_data_update(
    source: str,
    item_id: int,
    payload: AdminDataMutation,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    reason = require_mutation_reason(payload.reason)
    config = mutation_source_config(source, current_user, action="update")
    editable = set(config.get("editable") or [])
    if not payload.values:
        raise HTTPException(status_code=400, detail="values are required")
    forbidden = sorted(set(payload.values) - editable)
    if forbidden:
        raise HTTPException(status_code=400, detail=f"Forbidden fields: {', '.join(forbidden)}")
    # Every key is editable at this point.
    values = dict(payload.values)

    item = await get_mutation_item(session, config, item_id)

    # Convert every value before touching the record: one bad value then
    # rejects the whole request as a client error (400) rather than surfacing
    # as an unhandled 500 with the object half-modified.
    coerced: dict[str, Any] = {}
    for field, value in values.items():
        try:
            coerced[field] = coerce_admin_value(config["model"], field, value, getattr(item, field, None))
        except (ValueError, TypeError, ArithmeticError) as exc:
            # ArithmeticError covers decimal.InvalidOperation.
            raise HTTPException(status_code=400, detail=f"Invalid value for field: {field}") from exc

    changed_fields = sorted(values)
    old_values = mutation_snapshot(item, changed_fields)
    for field, new_value in coerced.items():
        setattr(item, field, new_value)
    await session.flush()
    await session.refresh(item)
    new_values = mutation_snapshot(item, changed_fields)

    await log_audit(
        session,
        actor=current_user,
        action="admin.data.update",
        target_type=source,
        target_id=item_id,
        metadata={"reason": reason, "old": old_values, "new": new_values},
    )
    await session.commit()
    await session.refresh(item)
    return {
        "source": source,
        "id": item_id,
        "row": serialize_row(item, source, include_sensitive=True, role=current_user.platform_role),
    }
# DELETE /admin/data/{source}/{item_id}: remove or deactivate one record
# according to the source's "delete" policy. Hard sources delete the row.
# Soft sources write the configured value into the configured field and, when
# one is configured and present on the model, stamp the timestamp field.
# The pre-delete column values are stored in the audit log.
@router.delete("/data/{source}/{item_id}")
async def admin_data_delete(
    source: str,
    item_id: int,
    payload: AdminDataDelete,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    reason = require_mutation_reason(payload.reason)
    config = mutation_source_config(source, current_user, action="delete")
    rules = config["delete"]
    item = await get_mutation_item(session, config, item_id)
    old_values = mutation_snapshot(item, public_columns(source))

    is_hard = rules.get("type") == "hard"
    if payload.hard and not is_hard:
        raise HTTPException(status_code=400, detail="Hard delete is not supported for this source")

    result_payload: dict[str, Any] = {"source": source, "id": item_id, "deleted": True}
    if is_hard:
        await session.delete(item)
        result_payload["mode"] = "hard"
    else:
        field = rules["field"]
        if not hasattr(item, field):
            raise HTTPException(status_code=400, detail="Soft delete field is not available")
        setattr(item, field, rules.get("value"))
        timestamp_field = rules.get("timestamp_field")
        if timestamp_field and hasattr(item, timestamp_field):
            setattr(item, timestamp_field, datetime.now(UTC))
        await session.flush()
        await session.refresh(item)
        result_payload["mode"] = "soft"
        result_payload["row"] = serialize_row(
            item, source, include_sensitive=True, role=current_user.platform_role
        )

    await log_audit(
        session,
        actor=current_user,
        action="admin.data.delete",
        target_type=source,
        target_id=item_id,
        metadata={"reason": reason, "mode": result_payload["mode"], "old": old_values},
    )
    await session.commit()
    return result_payload
# POST /admin/data/export: run a data query and store the result as an export
# job ("json" or "csv") that expires after seven days. Restricted to
# DATA_EXPORT_ROLES; the per-source role check and the query audit happen in
# run_data_query. Exports of users, vehicles and STO profiles need a reason.
@router.post("/data/export")
async def admin_data_export(
    payload: AdminExportRequest,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user, DATA_EXPORT_ROLES)
    if payload.export_format not in {"json", "csv"}:
        raise HTTPException(status_code=400, detail="Unsupported export format")
    # Strip before testing so a whitespace-only reason does not satisfy the check.
    has_reason = bool((payload.reason or "").strip())
    if payload.source in {"users", "vehicles", "sto_profiles"} and not has_reason:
        raise HTTPException(status_code=400, detail="Export reason is required")

    data = await run_data_query(session, current_user, payload)
    rows = data["rows"]
    # Render only the requested format.
    if payload.export_format == "csv":
        content = csv_from_rows(rows)
    else:
        content = json.dumps(rows, ensure_ascii=False, default=str, indent=2)

    job = AdminExportJob(
        requested_by_user_id=current_user.id,
        source=payload.source,
        export_format=payload.export_format,
        status="ready",
        reason=payload.reason,
        filters_json=payload.model_dump(mode="json", exclude={"reason"}),
        result_text=content,
        row_count=len(rows),
        expires_at=datetime.now(UTC) + timedelta(days=7),
    )
    session.add(job)
    await log_audit(
        session,
        actor=current_user,
        action="admin.data.export",
        target_type=payload.source,
        metadata={"format": payload.export_format, "rows": len(rows)},
    )
    await session.commit()
    await session.refresh(job)
    return {"id": job.id, "status": job.status, "row_count": job.row_count, "expires_at": job.expires_at}
# GET /admin/exports: metadata of the 50 most recent export jobs. The stored
# export content is not part of this listing.
@router.get("/exports")
async def admin_exports(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    require_admin_access(current_user, DATA_EXPORT_ROLES)
    recent_jobs = select(AdminExportJob).order_by(AdminExportJob.created_at.desc()).limit(50)
    jobs = (await session.execute(recent_jobs)).scalars()

    summaries = []
    for job in jobs:
        summaries.append(
            {
                "id": job.id,
                "requested_by_user_id": job.requested_by_user_id,
                "source": job.source,
                "export_format": job.export_format,
                "status": job.status,
                "row_count": job.row_count,
                "reason": job.reason,
                "expires_at": job.expires_at,
                "created_at": job.created_at,
            }
        )
    return {"rows": summaries}
@router.get("/exports/{export_id}")
async def admin_export_detail(
    export_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Return one stored export job in full and audit the access.

    Raises:
        HTTPException: 404 when no job has this id; 410 when the job's
            ``expires_at`` deadline has already passed.
    """
    require_admin_access(current_user, DATA_EXPORT_ROLES)
    job = await session.get(AdminExportJob, export_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Export not found")
    # Enforce the retention deadline that is reported to the requester when
    # the export is created; without this check an expired payload would
    # remain downloadable indefinitely.
    expires_at = job.expires_at
    if expires_at is not None:
        # The deadline is written as an aware UTC timestamp, but a backend may
        # return it naive; normalise so the comparison cannot raise TypeError.
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=UTC)
        if expires_at <= datetime.now(UTC):
            raise HTTPException(status_code=410, detail="Export has expired")
    await log_audit(session, actor=current_user, action="admin.export.view", target_type="admin_export", target_id=export_id)
    await session.commit()
    return jsonable_encoder(job)
@router.get("/users")
async def admin_users(
    search: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Run the generic admin data query against the ``users`` source.

    ``limit`` is clamped to 1..100 and ``offset`` to a non-negative value so
    that out-of-range paging parameters never reach the query.
    """
    # NOTE(review): there is no explicit role check here; presumably
    # run_data_query enforces access for the given user - confirm.
    query = AdminDataQuery(
        source="users",
        search=search,
        limit=min(max(limit, 1), 100),
        offset=max(offset, 0),
    )
    data = await run_data_query(session, current_user, query)
    await session.commit()
    return data
@router.get("/users/{user_id}")
async def admin_user_detail(
    user_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Return one serialized user plus related-record counters.

    The counters cover the user's cars, fuel entries on those cars (exposed
    as ``records_count``) and service appointments. The view is written to
    the audit log. Analysts are excluded from this endpoint.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")

    async def count_rows(stmt: Any) -> int:
        # Run a COUNT statement and coerce a NULL/zero result to 0.
        outcome = await session.execute(stmt)
        return int(outcome.scalar_one() or 0)

    cars_count = await count_rows(select(func.count(Car.id)).where(Car.owner_id == user.id))
    fuel_count = await count_rows(
        select(func.count(FuelEntry.id)).join(Car, FuelEntry.car_id == Car.id).where(Car.owner_id == user.id)
    )
    appointments_count = await count_rows(
        select(func.count(ServiceAppointment.id)).where(ServiceAppointment.owner_user_id == user.id)
    )

    await log_audit(session, actor=current_user, action="admin.user.view", target_type="user", target_id=user.id)
    await session.commit()

    # Sensitive fields are only requested for full administrators.
    profile = serialize_row(
        user,
        "users",
        include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES,
        role=current_user.platform_role,
    )
    counters = {
        "cars_count": cars_count,
        "records_count": fuel_count,
        "appointments_count": appointments_count,
    }
    return {**profile, **counters}
@router.get("/users/{user_id}/activity")
async def admin_user_activity(
    user_id: int,
    limit: int = 50,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Return the latest audit-log entries whose actor is the given user.

    ``limit`` is clamped to 1..100. Entries are ordered newest first and the
    lookup itself is recorded in the audit log.
    """
    require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
    page_size = min(max(limit, 1), 100)
    stmt = (
        select(AuditLog)
        .where(AuditLog.actor_user_id == user_id)
        .order_by(AuditLog.created_at.desc())
        .limit(page_size)
    )
    entries = (await session.execute(stmt)).scalars()
    await log_audit(session, actor=current_user, action="admin.user.activity", target_type="user", target_id=user_id)
    await session.commit()
    encoded = []
    for entry in entries:
        encoded.append(jsonable_encoder(entry))
    return {"rows": encoded}
@router.post("/users/{user_id}/note")
async def admin_user_note(
    user_id: int,
    payload: AdminUserNote,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, str]:
    """Attach a free-text note to a user by writing it to the audit log.

    The note is stored only as audit metadata; no user row is loaded or
    modified here.
    """
    allowed_roles = {"admin", "super_admin", "support"}
    require_admin_access(current_user, allowed_roles)
    note_metadata = {"note": payload.note}
    await log_audit(
        session,
        actor=current_user,
        action="admin.user.note",
        target_type="user",
        target_id=user_id,
        metadata=note_metadata,
    )
    await session.commit()
    return {"status": "saved"}
@router.post("/users/{user_id}/block")
async def admin_block_user(
    user_id: int,
    payload: AdminUserNote | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Block a user by setting their platform role to ``"blocked"``.

    The optional note in ``payload`` is recorded as the reason in the audit
    log. Restricted to full administrators.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    require_admin_access(current_user, FULL_ADMIN_ROLES)
    target = await session.get(User, user_id)
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")
    target.platform_role = "blocked"
    reason = None
    if payload:
        reason = payload.note
    await log_audit(
        session,
        actor=current_user,
        action="admin.user.block",
        target_type="user",
        target_id=user_id,
        metadata={"reason": reason},
    )
    await session.commit()
    return {"id": target.id, "platform_role": target.platform_role}
@router.post("/users/{user_id}/unblock")
async def admin_unblock_user(
    user_id: int,
    payload: AdminUserNote | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Lift a block from a user, restoring the plain ``"user"`` role.

    The role is only changed when the user is currently ``"blocked"``.
    Calling this for a user who is not blocked leaves their role untouched,
    so an administrator or service owner cannot be demoted by accident.
    The call is recorded in the audit log either way. Restricted to full
    administrators.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    require_admin_access(current_user, FULL_ADMIN_ROLES)
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    # Only an actually blocked account is reset; any other role is preserved.
    if user.platform_role == "blocked":
        user.platform_role = "user"
    await log_audit(session, actor=current_user, action="admin.user.unblock", target_type="user", target_id=user_id, metadata={"reason": payload.note if payload else None})
    await session.commit()
    return {"id": user.id, "platform_role": user.platform_role}
@router.get("/sto")
async def admin_sto(
    status: str | None = None,
    city: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Run the generic admin data query against the ``sto_profiles`` source.

    ``limit`` is clamped to 1..100 and ``offset`` to a non-negative value so
    that out-of-range paging parameters never reach the query.
    """
    # NOTE(review): there is no explicit role check here; presumably
    # run_data_query enforces access for the given user - confirm.
    query = AdminDataQuery(
        source="sto_profiles",
        status=status,
        city=city,
        limit=min(max(limit, 1), 100),
        offset=max(offset, 0),
    )
    data = await run_data_query(session, current_user, query)
    await session.commit()
    return data
@router.get("/sto/{service_center_id}")
async def admin_sto_detail(
    service_center_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """Return one serialized service center plus related-record counters.

    The counters cover employees, appointments and service visits (exposed
    as ``work_orders_count``). The view is written to the audit log.
    Analysts are excluded from this endpoint.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_access(current_user, ADMIN_ROLES - {"analyst"})
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")

    async def count_rows(stmt: Any) -> int:
        # Run a COUNT statement and coerce a NULL/zero result to 0.
        outcome = await session.execute(stmt)
        return int(outcome.scalar_one() or 0)

    employees_count = await count_rows(
        select(func.count(ServiceEmployee.id)).where(ServiceEmployee.service_center_id == center.id)
    )
    appointments_count = await count_rows(
        select(func.count(ServiceAppointment.id)).where(ServiceAppointment.service_center_id == center.id)
    )
    work_orders_count = await count_rows(
        select(func.count(ServiceVisit.id)).where(ServiceVisit.service_center_id == center.id)
    )

    await log_audit(session, actor=current_user, action="admin.sto.view", target_type="service_center", target_id=center.id)
    await session.commit()

    # Sensitive fields are only requested for full administrators.
    profile = serialize_row(
        center,
        "sto_profiles",
        include_sensitive=current_user.platform_role in FULL_ADMIN_ROLES,
        role=current_user.platform_role,
    )
    counters = {
        "employees_count": employees_count,
        "appointments_count": appointments_count,
        "work_orders_count": work_orders_count,
    }
    return {**profile, **counters}
@router.get("/sto-applications")
async def admin_sto_applications(
    status: str | None = None,
    city: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict[str, Any]:
    """List service centers awaiting moderation, oldest first.

    Without ``status`` the list covers the ``pending`` and ``needs_changes``
    verification states; with it, only that exact state. ``city`` is an
    optional exact-match filter. ``limit`` is clamped to 1..100 and
    ``offset`` to a non-negative value so out-of-range paging parameters
    never reach the database. Rows are serialized without sensitive fields.
    """
    require_admin_access(current_user, MODERATION_ROLES)
    wanted_statuses = ["pending", "needs_changes"] if status is None else [status]
    stmt = select(ServiceCenter).where(ServiceCenter.verification_status.in_(wanted_statuses))
    if city:
        stmt = stmt.where(ServiceCenter.city == city)
    stmt = (
        stmt.order_by(ServiceCenter.created_at.asc())
        .limit(min(max(limit, 1), 100))
        .offset(max(offset, 0))
    )
    rows = (await session.execute(stmt)).scalars()
    return {"rows": [serialize_row(item, "sto_profiles", include_sensitive=False, role=current_user.platform_role) for item in rows]}
async def center_id_from_application(session: AsyncSession, application_id: int) -> int:
    """Resolve an application id to the id of the service center it refers to.

    The id is first looked up as a ``ServiceCenterVerification`` primary key.
    If no such verification exists, the same value is tried as a
    ``ServiceCenter`` primary key.

    Raises:
        HTTPException: 404 when neither lookup finds a record.
    """
    verification = await session.get(ServiceCenterVerification, application_id)
    if verification is not None:
        return verification.service_center_id
    # Fallback: treat the value as a service-center id.
    fallback_center = await session.get(ServiceCenter, application_id)
    if fallback_center is not None:
        return fallback_center.id
    raise HTTPException(status_code=404, detail="STO application not found")
@router.post("/sto-applications/{application_id}/approve", response_model=ServiceCenterRead)
async def approve_sto_application(
    application_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Approve an STO application addressed by its application id.

    The id is resolved with ``center_id_from_application`` and the decision
    is delegated to ``verify_service_center``.
    """
    # Authorize before touching the database so that callers without the
    # required role cannot distinguish existing from missing applications
    # (404 vs 403). The delegate repeats the same check.
    require_admin_or_verifier(current_user)
    center_id = await center_id_from_application(session, application_id)
    return await verify_service_center(center_id, payload, session, current_user)
@router.post("/sto-applications/{application_id}/reject", response_model=ServiceCenterRead)
async def reject_sto_application(
    application_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Reject an STO application addressed by its application id.

    The id is resolved with ``center_id_from_application`` and the decision
    is delegated to ``reject_service_center``.
    """
    # Authorize before touching the database so that callers without the
    # required role cannot distinguish existing from missing applications
    # (404 vs 403). The delegate repeats the same check.
    require_admin_or_verifier(current_user)
    center_id = await center_id_from_application(session, application_id)
    return await reject_service_center(center_id, payload, session, current_user)
@router.post("/sto-applications/{application_id}/request-changes", response_model=ServiceCenterRead)
async def request_sto_application_changes(
    application_id: int,
    payload: AdminModerationDecision,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Ask the applicant to amend an STO application addressed by its id.

    The id is resolved with ``center_id_from_application`` and the decision
    is delegated to ``request_service_center_changes``.
    """
    # Authorize before touching the database so that callers without the
    # required role cannot distinguish existing from missing applications
    # (404 vs 403). The delegate repeats the same check.
    require_admin_or_verifier(current_user)
    center_id = await center_id_from_application(session, application_id)
    return await request_service_center_changes(center_id, payload, session, current_user)
@router.post("/sto/{service_center_id}/suspend", response_model=ServiceCenterRead)
async def suspend_sto(
    service_center_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Alias route under ``/sto`` that forwards to ``suspend_service_center``."""
    suspended_center = await suspend_service_center(
        service_center_id=service_center_id,
        payload=payload,
        session=session,
        current_user=current_user,
    )
    return suspended_center
@router.post("/sto/{service_center_id}/unsuspend", response_model=ServiceCenterRead)
async def unsuspend_sto(
    service_center_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Lift a suspension and return the service center to ``approved``.

    Only a center whose verification status is ``suspended`` can be
    unsuspended. Without that guard this route would mark pending or
    rejected centers as approved while skipping the verification steps.
    An admin notification and an audit-log entry are written.

    Raises:
        HTTPException: 404 when the center does not exist; 409 when it is
            not currently suspended.
    """
    require_admin_access(current_user, FULL_ADMIN_ROLES)
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    if center.verification_status != "suspended":
        raise HTTPException(status_code=409, detail="Service center is not suspended")
    center.verification_status = "approved"
    center.suspended_at = None
    await create_admin_notification(
        session,
        event_type="sto_approved",
        title="СТО разблокировано",
        body=center.display_name or center.name,
        entity_type="service_center",
        entity_id=center.id,
        # The timestamp makes the key unique per call.
        idempotency_key=f"sto_unsuspended:{center.id}:{datetime.now(UTC).isoformat()}",
    )
    await log_audit(session, actor=current_user, action="service_center.unsuspend", target_type="service_center", target_id=center.id, metadata={"reason": payload.reason if payload else None})
    await session.commit()
    await session.refresh(center)
    return center
@router.get("/service-centers/pending", response_model=list[ServiceCenterRead])
async def pending_service_centers(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceCenter]:
    """Return every service center in the ``pending`` state, oldest first."""
    require_admin_or_verifier(current_user)
    awaiting_review = select(ServiceCenter).where(ServiceCenter.verification_status == "pending")
    oldest_first = awaiting_review.order_by(ServiceCenter.created_at.asc())
    outcome = await session.execute(oldest_first)
    centers = outcome.scalars()
    return list(centers)
@router.get("/service-centers/{service_center_id}", response_model=ServiceCenterRead)
async def admin_service_center_detail(
    service_center_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Fetch a single service center by primary key.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_or_verifier(current_user)
    found = await session.get(ServiceCenter, service_center_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Service center not found")
@router.post("/service-centers/{service_center_id}/verify", response_model=ServiceCenterRead)
async def verify_service_center(
    service_center_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Approve a service center and set up its owner.

    Marks the center ``approved`` and stamps ``verified_at``. If the owner
    exists they get an owner employee record and a notification. The owner's
    platform role becomes ``service_owner`` unless they already hold a role
    from ``ADMIN_ROLES`` or are ``blocked``; those roles are left untouched
    so approval can neither demote staff nor lift a block. The latest
    verification record, the audit log and an admin notification are
    updated before the commit.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_or_verifier(current_user)
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    center.verification_status = "approved"
    center.verified_at = datetime.now(UTC)
    if center.owner_user_id:
        owner = await session.get(User, center.owner_user_id)
        if owner:
            # Never overwrite a staff role or a block with "service_owner".
            if owner.platform_role not in ADMIN_ROLES | {"blocked"}:
                owner.platform_role = "service_owner"
            await ensure_owner_employee(session, center.id, owner.id)
            await notify_user(owner, f"Заявка СТО «{center.display_name or center.name}» одобрена. Панель СТО доступна в CarPass.")
    await mark_latest_verification(session, center.id, "approved", current_user.id, payload)
    await log_audit(
        session,
        actor=current_user,
        action="service_center.verify",
        target_type="service_center",
        target_id=center.id,
        metadata={"comment": payload.comment if payload else None},
    )
    await create_admin_notification(
        session,
        event_type="sto_approved",
        title="СТО одобрено",
        body=center.display_name or center.name,
        entity_type="service_center",
        entity_id=center.id,
        idempotency_key=f"sto_approved:{center.id}:{center.verified_at.isoformat() if center.verified_at else 'now'}",
    )
    await session.commit()
    await session.refresh(center)
    return center
@router.post("/service-centers/{service_center_id}/reject", response_model=ServiceCenterRead)
async def reject_service_center(
    service_center_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Reject a service center application.

    Sets the status to ``rejected``, notifies the owner (with the reason or
    comment when one was supplied), updates the latest verification record,
    and writes an audit entry and an admin notification before committing.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_or_verifier(current_user)
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    center.verification_status = "rejected"

    owner = await session.get(User, center.owner_user_id) if center.owner_user_id else None
    if owner:
        # Prefer the explicit reason and fall back to the comment.
        reason = (payload.reason or payload.comment) if payload else None
        reason_suffix = f" Причина: {reason}" if reason else ""
        await notify_user(owner, f"Заявка СТО «{center.display_name or center.name}» отклонена.{reason_suffix}")

    await mark_latest_verification(session, center.id, "rejected", current_user.id, payload)

    audit_metadata = {
        "reason": payload.reason if payload else None,
        "comment": payload.comment if payload else None,
    }
    await log_audit(
        session,
        actor=current_user,
        action="service_center.reject",
        target_type="service_center",
        target_id=center.id,
        metadata=audit_metadata,
    )
    await create_admin_notification(
        session,
        event_type="sto_application_updated",
        title="Заявка СТО отклонена",
        body=center.display_name or center.name,
        entity_type="service_center",
        entity_id=center.id,
        idempotency_key=f"sto_rejected:{center.id}:{datetime.now(UTC).isoformat()}",
    )
    await session.commit()
    await session.refresh(center)
    return center
@router.post("/service-centers/{service_center_id}/suspend", response_model=ServiceCenterRead)
async def suspend_service_center(
    service_center_id: int,
    payload: AdminModerationDecision | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Suspend a service center.

    Sets the status to ``suspended`` and stamps ``suspended_at``, notifies
    the owner (with the reason or comment when one was supplied), and writes
    an audit entry and a warning-level admin notification before committing.
    Restricted to full administrators.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_access(current_user, FULL_ADMIN_ROLES)
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    center.verification_status = "suspended"
    center.suspended_at = datetime.now(UTC)

    owner = await session.get(User, center.owner_user_id) if center.owner_user_id else None
    if owner:
        # Prefer the explicit reason and fall back to the comment.
        reason = (payload.reason or payload.comment) if payload else None
        reason_suffix = f" Причина: {reason}" if reason else ""
        await notify_user(owner, f"СТО «{center.display_name or center.name}» временно заблокировано.{reason_suffix}")

    audit_metadata = {
        "reason": payload.reason if payload else None,
        "comment": payload.comment if payload else None,
    }
    await log_audit(
        session,
        actor=current_user,
        action="service_center.suspend",
        target_type="service_center",
        target_id=center.id,
        metadata=audit_metadata,
    )
    suspended_stamp = center.suspended_at.isoformat() if center.suspended_at else "now"
    await create_admin_notification(
        session,
        event_type="sto_suspended",
        title="СТО заблокировано",
        body=center.display_name or center.name,
        entity_type="service_center",
        entity_id=center.id,
        severity="warning",
        idempotency_key=f"sto_suspended:{center.id}:{suspended_stamp}",
    )
    await session.commit()
    await session.refresh(center)
    return center
@router.post("/service-centers/{service_center_id}/request-changes", response_model=ServiceCenterRead)
async def request_service_center_changes(
    service_center_id: int,
    payload: AdminModerationDecision,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Send a service center application back to the applicant for edits.

    Sets the status to ``needs_changes``, notifies the owner with the reason,
    the comment, or a default message (in that order of preference), updates
    the latest verification record, and writes an audit entry and an admin
    notification before committing.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    require_admin_or_verifier(current_user)
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    center.verification_status = "needs_changes"

    owner = await session.get(User, center.owner_user_id) if center.owner_user_id else None
    if owner:
        explanation = payload.reason or payload.comment or "Администратор попросил уточнить данные заявки."
        await notify_user(owner, f"По заявке СТО «{center.display_name or center.name}» нужны правки: {explanation}")

    await mark_latest_verification(session, center.id, "needs_changes", current_user.id, payload)

    audit_metadata = {"reason": payload.reason, "comment": payload.comment}
    await log_audit(
        session,
        actor=current_user,
        action="service_center.request_changes",
        target_type="service_center",
        target_id=center.id,
        metadata=audit_metadata,
    )
    await create_admin_notification(
        session,
        event_type="sto_application_updated",
        title="По заявке СТО запрошены правки",
        body=center.display_name or center.name,
        entity_type="service_center",
        entity_id=center.id,
        idempotency_key=f"sto_changes:{center.id}:{datetime.now(UTC).isoformat()}",
    )
    await session.commit()
    await session.refresh(center)
    return center
@router.get("/audit-log")
async def audit_log(
    actor_id: int | None = None,
    actor_role: str | None = None,
    action: str | None = None,
    entity_type: str | None = None,
    entity_id: str | None = None,
    date_from: date | None = None,
    date_to: date | None = None,
    severity: str | None = None,
    ip: str | None = None,
    user_agent: str | None = None,
    limit: int = 100,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[dict]:
    """Search the audit log with optional filters, newest entries first.

    All supplied filters are combined with AND. ``action`` and
    ``user_agent`` match case-insensitive substrings, ``date_from`` and
    ``date_to`` are inclusive bounds on the creation date, and ``severity``
    is compared with the ``severity`` key of the entry metadata. ``limit``
    is clamped to 1..200 and a negative ``offset`` is treated as 0. The
    search itself is recorded in the audit log.
    """
    require_admin_access(current_user, FULL_ADMIN_ROLES | {"moderator", "support"})
    limit = min(max(limit, 1), 200)

    # Collect one condition per supplied filter.
    conditions = []
    if actor_id is not None:
        conditions.append(AuditLog.actor_user_id == actor_id)
    if actor_role:
        conditions.append(AuditLog.actor_role == actor_role)
    if action:
        conditions.append(AuditLog.action.ilike(f"%{action}%"))
    if entity_type:
        conditions.append(AuditLog.target_type == entity_type)
    if entity_id:
        conditions.append(AuditLog.target_id == entity_id)
    if date_from:
        conditions.append(func.date(AuditLog.created_at) >= date_from)
    if date_to:
        conditions.append(func.date(AuditLog.created_at) <= date_to)
    if ip:
        conditions.append(AuditLog.ip == ip)
    if user_agent:
        conditions.append(AuditLog.user_agent.ilike(f"%{user_agent}%"))
    if severity:
        conditions.append(AuditLog.metadata_json["severity"].as_string() == severity)

    stmt = select(AuditLog)
    for condition in conditions:
        stmt = stmt.where(condition)
    page = stmt.order_by(AuditLog.created_at.desc()).limit(limit).offset(max(offset, 0))
    result = await session.execute(page)

    await log_audit(
        session,
        actor=current_user,
        action="admin.audit_log.view",
        target_type="audit_log",
        metadata={"limit": limit, "offset": offset, "filter_action": action},
    )
    await session.commit()

    entries: list[dict] = []
    for entry in result.scalars():
        entries.append(
            {
                "id": entry.id,
                "actor_user_id": entry.actor_user_id,
                "actor_role": entry.actor_role,
                "action": entry.action,
                "target_type": entry.target_type,
                "target_id": entry.target_id,
                "metadata_json": entry.metadata_json,
                "created_at": entry.created_at,
            }
        )
    return entries
@router.get("/disputes", response_model=list[ServiceVisitRead])
async def disputes(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceVisit]:
    """Return every service visit in the ``disputed`` state, most recently updated first."""
    require_admin_or_verifier(current_user)
    disputed_only = select(ServiceVisit).where(ServiceVisit.status == "disputed")
    latest_first = disputed_only.order_by(ServiceVisit.updated_at.desc())
    outcome = await session.execute(latest_first)
    visits = outcome.scalars()
    return list(visits)
async def mark_latest_verification(
    session: AsyncSession,
    service_center_id: int,
    status: str,
    reviewed_by: int,
    payload: AdminModerationDecision | None = None,
) -> None:
    """Record a review outcome on the newest verification of a service center.

    The most recently created ``ServiceCenterVerification`` for the center
    gets the new status, reviewer id and review timestamp. When the payload
    carries a reason and/or a comment, they are stored as the verification
    comment, joined by a newline. Nothing happens if the center has no
    verification record. The caller is responsible for committing.
    """
    newest = (
        select(ServiceCenterVerification)
        .where(ServiceCenterVerification.service_center_id == service_center_id)
        .order_by(ServiceCenterVerification.created_at.desc())
        .limit(1)
    )
    verification = (await session.execute(newest)).scalar_one_or_none()
    if not verification:
        return
    verification.status = status
    verification.reviewed_by = reviewed_by
    verification.reviewed_at = datetime.now(UTC)
    if payload:
        # Keep only the non-empty parts, reason first.
        notes = [text for text in (payload.reason, payload.comment) if text]
        if notes:
            verification.comment = "\n".join(notes)
async def ensure_owner_employee(session: AsyncSession, service_center_id: int, owner_user_id: int) -> None:
    """Make sure the owner has an active ``owner`` employee record at the center.

    An existing employee row for this user and center is switched to role
    ``owner`` and status ``active``; otherwise a new row with those values is
    added to the session. The caller is responsible for committing.
    """
    lookup = select(ServiceEmployee).where(
        ServiceEmployee.service_center_id == service_center_id,
        ServiceEmployee.user_id == owner_user_id,
    )
    existing = (await session.execute(lookup)).scalar_one_or_none()
    if existing is not None:
        existing.role = "owner"
        existing.status = "active"
        return
    new_employee = ServiceEmployee(
        service_center_id=service_center_id,
        user_id=owner_user_id,
        role="owner",
        status="active",
    )
    session.add(new_employee)