Files
drivers_bot/scripts/load_check.py
VPN SaaS Dev 8982299e71
Some checks failed
ci / test (pull_request) Has been cancelled
add admin data mutations and load check
2026-05-18 18:37:19 +09:00

82 lines
3.0 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import asyncio
import statistics
import time
from dataclasses import dataclass
import httpx
@dataclass
class Result:
path: str
status: int | None
elapsed_ms: float
error: str | None = None
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Small HTTP concurrency smoke test for CarPass.")
parser.add_argument("--base-url", default="http://127.0.0.1:8000")
parser.add_argument("--requests", type=int, default=200)
parser.add_argument("--concurrency", type=int, default=25)
parser.add_argument(
"--path",
action="append",
dest="paths",
default=None,
help="Path to request. Can be repeated.",
)
parser.add_argument("--timeout", type=float, default=10.0)
return parser.parse_args()
async def fetch(client: httpx.AsyncClient, semaphore: asyncio.Semaphore, path: str) -> Result:
async with semaphore:
started = time.perf_counter()
try:
response = await client.get(path)
elapsed_ms = (time.perf_counter() - started) * 1000
return Result(path=path, status=response.status_code, elapsed_ms=elapsed_ms)
except Exception as error: # noqa: BLE001
elapsed_ms = (time.perf_counter() - started) * 1000
return Result(path=path, status=None, elapsed_ms=elapsed_ms, error=str(error))
async def run() -> int:
args = parse_args()
paths = args.paths or ["/health", "/ready", "/", "/admin.html", "/sto.html"]
semaphore = asyncio.Semaphore(max(args.concurrency, 1))
limits = httpx.Limits(
max_connections=max(args.concurrency * 2, 10),
max_keepalive_connections=max(args.concurrency, 10),
)
timeout = httpx.Timeout(args.timeout, connect=min(args.timeout, 5.0))
started = time.perf_counter()
async with httpx.AsyncClient(base_url=args.base_url.rstrip("/"), timeout=timeout, limits=limits) as client:
tasks = [fetch(client, semaphore, paths[index % len(paths)]) for index in range(args.requests)]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - started
failures = [result for result in results if result.error or not result.status or result.status >= 500]
latencies = [result.elapsed_ms for result in results]
p95 = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else max(latencies, default=0)
print(
"load_check "
f"base_url={args.base_url} requests={len(results)} concurrency={args.concurrency} "
f"ok={len(results) - len(failures)} failures={len(failures)} "
f"rps={len(results) / elapsed:.2f} avg_ms={statistics.fmean(latencies):.1f} "
f"p95_ms={p95:.1f} max_ms={max(latencies, default=0):.1f}"
)
if failures:
for result in failures[:10]:
print(f"failure path={result.path} status={result.status} error={result.error or '-'}")
return 1
return 0
if __name__ == "__main__":
raise SystemExit(asyncio.run(run()))