69 lines
2.5 KiB
Python
69 lines
2.5 KiB
Python
# permissions.py
|
||
import hashlib, secrets
|
||
from datetime import datetime, timedelta
|
||
from sqlalchemy import select
|
||
from models import Admin, Channel, ChannelAccess, SCOPE_POST, SCOPE_SHARE
|
||
from sqlalchemy.exc import OperationalError
|
||
|
||
|
||
|
||
def make_token(nbytes: int = 9) -> str:
    """Generate a short, random, URL-safe text token.

    Args:
        nbytes: Number of random bytes passed to ``secrets.token_urlsafe``.
            The result is Base64-encoded text, so it is longer than
            ``nbytes`` (the default of 9 bytes yields 12 characters).

    Returns:
        The random URL-safe token.
    """
    # Kept short on purpose: a token of up to roughly 12-16 characters
    # fits into a /start payload.
    token = secrets.token_urlsafe(nbytes)
    return token
|
||
|
||
def token_hash(token: str) -> str:
    """Return the SHA-256 digest of ``token`` as a hexadecimal string.

    Args:
        token: Text to hash; it is encoded as UTF-8 before hashing.

    Returns:
        The 64-character lowercase hex digest.
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode("utf-8"))
    return hasher.hexdigest()
|
||
|
||
async def get_or_create_admin(session, tg_id: int) -> Admin:
    """Return the Admin with the given ``tg_id``, creating it if missing.

    Args:
        session: Session object whose ``execute`` and ``flush`` are awaited
            (presumably a SQLAlchemy ``AsyncSession`` -- confirm with callers).
        tg_id: Value matched against ``Admin.tg_id``.

    Returns:
        The existing Admin, or a newly created one that has been added to
        the session and flushed. No commit is issued here.
    """
    lookup = select(Admin).where(Admin.tg_id == tg_id)
    existing = (await session.execute(lookup)).scalar_one_or_none()
    if existing:
        return existing

    created = Admin(tg_id=tg_id)
    session.add(created)
    # Flush without committing; presumably so database-generated fields
    # (such as the primary key) are populated before the caller uses the row.
    await session.flush()
    return created
|
||
|
||
async def has_scope_on_channel(session, admin_id: int, channel_id: int, scope: int) -> bool:
    """Check whether an admin holds the requested scope bits on a channel.

    Args:
        session: Session object whose ``execute`` is awaited.
        admin_id: Admin to check; compared with ``Channel.admin_id`` and
            ``ChannelAccess.invited_admin_id``.
        channel_id: Channel to check.
        scope: Bitmask of required scopes; every bit must be granted.

    Returns:
        True if the admin owns the channel, or has an active ChannelAccess
        row whose ``scopes`` contain all bits of ``scope``; otherwise False.

    Note:
        Both lookups use ``scalar_one_or_none()``, so more than one matching
        row makes the call raise rather than return a result.
    """
    channel_query = select(Channel).where(Channel.id == channel_id)
    channel = (await session.execute(channel_query)).scalar_one_or_none()
    # The channel owner always has full access.
    if channel and channel.admin_id == admin_id:
        return True

    # Otherwise look for an active access grant carrying the needed mask.
    grant_query = select(ChannelAccess).where(
        ChannelAccess.channel_id == channel_id,
        ChannelAccess.invited_admin_id == admin_id,
        ChannelAccess.status == "active",
    )
    grant = (await session.execute(grant_query)).scalar_one_or_none()
    if grant:
        return (grant.scopes & scope) == scope
    return False
|
||
|
||
async def list_channels_for_admin(session, admin_id: int):
    """List the channels an admin owns plus those shared with post rights.

    Args:
        session: Session object whose ``execute`` is awaited.
        admin_id: Admin whose channels are listed.

    Returns:
        A list of Channel objects. Owned channels come first, followed by
        channels shared through an active ChannelAccess row whose ``scopes``
        include ``SCOPE_POST``. Each channel id appears once. When there are
        no such shared channels, or the access query raises
        ``OperationalError``, only the owned channels are returned.
    """
    owned_result = await session.execute(
        select(Channel).where(Channel.admin_id == admin_id)
    )
    owned = owned_result.scalars().all()

    grants_query = select(ChannelAccess).where(
        ChannelAccess.invited_admin_id == admin_id,
        ChannelAccess.status == "active",
    )
    try:
        grants = (await session.execute(grants_query)).scalars().all()
    except OperationalError:
        # The access table does not exist yet -- just return owned channels.
        return owned

    # Collect the channels on which the admin has been granted posting.
    postable_ids = set()
    for grant in grants:
        if grant.scopes & SCOPE_POST:
            postable_ids.add(grant.channel_id)
    if not postable_ids:
        return owned

    shared_result = await session.execute(
        select(Channel).where(Channel.id.in_(postable_ids))
    )
    # Key by id so a channel that is both owned and shared is listed once;
    # dict insertion order keeps owned channels ahead of shared ones.
    merged = {channel.id: channel for channel in owned}
    merged.update(
        (channel.id, channel) for channel in shared_result.scalars().all()
    )
    return list(merged.values())
|