Files
drivers_bot/app/services/calculations.py
2026-05-12 04:26:24 +09:00

209 lines
8.0 KiB
Python

from datetime import date
from decimal import Decimal
import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.expense import FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Summarise fuel and service spending for one car over a date range.

    Both ``date_from`` and ``date_to`` are inclusive bounds on ``entry_date``.

    Args:
        session: Async database session used to run the two aggregate queries.
        car_id: Identifier of the car whose entries are aggregated.
        date_from: First day of the reporting period.
        date_to: Last day of the reporting period.

    Returns:
        An ``OwnershipStats`` holding cost totals, litres, entry counts, the
        distance covered and the derived per-distance metrics. The two
        derived metrics are ``None`` when the distance is zero.
    """
    # Fuel aggregate: sums fall back to 0 when nothing matches, while the
    # odometer min/max stay NULL (None) in that case.
    fuel_stmt = select(
        func.coalesce(func.sum(FuelEntry.total_cost), 0),
        func.coalesce(func.sum(FuelEntry.liters), 0),
        func.count(FuelEntry.id),
        func.min(FuelEntry.odometer),
        func.max(FuelEntry.odometer),
    ).where(
        FuelEntry.car_id == car_id,
        FuelEntry.entry_date >= date_from,
        FuelEntry.entry_date <= date_to,
    )
    fuel_row = (await session.execute(fuel_stmt)).one()
    fuel_spend, litres_total, fuel_entries, odo_low, odo_high = fuel_row

    service_stmt = select(
        func.coalesce(func.sum(ServiceEntry.total_cost), 0),
        func.count(ServiceEntry.id),
    ).where(
        ServiceEntry.car_id == car_id,
        ServiceEntry.entry_date >= date_from,
        ServiceEntry.entry_date <= date_to,
    )
    service_row = (await session.execute(service_stmt)).one()
    service_spend, service_entries = service_row

    # Distance is the spread between the lowest and highest odometer value
    # recorded on fuel entries within the period.
    if odo_low is None or odo_high is None:
        distance_km = 0
    else:
        distance_km = int(odo_high - odo_low)

    total_cost = Decimal(fuel_spend) + Decimal(service_spend)

    # Per-distance metrics are undefined without any distance covered.
    if distance_km:
        avg_consumption = float(Decimal(litres_total) * Decimal(100) / distance_km)
        cost_per_km = float(total_cost / distance_km)
    else:
        avg_consumption = None
        cost_per_km = None

    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_spend,
        service_cost=service_spend,
        total_cost=total_cost,
        liters=litres_total,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_entries,
        service_entries_count=service_entries,
    )
async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute ``stmt`` on ``session`` and return the result rows as a DataFrame.

    Rows are read as mappings, so the frame's columns are named after the
    labels of the selected columns. A query that matches no rows produces an
    empty frame.
    """
    executed = await session.execute(stmt)
    return pd.DataFrame(executed.mappings().all())
def _clean_odometer_readings(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Merge raw ``date``/``odometer`` frames into one cleaned, date-ordered frame.

    Empty input frames are skipped before concatenation. The result has no
    missing values, no negative odometer values, at most one reading per date
    (the highest one) and no reading that is lower than the one before it.
    The returned frame may be empty.
    """
    populated = [frame for frame in frames if not frame.empty]
    if not populated:
        return pd.DataFrame(columns=["date", "odometer"])
    readings = pd.concat(populated, ignore_index=True).dropna().drop_duplicates()
    readings = readings.assign(date=pd.to_datetime(readings["date"]))
    readings = readings[readings["odometer"] >= 0]
    # Sorting by (date, odometer) and keeping the last row per date retains
    # the highest reading recorded on each day.
    readings = readings.sort_values(["date", "odometer"]).drop_duplicates(
        subset=["date"], keep="last"
    )
    # Drop readings that go backwards relative to the preceding reading.
    return readings[readings["odometer"].diff().fillna(0) >= 0].copy()


def _daily_mileage_intervals(readings: pd.DataFrame) -> pd.DataFrame:
    """Return the intervals between consecutive readings with a usable km/day rate.

    An interval is kept when it spans at least one day, the odometer did not
    decrease, and the resulting rate lies within 0..500 km per day.
    """
    deltas = readings.assign(
        days_delta=readings["date"].diff().dt.days,
        km_delta=readings["odometer"].diff(),
    )
    usable = deltas[(deltas["days_delta"] > 0) & (deltas["km_delta"] >= 0)].copy()
    usable["km_per_day"] = usable["km_delta"] / usable["days_delta"]
    return usable[(usable["km_per_day"] >= 0) & (usable["km_per_day"] <= 500)]


async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Estimate the car's current and 30-day odometer value from logged readings.

    Readings come from fuel entries and from service entries that have an
    odometer value. The fuel price forecast returned by ``predict_fuel_price``
    is merged into every prediction that is returned.

    Args:
        session: Async database session used for all queries.
        car_id: Identifier of the car to build the prediction for.

    Returns:
        An ``OdometerPrediction``. With no usable reading, all odometer fields
        are ``None`` and confidence is 0. With exactly one reading, only the
        current value is reported (confidence 0.2). Otherwise the daily
        mileage is estimated from the most recent intervals and extrapolated
        to today and to 30 days from today.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    fuel = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service = await dataframe_from_query(
        session,
        select(
            ServiceEntry.entry_date.label("date"), ServiceEntry.odometer.label("odometer")
        ).where(ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)),
    )

    df = _clean_odometer_readings([fuel, service])

    # Covers both "no rows at all" and "rows existed but none survived
    # cleaning"; the latter previously crashed on ``df.iloc[-1]``.
    if df.empty:
        return OdometerPrediction(
            car_id=car_id,
            samples=0,
            current_odometer=None,
            predicted_today=None,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            confidence=0,
            insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
        )

    last = df.iloc[-1]
    if len(df) < 2:
        current = int(last["odometer"])
        return OdometerPrediction(
            car_id=car_id,
            samples=len(df),
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            confidence=0.2,
            insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )

    intervals = _daily_mileage_intervals(df)
    if intervals.empty:
        km_per_day = 0.0
    else:
        # Blend a recency-weighted mean (newer intervals weigh more) with the
        # median of the last six usable intervals.
        recent = intervals.tail(6).copy()
        recent["weight"] = range(1, len(recent) + 1)
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))

    # Naive UTC midnight, comparable with the naive timestamps in ``df``.
    # ``Timestamp.now(tz="UTC")`` replaces the deprecated ``Timestamp.utcnow()``.
    today = pd.Timestamp.now(tz="UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)

    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    # Relative spread of the daily rate, capped at 1; only computed when there
    # are at least three intervals and a non-zero rate.
    variability = 0 if interval_count < 3 or km_per_day == 0 else min(
        float(intervals["km_per_day"].std() / max(km_per_day, 1)),
        1,
    )
    # Confidence grows with the number of intervals and the covered time span
    # (up to 365 days), shrinks with variability, and is clamped to 0.25..0.95.
    confidence = min(
        0.95,
        max(0.25, 0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )
    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )
async def predict_fuel_price(session: AsyncSession, car_id: int) -> dict[str, float | int | None]:
    """Forecast the fuel price per litre 30 days ahead from the car's fuel entries.

    Args:
        session: Async database session used to load the price history.
        car_id: Identifier of the car whose fuel entries are analysed.

    Returns:
        A dict with the keys ``current_price_per_liter``,
        ``predicted_price_per_liter_30_days``, ``avg_price_per_liter``,
        ``price_samples`` and ``price_confidence``. When no usable price is
        available the three price values are ``None`` and both the sample
        count and the confidence are 0.
    """
    no_forecast: dict[str, float | int | None] = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }

    price_query = select(
        FuelEntry.entry_date.label("date"),
        FuelEntry.price_per_liter.label("price"),
    ).where(FuelEntry.car_id == car_id)
    history = await dataframe_from_query(session, price_query)
    if history.empty:
        return no_forecast

    history = history.dropna().copy()
    if history.empty:
        return no_forecast

    history["date"] = pd.to_datetime(history["date"])
    history["price"] = pd.to_numeric(history["price"], errors="coerce")
    # Keep only prices strictly between 0 and 10000; values that could not be
    # parsed became NaN above and fail both comparisons.
    plausible = (history["price"] > 0) & (history["price"] < 10000)
    history = history[plausible].sort_values("date")
    if history.empty:
        return no_forecast

    # Only the eight most recent entries feed the forecast.
    window = history.tail(8).copy()
    sample_count = len(window)
    oldest = window.iloc[0]
    newest = window.iloc[-1]

    current = float(newest["price"])
    mean_price = float(window["price"].mean())
    confidence = min(0.72, 0.22 + sample_count * 0.055)

    if sample_count < 2:
        # A single price point gives no trend: the forecast is the current price.
        predicted = current
    else:
        span_days = max((newest["date"] - oldest["date"]).days, 1)
        daily_change = float((newest["price"] - oldest["price"]) / span_days)
        trend_price = current + daily_change * 30
        # Pull the linear extrapolation towards the recent average.
        predicted = (trend_price * 0.65) + (mean_price * 0.35)
        if sample_count >= 3:
            volatility = float(window["price"].std() / max(mean_price, 1))
        else:
            volatility = 0
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(current, 2),
        "predicted_price_per_liter_30_days": round(max(predicted, 0), 2),
        "avg_price_per_liter": round(mean_price, 2),
        "price_samples": int(len(history)),
        "price_confidence": round(confidence, 2),
    }