import asyncio import time from typing import Dict, Any, Optional, List import httpx from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.openapi.utils import get_openapi from pydantic import BaseModel, Field from shared.config import settings from services.user_service.schemas import UserCreate, UserLogin, UserResponse, UserUpdate, Token # Импортируем схемы для экстренных контактов class EmergencyContactBase(BaseModel): name: str = Field(..., min_length=1, max_length=100) phone_number: str = Field(..., min_length=5, max_length=20) relationship: Optional[str] = Field(None, max_length=50) notes: Optional[str] = Field(None, max_length=500) class EmergencyContactCreate(EmergencyContactBase): pass class EmergencyContactUpdate(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=100) phone_number: Optional[str] = Field(None, min_length=5, max_length=20) relationship: Optional[str] = Field(None, max_length=50) notes: Optional[str] = Field(None, max_length=500) class EmergencyContactResponse(EmergencyContactBase): id: int uuid: str user_id: int class Config: from_attributes = True app = FastAPI(title="API Gateway", version="1.0.0", openapi_url="/api/openapi.json") # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Service registry SERVICES = { "users": "http://localhost:8001", "emergency": "http://localhost:8002", "location": "http://localhost:8003", "calendar": "http://localhost:8004", "notifications": "http://localhost:8005", } # Rate limiting (simple in-memory implementation) request_counts: Dict[str, Dict[str, int]] = {} RATE_LIMIT_REQUESTS = 100 # requests per minute RATE_LIMIT_WINDOW = 60 # seconds def get_client_ip(request: Request) -> str: """Get client IP address""" x_forwarded_for = request.headers.get("X-Forwarded-For") if x_forwarded_for: return x_forwarded_for.split(",")[0].strip() return request.client.host if request.client else "127.0.0.1" def is_rate_limited(client_ip: str) -> bool: """Check if client is rate limited""" current_time = int(time.time()) window_start = current_time - RATE_LIMIT_WINDOW if client_ip not in request_counts: request_counts[client_ip] = {} # Clean old entries request_counts[client_ip] = { timestamp: count for timestamp, count in request_counts[client_ip].items() if int(timestamp) > window_start } # Count requests in current window total_requests = sum(request_counts[client_ip].values()) if total_requests >= RATE_LIMIT_REQUESTS: return True # Add current request timestamp_key = str(current_time) request_counts[client_ip][timestamp_key] = ( request_counts[client_ip].get(timestamp_key, 0) + 1 ) return False async def proxy_request( service_url: str, path: str, method: str, headers: dict, body: Optional[bytes] = None, params: Optional[dict] = None, user_create: Optional[UserCreate] = None, user_update: Optional[UserUpdate] = None, user_login: Optional[UserLogin] = None, emergency_contact_create: Optional[EmergencyContactCreate] = None, emergency_contact_update: Optional[EmergencyContactUpdate] = None ): """Proxy request to microservice""" url = f"{service_url}{path}" # Для отладки print(f"Proxy request to: {url}, method: {method}") if body: print(f"Request body: {body.decode('utf-8')[:100]}...") # Remove hop-by-hop headers filtered_headers = { k: v for k, v in headers.items() if k.lower() not in [ "host", "connection", "upgrade", "proxy-connection", "proxy-authenticate", "proxy-authorization", "te", "trailers", "transfer-encoding", ] } async with httpx.AsyncClient(timeout=30.0) as client: try: response = await client.request( method=method, url=url, headers=filtered_headers, content=body, params=params, ) # Для отладки print(f"Response status: {response.status_code}") if response.status_code >= 400: print(f"Error response: {response.text}") return response except httpx.TimeoutException: print(f"Timeout error for {url}") raise HTTPException(status_code=504, detail="Service timeout") except httpx.ConnectError: print(f"Connection error for {url}") raise HTTPException(status_code=503, detail="Service unavailable") except Exception as e: print(f"Proxy error for {url}: {str(e)}") raise HTTPException(status_code=500, detail=f"Proxy error: {str(e)}") @app.middleware("http") async def rate_limiting_middleware(request: Request, call_next): """Rate limiting middleware""" client_ip = get_client_ip(request) # Skip rate limiting for health checks if request.url.path.endswith("/health"): return await call_next(request) if is_rate_limited(client_ip): return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"}) return await call_next(request) # User Service routes @app.post("/api/v1/auth/register", operation_id="user_auth_register", response_model=UserResponse, tags=["Authentication"], summary="Register a new user") @app.post("/api/v1/auth/login", operation_id="user_auth_login", response_model=Token, tags=["Authentication"], summary="Login user") @app.post("/api/v1/users/register", operation_id="user_register", response_model=UserResponse, tags=["Users"], summary="Register a new user") @app.get("/api/v1/users/me", operation_id="user_me_get", response_model=UserResponse, tags=["Users"], summary="Get current user profile") @app.patch("/api/v1/users/me", operation_id="user_me_patch", response_model=UserResponse, tags=["Users"], summary="Update user profile") @app.put("/api/v1/users/me", operation_id="user_me_put", response_model=UserResponse, tags=["Users"], summary="Update user profile") @app.get("/api/v1/users/me/emergency-contacts", operation_id="user_emergency_contacts_get", response_model=List[EmergencyContactResponse], tags=["Emergency Contacts"], summary="Get all emergency contacts") @app.post("/api/v1/users/me/emergency-contacts", operation_id="user_emergency_contacts_post", response_model=EmergencyContactResponse, tags=["Emergency Contacts"], summary="Create a new emergency contact") @app.get("/api/v1/users/me/emergency-contacts/{contact_id}", operation_id="user_emergency_contact_get", response_model=EmergencyContactResponse, tags=["Emergency Contacts"], summary="Get emergency contact by ID") @app.delete("/api/v1/users/me/emergency-contacts/{contact_id}", operation_id="user_emergency_contact_delete", tags=["Emergency Contacts"], summary="Delete emergency contact") @app.patch("/api/v1/users/me/emergency-contacts/{contact_id}", operation_id="user_emergency_contact_patch", response_model=EmergencyContactResponse, tags=["Emergency Contacts"], summary="Update emergency contact") @app.get("/api/v1/users/dashboard", operation_id="user_dashboard_get", tags=["Users"]) @app.post("/api/v1/users/me/change-password", operation_id="user_change_password", tags=["Users"]) @app.get("/api/v1/profile", operation_id="user_profile_get", response_model=UserResponse, tags=["Users"]) @app.put("/api/v1/profile", operation_id="user_profile_update", response_model=UserResponse, tags=["Users"]) async def user_service_proxy( request: Request, user_create: Optional[UserCreate] = None, user_login: Optional[UserLogin] = None, user_update: Optional[UserUpdate] = None, emergency_contact_create: Optional[EmergencyContactCreate] = None, emergency_contact_update: Optional[EmergencyContactUpdate] = None ): """Proxy requests to User Service""" body = await request.body() print(f"User service proxy: {request.url.path}, method: {request.method}") try: response = await proxy_request( SERVICES["users"], request.url.path, request.method, dict(request.headers), body, dict(request.query_params), user_create=user_create, user_login=user_login, user_update=user_update, emergency_contact_create=emergency_contact_create, emergency_contact_update=emergency_contact_update ) try: response_json = response.json() return JSONResponse( status_code=response.status_code, content=response_json, headers={ k: v for k, v in response.headers.items() if k.lower() not in ["content-length", "transfer-encoding"] }, ) except Exception as e: print(f"Error processing JSON response: {str(e)}") print(f"Response text: {response.text}") return JSONResponse( status_code=500, content={"detail": f"Error processing response: {str(e)}"}, ) except Exception as e: print(f"Error in user_service_proxy: {str(e)}") return JSONResponse( status_code=500, content={"detail": f"Proxy error: {str(e)}"}, ) # Emergency Service routes @app.api_route("/api/v1/emergency/alerts", methods=["POST"], operation_id="emergency_alerts_post") @app.api_route("/api/v1/emergency/alerts", methods=["GET"], operation_id="emergency_alerts_get") @app.api_route("/api/v1/emergency/alerts/my", methods=["GET"], operation_id="emergency_alerts_my_get") @app.api_route("/api/v1/emergency/alerts/nearby", methods=["GET"], operation_id="emergency_alerts_nearby_get") @app.api_route("/api/v1/emergency/alerts/{alert_id}", methods=["GET"], operation_id="emergency_alert_get") @app.api_route("/api/v1/emergency/alerts/{alert_id}", methods=["PATCH"], operation_id="emergency_alert_patch") @app.api_route("/api/v1/emergency/alerts/{alert_id}", methods=["DELETE"], operation_id="emergency_alert_delete") @app.api_route("/api/v1/emergency/alerts/{alert_id}/cancel", methods=["PATCH"], operation_id="emergency_alert_cancel") @app.api_route("/api/v1/emergency/reports", methods=["POST"], operation_id="emergency_reports_post") @app.api_route("/api/v1/emergency/reports", methods=["GET"], operation_id="emergency_reports_get") @app.api_route("/api/v1/emergency/reports/nearby", methods=["GET"], operation_id="emergency_reports_nearby_get") @app.api_route("/api/v1/emergency/reports/{report_id}", methods=["GET"], operation_id="emergency_report_get") @app.api_route("/api/v1/emergency/reports/{report_id}", methods=["PATCH"], operation_id="emergency_report_patch") @app.api_route("/api/v1/emergency/reports/{report_id}", methods=["DELETE"], operation_id="emergency_report_delete") async def emergency_service_proxy(request: Request): """Proxy requests to Emergency Service""" body = await request.body() response = await proxy_request( SERVICES["emergency"], request.url.path, request.method, dict(request.headers), body, dict(request.query_params), ) return JSONResponse( status_code=response.status_code, content=response.json(), headers={ k: v for k, v in response.headers.items() if k.lower() not in ["content-length", "transfer-encoding"] }, ) # Location Service routes @app.api_route("/api/v1/locations/update", methods=["POST"], operation_id="location_update_post") @app.api_route("/api/v1/locations/last", methods=["GET"], operation_id="location_last_get") @app.api_route("/api/v1/locations/history", methods=["GET"], operation_id="location_history_get") @app.api_route("/api/v1/locations/users/nearby", methods=["GET"], operation_id="location_users_nearby_get") @app.api_route("/api/v1/locations/safe-places", methods=["GET"], operation_id="location_safe_places_get") @app.api_route("/api/v1/locations/safe-places", methods=["POST"], operation_id="location_safe_places_post") @app.api_route("/api/v1/locations/safe-places/{place_id}", methods=["GET"], operation_id="location_safe_place_get") @app.api_route("/api/v1/locations/safe-places/{place_id}", methods=["PATCH"], operation_id="location_safe_place_patch") @app.api_route("/api/v1/locations/safe-places/{place_id}", methods=["DELETE"], operation_id="location_safe_place_delete") async def location_service_proxy(request: Request): """Proxy requests to Location Service""" body = await request.body() response = await proxy_request( SERVICES["location"], request.url.path, request.method, dict(request.headers), body, dict(request.query_params), ) return JSONResponse( status_code=response.status_code, content=response.json(), headers={ k: v for k, v in response.headers.items() if k.lower() not in ["content-length", "transfer-encoding"] }, ) # Calendar Service routes @app.api_route("/api/v1/calendar/entries", methods=["GET"], operation_id="calendar_entries_get") @app.api_route("/api/v1/calendar/entries", methods=["POST"], operation_id="calendar_entries_post") @app.api_route("/api/v1/calendar/entries/{entry_id}", methods=["GET"], operation_id="calendar_entry_get") @app.api_route("/api/v1/calendar/entries/{entry_id}", methods=["PUT"], operation_id="calendar_entry_put") @app.api_route("/api/v1/calendar/entries/{entry_id}", methods=["DELETE"], operation_id="calendar_entry_delete") @app.api_route("/api/v1/calendar/cycle-overview", methods=["GET"], operation_id="calendar_cycle_overview_get") @app.api_route("/api/v1/calendar/insights", methods=["GET"], operation_id="calendar_insights_get") @app.api_route("/api/v1/calendar/reminders", methods=["GET"], operation_id="calendar_reminders_get") @app.api_route("/api/v1/calendar/reminders", methods=["POST"], operation_id="calendar_reminders_post") @app.api_route("/api/v1/calendar/settings", methods=["GET"], operation_id="calendar_settings_get") @app.api_route("/api/v1/calendar/settings", methods=["PUT"], operation_id="calendar_settings_put") async def calendar_service_proxy(request: Request): """Proxy requests to Calendar Service""" body = await request.body() response = await proxy_request( SERVICES["calendar"], request.url.path, request.method, dict(request.headers), body, dict(request.query_params), ) return JSONResponse( status_code=response.status_code, content=response.json(), headers={ k: v for k, v in response.headers.items() if k.lower() not in ["content-length", "transfer-encoding"] }, ) # Notification Service routes @app.api_route("/api/v1/notifications/devices", methods=["GET"], operation_id="notifications_devices_get") @app.api_route("/api/v1/notifications/devices", methods=["POST"], operation_id="notifications_devices_post") @app.api_route("/api/v1/notifications/devices/{device_id}", methods=["DELETE"], operation_id="notifications_device_delete") @app.api_route("/api/v1/notifications/devices/{device_id}", methods=["GET"], operation_id="notifications_device_get") @app.api_route("/api/v1/notifications/preferences", methods=["GET"], operation_id="notifications_preferences_get") @app.api_route("/api/v1/notifications/preferences", methods=["POST"], operation_id="notifications_preferences_post") @app.api_route("/api/v1/notifications/test", methods=["POST"], operation_id="notifications_test_post") @app.api_route("/api/v1/notifications/history", methods=["GET"], operation_id="notifications_history_get") async def notification_service_proxy(request: Request): """Proxy requests to Notification Service""" body = await request.body() response = await proxy_request( SERVICES["notifications"], request.url.path, request.method, dict(request.headers), body, dict(request.query_params), ) return JSONResponse( status_code=response.status_code, content=response.json(), headers={ k: v for k, v in response.headers.items() if k.lower() not in ["content-length", "transfer-encoding"] }, ) @app.get("/api/v1/health") async def gateway_health_check(): """Gateway health check""" return {"status": "healthy", "service": "api-gateway"} @app.get("/api/v1/services-status") async def check_services_status(): """Check status of all microservices""" service_status = {} async def check_service(name: str, url: str): try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get(f"{url}/api/v1/health") service_status[name] = { "status": "healthy" if response.status_code == 200 else "unhealthy", "response_time_ms": response.elapsed.total_seconds() * 1000, "url": url, } except Exception as e: service_status[name] = {"status": "unhealthy", "error": str(e), "url": url} # Check all services concurrently tasks = [check_service(name, url) for name, url in SERVICES.items()] await asyncio.gather(*tasks) all_healthy = all( status["status"] == "healthy" for status in service_status.values() ) return { "gateway_status": "healthy", "all_services_healthy": all_healthy, "services": service_status, } @app.get("/") async def root(): """Root endpoint with API information""" return { "service": "Women Safety App API Gateway", "version": "1.0.0", "status": "running", "endpoints": { "auth": "/api/v1/auth/register, /api/v1/auth/login", "users": "/api/v1/users/me, /api/v1/users/dashboard", "emergency": "/api/v1/emergency/alerts, /api/v1/emergency/reports", "location": "/api/v1/locations/update, /api/v1/locations/safe-places", "calendar": "/api/v1/calendar/entries, /api/v1/calendar/cycle-overview", "notifications": "/api/v1/notifications/devices, /api/v1/notifications/history", }, "docs": "/docs", } # Переопределение схемы OpenAPI def custom_openapi(): if app.openapi_schema: return app.openapi_schema openapi_schema = get_openapi( title="Women's Safety App API Gateway", version="1.0.0", description="API Gateway для Women's Safety App с поддержкой микросервисной архитектуры", routes=app.routes, ) # Добавление примеров запросов для маршрутов if "paths" in openapi_schema: if "/api/v1/auth/register" in openapi_schema["paths"]: request_example = { "username": "user123", "email": "user@example.com", "password": "Password123!", "full_name": "John Doe", "phone_number": "+7123456789" } path_item = openapi_schema["paths"]["/api/v1/auth/register"] for method in path_item: if "requestBody" in path_item[method]: path_item[method]["requestBody"]["content"]["application/json"]["example"] = request_example if "/api/v1/auth/login" in openapi_schema["paths"]: login_example = { "username": "user123", "password": "Password123!" } path_item = openapi_schema["paths"]["/api/v1/auth/login"] for method in path_item: if "requestBody" in path_item[method]: path_item[method]["requestBody"]["content"]["application/json"]["example"] = login_example app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)