Files
chat/services/calendar_service/main.py
2025-09-25 08:05:25 +09:00

413 lines
12 KiB
Python

from datetime import datetime, date, timedelta
from enum import Enum
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession

from shared.config import settings
from shared.database import get_db
from services.calendar_service.models import CalendarEntry, CycleData, HealthInsights
from services.user_service.main import get_current_user
from services.user_service.models import User
# FastAPI application object for the calendar service.
app = FastAPI(
    title="Calendar Service",
    version="1.0.0",
)

# CORS: allowed origins come from shared settings; every method and header is
# accepted and credentialed requests are enabled.
# NOTE(review): because credentials are allowed, confirm that
# settings.CORS_ORIGINS is an explicit origin list and not a wildcard.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class EntryType(str, Enum):
    """Kinds of entry that can be logged on the calendar.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string value.
    """

    PERIOD = "period"
    OVULATION = "ovulation"
    SYMPTOMS = "symptoms"
    MEDICATION = "medication"
    MOOD = "mood"
    EXERCISE = "exercise"
    APPOINTMENT = "appointment"
class FlowIntensity(str, Enum):
    """Flow intensity levels that can be recorded on an entry.

    Members are also ``str`` instances equal to their lowercase value.
    """

    LIGHT = "light"
    MEDIUM = "medium"
    HEAVY = "heavy"
    SPOTTING = "spotting"
class MoodType(str, Enum):
    """Mood values that can be recorded on an entry.

    Members are also ``str`` instances equal to their lowercase value.
    """

    HAPPY = "happy"
    SAD = "sad"
    ANXIOUS = "anxious"
    IRRITATED = "irritated"
    ENERGETIC = "energetic"
    TIRED = "tired"
class CalendarEntryCreate(BaseModel):
    """Request body for creating a calendar entry.

    Only ``entry_date`` and ``entry_type`` are required; every other field
    defaults to ``None``.
    """

    # Required: which day is being logged and what kind of entry it is.
    entry_date: date
    entry_type: EntryType

    # Period details.
    flow_intensity: Optional[FlowIntensity] = None
    period_symptoms: Optional[str] = Field(default=None, max_length=500)

    # Well-being.
    mood: Optional[MoodType] = None
    energy_level: Optional[int] = Field(default=None, ge=1, le=5)  # 1..5 inclusive
    sleep_hours: Optional[int] = Field(default=None, ge=0, le=24)  # whole hours, 0..24

    # Free-text fields, each capped in length.
    symptoms: Optional[str] = Field(default=None, max_length=1000)
    medications: Optional[str] = Field(default=None, max_length=500)
    notes: Optional[str] = Field(default=None, max_length=1000)
class CalendarEntryResponse(BaseModel):
    """Calendar entry as returned by the API.

    Instances are built from ``CalendarEntry`` ORM rows via
    ``model_validate``, which is why attribute-based population is enabled.
    The optional fields are nullable but have no default, so they must be
    present (possibly ``None``) on the source object.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning.
    model_config = ConfigDict(from_attributes=True)

    id: int
    uuid: str
    entry_date: date
    entry_type: str
    flow_intensity: Optional[str]
    period_symptoms: Optional[str]
    mood: Optional[str]
    energy_level: Optional[int]
    sleep_hours: Optional[int]
    symptoms: Optional[str]
    medications: Optional[str]
    notes: Optional[str]
    is_predicted: bool
    confidence_score: Optional[int]
    created_at: datetime
class CycleDataResponse(BaseModel):
    """Cycle record as exposed by the API.

    Attribute-based population is enabled so the model can be validated
    directly from an object with matching attributes.
    NOTE(review): not referenced in the visible part of this file; presumably
    it serializes ``CycleData`` rows - confirm against callers.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning.
    model_config = ConfigDict(from_attributes=True)

    id: int
    cycle_start_date: date
    cycle_length: Optional[int]
    period_length: Optional[int]
    ovulation_date: Optional[date]
    fertile_window_start: Optional[date]
    fertile_window_end: Optional[date]
    next_period_predicted: Optional[date]
    avg_cycle_length: Optional[int]
    avg_period_length: Optional[int]
class HealthInsightResponse(BaseModel):
    """Health insight as returned by the API.

    Instances are built from ``HealthInsights`` ORM rows via
    ``model_validate``, which is why attribute-based population is enabled.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning.
    model_config = ConfigDict(from_attributes=True)

    id: int
    insight_type: str
    title: str
    description: str
    recommendation: Optional[str]
    confidence_level: str
    created_at: datetime
class CycleOverview(BaseModel):
    """Summary of the user's current cycle, returned by the overview endpoint.

    All fields are required; the ``Optional`` ones may be ``None`` when no
    cycle data or prediction is available.
    """

    # 1-based day number within the current cycle.
    current_cycle_day: Optional[int]
    # One of: menstrual, follicular, ovulation, luteal
    # ("unknown" is used when no cycle has been recorded).
    current_phase: str
    next_period_date: Optional[date]
    days_until_period: Optional[int]
    # One of: very_regular, regular, irregular, very_irregular
    # ("unknown" is used when there is not enough history).
    cycle_regularity: str
    avg_cycle_length: Optional[int]
def calculate_cycle_phase(cycle_start: date, cycle_length: int, current_date: date) -> str:
    """Return the cycle phase that ``current_date`` falls in.

    Phases are derived from the zero-based day offset since ``cycle_start``:

    * offsets 0-5: ``"menstrual"``
    * up to ``cycle_length // 2 - 2``: ``"follicular"``
    * up to ``cycle_length // 2 + 2``: ``"ovulation"``
    * anything later: ``"luteal"``

    Args:
        cycle_start: First day of the cycle.
        cycle_length: Cycle length in days used to place the ovulation window.
        current_date: Day to classify.

    Returns:
        The phase name, or ``"unknown"`` when ``current_date`` is before
        ``cycle_start`` (for example a cycle logged with a future date), since
        no phase of that cycle has started yet.
    """
    days_since_start = (current_date - cycle_start).days

    # A negative offset previously fell through to "menstrual", which is wrong:
    # the cycle has not begun on that date.
    if days_since_start < 0:
        return "unknown"
    if days_since_start <= 5:
        return "menstrual"

    # The ovulation window is centred on the middle of the cycle, +/- 2 days.
    mid_cycle = cycle_length // 2
    if days_since_start <= mid_cycle - 2:
        return "follicular"
    if days_since_start <= mid_cycle + 2:
        return "ovulation"
    return "luteal"
async def calculate_predictions(user_id: int, db: AsyncSession):
    """Derive cycle averages and predictions from the user's recent cycles.

    Looks at up to the six most recent ``CycleData`` rows (newest first).

    Args:
        user_id: Owner of the cycle records.
        db: Async database session used for the lookup.

    Returns:
        ``None`` when fewer than two cycles exist or none of them has a
        recorded cycle length. Otherwise a dict with keys
        ``avg_cycle_length``, ``avg_period_length``, ``next_period_predicted``
        and ``ovulation_date``; both dates are offsets from the start date of
        the most recent cycle row.
    """
    recent_stmt = (
        select(CycleData)
        .filter(CycleData.user_id == user_id)
        .order_by(desc(CycleData.cycle_start_date))
        .limit(6)
    )
    history = (await db.execute(recent_stmt)).scalars().all()
    if len(history) < 2:
        return None

    # Rows with a missing (or zero) length are left out of the averages.
    known_cycle_lengths = [row.cycle_length for row in history if row.cycle_length]
    known_period_lengths = [row.period_length for row in history if row.period_length]
    if not known_cycle_lengths:
        return None

    mean_cycle = sum(known_cycle_lengths) / len(known_cycle_lengths)
    if known_period_lengths:
        mean_period = sum(known_period_lengths) / len(known_period_lengths)
    else:
        # No recorded period lengths: fall back to 5 days.
        mean_period = 5

    # Predictions are anchored on the newest cycle's start date.
    anchor_date = history[0].cycle_start_date
    return {
        "avg_cycle_length": int(mean_cycle),
        "avg_period_length": int(mean_period),
        "next_period_predicted": anchor_date + timedelta(days=int(mean_cycle)),
        "ovulation_date": anchor_date + timedelta(days=int(mean_cycle // 2)),
    }
@app.post("/api/v1/entries", response_model=CalendarEntryResponse)
async def create_calendar_entry(
    entry_data: CalendarEntryCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a calendar entry for the authenticated user.

    A user may have at most one entry per (date, entry type). When the new
    entry is a period entry, the user's cycle data is updated as well.

    Args:
        entry_data: Validated request payload.
        current_user: Authenticated user, resolved by dependency.
        db: Async database session, resolved by dependency.

    Returns:
        The stored entry as a ``CalendarEntryResponse``.

    Raises:
        HTTPException: 400 when an entry of the same type already exists for
            that date.
    """
    # Read the id once, up front. If the user object belongs to this session
    # and the session expires instances on commit (the SQLAlchemy default),
    # touching ``current_user.id`` after a commit would trigger an implicit
    # lazy load, which is not permitted under AsyncSession.
    user_id = current_user.id

    # Reject duplicates for the same user, date and entry type.
    existing = await db.execute(
        select(CalendarEntry).filter(
            and_(
                CalendarEntry.user_id == user_id,
                CalendarEntry.entry_date == entry_data.entry_date,
                CalendarEntry.entry_type == entry_data.entry_type.value
            )
        )
    )
    if existing.scalars().first():
        raise HTTPException(
            status_code=400,
            detail="Entry already exists for this date and type"
        )

    db_entry = CalendarEntry(
        user_id=user_id,
        entry_date=entry_data.entry_date,
        entry_type=entry_data.entry_type.value,
        flow_intensity=entry_data.flow_intensity.value if entry_data.flow_intensity else None,
        period_symptoms=entry_data.period_symptoms,
        mood=entry_data.mood.value if entry_data.mood else None,
        energy_level=entry_data.energy_level,
        sleep_hours=entry_data.sleep_hours,
        symptoms=entry_data.symptoms,
        medications=entry_data.medications,
        notes=entry_data.notes,
    )
    db.add(db_entry)
    await db.commit()
    await db.refresh(db_entry)

    # Serialize while the refreshed attributes are still loaded. The cycle
    # update below commits again, which can expire ``db_entry``; building the
    # response afterwards would then hit the same implicit-IO failure.
    response = CalendarEntryResponse.model_validate(db_entry)

    # If this is a period entry, update cycle data
    if entry_data.entry_type == EntryType.PERIOD:
        await update_cycle_data(user_id, entry_data.entry_date, db)

    return response
async def update_cycle_data(user_id: int, period_date: date, db: AsyncSession):
    """Record a new cycle starting on ``period_date`` and close the previous one.

    The most recent cycle (by start date) gets its ``cycle_length`` set to the
    number of days between its start and ``period_date``. A new ``CycleData``
    row is then created for ``period_date`` with predictions based on the
    user's average cycle length, and the change is committed.

    Args:
        user_id: Owner of the cycle records.
        period_date: Date of the logged period entry.
        db: Async database session; this function commits on it.

    NOTE(review): this runs for every period entry, so logging several
    consecutive days of the same period starts a new cycle each day. Confirm
    whether callers only log the first day, or whether nearby dates should be
    folded into the existing cycle.
    """
    # Get last cycle
    latest_result = await db.execute(
        select(CycleData)
        .filter(CycleData.user_id == user_id)
        .order_by(desc(CycleData.cycle_start_date))
        .limit(1)
    )
    last_cycle_data = latest_result.scalars().first()

    if last_cycle_data:
        gap_days = (period_date - last_cycle_data.cycle_start_date).days
        if gap_days == 0:
            # A cycle already starts on this date (e.g. the entry was deleted
            # and logged again); do not create a duplicate row.
            return
        if gap_days > 0:
            last_cycle_data.cycle_length = gap_days
        # gap_days < 0 means a back-dated period: leave the later cycle's
        # length untouched rather than storing a negative value.

    # Averages come from history, but the predicted dates must be anchored on
    # the cycle being created. Using the previous cycle's start would place
    # the "next" period roughly on the date that was just logged.
    predictions = await calculate_predictions(user_id, db)
    avg_cycle_length = predictions["avg_cycle_length"] if predictions else None
    if avg_cycle_length is not None:
        next_period_predicted = period_date + timedelta(days=avg_cycle_length)
        ovulation_date = period_date + timedelta(days=avg_cycle_length // 2)
    else:
        next_period_predicted = None
        ovulation_date = None

    # Create new cycle
    new_cycle = CycleData(
        user_id=user_id,
        cycle_start_date=period_date,
        avg_cycle_length=avg_cycle_length,
        next_period_predicted=next_period_predicted,
        ovulation_date=ovulation_date,
    )
    db.add(new_cycle)
    await db.commit()
@app.get("/api/v1/entries", response_model=List[CalendarEntryResponse])
async def get_calendar_entries(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    entry_type: Optional[EntryType] = Query(None),
    limit: int = Query(100, ge=1, le=365)
):
    """List the authenticated user's calendar entries, newest first.

    Args:
        current_user: Authenticated user, resolved by dependency.
        db: Async database session, resolved by dependency.
        start_date: If given, only entries on or after this date.
        end_date: If given, only entries on or before this date.
        entry_type: If given, only entries of this type.
        limit: Maximum number of entries to return (1-365, default 100).

    Returns:
        The matching entries ordered by entry date, descending.
    """
    # Always scope to the caller; optional filters are ANDed on top.
    criteria = [CalendarEntry.user_id == current_user.id]
    if start_date:
        criteria.append(CalendarEntry.entry_date >= start_date)
    if end_date:
        criteria.append(CalendarEntry.entry_date <= end_date)
    if entry_type:
        criteria.append(CalendarEntry.entry_type == entry_type.value)

    stmt = (
        select(CalendarEntry)
        .filter(*criteria)
        .order_by(desc(CalendarEntry.entry_date))
        .limit(limit)
    )
    rows = (await db.execute(stmt)).scalars().all()
    return [CalendarEntryResponse.model_validate(row) for row in rows]
def _classify_regularity(cycle_lengths: List[int]) -> str:
    """Bucket the spread (longest minus shortest) of recorded cycle lengths."""
    # A single length has a spread of zero by definition and says nothing
    # about regularity, so at least two are required.
    if len(cycle_lengths) < 2:
        return "unknown"
    spread = max(cycle_lengths) - min(cycle_lengths)
    if spread <= 2:
        return "very_regular"
    if spread <= 5:
        return "regular"
    if spread <= 10:
        return "irregular"
    return "very_irregular"


@app.get("/api/v1/cycle-overview", response_model=CycleOverview)
async def get_cycle_overview(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return an overview of the authenticated user's current cycle.

    The newest ``CycleData`` row is treated as the current cycle. Its start
    date gives the 1-based cycle day and, together with the stored average
    cycle length (28 when absent), the current phase. Regularity is derived
    from the spread of recorded lengths over the six most recent cycles and
    needs at least three cycles on record.

    Args:
        current_user: Authenticated user, resolved by dependency.
        db: Async database session, resolved by dependency.

    Returns:
        A ``CycleOverview``; all-``None``/"unknown" when the user has no
        cycle data. ``days_until_period`` is negative when the predicted date
        has already passed.
    """
    # One query serves both needs: the first row is the current cycle and the
    # whole list feeds the regularity calculation.
    recent = await db.execute(
        select(CycleData)
        .filter(CycleData.user_id == current_user.id)
        .order_by(desc(CycleData.cycle_start_date))
        .limit(6)
    )
    cycle_list = recent.scalars().all()

    if not cycle_list:
        return CycleOverview(
            current_cycle_day=None,
            current_phase="unknown",
            next_period_date=None,
            days_until_period=None,
            cycle_regularity="unknown",
            avg_cycle_length=None
        )

    cycle_data = cycle_list[0]
    today = date.today()  # server-local calendar date
    current_cycle_day = (today - cycle_data.cycle_start_date).days + 1

    # Calculate current phase
    cycle_length = cycle_data.avg_cycle_length or 28
    current_phase = calculate_cycle_phase(cycle_data.cycle_start_date, cycle_length, today)

    # Days until next period
    next_period_date = cycle_data.next_period_predicted
    days_until_period = None
    if next_period_date:
        days_until_period = (next_period_date - today).days

    # Calculate regularity
    regularity = "unknown"
    if len(cycle_list) >= 3:
        regularity = _classify_regularity(
            [c.cycle_length for c in cycle_list if c.cycle_length]
        )

    return CycleOverview(
        current_cycle_day=current_cycle_day,
        current_phase=current_phase,
        next_period_date=next_period_date,
        days_until_period=days_until_period,
        cycle_regularity=regularity,
        avg_cycle_length=cycle_data.avg_cycle_length
    )
@app.get("/api/v1/insights", response_model=List[HealthInsightResponse])
async def get_health_insights(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    limit: int = Query(10, ge=1, le=50)
):
    """List the authenticated user's non-dismissed health insights, newest first.

    Args:
        current_user: Authenticated user, resolved by dependency.
        db: Async database session, resolved by dependency.
        limit: Maximum number of insights to return (1-50, default 10).

    Returns:
        The matching insights ordered by creation time, descending.
    """
    stmt = (
        select(HealthInsights)
        .filter(
            HealthInsights.user_id == current_user.id,
            # Builds a SQL expression, so ``== False`` is intentional here.
            HealthInsights.is_dismissed == False  # noqa: E712
        )
        .order_by(desc(HealthInsights.created_at))
        .limit(limit)
    )
    active_insights = (await db.execute(stmt)).scalars().all()
    return [HealthInsightResponse.model_validate(item) for item in active_insights]
@app.delete("/api/v1/entries/{entry_id}")
async def delete_calendar_entry(
    entry_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete one of the authenticated user's calendar entries.

    Args:
        entry_id: Primary key of the entry to delete.
        current_user: Authenticated user, resolved by dependency.
        db: Async database session, resolved by dependency.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 404 when no entry with that id belongs to the user.
    """
    # Match on both id and owner so users can only delete their own entries.
    lookup_stmt = select(CalendarEntry).filter(
        CalendarEntry.id == entry_id,
        CalendarEntry.user_id == current_user.id,
    )
    entry = (await db.execute(lookup_stmt)).scalars().first()
    if entry is None:
        raise HTTPException(status_code=404, detail="Entry not found")

    await db.delete(entry)
    await db.commit()
    return {"message": "Entry deleted successfully"}
@app.get("/api/v1/health")
async def health_check():
    """Health check endpoint; returns a fixed status payload for this service."""
    status_payload = {"status": "healthy", "service": "calendar-service"}
    return status_payload
if __name__ == "__main__":
    # Executed as a script: serve the app with uvicorn on all interfaces,
    # port 8004. uvicorn is imported here so it is only needed in this mode.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8004)