Files
tg_autopost/posts/utils.py
2025-08-08 12:21:25 +09:00

171 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# posts/utils.py
import asyncio
import logging
from contextlib import ExitStack
from typing import Iterable, Optional, List
from django.template import Template, Context
from telegram import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InputMediaPhoto,
InputMediaVideo,
InputMediaDocument,
Bot,
)
from telegram.error import TelegramError
# ⬇️ берём фабрику приложений и доступ к активному боту
from bot.bot_factory import (
get_first_active_bot_with_config,
build_application_for_bot,
)
logger = logging.getLogger(__name__)
# Кэш Application по bot.id, чтобы не собирать каждый раз
_APP_CACHE = {}
def _get_or_build_application():
"""
Возвращает PTB Application для первого активного бота.
Если в проекте несколько ботов — логично в моделях Post хранить FK на TelegramBot
и сделать аналогичную функцию _get_or_build_application_for(bot).
"""
tb, cfg = get_first_active_bot_with_config()
app = _APP_CACHE.get(tb.id)
if app is None:
app, _allowed = build_application_for_bot(tb, cfg)
_APP_CACHE[tb.id] = app
logger.info("Application создан и закеширован для бота id=%s @%s", tb.id, tb.username or "")
return app
def _ensure_run(coro):
"""
Безопасно выполнить async-корутину из синхронного кода (Django view, Celery task).
Если цикл уже есть — используем его, иначе создаём свой.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
# Мы уже внутри event loop (редко, но бывает) — запускаем как таску и ждём
return asyncio.run_coroutine_threadsafe(coro, loop).result()
else:
return asyncio.run(coro)
def render_post_text(post) -> str:
"""
Рендер текста поста:
- Если есть шаблон, подставляем variables через Django Template;
- Иначе берём post.text.
"""
if getattr(post, "template", None):
try:
tpl = Template(post.template.text)
return tpl.render(Context(post.variables or {}))
except Exception as e:
logger.exception("Ошибка рендера шаблона поста #%s: %s", post.pk, e)
return post.text or ""
return post.text or ""
def build_keyboard(buttons) -> Optional[InlineKeyboardMarkup]:
"""
Ожидает JSON-структуру:
[
[ {"text":"Купить","url":"https://..."}, {"text":"Подробнее","callback_data":"more"} ],
[ {"text":"Связаться","callback_data":"contact"} ]
]
"""
if not buttons:
return None
try:
rows = [[InlineKeyboardButton(**btn) for btn in row] for row in buttons]
return InlineKeyboardMarkup(rows)
except Exception as e:
logger.exception("Ошибка построения клавиатуры: %s", e)
return None
async def _send_async(bot: Bot, chat_id: int, post, text: str, kb: Optional[InlineKeyboardMarkup]):
"""
Асинхронная отправка поста:
- media_group если >1,
- одиночное медиа,
- либо текст.
Важно: аккуратно закрываем открытые файловые дескрипторы.
"""
media_items: List = list(post.media.all()) if hasattr(post, "media") else []
parse_mode = None if getattr(post, "parse_mode", None) in (None, "None") else post.parse_mode
# Откроем все локальные файлы в одном стеке, чтобы гарантировать закрытие
with ExitStack() as stack:
def _payload(m):
# либо file_id, либо открываем local_path
if getattr(m, "file_id", None):
return m.file_id
if getattr(m, "local_path", None):
return stack.enter_context(open(m.local_path, "rb"))
raise ValueError("MediaItem без file_id и local_path")
if len(media_items) > 1:
group = []
for i, m in enumerate(media_items):
caption = text if i == 0 else None
if m.type == "photo":
group.append(InputMediaPhoto(media=_payload(m), caption=caption, parse_mode=parse_mode))
elif m.type == "video":
group.append(InputMediaVideo(media=_payload(m), caption=caption, parse_mode=parse_mode))
elif m.type == "document":
group.append(InputMediaDocument(media=_payload(m), caption=caption, parse_mode=parse_mode))
else:
raise ValueError(f"Неизвестный тип медиа: {m.type}")
await bot.send_media_group(chat_id=chat_id, media=group)
# В media_group нельзя прикрепить inline-кнопки — отправим пустое техсообщение с клавиатурой
if kb:
await bot.send_message(chat_id=chat_id, text=" ", reply_markup=kb)
elif len(media_items) == 1:
m = media_items[0]
if m.type == "photo":
await bot.send_photo(chat_id=chat_id, photo=_payload(m), caption=text or None,
parse_mode=parse_mode, reply_markup=kb)
elif m.type == "video":
await bot.send_video(chat_id=chat_id, video=_payload(m), caption=text or None,
parse_mode=parse_mode, reply_markup=kb)
elif m.type == "document":
await bot.send_document(chat_id=chat_id, document=_payload(m), caption=text or None,
parse_mode=parse_mode, reply_markup=kb)
else:
raise ValueError(f"Неизвестный тип медиа: {m.type}")
else:
await bot.send_message(chat_id=chat_id, text=text, parse_mode=parse_mode, reply_markup=kb)
def send_post_to_chat(chat_id: int, post, text: str, kb: Optional[InlineKeyboardMarkup] = None):
"""
Синхронная обёртка для отправки поста (удобно звать из Celery/Django).
Внутри строим/берём Application у активного бота через bot_factory
и выполняем асинхронную отправку.
"""
app = _get_or_build_application()
bot = app.bot # Bot объект PTB v20
try:
_ensure_run(_send_async(bot, chat_id, post, text, kb))
logger.info("Пост #%s отправлен в чат %s", post.pk, chat_id)
except TelegramError as e:
logger.exception("Ошибка отправки поста #%s в чат %s: %s", post.pk, chat_id, e)
raise
except Exception as e:
logger.exception("Непредвиденная ошибка при отправке поста #%s: %s", post.pk, e)
raise