""" HMAC Signature Verification - Replay Attack Prevention & Request Integrity """ import hashlib import hmac import json from datetime import datetime from typing import Tuple from urllib.parse import urlencode from app.core.config import settings import redis class HMACManager: """ Request signing and verification using HMAC-SHA256. Signature Format: ──────────────────────────────────────────────────── base_string = METHOD + ENDPOINT + TIMESTAMP + hash(BODY) signature = HMAC_SHA256(base_string, client_secret) Headers Required: - X-Signature: base64(signature) - X-Timestamp: unix timestamp (seconds) - X-Client-Id: client identifier Anti-Replay Protection: - Check timestamp freshness (±30 seconds) - Store signature hash in Redis with 1-minute TTL - Reject duplicate signatures (nonce check) """ # Configuration TIMESTAMP_TOLERANCE_SECONDS = 30 REPLAY_NONCE_TTL_SECONDS = 60 def __init__(self, redis_client: redis.Redis = None): self.redis_client = redis_client self.algorithm = "sha256" def create_signature( self, method: str, endpoint: str, timestamp: int, body: dict = None, client_secret: str = None, ) -> str: """ Create HMAC signature for request. Args: method: HTTP method (GET, POST, PUT, DELETE) endpoint: API endpoint path (/api/v1/transactions) timestamp: Unix timestamp body: Request body dictionary client_secret: Shared secret key Returns: Base64-encoded signature """ if client_secret is None: client_secret = settings.hmac_secret_key # Create base string base_string = self._build_base_string(method, endpoint, timestamp, body) # Generate HMAC signature = hmac.new( client_secret.encode(), base_string.encode(), hashlib.sha256 ).hexdigest() return signature def verify_signature( self, method: str, endpoint: str, timestamp: int, signature: str, body: dict = None, client_secret: str = None, ) -> Tuple[bool, str]: """ Verify HMAC signature and check for replay attacks. Returns: (is_valid, error_message) """ if client_secret is None: client_secret = settings.hmac_secret_key # Step 1: Check timestamp freshness now = datetime.utcnow().timestamp() time_diff = abs(now - timestamp) if time_diff > self.TIMESTAMP_TOLERANCE_SECONDS: return False, f"Timestamp too old (diff: {time_diff}s)" # Step 2: Verify signature match expected_signature = self.create_signature( method, endpoint, timestamp, body, client_secret ) if not hmac.compare_digest(signature, expected_signature): return False, "Signature mismatch" # Step 3: Check for replay (signature already used) if self.redis_client: nonce_key = f"hmac:nonce:{signature}" if self.redis_client.exists(nonce_key): return False, "Signature already used (replay attack)" # Store nonce self.redis_client.setex(nonce_key, self.REPLAY_NONCE_TTL_SECONDS, "1") return True, "" def _build_base_string( self, method: str, endpoint: str, timestamp: int, body: dict = None, ) -> str: """Construct base string for signing""" # Normalize method method = method.upper() # Hash body (sorted JSON) body_hash = "" if body: body_json = json.dumps(body, sort_keys=True, separators=(',', ':')) body_hash = hashlib.sha256(body_json.encode()).hexdigest() # Base string format base_string = f"{method}:{endpoint}:{timestamp}:{body_hash}" return base_string # Singleton instance hmac_manager = HMACManager()