Some checks reported errors
continuous-integration/drone Build encountered an error
103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import json
|
|
import subprocess
|
|
import time
|
|
import signal
|
|
import os
|
|
import sys
|
|
|
|
async def test_user_service():
|
|
"""Test the User Service API"""
|
|
|
|
# Start the service
|
|
print("🚀 Starting User Service...")
|
|
|
|
# Set up environment
|
|
env = os.environ.copy()
|
|
env['PYTHONPATH'] = f"{os.getcwd()}:{env.get('PYTHONPATH', '')}"
|
|
|
|
# Start uvicorn process
|
|
process = subprocess.Popen([
|
|
sys.executable, "-m", "uvicorn", "main:app",
|
|
"--host", "0.0.0.0", "--port", "8001"
|
|
], cwd="services/user_service", env=env)
|
|
|
|
print("⏳ Waiting for service to start...")
|
|
await asyncio.sleep(5)
|
|
|
|
try:
|
|
# Test registration
|
|
async with aiohttp.ClientSession() as session:
|
|
print("🧪 Testing user registration...")
|
|
|
|
registration_data = {
|
|
"email": "test3@example.com",
|
|
"password": "testpassword123",
|
|
"first_name": "Test",
|
|
"last_name": "User3",
|
|
"phone": "+1234567892"
|
|
}
|
|
|
|
async with session.post(
|
|
"http://localhost:8001/api/v1/register",
|
|
json=registration_data,
|
|
headers={"Content-Type": "application/json"}
|
|
) as response:
|
|
if response.status == 201:
|
|
data = await response.json()
|
|
print("✅ Registration successful!")
|
|
print(f"📝 Response: {json.dumps(data, indent=2)}")
|
|
else:
|
|
text = await response.text()
|
|
print(f"❌ Registration failed with status {response.status}")
|
|
print(f"📝 Error: {text}")
|
|
|
|
# Test login
|
|
print("\n🧪 Testing user login...")
|
|
|
|
login_data = {
|
|
"email": "test3@example.com",
|
|
"password": "testpassword123"
|
|
}
|
|
|
|
async with session.post(
|
|
"http://localhost:8001/api/v1/login",
|
|
json=login_data,
|
|
headers={"Content-Type": "application/json"}
|
|
) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
print("✅ Login successful!")
|
|
print(f"📝 Token: {data['access_token'][:50]}...")
|
|
else:
|
|
text = await response.text()
|
|
print(f"❌ Login failed with status {response.status}")
|
|
print(f"📝 Error: {text}")
|
|
|
|
# Test health check
|
|
print("\n🧪 Testing health check...")
|
|
async with session.get("http://localhost:8001/api/v1/health") as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
print("✅ Health check successful!")
|
|
print(f"📝 Response: {json.dumps(data, indent=2)}")
|
|
else:
|
|
text = await response.text()
|
|
print(f"❌ Health check failed with status {response.status}")
|
|
print(f"📝 Error: {text}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed with exception: {e}")
|
|
|
|
finally:
|
|
# Stop the service
|
|
print("\n🛑 Stopping service...")
|
|
process.terminate()
|
|
process.wait()
|
|
print("✅ Test completed!")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(test_user_service()) |