"""HTTP endpoints for fuel, service and expense entries plus per-car analytics."""

from datetime import date
from io import BytesIO

import matplotlib.pyplot as plt
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_telegram_user
from app.db.session import get_session
from app.models.car import Car, OdometerHistory
from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry
from app.models.user import User
from app.schemas.expense import (
    ExpenseEntryCreate,
    ExpenseEntryRead,
    ExpenseEntryUpdate,
    FuelEntryCreate,
    FuelEntryRead,
    FuelEntryUpdate,
    OdometerHistoryRead,
    OdometerPrediction,
    OwnershipStats,
    ServiceEntryCreate,
    ServiceEntryRead,
    ServiceEntryUpdate,
)
from app.services.admin_notifications import create_admin_notification
from app.services.calculations import dataframe_from_query, get_ownership_stats, predict_odometer
from app.services.odometer import (
    apply_odometer_from_record,
    recalculate_current_odometer,
    validate_odometer_change,
)

router = APIRouter(tags=["entries"])


async def ensure_owned_car(session: AsyncSession, car_id: int, user: User) -> Car:
    """Return the car with ``car_id`` if it belongs to ``user``.

    Raises:
        HTTPException: 404 when no such car exists, 403 when the car's
            ``owner_id`` differs from ``user.id``.
    """
    car = await session.get(Car, car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Car not found")
    if car.owner_id != user.id:
        raise HTTPException(status_code=403, detail="Forbidden")
    return car


async def ensure_entry_owner(
    session: AsyncSession, entry: FuelEntry | ServiceEntry | ExpenseEntry | None, user: User
) -> FuelEntry | ServiceEntry | ExpenseEntry:
    """Return ``entry`` after checking that its car belongs to ``user``.

    Raises:
        HTTPException: 404 when ``entry`` is ``None``; otherwise whatever
            ``ensure_owned_car`` raises for ``entry.car_id``.
    """
    if entry is None:
        raise HTTPException(status_code=404, detail="Entry not found")
    await ensure_owned_car(session, entry.car_id, user)
    return entry


async def _count_owned_entries(
    session: AsyncSession,
    model: type[FuelEntry] | type[ServiceEntry] | type[ExpenseEntry],
    user: User,
) -> int:
    """Count rows of ``model`` attached to any car whose owner is ``user``."""
    stmt = select(func.count(model.id)).join(Car, model.car_id == Car.id).where(Car.owner_id == user.id)
    return int((await session.execute(stmt)).scalar_one() or 0)


async def maybe_notify_first_record(
    session: AsyncSession,
    *,
    user: User,
    car: Car,
    record_type: str,
    record_id: int,
) -> None:
    """Create an admin notification when ``user`` has exactly one entry in total.

    Fuel, service and expense entries are counted across all cars owned by
    ``user``. Nothing happens unless the combined count equals 1, so callers
    are expected to invoke this after the new entry has been flushed.
    """
    total = 0
    for model in (FuelEntry, ServiceEntry, ExpenseEntry):
        total += await _count_owned_entries(session, model, user)
    if total != 1:
        return

    body_lines = [
        f"User ID: {user.id}",
        f"Telegram ID: {user.telegram_id}",
        f"Авто: {car.name}",
        f"Тип записи: {record_type}",
    ]
    await create_admin_notification(
        session,
        event_type="first_record_created",
        title="Пользователь впервые создал запись",
        body="\n".join(body_lines),
        entity_type="vehicle",
        entity_id=car.id,
        idempotency_key=f"first_record_created:{user.id}",
        metadata={"user_id": user.id, "vehicle_id": car.id, "record_type": record_type, "record_id": record_id},
    )


@router.post("/fuel", response_model=FuelEntryRead, status_code=status.HTTP_201_CREATED)
async def create_fuel_entry(
    payload: FuelEntryCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> FuelEntry:
    """Create a fuel entry for one of the current user's cars.

    The odometer value is checked with ``validate_odometer_change`` before the
    entry is inserted; after the flush (which assigns ``entry.id``) the reading
    is applied to the car and the first-record notification check runs, all in
    the same transaction.
    """
    car = await ensure_owned_car(session, payload.car_id, current_user)
    validate_odometer_change(
        car,
        payload.odometer,
        source_record_type="fuel",
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )

    # ``confirm_lower_odometer`` is excluded from the model constructor arguments.
    entry = FuelEntry(**payload.model_dump(exclude={"confirm_lower_odometer"}))
    session.add(entry)
    await session.flush()

    await apply_odometer_from_record(
        session,
        car,
        new_odometer=payload.odometer,
        source_record_type="fuel",
        source_record_id=entry.id,
        changed_by=current_user.id,
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )
    await maybe_notify_first_record(session, user=current_user, car=car, record_type="fuel", record_id=entry.id)
    await session.commit()
    await session.refresh(entry)
    return entry


@router.get("/cars/{car_id}/fuel", response_model=list[FuelEntryRead])
async def list_fuel_entries(
    car_id: int,
    date_from: date | None = None,
    date_to: date | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[FuelEntry]:
    """List fuel entries of an owned car, newest first.

    ``limit`` is clamped to 1..200 and ``offset`` to >= 0. ``date_from`` and
    ``date_to`` are inclusive bounds on ``entry_date``.
    """
    await ensure_owned_car(session, car_id, current_user)
    limit = min(max(limit, 1), 200)
    offset = max(offset, 0)

    stmt = select(FuelEntry).where(FuelEntry.car_id == car_id)
    if date_from is not None:
        stmt = stmt.where(FuelEntry.entry_date >= date_from)
    if date_to is not None:
        stmt = stmt.where(FuelEntry.entry_date <= date_to)

    # The id tie-breaker keeps limit/offset paging stable when several entries
    # share the same entry_date (same ordering rule as the expense listing).
    stmt = stmt.order_by(FuelEntry.entry_date.desc(), FuelEntry.id.desc()).limit(limit).offset(offset)
    result = await session.execute(stmt)
    return list(result.scalars())


@router.patch("/fuel/{entry_id}", response_model=FuelEntryRead)
async def update_fuel_entry(
    entry_id: int,
    payload: FuelEntryUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> FuelEntry:
    """Partially update a fuel entry owned by the current user.

    Only fields explicitly present in the request are written. When the
    request changes ``liters`` or ``price_per_liter`` without supplying
    ``total_cost``, the total is recomputed from the entry's resulting values.
    The car's current odometer is recalculated afterwards.
    """
    entry = await ensure_entry_owner(session, await session.get(FuelEntry, entry_id), current_user)
    car = await session.get(Car, entry.car_id)
    if car is not None and payload.odometer is not None:
        validate_odometer_change(
            car,
            payload.odometer,
            source_record_type="fuel",
            confirm_lower_odometer=payload.confirm_lower_odometer,
        )

    updates = payload.model_dump(exclude_unset=True, exclude={"confirm_lower_odometer"})
    for field, value in updates.items():
        setattr(entry, field, value)

    quantity_or_price_changed = payload.liters is not None or payload.price_per_liter is not None
    # Recompute only when both operands exist; multiplying with None would raise.
    if (
        payload.total_cost is None
        and quantity_or_price_changed
        and entry.liters is not None
        and entry.price_per_liter is not None
    ):
        entry.total_cost = entry.liters * entry.price_per_liter

    # Flush the pending attribute changes so the recalculation reads the
    # updated row even when the session is configured without autoflush; the
    # delete handler flushes before recalculating in the same way.
    await session.flush()
    await recalculate_current_odometer(
        session, entry.car_id, changed_by=current_user.id, source_record_type="fuel_update"
    )
    await session.commit()
    await session.refresh(entry)
    return entry


@router.delete("/fuel/{entry_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_fuel_entry(
    entry_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> None:
    """Delete a fuel entry owned by the current user and recalculate the car's odometer."""
    entry = await ensure_entry_owner(session, await session.get(FuelEntry, entry_id), current_user)
    # Remember the car id before the entry object is deleted.
    car_id = entry.car_id
    await session.delete(entry)
    await session.flush()
    await recalculate_current_odometer(session, car_id, changed_by=current_user.id, source_record_type="fuel_delete")
    await session.commit()


@router.post("/service", response_model=ServiceEntryRead, status_code=status.HTTP_201_CREATED)
async def create_service_entry(
    payload: ServiceEntryCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEntry:
    """Create a service entry for one of the current user's cars.

    Follows the same sequence as fuel entry creation: ownership check,
    odometer validation, insert + flush, odometer application, first-record
    notification check, commit.
    """
    car = await ensure_owned_car(session, payload.car_id, current_user)
    validate_odometer_change(
        car,
        payload.odometer,
        source_record_type="service",
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )

    # ``confirm_lower_odometer`` is excluded from the model constructor arguments.
    entry = ServiceEntry(**payload.model_dump(exclude={"confirm_lower_odometer"}))
    session.add(entry)
    await session.flush()

    await apply_odometer_from_record(
        session,
        car,
        new_odometer=payload.odometer,
        source_record_type="service",
        source_record_id=entry.id,
        changed_by=current_user.id,
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )
    await maybe_notify_first_record(session, user=current_user, car=car, record_type="service", record_id=entry.id)
    await session.commit()
    await session.refresh(entry)
    return entry


@router.get("/cars/{car_id}/service", response_model=list[ServiceEntryRead])
async def list_service_entries(
    car_id: int,
    date_from: date | None = None,
    date_to: date | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceEntry]:
    """List service entries of an owned car, newest first.

    ``limit`` is clamped to 1..200 and ``offset`` to >= 0. ``date_from`` and
    ``date_to`` are inclusive bounds on ``entry_date``.
    """
    await ensure_owned_car(session, car_id, current_user)
    limit = min(max(limit, 1), 200)
    offset = max(offset, 0)

    stmt = select(ServiceEntry).where(ServiceEntry.car_id == car_id)
    if date_from is not None:
        stmt = stmt.where(ServiceEntry.entry_date >= date_from)
    if date_to is not None:
        stmt = stmt.where(ServiceEntry.entry_date <= date_to)

    # The id tie-breaker keeps limit/offset paging stable when several entries
    # share the same entry_date (same ordering rule as the expense listing).
    stmt = stmt.order_by(ServiceEntry.entry_date.desc(), ServiceEntry.id.desc()).limit(limit).offset(offset)
    result = await session.execute(stmt)
    return list(result.scalars())
@router.patch("/service/{entry_id}", response_model=ServiceEntryRead)
async def update_service_entry(
    entry_id: int,
    payload: ServiceEntryUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEntry:
    """Partially update a service entry owned by the current user.

    Only fields explicitly present in the request are written; the car's
    current odometer is recalculated afterwards.
    """
    entry = await ensure_entry_owner(session, await session.get(ServiceEntry, entry_id), current_user)
    car = await session.get(Car, entry.car_id)
    if car is not None and payload.odometer is not None:
        validate_odometer_change(
            car,
            payload.odometer,
            source_record_type="service",
            confirm_lower_odometer=payload.confirm_lower_odometer,
        )

    updates = payload.model_dump(exclude_unset=True, exclude={"confirm_lower_odometer"})
    for field, value in updates.items():
        setattr(entry, field, value)

    # Flush the pending attribute changes so the recalculation reads the
    # updated row even when the session is configured without autoflush; the
    # delete handler flushes before recalculating in the same way.
    await session.flush()
    await recalculate_current_odometer(
        session, entry.car_id, changed_by=current_user.id, source_record_type="service_update"
    )
    await session.commit()
    await session.refresh(entry)
    return entry


@router.delete("/service/{entry_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_service_entry(
    entry_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> None:
    """Delete a service entry owned by the current user and recalculate the car's odometer."""
    entry = await ensure_entry_owner(session, await session.get(ServiceEntry, entry_id), current_user)
    # Remember the car id before the entry object is deleted.
    car_id = entry.car_id
    await session.delete(entry)
    await session.flush()
    await recalculate_current_odometer(
        session, car_id, changed_by=current_user.id, source_record_type="service_delete"
    )
    await session.commit()


@router.post("/expenses", response_model=ExpenseEntryRead, status_code=status.HTTP_201_CREATED)
async def create_expense_entry(
    payload: ExpenseEntryCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ExpenseEntry:
    """Create an expense entry for one of the current user's cars.

    Sequence: ownership check, odometer validation, insert + flush (assigns
    ``entry.id``), odometer application, first-record notification check,
    commit.
    """
    car = await ensure_owned_car(session, payload.car_id, current_user)
    validate_odometer_change(
        car,
        payload.odometer,
        source_record_type="expense",
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )

    # ``confirm_lower_odometer`` is excluded from the model constructor arguments.
    entry = ExpenseEntry(**payload.model_dump(exclude={"confirm_lower_odometer"}))
    session.add(entry)
    await session.flush()

    await apply_odometer_from_record(
        session,
        car,
        new_odometer=payload.odometer,
        source_record_type="expense",
        source_record_id=entry.id,
        changed_by=current_user.id,
        confirm_lower_odometer=payload.confirm_lower_odometer,
    )
    await maybe_notify_first_record(session, user=current_user, car=car, record_type="expense", record_id=entry.id)
    await session.commit()
    await session.refresh(entry)
    return entry


@router.get("/cars/{car_id}/expenses", response_model=list[ExpenseEntryRead])
async def list_expense_entries(
    car_id: int,
    date_from: date | None = None,
    date_to: date | None = None,
    category: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ExpenseEntry]:
    """List expense entries of an owned car, newest first.

    ``limit`` is clamped to 1..200 and ``offset`` to >= 0. ``date_from`` and
    ``date_to`` are inclusive bounds on ``entry_date``; a non-empty
    ``category`` filters by exact match.
    """
    await ensure_owned_car(session, car_id, current_user)
    limit = min(max(limit, 1), 200)
    offset = max(offset, 0)

    stmt = select(ExpenseEntry).where(ExpenseEntry.car_id == car_id)
    if date_from is not None:
        stmt = stmt.where(ExpenseEntry.entry_date >= date_from)
    if date_to is not None:
        stmt = stmt.where(ExpenseEntry.entry_date <= date_to)
    if category:
        stmt = stmt.where(ExpenseEntry.category == category)

    stmt = stmt.order_by(ExpenseEntry.entry_date.desc(), ExpenseEntry.id.desc()).limit(limit).offset(offset)
    result = await session.execute(stmt)
    return list(result.scalars())


@router.patch("/expenses/{entry_id}", response_model=ExpenseEntryRead)
async def update_expense_entry(
    entry_id: int,
    payload: ExpenseEntryUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ExpenseEntry:
    """Partially update an expense entry owned by the current user.

    Only fields explicitly present in the request are written; the car's
    current odometer is recalculated afterwards.
    """
    entry = await ensure_entry_owner(session, await session.get(ExpenseEntry, entry_id), current_user)
    car = await session.get(Car, entry.car_id)
    if car is not None and payload.odometer is not None:
        validate_odometer_change(
            car,
            payload.odometer,
            source_record_type="expense",
            confirm_lower_odometer=payload.confirm_lower_odometer,
        )

    updates = payload.model_dump(exclude_unset=True, exclude={"confirm_lower_odometer"})
    for field, value in updates.items():
        setattr(entry, field, value)

    # Flush the pending attribute changes so the recalculation reads the
    # updated row even when the session is configured without autoflush; the
    # delete handler flushes before recalculating in the same way.
    await session.flush()
    await recalculate_current_odometer(
        session, entry.car_id, changed_by=current_user.id, source_record_type="expense_update"
    )
    await session.commit()
    await session.refresh(entry)
    return entry


@router.delete("/expenses/{entry_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_expense_entry(
    entry_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> None:
    """Delete an expense entry owned by the current user and recalculate the car's odometer."""
    entry = await ensure_entry_owner(session, await session.get(ExpenseEntry, entry_id), current_user)
    # Remember the car id before the entry object is deleted.
    car_id = entry.car_id
    await session.delete(entry)
    await session.flush()
    await recalculate_current_odometer(
        session, car_id, changed_by=current_user.id, source_record_type="expense_delete"
    )
    await session.commit()


@router.get("/cars/{car_id}/odometer-history", response_model=list[OdometerHistoryRead])
async def odometer_history(
    car_id: int,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[OdometerHistory]:
    """List odometer history rows of an owned car, most recent change first.

    ``limit`` is clamped to 1..200 and ``offset`` to >= 0.
    """
    await ensure_owned_car(session, car_id, current_user)
    limit = min(max(limit, 1), 200)
    offset = max(offset, 0)

    stmt = (
        select(OdometerHistory)
        .where(OdometerHistory.car_id == car_id)
        .order_by(OdometerHistory.changed_at.desc(), OdometerHistory.id.desc())
        .limit(limit)
        .offset(offset)
    )
    result = await session.execute(stmt)
    return list(result.scalars())


@router.get("/cars/{car_id}/stats", response_model=OwnershipStats)
async def car_stats(
    car_id: int,
    date_from: date | None = None,
    date_to: date | None = None,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> OwnershipStats:
    """Return ownership statistics for an owned car over a date period.

    ``date_to`` defaults to today. ``date_from`` defaults to the first day of
    the month containing the earlier of today and ``date_to``, so a
    ``date_to`` in a past month never produces a start date after the end
    date.
    """
    await ensure_owned_car(session, car_id, current_user)
    today = date.today()
    period_to = date_to or today
    # Anchoring on min(today, period_to) matches the previous default whenever
    # date_to is on or after the first of the current month.
    period_from = date_from or min(today, period_to).replace(day=1)
    return await get_ownership_stats(session, car_id, period_from, period_to)


@router.get("/cars/{car_id}/analytics", response_model=OdometerPrediction)
async def car_analytics(
    car_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> OdometerPrediction:
    """Return the result of ``predict_odometer`` for an owned car."""
    await ensure_owned_car(session, car_id, current_user)
    return await predict_odometer(session, car_id)


@router.get("/cars/{car_id}/charts/expenses.png")
async def expenses_chart(
    car_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> Response:
    """Render a stacked bar chart (PNG) of fuel and service costs per date.

    Only fuel and service entries are queried here.

    Raises:
        HTTPException: 404 when the car has neither fuel nor service data.
    """
    await ensure_owned_car(session, car_id, current_user)

    fuel_df = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.total_cost.label("cost")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service_df = await dataframe_from_query(
        session,
        select(ServiceEntry.entry_date.label("date"), ServiceEntry.total_cost.label("cost")).where(
            ServiceEntry.car_id == car_id
        ),
    )
    if fuel_df.empty and service_df.empty:
        raise HTTPException(status_code=404, detail="No data for chart")

    # Tag each non-empty frame with its source so the pivot can split by type.
    frames = []
    if not fuel_df.empty:
        fuel_df["type"] = "fuel"
        frames.append(fuel_df)
    if not service_df.empty:
        service_df["type"] = "service"
        frames.append(service_df)

    import pandas as pd

    df = pd.concat(frames)
    df["date"] = pd.to_datetime(df["date"])
    pivot = df.pivot_table(index="date", columns="type", values="cost", aggfunc="sum").sort_index()

    fig, ax = plt.subplots(figsize=(8, 4.5))
    try:
        pivot.plot(kind="bar", stacked=True, ax=ax)
        ax.set_title("Car expenses")
        ax.set_xlabel("Date")
        ax.set_ylabel("Cost")
        fig.tight_layout()
        buffer = BytesIO()
        fig.savefig(buffer, format="png")
    finally:
        # pyplot keeps every open figure registered until closed; always
        # release it, even when plotting or saving fails.
        plt.close(fig)
    return Response(buffer.getvalue(), media_type="image/png")