calendar features
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-09-26 14:45:00 +09:00
parent b98034b616
commit 64171196b6
18 changed files with 2189 additions and 0 deletions

View File

@@ -617,6 +617,10 @@ async def location_service_proxy(request: Request):
@app.api_route("/api/v1/calendar/reminders", methods=["POST"], operation_id="calendar_reminders_post")
@app.api_route("/api/v1/calendar/settings", methods=["GET"], operation_id="calendar_settings_get")
@app.api_route("/api/v1/calendar/settings", methods=["PUT"], operation_id="calendar_settings_put")
# Мобильное API для календаря
@app.api_route("/api/v1/entry", methods=["POST"], operation_id="mobile_calendar_entry_post")
@app.api_route("/api/v1/entries", methods=["GET"], operation_id="mobile_calendar_entries_get")
@app.api_route("/api/v1/entries", methods=["POST"], operation_id="mobile_calendar_entries_post")
async def calendar_service_proxy(request: Request):
"""Proxy requests to Calendar Service"""
body = await request.body()

View File

@@ -34,6 +34,64 @@ async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "calendar_service"}
@app.get("/debug/entries")
async def debug_entries(db: AsyncSession = Depends(get_db)):
"""Debug endpoint for entries without auth"""
# Получить последние 10 записей из БД для отладки
query = select(CalendarEntry).limit(10)
result = await db.execute(query)
entries = result.scalars().all()
# Преобразовать в словари для ответа
entries_list = []
for entry in entries:
entry_dict = {
"id": entry.id,
"user_id": entry.user_id,
"entry_date": str(entry.entry_date),
"entry_type": entry.entry_type,
"note": entry.notes,
"symptoms": entry.symptoms,
"flow_intensity": entry.flow_intensity,
"mood": entry.mood,
"created_at": str(entry.created_at)
}
entries_list.append(entry_dict)
return entries_list
@app.get("/debug/add-entry")
async def debug_add_entry(db: AsyncSession = Depends(get_db)):
"""Debug endpoint to add a test entry without auth"""
try:
# Создаем тестовую запись
new_entry = CalendarEntry(
user_id=29, # ID пользователя
entry_date=date.today(),
entry_type="period",
flow_intensity="medium",
period_symptoms=["cramps", "headache"],
mood="neutral",
symptoms=["headache", "cramps"],
notes="Test entry added via debug endpoint",
is_predicted=False
)
db.add(new_entry)
await db.commit()
await db.refresh(new_entry)
return {
"message": "Test entry added successfully",
"entry_id": new_entry.id,
"user_id": new_entry.user_id,
"entry_date": str(new_entry.entry_date),
"entry_type": new_entry.entry_type
}
except Exception as e:
await db.rollback()
return {"error": str(e)}
# Используем классы из schemas

View File

@@ -0,0 +1,472 @@
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import and_, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from services.calendar_service.models import CalendarEntry, CycleData, HealthInsights
import services.calendar_service.schemas as schemas
from services.calendar_service.schemas import (CalendarEntryCreate, CalendarEntryResponse,
CycleDataResponse, CycleOverview, EntryType,
FlowIntensity, HealthInsightResponse, MoodType,
CalendarEventCreate)
from shared.auth import get_current_user_from_token as get_current_user
from shared.config import settings
from shared.database import get_db
app = FastAPI(title="Calendar Service", version="1.0.0")
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "calendar_service"}
# Используем классы из schemas
def calculate_cycle_phase(
cycle_start: date, cycle_length: int, current_date: date
) -> str:
"""Calculate current cycle phase"""
days_since_start = (current_date - cycle_start).days
if days_since_start <= 5:
return "menstrual"
elif days_since_start <= cycle_length // 2 - 2:
return "follicular"
elif cycle_length // 2 - 2 < days_since_start <= cycle_length // 2 + 2:
return "ovulation"
else:
return "luteal"
async def calculate_predictions(user_id: int, db: AsyncSession):
"""Calculate cycle predictions based on historical data"""
# Get last 6 cycles for calculations
cycles = await db.execute(
select(CycleData)
.filter(CycleData.user_id == user_id)
.order_by(desc(CycleData.cycle_start_date))
.limit(6)
)
cycle_list = cycles.scalars().all()
if len(cycle_list) < 2:
return None
# Calculate averages
cycle_lengths = [c.cycle_length for c in cycle_list if c.cycle_length]
period_lengths = [c.period_length for c in cycle_list if c.period_length]
if not cycle_lengths:
return None
avg_cycle = sum(cycle_lengths) / len(cycle_lengths)
avg_period = sum(period_lengths) / len(period_lengths) if period_lengths else 5
# Predict next period
last_cycle = cycle_list[0]
next_period_date = last_cycle.cycle_start_date + timedelta(days=int(avg_cycle))
return {
"avg_cycle_length": int(avg_cycle),
"avg_period_length": int(avg_period),
"next_period_predicted": next_period_date,
"ovulation_date": last_cycle.cycle_start_date
+ timedelta(days=int(avg_cycle // 2)),
}
@app.post("/api/v1/entries", response_model=CalendarEntryResponse, status_code=201)
@app.post("/api/v1/entry", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry_legacy(
entry_data: CalendarEntryCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry via legacy endpoint"""
return await create_calendar_entry(entry_data, current_user, db)
@app.post("/api/v1/calendar/entry", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry_mobile_app(
entry_data: CalendarEventCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry via mobile app format endpoint"""
# Convert mobile app format to server format
server_entry_data = entry_data.to_server_format()
response = await create_calendar_entry(server_entry_data, current_user, db)
return response
async def update_cycle_data(user_id: int, period_date: date, db: AsyncSession):
"""Update cycle data when period is logged"""
# Get last cycle
last_cycle = await db.execute(
select(CycleData)
.filter(CycleData.user_id == user_id)
.order_by(desc(CycleData.cycle_start_date))
.limit(1)
)
last_cycle_data = last_cycle.scalars().first()
if last_cycle_data:
# Calculate cycle length
cycle_length = (period_date - last_cycle_data.cycle_start_date).days
last_cycle_data.cycle_length = cycle_length
# Create new cycle
predictions = await calculate_predictions(user_id, db)
new_cycle = CycleData(
user_id=user_id,
cycle_start_date=period_date,
avg_cycle_length=predictions["avg_cycle_length"] if predictions else None,
next_period_predicted=predictions["next_period_predicted"]
if predictions
else None,
ovulation_date=predictions["ovulation_date"] if predictions else None,
)
db.add(new_cycle)
await db.commit()
@app.get("/api/v1/entries", response_model=List[CalendarEntryResponse])
async def get_calendar_entries(
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
entry_type: Optional[EntryType] = Query(None),
limit: int = Query(100, ge=1, le=365),
):
"""Get calendar entries with optional filtering"""
query = select(CalendarEntry).filter(CalendarEntry.user_id == current_user["user_id"])
if start_date:
query = query.filter(CalendarEntry.entry_date >= start_date)
if end_date:
query = query.filter(CalendarEntry.entry_date <= end_date)
if entry_type:
query = query.filter(CalendarEntry.entry_type == entry_type.value)
query = query.order_by(desc(CalendarEntry.entry_date)).limit(limit)
result = await db.execute(query)
entries = result.scalars().all()
return [CalendarEntryResponse.model_validate(entry) for entry in entries]
@app.get("/api/v1/cycle-overview", response_model=CycleOverview)
async def get_cycle_overview(
current_user: Dict = Depends(get_current_user), db: AsyncSession = Depends(get_db)
):
"""Get current cycle overview and predictions"""
# Get current cycle
current_cycle = await db.execute(
select(CycleData)
.filter(CycleData.user_id == current_user["user_id"])
.order_by(desc(CycleData.cycle_start_date))
.limit(1)
)
cycle_data = current_cycle.scalars().first()
if not cycle_data:
return CycleOverview(
current_cycle_day=None,
current_phase="unknown",
next_period_date=None,
days_until_period=None,
cycle_regularity="unknown",
avg_cycle_length=None,
)
today = date.today()
current_cycle_day = (today - cycle_data.cycle_start_date).days + 1
# Calculate current phase
cycle_length = cycle_data.avg_cycle_length or 28
current_phase = calculate_cycle_phase(
cycle_data.cycle_start_date, cycle_length, today
)
# Days until next period
next_period_date = cycle_data.next_period_predicted
days_until_period = None
if next_period_date:
days_until_period = (next_period_date - today).days
# Calculate regularity
cycles = await db.execute(
select(CycleData)
.filter(CycleData.user_id == current_user["user_id"])
.order_by(desc(CycleData.cycle_start_date))
.limit(6)
)
cycle_list = cycles.scalars().all()
regularity = "unknown"
if len(cycle_list) >= 3:
lengths = [c.cycle_length for c in cycle_list if c.cycle_length]
if lengths:
variance = max(lengths) - min(lengths)
if variance <= 2:
regularity = "very_regular"
elif variance <= 5:
regularity = "regular"
elif variance <= 10:
regularity = "irregular"
else:
regularity = "very_irregular"
return CycleOverview(
current_cycle_day=current_cycle_day,
current_phase=current_phase,
next_period_date=next_period_date,
days_until_period=days_until_period,
cycle_regularity=regularity,
avg_cycle_length=cycle_data.avg_cycle_length,
)
@app.get("/api/v1/insights", response_model=List[HealthInsightResponse])
async def get_health_insights(
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
limit: int = Query(10, ge=1, le=50),
):
"""Get personalized health insights"""
result = await db.execute(
select(HealthInsights)
.filter(
HealthInsights.user_id == current_user["user_id"],
HealthInsights.is_dismissed == False,
)
.order_by(desc(HealthInsights.created_at))
.limit(limit)
)
insights = result.scalars().all()
return [HealthInsightResponse.model_validate(insight) for insight in insights]
@app.get("/api/v1/calendar/entries", response_model=List[CalendarEntryResponse])
async def get_all_calendar_entries(
start_date: Optional[date] = None,
end_date: Optional[date] = None,
entry_type: Optional[EntryType] = None,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
limit: int = Query(100, ge=1, le=500),
):
"""Get all calendar entries for the current user"""
query = select(CalendarEntry).filter(CalendarEntry.user_id == current_user["user_id"])
if start_date:
query = query.filter(CalendarEntry.entry_date >= start_date)
if end_date:
query = query.filter(CalendarEntry.entry_date <= end_date)
if entry_type:
query = query.filter(CalendarEntry.entry_type == entry_type)
query = query.order_by(CalendarEntry.entry_date.desc()).limit(limit)
result = await db.execute(query)
entries = result.scalars().all()
return [CalendarEntryResponse.model_validate(entry) for entry in entries]
@app.post("/api/v1/calendar/entries", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry(
entry_data: CalendarEntryCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry"""
# Debug prints
import logging
logging.info(f"Current user: {current_user}")
logging.info(f"Entry data: {entry_data}")
try:
# Check if entry already exists for this date and type
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user["user_id"],
CalendarEntry.entry_date == entry_data.entry_date,
CalendarEntry.entry_type == entry_data.entry_type.value,
)
)
)
if existing.scalars().first():
raise HTTPException(
status_code=400, detail="Entry already exists for this date and type"
)
except Exception as e:
logging.error(f"Error checking for existing entry: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# Create new calendar entry
new_entry = CalendarEntry(
user_id=current_user["user_id"],
entry_date=entry_data.entry_date,
entry_type=entry_data.entry_type.value,
flow_intensity=entry_data.flow_intensity.value if entry_data.flow_intensity else None,
period_symptoms=entry_data.period_symptoms,
mood=entry_data.mood.value if entry_data.mood else None,
energy_level=entry_data.energy_level,
sleep_hours=entry_data.sleep_hours,
symptoms=entry_data.symptoms,
medications=entry_data.medications,
notes=entry_data.notes,
)
db.add(new_entry)
await db.commit()
await db.refresh(new_entry)
# If this is a period entry, update cycle data
if entry_data.entry_type == EntryType.PERIOD:
await update_cycle_data(current_user["user_id"], entry_data.entry_date, db)
return CalendarEntryResponse.model_validate(new_entry)
@app.delete("/api/v1/entries/{entry_id}")
async def delete_calendar_entry(
entry_id: int,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete calendar entry"""
result = await db.execute(
select(CalendarEntry).filter(
and_(CalendarEntry.id == entry_id, CalendarEntry.user_id == current_user["user_id"])
)
)
entry = result.scalars().first()
if not entry:
raise HTTPException(status_code=404, detail="Entry not found")
await db.delete(entry)
await db.commit()
return {"message": "Entry deleted successfully"}
@app.get("/api/v1/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "calendar-service"}
# Новый эндпоинт для мобильного приложения
@app.post("/api/v1/calendar/entry", response_model=schemas.CalendarEvent, status_code=201)
async def create_mobile_calendar_entry(
entry_data: schemas.CalendarEventCreate,
current_user: Dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new calendar entry from mobile app"""
import logging
logging.info(f"Received mobile entry data: {entry_data}")
# Преобразуем в серверный формат
server_entry_data = entry_data.to_server_format()
logging.info(f"Converted to server format: {server_entry_data}")
# Проверяем существование записи
try:
existing = await db.execute(
select(CalendarEntry).filter(
and_(
CalendarEntry.user_id == current_user["user_id"],
CalendarEntry.entry_date == server_entry_data.entry_date,
CalendarEntry.entry_type == server_entry_data.entry_type.value,
)
)
)
existing_entry = existing.scalars().first()
if existing_entry:
# Если запись существует, обновляем её
if server_entry_data.flow_intensity:
existing_entry.flow_intensity = server_entry_data.flow_intensity.value
if server_entry_data.symptoms:
existing_entry.symptoms = server_entry_data.symptoms
if server_entry_data.mood:
existing_entry.mood = server_entry_data.mood.value
if server_entry_data.notes:
existing_entry.notes = server_entry_data.notes
await db.commit()
await db.refresh(existing_entry)
# Возвращаем обновлённую запись
response = schemas.CalendarEntryResponse.model_validate(existing_entry)
return schemas.CalendarEvent.from_server_response(response)
# Создаем новую запись
db_entry = CalendarEntry(
user_id=current_user["user_id"],
entry_date=server_entry_data.entry_date,
entry_type=server_entry_data.entry_type.value,
flow_intensity=server_entry_data.flow_intensity.value if server_entry_data.flow_intensity else None,
period_symptoms=server_entry_data.period_symptoms,
mood=server_entry_data.mood.value if server_entry_data.mood else None,
energy_level=server_entry_data.energy_level,
sleep_hours=server_entry_data.sleep_hours,
symptoms=server_entry_data.symptoms,
medications=server_entry_data.medications,
notes=server_entry_data.notes,
)
db.add(db_entry)
await db.commit()
await db.refresh(db_entry)
# Если это запись о периоде, обновляем данные цикла
if server_entry_data.entry_type == schemas.EntryType.PERIOD:
await update_cycle_data(current_user["user_id"], server_entry_data.entry_date, db)
# Преобразуем в формат для мобильного приложения
response = schemas.CalendarEntryResponse.model_validate(db_entry)
return schemas.CalendarEvent.from_server_response(response)
except Exception as e:
logging.error(f"Error creating calendar entry: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8004)

View File

@@ -0,0 +1,361 @@
"""
Служба календаря для приложения женской безопасности.
Предоставляет API для работы с записями календаря здоровья.
Поддерживаются следующие форматы запросов:
1. Стандартный формат: /api/v1/calendar/entries
2. Мобильный формат: /api/v1/calendar/entry
3. Запросы без префикса /calendar: /api/v1/entries и /api/v1/entry
"""
from fastapi import FastAPI, Depends, HTTPException, Header, Body, Query, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import json
from typing import List, Optional, Dict, Any, Union
from datetime import date, datetime, timedelta
from pydantic import BaseModel, Field
from enum import Enum
import os
import sys
import uuid
import logging
from typing import Dict, List, Optional
import time
import asyncio
# Добавляем путь к общим модулям
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
# Импортируем общие модули
from shared.auth import get_current_user
from shared.database import get_db
from shared.config import settings
from services.calendar_service.models import CalendarEntryInDB, EntryType, FlowIntensity, MoodType
# Схемы данных для календарных записей
class CalendarEntryBase(BaseModel):
entry_date: date
entry_type: str
flow_intensity: Optional[str] = None
period_symptoms: Optional[str] = None
mood: Optional[str] = None
energy_level: Optional[int] = None
sleep_hours: Optional[float] = None
symptoms: Optional[str] = None
medications: Optional[str] = None
notes: Optional[str] = None
class CalendarEntryCreate(CalendarEntryBase):
pass
class CalendarEntryResponse(CalendarEntryBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
user_id: int
class Config:
orm_mode = True
# Мобильные типы записей и данных
class MobileEntryType(str, Enum):
MENSTRUATION = "MENSTRUATION"
OVULATION = "OVULATION"
SPOTTING = "SPOTTING"
DISCHARGE = "DISCHARGE"
PAIN = "PAIN"
MOOD = "MOOD"
class MobileMood(str, Enum):
HAPPY = "HAPPY"
SAD = "SAD"
ANXIOUS = "ANXIOUS"
IRRITABLE = "IRRITABLE"
ENERGETIC = "ENERGETIC"
TIRED = "TIRED"
NORMAL = "NORMAL"
class MobileSymptom(str, Enum):
CRAMPS = "CRAMPS"
HEADACHE = "HEADACHE"
BLOATING = "BLOATING"
FATIGUE = "FATIGUE"
NAUSEA = "NAUSEA"
BREAST_TENDERNESS = "BREAST_TENDERNESS"
ACNE = "ACNE"
BACKACHE = "BACKACHE"
# Схемы для мобильного API
class MobileCalendarEntry(BaseModel):
date: date
type: MobileEntryType
flow_intensity: Optional[int] = None # 1-5 scale
mood: Optional[MobileMood] = None
symptoms: Optional[List[str]] = None
notes: Optional[str] = None
# Инициализация приложения
app = FastAPI(title="Calendar Service API",
description="API для работы с календарём здоровья",
version="1.0.0")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Конвертеры форматов
def mobile_to_standard_entry(mobile_entry: MobileCalendarEntry) -> CalendarEntryCreate:
"""Конвертирует запись из мобильного формата в стандартный"""
# Отображение типов записей
entry_type_mapping = {
MobileEntryType.MENSTRUATION: "period",
MobileEntryType.OVULATION: "ovulation",
MobileEntryType.SPOTTING: "spotting",
MobileEntryType.DISCHARGE: "discharge",
MobileEntryType.PAIN: "pain",
MobileEntryType.MOOD: "mood"
}
# Отображение интенсивности потока
flow_intensity_mapping = {
1: "very_light",
2: "light",
3: "medium",
4: "heavy",
5: "very_heavy"
}
# Отображение настроения
mood_mapping = {
MobileMood.HAPPY: "happy",
MobileMood.SAD: "sad",
MobileMood.ANXIOUS: "anxious",
MobileMood.IRRITABLE: "irritable",
MobileMood.ENERGETIC: "energetic",
MobileMood.TIRED: "tired",
MobileMood.NORMAL: "normal"
}
# Сопоставляем симптомы
symptoms_str = ""
if mobile_entry.symptoms:
symptoms_str = ", ".join(symptom.lower() for symptom in mobile_entry.symptoms)
# Определяем тип записи
entry_type = entry_type_mapping.get(mobile_entry.type, "other")
# Преобразуем интенсивность потока
flow_intensity = None
if mobile_entry.flow_intensity and mobile_entry.type == MobileEntryType.MENSTRUATION:
flow_intensity = flow_intensity_mapping.get(mobile_entry.flow_intensity)
# Преобразуем настроение
mood = None
if mobile_entry.mood:
mood = mood_mapping.get(mobile_entry.mood)
# Создаем стандартную запись
standard_entry = CalendarEntryCreate(
entry_date=mobile_entry.date,
entry_type=entry_type,
flow_intensity=flow_intensity,
period_symptoms="" if entry_type != "period" else symptoms_str,
mood=mood,
symptoms=symptoms_str if entry_type != "period" else "",
notes=mobile_entry.notes,
energy_level=None,
sleep_hours=None,
medications=""
)
return standard_entry
def standard_to_mobile_entry(entry: CalendarEntryResponse) -> Dict[str, Any]:
"""Конвертирует запись из стандартного формата в мобильный"""
# Обратное отображение типов записей
entry_type_mapping = {
"period": MobileEntryType.MENSTRUATION,
"ovulation": MobileEntryType.OVULATION,
"spotting": MobileEntryType.SPOTTING,
"discharge": MobileEntryType.DISCHARGE,
"pain": MobileEntryType.PAIN,
"mood": MobileEntryType.MOOD
}
# Обратное отображение интенсивности потока
flow_intensity_mapping = {
"very_light": 1,
"light": 2,
"medium": 3,
"heavy": 4,
"very_heavy": 5
}
# Обратное отображение настроения
mood_mapping = {
"happy": MobileMood.HAPPY,
"sad": MobileMood.SAD,
"anxious": MobileMood.ANXIOUS,
"irritable": MobileMood.IRRITABLE,
"energetic": MobileMood.ENERGETIC,
"tired": MobileMood.TIRED,
"normal": MobileMood.NORMAL
}
# Определяем тип записи
mobile_type = entry_type_mapping.get(entry.entry_type, MobileEntryType.MOOD)
# Интенсивность потока
flow_intensity = None
if entry.flow_intensity:
flow_intensity = flow_intensity_mapping.get(entry.flow_intensity)
# Настроение
mood = None
if entry.mood:
mood = mood_mapping.get(entry.mood, MobileMood.NORMAL)
# Симптомы
symptoms = []
if entry.entry_type == "period" and entry.period_symptoms:
symptoms = [s.strip().upper() for s in entry.period_symptoms.split(',') if s.strip()]
elif entry.symptoms:
symptoms = [s.strip().upper() for s in entry.symptoms.split(',') if s.strip()]
# Создаем мобильную запись
mobile_entry = {
"id": entry.id,
"date": entry.entry_date.isoformat(),
"type": mobile_type,
"flow_intensity": flow_intensity,
"mood": mood,
"symptoms": symptoms,
"notes": entry.notes,
"created_at": entry.created_at.isoformat(),
"updated_at": entry.updated_at.isoformat() if entry.updated_at else None
}
return mobile_entry
# Эндпоинт для проверки здоровья сервиса
@app.get("/health")
def health_check():
return {"status": "ok", "service": "calendar"}
# API для работы с календарем
# 1. Стандартный эндпоинт для получения записей календаря
@app.get("/api/v1/calendar/entries", response_model=List[CalendarEntryResponse])
async def get_calendar_entries(
skip: int = 0,
limit: int = 100,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Получение списка записей календаря"""
entries = db.query(CalendarEntryInDB).filter(
CalendarEntryInDB.user_id == current_user["id"]
).offset(skip).limit(limit).all()
return entries
# 2. Стандартный эндпоинт для создания записи календаря
@app.post("/api/v1/calendar/entries", response_model=CalendarEntryResponse, status_code=201)
async def create_calendar_entry(
entry: CalendarEntryCreate,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Создание новой записи в календаре"""
db_entry = CalendarEntryInDB(**entry.dict(), user_id=current_user["id"])
db.add(db_entry)
db.commit()
db.refresh(db_entry)
return db_entry
# 3. Поддержка legacy эндпоинта /api/v1/entries (без префикса /calendar)
@app.get("/api/v1/entries", response_model=List[CalendarEntryResponse])
async def get_entries_legacy(
skip: int = 0,
limit: int = 100,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Legacy эндпоинт для получения записей календаря"""
return await get_calendar_entries(skip, limit, current_user, db)
@app.post("/api/v1/entries", response_model=CalendarEntryResponse, status_code=201)
async def create_entry_legacy(
entry: CalendarEntryCreate,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Legacy эндпоинт для создания записи в календаре"""
return await create_calendar_entry(entry, current_user, db)
# 4. Поддержка эндпоинта /api/v1/entry (ед. число, без префикса /calendar)
@app.post("/api/v1/entry", response_model=CalendarEntryResponse, status_code=201)
async def create_entry_singular(
entry: CalendarEntryCreate,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Эндпоинт для создания записи в календаре (ед. число)"""
return await create_calendar_entry(entry, current_user, db)
# 5. Поддержка мобильного API с эндпоинтом /api/v1/calendar/entry
@app.post("/api/v1/calendar/entry", status_code=201)
async def create_mobile_calendar_entry(
entry: MobileCalendarEntry,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Создание записи в календаре из мобильного приложения"""
# Преобразуем мобильную запись в стандартную
standard_entry = mobile_to_standard_entry(entry)
# Создаем запись в БД
db_entry = CalendarEntryInDB(**standard_entry.dict(), user_id=current_user["id"])
db.add(db_entry)
db.commit()
db.refresh(db_entry)
# Преобразуем ответ обратно в мобильный формат
mobile_response = standard_to_mobile_entry(db_entry)
return mobile_response
@app.get("/api/v1/calendar/entry/{entry_id}", status_code=200)
async def get_mobile_calendar_entry(
entry_id: int,
current_user: dict = Depends(get_current_user),
db = Depends(get_db)
):
"""Получение записи календаря в мобильном формате"""
db_entry = db.query(CalendarEntryInDB).filter(
CalendarEntryInDB.id == entry_id,
CalendarEntryInDB.user_id == current_user["id"]
).first()
if not db_entry:
raise HTTPException(status_code=404, detail="Запись не найдена")
# Преобразуем ответ в мобильный формат
mobile_response = standard_to_mobile_entry(db_entry)
return mobile_response
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8004)