Files
chat/test_ws_full.py
Andrew K. Choi cfc93cb99a
All checks were successful
continuous-integration/drone/push Build is passing
feat: Fix nutrition service and add location-based alerts
Changes:
- Fix nutrition service: add is_active column and Pydantic validation for UUID/datetime
- Add location-based alerts feature: users can now see alerts within 1km radius
- Fix CORS and response serialization in nutrition service
- Add getCurrentLocation() and loadAlertsNearby() functions
- Improve UI for nearby alerts display with distance and response count
2025-12-13 16:34:50 +09:00

275 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Full WebSocket SOS Service Test with Emergency Alert Creation
"""
import asyncio
import json
import sys
from datetime import datetime
try:
import httpx
import websockets
except ImportError:
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx", "websockets"])
import httpx
import websockets
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
BOLD = '\033[1m'
def print_section(title):
print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.RESET}")
print(f"{Colors.BOLD}{Colors.BLUE}{title:^70}{Colors.RESET}")
print(f"{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.RESET}\n")
def print_success(msg):
print(f"{Colors.GREEN}{msg}{Colors.RESET}")
def print_error(msg):
print(f"{Colors.RED}{msg}{Colors.RESET}")
def print_info(msg):
print(f"{Colors.CYAN} {msg}{Colors.RESET}")
def print_warning(msg):
print(f"{Colors.YELLOW}⚠️ {msg}{Colors.RESET}")
async def get_jwt_token(email, password):
"""Get JWT token from User Service"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8001/api/v1/auth/login",
json={"email": email, "password": password},
timeout=10.0
)
if response.status_code != 200:
print_error(f"Login failed: {response.status_code}")
return None, None
data = response.json()
token = data.get("access_token")
# Decode token to get user_id
import base64
parts = token.split('.')
if len(parts) == 3:
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += '=' * padding
decoded = base64.urlsafe_b64decode(payload)
token_data = json.loads(decoded)
user_id = token_data.get("sub")
else:
return None, None
print_success(f"Token obtained for user {user_id}")
return token, int(user_id)
except Exception as e:
print_error(f"Connection error: {e}")
return None, None
async def create_emergency_alert(token, latitude=55.7558, longitude=37.6173, message="SOS!"):
"""Create an emergency alert"""
print_info(f"Creating emergency alert at ({latitude}, {longitude})...")
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8002/api/v1/alert",
headers={"Authorization": f"Bearer {token}"},
json={
"latitude": latitude,
"longitude": longitude,
"address": f"Location ({latitude}, {longitude})",
"alert_type": "SOS",
"message": message
},
timeout=10.0
)
if response.status_code not in (200, 201):
print_warning(f"Alert creation returned {response.status_code}")
print_info(f"Response: {response.text}")
return None
alert = response.json()
alert_id = alert.get("id")
print_success(f"Alert created with ID: {alert_id}")
return alert_id
except Exception as e:
print_error(f"Failed to create alert: {e}")
return None
async def test_full_sos_flow():
"""Test full SOS flow: login -> connect WebSocket -> create alert -> receive notification"""
print_section("🚨 FULL WebSocket SOS SERVICE TEST")
# Step 1: Get token
print_section("Step 1: Authentication")
token, user_id = await get_jwt_token("wstester@test.com", "WsTest1234!")
if not token or not user_id:
print_error("Authentication failed")
return False
# Step 2: Connect WebSocket
print_section("Step 2: WebSocket Connection")
ws_url = f"ws://localhost:8002/api/v1/emergency/ws/{user_id}?token={token}"
print_info(f"Connecting to: {ws_url[:80]}...")
try:
async with websockets.connect(ws_url) as websocket:
print_success("WebSocket connected")
# Receive connection confirmation
msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
data = json.loads(msg)
print_success(f"Connection confirmed: {data.get('type')}")
# Step 3: Create emergency alert
print_section("Step 3: Create Emergency Alert")
alert_id = await create_emergency_alert(
token,
latitude=55.7558,
longitude=37.6173,
message="🚨 Test SOS alert from WebSocket test"
)
if not alert_id:
print_warning("Could not create alert, but WebSocket is working")
return True
# Step 4: Listen for alert notification on WebSocket
print_section("Step 4: Listen for Alert Notification")
print_info("Waiting for SOS notification on WebSocket (30 seconds)...")
print_info("Note: Notifications are sent to nearby users who are online")
alert_received = False
try:
while not alert_received:
msg = await asyncio.wait_for(websocket.recv(), timeout=30.0)
data = json.loads(msg)
if data.get("type") == "emergency_alert":
alert_received = True
print_success("🚨 Emergency alert notification received!")
print_info(f"Alert details:")
print(f" - Type: {data.get('alert_type')}")
print(f" - Location: ({data.get('latitude')}, {data.get('longitude')})")
print(f" - Distance: {data.get('distance_km')} km")
print(f" - Message: {data.get('message')}")
break
elif data.get("type") == "pong":
print_info("Ping/pong exchange")
else:
print_info(f"Received message type: {data.get('type')}")
except asyncio.TimeoutError:
print_warning("No alert notification received (timeout)")
print_info("This is expected if no other users are nearby and listening")
return True
print_section("✅ TEST COMPLETED SUCCESSFULLY")
print_success("Full WebSocket SOS flow is working!")
return True
except Exception as e:
print_error(f"WebSocket error: {e}")
return False
async def test_multiple_clients():
"""Test multiple concurrent WebSocket clients"""
print_section("🔗 MULTIPLE CLIENT TEST")
# Get token
token, user_id = await get_jwt_token("wstester@test.com", "WsTest1234!")
if not token:
return False
print_info(f"Testing with user ID: {user_id}")
async def client_task(client_num):
ws_url = f"ws://localhost:8002/api/v1/emergency/ws/{user_id}?token={token}"
try:
async with websockets.connect(ws_url) as ws:
# Receive connection
msg = await asyncio.wait_for(ws.recv(), timeout=5.0)
print_success(f"Client {client_num} connected")
# Send ping
await ws.send(json.dumps({"type": "ping"}))
pong = await asyncio.wait_for(ws.recv(), timeout=5.0)
if json.loads(pong).get("type") == "pong":
print_success(f"Client {client_num} ping/pong OK")
return True
except Exception as e:
print_error(f"Client {client_num} failed: {e}")
return False
# Create multiple concurrent connections
tasks = [client_task(i) for i in range(1, 4)]
results = await asyncio.gather(*tasks)
if all(results):
print_success(f"All {len(results)} clients connected successfully!")
return True
else:
print_warning(f"Some clients failed")
return False
async def main():
"""Run all tests"""
results = {}
# Test 1: Full SOS flow
print_section("🧪 WebSocket SOS Service Tests")
results["full_sos_flow"] = await test_full_sos_flow()
# Test 2: Multiple clients
results["multiple_clients"] = await test_multiple_clients()
# Summary
print_section("📊 Test Summary")
for test_name, result in results.items():
status = f"{Colors.GREEN}✅ PASSED{Colors.RESET}" if result else f"{Colors.RED}❌ FAILED{Colors.RESET}"
print(f"{test_name.ljust(30)}: {status}")
passed = sum(1 for v in results.values() if v)
total = len(results)
print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.RESET}")
if passed == total:
print_success("All tests passed! WebSocket SOS service is fully functional! 🎉")
else:
print_warning("Some tests failed")
return all(results.values())
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print_warning("\nTests interrupted by user")
sys.exit(1)