"""FastAPI application entry point for the Drivers Bot API.

Wires together the HTTP middleware (request IDs, security headers, in-process
request metrics), CORS, the API routers, health/readiness/metrics endpoints
and the static web front-end.
"""

import logging
from contextlib import asynccontextmanager
from time import monotonic, time
from uuid import uuid4

from fastapi import Depends, FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.api import (
    admin,
    cars,
    catalog,
    change_requests,
    entries,
    gamification,
    my,
    ocr,
    parser,
    service_centers,
    service_visits,
    sto_booking,
    users,
    work_orders,
)
from app.core.config import settings
from app.db.session import async_session_factory, get_session
from app.services.admin_notifications import create_admin_notification
from app.services.rate_limit import get_redis_client

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run ``settings.validate_production_settings()`` before serving requests."""
    settings.validate_production_settings()
    yield


app = FastAPI(title="Drivers Bot API", version="0.1.0", lifespan=lifespan)

# Process-local counters exposed by ``/metrics``.
# REQUEST_COUNT and REQUEST_DURATION_TOTAL only include requests for which
# ``call_next`` returned a response; requests that raised are counted in
# REQUEST_ERRORS only.
REQUEST_COUNT = 0
REQUEST_ERRORS = 0
REQUEST_DURATION_TOTAL = 0.0


async def _report_unhandled_error(request: Request, exc: Exception, request_id: str) -> None:
    """Best-effort: store an admin notification describing an unhandled error.

    Any failure while writing the notification is logged and swallowed so that
    it never masks the original exception being propagated by the middleware.
    """
    error_type = type(exc).__name__
    path = request.url.path
    # Wall-clock minute bucket for de-duplication. ``monotonic()`` must not be
    # used here: its reference point is undefined (it typically restarts at
    # boot), so bucket numbers could repeat across restarts.
    minute_bucket = int(time() // 60)
    try:
        async with async_session_factory() as session:
            await create_admin_notification(
                session,
                event_type="system_error",
                title="Unhandled API error",
                body=f"{request.method} {path}\nError: {error_type}",
                entity_type="system",
                entity_id=path,
                severity="error",
                idempotency_key=f"system_error:{path}:{error_type}:{minute_bucket}",
                metadata={
                    "path": path,
                    "method": request.method,
                    "request_id": request_id,
                    "error_type": error_type,
                },
            )
            await session.commit()
    except Exception:
        # Deliberately non-fatal, but no longer silent.
        logger.exception(
            "Failed to record admin notification for unhandled error on %s %s (request_id=%s)",
            request.method,
            path,
            request_id,
        )


def _apply_security_headers(response: Response, request_id: str) -> None:
    """Attach the request ID and the security headers to ``response``."""
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    response.headers["X-Frame-Options"] = "DENY"
    if settings.is_production:
        # HSTS and CSP are only emitted when settings.is_production is truthy.
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = (
            "default-src 'self' https://telegram.org https://*.telegram.org; "
            "connect-src 'self' https://api.telegram.org; "
            "img-src 'self' data: https:; "
            "script-src 'self' 'unsafe-inline' https://telegram.org https://*.telegram.org; "
            "style-src 'self' 'unsafe-inline'; "
            "frame-ancestors 'none'"
        )


@app.middleware("http")
async def production_headers_and_metrics(request: Request, call_next):
    """Tag each request with an ID, record metrics and add security headers.

    The request ID is taken from the incoming ``X-Request-ID`` header when
    present, otherwise a fresh UUID4 is generated. If the downstream handler
    raises, the error counter is incremented, an admin notification is
    attempted (best-effort) and the original exception is re-raised.
    """
    global REQUEST_COUNT, REQUEST_DURATION_TOTAL, REQUEST_ERRORS

    request_id = request.headers.get("X-Request-ID") or str(uuid4())
    start = monotonic()
    try:
        response = await call_next(request)
    except Exception as exc:
        REQUEST_ERRORS += 1
        await _report_unhandled_error(request, exc, request_id)
        raise

    REQUEST_COUNT += 1
    REQUEST_DURATION_TOTAL += monotonic() - start
    _apply_security_headers(response, request_id)
    return response


# Localhost origins are the fallback only outside production; in production
# the fallback is an empty list.
dev_origins = ["http://localhost:8000", "http://127.0.0.1:8000"] if not settings.is_production else []
cors_origins = settings.cors_origin_list or dev_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Registration order is preserved from the original explicit calls.
for _api_module in (
    users,
    my,
    catalog,
    cars,
    entries,
    gamification,
    ocr,
    parser,
    service_centers,
    sto_booking,
    service_visits,
    work_orders,
    change_requests,
    admin,
):
    app.include_router(_api_module.router, prefix="/api")


@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe: always returns ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.get("/metrics")
async def metrics() -> Response:
    """Render the process-local request counters as plain-text metrics."""
    avg = REQUEST_DURATION_TOTAL / REQUEST_COUNT if REQUEST_COUNT else 0
    body = "\n".join(
        [
            "# TYPE carpass_requests_total counter",
            f"carpass_requests_total {REQUEST_COUNT}",
            "# TYPE carpass_request_errors_total counter",
            f"carpass_request_errors_total {REQUEST_ERRORS}",
            "# TYPE carpass_request_duration_seconds_avg gauge",
            f"carpass_request_duration_seconds_avg {avg:.6f}",
            "",
        ]
    )
    return Response(body, media_type="text/plain; version=0.0.4")


@app.get("/ready")
async def ready(session: AsyncSession = Depends(get_session)) -> dict[str, str]:
    """Readiness probe: check the database and, when configured, Redis.

    A failing ``select 1`` or Redis ``ping`` propagates as an exception. The
    Alembic version lookup is optional: on any error the ``migration`` field
    is reported as ``"not_checked"``.
    """
    await session.execute(text("select 1"))

    migration = "unknown"
    try:
        version = await session.execute(text("select version_num from alembic_version limit 1"))
        migration = version.scalar_one_or_none() or "unknown"
    except Exception:
        migration = "not_checked"

    redis_status = "disabled"
    if settings.redis_url:
        redis = await get_redis_client()
        if redis is None:
            redis_status = "client_missing"
        else:
            await redis.ping()
            redis_status = "ok"

    return {"status": "ready", "database": "ok", "redis": redis_status, "migration": migration}


# Mounted last so the API routes and probes above take precedence over the
# catch-all static file handler at "/".
app.mount("/", StaticFiles(directory="web", html=True), name="web")