# posts/utils.py import asyncio import logging from contextlib import ExitStack from typing import Iterable, Optional, List from django.template import Template, Context from telegram import ( InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto, InputMediaVideo, InputMediaDocument, Bot, ) from telegram.error import TelegramError # ⬇️ берём фабрику приложений и доступ к активному боту from bot.bot_factory import ( get_first_active_bot_with_config, build_application_for_bot, ) logger = logging.getLogger(__name__) # Кэш Application по bot.id, чтобы не собирать каждый раз _APP_CACHE = {} def _get_or_build_application(): """ Возвращает PTB Application для первого активного бота. Если в проекте несколько ботов — логично в моделях Post хранить FK на TelegramBot и сделать аналогичную функцию _get_or_build_application_for(bot). """ tb, cfg = get_first_active_bot_with_config() app = _APP_CACHE.get(tb.id) if app is None: app, _allowed = build_application_for_bot(tb, cfg) _APP_CACHE[tb.id] = app logger.info("Application создан и закеширован для бота id=%s @%s", tb.id, tb.username or "—") return app def _ensure_run(coro): """ Безопасно выполнить async-корутину из синхронного кода (Django view, Celery task). Если цикл уже есть — используем его, иначе создаём свой. """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): # Мы уже внутри event loop (редко, но бывает) — запускаем как таску и ждём return asyncio.run_coroutine_threadsafe(coro, loop).result() else: return asyncio.run(coro) def render_post_text(post) -> str: """ Рендер текста поста: - Если есть шаблон, подставляем variables через Django Template; - Иначе берём post.text. """ if getattr(post, "template", None): try: tpl = Template(post.template.text) return tpl.render(Context(post.variables or {})) except Exception as e: logger.exception("Ошибка рендера шаблона поста #%s: %s", post.pk, e) return post.text or "" return post.text or "" def build_keyboard(buttons) -> Optional[InlineKeyboardMarkup]: """ Ожидает JSON-структуру: [ [ {"text":"Купить","url":"https://..."}, {"text":"Подробнее","callback_data":"more"} ], [ {"text":"Связаться","callback_data":"contact"} ] ] """ if not buttons: return None try: rows = [[InlineKeyboardButton(**btn) for btn in row] for row in buttons] return InlineKeyboardMarkup(rows) except Exception as e: logger.exception("Ошибка построения клавиатуры: %s", e) return None async def _send_async(bot: Bot, chat_id: int, post, text: str, kb: Optional[InlineKeyboardMarkup]): """ Асинхронная отправка поста: - media_group если >1, - одиночное медиа, - либо текст. Важно: аккуратно закрываем открытые файловые дескрипторы. """ media_items: List = list(post.media.all()) if hasattr(post, "media") else [] parse_mode = None if getattr(post, "parse_mode", None) in (None, "None") else post.parse_mode # Откроем все локальные файлы в одном стеке, чтобы гарантировать закрытие with ExitStack() as stack: def _payload(m): # либо file_id, либо открываем local_path if getattr(m, "file_id", None): return m.file_id if getattr(m, "local_path", None): return stack.enter_context(open(m.local_path, "rb")) raise ValueError("MediaItem без file_id и local_path") if len(media_items) > 1: group = [] for i, m in enumerate(media_items): caption = text if i == 0 else None if m.type == "photo": group.append(InputMediaPhoto(media=_payload(m), caption=caption, parse_mode=parse_mode)) elif m.type == "video": group.append(InputMediaVideo(media=_payload(m), caption=caption, parse_mode=parse_mode)) elif m.type == "document": group.append(InputMediaDocument(media=_payload(m), caption=caption, parse_mode=parse_mode)) else: raise ValueError(f"Неизвестный тип медиа: {m.type}") await bot.send_media_group(chat_id=chat_id, media=group) # В media_group нельзя прикрепить inline-кнопки — отправим пустое техсообщение с клавиатурой if kb: await bot.send_message(chat_id=chat_id, text=" ", reply_markup=kb) elif len(media_items) == 1: m = media_items[0] if m.type == "photo": await bot.send_photo(chat_id=chat_id, photo=_payload(m), caption=text or None, parse_mode=parse_mode, reply_markup=kb) elif m.type == "video": await bot.send_video(chat_id=chat_id, video=_payload(m), caption=text or None, parse_mode=parse_mode, reply_markup=kb) elif m.type == "document": await bot.send_document(chat_id=chat_id, document=_payload(m), caption=text or None, parse_mode=parse_mode, reply_markup=kb) else: raise ValueError(f"Неизвестный тип медиа: {m.type}") else: await bot.send_message(chat_id=chat_id, text=text, parse_mode=parse_mode, reply_markup=kb) def send_post_to_chat(chat_id: int, post, text: str, kb: Optional[InlineKeyboardMarkup] = None): """ Синхронная обёртка для отправки поста (удобно звать из Celery/Django). Внутри строим/берём Application у активного бота через bot_factory и выполняем асинхронную отправку. """ app = _get_or_build_application() bot = app.bot # Bot объект PTB v20 try: _ensure_run(_send_async(bot, chat_id, post, text, kb)) logger.info("Пост #%s отправлен в чат %s", post.pk, chat_id) except TelegramError as e: logger.exception("Ошибка отправки поста #%s в чат %s: %s", post.pk, chat_id, e) raise except Exception as e: logger.exception("Непредвиденная ошибка при отправке поста #%s: %s", post.pk, e) raise