#!/usr/bin/env python3
"""
API Test Script for Women's Safety App.

Run this script to test all major API endpoints. Each test prints a
human-readable pass/fail line; nothing is asserted programmatically.
"""

import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, Optional

import httpx

BASE_URL = "http://localhost:8000"


class APITester:
    """Exercise the HTTP endpoints under ``base_url`` and print the outcome.

    State shared between tests:
        token: bearer token stored by ``test_login``; ``None`` until then.
        user_id: ``id`` field of the registration response; ``None`` until then.
    """

    def __init__(self, base_url: str = BASE_URL):
        self.base_url = base_url
        self.token: Optional[str] = None
        self.user_id = None

    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header built from the stored token."""
        return {"Authorization": f"Bearer {self.token}"}

    async def _run_step(self, step: Callable[[], Awaitable[Any]]) -> None:
        """Run one test coroutine, report (not propagate) expected failures.

        A transport error, a non-JSON body or a missing response key in one
        endpoint must not prevent the remaining endpoints from being tested,
        so those errors are printed and swallowed here. A blank line is
        printed after every step to separate the sections of the report.
        """
        try:
            await step()
        except (httpx.HTTPError, ValueError, KeyError) as exc:
            # ValueError covers json.JSONDecodeError raised by response.json().
            print(f"❌ {step.__name__} aborted: {exc!r}")
        print()

    async def test_registration(self) -> Dict[str, Any]:
        """Test user registration.

        Returns:
            The decoded response body on HTTP 200, otherwise an empty dict.
        """
        print("🔐 Testing user registration...")

        user_data = {
            "email": "test@example.com",
            "password": "testpassword123",
            "first_name": "Test",
            "last_name": "User",
            "phone": "+1234567890",
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/api/v1/register", json=user_data
            )

        if response.status_code != 200:
            print(f"❌ Registration failed: {response.status_code} - {response.text}")
            return {}

        data = response.json()
        self.user_id = data["id"]
        print(f"✅ Registration successful! User ID: {self.user_id}")
        return data

    async def test_login(self) -> str:
        """Test user login and get token.

        Returns:
            The access token on HTTP 200 (also stored on ``self.token``),
            otherwise an empty string.
        """
        print("🔑 Testing user login...")

        login_data = {
            "email": "test@example.com",
            "password": "testpassword123",
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/api/v1/login", json=login_data
            )

        if response.status_code != 200:
            print(f"❌ Login failed: {response.status_code} - {response.text}")
            return ""

        data = response.json()
        self.token = data["access_token"]
        print("✅ Login successful! Token received")
        return self.token

    async def test_profile(self):
        """Test getting and updating profile (requires a token)."""
        if not self.token:
            print("❌ No token available for profile test")
            return

        print("👤 Testing profile operations...")
        headers = self._auth_headers()

        async with httpx.AsyncClient() as client:
            # Get profile
            response = await client.get(
                f"{self.base_url}/api/v1/profile", headers=headers
            )
            if response.status_code == 200:
                print("✅ Profile retrieval successful")
            else:
                print(f"❌ Profile retrieval failed: {response.status_code}")

            # Update profile
            update_data = {"bio": "Updated bio for testing"}
            response = await client.put(
                f"{self.base_url}/api/v1/profile", json=update_data, headers=headers
            )
            if response.status_code == 200:
                print("✅ Profile update successful")
            else:
                print(f"❌ Profile update failed: {response.status_code}")

    async def test_location_update(self):
        """Test location services (requires a token)."""
        if not self.token:
            print("❌ No token available for location test")
            return

        print("📍 Testing location services...")
        headers = self._auth_headers()

        location_data = {
            "latitude": 37.7749,
            "longitude": -122.4194,
            "accuracy": 10.5,
        }

        async with httpx.AsyncClient() as client:
            # Update location
            response = await client.post(
                f"{self.base_url}/api/v1/update-location",
                json=location_data,
                headers=headers,
            )
            if response.status_code == 200:
                print("✅ Location update successful")
            else:
                print(
                    f"❌ Location update failed: {response.status_code} - {response.text}"
                )

            # Get nearby users
            params = {
                "latitude": 37.7749,
                "longitude": -122.4194,
                "radius_km": 1.0,
            }
            response = await client.get(
                f"{self.base_url}/api/v1/nearby-users", params=params, headers=headers
            )
            if response.status_code == 200:
                nearby = response.json()
                print(f"✅ Nearby users query successful - found {len(nearby)} users")
            else:
                print(f"❌ Nearby users query failed: {response.status_code}")

    async def test_emergency_alert(self):
        """Test emergency alert system (requires a token).

        Creates an alert, lists the caller's alerts, then resolves the created
        alert. The list and resolve calls are skipped if creation fails.
        """
        if not self.token:
            print("❌ No token available for emergency test")
            return

        print("🚨 Testing emergency alert system...")
        headers = self._auth_headers()

        alert_data = {
            "latitude": 37.7749,
            "longitude": -122.4194,
            "alert_type": "general",
            "message": "Test emergency alert",
            "address": "123 Test Street, San Francisco, CA",
        }

        async with httpx.AsyncClient() as client:
            # Create emergency alert
            response = await client.post(
                f"{self.base_url}/api/v1/alert", json=alert_data, headers=headers
            )
            if response.status_code != 200:
                print(
                    f"❌ Emergency alert creation failed: {response.status_code} - {response.text}"
                )
                return

            alert = response.json()
            alert_id = alert["id"]
            print(f"✅ Emergency alert created successfully! Alert ID: {alert_id}")

            # Get my alerts
            response = await client.get(
                f"{self.base_url}/api/v1/alerts/my", headers=headers
            )
            if response.status_code == 200:
                alerts = response.json()
                print(f"✅ Retrieved {len(alerts)} alerts")
            else:
                print(f"❌ Failed to retrieve alerts: {response.status_code}")

            # Resolve alert
            response = await client.put(
                f"{self.base_url}/api/v1/alert/{alert_id}/resolve", headers=headers
            )
            if response.status_code == 200:
                print("✅ Alert resolved successfully")
            else:
                print(f"❌ Failed to resolve alert: {response.status_code}")

    async def test_calendar_entry(self):
        """Test calendar services (requires a token).

        Creates an entry, then fetches the entry list and the cycle overview.
        The two reads are skipped if creation fails.
        """
        if not self.token:
            print("❌ No token available for calendar test")
            return

        print("📅 Testing calendar services...")
        headers = self._auth_headers()

        calendar_data = {
            "entry_date": "2024-01-15",
            "entry_type": "period",
            "flow_intensity": "medium",
            "mood": "happy",
            "energy_level": 4,
        }

        async with httpx.AsyncClient() as client:
            # Create calendar entry
            response = await client.post(
                f"{self.base_url}/api/v1/entries", json=calendar_data, headers=headers
            )
            if response.status_code != 200:
                print(
                    f"❌ Calendar entry creation failed: {response.status_code} - {response.text}"
                )
                return

            print("✅ Calendar entry created successfully")

            # Get calendar entries
            response = await client.get(
                f"{self.base_url}/api/v1/entries", headers=headers
            )
            if response.status_code == 200:
                entries = response.json()
                print(f"✅ Retrieved {len(entries)} calendar entries")
            else:
                print(f"❌ Failed to retrieve calendar entries: {response.status_code}")

            # Get cycle overview
            response = await client.get(
                f"{self.base_url}/api/v1/cycle-overview", headers=headers
            )
            if response.status_code == 200:
                overview = response.json()
                print(
                    f"✅ Cycle overview retrieved - Phase: {overview.get('current_phase', 'unknown')}"
                )
            else:
                print(f"❌ Failed to get cycle overview: {response.status_code}")

    async def test_notifications(self):
        """Test notification services (requires a token).

        Registers a device token, then lists the caller's devices. The list
        call is skipped if registration fails.
        """
        if not self.token:
            print("❌ No token available for notification test")
            return

        print("🔔 Testing notification services...")
        headers = self._auth_headers()

        device_data = {
            "token": "test_fcm_token_12345",
            "platform": "android",
        }

        async with httpx.AsyncClient() as client:
            # Register device token
            response = await client.post(
                f"{self.base_url}/api/v1/register-device",
                json=device_data,
                headers=headers,
            )
            if response.status_code != 200:
                print(
                    f"❌ Device token registration failed: {response.status_code} - {response.text}"
                )
                return

            print("✅ Device token registered successfully")

            # Get my devices
            response = await client.get(
                f"{self.base_url}/api/v1/my-devices", headers=headers
            )
            if response.status_code == 200:
                devices = response.json()
                print(f"✅ Retrieved device info - {devices['device_count']} devices")
            else:
                print(f"❌ Failed to retrieve devices: {response.status_code}")

    async def test_health_checks(self):
        """Test system health endpoints (no token required)."""
        print("🏥 Testing health checks...")

        async with httpx.AsyncClient() as client:
            # Gateway health
            response = await client.get(f"{self.base_url}/api/v1/health")
            if response.status_code == 200:
                print("✅ API Gateway health check passed")
            else:
                print(f"❌ API Gateway health check failed: {response.status_code}")

            # Services status
            response = await client.get(f"{self.base_url}/api/v1/services-status")
            if response.status_code != 200:
                print(f"❌ Services status check failed: {response.status_code}")
                return

            status = response.json()
            services = status["services"]
            healthy_services = sum(
                1 for service in services.values() if service["status"] == "healthy"
            )
            total_services = len(services)
            print(
                f"✅ Services status check - {healthy_services}/{total_services} services healthy"
            )

            # Print individual service status
            for name, service in services.items():
                status_icon = "✅" if service["status"] == "healthy" else "❌"
                print(f" {status_icon} {name}: {service['status']}")

    async def run_all_tests(self):
        """Run all API tests.

        Health checks, registration and login always run. The remaining tests
        need a bearer token and run only if login stored one. Every step is
        isolated, so an error in one does not stop the others.
        """
        print("🚀 Starting API Tests for Women's Safety App\n")

        # Test basic functionality
        for step in (self.test_health_checks, self.test_registration, self.test_login):
            await self._run_step(step)

        if self.token:
            authenticated_steps = (
                self.test_profile,
                self.test_location_update,
                self.test_emergency_alert,
                self.test_calendar_entry,
                self.test_notifications,
            )
            for step in authenticated_steps:
                await self._run_step(step)

        print("🎉 API testing completed!")


async def main():
    """Main function to run tests.

    Probes the health endpoint first and returns early, with a hint on how to
    start the services, when it does not answer with HTTP 200.
    """
    print("Women's Safety App - API Test Suite")
    print("=" * 50)

    # Check if services are running
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{BASE_URL}/api/v1/health", timeout=5.0)
            if response.status_code != 200:
                print(
                    "❌ Services not responding. Make sure to run './start_services.sh' first"
                )
                return
    except Exception as e:
        # Deliberately broad: this is the script's top-level boundary and any
        # failure here just means "services unreachable" for the operator.
        print(f"❌ Cannot connect to services: {e}")
        print("Make sure to run './start_services.sh' first")
        return

    # Run tests
    tester = APITester()
    await tester.run_all_tests()


if __name__ == "__main__":
    asyncio.run(main())