Files
drivers_bot/app/api/deps.py
2026-05-17 21:16:22 +09:00

208 lines
7.4 KiB
Python

from typing import Annotated
from fastapi import Depends, Header, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.db.session import get_session
from app.models.car import AuditLog, Car, ServiceCenter, ServiceEmployee, VehicleAccess
from app.models.user import User
from app.services.admin_notifications import create_admin_notification
from app.services.telegram_auth import verify_webapp_init_data
async def get_or_create_telegram_user(
session: AsyncSession,
*,
telegram_id: int,
username: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
locale: str | None = None,
currency: str | None = None,
platform_role: str | None = None,
) -> User:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
user = result.scalar_one_or_none()
payload = {
"telegram_id": telegram_id,
"username": str(telegram_id),
"first_name": first_name,
"last_name": last_name,
"locale": locale,
"currency": currency,
"platform_role": platform_role,
}
if user is None:
user = User(**{key: value for key, value in payload.items() if value is not None})
session.add(user)
await session.flush()
await create_admin_notification(
session,
event_type="user_registered",
title="Новый пользователь",
body="\n".join(
item
for item in [
f"Имя: {' '.join(part for part in [first_name, last_name] if part) or '-'}",
f"Telegram ID: {telegram_id}",
f"Username: @{username}" if username else "Username: -",
]
),
entity_type="user",
entity_id=user.id,
idempotency_key=f"user_registered:{telegram_id}",
metadata={"telegram_id": telegram_id, "username": username},
)
else:
for field, value in payload.items():
if value is not None:
setattr(user, field, value)
await session.commit()
await session.refresh(user)
return user
def require_internal_api_token(token: str | None) -> None:
if not settings.internal_api_token:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Internal API token is not configured",
)
if not token or token != settings.internal_api_token:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
async def get_current_telegram_user(
session: Annotated[AsyncSession, Depends(get_session)],
x_telegram_init_data: Annotated[str | None, Header(alias="X-Telegram-Init-Data")] = None,
x_internal_api_token: Annotated[str | None, Header(alias="X-Internal-API-Token")] = None,
x_telegram_user_id: Annotated[int | None, Header(alias="X-Telegram-User-Id")] = None,
x_dev_telegram_id: Annotated[int | None, Header(alias="X-Dev-Telegram-Id")] = None,
) -> User:
if x_telegram_init_data:
user_data = verify_webapp_init_data(x_telegram_init_data, settings.bot_token)
return await get_or_create_telegram_user(
session,
telegram_id=int(user_data["id"]),
username=user_data.get("username"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
locale=user_data.get("language_code"),
)
if x_internal_api_token and x_telegram_user_id:
require_internal_api_token(x_internal_api_token)
return await get_or_create_telegram_user(session, telegram_id=x_telegram_user_id)
if settings.allow_dev_auth and not settings.is_production and x_dev_telegram_id:
return await get_or_create_telegram_user(session, telegram_id=x_dev_telegram_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram initData required")
async def get_owned_car(
car_id: int,
current_user: Annotated[User, Depends(get_current_telegram_user)],
session: Annotated[AsyncSession, Depends(get_session)],
) -> Car:
car = await session.get(Car, car_id)
if car is None:
raise HTTPException(status_code=404, detail="Car not found")
if car.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Forbidden")
return car
async def user_has_vehicle_access(
session: AsyncSession, vehicle_id: int, user_id: int, roles: set[str] | None = None
) -> bool:
stmt = select(VehicleAccess).where(
VehicleAccess.vehicle_id == vehicle_id,
VehicleAccess.user_id == user_id,
VehicleAccess.status == "active",
)
if roles:
stmt = stmt.where(VehicleAccess.role.in_(roles))
result = await session.execute(stmt)
return result.scalar_one_or_none() is not None
async def ensure_vehicle_owner_or_access(
session: AsyncSession,
vehicle_id: int,
user: User,
roles: set[str] | None = None,
) -> Car:
car = await session.get(Car, vehicle_id)
if car is None:
raise HTTPException(status_code=404, detail="Vehicle not found")
if car.owner_id == user.id:
return car
if await user_has_vehicle_access(session, vehicle_id, user.id, roles):
return car
raise HTTPException(status_code=403, detail="Forbidden")
def require_platform_role(user: User, allowed: set[str]) -> None:
if user.platform_role not in allowed:
raise HTTPException(status_code=403, detail="Forbidden")
async def ensure_service_employee(
session: AsyncSession,
service_center_id: int,
user: User,
allowed_roles: set[str] | None = None,
) -> ServiceEmployee:
result = await session.execute(
select(ServiceEmployee).where(
ServiceEmployee.service_center_id == service_center_id,
ServiceEmployee.user_id == user.id,
ServiceEmployee.status == "active",
)
)
employee = result.scalar_one_or_none()
center = await session.get(ServiceCenter, service_center_id)
owner_allowed = center is not None and center.owner_user_id == user.id
if employee is None and owner_allowed:
employee = ServiceEmployee(
service_center_id=service_center_id,
user_id=user.id,
role="owner",
status="active",
)
session.add(employee)
await session.flush()
if employee is None:
raise HTTPException(status_code=403, detail="Service center access required")
if allowed_roles and employee.role not in allowed_roles:
raise HTTPException(status_code=403, detail="Insufficient service role")
return employee
async def log_audit(
session: AsyncSession,
*,
actor: User | None,
action: str,
target_type: str,
target_id: int | str | None = None,
metadata: dict | None = None,
ip: str | None = None,
user_agent: str | None = None,
) -> None:
session.add(
AuditLog(
actor_user_id=actor.id if actor else None,
actor_role=actor.platform_role if actor else None,
action=action,
target_type=target_type,
target_id=str(target_id) if target_id is not None else None,
metadata_json=metadata,
ip=ip,
user_agent=user_agent[:256] if user_agent else None,
)
)