# handlers/new_post.py from __future__ import annotations from typing import List, Optional, Tuple from telegram import ( Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity, Bot ) from telegram.ext import ( ContextTypes, ConversationHandler, MessageHandler, CommandHandler, CallbackQueryHandler, filters ) from telegram.constants import MessageEntityType from telegram.error import BadRequest from sqlalchemy import select as sa_select from db import AsyncSessionLocal from models import Channel, Group from .permissions import get_or_create_admin, list_channels_for_admin, has_scope_on_channel, SCOPE_POST from models import Channel, Group, Button SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3) # ===== UTF-16 helpers (для custom_emoji) ===== def _utf16_units_len(s: str) -> int: return len(s.encode("utf-16-le")) // 2 def _utf16_index_map(text: str) -> List[Tuple[int, int, str]]: out: List[Tuple[int, int, str]] = [] off = 0 for ch in text: ln = _utf16_units_len(ch) out.append((off, ln, ch)) off += ln return out def _split_custom_emoji_by_utf16(text: str, entities: List[MessageEntity]) -> List[MessageEntity]: if not text or not entities: return entities or [] map_utf16 = _utf16_index_map(text) out: List[MessageEntity] = [] for e in entities: if (e.type == MessageEntityType.CUSTOM_EMOJI and e.length and e.length > 1 and getattr(e, "custom_emoji_id", None)): start = e.offset end = e.offset + e.length for uoff, ulen, _ in map_utf16: if start <= uoff < end: out.append(MessageEntity( type=MessageEntityType.CUSTOM_EMOJI, offset=uoff, length=ulen, custom_emoji_id=e.custom_emoji_id, )) else: out.append(e) out.sort(key=lambda x: x.offset) return out def _strip_broken_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]: cleaned: List[MessageEntity] = [] for e in entities or []: if e.offset is None or e.length is None or e.offset < 0 or e.length < 1: continue if e.type == MessageEntityType.CUSTOM_EMOJI and not getattr(e, "custom_emoji_id", None): continue cleaned.append(e) cleaned.sort(key=lambda x: x.offset) return cleaned def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]: if getattr(msg, "text", None): return msg.text, (msg.entities or []), False if getattr(msg, "caption", None): return msg.caption, (msg.caption_entities or []), True return "", [], False # ===== Conversation ===== async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE): if update.message: await update.message.reply_text("Отправьте медиа для поста или пришлите /skip:") return SELECT_MEDIA return ConversationHandler.END async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE): if context.user_data is None: context.user_data = {} if not update.message: return ConversationHandler.END msg = update.message if msg.text and msg.text.strip().lower() == "/skip": await update.message.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):") return SELECT_TEXT if msg.photo: context.user_data["photo"] = msg.photo[-1].file_id elif msg.animation:context.user_data["animation"] = msg.animation.file_id elif msg.video: context.user_data["video"] = msg.video.file_id elif msg.document: context.user_data["document"] = msg.document.file_id elif msg.audio: context.user_data["audio"] = msg.audio.file_id elif msg.voice: context.user_data["voice"] = msg.voice.file_id elif msg.sticker: context.user_data["sticker"] = msg.sticker.file_id await update.message.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):") return SELECT_TEXT async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE): if not update.message: return ConversationHandler.END if context.user_data is None: context.user_data = {} msg = update.message text, entities, _ = _extract_text_and_entities(msg) entities = _strip_broken_entities(entities) entities = _split_custom_emoji_by_utf16(text, entities) # сохраним исходник для copyMessage context.user_data["text"] = text context.user_data["entities"] = entities context.user_data["src_chat_id"] = update.effective_chat.id context.user_data["src_msg_id"] = update.message.message_id # дать выбор только тех каналов, где у текущего админа есть право постинга async with AsyncSessionLocal() as session: me = await get_or_create_admin(session, update.effective_user.id) channels = await list_channels_for_admin(session, me.id) # группы оставляем без ACL (как было) groups = (await session.execute(sa_select(Group))).scalars().all() # если каналов нет — всё равно покажем группы keyboard = [] for c in channels: keyboard.append([InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]) for g in groups: keyboard.append([InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]) if not keyboard: await update.message.reply_text("Нет доступных каналов/групп для отправки.") return ConversationHandler.END await update.message.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard)) return SELECT_TARGET # async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE): # query = update.callback_query # if not query: # return ConversationHandler.END # await query.answer() # data = query.data # async with AsyncSessionLocal() as session: # chat_id = None # markup = None # selected_kind = None # "channel" | "group" # selected_title = None # if data and data.startswith('channel_'): # selected_kind = "channel" # channel_id = int(data.split('_')[1]) # # ACL: право постинга # me = await get_or_create_admin(session, update.effective_user.id) # allowed = await has_scope_on_channel(session, me.id, channel_id, SCOPE_POST) # if not allowed: # await query.edit_message_text("У вас нет права постить в этот канал.") # return ConversationHandler.END # channel = (await session.execute(sa_select(Channel).where(Channel.id == channel_id))).scalar_one_or_none() # if not channel: # await query.edit_message_text("Канал не найден.") # return ConversationHandler.END # chat_id = (channel.link or "").strip() # selected_title = channel.name # # кнопки канала # btns = (await session.execute(sa_select(Button).where(Button.channel_id == channel_id))).scalars().all() # if btns: # markup = InlineKeyboardMarkup( # [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns] # ) # elif data and data.startswith('group_'): # selected_kind = "group" # group_id = int(data.split('_')[1]) # group = (await session.execute(sa_select(Group).where(Group.id == group_id))).scalar_one_or_none() # if not group: # await query.edit_message_text("Группа не найдена.") # return ConversationHandler.END # chat_id = (group.link or "").strip() # selected_title = group.name # # кнопки группы # btns = (await session.execute(sa_select(Button).where(Button.group_id == group_id))).scalars().all() # if btns: # markup = InlineKeyboardMarkup( # [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns] # ) # if not chat_id or not (chat_id.startswith('@') or chat_id.startswith('-')): # await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)') # return ConversationHandler.END # ud = context.user_data or {} # text: str = ud.get("text", "") or "" # entities: List[MessageEntity] = ud.get("entities", []) or [] # # санация для custom_emoji # entities = _strip_broken_entities(entities) # entities = _split_custom_emoji_by_utf16(text, entities) # # если есть клавиатура, ПРЕДПОЧИТАЕМ отправку "вручную" (а не copyMessage), # # т.к. в некоторых кейсах клавиатура при копировании может не приклеиться # prefer_manual_send = markup is not None # # определим, были ли “части медиа” # has_media_parts = any(k in ud for k in ("photo","animation","video","document","audio","voice","sticker")) # try: # if not prefer_manual_send and ud.get("src_chat_id") and ud.get("src_msg_id") and not has_media_parts: # # 1) copyMessage без медиа и без клавиатуры — максимальная идентичность # msg_id_obj = await context.bot.copy_message( # chat_id=chat_id, # from_chat_id=ud["src_chat_id"], # message_id=ud["src_msg_id"], # reply_markup=markup # если вдруг есть, попробуем сразу # ) # # страховка: если клавиатуры нет/не приклеилась — прикрутим edit_message_reply_markup # if markup and getattr(msg_id_obj, "message_id", None): # try: # await context.bot.edit_message_reply_markup( # chat_id=chat_id, # message_id=msg_id_obj.message_id, # reply_markup=markup # ) # except Exception: # pass # await query.edit_message_text(f'Пост отправлен{" в: " + selected_title if selected_title else "!"}') # return ConversationHandler.END # # 2) fallback / manual — ВСЕГДА прикладываем reply_markup # sent = False # if "photo" in ud: # await context.bot.send_photo(chat_id=chat_id, photo=ud["photo"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "animation" in ud: # await context.bot.send_animation(chat_id=chat_id, animation=ud["animation"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "video" in ud: # await context.bot.send_video(chat_id=chat_id, video=ud["video"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "document" in ud: # await context.bot.send_document(chat_id=chat_id, document=ud["document"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "audio" in ud: # await context.bot.send_audio(chat_id=chat_id, audio=ud["audio"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "voice" in ud: # await context.bot.send_voice(chat_id=chat_id, voice=ud["voice"], # caption=(text or None), caption_entities=(entities if text else None), # reply_markup=markup) # sent = True # elif "sticker" in ud: # await context.bot.send_sticker(chat_id=chat_id, sticker=ud["sticker"], reply_markup=markup) # if text: # await context.bot.send_message(chat_id=chat_id, text=text, entities=entities) # sent = True # else: # await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup) # sent = True # await query.edit_message_text(f'Пост отправлен{" в: " + selected_title if selected_title else "!"}' if sent else 'Ошибка: не удалось отправить сообщение.') # except BadRequest as e: # await query.edit_message_text(f'Ошибка отправки поста: {e}') # return ConversationHandler.END async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query if not query: return ConversationHandler.END await query.answer() data = query.data async with AsyncSessionLocal() as session: chat_id = None markup = None selected_title = None if data and data.startswith('channel_'): channel_id = int(data.split('_')[1]) # ACL me = await get_or_create_admin(session, update.effective_user.id) if not await has_scope_on_channel(session, me.id, channel_id, SCOPE_POST): await query.edit_message_text("У вас нет права постить в этот канал.") return ConversationHandler.END channel = (await session.execute(sa_select(Channel).where(Channel.id == channel_id))).scalar_one_or_none() if not channel: await query.edit_message_text("Канал не найден.") return ConversationHandler.END chat_id = (channel.link or "").strip() selected_title = channel.name # КНОПКИ btns = (await session.execute(sa_select(Button).where(Button.channel_id == channel_id))).scalars().all() if btns: markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns]) elif data and data.startswith('group_'): group_id = int(data.split('_')[1]) group = (await session.execute(sa_select(Group).where(Group.id == group_id))).scalar_one_or_none() if not group: await query.edit_message_text("Группа не найдена.") return ConversationHandler.END chat_id = (group.link or "").strip() selected_title = group.name btns = (await session.execute(sa_select(Button).where(Button.group_id == group_id))).scalars().all() if btns: markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns]) if not chat_id or not (chat_id.startswith('@') or chat_id.startswith('-')): await query.edit_message_text('Ошибка: ссылка должна быть @username или -100...') return ConversationHandler.END ud = context.user_data or {} text: str = ud.get("text", "") or "" entities: List[MessageEntity] = ud.get("entities", []) or [] # санация custom_emoji/UTF-16 entities = _strip_broken_entities(entities) entities = _split_custom_emoji_by_utf16(text, entities) # >>> ВАЖНО: НИКАКОГО copyMessage. Всегда manual send с reply_markup <<< try: sent = False if "photo" in ud: await context.bot.send_photo(chat_id=chat_id, photo=ud["photo"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "animation" in ud: await context.bot.send_animation(chat_id=chat_id, animation=ud["animation"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "video" in ud: await context.bot.send_video(chat_id=chat_id, video=ud["video"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "document" in ud: await context.bot.send_document(chat_id=chat_id, document=ud["document"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "audio" in ud: await context.bot.send_audio(chat_id=chat_id, audio=ud["audio"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "voice" in ud: await context.bot.send_voice(chat_id=chat_id, voice=ud["voice"], caption=(text or None), caption_entities=(entities if text else None), reply_markup=markup) sent = True elif "sticker" in ud: await context.bot.send_sticker(chat_id=chat_id, sticker=ud["sticker"], reply_markup=markup) if text: await context.bot.send_message(chat_id=chat_id, text=text, entities=entities) sent = True else: await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup) sent = True await query.edit_message_text(f'Пост отправлен{(" в: " + selected_title) if selected_title else "!"}' if sent else 'Ошибка: не удалось отправить сообщение.') except BadRequest as e: await query.edit_message_text(f'Ошибка отправки поста: {e}') return ConversationHandler.END new_post_conv = ConversationHandler( entry_points=[CommandHandler("new_post", new_post_start)], states={ SELECT_MEDIA: [MessageHandler( filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL | filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND, select_media )], SELECT_TEXT: [MessageHandler(filters.TEXT | filters.FORWARDED | filters.CAPTION, select_text)], SELECT_TARGET: [CallbackQueryHandler(select_target)], }, fallbacks=[], )