"""Emergency Service.

FastAPI application that lets authenticated users raise emergency alerts,
respond to alerts, resolve their own alerts and read aggregate statistics.
Nearby-user lookup and push notifications are delegated to two other HTTP
services that are called with ``httpx``.
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from services.emergency_service.models import EmergencyAlert, EmergencyResponse
from services.emergency_service.schemas import (
    EmergencyAlertCreate,
    EmergencyAlertResponse,
    EmergencyResponseCreate,
    EmergencyResponseResponse,
    EmergencyStats,
)
from services.user_service.models import User
from shared.auth import get_current_user_from_token
from shared.config import settings
from shared.database import AsyncSessionLocal, get_db

logger = logging.getLogger(__name__)

# Base URLs of the collaborating services (same values that used to be
# hard-coded inside each call site).
LOCATION_SERVICE_URL = "http://localhost:8003"
NOTIFICATION_SERVICE_URL = "http://localhost:8005"

app = FastAPI(title="Emergency Service", version="1.0.0")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()`` so that
    timestamps written and compared by this module keep their existing
    (timezone-less) form.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


async def get_current_user(
    user_data: dict = Depends(get_current_user_from_token),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Load the ``User`` row for the authenticated token.

    Args:
        user_data: Token payload supplied by ``get_current_user_from_token``;
            its ``"user_id"`` entry is used for the lookup.
        db: Database session.

    Returns:
        The matching ``User`` instance.

    Raises:
        HTTPException: 404 when no user with that id exists.
    """
    result = await db.execute(select(User).filter(User.id == user_data["user_id"]))
    user = result.scalars().first()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    return user


# The handler is named ``root_health_check`` so it is no longer shadowed by
# the ``/api/v1/health`` handler defined further down; ``name="health_check"``
# keeps the route name (and the OpenAPI id/summary derived from it) as before.
@app.get("/health", name="health_check")
async def root_health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "emergency_service"}


async def get_nearby_users(
    latitude: float, longitude: float, radius_km: float = 1.0
) -> list:
    """Ask the location service for users around a point.

    This is best-effort: any failure is logged and reported as "nobody
    nearby" rather than raised.

    Args:
        latitude: Latitude of the search centre.
        longitude: Longitude of the search centre.
        radius_km: Search radius forwarded as the ``radius_km`` query parameter.

    Returns:
        The decoded JSON body of a 200 reply, or ``[]`` on a non-200 reply or
        on any error. (``send_emergency_notifications`` reads a ``"user_id"``
        key from each element, so the body is presumably a list of dicts.)
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{LOCATION_SERVICE_URL}/api/v1/nearby-users",
                params={
                    "latitude": latitude,
                    "longitude": longitude,
                    "radius_km": radius_km,
                },
                timeout=5.0,
            )
            if response.status_code == 200:
                return response.json()
            logger.warning(
                "Nearby-users lookup returned HTTP %s", response.status_code
            )
            return []
        except Exception:
            # Deliberately broad: the lookup must never break alert processing.
            logger.exception("Nearby-users lookup failed")
            return []


async def send_emergency_notifications(alert_id: int, nearby_users: list):
    """Ask the notification service to push an alert to the given users.

    Best-effort: failures (including a non-2xx reply) are logged, never raised.

    Args:
        alert_id: Id of the alert being announced.
        nearby_users: Items that each expose a ``"user_id"`` key.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{NOTIFICATION_SERVICE_URL}/api/v1/send-emergency-notifications",
                json={
                    "alert_id": alert_id,
                    "user_ids": [user["user_id"] for user in nearby_users],
                },
                timeout=10.0,
            )
            # Surface rejected requests in the log instead of ignoring them.
            response.raise_for_status()
        except Exception as e:
            logger.error("Failed to send notifications: %s", e)


@app.post("/api/v1/alert", response_model=EmergencyAlertResponse)
async def create_emergency_alert(
    alert_data: EmergencyAlertCreate,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create new emergency alert and notify nearby users.

    The alert is committed first; looking up nearby users and notifying them
    is scheduled as a background task so the response is not delayed by it.
    """
    db_alert = EmergencyAlert(
        user_id=current_user.id,
        latitude=alert_data.latitude,
        longitude=alert_data.longitude,
        address=alert_data.address,
        alert_type=alert_data.alert_type.value,
        message=alert_data.message,
    )
    db.add(db_alert)
    await db.commit()
    await db.refresh(db_alert)

    # Get nearby users and send notifications in background
    background_tasks.add_task(
        process_emergency_alert, db_alert.id, alert_data.latitude, alert_data.longitude
    )

    return EmergencyAlertResponse.model_validate(db_alert)


async def process_emergency_alert(alert_id: int, latitude: float, longitude: float):
    """Background job: find nearby users, record the count, notify them.

    Args:
        alert_id: Id of the alert to process.
        latitude: Latitude of the alert.
        longitude: Longitude of the alert.
    """
    nearby_users = await get_nearby_users(
        latitude, longitude, settings.MAX_EMERGENCY_RADIUS_KM
    )

    # Uses its own session: this runs after the request that created the
    # alert has finished. A failure here is only bookkeeping, so it is logged
    # and must not prevent the notifications below from going out.
    try:
        async with AsyncSessionLocal() as db:
            result = await db.execute(
                select(EmergencyAlert).filter(EmergencyAlert.id == alert_id)
            )
            alert = result.scalars().first()
            if alert:
                alert.notified_users_count = len(nearby_users)
                await db.commit()
    except SQLAlchemyError:
        logger.exception(
            "Could not store notified user count for alert %s", alert_id
        )

    if nearby_users:
        await send_emergency_notifications(alert_id, nearby_users)


@app.post("/api/v1/alert/{alert_id}/respond", response_model=EmergencyResponseResponse)
async def respond_to_alert(
    alert_id: int,
    response_data: EmergencyResponseCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Respond to emergency alert.

    Raises:
        HTTPException: 404 if the alert does not exist; 400 if it is already
            resolved or the current user has already responded to it.
    """
    result = await db.execute(
        select(EmergencyAlert).filter(EmergencyAlert.id == alert_id)
    )
    alert = result.scalars().first()
    if not alert:
        raise HTTPException(status_code=404, detail="Alert not found")

    if alert.is_resolved:
        raise HTTPException(status_code=400, detail="Alert already resolved")

    # Check if user already responded
    existing_response = await db.execute(
        select(EmergencyResponse).filter(
            EmergencyResponse.alert_id == alert_id,
            EmergencyResponse.responder_id == current_user.id,
        )
    )
    if existing_response.scalars().first():
        raise HTTPException(
            status_code=400, detail="You already responded to this alert"
        )

    db_response = EmergencyResponse(
        alert_id=alert_id,
        responder_id=current_user.id,
        response_type=response_data.response_type.value,
        message=response_data.message,
        eta_minutes=response_data.eta_minutes,
    )
    db.add(db_response)

    # Treat a missing (NULL) counter as zero instead of failing on None + 1.
    alert.responded_users_count = (alert.responded_users_count or 0) + 1

    await db.commit()
    await db.refresh(db_response)

    return EmergencyResponseResponse.model_validate(db_response)


@app.put("/api/v1/alert/{alert_id}/resolve")
async def resolve_alert(
    alert_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Mark alert as resolved (only by alert creator).

    Raises:
        HTTPException: 404 if the alert does not exist, 403 if the caller did
            not create it, 400 if it is already resolved.
    """
    result = await db.execute(
        select(EmergencyAlert).filter(EmergencyAlert.id == alert_id)
    )
    alert = result.scalars().first()
    if not alert:
        raise HTTPException(status_code=404, detail="Alert not found")

    if alert.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Only alert creator can resolve it")

    if alert.is_resolved:
        raise HTTPException(status_code=400, detail="Alert already resolved")

    alert.is_resolved = True
    alert.resolved_at = _utcnow_naive()
    alert.resolved_by = current_user.id

    await db.commit()

    return {"message": "Alert resolved successfully"}


@app.get("/api/v1/alerts/my", response_model=list[EmergencyAlertResponse])
async def get_my_alerts(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    limit: int = 50,
):
    """Get current user's emergency alerts, newest first, at most ``limit``."""
    result = await db.execute(
        select(EmergencyAlert)
        .filter(EmergencyAlert.user_id == current_user.id)
        .order_by(EmergencyAlert.created_at.desc())
        .limit(limit)
    )
    alerts = result.scalars().all()
    return [EmergencyAlertResponse.model_validate(alert) for alert in alerts]


@app.get("/api/v1/alerts/active", response_model=list[EmergencyAlertResponse])
async def get_active_alerts(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    limit: int = 20,
):
    """Get unresolved alerts created in the last 2 hours, newest first.

    The caller's location must be retrievable from the location service,
    otherwise the request is rejected.

    Raises:
        HTTPException: 400 "Location service unavailable" when the location
            service cannot be reached or returns an undecodable body; 400
            "User location not available" when it answers with a non-200
            status.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{LOCATION_SERVICE_URL}/api/v1/user-location/{current_user.id}",
                timeout=5.0,
            )
        except Exception as exc:
            raise HTTPException(
                status_code=400, detail="Location service unavailable"
            ) from exc

    # Checked outside the ``try`` above so this specific error is not caught
    # and rewritten into "Location service unavailable".
    if response.status_code != 200:
        raise HTTPException(status_code=400, detail="User location not available")

    try:
        # TODO: the location is fetched but the query below does not filter
        # by distance yet, so alerts from every area are returned.
        location_data = response.json()  # noqa: F841
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Location service unavailable"
        ) from exc

    two_hours_ago = _utcnow_naive() - timedelta(hours=2)

    result = await db.execute(
        select(EmergencyAlert)
        .filter(
            # ``== False`` is intentional: it builds a SQL expression.
            EmergencyAlert.is_resolved == False,  # noqa: E712
            EmergencyAlert.created_at >= two_hours_ago,
        )
        .order_by(EmergencyAlert.created_at.desc())
        .limit(limit)
    )
    alerts = result.scalars().all()

    return [EmergencyAlertResponse.model_validate(alert) for alert in alerts]


@app.get("/api/v1/stats", response_model=EmergencyStats)
async def get_emergency_stats(
    current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
):
    """Get emergency service statistics.

    Returns total, active (unresolved) and resolved alert counts plus the
    number of distinct responders. ``avg_response_time_minutes`` is not
    computed yet and is always ``None``.
    """
    total_result = await db.execute(select(func.count(EmergencyAlert.id)))
    total_alerts = total_result.scalar()

    active_result = await db.execute(
        select(func.count(EmergencyAlert.id)).filter(
            # ``== False`` is intentional: it builds a SQL expression.
            EmergencyAlert.is_resolved == False  # noqa: E712
        )
    )
    active_alerts = active_result.scalar()

    # Everything that is not active counts as resolved.
    resolved_alerts = total_alerts - active_alerts

    responders_result = await db.execute(
        select(func.count(func.distinct(EmergencyResponse.responder_id)))
    )
    total_responders = responders_result.scalar()

    return EmergencyStats(
        total_alerts=total_alerts,
        active_alerts=active_alerts,
        resolved_alerts=resolved_alerts,
        avg_response_time_minutes=None,  # TODO: Calculate this
        total_responders=total_responders,
    )


@app.get("/api/v1/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "emergency-service"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)