add telegram auth and reminders foundation
This commit is contained in:
49
app/services/telegram_auth.py
Normal file
49
app/services/telegram_auth.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import time
|
||||
from urllib.parse import parse_qsl
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
|
||||
def _secret_key(bot_token: str, *, webapp: bool) -> bytes:
|
||||
if webapp:
|
||||
return hmac.new(b"WebAppData", bot_token.encode(), hashlib.sha256).digest()
|
||||
return hashlib.sha256(bot_token.encode()).digest()
|
||||
|
||||
|
||||
def verify_webapp_init_data(init_data: str, bot_token: str, max_age_seconds: int = 86400) -> dict:
|
||||
values = dict(parse_qsl(init_data, keep_blank_values=True))
|
||||
received_hash = values.pop("hash", "")
|
||||
if not received_hash:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram hash missing")
|
||||
auth_date = int(values.get("auth_date") or 0)
|
||||
if max_age_seconds and auth_date and time.time() - auth_date > max_age_seconds:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth expired")
|
||||
|
||||
data_check_string = "\n".join(f"{key}={values[key]}" for key in sorted(values))
|
||||
expected = hmac.new(_secret_key(bot_token, webapp=True), data_check_string.encode(), hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(expected, received_hash):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth invalid")
|
||||
|
||||
user_raw = values.get("user")
|
||||
if not user_raw:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram user missing")
|
||||
return json.loads(user_raw)
|
||||
|
||||
|
||||
def verify_login_widget(payload: dict, bot_token: str, max_age_seconds: int = 86400) -> dict:
|
||||
values = {key: value for key, value in payload.items() if value is not None}
|
||||
received_hash = str(values.pop("hash", ""))
|
||||
if not received_hash:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram hash missing")
|
||||
auth_date = int(values.get("auth_date") or 0)
|
||||
if max_age_seconds and auth_date and time.time() - auth_date > max_age_seconds:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth expired")
|
||||
|
||||
data_check_string = "\n".join(f"{key}={values[key]}" for key in sorted(values))
|
||||
expected = hmac.new(_secret_key(bot_token, webapp=False), data_check_string.encode(), hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(expected, received_hash):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Telegram auth invalid")
|
||||
return values
|
||||
Reference in New Issue
Block a user