Merge pull request 'Add custom emoji mapping system for premium emoji support' (#11) from v2_functions into master
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #11
This commit is contained in:
221
src/core/emoji_mapping_service.py
Normal file
221
src/core/emoji_mapping_service.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""Сервис для управления маппингом кастомных эмодзи"""
|
||||
from typing import Optional, List, Dict
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, update
|
||||
from datetime import datetime, timezone
|
||||
import re
|
||||
|
||||
from src.core.models import EmojiMapping, User
|
||||
|
||||
|
||||
class EmojiMappingService:
|
||||
"""Служба для управления маппингом эмодзи и их ID"""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
async def register_emoji(
|
||||
self,
|
||||
emoji_text: str,
|
||||
emoji_id: str,
|
||||
admin_id: int,
|
||||
description: Optional[str] = None
|
||||
) -> EmojiMapping:
|
||||
"""
|
||||
Зарегистрировать новый эмодзи с его ID от Telegram
|
||||
|
||||
Args:
|
||||
emoji_text: Сам эмодзи символ (например, '🎲')
|
||||
emoji_id: telegram_emoji_id от Telegram API
|
||||
admin_id: ID админа, который добавил эмодзи
|
||||
description: Описание назначения этого эмодзи
|
||||
|
||||
Returns:
|
||||
Созданный объект EmojiMapping
|
||||
"""
|
||||
emoji = EmojiMapping(
|
||||
emoji_text=emoji_text,
|
||||
emoji_id=emoji_id,
|
||||
admin_id=admin_id,
|
||||
description=description,
|
||||
created_at=datetime.now(timezone.utc)
|
||||
)
|
||||
self.session.add(emoji)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(emoji)
|
||||
return emoji
|
||||
|
||||
async def get_emoji_by_text(self, emoji_text: str, admin_id: Optional[int] = None) -> Optional[EmojiMapping]:
|
||||
"""
|
||||
Получить маппинг эмодзи по его текстовому значению
|
||||
|
||||
Args:
|
||||
emoji_text: Текст эмодзи
|
||||
admin_id: Опционально - ID админа для фильтрации
|
||||
|
||||
Returns:
|
||||
EmojiMapping объект или None
|
||||
"""
|
||||
query = select(EmojiMapping).where(EmojiMapping.emoji_text == emoji_text)
|
||||
if admin_id:
|
||||
query = query.where(EmojiMapping.admin_id == admin_id)
|
||||
|
||||
result = await self.session.execute(query)
|
||||
return result.scalars().first()
|
||||
|
||||
async def get_emoji_by_id(self, emoji_id: str) -> Optional[EmojiMapping]:
|
||||
"""
|
||||
Получить маппинг эмодзи по его emoji_id
|
||||
|
||||
Args:
|
||||
emoji_id: telegram_emoji_id
|
||||
|
||||
Returns:
|
||||
EmojiMapping объект или None
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(EmojiMapping).where(EmojiMapping.emoji_id == emoji_id)
|
||||
)
|
||||
return result.scalars().first()
|
||||
|
||||
async def get_all_emoji_by_admin(self, admin_id: int) -> List[EmojiMapping]:
|
||||
"""
|
||||
Получить все эмодзи, добавленные конкретным админом
|
||||
|
||||
Args:
|
||||
admin_id: ID админа
|
||||
|
||||
Returns:
|
||||
Список EmojiMapping объектов
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(EmojiMapping).where(EmojiMapping.admin_id == admin_id)
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
async def get_all_emojis(self) -> List[EmojiMapping]:
|
||||
"""Получить все зарегистрированные эмодзи"""
|
||||
result = await self.session.execute(
|
||||
select(EmojiMapping).order_by(EmojiMapping.created_at.desc())
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
async def delete_emoji(self, emoji_id: str) -> bool:
|
||||
"""
|
||||
Удалить эмодзи маппинг
|
||||
|
||||
Args:
|
||||
emoji_id: telegram_emoji_id
|
||||
|
||||
Returns:
|
||||
True если удален, False если не найден
|
||||
"""
|
||||
emoji = await self.get_emoji_by_id(emoji_id)
|
||||
if emoji:
|
||||
await self.session.delete(emoji)
|
||||
await self.session.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
async def update_last_used(self, emoji_id: str) -> bool:
|
||||
"""
|
||||
Обновить время последнего использования эмодзи
|
||||
|
||||
Args:
|
||||
emoji_id: telegram_emoji_id
|
||||
|
||||
Returns:
|
||||
True если обновлен, False если не найден
|
||||
"""
|
||||
await self.session.execute(
|
||||
update(EmojiMapping)
|
||||
.where(EmojiMapping.emoji_id == emoji_id)
|
||||
.values(last_used_at=datetime.now(timezone.utc))
|
||||
)
|
||||
await self.session.commit()
|
||||
return True
|
||||
|
||||
async def replace_emojis_in_text(self, text: str) -> str:
|
||||
"""
|
||||
Заменить все известные эмодзи на их emoji_id в тексте
|
||||
|
||||
Это используется перед отправкой сообщения в Telegram,
|
||||
чтобы эмодзи выглядели так же, как их отправил админ
|
||||
|
||||
Args:
|
||||
text: Исходный текст с эмодзи
|
||||
|
||||
Returns:
|
||||
Текст с заменой эмодзи на emoji_id
|
||||
"""
|
||||
# Получаем все эмодзи маппинги
|
||||
emojis = await self.get_all_emojis()
|
||||
|
||||
# Заменяем каждый эмодзи на его emoji_id
|
||||
for emoji in emojis:
|
||||
# Экранируем специальные символы если нужно
|
||||
if emoji.emoji_text in text:
|
||||
# Замена с сохранением контекста - оборачиваем в специальные маркеры
|
||||
# Это позволит потом распознать что это эмодзи ID а не обычный текст
|
||||
text = text.replace(emoji.emoji_text, f"|{emoji.emoji_id}|")
|
||||
|
||||
return text
|
||||
|
||||
async def restore_emojis_in_text(self, text: str) -> str:
|
||||
"""
|
||||
Восстановить эмодзи из их emoji_id в тексте (обратная операция)
|
||||
|
||||
Args:
|
||||
text: Текст с emoji_id маркерами (|emoji_id|)
|
||||
|
||||
Returns:
|
||||
Текст с восстановленными эмодзи
|
||||
"""
|
||||
# Получаем все эмодзи маппинги
|
||||
emojis = await self.get_all_emojis()
|
||||
|
||||
# Восстанавливаем каждый эмодзи из его ID
|
||||
for emoji in emojis:
|
||||
if f"|{emoji.emoji_id}|" in text:
|
||||
text = text.replace(f"|{emoji.emoji_id}|", emoji.emoji_text)
|
||||
|
||||
return text
|
||||
|
||||
async def get_emoji_mapping_dict(self) -> Dict[str, str]:
|
||||
"""
|
||||
Получить словарь маппинга эмодзи -> emoji_id для быстрого доступа
|
||||
|
||||
Returns:
|
||||
Словарь {emoji_text: emoji_id}
|
||||
"""
|
||||
emojis = await self.get_all_emojis()
|
||||
return {emoji.emoji_text: emoji.emoji_id for emoji in emojis}
|
||||
|
||||
async def bulk_register_emojis(self, emojis_data: List[Dict]) -> List[EmojiMapping]:
|
||||
"""
|
||||
Зарегистрировать несколько эмодзи сразу
|
||||
|
||||
Args:
|
||||
emojis_data: Список со структурой [
|
||||
{
|
||||
'emoji_text': '🎲',
|
||||
'emoji_id': 'some_id',
|
||||
'admin_id': 123,
|
||||
'description': 'Для лотереи'
|
||||
},
|
||||
...
|
||||
]
|
||||
|
||||
Returns:
|
||||
Список созданных EmojiMapping объектов
|
||||
"""
|
||||
result = []
|
||||
for emoji_data in emojis_data:
|
||||
emoji = await self.register_emoji(
|
||||
emoji_text=emoji_data['emoji_text'],
|
||||
emoji_id=emoji_data['emoji_id'],
|
||||
admin_id=emoji_data['admin_id'],
|
||||
description=emoji_data.get('description')
|
||||
)
|
||||
result.append(emoji)
|
||||
return result
|
||||
61
src/core/emoji_message_helper.py
Normal file
61
src/core/emoji_message_helper.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""
|
||||
Утилиты для автоматической замены эмодзи на emoji_id при отправке сообщений
|
||||
"""
|
||||
from typing import Optional
|
||||
from aiogram.types import Message, CallbackQuery
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from .emoji_mapping_service import EmojiMappingService
|
||||
|
||||
|
||||
class EmojiMessageHelper:
|
||||
"""Помощник для работы с эмодзи в сообщениях"""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.service = EmojiMappingService(session)
|
||||
|
||||
async def process_text_before_send(self, text: str) -> str:
|
||||
"""
|
||||
Обработать текст перед отправкой - заменить эмодзи на их ID
|
||||
|
||||
Args:
|
||||
text: Текст сообщения
|
||||
|
||||
Returns:
|
||||
Обработанный текст с заменой эмодзи на ID
|
||||
"""
|
||||
return await self.service.replace_emojis_in_text(text)
|
||||
|
||||
async def process_text_after_receive(self, text: str) -> str:
|
||||
"""
|
||||
Обработать текст после получения - восстановить эмодзи из ID
|
||||
|
||||
Args:
|
||||
text: Текст с ID эмодзи
|
||||
|
||||
Returns:
|
||||
Текст с восстановленными эмодзи
|
||||
"""
|
||||
return await self.service.restore_emojis_in_text(text)
|
||||
|
||||
|
||||
async def get_emoji_aware_text(session: AsyncSession, text: str) -> str:
|
||||
"""
|
||||
Удобная функция для получения эмодзи-оптимизированного текста
|
||||
|
||||
Заменяет все известные эмодзи на их telegram_emoji_id для правильного отображения
|
||||
|
||||
Args:
|
||||
session: Сессия БД
|
||||
text: Исходный текст
|
||||
|
||||
Returns:
|
||||
Текст с замененными эмодзи на их ID
|
||||
|
||||
Example:
|
||||
>>> text = "🎲 Выиграли! 🏆"
|
||||
>>> processed = await get_emoji_aware_text(session, text)
|
||||
>>> await message.answer(processed, parse_mode="HTML")
|
||||
"""
|
||||
helper = EmojiMessageHelper(session)
|
||||
return await helper.process_text_before_send(text)
|
||||
@@ -313,3 +313,25 @@ class BroadcastLog(Base):
|
||||
|
||||
def __repr__(self):
|
||||
return f"<BroadcastLog(id={self.id}, type={self.broadcast_type}, status={self.status})>"
|
||||
|
||||
|
||||
class EmojiMapping(Base):
|
||||
"""Маппинг эмодзи на их telegram_emoji_id для безопасной передачи в чат"""
|
||||
__tablename__ = "emoji_mappings"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
emoji_text = Column(String(10), nullable=False, index=True) # Сам эмодзи (например, 🎲)
|
||||
emoji_id = Column(String(255), nullable=False, unique=True, index=True) # telegram_emoji_id из API
|
||||
admin_id = Column(Integer, ForeignKey("users.id"), nullable=False) # Кто добавил
|
||||
description = Column(String(255), nullable=True) # Описание назначения эмодзи
|
||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
last_used_at = Column(DateTime(timezone=True), nullable=True) # Последнее использование
|
||||
|
||||
# Связи
|
||||
admin = relationship("User")
|
||||
|
||||
# Уникальность: один эмодзи от админа не может быть добавлен дважды
|
||||
__table_args__ = (UniqueConstraint('emoji_text', 'admin_id', name='unique_emoji_per_admin'),)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<EmojiMapping(emoji={self.emoji_text}, emoji_id={self.emoji_id[:20]}...)>"
|
||||
|
||||
Reference in New Issue
Block a user