Files
drivers_bot/bot/api_client.py
2026-05-12 19:14:21 +09:00

53 lines
2.1 KiB
Python

from typing import Any
import httpx
from app.core.config import settings
class ApiClient:
def __init__(self) -> None:
self.base_url = settings.api_base_url.rstrip("/")
def headers(self, telegram_id: int | None = None) -> dict[str, str]:
headers = {"X-Internal-API-Token": settings.internal_api_token}
if telegram_id is not None:
headers["X-Telegram-User-Id"] = str(telegram_id)
return headers
async def upsert_user(self, telegram_user: Any) -> dict[str, Any]:
payload = {
"telegram_id": telegram_user.id,
"username": telegram_user.username,
"first_name": telegram_user.first_name,
"last_name": telegram_user.last_name,
}
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.post("/api/users", json=payload, headers=self.headers())
response.raise_for_status()
return response.json()
async def list_cars(self, owner_id: int, telegram_id: int) -> list[dict[str, Any]]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.get(
"/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
)
response.raise_for_status()
return response.json()
async def create_car(self, owner_id: int, name: str, telegram_id: int) -> dict[str, Any]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.post(
"/api/cars",
json={"owner_id": owner_id, "name": name},
headers=self.headers(telegram_id),
)
response.raise_for_status()
return response.json()
async def stats(self, car_id: int, telegram_id: int) -> dict[str, Any]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
response.raise_for_status()
return response.json()