#!/usr/bin/env python3 import asyncio import json import os import signal import subprocess import sys import time import aiohttp async def test_user_service(): """Test the User Service API""" # Start the service print("๐Ÿš€ Starting User Service...") # Set up environment env = os.environ.copy() env["PYTHONPATH"] = f"{os.getcwd()}:{env.get('PYTHONPATH', '')}" # Start uvicorn process process = subprocess.Popen( [ sys.executable, "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", ], cwd="services/user_service", env=env, ) print("โณ Waiting for service to start...") await asyncio.sleep(5) try: # Test registration async with aiohttp.ClientSession() as session: print("๐Ÿงช Testing user registration...") registration_data = { "email": "test3@example.com", "password": "testpassword123", "first_name": "Test", "last_name": "User3", "phone": "+1234567892", } async with session.post( "http://localhost:8001/api/v1/register", json=registration_data, headers={"Content-Type": "application/json"}, ) as response: if response.status == 201: data = await response.json() print("โœ… Registration successful!") print(f"๐Ÿ“ Response: {json.dumps(data, indent=2)}") else: text = await response.text() print(f"โŒ Registration failed with status {response.status}") print(f"๐Ÿ“ Error: {text}") # Test login print("\n๐Ÿงช Testing user login...") login_data = {"email": "test3@example.com", "password": "testpassword123"} async with session.post( "http://localhost:8001/api/v1/login", json=login_data, headers={"Content-Type": "application/json"}, ) as response: if response.status == 200: data = await response.json() print("โœ… Login successful!") print(f"๐Ÿ“ Token: {data['access_token'][:50]}...") else: text = await response.text() print(f"โŒ Login failed with status {response.status}") print(f"๐Ÿ“ Error: {text}") # Test health check print("\n๐Ÿงช Testing health check...") async with session.get("http://localhost:8001/api/v1/health") as response: if response.status == 200: data = await response.json() print("โœ… Health check successful!") print(f"๐Ÿ“ Response: {json.dumps(data, indent=2)}") else: text = await response.text() print(f"โŒ Health check failed with status {response.status}") print(f"๐Ÿ“ Error: {text}") except Exception as e: print(f"โŒ Test failed with exception: {e}") finally: # Stop the service print("\n๐Ÿ›‘ Stopping service...") process.terminate() process.wait() print("โœ… Test completed!") if __name__ == "__main__": asyncio.run(test_user_service())