#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""End-to-end smoke test for the agency API, driven through the gateway.

The script waits for the services to report healthy, registers an admin and
several client accounts with fake names, makes sure each has a profile, then
creates a match pair, a chat room with one message, and an invoice that is
marked as paid. A JSON summary of the created entities is printed at the end.
"""

import base64
import json
import logging
import os
import random
import string
import sys
import time
from dataclasses import dataclass
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urljoin

import requests
from faker import Faker

# -------------------------
# Default configuration (each value can be overridden via environment)
# -------------------------
DEFAULT_BASE_URL = os.getenv("BASE_URL", "http://localhost:8080")
DEFAULT_PASSWORD = os.getenv("PASS", "secret123")
DEFAULT_CLIENTS = int(os.getenv("CLIENTS", "2"))
DEFAULT_EMAIL_DOMAIN = os.getenv("EMAIL_DOMAIN", "agency.dev")
DEFAULT_LOG_FILE = os.getenv("LOG_FILE", "logs/api.log")
DEFAULT_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "10.0"))


# -------------------------
# Logging
# -------------------------
def setup_logger(path: str) -> logging.Logger:
    """Configure and return the ``api_e2e`` logger.

    Attaches a rotating file handler (DEBUG and above) writing to *path* and
    a console handler (INFO and above) writing to stdout.

    Args:
        path: Log file path. Its parent directory is created when the path
            has a directory component.

    Returns:
        The configured logger.
    """
    # os.path.dirname("api.log") is "", and os.makedirs("") raises, so only
    # create the directory when the path actually has one.
    log_dir = os.path.dirname(path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger("api_e2e")
    logger.setLevel(logging.DEBUG)

    # Log rotation: up to 5 backup files of 5 MB each.
    file_handler = RotatingFileHandler(
        path, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8"
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger.addHandler(file_handler)

    # Console: INFO and above, with a shorter format.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
    logger.addHandler(console)

    return logger


# -------------------------
# Utilities
# -------------------------
def b64url_json(token_part: str) -> Dict[str, Any]:
    """Decode one base64url-encoded JWT segment into parsed JSON.

    The signature is not validated. Padding stripped by the JWT encoding is
    restored before decoding.
    """
    s = token_part + "=" * (-len(token_part) % 4)
    return json.loads(base64.urlsafe_b64decode(s).decode("utf-8"))


def decode_jwt_sub(token: str) -> str:
    """Return the ``sub`` claim of *token* as a string, or ``""`` on any failure."""
    try:
        payload = b64url_json(token.split(".")[1])
        return str(payload.get("sub", ""))  # the user's id
    except Exception:
        # Deliberately broad: any malformed token simply yields "".
        return ""


def mask_token(token: Optional[str]) -> str:
    """Return *token* shortened to its first 12 characters for logging.

    Tokens of 12 characters or fewer are returned unchanged; ``None`` or an
    empty token yields ``""``.
    """
    if not token:
        return ""
    return token[:12] + "..." if len(token) > 12 else token


def now_ms() -> int:
    """Return the current wall-clock time in whole milliseconds."""
    return int(time.time() * 1000)


@dataclass
class UserCreds:
    """Credentials of a logged-in test user."""

    id: str  # user id decoded from the token's "sub" claim
    email: str
    access_token: str
    role: str  # role requested at registration ("ADMIN" / "CLIENT" in main)


# -------------------------
# Client class
# -------------------------
class APIE2E:
    """Thin HTTP client for the auth/profiles/match/chat/payments services."""

    def __init__(self, base_url: str, logger: logging.Logger, timeout: float = DEFAULT_TIMEOUT) -> None:
        """Store settings, open a ``requests`` session and derive service URLs.

        Args:
            base_url: Gateway base URL; a single trailing slash is enforced.
            logger: Logger used for request/response tracing.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/") + "/"
        self.logger = logger
        self.timeout = timeout
        self.sess = requests.Session()

        self.urls = {
            "auth": urljoin(self.base_url, "auth/"),
            "profiles": urljoin(self.base_url, "profiles/"),
            "match": urljoin(self.base_url, "match/"),
            "chat": urljoin(self.base_url, "chat/"),
            "payments": urljoin(self.base_url, "payments/"),
        }

    # --------- low-level request with logging ----------
    def req(
        self,
        method: str,
        url: str,
        token: Optional[str] = None,
        body: Optional[Dict[str, Any]] = None,
        expected: Tuple[int, ...] = (200,),
        name: Optional[str] = None,
    ) -> Tuple[int, Any, str]:
        """Send one HTTP request and return ``(status, parsed_json, raw_text)``.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            token: Optional bearer token sent in the Authorization header.
            body: Optional JSON body. Secret-looking keys are masked in logs.
            expected: Accepted status codes; a falsy value disables the check.
            name: Short label used in log and error messages instead of the URL.

        Returns:
            The status code, the decoded JSON body (``{}`` when the body is
            empty or not valid JSON) and the raw response text.

        Raises:
            RuntimeError: If the status code is not in *expected*.
            Exception: Transport errors from ``requests`` are logged and re-raised.
        """
        headers = {"Accept": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        # Build a copy of the body for logging with secrets masked.
        log_body = {}
        if body:
            log_body = dict(body)
            for key in list(log_body.keys()):
                if key.lower() in ("password", "token", "access_token", "refresh_token"):
                    log_body[key] = "***hidden***"

        started = now_ms()
        self.logger.debug(
            f"HTTP {method} {url} | headers={{Authorization: Bearer {mask_token(token)}}} | body={log_body}"
        )
        try:
            # The headers must be passed explicitly; without them the bearer
            # token built above would never reach the server.
            resp = self.sess.request(
                method=method, url=url, headers=headers, json=body, timeout=self.timeout
            )
        except Exception as e:
            duration = now_ms() - started
            self.logger.error(f"{name or url} FAILED transport error: {e} ({duration} ms)")
            raise

        text = resp.text or ""
        try:
            data = resp.json() if text else {}
        except ValueError:
            # Non-JSON body: callers still get the raw text.
            data = {}

        duration = now_ms() - started
        self.logger.debug(f"← {resp.status_code} in {duration} ms | body={text[:2000]}")

        if expected and resp.status_code not in expected:
            msg = f"{name or url} unexpected status {resp.status_code}, expected {list(expected)}; body={text}"
            self.logger.error(msg)
            raise RuntimeError(msg)

        return resp.status_code, data, text

    # --------- health ----------
    def wait_health(self, name: str, url: str, timeout_sec: int = 60) -> None:
        """Poll *url* once per second until it answers 200.

        Raises:
            TimeoutError: If no 200 response arrives within *timeout_sec* seconds.
        """
        self.logger.info(f"Waiting {name} health: {url}")
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            try:
                code, _, _ = self.req("GET", url, expected=(200,), name=f"{name}/health")
                if code == 200:
                    self.logger.info(f"{name} is healthy")
                    return
            except Exception as e:
                # Best-effort polling: the service may not be up yet, so any
                # failure just means "retry"; keep the reason in the debug log.
                self.logger.debug(f"{name} not ready yet: {e}")
            time.sleep(1)
        raise TimeoutError(f"{name} not healthy in time: {url}")

    # --------- auth ----------
    def login(self, email: str, password: str) -> Tuple[str, str]:
        """Log in and return ``(user_id, access_token)``.

        Raises:
            RuntimeError: If the response has no access token, or the user id
                cannot be decoded from the token's ``sub`` claim.
        """
        url = urljoin(self.urls["auth"], "v1/token")
        _, data, _ = self.req("POST", url, body={"email": email, "password": password},
                              expected=(200,), name="login")
        token = data.get("access_token", "")
        if not token:
            raise RuntimeError("access_token is empty")
        user_id = decode_jwt_sub(token)
        if not user_id:
            raise RuntimeError("cannot decode user id (sub) from token")
        return user_id, token

    def register(self, email: str, password: str, full_name: str, role: str) -> None:
        """Register a user; a non-2xx answer is logged as a warning, not raised."""
        url = urljoin(self.urls["auth"], "v1/register")
        # /register in the backend may return 201/200, and sometimes 500 during
        # serialization, so do not fail on 500 right away; the caller logs in next.
        try:
            self.req(
                "POST",
                url,
                body={"email": email, "password": password, "full_name": full_name, "role": role},
                expected=(200, 201),
                name="register",
            )
        except RuntimeError as e:
            self.logger.warning(f"register returned non-2xx: {e} — will try login anyway")

    def login_or_register(self, email: str, password: str, full_name: str, role: str) -> UserCreds:
        """Return credentials for *email*, registering the account if login fails."""
        # 1) try to log in
        try:
            uid, token = self.login(email, password)
            self.logger.info(f"Login OK: {email} -> {uid}")
            return UserCreds(id=uid, email=email, access_token=token, role=role)
        except Exception as e:
            self.logger.info(f"Login failed for {email}: {e}; will try register")

        # 2) register (not fatal if a 500 came back)
        self.register(email, password, full_name, role)

        # 3) log in again
        uid, token = self.login(email, password)
        self.logger.info(f"Registered+Login OK: {email} -> {uid}")
        return UserCreds(id=uid, email=email, access_token=token, role=role)

    # --------- profiles ----------
    def get_my_profile(self, token: str) -> Tuple[int, Dict[str, Any]]:
        """Fetch the caller's profile; returns ``(status, data)`` with status 200 or 404."""
        url = urljoin(self.urls["profiles"], "v1/profiles/me")
        code, data, _ = self.req("GET", url, token=token, expected=(200, 404), name="profiles/me")
        return code, data

    def create_profile(
        self,
        token: str,
        gender: str,
        city: str,
        languages: List[str],
        interests: List[str],
    ) -> Dict[str, Any]:
        """Create a profile for the token's user and return the response body."""
        url = urljoin(self.urls["profiles"], "v1/profiles")
        _, data, _ = self.req(
            "POST",
            url,
            token=token,
            body={"gender": gender, "city": city, "languages": languages, "interests": interests},
            expected=(200, 201),
            name="profiles/create",
        )
        return data

    def ensure_profile(
        self, token: str, gender: str, city: str, languages: List[str], interests: List[str]
    ) -> Dict[str, Any]:
        """Return the existing profile, or create one when the lookup answers 404."""
        code, p = self.get_my_profile(token)
        if code == 200:
            self.logger.info(f"Profile exists: id={p.get('id')}")
            return p
        self.logger.info("Profile not found -> creating")
        p = self.create_profile(token, gender, city, languages, interests)
        self.logger.info(f"Profile created: id={p.get('id')}")
        return p

    # --------- match ----------
    def create_pair(self, admin_token: str, user_id_a: str, user_id_b: str, score: float, notes: str) -> Dict[str, Any]:
        """Create a match pair between two users and return the response body."""
        url = urljoin(self.urls["match"], "v1/pairs")
        _, data, _ = self.req(
            "POST",
            url,
            token=admin_token,
            body={"user_id_a": user_id_a, "user_id_b": user_id_b, "score": score, "notes": notes},
            expected=(200, 201),
            name="match/create_pair",
        )
        return data

    # --------- chat ----------
    def create_room(self, admin_token: str, title: str, participants: List[str]) -> Dict[str, Any]:
        """Create a chat room for *participants* and return the response body."""
        url = urljoin(self.urls["chat"], "v1/rooms")
        _, data, _ = self.req(
            "POST",
            url,
            token=admin_token,
            body={"title": title, "participants": participants},
            expected=(200, 201),
            name="chat/create_room",
        )
        return data

    def send_message(self, admin_token: str, room_id: str, content: str) -> Dict[str, Any]:
        """Post a message to room *room_id* and return the response body."""
        url = urljoin(self.urls["chat"], f"v1/rooms/{room_id}/messages")
        _, data, _ = self.req(
            "POST",
            url,
            token=admin_token,
            body={"content": content},
            expected=(200, 201),
            name="chat/send_message",
        )
        return data

    # --------- payments ----------
    def create_invoice(
        self, admin_token: str, client_id: str, amount: float, currency: str, description: str
    ) -> Dict[str, Any]:
        """Create an invoice for *client_id* and return the response body."""
        url = urljoin(self.urls["payments"], "v1/invoices")
        _, data, _ = self.req(
            "POST",
            url,
            token=admin_token,
            body={"client_id": client_id, "amount": amount, "currency": currency, "description": description},
            expected=(200, 201),
            name="payments/create_invoice",
        )
        return data

    def mark_invoice_paid(self, admin_token: str, invoice_id: str) -> Dict[str, Any]:
        """Mark invoice *invoice_id* as paid and return the response body."""
        url = urljoin(self.urls["payments"], f"v1/invoices/{invoice_id}/mark-paid")
        _, data, _ = self.req("POST", url, token=admin_token, expected=(200,), name="payments/mark_paid")
        return data


# -------------------------
# Data generation
# -------------------------
GENDERS = ["female", "male", "other"]
CITIES = [
    "Moscow", "Saint Petersburg", "Kazan", "Novosibirsk", "Krasnodar",
    "Yekaterinburg", "Minsk", "Kyiv", "Almaty", "Tbilisi",
]
LANG_POOL = ["ru", "en", "de", "fr", "es", "it", "zh", "uk", "kk", "tr", "pl"]
INTR_POOL = ["music", "travel", "reading", "sports", "movies", "games", "cooking", "yoga", "art", "tech"]


def pick_languages(n: int = 2) -> List[str]:
    """Return *n* distinct random languages, sorted; *n* is clamped to the pool size."""
    n = max(1, min(n, len(LANG_POOL)))
    return sorted(random.sample(LANG_POOL, n))


def pick_interests(n: int = 3) -> List[str]:
    """Return *n* distinct random interests, sorted; *n* is clamped to the pool size."""
    n = max(1, min(n, len(INTR_POOL)))
    return sorted(random.sample(INTR_POOL, n))


def random_email(prefix: str, domain: str) -> str:
    """Build an address ``prefix+<unix time>.<6 random chars>@domain``."""
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    return f"{prefix}+{int(time.time())}.{suffix}@{domain}"


# -------------------------
# Main scenario
# -------------------------
def main():
    """Parse CLI arguments and run the full E2E scenario.

    Exits with status 2 when fewer than two clients are requested.

    Raises:
        TimeoutError: If a service does not become healthy in time.
        RuntimeError: If an API call returns an unexpected status, or a
            response lacks an id that a follow-up request needs.
    """
    import argparse

    parser = argparse.ArgumentParser(description="E2E тест API брачного агентства с фейковыми анкетами.")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gateway base URL (по умолчанию http://localhost:8080)")
    parser.add_argument("--clients", type=int, default=DEFAULT_CLIENTS, help="Количество клиентских аккаунтов (>=2)")
    parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Пароль для всех тестовых пользователей")
    parser.add_argument("--email-domain", default=DEFAULT_EMAIL_DOMAIN, help="Домен e-mail для теста (напр. agency.dev)")
    parser.add_argument("--log-file", default=DEFAULT_LOG_FILE, help="Файл логов (по умолчанию logs/api.log)")
    parser.add_argument("--seed", type=int, default=42, help="Генератор случайных чисел (seed)")
    args = parser.parse_args()

    random.seed(args.seed)
    fake = Faker()

    logger = setup_logger(args.log_file)
    logger.info("=== API E2E START ===")
    logger.info(f"BASE_URL={args.base_url} clients={args.clients} domain={args.email_domain}")

    if args.clients < 2:
        logger.error("Нужно минимум 2 клиента (для пары).")
        sys.exit(2)

    api = APIE2E(args.base_url, logger)

    # Health checks through the gateway
    api.wait_health("gateway/auth", urljoin(api.urls["auth"], "health"))
    api.wait_health("profiles", urljoin(api.urls["profiles"], "health"))
    api.wait_health("match", urljoin(api.urls["match"], "health"))
    api.wait_health("chat", urljoin(api.urls["chat"], "health"))
    api.wait_health("payments", urljoin(api.urls["payments"], "health"))

    # Admin
    admin_email = random_email("admin", args.email_domain)
    admin_full = fake.name()
    admin = api.login_or_register(admin_email, args.password, admin_full, role="ADMIN")

    # Clients
    clients: List[UserCreds] = []
    for i in range(args.clients):
        email = random_email(f"user{i+1}", args.email_domain)
        full = fake.name()
        u = api.login_or_register(email, args.password, full, role="CLIENT")
        clients.append(u)

    # Profiles for everyone
    for i, u in enumerate([admin] + clients, start=1):
        gender = random.choice(GENDERS)
        city = random.choice(CITIES)
        languages = pick_languages(random.choice([1, 2, 3]))
        interests = pick_interests(random.choice([2, 3, 4]))
        logger.info(f"[{i}/{1+len(clients)}] Ensure profile for {u.email} (role={u.role})")
        api.ensure_profile(u.access_token, gender, city, languages, interests)

    # Match pair between two random clients
    a, b = random.sample(clients, 2)
    score = round(random.uniform(0.6, 0.98), 2)
    pair = api.create_pair(admin.access_token, a.id, b.id, score, notes="e2e generated")
    pair_id = str(pair.get("id", ""))
    logger.info(f"Match pair created: id={pair_id}, {a.email} ↔ {b.email}, score={score}")

    # Chat room and a message
    room = api.create_room(admin.access_token, title=f"{a.email} & {b.email}", participants=[a.id, b.id])
    room_id = str(room.get("id", ""))
    if not room_id:
        # An empty id would produce the URL ".../rooms//messages"; fail early
        # with a clear message instead.
        raise RuntimeError("chat/create_room response has no id")
    msg = api.send_message(admin.access_token, room_id, content="Hello from admin (e2e)")
    msg_id = str(msg.get("id", ""))
    logger.info(f"Chat message sent: room={room_id}, msg={msg_id}")

    # Invoice for the first client of the pair
    amount = random.choice([99.0, 199.0, 299.0])
    inv = api.create_invoice(admin.access_token, client_id=a.id, amount=amount, currency="USD",
                             description="Consultation (e2e)")
    inv_id = str(inv.get("id", ""))
    if not inv_id:
        # Same reasoning as for room_id: avoid ".../invoices//mark-paid".
        raise RuntimeError("payments/create_invoice response has no id")
    invp = api.mark_invoice_paid(admin.access_token, inv_id)
    logger.info(f"Invoice paid: id={inv_id}, status={invp.get('status')}")

    # Summary
    summary = {
        "admin": {"email": admin.email, "id": admin.id},
        "clients": [{"email": c.email, "id": c.id} for c in clients],
        "pair_id": pair_id,
        "room_id": room_id,
        "message_id": msg_id,
        "invoice_id": inv_id,
        "invoice_status": invp.get("status"),
    }
    logger.info("=== SUMMARY ===")
    logger.info(json.dumps(summary, ensure_ascii=False, indent=2))

    print(json.dumps(summary, ensure_ascii=False, indent=2))  # short summary on stdout


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)