Files
post_bot/handlers/permissions.py
Choi A.K. 18f91bbd40
Some checks failed
continuous-integration/drone/push Build is failing
script update
2025-09-06 14:04:18 +09:00

69 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# permissions.py
import hashlib, secrets
from datetime import datetime, timedelta
from sqlalchemy import select
from models import Admin, Channel, ChannelAccess, SCOPE_POST, SCOPE_SHARE
from sqlalchemy.exc import OperationalError
def make_token(nbytes: int = 9) -> str:
    """Return a random URL-safe text token built from *nbytes* random bytes.

    The default is deliberately short: 9 bytes encode to 12 characters, which
    keeps the token small enough to be carried in a ``/start`` payload.
    """
    token = secrets.token_urlsafe(nbytes)
    return token
def token_hash(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of *token* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()
async def get_or_create_admin(session, tg_id: int) -> Admin:
    """Return the ``Admin`` row for *tg_id*, creating it when none exists.

    Args:
        session: Async SQLAlchemy session; ``execute`` and ``flush`` are awaited.
        tg_id: Value matched against ``Admin.tg_id`` (presumably the Telegram
            user id).

    Returns:
        The existing ``Admin``, or a newly added one. A new row is flushed but
        not committed here; committing is left to the caller.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If several rows share *tg_id*.
    """
    result = await session.execute(select(Admin).where(Admin.tg_id == tg_id))
    admin = result.scalar_one_or_none()
    # Compare against None explicitly: only "no row found" should trigger
    # creation, regardless of how an Admin instance evaluates as a boolean.
    if admin is None:
        admin = Admin(tg_id=tg_id)
        session.add(admin)
        # Flush so the INSERT is emitted now (presumably so database-generated
        # values such as the primary key are available to the caller).
        await session.flush()
    return admin
async def has_scope_on_channel(session, admin_id: int, channel_id: int, scope: int) -> bool:
    """Tell whether *admin_id* holds every bit of *scope* on *channel_id*.

    The channel owner always has full access. Anyone else needs an
    ``"active"`` ``ChannelAccess`` row whose ``scopes`` bitmask contains all
    bits of *scope*.

    Args:
        session: Async SQLAlchemy session.
        admin_id: Compared with ``Channel.admin_id`` and
            ``ChannelAccess.invited_admin_id``.
        channel_id: Compared with ``Channel.id`` and
            ``ChannelAccess.channel_id``.
        scope: Bitmask of required permissions.

    Returns:
        ``True`` for the owner or when an active grant covers the whole mask,
        ``False`` otherwise.
    """
    # The channel owner always has full access.
    result = await session.execute(select(Channel).where(Channel.id == channel_id))
    channel = result.scalar_one_or_none()
    if channel is not None and channel.admin_id == admin_id:
        return True

    # Otherwise look for an active grant carrying the requested mask.
    result = await session.execute(
        select(ChannelAccess).where(
            ChannelAccess.channel_id == channel_id,
            ChannelAccess.invited_admin_id == admin_id,
            ChannelAccess.status == "active",
        )
    )
    # Inspect every matching row instead of calling scalar_one_or_none():
    # that call raises MultipleResultsFound as soon as two active grants exist
    # for the same channel/admin pair, turning a permission check into a crash.
    grants = result.scalars().all()
    return any((grant.scopes & scope) == scope for grant in grants)
async def list_channels_for_admin(session, admin_id: int):
    """Return the channels *admin_id* owns plus shared channels it may post to.

    A shared channel is included when an ``"active"`` ``ChannelAccess`` row
    for *admin_id* has the ``SCOPE_POST`` bit set in ``scopes``.

    Args:
        session: Async SQLAlchemy session.
        admin_id: Compared with ``Channel.admin_id`` and
            ``ChannelAccess.invited_admin_id``.

    Returns:
        ``Channel`` objects without duplicates: owned channels first, followed
        by shared channels that are not also owned. Only the owned channels
        are returned when there are no postable grants or when the access
        query raises ``OperationalError``.
    """
    owned_result = await session.execute(
        select(Channel).where(Channel.admin_id == admin_id)
    )
    owned = owned_result.scalars().all()

    try:
        access_result = await session.execute(
            select(ChannelAccess).where(
                ChannelAccess.invited_admin_id == admin_id,
                ChannelAccess.status == "active",
            )
        )
        grants = access_result.scalars().all()
    except OperationalError:
        # Treated as "the access table does not exist yet": fall back to the
        # channels the admin owns.
        return owned

    postable_ids = {g.channel_id for g in grants if g.scopes & SCOPE_POST}
    if not postable_ids:
        return owned

    # Hand in_() a list: that is the documented argument form, whereas
    # acceptance of a set may vary with the SQLAlchemy version in use.
    # NOTE(review): confirm against the pinned SQLAlchemy version.
    shared_result = await session.execute(
        select(Channel).where(Channel.id.in_(list(postable_ids)))
    )
    shared = shared_result.scalars().all()

    # Key by id so a channel that is both owned and shared appears only once.
    channels_by_id = {c.id: c for c in owned}
    channels_by_id.update({c.id: c for c in shared})
    return list(channels_by_id.values())