from datetime import UTC, date, datetime, time, timedelta from decimal import Decimal from fastapi import HTTPException from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.car import ( Car, MaintenanceRecommendation, ServiceAppointment, ServiceCenter, ServiceCenterBookingSettings, ServiceCenterHoliday, ServiceEmployee, ServiceNotification, ServiceVisit, ServiceWorkItem, ) from app.models.expense import ServiceEntry from app.models.user import User from app.services.notifications import notify_user ACTIVE_APPOINTMENT_STATUSES = {"requested", "confirmed", "confirmed_by_sto", "proposed_new_time"} DEFAULT_SERVICE_DURATIONS = { "oil_change": 60, "diagnostics": 60, "repair": 120, "tire_service": 60, "brakes": 90, "fluids": 60, "maintenance": 90, "other": 60, } def _as_aware_utc(value: datetime) -> datetime: if value.tzinfo is None: return value.replace(tzinfo=UTC) return value.astimezone(UTC) def estimate_duration(service_type: str, requested: int | None = None) -> int: return requested or DEFAULT_SERVICE_DURATIONS.get(service_type, DEFAULT_SERVICE_DURATIONS["other"]) async def get_booking_settings( session: AsyncSession, service_center_id: int ) -> ServiceCenterBookingSettings: result = await session.execute( select(ServiceCenterBookingSettings).where( ServiceCenterBookingSettings.service_center_id == service_center_id ) ) settings = result.scalar_one_or_none() if settings is not None: return settings settings = ServiceCenterBookingSettings(service_center_id=service_center_id) session.add(settings) await session.flush() return settings def _slot_overlaps_lunch(start_time: time, end_time: time, settings: ServiceCenterBookingSettings) -> bool: if not settings.lunch_break_start or not settings.lunch_break_end: return False return start_time < settings.lunch_break_end and end_time > settings.lunch_break_start async def count_overlapping_appointments( session: AsyncSession, *, service_center_id: int, start_at: datetime, end_at: datetime, exclude_appointment_id: int | None = None, ) -> int: start_at = _as_aware_utc(start_at) end_at = _as_aware_utc(end_at) stmt = select(func.count(ServiceAppointment.id)).where( ServiceAppointment.service_center_id == service_center_id, ServiceAppointment.status.in_(ACTIVE_APPOINTMENT_STATUSES), ServiceAppointment.requested_start_at < end_at, ServiceAppointment.requested_end_at > start_at, ) if exclude_appointment_id: stmt = stmt.where(ServiceAppointment.id != exclude_appointment_id) return int((await session.execute(stmt)).scalar_one() or 0) async def ensure_slot_available( session: AsyncSession, *, service_center_id: int, start_at: datetime, duration_minutes: int, exclude_appointment_id: int | None = None, ) -> datetime: start_at = _as_aware_utc(start_at) now = datetime.now(UTC) if start_at <= now: raise HTTPException(status_code=409, detail="Cannot book past time") end_at = start_at + timedelta(minutes=duration_minutes) settings = await get_booking_settings(session, service_center_id) if not settings.accepts_online_booking: raise HTTPException(status_code=409, detail="Online booking is disabled") holiday = await session.execute( select(ServiceCenterHoliday).where( ServiceCenterHoliday.service_center_id == service_center_id, ServiceCenterHoliday.holiday_date == start_at.date(), ) ) if holiday.scalar_one_or_none() is not None: raise HTTPException(status_code=409, detail="Service center is closed on this date") working_days = settings.working_days or [0, 1, 2, 3, 4] if start_at.weekday() not in working_days: raise HTTPException(status_code=409, detail="Service center is closed on this weekday") if start_at.time() < settings.open_time or end_at.time() > settings.close_time: raise HTTPException(status_code=409, detail="Slot is outside working hours") if _slot_overlaps_lunch(start_at.time(), end_at.time(), settings): raise HTTPException(status_code=409, detail="Slot overlaps lunch break") overlapping = await count_overlapping_appointments( session, service_center_id=service_center_id, start_at=start_at, end_at=end_at, exclude_appointment_id=exclude_appointment_id, ) if overlapping >= settings.max_parallel_bookings: raise HTTPException(status_code=409, detail="Slot is already full") return end_at async def calculate_available_slots( session: AsyncSession, *, service_center_id: int, date_from: date, date_to: date, duration_minutes: int, limit: int = 80, ) -> list[tuple[datetime, datetime]]: settings = await get_booking_settings(session, service_center_id) holidays = { item.holiday_date for item in ( await session.execute( select(ServiceCenterHoliday).where( ServiceCenterHoliday.service_center_id == service_center_id, ServiceCenterHoliday.holiday_date >= date_from, ServiceCenterHoliday.holiday_date <= date_to, ) ) ).scalars() } slots: list[tuple[datetime, datetime]] = [] now = datetime.now(UTC) current = date_from step = timedelta(minutes=settings.slot_duration_minutes + settings.booking_buffer_minutes) working_days = settings.working_days or [0, 1, 2, 3, 4] while current <= date_to and len(slots) < limit: if current.weekday() not in working_days or current in holidays: current += timedelta(days=1) continue cursor = datetime.combine(current, settings.open_time, tzinfo=UTC) day_close = datetime.combine(current, settings.close_time, tzinfo=UTC) while cursor + timedelta(minutes=duration_minutes) <= day_close and len(slots) < limit: end_at = cursor + timedelta(minutes=duration_minutes) if cursor > now and not _slot_overlaps_lunch(cursor.time(), end_at.time(), settings): overlapping = await count_overlapping_appointments( session, service_center_id=service_center_id, start_at=cursor, end_at=end_at, ) if overlapping < settings.max_parallel_bookings: slots.append((cursor, end_at)) cursor += step current += timedelta(days=1) return slots async def create_service_notification( session: AsyncSession, *, recipient_user_id: int, notification_type: str, title: str, body: str | None = None, service_center_id: int | None = None, appointment_id: int | None = None, send_telegram: bool = True, idempotency_key: str | None = None, web_app_url: str | None = None, button_text: str = "Открыть", ) -> ServiceNotification: if idempotency_key: existing = ( await session.execute( select(ServiceNotification).where(ServiceNotification.idempotency_key == idempotency_key) ) ).scalar_one_or_none() if existing is not None: return existing notification = ServiceNotification( recipient_user_id=recipient_user_id, service_center_id=service_center_id, appointment_id=appointment_id, notification_type=notification_type, title=title, body=body, idempotency_key=idempotency_key, ) session.add(notification) if send_telegram: user = await session.get(User, recipient_user_id) if user is not None: notification.status = "processing" message = f"{title}\n{body}" if body else title delivered = await notify_user(user, message, web_app_url=web_app_url, button_text=button_text) if delivered: notification.status = "sent" notification.sent_at = datetime.now(UTC) else: notification.status = "retrying" notification.retry_count = 1 notification.last_error = "telegram_delivery_failed" return notification async def notify_service_staff( session: AsyncSession, *, service_center_id: int, notification_type: str, title: str, body: str | None = None, appointment_id: int | None = None, ) -> None: center = await session.get(ServiceCenter, service_center_id) recipient_ids: set[int] = set() if center and center.owner_user_id: recipient_ids.add(center.owner_user_id) employees = await session.execute( select(ServiceEmployee).where( ServiceEmployee.service_center_id == service_center_id, ServiceEmployee.status == "active", ) ) recipient_ids.update(employee.user_id for employee in employees.scalars()) for user_id in recipient_ids: await create_service_notification( session, recipient_user_id=user_id, notification_type=notification_type, title=title, body=body, service_center_id=service_center_id, appointment_id=appointment_id, ) def _recommendation_priority(car: Car, due_odometer: int | None, due_date: date | None) -> str: today = date.today() current_odometer = car.current_odometer or 0 if due_odometer is not None and current_odometer >= due_odometer: return "overdue" if due_date is not None and today >= due_date: return "overdue" if due_odometer is not None and due_odometer - current_odometer <= 1000: return "high" if due_date is not None and (due_date - today).days <= 14: return "high" return "medium" async def ensure_oil_recommendation(session: AsyncSession, car: Car) -> MaintenanceRecommendation | None: interval_km = car.oil_change_interval_km or 10_000 interval_months = car.oil_change_interval_months or 12 latest_entry = ( await session.execute( select(ServiceEntry) .where( ServiceEntry.car_id == car.id, or_( ServiceEntry.title.ilike("%масл%"), ServiceEntry.title.ilike("%oil%"), ServiceEntry.category.ilike("%масл%"), ServiceEntry.category.ilike("%oil%"), ), ) .order_by(ServiceEntry.entry_date.desc(), ServiceEntry.id.desc()) ) ).scalars().first() latest_work_item = ( await session.execute( select(ServiceWorkItem, ServiceVisit) .join(ServiceVisit, ServiceVisit.id == ServiceWorkItem.service_visit_id) .where( ServiceVisit.vehicle_id == car.id, or_( ServiceWorkItem.work_type.in_(["oil_change", "fluids"]), ServiceWorkItem.title.ilike("%масл%"), ServiceWorkItem.title.ilike("%oil%"), ), ) .order_by(ServiceVisit.visit_date.desc(), ServiceWorkItem.id.desc()) ) ).first() base_odometer = car.current_odometer or 0 base_date = date.today() if latest_entry: base_odometer = latest_entry.odometer or base_odometer base_date = latest_entry.entry_date or base_date if latest_work_item: work_item, visit = latest_work_item base_odometer = visit.odometer or base_odometer base_date = visit.visit_date or base_date if work_item.next_due_odometer: due_odometer = work_item.next_due_odometer else: due_odometer = base_odometer + interval_km due_date = work_item.next_due_date or (base_date + timedelta(days=interval_months * 30)) else: due_odometer = base_odometer + interval_km due_date = base_date + timedelta(days=interval_months * 30) existing = ( await session.execute( select(MaintenanceRecommendation).where( MaintenanceRecommendation.vehicle_id == car.id, MaintenanceRecommendation.recommendation_type == "oil_change", MaintenanceRecommendation.status == "active", ) ) ).scalar_one_or_none() priority = _recommendation_priority(car, due_odometer, due_date) payload = { "vehicle_id": car.id, "recommendation_type": "oil_change", "title": "Замена моторного масла", "description": "Плановая рекомендация по интервалу пробега и времени.", "due_odometer_km": due_odometer, "due_date": due_date, "priority": priority, "source": "mileage_interval", } if existing: for key, value in payload.items(): setattr(existing, key, value) await session.flush() return existing recommendation = MaintenanceRecommendation(**payload) session.add(recommendation) await session.flush() return recommendation def money_to_float(value: Decimal | None) -> float: return float(value or 0)