"""User API routes: Telegram authentication, profile, preferences, push subscriptions and reminders."""

from datetime import date, timedelta

from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import (
    get_current_telegram_user,
    get_or_create_telegram_user,
    require_internal_api_token,
)
from app.core.config import settings
from app.db.session import get_session
from app.models.car import Car
from app.models.expense import ServiceEntry
from app.models.push import PushSubscription
from app.models.user import User
from app.schemas.user import (
    AuthConfig,
    PushSubscriptionCreate,
    ReminderRead,
    TelegramLoginRequest,
    UserPreferencesUpdate,
    UserRead,
    UserUpsert,
    WebAppAuthRequest,
)
from app.services.telegram_auth import verify_login_widget, verify_webapp_init_data

router = APIRouter(prefix="/users", tags=["users"])

# Upper bound applied to the ``limit`` query parameter of the reminders endpoint.
_REMINDER_MAX_LIMIT = 200
# Upper bound (in days) applied to the ``days`` look-ahead of the reminders endpoint.
_REMINDER_MAX_DAYS = 180
# A service counts as "due soon" once the car's odometer plus this margin reaches
# the service's ``next_due_odometer``.
_REMINDER_ODOMETER_LOOKAHEAD = 1000
# Stored user-agent strings are truncated to this many characters.
_USER_AGENT_MAX_LENGTH = 256


def _require_same_user(current_user: User, user_id: int) -> None:
    """Raise HTTP 403 unless the path ``user_id`` belongs to the authenticated user."""
    if current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Forbidden")


def _reminder_priority(service: ServiceEntry, car: Car, today: date) -> str:
    """Return ``"overdue"`` if the service is due by date or odometer, else ``"soon"``."""
    overdue_by_date = service.next_due_date is not None and service.next_due_date <= today
    overdue_by_odometer = (
        service.next_due_odometer is not None
        and car.current_odometer is not None
        and service.next_due_odometer <= car.current_odometer
    )
    return "overdue" if overdue_by_date or overdue_by_odometer else "soon"


def username_from_telegram(telegram_id: int, username: str | None = None) -> str:
    """Return the stringified ``telegram_id``.

    ``username`` is accepted for interface compatibility but does not affect
    the result.

    NOTE(review): the previous implementation was a conditional whose two
    branches both returned ``str(telegram_id)``; confirm whether a non-empty
    ``username`` was meant to be returned instead.
    """
    return str(telegram_id)


@router.post("", response_model=UserRead)
async def upsert_user(
    payload: UserUpsert,
    session: AsyncSession = Depends(get_session),
    x_internal_api_token: str | None = Header(default=None, alias="X-Internal-API-Token"),
) -> User:
    """Get or create a Telegram user from ``payload`` (internal-token protected).

    The ``X-Internal-API-Token`` header is passed to
    ``require_internal_api_token`` before any database work happens.
    """
    require_internal_api_token(x_internal_api_token)
    return await get_or_create_telegram_user(session, **payload.model_dump())


@router.get("/auth/config", response_model=AuthConfig)
async def auth_config() -> AuthConfig:
    """Expose the client-side auth configuration derived from ``settings``.

    Empty ``bot_username`` / ``vapid_public_key`` values are reported as
    ``None``; dev auth is only advertised outside production.
    """
    return AuthConfig(
        bot_username=settings.bot_username or None,
        vapid_public_key=settings.vapid_public_key or None,
        app_env=settings.app_env,
        allow_dev_auth=settings.allow_dev_auth and not settings.is_production,
    )


@router.post("/webapp-auth", response_model=UserRead)
async def webapp_auth(
    payload: WebAppAuthRequest, session: AsyncSession = Depends(get_session)
) -> User:
    """Authenticate via Telegram WebApp init data and return the matching user.

    The init data is checked with ``verify_webapp_init_data`` against the bot
    token; the resulting user fields are forwarded to
    ``get_or_create_telegram_user``.
    """
    user_data = verify_webapp_init_data(payload.init_data, settings.bot_token)
    telegram_id = int(user_data["id"])
    return await get_or_create_telegram_user(
        session,
        telegram_id=telegram_id,
        username=user_data.get("username"),
        first_name=user_data.get("first_name"),
        last_name=user_data.get("last_name"),
        locale=user_data.get("language_code"),
    )


@router.post("/telegram-login", response_model=UserRead)
async def telegram_login(
    payload: TelegramLoginRequest, session: AsyncSession = Depends(get_session)
) -> User:
    """Authenticate via the Telegram login widget and return the matching user.

    The dumped payload is checked with ``verify_login_widget`` against the bot
    token; the resulting fields are forwarded to
    ``get_or_create_telegram_user``.
    """
    values = verify_login_widget(payload.model_dump(), settings.bot_token)
    telegram_id = int(values["id"])
    return await get_or_create_telegram_user(
        session,
        telegram_id=telegram_id,
        username=values.get("username"),
        first_name=values.get("first_name"),
        last_name=values.get("last_name"),
    )


@router.get("/me", response_model=UserRead)
async def current_user_profile(
    current_user: User = Depends(get_current_telegram_user),
) -> User:
    """Return the authenticated user's profile."""
    return current_user


@router.get("/telegram/{telegram_id}", response_model=UserRead)
async def get_user_by_telegram_id(
    telegram_id: int,
    session: AsyncSession = Depends(get_session),
    x_internal_api_token: str | None = Header(default=None, alias="X-Internal-API-Token"),
) -> User:
    """Look up a user by Telegram id (internal-token protected).

    Raises:
        HTTPException: 404 when no user has the given ``telegram_id``.
    """
    require_internal_api_token(x_internal_api_token)
    result = await session.execute(select(User).where(User.telegram_id == telegram_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@router.patch("/{user_id}/preferences", response_model=UserRead)
async def update_preferences(
    user_id: int,
    payload: UserPreferencesUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> User:
    """Apply the non-``None`` fields of ``payload`` to the authenticated user.

    Raises:
        HTTPException: 403 when ``user_id`` is not the authenticated user's id.
    """
    _require_same_user(current_user, user_id)
    user = current_user
    # Fields left as None in the payload are skipped, so they keep their stored value.
    for field, value in payload.model_dump(exclude_none=True).items():
        setattr(user, field, value)
    await session.commit()
    await session.refresh(user)
    return user


@router.post("/{user_id}/push-subscriptions", status_code=status.HTTP_204_NO_CONTENT)
async def save_push_subscription(
    user_id: int,
    payload: PushSubscriptionCreate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> None:
    """Create or update the push subscription for ``(user_id, endpoint)``.

    On update, stored keys and user agent are only overwritten when the
    request supplies new values. The user agent falls back to the request's
    ``user-agent`` header and is truncated before storage.

    Raises:
        HTTPException: 403 when ``user_id`` is not the authenticated user's id.
    """
    _require_same_user(current_user, user_id)
    result = await session.execute(
        select(PushSubscription).where(
            PushSubscription.user_id == user_id,
            PushSubscription.endpoint == payload.endpoint,
        )
    )
    subscription = result.scalar_one_or_none()
    keys = payload.keys or None
    user_agent = payload.user_agent or request.headers.get("user-agent")
    stored_user_agent = user_agent[:_USER_AGENT_MAX_LENGTH] if user_agent else None

    if subscription is None:
        session.add(
            PushSubscription(
                user_id=user_id,
                endpoint=payload.endpoint,
                p256dh=keys.p256dh if keys else None,
                auth=keys.auth if keys else None,
                user_agent=stored_user_agent,
            )
        )
    else:
        # Keep whatever is already stored when the request omits a value.
        if keys:
            subscription.p256dh = keys.p256dh
            subscription.auth = keys.auth
        if stored_user_agent is not None:
            subscription.user_agent = stored_user_agent
    await session.commit()


@router.get("/{user_id}/reminders", response_model=list[ReminderRead])
async def due_reminders(
    user_id: int,
    days: int = 30,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ReminderRead]:
    """List service entries of the user's cars that are overdue or due soon.

    An entry qualifies when its ``next_due_date`` falls on or before the
    look-ahead horizon, or when its ``next_due_odometer`` is within the
    odometer margin of the car's current reading.

    Args:
        user_id: Owner of the cars; must be the authenticated user.
        days: Date look-ahead in days, clamped to ``1..180``.
        limit: Page size, clamped to ``1..200``.
        offset: Page offset, clamped to ``>= 0``.

    Raises:
        HTTPException: 403 when ``user_id`` is not the authenticated user's id.
    """
    _require_same_user(current_user, user_id)
    limit = min(max(limit, 1), _REMINDER_MAX_LIMIT)
    offset = max(offset, 0)
    today = date.today()
    horizon = today + timedelta(days=max(1, min(days, _REMINDER_MAX_DAYS)))

    due_by_date = ServiceEntry.next_due_date.is_not(None) & (
        ServiceEntry.next_due_date <= horizon
    )
    due_by_odometer = (
        ServiceEntry.next_due_odometer.is_not(None)
        & Car.current_odometer.is_not(None)
        & (
            ServiceEntry.next_due_odometer
            <= Car.current_odometer + _REMINDER_ODOMETER_LOOKAHEAD
        )
    )
    stmt = (
        select(ServiceEntry, Car)
        .join(Car, ServiceEntry.car_id == Car.id)
        .where(Car.owner_id == user_id)
        .where(due_by_date | due_by_odometer)
        # next_due_date is nullable and not unique, so the id tiebreaker is what
        # keeps LIMIT/OFFSET pages stable (no duplicated or skipped rows).
        .order_by(ServiceEntry.next_due_date.asc().nulls_last(), ServiceEntry.id.asc())
        .limit(limit)
        .offset(offset)
    )
    rows = (await session.execute(stmt)).all()

    return [
        ReminderRead(
            id=service.id,
            car_id=car.id,
            car_name=car.name,
            title=service.title,
            service_type=service.service_type.value,
            due_date=service.next_due_date.isoformat() if service.next_due_date else None,
            due_odometer=service.next_due_odometer,
            current_odometer=car.current_odometer,
            priority=_reminder_priority(service, car, today),
        )
        for service, car in rows
    ]