feat: добавлен P2P чат между пользователями

- Новая модель P2PMessage для хранения личных сообщений
- Миграция 008_add_p2p_messages.py
- Сервис P2PMessageService для работы с P2P сообщениями
- Команда /chat с меню чата
- Выбор пользователя из списка
- Отправка текста, фото, видео, документов
- История последних диалогов
- Счетчик непрочитанных сообщений
- FSM состояния для управления диалогами
This commit is contained in:
2025-11-17 11:11:33 +09:00
parent e882601b85
commit 9dbf90aca9
6 changed files with 681 additions and 2 deletions

View File

@@ -1 +1 @@
966528
969266

View File

@@ -23,6 +23,7 @@ from src.handlers.chat_handlers import router as chat_router
from src.handlers.admin_chat_handlers import router as admin_chat_router
from src.handlers.account_handlers import account_router
from src.handlers.message_management import message_admin_router
from src.handlers.p2p_chat import router as p2p_chat_router
# Настройка логирования
logging.basicConfig(
@@ -116,6 +117,7 @@ async def main():
dp.include_router(admin_account_router) # Админские команды счетов
dp.include_router(admin_chat_router) # Админские команды чата
dp.include_router(redraw_router) # Повторные розыгрыши
dp.include_router(p2p_chat_router) # P2P чат между пользователями
dp.include_router(account_router) # Пользовательские счета
# 3. Chat router ПОСЛЕДНИМ (ловит все необработанные сообщения)

View File

@@ -0,0 +1,53 @@
"""add p2p messages table
Revision ID: 008
Revises: 007
Create Date: 2025-11-17
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers
revision = '008'
down_revision = '007'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Создаём таблицу P2P сообщений
op.create_table(
'p2p_messages',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('sender_id', sa.Integer(), nullable=False),
sa.Column('recipient_id', sa.Integer(), nullable=False),
sa.Column('message_type', sa.String(length=20), nullable=False),
sa.Column('text', sa.Text(), nullable=True),
sa.Column('file_id', sa.String(length=255), nullable=True),
sa.Column('sender_message_id', sa.Integer(), nullable=False),
sa.Column('recipient_message_id', sa.Integer(), nullable=True),
sa.Column('is_read', sa.Boolean(), nullable=False, server_default='false'),
sa.Column('read_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('reply_to_id', sa.Integer(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(['sender_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['recipient_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['reply_to_id'], ['p2p_messages.id'], ),
sa.PrimaryKeyConstraint('id')
)
# Создаём индексы для быстрого поиска
op.create_index('ix_p2p_messages_sender_id', 'p2p_messages', ['sender_id'])
op.create_index('ix_p2p_messages_recipient_id', 'p2p_messages', ['recipient_id'])
op.create_index('ix_p2p_messages_is_read', 'p2p_messages', ['is_read'])
op.create_index('ix_p2p_messages_created_at', 'p2p_messages', ['created_at'])
def downgrade() -> None:
op.drop_index('ix_p2p_messages_created_at', 'p2p_messages')
op.drop_index('ix_p2p_messages_is_read', 'p2p_messages')
op.drop_index('ix_p2p_messages_recipient_id', 'p2p_messages')
op.drop_index('ix_p2p_messages_sender_id', 'p2p_messages')
op.drop_table('p2p_messages')

View File

@@ -215,4 +215,30 @@ class ChatMessage(Base):
moderator = relationship("User", foreign_keys=[deleted_by])
def __repr__(self):
return f"<ChatMessage(id={self.id}, user_id={self.user_id}, type={self.message_type})>"
return f"<ChatMessage(id={self.id}, user_id={self.user_id}, type={self.message_type})>"
class P2PMessage(Base):
"""P2P сообщения между пользователями"""
__tablename__ = "p2p_messages"
id = Column(Integer, primary_key=True)
sender_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
recipient_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
message_type = Column(String(20), nullable=False) # text, photo, video, etc.
text = Column(Text, nullable=True)
file_id = Column(String(255), nullable=True)
sender_message_id = Column(Integer, nullable=False) # ID сообщения у отправителя
recipient_message_id = Column(Integer, nullable=True) # ID сообщения у получателя
is_read = Column(Boolean, default=False, index=True)
read_at = Column(DateTime(timezone=True), nullable=True)
reply_to_id = Column(Integer, ForeignKey("p2p_messages.id"), nullable=True) # Ответ на сообщение
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), index=True)
# Связи
sender = relationship("User", foreign_keys=[sender_id], backref="sent_p2p_messages")
recipient = relationship("User", foreign_keys=[recipient_id], backref="received_p2p_messages")
reply_to = relationship("P2PMessage", remote_side=[id], backref="replies")
def __repr__(self):
return f"<P2PMessage(id={self.id}, from={self.sender_id}, to={self.recipient_id})>"

263
src/core/p2p_services.py Normal file
View File

@@ -0,0 +1,263 @@
"""Сервисы для работы с P2P сообщениями"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_, desc, func
from sqlalchemy.orm import selectinload
from typing import List, Optional, Tuple
from datetime import datetime, timezone
from .models import P2PMessage, User
class P2PMessageService:
"""Сервис для работы с P2P сообщениями"""
@staticmethod
async def send_message(
session: AsyncSession,
sender_id: int,
recipient_id: int,
message_type: str,
sender_message_id: int,
recipient_message_id: Optional[int] = None,
text: Optional[str] = None,
file_id: Optional[str] = None,
reply_to_id: Optional[int] = None
) -> P2PMessage:
"""
Сохранить отправленное P2P сообщение
Args:
session: Сессия БД
sender_id: ID отправителя
recipient_id: ID получателя
message_type: Тип сообщения (text, photo, etc.)
sender_message_id: ID сообщения у отправителя
recipient_message_id: ID сообщения у получателя
text: Текст сообщения
file_id: ID файла
reply_to_id: ID сообщения, на которое отвечают
Returns:
P2PMessage
"""
message = P2PMessage(
sender_id=sender_id,
recipient_id=recipient_id,
message_type=message_type,
text=text,
file_id=file_id,
sender_message_id=sender_message_id,
recipient_message_id=recipient_message_id,
reply_to_id=reply_to_id,
created_at=datetime.now(timezone.utc)
)
session.add(message)
await session.commit()
await session.refresh(message)
return message
@staticmethod
async def mark_as_read(session: AsyncSession, message_id: int) -> bool:
"""
Отметить сообщение как прочитанное
Args:
session: Сессия БД
message_id: ID сообщения
Returns:
bool: True если успешно
"""
result = await session.execute(
select(P2PMessage).where(P2PMessage.id == message_id)
)
message = result.scalar_one_or_none()
if message and not message.is_read:
message.is_read = True
message.read_at = datetime.now(timezone.utc)
await session.commit()
return True
return False
@staticmethod
async def get_conversation(
session: AsyncSession,
user1_id: int,
user2_id: int,
limit: int = 50,
offset: int = 0
) -> List[P2PMessage]:
"""
Получить переписку между двумя пользователями
Args:
session: Сессия БД
user1_id: ID первого пользователя
user2_id: ID второго пользователя
limit: Максимальное количество сообщений
offset: Смещение для пагинации
Returns:
List[P2PMessage]: Список сообщений (от новых к старым)
"""
result = await session.execute(
select(P2PMessage)
.options(selectinload(P2PMessage.sender), selectinload(P2PMessage.recipient))
.where(
or_(
and_(P2PMessage.sender_id == user1_id, P2PMessage.recipient_id == user2_id),
and_(P2PMessage.sender_id == user2_id, P2PMessage.recipient_id == user1_id)
)
)
.order_by(desc(P2PMessage.created_at))
.limit(limit)
.offset(offset)
)
return list(result.scalars().all())
@staticmethod
async def get_unread_count(session: AsyncSession, user_id: int) -> int:
"""
Получить количество непрочитанных сообщений пользователя
Args:
session: Сессия БД
user_id: ID пользователя
Returns:
int: Количество непрочитанных сообщений
"""
result = await session.execute(
select(func.count(P2PMessage.id))
.where(
and_(
P2PMessage.recipient_id == user_id,
P2PMessage.is_read == False
)
)
)
return result.scalar() or 0
@staticmethod
async def get_recent_conversations(
session: AsyncSession,
user_id: int,
limit: int = 10
) -> List[Tuple[User, P2PMessage, int]]:
"""
Получить список последних диалогов пользователя
Args:
session: Сессия БД
user_id: ID пользователя
limit: Максимальное количество диалогов
Returns:
List[Tuple[User, P2PMessage, int]]: Список (собеседник, последнее_сообщение, непрочитанных)
"""
# Получаем все ID собеседников
result = await session.execute(
select(P2PMessage.sender_id, P2PMessage.recipient_id)
.where(
or_(
P2PMessage.sender_id == user_id,
P2PMessage.recipient_id == user_id
)
)
)
# Собираем уникальных собеседников
peers = set()
for sender_id, recipient_id in result.all():
peer_id = recipient_id if sender_id == user_id else sender_id
peers.add(peer_id)
# Для каждого собеседника получаем последнее сообщение и количество непрочитанных
conversations = []
for peer_id in peers:
# Последнее сообщение
last_msg_result = await session.execute(
select(P2PMessage)
.where(
or_(
and_(P2PMessage.sender_id == user_id, P2PMessage.recipient_id == peer_id),
and_(P2PMessage.sender_id == peer_id, P2PMessage.recipient_id == user_id)
)
)
.order_by(desc(P2PMessage.created_at))
.limit(1)
)
last_message = last_msg_result.scalar_one_or_none()
if not last_message:
continue
# Количество непрочитанных от этого собеседника
unread_result = await session.execute(
select(func.count(P2PMessage.id))
.where(
and_(
P2PMessage.sender_id == peer_id,
P2PMessage.recipient_id == user_id,
P2PMessage.is_read == False
)
)
)
unread_count = unread_result.scalar() or 0
# Получаем пользователя-собеседника
peer_result = await session.execute(
select(User).where(User.id == peer_id)
)
peer = peer_result.scalar_one_or_none()
if peer:
conversations.append((peer, last_message, unread_count))
# Сортируем по времени последнего сообщения
conversations.sort(key=lambda x: x[1].created_at, reverse=True)
return conversations[:limit]
@staticmethod
async def find_original_message(
session: AsyncSession,
telegram_message_id: int,
user_id: int
) -> Optional[P2PMessage]:
"""
Найти оригинальное P2P сообщение по telegram_message_id
Args:
session: Сессия БД
telegram_message_id: ID сообщения в Telegram
user_id: ID пользователя (для проверки прав)
Returns:
Optional[P2PMessage]: Найденное сообщение или None
"""
result = await session.execute(
select(P2PMessage)
.options(selectinload(P2PMessage.sender), selectinload(P2PMessage.recipient))
.where(
or_(
and_(
P2PMessage.sender_message_id == telegram_message_id,
P2PMessage.sender_id == user_id
),
and_(
P2PMessage.recipient_message_id == telegram_message_id,
P2PMessage.recipient_id == user_id
)
)
)
)
return result.scalar_one_or_none()

335
src/handlers/p2p_chat.py Normal file
View File

@@ -0,0 +1,335 @@
"""Обработчики P2P чата между пользователями"""
from aiogram import Router, F
from aiogram.filters import Command, StateFilter
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from src.core.p2p_services import P2PMessageService
from src.core.services import UserService
from src.core.models import User
from src.core.database import async_session_maker
from src.core.config import ADMIN_IDS
router = Router(name='p2p_chat_router')
class P2PChatStates(StatesGroup):
"""Состояния для P2P чата"""
waiting_for_recipient = State() # Ожидание выбора получателя
chatting = State() # В процессе переписки с пользователем
def is_admin(user_id: int) -> bool:
"""Проверка прав администратора"""
return user_id in ADMIN_IDS
@router.message(Command("chat"))
async def show_chat_menu(message: Message, state: FSMContext):
"""
Главное меню чата
/chat - показать меню с опциями общения
"""
async with async_session_maker() as session:
user = await UserService.get_user_by_telegram_id(session, message.from_user.id)
if not user:
await message.answer("❌ Вы не зарегистрированы. Используйте /start")
return
# Получаем количество непрочитанных сообщений
unread_count = await P2PMessageService.get_unread_count(session, user.id)
# Получаем последние диалоги
recent = await P2PMessageService.get_recent_conversations(session, user.id, limit=5)
text = "💬 <b>Чат</b>\n\n"
if unread_count > 0:
text += f"📨 У вас <b>{unread_count}</b> непрочитанных сообщений\n\n"
text += "Выберите действие:"
buttons = [
[InlineKeyboardButton(
text="✉️ Написать пользователю",
callback_data="p2p:select_user"
)],
[InlineKeyboardButton(
text="📋 Мои диалоги",
callback_data="p2p:my_conversations"
)]
]
if is_admin(message.from_user.id):
buttons.append([InlineKeyboardButton(
text="📢 Написать всем (broadcast)",
callback_data="p2p:broadcast"
)])
await message.answer(
text,
reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons),
parse_mode="HTML"
)
@router.callback_query(F.data == "p2p:select_user")
async def select_recipient(callback: CallbackQuery, state: FSMContext):
"""Выбор получателя для P2P сообщения"""
await callback.answer()
async with async_session_maker() as session:
# Получаем всех зарегистрированных пользователей кроме себя
users = await UserService.get_all_users(session)
users = [u for u in users if u.telegram_id != callback.from_user.id and u.is_registered]
if not users:
await callback.message.edit_text("❌ Нет доступных пользователей для общения")
return
# Создаём кнопки с пользователями (по 1 на строку)
buttons = []
for user in users[:20]: # Ограничение 20 пользователей на странице
display_name = f"@{user.username}" if user.username else user.first_name
if user.club_card_number:
display_name += f" (карта: {user.club_card_number})"
buttons.append([InlineKeyboardButton(
text=display_name,
callback_data=f"p2p:user:{user.id}"
)])
buttons.append([InlineKeyboardButton(
text="« Назад",
callback_data="p2p:back_to_menu"
)])
await callback.message.edit_text(
"👥 <b>Выберите пользователя:</b>\n\n"
"Кликните на пользователя, чтобы начать диалог",
reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons),
parse_mode="HTML"
)
@router.callback_query(F.data.startswith("p2p:user:"))
async def start_conversation(callback: CallbackQuery, state: FSMContext):
"""Начать диалог с выбранным пользователем"""
await callback.answer()
user_id = int(callback.data.split(":")[2])
async with async_session_maker() as session:
recipient = await session.get(User, user_id)
if not recipient:
await callback.message.edit_text("❌ Пользователь не найден")
return
sender = await UserService.get_user_by_telegram_id(session, callback.from_user.id)
# Получаем последние 10 сообщений из диалога
messages = await P2PMessageService.get_conversation(
session,
sender.id,
recipient.id,
limit=10
)
# Сохраняем ID получателя в состоянии
await state.update_data(recipient_id=recipient.id, recipient_telegram_id=recipient.telegram_id)
await state.set_state(P2PChatStates.chatting)
recipient_name = f"@{recipient.username}" if recipient.username else recipient.first_name
text = f"💬 <b>Диалог с {recipient_name}</b>\n\n"
if messages:
text += "📝 <b>Последние сообщения:</b>\n\n"
for msg in reversed(messages[-5:]): # Последние 5 сообщений
sender_name = "Вы" if msg.sender_id == sender.id else recipient_name
msg_text = msg.text[:50] + "..." if msg.text and len(msg.text) > 50 else (msg.text or f"[{msg.message_type}]")
text += f"{sender_name}: {msg_text}\n"
text += "\n"
text += "✍️ Отправьте сообщение (текст, фото, видео...)\n"
text += "Для выхода нажмите кнопку ниже"
buttons = [[InlineKeyboardButton(
text="« Завершить диалог",
callback_data="p2p:end_conversation"
)]]
await callback.message.edit_text(
text,
reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons),
parse_mode="HTML"
)
@router.callback_query(F.data == "p2p:my_conversations")
async def show_conversations(callback: CallbackQuery):
"""Показать список диалогов"""
await callback.answer()
async with async_session_maker() as session:
user = await UserService.get_user_by_telegram_id(session, callback.from_user.id)
conversations = await P2PMessageService.get_recent_conversations(session, user.id, limit=10)
if not conversations:
await callback.message.edit_text(
"📭 У вас пока нет диалогов\n\n"
"Используйте /chat чтобы написать кому-нибудь"
)
return
text = "📋 <b>Ваши диалоги:</b>\n\n"
buttons = []
for peer, last_msg, unread in conversations:
peer_name = f"@{peer.username}" if peer.username else peer.first_name
# Иконка в зависимости от непрочитанных
icon = "🔴" if unread > 0 else "💬"
# Превью последнего сообщения
preview = last_msg.text[:30] + "..." if last_msg.text and len(last_msg.text) > 30 else (last_msg.text or f"[{last_msg.message_type}]")
button_text = f"{icon} {peer_name}"
if unread > 0:
button_text += f" ({unread})"
buttons.append([InlineKeyboardButton(
text=button_text,
callback_data=f"p2p:user:{peer.id}"
)])
text += f"{icon} <b>{peer_name}</b>\n"
text += f" {preview}\n"
if unread > 0:
text += f" 📨 Непрочитанных: {unread}\n"
text += "\n"
buttons.append([InlineKeyboardButton(
text="« Назад",
callback_data="p2p:back_to_menu"
)])
await callback.message.edit_text(
text,
reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons),
parse_mode="HTML"
)
@router.callback_query(F.data == "p2p:end_conversation")
async def end_conversation(callback: CallbackQuery, state: FSMContext):
"""Завершить текущий диалог"""
await callback.answer("Диалог завершён")
await state.clear()
await callback.message.edit_text(
"✅ Диалог завершён\n\n"
"Используйте /chat чтобы открыть меню чата"
)
@router.callback_query(F.data == "p2p:back_to_menu")
async def back_to_menu(callback: CallbackQuery):
"""Вернуться в главное меню"""
await callback.answer()
# Имитируем команду /chat
fake_message = callback.message
fake_message.from_user = callback.from_user
await show_chat_menu(fake_message, None)
# Обработчик сообщений в состоянии chatting
@router.message(StateFilter(P2PChatStates.chatting), F.text | F.photo | F.video | F.document)
async def handle_p2p_message(message: Message, state: FSMContext):
"""Обработка P2P сообщения от пользователя"""
data = await state.get_data()
recipient_id = data.get("recipient_id")
recipient_telegram_id = data.get("recipient_telegram_id")
if not recipient_id or not recipient_telegram_id:
await message.answer("❌ Ошибка: получатель не найден. Начните диалог заново с /chat")
await state.clear()
return
async with async_session_maker() as session:
sender = await UserService.get_user_by_telegram_id(session, message.from_user.id)
sender_name = f"@{sender.username}" if sender.username else sender.first_name
# Определяем тип сообщения
message_type = "text"
text = message.text
file_id = None
if message.photo:
message_type = "photo"
file_id = message.photo[-1].file_id
text = message.caption
elif message.video:
message_type = "video"
file_id = message.video.file_id
text = message.caption
elif message.document:
message_type = "document"
file_id = message.document.file_id
text = message.caption
# Отправляем сообщение получателю
try:
if message_type == "text":
sent = await message.bot.send_message(
recipient_telegram_id,
f"💬 <b>Сообщение от {sender_name}:</b>\n\n{text}",
parse_mode="HTML"
)
elif message_type == "photo":
sent = await message.bot.send_photo(
recipient_telegram_id,
photo=file_id,
caption=f"💬 <b>Фото от {sender_name}</b>\n\n{text or ''}" ,
parse_mode="HTML"
)
elif message_type == "video":
sent = await message.bot.send_video(
recipient_telegram_id,
video=file_id,
caption=f"💬 <b>Видео от {sender_name}</b>\n\n{text or ''}",
parse_mode="HTML"
)
elif message_type == "document":
sent = await message.bot.send_document(
recipient_telegram_id,
document=file_id,
caption=f"💬 <b>Документ от {sender_name}</b>\n\n{text or ''}",
parse_mode="HTML"
)
# Сохраняем в БД
await P2PMessageService.send_message(
session,
sender_id=sender.id,
recipient_id=recipient_id,
message_type=message_type,
text=text,
file_id=file_id,
sender_message_id=message.message_id,
recipient_message_id=sent.message_id
)
await message.answer("✅ Сообщение доставлено")
except Exception as e:
await message.answer(f"Не удалось доставить сообщение: {e}")