Files
postbot/app/bots/editor/handlers.py

1586 lines
57 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
from datetime import datetime
from urllib.parse import urlparse
from typing import Optional, Dict, List, Any, Union, cast, Literal, TypedDict, TypeAlias, Awaitable, Callable
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Message
from telegram.ext import ConversationHandler, ContextTypes
from telegram.constants import ParseMode
from sqlalchemy import select, delete, or_
from .states import BotStates
from app.models.templates import Template, TemplateVisibility, PostStatus
from app.models.post import PostType
from app.db.session import async_session_maker
async def choose_template_navigate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Обработчик навигации по списку шаблонов."""
query = update.callback_query
if not query or not query.data:
return BotStates.SELECT_TEMPLATE
await query.answer()
data = query.data
if data == "back":
# Вернуться в предыдущее меню
kb = []
kb.append([InlineKeyboardButton(text="📝 Создать пост", callback_data="post:new")])
kb.append([InlineKeyboardButton(text="📋 Мои шаблоны", callback_data="tpl:list")])
kb.append([InlineKeyboardButton(text="🔧 Управление", callback_data="settings")])
reply_markup = InlineKeyboardMarkup(kb)
await query.edit_message_text(
text="Выберите действие:",
reply_markup=reply_markup
)
return BotStates.MAIN_MENU
# Обработка выбора шаблона
template_parts = data.split(":")
if len(template_parts) == 3 and template_parts[0] == "tpl" and template_parts[1] == "select":
try:
template_id = int(template_parts[2])
async with async_session_maker() as session:
stmt = select(Template).where(Template.id == template_id)
template = (await session.execute(stmt)).scalar_one_or_none()
if template:
if context.user_data is None:
context.user_data = {}
context.user_data["current_template"] = {
"id": template.id,
"name": template.name,
"query": query
}
# Показать предпросмотр и действия
kb = []
kb.append([InlineKeyboardButton(text="✅ Использовать", callback_data="tpl:use")])
kb.append([InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")])
reply_markup = InlineKeyboardMarkup(kb)
preview = f"Шаблон: {template.name}\n\n{template.content}"
await query.edit_message_text(
text=preview,
reply_markup=reply_markup
)
return BotStates.TEMPLATE_PREVIEW
except (ValueError, TypeError):
pass
return BotStates.SELECT_TEMPLATE
async def choose_template_apply(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Применить выбранный шаблон."""
query = update.callback_query
if not query:
return BotStates.TEMPLATE_PREVIEW
await query.answer()
if context.user_data is None:
context.user_data = {}
template = context.user_data.get("current_template")
if not template:
await query.edit_message_text(
text="Ошибка: шаблон не выбран",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")
]])
)
return BotStates.SELECT_TEMPLATE
# Подготовка переменных для шаблона
kb = []
kb.append([InlineKeyboardButton(text="✅ Подтвердить", callback_data="tpl:confirm")])
kb.append([InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:preview")])
reply_markup = InlineKeyboardMarkup(kb)
text = "Укажите значения для переменных шаблона:"
await query.edit_message_text(text=text, reply_markup=reply_markup)
return BotStates.TEMPLATE_VARS
async def choose_channel_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Обработчик выбора канала."""
query = update.callback_query
if not query:
return BotStates.SELECT_TEMPLATE
template = context.user_data.get("current_template") if context.user_data else None
if not template:
await query.edit_message_text(
text="Ошибка: шаблон не выбран",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")
]])
)
return BotStates.SELECT_TEMPLATE
# Подготовка переменных для шаблона
kb = []
kb.append([InlineKeyboardButton(text="✅ Подтвердить", callback_data="tpl:confirm")])
kb.append([InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:preview")])
reply_markup = InlineKeyboardMarkup(kb)
text = "Укажите значения для переменных шаблона:"
await query.edit_message_text(text=text, reply_markup=reply_markup)
return BotStates.TEMPLATE_VARS
async def choose_template_preview(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Предпросмотр шаблона."""
query = update.callback_query
if not query:
return BotStates.TEMPLATE_PREVIEW
await query.answer()
if context.user_data is None:
context.user_data = {}
template = context.user_data.get("current_template")
if not template:
await query.edit_message_text(
text="Ошибка: шаблон не выбран",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")
]])
)
return BotStates.SELECT_TEMPLATE
# Показать предпросмотр и действия
kb = []
kb.append([InlineKeyboardButton(text="✅ Использовать", callback_data="tpl:use")])
kb.append([InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")])
reply_markup = InlineKeyboardMarkup(kb)
preview = f"Шаблон: {template['name']}\n\n{template['content']}"
await query.edit_message_text(
text=preview,
reply_markup=reply_markup
)
return BotStates.TEMPLATE_PREVIEW
async def choose_template_cancel(update: Update, context: CallbackContext) -> int:
"""Отмена выбора шаблона."""
query = update.callback_query
if query:
await query.answer()
if context.user_data is None:
context.user_data = {}
if "current_template" in context.user_data:
del context.user_data["current_template"]
await choose_template_open(update, context)
return BotStates.SELECT_TEMPLATE
async def preview_collect_vars(update: Update, context: CallbackContext) -> int:
"""Сбор переменных для шаблона."""
query = update.callback_query
if not query:
return BotStates.TEMPLATE_VARS
await query.answer()
# Здесь будет логика сбора переменных
return BotStates.TEMPLATE_VARS
async def preview_confirm(update: Update, context: CallbackContext) -> int:
"""Подтверждение использования шаблона."""
query = update.callback_query
if not query:
return BotStates.TEMPLATE_VARS
await query.answer()
if context.user_data is None:
context.user_data = {}
template = context.user_data.get("current_template")
if not template:
await query.edit_message_text(
text="Ошибка: шаблон не выбран",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(text="🔙 Назад", callback_data="tpl:choose")
]])
)
return BotStates.SELECT_TEMPLATE
# Применяем шаблон и возвращаемся к созданию поста
context.user_data["current_post"] = {
"type": template["type"],
"content": template["content"],
"keyboard": template["keyboard"]
}
await query.edit_message_text(
text="Шаблон применен. Выберите канал для публикации:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(text="Выбрать канал", callback_data="channel:select")
]])
)
return BotStates.CHANNEL_SELECT_BOT
async def choose_template_open(update: Update, context: CallbackContext) -> int:
"""Открыть список шаблонов для выбора."""
query = update.callback_query
if query:
await query.answer()
kb = []
user_data = context.user_data or {}
user_id = user_data.get("user_id")
async with async_session_maker() as session:
# Получаем все публичные шаблоны и приватные шаблоны пользователя
conditions = [Template.visibility == TemplateVisibility.public]
if user_id:
conditions.append(Template.owner_id == user_id)
stmt = select(Template).where(or_(*conditions))
templates = (await session.execute(stmt)).scalars().all()
for template in templates:
kb.append([
InlineKeyboardButton(
text=f"📝 {template.name}",
callback_data=f"tpl:select:{template.id}"
)
])
kb.append([InlineKeyboardButton(text="🔙 Назад", callback_data="back")])
reply_markup = InlineKeyboardMarkup(kb)
text = "Выберите шаблон из списка:"
if query:
await query.edit_message_text(text=text, reply_markup=reply_markup)
elif update.message:
await update.message.reply_text(text=text, reply_markup=reply_markup)
return BotStates.SELECT_TEMPLATE
def validate_url(url: str | None) -> bool:
"""
Проверка корректности URL.
Args:
url: URL для проверки
Returns:
True если URL корректен, False в противном случае
"""
if not url:
return False
try:
result = urlparse(url)
return bool(result.scheme in ('http', 'https') and result.netloc)
except Exception:
return False
async def tpl_new_start(update: Update, context: CallbackContext) -> int:
"""Начало создания нового шаблона."""
message = update.effective_message
if not message:
return BotStates.CONVERSATION_END
await message.reply_text(
"📝 Создание нового шаблона\n\n"
"Введите название шаблона (максимум 100 символов):"
)
return BotStates.TPL_NEW_NAME
async def tpl_new_name(update: Update, context: CallbackContext) -> int:
"""Обработка ввода имени шаблона."""
message = update.effective_message
if not message or not message.text:
return BotStates.CONVERSATION_END
name = message.text.strip()
if len(name) > 100:
await message.reply_text("❌ Слишком длинное название (максимум 100 символов)")
return BotStates.TPL_NEW_NAME
if not isinstance(context.user_data, dict):
context.user_data = {}
context.user_data["template_name"] = name
keyboard = [
[{"text": "📝 Текст", "callback_data": "type:text"}],
[{"text": "🖼 Фото", "callback_data": "type:photo"}],
[{"text": "🎥 Видео", "callback_data": "type:video"}],
[{"text": "🎭 Анимация", "callback_data": "type:animation"}]
]
markup = create_inline_keyboard(keyboard)
await message.reply_text(
"📋 Выберите тип шаблона:",
reply_markup=markup
)
return BotStates.TPL_NEW_TYPE
async def tpl_new_type(update: Update, context: CallbackContext) -> int:
"""Обработка выбора типа шаблона."""
query = update.callback_query
if not query or not query.data or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
await query.answer()
type_name = query.data.split(":", 1)[1]
context.user_data["template_type"] = type_name
keyboard = [
[{"text": "HTML", "callback_data": "format:html"}],
[{"text": "Markdown", "callback_data": "format:markdown"}],
[{"text": "Простой текст", "callback_data": "format:plain"}]
]
markup = create_inline_keyboard(keyboard)
await query.edit_message_text(
"📋 Выберите формат текста:",
reply_markup=markup
)
return BotStates.TPL_NEW_FORMAT
async def tpl_new_format(update: Update, context: CallbackContext) -> int:
"""Обработка выбора формата текста."""
query = update.callback_query
if not query or not query.data or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
await query.answer()
format_name = query.data.split(":", 1)[1]
context.user_data["template_format"] = format_name
await query.edit_message_text(
"📝 Введите содержимое шаблона\n\n"
"Используйте следующий синтаксис для переменных:\n"
"{{variable_name}}\n\n"
"Например: Здравствуйте, {{name}}!"
)
return BotStates.TPL_NEW_CONTENT
async def tpl_new_content(update: Update, context: CallbackContext) -> int:
"""Обработка ввода содержимого шаблона."""
message = update.effective_message
if not message or not message.text or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
context.user_data["template_content"] = message.text
await message.reply_text(
"⌨️ Введите разметку клавиатуры в формате:\n"
"Кнопка 1 -> http://example1.com\n"
"Кнопка 2 -> http://example2.com\n\n"
"Или /skip чтобы пропустить"
)
return BotStates.TPL_NEW_KB
async def tpl_new_kb(update: Update, context: CallbackContext) -> int:
"""Обработка ввода клавиатуры для шаблона."""
message = update.effective_message
if not message or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
if message.text == "/skip":
keyboard_tpl = None
else:
# Парсим разметку клавиатуры
lines = validate_text(message.text)
keyboard_tpl = []
row = []
try:
for line in lines:
if not line:
continue
parts = line.split("->", 1)
if len(parts) != 2:
raise ValueError("Неверный формат кнопки")
text = parts[0].strip()
url = parts[1].strip()
# Проверяем URL
if not validate_url(url):
raise ValueError(f"Недопустимый URL: {url}")
row.append({"text": text, "url": url})
if len(row) >= 2:
keyboard_tpl.append(row)
row = []
if row:
keyboard_tpl.append(row)
except ValueError as e:
await message.reply_text(f"❌ Ошибка в разметке клавиатуры: {str(e)}")
return BotStates.TPL_NEW_KB
# Сохраняем шаблон
try:
template = Template(
owner_id=message.from_user.id if message.from_user else 0,
name=context.user_data["template_name"],
title=context.user_data["template_name"],
content=context.user_data["template_content"],
type=context.user_data["template_type"],
parse_mode=context.user_data["template_format"].upper(),
keyboard_tpl=keyboard_tpl if keyboard_tpl else None
)
async with async_session_maker() as session:
session.add(template)
await session.commit()
await message.reply_text("✅ Шаблон успешно создан!")
except Exception as e:
await message.reply_text(f"❌ Ошибка при сохранении шаблона: {str(e)}")
return BotStates.CONVERSATION_END
async def tpl_list(update: Update, context: CallbackContext) -> int:
"""Просмотр списка шаблонов."""
message = update.effective_message
if not message:
return BotStates.CONVERSATION_END
try:
async with async_session_maker() as session:
# Получаем шаблоны пользователя
user_id = message.from_user.id if message.from_user else 0
result = await session.execute(
select(Template).where(Template.owner_id == user_id)
)
templates = list(result.scalars())
if not templates:
await message.reply_text(
"📂 У вас пока нет шаблонов\n\n"
"Используйте /new_template чтобы создать"
)
return BotStates.CONVERSATION_END
# Формируем список
text = "📂 Ваши шаблоны:\n\n"
for tpl in templates:
text += f"📑 {tpl.name} ({tpl.type})\n"
await message.reply_text(text)
except Exception as e:
await message.reply_text("❌ Ошибка при получении списка шаблонов")
return BotStates.CONVERSATION_END
from telegram import (
Update,
InlineKeyboardMarkup,
InlineKeyboardButton,
Message,
Chat,
)
from telegram.constants import MessageType, ParseMode, ChatType
from telegram.ext import CallbackContext, ConversationHandler
from sqlalchemy import select
from app.models.bot import Bot as BotModel
from app.models.channel import Channel as ChannelModel
from app.bots.editor.states import BotStates
# Message states and types
TYPE_PHOTO = "photo"
TYPE_VIDEO = "video"
TYPE_ANIMATION = "animation"
# Type definitions
ButtonData = Dict[str, str] # {'text': str, 'url': str} | {'text': str, 'callback_data': str}
UserData = Dict[str, Any] # Type alias for telegram context user_data
# Message type mappings
TYPE_MAP = {
TYPE_PHOTO: 'фотографию',
TYPE_VIDEO: 'видео',
TYPE_ANIMATION: 'анимацию'
}
def get_media_type_text(post_type: str | None) -> str:
"""Get human-readable media type description."""
if not post_type:
return 'медиафайл'
return TYPE_MAP.get(post_type, 'медиафайл')
def create_keyboard_button(text: str, url: str | None = None, callback_data: str | None = None) -> InlineKeyboardButton:
"""Create an InlineKeyboardButton with either url or callback_data."""
if url:
return InlineKeyboardButton(text=text, url=url)
return InlineKeyboardButton(text=text, callback_data=callback_data or text)
def create_inline_markup(buttons: list[list[ButtonData]]) -> InlineKeyboardMarkup:
"""
Create an inline keyboard from button data.
Args:
buttons: List of button rows, each containing dicts with 'text' and either 'url' or 'callback_data'
Returns:
Formatted InlineKeyboardMarkup ready to use in messages
"""
keyboard = []
for row in buttons:
kb_row = []
for btn in row:
text = btn.get('text', '')
url = btn.get('url')
callback_data = btn.get('callback_data', text)
kb_row.append(create_keyboard_button(text, url, callback_data))
if kb_row:
keyboard.append(kb_row)
return InlineKeyboardMarkup(keyboard)
# Message states and types
TYPE_PHOTO = "photo"
TYPE_VIDEO = "video"
TYPE_ANIMATION = "animation"
MediaType = Literal[TYPE_PHOTO, TYPE_VIDEO, TYPE_ANIMATION]
type_map: Dict[str, str] = {
TYPE_PHOTO: 'фотографию',
TYPE_VIDEO: 'видео',
TYPE_ANIMATION: 'анимацию'
}
# Helper functions
def extract_text(msg: Message | None) -> str | None:
"""Extract text from message."""
try:
return msg.text if msg and msg.text else None
except AttributeError:
return None
def validate_text(text: str | None) -> list[str]:
"""Validate and split text into lines, removing empty lines."""
if not text:
return []
try:
return [line for line in text.strip().split("\n") if line.strip()]
except (AttributeError, Exception):
return []
def validate_keyboard_data(data: Any) -> list[list[dict[str, str]]]:
"""
Validate and normalize keyboard data.
Args:
data: Raw keyboard data dictionary that should contain 'keyboard' key
with a list of button rows
Returns:
List of valid keyboard rows, where each row is a list of button dictionaries
with at least a 'text' key
"""
if not data or not isinstance(data, dict):
return []
keyboard = data.get('keyboard', [])
if not keyboard or not isinstance(keyboard, list):
return []
result = []
for row in keyboard:
if isinstance(row, list):
valid_row = []
for btn in row:
if isinstance(btn, dict) and 'text' in btn:
valid_row.append(btn)
if valid_row:
result.append(valid_row)
return result
def create_inline_keyboard(buttons: list[list[dict[str, str]]]) -> InlineKeyboardMarkup:
"""
Create an inline keyboard from validated button data.
Args:
buttons: List of button rows, each containing button data dictionaries
Returns:
InlineKeyboardMarkup object ready to be sent with a message
"""
keyboard = []
for row in buttons:
kb_row = []
for btn in row:
text = btn.get('text', '')
callback_data = btn.get('callback_data', text)
url = btn.get('url')
if url:
kb_row.append(InlineKeyboardButton(text=text, url=url))
else:
kb_row.append(InlineKeyboardButton(text=text, callback_data=callback_data))
if kb_row:
keyboard.append(kb_row)
return InlineKeyboardMarkup(keyboard)
def extract_chat_info(msg: Message | None) -> tuple[int | None, str | None]:
"""Extract chat info from message."""
try:
chat = msg.chat if msg else None
if chat and chat.type == ChatType.CHANNEL.value:
return chat.id, chat.title
except AttributeError:
pass
return None, None
def is_valid_url(text: str | None) -> bool:
"""Check if text is a valid URL."""
try:
if not text:
return False
result = urlparse(text.strip())
return bool(result.scheme and result.netloc)
except (AttributeError, Exception):
return False
# User data and keyboard types
UserData = Dict[str, Any]
KeyboardType = List[List[Dict[str, str]]]
def get_message_media_type(msg: Message) -> str | None:
"""Get media type from message."""
if not msg:
return None
# Check message contents
if msg.photo:
return TYPE_PHOTO
elif msg.video:
return TYPE_VIDEO
elif msg.animation:
return TYPE_ANIMATION
return None
def get_message_text(msg: Message) -> str | None:
"""Get text content from message."""
return msg.text if msg and msg.text else None
def get_message_media_text(msg: Message) -> str:
"""Get human-readable media type description."""
media_type = get_message_media_type(msg)
if not media_type:
return 'медиафайл'
return type_map.get(media_type, 'медиафайл')
def get_media_type_str(msg: Message) -> str:
"""Get media type as string."""
media_type = get_message_media_type(msg)
if not media_type:
return 'медиафайл'
return type_map.get(media_type, 'медиафайл')
def get_post_type_text(post_type: str | None) -> str:
"""Get human-readable text for media type."""
if not post_type:
return 'медиафайл'
return type_map.get(str(post_type), 'медиафайл')
def check_media_type(msg: Message) -> str | None:
"""Check message media type."""
if not msg:
return None
if msg.photo:
return TYPE_PHOTO
elif msg.video:
return TYPE_VIDEO
elif msg.animation:
return TYPE_ANIMATION
return None
def get_channel_info(msg: Message) -> tuple[int | None, str | None]:
"""Extract channel info from message."""
if not msg or not msg.chat:
return None, None
chat = msg.chat
if not isinstance(chat, Chat) or chat.type != ChatType.CHANNEL.value:
return None, None
return chat.id, chat.title
def process_message_text(text: str | None) -> list[str]:
"""Process and split message text into lines."""
if not text:
return []
try:
return [line for line in text.strip().split("\n") if line]
except AttributeError:
return []
def process_keyboard_data(data: Any) -> list[list[dict[str, str]]]:
"""Process and validate keyboard data."""
if not data or not isinstance(data, dict):
return []
keyboard = data.get('keyboard', [])
if not isinstance(keyboard, list):
return []
return [
[btn for btn in row if isinstance(btn, dict)]
for row in keyboard
if isinstance(row, list)
]
def count_keyboard_buttons(data: Any) -> int:
"""Count total buttons in keyboard data."""
keyboard = process_keyboard_data(data)
return sum(len(row) for row in keyboard)
def extract_message_text(msg: Message) -> str | None:
"""Extract text content from message."""
return msg.text if msg and hasattr(msg, 'text') else None
def check_url(text: str) -> bool:
"""Check if text is a valid URL."""
if not text:
return False
try:
result = urlparse(text)
return bool(result.scheme and result.netloc)
except Exception:
return False
from app.models.bot import Bot as BotModel
from app.models.channel import Channel as ChannelModel
from app.bots.editor.states import BotStates
# Type hints
PostType = Union[MessageType.PHOTO, MessageType.VIDEO, MessageType.ANIMATION, str, None]
UserData = Dict[str, Any]
type_map = {
MessageType.PHOTO: 'фотографию',
MessageType.VIDEO: 'видео',
MessageType.ANIMATION: 'анимацию'
}
from app.db.session import async_session_maker
from app.models.bot import Bot as BotModel
from app.models.channel import Channel as ChannelModel
from app.services.telegram import validate_bot_token
from ..states.base import BotStates
from .messages import Messages
from .session import UserSession
async def start(update: Update, context: CallbackContext) -> int:
"""
Обработчик команды /start. Показывает приветственное сообщение.
"""
message = update.effective_message
if not message:
return BotStates.CONVERSATION_END
await message.reply_text(Messages.WELCOME_MESSAGE)
return BotStates.CONVERSATION_END
async def help_command(update: Update, context: CallbackContext) -> int:
"""
Обработчик команды /help. Показывает справочное сообщение.
"""
message = update.effective_message
if not message:
return BotStates.CONVERSATION_END
await message.reply_text(Messages.HELP_MESSAGE)
return BotStates.CONVERSATION_END
async def newpost(update: Update, context: CallbackContext) -> int:
"""
Начало создания нового поста.
Команда: /newpost
"""
message = update.effective_message
user = update.effective_user
if not message or not user:
return BotStates.CONVERSATION_END
try:
# Получаем список каналов пользователя
async with async_session_maker() as session:
result = await session.execute(
select(ChannelModel).where(ChannelModel.owner_id == user.id)
)
channels = list(result.scalars())
if not channels:
await message.reply_text(
"У вас нет добавленных каналов.\n"
"Используйте /add_channel чтобы добавить канал."
)
return BotStates.CONVERSATION_END
# Создаем клавиатуру выбора канала
keyboard = []
for channel in channels:
if channel.bot:
title = f"{channel.title or channel.chat_id} (@{channel.bot.username})"
else:
title = str(channel.chat_id)
keyboard.append([
InlineKeyboardButton(
title,
callback_data=f"channel:{channel.id}"
)
])
markup = InlineKeyboardMarkup(keyboard)
await message.reply_text(
"📢 Выберите канал для публикации:",
reply_markup=markup
)
return BotStates.CHOOSE_CHANNEL
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка при получении списка каналов."
)
return BotStates.CONVERSATION_END
async def choose_channel(update: Update, context: CallbackContext) -> int:
"""
Обработка выбора канала.
"""
query = update.callback_query
if not query or not query.data:
return BotStates.CONVERSATION_END
try:
await query.answer()
channel_id = int(query.data.split(":", 1)[1])
# Сохраняем выбранный канал
if isinstance(context.user_data, dict):
context.user_data["channel_id"] = channel_id
# Показываем выбор типа поста
keyboard = [
[InlineKeyboardButton("📝 Текст", callback_data="type:text")],
[InlineKeyboardButton("🖼️ Фото", callback_data="type:photo")],
[InlineKeyboardButton("🎥 Видео", callback_data="type:video")],
[InlineKeyboardButton("📹 Анимация", callback_data="type:animation")],
]
markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"📝 Выберите тип поста:",
reply_markup=markup
)
return BotStates.CHOOSE_TYPE
except ValueError:
await query.edit_message_text("❌ Неверный формат данных")
return BotStates.CONVERSATION_END
except Exception as e:
await query.edit_message_text("❌ Произошла ошибка.")
return BotStates.CONVERSATION_END
async def choose_type(update: Update, context: CallbackContext) -> int:
"""
Обработка выбора типа поста.
"""
query = update.callback_query
if not query or not query.data or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
try:
await query.answer()
post_type = query.data.split(":", 1)[1]
context.user_data["post_type"] = post_type
# Спрашиваем формат текста
keyboard = [
[InlineKeyboardButton("📄 Обычный текст", callback_data="fmt:plain")],
[InlineKeyboardButton("🔠 Markdown", callback_data="fmt:markdown")],
[InlineKeyboardButton("🌐 HTML", callback_data="fmt:html")],
]
markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"📝 Выберите формат текста:",
reply_markup=markup
)
return BotStates.CHOOSE_FORMAT
except ValueError:
await query.edit_message_text("❌ Неверный формат данных")
return BotStates.CONVERSATION_END
except Exception as e:
await query.edit_message_text("❌ Произошла ошибка.")
return BotStates.CONVERSATION_END
async def choose_format(update: Update, context: CallbackContext) -> int:
"""
Обработка выбора формата текста.
"""
query = update.callback_query
if not query or not query.data or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
try:
await query.answer()
text_format = query.data.split(":", 1)[1]
context.user_data["text_format"] = text_format
keyboard = [
[InlineKeyboardButton("📋 Из шаблона", callback_data="tpl:choose")]
]
markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"📝 Отправьте текст поста\n"
"Или выберите шаблон:",
reply_markup=markup
)
return BotStates.ENTER_TEXT
except ValueError:
await query.edit_message_text("❌ Неверный формат данных")
return BotStates.CONVERSATION_END
except Exception as e:
await query.edit_message_text("❌ Произошла ошибка.")
return BotStates.CONVERSATION_END
async def enter_text(update: Update, context: CallbackContext) -> int:
"""
Обработка ввода текста поста.
"""
message = update.effective_message
if not message or not message.text or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
text = message.text
post_type = context.user_data.get("post_type")
if not post_type:
await message.reply_text("❌ Ошибка: тип поста не выбран")
return BotStates.CONVERSATION_END
context.user_data["text"] = text
if post_type == "text":
# Для текстового поста сразу переходим к клавиатуре
await message.reply_text(
"⌨️ Отправьте разметку клавиатуры в формате:\n"
"Кнопка 1 -> http://example1.com\n"
"Кнопка 2 -> http://example2.com\n\n"
"Или /skip чтобы пропустить"
)
return BotStates.EDIT_KEYBOARD
else:
# Для медиа постов запрашиваем файл
msg = {
"photo": "🖼️ Отправьте фотографию",
"video": "🎥 Отправьте видео",
"animation": "📹 Отправьте анимацию"
}
await message.reply_text(msg.get(post_type, "❌ Неизвестный тип поста"))
return BotStates.ENTER_MEDIA
async def add_bot_start(update: Update, context: CallbackContext) -> int:
"""
Начало процесса добавления нового бота.
Команда: /add_bot
"""
message = update.effective_message
if not message:
return BotStates.CONVERSATION_END
await message.reply_text(
"🤖 Отправьте токен бота, полученный от @BotFather\n"
"Или /cancel для отмены"
)
return BotStates.BOT_TOKEN
async def add_bot_token(update: Update, context: CallbackContext) -> int:
"""
Обработка токена бота.
"""
message = update.effective_message
user = update.effective_user
if not message or not message.text or not user:
return BotStates.CONVERSATION_END
token = message.text.strip()
# Валидация токена
is_valid, username, bot_id = await validate_bot_token(token)
if not is_valid or not username or not bot_id:
await message.reply_text(
"❌ Неверный токен бота. Проверьте токен и попробуйте снова.\n"
"Или /cancel для отмены"
)
return BotStates.BOT_TOKEN
try:
# Проверяем, не добавлен ли уже этот бот
async with async_session_maker() as session:
existing = await session.execute(
select(BotModel).where(BotModel.bot_id == bot_id)
)
if existing.scalar_one_or_none():
await message.reply_text(
"❌ Этот бот уже добавлен в систему."
)
return BotStates.CONVERSATION_END
# Сохраняем бота
new_bot = BotModel(
owner_id=user.id,
bot_id=bot_id,
username=username,
token=token,
)
session.add(new_bot)
await session.commit()
await message.reply_text(
f"✅ Бот @{username} успешно добавлен!\n\n"
"Теперь вы можете:\n"
"1. Добавить каналы через /add_channel\n"
"2. Создать шаблоны через /tpl_new\n"
"3. Начать создание поста через /newpost"
)
return BotStates.CONVERSATION_END
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка при добавлении бота. Попробуйте позже."
)
return BotStates.CONVERSATION_END
async def list_bots(update: Update, context: CallbackContext) -> None:
"""
Список ботов пользователя.
Команда: /bots
"""
message = update.effective_message
user = update.effective_user
if not message or not user:
return
try:
async with async_session_maker() as session:
result = await session.execute(
select(BotModel).where(BotModel.owner_id == user.id)
)
bots = list(result.scalars())
if not bots:
await message.reply_text(
"🤖 У вас пока нет добавленных ботов.\n"
"Используйте /add_bot чтобы добавить бота."
)
return
text = "🤖 Ваши боты:\n\n"
for bot in bots:
text += f"@{bot.username}\n"
await message.reply_text(text)
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка при получении списка ботов."
)
async def enter_media(update: Update, context: CallbackContext) -> int:
"""
Обработка загрузки медиафайла.
Args:
update: Telegram update
context: Контекст с данными пользователя
Returns:
Следующее состояние разговора
"""
message = update.effective_message
if not message or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
# Проверяем тип медиа
post_type = context.user_data.get("post_type")
file_id = None
if post_type == TYPE_PHOTO and message.photo:
# Берем самую большую версию фото
file_id = message.photo[-1].file_id
elif post_type == TYPE_VIDEO and message.video:
file_id = message.video.file_id
elif post_type == TYPE_ANIMATION and message.animation:
file_id = message.animation.file_id
if not file_id:
media_type = get_media_type_text(post_type)
await message.reply_text(
f"❌ Пожалуйста, отправьте {media_type}"
)
return BotStates.ENTER_MEDIA
# Сохраняем файл
context.user_data["media_file_id"] = file_id
# Переходим к клавиатуре
await message.reply_text(
"⌨️ Отправьте разметку клавиатуры в формате:\n"
"Кнопка 1 -> http://example1.com\n"
"Кнопка 2 -> http://example2.com\n\n"
"Или /skip чтобы пропустить"
)
return BotStates.EDIT_KEYBOARD
async def edit_keyboard(update: Update, context: CallbackContext) -> int:
"""
Обработка разметки клавиатуры.
Args:
update: Telegram update объект
context: Контекст обработчика с user_data типа UserData
Returns:
Следующее состояние разговора
"""
message = update.effective_message
if not message or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
text = message.text
if not text:
await message.reply_text("❌ Отправьте текст разметки клавиатуры или /skip для пропуска")
return BotStates.EDIT_KEYBOARD
# Пропускаем клавиатуру
if text == "/skip":
context.user_data["keyboard"] = None
else:
# Парсим разметку
keyboard: list[list[ButtonData]] = []
row: list[ButtonData] = []
try:
# Обрабатываем каждую строку разметки
for line in text.strip().split("\n"):
if not line:
continue
# Разбираем строку на текст и URL
parts = line.split("->", 1)
if len(parts) != 2:
raise ValueError("Неверный формат кнопки. Используйте: Текст -> URL")
label = parts[0].strip()
url = parts[1].strip()
# Проверяем URL
if not validate_url(url):
raise ValueError(f"Недопустимый URL: {url}")
# Проверяем длину текста кнопки
if len(label) > 64:
raise ValueError(f"Текст кнопки слишком длинный (макс. 64 символа): {label}")
# Добавляем кнопку в текущий ряд
row.append({"text": label, "url": url})
# Переход на новую строку после 2 кнопок
if len(row) >= 2: # Максимум 2 кнопки в ряду
keyboard.append(row)
row = []
# Добавляем оставшиеся кнопки
if row:
keyboard.append(row)
# Проверяем общее количество кнопок
total_buttons = sum(len(row) for row in keyboard)
if total_buttons > 10:
raise ValueError("Слишком много кнопок (максимум 10)")
context.user_data["keyboard"] = keyboard
except ValueError as e:
await message.reply_text(f"❌ Ошибка в разметке клавиатуры: {str(e)}")
return BotStates.EDIT_KEYBOARD
# Готовим предпросмотр
if not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
preview_text = context.user_data.get("text", "")
post_type = context.user_data.get("post_type")
# Обрезаем длинный текст
if len(preview_text) > 50:
preview_text = preview_text[:47] + "..."
# Создаем клавиатуру выбора действия
keyboard_data = [
[{"text": "✅ Отправить сейчас", "callback_data": "send:now"}],
[{"text": "⏰ Отложить", "callback_data": "send:schedule"}]
]
markup = create_inline_keyboard(keyboard_data)
# Информация о медиафайле
media_info = ""
if post_type:
media_info = f"\n📎 Прикреплен файл: {get_media_type_text(post_type)}"
# Информация о клавиатуре
kb_info = ""
keyboard = context.user_data.get("keyboard", [])
if keyboard and isinstance(keyboard, list):
try:
kb_count = sum(len(row) for row in keyboard)
kb_info = f"\n⌨️ Клавиатура: {kb_count} кнопок"
except (TypeError, AttributeError):
pass
await message.reply_text(
f"📝 Предпросмотр:\n\n{preview_text}\n{media_info}{kb_info}\n\n"
"Выберите действие:",
reply_markup=markup
)
return BotStates.CONFIRM_SEND
async def confirm_send(update: Update, context: CallbackContext) -> int:
"""
Обработка подтверждения отправки.
"""
query = update.callback_query
if not query or not query.data or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
try:
await query.answer()
action = query.data.split(":", 1)[1]
if action == "now":
# TODO: Отправка поста
await query.edit_message_text(
"✅ Пост успешно отправлен!"
)
return BotStates.CONVERSATION_END
elif action == "schedule":
await query.edit_message_text(
"⏰ Отправьте дату и время публикации в формате:\n"
"ДД.ММ.ГГГГ ЧЧ:ММ\n"
"Например: 31.12.2025 23:59"
)
return BotStates.ENTER_SCHEDULE
else:
await query.edit_message_text("❌ Неизвестное действие")
return BotStates.CONVERSATION_END
except (ValueError, AttributeError):
await query.edit_message_text("❌ Неверный формат данных")
return BotStates.CONVERSATION_END
except Exception:
await query.edit_message_text("❌ Произошла ошибка")
return BotStates.CONVERSATION_END
async def enter_schedule(update: Update, context: CallbackContext) -> int:
"""
Обработка времени отложенной публикации.
Args:
update: Telegram update
context: Контекст с данными пользователя
Returns:
Следующее состояние разговора
"""
message = update.effective_message
if not message or not message.text:
return BotStates.CONVERSATION_END
try:
# Парсим дату и время
text = message.text.strip()
dt = datetime.strptime(text, "%d.%m.%Y %H:%M")
now = datetime.now()
if dt <= now:
await message.reply_text(
"❌ Дата публикации должна быть в будущем"
)
return BotStates.ENTER_SCHEDULE
if isinstance(context.user_data, dict):
context.user_data["schedule"] = dt.isoformat()
# TODO: Сохранение отложенного поста
await message.reply_text(
f"✅ Пост запланирован на {dt.strftime('%d.%m.%Y %H:%M')}"
)
return BotStates.CONVERSATION_END
except ValueError:
await message.reply_text(
"❌ Неверный формат даты и времени.\n"
"Используйте формат ДД.ММ.ГГГГ ЧЧ:ММ"
)
return BotStates.ENTER_SCHEDULE
return BotStates.CONVERSATION_END
async def add_channel_start(update: Update, context: CallbackContext) -> int:
"""
Начало процесса добавления канала.
Команда: /add_channel
"""
message = update.effective_message
user = update.effective_user
if not message or not user:
return BotStates.CONVERSATION_END
try:
# Получаем список ботов пользователя
async with async_session_maker() as session:
result = await session.execute(
select(BotModel).where(BotModel.owner_id == user.id)
)
bots = list(result.scalars())
if not bots:
await message.reply_text(
"❌ Сначала добавьте бота через /add_bot"
)
return BotStates.CONVERSATION_END
# Создаем клавиатуру выбора бота
keyboard = []
for bot in bots:
keyboard.append([
InlineKeyboardButton(
f"@{bot.username}",
callback_data=f"addch_bot:{bot.id}"
)
])
markup = InlineKeyboardMarkup(keyboard)
await message.reply_text(
"🤖 Выберите бота для добавления канала:",
reply_markup=markup
)
return BotStates.CHANNEL_SELECT_BOT
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка. Попробуйте позже."
)
return BotStates.CONVERSATION_END
async def add_channel_bot_selected(update: Update, context: CallbackContext) -> int:
"""
Обработка выбора бота для канала.
"""
query = update.callback_query
user = update.effective_user
if not query or not query.data or not user:
return BotStates.CONVERSATION_END
try:
await query.answer()
bot_id = int(query.data.split(":", 1)[1])
# Сохраняем выбранного бота в контекст
if isinstance(context.user_data, dict):
context.user_data["selected_bot_id"] = bot_id
await query.edit_message_text(
"📢 Добавьте бота в канал как администратора и отправьте:\n\n"
"1. Username канала (например @channel)\n"
"2. Или ID канала (например -100...)\n"
"3. Или перешлите любое сообщение из канала\n\n"
"Или /cancel для отмены"
)
return BotStates.CHANNEL_INPUT
except ValueError:
await query.edit_message_text("❌ Неверный формат данных")
return BotStates.CONVERSATION_END
except Exception as e:
await query.edit_message_text("❌ Произошла ошибка. Попробуйте позже.")
return BotStates.CONVERSATION_END
async def add_channel_input(update: Update, context: CallbackContext) -> int:
"""
Обработка ввода канала.
"""
message = update.effective_message
user = update.effective_user
if not message or not user or not isinstance(context.user_data, dict):
return BotStates.CONVERSATION_END
bot_id = context.user_data.get("selected_bot_id")
if not bot_id:
await message.reply_text("❌ Ошибка: не выбран бот")
return BotStates.CONVERSATION_END
try:
# Определяем ID канала
channel_id: Optional[int] = None
# Проверяем информацию о канале
channel_id, channel_title = extract_chat_info(message)
# Если не нашли, пробуем из текста
if not channel_id:
text = extract_text(message)
if text:
if text.startswith('@'):
# TODO: Получить chat_id по username через Bot API
pass
elif text.startswith('-100'):
try:
channel_id = int(text)
except ValueError:
pass
if not channel_id:
await message.reply_text(
"Не удалось определить ID канала.\n"
"Попробуйте переслать сообщение из канала\n"
"Или /cancel для отмены"
)
return BotStates.CHANNEL_INPUT
# Проверяем права бота в канале
async with async_session_maker() as session:
bot_result = await session.execute(
select(BotModel).where(BotModel.id == bot_id)
)
bot = bot_result.scalar_one_or_none()
if not bot:
await message.reply_text("❌ Ошибка: бот не найден")
return BotStates.CONVERSATION_END
# TODO: Проверить права бота в канале через Bot API
# Проверяем, не добавлен ли уже канал
existing = await session.execute(
select(ChannelModel).where(
ChannelModel.chat_id == channel_id,
ChannelModel.bot_id == bot_id
)
)
if existing.scalar_one_or_none():
await message.reply_text("❌ Этот канал уже добавлен")
return BotStates.CONVERSATION_END
# Сохраняем канал
new_channel = ChannelModel(
owner_id=user.id,
bot_id=bot_id,
chat_id=channel_id,
title=channel_title if channel_title else None
)
session.add(new_channel)
await session.commit()
await message.reply_text(
"✅ Канал успешно добавлен!\n"
"Теперь вы можете создать пост через /newpost"
)
return BotStates.CONVERSATION_END
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка при добавлении канала.\n"
"Убедитесь, что бот добавлен в канал как администратор."
)
return BotStates.CONVERSATION_END
async def list_channels(update: Update, context: CallbackContext) -> None:
"""
Список каналов пользователя.
Команда: /channels
"""
message = update.effective_message
user = update.effective_user
if not message or not user:
return
try:
async with async_session_maker() as session:
result = await session.execute(
select(ChannelModel)
.where(ChannelModel.owner_id == user.id)
.order_by(ChannelModel.bot_id)
)
channels = list(result.scalars())
if not channels:
await message.reply_text(
"📢 У вас пока нет добавленных каналов.\n"
"Используйте /add_channel чтобы добавить канал."
)
return
text = "📢 Ваши каналы:\n\n"
current_bot_id = None
for channel in channels:
if channel.bot_id != current_bot_id:
current_bot_id = channel.bot_id
if channel.bot:
text += f"\n🤖 Бот @{channel.bot.username}:\n"
title = channel.title or str(channel.chat_id)
text += f"- {title}\n"
await message.reply_text(text)
except Exception as e:
await message.reply_text(
"❌ Произошла ошибка при получении списка каналов."
)