Files
post_bot/handlers/new_post.py
Choi A.K. 405e663da4
Some checks failed
continuous-integration/drone/push Build is failing
migrations fixes
admin check-in
2025-09-06 08:41:28 +09:00

326 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Optional
from telegram import (
Update,
InlineKeyboardMarkup,
InlineKeyboardButton,
MessageEntity,
)
from telegram.ext import (
ContextTypes,
ConversationHandler,
MessageHandler,
CommandHandler,
CallbackQueryHandler,
filters,
)
from telegram.constants import MessageEntityType
from sqlalchemy import select as sa_select
from db import AsyncSessionLocal
from models import Channel, Group, Button
from models import Admin
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3)
# ========== UTF-16 helpers ==========
def _utf16_units_len(s: str) -> int:
"""Длина строки в UTF-16 code units (LE)."""
return len(s.encode("utf-16-le")) // 2
def _normalize_custom_emoji_entities(text: str, entities: List[MessageEntity]) -> List[MessageEntity]:
"""
Для entity типа CUSTOM_EMOJI Telegram ожидает offset/length в UTF-16.
Если вдруг кастомная эмодзи пришла с length > 1 (несколько символов),
дробим на несколько entity, каждая длиной в реальное кол-во UTF-16 юнитов
(обычно 2 на эмодзи).
Остальные entity не трогаем (у них уже корректные UTF-16 offset/length).
"""
if not text or not entities:
return entities or []
out: List[MessageEntity] = []
for e in entities:
if (
e.type == MessageEntityType.CUSTOM_EMOJI
and e.length is not None
and e.length > 1
and getattr(e, "custom_emoji_id", None)
):
# Возьмём срез по codepoints — нормально, но считать длины будем в UTF-16
substr = text[e.offset: e.offset + e.length]
rel_utf16 = 0
for ch in substr:
ch_len16 = _utf16_units_len(ch) # чаще всего 2
out.append(
MessageEntity(
type=MessageEntityType.CUSTOM_EMOJI,
offset=e.offset + rel_utf16,
length=ch_len16,
custom_emoji_id=e.custom_emoji_id,
url=None, user=None, language=None
)
)
rel_utf16 += ch_len16
else:
out.append(e)
return out
def _strip_none_entities(entities: Optional[List[MessageEntity]]) -> List[MessageEntity]:
"""
Фильтрация мусорных/пустых entity, чтобы не падать на сервере.
"""
cleaned: List[MessageEntity] = []
for e in entities or []:
if e.offset is None or e.length is None or e.offset < 0 or e.length < 1:
continue
if e.type == MessageEntityType.CUSTOM_EMOJI:
if not getattr(e, "custom_emoji_id", None):
continue
cleaned.append(e)
return cleaned
def _extract_text_and_entities(msg) -> tuple[str, List[MessageEntity], bool]:
"""
Унифицированно достаём текст и entities из сообщения:
- Если есть text -> берём text/entities, is_caption=False
- Иначе если есть caption -> берём caption/caption_entities, is_caption=True
- Иначе пусто
"""
if getattr(msg, "text", None):
return msg.text, (msg.entities or []), False
if getattr(msg, "caption", None):
return msg.caption, (msg.caption_entities or []), True
return "", [], False
# ========== Conversation handlers ==========
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message:
await update.message.reply_text("Отправьте картинку/медиа для поста или пришлите /skip:")
return SELECT_MEDIA
return ConversationHandler.END
async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
if context.user_data is None:
context.user_data = {}
if not update.message:
return ConversationHandler.END
msg = update.message
# Поддержка пропуска медиа
if msg.text and msg.text.strip().lower() == "/skip":
await update.message.reply_text("Введите текст поста или пересланное сообщение (можно с эмодзи/разметкой):")
return SELECT_TEXT
# Сохраняем медиа (последний вариант — стикер)
if msg.photo:
context.user_data["photo"] = msg.photo[-1].file_id
elif msg.animation:
context.user_data["animation"] = msg.animation.file_id
elif msg.video:
context.user_data["video"] = msg.video.file_id
elif msg.document:
context.user_data["document"] = msg.document.file_id
elif msg.audio:
context.user_data["audio"] = msg.audio.file_id
elif msg.voice:
context.user_data["voice"] = msg.voice.file_id
elif msg.sticker:
context.user_data["sticker"] = msg.sticker.file_id
await update.message.reply_text("Введите текст поста или пересланное сообщение (можно с эмодзи/разметкой):")
return SELECT_TEXT
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message:
return ConversationHandler.END
if context.user_data is None:
context.user_data = {}
msg = update.message
# Берём как есть: либо text/entities, либо caption/caption_entities
if msg.text:
text = msg.text
entities = msg.entities or []
else:
text = msg.caption or ""
entities = msg.caption_entities or []
# Сохраняем исходные данные БЕЗ изменений
context.user_data["text"] = text
context.user_data["entities"] = entities
if update.effective_chat and hasattr(update.effective_chat, 'id'):
context.user_data["src_chat_id"] = update.effective_chat.id
else:
context.user_data["src_chat_id"] = None
context.user_data["src_msg_id"] = update.message.message_id
# Дальше как у тебя: выбор канала/группы
from sqlalchemy import select as sa_select
user = update.effective_user
if not user or not hasattr(user, 'id'):
await update.message.reply_text('Ошибка: не удалось определить пользователя.')
return ConversationHandler.END
admin_tg_id = user.id
async with AsyncSessionLocal() as session:
admin_obj = (await session.execute(sa_select(Admin).where(Admin.tg_id == admin_tg_id))).scalar_one_or_none()
admin_id = admin_obj.id if admin_obj else None
channels = (await session.execute(sa_select(Channel).where(Channel.admin_id == admin_id))).scalars().all() if admin_id is not None else []
groups = (await session.execute(sa_select(Group).where(Group.admin_id == admin_id))).scalars().all() if admin_id is not None else []
keyboard = []
for c in channels:
keyboard.append([InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')])
for g in groups:
keyboard.append([InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')])
await update.message.reply_text('Выберите, куда отправить пост:', reply_markup=InlineKeyboardMarkup(keyboard))
return SELECT_TARGET
async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
if not query:
return ConversationHandler.END
await query.answer()
data = query.data
from sqlalchemy import select as sa_select
async with AsyncSessionLocal() as session:
chat_id = None
markup = None
if data and data.startswith('channel_'):
channel_id = int(data.split('_')[1])
channel = (await session.execute(sa_select(Channel).where(Channel.id == channel_id))).scalar_one_or_none()
buttons = (await session.execute(sa_select(Button).where(Button.channel_id == channel_id))).scalars().all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None
chat_id = channel.link if channel else None
elif data and data.startswith('group_'):
group_id = int(data.split('_')[1])
group = (await session.execute(sa_select(Group).where(Group.id == group_id))).scalar_one_or_none()
buttons = (await session.execute(sa_select(Button).where(Button.group_id == group_id))).scalars().all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None
chat_id = group.link if group else None
if chat_id is None:
await query.edit_message_text('Ошибка: объект не найден.')
return ConversationHandler.END
chat_id = chat_id.strip()
if not (chat_id.startswith('@') or chat_id.startswith('-')):
await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)')
return ConversationHandler.END
try:
ud = context.user_data or {}
text = ud.get('text', '') or ''
entities = ud.get('entities', []) or []
# Если это одно входящее сообщение без “сборки” из двух шагов — идеально копируем:
# (то есть пользователь НЕ загружал медиа в SELECT_MEDIA)
has_media = any(k in ud for k in ('photo','animation','video','document','audio','voice','sticker'))
if ud.get('src_chat_id') and ud.get('src_msg_id') and not has_media:
await context.bot.copy_message(
chat_id=chat_id,
from_chat_id=ud['src_chat_id'],
message_id=ud['src_msg_id'],
reply_markup=markup,
)
await query.edit_message_text('Пост отправлен!')
return ConversationHandler.END
# Иначе — отправляем собранный пост, но entities/emoji берём ИСХОДНЫЕ
sent = False
if 'photo' in ud:
await context.bot.send_photo(chat_id=chat_id, photo=ud['photo'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'animation' in ud:
await context.bot.send_animation(chat_id=chat_id, animation=ud['animation'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'video' in ud:
await context.bot.send_video(chat_id=chat_id, video=ud['video'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'document' in ud:
await context.bot.send_document(chat_id=chat_id, document=ud['document'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'audio' in ud:
await context.bot.send_audio(chat_id=chat_id, audio=ud['audio'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'voice' in ud:
await context.bot.send_voice(chat_id=chat_id, voice=ud['voice'],
caption=text or None, caption_entities=entities if text else None,
reply_markup=markup)
sent = True
elif 'sticker' in ud:
await context.bot.send_sticker(chat_id=chat_id, sticker=ud['sticker'])
if text:
await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup)
sent = True
else:
await context.bot.send_message(chat_id=chat_id, text=text, entities=entities, reply_markup=markup)
sent = True
await query.edit_message_text('Пост отправлен!' if sent else 'Ошибка: не удалось отправить сообщение.')
except Exception as e:
await query.edit_message_text(f'Ошибка отправки поста: {e}')
return ConversationHandler.END
new_post_conv = ConversationHandler(
entry_points=[CommandHandler("new_post", new_post_start)],
states={
# Принимаем любые медиа + /skip
SELECT_MEDIA: [
MessageHandler(
filters.PHOTO
| filters.ANIMATION
| filters.VIDEO
| filters.Document.ALL
| filters.AUDIO
| filters.VOICE
| filters.Sticker.ALL
| filters.COMMAND,
select_media,
)
],
# Принимаем любой текст/пересланное (caption попадёт как caption_entities через media-фильтры на предыдущем шаге)
SELECT_TEXT: [
MessageHandler(
filters.ALL & (filters.TEXT | filters.FORWARDED),
select_text
)
],
SELECT_TARGET: [CallbackQueryHandler(select_target)],
},
fallbacks=[],
)