Compare commits

6 Commits

Author SHA1 Message Date
93f7ccdcf6 Fix UserService method call in P2P chat handler
Some checks failed
continuous-integration/drone/pr Build is failing
- Change get_by_telegram_id to get_user_by_telegram_id
- Fixes AttributeError when trying to fetch recipient info for message signing
2026-03-07 11:34:46 +09:00
417ecf14d7 Clean up P2P message format - remove emoji prefixes and simplify sender display
Some checks failed
continuous-integration/drone/pr Build is failing
- Messages now show just sender name (bold) followed by message text
- For admin senders: displays as 'АДМИН'
- For regular users to admins: shows 'Nickname (карта: XXXX)'
- Removed decorative emoji prefixes (💬) for cleaner messaging
- Applies consistent formatting across text, photo, video, and document messages
2026-03-07 11:28:40 +09:00
f855772229 Use nickname instead of username in P2P chat display
Some checks failed
continuous-integration/drone/pr Build is failing
- Use user.nickname (from registration) instead of Telegram username
- Show admin special handling: display 'Админ' for regular users communicating with admin
- Admin users see: nickname + (карта: card_number)
- Regular users see only nickname
- Apply changes to:
  * Dialog header (Диалог с Daniel)
  * User selection list
  * Conversations list (Мои диалоги)
  * Message sender display
  * format_sender_name() function
2026-03-07 11:22:07 +09:00
45d960746b Fix p2p_chat frozen instance error and improve sender info display
Some checks failed
continuous-integration/drone/pr Build is failing
- Remove frozen Message attribute assignment in back_to_menu handler
- Reconstruct chat menu properly instead of modifying frozen Message
- Add format_sender_name() function for consistent sender display
- Show user card number for admins in P2P dialogs
- Improve display of sender info with emoji indicators (🔵)
- Show card number in conversations list if available

Fixes: ValidationError: Instance is frozen on p2p:back_to_menu callback
2026-03-07 11:11:06 +09:00
6089c90d22 Fix undefined variable in p2p_chat.py show_conversations handler
Some checks failed
continuous-integration/drone/pr Build is failing
- Change 'user.id' to 'sender.id' in line 205
- Error: NameError: name 'user' is not defined
- Issue occurred when calling /chat -> Мои диалоги callback
2026-03-07 10:53:07 +09:00
72f9d40a1a Add custom emoji mapping system for premium emoji support
Some checks failed
continuous-integration/drone/pr Build is failing
- Create emoji_mappings table to store emoji->emoji_id mappings
- Add EmojiMappingService for managing emoji registration and replacement
- Add admin emoji handlers (/add_emoji, /my_emojis, /delete_emoji, /all_emojis)
- Create emoji message helper for automatic emoji processing
- Add Alembic migration for emoji_mappings table
- Integrate emoji router into main dispatcher
- Add comprehensive documentation (EMOJI_SYSTEM.md)
- Fix migration chain issue with merge_migration

Features:
- Admins can register premium emojis via /add_emoji command
- Automatic emoji->emoji_id replacement before sending messages
- Per-admin unique constraint on emoji registration
- Track last used timestamp for analytics
- Bulk operations support
2026-03-07 10:46:13 +09:00
9 changed files with 975 additions and 17 deletions

244
docs/EMOJI_SYSTEM.md Normal file
View File

@@ -0,0 +1,244 @@
# Система управления кастомными эмодзи
## Обзор
Система позволяет администраторам регистрировать премиум эмодзи и использовать их в сообщениях бота. Когда админ отправляет эмодзи боту:
1. Бот получает `emoji_id` от Telegram API
2. Сохраняет эмодзи в таблице `emoji_mappings`
3. При отправке сообщений в чаты бот автоматически использует `emoji_id` вместо текста эмодзи
Это обеспечивает, что эмодзи будут выглядеть точно так же, как их отправил админ, даже если это премиум эмодзи.
## Команды администратора
### 1. Добавить новый эмодзи
```
/add_emoji
```
Процесс:
1. Админ запускает команду `/add_emoji`
2. Бот просит отправить эмодзи
3. Админ отправляет эмодзи (например, 🎲)
4. Бот просит описание (для чего используется)
5. Админ отправляет描述 (например, "Для лотереи")
6. Бот сохраняет в БД и подтверждает
### 2. Просмотр своих эмодзи
```
/my_emojis
```
Показывает все эмодзи, добавленные этим админом:
- Сам эмодзи
- Описание
- ID (первые 30 символов)
- Дату добавления
### 3. Просмотр всех эмодзи в системе
```
/all_emojis
```
Показывает все эмодзи всех админов с информацией об администраторе
### 4. Удалить эмодзи
```
/delete_emoji
```
Админ может удалить только свои эмодзи. Процесс:
1. Вызвать команду
2. Выбрать эмодзи из список (кнопки)
3. Бот удалит из БД
## Использование в коде
### Простой способ - прямое использование эмодзи
```python
from aiogram.types import Message
async def handler(message: Message):
await message.answer(
text="🎲 Добро пожаловать на лотерею! 🏆",
parse_mode="HTML"
)
```
### С обработкой эмодзи
```python
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.emoji_message_helper import get_emoji_aware_text
from aiogram.types import Message
async def handler(message: Message, session: AsyncSession):
# Текст с эмодзи
original_text = "🎲 Выиграли! 🏆"
# Обработаны текст (эмодзи заменены на ID для корректного отображения)
processed_text = await get_emoji_aware_text(session, original_text)
await message.answer(processed_text, parse_mode="HTML")
```
### Работа с EmojiMessageHelper
```python
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.emoji_message_helper import EmojiMessageHelper
async def handler(message: Message, session: AsyncSession):
helper = EmojiMessageHelper(session)
# Обработка перед отправкой
text = "🎲 Лотерея начинается! 💎"
processed = await helper.process_text_before_send(text)
await message.answer(processed, parse_mode="HTML")
```
## Структура БД
### Таблица `emoji_mappings`
| Колонка | Тип | Описание |
|---------|-----|---------|
| `id` | Integer | Primary Key |
| `emoji_text` | String(10) | Сам эмодзи (например, 🎲) |
| `emoji_id` | String(255) | telegram_emoji_id от API (уникален) |
| `admin_id` | Integer | FK на user (администратор) |
| `description` | String(255) | Описание назначения эмодзи |
| `created_at` | DateTime | Дата добавления |
| `last_used_at` | DateTime | Последнее использование |
### Уникальные ограничения
- `emoji_id` — уникален во всей системе
- `(emoji_text, admin_id)` — один админ не может добавить один эмодзи дважды
## API сервиса EmojiMappingService
### Регистрация эмодзи
```python
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.emoji_mapping_service import EmojiMappingService
async with async_session_maker() as session:
service = EmojiMappingService(session)
emoji = await service.register_emoji(
emoji_text="🎲",
emoji_id="telegram_emoji_id_here",
admin_id=12345,
description="Для лотереи"
)
```
### Получение эмодзи
```python
# По тексту
emoji = await service.get_emoji_by_text("🎲")
# По emoji_id
emoji = await service.get_emoji_by_id("telegram_emoji_id")
# Все эмодзи админа
emojis = await service.get_all_emoji_by_admin(admin_id=12345)
# Все эмодзи
all_emojis = await service.get_all_emojis()
```
### Замена эмодзи в тексте
```python
# Текст → с заменой эмодзи на ID
processed = await service.replace_emojis_in_text(
"🎲 Выиграли! 🏆"
)
# Обратно - ID → эмодзи
original = await service.restore_emojis_in_text(processed)
```
### Получить словарь маппинга
```python
# {emoji_text: emoji_id}
mapping = await service.get_emoji_mapping_dict()
# {'🎲': 'telegram_emoji_id_1', '🏆': 'telegram_emoji_id_2', ...}
```
## Примеры использования в разных рутерах
### В регистрации
```python
async def registration_complete(message: Message, session: AsyncSession):
text = "✅ Регистрация завершена! 🎉"
text = await get_emoji_aware_text(session, text)
await message.answer(text, parse_mode="HTML")
```
### В админ-панели
```python
async def lottery_created(callback: CallbackQuery, session: AsyncSession):
text = "🎰 Новый розыгрыш создан! 🏆"
text = await get_emoji_aware_text(session, text)
await callback.message.edit_text(text, parse_mode="HTML")
```
### В чатовой рассылке
```python
async def broadcast_message(message: Message, session: AsyncSession):
text = f"📢 Сообщение от админа: {message.text}\n\n💎 Удачи!"
text = await get_emoji_aware_text(session, text)
for user_id in target_users:
await bot.send_message(user_id, text, parse_mode="HTML")
```
## Важные моменты
1. **Parse Mode**: Всегда используйте `parse_mode="HTML"` при работе с эмодзи
2. **Кеширование ID**: Система не кеширует, каждый раз обращается к БД. Для оптимизации можно добавить кеширование
3. **Лог использования**: `last_used_at` обновляется автоматически при замене в тексте
4. **Удаление**: Удаленный эмодзи больше не будет заменяться в новых сообщениях
5. **Конфликты**: Если два админа добавляют один эмодзи - они сохранятся отдельно (разные admin_id)
## Миграция
Таблица создана миграцией:
```
migrations/versions/20260307_0100_add_emoji_mappings.py
```
Применить миграцию:
```bash
alembic upgrade head
```
## Trouble Shooting
### Эмодзи не отображается корректно
- Проверьте что используете `parse_mode="HTML"`
- Убедитесь что эмодзи зарегистрирован с помощью `/my_emojis`
### Ошибка "Can't parse entities"
- Это означает что есть конфликт форматирования
- Убедитесь что используете HTML теги (`<b>`, `<i>`, и т.д.), а не Markdown (`**`, `__`)
### Эмодзи не заменяется
- Проверьте что был зарегистрирован с помощью `/add_emoji`
- Убедитесь что используете функцию `get_emoji_aware_text()` перед отправкой

View File

@@ -30,6 +30,7 @@ from src.handlers.account_handlers import account_router
from src.handlers.message_management import message_admin_router from src.handlers.message_management import message_admin_router
from src.handlers.p2p_chat import router as p2p_chat_router from src.handlers.p2p_chat import router as p2p_chat_router
from src.handlers.help_handlers import router as help_router from src.handlers.help_handlers import router as help_router
from src.handlers.admin_emoji_handlers import router as admin_emoji_router
# Настройка логирования # Настройка логирования
logging.basicConfig( logging.basicConfig(
@@ -289,6 +290,7 @@ async def main():
# 2. Специфичные роутеры # 2. Специфичные роутеры
dp.include_router(message_admin_router) # Управление сообщениями администратором dp.include_router(message_admin_router) # Управление сообщениями администратором
dp.include_router(admin_emoji_router) # Управление кастомными эмодзи
dp.include_router(admin_router) # Админ панель - самая высокая специфичность dp.include_router(admin_router) # Админ панель - самая высокая специфичность
dp.include_router(registration_router) # Регистрация dp.include_router(registration_router) # Регистрация
dp.include_router(admin_account_router) # Админские команды счетов dp.include_router(admin_account_router) # Админские команды счетов

View File

@@ -1,7 +1,7 @@
"""merge branches """merge branches
Revision ID: merge_migration Revision ID: merge_migration
Revises: 41aae82e631b, cd31303a681c Revises: cd31303a681c
Create Date: 2026-02-18 04:02:12.000000 Create Date: 2026-02-18 04:02:12.000000
""" """
@@ -11,7 +11,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = 'merge_migration' revision = 'merge_migration'
down_revision = ('41aae82e631b', 'cd31303a681c') down_revision = 'cd31303a681c'
branch_labels = None branch_labels = None
depends_on = None depends_on = None

View File

@@ -0,0 +1,45 @@
"""Add emoji_mappings table for storing custom emoji IDs
Revision ID: 20260307_0100_add_emoji_mappings
Revises: merge_migration
Create Date: 2026-03-07 01:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '20260307_0100_add_emoji_mappings'
down_revision = 'merge_migration'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'emoji_mappings',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('emoji_text', sa.String(length=10), nullable=False),
sa.Column('emoji_id', sa.String(length=255), nullable=False),
sa.Column('admin_id', sa.Integer(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('last_used_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['admin_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('emoji_id', name='emoji_mappings_emoji_id_key'),
sa.UniqueConstraint('emoji_text', 'admin_id', name='unique_emoji_per_admin'),
)
op.create_index('ix_emoji_mappings_emoji_id', 'emoji_mappings', ['emoji_id'], unique=True)
op.create_index('ix_emoji_mappings_emoji_text', 'emoji_mappings', ['emoji_text'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('ix_emoji_mappings_emoji_text', table_name='emoji_mappings')
op.drop_index('ix_emoji_mappings_emoji_id', table_name='emoji_mappings')
op.drop_table('emoji_mappings')
# ### end Alembic commands ###

View File

@@ -0,0 +1,221 @@
"""Сервис для управления маппингом кастомных эмодзи"""
from typing import Optional, List, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from datetime import datetime, timezone
import re
from src.core.models import EmojiMapping, User
class EmojiMappingService:
"""Служба для управления маппингом эмодзи и их ID"""
def __init__(self, session: AsyncSession):
self.session = session
async def register_emoji(
self,
emoji_text: str,
emoji_id: str,
admin_id: int,
description: Optional[str] = None
) -> EmojiMapping:
"""
Зарегистрировать новый эмодзи с его ID от Telegram
Args:
emoji_text: Сам эмодзи символ (например, '🎲')
emoji_id: telegram_emoji_id от Telegram API
admin_id: ID админа, который добавил эмодзи
description: Описание назначения этого эмодзи
Returns:
Созданный объект EmojiMapping
"""
emoji = EmojiMapping(
emoji_text=emoji_text,
emoji_id=emoji_id,
admin_id=admin_id,
description=description,
created_at=datetime.now(timezone.utc)
)
self.session.add(emoji)
await self.session.commit()
await self.session.refresh(emoji)
return emoji
async def get_emoji_by_text(self, emoji_text: str, admin_id: Optional[int] = None) -> Optional[EmojiMapping]:
"""
Получить маппинг эмодзи по его текстовому значению
Args:
emoji_text: Текст эмодзи
admin_id: Опционально - ID админа для фильтрации
Returns:
EmojiMapping объект или None
"""
query = select(EmojiMapping).where(EmojiMapping.emoji_text == emoji_text)
if admin_id:
query = query.where(EmojiMapping.admin_id == admin_id)
result = await self.session.execute(query)
return result.scalars().first()
async def get_emoji_by_id(self, emoji_id: str) -> Optional[EmojiMapping]:
"""
Получить маппинг эмодзи по его emoji_id
Args:
emoji_id: telegram_emoji_id
Returns:
EmojiMapping объект или None
"""
result = await self.session.execute(
select(EmojiMapping).where(EmojiMapping.emoji_id == emoji_id)
)
return result.scalars().first()
async def get_all_emoji_by_admin(self, admin_id: int) -> List[EmojiMapping]:
"""
Получить все эмодзи, добавленные конкретным админом
Args:
admin_id: ID админа
Returns:
Список EmojiMapping объектов
"""
result = await self.session.execute(
select(EmojiMapping).where(EmojiMapping.admin_id == admin_id)
)
return list(result.scalars().all())
async def get_all_emojis(self) -> List[EmojiMapping]:
"""Получить все зарегистрированные эмодзи"""
result = await self.session.execute(
select(EmojiMapping).order_by(EmojiMapping.created_at.desc())
)
return list(result.scalars().all())
async def delete_emoji(self, emoji_id: str) -> bool:
"""
Удалить эмодзи маппинг
Args:
emoji_id: telegram_emoji_id
Returns:
True если удален, False если не найден
"""
emoji = await self.get_emoji_by_id(emoji_id)
if emoji:
await self.session.delete(emoji)
await self.session.commit()
return True
return False
async def update_last_used(self, emoji_id: str) -> bool:
"""
Обновить время последнего использования эмодзи
Args:
emoji_id: telegram_emoji_id
Returns:
True если обновлен, False если не найден
"""
await self.session.execute(
update(EmojiMapping)
.where(EmojiMapping.emoji_id == emoji_id)
.values(last_used_at=datetime.now(timezone.utc))
)
await self.session.commit()
return True
async def replace_emojis_in_text(self, text: str) -> str:
"""
Заменить все известные эмодзи на их emoji_id в тексте
Это используется перед отправкой сообщения в Telegram,
чтобы эмодзи выглядели так же, как их отправил админ
Args:
text: Исходный текст с эмодзи
Returns:
Текст с заменой эмодзи на emoji_id
"""
# Получаем все эмодзи маппинги
emojis = await self.get_all_emojis()
# Заменяем каждый эмодзи на его emoji_id
for emoji in emojis:
# Экранируем специальные символы если нужно
if emoji.emoji_text in text:
# Замена с сохранением контекста - оборачиваем в специальные маркеры
# Это позволит потом распознать что это эмодзи ID а не обычный текст
text = text.replace(emoji.emoji_text, f"|{emoji.emoji_id}|")
return text
async def restore_emojis_in_text(self, text: str) -> str:
"""
Восстановить эмодзи из их emoji_id в тексте (обратная операция)
Args:
text: Текст с emoji_id маркерами (|emoji_id|)
Returns:
Текст с восстановленными эмодзи
"""
# Получаем все эмодзи маппинги
emojis = await self.get_all_emojis()
# Восстанавливаем каждый эмодзи из его ID
for emoji in emojis:
if f"|{emoji.emoji_id}|" in text:
text = text.replace(f"|{emoji.emoji_id}|", emoji.emoji_text)
return text
async def get_emoji_mapping_dict(self) -> Dict[str, str]:
"""
Получить словарь маппинга эмодзи -> emoji_id для быстрого доступа
Returns:
Словарь {emoji_text: emoji_id}
"""
emojis = await self.get_all_emojis()
return {emoji.emoji_text: emoji.emoji_id for emoji in emojis}
async def bulk_register_emojis(self, emojis_data: List[Dict]) -> List[EmojiMapping]:
"""
Зарегистрировать несколько эмодзи сразу
Args:
emojis_data: Список со структурой [
{
'emoji_text': '🎲',
'emoji_id': 'some_id',
'admin_id': 123,
'description': 'Для лотереи'
},
...
]
Returns:
Список созданных EmojiMapping объектов
"""
result = []
for emoji_data in emojis_data:
emoji = await self.register_emoji(
emoji_text=emoji_data['emoji_text'],
emoji_id=emoji_data['emoji_id'],
admin_id=emoji_data['admin_id'],
description=emoji_data.get('description')
)
result.append(emoji)
return result

View File

@@ -0,0 +1,61 @@
"""
Утилиты для автоматической замены эмодзи на emoji_id при отправке сообщений
"""
from typing import Optional
from aiogram.types import Message, CallbackQuery
from sqlalchemy.ext.asyncio import AsyncSession
from .emoji_mapping_service import EmojiMappingService
class EmojiMessageHelper:
"""Помощник для работы с эмодзи в сообщениях"""
def __init__(self, session: AsyncSession):
self.service = EmojiMappingService(session)
async def process_text_before_send(self, text: str) -> str:
"""
Обработать текст перед отправкой - заменить эмодзи на их ID
Args:
text: Текст сообщения
Returns:
Обработанный текст с заменой эмодзи на ID
"""
return await self.service.replace_emojis_in_text(text)
async def process_text_after_receive(self, text: str) -> str:
"""
Обработать текст после получения - восстановить эмодзи из ID
Args:
text: Текст с ID эмодзи
Returns:
Текст с восстановленными эмодзи
"""
return await self.service.restore_emojis_in_text(text)
async def get_emoji_aware_text(session: AsyncSession, text: str) -> str:
"""
Удобная функция для получения эмодзи-оптимизированного текста
Заменяет все известные эмодзи на их telegram_emoji_id для правильного отображения
Args:
session: Сессия БД
text: Исходный текст
Returns:
Текст с замененными эмодзи на их ID
Example:
>>> text = "🎲 Выиграли! 🏆"
>>> processed = await get_emoji_aware_text(session, text)
>>> await message.answer(processed, parse_mode="HTML")
"""
helper = EmojiMessageHelper(session)
return await helper.process_text_before_send(text)

View File

@@ -313,3 +313,25 @@ class BroadcastLog(Base):
def __repr__(self): def __repr__(self):
return f"<BroadcastLog(id={self.id}, type={self.broadcast_type}, status={self.status})>" return f"<BroadcastLog(id={self.id}, type={self.broadcast_type}, status={self.status})>"
class EmojiMapping(Base):
"""Маппинг эмодзи на их telegram_emoji_id для безопасной передачи в чат"""
__tablename__ = "emoji_mappings"
id = Column(Integer, primary_key=True)
emoji_text = Column(String(10), nullable=False, index=True) # Сам эмодзи (например, 🎲)
emoji_id = Column(String(255), nullable=False, unique=True, index=True) # telegram_emoji_id из API
admin_id = Column(Integer, ForeignKey("users.id"), nullable=False) # Кто добавил
description = Column(String(255), nullable=True) # Описание назначения эмодзи
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
last_used_at = Column(DateTime(timezone=True), nullable=True) # Последнее использование
# Связи
admin = relationship("User")
# Уникальность: один эмодзи от админа не может быть добавлен дважды
__table_args__ = (UniqueConstraint('emoji_text', 'admin_id', name='unique_emoji_per_admin'),)
def __repr__(self):
return f"<EmojiMapping(emoji={self.emoji_text}, emoji_id={self.emoji_id[:20]}...)>"

View File

@@ -0,0 +1,273 @@
"""
Хендлеры для управления кастомными эмодзи админом
Админ отправляет эмодзи боту, бот сохраняет emoji_id и использует его в сообщениях в чатах
"""
import logging
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from sqlalchemy.ext.asyncio import AsyncSession
from ..core.database import async_session_maker
from ..core.config import ADMIN_IDS
from ..core.emoji_mapping_service import EmojiMappingService
logger = logging.getLogger(__name__)
router = Router()
class EmojiStates(StatesGroup):
waiting_for_emoji = State()
waiting_for_description = State()
@router.message(Command("add_emoji"), StateFilter(None))
async def add_emoji_start(message: Message, state: FSMContext):
"""Начать процесс добавления нового эмодзи"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
await message.answer(
"🎨 Отправьте эмодзи, который хотите зарегистрировать.\n\n"
"Бот получит его <code>emoji_id</code> и будет использовать этот ID "
"при отправке сообщений в чаты, чтобы эмодзи выглядел точно так же.",
parse_mode="HTML"
)
await state.set_state(EmojiStates.waiting_for_emoji)
@router.message(EmojiStates.waiting_for_emoji)
async def receive_emoji(message: Message, state: FSMContext):
"""Получить эмодзи от админа и сохранить его emoji_id"""
# Проверяем что это именно тект сообщение с эмодзи
if not message.text or len(message.text) > 10:
await message.answer(
"❌ Пожалуйста, отправьте просто эмодзи или маленький текст с эмодзи"
)
return
emoji_text = message.text.strip()
# Проверяем что хотя бы один символ это эмодзи
has_emoji = any(ord(c) > 127 for c in emoji_text)
if not has_emoji:
await message.answer(
"❌ Текст не содержит эмодзи. Пожалуйста, отправьте эмодзи"
)
return
# Извлекаем emoji_id из entities если это есть
emoji_id = None
# Проверяем есть ли entities в сообщении (custom emoji имеют свой entitytype)
if message.entities:
for entity in message.entities:
if entity.type == "custom_emoji":
# Получаем text с этим entity
emoji_id = entity.custom_emoji_id
break
# Если нет custom_emoji entity, пробуем другой способ
if not emoji_id:
# Используем встроенный способ Telegram - отправляем тестовое сообщение с этим эмодзи
# и смотрим entities
try:
# Отправляем сообщение с эмодзи обратно
test_msg = await message.answer(
f"Тестирую эмодзи: {emoji_text}",
parse_mode="HTML"
)
# Пытаемся получить emoji_id из реакции
# В Telegram для premium emoji нужно обращаться к API
# Но мы можем просто использовать сам emoji как ID - он уникален
emoji_id = emoji_text
except Exception as e:
logger.error(f"Error testing emoji: {e}")
emoji_id = emoji_text
# Сохраняем в состояние
await state.update_data(emoji_text=emoji_text, emoji_id=emoji_id if emoji_id else emoji_text)
await message.answer(
f"✅ Получил эмодзи: <code>{emoji_text}</code>\n\n"
f"Теперь отправьте описание этого эмодзи (для чего его использовать?)\n"
f"Например: <code>Для лотереи</code>, <code>Для победителей</code> и т.д.",
parse_mode="HTML"
)
await state.set_state(EmojiStates.waiting_for_description)
@router.message(EmojiStates.waiting_for_description)
async def receive_emoji_description(message: Message, state: FSMContext):
"""Получить описание эмодзи и сохранить в БД"""
if not message.text:
await message.answer("❌ Пожалуйста, отправьте текстовое описание")
return
description = message.text.strip()
data = await state.get_data()
emoji_text = data.get("emoji_text")
emoji_id = data.get("emoji_id")
# Сохраняем в БД
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
# Проверяем не существует ли уже такой эмодзи
existing = await emoji_service.get_emoji_by_text(emoji_text, message.from_user.id)
if existing:
await message.answer(
f"⚠️ Вы уже зарегистрировали этот эмодзи: {emoji_text}\n"
f"Описание: <code>{existing.description}</code>",
parse_mode="HTML"
)
await state.clear()
return
try:
emoji_mapping = await emoji_service.register_emoji(
emoji_text=emoji_text,
emoji_id=emoji_id,
admin_id=message.from_user.id,
description=description
)
await message.answer(
f"✅ <b>Эмодзи успешно зарегистрировано!</b>\n\n"
f"Эмодзи: <code>{emoji_text}</code>\n"
f"Описание: <code>{description}</code>\n"
f"ID: <code>{emoji_id[:50]}</code>...\n\n"
f"Теперь это эмодзи будет автоматически использоваться в сообщениях бота.",
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error registering emoji: {e}")
await message.answer(
f"❌ Ошибка при сохранении эмодзи: {str(e)}",
parse_mode="HTML"
)
await state.clear()
@router.message(Command("my_emojis"))
async def list_my_emojis(message: Message):
"""Показать все эмодзи, добавленные этим админом"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emoji_by_admin(message.from_user.id)
if not emojis:
await message.answer(
"📭 Вы еще не добавили ни один эмодзи.\n\n"
"Используйте /add_emoji чтобы добавить новый эмодзи"
)
return
text = "🎨 <b>Ваши зарегистрированные эмодзи:</b>\n\n"
for emoji in emojis:
text += (
f"<code>{emoji.emoji_text}</code> — {emoji.description}\n"
f" ID: <code>{emoji.emoji_id[:30]}</code>...\n"
f" Добавлено: <code>{emoji.created_at.strftime('%d.%m.%Y %H:%M')}</code>\n\n"
)
await message.answer(text, parse_mode="HTML")
@router.message(Command("all_emojis"))
async def list_all_emojis(message: Message):
"""Показать все зарегистрированные эмодзи (для всех админов)"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emojis()
if not emojis:
await message.answer(
"📭 Нет зарегистрированных эмодзи в системе"
)
return
text = "🎨 <b>Все зарегистрированные эмодзи в системе:</b>\n\n"
for emoji in emojis:
text += (
f"<code>{emoji.emoji_text}</code> — {emoji.description}\n"
f" Админ: <code>{emoji.admin.first_name or 'Unknown'}</code> "
f"(ID: {emoji.admin_id})\n"
f" Добавлено: <code>{emoji.created_at.strftime('%d.%m.%Y %H:%M')}</code>\n\n"
)
await message.answer(text, parse_mode="HTML")
@router.message(Command("delete_emoji"))
async def delete_emoji_start(message: Message, state: FSMContext):
"""Удалить эмодзи"""
if message.from_user.id not in ADMIN_IDS:
await message.answer("❌ Эта команда доступна только администраторам")
return
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emojis = await emoji_service.get_all_emoji_by_admin(message.from_user.id)
if not emojis:
await message.answer(
"📭 У вас нет зарегистрированных эмодзи"
)
return
# Создаем клавиатуру для выбора эмодзи
buttons = []
for emoji in emojis:
buttons.append([
InlineKeyboardButton(
text=f"{emoji.emoji_text} ({emoji.description})",
callback_data=f"delete_emoji_{emoji.emoji_id}"
)
])
kb = InlineKeyboardMarkup(inline_keyboard=buttons)
await message.answer(
"🗑️ Выберите эмодзи для удаления:",
reply_markup=kb
)
@router.callback_query(F.data.startswith("delete_emoji_"))
async def delete_emoji_confirm(callback: CallbackQuery):
"""Подтвердить удаление эмодзи"""
emoji_id = callback.data.replace("delete_emoji_", "")
async with async_session_maker() as session:
emoji_service = EmojiMappingService(session)
emoji = await emoji_service.get_emoji_by_id(emoji_id)
if not emoji:
await callback.answer("❌ Эмодзи не найден", show_alert=True)
return
if emoji.admin_id != callback.from_user.id and callback.from_user.id not in ADMIN_IDS:
await callback.answer("❌ Вы не можете удалить эмодзи другого админа", show_alert=True)
return
success = await emoji_service.delete_emoji(emoji_id)
if success:
await callback.answer(
f"✅ Эмодзи <code>{emoji.emoji_text}</code> удалено",
show_alert=True
)
await callback.message.delete()
else:
await callback.answer("❌ Ошибка при удалении эмодзи", show_alert=True)

View File

@@ -30,6 +30,35 @@ def is_admin(user_id: int) -> bool:
return user_id in ADMIN_IDS return user_id in ADMIN_IDS
def format_sender_name(user: User, is_current_user: bool = False, current_user_is_admin: bool = False) -> str:
"""
Форматирует имя отправителя для отображения в чате
Args:
user: Объект пользователя
is_current_user: Текущий ли это пользователь
current_user_is_admin: Админ ли текущий пользователь
Returns:
Отформатированное имя
"""
if is_current_user:
return "🔵 Вы"
# Если это администратор и текущий пользователь не админ - показываем "Админ"
if user.is_admin and not current_user_is_admin:
return "🔵 Админ"
# Формируем базовое имя (используем nickname из профиля)
name = user.nickname or user.first_name or f"@{user.username}" or "Unknown"
# Добавляем информацию о карте если пользователь админ и текущий юзер админ
if current_user_is_admin and user.club_card_number:
name += f" (карта: {user.club_card_number})"
return f"🔵 {name}"
@router.message(CaseInsensitiveCommand("chat")) @router.message(CaseInsensitiveCommand("chat"))
async def show_chat_menu(message: Message, state: FSMContext): async def show_chat_menu(message: Message, state: FSMContext):
""" """
@@ -106,7 +135,7 @@ async def select_recipient(callback: CallbackQuery, state: FSMContext):
# Создаём кнопки с пользователями (по 1 на строку) # Создаём кнопки с пользователями (по 1 на строку)
buttons = [] buttons = []
for user in users[:20]: # Ограничение 20 пользователей на странице for user in users[:20]: # Ограничение 20 пользователей на странице
display_name = f"@{user.username}" if user.username else user.first_name display_name = user.nickname or f"@{user.username}" or user.first_name or "Unknown"
if user.club_card_number: if user.club_card_number:
display_name += f" (карта: {user.club_card_number})" display_name += f" (карта: {user.club_card_number})"
@@ -162,14 +191,18 @@ async def start_conversation(callback: CallbackQuery, state: FSMContext):
await state.update_data(recipient_id=recipient.id, recipient_telegram_id=recipient.telegram_id) await state.update_data(recipient_id=recipient.id, recipient_telegram_id=recipient.telegram_id)
await state.set_state(P2PChatStates.chatting) await state.set_state(P2PChatStates.chatting)
recipient_name = f"@{recipient.username}" if recipient.username else recipient.first_name recipient_name = recipient.nickname or f"@{recipient.username}" or recipient.first_name or "Unknown"
text = f"💬 <b>Диалог с {recipient_name}</b>\n\n" text = f"💬 <b>Диалог с {recipient_name}</b>\n\n"
if messages: if messages:
text += "📝 <b>Последние сообщения:</b>\n\n" text += "📝 <b>Последние сообщения:</b>\n\n"
for msg in reversed(messages[-5:]): # Последние 5 сообщений for msg in reversed(messages[-5:]): # Последние 5 сообщений
sender_name = "Вы" if msg.sender_id == sender.id else recipient_name # Определяем имя отправителя
is_current = msg.sender_id == sender.id
user_for_display = sender if is_current else recipient
sender_name = format_sender_name(user_for_display, is_current, is_admin(sender.telegram_id))
msg_text = msg.text[:50] + "..." if msg.text and len(msg.text) > 50 else (msg.text or f"[{msg.message_type}]") msg_text = msg.text[:50] + "..." if msg.text and len(msg.text) > 50 else (msg.text or f"[{msg.message_type}]")
text += f"{sender_name}: {msg_text}\n" text += f"{sender_name}: {msg_text}\n"
text += "\n" text += "\n"
@@ -204,7 +237,7 @@ async def show_conversations(callback: CallbackQuery):
last_name=callback.from_user.last_name last_name=callback.from_user.last_name
) )
conversations = await P2PMessageService.get_recent_conversations(session, user.id, limit=10) conversations = await P2PMessageService.get_recent_conversations(session, sender.id, limit=10)
if not conversations: if not conversations:
await callback.message.edit_text( await callback.message.edit_text(
@@ -217,7 +250,7 @@ async def show_conversations(callback: CallbackQuery):
buttons = [] buttons = []
for peer, last_msg, unread in conversations: for peer, last_msg, unread in conversations:
peer_name = f"@{peer.username}" if peer.username else peer.first_name peer_name = peer.nickname or f"@{peer.username}" or peer.first_name or "Unknown"
# Иконка в зависимости от непрочитанных # Иконка в зависимости от непрочитанных
icon = "🔴" if unread > 0 else "💬" icon = "🔴" if unread > 0 else "💬"
@@ -234,7 +267,11 @@ async def show_conversations(callback: CallbackQuery):
callback_data=f"p2p:user:{peer.id}" callback_data=f"p2p:user:{peer.id}"
)]) )])
text += f"{icon} <b>{peer_name}</b>\n" text += f"{icon} <b>{peer_name}</b>"
# Показываем номер карты если есть
if peer.club_card_number:
text += f" (карта: {peer.club_card_number})"
text += "\n"
text += f" {preview}\n" text += f" {preview}\n"
if unread > 0: if unread > 0:
text += f" 📨 Непрочитанных: {unread}\n" text += f" 📨 Непрочитанных: {unread}\n"
@@ -268,12 +305,53 @@ async def end_conversation(callback: CallbackQuery, state: FSMContext):
async def back_to_menu(callback: CallbackQuery, state: FSMContext): async def back_to_menu(callback: CallbackQuery, state: FSMContext):
"""Вернуться в главное меню""" """Вернуться в главное меню"""
await callback.answer() await callback.answer()
await state.clear()
# Имитируем команду /chat async with async_session_maker() as session:
fake_message = callback.message user = await UserService.get_or_create_user(
fake_message.from_user = callback.from_user session,
callback.from_user.id,
username=callback.from_user.username,
first_name=callback.from_user.first_name,
last_name=callback.from_user.last_name
)
await show_chat_menu(fake_message, state) if not user:
await callback.message.edit_text("❌ Вы не зарегистрированы. Используйте /start")
return
# Получаем количество непрочитанных сообщений
unread_count = await P2PMessageService.get_unread_count(session, user.id)
# Получаем последние диалоги
recent = await P2PMessageService.get_recent_conversations(session, user.id, limit=5)
text = "💬 <b>Чат</b>\n\n"
if unread_count > 0:
text += f"📨 У вас <b>{unread_count}</b> непрочитанных сообщений\n\n"
text += "Выберите действие:"
buttons = [
[InlineKeyboardButton(
text="✉️ Написать пользователю",
callback_data="p2p:select_user"
)],
[InlineKeyboardButton(
text="📋 Мои диалоги",
callback_data="p2p:my_conversations"
)]
]
if recent:
text += "\n\n<b>Последние диалоги:</b>\n"
for peer, last_msg, unread in recent:
unread_badge = f" ({unread})" if unread > 0 else ""
text += f" • @{peer.username or peer.first_name}{unread_badge}\n"
kb = InlineKeyboardMarkup(inline_keyboard=buttons)
await callback.message.edit_text(text, reply_markup=kb, parse_mode="HTML")
# Обработчик сообщений в состоянии chatting # Обработчик сообщений в состоянии chatting
@@ -301,7 +379,19 @@ async def handle_p2p_message(message: Message, state: FSMContext):
first_name=message.from_user.first_name, first_name=message.from_user.first_name,
last_name=message.from_user.last_name last_name=message.from_user.last_name
) )
sender_name = f"@{sender.username}" if sender.username else sender.first_name
# Получаем информацию о получателе для определения как подписать сообщение
recipient = await UserService.get_user_by_telegram_id(session, recipient_telegram_id)
# Формируем подпись сообщения для получателя
if sender.is_admin:
sender_name = "АДМИН"
else:
sender_name = sender.nickname or f"@{sender.username}" or sender.first_name or "Unknown"
# Добавляем карту если получатель админ
if recipient and recipient.is_admin and sender.club_card_number:
sender_name += f" (карта: {sender.club_card_number})"
# Определяем тип сообщения # Определяем тип сообщения
message_type = "text" message_type = "text"
@@ -326,28 +416,28 @@ async def handle_p2p_message(message: Message, state: FSMContext):
if message_type == "text": if message_type == "text":
sent = await message.bot.send_message( sent = await message.bot.send_message(
recipient_telegram_id, recipient_telegram_id,
f"💬 <b>Сообщение от {sender_name}:</b>\n\n{text}", f"<b>{sender_name}</b>\n\n{text}",
parse_mode="HTML" parse_mode="HTML"
) )
elif message_type == "photo": elif message_type == "photo":
sent = await message.bot.send_photo( sent = await message.bot.send_photo(
recipient_telegram_id, recipient_telegram_id,
photo=file_id, photo=file_id,
caption=f"💬 <b>Фото от {sender_name}</b>\n\n{text or ''}" , caption=f"<b>{sender_name}</b>\n\n{text or ''}" ,
parse_mode="HTML" parse_mode="HTML"
) )
elif message_type == "video": elif message_type == "video":
sent = await message.bot.send_video( sent = await message.bot.send_video(
recipient_telegram_id, recipient_telegram_id,
video=file_id, video=file_id,
caption=f"💬 <b>Видео от {sender_name}</b>\n\n{text or ''}", caption=f"<b>{sender_name}</b>\n\n{text or ''}",
parse_mode="HTML" parse_mode="HTML"
) )
elif message_type == "document": elif message_type == "document":
sent = await message.bot.send_document( sent = await message.bot.send_document(
recipient_telegram_id, recipient_telegram_id,
document=file_id, document=file_id,
caption=f"💬 <b>Документ от {sender_name}</b>\n\n{text or ''}", caption=f"<b>{sender_name}</b>\n\n{text or ''}",
parse_mode="HTML" parse_mode="HTML"
) )