import asyncio
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from services.user_service.main import get_current_user
from services.user_service.models import User
from shared.config import settings
from shared.database import get_db

app = FastAPI(title="Notification Service", version="1.0.0")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class NotificationRequest(BaseModel):
    """Request body for a single-user push notification."""

    title: str = Field(..., max_length=100)
    body: str = Field(..., max_length=500)
    data: Optional[Dict[str, Any]] = None
    priority: str = Field("normal", pattern="^(low|normal|high)$")


class EmergencyNotificationRequest(BaseModel):
    """Request body for an emergency broadcast to a list of users."""

    alert_id: int
    user_ids: List[int]
    alert_type: Optional[str] = "general"
    location: Optional[str] = None


class DeviceToken(BaseModel):
    """Request body for registering a device token."""

    token: str = Field(..., min_length=10)
    platform: str = Field(..., pattern="^(ios|android|web)$")


class NotificationStats(BaseModel):
    """Response model mirroring the module-level ``notification_stats`` counters."""

    total_sent: int
    successful_deliveries: int
    failed_deliveries: int
    emergency_notifications: int


# Mock FCM client for demonstration
class FCMClient:
    """Small client that posts push notifications to the FCM send endpoint.

    When ``server_key`` is empty no HTTP request is made and every token is
    reported as successfully delivered.
    """

    def __init__(self, server_key: str):
        self.server_key = server_key
        self.fcm_url = "https://fcm.googleapis.com/fcm/send"

    async def send_notification(
        self, tokens: List[str], notification_data: dict
    ) -> dict:
        """Send push notification via FCM.

        Args:
            tokens: Device registration tokens to deliver to.
            notification_data: Mapping read for the keys ``title``, ``body``,
                ``data`` and ``priority``.

        Returns:
            A dict with ``success_count`` and ``failure_count``; when FCM
            answered, also ``results`` taken from the response body. This
            method does not raise: any error is printed and reported as a
            failure for every token.
        """
        if not self.server_key:
            print("FCM Server Key not configured - notification would be sent")
            return {"success_count": len(tokens), "failure_count": 0}

        # Nothing to deliver; avoid posting an empty recipient list.
        if not tokens:
            return {"success_count": 0, "failure_count": 0}

        headers = {
            "Authorization": f"key={self.server_key}",
            "Content-Type": "application/json",
        }
        is_high = notification_data.get("priority") == "high"
        payload = {
            "registration_ids": tokens,
            "notification": {
                "title": notification_data.get("title"),
                "body": notification_data.get("body"),
                "sound": "default",
            },
            # ``or {}`` also covers an explicit ``"data": None``.
            "data": notification_data.get("data") or {},
            "priority": "high" if is_high else "normal",
        }

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.fcm_url, headers=headers, json=payload, timeout=10.0
                )
            # Without this check an HTTP error with a JSON body would be
            # recorded as 0 successes and 0 failures.
            response.raise_for_status()
            result = response.json()
            return {
                "success_count": result.get("success", 0),
                "failure_count": result.get("failure", 0),
                "results": result.get("results", []),
            }
        except Exception as e:
            # Deliberate best-effort boundary: callers rely on a result dict.
            print(f"FCM Error: {e}")
            return {"success_count": 0, "failure_count": len(tokens)}


# Initialize FCM client
fcm_client = FCMClient(settings.FCM_SERVER_KEY)

# In-memory storage for demo (use Redis or database in production)
user_device_tokens: Dict[int, List[str]] = {}
notification_stats = {
    "total_sent": 0,
    "successful_deliveries": 0,
    "failed_deliveries": 0,
    "emergency_notifications": 0,
}


@app.post("/api/v1/register-device")
async def register_device_token(
    device_data: DeviceToken, current_user: User = Depends(get_current_user)
):
    """Register device token for push notifications.

    A token that is already registered is moved to the end of the user's
    list; only the three most recently registered tokens are kept.
    """
    tokens = user_device_tokens.setdefault(current_user.id, [])

    # Remove existing token if present so re-registering refreshes its position
    if device_data.token in tokens:
        tokens.remove(device_data.token)

    # Add new token
    tokens.append(device_data.token)

    # Keep only last 3 tokens per user
    user_device_tokens[current_user.id] = tokens[-3:]

    return {"message": "Device token registered successfully"}


@app.post("/api/v1/send-notification")
async def send_notification(
    notification: NotificationRequest,
    target_user_id: int,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Send notification to specific user.

    Raises:
        HTTPException: 404 if the target user does not exist, 403 if the
            target has push notifications disabled, 400 if the target has no
            registered device tokens.
    """
    # Check if target user exists and accepts notifications
    result = await db.execute(select(User).filter(User.id == target_user_id))
    target_user = result.scalars().first()

    if not target_user:
        raise HTTPException(status_code=404, detail="Target user not found")

    if not target_user.push_notifications_enabled:
        raise HTTPException(
            status_code=403, detail="User has disabled push notifications"
        )

    # Get user's device tokens
    tokens = user_device_tokens.get(target_user_id, [])
    if not tokens:
        raise HTTPException(status_code=400, detail="No device tokens found for user")

    # Send notification in background
    background_tasks.add_task(
        send_push_notification,
        tokens,
        {
            "title": notification.title,
            "body": notification.body,
            "data": notification.data or {},
            "priority": notification.priority,
        },
    )

    return {"message": "Notification queued for delivery"}


@app.post("/api/v1/send-emergency-notifications")
async def send_emergency_notifications(
    emergency_data: EmergencyNotificationRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Send emergency notifications to nearby users.

    Only active users from ``emergency_data.user_ids`` that have emergency
    notifications enabled are targeted. Delivery is queued as a background
    task.
    """
    if not emergency_data.user_ids:
        return {"message": "No users to notify"}

    # Get users who have emergency notifications enabled
    result = await db.execute(
        select(User).filter(
            User.id.in_(emergency_data.user_ids),
            User.emergency_notifications_enabled == True,  # noqa: E712
            User.is_active == True,  # noqa: E712
        )
    )
    users = result.scalars().all()

    # Collect all device tokens. A dict keeps insertion order and drops
    # duplicates, so a token registered under several users is pushed once.
    unique_tokens: Dict[str, None] = {}
    for user in users:
        unique_tokens.update(dict.fromkeys(user_device_tokens.get(user.id, [])))
    all_tokens = list(unique_tokens)

    if not all_tokens:
        return {"message": "No device tokens found for target users"}

    # An explicit null from the client would otherwise render as "None".
    alert_type = emergency_data.alert_type or "general"

    # Prepare emergency notification
    emergency_title = "🚨 Emergency Alert Nearby"
    emergency_body = f"Someone needs help in your area. Alert type: {alert_type}"
    if emergency_data.location:
        emergency_body += f" Location: {emergency_data.location}"

    notification_data = {
        "title": emergency_title,
        "body": emergency_body,
        "data": {
            "type": "emergency",
            "alert_id": str(emergency_data.alert_id),
            "alert_type": alert_type,
        },
        "priority": "high",
    }

    # Send notifications in background
    background_tasks.add_task(
        send_emergency_push_notification, all_tokens, notification_data
    )

    return {"message": f"Emergency notifications queued for {len(users)} users"}


async def _deliver_and_record(
    tokens: List[str],
    notification_data: dict,
    *,
    label: str,
    emergency: bool = False,
) -> None:
    """Send via ``fcm_client`` and fold the outcome into ``notification_stats``.

    Every attempted token is counted in ``total_sent`` whether or not the
    send raised, so ``failed_deliveries`` cannot exceed ``total_sent``.
    """
    try:
        result = await fcm_client.send_notification(tokens, notification_data)
        succeeded = result["success_count"]
        failed = result["failure_count"]
    except Exception as e:
        # Background-task boundary: report and count the whole batch as failed.
        print(f"Failed to send {label}: {e}")
        succeeded, failed = 0, len(tokens)
    else:
        print(f"{label.capitalize()} sent: {succeeded} successful, {failed} failed")

    # Update stats
    notification_stats["total_sent"] += len(tokens)
    notification_stats["successful_deliveries"] += succeeded
    notification_stats["failed_deliveries"] += failed
    if emergency:
        notification_stats["emergency_notifications"] += len(tokens)


async def send_push_notification(tokens: List[str], notification_data: dict):
    """Send push notification using FCM and record the delivery statistics."""
    await _deliver_and_record(tokens, notification_data, label="notification")


async def send_emergency_push_notification(tokens: List[str], notification_data: dict):
    """Send emergency push notification with special handling.

    Same as ``send_push_notification`` but additionally counts the tokens in
    the ``emergency_notifications`` statistic.
    """
    await _deliver_and_record(
        tokens, notification_data, label="emergency notification", emergency=True
    )


@app.post("/api/v1/send-calendar-reminder")
async def send_calendar_reminder(
    title: str,
    message: str,
    user_ids: List[int],
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Send calendar reminder notifications.

    One background delivery is queued per matching user that has at least one
    registered device token.
    """
    # Get users who have notifications enabled
    result = await db.execute(
        select(User).filter(
            User.id.in_(user_ids),
            User.push_notifications_enabled == True,  # noqa: E712
            User.is_active == True,  # noqa: E712
        )
    )
    users = result.scalars().all()

    # Send notifications to each user
    for user in users:
        tokens = user_device_tokens.get(user.id, [])
        if not tokens:
            continue
        background_tasks.add_task(
            send_push_notification,
            tokens,
            {
                "title": title,
                "body": message,
                "data": {"type": "calendar_reminder"},
                "priority": "normal",
            },
        )

    return {"message": f"Calendar reminders queued for {len(users)} users"}


@app.delete("/api/v1/device-token")
async def unregister_device_token(
    token: str, current_user: User = Depends(get_current_user)
):
    """Unregister device token.

    Succeeds whether or not the token was registered; the user's entry is
    dropped once its token list is empty.
    """
    tokens = user_device_tokens.get(current_user.id)
    if tokens is not None:
        if token in tokens:
            tokens.remove(token)
        if not tokens:
            del user_device_tokens[current_user.id]

    return {"message": "Device token unregistered successfully"}


def _mask_token(token: str, visible: int = 8) -> str:
    """Return ``token`` with its middle replaced by ``...``.

    At most ``visible`` characters are shown at each end, and never more than
    a quarter of the token per end, so short tokens are not disclosed in full.
    Tokens of 32 or more characters keep the ``first8...last8`` form.
    """
    shown = min(visible, len(token) // 4)
    if shown <= 0:
        return "..."
    return f"{token[:shown]}...{token[-shown:]}"


@app.get("/api/v1/my-devices")
async def get_my_device_tokens(current_user: User = Depends(get_current_user)):
    """Get user's registered device tokens (masked for security)"""
    tokens = user_device_tokens.get(current_user.id, [])
    masked_tokens = [_mask_token(token) for token in tokens]

    return {"device_count": len(tokens), "tokens": masked_tokens}


@app.get("/api/v1/stats", response_model=NotificationStats)
async def get_notification_stats(current_user: User = Depends(get_current_user)):
    """Get notification service statistics"""
    return NotificationStats(**notification_stats)


@app.get("/health")
async def health_simple():
    """Health check endpoint (simple)"""
    return {"status": "healthy", "service": "notification_service"}
@app.post("/notify")
async def send_notification_public(notification_data: dict):
    """Send notification (public endpoint for testing)"""
    # Stub: the request body is accepted but not inspected, and nothing is
    # delivered; the same canned acknowledgement is returned every time.
    acknowledgement = dict(
        status="success",
        notification_id="test_notify_123",
        message="Notification queued for delivery",
    )
    return acknowledgement


@app.get("/api/v1/health")
async def health_check():
    """Health check endpoint"""
    # Report only whether a key is set, never the key itself.
    has_fcm_key = True if settings.FCM_SERVER_KEY else False
    status_report = {"status": "healthy", "service": "notification-service"}
    status_report["fcm_configured"] = has_fcm_key
    return status_report


if __name__ == "__main__":
    # Imported lazily so uvicorn is only needed when run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8005)