Files
marriage/scripts/api_e2e.py
Andrey K. Choi 7ecc556c77
Some checks failed
continuous-integration/drone/push Build is failing
API fully functional
Api docs (SWAGGER, REDOC) available
2025-08-09 23:36:52 +09:00

423 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
import json
import logging
import os
import random
import string
import sys
import time
from dataclasses import dataclass
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urljoin
import requests
from faker import Faker
# -------------------------
# Конфигурация по умолчанию
# -------------------------
DEFAULT_BASE_URL = os.getenv("BASE_URL", "http://localhost:8080")
DEFAULT_PASSWORD = os.getenv("PASS", "secret123")
DEFAULT_CLIENTS = int(os.getenv("CLIENTS", "2"))
DEFAULT_EMAIL_DOMAIN = os.getenv("EMAIL_DOMAIN", "agency.dev")
DEFAULT_LOG_FILE = os.getenv("LOG_FILE", "logs/api.log")trevor@trevor-pc:/home/data/marriage/scripts$ ./e2e.sh
=== E2E smoke test start ===
BASE_URL: http://localhost:8080
[23:20:10] Waiting gateway at http://localhost:8080/auth/health (allowed: 200)
DEFAULT_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "10.0"))
# -------------------------
# Логирование
# -------------------------
def setup_logger(path: str) -> logging.Logger:
os.makedirs(os.path.dirname(path), exist_ok=True)
logger = logging.getLogger("api_e2e")
logger.setLevel(logging.DEBUG)
# Ротация логов: до 5 файлов по 5 МБ
file_handler = RotatingFileHandler(path, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(file_handler)
# Консоль — INFO и короче
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
logger.addHandler(console)
return logger
# -------------------------
# Утилиты
# -------------------------
def b64url_json(token_part: str) -> Dict[str, Any]:
"""Декодирует часть JWT (payload) без валидации сигнатуры."""
s = token_part + "=" * (-len(token_part) % 4)
return json.loads(base64.urlsafe_b64decode(s).decode("utf-8"))
def decode_jwt_sub(token: str) -> str:
try:
payload = b64url_json(token.split(".")[1])
return str(payload.get("sub", "")) # UUID пользователя
except Exception:
return ""
def mask_token(token: Optional[str]) -> str:
if not token:
return ""
return token[:12] + "..." if len(token) > 12 else token
def now_ms() -> int:
return int(time.time() * 1000)
@dataclass
class UserCreds:
id: str
email: str
access_token: str
role: str
# -------------------------
# Класс-клиент
# -------------------------
class APIE2E:
def __init__(self, base_url: str, logger: logging.Logger, timeout: float = DEFAULT_TIMEOUT) -> None:
self.base_url = base_url.rstrip("/") + "/"
self.logger = logger
self.timeout = timeout
self.sess = requests.Session()
self.urls = {
"auth": urljoin(self.base_url, "auth/"),
"profiles": urljoin(self.base_url, "profiles/"),
"match": urljoin(self.base_url, "match/"),
"chat": urljoin(self.base_url, "chat/"),
"payments": urljoin(self.base_url, "payments/"),
}
# --------- низкоуровневый запрос с логированием ----------
def req(self, method, url, token=None, body=None, expected=(200,), name=None):
headers = {"Accept": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
log_body = {}
if body:
log_body = dict(body)
for key in list(log_body.keys()):
if key.lower() in ("password", "token", "access_token", "refresh_token"):
log_body[key] = "***hidden***"
started = now_ms()
self.logger.debug(
f"HTTP {method} {url} | headers={{Authorization: Bearer {mask_token(token)}}} | body={log_body}"
)
try:
resp = self.sess.request(method=method, url=url, json=body, timeout=self.timeout)
except Exception as e:
duration = now_ms() - started
self.logger.error(f"{name or url} FAILED transport error: {e} ({duration} ms)")
raise
text = resp.text or ""
try:
data = resp.json() if text else {}
except ValueError:
data = {}
duration = now_ms() - started
self.logger.debug(f"{resp.status_code} in {duration} ms | body={text[:2000]}")
if expected and resp.status_code not in expected:
msg = f"{name or url} unexpected status {resp.status_code}, expected {list(expected)}; body={text}"
self.logger.error(msg)
raise RuntimeError(msg)
return resp.status_code, data, text
# --------- health ----------
def wait_health(self, name: str, url: str, timeout_sec: int = 60) -> None:
self.logger.info(f"Waiting {name} health: {url}")
deadline = time.time() + timeout_sec
while time.time() < deadline:
try:
code, _, _ = self.req("GET", url, expected=(200,), name=f"{name}/health")
if code == 200:
self.logger.info(f"{name} is healthy")
return
except Exception:
pass
time.sleep(1)
raise TimeoutError(f"{name} not healthy in time: {url}")
# --------- auth ----------
def login(self, email: str, password: str) -> Tuple[str, str]:
url = urljoin(self.urls["auth"], "v1/token")
_, data, _ = self.req("POST", url, body={"email": email, "password": password}, expected=(200,), name="login")
token = data.get("access_token", "")
if not token:
raise RuntimeError("access_token is empty")
user_id = decode_jwt_sub(token)
if not user_id:
raise RuntimeError("cannot decode user id (sub) from token")
return user_id, token
def register(self, email: str, password: str, full_name: str, role: str) -> None:
url = urljoin(self.urls["auth"], "v1/register")
# /register в вашем бэке может возвращать 201/200, а иногда 500 при сериализации —
# поэтому не падаем на 500 сразу, а логинимся ниже.
try:
self.req(
"POST",
url,
body={"email": email, "password": password, "full_name": full_name, "role": role},
expected=(200, 201),
name="register",
)
except RuntimeError as e:
self.logger.warning(f"register returned non-2xx: {e} — will try login anyway")
def login_or_register(self, email: str, password: str, full_name: str, role: str) -> UserCreds:
# 1) пробуем логин
try:
uid, token = self.login(email, password)
self.logger.info(f"Login OK: {email} -> {uid}")
return UserCreds(id=uid, email=email, access_token=token, role=role)
except Exception as e:
self.logger.info(f"Login failed for {email}: {e}; will try register")
# 2) регистрируем (не фатально, если вернулся 500)
self.register(email, password, full_name, role)
# 3) снова логин
uid, token = self.login(email, password)
self.logger.info(f"Registered+Login OK: {email} -> {uid}")
return UserCreds(id=uid, email=email, access_token=token, role=role)
# --------- profiles ----------
def get_my_profile(self, token: str) -> Tuple[int, Dict[str, Any]]:
url = urljoin(self.urls["profiles"], "v1/profiles/me")
code, data, _ = self.req("GET", url, token=token, expected=(200, 404), name="profiles/me")
return code, data
def create_profile(
self,
token: str,
gender: str,
city: str,
languages: List[str],
interests: List[str],
) -> Dict[str, Any]:
url = urljoin(self.urls["profiles"], "v1/profiles")
_, data, _ = self.req(
"POST",
url,
token=token,
body={"gender": gender, "city": city, "languages": languages, "interests": interests},
expected=(200, 201),
name="profiles/create",
)
return data
def ensure_profile(
self, token: str, gender: str, city: str, languages: List[str], interests: List[str]
) -> Dict[str, Any]:
code, p = self.get_my_profile(token)
if code == 200:
self.logger.info(f"Profile exists: id={p.get('id')}")
return p
self.logger.info("Profile not found -> creating")
p = self.create_profile(token, gender, city, languages, interests)
self.logger.info(f"Profile created: id={p.get('id')}")
return p
# --------- match ----------
def create_pair(self, admin_token: str, user_id_a: str, user_id_b: str, score: float, notes: str) -> Dict[str, Any]:
url = urljoin(self.urls["match"], "v1/pairs")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"user_id_a": user_id_a, "user_id_b": user_id_b, "score": score, "notes": notes},
expected=(200, 201),
name="match/create_pair",
)
return data
# --------- chat ----------
def create_room(self, admin_token: str, title: str, participants: List[str]) -> Dict[str, Any]:
url = urljoin(self.urls["chat"], "v1/rooms")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"title": title, "participants": participants},
expected=(200, 201),
name="chat/create_room",
)
return data
def send_message(self, admin_token: str, room_id: str, content: str) -> Dict[str, Any]:
url = urljoin(self.urls["chat"], f"v1/rooms/{room_id}/messages")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"content": content},
expected=(200, 201),
name="chat/send_message",
)
return data
# --------- payments ----------
def create_invoice(
self, admin_token: str, client_id: str, amount: float, currency: str, description: str
) -> Dict[str, Any]:
url = urljoin(self.urls["payments"], "v1/invoices")
_, data, _ = self.req(
"POST",
url,
token=admin_token,
body={"client_id": client_id, "amount": amount, "currency": currency, "description": description},
expected=(200, 201),
name="payments/create_invoice",
)
return data
def mark_invoice_paid(self, admin_token: str, invoice_id: str) -> Dict[str, Any]:
url = urljoin(self.urls["payments"], f"v1/invoices/{invoice_id}/mark-paid")
_, data, _ = self.req("POST", url, token=admin_token, expected=(200,), name="payments/mark_paid")
return data
# -------------------------
# Генерация данных
# -------------------------
GENDERS = ["female", "male", "other"]
CITIES = [
"Moscow", "Saint Petersburg", "Kazan", "Novosibirsk", "Krasnodar",
"Yekaterinburg", "Minsk", "Kyiv", "Almaty", "Tbilisi",
]
LANG_POOL = ["ru", "en", "de", "fr", "es", "it", "zh", "uk", "kk", "tr", "pl"]
INTR_POOL = ["music", "travel", "reading", "sports", "movies", "games", "cooking", "yoga", "art", "tech"]
def pick_languages(n: int = 2) -> List[str]:
n = max(1, min(n, len(LANG_POOL)))
return sorted(random.sample(LANG_POOL, n))
def pick_interests(n: int = 3) -> List[str]:
n = max(1, min(n, len(INTR_POOL)))
return sorted(random.sample(INTR_POOL, n))
def random_email(prefix: str, domain: str) -> str:
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"{prefix}+{int(time.time())}.{suffix}@{domain}"
# -------------------------
# Основной сценарий
# -------------------------
def main():
import argparse
parser = argparse.ArgumentParser(description="E2E тест API брачного агентства с фейковыми анкетами.")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gateway base URL (по умолчанию http://localhost:8080)")
parser.add_argument("--clients", type=int, default=DEFAULT_CLIENTS, help="Количество клиентских аккаунтов (>=2)")
parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Пароль для всех тестовых пользователей")
parser.add_argument("--email-domain", default=DEFAULT_EMAIL_DOMAIN, help="Домен e-mail для теста (напр. agency.dev)")
parser.add_argument("--log-file", default=DEFAULT_LOG_FILE, help="Файл логов (по умолчанию logs/api.log)")
parser.add_argument("--seed", type=int, default=42, help="Генератор случайных чисел (seed)")
args = parser.parse_args()
random.seed(args.seed)
fake = Faker()
logger = setup_logger(args.log_file)
logger.info("=== API E2E START ===")
logger.info(f"BASE_URL={args.base_url} clients={args.clients} domain={args.email_domain}")
if args.clients < 2:
logger.error("Нужно минимум 2 клиента (для пары).")
sys.exit(2)
api = APIE2E(args.base_url, logger)
# Health checks через gateway
api.wait_health("gateway/auth", urljoin(api.urls["auth"], "health"))
api.wait_health("profiles", urljoin(api.urls["profiles"], "health"))
api.wait_health("match", urljoin(api.urls["match"], "health"))
api.wait_health("chat", urljoin(api.urls["chat"], "health"))
api.wait_health("payments", urljoin(api.urls["payments"], "health"))
# Админ
admin_email = random_email("admin", args.email_domain)
admin_full = fake.name()
admin = api.login_or_register(admin_email, args.password, admin_full, role="ADMIN")
# Клиенты
clients: List[UserCreds] = []
for i in range(args.clients):
email = random_email(f"user{i+1}", args.email_domain)
full = fake.name()
u = api.login_or_register(email, args.password, full, role="CLIENT")
clients.append(u)
# Профили для всех
for i, u in enumerate([admin] + clients, start=1):
gender = random.choice(GENDERS)
city = random.choice(CITIES)
languages = pick_languages(random.choice([1, 2, 3]))
interests = pick_interests(random.choice([2, 3, 4]))
logger.info(f"[{i}/{1+len(clients)}] Ensure profile for {u.email} (role={u.role})")
api.ensure_profile(u.access_token, gender, city, languages, interests)
# Matchпара между двумя случайными клиентами
a, b = random.sample(clients, 2)
score = round(random.uniform(0.6, 0.98), 2)
pair = api.create_pair(admin.access_token, a.id, b.id, score, notes="e2e generated")
pair_id = str(pair.get("id", ""))
logger.info(f"Match pair created: id={pair_id}, {a.email}{b.email}, score={score}")
# Чат‑комната и сообщение
room = api.create_room(admin.access_token, title=f"{a.email} & {b.email}", participants=[a.id, b.id])
room_id = str(room.get("id", ""))
msg = api.send_message(admin.access_token, room_id, content="Hello from admin (e2e)")
msg_id = str(msg.get("id", ""))
logger.info(f"Chat message sent: room={room_id}, msg={msg_id}")
# Счёт для первого клиента
amount = random.choice([99.0, 199.0, 299.0])
inv = api.create_invoice(admin.access_token, client_id=a.id, amount=amount, currency="USD",
description="Consultation (e2e)")
inv_id = str(inv.get("id", ""))
invp = api.mark_invoice_paid(admin.access_token, inv_id)
logger.info(f"Invoice paid: id={inv_id}, status={invp.get('status')}")
# Итог
summary = {
"admin": {"email": admin.email, "id": admin.id},
"clients": [{"email": c.email, "id": c.id} for c in clients],
"pair_id": pair_id,
"room_id": room_id,
"message_id": msg_id,
"invoice_id": inv_id,
"invoice_status": invp.get("status"),
}
logger.info("=== SUMMARY ===")
logger.info(json.dumps(summary, ensure_ascii=False, indent=2))
print(json.dumps(summary, ensure_ascii=False, indent=2)) # на stdout — короткое резюме
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterrupted.", file=sys.stderr)
sys.exit(130)