Files
post_bot/handlers/new_post.py
Choi A.K. 929d2face6
Some checks failed
continuous-integration/drone/push Build is failing
select_target fix
async def select_target(update: Update, context:
ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
        if not query:
	        return ConversationHandler.END
		    await query.answer()

		        data = (query.data or "")
			    async with AsyncSessionLocal() as session:
			            chat_id: str | None = None
				            markup: InlineKeyboardMarkup
					    | None = None
					            selected_title: str
						    | None = None
						            btns = []

							            if
								    data.startswith('channel_'):
								                channel_id
										=
										int(data.split('_',
										1)[1])

										            #
											    ACL:
											    право
											    постинга
											    в
											    канал
											                me
													=
													await
													get_or_create_admin(session,
													update.effective_user.id)
													            allowed
														    =
														    await
														    has_scope_on_channel(session,
														    me.id,
														    channel_id,
														    SCOPE_POST)
														                if
																not
																allowed:
																                await
																		query.edit_message_text("У
																		вас
																		нет
																		права
																		постить
																		в
																		этот
																		канал.")
																		                return
																				ConversationHandler.END

																				            channel
																					    =
																					    (await
																					    session.execute(sa_select(Channel).where(Channel.id
																					    ==
																					    channel_id))).scalar_one_or_none()
																					                if
																							not
																							channel:
																							                await
																									query.edit_message_text("Канал
																									не
																									найден.")
																									                return
																											ConversationHandler.END

																											            chat_id
																												    =
																												    (channel.link
																												    or
																												    "").strip()
																												                selected_title
																														=
																														channel.name

																														            #
																															    Кнопки
																															    канала
																															                btns
																																	=
																																	(await
																																	session.execute(sa_select(Button).where(Button.channel_id
																																	==
																																	channel_id))).scalars().all()
																																	            if
																																		    btns:
																																		                    rows
																																				    =
																																				    [[InlineKeyboardButton(str(b.name),
																																				    url=str(b.url))]
																																				    for
																																				    b
																																				    in
																																				    btns]
																																				                    markup
																																						    =
																																						    InlineKeyboardMarkup(rows)

																																						            elif
																																							    data.startswith('group_'):
																																							                group_id
																																									=
																																									int(data.split('_',
																																									1)[1])

																																									            group
																																										    =
																																										    (await
																																										    session.execute(sa_select(Group).where(Group.id
																																										    ==
																																										    group_id))).scalar_one_or_none()
																																										                if
																																												not
																																												group:
																																												                await
																																														query.edit_message_text("Группа
																																														не
																																														найдена.")
																																														                return
																																																ConversationHandler.END

																																																            chat_id
																																																	    =
																																																	    (group.link
																																																	    or
																																																	    "").strip()
																																																	                selected_title
																																																			=
																																																			group.name

																																																			            #
																																																				    Кнопки
																																																				    группы
																																																				                btns
																																																						=
																																																						(await
																																																						session.execute(sa_select(Button).where(Button.group_id
																																																						==
																																																						group_id))).scalars().all()
																																																						            if
																																																							    btns:
																																																							                    rows
																																																									    =
																																																									    [[InlineKeyboardButton(str(b.name),
																																																									    url=str(b.url))]
																																																									    for
																																																									    b
																																																									    in
																																																									    btns]
																																																									                    markup
																																																											    =
																																																											    InlineKeyboardMarkup(rows)

																																																											        if
																																																												not
																																																												chat_id
																																																												or
																																																												not
																																																												(chat_id.startswith('@')
																																																												or
																																																												chat_id.startswith('-')):
																																																												        await
																																																													query.edit_message_text('Ошибка:
																																																													ссылка
																																																													должна
																																																													быть
																																																													username
																																																													(@channel)
																																																													или
																																																													числовой
																																																													ID
																																																													(-100...)')
																																																													        return
																																																														ConversationHandler.END

																																																														    #
																																																														    DEBUG:
																																																														    сколько
																																																														    кнопок
																																																														    нашли
																																																														    и
																																																														    есть
																																																														    ли
																																																														    markup
																																																														        print(f"[DEBUG]
																																																															send
																																																															->
																																																															chat_id={chat_id}
																																																															title={selected_title!r}
																																																															buttons={len(btns)}
																																																															has_markup={bool(markup)}")

																																																															    #
																																																															    Текст
																																																															    и
																																																															    entities
																																																															    (без
																																																															    parse_mode)
																																																															        ud
																																																																=
																																																																context.user_data
																																																																or
																																																																{}
																																																																    text:
																																																																    str
																																																																    =
																																																																    ud.get("text",
																																																																    "")
																																																																    or
																																																																    ""
																																																																        entities:
																																																																	List[MessageEntity]
																																																																	=
																																																																	ud.get("entities",
																																																																	[])
																																																																	or
																																																																	[]
																																																																	    entities
																																																																	    =
																																																																	    _strip_broken_entities(entities)
																																																																	        entities
																																																																		=
																																																																		_split_custom_emoji_by_utf16(text,
																																																																		entities)

																																																																		    #
																																																																		    Всегда
																																																																		    ручная
																																																																		    отправка
																																																																		    (send_*),
																																																																		    чтобы
																																																																		    гарантированно
																																																																		    приклеить
																																																																		    inline-клавиатуру
																																																																		        try:
																																																																			        sent_msg
																																																																				=
																																																																				None
																																																																				        if
																																																																					"photo"
																																																																					in
																																																																					ud:
																																																																					            sent_msg
																																																																						    =
																																																																						    await
																																																																						    context.bot.send_photo(
																																																																						                    chat_id=chat_id,
																																																																								                    photo=ud["photo"],
																																																																										                    caption=(text
																																																																												    or
																																																																												    None),
																																																																												                    caption_entities=(entities
																																																																														    if
																																																																														    text
																																																																														    else
																																																																														    None),
																																																																														                    reply_markup=markup,
																																																																																                )
																																																																																		        elif
																																																																																			"animation"
																																																																																			in
																																																																																			ud:
																																																																																			            sent_msg
																																																																																				    =
																																																																																				    await
																																																																																				    context.bot.send_animation(
																																																																																				                    chat_id=chat_id,
																																																																																						                    animation=ud["animation"],
																																																																																								                    caption=(text
																																																																																										    or
																																																																																										    None),
																																																																																										                    caption_entities=(entities
																																																																																												    if
																																																																																												    text
																																																																																												    else
																																																																																												    None),
																																																																																												                    reply_markup=markup,
																																																																																														                )
																																																																																																        elif
																																																																																																	"video"
																																																																																																	in
																																																																																																	ud:
																																																																																																	            sent_msg
																																																																																																		    =
																																																																																																		    await
																																																																																																		    context.bot.send_video(
																																																																																																		                    chat_id=chat_id,
																																																																																																				                    video=ud["video"],
																																																																																																						                    caption=(text
																																																																																																								    or
																																																																																																								    None),
																																																																																																								                    caption_entities=(entities
																																																																																																										    if
																																																																																																										    text
																																																																																																										    else
																																																																																																										    None),
																																																																																																										                    reply_markup=markup,
																																																																																																												                )
																																																																																																														        elif
																																																																																																															"document"
																																																																																																															in
																																																																																																															ud:
																																																																																																															            sent_msg
																																																																																																																    =
																																																																																																																    await
																																																																																																																    context.bot.send_document(
																																																																																																																                    chat_id=chat_id,
																																																																																																																		                    document=ud["document"],
																																																																																																																				                    caption=(text
																																																																																																																						    or
																																																																																																																						    None),
																																																																																																																						                    caption_entities=(entities
																																																																																																																								    if
																																																																																																																								    text
																																																																																																																								    else
																																																																																																																								    None),
																																																																																																																								                    reply_markup=markup,
																																																																																																																										                )
																																																																																																																												        elif
																																																																																																																													"audio"
																																																																																																																													in
																																																																																																																													ud:
																																																																																																																													            sent_msg
																																																																																																																														    =
																																																																																																																														    await
																																																																																																																														    context.bot.send_audio(
																																																																																																																														                    chat_id=chat_id,
																																																																																																																																                    audio=ud["audio"],
																																																																																																																																		                    caption=(text
																																																																																																																																				    or
																																																																																																																																				    None),
																																																																																																																																				                    caption_entities=(entities
																																																																																																																																						    if
																																																																																																																																						    text
																																																																																																																																						    else
																																																																																																																																						    None),
																																																																																																																																						                    reply_markup=markup,
																																																																																																																																								                )
																																																																																																																																										        elif
																																																																																																																																											"voice"
																																																																																																																																											in
																																																																																																																																											ud:
																																																																																																																																											            sent_msg
																																																																																																																																												    =
																																																																																																																																												    await
																																																																																																																																												    context.bot.send_voice(
																																																																																																																																												                    chat_id=chat_id,
																																																																																																																																														                    voice=ud["voice"],
																																																																																																																																																                    caption=(text
																																																																																																																																																		    or
																																																																																																																																																		    None),
																																																																																																																																																		                    caption_entities=(entities
																																																																																																																																																				    if
																																																																																																																																																				    text
																																																																																																																																																				    else
																																																																																																																																																				    None),
																																																																																																																																																				                    reply_markup=markup,
																																																																																																																																																						                )
																																																																																																																																																								        elif
																																																																																																																																																									"sticker"
																																																																																																																																																									in
																																																																																																																																																									ud:
																																																																																																																																																									            sent_msg
																																																																																																																																																										    =
																																																																																																																																																										    await
																																																																																																																																																										    context.bot.send_sticker(
																																																																																																																																																										                    chat_id=chat_id,
																																																																																																																																																												                    sticker=ud["sticker"],
																																																																																																																																																														                    reply_markup=markup,
																																																																																																																																																																                )
																																																																																																																																																																		            if
																																																																																																																																																																			    text:
																																																																																																																																																																			                    await
																																																																																																																																																																					    context.bot.send_message(chat_id=chat_id,
																																																																																																																																																																					    text=text,
																																																																																																																																																																					    entities=entities)
																																																																																																																																																																					            else:
																																																																																																																																																																						                sent_msg
																																																																																																																																																																								=
																																																																																																																																																																								await
																																																																																																																																																																								context.bot.send_message(
																																																																																																																																																																								                chat_id=chat_id,
																																																																																																																																																																										                text=text,
																																																																																																																																																																												                entities=entities,
																																																																																																																																																																														                reply_markup=markup,
																																																																																																																																																																																            )

																																																																																																																																																																																	            #
																																																																																																																																																																																		    Страховка:
																																																																																																																																																																																		    если
																																																																																																																																																																																		    вдруг
																																																																																																																																																																																		    Telegram
																																																																																																																																																																																		    проглотил
																																																																																																																																																																																		    клаву
																																																																																																																																																																																		    —
																																																																																																																																																																																		    доклеим
																																																																																																																																																																																		    её
																																																																																																																																																																																		            if
																																																																																																																																																																																			    markup
																																																																																																																																																																																			    and
																																																																																																																																																																																			    getattr(sent_msg,
																																																																																																																																																																																			    "message_id",
																																																																																																																																																																																			    None):
																																																																																																																																																																																			                t
2025-09-06 13:54:24 +09:00

311 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# handlers/new_post.py
from __future__ import annotations
from typing import List, Optional, Tuple
from telegram import (
Update, InlineKeyboardMarkup, InlineKeyboardButton, MessageEntity, Bot
)
from telegram.ext import (
ContextTypes, ConversationHandler, MessageHandler, CommandHandler, CallbackQueryHandler, filters
)
from telegram.constants import MessageEntityType
from telegram.error import BadRequest
from sqlalchemy import select as sa_select
from db import AsyncSessionLocal
from models import Channel, Group
from .permissions import get_or_create_admin, list_channels_for_admin, has_scope_on_channel, SCOPE_POST
from models import Channel, Group, Button
# Conversation state identifiers, in the order the dialogue visits them:
# media upload -> post text -> destination choice.
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3)
# ===== UTF-16 helpers (для custom_emoji) =====
def _utf16_units_len(s: str) -> int:
return len(s.encode("utf-16-le")) // 2
def _utf16_index_map(text: str) -> List[Tuple[int, int, str]]:
    """Map every character of ``text`` to its position in UTF-16 code units.

    Returns one ``(offset, length, char)`` tuple per character, where
    ``offset`` is the running sum of the preceding characters' UTF-16
    lengths and ``length`` is the character's own UTF-16 length.
    """
    widths = [_utf16_units_len(char) for char in text]
    mapping: List[Tuple[int, int, str]] = []
    cursor = 0
    for char, width in zip(text, widths):
        mapping.append((cursor, width, char))
        cursor += width
    return mapping
def _split_custom_emoji_by_utf16(text: str, entities: List[MessageEntity]) -> List[MessageEntity]:
if not text or not entities:
return entities or []
map_utf16 = _utf16_index_map(text)
out: List[MessageEntity] = []
for e in entities:
if (e.type == MessageEntityType.CUSTOM_EMOJI and e.length and e.length > 1 and getattr(e, "custom_emoji_id", None)):
start = e.offset
end = e.offset + e.length
for uoff, ulen, _ in map_utf16:
if start <= uoff < end:
out.append(MessageEntity(
type=MessageEntityType.CUSTOM_EMOJI,
offset=uoff,
length=ulen,
custom_emoji_id=e.custom_emoji_id,
))
else:
out.append(e)
out.sort(key=lambda x: x.offset)
return out
def _strip_broken_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]:
cleaned: List[MessageEntity] = []
for e in entities or []:
if e.offset is None or e.length is None or e.offset < 0 or e.length < 1:
continue
if e.type == MessageEntityType.CUSTOM_EMOJI and not getattr(e, "custom_emoji_id", None):
continue
cleaned.append(e)
cleaned.sort(key=lambda x: x.offset)
return cleaned
def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]:
if getattr(msg, "text", None):
return msg.text, (msg.entities or []), False
if getattr(msg, "caption", None):
return msg.caption, (msg.caption_entities or []), True
return "", [], False
# ===== Conversation =====
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point of the new-post dialogue.

    Asks the admin for media (or /skip) and moves to ``SELECT_MEDIA``.
    Updates that carry no message end the conversation immediately.
    """
    message = update.message
    if not message:
        return ConversationHandler.END

    await message.reply_text("Отправьте медиа для поста или пришлите /skip:")
    return SELECT_MEDIA
# Attachment kinds remembered in user_data, in the priority order used when
# several attributes are set on one message (e.g. animation before document).
_MEDIA_KINDS = ("photo", "animation", "video", "document", "audio", "voice", "sticker")


async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the attachment for the post being composed and ask for its text.

    The ``file_id`` of the first attachment found on the message (checked in
    ``_MEDIA_KINDS`` order; for photos the last size in the list is used) is
    saved in ``context.user_data`` under the attachment kind. A message
    without any attachment (``/skip`` or any other command) stores nothing.

    Media keys left over from an earlier draft are removed first; otherwise
    a previous post's attachment would be sent again after ``/skip`` or
    would shadow a newly uploaded attachment of a lower-priority kind.

    Returns ``SELECT_TEXT``, or ``ConversationHandler.END`` when the update
    has no message or no per-user storage is available.
    """
    msg = update.message
    draft = context.user_data
    # user_data cannot be re-assigned on the context (the setter raises), so
    # a missing store means the dialogue cannot proceed.
    if not msg or draft is None:
        return ConversationHandler.END

    for kind in _MEDIA_KINDS:
        draft.pop(kind, None)

    for kind in _MEDIA_KINDS:
        attachment = getattr(msg, kind, None)
        if not attachment:
            continue
        if kind == "photo":
            # Photos arrive as a list of sizes; keep the last entry.
            attachment = attachment[-1]
        draft[kind] = attachment.file_id
        break

    await msg.reply_text("Введите текст поста или перешлите сообщение (можно с кастом-эмодзи):")
    return SELECT_TEXT
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the post text and offer the destinations the admin may post to.

    The text (or caption) and its cleaned-up entities are saved in
    ``context.user_data`` together with the source chat/message ids. The
    admin is then shown an inline keyboard with the channels returned by
    ``list_channels_for_admin`` and with every group.

    Returns ``SELECT_TARGET`` once the keyboard is shown, or
    ``ConversationHandler.END`` when there is no message, no user, no
    per-user storage, or no destination to offer.
    """
    msg = update.message
    draft = context.user_data
    user = update.effective_user
    # user_data cannot be re-assigned on the context (the setter raises), so
    # without it - or without a user to resolve permissions for - stop here.
    if not msg or draft is None or user is None:
        return ConversationHandler.END

    text, entities, _ = _extract_text_and_entities(msg)
    entities = _split_custom_emoji_by_utf16(text, _strip_broken_entities(entities))

    draft["text"] = text
    draft["entities"] = entities
    # Remember the source message so it can be referenced later (copyMessage).
    draft["src_chat_id"] = update.effective_chat.id
    draft["src_msg_id"] = msg.message_id

    async with AsyncSessionLocal() as session:
        # Channels are limited to those available to the current admin.
        me = await get_or_create_admin(session, user.id)
        channels = await list_channels_for_admin(session, me.id)
        # Groups are offered without any ACL filtering.
        groups = (await session.execute(sa_select(Group))).scalars().all()

        # Build the rows while the session is open so ORM attributes are
        # read from attached instances. Groups are listed even when the
        # admin has no channels.
        keyboard = [
            [InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')]
            for c in channels
        ] + [
            [InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')]
            for g in groups
        ]

    if not keyboard:
        await msg.reply_text("Нет доступных каналов/групп для отправки.")
        return ConversationHandler.END

    await msg.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard))
    return SELECT_TARGET
# user_data keys that together make up one post draft.
_DRAFT_KEYS = (
    "photo", "animation", "video", "document", "audio", "voice", "sticker",
    "text", "entities", "src_chat_id", "src_msg_id",
)
# Media kinds sent as bot.send_<kind>(<kind>=file_id, caption=...), in the
# priority order used when picking the attachment from the draft.
_CAPTIONED_MEDIA = ("photo", "animation", "video", "document", "audio", "voice")


class _TargetError(Exception):
    """Carries the user-facing reason why the chosen target cannot be used."""


async def _resolve_target(session, data: str, user):
    """Resolve callback ``data`` (``channel_<id>`` / ``group_<id>``) to a target.

    Returns ``(chat_id, title, buttons)``. ``chat_id`` is ``None`` when
    ``data`` names neither a channel nor a group. Raises ``_TargetError``
    when the id is malformed, the row does not exist, or (channels only)
    the admin lacks ``SCOPE_POST`` on the channel.
    """
    if data.startswith("channel_"):
        model, button_owner, not_found = Channel, Button.channel_id, "Канал не найден."
    elif data.startswith("group_"):
        model, button_owner, not_found = Group, Button.group_id, "Группа не найдена."
    else:
        return None, None, []

    try:
        target_id = int(data.split("_", 1)[1])
    except ValueError:
        # The callback handler has no pattern, so foreign payloads such as
        # "channel_add" can arrive here; report them as an unknown target.
        raise _TargetError(not_found) from None

    if model is Channel:
        # ACL: posting to a channel requires the SCOPE_POST permission.
        me = await get_or_create_admin(session, user.id)
        if not await has_scope_on_channel(session, me.id, target_id, SCOPE_POST):
            raise _TargetError("У вас нет права постить в этот канал.")

    target = (
        await session.execute(sa_select(model).where(model.id == target_id))
    ).scalar_one_or_none()
    if not target:
        raise _TargetError(not_found)

    buttons = (
        await session.execute(sa_select(Button).where(button_owner == target_id))
    ).scalars().all()
    return (target.link or "").strip(), target.name, buttons


async def _send_post(bot, chat_id, draft, text, entities, markup):
    """Send the draft with the matching ``send_*`` call and return the sent message.

    Explicit ``send_*`` calls are used so the inline keyboard can be attached
    to the message. A sticker is sent with the keyboard and, when there is
    text, followed by a separate text message without one.
    """
    for kind in _CAPTIONED_MEDIA:
        if kind in draft:
            send = getattr(bot, f"send_{kind}")
            return await send(
                chat_id=chat_id,
                caption=(text or None),
                caption_entities=(entities if text else None),
                reply_markup=markup,
                **{kind: draft[kind]},
            )

    if "sticker" in draft:
        sent = await bot.send_sticker(
            chat_id=chat_id,
            sticker=draft["sticker"],
            reply_markup=markup,
        )
        if text:
            await bot.send_message(chat_id=chat_id, text=text, entities=entities)
        return sent

    return await bot.send_message(
        chat_id=chat_id,
        text=text,
        entities=entities,
        reply_markup=markup,
    )


async def _publish_draft(update, context, data: str) -> str:
    """Send the stored draft to the chosen target; return the status line to show."""
    # Imported locally: at file level the module only imports BadRequest.
    from telegram.error import TelegramError

    try:
        async with AsyncSessionLocal() as session:
            chat_id, selected_title, btns = await _resolve_target(
                session, data, update.effective_user
            )
            # Build the keyboard while the session is still open so button
            # attributes are read from attached ORM instances.
            markup = None
            if btns:
                rows = [[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in btns]
                markup = InlineKeyboardMarkup(rows)
    except _TargetError as err:
        return str(err)

    if not chat_id or not chat_id.startswith(("@", "-")):
        return 'Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)'

    # DEBUG: how many buttons were found and whether a keyboard was built.
    print(f"[DEBUG] send -> chat_id={chat_id} title={selected_title!r} buttons={len(btns)} has_markup={bool(markup)}")

    # Text and entities are sent as-is (no parse_mode).
    ud = context.user_data or {}
    text: str = ud.get("text", "") or ""
    entities: List[MessageEntity] = ud.get("entities", []) or []
    entities = _strip_broken_entities(entities)
    entities = _split_custom_emoji_by_utf16(text, entities)

    try:
        sent_msg = await _send_post(context.bot, chat_id, ud, text, entities, markup)
    except TelegramError as e:
        # Covers BadRequest as before, plus Forbidden / network failures that
        # previously left the admin without any feedback.
        return f'Ошибка отправки поста: {e}'

    # Safety net: re-attach the keyboard in case it was dropped on send.
    if markup and getattr(sent_msg, "message_id", None):
        try:
            await context.bot.edit_message_reply_markup(
                chat_id=chat_id,
                message_id=sent_msg.message_id,
                reply_markup=markup,
            )
        except TelegramError:
            # Best effort only: the post itself has already been delivered.
            pass

    return f'Пост отправлен{(" в: " + selected_title) if selected_title else "!"}'


async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the destination button: publish the draft and end the dialogue.

    Resolves the channel/group from the callback data, sends the stored
    draft there with the target's URL buttons attached, and replaces the
    selection message with the outcome (success or the reason for failure).

    The draft keys are removed from ``context.user_data`` once the attempt
    is over, so media from this post cannot leak into the next one. An
    error raised while showing the final status propagates to the caller.

    Always returns ``ConversationHandler.END``.
    """
    query = update.callback_query
    if not query:
        return ConversationHandler.END
    await query.answer()

    status = await _publish_draft(update, context, query.data or "")

    # Drop the draft before reporting, so it is cleared even if the status
    # edit below fails.
    draft = context.user_data
    if draft is not None:
        for key in _DRAFT_KEYS:
            draft.pop(key, None)

    await query.edit_message_text(status)
    return ConversationHandler.END
# Messages accepted at the media step: any supported attachment, or a command
# (which is how /skip reaches select_media).
_media_step_filter = (
    filters.PHOTO | filters.ANIMATION | filters.VIDEO | filters.Document.ALL
    | filters.AUDIO | filters.VOICE | filters.Sticker.ALL | filters.COMMAND
)
# Messages accepted at the text step: typed text, forwards, or captioned media.
_text_step_filter = filters.TEXT | filters.FORWARDED | filters.CAPTION

# /new_post dialogue: media -> text -> destination. No fallbacks are registered.
new_post_conv = ConversationHandler(
    entry_points=[CommandHandler("new_post", new_post_start)],
    states={
        SELECT_MEDIA: [MessageHandler(_media_step_filter, select_media)],
        SELECT_TEXT: [MessageHandler(_text_step_filter, select_text)],
        SELECT_TARGET: [CallbackQueryHandler(select_target)],
    },
    fallbacks=[],
)