# permissions.py import hashlib, secrets from datetime import datetime, timedelta from sqlalchemy import select from models import Admin, Channel, ChannelAccess, SCOPE_POST, SCOPE_SHARE def make_token(nbytes: int = 9) -> str: # Короткий URL-safe токен (<= ~12-16 символов укладывается в /start payload) return secrets.token_urlsafe(nbytes) def token_hash(token: str) -> str: return hashlib.sha256(token.encode('utf-8')).hexdigest() async def get_or_create_admin(session, tg_id: int) -> Admin: res = await session.execute(select(Admin).where(Admin.tg_id == tg_id)) admin = res.scalar_one_or_none() if not admin: admin = Admin(tg_id=tg_id) session.add(admin) await session.flush() return admin async def has_scope_on_channel(session, admin_id: int, channel_id: int, scope: int) -> bool: # Владелец канала — всегда полный доступ res = await session.execute(select(Channel).where(Channel.id == channel_id)) ch = res.scalar_one_or_none() if ch and ch.admin_id == admin_id: return True # Иначе ищем активный доступ с нужной маской res = await session.execute( select(ChannelAccess).where( ChannelAccess.channel_id == channel_id, ChannelAccess.invited_admin_id == admin_id, ChannelAccess.status == "active", ) ) acc = res.scalar_one_or_none() if not acc: return False return (acc.scopes & scope) == scope async def list_channels_for_admin(session, admin_id: int): """Каналы, куда можно постить: владелец + активные доступы с SCOPE_POST.""" # Владелец q1 = await session.execute(select(Channel).where(Channel.admin_id == admin_id)) owned = q1.scalars().all() # Доступы q2 = await session.execute( select(ChannelAccess).where( ChannelAccess.invited_admin_id == admin_id, ChannelAccess.status == "active", ) ) access_rows = q2.scalars().all() access_map = {ar.channel_id for ar in access_rows if (ar.scopes & SCOPE_POST)} if not access_map: return owned q3 = await session.execute(select(Channel).where(Channel.id.in_(access_map))) shared = q3.scalars().all() # Уникальный список (owner + shared) all_channels = {c.id: c for c in owned} for c in shared: all_channels[c.id] = c return list(all_channels.values())