Files
marriage/scripts/patch_profiles_security.sh
Andrey K. Choi cc87dcc0fa
Some checks failed
continuous-integration/drone/push Build is failing
api development
2025-08-08 21:58:36 +09:00

80 lines
3.0 KiB
Bash
Executable File

#!/usr/bin/env bash
# Patch the profiles service: make sure PyJWT is declared in its requirements,
# regenerate the JWT security module, then rebuild the service container.
# Paths are relative, so the script must be run from the repository root.
set -euo pipefail

REQ="services/profiles/requirements.txt"
PY="services/profiles/src/app/core/security.py"

# 1) Make sure the PyJWT dependency is declared.
#    The match is case-insensitive and anchored to the start of a requirement
#    line, so "pyjwt==2.8.0" counts as present while a "# PyJWT" comment or a
#    different package such as "pyjwt-extras" does not.
if ! grep -qiE '^[[:space:]]*pyjwt([^[:alnum:]_.-]|$)' "$REQ" 2>/dev/null; then
  # If the file ends without a newline, terminate the last line first;
  # otherwise the new entry would be glued onto the previous requirement.
  if [ -s "$REQ" ] && [ -n "$(tail -c 1 "$REQ")" ]; then
    echo >> "$REQ"
  fi
  echo "PyJWT>=2.8.0" >> "$REQ"
  echo "[profiles] added PyJWT to requirements.txt"
fi

# 2) Write the security.py module (any existing file is overwritten).
#    The heredoc delimiter is quoted, so the Python below is written verbatim.
mkdir -p "$(dirname "$PY")"
cat > "$PY" <<'PY'
import os
from typing import Optional
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
# Bearer-credentials extractor; get_current_user receives its result via Depends.
# NOTE(review): auto_error=True presumably makes FastAPI reject requests that
# lack a usable Authorization header on its own - confirm for the installed version.
reusable_bearer = HTTPBearer(auto_error=True)
# NOTE(review): "dev-secret" is a publicly known fallback; make sure JWT_SECRET
# is always set outside local development.
JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret")
# `or` (rather than a getenv default) also covers variables that are set but
# empty, e.g. an unfilled `JWT_ALGORITHM=` line in an env file.
JWT_ALGORITHM = os.getenv("JWT_ALGORITHM") or "HS256"

# Optional strict audience/issuer verification; each is off unless its flag is "1".
JWT_VERIFY_AUD = os.getenv("JWT_VERIFY_AUD", "0") == "1"
JWT_AUDIENCE: Optional[str] = os.getenv("JWT_AUDIENCE") or None
JWT_VERIFY_ISS = os.getenv("JWT_VERIFY_ISS", "0") == "1"
JWT_ISSUER: Optional[str] = os.getenv("JWT_ISSUER") or None

# Allowed clock skew, in seconds. An empty value falls back to 30 instead of
# crashing the import with int("").
JWT_LEEWAY = int(os.getenv("JWT_LEEWAY") or "30")
class JwtUser(BaseModel):
    """Identity extracted from a decoded JWT payload."""

    # Value of the token's "sub" claim; decode_token rejects tokens without it.
    sub: str
    # Value of the "email" claim, or None when the token carries none.
    email: Optional[str] = None
    # Value of the "role" claim, or None when the token carries none.
    role: Optional[str] = None
def decode_token(token: str) -> JwtUser:
    """Validate a JWT and return the user it identifies.

    Signature and expiry verification are always requested; audience and
    issuer verification only when JWT_VERIFY_AUD / JWT_VERIFY_ISS are enabled.

    Args:
        token: Encoded JWT string.

    Returns:
        A JwtUser built from the "sub", "email" and "role" claims.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token is expired, fails the audience/issuer check, is otherwise
            invalid, has no usable "sub" claim, or carries claims that do not
            fit JwtUser.
    """

    def unauthorized(detail: str) -> HTTPException:
        # A 401 response should carry a WWW-Authenticate challenge (RFC 7235).
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): verify_exp presumably only applies when an "exp" claim is
    # present; consider adding "require": ["exp"] - confirm against PyJWT docs.
    options = {
        "verify_signature": True,
        "verify_exp": True,
        "verify_aud": JWT_VERIFY_AUD,
        "verify_iss": JWT_VERIFY_ISS,
    }
    kwargs = {"algorithms": [JWT_ALGORITHM], "options": options, "leeway": JWT_LEEWAY}
    # NOTE(review): if a verify flag is on but its expected value is unset,
    # nothing is passed to PyJWT here; confirm how the installed version
    # treats that combination.
    if JWT_VERIFY_AUD and JWT_AUDIENCE:
        kwargs["audience"] = JWT_AUDIENCE
    if JWT_VERIFY_ISS and JWT_ISSUER:
        kwargs["issuer"] = JWT_ISSUER

    # Keep the try body to the single call whose errors are translated.
    # The specific subclasses must be listed before jwt.InvalidTokenError.
    try:
        payload = jwt.decode(token, JWT_SECRET, **kwargs)
    except jwt.ExpiredSignatureError as exc:
        raise unauthorized("Token expired") from exc
    except jwt.InvalidAudienceError as exc:
        raise unauthorized("Invalid audience") from exc
    except jwt.InvalidIssuerError as exc:
        raise unauthorized("Invalid issuer") from exc
    except jwt.InvalidTokenError as exc:
        raise unauthorized("Invalid token") from exc

    sub = str(payload.get("sub") or "")
    if not sub:
        raise unauthorized("Invalid token: no sub")

    try:
        return JwtUser(sub=sub, email=payload.get("email"), role=payload.get("role"))
    except ValueError as exc:
        # pydantic's ValidationError is a ValueError subclass; a signed token
        # with wrongly typed claims is a client error, not a server error.
        raise unauthorized("Invalid token") from exc
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(reusable_bearer)) -> JwtUser:
    """FastAPI dependency: resolve the request's bearer token to a JwtUser.

    Args:
        credentials: Authorization data supplied by ``reusable_bearer``.

    Returns:
        The JwtUser produced by ``decode_token``.

    Raises:
        HTTPException: 401 if the scheme is not "bearer" (case-insensitive),
            plus whatever ``decode_token`` raises for a bad token.
    """
    if credentials.scheme.lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid auth scheme",
            # A 401 response should carry a WWW-Authenticate challenge (RFC 7235).
            headers={"WWW-Authenticate": "Bearer"},
        )
    return decode_token(credentials.credentials)
PY
# Rebuild the image, then recreate the container from it. `restart` would only
# restart the existing container, which still runs the previously built image.
# --no-deps limits the operation to the profiles service itself.
echo "[profiles] rebuilding..."
docker compose build profiles
docker compose up -d --no-deps profiles