calendar
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-09-26 14:12:49 +09:00
parent 0724018895
commit b98034b616
7 changed files with 1213 additions and 87 deletions

View File

@@ -8,9 +8,11 @@ from sqlalchemy import and_, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from services.calendar_service.models import CalendarEntry, CycleData, HealthInsights
import services.calendar_service.schemas as schemas
from services.calendar_service.schemas import (CalendarEntryCreate, CalendarEntryResponse,
CycleDataResponse, CycleOverview, EntryType,
FlowIntensity, HealthInsightResponse, MoodType)
FlowIntensity, HealthInsightResponse, MoodType,
CalendarEventCreate)
from shared.auth import get_current_user_from_token as get_current_user
from shared.config import settings
from shared.database import get_db
@@ -33,42 +35,7 @@ async def health_check():
return {"status": "healthy", "service": "calendar_service"}
class CycleDataResponse(BaseModel):
id: int
cycle_start_date: date
cycle_length: Optional[int]
period_length: Optional[int]
ovulation_date: Optional[date]
fertile_window_start: Optional[date]
fertile_window_end: Optional[date]
next_period_predicted: Optional[date]
avg_cycle_length: Optional[int]
avg_period_length: Optional[int]
class Config:
from_attributes = True
class HealthInsightResponse(BaseModel):
id: int
insight_type: str
title: str
description: str
recommendation: Optional[str]
confidence_level: str
created_at: datetime
class Config:
from_attributes = True
class CycleOverview(BaseModel):
current_cycle_day: Optional[int]
current_phase: str # menstrual, follicular, ovulation, luteal
next_period_date: Optional[date]
days_until_period: Optional[int]
cycle_regularity: str # very_regular, regular, irregular, very_irregular
avg_cycle_length: Optional[int]
# Используем классы из schemas
def calculate_cycle_phase(
@@ -124,62 +91,28 @@ async def calculate_predictions(user_id: int, db: AsyncSession):
}
@app.post("/api/v1/entries", response_model=CalendarEntryResponse)
async def create_calendar_entry(
@app.post("/api/v1/entries", response_model=CalendarEntryResponse, status_code=201)
@app.post("/api/v1/entry", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry_legacy(
entry_data: CalendarEntryCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create new calendar entry"""
"""Create a new calendar entry via legacy endpoint"""
return await create_calendar_entry(entry_data, current_user, db)
# Check if entry already exists for this date and type
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user["user_id"],
CalendarEntry.entry_date == entry_data.entry_date,
CalendarEntry.entry_type == entry_data.entry_type.value,
)
)
)
if existing.scalars().first():
raise HTTPException(
status_code=400, detail="Entry already exists for this date and type"
)
try:
db_entry = CalendarEntry(
user_id=current_user["user_id"],
entry_date=entry_data.entry_date,
entry_type=entry_data.entry_type.value,
flow_intensity=entry_data.flow_intensity.value
if entry_data.flow_intensity
else None,
period_symptoms=entry_data.period_symptoms,
mood=entry_data.mood.value if entry_data.mood else None,
energy_level=entry_data.energy_level,
sleep_hours=entry_data.sleep_hours,
symptoms=entry_data.symptoms,
medications=entry_data.medications,
notes=entry_data.notes,
)
db.add(db_entry)
await db.commit()
await db.refresh(db_entry)
import logging
logging.info(f"Created calendar entry: {db_entry.id}")
except Exception as e:
import logging
logging.error(f"Error creating calendar entry: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# If this is a period entry, update cycle data
if entry_data.entry_type == EntryType.PERIOD:
await update_cycle_data(current_user["user_id"], entry_data.entry_date, db)
return CalendarEntryResponse.model_validate(db_entry)
@app.post("/api/v1/calendar/entry", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry_mobile_app(
entry_data: CalendarEventCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry via mobile app format endpoint"""
# Convert mobile app format to server format
server_entry_data = entry_data.to_server_format()
response = await create_calendar_entry(server_entry_data, current_user, db)
return response
async def update_cycle_data(user_id: int, period_date: date, db: AsyncSession):
@@ -455,6 +388,84 @@ async def health_check():
return {"status": "healthy", "service": "calendar-service"}
# Новый эндпоинт для мобильного приложения
@app.post("/api/v1/calendar/entry", response_model=schemas.CalendarEvent, status_code=201)
async def create_mobile_calendar_entry(
entry_data: schemas.CalendarEventCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry from mobile app"""
import logging
logging.info(f"Received mobile entry data: {entry_data}")
# Преобразуем в серверный формат
server_entry_data = entry_data.to_server_format()
logging.info(f"Converted to server format: {server_entry_data}")
# Проверяем существование записи
try:
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user["user_id"],
CalendarEntry.entry_date == server_entry_data.entry_date,
CalendarEntry.entry_type == server_entry_data.entry_type.value,
)
)
)
existing_entry = existing.scalars().first()
if existing_entry:
# Если запись существует, обновляем её
if server_entry_data.flow_intensity:
existing_entry.flow_intensity = server_entry_data.flow_intensity.value
if server_entry_data.symptoms:
existing_entry.symptoms = server_entry_data.symptoms
if server_entry_data.mood:
existing_entry.mood = server_entry_data.mood.value
if server_entry_data.notes:
existing_entry.notes = server_entry_data.notes
await db.commit()
await db.refresh(existing_entry)
# Возвращаем обновлённую запись
response = schemas.CalendarEntryResponse.model_validate(existing_entry)
return schemas.CalendarEvent.from_server_response(response)
# Создаем новую запись
db_entry = CalendarEntry(
user_id=current_user["user_id"],
entry_date=server_entry_data.entry_date,
entry_type=server_entry_data.entry_type.value,
flow_intensity=server_entry_data.flow_intensity.value if server_entry_data.flow_intensity else None,
period_symptoms=server_entry_data.period_symptoms,
mood=server_entry_data.mood.value if server_entry_data.mood else None,
energy_level=server_entry_data.energy_level,
sleep_hours=server_entry_data.sleep_hours,
symptoms=server_entry_data.symptoms,
medications=server_entry_data.medications,
notes=server_entry_data.notes,
)
db.add(db_entry)
await db.commit()
await db.refresh(db_entry)
# Если это запись о периоде, обновляем данные цикла
if server_entry_data.entry_type == schemas.EntryType.PERIOD:
await update_cycle_data(current_user["user_id"], server_entry_data.entry_date, db)
# Преобразуем в формат для мобильного приложения
response = schemas.CalendarEntryResponse.model_validate(db_entry)
return schemas.CalendarEvent.from_server_response(response)
except Exception as e:
logging.error(f"Error creating calendar entry: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
if __name__ == "__main__":
import uvicorn

View File

@@ -0,0 +1,443 @@
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import and_, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from services.calendar_service.models import CalendarEntry, CycleData, HealthInsights
from services.calendar_service.schemas import (CalendarEntryCreate, CalendarEntryResponse,
CycleDataResponse, CycleOverview, EntryType,
FlowIntensity, HealthInsightResponse, MoodType)
from shared.auth import get_current_user_from_token as get_current_user
from shared.config import settings
from shared.database import get_db
app = FastAPI(title="Calendar Service", version="1.0.0")
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "calendar_service"}
class CycleDataResponse(BaseModel):
id: int
cycle_start_date: date
cycle_length: Optional[int]
period_length: Optional[int]
ovulation_date: Optional[date]
fertile_window_start: Optional[date]
fertile_window_end: Optional[date]
next_period_predicted: Optional[date]
avg_cycle_length: Optional[int]
avg_period_length: Optional[int]
class Config:
from_attributes = True
class HealthInsightResponse(BaseModel):
id: int
insight_type: str
title: str
description: str
recommendation: Optional[str]
confidence_level: str
created_at: datetime
class Config:
from_attributes = True
class CycleOverview(BaseModel):
current_cycle_day: Optional[int]
current_phase: str # menstrual, follicular, ovulation, luteal
next_period_date: Optional[date]
days_until_period: Optional[int]
cycle_regularity: str # very_regular, regular, irregular, very_irregular
avg_cycle_length: Optional[int]
def calculate_cycle_phase(
cycle_start: date, cycle_length: int, current_date: date
) -> str:
"""Calculate current cycle phase"""
days_since_start = (current_date - cycle_start).days
if days_since_start <= 5:
return "menstrual"
elif days_since_start <= cycle_length // 2 - 2:
return "follicular"
elif cycle_length // 2 - 2 < days_since_start <= cycle_length // 2 + 2:
return "ovulation"
else:
return "luteal"
async def calculate_predictions(user_id: int, db: AsyncSession):
"""Calculate cycle predictions based on historical data"""
# Get last 6 cycles for calculations
cycles = await db.execute(
select(CycleData)
.filter(CycleData.user_id == user_id)
.order_by(desc(CycleData.cycle_start_date))
.limit(6)
)
cycle_list = cycles.scalars().all()
if len(cycle_list) < 2:
return None
# Calculate averages
cycle_lengths = [c.cycle_length for c in cycle_list if c.cycle_length]
period_lengths = [c.period_length for c in cycle_list if c.period_length]
if not cycle_lengths:
return None
avg_cycle = sum(cycle_lengths) / len(cycle_lengths)
avg_period = sum(period_lengths) / len(period_lengths) if period_lengths else 5
# Predict next period
last_cycle = cycle_list[0]
next_period_date = last_cycle.cycle_start_date + timedelta(days=int(avg_cycle))
return {
"avg_cycle_length": int(avg_cycle),
"avg_period_length": int(avg_period),
"next_period_predicted": next_period_date,
"ovulation_date": last_cycle.cycle_start_date
+ timedelta(days=int(avg_cycle // 2)),
}
@app.post("/api/v1/entries", response_model=CalendarEntryResponse)
async def create_calendar_entry(
entry_data: CalendarEntryCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create new calendar entry"""
# Check if entry already exists for this date and type
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user.id,
CalendarEntry.entry_date == entry_data.entry_date,
CalendarEntry.entry_type == entry_data.entry_type.value,
)
)
)
if existing.scalars().first():
raise HTTPException(
status_code=400, detail="Entry already exists for this date and type"
)
db_entry = CalendarEntry(
user_id=current_user.id,
entry_date=entry_data.entry_date,
entry_type=entry_data.entry_type.value,
flow_intensity=entry_data.flow_intensity.value
if entry_data.flow_intensity
else None,
period_symptoms=entry_data.period_symptoms,
mood=entry_data.mood.value if entry_data.mood else None,
energy_level=entry_data.energy_level,
sleep_hours=entry_data.sleep_hours,
symptoms=entry_data.symptoms,
medications=entry_data.medications,
notes=entry_data.notes,
)
db.add(db_entry)
await db.commit()
await db.refresh(db_entry)
# If this is a period entry, update cycle data
if entry_data.entry_type == EntryType.PERIOD:
await update_cycle_data(current_user.id, entry_data.entry_date, db)
return CalendarEntryResponse.model_validate(db_entry)
async def update_cycle_data(user_id: int, period_date: date, db: AsyncSession):
"""Update cycle data when period is logged"""
# Get last cycle
last_cycle = await db.execute(
select(CycleData)
.filter(CycleData.user_id == user_id)
.order_by(desc(CycleData.cycle_start_date))
.limit(1)
)
last_cycle_data = last_cycle.scalars().first()
if last_cycle_data:
# Calculate cycle length
cycle_length = (period_date - last_cycle_data.cycle_start_date).days
last_cycle_data.cycle_length = cycle_length
# Create new cycle
predictions = await calculate_predictions(user_id, db)
new_cycle = CycleData(
user_id=user_id,
cycle_start_date=period_date,
avg_cycle_length=predictions["avg_cycle_length"] if predictions else None,
next_period_predicted=predictions["next_period_predicted"]
if predictions
else None,
ovulation_date=predictions["ovulation_date"] if predictions else None,
)
db.add(new_cycle)
await db.commit()
@app.get("/api/v1/entries", response_model=List[CalendarEntryResponse])
async def get_calendar_entries(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
entry_type: Optional[EntryType] = Query(None),
limit: int = Query(100, ge=1, le=365),
):
"""Get calendar entries with optional filtering"""
query = select(CalendarEntry).filter(CalendarEntry.user_id == current_user.id)
if start_date:
query = query.filter(CalendarEntry.entry_date >= start_date)
if end_date:
query = query.filter(CalendarEntry.entry_date <= end_date)
if entry_type:
query = query.filter(CalendarEntry.entry_type == entry_type.value)
query = query.order_by(desc(CalendarEntry.entry_date)).limit(limit)
result = await db.execute(query)
entries = result.scalars().all()
return [CalendarEntryResponse.model_validate(entry) for entry in entries]
@app.get("/api/v1/cycle-overview", response_model=CycleOverview)
async def get_cycle_overview(
current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)
):
"""Get current cycle overview and predictions"""
# Get current cycle
current_cycle = await db.execute(
select(CycleData)
.filter(CycleData.user_id == current_user.id)
.order_by(desc(CycleData.cycle_start_date))
.limit(1)
)
cycle_data = current_cycle.scalars().first()
if not cycle_data:
return CycleOverview(
current_cycle_day=None,
current_phase="unknown",
next_period_date=None,
days_until_period=None,
cycle_regularity="unknown",
avg_cycle_length=None,
)
today = date.today()
current_cycle_day = (today - cycle_data.cycle_start_date).days + 1
# Calculate current phase
cycle_length = cycle_data.avg_cycle_length or 28
current_phase = calculate_cycle_phase(
cycle_data.cycle_start_date, cycle_length, today
)
# Days until next period
next_period_date = cycle_data.next_period_predicted
days_until_period = None
if next_period_date:
days_until_period = (next_period_date - today).days
# Calculate regularity
cycles = await db.execute(
select(CycleData)
.filter(CycleData.user_id == current_user.id)
.order_by(desc(CycleData.cycle_start_date))
.limit(6)
)
cycle_list = cycles.scalars().all()
regularity = "unknown"
if len(cycle_list) >= 3:
lengths = [c.cycle_length for c in cycle_list if c.cycle_length]
if lengths:
variance = max(lengths) - min(lengths)
if variance <= 2:
regularity = "very_regular"
elif variance <= 5:
regularity = "regular"
elif variance <= 10:
regularity = "irregular"
else:
regularity = "very_irregular"
return CycleOverview(
current_cycle_day=current_cycle_day,
current_phase=current_phase,
next_period_date=next_period_date,
days_until_period=days_until_period,
cycle_regularity=regularity,
avg_cycle_length=cycle_data.avg_cycle_length,
)
@app.get("/api/v1/insights", response_model=List[HealthInsightResponse])
async def get_health_insights(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
limit: int = Query(10, ge=1, le=50),
):
"""Get personalized health insights"""
result = await db.execute(
select(HealthInsights)
.filter(
HealthInsights.user_id == current_user.id,
HealthInsights.is_dismissed == False,
)
.order_by(desc(HealthInsights.created_at))
.limit(limit)
)
insights = result.scalars().all()
return [HealthInsightResponse.model_validate(insight) for insight in insights]
@app.get("/api/v1/calendar/entries", response_model=List[CalendarEntryResponse])
async def get_all_calendar_entries(
start_date: Optional[date] = None,
end_date: Optional[date] = None,
entry_type: Optional[EntryType] = None,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
limit: int = Query(100, ge=1, le=500),
):
"""Get all calendar entries for the current user"""
query = select(CalendarEntry).filter(CalendarEntry.user_id == current_user["user_id"])
if start_date:
query = query.filter(CalendarEntry.entry_date >= start_date)
if end_date:
query = query.filter(CalendarEntry.entry_date <= end_date)
if entry_type:
query = query.filter(CalendarEntry.entry_type == entry_type)
query = query.order_by(CalendarEntry.entry_date.desc()).limit(limit)
result = await db.execute(query)
entries = result.scalars().all()
return [CalendarEntryResponse.model_validate(entry) for entry in entries]
@app.post("/api/v1/calendar/entries", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry(
entry_data: CalendarEntryCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry"""
# Check if entry already exists for this date and type
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user.id,
CalendarEntry.entry_date == entry_data.entry_date,
CalendarEntry.entry_type == entry_data.entry_type.value,
)
)
)
if existing.scalars().first():
raise HTTPException(
status_code=400, detail="Entry already exists for this date and type"
)
# Create new calendar entry
new_entry = CalendarEntry(
user_id=current_user.id,
entry_date=entry_data.entry_date,
entry_type=entry_data.entry_type.value,
flow_intensity=entry_data.flow_intensity.value if entry_data.flow_intensity else None,
period_symptoms=entry_data.period_symptoms,
mood=entry_data.mood.value if entry_data.mood else None,
energy_level=entry_data.energy_level,
sleep_hours=entry_data.sleep_hours,
symptoms=entry_data.symptoms,
medications=entry_data.medications,
notes=entry_data.notes,
)
db.add(new_entry)
await db.commit()
await db.refresh(new_entry)
# If this is a period entry, update cycle data
if entry_data.entry_type == EntryType.PERIOD:
await update_cycle_data(current_user.id, entry_data.entry_date, db)
return CalendarEntryResponse.model_validate(new_entry)
@app.delete("/api/v1/entries/{entry_id}")
async def delete_calendar_entry(
entry_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete calendar entry"""
result = await db.execute(
select(CalendarEntry).filter(
and_(CalendarEntry.id == entry_id, CalendarEntry.user_id == current_user.id)
)
)
entry = result.scalars().first()
if not entry:
raise HTTPException(status_code=404, detail="Entry not found")
await db.delete(entry)
await db.commit()
return {"message": "Entry deleted successfully"}
@app.get("/api/v1/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "calendar-service"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8004)

View File

@@ -0,0 +1,247 @@
from datetime import date, datetime
from enum import Enum
from typing import List, Optional, Union
from pydantic import BaseModel, Field
class EntryType(str, Enum):
PERIOD = "period"
OVULATION = "ovulation"
SYMPTOMS = "symptoms"
MEDICATION = "medication"
MOOD = "mood"
EXERCISE = "exercise"
APPOINTMENT = "appointment"
# Мобильное приложение использует другие названия типов
class MobileEntryType(str, Enum):
MENSTRUATION = "MENSTRUATION"
OVULATION = "OVULATION"
SPOTTING = "SPOTTING"
DISCHARGE = "DISCHARGE"
PAIN = "PAIN"
MOOD = "MOOD"
def to_server_type(self) -> EntryType:
"""Конвертировать тип из мобильного приложения в тип сервера"""
mapping = {
self.MENSTRUATION: EntryType.PERIOD,
self.OVULATION: EntryType.OVULATION,
self.SPOTTING: EntryType.SYMPTOMS,
self.DISCHARGE: EntryType.SYMPTOMS,
self.PAIN: EntryType.SYMPTOMS,
self.MOOD: EntryType.MOOD,
}
return mapping.get(self, EntryType.SYMPTOMS)
class FlowIntensity(str, Enum):
LIGHT = "light"
MEDIUM = "medium"
HEAVY = "heavy"
SPOTTING = "spotting"
@classmethod
def from_int(cls, value: int) -> 'FlowIntensity':
"""Конвертировать числовое значение в enum"""
if value <= 1:
return cls.LIGHT
elif value <= 3:
return cls.MEDIUM
else:
return cls.HEAVY
class MobileMoodType(str, Enum):
"""Типы настроения из мобильного приложения"""
HAPPY = "HAPPY"
SAD = "SAD"
NORMAL = "NORMAL"
STRESSED = "STRESSED"
ANXIOUS = "ANXIOUS"
IRRITATED = "IRRITATED"
class MoodType(str, Enum):
HAPPY = "happy"
SAD = "sad"
ANXIOUS = "anxious"
IRRITATED = "irritated"
ENERGETIC = "energetic"
TIRED = "tired"
@classmethod
def from_mobile_mood(cls, mood_type: 'MobileMoodType') -> Optional['MoodType']:
"""Конвертировать из мобильного типа настроения"""
if mood_type == MobileMoodType.NORMAL:
return None
mapping = {
MobileMoodType.HAPPY: cls.HAPPY,
MobileMoodType.SAD: cls.SAD,
MobileMoodType.ANXIOUS: cls.ANXIOUS,
MobileMoodType.IRRITATED: cls.IRRITATED,
MobileMoodType.STRESSED: cls.ANXIOUS,
}
return mapping.get(mood_type)
class CalendarEntryBase(BaseModel):
entry_date: date
entry_type: EntryType
flow_intensity: Optional[FlowIntensity] = None
period_symptoms: Optional[str] = Field(None, max_length=500)
mood: Optional[MoodType] = None
energy_level: Optional[int] = Field(None, ge=1, le=5)
sleep_hours: Optional[int] = Field(None, ge=0, le=24)
symptoms: Optional[str] = Field(None, max_length=1000)
medications: Optional[str] = Field(None, max_length=500)
notes: Optional[str] = Field(None, max_length=1000)
class MobileSymptom(str, Enum):
"""Симптомы, используемые в мобильном приложении"""
CRAMPS = "CRAMPS"
HEADACHE = "HEADACHE"
BLOATING = "BLOATING"
FATIGUE = "FATIGUE"
NAUSEA = "NAUSEA"
BREAST_TENDERNESS = "BREAST_TENDERNESS"
ACNE = "ACNE"
BACKACHE = "BACKACHE"
class CalendarEventCreate(BaseModel):
"""Модель создания события календаря из мобильного приложения"""
date: date
type: MobileEntryType
flow_intensity: Optional[int] = Field(None, ge=1, le=5)
mood: Optional[MobileMoodType] = None
symptoms: Optional[List[MobileSymptom]] = None
notes: Optional[str] = Field(None, max_length=1000)
def to_server_format(self) -> 'CalendarEntryCreate':
"""Преобразовать в серверный формат"""
symptoms_str = None
if self.symptoms:
symptoms_str = ", ".join([s.value for s in self.symptoms])
flow = None
if self.flow_intensity is not None:
flow = FlowIntensity.from_int(self.flow_intensity)
mood_val = None
if self.mood:
mood_val = MoodType.from_mobile_mood(self.mood)
return CalendarEntryCreate(
entry_date=self.date,
entry_type=self.type.to_server_type(),
flow_intensity=flow,
mood=mood_val,
symptoms=symptoms_str,
notes=self.notes,
period_symptoms=None,
energy_level=None,
sleep_hours=None,
medications=None
)
class CalendarEntryCreate(CalendarEntryBase):
pass
class CalendarEntryResponse(BaseModel):
id: int
uuid: str
entry_date: date
entry_type: str
flow_intensity: Optional[str]
period_symptoms: Optional[str]
mood: Optional[str]
energy_level: Optional[int]
sleep_hours: Optional[int]
symptoms: Optional[str]
medications: Optional[str]
notes: Optional[str]
is_predicted: bool
confidence_score: Optional[int]
created_at: datetime
class Config:
from_attributes = True
# Модель ответа для мобильного приложения
class CalendarEvent(BaseModel):
id: int
uuid: str
date: date
type: str
flow_intensity: Optional[int] = None
mood: Optional[str] = None
symptoms: Optional[List[str]] = None
notes: Optional[str] = None
created_at: datetime
@classmethod
def from_server_response(cls, entry: CalendarEntryResponse) -> 'CalendarEvent':
"""Преобразовать из серверной модели в модель для мобильного приложения"""
# Преобразование flow_intensity из строки в число
flow_int = None
if entry.flow_intensity:
if entry.flow_intensity == "light":
flow_int = 1
elif entry.flow_intensity == "medium":
flow_int = 3
elif entry.flow_intensity == "heavy":
flow_int = 5
# Преобразование symptoms из строки в список
symptoms_list = None
if entry.symptoms:
symptoms_list = [s.strip() for s in entry.symptoms.split(",")]
return cls(
id=entry.id,
uuid=entry.uuid,
date=entry.entry_date,
type=entry.entry_type.upper(),
flow_intensity=flow_int,
mood=entry.mood.upper() if entry.mood else None,
symptoms=symptoms_list,
notes=entry.notes,
created_at=entry.created_at
)
class CycleDataResponse(BaseModel):
id: int
cycle_start_date: date
cycle_length: Optional[int]
period_length: Optional[int]
ovulation_date: Optional[date]
class CycleOverview(BaseModel):
current_cycle_day: Optional[int]
current_phase: str # menstrual, follicular, ovulation, luteal
next_period_date: Optional[date]
days_until_period: Optional[int]
cycle_regularity: str # very_regular, regular, irregular, very_irregular
avg_cycle_length: Optional[int]
class HealthInsightResponse(BaseModel):
id: int
insight_type: str
title: str
description: str
recommendation: Optional[str]
confidence_level: str
created_at: datetime
class Config:
from_attributes = True

View File

@@ -0,0 +1,197 @@
from datetime import date
from enum import Enum
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from .schemas import (
EntryType, FlowIntensity, MoodType, CalendarEntryCreate, CalendarEntryResponse
)
class MobileFlowIntensity(int, Enum):
"""Flow intensity values used in the mobile app (1-5)"""
SPOTTING = 1
LIGHT = 2
MEDIUM = 3
HEAVY = 4
VERY_HEAVY = 5
@classmethod
def to_server_intensity(cls, value: int) -> Optional[str]:
"""Convert mobile app intensity value to server format"""
if value is None:
return None
mapping = {
cls.SPOTTING.value: FlowIntensity.SPOTTING.value,
cls.LIGHT.value: FlowIntensity.LIGHT.value,
cls.MEDIUM.value: FlowIntensity.MEDIUM.value,
cls.HEAVY.value: FlowIntensity.HEAVY.value,
cls.VERY_HEAVY.value: FlowIntensity.HEAVY.value, # Server has no "very heavy"
}
return mapping.get(value)
@classmethod
def from_server_intensity(cls, value: Optional[str]) -> Optional[int]:
"""Convert server intensity value to mobile app format"""
if value is None:
return None
mapping = {
FlowIntensity.SPOTTING.value: cls.SPOTTING.value,
FlowIntensity.LIGHT.value: cls.LIGHT.value,
FlowIntensity.MEDIUM.value: cls.MEDIUM.value,
FlowIntensity.HEAVY.value: cls.HEAVY.value,
}
return mapping.get(value)
class MobileMood(str, Enum):
"""Mood values used in the mobile app"""
HAPPY = "HAPPY"
SAD = "SAD"
ANXIOUS = "ANXIOUS"
IRRITABLE = "IRRITABLE"
ENERGETIC = "ENERGETIC"
TIRED = "TIRED"
@classmethod
def to_server_mood(cls, value: str) -> Optional[MoodType]:
"""Convert mobile app mood value to server format"""
if value is None:
return None
mapping = {
cls.HAPPY: MoodType.HAPPY,
cls.SAD: MoodType.SAD,
cls.ANXIOUS: MoodType.ANXIOUS,
cls.IRRITABLE: MoodType.IRRITATED, # Different spelling
cls.ENERGETIC: MoodType.ENERGETIC,
cls.TIRED: MoodType.TIRED,
}
return mapping.get(MobileMood(value))
@classmethod
def from_server_mood(cls, value: Optional[str]) -> Optional[str]:
"""Convert server mood value to mobile app format"""
if value is None:
return None
# Convert string value to MoodType enum
try:
mood_type = MoodType(value)
except ValueError:
return None
mapping = {
MoodType.HAPPY: cls.HAPPY,
MoodType.SAD: cls.SAD,
MoodType.ANXIOUS: cls.ANXIOUS,
MoodType.IRRITATED: cls.IRRITABLE, # Different spelling
MoodType.ENERGETIC: cls.ENERGETIC,
MoodType.TIRED: cls.TIRED,
}
return mapping.get(mood_type)
class MobileAppCalendarEntryCreate(BaseModel):
"""Schema for creating calendar entries from mobile app"""
entry_date: date
entry_type: str # Will be mapped to EntryType
flow_intensity: Optional[int] = None # 1-5 scale
symptoms: Optional[str] = None
mood: Optional[str] = None # Mobile mood enum as string
notes: Optional[str] = None
class MobileAppCalendarEntryResponse(BaseModel):
"""Schema for calendar entry responses to mobile app"""
id: int
user_id: int
entry_date: date
entry_type: str
flow_intensity: Optional[int] = None
symptoms: Optional[str] = None
mood: Optional[str] = None
notes: Optional[str] = None
created_at: date
updated_at: date
def convert_mobile_app_format_to_server(mobile_entry: MobileAppCalendarEntryCreate) -> CalendarEntryCreate:
"""Convert mobile app format to server format"""
# Map entry type
try:
entry_type = EntryType(mobile_entry.entry_type.lower())
except ValueError:
# Handle case where entry type doesn't match directly
if mobile_entry.entry_type == "MENSTRUATION":
entry_type = EntryType.PERIOD
elif mobile_entry.entry_type == "SPOTTING":
entry_type = EntryType.PERIOD
else:
# Default to symptoms if no direct mapping
entry_type = EntryType.SYMPTOMS
# Map flow intensity (1-5 scale to text values)
flow_intensity_str = None
if mobile_entry.flow_intensity is not None:
flow_intensity_str = MobileFlowIntensity.to_server_intensity(mobile_entry.flow_intensity)
# Map mood
mood_str = None
if mobile_entry.mood is not None:
try:
mobile_mood = MobileMood(mobile_entry.mood)
mood_type = MobileMood.to_server_mood(mobile_entry.mood)
if mood_type:
mood_str = mood_type.value
except ValueError:
# If mood doesn't match any enum value, leave it as None
pass
return CalendarEntryCreate(
entry_date=mobile_entry.entry_date,
entry_type=entry_type,
flow_intensity=flow_intensity_str,
symptoms=mobile_entry.symptoms or "",
mood=mood_str,
notes=mobile_entry.notes or "",
# Default values for server required fields that mobile app doesn't provide
period_symptoms="",
energy_level=0,
sleep_hours=0,
medications="",
)
def convert_server_response_to_mobile_app(
server_response: CalendarEntryResponse
) -> MobileAppCalendarEntryResponse:
"""Convert server response to mobile app format"""
# Map flow intensity back to 1-5 scale
flow_intensity = None
if server_response.flow_intensity is not None:
flow_intensity = MobileFlowIntensity.from_server_intensity(server_response.flow_intensity)
# Map mood back to mobile format
mood = None
if server_response.mood is not None:
mood = MobileMood.from_server_mood(server_response.mood)
# Upper case the entry type
entry_type = server_response.entry_type.upper() if server_response.entry_type else ""
return MobileAppCalendarEntryResponse(
id=server_response.id,
user_id=getattr(server_response, "user_id", 0), # Handle if field doesn't exist
entry_date=server_response.entry_date,
entry_type=entry_type,
flow_intensity=flow_intensity,
symptoms=server_response.symptoms,
mood=mood,
notes=server_response.notes,
created_at=getattr(server_response, "created_at", date.today()),
updated_at=getattr(server_response, "created_at", date.today()), # Fallback to created_at
)