# File: finance_bot/.history/app/services/auth_service_20251210221406.py
# 2025-12-10 22:18:07 +09:00 | 256 lines | 7.9 KiB | Python

"""
Authentication Service - User login, token management, Telegram binding
"""
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import secrets
import json
from sqlalchemy.orm import Session
from app.db.models.user import User
from app.security.jwt_manager import jwt_manager
from app.core.config import settings
import logging
import redis
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared Redis client built from settings.redis_url. It stores both the
# short-lived Telegram binding codes and the refresh-token records.
redis_client = redis.from_url(settings.redis_url)
class AuthService:
    """Handles user authentication, token management, and Telegram binding.

    Binding codes and refresh-token records live in the module-level
    ``redis_client``; user rows are read and updated through the SQLAlchemy
    session passed to the constructor.

    NOTE(review): every public method is declared ``async`` but none of them
    awaits anything -- the Redis and database calls run synchronously inside
    the coroutine. Confirm this is acceptable for the event loop in use.
    """

    TELEGRAM_BINDING_CODE_TTL = 600  # 10 minutes
    # Number of random *bytes* handed to secrets.token_urlsafe(); the
    # resulting URL-safe string is longer than this (roughly 1.3 chars/byte).
    BINDING_CODE_LENGTH = 24
    # Lifetime, in seconds, of the Redis record backing a refresh token.
    REFRESH_TOKEN_TTL = 86400 * 30  # 30 days
    # Lifetime reported in "expires_at" for access tokens.
    # NOTE(review): assumed to mirror jwt_manager's real expiry -- confirm.
    ACCESS_TOKEN_LIFETIME = timedelta(hours=24)

    def __init__(self, db: Session):
        """Store the SQLAlchemy session used for all user queries."""
        self.db = db

    def _commit(self) -> None:
        """Commit the session; on failure roll back and re-raise."""
        try:
            self.db.commit()
        except Exception:
            # The session is injected and may be reused by the caller, so do
            # not leave it stuck in a failed transaction.
            self.db.rollback()
            raise

    def _access_token_payload(
        self, user_id: int, access_token: str
    ) -> Dict[str, Any]:
        """Build the user_id / jwt_token / expires_at dict returned to bots."""
        # NOTE: datetime.utcnow() (naive UTC) is kept so the serialized
        # timestamp format stays exactly what existing clients receive.
        expires_at = datetime.utcnow() + self.ACCESS_TOKEN_LIFETIME
        return {
            "user_id": user_id,
            "jwt_token": access_token,
            "expires_at": expires_at.isoformat(),
        }

    async def create_telegram_binding_code(self, chat_id: int) -> str:
        """
        Generate temporary code for Telegram user binding.

        **Flow:**
        1. Bot calls /auth/telegram/start with chat_id
        2. Service generates random code and stores in Redis
        3. Bot builds link: https://bot.example.com/bind?code=XXX
        4. Bot sends link to user in Telegram
        5. User clicks link, authenticates, calls /auth/telegram/confirm

        **Returns:**
        The URL-safe code. It is stored under ``binding_code:<code>`` for
        ``TELEGRAM_BINDING_CODE_TTL`` seconds together with the chat_id.
        """
        code = secrets.token_urlsafe(self.BINDING_CODE_LENGTH)

        # Store in Redis with TTL (10 minutes)
        cache_key = f"binding_code:{code}"
        cache_data = {
            "chat_id": chat_id,
            "created_at": datetime.utcnow().isoformat(),
        }
        redis_client.setex(
            cache_key,
            self.TELEGRAM_BINDING_CODE_TTL,
            json.dumps(cache_data),
        )

        logger.info(f"Generated Telegram binding code for chat_id={chat_id}")
        return code

    async def confirm_telegram_binding(
        self,
        user_id: int,
        chat_id: int,
        code: str,
        username: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Confirm Telegram binding after user clicks link.

        **Flow:**
        1. User authenticates with email/password
        2. User clicks binding link: /bind?code=XXX
        3. Frontend calls /auth/telegram/confirm with code
        4. Service validates code from Redis
        5. Service consumes the code (single use) and links user.id with
           telegram_id
        6. Service returns JWT for bot to use

        The code is deleted from Redis *before* the user row is written, and
        the binding only proceeds if that delete actually removed the key.
        This makes the code strictly one-time even under concurrent requests;
        the trade-off is that a failed database commit requires a new code.

        **Returns:**
        {
            "success": True,
            "user_id": 123,
            "jwt_token": "eyJ...",
            "expires_at": "2025-12-11T12:00:00",
        }
        or ``{"success": False, "error": "..."}`` when the code is missing,
        expired, already used, issued for another chat, or the user is
        unknown.

        **Raises:**
        Whatever ``Session.commit()`` raises; the session is rolled back
        first.
        """
        # Validate code from Redis
        cache_key = f"binding_code:{code}"
        cached_data = redis_client.get(cache_key)
        if not cached_data:
            # The code is a bearer secret, so it is kept out of the logs.
            logger.warning(
                f"Binding code not found or expired for chat_id={chat_id}"
            )
            return {"success": False, "error": "Code expired"}

        binding_data = json.loads(cached_data)
        cached_chat_id = binding_data.get("chat_id")
        if cached_chat_id != chat_id:
            logger.warning(
                f"Chat ID mismatch: expected {cached_chat_id}, got {chat_id}"
            )
            return {"success": False, "error": "Code mismatch"}

        # Get user from database
        user = self.db.query(User).filter_by(id=user_id).first()
        if not user:
            logger.error(f"User not found: {user_id}")
            return {"success": False, "error": "User not found"}

        # Consume the code atomically: DEL reports how many keys it removed,
        # so only one of several concurrent confirmations can get past here.
        if not redis_client.delete(cache_key):
            logger.warning(
                f"Binding code already used or expired for chat_id={chat_id}"
            )
            return {"success": False, "error": "Code expired"}

        # Update user with Telegram info; optional fields are only written
        # when a non-empty value was supplied.
        user.telegram_id = chat_id
        if username:
            user.username = username
        if first_name:
            user.first_name = first_name
        if last_name:
            user.last_name = last_name
        user.updated_at = datetime.utcnow()
        self._commit()

        # Create JWT token for bot
        access_token = jwt_manager.create_access_token(user_id=user.id)

        logger.info(
            f"Telegram binding confirmed: user_id={user_id}, chat_id={chat_id}"
        )
        return {
            "success": True,
            **self._access_token_payload(user.id, access_token),
        }

    async def create_session(
        self,
        user_id: int,
        device_id: Optional[str] = None,
    ) -> tuple[str, str]:
        """
        Create session and issue tokens.

        The refresh token is recorded in Redis under
        ``refresh_token:<token>`` for ``REFRESH_TOKEN_TTL`` seconds, and the
        user's ``last_activity`` is updated.

        **Returns:**
        (access_token, refresh_token)

        **Raises:**
        ValueError: if no user with ``user_id`` exists.
        """
        user = self.db.query(User).filter_by(id=user_id).first()
        if not user:
            raise ValueError("User not found")

        # Create tokens
        access_token = jwt_manager.create_access_token(user_id=user.id)
        refresh_token = jwt_manager.create_refresh_token(user_id=user.id)

        # Store refresh token in Redis for validation
        token_key = f"refresh_token:{refresh_token}"
        token_data = {
            "user_id": user.id,
            "device_id": device_id,
            "created_at": datetime.utcnow().isoformat(),
        }
        redis_client.setex(
            token_key,
            self.REFRESH_TOKEN_TTL,
            json.dumps(token_data),
        )

        # Update user activity
        user.last_activity = datetime.utcnow()
        self._commit()

        logger.info(f"Session created for user_id={user_id}")
        return access_token, refresh_token

    async def refresh_access_token(
        self,
        refresh_token: str,
        user_id: int,
    ) -> str:
        """
        Issue new access token using valid refresh token.

        **Flow:**
        1. Check refresh token in Redis
        2. Validate it belongs to user_id
        3. Create new access token
        4. Return new token (don't invalidate refresh token)

        **Raises:**
        ValueError: if the refresh token has no Redis record, or the record
        belongs to a different user.
        """
        token_key = f"refresh_token:{refresh_token}"
        token_data = redis_client.get(token_key)
        if not token_data:
            logger.warning(f"Refresh token not found: {user_id}")
            raise ValueError("Invalid refresh token")

        data = json.loads(token_data)
        if data.get("user_id") != user_id:
            logger.warning(f"Refresh token user mismatch: {user_id}")
            raise ValueError("Token user mismatch")

        # Create new access token
        access_token = jwt_manager.create_access_token(user_id=user_id)
        logger.info(f"Access token refreshed for user_id={user_id}")
        return access_token

    async def authenticate_telegram_user(
        self, chat_id: int
    ) -> Optional[Dict[str, Any]]:
        """
        Get JWT token for Telegram user (bot authentication).

        **Flow:**
        1. Bot queries: GET /auth/telegram/authenticate?chat_id=12345
        2. Service finds user by telegram_id
        3. Service creates/returns JWT
        4. Bot stores JWT for API calls

        **Returns:**
        {
            "user_id": 123,
            "jwt_token": "eyJ...",
            "expires_at": "2025-12-11T12:00:00",
        } or None if not found
        """
        user = self.db.query(User).filter_by(telegram_id=chat_id).first()
        if not user:
            logger.warning(f"Telegram user not found: {chat_id}")
            return None

        # Create JWT token
        access_token = jwt_manager.create_access_token(user_id=user.id)
        logger.info(
            f"Telegram user authenticated: chat_id={chat_id}, user_id={user.id}"
        )
        return self._access_token_payload(user.id, access_token)