Files
new_lottery_bot/src/core/services.py
Andrew K. Choi 8e692d2f61
All checks were successful
continuous-integration/drone/push Build is passing
feat: добавлено управление сообщениями пользователей в админ-панель
- Добавлена кнопка 'Сообщения пользователей' в админ меню
- Реализован просмотр последних сообщений с фильтрацией
- Возможность просмотра медиа (фото, видео) прямо в боте
- Функция удаления сообщений администратором
- Удаление происходит как в БД, так и у пользователей в Telegram
- Просмотр всех сообщений конкретного пользователя
- Добавлены методы в ChatMessageService и UserService
- Метод get_user_messages_all для получения всех сообщений
- Метод mark_as_deleted для пометки сообщений как удаленных
- Метод count_messages для подсчета количества сообщений
- Метод get_user_by_id в UserService
2025-11-22 19:46:38 +09:00

803 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from .models import User, Lottery, Participation, Winner, Account
from typing import List, Optional, Dict, Any
from ..utils.account_utils import validate_account_number, format_account_number
import random
class UserService:
"""Сервис для работы с пользователями"""
@staticmethod
async def get_or_create_user(session: AsyncSession, telegram_id: int,
username: str = None, first_name: str = None,
last_name: str = None) -> User:
"""Получить или создать пользователя"""
# Пробуем найти существующего пользователя
result = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
user = result.scalar_one_or_none()
if user:
# Обновляем информацию о пользователе
user.username = username
user.first_name = first_name
user.last_name = last_name
await session.commit()
return user
# Создаем нового пользователя
user = User(
telegram_id=telegram_id,
username=username,
first_name=first_name,
last_name=last_name
)
session.add(user)
await session.commit()
await session.refresh(user)
return user
@staticmethod
async def get_user_by_telegram_id(session: AsyncSession, telegram_id: int) -> Optional[User]:
"""Получить пользователя по Telegram ID"""
result = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
"""Получить пользователя по ID"""
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_username(session: AsyncSession, username: str) -> Optional[User]:
"""Получить пользователя по username"""
result = await session.execute(select(User).where(User.username == username))
return result.scalar_one_or_none()
@staticmethod
async def get_all_users(session: AsyncSession, limit: int = None, offset: int = 0) -> List[User]:
"""Получить всех пользователей"""
query = select(User).order_by(User.created_at.desc())
if limit:
query = query.offset(offset).limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def search_users(session: AsyncSession, search_term: str, limit: int = 20) -> List[User]:
"""Поиск пользователей по имени или username"""
from sqlalchemy import or_, func
search_pattern = f"%{search_term.lower()}%"
result = await session.execute(
select(User).where(
or_(
func.lower(User.first_name).contains(search_pattern),
func.lower(User.last_name).contains(search_pattern) if User.last_name else False,
func.lower(User.username).contains(search_pattern) if User.username else False
)
).limit(limit)
)
return result.scalars().all()
@staticmethod
async def delete_user(session: AsyncSession, user_id: int) -> bool:
"""Удалить пользователя и все связанные данные"""
user = await session.get(User, user_id)
if not user:
return False
# Удаляем все участия
await session.execute(
delete(Participation).where(Participation.user_id == user_id)
)
# Удаляем все победы
await session.execute(
delete(Winner).where(Winner.user_id == user_id)
)
# Удаляем пользователя
await session.delete(user)
await session.commit()
return True
@staticmethod
async def set_admin(session: AsyncSession, telegram_id: int, is_admin: bool = True) -> bool:
"""Установить/снять права администратора"""
result = await session.execute(
update(User)
.where(User.telegram_id == telegram_id)
.values(is_admin=is_admin)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def set_account_number(session: AsyncSession, telegram_id: int, account_number: str) -> bool:
"""Установить номер клиентского счета пользователю"""
# Валидируем и форматируем номер
formatted_number = format_account_number(account_number)
if not formatted_number:
return False
# Проверяем уникальность номера
existing = await session.execute(
select(User).where(User.account_number == formatted_number)
)
if existing.scalar_one_or_none():
return False # Номер уже занят
# Обновляем пользователя
result = await session.execute(
update(User)
.where(User.telegram_id == telegram_id)
.values(account_number=formatted_number)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def get_user_by_account(session: AsyncSession, account_number: str) -> Optional[User]:
"""Получить пользователя по номеру счета"""
formatted_number = format_account_number(account_number)
if not formatted_number:
return None
@staticmethod
async def get_user_by_club_card(session: AsyncSession, club_card_number: str) -> Optional[User]:
"""
Получить пользователя по номеру клубной карты
Args:
session: Сессия БД
club_card_number: Номер клубной карты (4 цифры)
Returns:
User или None если не найден
"""
result = await session.execute(
select(User).where(User.club_card_number == club_card_number)
)
return result.scalar_one_or_none()
result = await session.execute(
select(User).where(User.account_number == formatted_number)
)
return result.scalar_one_or_none()
@staticmethod
async def search_by_account(session: AsyncSession, account_pattern: str) -> List[User]:
"""Поиск пользователей по части номера счета"""
# Убираем все кроме цифр и дефисов
clean_pattern = ''.join(c for c in account_pattern if c.isdigit() or c == '-')
if not clean_pattern:
return []
result = await session.execute(
select(User).where(
User.account_number.like(f'%{clean_pattern}%')
).limit(20)
)
return result.scalars().all()
class LotteryService:
"""Сервис для работы с розыгрышами"""
@staticmethod
async def create_lottery(session: AsyncSession, title: str, description: str,
prizes: List[str], creator_id: int) -> Lottery:
"""Создать новый розыгрыш"""
lottery = Lottery(
title=title,
description=description,
prizes=prizes,
creator_id=creator_id
)
session.add(lottery)
await session.commit()
await session.refresh(lottery)
return lottery
@staticmethod
async def get_lottery(session: AsyncSession, lottery_id: int) -> Optional[Lottery]:
"""Получить розыгрыш по ID"""
result = await session.execute(
select(Lottery)
.options(selectinload(Lottery.participations).selectinload(Participation.user))
.where(Lottery.id == lottery_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_active_lotteries(session: AsyncSession, limit: Optional[int] = None) -> List[Lottery]:
"""Получить список активных розыгрышей"""
query = select(Lottery).where(
Lottery.is_active == True,
Lottery.is_completed == False
).order_by(Lottery.created_at.desc())
if limit:
query = query.limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def update_lottery(
session: AsyncSession,
lottery_id: int,
**updates
) -> bool:
"""Обновить данные розыгрыша"""
try:
await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(**updates)
)
await session.commit()
return True
except Exception:
await session.rollback()
return False
@staticmethod
async def get_all_lotteries(session: AsyncSession, limit: Optional[int] = None) -> List[Lottery]:
"""Получить список всех розыгрышей"""
query = select(Lottery).order_by(Lottery.created_at.desc())
if limit:
query = query.limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def set_manual_winner(session: AsyncSession, lottery_id: int,
place: int, telegram_id: int) -> bool:
"""Установить ручного победителя для определенного места"""
# Получаем пользователя
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
return False
# Получаем розыгрыш
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery:
return False
# Обновляем ручных победителей
if not lottery.manual_winners:
lottery.manual_winners = {}
lottery.manual_winners[str(place)] = telegram_id
await session.commit()
return True
@staticmethod
async def conduct_draw(session: AsyncSession, lottery_id: int) -> Dict[int, Dict[str, Any]]:
"""Провести розыгрыш с учетом ручных победителей"""
import logging
logger = logging.getLogger(__name__)
logger.info(f"conduct_draw: начало для lottery_id={lottery_id}")
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery or lottery.is_completed:
logger.warning(f"conduct_draw: lottery не найден или завершён")
return {}
logger.info(f"conduct_draw: получаем участников")
# Получаем всех участников (включая тех, у кого нет user)
participants = []
for p in lottery.participations:
if p.user:
participants.append(p.user)
else:
# Создаем временный объект для участников без пользователя
# Храним только номер счета
participants.append(type('obj', (object,), {
'id': None,
'telegram_id': None,
'account_number': p.account_number
})())
logger.info(f"conduct_draw: участников {len(participants)}")
if not participants:
logger.warning(f"conduct_draw: нет участников")
return {}
# Определяем количество призовых мест
num_prizes = len(lottery.prizes) if lottery.prizes else 1
results = {}
remaining_participants = participants.copy()
manual_winners = lottery.manual_winners or {}
# Сначала обрабатываем ручных победителей
for place in range(1, num_prizes + 1):
place_str = str(place)
if place_str in manual_winners:
# Находим пользователя среди участников
manual_winner = None
for participant in remaining_participants:
if hasattr(participant, 'telegram_id') and participant.telegram_id == manual_winners[place_str]:
manual_winner = participant
break
if manual_winner:
results[place] = {
'user': manual_winner,
'prize': lottery.prizes[place - 1] if lottery.prizes and place <= len(lottery.prizes) else f"Приз {place} места",
'is_manual': True
}
remaining_participants.remove(manual_winner)
# Заполняем оставшиеся места случайными участниками
for place in range(1, num_prizes + 1):
if place not in results and remaining_participants:
winner = random.choice(remaining_participants)
results[place] = {
'user': winner,
'prize': lottery.prizes[place - 1] if lottery.prizes and place <= len(lottery.prizes) else f"Приз {place} места",
'is_manual': False
}
remaining_participants.remove(winner)
# Сохраняем победителей в базу данных
for place, winner_info in results.items():
user_obj = winner_info['user']
winner = Winner(
lottery_id=lottery_id,
user_id=user_obj.id if hasattr(user_obj, 'id') and user_obj.id else None,
account_number=user_obj.account_number if hasattr(user_obj, 'account_number') else None,
place=place,
prize=winner_info['prize'],
is_manual=winner_info['is_manual']
)
session.add(winner)
# Обновляем статус розыгрыша
logger.info(f"conduct_draw: обновляем статус lottery")
lottery.is_completed = True
lottery.draw_results = {}
for place, info in results.items():
user_obj = info['user']
lottery.draw_results[str(place)] = {
'user_id': user_obj.id if hasattr(user_obj, 'id') and user_obj.id else None,
'telegram_id': user_obj.telegram_id if hasattr(user_obj, 'telegram_id') else None,
'username': user_obj.username if hasattr(user_obj, 'username') else None,
'account_number': user_obj.account_number if hasattr(user_obj, 'account_number') else None,
'prize': info['prize'],
'is_manual': info['is_manual']
}
# НЕ коммитим здесь - это должно сделать вызывающая функция
logger.info(f"conduct_draw: изменения подготовлены, победителей: {len(results)}")
return results
@staticmethod
async def get_winners(session: AsyncSession, lottery_id: int) -> List[Winner]:
"""Получить победителей розыгрыша"""
result = await session.execute(
select(Winner)
.options(selectinload(Winner.user))
.where(Winner.lottery_id == lottery_id)
.order_by(Winner.place)
)
return result.scalars().all()
@staticmethod
async def set_winner_display_type(session: AsyncSession, lottery_id: int, display_type: str) -> bool:
"""Установить тип отображения победителей для розыгрыша"""
from ..display.winner_display import validate_display_type
if not validate_display_type(display_type):
return False
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(winner_display_type=display_type)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def set_lottery_active(session: AsyncSession, lottery_id: int, is_active: bool) -> bool:
"""Установить статус активности розыгрыша"""
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(is_active=is_active)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def complete_lottery(session: AsyncSession, lottery_id: int) -> bool:
"""Завершить розыгрыш (сделать неактивным и завершенным)"""
from datetime import datetime
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(is_active=False, is_completed=True, end_date=datetime.now())
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def delete_lottery(session: AsyncSession, lottery_id: int) -> bool:
"""Удалить розыгрыш и все связанные данные"""
# Сначала удаляем все связанные данные
# Удаляем победителей
await session.execute(
delete(Winner).where(Winner.lottery_id == lottery_id)
)
# Удаляем участников
await session.execute(
delete(Participation).where(Participation.lottery_id == lottery_id)
)
# Удаляем сам розыгрыш
result = await session.execute(
delete(Lottery).where(Lottery.id == lottery_id)
)
await session.commit()
return result.rowcount > 0
class ParticipationService:
"""Сервис для работы с участием в розыгрышах"""
@staticmethod
async def add_participant(session: AsyncSession, lottery_id: int, user_id: int) -> bool:
"""Добавить участника в розыгрыш"""
# Проверяем, не участвует ли уже пользователь
existing = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id, Participation.user_id == user_id)
)
if existing.scalar_one_or_none():
return False
participation = Participation(lottery_id=lottery_id, user_id=user_id)
session.add(participation)
await session.commit()
return True
@staticmethod
async def remove_participant(session: AsyncSession, lottery_id: int, user_id: int) -> bool:
"""Удалить участника из розыгрыша"""
participation = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id, Participation.user_id == user_id)
)
participation = participation.scalar_one_or_none()
if not participation:
return False
await session.delete(participation)
await session.commit()
return True
@staticmethod
async def get_participants(session: AsyncSession, lottery_id: int, limit: Optional[int] = None, offset: int = 0) -> List[User]:
"""Получить участников розыгрыша"""
query = select(User).join(Participation).where(Participation.lottery_id == lottery_id)
if limit:
query = query.offset(offset).limit(limit)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_user_participations(session: AsyncSession, user_id: int) -> List[Participation]:
"""Получить участие пользователя в розыгрышах"""
result = await session.execute(
select(Participation)
.options(selectinload(Participation.lottery))
.where(Participation.user_id == user_id)
.order_by(Participation.created_at.desc())
)
return list(result.scalars().all())
@staticmethod
async def get_participants_count(session: AsyncSession, lottery_id: int) -> int:
"""Получить количество участников в розыгрыше"""
result = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id)
)
return len(result.scalars().all())
@staticmethod
async def add_participants_bulk(session: AsyncSession, lottery_id: int, telegram_ids: List[int]) -> Dict[str, Any]:
"""Массовое добавление участников"""
results = {
"added": 0,
"skipped": 0,
"errors": [],
"details": []
}
for telegram_id in telegram_ids:
try:
# Проверяем, существует ли пользователь
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
results["errors"].append(f"Пользователь {telegram_id} не найден")
continue
# Пробуем добавить
if await ParticipationService.add_participant(session, lottery_id, user.id):
results["added"] += 1
results["details"].append(f"Добавлен: {user.first_name} (@{user.username or 'no_username'})")
else:
results["skipped"] += 1
results["details"].append(f"Уже участвует: {user.first_name}")
except Exception as e:
results["errors"].append(f"Ошибка с {telegram_id}: {str(e)}")
return results
@staticmethod
async def remove_participants_bulk(session: AsyncSession, lottery_id: int, telegram_ids: List[int]) -> Dict[str, Any]:
"""Массовое удаление участников"""
results = {
"removed": 0,
"not_found": 0,
"errors": [],
"details": []
}
for telegram_id in telegram_ids:
try:
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
results["not_found"] += 1
results["details"].append(f"Не найден: {telegram_id}")
continue
if await ParticipationService.remove_participant(session, lottery_id, user.id):
results["removed"] += 1
results["details"].append(f"Удален: {user.first_name}")
else:
results["not_found"] += 1
results["details"].append(f"Не участвовал: {user.first_name}")
except Exception as e:
results["errors"].append(f"Ошибка с {telegram_id}: {str(e)}")
return results
@staticmethod
async def add_participants_by_accounts_bulk(session: AsyncSession, lottery_id: int, account_numbers: List[str]) -> Dict[str, Any]:
"""Массовое добавление участников по номерам счетов"""
import logging
logger = logging.getLogger(__name__)
results = {
"added": 0,
"skipped": 0,
"errors": [],
"details": [],
"invalid_accounts": []
}
for account_input in account_numbers:
account_input = account_input.strip()
if not account_input:
continue
logger.info(f"DEBUG: Processing account_input={account_input!r}")
try:
# Разделяем по пробелу: левая часть - номер карты, правая - номер счета
parts = account_input.split()
logger.info(f"DEBUG: After split: parts={parts}, len={len(parts)}")
if len(parts) == 2:
card_number = parts[0] # Номер клубной карты
account_number = parts[1] # Номер счета
logger.info(f"DEBUG: 2 parts - card={card_number!r}, account={account_number!r}")
elif len(parts) == 1:
# Если нет пробела, считаем что это просто номер счета
card_number = None
account_number = parts[0]
logger.info(f"DEBUG: 1 part - account={account_number!r}")
else:
logger.info(f"DEBUG: Invalid parts count={len(parts)}")
results["invalid_accounts"].append(account_input)
results["errors"].append(f"Неверный формат: {account_input}")
continue
# Валидируем и форматируем номер счета
logger.info(f"DEBUG: Before format_account_number: {account_number!r}")
formatted_account = format_account_number(account_number)
logger.info(f"DEBUG: After format_account_number: {formatted_account!r}")
if not formatted_account:
card_info = f" (карта: {card_number})" if card_number else ""
results["invalid_accounts"].append(account_input)
results["errors"].append(f"Неверный формат счета: {account_number}{card_info}")
logger.error(f"DEBUG: Format failed for {account_number!r}")
continue
# Ищем владельца счёта через таблицу Account
from ..core.registration_services import AccountService
user = await AccountService.get_account_owner(session, formatted_account)
if not user:
card_info = f" (карта: {card_number})" if card_number else ""
results["errors"].append(f"Пользователь с счётом {formatted_account}{card_info} не найден")
continue
# Получаем запись Account для этого счета
account_record = await session.execute(
select(Account).where(Account.account_number == formatted_account)
)
account_record = account_record.scalar_one_or_none()
if not account_record:
card_info = f" (карта: {card_number})" if card_number else ""
results["errors"].append(f"Запись счета {formatted_account}{card_info} не найдена в базе")
continue
# Проверяем, не участвует ли уже этот счет
existing = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.account_number == formatted_account
)
)
if existing.scalar_one_or_none():
results["skipped"] += 1
detail = f"{user.first_name} ({formatted_account})"
if card_number:
detail = f"{user.first_name} (карта: {card_number}, счёт: {formatted_account})"
results["details"].append(f"Уже участвует: {detail}")
continue
# Добавляем участие по счету
participation = Participation(
lottery_id=lottery_id,
user_id=user.id,
account_id=account_record.id,
account_number=formatted_account
)
session.add(participation)
await session.commit()
results["added"] += 1
detail = f"{user.first_name} ({formatted_account})"
if card_number:
detail = f"{user.first_name} (карта: {card_number}, счёт: {formatted_account})"
results["details"].append(detail)
except Exception as e:
results["errors"].append(f"Ошибка с {account_input}: {str(e)}")
return results
@staticmethod
async def remove_participants_by_accounts_bulk(session: AsyncSession, lottery_id: int, account_numbers: List[str]) -> Dict[str, Any]:
"""Массовое удаление участников по номерам счетов"""
results = {
"removed": 0,
"not_found": 0,
"errors": [],
"details": [],
"invalid_accounts": []
}
for account_input in account_numbers:
account_input = account_input.strip()
if not account_input:
continue
try:
# Разделяем по пробелу: левая часть - номер карты, правая - номер счета
parts = account_input.split()
if len(parts) == 2:
card_number = parts[0] # Номер клубной карты
account_number = parts[1] # Номер счета
elif len(parts) == 1:
# Если нет пробела, считаем что это просто номер счета
card_number = None
account_number = parts[0]
else:
results["invalid_accounts"].append(account_input)
results["errors"].append(f"Неверный формат: {account_input}")
continue
# Валидируем и форматируем номер счета
formatted_account = format_account_number(account_number)
if not formatted_account:
card_info = f" (карта: {card_number})" if card_number else ""
results["invalid_accounts"].append(account_input)
results["errors"].append(f"Неверный формат счета: {account_number}{card_info}")
continue
# Ищем владельца счёта через таблицу Account
from ..core.registration_services import AccountService
user = await AccountService.get_account_owner(session, formatted_account)
if not user:
card_info = f" (карта: {card_number})" if card_number else ""
results["not_found"] += 1
results["details"].append(f"Не найден: {formatted_account}{card_info}")
continue
# Ищем участие по номеру счета (не по user_id!)
participation = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.account_number == formatted_account
)
)
participation = participation.scalar_one_or_none()
if participation:
await session.delete(participation)
await session.commit()
results["removed"] += 1
detail = f"{user.first_name} ({formatted_account})"
if card_number:
detail = f"{user.first_name} (карта: {card_number}, счёт: {formatted_account})"
results["details"].append(detail)
else:
results["not_found"] += 1
detail = f"{user.first_name} ({formatted_account})"
if card_number:
detail = f"{user.first_name} (карта: {card_number}, счёт: {formatted_account})"
results["details"].append(f"Не участвовал: {detail}")
except Exception as e:
results["errors"].append(f"Ошибка с {account_input}: {str(e)}")
return results
@staticmethod
async def get_participant_stats(session: AsyncSession, user_id: int) -> Dict[str, Any]:
"""Статистика участника"""
from sqlalchemy import func
# Количество участий
participations_count = await session.scalar(
select(func.count(Participation.id)).where(Participation.user_id == user_id)
)
# Количество побед
wins_count = await session.scalar(
select(func.count(Winner.id)).where(Winner.user_id == user_id)
)
# Последнее участие
last_participation = await session.execute(
select(Participation).where(Participation.user_id == user_id)
.order_by(Participation.created_at.desc()).limit(1)
)
last_participation = last_participation.scalar_one_or_none()
return {
"participations_count": participations_count,
"wins_count": wins_count,
"last_participation": last_participation.created_at if last_participation else None
}