Files
drivers_bot/app/api/users.py
2026-05-12 04:36:30 +09:00

210 lines
7.3 KiB
Python

from datetime import date, timedelta
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.db.session import get_session
from app.models.car import Car
from app.models.expense import ServiceEntry
from app.models.push import PushSubscription
from app.models.user import User
from app.schemas.user import (
AuthConfig,
PushSubscriptionCreate,
ReminderRead,
TelegramLoginRequest,
UserPreferencesUpdate,
UserRead,
UserUpsert,
WebAppAuthRequest,
)
from app.services.telegram_auth import verify_login_widget, verify_webapp_init_data
router = APIRouter(prefix="/users", tags=["users"])
def username_from_telegram(telegram_id: int, username: str | None = None) -> str:
return str(telegram_id) if not username else str(telegram_id)
async def upsert_telegram_user(
session: AsyncSession,
*,
telegram_id: int,
username: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
locale: str | None = None,
currency: str | None = None,
) -> User:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
user = result.scalar_one_or_none()
payload = {
"telegram_id": telegram_id,
"username": username_from_telegram(telegram_id, username),
"first_name": first_name,
"last_name": last_name,
"locale": locale,
"currency": currency,
}
if user is None:
user = User(**{key: value for key, value in payload.items() if value is not None})
session.add(user)
else:
for field, value in payload.items():
if value is not None:
setattr(user, field, value)
await session.commit()
await session.refresh(user)
return user
@router.post("", response_model=UserRead)
async def upsert_user(payload: UserUpsert, session: AsyncSession = Depends(get_session)) -> User:
return await upsert_telegram_user(session, **payload.model_dump())
@router.get("/auth/config", response_model=AuthConfig)
async def auth_config() -> AuthConfig:
return AuthConfig(
bot_username=settings.bot_username or "seoulmate_officialbot",
vapid_public_key=settings.vapid_public_key or None,
)
@router.post("/webapp-auth", response_model=UserRead)
async def webapp_auth(
payload: WebAppAuthRequest, session: AsyncSession = Depends(get_session)
) -> User:
user_data = verify_webapp_init_data(payload.init_data, settings.bot_token)
telegram_id = int(user_data["id"])
return await upsert_telegram_user(
session,
telegram_id=telegram_id,
username=user_data.get("username"),
first_name=user_data.get("first_name"),
last_name=user_data.get("last_name"),
locale=user_data.get("language_code"),
)
@router.post("/telegram-login", response_model=UserRead)
async def telegram_login(
payload: TelegramLoginRequest, session: AsyncSession = Depends(get_session)
) -> User:
values = verify_login_widget(payload.model_dump(), settings.bot_token)
telegram_id = int(values["id"])
return await upsert_telegram_user(
session,
telegram_id=telegram_id,
username=values.get("username"),
first_name=values.get("first_name"),
last_name=values.get("last_name"),
)
@router.get("/telegram/{telegram_id}", response_model=UserRead)
async def get_user_by_telegram_id(
telegram_id: int, session: AsyncSession = Depends(get_session)
) -> User:
result = await session.execute(select(User).where(User.telegram_id == telegram_id))
return result.scalar_one()
@router.patch("/{user_id}/preferences", response_model=UserRead)
async def update_preferences(
user_id: int, payload: UserPreferencesUpdate, session: AsyncSession = Depends(get_session)
) -> User:
user = await session.get(User, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
for field, value in payload.model_dump(exclude_none=True).items():
setattr(user, field, value)
await session.commit()
await session.refresh(user)
return user
@router.post("/{user_id}/push-subscriptions", status_code=status.HTTP_204_NO_CONTENT)
async def save_push_subscription(
user_id: int,
payload: PushSubscriptionCreate,
request: Request,
session: AsyncSession = Depends(get_session),
) -> None:
user = await session.get(User, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
result = await session.execute(
select(PushSubscription).where(
PushSubscription.user_id == user_id,
PushSubscription.endpoint == payload.endpoint,
)
)
subscription = result.scalar_one_or_none()
keys = payload.keys or None
user_agent = payload.user_agent or request.headers.get("user-agent")
if subscription is None:
subscription = PushSubscription(
user_id=user_id,
endpoint=payload.endpoint,
p256dh=keys.p256dh if keys else None,
auth=keys.auth if keys else None,
user_agent=user_agent[:256] if user_agent else None,
)
session.add(subscription)
else:
subscription.p256dh = keys.p256dh if keys else subscription.p256dh
subscription.auth = keys.auth if keys else subscription.auth
subscription.user_agent = user_agent[:256] if user_agent else subscription.user_agent
await session.commit()
@router.get("/{user_id}/reminders", response_model=list[ReminderRead])
async def due_reminders(
user_id: int,
days: int = 30,
session: AsyncSession = Depends(get_session),
) -> list[ReminderRead]:
today = date.today()
horizon = today + timedelta(days=max(1, min(days, 180)))
stmt = (
select(ServiceEntry, Car)
.join(Car, ServiceEntry.car_id == Car.id)
.where(Car.owner_id == user_id)
.where(
(ServiceEntry.next_due_date.is_not(None) & (ServiceEntry.next_due_date <= horizon))
| (
ServiceEntry.next_due_odometer.is_not(None)
& Car.current_odometer.is_not(None)
& (ServiceEntry.next_due_odometer <= Car.current_odometer + 1000)
)
)
.order_by(ServiceEntry.next_due_date.asc().nulls_last())
)
rows = (await session.execute(stmt)).all()
reminders: list[ReminderRead] = []
for service, car in rows:
overdue_date = service.next_due_date is not None and service.next_due_date <= today
overdue_odo = (
service.next_due_odometer is not None
and car.current_odometer is not None
and service.next_due_odometer <= car.current_odometer
)
reminders.append(
ReminderRead(
id=service.id,
car_id=car.id,
car_name=car.name,
title=service.title,
service_type=service.service_type.value,
due_date=service.next_due_date.isoformat() if service.next_due_date else None,
due_odometer=service.next_due_odometer,
current_odometer=car.current_odometer,
priority="overdue" if overdue_date or overdue_odo else "soon",
)
)
return reminders