import calendar
from datetime import date, timedelta
from decimal import Decimal

import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.car import Car
from app.models.expense import ExpenseCategory, ExpenseEntry, FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
from app.services.loans import generate_annuity_schedule

# Shared Decimal constants (Decimal is immutable, so sharing is safe).
_ZERO = Decimal("0")
_CENT = Decimal("0.01")

# Expense categories that are counted as "fixed" (and recurring) costs.
FIXED_EXPENSE_CATEGORIES = {
    ExpenseCategory.insurance,
    ExpenseCategory.tax,
    ExpenseCategory.loan_payment,
    ExpenseCategory.loan_interest,
    ExpenseCategory.parking,
}

# Expense categories that are considered usage-driven.
VARIABLE_EXPENSE_CATEGORIES = {
    ExpenseCategory.fine,
    ExpenseCategory.car_wash,
    ExpenseCategory.toll,
    ExpenseCategory.tires,
    ExpenseCategory.wheels,
    ExpenseCategory.battery,
    ExpenseCategory.parts,
    ExpenseCategory.repair,
    ExpenseCategory.maintenance,
    ExpenseCategory.diagnostics,
    ExpenseCategory.towing,
    ExpenseCategory.state_fee,
    ExpenseCategory.registration,
    ExpenseCategory.inspection,
    ExpenseCategory.other,
}


async def get_ownership_stats(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> OwnershipStats:
    """Aggregate ownership costs of one car for ``date_from``..``date_to`` (inclusive).

    Combines fuel and service totals, period-allocated expenses, depreciation
    and loan payments, then derives per-day / per-month / per-km figures, a
    per-category breakdown and a month-over-month comparison anchored at
    ``date_to``.
    """
    fuel_totals = await session.execute(
        select(
            func.coalesce(func.sum(FuelEntry.total_cost), 0),
            func.coalesce(func.sum(FuelEntry.liters), 0),
            func.count(FuelEntry.id),
            func.min(FuelEntry.odometer),
            func.max(FuelEntry.odometer),
        ).where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date >= date_from,
            FuelEntry.entry_date <= date_to,
        )
    )
    fuel_cost, liters, fuel_count, min_odo, max_odo = fuel_totals.one()

    service_totals = await session.execute(
        select(
            func.coalesce(func.sum(ServiceEntry.total_cost), 0),
            func.count(ServiceEntry.id),
        ).where(
            ServiceEntry.car_id == car_id,
            ServiceEntry.entry_date >= date_from,
            ServiceEntry.entry_date <= date_to,
        )
    )
    service_cost, service_count = service_totals.one()

    # Distance is the spread between the lowest and highest odometer reading
    # seen in any fuel, service or expense entry inside the period.
    odometer_values = [min_odo, max_odo]
    service_odo = await session.execute(
        select(func.min(ServiceEntry.odometer), func.max(ServiceEntry.odometer)).where(
            ServiceEntry.car_id == car_id,
            ServiceEntry.odometer.is_not(None),
            ServiceEntry.entry_date >= date_from,
            ServiceEntry.entry_date <= date_to,
        )
    )
    expense_odo = await session.execute(
        select(func.min(ExpenseEntry.odometer), func.max(ExpenseEntry.odometer)).where(
            ExpenseEntry.car_id == car_id,
            ExpenseEntry.odometer.is_not(None),
            ExpenseEntry.entry_date >= date_from,
            ExpenseEntry.entry_date <= date_to,
        )
    )
    odometer_values.extend(service_odo.one())
    odometer_values.extend(expense_odo.one())
    odometer_values = [value for value in odometer_values if value is not None]
    distance_km = (
        int(max(odometer_values) - min(odometer_values)) if len(odometer_values) >= 2 else 0
    )

    (
        expense_cost,
        recurring_cost,
        _expense_count,
        expense_categories,
        fixed_expense_cost,
        variable_expense_cost,
    ) = await expense_period_totals(session, car_id, date_from, date_to)

    car = await session.get(Car, car_id)
    depreciation_cost = calculate_depreciation(car, date_from, date_to) if car else _ZERO
    loan_principal_cost, loan_interest_cost = (
        calculate_loan_costs(car, date_from, date_to) if car else (_ZERO, _ZERO)
    )
    total_cost = (
        Decimal(fuel_cost)
        + Decimal(service_cost)
        + expense_cost
        + depreciation_cost
        + loan_principal_cost
        + loan_interest_cost
    )

    tank_metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    avg_consumption = tank_metrics["average_fuel_consumption_full_tank"]
    cost_per_km = float(total_cost / distance_km) if distance_km else None

    days = period_days(date_from, date_to)
    # 30.4375 = average month length in days; the 0.033 floor (about one day)
    # keeps the per-month division bounded for very short periods.
    months = max(Decimal(days) / Decimal("30.4375"), Decimal("0.033"))
    cost_per_day = (total_cost / Decimal(days)).quantize(_CENT)
    cost_per_month = (total_cost / months).quantize(_CENT)

    credit_and_depreciation = depreciation_cost + loan_principal_cost + loan_interest_cost
    recurring_total = (recurring_cost + credit_and_depreciation).quantize(_CENT)
    one_time_costs = max(total_cost - recurring_total, _ZERO).quantize(_CENT)
    recurring_monthly = (recurring_total / months).quantize(_CENT)
    forecast_next_month = max(cost_per_month, recurring_monthly).quantize(_CENT)

    repair_cost = (
        Decimal(service_cost)
        + expense_categories.get("repair", _ZERO)
        + expense_categories.get("maintenance", _ZERO)
        + expense_categories.get("diagnostics", _ZERO)
    ).quantize(_CENT)
    fixed_costs = (fixed_expense_cost + credit_and_depreciation).quantize(_CENT)
    variable_costs = (
        Decimal(fuel_cost) + Decimal(service_cost) + variable_expense_cost
    ).quantize(_CENT)

    cost_by_category = {
        "fuel": Decimal(fuel_cost),
        "service": Decimal(service_cost),
        **expense_categories,
    }
    if depreciation_cost:
        cost_by_category["depreciation"] = depreciation_cost
    # Scheduled loan amounts are added on top of any manually entered
    # loan_payment / loan_interest expenses.
    if loan_principal_cost:
        cost_by_category["loan_payment"] = (
            cost_by_category.get("loan_payment", _ZERO) + loan_principal_cost
        )
    if loan_interest_cost:
        cost_by_category["loan_interest"] = (
            cost_by_category.get("loan_interest", _ZERO) + loan_interest_cost
        )
    categories = [
        {"category": key, "total_cost": value, "entries_count": 0}
        for key, value in sorted(cost_by_category.items())
        if value
    ]

    current_month_cost, previous_month_cost = await month_comparison_totals(
        session, car_id, date_to
    )
    month_change = None
    cost_warning = None
    if previous_month_cost > 0:
        month_change = float(
            (current_month_cost - previous_month_cost) * Decimal("100") / previous_month_cost
        )
        if month_change >= 35:
            cost_warning = "Расходы заметно выше прошлого месяца. Проверьте крупные ремонты, штрафы или регулярные платежи."

    return OwnershipStats(
        car_id=car_id,
        date_from=date_from,
        date_to=date_to,
        fuel_cost=fuel_cost,
        service_cost=service_cost,
        expenses_cost=expense_cost,
        total_cost=total_cost,
        repair_cost=repair_cost,
        fixed_costs=fixed_costs,
        variable_costs=variable_costs,
        recurring_costs=recurring_total,
        one_time_costs=one_time_costs,
        forecast_next_month=forecast_next_month,
        depreciation_cost=depreciation_cost,
        loan_principal_cost=loan_principal_cost,
        loan_interest_cost=loan_interest_cost,
        total_cost_without_credit=(
            total_cost - loan_principal_cost - loan_interest_cost
        ).quantize(_CENT),
        total_cost_with_credit=total_cost.quantize(_CENT),
        cost_per_day=cost_per_day,
        cost_per_month=cost_per_month,
        current_month_cost=current_month_cost,
        previous_month_cost=previous_month_cost,
        month_over_month_change_pct=round(month_change, 2) if month_change is not None else None,
        cost_warning=cost_warning,
        cost_by_category=cost_by_category,
        categories=categories,
        liters=liters,
        distance_km=distance_km,
        avg_consumption_l_per_100km=avg_consumption,
        cost_per_km=cost_per_km,
        fuel_entries_count=fuel_count,
        service_entries_count=service_count,
    )


def period_days(date_from: date, date_to: date) -> int:
    """Return the inclusive day count of a period, never less than 1."""
    return max((date_to - date_from).days + 1, 1)


def add_months(value: date, months: int) -> date:
    """Shift ``value`` by ``months`` calendar months, clamping the day to the month's end."""
    month = value.month - 1 + months
    year = value.year + month // 12
    month = month % 12 + 1
    day = min(value.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def overlap_days(left_start: date, left_end: date, right_start: date, right_end: date) -> int:
    """Return the number of days shared by two inclusive date ranges (0 if disjoint)."""
    start = max(left_start, right_start)
    end = min(left_end, right_end)
    if end < start:
        return 0
    return period_days(start, end)


def expense_window(entry: ExpenseEntry) -> tuple[date, date]:
    """Return the inclusive date range an expense covers.

    Precedence: explicit ``period_start``/``period_end``; then ``period_start``
    plus ``period_months``; then ``entry_date`` plus ``period_months``;
    otherwise the single ``entry_date``.
    """
    if entry.period_start and entry.period_end:
        return entry.period_start, entry.period_end
    if entry.period_start and entry.period_months:
        return (
            entry.period_start,
            add_months(entry.period_start, entry.period_months) - timedelta(days=1),
        )
    if entry.period_months:
        return (
            entry.entry_date,
            add_months(entry.entry_date, entry.period_months) - timedelta(days=1),
        )
    return entry.entry_date, entry.entry_date


def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date) -> Decimal:
    """Return the share of ``entry.total_cost`` that falls inside the period.

    Entries with a known monthly period are split month by month; all others
    are prorated by day over their expense window. A single-day expense dated
    inside the period is returned in full, unrounded.
    """
    monthly_period = (
        entry.payment_period_months or entry.period_months or inferred_monthly_period(entry)
    )
    if monthly_period and (entry.period_start or entry.entry_date):
        return allocated_monthly_expense_cost(entry, date_from, date_to, monthly_period)

    start, end = expense_window(entry)
    total_days = period_days(start, end)
    matched_days = overlap_days(start, end, date_from, date_to)
    if matched_days <= 0:
        return _ZERO
    if total_days <= 1 and start == entry.entry_date:
        return Decimal(entry.total_cost)
    return (Decimal(entry.total_cost) * Decimal(matched_days) / Decimal(total_days)).quantize(
        _CENT
    )


def inferred_monthly_period(entry: ExpenseEntry) -> int | None:
    """Infer a 1/3/6/12-month period for insurance entries with explicit start and end dates.

    Returns ``None`` for other categories, for missing dates, or when the dates
    do not span exactly one of those month counts.
    """
    if (
        entry.category != ExpenseCategory.insurance
        or not entry.period_start
        or not entry.period_end
    ):
        return None
    for months in (1, 3, 6, 12):
        if add_months(entry.period_start, months) - timedelta(days=1) == entry.period_end:
            return months
    return None


def allocated_monthly_expense_cost(
    entry: ExpenseEntry, date_from: date, date_to: date, months: int
) -> Decimal:
    """Spread ``entry.total_cost`` evenly over ``months`` months and sum the overlapping part.

    Each month starts on the anniversary day of ``period_start`` (or
    ``entry_date``); a partially covered month contributes proportionally to
    its matched days.
    """
    start = entry.period_start or entry.entry_date
    if months <= 0:
        return _ZERO
    monthly_cost = Decimal(entry.total_cost) / Decimal(months)
    total = _ZERO
    for month_index in range(months):
        month_start = add_months(start, month_index)
        month_end = add_months(start, month_index + 1) - timedelta(days=1)
        matched = overlap_days(month_start, month_end, date_from, date_to)
        if matched <= 0:
            continue
        total_days = period_days(month_start, month_end)
        total += monthly_cost * Decimal(matched) / Decimal(total_days)
    return total.quantize(_CENT)


async def expense_period_totals(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> tuple[Decimal, Decimal, int, dict[str, Decimal], Decimal, Decimal]:
    """Sum period-allocated expenses for a car.

    Returns ``(total, recurring, count, by_category, fixed, variable)``.
    Entries linked to a service visit are excluded. Entries dated before
    ``date_from`` are still loaded because their coverage window may extend
    into the period.
    """
    result = await session.execute(
        select(ExpenseEntry)
        .where(
            ExpenseEntry.car_id == car_id,
            ExpenseEntry.entry_date <= date_to,
            ExpenseEntry.service_visit_id.is_(None),
        )
        .order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
    )
    total = _ZERO
    recurring = _ZERO
    fixed = _ZERO
    variable = _ZERO
    categories: dict[str, Decimal] = {}
    count = 0
    for entry in result.scalars():
        amount = allocated_expense_cost(entry, date_from, date_to)
        if amount <= 0:
            continue
        count += 1
        total += amount
        category = (
            entry.category.value
            if isinstance(entry.category, ExpenseCategory)
            else str(entry.category)
        )
        categories[category] = categories.get(category, _ZERO) + amount
        # The same predicate decides both "recurring" and "fixed".
        if entry.is_recurring or entry.category in FIXED_EXPENSE_CATEGORIES:
            recurring += amount
            fixed += amount
        else:
            variable += amount
    return (
        total.quantize(_CENT),
        recurring.quantize(_CENT),
        count,
        categories,
        fixed.quantize(_CENT),
        variable.quantize(_CENT),
    )


def calculate_depreciation(car: Car, date_from: date, date_to: date) -> Decimal:
    """Return straight-line depreciation attributable to the period.

    The depreciable amount (purchase price minus expected residual value) is
    spread evenly per day over the expected ownership term (60 months when not
    set), starting at the purchase date. Returns 0 when depreciation is
    disabled or the purchase price/date is missing.
    """
    if not car.include_depreciation or not car.purchase_price or not car.purchase_date:
        return _ZERO
    depreciation_start = car.purchase_date
    months = car.expected_ownership_months or 60
    residual = Decimal(car.expected_residual_value or 0)
    depreciable = max(Decimal(car.purchase_price) - residual, _ZERO)
    depreciation_end = add_months(car.purchase_date, months) - timedelta(days=1)
    matched_days = overlap_days(depreciation_start, depreciation_end, date_from, date_to)
    if matched_days <= 0:
        return _ZERO
    daily_cost = depreciable / Decimal(period_days(depreciation_start, depreciation_end))
    return (daily_cost * Decimal(matched_days)).quantize(_CENT)


def calculate_loan_costs(car: Car, date_from: date, date_to: date) -> tuple[Decimal, Decimal]:
    """Return ``(principal, interest)`` of scheduled loan payments dated inside the period.

    The schedule comes from ``generate_annuity_schedule``; the first payment
    date falls back to the purchase date. Returns zeros when the loan principal,
    the term or a usable first-payment date is missing.
    """
    if not car.loan_principal or not car.loan_term_months:
        return _ZERO, _ZERO
    first_payment = car.loan_first_payment_date or car.purchase_date
    if not first_payment:
        return _ZERO, _ZERO
    annual_rate = Decimal(car.loan_annual_interest_rate or 0)
    schedule = generate_annuity_schedule(
        principal=Decimal(car.loan_principal),
        months=car.loan_term_months,
        annual_rate=annual_rate,
        first_payment_date=first_payment,
    )
    principal = _ZERO
    interest = _ZERO
    for row in schedule:
        if row.payment_date and date_from <= row.payment_date <= date_to:
            principal += row.principal
            interest += row.interest
    return principal.quantize(_CENT), interest.quantize(_CENT)


async def raw_period_total(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> Decimal:
    """Return the total cost (fuel, service, expenses, depreciation, loan) for a period."""
    fuel = (
        await session.execute(
            select(func.coalesce(func.sum(FuelEntry.total_cost), 0)).where(
                FuelEntry.car_id == car_id,
                FuelEntry.entry_date >= date_from,
                FuelEntry.entry_date <= date_to,
            )
        )
    ).scalar_one()
    service = (
        await session.execute(
            select(func.coalesce(func.sum(ServiceEntry.total_cost), 0)).where(
                ServiceEntry.car_id == car_id,
                ServiceEntry.entry_date >= date_from,
                ServiceEntry.entry_date <= date_to,
            )
        )
    ).scalar_one()
    expenses, _, _, _, _, _ = await expense_period_totals(session, car_id, date_from, date_to)
    car = await session.get(Car, car_id)
    depreciation = calculate_depreciation(car, date_from, date_to) if car else _ZERO
    loan_principal, loan_interest = (
        calculate_loan_costs(car, date_from, date_to) if car else (_ZERO, _ZERO)
    )
    return (
        Decimal(fuel) + Decimal(service) + expenses + depreciation + loan_principal + loan_interest
    ).quantize(_CENT)


async def month_comparison_totals(
    session: AsyncSession, car_id: int, today: date
) -> tuple[Decimal, Decimal]:
    """Return ``(current_month_to_date, previous_full_month)`` totals relative to ``today``."""
    current_from = today.replace(day=1)
    previous_to = current_from - timedelta(days=1)
    previous_from = previous_to.replace(day=1)
    return (
        await raw_period_total(session, car_id, current_from, today),
        await raw_period_total(session, car_id, previous_from, previous_to),
    )


async def full_tank_consumption(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
    """Return the average full-tank fuel consumption (l/100 km), or ``None`` if unavailable."""
    metrics = await full_tank_metrics(session, car_id, date_from, date_to)
    return metrics["average_fuel_consumption_full_tank"]


def _empty_tank_metrics() -> dict[str, float | int | str | None]:
    """Return the full-tank metrics payload used when no interval can be computed."""
    return {
        "average_full_tank_distance": None,
        "average_fuel_consumption_full_tank": None,
        "average_cost_per_full_tank": None,
        "last_full_tank_distance": None,
        "full_tank_warning": None,
    }


async def full_tank_metrics(
    session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> dict[str, float | int | str | None]:
    """Compute full-tank-to-full-tank metrics for a car.

    An interval runs between two consecutive full-tank fill-ups; its fuel and
    cost are the sums over every fill-up after the first full tank up to and
    including the second one. Intervals that end before ``date_from``, have a
    non-positive distance, or contain no fuel are skipped. A warning is set
    when the last interval's distance is below 75% of the average of the
    earlier ones.
    """
    result = await session.execute(
        select(FuelEntry)
        .where(
            FuelEntry.car_id == car_id,
            FuelEntry.entry_date <= date_to,
        )
        .order_by(FuelEntry.entry_date.asc(), FuelEntry.odometer.asc(), FuelEntry.id.asc())
    )
    entries = list(result.scalars())
    full_indexes = [index for index, entry in enumerate(entries) if entry.is_full_tank]
    if len(full_indexes) < 2:
        return _empty_tank_metrics()

    intervals: list[dict] = []
    # Every branch advances to the next full tank, so this is a walk over
    # consecutive pairs of full-tank indexes.
    for previous_full_index, current_full_index in zip(full_indexes, full_indexes[1:]):
        previous = entries[previous_full_index]
        current = entries[current_full_index]
        if current.entry_date < date_from:
            continue
        distance = current.odometer - previous.odometer
        if distance <= 0:
            continue
        fills = entries[previous_full_index + 1 : current_full_index + 1]
        interval_liters = sum((Decimal(entry.liters) for entry in fills), _ZERO)
        if interval_liters > 0:
            interval_cost = sum((Decimal(entry.total_cost) for entry in fills), _ZERO)
            intervals.append(
                {"distance": distance, "liters": interval_liters, "cost": interval_cost}
            )

    if not intervals:
        return _empty_tank_metrics()

    total_distance = sum(item["distance"] for item in intervals)
    total_liters = sum((item["liters"] for item in intervals), _ZERO)
    total_cost = sum((item["cost"] for item in intervals), _ZERO)
    avg_distance = float(Decimal(total_distance) / Decimal(len(intervals)))
    avg_consumption = float(total_liters * Decimal(100) / Decimal(total_distance))
    avg_cost = float(total_cost / Decimal(len(intervals)))
    last_distance = int(intervals[-1]["distance"])

    warning = None
    earlier = intervals[:-1]
    if earlier:
        previous_avg = float(
            Decimal(sum(item["distance"] for item in earlier)) / Decimal(len(earlier))
        )
        if previous_avg > 0 and last_distance < previous_avg * 0.75:
            drop = round((1 - last_distance / previous_avg) * 100)
            warning = (
                f"Обычно на полном баке получается около {previous_avg:.0f} км. "
                f"Последний интервал {last_distance} км, это на {drop}% меньше. "
                "Проверьте режим поездок, давление шин, качество топлива или техническое состояние."
            )

    return {
        "average_full_tank_distance": round(avg_distance, 1),
        "average_fuel_consumption_full_tank": round(avg_consumption, 2),
        "average_cost_per_full_tank": round(avg_cost, 2),
        "last_full_tank_distance": last_distance,
        "full_tank_warning": warning,
    }


async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
    """Execute ``stmt`` and return its rows as a DataFrame keyed by the selected labels."""
    result = await session.execute(stmt)
    rows = result.mappings().all()
    return pd.DataFrame(rows)


def _no_odometer_data(
    car_id: int, price_prediction: dict, tank_prediction: dict
) -> OdometerPrediction:
    """Build the prediction returned when there is no usable odometer reading."""
    return OdometerPrediction(
        car_id=car_id,
        samples=0,
        current_odometer=None,
        predicted_today=None,
        predicted_30_days=None,
        avg_km_per_day=None,
        avg_km_per_month=None,
        **price_prediction,
        **tank_prediction,
        confidence=0,
        insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
    )


async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
    """Predict the current and 30-day odometer reading from fuel and service records.

    The daily mileage estimate blends a recency-weighted mean (70%) with the
    median (30%) of the last six valid intervals. Confidence grows with the
    number of intervals and the time span covered and shrinks with variability.
    The result also carries the fuel-price prediction and full-tank metrics.
    """
    price_prediction = await predict_fuel_price(session, car_id)
    tank_prediction = await full_tank_metrics(session, car_id, date.min, date.today())
    fuel = await dataframe_from_query(
        session,
        select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
            FuelEntry.car_id == car_id
        ),
    )
    service = await dataframe_from_query(
        session,
        select(
            ServiceEntry.entry_date.label("date"), ServiceEntry.odometer.label("odometer")
        ).where(ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None)),
    )

    # Concatenate only populated frames: an empty, column-less frame adds
    # nothing and makes newer pandas emit a FutureWarning.
    frames = [frame for frame in (fuel, service) if not frame.empty]
    if not frames:
        return _no_odometer_data(car_id, price_prediction, tank_prediction)

    df = pd.concat(frames, ignore_index=True)
    df["date"] = pd.to_datetime(df["date"])
    # Coerce to a numeric dtype so the arithmetic below also works when the
    # driver returns Decimal objects; unparseable values become NaN and are dropped.
    df["odometer"] = pd.to_numeric(df["odometer"], errors="coerce")
    df = df.dropna().drop_duplicates()
    df = df[df["odometer"] >= 0]
    # Keep one reading per day (the highest), then drop readings that go
    # backwards relative to the previous one.
    df = df.sort_values(["date", "odometer"]).drop_duplicates(subset=["date"], keep="last")
    df = df[df["odometer"].diff().fillna(0) >= 0].copy()

    if df.empty:
        # Cleaning can remove every row (e.g. all readings null or negative).
        return _no_odometer_data(car_id, price_prediction, tank_prediction)

    if len(df) < 2:
        current = int(df.iloc[-1]["odometer"])
        return OdometerPrediction(
            car_id=car_id,
            samples=len(df),
            current_odometer=current,
            predicted_today=current,
            predicted_30_days=None,
            avg_km_per_day=None,
            avg_km_per_month=None,
            **price_prediction,
            **tank_prediction,
            confidence=0.2,
            insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
        )

    last = df.iloc[-1]
    df["days_delta"] = df["date"].diff().dt.days
    df["km_delta"] = df["odometer"].diff()
    intervals = df[(df["days_delta"] > 0) & (df["km_delta"] >= 0)].copy()
    intervals["km_per_day"] = intervals["km_delta"] / intervals["days_delta"]
    # Rates above 500 km/day are discarded as implausible.
    intervals = intervals[(intervals["km_per_day"] >= 0) & (intervals["km_per_day"] <= 500)]

    if intervals.empty:
        km_per_day = 0
    else:
        recent = intervals.tail(6).copy()
        recent["weight"] = range(1, len(recent) + 1)  # newer intervals weigh more
        weighted = (recent["km_per_day"] * recent["weight"]).sum() / recent["weight"].sum()
        median = recent["km_per_day"].median()
        km_per_day = float((weighted * 0.7) + (median * 0.3))

    # Naive UTC midnight, comparable with the naive dates in ``df``.
    today = pd.Timestamp.now("UTC").tz_localize(None).normalize()
    days_since_last = max((today - last["date"]).days, 0)
    predicted_today = int(last["odometer"] + km_per_day * days_since_last)
    predicted_30 = int(predicted_today + km_per_day * 30)

    span_days = max((last["date"] - df.iloc[0]["date"]).days, 1)
    interval_count = len(intervals)
    if interval_count < 3 or km_per_day == 0:
        variability = 0
    else:
        variability = min(float(intervals["km_per_day"].std() / max(km_per_day, 1)), 1)
    confidence = min(
        0.95,
        max(
            0.25,
            0.3 + interval_count * 0.055 + min(span_days, 365) / 900 - variability * 0.18,
        ),
    )
    insight = (
        "Пробег стабилен, прогноз надежный."
        if confidence >= 0.75
        else "Прогноз предварительный: точность вырастет после регулярных записей одометра."
    )
    return OdometerPrediction(
        car_id=car_id,
        samples=len(df),
        current_odometer=int(last["odometer"]),
        predicted_today=predicted_today,
        predicted_30_days=predicted_30,
        avg_km_per_day=round(km_per_day, 1),
        avg_km_per_month=round(km_per_day * 30.4, 1),
        **price_prediction,
        **tank_prediction,
        confidence=round(confidence, 2),
        insight=insight,
    )


async def predict_fuel_price(
    session: AsyncSession, car_id: int
) -> dict[str, float | int | None]:
    """Estimate the fuel price 30 days ahead from the car's fill-up history.

    Uses the last eight valid prices: the linear trend between the first and
    last of them is extrapolated 30 days and blended 65/35 with their mean.
    Returns ``None`` values with zero samples and confidence when no valid
    price exists.
    """
    df = await dataframe_from_query(
        session,
        select(
            FuelEntry.entry_date.label("date"),
            FuelEntry.price_per_liter.label("price"),
        ).where(FuelEntry.car_id == car_id),
    )
    empty = {
        "current_price_per_liter": None,
        "predicted_price_per_liter_30_days": None,
        "avg_price_per_liter": None,
        "price_samples": 0,
        "price_confidence": 0,
    }
    if df.empty:
        return empty
    df = df.dropna().copy()
    if df.empty:
        return empty
    df["date"] = pd.to_datetime(df["date"])
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    # Non-positive or absurdly large prices are treated as input errors.
    df = df[(df["price"] > 0) & (df["price"] < 10000)].sort_values("date")
    if df.empty:
        return empty

    recent = df.tail(8).copy()
    current = float(recent.iloc[-1]["price"])
    avg = float(recent["price"].mean())
    predicted = current
    confidence = min(0.72, 0.22 + len(recent) * 0.055)
    if len(recent) >= 2:
        span_days = max((recent.iloc[-1]["date"] - recent.iloc[0]["date"]).days, 1)
        change_per_day = float(
            (recent.iloc[-1]["price"] - recent.iloc[0]["price"]) / span_days
        )
        predicted = current + change_per_day * 30
        predicted = (predicted * 0.65) + (avg * 0.35)
        volatility = float(recent["price"].std() / max(avg, 1)) if len(recent) >= 3 else 0
        confidence = min(0.9, max(0.3, confidence + min(span_days, 180) / 600 - volatility))

    return {
        "current_price_per_liter": round(current, 2),
        "predicted_price_per_liter_30_days": round(max(predicted, 0), 2),
        "avg_price_per_liter": round(avg, 2),
        "price_samples": int(len(df)),
        "price_confidence": round(confidence, 2),
    }