Files
post_bot/handlers/new_post.py
Choi A.K. 2bcf07f6a9
Some checks failed
continuous-integration/drone/push Build is failing
script update
2025-09-06 13:38:55 +09:00

277 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# handlers/new_post.py
from __future__ import annotations
from typing import List, Optional, Tuple
from telegram import (
Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity, Bot
)
from telegram.ext import (
ContextTypes, ConversationHandler, MessageHandler, CommandHandler, CallbackQueryHandler, filters
)
from telegram.constants import MessageEntityType
from telegram.error import BadRequest
from sqlalchemy import select as sa_select
from db import AsyncSessionLocal
from models import Channel, Group
from .permissions import get_or_create_admin, list_channels_for_admin, has_scope_on_channel, SCOPE_POST
from models import Channel, Group, Button
# Conversation states, in the order the dialogue walks through them:
# media upload -> post text -> destination choice.
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = 0, 1, 2
# ===== UTF-16 helpers (для custom_emoji) =====
def _utf16_units_len(s: str) -> int:
    """Return the length of *s* measured in UTF-16 code units.

    Characters outside the Basic Multilingual Plane count as two units
    (a surrogate pair); everything else counts as one.
    """
    encoded = s.encode("utf-16-le")
    # Every UTF-16 code unit occupies exactly two bytes.
    return len(encoded) // 2
def _utf16_index_map(text: str) -> List[Tuple[int, int, str]]:
    """Map every character of *text* to its position in UTF-16 code units.

    Returns one ``(offset, length, char)`` tuple per character, where
    ``offset`` is the running UTF-16 offset of the character and ``length``
    is how many UTF-16 code units it occupies.
    """
    widths = [_utf16_units_len(char) for char in text]
    mapping: List[Tuple[int, int, str]] = []
    cursor = 0
    for char, width in zip(text, widths):
        mapping.append((cursor, width, char))
        cursor += width
    return mapping
def _split_custom_emoji_by_utf16(text: str, entities: List[MessageEntity]) -> List[MessageEntity]:
    """Split multi-unit custom-emoji entities into one entity per character.

    A CUSTOM_EMOJI entity that has a ``custom_emoji_id`` and spans more than
    one UTF-16 code unit is replaced by one entity for every character of
    *text* whose UTF-16 offset starts inside the entity's span; each new
    entity reuses the original ``custom_emoji_id``. All other entities are
    passed through unchanged. The result is sorted by offset.

    When *text* or *entities* is empty the input is returned as-is
    (or ``[]`` if it is falsy).
    """
    if not (text and entities):
        return entities or []

    units = _utf16_index_map(text)
    result: List[MessageEntity] = []
    for entity in entities:
        splittable = (
            entity.type == MessageEntityType.CUSTOM_EMOJI
            and entity.length
            and entity.length > 1
            and getattr(entity, "custom_emoji_id", None)
        )
        if not splittable:
            result.append(entity)
            continue

        span_start = entity.offset
        span_end = entity.offset + entity.length
        result.extend(
            MessageEntity(
                type=MessageEntityType.CUSTOM_EMOJI,
                offset=unit_offset,
                length=unit_len,
                custom_emoji_id=entity.custom_emoji_id,
            )
            for unit_offset, unit_len, _ in units
            if span_start <= unit_offset < span_end
        )

    return sorted(result, key=lambda item: item.offset)
def _strip_broken_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]:
    """Drop unusable entities and return the rest sorted by offset.

    An entity is discarded when its offset or length is missing, its offset
    is negative, its length is below 1, or it is a CUSTOM_EMOJI entity
    without a ``custom_emoji_id``. ``None`` is treated as an empty list.
    """

    def _is_usable(entity) -> bool:
        if entity.offset is None or entity.length is None:
            return False
        if entity.offset < 0 or entity.length < 1:
            return False
        if entity.type == MessageEntityType.CUSTOM_EMOJI and not getattr(entity, "custom_emoji_id", None):
            return False
        return True

    usable = [entity for entity in (entities or []) if _is_usable(entity)]
    return sorted(usable, key=lambda entity: entity.offset)
def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]:
    """Pull the textual content of *msg* together with its entities.

    Returns ``(text, entities, is_caption)``. The message text is preferred;
    if it is absent or empty the caption is used and ``is_caption`` is True.
    When neither is present the result is ``("", [], False)``.
    """
    sources = (
        ("text", "entities", False),
        ("caption", "caption_entities", True),
    )
    for text_attr, entities_attr, is_caption in sources:
        content = getattr(msg, text_attr, None)
        if content:
            return content, (getattr(msg, entities_attr) or []), is_caption
    return "", [], False
# ===== Conversation =====
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point of the new-post conversation.

    Asks the user to send media (or /skip) and moves to SELECT_MEDIA.
    Ends the conversation immediately if the update carries no message.
    """
    if not update.message:
        return ConversationHandler.END
    await update.message.reply_text("Отправьте медиа для поста или пришлите /skip:")
    return SELECT_MEDIA
async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the media attachment for the post being composed.

    Remembers the ``file_id`` of the first attachment found on the message
    (photo, animation, video, document, audio, voice, sticker - in that
    order) under the matching key in ``context.user_data``. A message without
    any attachment (e.g. the ``/skip`` command) stores nothing. In every case
    the user is then asked for the post text.

    Returns:
        ``SELECT_TEXT`` normally; ``ConversationHandler.END`` when the update
        has no message or no per-user storage is available.
    """
    msg = update.message
    draft = context.user_data
    # ``context.user_data`` is a read-only property, so it cannot be replaced
    # with a fresh dict; without per-user storage there is nowhere to keep
    # the draft, so the conversation is ended instead.
    if not msg or draft is None:
        return ConversationHandler.END

    media_kinds = ("photo", "animation", "video", "document", "audio", "voice", "sticker")

    # Drop attachments left over from an earlier post: the sending step picks
    # the first media key it finds, so a stale "photo" would otherwise be
    # re-sent after /skip or override a newly uploaded video.
    for kind in media_kinds:
        draft.pop(kind, None)

    if msg.photo:
        # Photos arrive as a list of sizes; the last entry is the largest.
        draft["photo"] = msg.photo[-1].file_id
    else:
        for kind in media_kinds[1:]:
            attachment = getattr(msg, kind)
            if attachment:
                draft[kind] = attachment.file_id
                break

    await msg.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):")
    return SELECT_TEXT
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the post text and ask the user where to publish it.

    The text (or caption) and its entities are taken from the incoming
    message, cleaned with ``_strip_broken_entities`` and
    ``_split_custom_emoji_by_utf16``, and saved in ``context.user_data``
    together with the source chat and message ids. The user is then shown an
    inline keyboard with the channels returned by ``list_channels_for_admin``
    and every group in the database.

    Returns:
        ``SELECT_TARGET`` when at least one destination can be offered;
        ``ConversationHandler.END`` when the update lacks a message, user,
        chat or per-user storage, or when there is nothing to choose from.
    """
    msg = update.message
    draft = context.user_data
    user = update.effective_user
    chat = update.effective_chat
    # ``context.user_data`` is a read-only property and cannot be assigned;
    # if it (or the user/chat it depends on) is missing, stop here instead
    # of raising AttributeError.
    if not msg or draft is None or user is None or chat is None:
        return ConversationHandler.END

    text, entities, _ = _extract_text_and_entities(msg)
    entities = _split_custom_emoji_by_utf16(text, _strip_broken_entities(entities))

    # Keep the content plus a reference to the original message.
    draft["text"] = text
    draft["entities"] = entities
    draft["src_chat_id"] = chat.id
    draft["src_msg_id"] = msg.message_id

    async with AsyncSessionLocal() as session:
        # Channels are limited to what list_channels_for_admin returns for
        # this admin; groups are listed without any access check.
        me = await get_or_create_admin(session, user.id)
        channels = await list_channels_for_admin(session, me.id)
        groups = (await session.execute(sa_select(Group))).scalars().all()

        # Read the ORM attributes while the session is still open.
        keyboard = [
            [InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]
            for c in channels
        ]
        keyboard += [
            [InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]
            for g in groups
        ]

    if not keyboard:
        await msg.reply_text("Нет доступных каналов/групп для отправки.")
        return ConversationHandler.END

    await msg.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard))
    return SELECT_TARGET
# Media kinds that are sent through ``Bot.send_<kind>`` with a caption,
# listed in the order in which a stored attachment is looked up.
_CAPTIONED_MEDIA_KINDS = ("photo", "animation", "video", "document", "audio", "voice")

# Every ``user_data`` key written while a post is being composed.
_POST_DRAFT_KEYS = _CAPTIONED_MEDIA_KINDS + (
    "sticker", "text", "entities", "src_chat_id", "src_msg_id",
)


def _url_buttons_markup(buttons) -> Optional[InlineKeyboardMarkup]:
    """Build a one-URL-button-per-row keyboard, or None when there are no buttons."""
    if not buttons:
        return None
    return InlineKeyboardMarkup(
        [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]
    )


async def _send_post(bot: Bot, chat_id: str, draft: dict, text: str,
                     entities: List[MessageEntity],
                     markup: Optional[InlineKeyboardMarkup]):
    """Send the drafted post to *chat_id* and return the message carrying *markup*."""
    for kind in _CAPTIONED_MEDIA_KINDS:
        if kind in draft:
            # Bot.send_photo(photo=...), Bot.send_video(video=...), etc. all
            # share the same caption / reply_markup parameters.
            send = getattr(bot, f"send_{kind}")
            return await send(
                chat_id=chat_id,
                caption=(text or None),
                caption_entities=(entities if text else None),
                reply_markup=markup,
                **{kind: draft[kind]},
            )

    if "sticker" in draft:
        # Stickers take no caption: the keyboard goes on the sticker and the
        # text follows as a separate message.
        sent = await bot.send_sticker(chat_id=chat_id, sticker=draft["sticker"], reply_markup=markup)
        if text:
            await bot.send_message(chat_id=chat_id, text=text, entities=entities)
        return sent

    return await bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup)


async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Publish the drafted post to the channel or group picked by the user.

    The callback data is expected to be ``channel_<id>`` or ``group_<id>``.
    For a channel the admin must pass ``has_scope_on_channel(..., SCOPE_POST)``.
    The destination's ``link`` is used as the chat id and must start with
    ``@`` or ``-``. URL buttons stored for the destination are attached as an
    inline keyboard. The post is always sent manually (never via copyMessage)
    so that the keyboard can be attached.

    The draft stored in ``context.user_data`` is cleared once sending has
    been attempted, whether it succeeded or not.

    Returns:
        ``ConversationHandler.END`` in every case.
    """
    import logging

    from telegram.error import TelegramError

    query = update.callback_query
    if not query:
        return ConversationHandler.END
    await query.answer()
    data = query.data or ""

    chat_id = None
    selected_title = None
    btns = []
    async with AsyncSessionLocal() as session:
        if data.startswith('channel_'):
            channel_id = int(data.split('_')[1])
            me = await get_or_create_admin(session, update.effective_user.id)
            if not await has_scope_on_channel(session, me.id, channel_id, SCOPE_POST):
                await query.edit_message_text("У вас нет права постить в этот канал.")
                return ConversationHandler.END
            channel = (
                await session.execute(sa_select(Channel).where(Channel.id == channel_id))
            ).scalar_one_or_none()
            if not channel:
                await query.edit_message_text("Канал не найден.")
                return ConversationHandler.END
            chat_id = (channel.link or "").strip()
            selected_title = channel.name
            btns = (
                await session.execute(sa_select(Button).where(Button.channel_id == channel_id))
            ).scalars().all()
        elif data.startswith('group_'):
            group_id = int(data.split('_')[1])
            group = (
                await session.execute(sa_select(Group).where(Group.id == group_id))
            ).scalar_one_or_none()
            if not group:
                await query.edit_message_text("Группа не найдена.")
                return ConversationHandler.END
            chat_id = (group.link or "").strip()
            selected_title = group.name
            btns = (
                await session.execute(sa_select(Button).where(Button.group_id == group_id))
            ).scalars().all()

        # Read the button attributes while the session is still open.
        markup = _url_buttons_markup(btns)

    if not chat_id or not chat_id.startswith(('@', '-')):
        await query.edit_message_text('Ошибка: ссылка должна быть @username или -100…')
        return ConversationHandler.END

    # DEBUG: show how many buttons were found and where the post is going.
    print(f"[DEBUG] send -> chat_id={chat_id} title={selected_title!r} buttons={len(btns)} has_markup={bool(markup)}")

    # Work on the real user_data dict (not a copy) so it can be cleared below.
    ud = context.user_data if context.user_data is not None else {}
    text: str = ud.get("text", "") or ""
    entities: List[MessageEntity] = ud.get("entities", []) or []
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    try:
        sent_msg = await _send_post(context.bot, chat_id, ud, text, entities, markup)

        # Safety net: if the keyboard did not stick, attach it again.
        # Best effort only - a failure here must not hide a successful send.
        if markup and getattr(sent_msg, "message_id", None):
            try:
                await context.bot.edit_message_reply_markup(chat_id=chat_id,
                                                            message_id=sent_msg.message_id,
                                                            reply_markup=markup)
            except Exception:
                logging.getLogger(__name__).debug(
                    "edit_message_reply_markup after send failed", exc_info=True
                )

        await query.edit_message_text(f'Пост отправлен{(" в: " + selected_title) if selected_title else "!"}')
    except TelegramError as e:
        # TelegramError is the base of BadRequest, so the previous handling is
        # kept while other API failures are now reported to the user as well.
        await query.edit_message_text(f'Ошибка отправки поста: {e}')
    finally:
        # The conversation ends here; drop the draft so it cannot leak into
        # the next post.
        for key in _POST_DRAFT_KEYS:
            ud.pop(key, None)

    return ConversationHandler.END
# SELECT_MEDIA accepts any supported attachment, plus commands so that
# /skip reaches the handler.
_media_step_filter = (
    filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL
    | filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND
)

# SELECT_TEXT accepts typed text, forwarded messages and captioned media.
_text_step_filter = filters.TEXT | filters.FORWARDED | filters.CAPTION

# Conversation: /new_post -> media -> text -> destination.
new_post_conv = ConversationHandler(
    entry_points=[CommandHandler("new_post", new_post_start)],
    states={
        SELECT_MEDIA: [MessageHandler(_media_step_filter, select_media)],
        SELECT_TEXT: [MessageHandler(_text_step_filter, select_text)],
        SELECT_TARGET: [CallbackQueryHandler(select_target)],
    },
    fallbacks=[],
)