from __future__ import annotations from dataclasses import dataclass from datetime import UTC, date, datetime, timedelta from decimal import Decimal from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models.car import Car, ServiceCenter, ServiceVisit from app.models.expense import FuelEntry, ServiceEntry from app.models.gamification import ( Achievement, EngagementEvent, ServiceCenterScore, UserAchievement, VehicleScore, ) @dataclass(frozen=True) class MissingItem: code: str title: str description: str weight: int def as_dict(self) -> dict: return { "code": self.code, "title": self.title, "description": self.description, "weight": self.weight, } DEFAULT_ACHIEVEMENTS = [ { "code": "first_service_record", "scope": "vehicle", "title": "First Service Record", "description": "The vehicle passport has its first service record.", "icon": "service", "category": "maintenance", }, { "code": "first_verified_service", "scope": "vehicle", "title": "First Verified Service", "description": "A service-center visit was confirmed by the owner.", "icon": "verified", "category": "trust", }, { "code": "full_vehicle_profile", "scope": "vehicle", "title": "Full Vehicle Profile", "description": "The digital vehicle passport has high-quality identity and maintenance data.", "icon": "passport", "category": "profile", }, { "code": "fuel_tracking_active", "scope": "vehicle", "title": "Fuel Tracking Active", "description": "Fuel records are consistent enough to improve running-cost analytics.", "icon": "fuel", "category": "tracking", }, { "code": "trusted_history", "scope": "vehicle", "title": "Trusted History", "description": "Most maintenance records are confirmed and suitable for a trusted vehicle history.", "icon": "shield", "category": "trust", }, { "code": "complete_garage", "scope": "user", "title": "Complete Garage", "description": "Every active vehicle in the garage has a strong digital passport.", "icon": "garage", "category": "garage", }, ] def profile_quality(score: int) -> str: if score >= 86: return "high_confidence" if score >= 61: return "strong" if score >= 31: return "useful" return "basic" def verified_history_status(score: int, confirmed_visits: int) -> str: if score >= 80 and confirmed_visits > 0: return "verified" if score >= 30 or confirmed_visits > 0: return "partially_verified" return "self_reported" def score_maintenance(car: Car, service_entries: list[ServiceEntry], visits: list[ServiceVisit]) -> tuple[int, str]: today = date.today() current_odometer = car.current_odometer red = False yellow = False has_baseline = bool(service_entries or visits) for entry in service_entries: if entry.next_due_date and entry.next_due_date < today: red = True elif entry.next_due_date and (entry.next_due_date - today).days <= 30: yellow = True if current_odometer and entry.next_due_odometer: overdue_km = current_odometer - entry.next_due_odometer if overdue_km > 0: red = True elif overdue_km >= -1000: yellow = True for visit in visits: for item in getattr(visit, "work_items", []) or []: if item.next_due_date and item.next_due_date < today: red = True elif item.next_due_date and (item.next_due_date - today).days <= 30: yellow = True if current_odometer and item.next_due_odometer: overdue_km = current_odometer - item.next_due_odometer if overdue_km > 0: red = True elif overdue_km >= -1000: yellow = True if red: return 30, "red" if yellow: return 70, "yellow" if has_baseline: return 100, "green" return 55, "unknown" async def compute_vehicle_score(session: AsyncSession, car: Car) -> VehicleScore: fuel_entries = list( ( await session.execute( select(FuelEntry).where(FuelEntry.car_id == car.id).order_by(FuelEntry.entry_date.desc()) ) ).scalars() ) service_entries = list( ( await session.execute( select(ServiceEntry).where(ServiceEntry.car_id == car.id).order_by(ServiceEntry.entry_date.desc()) ) ).scalars() ) visits = list( ( await session.execute( select(ServiceVisit) .options(selectinload(ServiceVisit.work_items)) .where(ServiceVisit.vehicle_id == car.id) .order_by(ServiceVisit.visit_date.desc()) ) ).scalars() ) score = 5 missing: list[MissingItem] = [] baseline_fields = [car.make, car.model, car.year, car.name] baseline_points = int(round(15 * sum(1 for value in baseline_fields if value) / len(baseline_fields))) score += baseline_points if baseline_points < 15: missing.append( MissingItem( "vehicle_baseline", "Complete make, model and year", "These fields make the passport easier to identify and export.", 15 - baseline_points, ) ) if car.vin_normalized: score += 15 else: missing.append(MissingItem("vin", "Add VIN", "VIN raises identity confidence for the vehicle passport.", 15)) if car.license_plate_normalized and car.license_plate_country: score += 10 else: missing.append( MissingItem("license_plate", "Add license plate", "Plate and country help service centers request access safely.", 10) ) if car.current_odometer: score += 10 else: missing.append(MissingItem("odometer", "Add current mileage", "Mileage improves reminders and maintenance health.", 10)) if car.fuel_type and (fuel_entries or car.target_consumption_l_per_100km): score += 10 else: missing.append( MissingItem("fuel_baseline", "Add fuel baseline", "Fuel type and records improve consumption analytics.", 10) ) if car.engine_oil_type and car.engine_oil_volume_l: score += 10 else: missing.append(MissingItem("engine_oil", "Add oil specification", "Oil type and volume make service reminders useful.", 10)) fluid_fields = [car.transmission_fluid_type, car.coolant_type, car.brake_fluid_type] fluid_points = min(5, sum(1 for value in fluid_fields if value) * 2) score += fluid_points if fluid_points < 5: missing.append(MissingItem("fluids", "Add fluid details", "Transmission, coolant and brake fluid data improve service readiness.", 5 - fluid_points)) if fuel_entries: score += 5 else: missing.append(MissingItem("fuel_entry", "Add first fuel record", "Fuel records unlock more accurate ownership cost analytics.", 5)) if service_entries or visits: score += 5 else: missing.append(MissingItem("service_entry", "Add service history", "Service records build the maintenance timeline.", 5)) confirmed_visits = [visit for visit in visits if visit.status == "confirmed"] if confirmed_visits: score += 10 else: missing.append( MissingItem( "confirmed_service", "Confirm a service-center visit", "Owner-confirmed service visits make the history more trustworthy.", 10, ) ) total_service_records = len(service_entries) + len([visit for visit in visits if visit.status != "draft"]) trusted_records = len(confirmed_visits) history_score = int(round((trusted_records / total_service_records) * 100)) if total_service_records else 0 maintenance_score, maintenance_status = score_maintenance(car, service_entries, visits) score = min(100, score) result = await session.execute(select(VehicleScore).where(VehicleScore.vehicle_id == car.id)) vehicle_score = result.scalar_one_or_none() payload = { "completeness_score": score, "verified_history_score": history_score, "maintenance_health_score": maintenance_score, "maintenance_status": maintenance_status, "profile_quality": profile_quality(score), "verified_history_status": verified_history_status(history_score, trusted_records), "missing_items": [item.as_dict() for item in missing], } if vehicle_score is None: vehicle_score = VehicleScore(vehicle_id=car.id, **payload) session.add(vehicle_score) else: for key, value in payload.items(): setattr(vehicle_score, key, value) await session.flush() await evaluate_vehicle_achievements(session, car, vehicle_score, fuel_entries, service_entries, visits) return vehicle_score async def ensure_default_achievements(session: AsyncSession) -> dict[str, Achievement]: result = await session.execute(select(Achievement)) existing = {achievement.code: achievement for achievement in result.scalars()} for item in DEFAULT_ACHIEVEMENTS: if item["code"] not in existing: achievement = Achievement(**item) session.add(achievement) existing[item["code"]] = achievement await session.flush() return existing async def unlock_achievement( session: AsyncSession, *, user_id: int, achievement: Achievement, vehicle_id: int | None = None, service_center_id: int | None = None, metadata: dict | None = None, ) -> None: result = await session.execute( select(UserAchievement).where( UserAchievement.user_id == user_id, UserAchievement.achievement_id == achievement.id, UserAchievement.vehicle_id.is_(None) if vehicle_id is None else UserAchievement.vehicle_id == vehicle_id, UserAchievement.service_center_id.is_(None) if service_center_id is None else UserAchievement.service_center_id == service_center_id, ) ) if result.scalar_one_or_none() is not None: return session.add( UserAchievement( user_id=user_id, achievement_id=achievement.id, vehicle_id=vehicle_id, service_center_id=service_center_id, metadata_json=metadata, ) ) session.add( EngagementEvent( user_id=user_id, vehicle_id=vehicle_id, service_center_id=service_center_id, event_type="achievement_unlocked", metadata_json={"code": achievement.code}, ) ) async def record_engagement_event( session: AsyncSession, *, event_type: str, user_id: int | None = None, vehicle_id: int | None = None, service_center_id: int | None = None, metadata: dict | None = None, cooldown_minutes: int = 360, ) -> None: since = datetime.now(UTC) - timedelta(minutes=cooldown_minutes) result = await session.execute( select(EngagementEvent).where( EngagementEvent.event_type == event_type, EngagementEvent.user_id.is_(None) if user_id is None else EngagementEvent.user_id == user_id, EngagementEvent.vehicle_id.is_(None) if vehicle_id is None else EngagementEvent.vehicle_id == vehicle_id, EngagementEvent.service_center_id.is_(None) if service_center_id is None else EngagementEvent.service_center_id == service_center_id, EngagementEvent.created_at >= since, ) ) if result.scalar_one_or_none() is not None: return session.add( EngagementEvent( user_id=user_id, vehicle_id=vehicle_id, service_center_id=service_center_id, event_type=event_type, metadata_json=metadata, ) ) async def evaluate_vehicle_achievements( session: AsyncSession, car: Car, vehicle_score: VehicleScore, fuel_entries: list[FuelEntry], service_entries: list[ServiceEntry], visits: list[ServiceVisit], ) -> None: achievements = await ensure_default_achievements(session) if service_entries or visits: await unlock_achievement( session, user_id=car.owner_id, vehicle_id=car.id, achievement=achievements["first_service_record"], ) if any(visit.status == "confirmed" for visit in visits): await unlock_achievement( session, user_id=car.owner_id, vehicle_id=car.id, achievement=achievements["first_verified_service"], ) if vehicle_score.completeness_score >= 90: await unlock_achievement( session, user_id=car.owner_id, vehicle_id=car.id, achievement=achievements["full_vehicle_profile"], ) if len(fuel_entries) >= 3: await unlock_achievement( session, user_id=car.owner_id, vehicle_id=car.id, achievement=achievements["fuel_tracking_active"], ) if vehicle_score.verified_history_status == "verified": await unlock_achievement( session, user_id=car.owner_id, vehicle_id=car.id, achievement=achievements["trusted_history"], ) async def evaluate_garage_achievements(session: AsyncSession, user_id: int) -> None: result = await session.execute(select(Car).where(Car.owner_id == user_id)) cars = list(result.scalars()) if not cars: return scores = [] for car in cars: scores.append(await compute_vehicle_score(session, car)) if all(score.completeness_score >= 80 for score in scores): achievements = await ensure_default_achievements(session) await unlock_achievement(session, user_id=user_id, achievement=achievements["complete_garage"]) async def compute_service_center_score(session: AsyncSession, center: ServiceCenter) -> ServiceCenterScore: visits = list( ( await session.execute(select(ServiceVisit).where(ServiceVisit.service_center_id == center.id)) ).scalars() ) relevant = [visit for visit in visits if visit.status not in {"draft", "cancelled"}] confirmed = [visit for visit in relevant if visit.status == "confirmed"] disputed = [visit for visit in relevant if visit.status == "disputed"] confirmation_rate = Decimal("0") dispute_rate = Decimal("0") if relevant: confirmation_rate = Decimal(len(confirmed) * 100) / Decimal(len(relevant)) dispute_rate = Decimal(len(disputed) * 100) / Decimal(len(relevant)) score = 20 if center.verification_status in {"approved", "verified"} else 5 score += min(30, len(confirmed) * 5) score += int(min(30, confirmation_rate * Decimal("0.3"))) score -= int(min(25, dispute_rate * Decimal("0.5"))) score = max(0, min(100, score)) if center.verification_status not in {"approved", "verified"}: level = "new_service" elif score >= 85: level = "high_confidence_service" elif score >= 65: level = "reliable_service" else: level = "verified_service" result = await session.execute(select(ServiceCenterScore).where(ServiceCenterScore.service_center_id == center.id)) center_score = result.scalar_one_or_none() payload = { "trust_score": score, "trust_level": level, "confirmed_visits_count": len(confirmed), "confirmation_rate": confirmation_rate.quantize(Decimal("0.01")), "dispute_rate": dispute_rate.quantize(Decimal("0.01")), } if center_score is None: center_score = ServiceCenterScore(service_center_id=center.id, **payload) session.add(center_score) else: for key, value in payload.items(): setattr(center_score, key, value) await session.flush() return center_score async def garage_quality(session: AsyncSession, user_id: int) -> dict: result = await session.execute(select(Car).where(Car.owner_id == user_id)) cars = list(result.scalars()) if not cars: return {"vehicles_count": 0, "average_completeness": 0, "verified_records": 0} scores = [await compute_vehicle_score(session, car) for car in cars] confirmed_count = ( await session.execute( select(func.count(ServiceVisit.id)).where( ServiceVisit.vehicle_id.in_([car.id for car in cars]), ServiceVisit.status == "confirmed", ) ) ).scalar_one() return { "vehicles_count": len(cars), "average_completeness": int(round(sum(score.completeness_score for score in scores) / len(scores))), "verified_records": confirmed_count, }