Files
chat/services/location_service/main.py
Andrew K. Choi 4e3768a6ee
Some checks failed
continuous-integration/drone/push Build is failing
pipeline issues fix
2025-09-25 11:59:54 +09:00

321 lines
9.5 KiB
Python

import math
from datetime import datetime, timedelta
from typing import List, Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from services.location_service.models import LocationHistory, UserLocation
from services.user_service.main import get_current_user
from services.user_service.models import User
from shared.cache import CacheService
from shared.config import settings
from shared.database import get_db
# ASGI application object for the location service.
app = FastAPI(
    title="Location Service",
    version="1.0.0",
)

# CORS middleware: allowed origins come from shared settings; every method
# and header is accepted, and credentialed requests are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class LocationUpdate(BaseModel):
    # Request body for a position report. Latitude and longitude are
    # required; the remaining readings are optional and default to None.

    # Decimal degrees, validated to [-90, 90].
    latitude: float = Field(..., ge=-90, le=90)
    # Decimal degrees, validated to [-180, 180].
    longitude: float = Field(..., ge=-180, le=180)
    # Non-negative when provided.
    accuracy: Optional[float] = Field(default=None, ge=0)
    # Unconstrained when provided.
    altitude: Optional[float] = None
    # Non-negative when provided.
    speed: Optional[float] = Field(default=None, ge=0)
    # Validated to [0, 360] when provided.
    heading: Optional[float] = Field(default=None, ge=0, le=360)
class LocationResponse(BaseModel):
    # Response body describing one user's current position.

    # Pydantic v2 style configuration. The nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning; ``model_config``
    # is the supported spelling. ``from_attributes`` lets
    # ``model_validate`` read fields from attribute-bearing objects
    # (used below with a ``UserLocation`` row).
    model_config = {"from_attributes": True}

    user_id: int
    latitude: float
    longitude: float
    # Required field that may be None (no default is declared).
    accuracy: Optional[float]
    updated_at: datetime
class NearbyUserResponse(BaseModel):
    # One entry in the nearby-users result list.

    user_id: int
    latitude: float
    longitude: float
    # Distance from the query point, in meters.
    distance_meters: float
    # Populated from the location row's ``updated_at`` value.
    last_seen: datetime
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate distance between two points using Haversine formula (in meters).

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        Great-circle distance in meters on a sphere of radius 6,371,000 m.
    """
    earth_radius_m = 6371000  # Earth's radius in meters

    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    sin_half_dlat = math.sin(math.radians(lat2 - lat1) / 2)
    sin_half_dlon = math.sin(math.radians(lon2 - lon1) / 2)

    a = (
        sin_half_dlat * sin_half_dlat
        + math.cos(phi1) * math.cos(phi2) * sin_half_dlon * sin_half_dlon
    )
    # Floating-point rounding can push ``a`` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise
    # ValueError. Clamp before taking square roots.
    a = min(1.0, max(0.0, a))

    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_m * central_angle
@app.post("/api/v1/update-location")
async def update_user_location(
    location_data: LocationUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Update user's current location"""
    # Reject the report outright when the caller has sharing switched off.
    if not current_user.location_sharing_enabled:
        raise HTTPException(status_code=403, detail="Location sharing is disabled")

    # Column values written to the current-location row, shared by the
    # update and the insert path.
    position = {
        "latitude": location_data.latitude,
        "longitude": location_data.longitude,
        "accuracy": location_data.accuracy,
        "altitude": location_data.altitude,
        "speed": location_data.speed,
        "heading": location_data.heading,
    }

    # Update or create current location
    lookup = select(UserLocation).filter(UserLocation.user_id == current_user.id)
    existing = (await db.execute(lookup)).scalars().first()
    if existing:
        for column, value in position.items():
            setattr(existing, column, value)
    else:
        db.add(UserLocation(user_id=current_user.id, **position))

    # Save to history (only latitude, longitude and accuracy are recorded).
    db.add(
        LocationHistory(
            user_id=current_user.id,
            latitude=location_data.latitude,
            longitude=location_data.longitude,
            accuracy=location_data.accuracy,
            recorded_at=datetime.utcnow(),
        )
    )
    await db.commit()

    # Cache location for fast access; the entry expires after 5 minutes.
    await CacheService.set_location(
        current_user.id,
        location_data.latitude,
        location_data.longitude,
        expire=300,
    )
    return {"message": "Location updated successfully"}
@app.get("/api/v1/user-location/{user_id}", response_model=LocationResponse)
async def get_user_location(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Get specific user's location (if sharing is enabled)"""
    # The target must exist; 404 otherwise.
    user_rows = await db.execute(select(User).filter(User.id == user_id))
    target_user = user_rows.scalars().first()
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Access is allowed when the target shares their location or when the
    # caller is asking about themselves.
    if not (
        target_user.location_sharing_enabled or target_user.id == current_user.id
    ):
        raise HTTPException(
            status_code=403, detail="User has disabled location sharing"
        )

    # Try cache first.
    # NOTE(review): the cached path reports accuracy=None and stamps
    # updated_at with the current time, not the time the position was stored.
    cached_location = await CacheService.get_location(user_id)
    if cached_location:
        cached_lat, cached_lng = cached_location
        return LocationResponse(
            user_id=user_id,
            latitude=cached_lat,
            longitude=cached_lng,
            accuracy=None,
            updated_at=datetime.utcnow(),
        )

    # Cache miss: fall back to the database row.
    location_rows = await db.execute(
        select(UserLocation).filter(UserLocation.user_id == user_id)
    )
    stored_location = location_rows.scalars().first()
    if not stored_location:
        raise HTTPException(status_code=404, detail="Location not found")
    return LocationResponse.model_validate(stored_location)
def _longitude_span_deg(latitude: float, radius_deg: float) -> float:
    """Return the longitude half-width (degrees) that spans ``radius_deg`` of arc at ``latitude``."""
    cos_lat = math.cos(math.radians(latitude))
    # At (or numerically at) a pole every meridian is within reach, so
    # search the full longitude range instead of dividing by ~0.
    if cos_lat <= 1e-9:
        return 180.0
    return min(180.0, radius_deg / cos_lat)


@app.get("/api/v1/nearby-users", response_model=List[NearbyUserResponse])
async def get_nearby_users(
    latitude: float = Query(..., ge=-90, le=90),
    longitude: float = Query(..., ge=-180, le=180),
    radius_km: float = Query(1.0, ge=0.1, le=10.0),
    limit: int = Query(50, ge=1, le=200),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Find users within specified radius"""
    # Convert radius to degrees of latitude (approximate): 1 degree ≈ 111 km.
    radius_deg = radius_km / 111.0
    # A degree of longitude shrinks by cos(latitude), so the longitude
    # half-width must be widened accordingly. Using radius_deg unchanged
    # makes the box too narrow away from the equator and silently drops
    # users that are inside the requested radius.
    lng_span_deg = _longitude_span_deg(latitude, radius_deg)

    # Query for nearby users with location sharing enabled.
    # Using bounding box for initial filtering (more efficient than distance
    # calculation); the exact radius is enforced in Python below.
    # NOTE(review): LIMIT is applied before the exact-distance filter and the
    # sort, with no ORDER BY, so when more rows than `limit` fall in the box
    # the result is not guaranteed to contain the nearest users.
    # NOTE(review): the box does not wrap across the ±180° meridian.
    query = text(
        """
        SELECT
            ul.user_id,
            ul.latitude,
            ul.longitude,
            ul.updated_at,
            u.location_sharing_enabled
        FROM user_locations ul
        JOIN users u ON ul.user_id = u.id
        WHERE u.location_sharing_enabled = true
            AND u.is_active = true
            AND ul.user_id != :current_user_id
            AND ul.latitude BETWEEN :lat_min AND :lat_max
            AND ul.longitude BETWEEN :lng_min AND :lng_max
            AND ul.updated_at > :time_threshold
        LIMIT :limit_val
        """
    )
    # Only locations refreshed within the last 15 minutes are considered.
    time_threshold = datetime.utcnow() - timedelta(minutes=15)
    result = await db.execute(
        query,
        {
            "current_user_id": current_user.id,
            "lat_min": latitude - radius_deg,
            "lat_max": latitude + radius_deg,
            "lng_min": longitude - lng_span_deg,
            "lng_max": longitude + lng_span_deg,
            "time_threshold": time_threshold,
            "limit_val": limit,
        },
    )

    radius_m = radius_km * 1000  # Convert km to meters
    nearby_users = []
    for row in result:
        # Calculate exact distance and discard box corners outside the circle.
        distance = calculate_distance(latitude, longitude, row.latitude, row.longitude)
        if distance <= radius_m:
            nearby_users.append(
                NearbyUserResponse(
                    user_id=row.user_id,
                    latitude=row.latitude,
                    longitude=row.longitude,
                    distance_meters=distance,
                    last_seen=row.updated_at,
                )
            )

    # Sort by distance, closest first.
    nearby_users.sort(key=lambda entry: entry.distance_meters)
    return nearby_users
@app.get("/api/v1/location-history")
async def get_location_history(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    hours: int = Query(24, ge=1, le=168),  # Max 1 week
    limit: int = Query(100, ge=1, le=1000),
):
    """Get user's location history"""
    # Oldest timestamp still inside the requested look-back window.
    oldest_allowed = datetime.utcnow() - timedelta(hours=hours)

    # The caller's own entries, newest first, capped at `limit` rows.
    statement = (
        select(LocationHistory)
        .filter(
            LocationHistory.user_id == current_user.id,
            LocationHistory.recorded_at >= oldest_allowed,
        )
        .order_by(LocationHistory.recorded_at.desc())
        .limit(limit)
    )
    rows = (await db.execute(statement)).scalars().all()

    # Serialize each entry as a plain dict.
    serialized = []
    for record in rows:
        serialized.append(
            {
                "latitude": record.latitude,
                "longitude": record.longitude,
                "accuracy": record.accuracy,
                "recorded_at": record.recorded_at,
            }
        )
    return serialized
@app.delete("/api/v1/location")
async def delete_user_location(
    current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
):
    """Delete user's current location"""
    # Look up the caller's current-location row, if any.
    lookup = select(UserLocation).filter(UserLocation.user_id == current_user.id)
    stored_location = (await db.execute(lookup)).scalars().first()

    # Only touch the database when there is something to remove.
    if stored_location:
        await db.delete(stored_location)
        await db.commit()

    # Clear cache; this runs whether or not a database row existed.
    await CacheService.delete(f"location:{current_user.id}")
    return {"message": "Location deleted successfully"}
@app.get("/api/v1/health")
async def health_check():
    """Health check endpoint"""
    # Static payload; no database or cache dependency is probed here.
    status_payload = {"status": "healthy", "service": "location-service"}
    return status_payload
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces,
    # port 8003. uvicorn is imported lazily so importing this module
    # does not require it.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8003,
    )