#!/usr/bin/env bash set -euo pipefail REQ="services/profiles/requirements.txt" PY="services/profiles/src/app/core/security.py" # 1) гарантируем зависимость PyJWT grep -qE '(^|[[:space:]])PyJWT' "$REQ" 2>/dev/null || { echo "PyJWT>=2.8.0" >> "$REQ" echo "[profiles] added PyJWT to requirements.txt" } # 2) модуль security.py mkdir -p "$(dirname "$PY")" cat > "$PY" <<'PY' import os from typing import Optional import jwt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from pydantic import BaseModel reusable_bearer = HTTPBearer(auto_error=True) JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret") JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256") # Возможность включить строгую проверку audience/issuer в будущем JWT_VERIFY_AUD = os.getenv("JWT_VERIFY_AUD", "0") == "1" JWT_AUDIENCE: Optional[str] = os.getenv("JWT_AUDIENCE") or None JWT_VERIFY_ISS = os.getenv("JWT_VERIFY_ISS", "0") == "1" JWT_ISSUER: Optional[str] = os.getenv("JWT_ISSUER") or None # Допустимая рассинхронизация часов (сек) JWT_LEEWAY = int(os.getenv("JWT_LEEWAY", "30")) class JwtUser(BaseModel): sub: str email: Optional[str] = None role: Optional[str] = None def decode_token(token: str) -> JwtUser: options = { "verify_signature": True, "verify_exp": True, "verify_aud": JWT_VERIFY_AUD, "verify_iss": JWT_VERIFY_ISS, } kwargs = {"algorithms": [JWT_ALGORITHM], "options": options, "leeway": JWT_LEEWAY} if JWT_VERIFY_AUD and JWT_AUDIENCE: kwargs["audience"] = JWT_AUDIENCE if JWT_VERIFY_ISS and JWT_ISSUER: kwargs["issuer"] = JWT_ISSUER try: payload = jwt.decode(token, JWT_SECRET, **kwargs) sub = str(payload.get("sub") or "") if not sub: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token: no sub") return JwtUser(sub=sub, email=payload.get("email"), role=payload.get("role")) except jwt.ExpiredSignatureError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired") except jwt.InvalidAudienceError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid audience") except jwt.InvalidIssuerError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer") except jwt.InvalidTokenError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(reusable_bearer)) -> JwtUser: if credentials.scheme.lower() != "bearer": raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid auth scheme") return decode_token(credentials.credentials) PY echo "[profiles] rebuilding..." docker compose build profiles docker compose restart profiles