""" Authentication utilities for all services. This module provides common authentication functionality to avoid circular imports. """ import logging from datetime import datetime, timedelta from typing import Optional import jwt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt.exceptions import InvalidTokenError from passlib.context import CryptContext from shared.config import settings # Suppress bcrypt version warnings logging.getLogger("passlib").setLevel(logging.ERROR) # Password hashing - настройка bcrypt с более надежными параметрами pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12, # Стандартное количество раундов bcrypt__truncate_error=False # Не вызывать ошибку при длинных паролях, а просто обрезать ) # Bearer token scheme security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash. Handle bcrypt compatibility issues.""" try: # Увеличим подробность логов logging.info(f"Verifying password length: {len(plain_password)} chars") # Проверяем пароль с помощью passlib и логируем результат result = pwd_context.verify(plain_password, hashed_password) logging.info(f"Password verification result: {result}") return result except Exception as e: logging.error(f"Error verifying password: {e}, hash_type: {hashed_password[:10]}...") return False def get_password_hash(password: str) -> str: """Get password hash. Let passlib handle bcrypt compatibility.""" try: # Увеличим подробность логов logging.info(f"Hashing password length: {len(password)} chars") # bcrypt автоматически ограничит длину пароля до 72 байт hashed = pwd_context.hash(password) logging.info("Password hashed successfully") return hashed except Exception as e: # Логируем ошибку и пробрасываем исключение logging.error(f"Error hashing password: {e}") raise ValueError(f"Password hashing failed: {str(e)}") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create access token.""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) return encoded_jwt def verify_token(token: str) -> Optional[dict]: """Verify and decode JWT token.""" try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) user_id: str = payload.get("sub") if user_id is None: return None return {"user_id": int(user_id), "email": payload.get("email")} except InvalidTokenError: return None async def get_current_user_from_token( credentials: HTTPAuthorizationCredentials = Depends(security), ) -> dict: """Get current user from JWT token.""" token = credentials.credentials user_data = verify_token(token) if user_data is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) return user_data