Files
chat/tests/test_api_python.py
Andrew K. Choi 4e3768a6ee
Some checks failed
continuous-integration/drone/push Build is failing
pipeline issues fix
2025-09-25 11:59:54 +09:00

114 lines
3.5 KiB
Python

#!/usr/bin/env python3
import asyncio
import json
import os
import signal
import subprocess
import sys
import time
import aiohttp
async def test_user_service():
    """Smoke-test the User Service API against a locally spawned server.

    Starts ``main:app`` under uvicorn from ``services/user_service`` on port
    8001, then calls the register, login and health endpoints in turn and
    prints the outcome of each request.  HTTP error statuses and exceptions
    are reported on stdout rather than raised.  The server process is always
    stopped before the coroutine returns.
    """
    # Start the service
    print("🚀 Starting User Service...")

    # Give the child the current working directory on its PYTHONPATH.
    # os.pathsep keeps the separator correct on every platform.
    env = os.environ.copy()
    env["PYTHONPATH"] = f"{os.getcwd()}{os.pathsep}{env.get('PYTHONPATH', '')}"

    # Start uvicorn process
    process = subprocess.Popen(
        [
            sys.executable,
            "-m",
            "uvicorn",
            "main:app",
            "--host",
            "0.0.0.0",
            "--port",
            "8001",
        ],
        cwd="services/user_service",
        env=env,
    )

    # Everything after the spawn runs under try/finally so the child is
    # stopped even when the startup wait is cancelled or interrupted.
    try:
        print("⏳ Waiting for service to start...")
        await asyncio.sleep(5)

        # A child that has already exited cannot serve requests; report that
        # directly instead of letting it surface as a connection error.
        if process.poll() is not None:
            raise RuntimeError(
                f"service exited during startup with code {process.returncode}"
            )

        async with aiohttp.ClientSession() as session:
            # Test registration
            print("🧪 Testing user registration...")
            registration_data = {
                "email": "test3@example.com",
                "password": "testpassword123",
                "first_name": "Test",
                "last_name": "User3",
                "phone": "+1234567892",
            }
            async with session.post(
                "http://localhost:8001/api/v1/register",
                json=registration_data,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status == 201:
                    data = await response.json()
                    print("✅ Registration successful!")
                    print(f"📝 Response: {json.dumps(data, indent=2)}")
                else:
                    text = await response.text()
                    print(f"❌ Registration failed with status {response.status}")
                    print(f"📝 Error: {text}")

            # Test login
            print("\n🧪 Testing user login...")
            login_data = {"email": "test3@example.com", "password": "testpassword123"}
            async with session.post(
                "http://localhost:8001/api/v1/login",
                json=login_data,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    print("✅ Login successful!")
                    # Only a prefix of the token is echoed to the console.
                    print(f"📝 Token: {data['access_token'][:50]}...")
                else:
                    text = await response.text()
                    print(f"❌ Login failed with status {response.status}")
                    print(f"📝 Error: {text}")

            # Test health check
            print("\n🧪 Testing health check...")
            async with session.get("http://localhost:8001/api/v1/health") as response:
                if response.status == 200:
                    data = await response.json()
                    print("✅ Health check successful!")
                    print(f"📝 Response: {json.dumps(data, indent=2)}")
                else:
                    text = await response.text()
                    print(f"❌ Health check failed with status {response.status}")
                    print(f"📝 Error: {text}")
    except Exception as e:
        print(f"❌ Test failed with exception: {e}")
    finally:
        # Stop the service
        print("\n🛑 Stopping service...")
        process.terminate()
        try:
            # Bounded wait: a server that ignores the terminate request must
            # not hang the script forever.
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        print("✅ Test completed!")
# Script entry point: when executed directly, run the async smoke test to
# completion on a fresh event loop.
if __name__ == "__main__":
    smoke_test = test_user_service()
    asyncio.run(smoke_test)