581 lines
20 KiB
Python
581 lines
20 KiB
Python
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
from datetime import UTC, date, datetime, timedelta
|
||
from decimal import Decimal
|
||
|
||
from sqlalchemy import func, select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from app.models.car import Car, ServiceCenter, ServiceVisit
|
||
from app.models.expense import FuelEntry, ServiceEntry
|
||
from app.models.gamification import (
|
||
Achievement,
|
||
EngagementEvent,
|
||
ServiceCenterScore,
|
||
UserAchievement,
|
||
VehicleScore,
|
||
)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class MissingItem:
|
||
code: str
|
||
title: str
|
||
description: str
|
||
weight: int
|
||
|
||
def as_dict(self) -> dict:
|
||
return {
|
||
"code": self.code,
|
||
"title": self.title,
|
||
"description": self.description,
|
||
"weight": self.weight,
|
||
}
|
||
|
||
|
||
DEFAULT_ACHIEVEMENTS = [
|
||
{
|
||
"code": "vehicle_added",
|
||
"scope": "vehicle",
|
||
"title": "Авто добавлено",
|
||
"description": "В гараже появилась первая карточка автомобиля.",
|
||
"icon": "car",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "vin_added",
|
||
"scope": "vehicle",
|
||
"title": "VIN указан",
|
||
"description": "Идентификация автомобиля стала надежнее.",
|
||
"icon": "vin",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "license_plate_added",
|
||
"scope": "vehicle",
|
||
"title": "Госномер указан",
|
||
"description": "Карточку проще связать с сервисными визитами.",
|
||
"icon": "plate",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "vehicle_profile_half",
|
||
"scope": "vehicle",
|
||
"title": "Карточка авто заполнена на 50%",
|
||
"description": "Данных уже достаточно для базовой аналитики.",
|
||
"icon": "progress",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "vehicle_profile_full",
|
||
"scope": "vehicle",
|
||
"title": "Карточка авто заполнена полностью",
|
||
"description": "Цифровой паспорт автомобиля готов к эксплуатации.",
|
||
"icon": "passport",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "first_fuel_record",
|
||
"scope": "vehicle",
|
||
"title": "Первая заправка",
|
||
"description": "Расход топлива начал формировать историю владения.",
|
||
"icon": "fuel",
|
||
"category": "tracking",
|
||
},
|
||
{
|
||
"code": "first_service_record",
|
||
"scope": "vehicle",
|
||
"title": "First Service Record",
|
||
"description": "The vehicle passport has its first service record.",
|
||
"icon": "service",
|
||
"category": "maintenance",
|
||
},
|
||
{
|
||
"code": "first_verified_service",
|
||
"scope": "vehicle",
|
||
"title": "First Verified Service",
|
||
"description": "A service-center visit was confirmed by the owner.",
|
||
"icon": "verified",
|
||
"category": "trust",
|
||
},
|
||
{
|
||
"code": "full_vehicle_profile",
|
||
"scope": "vehicle",
|
||
"title": "Full Vehicle Profile",
|
||
"description": "The digital vehicle passport has high-quality identity and maintenance data.",
|
||
"icon": "passport",
|
||
"category": "profile",
|
||
},
|
||
{
|
||
"code": "fuel_tracking_active",
|
||
"scope": "vehicle",
|
||
"title": "Fuel Tracking Active",
|
||
"description": "Fuel records are consistent enough to improve running-cost analytics.",
|
||
"icon": "fuel",
|
||
"category": "tracking",
|
||
},
|
||
{
|
||
"code": "trusted_history",
|
||
"scope": "vehicle",
|
||
"title": "Trusted History",
|
||
"description": "Most maintenance records are confirmed and suitable for a trusted vehicle history.",
|
||
"icon": "shield",
|
||
"category": "trust",
|
||
},
|
||
{
|
||
"code": "complete_garage",
|
||
"scope": "user",
|
||
"title": "Complete Garage",
|
||
"description": "Every active vehicle in the garage has a strong digital passport.",
|
||
"icon": "garage",
|
||
"category": "garage",
|
||
},
|
||
]
|
||
|
||
|
||
def profile_quality(score: int) -> str:
|
||
if score >= 86:
|
||
return "high_confidence"
|
||
if score >= 61:
|
||
return "strong"
|
||
if score >= 31:
|
||
return "useful"
|
||
return "basic"
|
||
|
||
|
||
def verified_history_status(score: int, confirmed_visits: int) -> str:
|
||
if score >= 80 and confirmed_visits > 0:
|
||
return "verified"
|
||
if score >= 30 or confirmed_visits > 0:
|
||
return "partially_verified"
|
||
return "self_reported"
|
||
|
||
|
||
def score_maintenance(car: Car, service_entries: list[ServiceEntry], visits: list[ServiceVisit]) -> tuple[int, str]:
|
||
today = date.today()
|
||
current_odometer = car.current_odometer
|
||
red = False
|
||
yellow = False
|
||
has_baseline = bool(service_entries or visits)
|
||
|
||
for entry in service_entries:
|
||
if entry.next_due_date and entry.next_due_date < today:
|
||
red = True
|
||
elif entry.next_due_date and (entry.next_due_date - today).days <= 30:
|
||
yellow = True
|
||
if current_odometer and entry.next_due_odometer:
|
||
overdue_km = current_odometer - entry.next_due_odometer
|
||
if overdue_km > 0:
|
||
red = True
|
||
elif overdue_km >= -1000:
|
||
yellow = True
|
||
|
||
for visit in visits:
|
||
for item in getattr(visit, "work_items", []) or []:
|
||
if item.next_due_date and item.next_due_date < today:
|
||
red = True
|
||
elif item.next_due_date and (item.next_due_date - today).days <= 30:
|
||
yellow = True
|
||
if current_odometer and item.next_due_odometer:
|
||
overdue_km = current_odometer - item.next_due_odometer
|
||
if overdue_km > 0:
|
||
red = True
|
||
elif overdue_km >= -1000:
|
||
yellow = True
|
||
|
||
if red:
|
||
return 30, "red"
|
||
if yellow:
|
||
return 70, "yellow"
|
||
if has_baseline:
|
||
return 100, "green"
|
||
return 55, "unknown"
|
||
|
||
|
||
async def compute_vehicle_score(session: AsyncSession, car: Car) -> VehicleScore:
|
||
fuel_entries = list(
|
||
(
|
||
await session.execute(
|
||
select(FuelEntry).where(FuelEntry.car_id == car.id).order_by(FuelEntry.entry_date.desc())
|
||
)
|
||
).scalars()
|
||
)
|
||
service_entries = list(
|
||
(
|
||
await session.execute(
|
||
select(ServiceEntry).where(ServiceEntry.car_id == car.id).order_by(ServiceEntry.entry_date.desc())
|
||
)
|
||
).scalars()
|
||
)
|
||
visits = list(
|
||
(
|
||
await session.execute(
|
||
select(ServiceVisit)
|
||
.options(selectinload(ServiceVisit.work_items))
|
||
.where(ServiceVisit.vehicle_id == car.id)
|
||
.order_by(ServiceVisit.visit_date.desc())
|
||
)
|
||
).scalars()
|
||
)
|
||
|
||
score = 5
|
||
missing: list[MissingItem] = []
|
||
|
||
baseline_fields = [car.make, car.model, car.year, car.name]
|
||
baseline_points = int(round(15 * sum(1 for value in baseline_fields if value) / len(baseline_fields)))
|
||
score += baseline_points
|
||
if baseline_points < 15:
|
||
missing.append(
|
||
MissingItem(
|
||
"vehicle_baseline",
|
||
"Complete make, model and year",
|
||
"These fields make the passport easier to identify and export.",
|
||
15 - baseline_points,
|
||
)
|
||
)
|
||
|
||
if car.vin_normalized:
|
||
score += 15
|
||
else:
|
||
missing.append(MissingItem("vin", "Add VIN", "VIN raises identity confidence for the vehicle passport.", 15))
|
||
|
||
if car.license_plate_normalized and car.license_plate_country:
|
||
score += 10
|
||
else:
|
||
missing.append(
|
||
MissingItem("license_plate", "Add license plate", "Plate and country help service centers request access safely.", 10)
|
||
)
|
||
|
||
if car.current_odometer:
|
||
score += 10
|
||
else:
|
||
missing.append(MissingItem("odometer", "Add current mileage", "Mileage improves reminders and maintenance health.", 10))
|
||
|
||
if car.fuel_type and (fuel_entries or car.target_consumption_l_per_100km):
|
||
score += 10
|
||
else:
|
||
missing.append(
|
||
MissingItem("fuel_baseline", "Add fuel baseline", "Fuel type and records improve consumption analytics.", 10)
|
||
)
|
||
|
||
if car.engine_oil_type and car.engine_oil_volume_l:
|
||
score += 10
|
||
else:
|
||
missing.append(MissingItem("engine_oil", "Add oil specification", "Oil type and volume make service reminders useful.", 10))
|
||
|
||
fluid_fields = [car.transmission_fluid_type, car.coolant_type, car.brake_fluid_type]
|
||
fluid_points = min(5, sum(1 for value in fluid_fields if value) * 2)
|
||
score += fluid_points
|
||
if fluid_points < 5:
|
||
missing.append(MissingItem("fluids", "Add fluid details", "Transmission, coolant and brake fluid data improve service readiness.", 5 - fluid_points))
|
||
|
||
if fuel_entries:
|
||
score += 5
|
||
else:
|
||
missing.append(MissingItem("fuel_entry", "Add first fuel record", "Fuel records unlock more accurate ownership cost analytics.", 5))
|
||
|
||
if service_entries or visits:
|
||
score += 5
|
||
else:
|
||
missing.append(MissingItem("service_entry", "Add service history", "Service records build the maintenance timeline.", 5))
|
||
|
||
confirmed_visits = [visit for visit in visits if visit.status == "confirmed"]
|
||
if confirmed_visits:
|
||
score += 10
|
||
else:
|
||
missing.append(
|
||
MissingItem(
|
||
"confirmed_service",
|
||
"Confirm a service-center visit",
|
||
"Owner-confirmed service visits make the history more trustworthy.",
|
||
10,
|
||
)
|
||
)
|
||
|
||
total_service_records = len(service_entries) + len([visit for visit in visits if visit.status != "draft"])
|
||
trusted_records = len(confirmed_visits)
|
||
history_score = int(round((trusted_records / total_service_records) * 100)) if total_service_records else 0
|
||
maintenance_score, maintenance_status = score_maintenance(car, service_entries, visits)
|
||
|
||
score = min(100, score)
|
||
result = await session.execute(select(VehicleScore).where(VehicleScore.vehicle_id == car.id))
|
||
vehicle_score = result.scalar_one_or_none()
|
||
payload = {
|
||
"completeness_score": score,
|
||
"verified_history_score": history_score,
|
||
"maintenance_health_score": maintenance_score,
|
||
"maintenance_status": maintenance_status,
|
||
"profile_quality": profile_quality(score),
|
||
"verified_history_status": verified_history_status(history_score, trusted_records),
|
||
"missing_items": [item.as_dict() for item in missing],
|
||
}
|
||
if vehicle_score is None:
|
||
vehicle_score = VehicleScore(vehicle_id=car.id, **payload)
|
||
session.add(vehicle_score)
|
||
else:
|
||
for key, value in payload.items():
|
||
setattr(vehicle_score, key, value)
|
||
await session.flush()
|
||
await evaluate_vehicle_achievements(session, car, vehicle_score, fuel_entries, service_entries, visits)
|
||
return vehicle_score
|
||
|
||
|
||
async def ensure_default_achievements(session: AsyncSession) -> dict[str, Achievement]:
|
||
result = await session.execute(select(Achievement))
|
||
existing = {achievement.code: achievement for achievement in result.scalars()}
|
||
for item in DEFAULT_ACHIEVEMENTS:
|
||
if item["code"] not in existing:
|
||
achievement = Achievement(**item)
|
||
session.add(achievement)
|
||
existing[item["code"]] = achievement
|
||
await session.flush()
|
||
return existing
|
||
|
||
|
||
async def unlock_achievement(
|
||
session: AsyncSession,
|
||
*,
|
||
user_id: int,
|
||
achievement: Achievement,
|
||
vehicle_id: int | None = None,
|
||
service_center_id: int | None = None,
|
||
metadata: dict | None = None,
|
||
) -> None:
|
||
result = await session.execute(
|
||
select(UserAchievement).where(
|
||
UserAchievement.user_id == user_id,
|
||
UserAchievement.achievement_id == achievement.id,
|
||
UserAchievement.vehicle_id.is_(None) if vehicle_id is None else UserAchievement.vehicle_id == vehicle_id,
|
||
UserAchievement.service_center_id.is_(None)
|
||
if service_center_id is None
|
||
else UserAchievement.service_center_id == service_center_id,
|
||
)
|
||
)
|
||
if result.scalar_one_or_none() is not None:
|
||
return
|
||
session.add(
|
||
UserAchievement(
|
||
user_id=user_id,
|
||
achievement_id=achievement.id,
|
||
vehicle_id=vehicle_id,
|
||
service_center_id=service_center_id,
|
||
metadata_json=metadata,
|
||
)
|
||
)
|
||
session.add(
|
||
EngagementEvent(
|
||
user_id=user_id,
|
||
vehicle_id=vehicle_id,
|
||
service_center_id=service_center_id,
|
||
event_type="achievement_unlocked",
|
||
metadata_json={"code": achievement.code},
|
||
)
|
||
)
|
||
|
||
|
||
async def record_engagement_event(
|
||
session: AsyncSession,
|
||
*,
|
||
event_type: str,
|
||
user_id: int | None = None,
|
||
vehicle_id: int | None = None,
|
||
service_center_id: int | None = None,
|
||
metadata: dict | None = None,
|
||
cooldown_minutes: int = 360,
|
||
) -> None:
|
||
since = datetime.now(UTC) - timedelta(minutes=cooldown_minutes)
|
||
result = await session.execute(
|
||
select(EngagementEvent).where(
|
||
EngagementEvent.event_type == event_type,
|
||
EngagementEvent.user_id.is_(None) if user_id is None else EngagementEvent.user_id == user_id,
|
||
EngagementEvent.vehicle_id.is_(None) if vehicle_id is None else EngagementEvent.vehicle_id == vehicle_id,
|
||
EngagementEvent.service_center_id.is_(None)
|
||
if service_center_id is None
|
||
else EngagementEvent.service_center_id == service_center_id,
|
||
EngagementEvent.created_at >= since,
|
||
)
|
||
)
|
||
if result.scalar_one_or_none() is not None:
|
||
return
|
||
session.add(
|
||
EngagementEvent(
|
||
user_id=user_id,
|
||
vehicle_id=vehicle_id,
|
||
service_center_id=service_center_id,
|
||
event_type=event_type,
|
||
metadata_json=metadata,
|
||
)
|
||
)
|
||
|
||
|
||
async def evaluate_vehicle_achievements(
|
||
session: AsyncSession,
|
||
car: Car,
|
||
vehicle_score: VehicleScore,
|
||
fuel_entries: list[FuelEntry],
|
||
service_entries: list[ServiceEntry],
|
||
visits: list[ServiceVisit],
|
||
) -> None:
|
||
achievements = await ensure_default_achievements(session)
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["vehicle_added"],
|
||
)
|
||
if car.vin_normalized:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["vin_added"],
|
||
)
|
||
if car.license_plate_normalized:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["license_plate_added"],
|
||
)
|
||
if vehicle_score.completeness_score >= 50:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["vehicle_profile_half"],
|
||
)
|
||
if vehicle_score.completeness_score >= 95:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["vehicle_profile_full"],
|
||
)
|
||
if fuel_entries:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["first_fuel_record"],
|
||
)
|
||
if service_entries or visits:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["first_service_record"],
|
||
)
|
||
if any(visit.status == "confirmed" for visit in visits):
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["first_verified_service"],
|
||
)
|
||
if vehicle_score.completeness_score >= 90:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["full_vehicle_profile"],
|
||
)
|
||
if len(fuel_entries) >= 3:
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["fuel_tracking_active"],
|
||
)
|
||
if vehicle_score.verified_history_status == "verified":
|
||
await unlock_achievement(
|
||
session,
|
||
user_id=car.owner_id,
|
||
vehicle_id=car.id,
|
||
achievement=achievements["trusted_history"],
|
||
)
|
||
|
||
|
||
async def evaluate_garage_achievements(session: AsyncSession, user_id: int) -> None:
|
||
result = await session.execute(select(Car).where(Car.owner_id == user_id))
|
||
cars = list(result.scalars())
|
||
if not cars:
|
||
return
|
||
scores = []
|
||
for car in cars:
|
||
scores.append(await compute_vehicle_score(session, car))
|
||
if all(score.completeness_score >= 80 for score in scores):
|
||
achievements = await ensure_default_achievements(session)
|
||
await unlock_achievement(session, user_id=user_id, achievement=achievements["complete_garage"])
|
||
|
||
|
||
async def compute_service_center_score(session: AsyncSession, center: ServiceCenter) -> ServiceCenterScore:
|
||
visits = list(
|
||
(
|
||
await session.execute(select(ServiceVisit).where(ServiceVisit.service_center_id == center.id))
|
||
).scalars()
|
||
)
|
||
relevant = [visit for visit in visits if visit.status not in {"draft", "cancelled"}]
|
||
confirmed = [visit for visit in relevant if visit.status == "confirmed"]
|
||
disputed = [visit for visit in relevant if visit.status == "disputed"]
|
||
confirmation_rate = Decimal("0")
|
||
dispute_rate = Decimal("0")
|
||
if relevant:
|
||
confirmation_rate = Decimal(len(confirmed) * 100) / Decimal(len(relevant))
|
||
dispute_rate = Decimal(len(disputed) * 100) / Decimal(len(relevant))
|
||
|
||
score = 20 if center.verification_status in {"approved", "verified"} else 5
|
||
score += min(30, len(confirmed) * 5)
|
||
score += int(min(30, confirmation_rate * Decimal("0.3")))
|
||
score -= int(min(25, dispute_rate * Decimal("0.5")))
|
||
score = max(0, min(100, score))
|
||
|
||
if center.verification_status not in {"approved", "verified"}:
|
||
level = "new_service"
|
||
elif score >= 85:
|
||
level = "high_confidence_service"
|
||
elif score >= 65:
|
||
level = "reliable_service"
|
||
else:
|
||
level = "verified_service"
|
||
|
||
result = await session.execute(select(ServiceCenterScore).where(ServiceCenterScore.service_center_id == center.id))
|
||
center_score = result.scalar_one_or_none()
|
||
payload = {
|
||
"trust_score": score,
|
||
"trust_level": level,
|
||
"confirmed_visits_count": len(confirmed),
|
||
"confirmation_rate": confirmation_rate.quantize(Decimal("0.01")),
|
||
"dispute_rate": dispute_rate.quantize(Decimal("0.01")),
|
||
}
|
||
if center_score is None:
|
||
center_score = ServiceCenterScore(service_center_id=center.id, **payload)
|
||
session.add(center_score)
|
||
else:
|
||
for key, value in payload.items():
|
||
setattr(center_score, key, value)
|
||
await session.flush()
|
||
return center_score
|
||
|
||
|
||
async def garage_quality(session: AsyncSession, user_id: int) -> dict:
|
||
result = await session.execute(select(Car).where(Car.owner_id == user_id))
|
||
cars = list(result.scalars())
|
||
if not cars:
|
||
return {"vehicles_count": 0, "average_completeness": 0, "verified_records": 0}
|
||
scores = [await compute_vehicle_score(session, car) for car in cars]
|
||
confirmed_count = (
|
||
await session.execute(
|
||
select(func.count(ServiceVisit.id)).where(
|
||
ServiceVisit.vehicle_id.in_([car.id for car in cars]),
|
||
ServiceVisit.status == "confirmed",
|
||
)
|
||
)
|
||
).scalar_one()
|
||
return {
|
||
"vehicles_count": len(cars),
|
||
"average_completeness": int(round(sum(score.completeness_score for score in scores) / len(scores))),
|
||
"verified_records": confirmed_count,
|
||
}
|