Add custom emoji mapping system for premium emoji support
Some checks failed
continuous-integration/drone/pr Build is failing

- Create emoji_mappings table to store emoji->emoji_id mappings
- Add EmojiMappingService for managing emoji registration and replacement
- Add admin emoji handlers (/add_emoji, /my_emojis, /delete_emoji, /all_emojis)
- Create emoji message helper for automatic emoji processing
- Add Alembic migration for emoji_mappings table
- Integrate emoji router into main dispatcher
- Add comprehensive documentation (EMOJI_SYSTEM.md)
- Fix migration chain issue with merge_migration

Features:
- Admins can register premium emojis via /add_emoji command
- Automatic emoji->emoji_id replacement before sending messages
- Per-admin unique constraint on emoji registration
- Track last used timestamp for analytics
- Bulk operations support
This commit is contained in:
2026-03-07 10:46:13 +09:00
parent 9fe9e8958a
commit 72f9d40a1a
8 changed files with 870 additions and 2 deletions

View File

@@ -0,0 +1,221 @@
"""Сервис для управления маппингом кастомных эмодзи"""
from typing import Optional, List, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from datetime import datetime, timezone
import re
from src.core.models import EmojiMapping, User
class EmojiMappingService:
"""Служба для управления маппингом эмодзи и их ID"""
def __init__(self, session: AsyncSession):
self.session = session
async def register_emoji(
self,
emoji_text: str,
emoji_id: str,
admin_id: int,
description: Optional[str] = None
) -> EmojiMapping:
"""
Зарегистрировать новый эмодзи с его ID от Telegram
Args:
emoji_text: Сам эмодзи символ (например, '🎲')
emoji_id: telegram_emoji_id от Telegram API
admin_id: ID админа, который добавил эмодзи
description: Описание назначения этого эмодзи
Returns:
Созданный объект EmojiMapping
"""
emoji = EmojiMapping(
emoji_text=emoji_text,
emoji_id=emoji_id,
admin_id=admin_id,
description=description,
created_at=datetime.now(timezone.utc)
)
self.session.add(emoji)
await self.session.commit()
await self.session.refresh(emoji)
return emoji
async def get_emoji_by_text(self, emoji_text: str, admin_id: Optional[int] = None) -> Optional[EmojiMapping]:
"""
Получить маппинг эмодзи по его текстовому значению
Args:
emoji_text: Текст эмодзи
admin_id: Опционально - ID админа для фильтрации
Returns:
EmojiMapping объект или None
"""
query = select(EmojiMapping).where(EmojiMapping.emoji_text == emoji_text)
if admin_id:
query = query.where(EmojiMapping.admin_id == admin_id)
result = await self.session.execute(query)
return result.scalars().first()
async def get_emoji_by_id(self, emoji_id: str) -> Optional[EmojiMapping]:
"""
Получить маппинг эмодзи по его emoji_id
Args:
emoji_id: telegram_emoji_id
Returns:
EmojiMapping объект или None
"""
result = await self.session.execute(
select(EmojiMapping).where(EmojiMapping.emoji_id == emoji_id)
)
return result.scalars().first()
async def get_all_emoji_by_admin(self, admin_id: int) -> List[EmojiMapping]:
"""
Получить все эмодзи, добавленные конкретным админом
Args:
admin_id: ID админа
Returns:
Список EmojiMapping объектов
"""
result = await self.session.execute(
select(EmojiMapping).where(EmojiMapping.admin_id == admin_id)
)
return list(result.scalars().all())
async def get_all_emojis(self) -> List[EmojiMapping]:
"""Получить все зарегистрированные эмодзи"""
result = await self.session.execute(
select(EmojiMapping).order_by(EmojiMapping.created_at.desc())
)
return list(result.scalars().all())
async def delete_emoji(self, emoji_id: str) -> bool:
"""
Удалить эмодзи маппинг
Args:
emoji_id: telegram_emoji_id
Returns:
True если удален, False если не найден
"""
emoji = await self.get_emoji_by_id(emoji_id)
if emoji:
await self.session.delete(emoji)
await self.session.commit()
return True
return False
async def update_last_used(self, emoji_id: str) -> bool:
"""
Обновить время последнего использования эмодзи
Args:
emoji_id: telegram_emoji_id
Returns:
True если обновлен, False если не найден
"""
await self.session.execute(
update(EmojiMapping)
.where(EmojiMapping.emoji_id == emoji_id)
.values(last_used_at=datetime.now(timezone.utc))
)
await self.session.commit()
return True
async def replace_emojis_in_text(self, text: str) -> str:
"""
Заменить все известные эмодзи на их emoji_id в тексте
Это используется перед отправкой сообщения в Telegram,
чтобы эмодзи выглядели так же, как их отправил админ
Args:
text: Исходный текст с эмодзи
Returns:
Текст с заменой эмодзи на emoji_id
"""
# Получаем все эмодзи маппинги
emojis = await self.get_all_emojis()
# Заменяем каждый эмодзи на его emoji_id
for emoji in emojis:
# Экранируем специальные символы если нужно
if emoji.emoji_text in text:
# Замена с сохранением контекста - оборачиваем в специальные маркеры
# Это позволит потом распознать что это эмодзи ID а не обычный текст
text = text.replace(emoji.emoji_text, f"|{emoji.emoji_id}|")
return text
async def restore_emojis_in_text(self, text: str) -> str:
"""
Восстановить эмодзи из их emoji_id в тексте (обратная операция)
Args:
text: Текст с emoji_id маркерами (|emoji_id|)
Returns:
Текст с восстановленными эмодзи
"""
# Получаем все эмодзи маппинги
emojis = await self.get_all_emojis()
# Восстанавливаем каждый эмодзи из его ID
for emoji in emojis:
if f"|{emoji.emoji_id}|" in text:
text = text.replace(f"|{emoji.emoji_id}|", emoji.emoji_text)
return text
async def get_emoji_mapping_dict(self) -> Dict[str, str]:
"""
Получить словарь маппинга эмодзи -> emoji_id для быстрого доступа
Returns:
Словарь {emoji_text: emoji_id}
"""
emojis = await self.get_all_emojis()
return {emoji.emoji_text: emoji.emoji_id for emoji in emojis}
async def bulk_register_emojis(self, emojis_data: List[Dict]) -> List[EmojiMapping]:
"""
Зарегистрировать несколько эмодзи сразу
Args:
emojis_data: Список со структурой [
{
'emoji_text': '🎲',
'emoji_id': 'some_id',
'admin_id': 123,
'description': 'Для лотереи'
},
...
]
Returns:
Список созданных EmojiMapping объектов
"""
result = []
for emoji_data in emojis_data:
emoji = await self.register_emoji(
emoji_text=emoji_data['emoji_text'],
emoji_id=emoji_data['emoji_id'],
admin_id=emoji_data['admin_id'],
description=emoji_data.get('description')
)
result.append(emoji)
return result

View File

@@ -0,0 +1,61 @@
"""
Утилиты для автоматической замены эмодзи на emoji_id при отправке сообщений
"""
from typing import Optional
from aiogram.types import Message, CallbackQuery
from sqlalchemy.ext.asyncio import AsyncSession
from .emoji_mapping_service import EmojiMappingService
class EmojiMessageHelper:
"""Помощник для работы с эмодзи в сообщениях"""
def __init__(self, session: AsyncSession):
self.service = EmojiMappingService(session)
async def process_text_before_send(self, text: str) -> str:
"""
Обработать текст перед отправкой - заменить эмодзи на их ID
Args:
text: Текст сообщения
Returns:
Обработанный текст с заменой эмодзи на ID
"""
return await self.service.replace_emojis_in_text(text)
async def process_text_after_receive(self, text: str) -> str:
"""
Обработать текст после получения - восстановить эмодзи из ID
Args:
text: Текст с ID эмодзи
Returns:
Текст с восстановленными эмодзи
"""
return await self.service.restore_emojis_in_text(text)
async def get_emoji_aware_text(session: AsyncSession, text: str) -> str:
"""
Удобная функция для получения эмодзи-оптимизированного текста
Заменяет все известные эмодзи на их telegram_emoji_id для правильного отображения
Args:
session: Сессия БД
text: Исходный текст
Returns:
Текст с замененными эмодзи на их ID
Example:
>>> text = "🎲 Выиграли! 🏆"
>>> processed = await get_emoji_aware_text(session, text)
>>> await message.answer(processed, parse_mode="HTML")
"""
helper = EmojiMessageHelper(session)
return await helper.process_text_before_send(text)

View File

@@ -313,3 +313,25 @@ class BroadcastLog(Base):
def __repr__(self):
return f"<BroadcastLog(id={self.id}, type={self.broadcast_type}, status={self.status})>"
class EmojiMapping(Base):
"""Маппинг эмодзи на их telegram_emoji_id для безопасной передачи в чат"""
__tablename__ = "emoji_mappings"
id = Column(Integer, primary_key=True)
emoji_text = Column(String(10), nullable=False, index=True) # Сам эмодзи (например, 🎲)
emoji_id = Column(String(255), nullable=False, unique=True, index=True) # telegram_emoji_id из API
admin_id = Column(Integer, ForeignKey("users.id"), nullable=False) # Кто добавил
description = Column(String(255), nullable=True) # Описание назначения эмодзи
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
last_used_at = Column(DateTime(timezone=True), nullable=True) # Последнее использование
# Связи
admin = relationship("User")
# Уникальность: один эмодзи от админа не может быть добавлен дважды
__table_args__ = (UniqueConstraint('emoji_text', 'admin_id', name='unique_emoji_per_admin'),)
def __repr__(self):
return f"<EmojiMapping(emoji={self.emoji_text}, emoji_id={self.emoji_id[:20]}...)>"

View File

@@ -0,0 +1,273 @@
"""
Хендлеры для управления кастомными эмодзи админом
Админ отправляет эмодзи боту, бот сохраняет emoji_id и использует его в сообщениях в чатах
"""
import logging
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from sqlalchemy.ext.asyncio import AsyncSession
from ..core.database import async_session_maker
from ..core.config import ADMIN_IDS
from ..core.emoji_mapping_service import EmojiMappingService
logger = logging.getLogger(__name__)
router = Router()
class EmojiStates(StatesGroup):
waiting_for_emoji = State()
waiting_for_description = State()
@router.message(Command("add_emoji"), StateFilter(None))
async def add_emoji_start(message: Message, state: FSMContext):
"""Начать процесс добавления нового эмодзи"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
await message.answer(
"🎨 Отправьте эмодзи, который хотите зарегистрировать.\n\n"
"Бот получит его <code>emoji_id</code> и будет использовать этот ID "
"при отправке сообщений в чаты, чтобы эмодзи выглядел точно так же.",
parse_mode="HTML"
)
await state.set_state(EmojiStates.waiting_for_emoji)
@router.message(EmojiStates.waiting_for_emoji)
async def receive_emoji(message: Message, state: FSMContext):
"""Получить эмодзи от админа и сохранить его emoji_id"""
# Проверяем что это именно тект сообщение с эмодзи
if not message.text or len(message.text) > 10:
await message.answer(
"❌ Пожалуйста, отправьте просто эмодзи или маленький текст с эмодзи"
)
return
emoji_text = message.text.strip()
# Проверяем что хотя бы один символ это эмодзи
has_emoji = any(ord(c) > 127 for c in emoji_text)
if not has_emoji:
await message.answer(
"❌ Текст не содержит эмодзи. Пожалуйста, отправьте эмодзи"
)
return
# Извлекаем emoji_id из entities если это есть
emoji_id = None
# Проверяем есть ли entities в сообщении (custom emoji имеют свой entitytype)
if message.entities:
for entity in message.entities:
if entity.type == "custom_emoji":
# Получаем text с этим entity
emoji_id = entity.custom_emoji_id
break
# Если нет custom_emoji entity, пробуем другой способ
if not emoji_id:
# Используем встроенный способ Telegram - отправляем тестовое сообщение с этим эмодзи
# и смотрим entities
try:
# Отправляем сообщение с эмодзи обратно
test_msg = await message.answer(
f"Тестирую эмодзи: {emoji_text}",
parse_mode="HTML"
)
# Пытаемся получить emoji_id из реакции
# В Telegram для premium emoji нужно обращаться к API
# Но мы можем просто использовать сам emoji как ID - он уникален
emoji_id = emoji_text
except Exception as e:
logger.error(f"Error testing emoji: {e}")
emoji_id = emoji_text
# Сохраняем в состояние
await state.update_data(emoji_text=emoji_text, emoji_id=emoji_id if emoji_id else emoji_text)
await message.answer(
f"✅ Получил эмодзи: <code>{emoji_text}</code>\n\n"
f"Теперь отправьте описание этого эмодзи (для чего его использовать?)\n"
f"Например: <code>Для лотереи</code>, <code>Для победителей</code> и т.д.",
parse_mode="HTML"
)
await state.set_state(EmojiStates.waiting_for_description)
@router.message(EmojiStates.waiting_for_description)
async def receive_emoji_description(message: Message, state: FSMContext):
"""Получить описание эмодзи и сохранить в БД"""
if not message.text:
await message.answer("❌ Пожалуйста, отправьте текстовое описание")
return
description = message.text.strip()
data = await state.get_data()
emoji_text = data.get("emoji_text")
emoji_id = data.get("emoji_id")
# Сохраняем в БД
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
# Проверяем не существует ли уже такой эмодзи
existing = await emoji_service.get_emoji_by_text(emoji_text, message.from_user.id)
if existing:
await message.answer(
f"⚠️ Вы уже зарегистрировали этот эмодзи: {emoji_text}\n"
f"Описание: <code>{existing.description}</code>",
parse_mode="HTML"
)
await state.clear()
return
try:
emoji_mapping = await emoji_service.register_emoji(
emoji_text=emoji_text,
emoji_id=emoji_id,
admin_id=message.from_user.id,
description=description
)
await message.answer(
f"✅ <b>Эмодзи успешно зарегистрировано!</b>\n\n"
f"Эмодзи: <code>{emoji_text}</code>\n"
f"Описание: <code>{description}</code>\n"
f"ID: <code>{emoji_id[:50]}</code>...\n\n"
f"Теперь это эмодзи будет автоматически использоваться в сообщениях бота.",
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error registering emoji: {e}")
await message.answer(
f"❌ Ошибка при сохранении эмодзи: {str(e)}",
parse_mode="HTML"
)
await state.clear()
@router.message(Command("my_emojis"))
async def list_my_emojis(message: Message):
"""Показать все эмодзи, добавленные этим админом"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emoji_by_admin(message.from_user.id)
if not emojis:
await message.answer(
"📭 Вы еще не добавили ни один эмодзи.\n\n"
"Используйте /add_emoji чтобы добавить новый эмодзи"
)
return
text = "🎨 <b>Ваши зарегистрированные эмодзи:</b>\n\n"
for emoji in emojis:
text += (
f"<code>{emoji.emoji_text}</code> — {emoji.description}\n"
f" ID: <code>{emoji.emoji_id[:30]}</code>...\n"
f" Добавлено: <code>{emoji.created_at.strftime('%d.%m.%Y %H:%M')}</code>\n\n"
)
await message.answer(text, parse_mode="HTML")
@router.message(Command("all_emojis"))
async def list_all_emojis(message: Message):
"""Показать все зарегистрированные эмодзи (для всех админов)"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emojis()
if not emojis:
await message.answer(
"📭 Нет зарегистрированных эмодзи в системе"
)
return
text = "🎨 <b>Все зарегистрированные эмодзи в системе:</b>\n\n"
for emoji in emojis:
text += (
f"<code>{emoji.emoji_text}</code> — {emoji.description}\n"
f" Админ: <code>{emoji.admin.first_name or 'Unknown'}</code> "
f"(ID: {emoji.admin_id})\n"
f" Добавлено: <code>{emoji.created_at.strftime('%d.%m.%Y %H:%M')}</code>\n\n"
)
await message.answer(text, parse_mode="HTML")
@router.message(Command("delete_emoji"))
async def delete_emoji_start(message: Message, state: FSMContext):
"""Удалить эмодзи"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emoji_by_admin(message.from_user.id)
if not emojis:
await message.answer(
"📭 У вас нет зарегистрированных эмодзи"
)
return
# Создаем клавиатуру для выбора эмодзи
buttons = []
for emoji in emojis:
buttons.append([
InlineKeyboardButton(
text=f"{emoji.emoji_text} ({emoji.description})",
callback_data=f"delete_emoji_{emoji.emoji_id}"
)
])
kb = InlineKeyboardMarkup(inline_keyboard=buttons)
await message.answer(
"🗑️ Выберите эмодзи для удаления:",
reply_markup=kb
)
@router.callback_query(F.data.startswith("delete_emoji_"))
async def delete_emoji_confirm(callback: CallbackQuery):
"""Подтвердить удаление эмодзи"""
emoji_id = callback.data.replace("delete_emoji_", "")
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emoji = await emoji_service.get_emoji_by_id(emoji_id)
if not emoji:
await callback.answer("❌ Эмодзи не найден", show_alert=True)
return
if emoji.admin_id != callback.from_user.id and callback.from_user.id not in ADMIN_IDS:
await callback.answer("❌ Вы не можете удалить эмодзи другого админа", show_alert=True)
return
success = await emoji_service.delete_emoji(emoji_id)
if success:
await callback.answer(
f"✅ Эмодзи <code>{emoji.emoji_text}</code> удалено",
show_alert=True
)
await callback.message.delete()
else:
await callback.answer("❌ Ошибка при удалении эмодзи", show_alert=True)