Files
drivers_bot/app/services/calculations.py
2026-05-14 21:19:37 +09:00

607 lines
25 KiB
Python

import calendar
from datetime import date, timedelta
from decimal import Decimal
import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.car import Car
from app.models.expense import ExpenseCategory, ExpenseEntry, FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
from app.services.loans import generate_annuity_schedule
# Expense categories treated as fixed costs. expense_period_totals counts an
# entry in one of these categories as both "recurring" and "fixed", regardless
# of the entry's own is_recurring flag.
FIXED_EXPENSE_CATEGORIES = {
    ExpenseCategory.insurance,
    ExpenseCategory.tax,
    ExpenseCategory.loan_payment,
    ExpenseCategory.loan_interest,
    ExpenseCategory.parking,
}
# Expense categories regarded as variable costs.
# NOTE(review): no function visible in this part of the file reads this set;
# expense_period_totals classifies an entry as variable simply when it is
# neither recurring nor in FIXED_EXPENSE_CATEGORIES. Confirm it is used
# elsewhere before relying on it.
VARIABLE_EXPENSE_CATEGORIES = {
    ExpenseCategory.fine,
    ExpenseCategory.car_wash,
    ExpenseCategory.toll,
    ExpenseCategory.tires,
    ExpenseCategory.wheels,
    ExpenseCategory.battery,
    ExpenseCategory.parts,
    ExpenseCategory.repair,
    ExpenseCategory.maintenance,
    ExpenseCategory.diagnostics,
    ExpenseCategory.towing,
    ExpenseCategory.state_fee,
    ExpenseCategory.registration,
    ExpenseCategory.inspection,
    ExpenseCategory.other,
}
async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Build the cost-of-ownership summary for one car over an inclusive date range.

    The total combines fuel and service sums queried for the range, expense
    entries allocated to the range by ``expense_period_totals``, and — when the
    car record exists — depreciation and scheduled loan principal/interest.
    From that total the function derives per-day, per-month and per-km costs,
    fixed/variable and recurring/one-time splits, a per-category breakdown and
    a month-over-month comparison.

    Args:
        session: Async SQLAlchemy session used for every query.
        car_id: Identifier of the car to report on.
        date_from: First day of the reporting period (inclusive).
        date_to: Last day of the reporting period (inclusive).

    Returns:
        A populated ``OwnershipStats`` instance.
    """
    # Fuel aggregates for the period; the min/max odometer feed the distance estimate.
    fuel_totals = await session.execute(
        select(
            func.coalesce(func.sum(FuelEntry.total_cost), 0),
            func.coalesce(func.sum(FuelEntry.liters), 0),
            func.count(FuelEntry.id),
            func.min(FuelEntry.odometer),
            func.max(FuelEntry.odometer),
        ).where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date >= date_from,
            FuelEntry.entry_date <= date_to,
        )
    )
    fuel_cost, liters, fuel_count, min_odo, max_odo = fuel_totals.one()
    service_totals = await session.execute(
        select(func.coalesce(func.sum(ServiceEntry.total_cost), 0), func.count(ServiceEntry.id)).where(
            ServiceEntry.car_id == car_id,
            ServiceEntry.entry_date >= date_from,
            ServiceEntry.entry_date <= date_to,
        )
    )
    service_cost, service_count = service_totals.one()
    # Distance is the spread between the lowest and highest odometer reading
    # found in fuel, service or expense entries dated inside the period.
    odometer_values = [min_odo, max_odo]
    service_odo = await session.execute(
        select(func.min(ServiceEntry.odometer), func.max(ServiceEntry.odometer)).where(
            ServiceEntry.car_id == car_id,
            ServiceEntry.odometer.is_not(None),
            ServiceEntry.entry_date >= date_from,
            ServiceEntry.entry_date <= date_to,
        )
    )
    expense_odo = await session.execute(
        select(func.min(ExpenseEntry.odometer), func.max(ExpenseEntry.odometer)).where(
            ExpenseEntry.car_id == car_id,
            ExpenseEntry.odometer.is_not(None),
            ExpenseEntry.entry_date >= date_from,
            ExpenseEntry.entry_date <= date_to,
        )
    )
    odometer_values.extend(service_odo.one())
    odometer_values.extend(expense_odo.one())
    # Aggregates over zero rows come back as None; drop them before min/max.
    odometer_values = [value for value in odometer_values if value is not None]
    distance_km = int(max(odometer_values) - min(odometer_values)) if len(odometer_values) >= 2 else 0
    (
        expense_cost,
        recurring_cost,
        _expense_count,
        expense_categories,
        fixed_expense_cost,
        variable_expense_cost,
    ) = await expense_period_totals(
        session, car_id, date_from, date_to
    )
    # Depreciation and loan costs need the car record; a missing car contributes zero.
    car = await session.get(Car, car_id)
    depreciation_cost = calculate_depreciation(car, date_from, date_to) if car else Decimal("0")
    loan_principal_cost, loan_interest_cost = calculate_loan_costs(car, date_from, date_to) if car else (Decimal("0"), Decimal("0"))
    total_cost = Decimal(fuel_cost) + Decimal(service_cost) + expense_cost + depreciation_cost + loan_principal_cost + loan_interest_cost
    # Fuel consumption is taken from full-tank intervals rather than liters / distance.
    tank_metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    avg_consumption = tank_metrics["average_fuel_consumption_full_tank"]
    cost_per_km = float(total_cost / distance_km) if distance_km else None
    # Period length in months, using 30.4375 days per month and a floor of 0.033 months.
    months = max(Decimal(period_days(date_from, date_to)) / Decimal("30.4375"), Decimal("0.033"))
    cost_per_day = (total_cost / Decimal(period_days(date_from, date_to))).quantize(Decimal("0.01"))
    cost_per_month = (total_cost / months).quantize(Decimal("0.01"))
    recurring_total = (recurring_cost + depreciation_cost + loan_principal_cost + loan_interest_cost).quantize(Decimal("0.01"))
    # Whatever is not recurring is reported as one-time, never below zero.
    one_time_costs = max(total_cost - recurring_total, Decimal("0")).quantize(Decimal("0.01"))
    recurring_monthly = (recurring_total / months).quantize(Decimal("0.01"))
    # Forecast is the larger of the observed monthly average and the recurring monthly load.
    forecast_next_month = max(cost_per_month, recurring_monthly).quantize(Decimal("0.01"))
    # Repair cost = all service entries plus repair/maintenance/diagnostics expense categories.
    repair_cost = (
        Decimal(service_cost)
        + expense_categories.get("repair", Decimal("0"))
        + expense_categories.get("maintenance", Decimal("0"))
        + expense_categories.get("diagnostics", Decimal("0"))
    ).quantize(Decimal("0.01"))
    fixed_costs = (fixed_expense_cost + depreciation_cost + loan_principal_cost + loan_interest_cost).quantize(Decimal("0.01"))
    variable_costs = (Decimal(fuel_cost) + Decimal(service_cost) + variable_expense_cost).quantize(Decimal("0.01"))
    cost_by_category = {
        "fuel": Decimal(fuel_cost),
        "service": Decimal(service_cost),
        **expense_categories,
    }
    if depreciation_cost:
        cost_by_category["depreciation"] = depreciation_cost
    # Scheduled loan amounts are added on top of any expense entries already
    # logged under the same loan_payment / loan_interest keys.
    if loan_principal_cost:
        cost_by_category["loan_payment"] = cost_by_category.get("loan_payment", Decimal("0")) + loan_principal_cost
    if loan_interest_cost:
        cost_by_category["loan_interest"] = cost_by_category.get("loan_interest", Decimal("0")) + loan_interest_cost
    # Only non-zero categories are listed, sorted by key; entries_count is hard-coded to 0 here.
    categories = [
        {"category": key, "total_cost": value, "entries_count": 0}
        for key, value in sorted(cost_by_category.items())
        if value
    ]
    # Month comparison is anchored on date_to, not on the start of the period.
    current_month_cost, previous_month_cost = await month_comparison_totals(session, car_id, date_to)
    month_change = None
    cost_warning = None
    if previous_month_cost > 0:
        month_change = float((current_month_cost - previous_month_cost) * Decimal("100") / previous_month_cost)
        # A rise of 35% or more over the previous month triggers the user-facing warning.
        if month_change >= 35:
            cost_warning = "Расходы заметно выше прошлого месяца. Проверьте крупные ремонты, штрафы или регулярные платежи."
    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_cost,
        service_cost=service_cost,
        expenses_cost=expense_cost,
        total_cost=total_cost,
        repair_cost=repair_cost,
        fixed_costs=fixed_costs,
        variable_costs=variable_costs,
        recurring_costs=recurring_total,
        one_time_costs=one_time_costs,
        forecast_next_month=forecast_next_month,
        depreciation_cost=depreciation_cost,
        loan_principal_cost=loan_principal_cost,
        loan_interest_cost=loan_interest_cost,
        total_cost_without_credit=(total_cost - loan_principal_cost - loan_interest_cost).quantize(Decimal("0.01")),
        total_cost_with_credit=total_cost.quantize(Decimal("0.01")),
        cost_per_day=cost_per_day,
        cost_per_month=cost_per_month,
        current_month_cost=current_month_cost,
        previous_month_cost=previous_month_cost,
        month_over_month_change_pct=round(month_change, 2) if month_change is not None else None,
        cost_warning=cost_warning,
        cost_by_category=cost_by_category,
        categories=categories,
        liters=liters,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_count,
        service_entries_count=service_count,
    )
def period_days(date_from: date, date_to: date) -> int:
    """Return the inclusive day count between two dates, never less than 1."""
    inclusive_span = (date_to - date_from).days + 1
    # A reversed or same-day range still counts as a single day.
    return inclusive_span if inclusive_span > 1 else 1
def add_months(value: date, months: int) -> date:
    """Shift *value* by *months* calendar months.

    The day of month is clamped to the last day of the target month, so
    adding one month to 31 January yields the last day of February.
    """
    # Work with a zero-based month index so divmod gives the year carry directly.
    year_carry, zero_based_month = divmod(value.month - 1 + months, 12)
    target_year = value.year + year_carry
    target_month = zero_based_month + 1
    days_in_target = calendar.monthrange(target_year, target_month)[1]
    return date(target_year, target_month, min(value.day, days_in_target))
def overlap_days(left_start: date, left_end: date, right_start: date, right_end: date) -> int:
    """Return how many days two inclusive date ranges share (0 when disjoint)."""
    shared_start = max(left_start, right_start)
    shared_end = min(left_end, right_end)
    # The ranges intersect only when the later start does not pass the earlier end.
    return period_days(shared_start, shared_end) if shared_start <= shared_end else 0
def expense_window(entry: ExpenseEntry) -> tuple[date, date]:
    """Return the inclusive (start, end) dates an expense entry is spread over.

    Precedence: an explicit period_start/period_end pair; otherwise
    period_months counted from period_start (or from entry_date when no
    period_start is set); otherwise the single entry_date.
    """
    explicit_start = entry.period_start
    if explicit_start and entry.period_end:
        return explicit_start, entry.period_end
    span_months = entry.period_months
    if span_months:
        anchor = explicit_start or entry.entry_date
        # The window ends the day before the same day-of-month span_months later.
        return anchor, add_months(anchor, span_months) - timedelta(days=1)
    return entry.entry_date, entry.entry_date
def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date) -> Decimal:
    """Return the share of an expense entry's cost that falls inside the period.

    Entries with a known month count (payment_period_months, period_months or
    an inferred insurance term) are split month by month; all others are
    pro-rated by days over their expense window.
    """
    month_count = entry.payment_period_months or entry.period_months or inferred_monthly_period(entry)
    has_anchor_date = entry.period_start or entry.entry_date
    if month_count and has_anchor_date:
        return allocated_monthly_expense_cost(entry, date_from, date_to, month_count)

    window_start, window_end = expense_window(entry)
    window_length = period_days(window_start, window_end)
    shared_days = overlap_days(window_start, window_end, date_from, date_to)
    if shared_days <= 0:
        return Decimal("0")

    full_amount = Decimal(entry.total_cost)
    # A single-day expense dated on its entry date is returned as-is, without rounding.
    if window_length <= 1 and window_start == entry.entry_date:
        return full_amount
    prorated = full_amount * Decimal(shared_days) / Decimal(window_length)
    return prorated.quantize(Decimal("0.01"))
def inferred_monthly_period(entry: ExpenseEntry) -> int | None:
    """Guess an insurance entry's term in months from its period dates.

    Returns 1, 3, 6 or 12 when period_start..period_end spans exactly that
    many calendar months; returns None for non-insurance entries, missing
    dates, or any other span.
    """
    if entry.category != ExpenseCategory.insurance:
        return None
    if not (entry.period_start and entry.period_end):
        return None
    candidate_terms = (1, 3, 6, 12)
    return next(
        (
            term
            for term in candidate_terms
            if add_months(entry.period_start, term) - timedelta(days=1) == entry.period_end
        ),
        None,
    )
def allocated_monthly_expense_cost(
    entry: ExpenseEntry, date_from: date, date_to: date, months: int
) -> Decimal:
    """Allocate an entry's cost to the period by equal monthly instalments.

    The total is divided evenly across *months* consecutive month-long slices
    starting at period_start (or entry_date). Each slice contributes in
    proportion to the number of its days that fall in [date_from, date_to].
    """
    anchor = entry.period_start or entry.entry_date
    if months <= 0:
        return Decimal("0")

    per_month = Decimal(entry.total_cost) / Decimal(months)
    allocated = Decimal("0")
    for offset in range(months):
        slice_start = add_months(anchor, offset)
        slice_end = add_months(anchor, offset + 1) - timedelta(days=1)
        covered = overlap_days(slice_start, slice_end, date_from, date_to)
        if covered > 0:
            slice_length = period_days(slice_start, slice_end)
            allocated += per_month * Decimal(covered) / Decimal(slice_length)
    return allocated.quantize(Decimal("0.01"))
async def expense_period_totals(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> tuple[Decimal, Decimal, int, dict[str, Decimal], Decimal, Decimal]:
    """Sum the car's expense entries as allocated to an inclusive period.

    Every entry dated on or before *date_to* is considered, because an entry
    logged earlier may be spread over a window that reaches into the period.

    Returns:
        ``(total, recurring, count, by_category, fixed, variable)`` where the
        four amounts are rounded to cents, *count* is the number of entries
        with a positive allocation and *by_category* maps category name to
        the unrounded running sum of allocations.
    """
    stmt = (
        select(ExpenseEntry)
        .where(
            ExpenseEntry.car_id == car_id,
            ExpenseEntry.entry_date <= date_to,
        )
        .order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
    )
    rows = await session.execute(stmt)

    cent = Decimal("0.01")
    grand_total = Decimal("0")
    recurring_sum = Decimal("0")
    fixed_sum = Decimal("0")
    variable_sum = Decimal("0")
    by_category: dict[str, Decimal] = {}
    counted = 0

    for entry in rows.scalars():
        share = allocated_expense_cost(entry, date_from, date_to)
        if share <= 0:
            continue
        counted += 1
        grand_total += share

        raw_category = entry.category
        key = raw_category.value if isinstance(raw_category, ExpenseCategory) else str(raw_category)
        by_category[key] = by_category.get(key, Decimal("0")) + share

        # Recurring and fixed use the same rule: a fixed category or a recurring flag.
        if raw_category in FIXED_EXPENSE_CATEGORIES or entry.is_recurring:
            recurring_sum += share
            fixed_sum += share
        else:
            variable_sum += share

    return (
        grand_total.quantize(cent),
        recurring_sum.quantize(cent),
        counted,
        by_category,
        fixed_sum.quantize(cent),
        variable_sum.quantize(cent),
    )
def calculate_depreciation(car: Car, date_from: date, date_to: date) -> Decimal:
    """Return straight-line depreciation attributable to the period.

    The depreciable amount (purchase price minus expected residual value,
    floored at zero) is spread evenly per day from the purchase date over
    expected_ownership_months (60 when unset). Returns zero when depreciation
    is disabled, purchase data is missing, or the period does not overlap.
    """
    enabled = car.include_depreciation and car.purchase_price and car.purchase_date
    if not enabled:
        return Decimal("0")

    first_day = car.purchase_date
    ownership_months = car.expected_ownership_months or 60
    salvage_value = Decimal(car.expected_residual_value or 0)
    depreciable_amount = max(Decimal(car.purchase_price) - salvage_value, Decimal("0"))
    last_day = add_months(car.purchase_date, ownership_months) - timedelta(days=1)

    shared_days = overlap_days(first_day, last_day, date_from, date_to)
    if shared_days <= 0:
        return Decimal("0")

    per_day = depreciable_amount / Decimal(period_days(first_day, last_day))
    return (per_day * Decimal(shared_days)).quantize(Decimal("0.01"))
def calculate_loan_costs(car: Car, date_from: date, date_to: date) -> tuple[Decimal, Decimal]:
    """Return (principal, interest) of scheduled loan payments due in the period.

    The schedule is generated by ``generate_annuity_schedule`` starting at the
    car's first payment date (falling back to the purchase date). Returns a
    pair of zeros when the car has no loan principal, no term or no start date.
    """
    if not (car.loan_principal and car.loan_term_months):
        return Decimal("0"), Decimal("0")
    schedule_start = car.loan_first_payment_date or car.purchase_date
    if not schedule_start:
        return Decimal("0"), Decimal("0")

    rows = generate_annuity_schedule(
        principal=Decimal(car.loan_principal),
        months=car.loan_term_months,
        annual_rate=Decimal(car.loan_annual_interest_rate or 0),
        first_payment_date=schedule_start,
    )
    # Keep only payments that carry a date inside the inclusive period.
    due_rows = [
        row for row in rows if row.payment_date and date_from <= row.payment_date <= date_to
    ]
    principal_paid = sum((row.principal for row in due_rows), Decimal("0"))
    interest_paid = sum((row.interest for row in due_rows), Decimal("0"))
    cent = Decimal("0.01")
    return principal_paid.quantize(cent), interest_paid.quantize(cent)
async def raw_period_total(session: AsyncSession, car_id: int, date_from: date, date_to: date) -> Decimal:
    """Return the car's total cost for an inclusive period, rounded to cents.

    The total is fuel + service + allocated expenses + depreciation + loan
    principal + loan interest; the last three are zero when the car record
    is not found.
    """
    fuel_stmt = select(func.coalesce(func.sum(FuelEntry.total_cost), 0)).where(
        FuelEntry.car_id == car_id,
        FuelEntry.entry_date >= date_from,
        FuelEntry.entry_date <= date_to,
    )
    fuel_sum = (await session.execute(fuel_stmt)).scalar_one()

    service_stmt = select(func.coalesce(func.sum(ServiceEntry.total_cost), 0)).where(
        ServiceEntry.car_id == car_id,
        ServiceEntry.entry_date >= date_from,
        ServiceEntry.entry_date <= date_to,
    )
    service_sum = (await session.execute(service_stmt)).scalar_one()

    # Only the first element (the allocated expense total) is needed here.
    expense_sum = (await expense_period_totals(session, car_id, date_from, date_to))[0]

    car = await session.get(Car, car_id)
    if car:
        depreciation = calculate_depreciation(car, date_from, date_to)
        loan_principal, loan_interest = calculate_loan_costs(car, date_from, date_to)
    else:
        depreciation = Decimal("0")
        loan_principal, loan_interest = Decimal("0"), Decimal("0")

    combined = (
        Decimal(fuel_sum) + Decimal(service_sum) + expense_sum + depreciation + loan_principal + loan_interest
    )
    return combined.quantize(Decimal("0.01"))
async def month_comparison_totals(session: AsyncSession, car_id: int, today: date) -> tuple[Decimal, Decimal]:
    """Return (current month-to-date total, full previous month total).

    The current month runs from its first day through *today*; the previous
    month is the whole calendar month before it.
    """
    month_start = today.replace(day=1)
    prior_month_end = month_start - timedelta(days=1)
    prior_month_start = prior_month_end.replace(day=1)

    current_total = await raw_period_total(session, car_id, month_start, today)
    prior_total = await raw_period_total(session, car_id, prior_month_start, prior_month_end)
    return current_total, prior_total
async def full_tank_consumption(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
    """Return only the average full-tank fuel consumption from ``full_tank_metrics``."""
    metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    return metrics["average_fuel_consumption_full_tank"]
async def full_tank_metrics(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> dict[str, float | int | str | None]:
    """Compute full-tank-to-full-tank statistics for a car.

    An interval runs from one full-tank fill-up to the next. It is counted
    when the closing fill-up is dated on or after *date_from*, the odometer
    advanced, and the fuel added after the opening fill-up (closing fill-up
    included) is positive. All fill-ups up to *date_to* are loaded so the
    first interval inside the period still has an opening fill-up.

    Returns:
        A dict with average interval distance, average consumption
        (liters per 100 distance units), average cost per interval, the last
        interval's distance and an optional warning text. Every value is
        None when no interval qualifies.
    """
    no_data: dict[str, float | int | str | None] = {
        "average_full_tank_distance": None,
        "average_fuel_consumption_full_tank": None,
        "average_cost_per_full_tank": None,
        "last_full_tank_distance": None,
        "full_tank_warning": None,
    }

    stmt = (
        select(FuelEntry)
        .where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date <= date_to,
        )
        .order_by(FuelEntry.entry_date.asc(), FuelEntry.odometer.asc(), FuelEntry.id.asc())
    )
    fills = list((await session.execute(stmt)).scalars())
    full_positions = [position for position, fill in enumerate(fills) if fill.is_full_tank]

    # Pair each full-tank fill-up with the next one; fewer than two full
    # fill-ups yields no pairs and therefore no intervals.
    segments: list[dict] = []
    for opening_pos, closing_pos in zip(full_positions, full_positions[1:]):
        opening_fill = fills[opening_pos]
        closing_fill = fills[closing_pos]
        if closing_fill.entry_date < date_from:
            continue
        travelled = closing_fill.odometer - opening_fill.odometer
        if travelled <= 0:
            continue
        # Fuel burned over the interval is everything added after the opening
        # fill-up, up to and including the closing one.
        refuels = fills[opening_pos + 1 : closing_pos + 1]
        burned = sum(Decimal(fill.liters) for fill in refuels)
        if burned > 0:
            spent = sum(Decimal(fill.total_cost) for fill in refuels)
            segments.append({"distance": travelled, "liters": burned, "cost": spent})

    if not segments:
        return no_data

    segment_count = Decimal(len(segments))
    total_distance = sum(segment["distance"] for segment in segments)
    total_liters = sum((segment["liters"] for segment in segments), Decimal("0"))
    total_cost = sum((segment["cost"] for segment in segments), Decimal("0"))

    avg_distance = float(Decimal(total_distance) / segment_count)
    avg_consumption = float(total_liters * Decimal(100) / Decimal(total_distance))
    avg_cost = float(total_cost / segment_count)
    last_distance = int(segments[-1]["distance"])

    # Warn when the latest interval is more than 25% shorter than the average
    # of all earlier intervals.
    warning = None
    earlier_segments = segments[:-1]
    if earlier_segments:
        earlier_distance = sum(segment["distance"] for segment in earlier_segments)
        previous_avg = float(Decimal(earlier_distance) / Decimal(len(earlier_segments)))
        if previous_avg > 0 and last_distance < previous_avg * 0.75:
            drop = round((1 - last_distance / previous_avg) * 100)
            warning = (
                f"Обычно на полном баке получается около {previous_avg:.0f} км. "
                f"Последний интервал {last_distance} км, это на {drop}% меньше. "
                "Проверьте режим поездок, давление шин, качество топлива или техническое состояние."
            )

    return {
        "average_full_tank_distance": round(avg_distance, 1),
        "average_fuel_consumption_full_tank": round(avg_consumption, 2),
        "average_cost_per_full_tank": round(avg_cost, 2),
        "last_full_tank_distance": last_distance,
        "full_tank_warning": warning,
    }
async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute *stmt* and return its rows as a DataFrame built from row mappings."""
    records = (await session.execute(stmt)).mappings().all()
    return pd.DataFrame(records)
async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Estimate the car's odometer today and in 30 days from logged readings.

    Readings come from fuel entries and from service entries that carry an
    odometer value. After cleaning (nulls, negatives, same-day duplicates and
    readings lower than the preceding one are dropped) the daily mileage is a
    blend of a recency-weighted mean (70%) and the median (30%) of the last
    six valid intervals. Fuel-price and full-tank predictions are merged into
    the returned object.

    Args:
        session: Async SQLAlchemy session used for every query.
        car_id: Identifier of the car to predict for.

    Returns:
        An ``OdometerPrediction``. When no usable reading remains, or only
        one, the prediction fields are None (or equal to the single reading)
        and the confidence is 0 or 0.2 respectively.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    tank_prediction = await full_tank_metrics(session, car_id, date.min, date.today())
    fuel = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service = await dataframe_from_query(
        session,
        select(ServiceEntry.entry_date.label("date"), ServiceEntry.odometer.label("odometer")).where(
            ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)
        ),
    )

    def no_usable_readings() -> OdometerPrediction:
        # Shared result for "nothing to work with", both before and after cleaning.
        return OdometerPrediction(
            car_id=car_id,
            samples=0,
            current_odometer=None,
            predicted_today=None,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            **tank_prediction,
            confidence=0,
            insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
        )

    if fuel.empty and service.empty:
        return no_usable_readings()
    df = pd.concat([fuel, service]).dropna().drop_duplicates().sort_values("date")
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["odometer"] >= 0]
    # Keep the highest reading per day, then drop readings that go backwards.
    df = df.sort_values(["date", "odometer"]).drop_duplicates(subset=["date"], keep="last")
    # .copy() so the delta columns added below are written to an independent frame.
    df = df[df["odometer"].diff().fillna(0) >= 0].copy()
    if df.empty:
        # Cleaning can remove every row (all odometers null or negative);
        # indexing the last row would raise IndexError in that case.
        return no_usable_readings()
    if len(df) < 2:
        current = int(df.iloc[-1]["odometer"])
        return OdometerPrediction(
            car_id=car_id,
            samples=len(df),
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            **tank_prediction,
            confidence=0.2,
            insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )
    last = df.iloc[-1]
    df["days_delta"] = df["date"].diff().dt.days
    df["km_delta"] = df["odometer"].diff()
    intervals = df[(df["days_delta"] > 0) & (df["km_delta"] >= 0)].copy()
    intervals["km_per_day"] = intervals["km_delta"] / intervals["days_delta"]
    # Intervals above 500 km/day are discarded as implausible.
    intervals = intervals[(intervals["km_per_day"] >= 0) & (intervals["km_per_day"] <= 500)]
    if intervals.empty:
        km_per_day = 0
    else:
        # Later intervals get linearly larger weights (1..n) over the last six.
        recent = intervals.tail(6).copy()
        recent["weight"] = range(1, len(recent) + 1)
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))
    # Timestamp.utcnow() is deprecated in pandas 2.2+; this yields the same
    # naive UTC midnight value.
    today = pd.Timestamp.now(tz="UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)
    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    # Variability (std relative to the daily average, capped at 1) lowers confidence.
    variability = 0 if interval_count < 3 or km_per_day == 0 else min(
        float(intervals["km_per_day"].std() / max(km_per_day, 1)),
        1,
    )
    # Confidence grows with the number of intervals and the covered time span
    # and is clamped to [0.25, 0.95].
    confidence = min(
        0.95,
        max(0.25, 0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )
    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        **tank_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )
async def predict_fuel_price(session: AsyncSession, car_id: int) -> dict[str, float | int | None]:
    """Predict the fuel price per liter 30 days ahead from the car's fill-ups.

    Uses the last eight plausible prices (greater than 0 and below 10000).
    The forecast extrapolates the linear trend between the oldest and newest
    of those prices by 30 days, then blends it 65/35 with their mean.

    Returns:
        A dict with the current, predicted and average price, the number of
        valid samples and a confidence score; all prices are None and the
        counts zero when no usable price exists.
    """
    no_prediction = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }
    price_stmt = select(
        FuelEntry.entry_date.label("date"),
        FuelEntry.price_per_liter.label("price"),
    ).where(FuelEntry.car_id == car_id)
    prices = await dataframe_from_query(session, price_stmt)
    if prices.empty:
        return no_prediction

    prices = prices.dropna().copy()
    if prices.empty:
        return no_prediction

    prices["date"] = pd.to_datetime(prices["date"])
    prices["price"] = pd.to_numeric(prices["price"], errors="coerce")
    is_plausible = (prices["price"] > 0) & (prices["price"] < 10000)
    prices = prices[is_plausible].sort_values("date")
    if prices.empty:
        return no_prediction

    window = prices.tail(8).copy()
    sample_count = len(window)
    newest = window.iloc[-1]
    current = float(newest["price"])
    avg = float(window["price"].mean())

    # With a single sample the forecast is just the current price.
    predicted = current
    confidence = min(0.72, 0.22 + sample_count * 0.055)
    if sample_count >= 2:
        oldest = window.iloc[0]
        span_days = max((newest["date"] - oldest["date"]).days, 1)
        change_per_day = float((newest["price"] - oldest["price"]) / span_days)
        predicted = current + change_per_day * 30
        predicted = (predicted * 0.65) + (avg * 0.35)
        # Volatility (std relative to the mean) is only measured from three samples up.
        if sample_count >= 3:
            volatility = float(window["price"].std() / max(avg, 1))
        else:
            volatility = 0
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(current, 2),
        "predicted_price_per_liter_30_days": round(max(predicted, 0), 2),
        "avg_price_per_liter": round(avg, 2),
        "price_samples": int(len(prices)),
        "price_confidence": round(confidence, 2),
    }