Complete CarPass product flows

This commit is contained in:
VPN SaaS Dev
2026-05-14 21:19:37 +09:00
parent a83f55c646
commit c0014ab4ea
28 changed files with 3006 additions and 159 deletions

View File

@@ -3,12 +3,38 @@ from datetime import date, timedelta
from decimal import Decimal
import pandas as pd
from sqlalchemy import Select, func, or_, select
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.car import Car
from app.models.expense import ExpenseCategory, ExpenseEntry, FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
from app.services.loans import generate_annuity_schedule
FIXED_EXPENSE_CATEGORIES = {
ExpenseCategory.insurance,
ExpenseCategory.tax,
ExpenseCategory.loan_payment,
ExpenseCategory.loan_interest,
ExpenseCategory.parking,
}
VARIABLE_EXPENSE_CATEGORIES = {
ExpenseCategory.fine,
ExpenseCategory.car_wash,
ExpenseCategory.toll,
ExpenseCategory.tires,
ExpenseCategory.wheels,
ExpenseCategory.battery,
ExpenseCategory.parts,
ExpenseCategory.repair,
ExpenseCategory.maintenance,
ExpenseCategory.diagnostics,
ExpenseCategory.towing,
ExpenseCategory.state_fee,
ExpenseCategory.registration,
ExpenseCategory.inspection,
ExpenseCategory.other,
}
async def get_ownership_stats(
@@ -60,21 +86,39 @@ async def get_ownership_stats(
odometer_values = [value for value in odometer_values if value is not None]
distance_km = int(max(odometer_values) - min(odometer_values)) if len(odometer_values) >= 2 else 0
expense_cost, recurring_cost, _expense_count, expense_categories = await expense_period_totals(
(
expense_cost,
recurring_cost,
_expense_count,
expense_categories,
fixed_expense_cost,
variable_expense_cost,
) = await expense_period_totals(
session, car_id, date_from, date_to
)
car = await session.get(Car, car_id)
depreciation_cost = calculate_depreciation(car, date_from, date_to) if car else Decimal("0")
loan_principal_cost, loan_interest_cost = calculate_loan_costs(car, date_from, date_to) if car else (Decimal("0"), Decimal("0"))
total_cost = Decimal(fuel_cost) + Decimal(service_cost) + expense_cost + depreciation_cost
avg_consumption = await full_tank_consumption(session, car_id, date_from, date_to)
total_cost = Decimal(fuel_cost) + Decimal(service_cost) + expense_cost + depreciation_cost + loan_principal_cost + loan_interest_cost
tank_metrics = await full_tank_metrics(session, car_id, date_from, date_to)
avg_consumption = tank_metrics["average_fuel_consumption_full_tank"]
cost_per_km = float(total_cost / distance_km) if distance_km else None
months = max(Decimal(period_days(date_from, date_to)) / Decimal("30.4375"), Decimal("0.033"))
cost_per_day = (total_cost / Decimal(period_days(date_from, date_to))).quantize(Decimal("0.01"))
cost_per_month = (total_cost / months).quantize(Decimal("0.01"))
recurring_total = (recurring_cost + depreciation_cost).quantize(Decimal("0.01"))
recurring_total = (recurring_cost + depreciation_cost + loan_principal_cost + loan_interest_cost).quantize(Decimal("0.01"))
one_time_costs = max(total_cost - recurring_total, Decimal("0")).quantize(Decimal("0.01"))
recurring_monthly = (recurring_total / months).quantize(Decimal("0.01"))
forecast_next_month = max(cost_per_month, recurring_monthly).quantize(Decimal("0.01"))
repair_cost = (
Decimal(service_cost)
+ expense_categories.get("repair", Decimal("0"))
+ expense_categories.get("maintenance", Decimal("0"))
+ expense_categories.get("diagnostics", Decimal("0"))
).quantize(Decimal("0.01"))
fixed_costs = (fixed_expense_cost + depreciation_cost + loan_principal_cost + loan_interest_cost).quantize(Decimal("0.01"))
variable_costs = (Decimal(fuel_cost) + Decimal(service_cost) + variable_expense_cost).quantize(Decimal("0.01"))
cost_by_category = {
"fuel": Decimal(fuel_cost),
@@ -83,11 +127,22 @@ async def get_ownership_stats(
}
if depreciation_cost:
cost_by_category["depreciation"] = depreciation_cost
if loan_principal_cost:
cost_by_category["loan_payment"] = cost_by_category.get("loan_payment", Decimal("0")) + loan_principal_cost
if loan_interest_cost:
cost_by_category["loan_interest"] = cost_by_category.get("loan_interest", Decimal("0")) + loan_interest_cost
categories = [
{"category": key, "total_cost": value, "entries_count": 0}
for key, value in sorted(cost_by_category.items())
if value
]
current_month_cost, previous_month_cost = await month_comparison_totals(session, car_id, date_to)
month_change = None
cost_warning = None
if previous_month_cost > 0:
month_change = float((current_month_cost - previous_month_cost) * Decimal("100") / previous_month_cost)
if month_change >= 35:
cost_warning = "Расходы заметно выше прошлого месяца. Проверьте крупные ремонты, штрафы или регулярные платежи."
return OwnershipStats(
car_id=car_id,
@@ -97,11 +152,23 @@ async def get_ownership_stats(
service_cost=service_cost,
expenses_cost=expense_cost,
total_cost=total_cost,
repair_cost=repair_cost,
fixed_costs=fixed_costs,
variable_costs=variable_costs,
recurring_costs=recurring_total,
one_time_costs=one_time_costs,
forecast_next_month=forecast_next_month,
depreciation_cost=depreciation_cost,
loan_principal_cost=loan_principal_cost,
loan_interest_cost=loan_interest_cost,
total_cost_without_credit=(total_cost - loan_principal_cost - loan_interest_cost).quantize(Decimal("0.01")),
total_cost_with_credit=total_cost.quantize(Decimal("0.01")),
cost_per_day=cost_per_day,
cost_per_month=cost_per_month,
current_month_cost=current_month_cost,
previous_month_cost=previous_month_cost,
month_over_month_change_pct=round(month_change, 2) if month_change is not None else None,
cost_warning=cost_warning,
cost_by_category=cost_by_category,
categories=categories,
liters=liters,
@@ -144,6 +211,9 @@ def expense_window(entry: ExpenseEntry) -> tuple[date, date]:
def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date) -> Decimal:
monthly_period = entry.payment_period_months or entry.period_months or inferred_monthly_period(entry)
if monthly_period and (entry.period_start or entry.entry_date):
return allocated_monthly_expense_cost(entry, date_from, date_to, monthly_period)
start, end = expense_window(entry)
total_days = period_days(start, end)
matched_days = overlap_days(start, end, date_from, date_to)
@@ -154,24 +224,49 @@ def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date)
return (Decimal(entry.total_cost) * Decimal(matched_days) / Decimal(total_days)).quantize(Decimal("0.01"))
def inferred_monthly_period(entry: ExpenseEntry) -> int | None:
if entry.category != ExpenseCategory.insurance or not entry.period_start or not entry.period_end:
return None
for months in (1, 3, 6, 12):
if add_months(entry.period_start, months) - timedelta(days=1) == entry.period_end:
return months
return None
def allocated_monthly_expense_cost(
entry: ExpenseEntry, date_from: date, date_to: date, months: int
) -> Decimal:
start = entry.period_start or entry.entry_date
if months <= 0:
return Decimal("0")
monthly_cost = Decimal(entry.total_cost) / Decimal(months)
total = Decimal("0")
for month_index in range(months):
month_start = add_months(start, month_index)
month_end = add_months(start, month_index + 1) - timedelta(days=1)
matched = overlap_days(month_start, month_end, date_from, date_to)
if matched <= 0:
continue
total_days = period_days(month_start, month_end)
total += monthly_cost * Decimal(matched) / Decimal(total_days)
return total.quantize(Decimal("0.01"))
async def expense_period_totals(
session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> tuple[Decimal, Decimal, int, dict[str, Decimal]]:
) -> tuple[Decimal, Decimal, int, dict[str, Decimal], Decimal, Decimal]:
result = await session.execute(
select(ExpenseEntry)
.where(
ExpenseEntry.car_id == car_id,
or_(
ExpenseEntry.entry_date.between(date_from, date_to),
ExpenseEntry.period_start.between(date_from, date_to),
ExpenseEntry.period_end.between(date_from, date_to),
(ExpenseEntry.period_start <= date_from) & (ExpenseEntry.period_end >= date_to),
),
ExpenseEntry.entry_date <= date_to,
)
.order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
)
total = Decimal("0")
recurring = Decimal("0")
fixed = Decimal("0")
variable = Decimal("0")
categories: dict[str, Decimal] = {}
count = 0
for entry in result.scalars():
@@ -182,26 +277,104 @@ async def expense_period_totals(
total += amount
category = entry.category.value if isinstance(entry.category, ExpenseCategory) else str(entry.category)
categories[category] = categories.get(category, Decimal("0")) + amount
if entry.is_recurring or entry.category in {ExpenseCategory.insurance, ExpenseCategory.loan_payment, ExpenseCategory.loan_interest}:
if entry.is_recurring or entry.category in FIXED_EXPENSE_CATEGORIES:
recurring += amount
return total.quantize(Decimal("0.01")), recurring.quantize(Decimal("0.01")), count, categories
if entry.category in FIXED_EXPENSE_CATEGORIES or entry.is_recurring:
fixed += amount
else:
variable += amount
return (
total.quantize(Decimal("0.01")),
recurring.quantize(Decimal("0.01")),
count,
categories,
fixed.quantize(Decimal("0.01")),
variable.quantize(Decimal("0.01")),
)
def calculate_depreciation(car: Car, date_from: date, date_to: date) -> Decimal:
if not car.include_depreciation or not car.purchase_price or not car.purchase_date:
return Decimal("0")
depreciation_start = car.purchase_date
depreciation_end = add_months(car.purchase_date, 60) - timedelta(days=1)
months = car.expected_ownership_months or 60
residual = Decimal(car.expected_residual_value or 0)
depreciable = max(Decimal(car.purchase_price) - residual, Decimal("0"))
depreciation_end = add_months(car.purchase_date, months) - timedelta(days=1)
matched_days = overlap_days(depreciation_start, depreciation_end, date_from, date_to)
if matched_days <= 0:
return Decimal("0")
daily_cost = Decimal(car.purchase_price) / Decimal(period_days(depreciation_start, depreciation_end))
daily_cost = depreciable / Decimal(period_days(depreciation_start, depreciation_end))
return (daily_cost * Decimal(matched_days)).quantize(Decimal("0.01"))
def calculate_loan_costs(car: Car, date_from: date, date_to: date) -> tuple[Decimal, Decimal]:
if not car.loan_principal or not car.loan_term_months:
return Decimal("0"), Decimal("0")
first_payment = car.loan_first_payment_date or car.purchase_date
if not first_payment:
return Decimal("0"), Decimal("0")
annual_rate = Decimal(car.loan_annual_interest_rate or 0)
schedule = generate_annuity_schedule(
principal=Decimal(car.loan_principal),
months=car.loan_term_months,
annual_rate=annual_rate,
first_payment_date=first_payment,
)
principal = Decimal("0")
interest = Decimal("0")
for row in schedule:
if row.payment_date and date_from <= row.payment_date <= date_to:
principal += row.principal
interest += row.interest
return principal.quantize(Decimal("0.01")), interest.quantize(Decimal("0.01"))
async def raw_period_total(session: AsyncSession, car_id: int, date_from: date, date_to: date) -> Decimal:
fuel = (
await session.execute(
select(func.coalesce(func.sum(FuelEntry.total_cost), 0)).where(
FuelEntry.car_id == car_id,
FuelEntry.entry_date >= date_from,
FuelEntry.entry_date <= date_to,
)
)
).scalar_one()
service = (
await session.execute(
select(func.coalesce(func.sum(ServiceEntry.total_cost), 0)).where(
ServiceEntry.car_id == car_id,
ServiceEntry.entry_date >= date_from,
ServiceEntry.entry_date <= date_to,
)
)
).scalar_one()
expenses, _, _, _, _, _ = await expense_period_totals(session, car_id, date_from, date_to)
car = await session.get(Car, car_id)
depreciation = calculate_depreciation(car, date_from, date_to) if car else Decimal("0")
loan_principal, loan_interest = calculate_loan_costs(car, date_from, date_to) if car else (Decimal("0"), Decimal("0"))
return (Decimal(fuel) + Decimal(service) + expenses + depreciation + loan_principal + loan_interest).quantize(Decimal("0.01"))
async def month_comparison_totals(session: AsyncSession, car_id: int, today: date) -> tuple[Decimal, Decimal]:
current_from = today.replace(day=1)
previous_to = current_from - timedelta(days=1)
previous_from = previous_to.replace(day=1)
return (
await raw_period_total(session, car_id, current_from, today),
await raw_period_total(session, car_id, previous_from, previous_to),
)
async def full_tank_consumption(
session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:
return (await full_tank_metrics(session, car_id, date_from, date_to))["average_fuel_consumption_full_tank"]
async def full_tank_metrics(
session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> dict[str, float | int | str | None]:
result = await session.execute(
select(FuelEntry)
.where(
@@ -213,10 +386,15 @@ async def full_tank_consumption(
entries = list(result.scalars())
full_indexes = [index for index, entry in enumerate(entries) if entry.is_full_tank]
if len(full_indexes) < 2:
return None
return {
"average_full_tank_distance": None,
"average_fuel_consumption_full_tank": None,
"average_cost_per_full_tank": None,
"last_full_tank_distance": None,
"full_tank_warning": None,
}
total_liters = Decimal("0")
total_distance = 0
intervals: list[dict] = []
previous_full_index = full_indexes[0]
for current_full_index in full_indexes[1:]:
previous = entries[previous_full_index]
@@ -232,13 +410,45 @@ async def full_tank_consumption(
Decimal(entry.liters) for entry in entries[previous_full_index + 1 : current_full_index + 1]
)
if interval_liters > 0:
total_liters += interval_liters
total_distance += distance
interval_cost = sum(
Decimal(entry.total_cost) for entry in entries[previous_full_index + 1 : current_full_index + 1]
)
intervals.append({"distance": distance, "liters": interval_liters, "cost": interval_cost})
previous_full_index = current_full_index
if total_distance <= 0 or total_liters <= 0:
return None
return float(total_liters * Decimal(100) / Decimal(total_distance))
if not intervals:
return {
"average_full_tank_distance": None,
"average_fuel_consumption_full_tank": None,
"average_cost_per_full_tank": None,
"last_full_tank_distance": None,
"full_tank_warning": None,
}
total_distance = sum(item["distance"] for item in intervals)
total_liters = sum((item["liters"] for item in intervals), Decimal("0"))
total_cost = sum((item["cost"] for item in intervals), Decimal("0"))
avg_distance = float(Decimal(total_distance) / Decimal(len(intervals)))
avg_consumption = float(total_liters * Decimal(100) / Decimal(total_distance))
avg_cost = float(total_cost / Decimal(len(intervals)))
last_distance = int(intervals[-1]["distance"])
warning = None
previous = intervals[:-1]
if previous:
previous_avg = float(Decimal(sum(item["distance"] for item in previous)) / Decimal(len(previous)))
if previous_avg > 0 and last_distance < previous_avg * 0.75:
drop = round((1 - last_distance / previous_avg) * 100)
warning = (
f"Обычно на полном баке получается около {previous_avg:.0f} км. "
f"Последний интервал {last_distance} км, это на {drop}% меньше. "
"Проверьте режим поездок, давление шин, качество топлива или техническое состояние."
)
return {
"average_full_tank_distance": round(avg_distance, 1),
"average_fuel_consumption_full_tank": round(avg_consumption, 2),
"average_cost_per_full_tank": round(avg_cost, 2),
"last_full_tank_distance": last_distance,
"full_tank_warning": warning,
}
async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFrame:
@@ -249,6 +459,7 @@ async def dataframe_from_query(session: AsyncSession, stmt: Select) -> pd.DataFr
async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPrediction:
price_prediction = await predict_fuel_price(session, car_id)
tank_prediction = await full_tank_metrics(session, car_id, date.min, date.today())
fuel = await dataframe_from_query(
session,
select(FuelEntry.entry_date.label("date"), FuelEntry.odometer.label("odometer")).where(
@@ -271,6 +482,7 @@ async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPredic
avg_km_per_day=None,
avg_km_per_month=None,
**price_prediction,
**tank_prediction,
confidence=0,
insight="Недостаточно данных: добавь одометр в заправках или сервисных записях.",
)
@@ -291,6 +503,7 @@ async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPredic
avg_km_per_day=None,
avg_km_per_month=None,
**price_prediction,
**tank_prediction,
confidence=0.2,
insight="Есть только одна точка пробега. Для прогноза нужны минимум две записи.",
)
@@ -337,6 +550,7 @@ async def predict_odometer(session: AsyncSession, car_id: int) -> OdometerPredic
avg_km_per_day=round(km_per_day, 1),
avg_km_per_month=round(km_per_day * 30.4, 1),
**price_prediction,
**tank_prediction,
confidence=round(confidence, 2),
insight=insight,
)

94
app/services/loans.py Normal file
View File

@@ -0,0 +1,94 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from decimal import ROUND_HALF_UP, Decimal
MONEY = Decimal("0.01")
@dataclass(frozen=True)
class LoanPayment:
number: int
payment_date: date | None
payment: Decimal
principal: Decimal
interest: Decimal
remaining_principal: Decimal
def quantize_money(value: Decimal) -> Decimal:
return value.quantize(MONEY, rounding=ROUND_HALF_UP)
def annuity_payment(principal: Decimal, months: int, annual_rate: Decimal) -> Decimal:
if principal <= 0:
raise ValueError("principal must be positive")
if months <= 0:
raise ValueError("months must be positive")
if annual_rate < 0:
raise ValueError("annual_rate must be non-negative")
if annual_rate == 0:
return quantize_money(principal / Decimal(months))
monthly_rate = annual_rate / Decimal("12") / Decimal("100")
factor = (Decimal("1") + monthly_rate) ** months
payment = principal * monthly_rate * factor / (factor - Decimal("1"))
return quantize_money(payment)
def loan_summary(principal: Decimal, months: int, annual_rate: Decimal) -> dict:
payment = annuity_payment(principal, months, annual_rate)
total_payment = quantize_money(payment * Decimal(months))
total_interest = max(total_payment - principal, Decimal("0")).quantize(MONEY)
return {
"monthly_payment": payment,
"total_payment": total_payment,
"overpayment": total_interest,
"total_interest": total_interest,
"principal": principal,
"months": months,
"annual_rate": annual_rate,
}
def generate_annuity_schedule(
*,
principal: Decimal,
months: int,
annual_rate: Decimal,
first_payment_date: date | None = None,
) -> list[LoanPayment]:
payment = annuity_payment(principal, months, annual_rate)
monthly_rate = annual_rate / Decimal("12") / Decimal("100")
remaining = principal
rows: list[LoanPayment] = []
for number in range(1, months + 1):
interest = quantize_money(remaining * monthly_rate) if annual_rate else Decimal("0.00")
principal_part = payment - interest
if number == months or principal_part > remaining:
principal_part = remaining
payment_for_row = principal_part + interest
else:
payment_for_row = payment
remaining = max(remaining - principal_part, Decimal("0"))
rows.append(
LoanPayment(
number=number,
payment_date=None if first_payment_date is None else add_months(first_payment_date, number - 1),
payment=quantize_money(payment_for_row),
principal=quantize_money(principal_part),
interest=quantize_money(interest),
remaining_principal=quantize_money(remaining),
)
)
return rows
def add_months(value: date, months: int) -> date:
import calendar
month = value.month - 1 + months
year = value.year + month // 12
month = month % 12 + 1
day = min(value.day, calendar.monthrange(year, month)[1])
return date(year, month, day)

157
app/services/odometer.py Normal file
View File

@@ -0,0 +1,157 @@
from __future__ import annotations
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.car import Car, OdometerHistory
from app.models.expense import ExpenseEntry, FuelEntry, ServiceEntry
def validate_odometer_change(
car: Car,
new_odometer: int | None,
*,
source_record_type: str,
confirm_lower_odometer: bool = False,
) -> None:
if new_odometer is None:
return
if new_odometer < 0:
raise HTTPException(status_code=422, detail="Odometer must be non-negative")
current = car.current_odometer
if current is not None and new_odometer < current and not confirm_lower_odometer:
raise HTTPException(
status_code=409,
detail={
"code": "odometer_lower_than_current",
"message": "Новый пробег меньше текущего. Подтвердите ручную корректировку или проверьте запись.",
"current_odometer": current,
"new_odometer": new_odometer,
"source": source_record_type,
},
)
if current is not None and new_odometer > current + 100000 and not confirm_lower_odometer:
raise HTTPException(
status_code=409,
detail={
"code": "odometer_jump_requires_confirmation",
"message": "Пробег сильно отличается от текущего. Проверьте число перед сохранением.",
"current_odometer": current,
"new_odometer": new_odometer,
"source": source_record_type,
},
)
def add_odometer_history(
session: AsyncSession,
car: Car,
*,
new_odometer: int,
source_record_type: str,
source_record_id: int | None,
changed_by: int | None,
confirmation_required: bool = False,
user_confirmed: bool = True,
) -> None:
previous = car.current_odometer
session.add(
OdometerHistory(
car_id=car.id,
previous_odometer=previous,
new_odometer=new_odometer,
source_record_type=source_record_type,
source_record_id=source_record_id,
changed_by=changed_by,
confirmation_required=confirmation_required,
user_confirmed=user_confirmed,
)
)
car.current_odometer = new_odometer
async def apply_odometer_from_record(
session: AsyncSession,
car: Car,
*,
new_odometer: int | None,
source_record_type: str,
source_record_id: int | None,
changed_by: int | None,
confirm_lower_odometer: bool = False,
) -> None:
if new_odometer is None:
return
validate_odometer_change(
car,
new_odometer,
source_record_type=source_record_type,
confirm_lower_odometer=confirm_lower_odometer,
)
current = car.current_odometer
if current is None or new_odometer > current or confirm_lower_odometer:
add_odometer_history(
session,
car,
new_odometer=new_odometer,
source_record_type=source_record_type,
source_record_id=source_record_id,
changed_by=changed_by,
confirmation_required=current is not None and new_odometer < current,
user_confirmed=True,
)
async def recalculate_current_odometer(
session: AsyncSession,
car_id: int,
*,
changed_by: int | None = None,
source_record_type: str = "recalculate",
) -> None:
car = await session.get(Car, car_id)
if car is None:
return
fuel_result = await session.execute(
select(FuelEntry.odometer)
.where(FuelEntry.car_id == car_id)
.order_by(FuelEntry.odometer.desc())
.limit(1)
)
service_result = await session.execute(
select(ServiceEntry.odometer)
.where(ServiceEntry.car_id == car_id, ServiceEntry.odometer.is_not(None))
.order_by(ServiceEntry.odometer.desc())
.limit(1)
)
expense_result = await session.execute(
select(ExpenseEntry.odometer)
.where(ExpenseEntry.car_id == car_id, ExpenseEntry.odometer.is_not(None))
.order_by(ExpenseEntry.odometer.desc())
.limit(1)
)
values = [
value
for value in (
fuel_result.scalar_one_or_none(),
service_result.scalar_one_or_none(),
expense_result.scalar_one_or_none(),
)
if value is not None
]
new_value = max(values) if values else None
if new_value != car.current_odometer:
if new_value is None:
car.current_odometer = None
return
add_odometer_history(
session,
car,
new_odometer=new_value,
source_record_type=source_record_type,
source_record_id=None,
changed_by=changed_by,
confirmation_required=False,
user_confirmed=True,
)

View File

@@ -0,0 +1,193 @@
from __future__ import annotations
import re
from decimal import Decimal
from typing import Any
from pydantic import BaseModel, Field
from app.services.vehicle_identity import normalize_license_plate, validate_vin
FULL_TANK_RE = re.compile(r"(до\s+полного|полный\s+бак|залил\s+полный|full\s+tank)", re.I)
NUMBER_RE = re.compile(r"(\d+(?:[.,]\d+)?)")
class ParsedRecord(BaseModel):
event_type: str
confidence: float = Field(ge=0, le=1)
missing_fields: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
data: dict[str, Any] = Field(default_factory=dict)
def decimal_from_match(value: str | None) -> Decimal | None:
if not value:
return None
return Decimal(value.replace(",", "."))
def parse_record_text(text: str) -> ParsedRecord:
source = " ".join(text.strip().split())
lower = source.lower()
if not source:
return ParsedRecord(event_type="unknown", confidence=0, missing_fields=["text"])
vin = extract_vin(source)
plate = extract_license_plate(source)
if any(word in lower for word in ("купил", "покупка", "кредит", "loan", "lease")):
return parse_purchase(source, vin, plate)
if any(word in lower for word in ("заправ", "литр", "л ", "full tank", "бак")):
return parse_fuel(source, vin, plate)
if any(word in lower for word in ("страхов", "полис", "osago", "каско")):
return parse_expense(source, "insurance", vin, plate)
if any(word in lower for word in ("штраф", "fine")):
return parse_expense(source, "fine", vin, plate)
if any(word in lower for word in ("налог", "tax")):
return parse_expense(source, "tax", vin, plate)
if any(word in lower for word in ("то", "сервис", "ремонт", "масл", "diagnostics", "repair")):
return parse_service(source, vin, plate)
return ParsedRecord(
event_type="unknown",
confidence=0.2,
warnings=["Не удалось надежно определить тип записи. Откройте ручной ввод."],
data=identity_payload(vin, plate),
)
def parse_fuel(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
liters = find_decimal(r"(\d+(?:[.,]\d+)?)\s*(?:л|литр|liter|l)\b", source)
amount = find_decimal(r"(?:на|сумма|total|amount)\s*(\d+(?:[.,]\d+)?)", source)
if amount is None:
amount = largest_money_like_number(source, exclude={liters})
odometer = find_int(r"(?:пробег|одометр|odo|km|км)\s*(\d{2,7})", source)
price_per_liter = None
if liters and amount:
price_per_liter = (amount / liters).quantize(Decimal("0.01"))
missing = []
if liters is None:
missing.append("fuel_liters")
if amount is None:
missing.append("amount")
if odometer is None:
missing.append("odometer_km")
return ParsedRecord(
event_type="fuel",
confidence=0.9 if not missing else 0.55,
missing_fields=missing,
data={
**identity_payload(vin, plate),
"is_full_tank": bool(FULL_TANK_RE.search(source)),
"fuel_liters": float(liters) if liters is not None else None,
"amount": float(amount) if amount is not None else None,
"price_per_liter": float(price_per_liter) if price_per_liter is not None else None,
"odometer_km": odometer,
},
)
def parse_purchase(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
purchase_price = find_decimal(r"(?:за|стоимость|цена)\s*(\d+(?:[.,]\d+)?)", source)
loan_principal = find_decimal(r"(?:кредит|loan)\s*(\d+(?:[.,]\d+)?)", source)
term = find_int(r"(?:на|срок)\s*(\d{1,3})\s*(?:мес|месяц|months)", source)
rate = find_decimal(r"(?:под|ставк[аи]|rate)\s*(\d+(?:[.,]\d+)?)\s*%?", source)
currency = detect_currency(source)
missing = []
if purchase_price is None:
missing.append("purchase_price")
return ParsedRecord(
event_type="vehicle_purchase",
confidence=0.86 if purchase_price is not None else 0.45,
missing_fields=missing,
data={
**identity_payload(vin, plate),
"purchase_price": float(purchase_price) if purchase_price is not None else None,
"purchase_currency": currency,
"purchase_type": "credit" if loan_principal else "cash",
"loan_principal": float(loan_principal) if loan_principal is not None else None,
"loan_term_months": term,
"annual_interest_rate": float(rate) if rate is not None else None,
},
)
def parse_expense(source: str, category: str, vin: str | None, plate: str | None) -> ParsedRecord:
amount = find_decimal(r"(?:на|сумма|оплатил|total|amount)\s*(\d+(?:[.,]\d+)?)", source) or largest_money_like_number(source)
return ParsedRecord(
event_type=category,
confidence=0.75 if amount is not None else 0.5,
missing_fields=[] if amount is not None else ["amount"],
data={
**identity_payload(vin, plate),
"category": category,
"amount": float(amount) if amount is not None else None,
"currency": detect_currency(source),
},
)
def parse_service(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
amount = find_decimal(r"(?:на|сумма|стоимость|total|amount)\s*(\d+(?:[.,]\d+)?)", source)
odometer = find_int(r"(?:пробег|одометр|odo|km|км)\s*(\d{2,7})", source)
title = "Замена масла" if re.search(r"масл", source, re.I) else "Сервисная запись"
return ParsedRecord(
event_type="service",
confidence=0.72,
missing_fields=[] if odometer is not None else ["odometer_km"],
data={
**identity_payload(vin, plate),
"title": title,
"amount": float(amount) if amount is not None else 0,
"odometer_km": odometer,
"service_type": "maintenance" if title == "Замена масла" else "repair",
},
)
def identity_payload(vin: str | None, plate: str | None) -> dict[str, str | None]:
return {"vin": vin, "license_plate": plate}
def extract_vin(source: str) -> str | None:
for candidate in re.findall(r"[A-HJ-NPR-Z0-9][A-HJ-NPR-Z0-9\s-]{15,25}[A-HJ-NPR-Z0-9]", source.upper()):
try:
return validate_vin(candidate)
except ValueError:
continue
return None
def extract_license_plate(source: str) -> str | None:
match = re.search(r"(?:номер|госномер|plate)\s*[:#]?\s*([A-ZА-Я0-9가-힣\-\s]{4,14})", source, re.I)
return normalize_license_plate(match.group(1)) if match else None
def find_decimal(pattern: str, source: str) -> Decimal | None:
match = re.search(pattern, source, re.I)
return decimal_from_match(match.group(1)) if match else None
def find_int(pattern: str, source: str) -> int | None:
match = re.search(pattern, source, re.I)
return int(match.group(1)) if match else None
def largest_money_like_number(source: str, exclude: set[Decimal | None] | None = None) -> Decimal | None:
excluded = {item for item in (exclude or set()) if item is not None}
values = [decimal_from_match(match.group(1)) for match in NUMBER_RE.finditer(source)]
candidates = [value for value in values if value is not None and value not in excluded]
if not candidates:
return None
return max(candidates)
def detect_currency(source: str) -> str:
lower = source.lower()
if "вон" in lower or "krw" in lower or "" in lower:
return "KRW"
if "usd" in lower or "$" in lower:
return "USD"
if "eur" in lower or "" in lower:
return "EUR"
return "RUB"

View File

@@ -36,6 +36,54 @@ class MissingItem:
DEFAULT_ACHIEVEMENTS = [
{
"code": "vehicle_added",
"scope": "vehicle",
"title": "Авто добавлено",
"description": "В гараже появилась первая карточка автомобиля.",
"icon": "car",
"category": "profile",
},
{
"code": "vin_added",
"scope": "vehicle",
"title": "VIN указан",
"description": "Идентификация автомобиля стала надежнее.",
"icon": "vin",
"category": "profile",
},
{
"code": "license_plate_added",
"scope": "vehicle",
"title": "Госномер указан",
"description": "Карточку проще связать с сервисными визитами.",
"icon": "plate",
"category": "profile",
},
{
"code": "vehicle_profile_half",
"scope": "vehicle",
"title": "Карточка авто заполнена на 50%",
"description": "Данных уже достаточно для базовой аналитики.",
"icon": "progress",
"category": "profile",
},
{
"code": "vehicle_profile_full",
"scope": "vehicle",
"title": "Карточка авто заполнена полностью",
"description": "Цифровой паспорт автомобиля готов к эксплуатации.",
"icon": "passport",
"category": "profile",
},
{
"code": "first_fuel_record",
"scope": "vehicle",
"title": "Первая заправка",
"description": "Расход топлива начал формировать историю владения.",
"icon": "fuel",
"category": "tracking",
},
{
"code": "first_service_record",
"scope": "vehicle",
@@ -371,6 +419,47 @@ async def evaluate_vehicle_achievements(
visits: list[ServiceVisit],
) -> None:
achievements = await ensure_default_achievements(session)
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["vehicle_added"],
)
if car.vin_normalized:
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["vin_added"],
)
if car.license_plate_normalized:
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["license_plate_added"],
)
if vehicle_score.completeness_score >= 50:
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["vehicle_profile_half"],
)
if vehicle_score.completeness_score >= 95:
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["vehicle_profile_full"],
)
if fuel_entries:
await unlock_achievement(
session,
user_id=car.owner_id,
vehicle_id=car.id,
achievement=achievements["first_fuel_record"],
)
if service_entries or visits:
await unlock_achievement(
session,