Files
post_bot/handlers/new_post.py
Choi A.K. 297af93fff
Some checks failed
continuous-integration/drone/push Build is failing
select_target -> disabled COPY using send_
2025-09-06 13:18:42 +09:00

415 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# handlers/new_post.py
from __future__ import annotations
from typing import List, Optional, Tuple
from telegram import (
Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity, Bot
)
from telegram.ext import (
ContextTypes, ConversationHandler, MessageHandler, CommandHandler, CallbackQueryHandler, filters
)
from telegram.constants import MessageEntityType
from telegram.error import BadRequest
from sqlalchemy import select as sa_select
from db import AsyncSessionLocal
from models import Channel, Group
from .permissions import get_or_create_admin, list_channels_for_admin, has_scope_on_channel, SCOPE_POST
from models import Channel, Group, Button
# Conversation state identifiers; they are the keys of the `states` mapping
# passed to the ConversationHandler defined at the bottom of this module.
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3)
# ===== UTF-16 helpers (for custom_emoji) =====
def _utf16_units_len(s: str) -> int:
return len(s.encode("utf-16-le")) // 2
def _utf16_index_map(text: str) -> List[Tuple[int, int, str]]:
    """Describe every character of *text* by its position in UTF-16 units.

    Returns:
        One ``(offset, length, char)`` tuple per character, in order, where
        ``offset`` is the running UTF-16 offset of the character and
        ``length`` is its own width as reported by ``_utf16_units_len``.
    """
    spans: List[Tuple[int, int, str]] = []
    for char in text:
        # The next character starts where the previous one ended.
        start = spans[-1][0] + spans[-1][1] if spans else 0
        spans.append((start, _utf16_units_len(char), char))
    return spans
def _split_custom_emoji_by_utf16(text: str, entities: List[MessageEntity]) -> List[MessageEntity]:
    """Split multi-unit custom-emoji entities into one entity per character.

    A CUSTOM_EMOJI entity that is longer than one UTF-16 unit and carries a
    ``custom_emoji_id`` is replaced by a separate CUSTOM_EMOJI entity (same
    id) for each character of *text* whose UTF-16 offset falls inside the
    entity's range. All other entities are passed through unchanged.

    Args:
        text: The message text the entities refer to.
        entities: Entities to process.

    Returns:
        The resulting entities sorted by offset. When *text* or *entities*
        is empty, *entities* is returned as-is (or ``[]`` if it is falsy).
    """
    if not (text and entities):
        return entities or []

    unit_map = _utf16_index_map(text)
    result: List[MessageEntity] = []
    for entity in entities:
        splittable = (
            entity.type == MessageEntityType.CUSTOM_EMOJI
            and entity.length
            and entity.length > 1
            and getattr(entity, "custom_emoji_id", None)
        )
        if not splittable:
            result.append(entity)
            continue

        first = entity.offset
        past_end = entity.offset + entity.length
        # One entity per character that starts inside [first, past_end).
        result.extend(
            MessageEntity(
                type=MessageEntityType.CUSTOM_EMOJI,
                offset=position,
                length=width,
                custom_emoji_id=entity.custom_emoji_id,
            )
            for position, width, _char in unit_map
            if first <= position < past_end
        )

    return sorted(result, key=lambda item: item.offset)
def _strip_broken_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]:
cleaned: List[MessageEntity] = []
for e in entities or []:
if e.offset is None or e.length is None or e.offset < 0 or e.length < 1:
continue
if e.type == MessageEntityType.CUSTOM_EMOJI and not getattr(e, "custom_emoji_id", None):
continue
cleaned.append(e)
cleaned.sort(key=lambda x: x.offset)
return cleaned
def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]:
if getattr(msg, "text", None):
return msg.text, (msg.entities or []), False
if getattr(msg, "caption", None):
return msg.caption, (msg.caption_entities or []), True
return "", [], False
# ===== Conversation =====
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point of the /new_post conversation: ask the user for media.

    Before prompting, any draft left in ``user_data`` by a previous run is
    removed. The later steps decide what to send by checking which keys are
    present, so leftovers (for example an old ``photo``) would otherwise be
    attached to the new post.

    Returns:
        ``SELECT_MEDIA`` when the update carries a message, otherwise
        ``ConversationHandler.END``.
    """
    if not update.message:
        return ConversationHandler.END

    draft = context.user_data
    if draft is not None:
        # Keys written by select_media / select_text for a single draft.
        for key in (
            "photo", "animation", "video", "document", "audio", "voice", "sticker",
            "text", "entities", "src_chat_id", "src_msg_id",
        ):
            draft.pop(key, None)

    await update.message.reply_text("Отправьте медиа для поста или пришлите /skip:")
    return SELECT_MEDIA
async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the media attached to the user's message and ask for the text.

    At most one attachment is stored in ``user_data`` under its kind
    (``photo``, ``animation``, ``video``, ``document``, ``audio``, ``voice``
    or ``sticker``) as a Telegram ``file_id``. A ``/skip`` message stores
    nothing. Media keys from an earlier draft are always removed first so
    they cannot be sent along with the new post.

    Returns:
        ``SELECT_TEXT`` after prompting for the text, or
        ``ConversationHandler.END`` when there is no message or no
        ``user_data`` to keep the draft in.
    """
    msg = update.message
    draft = context.user_data
    # user_data cannot be replaced by assignment in python-telegram-bot v20+
    # (the property setter raises), so without it the draft cannot be kept.
    if msg is None or draft is None:
        return ConversationHandler.END

    # Checked in this order; the first attachment found wins.
    media_kinds = ("photo", "animation", "video", "document", "audio", "voice", "sticker")
    for kind in media_kinds:
        draft.pop(kind, None)

    is_skip = bool(msg.text) and msg.text.strip().lower() == "/skip"
    if not is_skip:
        for kind in media_kinds:
            attachment = getattr(msg, kind)
            if attachment:
                # `photo` is a sequence of sizes; the last entry is used
                # (presumably the largest one - confirm against the Bot API).
                item = attachment[-1] if kind == "photo" else attachment
                draft[kind] = item.file_id
                break

    await msg.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):")
    return SELECT_TEXT
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the post text and entities, then offer the possible targets.

    The text (or caption) and its sanitised entities are saved in
    ``user_data`` and the user is shown an inline keyboard with one button
    per channel returned by ``list_channels_for_admin`` and one per group.

    Returns:
        ``SELECT_TARGET`` when at least one target is offered, otherwise
        ``ConversationHandler.END`` (also when there is no message or no
        ``user_data`` to keep the draft in).
    """
    msg = update.message
    draft = context.user_data
    # user_data cannot be replaced by assignment in python-telegram-bot v20+
    # (the property setter raises), so without it the draft cannot be kept.
    if msg is None or draft is None:
        return ConversationHandler.END

    text, entities, _ = _extract_text_and_entities(msg)
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    draft["text"] = text
    draft["entities"] = entities
    # Source message coordinates, kept from the copyMessage-based flow;
    # select_target as it stands in this file does not read them.
    draft["src_chat_id"] = update.effective_chat.id
    draft["src_msg_id"] = msg.message_id

    async with AsyncSessionLocal() as session:
        me = await get_or_create_admin(session, update.effective_user.id)
        # Per the original author's note, this is expected to return only the
        # channels the current admin may post to.
        channels = await list_channels_for_admin(session, me.id)
        # Groups are listed without any access check (unchanged behaviour).
        groups = (await session.execute(sa_select(Group))).scalars().all()

        # Built inside the session so model attributes are read while attached.
        keyboard = [
            [InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]
            for c in channels
        ]
        keyboard += [
            [InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]
            for g in groups
        ]

    if not keyboard:
        await msg.reply_text("Нет доступных каналов/групп для отправки.")
        return ConversationHandler.END

    await msg.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard))
    return SELECT_TARGET
# async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
# query = update.callback_query
# if not query:
# return ConversationHandler.END
# await query.answer()
# data = query.data
# async with AsyncSessionLocal() as session:
# chat_id = None
# markup = None
# selected_kind = None # "channel" | "group"
# selected_title = None
# if data and data.startswith('channel_'):
# selected_kind = "channel"
# channel_id = int(data.split('_')[1])
# # ACL: право постинга
# me = await get_or_create_admin(session, update.effective_user.id)
# allowed = await has_scope_on_channel(session, me.id, channel_id, SCOPE_POST)
# if not allowed:
# await query.edit_message_text("У вас нет права постить в этот канал.")
# return ConversationHandler.END
# channel = (await session.execute(sa_select(Channel).where(Channel.id == channel_id))).scalar_one_or_none()
# if not channel:
# await query.edit_message_text("Канал не найден.")
# return ConversationHandler.END
# chat_id = (channel.link or "").strip()
# selected_title = channel.name
# # кнопки канала
# btns = (await session.execute(sa_select(Button).where(Button.channel_id == channel_id))).scalars().all()
# if btns:
# markup = InlineKeyboardMarkup(
# [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns]
# )
# elif data and data.startswith('group_'):
# selected_kind = "group"
# group_id = int(data.split('_')[1])
# group = (await session.execute(sa_select(Group).where(Group.id == group_id))).scalar_one_or_none()
# if not group:
# await query.edit_message_text("Группа не найдена.")
# return ConversationHandler.END
# chat_id = (group.link or "").strip()
# selected_title = group.name
# # кнопки группы
# btns = (await session.execute(sa_select(Button).where(Button.group_id == group_id))).scalars().all()
# if btns:
# markup = InlineKeyboardMarkup(
# [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns]
# )
# if not chat_id or not (chat_id.startswith('@') or chat_id.startswith('-')):
# await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)')
# return ConversationHandler.END
# ud = context.user_data or {}
# text: str = ud.get("text", "") or ""
# entities: List[MessageEntity] = ud.get("entities", []) or []
# # санация для custom_emoji
# entities = _strip_broken_entities(entities)
# entities = _split_custom_emoji_by_utf16(text, entities)
# # если есть клавиатура, ПРЕДПОЧИТАЕМ отправку "вручную" (а не copyMessage),
# # т.к. в некоторых кейсах клавиатура при копировании может не приклеиться
# prefer_manual_send = markup is not None
# # определим, были ли “части медиа”
# has_media_parts = any(k in ud for k in ("photo","animation","video","document","audio","voice","sticker"))
# try:
# if not prefer_manual_send and ud.get("src_chat_id") and ud.get("src_msg_id") and not has_media_parts:
# # 1) copyMessage без медиа и без клавиатуры — максимальная идентичность
# msg_id_obj = await context.bot.copy_message(
# chat_id=chat_id,
# from_chat_id=ud["src_chat_id"],
# message_id=ud["src_msg_id"],
# reply_markup=markup # если вдруг есть, попробуем сразу
# )
# # страховка: если клавиатуры нет/не приклеилась — прикрутим edit_message_reply_markup
# if markup and getattr(msg_id_obj, "message_id", None):
# try:
# await context.bot.edit_message_reply_markup(
# chat_id=chat_id,
# message_id=msg_id_obj.message_id,
# reply_markup=markup
# )
# except Exception:
# pass
# await query.edit_message_text(f'Пост отправлен{" в: " + selected_title if selected_title else "!"}')
# return ConversationHandler.END
# # 2) fallback / manual — ВСЕГДА прикладываем reply_markup
# sent = False
# if "photo" in ud:
# await context.bot.send_photo(chat_id=chat_id, photo=ud["photo"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "animation" in ud:
# await context.bot.send_animation(chat_id=chat_id, animation=ud["animation"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "video" in ud:
# await context.bot.send_video(chat_id=chat_id, video=ud["video"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "document" in ud:
# await context.bot.send_document(chat_id=chat_id, document=ud["document"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "audio" in ud:
# await context.bot.send_audio(chat_id=chat_id, audio=ud["audio"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "voice" in ud:
# await context.bot.send_voice(chat_id=chat_id, voice=ud["voice"],
# caption=(text or None), caption_entities=(entities if text else None),
# reply_markup=markup)
# sent = True
# elif "sticker" in ud:
# await context.bot.send_sticker(chat_id=chat_id, sticker=ud["sticker"], reply_markup=markup)
# if text:
# await context.bot.send_message(chat_id=chat_id, text=text, entities=entities)
# sent = True
# else:
# await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup)
# sent = True
# await query.edit_message_text(f'Пост отправлен{" в: " + selected_title if selected_title else "!"}' if sent else 'Ошибка: не удалось отправить сообщение.')
# except BadRequest as e:
# await query.edit_message_text(f'Ошибка отправки поста: {e}')
# return ConversationHandler.END
# Draft media keys in the order they are checked when sending; only the
# first one present is used.
_DRAFT_MEDIA_KINDS = ("photo", "animation", "video", "document", "audio", "voice", "sticker")
# Every key the conversation stores in user_data for one draft.
_DRAFT_KEYS = _DRAFT_MEDIA_KINDS + ("text", "entities", "src_chat_id", "src_msg_id")


def _parse_target(data: Optional[str]) -> Optional[Tuple[str, int]]:
    """Turn callback data like ``channel_12`` into ``("channel", 12)``.

    Returns None for anything that is not ``channel_<int>`` / ``group_<int>``.
    """
    kind, _, rest = (data or "").partition("_")
    if kind not in ("channel", "group"):
        return None
    try:
        # Only the first "_"-separated field after the prefix is the id.
        return kind, int(rest.split("_")[0])
    except ValueError:
        return None


def _buttons_markup(buttons) -> Optional[InlineKeyboardMarkup]:
    """Build a one-button-per-row URL keyboard, or None when there are no buttons."""
    if not buttons:
        return None
    return InlineKeyboardMarkup(
        [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]
    )


async def _send_draft(
    bot: Bot,
    chat_id: str,
    draft: dict,
    text: str,
    entities: List[MessageEntity],
    markup: Optional[InlineKeyboardMarkup],
) -> None:
    """Send the draft to *chat_id* with the send_* method matching its media kind."""
    kind = next((k for k in _DRAFT_MEDIA_KINDS if k in draft), None)

    if kind is None:
        # Text-only post.
        await bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup)
        return

    if kind == "sticker":
        # The sticker is sent without a caption; any text follows as a
        # separate message, and the keyboard stays on the sticker.
        await bot.send_sticker(chat_id=chat_id, sticker=draft["sticker"], reply_markup=markup)
        if text:
            await bot.send_message(chat_id=chat_id, text=text, entities=entities)
        return

    senders = {
        "photo": bot.send_photo,
        "animation": bot.send_animation,
        "video": bot.send_video,
        "document": bot.send_document,
        "audio": bot.send_audio,
        "voice": bot.send_voice,
    }
    await senders[kind](
        chat_id=chat_id,
        caption=(text or None),
        caption_entities=(entities if text else None),
        reply_markup=markup,
        **{kind: draft[kind]},
    )


async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the drafted post to the channel or group the user picked.

    The callback data (``channel_<id>`` / ``group_<id>``) selects the target.
    For channels the caller must hold ``SCOPE_POST`` on that channel; groups
    are not access-checked. The post is always sent with explicit ``send_*``
    calls (never copyMessage) so the target's URL buttons are attached as
    ``reply_markup``. The outcome is reported by editing the keyboard
    message, and the draft is removed from ``user_data`` once a send has
    been attempted.

    Returns:
        Always ``ConversationHandler.END``.
    """
    # Local import: TelegramError is not among this module's top-level imports.
    from telegram.error import TelegramError

    query = update.callback_query
    if not query:
        return ConversationHandler.END
    await query.answer()

    target = _parse_target(query.data)
    chat_id = None
    markup = None
    selected_title = None

    async with AsyncSessionLocal() as session:
        if target is not None:
            kind, target_id = target
            if kind == "channel":
                # ACL: posting to a channel requires SCOPE_POST on it.
                me = await get_or_create_admin(session, update.effective_user.id)
                if not await has_scope_on_channel(session, me.id, target_id, SCOPE_POST):
                    await query.edit_message_text("У вас нет права постить в этот канал.")
                    return ConversationHandler.END
                model, owner_column, missing = Channel, Button.channel_id, "Канал не найден."
            else:
                model, owner_column, missing = Group, Button.group_id, "Группа не найдена."

            row = (
                await session.execute(sa_select(model).where(model.id == target_id))
            ).scalar_one_or_none()
            if not row:
                await query.edit_message_text(missing)
                return ConversationHandler.END

            chat_id = (row.link or "").strip()
            selected_title = row.name
            buttons = (
                await session.execute(sa_select(Button).where(owner_column == target_id))
            ).scalars().all()
            markup = _buttons_markup(buttons)

    # Unrecognised callback data also ends up here, with chat_id still None.
    if not chat_id or not chat_id.startswith(("@", "-")):
        await query.edit_message_text('Ошибка: ссылка должна быть @username или -100...')
        return ConversationHandler.END

    draft = context.user_data if context.user_data is not None else {}
    text: str = draft.get("text", "") or ""
    entities: List[MessageEntity] = draft.get("entities", []) or []
    # Re-sanitise custom_emoji / UTF-16 ranges before sending.
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    try:
        await _send_draft(context.bot, chat_id, draft, text, entities, markup)
    except TelegramError as e:
        # Covers BadRequest as before, plus e.g. Forbidden (bot cannot post
        # to the target) and network errors, so the user always gets a reply.
        outcome = f'Ошибка отправки поста: {e}'
    else:
        outcome = f'Пост отправлен{(" в: " + selected_title) if selected_title else "!"}'

    # The conversation ends here either way; drop the draft so its media and
    # text cannot leak into the next post.
    for key in _DRAFT_KEYS:
        draft.pop(key, None)

    await query.edit_message_text(outcome)
    return ConversationHandler.END
# Conversation for composing a post: optional media -> text -> target chat.
new_post_conv = ConversationHandler(
    entry_points=[CommandHandler("new_post", new_post_start)],
    states={
        # filters.COMMAND lets /skip through; select_media moves on to the
        # text step for any command, storing no media.
        SELECT_MEDIA: [MessageHandler(
            filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL |
            filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND,
            select_media
        )],
        SELECT_TEXT: [MessageHandler(filters.TEXT | filters.FORWARDED | filters.CAPTION, select_text)],
        SELECT_TARGET: [CallbackQueryHandler(select_target)],
    },
    fallbacks=[],
    # There are no fallbacks, so without re-entry a user who abandons the
    # conversation (for example never presses a target button) could not
    # start a new post; with it, /new_post restarts from the first step.
    allow_reentry=True,
)