Files
drivers_bot/app/api/my.py
VPN SaaS Dev ecfb5aa949
Some checks failed
ci / test (push) Has been cancelled
Refactor menu flows into dedicated pages
2026-05-16 11:59:09 +09:00

729 lines
26 KiB
Python

from datetime import UTC, date, datetime
from decimal import Decimal
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.encoders import jsonable_encoder
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_telegram_user, log_audit
from app.db.session import get_session
from app.models.car import (
Car,
CarServiceLink,
ServiceAppointment,
ServiceCenter,
ServiceVisit,
VehicleAccess,
VehicleDataChangeRequest,
)
from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry
from app.models.user import User
from app.schemas.service_center import (
VehicleAccessGrant,
VehicleAccessRead,
VehicleCreate,
VehicleRead,
VehicleUpdate,
)
from app.schemas.user import UserRead
from app.services.odometer import (
add_odometer_history,
recalculate_current_odometer,
validate_odometer_change,
)
from app.services.vehicle_identity import normalize_license_plate, validate_vin
# Router that carries every endpoint declared in this module, tagged "my".
router = APIRouter(tags=["my"])
# Schema identifier written into exports and compared against incoming payloads
# by the import preview.
EXPORT_SCHEMA = "carpass.exchange.v1"
# Whitelist of vehicle keys that the import copies from an incoming payload;
# keys outside this set are dropped by _vehicle_import_data.
VEHICLE_IMPORT_FIELDS = {
"name",
"make",
"model",
"trim",
"generation",
"body_type",
"year",
"plate_number",
"vin",
"fuel_type",
"engine_volume_l",
"transmission",
"drive_type",
"target_consumption_l_per_100km",
"fuel_tank_volume_l",
"engine_oil_type",
"engine_oil_volume_l",
"transmission_fluid_type",
"transmission_fluid_volume_l",
"coolant_type",
"brake_fluid_type",
"tire_pressure_front_bar",
"tire_pressure_rear_bar",
"tire_size",
"oil_change_interval_km",
"oil_change_interval_months",
"purchase_date",
"purchase_price",
"purchase_currency",
"purchase_type",
"currency",
"include_depreciation",
"expected_ownership_months",
"expected_residual_value",
"loan_principal",
"loan_down_payment",
"loan_term_months",
"loan_annual_interest_rate",
"loan_first_payment_date",
"loan_payment_day",
"loan_payment_type",
"loan_currency",
"loan_comment",
"current_odometer",
"notes",
}
def _parse_date(value: str | date | None) -> date | None:
if not value:
return None
if isinstance(value, date):
return value
return date.fromisoformat(str(value)[:10])
def _decimal(value: object) -> Decimal | None:
if value in (None, ""):
return None
return Decimal(str(value))
def _vehicle_import_data(raw: dict) -> dict:
    """Build vehicle attributes from one imported vehicle record.

    Only keys listed in ``VEHICLE_IMPORT_FIELDS`` are copied. Date fields are
    parsed with ``_parse_date``, and the derived identity keys
    (``license_plate_display``, ``license_plate_normalized``,
    ``vin_normalized``) are added when the plate or VIN is present in the input.
    """
    fields: dict = {}
    for name in VEHICLE_IMPORT_FIELDS:
        if name in raw:
            fields[name] = raw.get(name)

    for date_key in ("purchase_date", "loan_first_payment_date"):
        if date_key in fields:
            fields[date_key] = _parse_date(fields[date_key])

    if "plate_number" in fields:
        plate = fields["plate_number"]
        fields["license_plate_display"] = plate
        fields["license_plate_normalized"] = normalize_license_plate(plate)

    if "vin" in fields:
        fields["vin_normalized"] = validate_vin(fields["vin"])

    return fields
def _exchange_counts(payload: dict) -> dict:
vehicles = payload.get("vehicles") or []
return {
"vehicles": len(vehicles),
"fuel_entries": sum(len(item.get("fuel_entries") or []) for item in vehicles),
"service_entries": sum(len(item.get("service_entries") or []) for item in vehicles),
"expense_entries": sum(len(item.get("expense_entries") or []) for item in vehicles),
"appointments": sum(len(item.get("appointments") or []) for item in vehicles),
"service_visits": sum(len(item.get("service_visits") or []) for item in vehicles),
}
def _model_dict(item) -> dict:
return {column.name: getattr(item, column.name) for column in item.__table__.columns}
async def _find_import_vehicle(session: AsyncSession, current_user: User, data: dict) -> Car | None:
    """Find the current user's existing car that an imported vehicle refers to.

    Lookups run from the most to the least specific identity: normalized VIN
    (when present), normalized license plate (when present), then the
    name/make/model triple. Only cars owned by ``current_user`` are considered.

    Each lookup takes the lowest-id match instead of requiring a unique row, so
    an owner with several cars sharing a plate or the same name/make/model does
    not abort the import with ``MultipleResultsFound``.

    Returns:
        The matched ``Car``, or ``None`` when no lookup finds one.
    """
    lookups: list[tuple] = []
    if data.get("vin_normalized"):
        lookups.append((Car.vin_normalized == data["vin_normalized"],))
    if data.get("license_plate_normalized"):
        lookups.append((Car.license_plate_normalized == data["license_plate_normalized"],))
    # Last resort: descriptive fields; a missing make/model compares against NULL.
    lookups.append(
        (
            Car.name == data.get("name", "Импортированное авто"),
            Car.make == data.get("make"),
            Car.model == data.get("model"),
        )
    )

    for criteria in lookups:
        result = await session.execute(
            select(Car)
            .where(Car.owner_id == current_user.id, *criteria)
            .order_by(Car.id)
            .limit(1)
        )
        found = result.scalars().first()
        if found is not None:
            return found
    return None
async def _clear_conflicting_unique_identity(session: AsyncSession, current_user: User, data: dict) -> None:
    """Drop the VIN from *data* when another user's car already carries it.

    Mutates *data* in place: both ``vin`` and ``vin_normalized`` are removed if
    a car with the same normalized VIN exists under a different owner. Does
    nothing when *data* has no normalized VIN.
    """
    vin = data.get("vin_normalized")
    if not vin:
        return

    statement = select(Car.owner_id).where(
        Car.vin_normalized == vin,
        Car.owner_id != current_user.id,
    )
    other_owner_id = (await session.execute(statement)).scalar_one_or_none()
    if other_owner_id is None:
        return

    for key in ("vin", "vin_normalized"):
        data.pop(key, None)
def _supplement_empty_vehicle_fields(car: Car, data: dict) -> list[str]:
    """Fill attributes of *car* that are still empty with imported values.

    An attribute counts as empty when it is ``None`` or ``""``; existing
    non-empty values are never overwritten. Empty imported values and the
    ``id``/``owner_id``/``created_at``/``updated_at`` keys are skipped.

    Returns:
        Names of the attributes that were set, in iteration order of *data*.
    """
    protected = {"id", "owner_id", "created_at", "updated_at"}
    blank = (None, "")
    filled: list[str] = []
    for name in data:
        incoming = data[name]
        if name in protected or incoming in blank:
            continue
        if getattr(car, name, None) not in blank:
            continue
        setattr(car, name, incoming)
        filled.append(name)
    return filled
async def _import_fuel_entries(session: AsyncSession, car: Car, rows: list[dict]) -> int:
    """Add fuel entries from an import bundle to *car*, skipping duplicates.

    A row is ignored when it lacks a date, an odometer, liters or a price per
    liter. A row is a duplicate when the car already has an entry with the same
    date, odometer, liters and total cost.

    When ``total_cost`` is absent it is derived as ``liters * price_per_liter``.
    An explicit zero total is kept as given rather than replaced by the derived
    value.

    Returns:
        Number of entries added to the session.

    Raises:
        ValueError: if a row's odometer cannot be converted with ``int``.
    """
    imported = 0
    for raw in rows:
        entry_date = _parse_date(raw.get("entry_date"))
        if entry_date is None or raw.get("odometer") is None:
            continue
        liters = _decimal(raw.get("liters"))
        price_per_liter = _decimal(raw.get("price_per_liter"))
        if liters is None or price_per_liter is None:
            continue
        total_cost = _decimal(raw.get("total_cost"))
        # Test for None explicitly: Decimal(0) is falsy but is a real value.
        if total_cost is None:
            total_cost = liters * price_per_liter
        odometer = int(raw["odometer"])
        exists = (
            await session.execute(
                select(FuelEntry.id).where(
                    FuelEntry.car_id == car.id,
                    FuelEntry.entry_date == entry_date,
                    FuelEntry.odometer == odometer,
                    FuelEntry.liters == liters,
                    FuelEntry.total_cost == total_cost,
                )
            )
        ).scalar_one_or_none()
        if exists is not None:
            continue
        session.add(
            FuelEntry(
                car_id=car.id,
                entry_date=entry_date,
                odometer=odometer,
                liters=liters,
                price_per_liter=price_per_liter,
                total_cost=total_cost,
                station=raw.get("station"),
                fuel_brand=raw.get("fuel_brand"),
                is_full_tank=raw.get("is_full_tank"),
                notes=raw.get("notes"),
            )
        )
        imported += 1
    return imported
async def _import_service_entries(session: AsyncSession, car: Car, rows: list[dict]) -> int:
    """Add service entries from an import bundle to *car*, skipping duplicates.

    Rows without a date, a non-blank title or a total cost are ignored. A row
    is a duplicate when the car already has an entry with the same date, title
    and total cost. ``service_type`` defaults to ``"maintenance"``.

    Returns:
        Number of entries added to the session.
    """
    created = 0
    for row in rows:
        when = _parse_date(row.get("entry_date"))
        name = (row.get("title") or "").strip()
        cost = _decimal(row.get("total_cost"))
        if when is None or not name or cost is None:
            continue

        duplicate_query = select(ServiceEntry.id).where(
            ServiceEntry.car_id == car.id,
            ServiceEntry.entry_date == when,
            ServiceEntry.title == name,
            ServiceEntry.total_cost == cost,
        )
        duplicate_id = (await session.execute(duplicate_query)).scalar_one_or_none()
        if duplicate_id is not None:
            continue

        entry = ServiceEntry(
            car_id=car.id,
            entry_date=when,
            odometer=row.get("odometer"),
            service_type=row.get("service_type") or "maintenance",
            title=name,
            category=row.get("category"),
            vendor=row.get("vendor"),
            total_cost=cost,
            next_due_date=_parse_date(row.get("next_due_date")),
            next_due_odometer=row.get("next_due_odometer"),
            notes=row.get("notes"),
        )
        session.add(entry)
        created += 1
    return created
async def _import_expense_entries(session: AsyncSession, car: Car, rows: list[dict]) -> int:
    """Add expense entries from an import bundle to *car*, skipping duplicates.

    Rows without a date, a non-blank title or a total cost are ignored. A row
    is a duplicate when the car already has an entry with the same date, title
    and total cost. ``category`` defaults to ``"other"``; ``currency`` falls
    back to the car's currency and then to ``"RUB"``.

    Returns:
        Number of entries added to the session.
    """
    created = 0
    for row in rows:
        when = _parse_date(row.get("entry_date"))
        name = (row.get("title") or "").strip()
        cost = _decimal(row.get("total_cost"))
        if when is None or not name or cost is None:
            continue

        duplicate_query = select(ExpenseEntry.id).where(
            ExpenseEntry.car_id == car.id,
            ExpenseEntry.entry_date == when,
            ExpenseEntry.title == name,
            ExpenseEntry.total_cost == cost,
        )
        duplicate_id = (await session.execute(duplicate_query)).scalar_one_or_none()
        if duplicate_id is not None:
            continue

        entry = ExpenseEntry(
            car_id=car.id,
            entry_date=when,
            category=row.get("category") or "other",
            title=name,
            vendor=row.get("vendor"),
            total_cost=cost,
            currency=row.get("currency") or car.currency or "RUB",
            odometer=row.get("odometer"),
            period_start=_parse_date(row.get("period_start")),
            period_end=_parse_date(row.get("period_end")),
            period_months=row.get("period_months"),
            is_recurring=bool(row.get("is_recurring")),
            policy_number=row.get("policy_number"),
            insurance_type=row.get("insurance_type"),
            payment_period_months=row.get("payment_period_months"),
            document_urls=row.get("document_urls"),
            metadata_json=row.get("metadata_json"),
            notes=row.get("notes"),
        )
        session.add(entry)
        created += 1
    return created
@router.get("/me", response_model=UserRead)
async def me(current_user: User = Depends(get_current_telegram_user)) -> User:
    """Return the user resolved by the ``get_current_telegram_user`` dependency."""
    return current_user
def vehicle_data(payload: VehicleCreate | VehicleUpdate, *, partial: bool = False) -> dict:
    """Translate a vehicle schema into keyword arguments for the ``Car`` model.

    Args:
        payload: Create or update schema.
        partial: When true, only fields explicitly set on *payload* are dumped.

    Returns:
        The dumped fields, with ``license_plate`` expanded into
        ``license_plate_display`` / ``license_plate_normalized`` /
        ``plate_number``, ``license_plate_country`` upper-cased (or ``None``
        when falsy), and ``vin`` accompanied by ``vin_normalized``.
    """
    dumped = payload.model_dump(exclude_unset=partial)
    identity_keys = {"license_plate", "license_plate_country", "vin"}
    result = {name: dumped[name] for name in dumped if name not in identity_keys}

    if "license_plate" in dumped:
        plate = dumped["license_plate"]
        result["license_plate_display"] = plate
        result["license_plate_normalized"] = normalize_license_plate(plate)
        result["plate_number"] = plate

    if "license_plate_country" in dumped:
        country = dumped["license_plate_country"]
        if country:
            result["license_plate_country"] = country.upper()
        else:
            result["license_plate_country"] = None

    if "vin" in dumped:
        vin = dumped["vin"]
        result["vin_normalized"] = validate_vin(vin)
        result["vin"] = vin

    return result
@router.get("/my/vehicles", response_model=list[VehicleRead])
async def my_vehicles(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[Car]:
    """List cars the current user owns or has an active access grant for.

    Shared cars are selected through an ``IN`` subquery rather than a join to
    ``VehicleAccess``. With a join, an owned car came back once per access row
    (the owner's own row plus every grant), so the response held duplicates.

    Returns:
        Matching cars, newest first by ``created_at``.
    """
    shared_vehicle_ids = select(VehicleAccess.vehicle_id).where(
        VehicleAccess.user_id == current_user.id,
        VehicleAccess.status == "active",
    )
    result = await session.execute(
        select(Car)
        .where(
            or_(
                Car.owner_id == current_user.id,
                Car.id.in_(shared_vehicle_ids),
            )
        )
        .order_by(Car.created_at.desc())
    )
    return list(result.scalars())
@router.post("/my/vehicles", response_model=VehicleRead, status_code=status.HTTP_201_CREATED)
async def create_vehicle(
    payload: VehicleCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Car:
    """Create a car owned by the current user.

    Also adds the owner's active ``VehicleAccess`` row, records an odometer
    history entry when an initial odometer was supplied, writes a
    ``vehicle.create`` audit record, and commits.
    """
    attributes = vehicle_data(payload)
    car = Car(**attributes, owner_id=current_user.id)
    session.add(car)
    # Flush first: the access row below needs the car's id.
    await session.flush()

    owner_access = VehicleAccess(vehicle_id=car.id, user_id=current_user.id, role="owner", status="active")
    session.add(owner_access)

    initial_odometer = car.current_odometer
    if initial_odometer is not None:
        add_odometer_history(
            session,
            car,
            new_odometer=initial_odometer,
            source_record_type="manual",
            source_record_id=None,
            changed_by=current_user.id,
        )

    await log_audit(session, actor=current_user, action="vehicle.create", target_type="vehicle", target_id=car.id)
    await session.commit()
    await session.refresh(car)
    return car
@router.patch("/my/vehicles/{vehicle_id}", response_model=VehicleRead)
async def update_vehicle(
    vehicle_id: int,
    payload: VehicleUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Car:
    """Apply a partial update to one of the current user's cars.

    The odometer is handled separately from the other fields: it is validated
    with ``validate_odometer_change`` and, when it differs from the stored
    value, passed to ``add_odometer_history`` instead of being set directly.

    Raises:
        HTTPException: 404 when the car does not exist, 403 when the caller is
            not its owner.
    """
    car = await session.get(Car, vehicle_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    changes = vehicle_data(payload, partial=True)
    # Taken out of the field map so the generic setattr loop never touches it.
    new_odometer = changes.pop("current_odometer", None)
    if new_odometer is not None:
        validate_odometer_change(car, new_odometer, source_record_type="manual", confirm_lower_odometer=True)

    for name, new_value in changes.items():
        setattr(car, name, new_value)

    previous_odometer = car.current_odometer
    if new_odometer is not None and new_odometer != previous_odometer:
        is_rollback = previous_odometer is not None and new_odometer < previous_odometer
        add_odometer_history(
            session,
            car,
            new_odometer=new_odometer,
            source_record_type="manual",
            source_record_id=None,
            changed_by=current_user.id,
            confirmation_required=is_rollback,
            user_confirmed=True,
        )

    await log_audit(session, actor=current_user, action="vehicle.update", target_type="vehicle", target_id=car.id)
    await session.commit()
    await session.refresh(car)
    return car
@router.get("/my/vehicles/{vehicle_id}/service-history")
async def vehicle_service_history(
    vehicle_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict:
    """Return the service visits of one of the current user's cars, newest first.

    Raises:
        HTTPException: 404 when the car does not exist, 403 when the caller is
            not its owner.
    """
    car = await session.get(Car, vehicle_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    statement = (
        select(ServiceVisit)
        .where(ServiceVisit.vehicle_id == vehicle_id)
        .order_by(ServiceVisit.visit_date.desc())
    )
    rows = (await session.execute(statement)).scalars()
    history = list(rows)
    return {"vehicle_id": vehicle_id, "service_visits": jsonable_encoder(history)}
@router.get("/my/confirmations")
async def my_confirmations(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict:
    """Collect everything that is waiting for the current user's decision.

    Returns three lists: service visits in ``pending_owner_confirmation`` on
    the user's cars, pending vehicle data change requests addressed to the
    user, and pending inactive service links on the user's cars.
    """
    owned_car_ids = select(Car.id).where(Car.owner_id == current_user.id)

    visits_query = (
        select(ServiceVisit)
        .where(
            ServiceVisit.vehicle_id.in_(owned_car_ids),
            ServiceVisit.status == "pending_owner_confirmation",
        )
        .order_by(ServiceVisit.updated_at.desc(), ServiceVisit.id.desc())
    )
    pending_visits = list((await session.execute(visits_query)).scalars())

    requests_query = (
        select(VehicleDataChangeRequest)
        .where(
            VehicleDataChangeRequest.owner_user_id == current_user.id,
            VehicleDataChangeRequest.status == "pending",
        )
        .order_by(VehicleDataChangeRequest.created_at.desc())
    )
    pending_requests = list((await session.execute(requests_query)).scalars())

    links_query = (
        select(CarServiceLink)
        .where(
            CarServiceLink.car_id.in_(owned_car_ids),
            CarServiceLink.status == "pending",
            CarServiceLink.is_active.is_(False),
        )
        .order_by(CarServiceLink.created_at.desc())
    )
    pending_links = list((await session.execute(links_query)).scalars())

    return {
        "service_visits": jsonable_encoder(pending_visits),
        "change_requests": jsonable_encoder(pending_requests),
        "service_links": jsonable_encoder(pending_links),
    }
@router.get("/my/service-links")
async def my_service_links(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[dict]:
    """List links between the current user's cars and service centers.

    Each item combines the link's status and timestamps with the car's id/name
    and the center's id and display name (falling back to its plain name).
    Newest links come first.
    """
    statement = (
        select(CarServiceLink, Car, ServiceCenter)
        .join(Car, Car.id == CarServiceLink.car_id)
        .join(ServiceCenter, ServiceCenter.id == CarServiceLink.service_center_id)
        .where(Car.owner_id == current_user.id)
        .order_by(CarServiceLink.created_at.desc())
    )
    rows = (await session.execute(statement)).all()

    items: list[dict] = []
    for link, car, center in rows:
        items.append(
            {
                "id": link.id,
                "status": link.status,
                "access_level": link.access_level,
                "car_id": car.id,
                "car_name": car.name,
                "service_center_id": center.id,
                "service_center_name": center.display_name or center.name,
                "created_at": link.created_at,
                "approved_at": link.approved_at,
                "revoked_at": link.revoked_at,
            }
        )
    return items
@router.get("/my/export")
async def export_my_data(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict:
    """Export the current user's data as a JSON-encodable exchange payload.

    The payload carries the schema id, an export timestamp, basic user
    settings, one bundle per owned car (the car plus its fuel, service and
    expense entries, the user's appointments and the car's service visits),
    the service centers owned by the user, and the exchange policy.
    """

    async def fetch(statement) -> list:
        # Run one statement and materialise its ORM rows.
        return list((await session.execute(statement)).scalars())

    owned_cars = await fetch(
        select(Car).where(Car.owner_id == current_user.id).order_by(Car.created_at.asc(), Car.id.asc())
    )

    bundles = []
    for car in owned_cars:
        fuel_rows = await fetch(
            select(FuelEntry).where(FuelEntry.car_id == car.id).order_by(FuelEntry.entry_date.asc(), FuelEntry.id.asc())
        )
        service_rows = await fetch(
            select(ServiceEntry)
            .where(ServiceEntry.car_id == car.id)
            .order_by(ServiceEntry.entry_date.asc(), ServiceEntry.id.asc())
        )
        expense_rows = await fetch(
            select(ExpenseEntry)
            .where(ExpenseEntry.car_id == car.id)
            .order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
        )
        appointment_rows = await fetch(
            select(ServiceAppointment)
            .where(ServiceAppointment.vehicle_id == car.id, ServiceAppointment.owner_id == current_user.id)
            .order_by(ServiceAppointment.created_at.asc(), ServiceAppointment.id.asc())
        )
        visit_rows = await fetch(
            select(ServiceVisit)
            .where(ServiceVisit.vehicle_id == car.id)
            .order_by(ServiceVisit.visit_date.asc(), ServiceVisit.id.asc())
        )
        bundle = {
            "vehicle": _model_dict(car),
            "fuel_entries": [_model_dict(row) for row in fuel_rows],
            "service_entries": [_model_dict(row) for row in service_rows],
            "expense_entries": [_model_dict(row) for row in expense_rows],
            "appointments": [_model_dict(row) for row in appointment_rows],
            "service_visits": [_model_dict(row) for row in visit_rows],
        }
        bundles.append(bundle)

    owned_centers = await fetch(
        select(ServiceCenter)
        .where(ServiceCenter.owner_user_id == current_user.id)
        .order_by(ServiceCenter.created_at.asc(), ServiceCenter.id.asc())
    )

    user_section = {
        "telegram_id": current_user.telegram_id,
        "username": current_user.username,
        "locale": current_user.locale,
        "currency": current_user.currency,
    }
    policy_section = {
        "import_mode": "create_missing",
        "dedupe": ["vin", "license_plate", "vehicle_name_make_model", "entry_date_title_amount"],
        "work_orders": "archived_export_only",
    }
    export = {
        "schema": EXPORT_SCHEMA,
        "exported_at": datetime.now(UTC),
        "user": user_section,
        "vehicles": bundles,
        "service_centers": [_model_dict(center) for center in owned_centers],
        "exchange_policy": policy_section,
    }
    return jsonable_encoder(export)
@router.post("/my/import/preview")
async def preview_my_data_import(
    payload: dict,
    current_user: User = Depends(get_current_telegram_user),
) -> dict:
    """Summarise an import payload without touching the database.

    Reports record counts, whether the payload has any vehicles, and warnings
    for an unexpected schema id or for appointments/service visits in the
    payload.
    """
    notes: list[str] = []

    schema_matches = payload.get("schema") == EXPORT_SCHEMA
    if not schema_matches:
        notes.append("Импорт ожидает JSON CarPass exchange v1. Данные будут обработаны в режиме совместимости.")

    counts = _exchange_counts(payload)
    has_archived_records = counts["service_visits"] or counts["appointments"]
    if has_archived_records:
        notes.append("Брони и заказ-наряды импортируются как архивные данные экспорта, без создания активных заявок.")

    has_vehicles = bool(payload.get("vehicles"))
    return {
        "valid": has_vehicles,
        "owner_telegram_id": current_user.telegram_id,
        "counts": counts,
        "warnings": notes,
        "mode": "create_missing",
    }
@router.post("/my/import")
async def import_my_data(
    payload: dict,
    dry_run: bool = False,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> dict:
    """Import an exchange payload into the current user's account.

    With ``dry_run`` the preview is returned and nothing is written. Otherwise
    each vehicle bundle is matched to an existing car or created, empty fields
    of matched cars are filled in, fuel/service/expense entries are added, and
    the car's odometer is recalculated. One audit record is written and the
    whole import is committed at the end.
    """
    preview = await preview_my_data_import(payload, current_user)
    if dry_run:
        return preview

    stats = {
        "vehicles_created": 0,
        "vehicles_matched": 0,
        "vehicle_fields_updated": 0,
        "fuel_entries": 0,
        "service_entries": 0,
        "expense_entries": 0,
    }
    entry_importers = (
        ("fuel_entries", _import_fuel_entries),
        ("service_entries", _import_service_entries),
        ("expense_entries", _import_expense_entries),
    )

    for bundle in payload.get("vehicles") or []:
        # A bundle either nests the vehicle under "vehicle" or is the vehicle itself.
        vehicle_source = bundle.get("vehicle") or bundle
        vehicle_fields = _vehicle_import_data(vehicle_source)
        if not vehicle_fields.get("name"):
            vehicle_fields["name"] = "Импортированное авто"
        await _clear_conflicting_unique_identity(session, current_user, vehicle_fields)

        car = await _find_import_vehicle(session, current_user, vehicle_fields)
        if car is not None:
            stats["vehicles_matched"] += 1
            stats["vehicle_fields_updated"] += len(_supplement_empty_vehicle_fields(car, vehicle_fields))
        else:
            car = Car(**vehicle_fields, owner_id=current_user.id)
            session.add(car)
            await session.flush()
            session.add(VehicleAccess(vehicle_id=car.id, user_id=current_user.id, role="owner", status="active"))
            stats["vehicles_created"] += 1

        for section, importer in entry_importers:
            stats[section] += await importer(session, car, bundle.get(section) or [])

        await session.flush()
        await recalculate_current_odometer(session, car.id, changed_by=current_user.id, source_record_type="data_import")

    await log_audit(
        session,
        actor=current_user,
        action="data_exchange.import",
        target_type="user",
        target_id=current_user.id,
        metadata={"imported": stats, "counts": preview["counts"]},
    )
    await session.commit()
    return {"status": "imported", "imported": stats, "preview": preview}
@router.post("/my/vehicles/{vehicle_id}/grant-service-access", response_model=VehicleAccessRead)
async def grant_vehicle_access(
    vehicle_id: int,
    payload: VehicleAccessGrant,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> VehicleAccess:
    """Grant, or re-activate, another user's access to one of the caller's cars.

    An existing row for the same vehicle, user and role is switched back to
    ``active`` with ``revoked_at`` cleared; otherwise a new active row is
    created. A ``vehicle_access.grant`` audit record is written before commit.

    Raises:
        HTTPException: 404 when the car does not exist, 403 when the caller is
            not its owner, 400 when the payload carries no ``user_id``.
    """
    car = await session.get(Car, vehicle_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")
    if not payload.user_id:
        raise HTTPException(status_code=400, detail="user_id is required for access grants")

    lookup = select(VehicleAccess).where(
        VehicleAccess.vehicle_id == vehicle_id,
        VehicleAccess.user_id == payload.user_id,
        VehicleAccess.role == payload.role,
    )
    grant = (await session.execute(lookup)).scalar_one_or_none()
    if grant is not None:
        grant.status = "active"
        grant.revoked_at = None
    else:
        grant = VehicleAccess(vehicle_id=vehicle_id, user_id=payload.user_id, role=payload.role, status="active")
        session.add(grant)

    audit_details = {"granted_user_id": payload.user_id, "role": payload.role}
    await log_audit(
        session,
        actor=current_user,
        action="vehicle_access.grant",
        target_type="vehicle",
        target_id=vehicle_id,
        metadata=audit_details,
    )
    await session.commit()
    await session.refresh(grant)
    return grant