Files
chat/shared/auth.py
Andrew K. Choi 4e3768a6ee
Some checks failed
continuous-integration/drone/push Build is failing
pipeline issues fix
2025-09-25 11:59:54 +09:00

77 lines
2.2 KiB
Python

"""
Authentication utilities for all services.
This module provides common authentication functionality to avoid circular imports.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext

from shared.config import settings
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer scheme; injected via Depends() in get_current_user_from_token.
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context`` and return it."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token expires
            15 minutes after creation.

    Returns:
        The token encoded with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicit zero lifetime must not silently become the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)

    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def verify_token(token: str) -> Optional[dict]:
    """Verify and decode a JWT token.

    Args:
        token: Encoded JWT string.

    Returns:
        ``{"user_id": <int>, "email": <email claim or None>}`` built from the
        ``sub`` and ``email`` claims, or ``None`` when the token fails
        validation, carries no ``sub`` claim, or its ``sub`` cannot be
        converted to ``int``.
    """
    # Keep the try body limited to the call that can raise InvalidTokenError.
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except InvalidTokenError:
        return None

    subject = payload.get("sub")
    if subject is None:
        return None

    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        # A validly signed token with a non-numeric subject is still unusable;
        # report it like any other invalid token instead of raising.
        return None

    return {"user_id": user_id, "email": payload.get("email")}
async def get_current_user_from_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the authenticated user from the request's bearer token.

    Returns the dict produced by ``verify_token``. Raises ``HTTPException``
    with status 401 when ``verify_token`` returns ``None``.
    """
    user = verify_token(credentials.credentials)
    if user is not None:
        return user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )