Files
drivers_bot/app/api/cars.py
VPN SaaS Dev 99bc9aa6a1
Some checks failed
ci / test (push) Has been cancelled
complete admin notifications data explorer
2026-05-19 19:02:16 +09:00

148 lines
5.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_telegram_user
from app.db.session import get_session
from app.models.car import Car, VehicleAccess
from app.models.user import User
from app.schemas.car import CarCreate, CarRead, CarUpdate
from app.services.admin_notifications import create_admin_notification
from app.services.odometer import add_odometer_history, validate_odometer_change
from app.services.vehicle_identity import normalize_license_plate, validate_vin
# Router for the car endpoints defined in this module; every route registered
# on it is served under the "/cars" prefix and tagged "cars".
router = APIRouter(prefix="/cars", tags=["cars"])
def apply_identity_fields(data: dict) -> dict:
    """Add the derived licence-plate and VIN fields to ``data`` in place.

    When ``plate_number`` is present, its value is copied to
    ``license_plate_display`` and the result of ``normalize_license_plate`` is
    stored under ``license_plate_normalized``. When ``vin`` is present, the
    result of ``validate_vin`` is stored under ``vin_normalized``. Keys that
    are absent from ``data`` produce no derived fields.

    Args:
        data: Field mapping to enrich; it is mutated.

    Returns:
        The same dict object that was passed in.
    """
    if "plate_number" in data:
        plate = data["plate_number"]
        # Store the raw value first, then the normalized form, so the raw
        # value is already in place if normalization raises.
        data["license_plate_display"] = plate
        data["license_plate_normalized"] = normalize_license_plate(plate)

    if "vin" in data:
        vin = data["vin"]
        data["vin_normalized"] = validate_vin(vin)

    return data
@router.post("", response_model=CarRead, status_code=status.HTTP_201_CREATED)
async def create_car(
    payload: CarCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Car:
    """Create a car owned by the authenticated user and return it.

    Any ``owner_id`` supplied in the payload is ignored; the car is always
    assigned to the current user.
    """
    data = apply_identity_fields(payload.model_dump(exclude={"owner_id"}))
    car = Car(**data, owner_id=current_user.id)
    session.add(car)
    # Flush before car.id is used below (the id is presumably assigned by the
    # database on insert -- confirm against the Car model).
    await session.flush()
    session.add(VehicleAccess(vehicle_id=car.id, user_id=current_user.id, role="owner", status="active"))

    # An initial reading, including 0, is recorded as a manual history entry.
    if car.current_odometer is not None:
        add_odometer_history(
            session,
            car,
            new_odometer=car.current_odometer,
            source_record_type="manual",
            source_record_id=None,
            changed_by=current_user.id,
        )

    # Count the user's cars; a count of exactly one is treated as the user's
    # first car (the count is expected to include the car flushed above).
    vehicle_count = int(
        (await session.execute(select(func.count(Car.id)).where(Car.owner_id == current_user.id))).scalar_one()
        or 0
    )
    if vehicle_count == 1:
        # Only a missing odometer is shown as "-". A reading of 0 is valid (it
        # gets a history entry above) and must not be hidden by truthiness.
        odometer_text = "-" if car.current_odometer is None else car.current_odometer
        await create_admin_notification(
            session,
            event_type="vehicle_created",
            title="Пользователь впервые добавил авто",
            body="\n".join(
                [
                    f"User ID: {current_user.id}",
                    f"Telegram ID: {current_user.telegram_id}",
                    f"Авто: {car.name}",
                    f"Пробег: {odometer_text}",
                ]
            ),
            entity_type="vehicle",
            entity_id=car.id,
            # Keyed per user, not per car -- presumably so the notification is
            # emitted at most once per user; confirm in create_admin_notification.
            idempotency_key=f"vehicle_created:{current_user.id}",
            metadata={"user_id": current_user.id, "vehicle_id": car.id},
        )

    await session.commit()
    await session.refresh(car)
    return car
@router.get("", response_model=list[CarRead])
async def list_cars(
    owner_id: int | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[Car]:
    """List the authenticated user's cars, newest first.

    ``owner_id`` is optional; if given, it must match the current user,
    otherwise the request is rejected with 403.
    """
    # The filter is accepted only when omitted or equal to the caller's own
    # id; the query below always uses the caller's id regardless.
    if owner_id not in (None, current_user.id):
        raise HTTPException(status_code=403, detail="Forbidden")

    own_cars_newest_first = (
        select(Car)
        .where(Car.owner_id == current_user.id)
        .order_by(Car.created_at.desc())
    )
    result = await session.execute(own_cars_newest_first)
    cars = result.scalars()
    return list(cars)
@router.get("/{car_id}", response_model=CarRead)
async def get_car(
    car_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Car:
    """Return one car by id.

    Responds with 404 if the car does not exist and 403 if it belongs to a
    different user.
    """
    found = await session.get(Car, car_id)

    # Existence is checked before ownership, so a missing id yields 404 and
    # someone else's car yields 403.
    if found is None:
        raise HTTPException(status_code=404, detail="Car not found")

    is_owner = found.owner_id == current_user.id
    if not is_owner:
        raise HTTPException(status_code=403, detail="Forbidden")

    return found
@router.patch("/{car_id}", response_model=CarRead)
async def update_car(
    car_id: int,
    payload: CarUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Car:
    """Partially update a car owned by the authenticated user.

    Only fields explicitly present in the request are applied. Responds with
    404 if the car does not exist and 403 if it belongs to a different user.
    """
    car = await session.get(Car, car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Car not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    changes = apply_identity_fields(payload.model_dump(exclude_unset=True))

    # The odometer is taken out of the generic field update and handled
    # separately. An absent key and an explicit null both end up as None here
    # and are ignored.
    new_odometer = changes.pop("current_odometer", None)
    odometer_supplied = new_odometer is not None

    if odometer_supplied:
        # Validated before any attribute of the car is modified.
        validate_odometer_change(car, new_odometer, source_record_type="manual", confirm_lower_odometer=True)

    for field_name in changes:
        setattr(car, field_name, changes[field_name])

    if odometer_supplied:
        previous_odometer = car.current_odometer
        if new_odometer != previous_odometer:
            # A reading lower than the stored one is flagged as needing
            # confirmation, and is recorded as already confirmed by the user.
            is_lower = previous_odometer is not None and new_odometer < previous_odometer
            # NOTE(review): car.current_odometer is never assigned in this
            # function; presumably add_odometer_history applies new_odometer
            # to the car -- confirm in app.services.odometer.
            add_odometer_history(
                session,
                car,
                new_odometer=new_odometer,
                source_record_type="manual",
                source_record_id=None,
                changed_by=current_user.id,
                confirmation_required=is_lower,
                user_confirmed=True,
            )

    await session.commit()
    await session.refresh(car)
    return car
@router.delete("/{car_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_car(
    car_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> None:
    """Delete a car owned by the authenticated user.

    Responds with 404 if the car does not exist and 403 if it belongs to a
    different user; on success the response has no body.
    """
    target = await session.get(Car, car_id)

    # Existence is checked before ownership, so a missing id yields 404 and
    # someone else's car yields 403.
    if target is None:
        raise HTTPException(status_code=404, detail="Car not found")

    owned_by_caller = target.owner_id == current_user.id
    if not owned_by_caller:
        raise HTTPException(status_code=403, detail="Forbidden")

    await session.delete(target)
    await session.commit()