"""Template storage, rendering and variable analysis.

Provides async CRUD helpers for ``Template`` rows, a sandboxed Jinja2
environment used to render template text and keyboard buttons, and helpers
that list the variables a template expects from its render context.
"""

from __future__ import annotations

import html as _py_html
from typing import Any, Optional, Iterable, Set

from jinja2 import StrictUndefined, TemplateError, Environment, meta
from jinja2.sandbox import SandboxedEnvironment
from sqlalchemy import func, or_, select

from app.db.session import async_session_maker
from app.models.templates import Template

# ---- Jinja2 environment used for rendering (sandboxed) ----
# StrictUndefined makes a missing context variable raise instead of silently
# rendering as an empty string.
_env = SandboxedEnvironment(autoescape=False, undefined=StrictUndefined)

# ---- Safe filters for substituted values ----

# Characters escaped by the ``mdv2`` filter. The backslash is part of the set
# so that a literal backslash in the input cannot cancel out the escape that
# is added for the character following it.
_MDV2_SPECIALS = "\\_*[]()~`>#+-=|{}.!"
_MDV2_ESCAPES = str.maketrans({ch: f"\\{ch}" for ch in _MDV2_SPECIALS})


def escape_mdv2(value: Any) -> str:
    """Return *value* as text with MarkdownV2 special characters escaped.

    Args:
        value: Any object; it is converted with ``str()``. ``None`` is
            treated as an empty value.

    Returns:
        The text with every character from ``_MDV2_SPECIALS`` prefixed by a
        backslash, or ``""`` when *value* is ``None``.
    """
    if value is None:
        return ""
    # One translate pass: each input character is escaped exactly once.
    return str(value).translate(_MDV2_ESCAPES)


def escape_html(value: Any) -> str:
    """Return *value* as text with ``&``, ``<`` and ``>`` HTML-escaped.

    Quote characters are left untouched (``quote=False``).
    NOTE(review): if this filter is ever used inside an HTML attribute value,
    quotes would need escaping too - confirm against the templates in use.

    Args:
        value: Any object; it is converted with ``str()``. ``None`` is
            treated as an empty value.

    Returns:
        The escaped text, or ``""`` when *value* is ``None``.
    """
    if value is None:
        return ""
    return _py_html.escape(str(value), quote=False)


_env.filters["mdv2"] = escape_mdv2
_env.filters["html"] = escape_html
_env.filters["upper"] = str.upper
_env.filters["lower"] = str.lower

# ---- Helper environment for template AST analysis (variable discovery) ----
_ast_env = Environment()
# The analysis environment must know the same filters as the render
# environment: variable discovery fails on a filter name it does not know,
# which would make every template using ``mdv2``/``html`` report no variables.
_ast_env.filters.update(_env.filters)


# -------- Database operations --------

def _apply_search(stmt, q: str | None):
    """Add a case-insensitive substring match on name/title when *q* is set.

    Case sensitivity of the comparison may still depend on the database
    collation (e.g. on SQLite / MariaDB).
    """
    if not q:
        return stmt
    like = f"%{q}%"
    return stmt.where(or_(Template.name.ilike(like), Template.title.ilike(like)))


async def list_templates(
    owner_id: int,
    limit: int = 10,
    offset: int = 0,
    q: str | None = None,
) -> list[Template]:
    """Return one page of the owner's non-archived templates.

    Args:
        owner_id: Owner whose templates are listed.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.
        q: Optional substring to search for in the template name or title.

    Returns:
        Templates ordered by ``updated_at`` descending.
    """
    async with async_session_maker() as s:
        stmt = select(Template).where(
            Template.owner_id == owner_id,
            Template.is_archived.is_(False),
        )
        stmt = _apply_search(stmt, q)
        stmt = stmt.order_by(Template.updated_at.desc()).limit(limit).offset(offset)
        res = await s.execute(stmt)
        return list(res.scalars())


async def count_templates(owner_id: int, q: str | None = None) -> int:
    """Return the number of the owner's non-archived templates.

    Args:
        owner_id: Owner whose templates are counted.
        q: Optional substring to search for in the template name or title;
            applied the same way as in ``list_templates``.
    """
    async with async_session_maker() as s:
        stmt = (
            select(func.count())
            .select_from(Template)
            .where(Template.owner_id == owner_id, Template.is_archived.is_(False))
        )
        stmt = _apply_search(stmt, q)
        res = await s.execute(stmt)
        return int(res.scalar_one())


async def get_template_by_name(owner_id: int, name: str) -> Optional[Template]:
    """Return the owner's non-archived template with this exact name.

    Returns:
        The first matching ``Template``, or ``None`` when there is none.
    """
    async with async_session_maker() as s:
        res = await s.execute(
            select(Template)
            .where(
                Template.owner_id == owner_id,
                Template.name == name,
                Template.is_archived.is_(False),
            )
            .limit(1)
        )
        return res.scalars().first()


async def create_template(
    owner_id: int,
    name: str,
    title: str | None,
    type_: str,
    content: str,
    keyboard_tpl: list[dict] | None,
    parse_mode: str | None,
) -> int:
    """Insert a new template row and return its id.

    Args:
        owner_id: Owner of the new template.
        name: Template name.
        title: Optional title.
        type_: Value stored in the row's ``type`` column.
        content: Template source text.
        keyboard_tpl: Optional list of button dicts stored with the template.
        parse_mode: Parse mode to store; ``"HTML"`` is used when it is empty
            or ``None``.

    Returns:
        The id of the committed row.
    """
    async with async_session_maker() as s:
        tpl = Template(
            owner_id=owner_id,
            name=name,
            title=title,
            type=type_,
            content=content,
            keyboard_tpl=keyboard_tpl,
            parse_mode=parse_mode or "HTML",
        )
        s.add(tpl)
        await s.commit()
        # Reload the row after the commit before reading its id.
        await s.refresh(tpl)
        return tpl.id


async def delete_template(owner_id: int, tpl_id: int) -> bool:
    """Delete the template with *tpl_id* if it belongs to *owner_id*.

    The lookup does not filter on ``is_archived``, so archived templates can
    be deleted as well.

    Returns:
        ``True`` when a row was deleted, ``False`` when no matching row exists.
    """
    async with async_session_maker() as s:
        res = await s.execute(
            select(Template).where(
                Template.id == tpl_id,
                Template.owner_id == owner_id,
            )
        )
        tpl = res.scalars().first()
        if not tpl:
            return False
        await s.delete(tpl)
        await s.commit()
        return True


# -------- Rendering and variable analysis --------

def _render_text(src: str, ctx: dict[str, Any]) -> str:
    """Render *src* in the sandboxed environment with *ctx* as variables."""
    return _env.from_string(src).render(**ctx)


def _render_keyboard(
    kb_tpl: list[dict] | None,
    ctx: dict[str, Any],
) -> list[list[dict]] | None:
    """Render each button's ``text`` and ``url`` with *ctx*.

    Every button is placed on its own row; only the ``text`` and ``url`` keys
    are carried over, and a missing key renders as an empty string.

    Returns:
        A list of single-button rows, or ``None`` when *kb_tpl* is empty or
        ``None``.
    """
    if not kb_tpl:
        return None
    rows: list[list[dict]] = []
    for btn in kb_tpl:
        text = _render_text(str(btn.get("text", "")), ctx)
        url = _render_text(str(btn.get("url", "")), ctx)
        rows.append([{"text": text, "url": url}])
    return rows


def _collect_vars_from_str(src: str) -> Set[str]:
    """Return the undeclared variable names referenced by template *src*.

    Best-effort: a template that cannot be parsed or analysed yields an empty
    set instead of raising.
    """
    try:
        ast = _ast_env.parse(src or "")
        found = meta.find_undeclared_variables(ast)
    except TemplateError:
        return set()
    # Drop the Jinja2 loop helper only; names that merely start with "loop"
    # (e.g. "loop_id") are ordinary context variables and must be kept.
    return {v for v in found if v != "loop"}


def required_variables_of_template(
    content: str,
    keyboard_tpl: list[dict] | None,
) -> Set[str]:
    """Return every variable referenced by the content and keyboard buttons.

    Args:
        content: Template source text.
        keyboard_tpl: Optional list of button dicts; the ``text`` and ``url``
            of each button are analysed as templates too.
    """
    req = _collect_vars_from_str(content)
    for btn in keyboard_tpl or ():
        req |= _collect_vars_from_str(str(btn.get("text", "")))
        req |= _collect_vars_from_str(str(btn.get("url", "")))
    return req


async def render_template_by_name(owner_id: int, name: str, ctx: dict[str, Any]) -> dict:
    """Load the owner's template by name and render it with *ctx*.

    Args:
        owner_id: Owner of the template.
        name: Exact template name.
        ctx: Variables made available to the template.

    Returns:
        A dict with the keys ``type``, ``text``, ``parse_mode``,
        ``keyboard_rows`` and ``_required`` (the referenced variable names,
        sorted so the order is stable between calls).

    Raises:
        LookupError: No non-archived template with this name exists.
    """
    tpl = await get_template_by_name(owner_id, name)
    if not tpl:
        raise LookupError("Template not found")

    content = _render_text(tpl.content, ctx)
    keyboard_rows = _render_keyboard(tpl.keyboard_tpl, ctx)
    required = required_variables_of_template(tpl.content, tpl.keyboard_tpl or [])
    return {
        "type": tpl.type,
        "text": content,
        "parse_mode": tpl.parse_mode,
        "keyboard_rows": keyboard_rows,
        "_required": sorted(required),
    }