Files
post_bot/handlers/new_post.py
Choi A.K. 86f7c65697
Some checks failed
continuous-integration/drone/push Build is failing
Merge branch 'main' into security
2025-09-07 14:22:04 +09:00

370 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# handlers/new_post.py
from __future__ import annotations

import logging
from typing import List, Optional, Tuple

from sqlalchemy import select as sa_select
from telegram import (
    Bot,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    MessageEntity,
    Update,
)
from telegram.constants import MessageEntityType
from telegram.error import BadRequest, TelegramError
from telegram.ext import (
    CallbackQueryHandler,
    CommandHandler,
    ContextTypes,
    ConversationHandler,
    MessageHandler,
    filters,
)

from db import AsyncSessionLocal
from models import Admin, Button, Channel, Group
from .permissions import (
    SCOPE_POST,
    get_or_create_admin,
    has_scope_on_channel,
    list_channels_for_admin,
)
# Conversation states, listed in the order the dialog moves through them.
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = 0, 1, 2
# ===== UTF-16 helpers (for custom_emoji entities) =====
def _utf16_units_len(s: str) -> int:
    """Return how many UTF-16 code units are needed to encode *s*.

    Characters outside the Basic Multilingual Plane (most emoji) take two
    code units; everything else takes one.
    """
    # The "-le" codec variant writes no byte-order mark, so the byte count is
    # exactly two bytes per code unit.
    encoded = s.encode("utf-16-le")
    return len(encoded) // 2
def _utf16_index_map(text: str) -> List[Tuple[int, int, str]]:
    """Map every character of *text* to its position in UTF-16 code units.

    Returns:
        One ``(offset, length, char)`` triple per character, in text order.
        ``offset`` is the character's starting position and ``length`` its
        width, both counted in UTF-16 code units.
    """
    widths = [_utf16_units_len(char) for char in text]
    index: List[Tuple[int, int, str]] = []
    position = 0
    for char, width in zip(text, widths):
        index.append((position, width, char))
        position += width
    return index
def _split_custom_emoji_by_utf16(text: str, entities: List[MessageEntity]) -> List[MessageEntity]:
    """Break multi-unit custom-emoji entities into one entity per character.

    A custom-emoji entity longer than one UTF-16 code unit is replaced by a
    separate custom-emoji entity (same ``custom_emoji_id``) for every
    character of *text* that starts inside the entity's range. All other
    entities are passed through untouched.

    Args:
        text: The message text the entity offsets refer to.
        entities: Entities attached to *text*; may be empty or ``None``.

    Returns:
        The resulting entities sorted by offset. When *text* or *entities*
        is empty, *entities* is returned as-is (or ``[]`` if it is falsy).
    """
    if not (text and entities):
        return entities or []

    char_index = _utf16_index_map(text)
    result: List[MessageEntity] = []
    for entity in entities:
        needs_split = (
            entity.type == MessageEntityType.CUSTOM_EMOJI
            and entity.length
            and entity.length > 1
            and getattr(entity, "custom_emoji_id", None)
        )
        if not needs_split:
            result.append(entity)
            continue

        first = entity.offset
        stop = first + entity.length
        # One replacement entity per character whose start lies in [first, stop).
        result.extend(
            MessageEntity(
                type=MessageEntityType.CUSTOM_EMOJI,
                offset=position,
                length=width,
                custom_emoji_id=entity.custom_emoji_id,
            )
            for position, width, _ in char_index
            if first <= position < stop
        )

    result.sort(key=lambda item: item.offset)
    return result
def _strip_broken_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]:
    """Drop entities that cannot be sent back and sort the rest by offset.

    An entity is discarded when its offset or length is missing, its offset
    is negative, its length is below one, or it is a custom emoji without a
    ``custom_emoji_id``.

    Args:
        entities: Entities to filter; ``None`` is treated as an empty list.

    Returns:
        A new list holding the surviving entities in ascending offset order.
    """

    def _is_usable(entity) -> bool:
        if entity.offset is None or entity.length is None:
            return False
        if entity.offset < 0 or entity.length < 1:
            return False
        if entity.type == MessageEntityType.CUSTOM_EMOJI:
            return bool(getattr(entity, "custom_emoji_id", None))
        return True

    return sorted(filter(_is_usable, entities or []), key=lambda entity: entity.offset)
def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]:
    """Pull the textual content and its entities out of a message.

    The message text takes precedence; the caption is used only when there
    is no text.

    Returns:
        ``(text, entities, is_caption)``. ``is_caption`` is ``True`` only when
        the content came from the caption. A message with neither text nor
        caption yields ``("", [], False)``.
    """
    body = getattr(msg, "text", None)
    if body:
        return body, msg.entities or [], False

    caption = getattr(msg, "caption", None)
    if caption:
        return caption, msg.caption_entities or [], True

    return "", [], False
# ===== Conversation =====
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point of the /new_post dialog: ask the user for media.

    Returns:
        ``SELECT_MEDIA`` after prompting, or ``ConversationHandler.END`` when
        the update carries no message to reply to.
    """
    message = update.message
    if not message:
        return ConversationHandler.END

    await message.reply_text("Отправьте медиа для поста или пришлите /skip:")
    return SELECT_MEDIA
async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the attachment for the post being drafted, then ask for its text.

    The file id of the first attachment found (checked in the order photo,
    animation, video, document, audio, voice, sticker) is saved in
    ``context.user_data`` under the attachment kind. Sending ``/skip`` stores
    nothing. Media keys left over from an earlier draft are always removed
    first, so a previous attachment can never leak into this post.

    Args:
        update: Incoming update; expected to carry a message.
        context: Handler context whose ``user_data`` holds the draft.

    Returns:
        ``SELECT_TEXT`` after prompting for the text, or
        ``ConversationHandler.END`` when there is no message or no per-user
        storage to keep the draft in.
    """
    msg = update.message
    if not msg:
        return ConversationHandler.END

    draft = context.user_data
    if draft is None:
        # user_data cannot be assigned to (the property rejects it), so
        # without per-user storage the draft has nowhere to live.
        return ConversationHandler.END

    media_kinds = ("photo", "animation", "video", "document", "audio", "voice", "sticker")

    # Forget attachments from a previous or abandoned draft.
    for kind in media_kinds:
        draft.pop(kind, None)

    skipped = bool(msg.text) and msg.text.strip().lower() == "/skip"
    if not skipped:
        for kind in media_kinds:
            attachment = getattr(msg, kind, None)
            if not attachment:
                continue
            # A photo arrives as a sequence of sizes; the last entry is used
            # (presumably the largest one).
            media = attachment[-1] if kind == "photo" else attachment
            draft[kind] = media.file_id
            break

    await msg.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):")
    return SELECT_TEXT
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the post text and entities, then offer the possible destinations.

    The text (or caption) and its cleaned-up entities are saved in
    ``context.user_data`` together with the source chat and message ids.
    The user is then shown an inline keyboard listing the channels returned
    by ``list_channels_for_admin`` for the current admin, followed by every
    group (groups are not access-controlled here).

    Args:
        update: Incoming update; expected to carry a message.
        context: Handler context whose ``user_data`` holds the draft.

    Returns:
        ``SELECT_TARGET`` once the keyboard has been sent, or
        ``ConversationHandler.END`` when there is no message, no user or
        per-user storage, or no destination to offer.
    """
    msg = update.message
    if not msg:
        return ConversationHandler.END

    draft = context.user_data
    user = update.effective_user
    if draft is None or user is None:
        # user_data cannot be assigned to, and without a user there is nobody
        # to resolve channel permissions for.
        return ConversationHandler.END

    text, entities, _ = _extract_text_and_entities(msg)
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    draft["text"] = text
    draft["entities"] = entities
    # Remember where the original message lives.
    draft["src_chat_id"] = msg.chat_id
    draft["src_msg_id"] = msg.message_id

    async with AsyncSessionLocal() as session:
        me = await get_or_create_admin(session, user.id)
        channels = await list_channels_for_admin(session, me.id)
        groups = (await session.execute(sa_select(Group))).scalars().all()

        # Build the rows while the session is open so ORM attributes are read
        # from attached objects.
        keyboard = [
            [InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]
            for c in channels
        ]
        keyboard.extend(
            [InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]
            for g in groups
        )

    if not keyboard:
        await msg.reply_text("Нет доступных каналов/групп для отправки.")
        return ConversationHandler.END

    await msg.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard))
    return SELECT_TARGET
def _parse_target_id(data: str) -> Optional[int]:
    """Return the integer after the first underscore of *data*, or None if malformed."""
    try:
        return int(data.split("_", 1)[1])
    except (IndexError, ValueError):
        return None


def _buttons_to_markup(buttons) -> Optional[InlineKeyboardMarkup]:
    """Build a one-URL-button-per-row keyboard, or None when there are no buttons."""
    if not buttons:
        return None
    rows = [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]
    return InlineKeyboardMarkup(rows)


async def _send_post(bot: Bot, chat_id: str, draft: dict, text: str, entities, markup):
    """Send the drafted post to *chat_id* and return the message that carries the keyboard."""
    captioned_kinds = ("photo", "animation", "video", "document", "audio", "voice")
    kind = next((k for k in captioned_kinds if k in draft), None)
    if kind is not None:
        # Each captioned kind maps onto bot.send_<kind>(<kind>=file_id, ...).
        send = getattr(bot, f"send_{kind}")
        return await send(
            chat_id=chat_id,
            caption=(text or None),
            caption_entities=(entities if text else None),
            reply_markup=markup,
            **{kind: draft[kind]},
        )

    if "sticker" in draft:
        # No caption is passed for stickers: the keyboard goes on the sticker
        # and the text follows as a separate message.
        sent = await bot.send_sticker(
            chat_id=chat_id,
            sticker=draft["sticker"],
            reply_markup=markup,
        )
        if text:
            await bot.send_message(chat_id=chat_id, text=text, entities=entities)
        return sent

    return await bot.send_message(
        chat_id=chat_id,
        text=text,
        entities=entities,
        reply_markup=markup,
    )


async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Publish the drafted post to the channel or group picked by the user.

    The callback data is expected to look like ``channel_<id>`` or
    ``group_<id>``. For channels the current admin must hold ``SCOPE_POST``
    on the channel. The destination's stored buttons are attached to the
    post as an inline URL keyboard. Entities are sent explicitly (no parse
    mode). The draft is removed from ``context.user_data`` once the send has
    been attempted, because the conversation ends either way.

    Args:
        update: Incoming update; expected to carry a callback query.
        context: Handler context providing the bot and the stored draft.

    Returns:
        Always ``ConversationHandler.END``.
    """
    query = update.callback_query
    if not query:
        return ConversationHandler.END
    await query.answer()
    data = query.data or ""
    log = logging.getLogger(__name__)

    chat_id: Optional[str] = None
    markup: Optional[InlineKeyboardMarkup] = None
    selected_title: Optional[str] = None
    button_count = 0

    # The session is only needed for the lookups; every value used later is
    # copied out so it is closed before the Telegram calls.
    async with AsyncSessionLocal() as session:
        if data.startswith('channel_'):
            channel_id = _parse_target_id(data)
            if channel_id is None:
                await query.edit_message_text("Канал не найден.")
                return ConversationHandler.END

            # ACL: the caller needs the posting scope on this channel.
            user = update.effective_user
            allowed = False
            if user is not None:
                me = await get_or_create_admin(session, user.id)
                allowed = await has_scope_on_channel(session, me.id, channel_id, SCOPE_POST)
            if not allowed:
                await query.edit_message_text("У вас нет права постить в этот канал.")
                return ConversationHandler.END

            channel = (
                await session.execute(sa_select(Channel).where(Channel.id == channel_id))
            ).scalar_one_or_none()
            if not channel:
                await query.edit_message_text("Канал не найден.")
                return ConversationHandler.END

            chat_id = (channel.link or "").strip()
            selected_title = channel.name
            btns = (
                await session.execute(sa_select(Button).where(Button.channel_id == channel_id))
            ).scalars().all()
            button_count = len(btns)
            markup = _buttons_to_markup(btns)

        elif data.startswith('group_'):
            group_id = _parse_target_id(data)
            group = None
            if group_id is not None:
                group = (
                    await session.execute(sa_select(Group).where(Group.id == group_id))
                ).scalar_one_or_none()
            if not group:
                await query.edit_message_text("Группа не найдена.")
                return ConversationHandler.END

            chat_id = (group.link or "").strip()
            selected_title = group.name
            btns = (
                await session.execute(sa_select(Button).where(Button.group_id == group_id))
            ).scalars().all()
            button_count = len(btns)
            markup = _buttons_to_markup(btns)

    if not chat_id or not chat_id.startswith(('@', '-')):
        await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)')
        return ConversationHandler.END

    log.debug(
        "send -> chat_id=%s title=%r buttons=%d has_markup=%s",
        chat_id, selected_title, button_count, bool(markup),
    )

    ud = context.user_data or {}
    text: str = ud.get("text", "") or ""
    entities: List[MessageEntity] = ud.get("entities", []) or []
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    # Always send manually (send_*) so the inline keyboard is attached.
    sent_msg = None
    error_text: Optional[str] = None
    try:
        sent_msg = await _send_post(context.bot, chat_id, ud, text, entities, markup)
    except TelegramError as e:
        # TelegramError also covers Forbidden and network failures, not just
        # BadRequest, so the user always gets feedback.
        error_text = f'Ошибка отправки поста: {e}'

    # The conversation ends here in both outcomes; drop the draft so nothing
    # leaks into the next post.
    for key in (
        "photo", "animation", "video", "document", "audio", "voice", "sticker",
        "text", "entities", "src_chat_id", "src_msg_id",
    ):
        ud.pop(key, None)

    if error_text is not None:
        await query.edit_message_text(error_text)
        return ConversationHandler.END

    # Safety net: re-attach the keyboard only if the sent message came back
    # without one (re-sending an identical keyboard would presumably be
    # rejected as "message is not modified").
    if (
        markup
        and getattr(sent_msg, "message_id", None)
        and getattr(sent_msg, "reply_markup", None) is None
    ):
        try:
            await context.bot.edit_message_reply_markup(
                chat_id=chat_id,
                message_id=sent_msg.message_id,
                reply_markup=markup,
            )
        except Exception:
            # Best effort: the post itself is already published.
            log.warning("Could not re-attach inline keyboard in %s", chat_id, exc_info=True)

    await query.edit_message_text(f'Пост отправлен{(" в: " + selected_title) if selected_title else "!"}')
    return ConversationHandler.END
# Media step: any supported attachment, or a command (this is how "/skip"
# reaches select_media).
_media_step_filter = (
    filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL
    | filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND
)
# Text step: plain text, a forwarded message, or anything with a caption.
_text_step_filter = filters.TEXT | filters.FORWARDED | filters.CAPTION

# /new_post dialog: media -> text -> destination. No fallbacks are registered.
new_post_conv = ConversationHandler(
    entry_points=[CommandHandler("new_post", new_post_start)],
    states={
        SELECT_MEDIA: [MessageHandler(_media_step_filter, select_media)],
        SELECT_TEXT: [MessageHandler(_text_step_filter, select_text)],
        SELECT_TARGET: [CallbackQueryHandler(select_target)],
    },
    fallbacks=[],
)