#!/usr/bin/env python3 """ Full WebSocket SOS Service Test with Emergency Alert Creation """ import asyncio import json import sys from datetime import datetime try: import httpx import websockets except ImportError: import subprocess subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx", "websockets"]) import httpx import websockets class Colors: GREEN = '\033[92m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' CYAN = '\033[96m' RESET = '\033[0m' BOLD = '\033[1m' def print_section(title): print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.RESET}") print(f"{Colors.BOLD}{Colors.BLUE}{title:^70}{Colors.RESET}") print(f"{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.RESET}\n") def print_success(msg): print(f"{Colors.GREEN}✅ {msg}{Colors.RESET}") def print_error(msg): print(f"{Colors.RED}❌ {msg}{Colors.RESET}") def print_info(msg): print(f"{Colors.CYAN}ℹ️ {msg}{Colors.RESET}") def print_warning(msg): print(f"{Colors.YELLOW}⚠️ {msg}{Colors.RESET}") async def get_jwt_token(email, password): """Get JWT token from User Service""" try: async with httpx.AsyncClient() as client: response = await client.post( "http://localhost:8001/api/v1/auth/login", json={"email": email, "password": password}, timeout=10.0 ) if response.status_code != 200: print_error(f"Login failed: {response.status_code}") return None, None data = response.json() token = data.get("access_token") # Decode token to get user_id import base64 parts = token.split('.') if len(parts) == 3: payload = parts[1] padding = 4 - len(payload) % 4 if padding != 4: payload += '=' * padding decoded = base64.urlsafe_b64decode(payload) token_data = json.loads(decoded) user_id = token_data.get("sub") else: return None, None print_success(f"Token obtained for user {user_id}") return token, int(user_id) except Exception as e: print_error(f"Connection error: {e}") return None, None async def create_emergency_alert(token, latitude=55.7558, longitude=37.6173, message="SOS!"): """Create an emergency alert""" print_info(f"Creating emergency alert at ({latitude}, {longitude})...") try: async with httpx.AsyncClient() as client: response = await client.post( "http://localhost:8002/api/v1/alert", headers={"Authorization": f"Bearer {token}"}, json={ "latitude": latitude, "longitude": longitude, "address": f"Location ({latitude}, {longitude})", "alert_type": "SOS", "message": message }, timeout=10.0 ) if response.status_code not in (200, 201): print_warning(f"Alert creation returned {response.status_code}") print_info(f"Response: {response.text}") return None alert = response.json() alert_id = alert.get("id") print_success(f"Alert created with ID: {alert_id}") return alert_id except Exception as e: print_error(f"Failed to create alert: {e}") return None async def test_full_sos_flow(): """Test full SOS flow: login -> connect WebSocket -> create alert -> receive notification""" print_section("🚨 FULL WebSocket SOS SERVICE TEST") # Step 1: Get token print_section("Step 1: Authentication") token, user_id = await get_jwt_token("wstester@test.com", "WsTest1234!") if not token or not user_id: print_error("Authentication failed") return False # Step 2: Connect WebSocket print_section("Step 2: WebSocket Connection") ws_url = f"ws://localhost:8002/api/v1/emergency/ws/{user_id}?token={token}" print_info(f"Connecting to: {ws_url[:80]}...") try: async with websockets.connect(ws_url) as websocket: print_success("WebSocket connected") # Receive connection confirmation msg = await asyncio.wait_for(websocket.recv(), timeout=5.0) data = json.loads(msg) print_success(f"Connection confirmed: {data.get('type')}") # Step 3: Create emergency alert print_section("Step 3: Create Emergency Alert") alert_id = await create_emergency_alert( token, latitude=55.7558, longitude=37.6173, message="🚨 Test SOS alert from WebSocket test" ) if not alert_id: print_warning("Could not create alert, but WebSocket is working") return True # Step 4: Listen for alert notification on WebSocket print_section("Step 4: Listen for Alert Notification") print_info("Waiting for SOS notification on WebSocket (30 seconds)...") print_info("Note: Notifications are sent to nearby users who are online") alert_received = False try: while not alert_received: msg = await asyncio.wait_for(websocket.recv(), timeout=30.0) data = json.loads(msg) if data.get("type") == "emergency_alert": alert_received = True print_success("🚨 Emergency alert notification received!") print_info(f"Alert details:") print(f" - Type: {data.get('alert_type')}") print(f" - Location: ({data.get('latitude')}, {data.get('longitude')})") print(f" - Distance: {data.get('distance_km')} km") print(f" - Message: {data.get('message')}") break elif data.get("type") == "pong": print_info("Ping/pong exchange") else: print_info(f"Received message type: {data.get('type')}") except asyncio.TimeoutError: print_warning("No alert notification received (timeout)") print_info("This is expected if no other users are nearby and listening") return True print_section("✅ TEST COMPLETED SUCCESSFULLY") print_success("Full WebSocket SOS flow is working!") return True except Exception as e: print_error(f"WebSocket error: {e}") return False async def test_multiple_clients(): """Test multiple concurrent WebSocket clients""" print_section("🔗 MULTIPLE CLIENT TEST") # Get token token, user_id = await get_jwt_token("wstester@test.com", "WsTest1234!") if not token: return False print_info(f"Testing with user ID: {user_id}") async def client_task(client_num): ws_url = f"ws://localhost:8002/api/v1/emergency/ws/{user_id}?token={token}" try: async with websockets.connect(ws_url) as ws: # Receive connection msg = await asyncio.wait_for(ws.recv(), timeout=5.0) print_success(f"Client {client_num} connected") # Send ping await ws.send(json.dumps({"type": "ping"})) pong = await asyncio.wait_for(ws.recv(), timeout=5.0) if json.loads(pong).get("type") == "pong": print_success(f"Client {client_num} ping/pong OK") return True except Exception as e: print_error(f"Client {client_num} failed: {e}") return False # Create multiple concurrent connections tasks = [client_task(i) for i in range(1, 4)] results = await asyncio.gather(*tasks) if all(results): print_success(f"All {len(results)} clients connected successfully!") return True else: print_warning(f"Some clients failed") return False async def main(): """Run all tests""" results = {} # Test 1: Full SOS flow print_section("🧪 WebSocket SOS Service Tests") results["full_sos_flow"] = await test_full_sos_flow() # Test 2: Multiple clients results["multiple_clients"] = await test_multiple_clients() # Summary print_section("📊 Test Summary") for test_name, result in results.items(): status = f"{Colors.GREEN}✅ PASSED{Colors.RESET}" if result else f"{Colors.RED}❌ FAILED{Colors.RESET}" print(f"{test_name.ljust(30)}: {status}") passed = sum(1 for v in results.values() if v) total = len(results) print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.RESET}") if passed == total: print_success("All tests passed! WebSocket SOS service is fully functional! 🎉") else: print_warning("Some tests failed") return all(results.values()) if __name__ == "__main__": try: success = asyncio.run(main()) sys.exit(0 if success else 1) except KeyboardInterrupt: print_warning("\nTests interrupted by user") sys.exit(1)