Files
drivers_bot/bot/api_client.py
VPN SaaS Dev 8982299e71
Some checks failed
ci / test (pull_request) Has been cancelled
add admin data mutations and load check
2026-05-18 18:37:19 +09:00

167 lines
6.6 KiB
Python

from typing import Any
import httpx
from app.core.config import settings
class ApiClient:
def __init__(self) -> None:
self.base_url = settings.api_base_url.rstrip("/")
self._client: httpx.AsyncClient | None = None
@property
def client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=httpx.Timeout(15.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
return self._client
async def close(self) -> None:
if self._client is not None:
await self._client.aclose()
def headers(self, telegram_id: int | None = None) -> dict[str, str]:
headers = {"X-Internal-API-Token": settings.internal_api_token}
if telegram_id is not None:
headers["X-Telegram-User-Id"] = str(telegram_id)
return headers
async def request(
self,
method: str,
path: str,
*,
telegram_id: int | None = None,
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> Any:
response = await self.client.request(
method,
path,
json=json,
params=params,
headers=self.headers(telegram_id),
)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
async def upsert_user(self, telegram_user: Any) -> dict[str, Any]:
payload = {
"telegram_id": telegram_user.id,
"username": telegram_user.username,
"first_name": telegram_user.first_name,
"last_name": telegram_user.last_name,
}
response = await self.client.post("/api/users", json=payload, headers=self.headers())
response.raise_for_status()
return response.json()
async def list_cars(self, owner_id: int, telegram_id: int) -> list[dict[str, Any]]:
response = await self.client.get(
"/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
)
response.raise_for_status()
return response.json()
async def create_car(self, owner_id: int, name: str, telegram_id: int) -> dict[str, Any]:
response = await self.client.post(
"/api/cars",
json={"owner_id": owner_id, "name": name},
headers=self.headers(telegram_id),
)
response.raise_for_status()
return response.json()
async def stats(self, car_id: int, telegram_id: int) -> dict[str, Any]:
response = await self.client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
response.raise_for_status()
return response.json()
async def create_fuel(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/fuel", telegram_id=telegram_id, json=payload)
async def create_service(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/service", telegram_id=telegram_id, json=payload)
async def create_expense(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/expenses", telegram_id=telegram_id, json=payload)
async def parse_record(self, telegram_id: int, text: str) -> dict[str, Any]:
return await self.request("POST", "/api/parse/record", telegram_id=telegram_id, json={"text": text})
async def public_service_centers(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/service-centers/public", telegram_id=telegram_id)
async def sto_catalog(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/sto/catalog", telegram_id=telegram_id)
async def my_appointments(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/appointments/my", telegram_id=telegram_id)
async def sto_dashboard(self, telegram_id: int, service_center_id: int) -> dict[str, Any]:
return await self.request(
"GET",
"/api/sto/dashboard",
telegram_id=telegram_id,
params={"service_center_id": service_center_id},
)
async def sto_appointments(self, telegram_id: int, service_center_id: int) -> list[dict[str, Any]]:
return await self.request(
"GET",
"/api/sto/appointments",
telegram_id=telegram_id,
params={"service_center_id": service_center_id, "status": "requested"},
)
async def my_service_centers(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/service-centers/my", telegram_id=telegram_id)
async def register_service_center(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/service-centers", telegram_id=telegram_id, json=payload)
async def accept_sto_invite(self, telegram_id: int, invite_token: str) -> dict[str, Any]:
return await self.request(
"POST",
f"/api/service-centers/employees/invites/{invite_token}/accept",
telegram_id=telegram_id,
)
async def pending_service_centers(self, telegram_id: int) -> list[dict[str, Any]]:
return await self.request("GET", "/api/admin/service-centers/pending", telegram_id=telegram_id)
async def admin_dashboard(self, telegram_id: int) -> dict[str, Any]:
return await self.request("GET", "/api/admin/dashboard", telegram_id=telegram_id)
async def admin_users(self, telegram_id: int) -> dict[str, Any]:
return await self.request("GET", "/api/admin/users", telegram_id=telegram_id, params={"limit": 10})
async def admin_alerts(self, telegram_id: int) -> dict[str, Any]:
return await self.request("GET", "/api/admin/notifications", telegram_id=telegram_id, params={"limit": 10})
async def moderate_service_center(
self,
telegram_id: int,
service_center_id: int,
action: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
endpoint = {
"approve": "verify",
"reject": "reject",
"suspend": "suspend",
"changes": "request-changes",
}[action]
return await self.request(
"POST",
f"/api/admin/service-centers/{service_center_id}/{endpoint}",
telegram_id=telegram_id,
json=payload or {},
)