Files
finance_bot/app/security/jwt_manager.py
Andrew K. Choi 23a9d975a9 feat: Complete API authentication system with email & Telegram support
- Add email/password registration endpoint (/api/v1/auth/register)
- Add JWT token endpoints for Telegram users (/api/v1/auth/token/get, /api/v1/auth/token/refresh-telegram)
- Enhance User model to support both email and Telegram authentication
- Fix JWT token handling: convert sub to string (RFC compliance with PyJWT 2.10.1+)
- Fix bot API calls: filter None values from query parameters
- Fix JWT extraction from Redis: handle both bytes and string returns
- Add public endpoints to JWT middleware: /api/v1/auth/register, /api/v1/auth/token/*
- Update bot commands: /register (one-tap), /link (account linking), /start (options)
- Create complete database schema migration with email auth support
- Remove deprecated version attribute from docker-compose.yml
- Add service dependency: bot waits for web service startup

Features:
- Dual authentication: email/password OR Telegram ID
- JWT tokens with 15-min access + 30-day refresh lifetime
- Redis-based token storage with TTL
- Comprehensive API documentation and integration guides
- Test scripts and Python examples
- Full deployment checklist

Database changes:
- User model: added email, password_hash, email_verified (nullable fields)
- telegram_id now nullable to support email-only users
- Complete schema with families, accounts, categories, transactions, budgets, goals

Status: Production-ready with all tests passing
2025-12-11 21:00:34 +09:00

150 lines
4.5 KiB
Python

"""
JWT Token Management - Access & Refresh Token Handling
"""
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, Optional

import jwt
from pydantic import BaseModel

from app.core.config import settings
class TokenType(str, Enum):
    """Kinds of token, stored in the ``type`` claim of a JWT.

    The ``str`` mixin makes each member compare equal to its plain string
    value.
    """

    # Token kinds issued to users.
    ACCESS = "access"
    REFRESH = "refresh"

    # For bot/workers
    SERVICE = "service"
class TokenPayload(BaseModel):
    """Validated view of the claims carried inside a JWT.

    ``sub``, ``type``, ``iat`` and ``exp`` are required; the remaining
    fields fall back to the defaults declared below when a token omits them.
    """

    # Subject identifier, always a string (RFC compliance); for user tokens
    # this is the user_id rendered as text.
    sub: str
    # Which kind of token this is.
    type: TokenType
    device_id: Optional[str] = None
    # For granular permissions
    scope: str = "default"
    # Accessible families
    family_ids: list[int] = []
    # Issued-at time, as an integer timestamp.
    iat: int
    # Expiration time, as an integer timestamp.
    exp: int
class JWTManager:
    """
    JWT token generation, validation, and management.

    Every token is signed and verified with ``self.algorithm``, which the
    constructor fixes to HS256 together with a shared secret key.

    Algorithms:
    - Production: RS256 (asymmetric) - more secure, scalable
    - MVP: HS256 (symmetric) - simpler setup (the only one wired up here)
    """

    # Default token lifetimes. Each create_* method accepts ``expires_delta``
    # to override the default for a single token.
    ACCESS_TOKEN_EXPIRE_MINUTES = 15  # Short-lived
    REFRESH_TOKEN_EXPIRE_DAYS = 30  # Long-lived
    SERVICE_TOKEN_EXPIRE_HOURS = 8760  # 1 year

    def __init__(self, secret_key: Optional[str] = None):
        """Store the signing key and algorithm.

        Args:
            secret_key: Key used to sign and verify tokens. When omitted (or
                otherwise falsy), ``settings.jwt_secret_key`` is used.
        """
        self.secret_key = secret_key or settings.jwt_secret_key
        self.algorithm = "HS256"

    @staticmethod
    def _time_claims(expires_delta: timedelta) -> tuple[int, int]:
        """Return ``(iat, exp)`` POSIX timestamps for a token issued now."""
        # Use an aware UTC datetime: .timestamp() on the naive value returned
        # by datetime.utcnow() is interpreted as *local* time, which shifts
        # iat/exp by the host's UTC offset on any non-UTC machine.
        issued_at = datetime.now(timezone.utc)
        expires_at = issued_at + expires_delta
        return int(issued_at.timestamp()), int(expires_at.timestamp())

    def create_access_token(
        self,
        user_id: int,
        device_id: Optional[str] = None,
        family_ids: Optional[list[int]] = None,
        expires_delta: Optional[timedelta] = None,
    ) -> str:
        """Generate short-lived access token.

        Args:
            user_id: Identifier stored (as a string) in the ``sub`` claim.
            device_id: Optional device identifier stored in the token.
            family_ids: Family ids stored in the token; ``None`` becomes ``[]``.
            expires_delta: Lifetime override; defaults to
                ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

        Returns:
            The encoded JWT.
        """
        if expires_delta is None:
            expires_delta = timedelta(minutes=self.ACCESS_TOKEN_EXPIRE_MINUTES)
        return self._create_token(
            user_id=user_id,
            token_type=TokenType.ACCESS,
            expires_delta=expires_delta,
            device_id=device_id,
            family_ids=family_ids or [],
        )

    def create_refresh_token(
        self,
        user_id: int,
        device_id: Optional[str] = None,
        expires_delta: Optional[timedelta] = None,
    ) -> str:
        """Generate long-lived refresh token.

        Args:
            user_id: Identifier stored (as a string) in the ``sub`` claim.
            device_id: Optional device identifier stored in the token.
            expires_delta: Lifetime override; defaults to
                ``REFRESH_TOKEN_EXPIRE_DAYS`` days.

        Returns:
            The encoded JWT.
        """
        if expires_delta is None:
            expires_delta = timedelta(days=self.REFRESH_TOKEN_EXPIRE_DAYS)
        return self._create_token(
            user_id=user_id,
            token_type=TokenType.REFRESH,
            expires_delta=expires_delta,
            device_id=device_id,
        )

    def create_service_token(
        self,
        service_name: str,
        expires_delta: Optional[timedelta] = None,
    ) -> str:
        """Generate service-to-service token (e.g., for bot).

        The ``sub`` claim is ``"service:<service_name>"``. No ``device_id``
        or ``family_ids`` claims are written.

        Args:
            service_name: Name embedded in the ``sub`` claim.
            expires_delta: Lifetime override; defaults to
                ``SERVICE_TOKEN_EXPIRE_HOURS`` hours.

        Returns:
            The encoded JWT.
        """
        if expires_delta is None:
            expires_delta = timedelta(hours=self.SERVICE_TOKEN_EXPIRE_HOURS)
        issued_at, expires_at = self._time_claims(expires_delta)
        payload = {
            "sub": f"service:{service_name}",
            # Store the plain string value, matching _create_token.
            "type": TokenType.SERVICE.value,
            "iat": issued_at,
            "exp": expires_at,
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _create_token(
        self,
        user_id: int,
        token_type: TokenType,
        expires_delta: timedelta,
        device_id: Optional[str] = None,
        family_ids: Optional[list[int]] = None,
    ) -> str:
        """Build and sign the payload shared by access and refresh tokens."""
        issued_at, expires_at = self._time_claims(expires_delta)
        payload = {
            "sub": str(user_id),  # RFC requires string, convert int to str
            "type": token_type.value,
            "device_id": device_id,
            "family_ids": family_ids or [],
            "iat": issued_at,
            "exp": expires_at,
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> TokenPayload:
        """
        Verify token signature and expiration.

        Returns:
            The decoded claims as a ``TokenPayload``.

        Raises:
            ValueError: "Token has expired" when the token's ``exp`` has
                passed, or "Invalid token" for any other PyJWT validation
                failure. The underlying PyJWT exception is attached as
                ``__cause__``.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return TokenPayload(**payload)
        # ExpiredSignatureError must be handled before InvalidTokenError so
        # that expiry keeps its own, more specific message.
        except jwt.ExpiredSignatureError as err:
            raise ValueError("Token has expired") from err
        except jwt.InvalidTokenError as err:
            raise ValueError("Invalid token") from err

    def decode_token(self, token: str) -> Dict[str, Any]:
        """Decode token without verification (for debugging only).

        The signature is NOT checked, so the returned claims must never be
        trusted for authentication or authorization decisions.
        """
        return jwt.decode(token, options={"verify_signature": False})
# Singleton instance: created once at import time. No secret is passed, so
# JWTManager falls back to settings.jwt_secret_key for signing/verification.
jwt_manager = JWTManager()