146 lines
4.3 KiB
Python
146 lines
4.3 KiB
Python
"""
|
|
HMAC Signature Verification - Replay Attack Prevention & Request Integrity
|
|
"""
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Tuple
|
|
from urllib.parse import urlencode
|
|
from app.core.config import settings
|
|
import redis
|
|
|
|
|
|
class HMACManager:
|
|
"""
|
|
Request signing and verification using HMAC-SHA256.
|
|
|
|
Signature Format:
|
|
────────────────────────────────────────────────────
|
|
base_string = METHOD + ENDPOINT + TIMESTAMP + hash(BODY)
|
|
signature = HMAC_SHA256(base_string, client_secret)
|
|
|
|
Headers Required:
|
|
- X-Signature: base64(signature)
|
|
- X-Timestamp: unix timestamp (seconds)
|
|
- X-Client-Id: client identifier
|
|
|
|
Anti-Replay Protection:
|
|
- Check timestamp freshness (±30 seconds)
|
|
- Store signature hash in Redis with 1-minute TTL
|
|
- Reject duplicate signatures (nonce check)
|
|
"""
|
|
|
|
# Configuration
|
|
TIMESTAMP_TOLERANCE_SECONDS = 30
|
|
REPLAY_NONCE_TTL_SECONDS = 60
|
|
|
|
def __init__(self, redis_client: redis.Redis = None):
|
|
self.redis_client = redis_client
|
|
self.algorithm = "sha256"
|
|
|
|
def create_signature(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
timestamp: int,
|
|
body: dict = None,
|
|
client_secret: str = None,
|
|
) -> str:
|
|
"""
|
|
Create HMAC signature for request.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, PUT, DELETE)
|
|
endpoint: API endpoint path (/api/v1/transactions)
|
|
timestamp: Unix timestamp
|
|
body: Request body dictionary
|
|
client_secret: Shared secret key
|
|
|
|
Returns:
|
|
Base64-encoded signature
|
|
"""
|
|
if client_secret is None:
|
|
client_secret = settings.hmac_secret_key
|
|
|
|
# Create base string
|
|
base_string = self._build_base_string(method, endpoint, timestamp, body)
|
|
|
|
# Generate HMAC
|
|
signature = hmac.new(
|
|
client_secret.encode(),
|
|
base_string.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
return signature
|
|
|
|
def verify_signature(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
timestamp: int,
|
|
signature: str,
|
|
body: dict = None,
|
|
client_secret: str = None,
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Verify HMAC signature and check for replay attacks.
|
|
|
|
Returns:
|
|
(is_valid, error_message)
|
|
"""
|
|
if client_secret is None:
|
|
client_secret = settings.hmac_secret_key
|
|
|
|
# Step 1: Check timestamp freshness
|
|
now = datetime.utcnow().timestamp()
|
|
time_diff = abs(now - timestamp)
|
|
|
|
if time_diff > self.TIMESTAMP_TOLERANCE_SECONDS:
|
|
return False, f"Timestamp too old (diff: {time_diff}s)"
|
|
|
|
# Step 2: Verify signature match
|
|
expected_signature = self.create_signature(
|
|
method, endpoint, timestamp, body, client_secret
|
|
)
|
|
|
|
if not hmac.compare_digest(signature, expected_signature):
|
|
return False, "Signature mismatch"
|
|
|
|
# Step 3: Check for replay (signature already used)
|
|
if self.redis_client:
|
|
nonce_key = f"hmac:nonce:{signature}"
|
|
if self.redis_client.exists(nonce_key):
|
|
return False, "Signature already used (replay attack)"
|
|
|
|
# Store nonce
|
|
self.redis_client.setex(nonce_key, self.REPLAY_NONCE_TTL_SECONDS, "1")
|
|
|
|
return True, ""
|
|
|
|
def _build_base_string(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
timestamp: int,
|
|
body: dict = None,
|
|
) -> str:
|
|
"""Construct base string for signing"""
|
|
# Normalize method
|
|
method = method.upper()
|
|
|
|
# Hash body (sorted JSON)
|
|
body_hash = ""
|
|
if body:
|
|
body_json = json.dumps(body, sort_keys=True, separators=(',', ':'))
|
|
body_hash = hashlib.sha256(body_json.encode()).hexdigest()
|
|
|
|
# Base string format
|
|
base_string = f"{method}:{endpoint}:{timestamp}:{body_hash}"
|
|
return base_string
|
|
|
|
|
|
# Singleton instance
|
|
hmac_manager = HMACManager()
|