Files
drivers_bot/app/services/calculations.py
2026-05-12 19:14:21 +09:00

251 lines
9.5 KiB
Python

from datetime import date
from decimal import Decimal
import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.expense import FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Summarise fuel and service spending for one car over a date range.

    Both bounds are inclusive. Fuel and service totals are aggregated by the
    database; the driven distance is the spread between the lowest and the
    highest odometer reading among the fuel entries in the range.

    Args:
        session: Async database session used for all queries.
        car_id: Identifier of the car to report on.
        date_from: First day of the reporting period.
        date_to: Last day of the reporting period.

    Returns:
        An ``OwnershipStats`` with cost totals, liters, distance, average
        consumption and cost per kilometre. ``cost_per_km`` is ``None`` when
        the distance is zero.
    """
    fuel_stmt = (
        select(
            func.coalesce(func.sum(FuelEntry.total_cost), 0),
            func.coalesce(func.sum(FuelEntry.liters), 0),
            func.count(FuelEntry.id),
            func.min(FuelEntry.odometer),
            func.max(FuelEntry.odometer),
        )
        .where(FuelEntry.car_id == car_id)
        .where(FuelEntry.entry_date >= date_from)
        .where(FuelEntry.entry_date <= date_to)
    )
    fuel_row = (await session.execute(fuel_stmt)).one()
    fuel_cost, liters, fuel_count, lowest_odometer, highest_odometer = fuel_row

    service_stmt = (
        select(
            func.coalesce(func.sum(ServiceEntry.total_cost), 0),
            func.count(ServiceEntry.id),
        )
        .where(ServiceEntry.car_id == car_id)
        .where(ServiceEntry.entry_date >= date_from)
        .where(ServiceEntry.entry_date <= date_to)
    )
    service_row = (await session.execute(service_stmt)).one()
    service_cost, service_count = service_row

    # min()/max() yield NULL when no fuel entry falls inside the range.
    if lowest_odometer is None or highest_odometer is None:
        distance_km = 0
    else:
        distance_km = int(highest_odometer - lowest_odometer)

    total_cost = Decimal(fuel_cost) + Decimal(service_cost)
    avg_consumption = await full_tank_consumption(session, car_id, date_from, date_to)

    # Avoid dividing by zero when no distance was covered.
    cost_per_km = None
    if distance_km:
        cost_per_km = float(total_cost / distance_km)

    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_cost,
        service_cost=service_cost,
        total_cost=total_cost,
        liters=liters,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_count,
        service_entries_count=service_count,
    )
async def full_tank_consumption(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
    """Compute average fuel consumption (l/100 km) between full-tank fill-ups.

    All fuel entries of the car dated up to ``date_to`` are loaded in
    chronological order. Every pair of consecutive full-tank entries forms a
    leg: its distance is the odometer difference, and its fuel is the sum of
    liters of every entry after the opening fill-up up to and including the
    closing one. A leg is counted only when its closing fill-up is dated on
    or after ``date_from``, its distance is positive and its fuel is positive.

    Args:
        session: Async database session used for the query.
        car_id: Identifier of the car.
        date_from: Earliest date a leg's closing fill-up may have.
        date_to: Latest entry date loaded from the database.

    Returns:
        Liters per 100 km over all counted legs, or ``None`` when there are
        fewer than two full-tank entries or no leg qualifies.
    """
    stmt = (
        select(FuelEntry)
        .where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date <= date_to,
        )
        .order_by(FuelEntry.entry_date.asc(), FuelEntry.odometer.asc(), FuelEntry.id.asc())
    )
    fills = list((await session.execute(stmt)).scalars())

    full_positions = [pos for pos, fill in enumerate(fills) if fill.is_full_tank]
    if len(full_positions) < 2:
        return None

    liters_used = Decimal("0")
    km_driven = 0
    # Walk consecutive full-tank pairs: (opening fill-up, closing fill-up).
    for opening_pos, closing_pos in zip(full_positions, full_positions[1:]):
        opening = fills[opening_pos]
        closing = fills[closing_pos]
        if closing.entry_date < date_from:
            continue
        leg_km = closing.odometer - opening.odometer
        if leg_km <= 0:
            continue
        # Fuel burnt on the leg is everything poured after the opening
        # fill-up, including the closing fill-up itself.
        leg_liters = sum(
            Decimal(fill.liters) for fill in fills[opening_pos + 1 : closing_pos + 1]
        )
        if leg_liters > 0:
            liters_used += leg_liters
            km_driven += leg_km

    if km_driven <= 0 or liters_used <= 0:
        return None
    return float(liters_used * Decimal(100) / Decimal(km_driven))
async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute ``stmt`` and return its rows as a pandas DataFrame.

    Rows are fetched as mappings, so the labels used in the statement become
    the DataFrame columns.
    """
    mapping_rows = (await session.execute(stmt)).mappings()
    return pd.DataFrame(mapping_rows.all())
def _empty_odometer_prediction(
    car_id: int, price_prediction: dict[str, float | int | None]
) -> OdometerPrediction:
    """Build the result returned when no usable odometer reading exists."""
    return OdometerPrediction(
        car_id=car_id,
        samples=0,
        current_odometer=None,
        predicted_today=None,
        predicted_30_days=None,
        avg_km_per_day=None,
        avg_km_per_month=None,
        **price_prediction,
        confidence=0,
        insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
    )


async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Estimate the current and 30-day-ahead odometer reading of a car.

    Odometer readings from fuel entries and service entries are merged,
    cleaned (nulls, negatives, same-day duplicates and readings lower than
    the preceding one are dropped) and turned into daily-mileage intervals.
    The daily rate is a blend of a recency-weighted mean (70%) and the median
    (30%) of the last six plausible intervals (0..500 km/day).

    The fuel price forecast from ``predict_fuel_price`` is merged into the
    returned object.

    Args:
        session: Async database session used for the queries.
        car_id: Identifier of the car.

    Returns:
        An ``OdometerPrediction``. When no usable reading exists all mileage
        fields are ``None`` and ``confidence`` is 0; with a single reading the
        reading itself is reported and ``confidence`` is 0.2.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    fuel = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service = await dataframe_from_query(
        session,
        select(ServiceEntry.entry_date.label("date"), ServiceEntry.odometer.label("odometer")).where(
            ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)
        ),
    )
    if fuel.empty and service.empty:
        return _empty_odometer_prediction(car_id, price_prediction)

    # Concatenate only non-empty frames: an empty frame has no columns and
    # contributes nothing, but passing it to concat is deprecated behaviour.
    frames = [frame for frame in (fuel, service) if not frame.empty]
    df = pd.concat(frames).dropna().drop_duplicates()
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["odometer"] >= 0]
    # Keep one reading per day: the highest odometer value of that day.
    df = df.sort_values(["date", "odometer"]).drop_duplicates(subset=["date"], keep="last")
    # Drop readings lower than the one right before them. Copy so the
    # column assignments below operate on an independent frame.
    df = df[df["odometer"].diff().fillna(0) >= 0].copy()

    # Cleaning can remove every row (e.g. all readings null or negative);
    # without this guard the single-sample branch would index an empty frame.
    if df.empty:
        return _empty_odometer_prediction(car_id, price_prediction)

    if len(df) < 2:
        current = int(df.iloc[-1]["odometer"])
        return OdometerPrediction(
            car_id=car_id,
            samples=len(df),
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            confidence=0.2,
            insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )

    last = df.iloc[-1]
    df["days_delta"] = df["date"].diff().dt.days
    df["km_delta"] = df["odometer"].diff()
    intervals = df[(df["days_delta"] > 0) & (df["km_delta"] >= 0)].copy()
    intervals["km_per_day"] = intervals["km_delta"] / intervals["days_delta"]
    # Discard implausible rates (more than 500 km per day).
    intervals = intervals[(intervals["km_per_day"] >= 0) & (intervals["km_per_day"] <= 500)]
    if intervals.empty:
        km_per_day = 0
    else:
        recent = intervals.tail(6).copy()
        # Linearly increasing weights: the newest interval counts the most.
        recent["weight"] = range(1, len(recent) + 1)
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))

    # Naive UTC midnight, comparable with the naive timestamps in "date".
    # Timestamp.utcnow() is deprecated; Timestamp.now(tz="UTC") replaces it.
    today = pd.Timestamp.now(tz="UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)

    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    # Relative spread of the daily rate, capped at 1; ignored with < 3 intervals.
    variability = 0 if interval_count < 3 or km_per_day == 0 else min(
        float(intervals["km_per_day"].std() / max(km_per_day, 1)),
        1,
    )
    # Confidence grows with the number of intervals and the covered time
    # span, shrinks with variability, and is clamped to [0.25, 0.95].
    confidence = min(
        0.95,
        max(0.25, 0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )
    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )
async def predict_fuel_price(session: AsyncSession, car_id: int) -> dict[str, float | int | None]:
    """Forecast the fuel price per liter 30 days ahead for a car.

    Price history is read from the car's fuel entries. Rows with missing
    values or a price outside the open interval (0, 10000) are discarded.
    The forecast uses the eight most recent remaining entries: the linear
    trend between the first and last of them is extrapolated 30 days and
    blended with their mean (65% trend, 35% mean).

    Args:
        session: Async database session used for the query.
        car_id: Identifier of the car.

    Returns:
        A dict with the keys ``current_price_per_liter``,
        ``predicted_price_per_liter_30_days``, ``avg_price_per_liter``,
        ``price_samples`` and ``price_confidence``. When no usable price
        exists the prices are ``None`` and the counters are 0.
    """
    no_data = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }

    price_query = select(
        FuelEntry.entry_date.label("date"),
        FuelEntry.price_per_liter.label("price"),
    ).where(FuelEntry.car_id == car_id)
    history = await dataframe_from_query(session, price_query)
    if history.empty:
        return no_data

    history = history.dropna().copy()
    if history.empty:
        return no_data

    history["date"] = pd.to_datetime(history["date"])
    history["price"] = pd.to_numeric(history["price"], errors="coerce")
    # Values that failed numeric conversion are NaN and fail both comparisons.
    plausible = (history["price"] > 0) & (history["price"] < 10000)
    history = history[plausible].sort_values("date")
    if history.empty:
        return no_data

    window = history.tail(8).copy()
    window_size = len(window)
    prices = window["price"]
    dates = window["date"]

    latest_price = float(prices.iloc[-1])
    mean_price = float(prices.mean())
    forecast = latest_price
    confidence = min(0.72, 0.22 + window_size * 0.055)

    if window_size >= 2:
        # At least one day, so the per-day change never divides by zero.
        span_days = max((dates.iloc[-1] - dates.iloc[0]).days, 1)
        daily_change = float((prices.iloc[-1] - prices.iloc[0]) / span_days)
        trend_forecast = latest_price + daily_change * 30
        forecast = (trend_forecast * 0.65) + (mean_price * 0.35)
        if window_size >= 3:
            volatility = float(prices.std() / max(mean_price, 1))
        else:
            volatility = 0
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(latest_price, 2),
        "predicted_price_per_liter_30_days": round(max(forecast, 0), 2),
        "avg_price_per_liter": round(mean_price, 2),
        "price_samples": int(len(history)),
        "price_confidence": round(confidence, 2),
    }