from typing import Annotated from fastapi import Depends, Header, HTTPException, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.db.session import get_session from app.models.car import AuditLog, Car, ServiceCenter, ServiceEmployee, VehicleAccess from app.models.user import User from app.services.admin_notifications import create_admin_notification from app.services.telegram_auth import verify_webapp_init_data async def get_or_create_telegram_user( session: AsyncSession, *, telegram_id: int, username: str | None = None, first_name: str | None = None, last_name: str | None = None, locale: str | None = None, currency: str | None = None, platform_role: str | None = None, ) -> User: result = await session.execute(select(User).where(User.telegram_id == telegram_id)) user = result.scalar_one_or_none() payload = { "telegram_id": telegram_id, "username": str(telegram_id), "first_name": first_name, "last_name": last_name, "locale": locale, "currency": currency, "platform_role": platform_role, } if user is None: user = User(**{key: value for key, value in payload.items() if value is not None}) session.add(user) await session.flush() await create_admin_notification( session, event_type="user_registered", title="Новый пользователь", body="\n".join( item for item in [ f"Имя: {' '.join(part for part in [first_name, last_name] if part) or '-'}", f"Telegram ID: {telegram_id}", f"Username: @{username}" if username else "Username: -", ] ), entity_type="user", entity_id=user.id, idempotency_key=f"user_registered:{telegram_id}", metadata={"telegram_id": telegram_id, "username": username}, ) else: for field, value in payload.items(): if value is not None: setattr(user, field, value) await session.commit() await session.refresh(user) return user def require_internal_api_token(token: str | None) -> None: if not settings.internal_api_token: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Internal API token is not configured", ) if not token or token != settings.internal_api_token: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") async def get_current_telegram_user( session: Annotated[AsyncSession, Depends(get_session)], x_telegram_init_data: Annotated[str | None, Header(alias="X-Telegram-Init-Data")] = None, x_internal_api_token: Annotated[str | None, Header(alias="X-Internal-API-Token")] = None, x_telegram_user_id: Annotated[int | None, Header(alias="X-Telegram-User-Id")] = None, x_dev_telegram_id: Annotated[int | None, Header(alias="X-Dev-Telegram-Id")] = None, ) -> User: if x_telegram_init_data: user_data = verify_webapp_init_data(x_telegram_init_data, settings.bot_token) return await get_or_create_telegram_user( session, telegram_id=int(user_data["id"]), username=user_data.get("username"), first_name=user_data.get("first_name"), last_name=user_data.get("last_name"), locale=user_data.get("language_code"), ) if x_internal_api_token and x_telegram_user_id: require_internal_api_token(x_internal_api_token) return await get_or_create_telegram_user(session, telegram_id=x_telegram_user_id) if settings.allow_dev_auth and not settings.is_production and x_dev_telegram_id: return await get_or_create_telegram_user(session, telegram_id=x_dev_telegram_id) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram initData required") async def get_owned_car( car_id: int, current_user: Annotated[User, Depends(get_current_telegram_user)], session: Annotated[AsyncSession, Depends(get_session)], ) -> Car: car = await session.get(Car, car_id) if car is None: raise HTTPException(status_code=404, detail="Car not found") if car.owner_id != current_user.id: raise HTTPException(status_code=403, detail="Forbidden") return car async def user_has_vehicle_access( session: AsyncSession, vehicle_id: int, user_id: int, roles: set[str] | None = None ) -> bool: stmt = select(VehicleAccess).where( VehicleAccess.vehicle_id == vehicle_id, VehicleAccess.user_id == user_id, VehicleAccess.status == "active", ) if roles: stmt = stmt.where(VehicleAccess.role.in_(roles)) result = await session.execute(stmt) return result.scalar_one_or_none() is not None async def ensure_vehicle_owner_or_access( session: AsyncSession, vehicle_id: int, user: User, roles: set[str] | None = None, ) -> Car: car = await session.get(Car, vehicle_id) if car is None: raise HTTPException(status_code=404, detail="Vehicle not found") if car.owner_id == user.id: return car if await user_has_vehicle_access(session, vehicle_id, user.id, roles): return car raise HTTPException(status_code=403, detail="Forbidden") def require_platform_role(user: User, allowed: set[str]) -> None: if user.platform_role not in allowed: raise HTTPException(status_code=403, detail="Forbidden") async def ensure_service_employee( session: AsyncSession, service_center_id: int, user: User, allowed_roles: set[str] | None = None, ) -> ServiceEmployee: result = await session.execute( select(ServiceEmployee).where( ServiceEmployee.service_center_id == service_center_id, ServiceEmployee.user_id == user.id, ServiceEmployee.status == "active", ) ) employee = result.scalar_one_or_none() center = await session.get(ServiceCenter, service_center_id) owner_allowed = center is not None and center.owner_user_id == user.id if employee is None and owner_allowed: employee = ServiceEmployee( service_center_id=service_center_id, user_id=user.id, role="owner", status="active", ) session.add(employee) await session.flush() if employee is None: raise HTTPException(status_code=403, detail="Service center access required") if allowed_roles and employee.role not in allowed_roles: raise HTTPException(status_code=403, detail="Insufficient service role") return employee async def log_audit( session: AsyncSession, *, actor: User | None, action: str, target_type: str, target_id: int | str | None = None, metadata: dict | None = None, ip: str | None = None, user_agent: str | None = None, ) -> None: session.add( AuditLog( actor_user_id=actor.id if actor else None, actor_role=actor.platform_role if actor else None, action=action, target_type=target_type, target_id=str(target_id) if target_id is not None else None, metadata_json=metadata, ip=ip, user_agent=user_agent[:256] if user_agent else None, ) )