Files
marriage/.history/scripts/api_e2e_20250808215528.py
Andrey K. Choi cc87dcc0fa
Some checks failed
continuous-integration/drone/push Build is failing
api development
2025-08-08 21:58:36 +09:00

425 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
import json
import logging
import os
import random
import string
import sys
import time
from dataclasses import dataclass
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urljoin
import requests
from faker import Faker
# -------------------------
# Конфигурация по умолчанию
# -------------------------
DEFAULT_BASE_URL = os.getenv("BASE_URL", "http://localhost:8080")
DEFAULT_PASSWORD = os.getenv("PASS", "secret123")
DEFAULT_CLIENTS = int(os.getenv("CLIENTS", "2"))
DEFAULT_EMAIL_DOMAIN = os.getenv("EMAIL_DOMAIN", "agency.dev")
DEFAULT_LOG_FILE = os.getenv("LOG_FILE", "logs/api.log")
DEFAULT_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "10.0"))
# -------------------------
# Логирование
# -------------------------
def setup_logger(path: str) -> logging.Logger:
os.makedirs(os.path.dirname(path), exist_ok=True)
logger = logging.getLogger("api_e2e")
logger.setLevel(logging.DEBUG)
# Ротация логов: до 5 файлов по 5 МБ
file_handler = RotatingFileHandler(path, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(file_handler)
# Консоль — INFO и короче
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
logger.addHandler(console)
return logger
# -------------------------
# Утилиты
# -------------------------
def b64url_json(token_part: str) -> Dict[str, Any]:
"""Декодирует часть JWT (payload) без валидации сигнатуры."""
s = token_part + "=" * (-len(token_part) % 4)
return json.loads(base64.urlsafe_b64decode(s).decode("utf-8"))
def decode_jwt_sub(token: str) -> str:
try:
payload = b64url_json(token.split(".")[1])
return str(payload.get("sub", "")) # UUID пользователя
except Exception:
return ""
def mask_token(token: Optional[str]) -> str:
if not token:
return ""
return token[:12] + "..." if len(token) > 12 else token
def now_ms() -> int:
return int(time.time() * 1000)
@dataclass
class UserCreds:
id: str
email: str
access_token: str
role: str
# -------------------------
# Класс-клиент
# -------------------------
class APIE2E:
def __init__(self, base_url: str, logger: logging.Logger, timeout: float = DEFAULT_TIMEOUT) -> None:
self.base_url = base_url.rstrip("/") + "/"
self.logger = logger
self.timeout = timeout
self.sess = requests.Session()
self.urls = {
"auth": urljoin(self.base_url, "auth/"),
"profiles": urljoin(self.base_url, "profiles/"),
"match": urljoin(self.base_url, "match/"),
"chat": urljoin(self.base_url, "chat/"),
"payments": urljoin(self.base_url, "payments/"),
}
# --------- низкоуровневый запрос с логированием ----------
def req(self, method, url, body=None, token=None, expected=(200,), name=""):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
# Готовим запрос, чтобы увидеть финальные заголовки
req = requests.Request(method, url,
headers=headers,
data=(json.dumps(body) if body is not None else None))
prep = self.session.prepare_request(req)
# ЛОГ: какие заголовки действительно уйдут
self.log.debug("HTTP %s %s | headers=%s | body=%s",
method, url,
{k: (v if k.lower() != "authorization" else f"Bearer {v.split(' ',1)[1][:10]}...") for k,v in prep.headers.items()},
(body if body is not None else {}))
t0 = time.time()
resp = self.session.send(prep,
allow_redirects=False, # ВАЖНО
timeout=15)
dt = int((time.time()-t0)*1000)
# ЛОГ: редиректы, если были
if resp.is_redirect or resp.is_permanent_redirect or resp.history:
self.log.warning("%s got redirect chain: %s",
name or url,
" -> ".join(f"{r.status_code}:{r.headers.get('Location','')}" for r in resp.history+[resp]))
text = resp.text
self.log.debug("%s in %d ms | body=%s", resp.status_code, dt, text[:1000])
if resp.status_code not in expected:
raise RuntimeError(f"{name or url} unexpected status {resp.status_code}, expected {list(expected)}; body={text}")
data = None
try:
data = resp.json() if text else None
except Exception:
pass
return resp.status_code, data, resp.headers
# --------- health ----------
def wait_health(self, name: str, url: str, timeout_sec: int = 60) -> None:
self.logger.info(f"Waiting {name} health: {url}")
deadline = time.time() + timeout_sec
while time.time() < deadline:
try:
code, _, _ = self.req("GET", url, expected=(200,), name=f"{name}/health")
if code == 200:
self.logger.info(f"{name} is healthy")
return
except Exception:
pass
time.sleep(1)
raise TimeoutError(f"{name} not healthy in time: {url}")
# --------- auth ----------
def login(self, email: str, password: str) -> Tuple[str, str]:
url = urljoin(self.urls["auth"], "v1/token")
_, data, _ = self.req("POST", url, body={"email": email, "password": password}, expected=(200,), name="login")
token = data.get("access_token", "")
if not token:
raise RuntimeError("access_token is empty")
user_id = decode_jwt_sub(token)
if not user_id:
raise RuntimeError("cannot decode user id (sub) from token")
return user_id, token
def register(self, email: str, password: str, full_name: str, role: str) -> None:
url = urljoin(self.urls["auth"], "v1/register")
# /register в вашем бэке может возвращать 201/200, а иногда 500 при сериализации —
# поэтому не падаем на 500 сразу, а логинимся ниже.
try:
self.req(
"POST",
url,
body={"email": email, "password": password, "full_name": full_name, "role": role},
expected=(200, 201),
name="register",
)
except RuntimeError as e:
self.logger.warning(f"register returned non-2xx: {e} — will try login anyway")
def login_or_register(self, email: str, password: str, full_name: str, role: str) -> UserCreds:
# 1) пробуем логин
try:
uid, token = self.login(email, password)
self.logger.info(f"Login OK: {email} -> {uid}")
return UserCreds(id=uid, email=email, access_token=token, role=role)
except Exception as e:
self.logger.info(f"Login failed for {email}: {e}; will try register")
# 2) регистрируем (не фатально, если вернулся 500)
self.register(email, password, full_name, role)
# 3) снова логин
uid, token = self.login(email, password)
self.logger.info(f"Registered+Login OK: {email} -> {uid}")
return UserCreds(id=uid, email=email, access_token=token, role=role)
# --------- profiles ----------
def get_my_profile(self, token: str) -> Tuple[int, Dict[str, Any]]:
url = urljoin(self.urls["profiles"], "v1/profiles/me")
code, data, _ = self.req("GET", url, token=token, expected=(200, 404), name="profiles/me")
return code, data
def create_profile(
self,
token: str,
gender: str,
city: str,
languages: List[str],
interests: List[str],
) -> Dict[str, Any]:
url = urljoin(self.urls["profiles"], "v1/profiles")
_, data, _ = self.req(
"POST",
url,
token=token,
body={"gender": gender, "city": city, "languages": languages, "interests": interests},
expected=(200, 201),
name="profiles/create",
)
return data
def ensure_profile(
self, token: str, gender: str, city: str, languages: List[str], interests: List[str]
) -> Dict[str, Any]:
code, p = self.get_my_profile(token)
if code == 200:
self.logger.info(f"Profile exists: id={p.get('id')}")
return p
self.logger.info("Profile not found -> creating")
p = self.create_profile(token, gender, city, languages, interests)
self.logger.info(f"Profile created: id={p.get('id')}")
return p
# --------- match ----------
def create_pair(self, admin_token: str, user_id_a: str, user_id_b: str, score: float, notes: str) -> Dict[str, Any]:
url = urljoin(self.urls["match"], "v1/pairs")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"user_id_a": user_id_a, "user_id_b": user_id_b, "score": score, "notes": notes},
expected=(200, 201),
name="match/create_pair",
)
return data
# --------- chat ----------
def create_room(self, admin_token: str, title: str, participants: List[str]) -> Dict[str, Any]:
url = urljoin(self.urls["chat"], "v1/rooms")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"title": title, "participants": participants},
expected=(200, 201),
name="chat/create_room",
)
return data
def send_message(self, admin_token: str, room_id: str, content: str) -> Dict[str, Any]:
url = urljoin(self.urls["chat"], f"v1/rooms/{room_id}/messages")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"content": content},
expected=(200, 201),
name="chat/send_message",
)
return data
# --------- payments ----------
def create_invoice(
self, admin_token: str, client_id: str, amount: float, currency: str, description: str
) -> Dict[str, Any]:
url = urljoin(self.urls["payments"], "v1/invoices")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"client_id": client_id, "amount": amount, "currency": currency, "description": description},
expected=(200, 201),
name="payments/create_invoice",
)
return data
def mark_invoice_paid(self, admin_token: str, invoice_id: str) -> Dict[str, Any]:
url = urljoin(self.urls["payments"], f"v1/invoices/{invoice_id}/mark-paid")
_, data, _ = self.req("POST", url, token=admin_token, expected=(200,), name="payments/mark_paid")
return data
# -------------------------
# Генерация данных
# -------------------------
GENDERS = ["female", "male", "other"]
CITIES = [
"Moscow", "Saint Petersburg", "Kazan", "Novosibirsk", "Krasnodar",
"Yekaterinburg", "Minsk", "Kyiv", "Almaty", "Tbilisi",
]
LANG_POOL = ["ru", "en", "de", "fr", "es", "it", "zh", "uk", "kk", "tr", "pl"]
INTR_POOL = ["music", "travel", "reading", "sports", "movies", "games", "cooking", "yoga", "art", "tech"]
def pick_languages(n: int = 2) -> List[str]:
n = max(1, min(n, len(LANG_POOL)))
return sorted(random.sample(LANG_POOL, n))
def pick_interests(n: int = 3) -> List[str]:
n = max(1, min(n, len(INTR_POOL)))
return sorted(random.sample(INTR_POOL, n))
def random_email(prefix: str, domain: str) -> str:
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"{prefix}+{int(time.time())}.{suffix}@{domain}"
# -------------------------
# Основной сценарий
# -------------------------
def main():
import argparse
parser = argparse.ArgumentParser(description="E2E тест API брачного агентства с фейковыми анкетами.")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gateway base URL (по умолчанию http://localhost:8080)")
parser.add_argument("--clients", type=int, default=DEFAULT_CLIENTS, help="Количество клиентских аккаунтов (>=2)")
parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Пароль для всех тестовых пользователей")
parser.add_argument("--email-domain", default=DEFAULT_EMAIL_DOMAIN, help="Домен e-mail для теста (напр. agency.dev)")
parser.add_argument("--log-file", default=DEFAULT_LOG_FILE, help="Файл логов (по умолчанию logs/api.log)")
parser.add_argument("--seed", type=int, default=42, help="Генератор случайных чисел (seed)")
args = parser.parse_args()
random.seed(args.seed)
fake = Faker()
logger = setup_logger(args.log_file)
logger.info("=== API E2E START ===")
logger.info(f"BASE_URL={args.base_url} clients={args.clients} domain={args.email_domain}")
if args.clients < 2:
logger.error("Нужно минимум 2 клиента (для пары).")
sys.exit(2)
api = APIE2E(args.base_url, logger)
# Health checks через gateway
api.wait_health("gateway/auth", urljoin(api.urls["auth"], "health"))
api.wait_health("profiles", urljoin(api.urls["profiles"], "health"))
api.wait_health("match", urljoin(api.urls["match"], "health"))
api.wait_health("chat", urljoin(api.urls["chat"], "health"))
api.wait_health("payments", urljoin(api.urls["payments"], "health"))
# Админ
admin_email = random_email("admin", args.email_domain)
admin_full = fake.name()
admin = api.login_or_register(admin_email, args.password, admin_full, role="ADMIN")
# Клиенты
clients: List[UserCreds] = []
for i in range(args.clients):
email = random_email(f"user{i+1}", args.email_domain)
full = fake.name()
u = api.login_or_register(email, args.password, full, role="CLIENT")
clients.append(u)
# Профили для всех
for i, u in enumerate([admin] + clients, start=1):
gender = random.choice(GENDERS)
city = random.choice(CITIES)
languages = pick_languages(random.choice([1, 2, 3]))
interests = pick_interests(random.choice([2, 3, 4]))
logger.info(f"[{i}/{1+len(clients)}] Ensure profile for {u.email} (role={u.role})")
api.ensure_profile(u.access_token, gender, city, languages, interests)
# Matchпара между двумя случайными клиентами
a, b = random.sample(clients, 2)
score = round(random.uniform(0.6, 0.98), 2)
pair = api.create_pair(admin.access_token, a.id, b.id, score, notes="e2e generated")
pair_id = str(pair.get("id", ""))
logger.info(f"Match pair created: id={pair_id}, {a.email}{b.email}, score={score}")
# Чат‑комната и сообщение
room = api.create_room(admin.access_token, title=f"{a.email} & {b.email}", participants=[a.id, b.id])
room_id = str(room.get("id", ""))
msg = api.send_message(admin.access_token, room_id, content="Hello from admin (e2e)")
msg_id = str(msg.get("id", ""))
logger.info(f"Chat message sent: room={room_id}, msg={msg_id}")
# Счёт для первого клиента
amount = random.choice([99.0, 199.0, 299.0])
inv = api.create_invoice(admin.access_token, client_id=a.id, amount=amount, currency="USD",
description="Consultation (e2e)")
inv_id = str(inv.get("id", ""))
invp = api.mark_invoice_paid(admin.access_token, inv_id)
logger.info(f"Invoice paid: id={inv_id}, status={invp.get('status')}")
# Итог
summary = {
"admin": {"email": admin.email, "id": admin.id},
"clients": [{"email": c.email, "id": c.id} for c in clients],
"pair_id": pair_id,
"room_id": room_id,
"message_id": msg_id,
"invoice_id": inv_id,
"invoice_status": invp.get("status"),
}
logger.info("=== SUMMARY ===")
logger.info(json.dumps(summary, ensure_ascii=False, indent=2))
print(json.dumps(summary, ensure_ascii=False, indent=2)) # на stdout — короткое резюме
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterrupted.", file=sys.stderr)
sys.exit(130)