All checks were successful
continuous-integration/drone/push Build is passing
Changes: - Fix nutrition service: add is_active column and Pydantic validation for UUID/datetime - Add location-based alerts feature: users can now see alerts within 1km radius - Fix CORS and response serialization in nutrition service - Add getCurrentLocation() and loadAlertsNearby() functions - Improve UI for nearby alerts display with distance and response count
112 lines
4.0 KiB
Python
112 lines
4.0 KiB
Python
"""
|
||
Authentication utilities for all services.
|
||
This module provides common authentication functionality to avoid circular imports.
|
||
"""
|
||
|
||
import logging
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Optional
|
||
|
||
import jwt
|
||
from fastapi import Depends, HTTPException, status
|
||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
from jwt.exceptions import InvalidTokenError
|
||
from passlib.context import CryptContext
|
||
|
||
from shared.config import settings
|
||
|
||
# Raise passlib's logger threshold to ERROR so bcrypt version warnings are not emitted
logging.getLogger("passlib").setLevel(logging.ERROR)

# Password hashing: bcrypt configured with more robust, explicit parameters
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__truncate_error=False,  # Do not raise on over-long passwords; just truncate them
    bcrypt__rounds=12,  # Standard number of rounds
)

# Bearer token scheme used to extract credentials from the Authorization header
security = HTTPBearer()
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plaintext password against its stored hash.

    Args:
        plain_password: Password as supplied by the user.
        hashed_password: Stored hash to compare against.

    Returns:
        True when the password matches the hash. False on a mismatch, or when
        verification itself fails (for example an unrecognised hash format).
    """
    try:
        result = pwd_context.verify(plain_password, hashed_password)
    except Exception as e:
        # Best-effort by design: any verification failure counts as "no match".
        # Neither the password (or its length) nor any part of the stored hash
        # is logged, and the handler never touches the arguments, so it cannot
        # raise a second error when they are None or of an unexpected type.
        logging.error("Error verifying password: %s", e)
        return False

    logging.info("Password verification result: %s", result)
    return result
|
||
|
||
|
||
def get_password_hash(password: str) -> str:
    """Hash a password with the module-level passlib context.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.

    Raises:
        ValueError: If hashing fails for any reason; the original exception is
            attached as ``__cause__``.
    """
    try:
        # NOTE(review): the context is configured with
        # bcrypt__truncate_error=False, so over-long passwords are presumably
        # truncated (bcrypt's 72-byte limit) rather than rejected.
        hashed = pwd_context.hash(password)
    except Exception as e:
        # Log and re-raise as ValueError, keeping the root cause chained.
        # The password and its length are deliberately not logged.
        logging.error("Error hashing password: %s", e)
        raise ValueError(f"Password hashing failed: {str(e)}") from e

    logging.info("Password hashed successfully")
    return hashed
|
||
|
||
|
||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime measured from now (UTC). When ``None``,
            a 15-minute lifetime is used. An explicit zero delta is honoured
            rather than being replaced by the default.

    Returns:
        The encoded JWT produced by ``jwt.encode`` with ``settings.SECRET_KEY``
        and ``settings.ALGORITHM``.

    Raises:
        Exception: Whatever ``jwt.encode`` raises is logged and re-raised
            unchanged.
    """
    # Compare against None: timedelta(0) is falsy and must not fall back to
    # the default lifetime.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})

    try:
        return jwt.encode(
            to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
        )
    except Exception as e:
        # Log only the claim names: the values may contain personal data
        # (e.g. an email address) that should not end up in log files.
        logging.error(
            "Error encoding JWT: %s, claims: %s, expire: %s", e, list(data), expire
        )
        raise
|
||
|
||
|
||
def verify_token(token: str) -> Optional[dict]:
    """Verify and decode a JWT token.

    Args:
        token: Encoded JWT string.

    Returns:
        ``{"user_id": int, "email": ...}`` built from the ``sub`` and ``email``
        claims, or ``None`` when the token fails validation, has no ``sub``
        claim, or its ``sub`` claim cannot be converted to an integer.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except InvalidTokenError as e:
        logging.error(f"Token validation error: {e}")
        return None

    subject = payload.get("sub")
    if subject is None:
        return None

    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        # A correctly signed token with a non-numeric subject (e.g. a UUID) is
        # treated as invalid instead of letting the conversion error escape.
        logging.error("Token validation error: subject claim is not an integer")
        return None

    return {"user_id": user_id, "email": payload.get("email")}
|
||
|
||
|
||
async def get_current_user_from_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the current user from the bearer token on the request.

    Args:
        credentials: Bearer credentials supplied by the ``security`` dependency.

    Returns:
        The dict returned by ``verify_token`` for the supplied token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` returns ``None``.
    """
    user_data = verify_token(credentials.credentials)

    # Success path first; anything else is an authentication failure.
    if user_data is not None:
        return user_data

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
|