141 lines
5.5 KiB
Python
141 lines
5.5 KiB
Python
from __future__ import annotations
|
|
from typing import Any, Optional, Iterable, Set
|
|
|
|
from jinja2.sandbox import SandboxedEnvironment
|
|
from jinja2 import StrictUndefined, TemplateError, Environment, meta
|
|
from sqlalchemy import select
|
|
|
|
from app.db.session import async_session_maker
|
|
from app.models.templates import Template
|
|
|
|
# ---- Jinja2 окружение для рендера (sandbox) ----
|
|
_env = SandboxedEnvironment(autoescape=False, undefined=StrictUndefined)
|
|
|
|
# Безопасные фильтры для подстановок
|
|
import html as _py_html
|
|
|
|
def escape_mdv2(value: str) -> str:
|
|
if value is None:
|
|
return ""
|
|
s = str(value)
|
|
for ch in ["_","*","[","]","(",")","~","`",">","#","+","-","=","|","{","}",".","!"]:
|
|
s = s.replace(ch, f"\{ch}")
|
|
return s
|
|
|
|
def escape_html(value: str) -> str:
|
|
if value is None:
|
|
return ""
|
|
return _py_html.escape(str(value), quote=False)
|
|
|
|
_env.filters["mdv2"] = escape_mdv2
|
|
_env.filters["html"] = escape_html
|
|
_env.filters["upper"] = str.upper
|
|
_env.filters["lower"] = str.lower
|
|
|
|
# ---- Служебное окружение для анализа AST шаблона (поиск переменных) ----
|
|
_ast_env = Environment()
|
|
|
|
# -------- БД операции --------
|
|
async def list_templates(owner_id: int, limit: int = 10, offset: int = 0, q: str | None = None) -> list[Template]:
|
|
async with async_session_maker() as s:
|
|
stmt = select(Template).where(Template.owner_id == owner_id, Template.is_archived.is_(False))
|
|
if q:
|
|
# простейший поиск по name/title (SQLite/MariaDB — регистр может зависеть от коллаций)
|
|
like = f"%{q}%"
|
|
from sqlalchemy import or_
|
|
stmt = stmt.where(or_(Template.name.ilike(like), Template.title.ilike(like)))
|
|
stmt = stmt.order_by(Template.updated_at.desc()).limit(limit).offset(offset)
|
|
res = await s.execute(stmt)
|
|
return list(res.scalars())
|
|
|
|
async def count_templates(owner_id: int, q: str | None = None) -> int:
|
|
async with async_session_maker() as s:
|
|
from sqlalchemy import func, or_
|
|
stmt = select(func.count()).select_from(Template).where(Template.owner_id == owner_id, Template.is_archived.is_(False))
|
|
if q:
|
|
like = f"%{q}%"
|
|
stmt = stmt.where(or_(Template.name.ilike(like), Template.title.ilike(like)))
|
|
res = await s.execute(stmt)
|
|
return int(res.scalar_one())
|
|
|
|
async def get_template_by_name(owner_id: int, name: str) -> Optional[Template]:
|
|
async with async_session_maker() as s:
|
|
res = await s.execute(
|
|
select(Template)
|
|
.where(Template.owner_id == owner_id, Template.name == name, Template.is_archived.is_(False))
|
|
.limit(1)
|
|
)
|
|
return res.scalars().first()
|
|
|
|
async def create_template(owner_id: int, name: str, title: str | None, type_: str,
|
|
content: str, keyboard_tpl: list[dict] | None, parse_mode: str | None) -> int:
|
|
async with async_session_maker() as s:
|
|
tpl = Template(
|
|
owner_id=owner_id,
|
|
name=name,
|
|
title=title,
|
|
type=type_,
|
|
content=content,
|
|
keyboard_tpl=keyboard_tpl,
|
|
parse_mode=parse_mode or "HTML",
|
|
)
|
|
s.add(tpl)
|
|
await s.commit()
|
|
await s.refresh(tpl)
|
|
return tpl.id
|
|
|
|
async def delete_template(owner_id: int, tpl_id: int) -> bool:
|
|
async with async_session_maker() as s:
|
|
res = await s.execute(select(Template).where(Template.id == tpl_id, Template.owner_id == owner_id))
|
|
tpl = res.scalars().first()
|
|
if not tpl:
|
|
return False
|
|
await s.delete(tpl)
|
|
await s.commit()
|
|
return True
|
|
|
|
# -------- Рендер и анализ переменных --------
|
|
|
|
def _render_text(src: str, ctx: dict[str, Any]) -> str:
|
|
return _env.from_string(src).render(**ctx)
|
|
|
|
def _render_keyboard(kb_tpl: list[dict] | None, ctx: dict[str, Any]) -> list[list[dict]] | None:
|
|
if not kb_tpl:
|
|
return None
|
|
rows: list[list[dict]] = []
|
|
for btn in kb_tpl:
|
|
text = _render_text(str(btn.get("text", "")), ctx)
|
|
url = _render_text(str(btn.get("url", "")), ctx)
|
|
rows.append([{"text": text, "url": url}])
|
|
return rows
|
|
|
|
def _collect_vars_from_str(src: str) -> Set[str]:
|
|
try:
|
|
ast = _ast_env.parse(src or "")
|
|
vars_ = meta.find_undeclared_variables(ast)
|
|
# исключим служебные имена (loop, etc.) — обычно они не используются как внешние
|
|
return {v for v in vars_ if not v.startswith("loop")}
|
|
except Exception:
|
|
return set()
|
|
|
|
def required_variables_of_template(content: str, keyboard_tpl: list[dict] | None) -> Set[str]:
|
|
req = _collect_vars_from_str(content)
|
|
if keyboard_tpl:
|
|
for btn in keyboard_tpl:
|
|
req |= _collect_vars_from_str(str(btn.get("text", "")))
|
|
req |= _collect_vars_from_str(str(btn.get("url", "")))
|
|
return req
|
|
|
|
async def render_template_by_name(owner_id: int, name: str, ctx: dict[str, Any]) -> dict:
|
|
tpl = await get_template_by_name(owner_id, name)
|
|
if not tpl:
|
|
raise LookupError("Template not found")
|
|
content = _render_text(tpl.content, ctx)
|
|
keyboard_rows = _render_keyboard(tpl.keyboard_tpl, ctx)
|
|
return {
|
|
"type": tpl.type,
|
|
"text": content,
|
|
"parse_mode": tpl.parse_mode,
|
|
"keyboard_rows": keyboard_rows,
|
|
"_required": list(required_variables_of_template(tpl.content, tpl.keyboard_tpl or [])),
|
|
} |