Files
finance_bot/.history/app/security/hmac_manager_20251210210906.py
2025-12-10 22:09:31 +09:00

146 lines
4.3 KiB
Python

"""
HMAC Signature Verification - Replay Attack Prevention & Request Integrity
"""
import hashlib
import hmac
import json
from datetime import datetime
from typing import Tuple
from urllib.parse import urlencode
from app.core.config import settings
import redis
class HMACManager:
"""
Request signing and verification using HMAC-SHA256.
Signature Format:
────────────────────────────────────────────────────
base_string = METHOD + ENDPOINT + TIMESTAMP + hash(BODY)
signature = HMAC_SHA256(base_string, client_secret)
Headers Required:
- X-Signature: base64(signature)
- X-Timestamp: unix timestamp (seconds)
- X-Client-Id: client identifier
Anti-Replay Protection:
- Check timestamp freshness (±30 seconds)
- Store signature hash in Redis with 1-minute TTL
- Reject duplicate signatures (nonce check)
"""
# Configuration
TIMESTAMP_TOLERANCE_SECONDS = 30
REPLAY_NONCE_TTL_SECONDS = 60
def __init__(self, redis_client: redis.Redis = None):
self.redis_client = redis_client
self.algorithm = "sha256"
def create_signature(
self,
method: str,
endpoint: str,
timestamp: int,
body: dict = None,
client_secret: str = None,
) -> str:
"""
Create HMAC signature for request.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint path (/api/v1/transactions)
timestamp: Unix timestamp
body: Request body dictionary
client_secret: Shared secret key
Returns:
Base64-encoded signature
"""
if client_secret is None:
client_secret = settings.hmac_secret_key
# Create base string
base_string = self._build_base_string(method, endpoint, timestamp, body)
# Generate HMAC
signature = hmac.new(
client_secret.encode(),
base_string.encode(),
hashlib.sha256
).hexdigest()
return signature
def verify_signature(
self,
method: str,
endpoint: str,
timestamp: int,
signature: str,
body: dict = None,
client_secret: str = None,
) -> Tuple[bool, str]:
"""
Verify HMAC signature and check for replay attacks.
Returns:
(is_valid, error_message)
"""
if client_secret is None:
client_secret = settings.hmac_secret_key
# Step 1: Check timestamp freshness
now = datetime.utcnow().timestamp()
time_diff = abs(now - timestamp)
if time_diff > self.TIMESTAMP_TOLERANCE_SECONDS:
return False, f"Timestamp too old (diff: {time_diff}s)"
# Step 2: Verify signature match
expected_signature = self.create_signature(
method, endpoint, timestamp, body, client_secret
)
if not hmac.compare_digest(signature, expected_signature):
return False, "Signature mismatch"
# Step 3: Check for replay (signature already used)
if self.redis_client:
nonce_key = f"hmac:nonce:{signature}"
if self.redis_client.exists(nonce_key):
return False, "Signature already used (replay attack)"
# Store nonce
self.redis_client.setex(nonce_key, self.REPLAY_NONCE_TTL_SECONDS, "1")
return True, ""
def _build_base_string(
self,
method: str,
endpoint: str,
timestamp: int,
body: dict = None,
) -> str:
"""Construct base string for signing"""
# Normalize method
method = method.upper()
# Hash body (sorted JSON)
body_hash = ""
if body:
body_json = json.dumps(body, sort_keys=True, separators=(',', ':'))
body_hash = hashlib.sha256(body_json.encode()).hexdigest()
# Base string format
base_string = f"{method}:{endpoint}:{timestamp}:{body_hash}"
return base_string
# Singleton instance
hmac_manager = HMACManager()