82 lines
3.0 KiB
Python
82 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import statistics
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
import httpx
|
|
|
|
|
|
@dataclass
|
|
class Result:
|
|
path: str
|
|
status: int | None
|
|
elapsed_ms: float
|
|
error: str | None = None
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Small HTTP concurrency smoke test for CarPass.")
|
|
parser.add_argument("--base-url", default="http://127.0.0.1:8000")
|
|
parser.add_argument("--requests", type=int, default=200)
|
|
parser.add_argument("--concurrency", type=int, default=25)
|
|
parser.add_argument(
|
|
"--path",
|
|
action="append",
|
|
dest="paths",
|
|
default=None,
|
|
help="Path to request. Can be repeated.",
|
|
)
|
|
parser.add_argument("--timeout", type=float, default=10.0)
|
|
return parser.parse_args()
|
|
|
|
|
|
async def fetch(client: httpx.AsyncClient, semaphore: asyncio.Semaphore, path: str) -> Result:
|
|
async with semaphore:
|
|
started = time.perf_counter()
|
|
try:
|
|
response = await client.get(path)
|
|
elapsed_ms = (time.perf_counter() - started) * 1000
|
|
return Result(path=path, status=response.status_code, elapsed_ms=elapsed_ms)
|
|
except Exception as error: # noqa: BLE001
|
|
elapsed_ms = (time.perf_counter() - started) * 1000
|
|
return Result(path=path, status=None, elapsed_ms=elapsed_ms, error=str(error))
|
|
|
|
|
|
async def run() -> int:
|
|
args = parse_args()
|
|
paths = args.paths or ["/health", "/ready", "/", "/admin.html", "/sto.html"]
|
|
semaphore = asyncio.Semaphore(max(args.concurrency, 1))
|
|
limits = httpx.Limits(
|
|
max_connections=max(args.concurrency * 2, 10),
|
|
max_keepalive_connections=max(args.concurrency, 10),
|
|
)
|
|
timeout = httpx.Timeout(args.timeout, connect=min(args.timeout, 5.0))
|
|
started = time.perf_counter()
|
|
async with httpx.AsyncClient(base_url=args.base_url.rstrip("/"), timeout=timeout, limits=limits) as client:
|
|
tasks = [fetch(client, semaphore, paths[index % len(paths)]) for index in range(args.requests)]
|
|
results = await asyncio.gather(*tasks)
|
|
elapsed = time.perf_counter() - started
|
|
failures = [result for result in results if result.error or not result.status or result.status >= 500]
|
|
latencies = [result.elapsed_ms for result in results]
|
|
p95 = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else max(latencies, default=0)
|
|
print(
|
|
"load_check "
|
|
f"base_url={args.base_url} requests={len(results)} concurrency={args.concurrency} "
|
|
f"ok={len(results) - len(failures)} failures={len(failures)} "
|
|
f"rps={len(results) / elapsed:.2f} avg_ms={statistics.fmean(latencies):.1f} "
|
|
f"p95_ms={p95:.1f} max_ms={max(latencies, default=0):.1f}"
|
|
)
|
|
if failures:
|
|
for result in failures[:10]:
|
|
print(f"failure path={result.path} status={result.status} error={result.error or '-'}")
|
|
return 1
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(asyncio.run(run()))
|