import hashlib
import hmac
import json
import time
from urllib.parse import parse_qsl

from fastapi import HTTPException, status


def _secret_key(bot_token: str, *, webapp: bool) -> bytes:
    """Derive the HMAC key used to verify a Telegram signature.

    Args:
        bot_token: The bot token the signature was produced with.
        webapp: When true, derive the key as HMAC-SHA256 of the token keyed
            with the constant ``b"WebAppData"``; otherwise use the plain
            SHA-256 digest of the token.

    Returns:
        The 32-byte secret key.
    """
    if webapp:
        return hmac.new(b"WebAppData", bot_token.encode(), hashlib.sha256).digest()
    return hashlib.sha256(bot_token.encode()).digest()


def _unauthorized(detail: str) -> HTTPException:
    """Build (not raise) a 401 response exception with the given detail."""
    return HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)


def _require_bot_token(bot_token: str) -> None:
    """Raise a 503 if the bot token is empty, since nothing can be verified."""
    if not bot_token:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="BOT_TOKEN is not configured",
        )


def _check_signature_and_age(
    values: dict,
    bot_token: str,
    max_age_seconds: int,
    *,
    webapp: bool,
) -> None:
    """Validate the ``hash`` and ``auth_date`` of a set of signed fields.

    The ``hash`` entry is removed from ``values`` in place; every remaining
    entry takes part in the signature check.

    Raises:
        HTTPException: 401 if the hash is missing, ``auth_date`` is malformed
            or too old, or the signature does not match.
    """
    received_hash = str(values.pop("hash", ""))
    if not received_hash:
        raise _unauthorized("Telegram hash missing")

    # auth_date comes from the client: a malformed value must be rejected as
    # a failed authentication, not escape as an unhandled exception (HTTP 500).
    try:
        auth_date = int(values.get("auth_date") or 0)
    except (TypeError, ValueError, OverflowError):
        raise _unauthorized("Telegram auth invalid") from None

    # Equivalent to `time.time() - auth_date > max_age_seconds`, but written
    # as an int-vs-float comparison so an absurdly large auth_date cannot
    # raise OverflowError during the subtraction.
    if max_age_seconds and auth_date and auth_date < time.time() - max_age_seconds:
        raise _unauthorized("Telegram auth expired")

    data_check_string = "\n".join(f"{key}={values[key]}" for key in sorted(values))
    expected = hmac.new(
        _secret_key(bot_token, webapp=webapp),
        data_check_string.encode(),
        hashlib.sha256,
    ).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, and the received hash is untrusted.
    # "replace" maps unencodable characters to "?", which can never equal a
    # hexadecimal digest.
    if not hmac.compare_digest(expected.encode(), received_hash.encode("utf-8", "replace")):
        raise _unauthorized("Telegram auth invalid")


def verify_webapp_init_data(init_data: str, bot_token: str, max_age_seconds: int = 86400) -> dict:
    """Verify a Telegram WebApp ``initData`` query string and return its user.

    Args:
        init_data: The URL-encoded query string, including its ``hash`` field.
        bot_token: Bot token used to derive the signing key.
        max_age_seconds: Maximum accepted age of ``auth_date``; a falsy value
            disables the age check.

    Returns:
        The JSON-decoded content of the ``user`` field.

    Raises:
        HTTPException: 503 if ``bot_token`` is empty; 401 if the hash is
            missing or wrong, the data is expired or malformed, or the
            ``user`` field is absent.
    """
    _require_bot_token(bot_token)

    values = dict(parse_qsl(init_data, keep_blank_values=True))
    _check_signature_and_age(values, bot_token, max_age_seconds, webapp=True)

    user_raw = values.get("user")
    if not user_raw:
        raise _unauthorized("Telegram user missing")
    return json.loads(user_raw)


def verify_login_widget(payload: dict, bot_token: str, max_age_seconds: int = 86400) -> dict:
    """Verify a Telegram Login Widget payload.

    Args:
        payload: Mapping of the received fields, including ``hash``. Entries
            whose value is ``None`` are ignored.
        bot_token: Bot token used to derive the signing key.
        max_age_seconds: Maximum accepted age of ``auth_date``; a falsy value
            disables the age check.

    Returns:
        The verified fields: ``payload`` without ``hash`` and without
        ``None``-valued entries.

    Raises:
        HTTPException: 503 if ``bot_token`` is empty; 401 if the hash is
            missing or wrong, or the data is expired or malformed.
    """
    _require_bot_token(bot_token)

    values = {key: value for key, value in payload.items() if value is not None}
    _check_signature_and_age(values, bot_token, max_age_seconds, webapp=False)
    return values