50 lines
2.4 KiB
Python
50 lines
2.4 KiB
Python
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from urllib.parse import parse_qsl
|
|
|
|
from fastapi import HTTPException, status
|
|
|
|
|
|
def _secret_key(bot_token: str, *, webapp: bool) -> bytes:
|
|
if webapp:
|
|
return hmac.new(b"WebAppData", bot_token.encode(), hashlib.sha256).digest()
|
|
return hashlib.sha256(bot_token.encode()).digest()
|
|
|
|
|
|
def verify_webapp_init_data(init_data: str, bot_token: str, max_age_seconds: int = 86400) -> dict:
|
|
values = dict(parse_qsl(init_data, keep_blank_values=True))
|
|
received_hash = values.pop("hash", "")
|
|
if not received_hash:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram hash missing")
|
|
auth_date = int(values.get("auth_date") or 0)
|
|
if max_age_seconds and auth_date and time.time() - auth_date > max_age_seconds:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth expired")
|
|
|
|
data_check_string = "\n".join(f"{key}={values[key]}" for key in sorted(values))
|
|
expected = hmac.new(_secret_key(bot_token, webapp=True), data_check_string.encode(), hashlib.sha256).hexdigest()
|
|
if not hmac.compare_digest(expected, received_hash):
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth invalid")
|
|
|
|
user_raw = values.get("user")
|
|
if not user_raw:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram user missing")
|
|
return json.loads(user_raw)
|
|
|
|
|
|
def verify_login_widget(payload: dict, bot_token: str, max_age_seconds: int = 86400) -> dict:
|
|
values = {key: value for key, value in payload.items() if value is not None}
|
|
received_hash = str(values.pop("hash", ""))
|
|
if not received_hash:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram hash missing")
|
|
auth_date = int(values.get("auth_date") or 0)
|
|
if max_age_seconds and auth_date and time.time() - auth_date > max_age_seconds:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth expired")
|
|
|
|
data_check_string = "\n".join(f"{key}={values[key]}" for key in sorted(values))
|
|
expected = hmac.new(_secret_key(bot_token, webapp=False), data_check_string.encode(), hashlib.sha256).hexdigest()
|
|
if not hmac.compare_digest(expected, received_hash):
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth invalid")
|
|
return values
|