608 lines
25 KiB
Python
608 lines
25 KiB
Python
import calendar
|
|
from datetime import date, timedelta
|
|
from decimal import Decimal
|
|
|
|
import pandas as pd
|
|
from sqlalchemy import Select, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.car import Car
|
|
from app.models.expense import ExpenseCategory, ExpenseEntry, FuelEntry, ServiceEntry
|
|
from app.schemas.expense import OdometerPrediction, OwnershipStats
|
|
from app.services.loans import generate_annuity_schedule
|
|
|
|
# Categories that expense_period_totals() always books as fixed/recurring,
# whether or not the individual entry is flagged ``is_recurring``.
FIXED_EXPENSE_CATEGORIES = {
    # Financing
    ExpenseCategory.loan_payment,
    ExpenseCategory.loan_interest,
    # Periodic ownership charges
    ExpenseCategory.insurance,
    ExpenseCategory.tax,
    ExpenseCategory.parking,
}
|
|
# Usage-driven / one-off categories. NOTE(review): nothing in the visible part
# of this module reads this set; expense_period_totals() treats every
# non-fixed, non-recurring entry as variable instead.
VARIABLE_EXPENSE_CATEGORIES = {
    # Day-to-day running costs
    ExpenseCategory.car_wash,
    ExpenseCategory.toll,
    ExpenseCategory.fine,
    # Parts and consumables
    ExpenseCategory.tires,
    ExpenseCategory.wheels,
    ExpenseCategory.battery,
    ExpenseCategory.parts,
    # Workshop and roadside
    ExpenseCategory.repair,
    ExpenseCategory.maintenance,
    ExpenseCategory.diagnostics,
    ExpenseCategory.towing,
    # Paperwork
    ExpenseCategory.state_fee,
    ExpenseCategory.registration,
    ExpenseCategory.inspection,
    # Everything else
    ExpenseCategory.other,
}
|
|
|
|
|
|
async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Build the cost-of-ownership summary for one car over an inclusive date range.

    Combines fuel and service totals, period-allocated expenses, depreciation
    and scheduled loan payments, then derives per-day / per-month / per-km
    figures, a fixed/variable and recurring/one-time split, a per-category
    breakdown and a month-over-month comparison anchored at ``date_to``.

    Args:
        session: Async database session used for every query.
        car_id: Identifier of the car to report on.
        date_from: First day of the reporting period (inclusive).
        date_to: Last day of the reporting period (inclusive).

    Returns:
        A populated ``OwnershipStats`` instance.
    """
    cent = Decimal("0.01")
    zero = Decimal("0")

    # Fuel: money, litres, entry count and odometer extremes inside the period.
    fuel_cost, liters, fuel_count, fuel_min_odo, fuel_max_odo = (
        await session.execute(
            select(
                func.coalesce(func.sum(FuelEntry.total_cost), 0),
                func.coalesce(func.sum(FuelEntry.liters), 0),
                func.count(FuelEntry.id),
                func.min(FuelEntry.odometer),
                func.max(FuelEntry.odometer),
            ).where(
                FuelEntry.car_id == car_id,
                FuelEntry.entry_date >= date_from,
                FuelEntry.entry_date <= date_to,
            )
        )
    ).one()

    # Service: money and entry count inside the period.
    service_cost, service_count = (
        await session.execute(
            select(
                func.coalesce(func.sum(ServiceEntry.total_cost), 0),
                func.count(ServiceEntry.id),
            ).where(
                ServiceEntry.car_id == car_id,
                ServiceEntry.entry_date >= date_from,
                ServiceEntry.entry_date <= date_to,
            )
        )
    ).one()

    # Distance driven = spread between the lowest and highest odometer reading
    # found in fuel, service and expense entries of the period.
    service_odo_range = (
        await session.execute(
            select(func.min(ServiceEntry.odometer), func.max(ServiceEntry.odometer)).where(
                ServiceEntry.car_id == car_id,
                ServiceEntry.odometer.is_not(None),
                ServiceEntry.entry_date >= date_from,
                ServiceEntry.entry_date <= date_to,
            )
        )
    ).one()
    expense_odo_range = (
        await session.execute(
            select(func.min(ExpenseEntry.odometer), func.max(ExpenseEntry.odometer)).where(
                ExpenseEntry.car_id == car_id,
                ExpenseEntry.odometer.is_not(None),
                ExpenseEntry.entry_date >= date_from,
                ExpenseEntry.entry_date <= date_to,
            )
        )
    ).one()
    readings = [
        reading
        for reading in (fuel_min_odo, fuel_max_odo, *service_odo_range, *expense_odo_range)
        if reading is not None
    ]
    if len(readings) >= 2:
        distance_km = int(max(readings) - min(readings))
    else:
        distance_km = 0

    (
        expense_cost,
        recurring_cost,
        _expense_count,
        expense_categories,
        fixed_expense_cost,
        variable_expense_cost,
    ) = await expense_period_totals(session, car_id, date_from, date_to)

    # Car-level costs only exist when the car record is found.
    car = await session.get(Car, car_id)
    if car:
        depreciation_cost = calculate_depreciation(car, date_from, date_to)
        loan_principal_cost, loan_interest_cost = calculate_loan_costs(car, date_from, date_to)
    else:
        depreciation_cost = zero
        loan_principal_cost, loan_interest_cost = zero, zero

    fuel_total = Decimal(fuel_cost)
    service_total = Decimal(service_cost)
    total_cost = (
        fuel_total
        + service_total
        + expense_cost
        + depreciation_cost
        + loan_principal_cost
        + loan_interest_cost
    )

    tank_metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    avg_consumption = tank_metrics["average_fuel_consumption_full_tank"]

    cost_per_km = None
    if distance_km:
        cost_per_km = float(total_cost / distance_km)

    # Period length in days and in average-length months; the month figure is
    # floored at 0.033 so the per-month divisions below never see a zero.
    days = Decimal(period_days(date_from, date_to))
    months = max(days / Decimal("30.4375"), Decimal("0.033"))
    cost_per_day = (total_cost / days).quantize(cent)
    cost_per_month = (total_cost / months).quantize(cent)

    recurring_total = (
        recurring_cost + depreciation_cost + loan_principal_cost + loan_interest_cost
    ).quantize(cent)
    one_time_costs = max(total_cost - recurring_total, zero).quantize(cent)
    recurring_monthly = (recurring_total / months).quantize(cent)
    forecast_next_month = max(cost_per_month, recurring_monthly).quantize(cent)

    repair_cost = (
        service_total
        + expense_categories.get("repair", zero)
        + expense_categories.get("maintenance", zero)
        + expense_categories.get("diagnostics", zero)
    ).quantize(cent)
    fixed_costs = (
        fixed_expense_cost + depreciation_cost + loan_principal_cost + loan_interest_cost
    ).quantize(cent)
    variable_costs = (fuel_total + service_total + variable_expense_cost).quantize(cent)

    # Per-category breakdown: logged expenses first, then computed car-level
    # costs merged on top (loan amounts add to any logged loan expenses).
    cost_by_category = {
        "fuel": fuel_total,
        "service": service_total,
        **expense_categories,
    }
    if depreciation_cost:
        cost_by_category["depreciation"] = depreciation_cost
    if loan_principal_cost:
        cost_by_category["loan_payment"] = (
            cost_by_category.get("loan_payment", zero) + loan_principal_cost
        )
    if loan_interest_cost:
        cost_by_category["loan_interest"] = (
            cost_by_category.get("loan_interest", zero) + loan_interest_cost
        )
    # Alphabetical list of non-zero categories; entries_count is always 0 here.
    categories = [
        {"category": name, "total_cost": cost_by_category[name], "entries_count": 0}
        for name in sorted(cost_by_category)
        if cost_by_category[name]
    ]

    # Month-to-date (up to date_to) versus the whole previous calendar month.
    current_month_cost, previous_month_cost = await month_comparison_totals(
        session, car_id, date_to
    )
    month_change = None
    cost_warning = None
    if previous_month_cost > 0:
        month_change = float(
            (current_month_cost - previous_month_cost) * Decimal("100") / previous_month_cost
        )
        if month_change >= 35:
            cost_warning = "Расходы заметно выше прошлого месяца. Проверьте крупные ремонты, штрафы или регулярные платежи."
    month_change_pct = None if month_change is None else round(month_change, 2)

    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_cost,
        service_cost=service_cost,
        expenses_cost=expense_cost,
        total_cost=total_cost,
        repair_cost=repair_cost,
        fixed_costs=fixed_costs,
        variable_costs=variable_costs,
        recurring_costs=recurring_total,
        one_time_costs=one_time_costs,
        forecast_next_month=forecast_next_month,
        depreciation_cost=depreciation_cost,
        loan_principal_cost=loan_principal_cost,
        loan_interest_cost=loan_interest_cost,
        total_cost_without_credit=(
            total_cost - loan_principal_cost - loan_interest_cost
        ).quantize(cent),
        total_cost_with_credit=total_cost.quantize(cent),
        cost_per_day=cost_per_day,
        cost_per_month=cost_per_month,
        current_month_cost=current_month_cost,
        previous_month_cost=previous_month_cost,
        month_over_month_change_pct=month_change_pct,
        cost_warning=cost_warning,
        cost_by_category=cost_by_category,
        categories=categories,
        liters=liters,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_count,
        service_entries_count=service_count,
    )
|
|
|
|
|
|
def period_days(date_from: date, date_to: date) -> int:
    """Return the inclusive number of days from *date_from* to *date_to*.

    The result is never below 1, even when *date_to* precedes *date_from*.
    """
    inclusive_span = (date_to - date_from).days + 1
    return inclusive_span if inclusive_span > 1 else 1
|
|
|
|
|
|
def add_months(value: date, months: int) -> date:
    """Shift *value* by *months* calendar months (negative values go back).

    The day of month is clamped to the last day of the target month, so
    31 January plus one month yields the last day of February.
    """
    year_shift, zero_based_month = divmod(value.month - 1 + months, 12)
    target_year = value.year + year_shift
    target_month = zero_based_month + 1
    last_day_of_month = calendar.monthrange(target_year, target_month)[1]
    return date(target_year, target_month, min(value.day, last_day_of_month))
|
|
|
|
|
|
def overlap_days(left_start: date, left_end: date, right_start: date, right_end: date) -> int:
    """Return how many days two inclusive date ranges share (0 when disjoint)."""
    shared_start = max(left_start, right_start)
    shared_end = min(left_end, right_end)
    return period_days(shared_start, shared_end) if shared_start <= shared_end else 0
|
|
|
|
|
|
def expense_window(entry: ExpenseEntry) -> tuple[date, date]:
    """Return the inclusive (start, end) date range an expense entry covers.

    Precedence:
      1. explicit ``period_start`` and ``period_end``;
      2. ``period_months`` counted from ``period_start`` when set, otherwise
         from ``entry_date``;
      3. the single ``entry_date``.
    """
    explicit_start = entry.period_start
    if explicit_start and entry.period_end:
        return explicit_start, entry.period_end

    duration_months = entry.period_months
    if not duration_months:
        return entry.entry_date, entry.entry_date

    anchor = explicit_start or entry.entry_date
    # The window ends the day before the same day-of-month N months later.
    return anchor, add_months(anchor, duration_months) - timedelta(days=1)
|
|
|
|
|
|
def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date) -> Decimal:
    """Return the share of an expense that falls inside [date_from, date_to].

    Entries with a known month count (``payment_period_months``,
    ``period_months`` or one inferred for insurance) are spread month by month
    via ``allocated_monthly_expense_cost``. All other entries are prorated by
    day across their expense window.
    """
    month_count = (
        entry.payment_period_months or entry.period_months or inferred_monthly_period(entry)
    )
    anchor_date = entry.period_start or entry.entry_date
    if month_count and anchor_date:
        return allocated_monthly_expense_cost(entry, date_from, date_to, month_count)

    window_start, window_end = expense_window(entry)
    window_days = period_days(window_start, window_end)
    covered_days = overlap_days(window_start, window_end, date_from, date_to)
    if covered_days <= 0:
        return Decimal("0")

    full_amount = Decimal(entry.total_cost)
    # A one-day expense dated on its own entry date is charged in full.
    if window_days <= 1 and window_start == entry.entry_date:
        return full_amount

    prorated = full_amount * Decimal(covered_days) / Decimal(window_days)
    return prorated.quantize(Decimal("0.01"))
|
|
|
|
|
|
def inferred_monthly_period(entry: ExpenseEntry) -> int | None:
    """Guess the month count of an insurance entry from its explicit period.

    Returns 1, 3, 6 or 12 when ``period_end`` is exactly the day before
    ``period_start`` shifted by that many months. Returns ``None`` for
    non-insurance entries, for entries missing either period bound, and when
    no candidate length matches.
    """
    if entry.category != ExpenseCategory.insurance:
        return None

    period_start, period_end = entry.period_start, entry.period_end
    if not (period_start and period_end):
        return None

    one_day = timedelta(days=1)
    return next(
        (
            candidate
            for candidate in (1, 3, 6, 12)
            if add_months(period_start, candidate) - one_day == period_end
        ),
        None,
    )
|
|
|
|
|
|
def allocated_monthly_expense_cost(
    entry: ExpenseEntry, date_from: date, date_to: date, months: int
) -> Decimal:
    """Spread an expense evenly over *months* monthly slices and sum the overlap.

    The entry's ``total_cost`` is divided into equal monthly amounts starting at
    ``period_start`` (or ``entry_date`` when unset). Each slice contributes the
    fraction of its days that fall inside [date_from, date_to].

    Returns:
        The allocated amount rounded to cents; zero when *months* is not positive.
    """
    if months <= 0:
        return Decimal("0")

    anchor = entry.period_start or entry.entry_date
    per_month = Decimal(entry.total_cost) / Decimal(months)
    allocated = Decimal("0")

    slice_start = add_months(anchor, 0)
    for offset in range(1, months + 1):
        # Each slice ends the day before the next one starts.
        next_slice_start = add_months(anchor, offset)
        slice_end = next_slice_start - timedelta(days=1)
        covered_days = overlap_days(slice_start, slice_end, date_from, date_to)
        if covered_days > 0:
            slice_days = period_days(slice_start, slice_end)
            allocated += per_month * Decimal(covered_days) / Decimal(slice_days)
        slice_start = next_slice_start

    return allocated.quantize(Decimal("0.01"))
|
|
|
|
|
|
async def expense_period_totals(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> tuple[Decimal, Decimal, int, dict[str, Decimal], Decimal, Decimal]:
    """Aggregate period-allocated expenses for a car.

    Loads every expense dated on or before *date_to* that is not attached to a
    service visit. There is no lower date bound because an earlier multi-month
    entry can still allocate cost into the period. Each entry's share of
    [date_from, date_to] is then summed.

    Returns:
        ``(total, recurring, count, by_category, fixed, variable)``. The four
        money totals are rounded to cents, ``count`` is the number of entries
        that contributed a positive amount, and ``by_category`` maps the
        category value to its summed amount.
    """
    stmt = (
        select(ExpenseEntry)
        .where(
            ExpenseEntry.car_id == car_id,
            ExpenseEntry.entry_date <= date_to,
            ExpenseEntry.service_visit_id.is_(None),
        )
        .order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
    )
    rows = await session.execute(stmt)

    zero = Decimal("0")
    total_sum = recurring_sum = fixed_sum = variable_sum = zero
    by_category: dict[str, Decimal] = {}
    contributing = 0

    for entry in rows.scalars():
        share = allocated_expense_cost(entry, date_from, date_to)
        if share <= 0:
            continue

        contributing += 1
        total_sum += share

        raw_category = entry.category
        if isinstance(raw_category, ExpenseCategory):
            category_key = raw_category.value
        else:
            category_key = str(raw_category)
        by_category[category_key] = by_category.get(category_key, zero) + share

        # The same rule decides both the recurring and the fixed bucket.
        if entry.is_recurring or raw_category in FIXED_EXPENSE_CATEGORIES:
            recurring_sum += share
            fixed_sum += share
        else:
            variable_sum += share

    cent = Decimal("0.01")
    return (
        total_sum.quantize(cent),
        recurring_sum.quantize(cent),
        contributing,
        by_category,
        fixed_sum.quantize(cent),
        variable_sum.quantize(cent),
    )
|
|
|
|
|
|
def calculate_depreciation(car: Car, date_from: date, date_to: date) -> Decimal:
    """Return straight-line depreciation attributable to [date_from, date_to].

    The depreciable amount (purchase price minus expected residual value,
    never negative) is spread evenly per day over the expected ownership term
    (60 months when unset), starting on the purchase date. Returns zero when
    depreciation is disabled, when purchase data is missing, or when the
    period does not overlap the depreciation window.
    """
    if not (car.include_depreciation and car.purchase_price and car.purchase_date):
        return Decimal("0")

    window_start = car.purchase_date
    term_months = car.expected_ownership_months or 60
    residual_value = Decimal(car.expected_residual_value or 0)
    depreciable_amount = max(Decimal(car.purchase_price) - residual_value, Decimal("0"))
    window_end = add_months(car.purchase_date, term_months) - timedelta(days=1)

    covered_days = overlap_days(window_start, window_end, date_from, date_to)
    if covered_days <= 0:
        return Decimal("0")

    per_day = depreciable_amount / Decimal(period_days(window_start, window_end))
    return (per_day * Decimal(covered_days)).quantize(Decimal("0.01"))
|
|
|
|
|
|
def calculate_loan_costs(car: Car, date_from: date, date_to: date) -> tuple[Decimal, Decimal]:
    """Return (principal, interest) of scheduled loan payments due in the period.

    An annuity schedule is generated from the car's loan parameters; only rows
    with a payment date inside [date_from, date_to] are counted. Both values
    are zero when the car has no loan principal or term, or when neither a
    first-payment date nor a purchase date is available.
    """
    nothing_due = (Decimal("0"), Decimal("0"))
    if not (car.loan_principal and car.loan_term_months):
        return nothing_due

    first_payment = car.loan_first_payment_date or car.purchase_date
    if not first_payment:
        return nothing_due

    schedule = generate_annuity_schedule(
        principal=Decimal(car.loan_principal),
        months=car.loan_term_months,
        annual_rate=Decimal(car.loan_annual_interest_rate or 0),
        first_payment_date=first_payment,
    )
    due_rows = [
        row
        for row in schedule
        if row.payment_date and date_from <= row.payment_date <= date_to
    ]
    principal_due = sum((row.principal for row in due_rows), Decimal("0"))
    interest_due = sum((row.interest for row in due_rows), Decimal("0"))

    cent = Decimal("0.01")
    return principal_due.quantize(cent), interest_due.quantize(cent)
|
|
|
|
|
|
async def raw_period_total(session: AsyncSession, car_id: int, date_from: date, date_to: date) -> Decimal:
    """Return the total cost of a car over [date_from, date_to], rounded to cents.

    Sums fuel, service, period-allocated expenses, depreciation and scheduled
    loan principal and interest. Depreciation and loan costs count as zero
    when the car record is not found.
    """
    fuel_stmt = select(func.coalesce(func.sum(FuelEntry.total_cost), 0)).where(
        FuelEntry.car_id == car_id,
        FuelEntry.entry_date >= date_from,
        FuelEntry.entry_date <= date_to,
    )
    fuel_sum = (await session.execute(fuel_stmt)).scalar_one()

    service_stmt = select(func.coalesce(func.sum(ServiceEntry.total_cost), 0)).where(
        ServiceEntry.car_id == car_id,
        ServiceEntry.entry_date >= date_from,
        ServiceEntry.entry_date <= date_to,
    )
    service_sum = (await session.execute(service_stmt)).scalar_one()

    # Only the first element (the allocated expense total) is needed here.
    expense_sum = (await expense_period_totals(session, car_id, date_from, date_to))[0]

    car = await session.get(Car, car_id)
    if car:
        depreciation = calculate_depreciation(car, date_from, date_to)
        loan_principal, loan_interest = calculate_loan_costs(car, date_from, date_to)
    else:
        depreciation = Decimal("0")
        loan_principal, loan_interest = Decimal("0"), Decimal("0")

    grand_total = (
        Decimal(fuel_sum)
        + Decimal(service_sum)
        + expense_sum
        + depreciation
        + loan_principal
        + loan_interest
    )
    return grand_total.quantize(Decimal("0.01"))
|
|
|
|
|
|
async def month_comparison_totals(session: AsyncSession, car_id: int, today: date) -> tuple[Decimal, Decimal]:
    """Return (current month-to-date total, previous full-month total).

    The current figure runs from the first day of *today*'s month through
    *today*; the previous figure covers the whole preceding calendar month.
    """
    this_month_start = today.replace(day=1)
    last_month_end = this_month_start - timedelta(days=1)
    last_month_start = last_month_end.replace(day=1)

    current_total = await raw_period_total(session, car_id, this_month_start, today)
    previous_total = await raw_period_total(session, car_id, last_month_start, last_month_end)
    return current_total, previous_total
|
|
|
|
|
|
async def full_tank_consumption(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
    """Return the average full-tank fuel consumption for the period, or ``None``.

    Convenience wrapper that picks a single value out of ``full_tank_metrics``.
    """
    metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    return metrics["average_fuel_consumption_full_tank"]
|
|
|
|
|
|
async def full_tank_metrics(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> dict[str, float | int | str | None]:
    """Compute full-tank-to-full-tank statistics for a car.

    An interval runs between two consecutive full-tank fill-ups. Its distance
    is the odometer difference, and its litres and cost are those of every
    fill-up after the opening full tank up to and including the closing one.
    Intervals whose closing fill-up is before *date_from*, whose distance is
    not positive, or whose litres are not positive are ignored. Fill-ups
    before *date_from* are still loaded so the first interval in range has an
    opening full tank.

    Returns:
        A dict with ``average_full_tank_distance``,
        ``average_fuel_consumption_full_tank`` (litres per 100 distance units),
        ``average_cost_per_full_tank``, ``last_full_tank_distance`` and
        ``full_tank_warning``. Every value is ``None`` when no usable interval
        exists.
    """
    no_metrics: dict[str, float | int | str | None] = {
        "average_full_tank_distance": None,
        "average_fuel_consumption_full_tank": None,
        "average_cost_per_full_tank": None,
        "last_full_tank_distance": None,
        "full_tank_warning": None,
    }

    stmt = (
        select(FuelEntry)
        .where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date <= date_to,
        )
        .order_by(FuelEntry.entry_date.asc(), FuelEntry.odometer.asc(), FuelEntry.id.asc())
    )
    entries = list((await session.execute(stmt)).scalars())

    full_indexes = [position for position, fill in enumerate(entries) if fill.is_full_tank]
    if len(full_indexes) < 2:
        return no_metrics

    intervals: list[dict] = []
    # Walk adjacent pairs of full-tank fill-ups (opening, closing).
    for opening_index, closing_index in zip(full_indexes, full_indexes[1:]):
        closing = entries[closing_index]
        if closing.entry_date < date_from:
            continue
        distance = closing.odometer - entries[opening_index].odometer
        if distance <= 0:
            continue
        fills_in_interval = entries[opening_index + 1 : closing_index + 1]
        interval_liters = sum(Decimal(fill.liters) for fill in fills_in_interval)
        if interval_liters > 0:
            interval_cost = sum(Decimal(fill.total_cost) for fill in fills_in_interval)
            intervals.append(
                {"distance": distance, "liters": interval_liters, "cost": interval_cost}
            )

    if not intervals:
        return no_metrics

    interval_count = Decimal(len(intervals))
    total_distance = sum(item["distance"] for item in intervals)
    total_liters = sum((item["liters"] for item in intervals), Decimal("0"))
    total_cost = sum((item["cost"] for item in intervals), Decimal("0"))

    avg_distance = float(Decimal(total_distance) / interval_count)
    avg_consumption = float(total_liters * Decimal(100) / Decimal(total_distance))
    avg_cost = float(total_cost / interval_count)
    last_distance = int(intervals[-1]["distance"])

    # Warn when the latest interval is over 25% shorter than the earlier average.
    warning = None
    earlier = intervals[:-1]
    if earlier:
        earlier_distance = sum(item["distance"] for item in earlier)
        usual_distance = float(Decimal(earlier_distance) / Decimal(len(earlier)))
        if usual_distance > 0 and last_distance < usual_distance * 0.75:
            drop = round((1 - last_distance / usual_distance) * 100)
            warning = (
                f"Обычно на полном баке получается около {usual_distance:.0f} км. "
                f"Последний интервал {last_distance} км, это на {drop}% меньше. "
                "Проверьте режим поездок, давление шин, качество топлива или техническое состояние."
            )

    return {
        "average_full_tank_distance": round(avg_distance, 1),
        "average_fuel_consumption_full_tank": round(avg_consumption, 2),
        "average_cost_per_full_tank": round(avg_cost, 2),
        "last_full_tank_distance": last_distance,
        "full_tank_warning": warning,
    }
|
|
|
|
|
|
async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute *stmt* and return its rows as a DataFrame keyed by column label.

    A statement that yields no rows produces an empty DataFrame with no columns.
    """
    records = (await session.execute(stmt)).mappings().all()
    return pd.DataFrame(records)
|
|
|
|
|
|
async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Estimate the car's current and 30-day-ahead odometer reading.

    Odometer readings from fuel and service entries are merged and reduced to
    one reading per day (the highest), and readings that go backwards are
    dropped. Daily mileage is then estimated from the six most recent valid
    intervals as 70% recency-weighted mean plus 30% median. Fuel-price and
    full-tank metrics are merged into the same response.

    Fixes relative to the previous version:
      * If cleaning removes every row (for example all odometer values are
        NULL or negative), the function now returns the "not enough data"
        response instead of raising ``IndexError`` on ``df.iloc[-1]``.
      * ``pd.Timestamp.utcnow()`` (deprecated in pandas 2.2) is replaced with
        the equivalent ``pd.Timestamp.now(tz="UTC")``.
      * The filtered frame is copied before new columns are assigned, so
        pandas does not emit chained-assignment warnings.

    Args:
        session: Async database session.
        car_id: Identifier of the car to predict for.

    Returns:
        An ``OdometerPrediction``. Forecast fields are ``None`` when fewer
        than two usable readings exist.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    tank_prediction = await full_tank_metrics(session, car_id, date.min, date.today())
    fuel = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service = await dataframe_from_query(
        session,
        select(ServiceEntry.entry_date.label("date"), ServiceEntry.odometer.label("odometer")).where(
            ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)
        ),
    )

    def without_forecast(
        samples: int, current: int | None, confidence: float, insight: str
    ) -> OdometerPrediction:
        # Shared response for the "cannot extrapolate" cases: the best guess
        # for today is the last known reading, if there is one.
        return OdometerPrediction(
            car_id=car_id,
            samples=samples,
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            **tank_prediction,
            confidence=confidence,
            insight=insight,
        )

    no_data_insight = "Недостаточно данных: добавь одометр в заправках или сервисных записях."
    if fuel.empty and service.empty:
        return without_forecast(0, None, 0, no_data_insight)

    df = pd.concat([fuel, service]).dropna().drop_duplicates().sort_values("date")
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["odometer"] >= 0]
    # One reading per day: keep the highest odometer value of that day.
    df = df.sort_values(["date", "odometer"]).drop_duplicates(subset=["date"], keep="last")
    # Drop readings that are lower than the one immediately before them.
    df = df[df["odometer"].diff().fillna(0) >= 0].copy()

    if df.empty:
        # Rows existed but none survived cleaning (e.g. every odometer was NULL).
        return without_forecast(0, None, 0, no_data_insight)
    if len(df) < 2:
        return without_forecast(
            len(df),
            int(df.iloc[-1]["odometer"]),
            0.2,
            "Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )

    last = df.iloc[-1]
    df["days_delta"] = df["date"].diff().dt.days
    df["km_delta"] = df["odometer"].diff()
    intervals = df[(df["days_delta"] > 0) & (df["km_delta"] >= 0)].copy()
    intervals["km_per_day"] = intervals["km_delta"] / intervals["days_delta"]
    # Discard implausible intervals (more than 500 km per day).
    intervals = intervals[(intervals["km_per_day"] >= 0) & (intervals["km_per_day"] <= 500)]
    if intervals.empty:
        km_per_day = 0
    else:
        recent = intervals.tail(6).copy()
        # Linearly increasing weights: the newest interval counts the most.
        recent["weight"] = range(1, len(recent) + 1)
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))

    # Naive UTC midnight, comparable with the naive timestamps in df["date"].
    today = pd.Timestamp.now(tz="UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)

    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    # Relative spread of daily mileage, capped at 1; ignored for tiny samples.
    variability = 0 if interval_count < 3 or km_per_day == 0 else min(
        float(intervals["km_per_day"].std() / max(km_per_day, 1)),
        1,
    )
    # Confidence grows with interval count and history length and shrinks with
    # variability; it is clamped to [0.25, 0.95].
    confidence = min(
        0.95,
        max(0.25, 0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )
    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        **tank_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )
|
|
|
|
|
|
async def predict_fuel_price(session: AsyncSession, car_id: int) -> dict[str, float | int | None]:
    """Estimate the fuel price per litre 30 days ahead from logged fill-ups.

    Uses the eight most recent valid prices (greater than 0 and below 10000).
    With two or more samples, the linear trend between the oldest and newest
    of them is extrapolated 30 days and blended 65/35 with their mean.

    Returns:
        A dict with ``current_price_per_liter``,
        ``predicted_price_per_liter_30_days``, ``avg_price_per_liter``,
        ``price_samples`` (count of all valid prices) and
        ``price_confidence``. Prices are ``None`` and the counters are 0 when
        no valid price exists.
    """
    no_prices = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }

    raw = await dataframe_from_query(
        session,
        select(
            FuelEntry.entry_date.label("date"),
            FuelEntry.price_per_liter.label("price"),
        ).where(FuelEntry.car_id == car_id),
    )
    prices = raw.dropna().copy()
    if prices.empty:
        return no_prices

    prices["date"] = pd.to_datetime(prices["date"])
    prices["price"] = pd.to_numeric(prices["price"], errors="coerce")
    plausible = (prices["price"] > 0) & (prices["price"] < 10000)
    prices = prices[plausible].sort_values("date")
    if prices.empty:
        return no_prices

    window = prices.tail(8).copy()
    sample_count = len(window)
    oldest, newest = window.iloc[0], window.iloc[-1]
    current = float(newest["price"])
    mean_price = float(window["price"].mean())

    # Single-sample fallback: predict the current price with modest confidence.
    predicted = current
    confidence = min(0.72, 0.22 + sample_count * 0.055)

    if sample_count >= 2:
        span_days = max((newest["date"] - oldest["date"]).days, 1)
        change_per_day = float((newest["price"] - oldest["price"]) / span_days)
        trend_price = current + change_per_day * 30
        predicted = (trend_price * 0.65) + (mean_price * 0.35)
        if sample_count >= 3:
            volatility = float(window["price"].std() / max(mean_price, 1))
        else:
            volatility = 0
        # A longer history raises confidence and volatility lowers it; the
        # result is clamped to [0.3, 0.9].
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(current, 2),
        "predicted_price_per_liter_30_days": round(max(predicted, 0), 2),
        "avg_price_per_liter": round(mean_price, 2),
        "price_samples": int(len(prices)),
        "price_confidence": round(confidence, 2),
    }
|