from typing import List, Optional from telegram import ( Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity, ) from telegram.ext import ( ContextTypes, ConversationHandler, MessageHandler, CommandHandler, CallbackQueryHandler, filters, ) from telegram.constants import MessageEntityType from sqlalchemy import select as sa_select from db import AsyncSessionLocal from models import Channel, Group, Button SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3) # ========== UTF-16 helpers ========== def _utf16_units_len(s: str) -> int: """Длина строки в UTF-16 code units (LE).""" return len(s.encode("utf-16-le")) // 2 def _normalize_custom_emoji_entities(text: str, entities: List[MessageEntity]) -> List[MessageEntity]: """ Для entity типа CUSTOM_EMOJI Telegram ожидает offset/length в UTF-16. Если вдруг кастомная эмодзи пришла с length > 1 (несколько символов), дробим на несколько entity, каждая длиной в реальное кол-во UTF-16 юнитов (обычно 2 на эмодзи). Остальные entity не трогаем (у них уже корректные UTF-16 offset/length). """ if not text or not entities: return entities or [] out: List[MessageEntity] = [] for e in entities: if ( e.type == MessageEntityType.CUSTOM_EMOJI and e.length is not None and e.length > 1 and getattr(e, "custom_emoji_id", None) ): # Возьмём срез по codepoints — нормально, но считать длины будем в UTF-16 substr = text[e.offset: e.offset + e.length] rel_utf16 = 0 for ch in substr: ch_len16 = _utf16_units_len(ch) # чаще всего 2 out.append( MessageEntity( type=MessageEntityType.CUSTOM_EMOJI, offset=e.offset + rel_utf16, length=ch_len16, custom_emoji_id=e.custom_emoji_id, url=None, user=None, language=None ) ) rel_utf16 += ch_len16 else: out.append(e) return out def _strip_none_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]: """ Фильтрация мусорных/пустых entity, чтобы не падать на сервере. """ cleaned: List[MessageEntity] = [] for e in entities or []: if e.offset is None or e.length is None or e.offset < 0 or e.length < 1: continue if e.type == MessageEntityType.CUSTOM_EMOJI: if not getattr(e, "custom_emoji_id", None): continue cleaned.append(e) return cleaned def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]: """ Унифицированно достаём текст и entities из сообщения: - Если есть text -> берём text/entities, is_caption=False - Иначе если есть caption -> берём caption/caption_entities, is_caption=True - Иначе пусто """ if getattr(msg, "text", None): return msg.text, (msg.entities or []), False if getattr(msg, "caption", None): return msg.caption, (msg.caption_entities or []), True return "", [], False # ========== Conversation handlers ========== async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE): if update.message: await update.message.reply_text("Отправьте картинку/медиа для поста или пришлите /skip:") return SELECT_MEDIA return ConversationHandler.END async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE): if context.user_data is None: context.user_data = {} if not update.message: return ConversationHandler.END msg = update.message # Поддержка пропуска медиа if msg.text and msg.text.strip().lower() == "/skip": await update.message.reply_text("Введите текст поста или пересланное сообщение (можно с эмодзи/разметкой):") return SELECT_TEXT # Сохраняем медиа (последний вариант — стикер) if msg.photo: context.user_data["photo"] = msg.photo[-1].file_id elif msg.animation: context.user_data["animation"] = msg.animation.file_id elif msg.video: context.user_data["video"] = msg.video.file_id elif msg.document: context.user_data["document"] = msg.document.file_id elif msg.audio: context.user_data["audio"] = msg.audio.file_id elif msg.voice: context.user_data["voice"] = msg.voice.file_id elif msg.sticker: context.user_data["sticker"] = msg.sticker.file_id await update.message.reply_text("Введите текст поста или пересланное сообщение (можно с эмодзи/разметкой):") return SELECT_TEXT async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE): if not update.message: return ConversationHandler.END if context.user_data is None: context.user_data = {} msg = update.message # Берём как есть: либо text/entities, либо caption/caption_entities if msg.text: text = msg.text entities = msg.entities or [] else: text = msg.caption or "" entities = msg.caption_entities or [] # Сохраняем исходные данные БЕЗ изменений context.user_data["text"] = text context.user_data["entities"] = entities context.user_data["src_chat_id"] = update.effective_chat.id context.user_data["src_msg_id"] = update.message.message_id # Дальше как у тебя: выбор канала/группы from sqlalchemy import select as sa_select async with AsyncSessionLocal() as session: channels = (await session.execute(sa_select(Channel))).scalars().all() groups = (await session.execute(sa_select(Group))).scalars().all() keyboard = [] for c in channels: keyboard.append([InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]) for g in groups: keyboard.append([InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]) await update.message.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard)) return SELECT_TARGET async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query if not query: return ConversationHandler.END await query.answer() data = query.data from sqlalchemy import select as sa_select async with AsyncSessionLocal() as session: chat_id = None markup = None if data and data.startswith('channel_'): channel_id = int(data.split('_')[1]) channel = (await session.execute(sa_select(Channel).where(Channel.id == channel_id))).scalar_one_or_none() buttons = (await session.execute(sa_select(Button).where(Button.channel_id == channel_id))).scalars().all() markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None chat_id = channel.link if channel else None elif data and data.startswith('group_'): group_id = int(data.split('_')[1]) group = (await session.execute(sa_select(Group).where(Group.id == group_id))).scalar_one_or_none() buttons = (await session.execute(sa_select(Button).where(Button.group_id == group_id))).scalars().all() markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None chat_id = group.link if group else None if not chat_id: await query.edit_message_text('Ошибка: объект не найден.') return ConversationHandler.END chat_id = chat_id.strip() if not (chat_id.startswith('@') or chat_id.startswith('-')): await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)') return ConversationHandler.END try: ud = context.user_data or {} text = ud.get('text', '') or '' entities = ud.get('entities', []) or [] # Если это одно входящее сообщение без “сборки” из двух шагов — идеально копируем: # (то есть пользователь НЕ загружал медиа в SELECT_MEDIA) has_media = any(k in ud for k in ('photo','animation','video','document','audio','voice','sticker')) if ud.get('src_chat_id') and ud.get('src_msg_id') and not has_media: await context.bot.copy_message( chat_id=chat_id, from_chat_id=ud['src_chat_id'], message_id=ud['src_msg_id'], reply_markup=markup, ) await query.edit_message_text('Пост отправлен!') return ConversationHandler.END # Иначе — отправляем собранный пост, но entities/emoji берём ИСХОДНЫЕ sent = False if 'photo' in ud: await context.bot.send_photo(chat_id=chat_id, photo=ud['photo'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'animation' in ud: await context.bot.send_animation(chat_id=chat_id, animation=ud['animation'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'video' in ud: await context.bot.send_video(chat_id=chat_id, video=ud['video'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'document' in ud: await context.bot.send_document(chat_id=chat_id, document=ud['document'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'audio' in ud: await context.bot.send_audio(chat_id=chat_id, audio=ud['audio'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'voice' in ud: await context.bot.send_voice(chat_id=chat_id, voice=ud['voice'], caption=text or None, caption_entities=entities if text else None, reply_markup=markup) sent = True elif 'sticker' in ud: await context.bot.send_sticker(chat_id=chat_id, sticker=ud['sticker']) if text: await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup) sent = True else: await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup) sent = True await query.edit_message_text('Пост отправлен!' if sent else 'Ошибка: не удалось отправить сообщение.') except Exception as e: await query.edit_message_text(f'Ошибка отправки поста: {e}') return ConversationHandler.END new_post_conv = ConversationHandler( entry_points=[CommandHandler("new_post", new_post_start)], states={ # Принимаем любые медиа + /skip SELECT_MEDIA: [ MessageHandler( filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL | filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND, select_media, ) ], # Принимаем любой текст/пересланное (caption попадёт как caption_entities через media-фильтры на предыдущем шаге) SELECT_TEXT: [ MessageHandler( filters.ALL & (filters.TEXT | filters.FORWARDED), select_text ) ], SELECT_TARGET: [CallbackQueryHandler(select_target)], }, fallbacks=[], )