Improve CarPass product UX and service flows

This commit is contained in:
VPN SaaS Dev
2026-05-14 19:33:25 +09:00
parent b85db333d8
commit caa5f6d3db
36 changed files with 1836 additions and 366 deletions

View File

@@ -1,11 +1,13 @@
from datetime import date
import calendar
from datetime import date, timedelta
from decimal import Decimal
import pandas as pd
from sqlalchemy import Select, func, select
from sqlalchemy import Select, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.expense import FuelEntry, ServiceEntry
from app.models.car import Car
from app.models.expense import ExpenseCategory, ExpenseEntry, FuelEntry, ServiceEntry
from app.schemas.expense import OdometerPrediction, OwnershipStats
@@ -36,10 +38,56 @@ async def get_ownership_stats(
)
service_cost, service_count = service_totals.one()
distance_km = int(max_odo - min_odo) if min_odo is not None and max_odo is not None else 0
total_cost = Decimal(fuel_cost) + Decimal(service_cost)
odometer_values = [min_odo, max_odo]
service_odo = await session.execute(
select(func.min(ServiceEntry.odometer), func.max(ServiceEntry.odometer)).where(
ServiceEntry.car_id == car_id,
ServiceEntry.odometer.is_not(None),
ServiceEntry.entry_date >= date_from,
ServiceEntry.entry_date <= date_to,
)
)
expense_odo = await session.execute(
select(func.min(ExpenseEntry.odometer), func.max(ExpenseEntry.odometer)).where(
ExpenseEntry.car_id == car_id,
ExpenseEntry.odometer.is_not(None),
ExpenseEntry.entry_date >= date_from,
ExpenseEntry.entry_date <= date_to,
)
)
odometer_values.extend(service_odo.one())
odometer_values.extend(expense_odo.one())
odometer_values = [value for value in odometer_values if value is not None]
distance_km = int(max(odometer_values) - min(odometer_values)) if len(odometer_values) >= 2 else 0
expense_cost, recurring_cost, _expense_count, expense_categories = await expense_period_totals(
session, car_id, date_from, date_to
)
car = await session.get(Car, car_id)
depreciation_cost = calculate_depreciation(car, date_from, date_to) if car else Decimal("0")
total_cost = Decimal(fuel_cost) + Decimal(service_cost) + expense_cost + depreciation_cost
avg_consumption = await full_tank_consumption(session, car_id, date_from, date_to)
cost_per_km = float(total_cost / distance_km) if distance_km else None
months = max(Decimal(period_days(date_from, date_to)) / Decimal("30.4375"), Decimal("0.033"))
cost_per_month = (total_cost / months).quantize(Decimal("0.01"))
recurring_total = (recurring_cost + depreciation_cost).quantize(Decimal("0.01"))
one_time_costs = max(total_cost - recurring_total, Decimal("0")).quantize(Decimal("0.01"))
recurring_monthly = (recurring_total / months).quantize(Decimal("0.01"))
forecast_next_month = max(cost_per_month, recurring_monthly).quantize(Decimal("0.01"))
cost_by_category = {
"fuel": Decimal(fuel_cost),
"service": Decimal(service_cost),
**expense_categories,
}
if depreciation_cost:
cost_by_category["depreciation"] = depreciation_cost
categories = [
{"category": key, "total_cost": value, "entries_count": 0}
for key, value in sorted(cost_by_category.items())
if value
]
return OwnershipStats(
car_id=car_id,
@@ -47,7 +95,15 @@ async def get_ownership_stats(
date_to=date_to,
fuel_cost=fuel_cost,
service_cost=service_cost,
expenses_cost=expense_cost,
total_cost=total_cost,
recurring_costs=recurring_total,
one_time_costs=one_time_costs,
forecast_next_month=forecast_next_month,
depreciation_cost=depreciation_cost,
cost_per_month=cost_per_month,
cost_by_category=cost_by_category,
categories=categories,
liters=liters,
distance_km=distance_km,
avg_consumption_l_per_100km=avg_consumption,
@@ -57,6 +113,92 @@ async def get_ownership_stats(
)
def period_days(date_from: date, date_to: date) -> int:
return max((date_to - date_from).days + 1, 1)
def add_months(value: date, months: int) -> date:
month = value.month - 1 + months
year = value.year + month // 12
month = month % 12 + 1
day = min(value.day, calendar.monthrange(year, month)[1])
return date(year, month, day)
def overlap_days(left_start: date, left_end: date, right_start: date, right_end: date) -> int:
start = max(left_start, right_start)
end = min(left_end, right_end)
if end < start:
return 0
return period_days(start, end)
def expense_window(entry: ExpenseEntry) -> tuple[date, date]:
if entry.period_start and entry.period_end:
return entry.period_start, entry.period_end
if entry.period_start and entry.period_months:
return entry.period_start, add_months(entry.period_start, entry.period_months) - timedelta(days=1)
if entry.period_months:
return entry.entry_date, add_months(entry.entry_date, entry.period_months) - timedelta(days=1)
return entry.entry_date, entry.entry_date
def allocated_expense_cost(entry: ExpenseEntry, date_from: date, date_to: date) -> Decimal:
start, end = expense_window(entry)
total_days = period_days(start, end)
matched_days = overlap_days(start, end, date_from, date_to)
if matched_days <= 0:
return Decimal("0")
if total_days <= 1 and start == entry.entry_date:
return Decimal(entry.total_cost)
return (Decimal(entry.total_cost) * Decimal(matched_days) / Decimal(total_days)).quantize(Decimal("0.01"))
async def expense_period_totals(
session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> tuple[Decimal, Decimal, int, dict[str, Decimal]]:
result = await session.execute(
select(ExpenseEntry)
.where(
ExpenseEntry.car_id == car_id,
or_(
ExpenseEntry.entry_date.between(date_from, date_to),
ExpenseEntry.period_start.between(date_from, date_to),
ExpenseEntry.period_end.between(date_from, date_to),
(ExpenseEntry.period_start <= date_from) & (ExpenseEntry.period_end >= date_to),
),
)
.order_by(ExpenseEntry.entry_date.asc(), ExpenseEntry.id.asc())
)
total = Decimal("0")
recurring = Decimal("0")
categories: dict[str, Decimal] = {}
count = 0
for entry in result.scalars():
amount = allocated_expense_cost(entry, date_from, date_to)
if amount <= 0:
continue
count += 1
total += amount
category = entry.category.value if isinstance(entry.category, ExpenseCategory) else str(entry.category)
categories[category] = categories.get(category, Decimal("0")) + amount
if entry.is_recurring or entry.category in {ExpenseCategory.insurance, ExpenseCategory.loan_payment, ExpenseCategory.loan_interest}:
recurring += amount
return total.quantize(Decimal("0.01")), recurring.quantize(Decimal("0.01")), count, categories
def calculate_depreciation(car: Car, date_from: date, date_to: date) -> Decimal:
if not car.include_depreciation or not car.purchase_price or not car.purchase_date:
return Decimal("0")
depreciation_start = car.purchase_date
depreciation_end = add_months(car.purchase_date, 60) - timedelta(days=1)
matched_days = overlap_days(depreciation_start, depreciation_end, date_from, date_to)
if matched_days <= 0:
return Decimal("0")
daily_cost = Decimal(car.purchase_price) / Decimal(period_days(depreciation_start, depreciation_end))
return (daily_cost * Decimal(matched_days)).quantize(Decimal("0.01"))
async def full_tank_consumption(
session: AsyncSession, car_id: int, date_from: date, date_to: date
) -> float | None:

View File

@@ -1,6 +1,11 @@
import asyncio
import re
from dataclasses import dataclass
from functools import lru_cache
from io import BytesIO
from typing import Protocol
from app.core.config import settings
from app.services.vehicle_identity import normalize_license_plate, validate_vin
@@ -15,34 +20,95 @@ class OcrCandidate:
class OcrResult:
recognized_text: str
candidates: list[OcrCandidate]
provider: str = "heuristic"
class OCRProvider:
class OCRProvider(Protocol):
async def recognize(self, content: bytes, filename: str | None = None) -> OcrResult:
raise NotImplementedError
...
class StubOCRProvider(OCRProvider):
class TextHeuristicOCRProvider:
provider_name = "heuristic"
async def recognize(self, content: bytes, filename: str | None = None) -> OcrResult:
text = " ".join(
[
filename or "",
content.decode("utf-8", errors="ignore"),
]
text = " ".join([filename or "", content.decode("utf-8", errors="ignore")])
return build_ocr_result(text, provider=self.provider_name, base_confidence=0.62)
class TesseractOCRProvider:
provider_name = "tesseract"
async def recognize(self, content: bytes, filename: str | None = None) -> OcrResult:
text = await asyncio.to_thread(self._recognize_sync, content)
if not text.strip():
fallback = await TextHeuristicOCRProvider().recognize(content, filename)
fallback.provider = self.provider_name
return fallback
return build_ocr_result(text, provider=self.provider_name, base_confidence=0.78)
def _recognize_sync(self, content: bytes) -> str:
try:
import pytesseract
from PIL import Image
except ImportError:
return ""
try:
image = Image.open(BytesIO(content))
except Exception:
return ""
try:
return pytesseract.image_to_string(image, lang=settings.ocr_languages)
except Exception:
return pytesseract.image_to_string(image)
class CompositeOCRProvider:
def __init__(self) -> None:
provider = settings.ocr_provider.lower()
self.primary: OCRProvider = (
TextHeuristicOCRProvider() if provider == "heuristic" else TesseractOCRProvider()
)
compact = re.sub(r"\s+", " ", text).strip()
candidates: list[OcrCandidate] = []
for raw in re.findall(r"\b[A-HJ-NPR-Z0-9]{17}\b", compact.upper()):
try:
candidates.append(OcrCandidate(type="vin", value=validate_vin(raw) or raw, confidence=0.84))
except ValueError:
continue
for raw in re.findall(r"\b[0-9A-ZА-Я가-힣][0-9A-ZА-Я가-힣\-\s]{4,10}\b", compact.upper()):
async def recognize(self, content: bytes, filename: str | None = None) -> OcrResult:
return await self.primary.recognize(content, filename)
def build_ocr_result(text: str, *, provider: str, base_confidence: float) -> OcrResult:
compact = re.sub(r"\s+", " ", text.replace("\xa0", " ")).strip()
candidates: list[OcrCandidate] = []
upper = compact.upper()
seen: set[tuple[str, str]] = set()
for raw in re.findall(r"\b[A-HJ-NPR-Z0-9]{17}\b", upper):
try:
value = validate_vin(raw) or raw
except ValueError:
continue
key = ("vin", value)
if key not in seen:
seen.add(key)
candidates.append(OcrCandidate(type="vin", value=value, confidence=min(base_confidence + 0.12, 0.95)))
plate_patterns = [
r"\b\d{2,3}\s*[가-힣]\s*\d{4}\b",
r"\b[A-ZА-Я]{1}\s?\d{3}\s?[A-ZА-Я]{2}\s?\d{2,3}\b",
r"\b[0-9A-ZА-Я가-힣][0-9A-ZА-Я가-힣\-\s]{4,10}\b",
]
for pattern in plate_patterns:
for raw in re.findall(pattern, upper):
normalized = normalize_license_plate(raw)
if normalized and 5 <= len(normalized) <= 10 and not any(item.value == normalized for item in candidates):
candidates.append(OcrCandidate(type="license_plate", value=normalized, confidence=0.62))
return OcrResult(recognized_text=compact, candidates=candidates[:8])
if normalized and 5 <= len(normalized) <= 10:
key = ("license_plate", normalized)
if key not in seen:
seen.add(key)
candidates.append(
OcrCandidate(type="license_plate", value=normalized, confidence=base_confidence)
)
return OcrResult(recognized_text=compact, candidates=candidates[:12], provider=provider)
@lru_cache
def get_ocr_provider() -> OCRProvider:
return StubOCRProvider()
return CompositeOCRProvider()

View File

@@ -436,13 +436,13 @@ async def compute_service_center_score(session: AsyncSession, center: ServiceCen
confirmation_rate = Decimal(len(confirmed) * 100) / Decimal(len(relevant))
dispute_rate = Decimal(len(disputed) * 100) / Decimal(len(relevant))
score = 20 if center.verification_status == "verified" else 5
score = 20 if center.verification_status in {"approved", "verified"} else 5
score += min(30, len(confirmed) * 5)
score += int(min(30, confirmation_rate * Decimal("0.3")))
score -= int(min(25, dispute_rate * Decimal("0.5")))
score = max(0, min(100, score))
if center.verification_status != "verified":
if center.verification_status not in {"approved", "verified"}:
level = "new_service"
elif score >= 85:
level = "high_confidence_service"