"""Ownership statistics and simple forecasts built from fuel and service entries."""

from datetime import date
from decimal import Decimal

import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.expense import FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats


async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Aggregate fuel and service totals for one car over an inclusive date range.

    Args:
        session: Async database session used for all queries.
        car_id: Identifier of the car whose entries are aggregated.
        date_from: First entry date included in the totals.
        date_to: Last entry date included in the totals.

    Returns:
        An ``OwnershipStats`` with cost totals, liters, entry counts, the
        distance covered and derived per-km figures. ``cost_per_km`` is
        ``None`` when the distance is zero.
    """
    fuel_totals = await session.execute(
        select(
            func.coalesce(func.sum(FuelEntry.total_cost), 0),
            func.coalesce(func.sum(FuelEntry.liters), 0),
            func.count(FuelEntry.id),
            func.min(FuelEntry.odometer),
            func.max(FuelEntry.odometer),
        ).where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date >= date_from,
            FuelEntry.entry_date <= date_to,
        )
    )
    fuel_cost, liters, fuel_count, min_odo, max_odo = fuel_totals.one()

    service_totals = await session.execute(
        select(
            func.coalesce(func.sum(ServiceEntry.total_cost), 0),
            func.count(ServiceEntry.id),
        ).where(
            ServiceEntry.car_id == car_id,
            ServiceEntry.entry_date >= date_from,
            ServiceEntry.entry_date <= date_to,
        )
    )
    service_cost, service_count = service_totals.one()

    # Distance is the spread of odometer readings on fuel entries only;
    # MIN/MAX come back as None when no reading matched the filter.
    if min_odo is not None and max_odo is not None:
        distance_km = int(max_odo - min_odo)
    else:
        distance_km = 0

    total_cost = Decimal(fuel_cost) + Decimal(service_cost)
    avg_consumption = await full_tank_consumption(session, car_id, date_from, date_to)
    cost_per_km = float(total_cost / distance_km) if distance_km else None

    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_cost,
        service_cost=service_cost,
        total_cost=total_cost,
        liters=liters,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_count,
        service_entries_count=service_count,
    )


async def full_tank_consumption(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
    """Compute average consumption (l/100 km) between consecutive full-tank entries.

    All fuel entries up to ``date_to`` are loaded (including those before
    ``date_from``) so that the first interval ending inside the range still
    has a starting full tank. An interval contributes only when its closing
    full-tank entry is dated on or after ``date_from``.

    Args:
        session: Async database session used for the query.
        car_id: Identifier of the car.
        date_from: Earliest date for the closing entry of a counted interval.
        date_to: Latest entry date loaded.

    Returns:
        Liters per 100 km over all counted intervals, or ``None`` when there
        are fewer than two full-tank entries or nothing could be counted.
    """
    result = await session.execute(
        select(FuelEntry)
        .where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date <= date_to,
        )
        .order_by(FuelEntry.entry_date.asc(), FuelEntry.odometer.asc(), FuelEntry.id.asc())
    )
    entries = list(result.scalars())
    full_indexes = [index for index, entry in enumerate(entries) if entry.is_full_tank]
    if len(full_indexes) < 2:
        return None

    total_liters = Decimal("0")
    total_distance = 0
    # Walk consecutive pairs of full-tank entries.
    for previous_full_index, current_full_index in zip(full_indexes, full_indexes[1:]):
        previous = entries[previous_full_index]
        current = entries[current_full_index]
        if current.entry_date < date_from:
            continue

        distance = current.odometer - previous.odometer
        if distance <= 0:
            # Non-increasing odometer between two fill-ups: skip the interval.
            continue

        # Fuel burned over the interval is everything added after the previous
        # full tank, up to and including the current one (partial fill-ups too).
        interval_liters = sum(
            Decimal(entry.liters)
            for entry in entries[previous_full_index + 1 : current_full_index + 1]
        )
        if interval_liters > 0:
            total_liters += interval_liters
            total_distance += distance

    if total_distance <= 0 or total_liters <= 0:
        return None
    return float(total_liters * Decimal(100) / Decimal(total_distance))


async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute ``stmt`` and return its rows as a DataFrame keyed by column label.

    A statement that yields no rows produces an empty DataFrame.
    """
    result = await session.execute(stmt)
    rows = result.mappings().all()
    return pd.DataFrame(rows)


def _empty_odometer_prediction(
    car_id: int, price_prediction: dict[str, float | int | None]
) -> OdometerPrediction:
    """Build the response used when no usable odometer reading exists."""
    return OdometerPrediction(
        car_id=car_id,
        samples=0,
        current_odometer=None,
        predicted_today=None,
        predicted_30_days=None,
        avg_km_per_day=None,
        avg_km_per_month=None,
        **price_prediction,
        confidence=0,
        insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
    )


async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Estimate the current and 30-day-ahead odometer reading for a car.

    Odometer readings from fuel and service entries are merged, cleaned
    (nulls, negatives, same-day duplicates and decreasing readings removed)
    and turned into km/day intervals. The daily rate blends a recency-weighted
    mean and the median of the last six valid intervals.

    Args:
        session: Async database session used for all queries.
        car_id: Identifier of the car.

    Returns:
        An ``OdometerPrediction`` that also carries the fuel price forecast
        from ``predict_fuel_price``. With no usable readings every odometer
        field is ``None``; with a single reading only the current value is
        reported.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    fuel = await dataframe_from_query(
        session,
        select(
            FuelEntry.entry_date.label("date"),
            FuelEntry.odometer.label("odometer"),
        ).where(FuelEntry.car_id == car_id),
    )
    service = await dataframe_from_query(
        session,
        select(
            ServiceEntry.entry_date.label("date"),
            ServiceEntry.odometer.label("odometer"),
        ).where(ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)),
    )

    # Only concatenate frames that actually hold rows: an empty result has no
    # columns at all and would only add noise to the concat.
    frames = [frame for frame in (fuel, service) if not frame.empty]
    if not frames:
        return _empty_odometer_prediction(car_id, price_prediction)

    df = pd.concat(frames, ignore_index=True).dropna().drop_duplicates()
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["odometer"] >= 0]
    # Keep one reading per day (the highest), then drop readings that go backwards.
    df = df.sort_values(["date", "odometer"]).drop_duplicates(subset=["date"], keep="last")
    df = df[df["odometer"].diff().fillna(0) >= 0].copy()

    if df.empty:
        # Rows existed but none survived cleaning (e.g. all null or negative);
        # indexing the last row below would raise IndexError.
        return _empty_odometer_prediction(car_id, price_prediction)

    if len(df) < 2:
        current = int(df.iloc[-1]["odometer"])
        return OdometerPrediction(
            car_id=car_id,
            samples=len(df),
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            confidence=0.2,
            insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )

    last = df.iloc[-1]
    df["days_delta"] = df["date"].diff().dt.days
    df["km_delta"] = df["odometer"].diff()
    intervals = df[(df["days_delta"] > 0) & (df["km_delta"] >= 0)].copy()
    intervals["km_per_day"] = intervals["km_delta"] / intervals["days_delta"]
    # Rates above 500 km/day are treated as outliers and ignored.
    intervals = intervals[(intervals["km_per_day"] >= 0) & (intervals["km_per_day"] <= 500)]

    if intervals.empty:
        km_per_day = 0
    else:
        recent = intervals.tail(6).copy()
        # Linear weights 1..n so the newest interval counts the most.
        recent["weight"] = range(1, len(recent) + 1)
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))

    # Naive UTC midnight, comparable with the naive timestamps in df["date"].
    # Timestamp.now(tz=...) replaces the deprecated Timestamp.utcnow().
    today = pd.Timestamp.now(tz="UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)

    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    if interval_count < 3 or km_per_day == 0:
        variability = 0
    else:
        variability = min(
            float(intervals["km_per_day"].std() / max(km_per_day, 1)),
            1,
        )
    # Confidence rises with the number of intervals and the covered time span,
    # falls with variability, and is clamped to [0.25, 0.95].
    confidence = min(
        0.95,
        max(
            0.25,
            0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18,
        ),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )

    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )


async def predict_fuel_price(session: AsyncSession, car_id: int) -> dict[str, float | int | None]:
    """Forecast the fuel price per liter 30 days ahead from a car's fuel entries.

    The forecast extrapolates the linear price change across the last eight
    valid entries and blends it (65/35) with their mean price.

    Args:
        session: Async database session used for the query.
        car_id: Identifier of the car.

    Returns:
        A dict with the keys ``current_price_per_liter``,
        ``predicted_price_per_liter_30_days``, ``avg_price_per_liter``,
        ``price_samples`` and ``price_confidence``. Price values are ``None``
        and the counters are zero when no valid price exists.
    """
    df = await dataframe_from_query(
        session,
        select(
            FuelEntry.entry_date.label("date"),
            FuelEntry.price_per_liter.label("price"),
        ).where(FuelEntry.car_id == car_id),
    )
    empty = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }
    if df.empty:
        return empty

    df = df.dropna().copy()
    if df.empty:
        return empty

    df["date"] = pd.to_datetime(df["date"])
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    # Comparisons with NaN are False, so values that failed to parse drop out here too.
    df = df[(df["price"] > 0) & (df["price"] < 10000)].sort_values("date")
    if df.empty:
        return empty

    recent = df.tail(8).copy()
    current = float(recent.iloc[-1]["price"])
    avg = float(recent["price"].mean())
    predicted = current
    confidence = min(0.72, 0.22 + len(recent) * 0.055)

    if len(recent) >= 2:
        span_days = max((recent.iloc[-1]["date"] - recent.iloc[0]["date"]).days, 1)
        change_per_day = float(
            (recent.iloc[-1]["price"] - recent.iloc[0]["price"]) / span_days
        )
        predicted = current + change_per_day * 30
        # Pull the extrapolation towards the recent mean to damp the trend.
        predicted = (predicted * 0.65) + (avg * 0.35)
        volatility = float(recent["price"].std() / max(avg, 1)) if len(recent) >= 3 else 0
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(current, 2),
        "predicted_price_per_liter_30_days": round(max(predicted, 0), 2),
        "avg_price_per_liter": round(avg, 2),
        "price_samples": int(len(df)),
        "price_confidence": round(confidence, 2),
    }