add admin data mutations and load check
Some checks failed
ci / test (pull_request) Has been cancelled

This commit is contained in:
VPN SaaS Dev
2026-05-18 18:37:19 +09:00
parent 59bc6ebd4f
commit 8982299e71
9 changed files with 650 additions and 44 deletions

View File

@@ -8,6 +8,21 @@ from app.core.config import settings
class ApiClient:
def __init__(self) -> None:
self.base_url = settings.api_base_url.rstrip("/")
self._client: httpx.AsyncClient | None = None
@property
def client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=httpx.Timeout(15.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
return self._client
async def close(self) -> None:
if self._client is not None:
await self._client.aclose()
def headers(self, telegram_id: int | None = None) -> dict[str, str]:
headers = {"X-Internal-API-Token": settings.internal_api_token}
@@ -24,18 +39,17 @@ class ApiClient:
json: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> Any:
async with httpx.AsyncClient(base_url=self.base_url, timeout=15) as client:
response = await client.request(
method,
path,
json=json,
params=params,
headers=self.headers(telegram_id),
)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
response = await self.client.request(
method,
path,
json=json,
params=params,
headers=self.headers(telegram_id),
)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
async def upsert_user(self, telegram_user: Any) -> dict[str, Any]:
payload = {
@@ -44,34 +58,30 @@ class ApiClient:
"first_name": telegram_user.first_name,
"last_name": telegram_user.last_name,
}
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.post("/api/users", json=payload, headers=self.headers())
response.raise_for_status()
return response.json()
response = await self.client.post("/api/users", json=payload, headers=self.headers())
response.raise_for_status()
return response.json()
async def list_cars(self, owner_id: int, telegram_id: int) -> list[dict[str, Any]]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.get(
"/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
)
response.raise_for_status()
return response.json()
response = await self.client.get(
"/api/cars", params={"owner_id": owner_id}, headers=self.headers(telegram_id)
)
response.raise_for_status()
return response.json()
async def create_car(self, owner_id: int, name: str, telegram_id: int) -> dict[str, Any]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.post(
"/api/cars",
json={"owner_id": owner_id, "name": name},
headers=self.headers(telegram_id),
)
response.raise_for_status()
return response.json()
response = await self.client.post(
"/api/cars",
json={"owner_id": owner_id, "name": name},
headers=self.headers(telegram_id),
)
response.raise_for_status()
return response.json()
async def stats(self, car_id: int, telegram_id: int) -> dict[str, Any]:
async with httpx.AsyncClient(base_url=self.base_url, timeout=10) as client:
response = await client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
response.raise_for_status()
return response.json()
response = await self.client.get(f"/api/cars/{car_id}/stats", headers=self.headers(telegram_id))
response.raise_for_status()
return response.json()
async def create_fuel(self, telegram_id: int, payload: dict[str, Any]) -> dict[str, Any]:
return await self.request("POST", "/api/fuel", telegram_id=telegram_id, json=payload)

View File

@@ -782,7 +782,11 @@ async def main() -> None:
raise RuntimeError("INTERNAL_API_TOKEN is empty")
settings.validate_webapp_url_for_telegram()
bot = Bot(settings.bot_token)
await dp.start_polling(bot)
try:
await dp.start_polling(bot)
finally:
await api.close()
await bot.session.close()
if __name__ == "__main__":