This commit is contained in:
2025-11-16 12:36:02 +09:00
parent 3a25e6a4cb
commit eb3f3807fd
61 changed files with 1438 additions and 1139 deletions

3
src/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Лотерейный бот - основной пакет приложения.
"""

3
src/core/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Основные компоненты системы: конфигурация, база данных, модели и сервисы.
"""

29
src/core/config.py Normal file
View File

@@ -0,0 +1,29 @@
import os
from dotenv import load_dotenv
# Загружаем переменные окружения
load_dotenv()
# Telegram Bot
BOT_TOKEN = os.getenv("BOT_TOKEN")
if not BOT_TOKEN:
raise ValueError("BOT_TOKEN не найден в переменных окружения")
# База данных
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./lottery_bot.db")
# Администраторы
ADMIN_IDS = []
admin_ids_str = os.getenv("ADMIN_IDS", "")
if admin_ids_str:
try:
ADMIN_IDS = [int(id_str.strip()) for id_str in admin_ids_str.split(",") if id_str.strip()]
except ValueError:
print("Предупреждение: Некорректные ID администраторов в ADMIN_IDS")
# Логирование
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# Настройки бота
MAX_PARTICIPANTS_PER_LOTTERY = 10000 # Максимальное количество участников в розыгрыше
MAX_ACTIVE_LOTTERIES = 10 # Максимальное количество активных розыгрышей

41
src/core/database.py Normal file
View File

@@ -0,0 +1,41 @@
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from dotenv import load_dotenv
# Загружаем переменные окружения
load_dotenv()
# Конфигурация базы данных
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./lottery_bot.db")
# Создаем асинхронный движок
engine = create_async_engine(
DATABASE_URL,
echo=True, # Логирование SQL запросов
future=True,
)
# Создаем фабрику сессий
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
# Базовый класс для моделей
Base = declarative_base()
async def get_session() -> AsyncSession:
"""Получить асинхронную сессию базы данных"""
async with async_session_maker() as session:
yield session
async def init_db():
"""Инициализация базы данных"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close_db():
"""Закрытие соединения с базой данных"""
await engine.dispose()

98
src/core/models.py Normal file
View File

@@ -0,0 +1,98 @@
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from .database import Base
class User(Base):
"""Модель пользователя"""
__tablename__ = "users"
id = Column(Integer, primary_key=True)
telegram_id = Column(Integer, unique=True, nullable=False, index=True)
username = Column(String(255))
first_name = Column(String(255))
last_name = Column(String(255))
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
is_admin = Column(Boolean, default=False)
# Клиентский счет в формате: XX-XX-XX-XX-XX-XX-XX (7 пар цифр через дефис)
account_number = Column(String(20), unique=True, nullable=True, index=True)
# Связи
participations = relationship("Participation", back_populates="user")
def __repr__(self):
return f"<User(telegram_id={self.telegram_id}, username={self.username})>"
class Lottery(Base):
"""Модель розыгрыша"""
__tablename__ = "lotteries"
id = Column(Integer, primary_key=True)
title = Column(String(500), nullable=False)
description = Column(Text)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
start_date = Column(DateTime(timezone=True))
end_date = Column(DateTime(timezone=True))
is_active = Column(Boolean, default=True)
is_completed = Column(Boolean, default=False)
prizes = Column(JSON) # Список призов в формате JSON
creator_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Настройки для ручного управления победителями
manual_winners = Column(JSON, default=lambda: {}) # {место: telegram_id}
draw_results = Column(JSON) # Результаты розыгрыша
# Тип отображения победителей: "username", "chat_id", "account_number"
winner_display_type = Column(String(20), default="username")
# Связи
creator = relationship("User")
participations = relationship("Participation", back_populates="lottery")
def __repr__(self):
return f"<Lottery(id={self.id}, title={self.title})>"
class Participation(Base):
"""Модель участия в розыгрыше"""
__tablename__ = "participations"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Опционально
lottery_id = Column(Integer, ForeignKey("lotteries.id"), nullable=False)
account_number = Column(String(20), nullable=True, index=True) # Счет участника (XX-XX-XX-XX-XX-XX-XX)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
# Связи
user = relationship("User", back_populates="participations")
lottery = relationship("Lottery", back_populates="participations")
def __repr__(self):
if self.account_number:
return f"<Participation(account={self.account_number}, lottery_id={self.lottery_id})>"
return f"<Participation(user_id={self.user_id}, lottery_id={self.lottery_id})>"
class Winner(Base):
"""Модель победителя розыгрыша"""
__tablename__ = "winners"
id = Column(Integer, primary_key=True)
lottery_id = Column(Integer, ForeignKey("lotteries.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Опционально
account_number = Column(String(20), nullable=True, index=True) # Счет победителя
place = Column(Integer, nullable=False) # Место (1, 2, 3...)
prize = Column(String(500)) # Описание приза
is_manual = Column(Boolean, default=False) # Был ли установлен вручную
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
# Связи
user = relationship("User")
lottery = relationship("Lottery")
def __repr__(self):
if self.account_number:
return f"<Winner(lottery_id={self.lottery_id}, account={self.account_number}, place={self.place})>"
return f"<Winner(lottery_id={self.lottery_id}, user_id={self.user_id}, place={self.place})>"

636
src/core/services.py Normal file
View File

@@ -0,0 +1,636 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from .models import User, Lottery, Participation, Winner
from typing import List, Optional, Dict, Any
from ..utils.account_utils import validate_account_number, format_account_number
import random
class UserService:
"""Сервис для работы с пользователями"""
@staticmethod
async def get_or_create_user(session: AsyncSession, telegram_id: int,
username: str = None, first_name: str = None,
last_name: str = None) -> User:
"""Получить или создать пользователя"""
# Пробуем найти существующего пользователя
result = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
user = result.scalar_one_or_none()
if user:
# Обновляем информацию о пользователе
user.username = username
user.first_name = first_name
user.last_name = last_name
await session.commit()
return user
# Создаем нового пользователя
user = User(
telegram_id=telegram_id,
username=username,
first_name=first_name,
last_name=last_name
)
session.add(user)
await session.commit()
await session.refresh(user)
return user
@staticmethod
async def get_user_by_telegram_id(session: AsyncSession, telegram_id: int) -> Optional[User]:
"""Получить пользователя по Telegram ID"""
result = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_username(session: AsyncSession, username: str) -> Optional[User]:
"""Получить пользователя по username"""
result = await session.execute(select(User).where(User.username == username))
return result.scalar_one_or_none()
@staticmethod
async def get_all_users(session: AsyncSession, limit: int = None, offset: int = 0) -> List[User]:
"""Получить всех пользователей"""
query = select(User).order_by(User.created_at.desc())
if limit:
query = query.offset(offset).limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def search_users(session: AsyncSession, search_term: str, limit: int = 20) -> List[User]:
"""Поиск пользователей по имени или username"""
from sqlalchemy import or_, func
search_pattern = f"%{search_term.lower()}%"
result = await session.execute(
select(User).where(
or_(
func.lower(User.first_name).contains(search_pattern),
func.lower(User.last_name).contains(search_pattern) if User.last_name else False,
func.lower(User.username).contains(search_pattern) if User.username else False
)
).limit(limit)
)
return result.scalars().all()
@staticmethod
async def delete_user(session: AsyncSession, user_id: int) -> bool:
"""Удалить пользователя и все связанные данные"""
user = await session.get(User, user_id)
if not user:
return False
# Удаляем все участия
await session.execute(
delete(Participation).where(Participation.user_id == user_id)
)
# Удаляем все победы
await session.execute(
delete(Winner).where(Winner.user_id == user_id)
)
# Удаляем пользователя
await session.delete(user)
await session.commit()
return True
@staticmethod
async def set_admin(session: AsyncSession, telegram_id: int, is_admin: bool = True) -> bool:
"""Установить/снять права администратора"""
result = await session.execute(
update(User)
.where(User.telegram_id == telegram_id)
.values(is_admin=is_admin)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def set_account_number(session: AsyncSession, telegram_id: int, account_number: str) -> bool:
"""Установить номер клиентского счета пользователю"""
# Валидируем и форматируем номер
formatted_number = format_account_number(account_number)
if not formatted_number:
return False
# Проверяем уникальность номера
existing = await session.execute(
select(User).where(User.account_number == formatted_number)
)
if existing.scalar_one_or_none():
return False # Номер уже занят
# Обновляем пользователя
result = await session.execute(
update(User)
.where(User.telegram_id == telegram_id)
.values(account_number=formatted_number)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def get_user_by_account(session: AsyncSession, account_number: str) -> Optional[User]:
"""Получить пользователя по номеру счета"""
formatted_number = format_account_number(account_number)
if not formatted_number:
return None
result = await session.execute(
select(User).where(User.account_number == formatted_number)
)
return result.scalar_one_or_none()
@staticmethod
async def search_by_account(session: AsyncSession, account_pattern: str) -> List[User]:
"""Поиск пользователей по части номера счета"""
# Убираем все кроме цифр и дефисов
clean_pattern = ''.join(c for c in account_pattern if c.isdigit() or c == '-')
if not clean_pattern:
return []
result = await session.execute(
select(User).where(
User.account_number.like(f'%{clean_pattern}%')
).limit(20)
)
return result.scalars().all()
class LotteryService:
"""Сервис для работы с розыгрышами"""
@staticmethod
async def create_lottery(session: AsyncSession, title: str, description: str,
prizes: List[str], creator_id: int) -> Lottery:
"""Создать новый розыгрыш"""
lottery = Lottery(
title=title,
description=description,
prizes=prizes,
creator_id=creator_id
)
session.add(lottery)
await session.commit()
await session.refresh(lottery)
return lottery
@staticmethod
async def get_lottery(session: AsyncSession, lottery_id: int) -> Optional[Lottery]:
"""Получить розыгрыш по ID"""
result = await session.execute(
select(Lottery)
.options(selectinload(Lottery.participations).selectinload(Participation.user))
.where(Lottery.id == lottery_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_active_lotteries(session: AsyncSession, limit: Optional[int] = None) -> List[Lottery]:
"""Получить список активных розыгрышей"""
query = select(Lottery).where(
Lottery.is_active == True,
Lottery.is_completed == False
).order_by(Lottery.created_at.desc())
if limit:
query = query.limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def get_all_lotteries(session: AsyncSession, limit: Optional[int] = None) -> List[Lottery]:
"""Получить список всех розыгрышей"""
query = select(Lottery).order_by(Lottery.created_at.desc())
if limit:
query = query.limit(limit)
result = await session.execute(query)
return result.scalars().all()
@staticmethod
async def set_manual_winner(session: AsyncSession, lottery_id: int,
place: int, telegram_id: int) -> bool:
"""Установить ручного победителя для определенного места"""
# Получаем пользователя
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
return False
# Получаем розыгрыш
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery:
return False
# Обновляем ручных победителей
if not lottery.manual_winners:
lottery.manual_winners = {}
lottery.manual_winners[str(place)] = telegram_id
await session.commit()
return True
@staticmethod
async def conduct_draw(session: AsyncSession, lottery_id: int) -> Dict[int, Dict[str, Any]]:
"""Провести розыгрыш с учетом ручных победителей"""
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery or lottery.is_completed:
return {}
# Получаем всех участников
participants = [p.user for p in lottery.participations]
if not participants:
return {}
# Определяем количество призовых мест
num_prizes = len(lottery.prizes) if lottery.prizes else 1
results = {}
remaining_participants = participants.copy()
manual_winners = lottery.manual_winners or {}
# Сначала обрабатываем ручных победителей
for place in range(1, num_prizes + 1):
place_str = str(place)
if place_str in manual_winners:
# Находим пользователя среди участников
manual_winner = None
for participant in remaining_participants:
if participant.telegram_id == manual_winners[place_str]:
manual_winner = participant
break
if manual_winner:
results[place] = {
'user': manual_winner,
'prize': lottery.prizes[place - 1] if lottery.prizes and place <= len(lottery.prizes) else f"Приз {place} места",
'is_manual': True
}
remaining_participants.remove(manual_winner)
# Заполняем оставшиеся места случайными участниками
for place in range(1, num_prizes + 1):
if place not in results and remaining_participants:
winner = random.choice(remaining_participants)
results[place] = {
'user': winner,
'prize': lottery.prizes[place - 1] if lottery.prizes and place <= len(lottery.prizes) else f"Приз {place} места",
'is_manual': False
}
remaining_participants.remove(winner)
# Сохраняем победителей в базу данных
for place, winner_info in results.items():
winner = Winner(
lottery_id=lottery_id,
user_id=winner_info['user'].id,
place=place,
prize=winner_info['prize'],
is_manual=winner_info['is_manual']
)
session.add(winner)
# Обновляем статус розыгрыша
lottery.is_completed = True
lottery.draw_results = {
str(place): {
'user_id': info['user'].id,
'telegram_id': info['user'].telegram_id,
'username': info['user'].username,
'prize': info['prize'],
'is_manual': info['is_manual']
} for place, info in results.items()
}
await session.commit()
return results
@staticmethod
async def get_winners(session: AsyncSession, lottery_id: int) -> List[Winner]:
"""Получить победителей розыгрыша"""
result = await session.execute(
select(Winner)
.options(selectinload(Winner.user))
.where(Winner.lottery_id == lottery_id)
.order_by(Winner.place)
)
return result.scalars().all()
@staticmethod
async def set_winner_display_type(session: AsyncSession, lottery_id: int, display_type: str) -> bool:
"""Установить тип отображения победителей для розыгрыша"""
from ..display.winner_display import validate_display_type
if not validate_display_type(display_type):
return False
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(winner_display_type=display_type)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def set_lottery_active(session: AsyncSession, lottery_id: int, is_active: bool) -> bool:
"""Установить статус активности розыгрыша"""
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(is_active=is_active)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def complete_lottery(session: AsyncSession, lottery_id: int) -> bool:
"""Завершить розыгрыш (сделать неактивным и завершенным)"""
from datetime import datetime
result = await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(is_active=False, is_completed=True, end_date=datetime.now())
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def delete_lottery(session: AsyncSession, lottery_id: int) -> bool:
"""Удалить розыгрыш и все связанные данные"""
# Сначала удаляем все связанные данные
# Удаляем победителей
await session.execute(
delete(Winner).where(Winner.lottery_id == lottery_id)
)
# Удаляем участников
await session.execute(
delete(Participation).where(Participation.lottery_id == lottery_id)
)
# Удаляем сам розыгрыш
result = await session.execute(
delete(Lottery).where(Lottery.id == lottery_id)
)
await session.commit()
return result.rowcount > 0
class ParticipationService:
"""Сервис для работы с участием в розыгрышах"""
@staticmethod
async def add_participant(session: AsyncSession, lottery_id: int, user_id: int) -> bool:
"""Добавить участника в розыгрыш"""
# Проверяем, не участвует ли уже пользователь
existing = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id, Participation.user_id == user_id)
)
if existing.scalar_one_or_none():
return False
participation = Participation(lottery_id=lottery_id, user_id=user_id)
session.add(participation)
await session.commit()
return True
@staticmethod
async def remove_participant(session: AsyncSession, lottery_id: int, user_id: int) -> bool:
"""Удалить участника из розыгрыша"""
participation = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id, Participation.user_id == user_id)
)
participation = participation.scalar_one_or_none()
if not participation:
return False
await session.delete(participation)
await session.commit()
return True
@staticmethod
async def get_participants(session: AsyncSession, lottery_id: int, limit: Optional[int] = None, offset: int = 0) -> List[User]:
"""Получить участников розыгрыша"""
query = select(User).join(Participation).where(Participation.lottery_id == lottery_id)
if limit:
query = query.offset(offset).limit(limit)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_user_participations(session: AsyncSession, user_id: int) -> List[Participation]:
"""Получить участие пользователя в розыгрышах"""
result = await session.execute(
select(Participation)
.options(selectinload(Participation.lottery))
.where(Participation.user_id == user_id)
.order_by(Participation.created_at.desc())
)
return list(result.scalars().all())
@staticmethod
async def get_participants_count(session: AsyncSession, lottery_id: int) -> int:
"""Получить количество участников в розыгрыше"""
result = await session.execute(
select(Participation)
.where(Participation.lottery_id == lottery_id)
)
return len(result.scalars().all())
@staticmethod
async def add_participants_bulk(session: AsyncSession, lottery_id: int, telegram_ids: List[int]) -> Dict[str, Any]:
"""Массовое добавление участников"""
results = {
"added": 0,
"skipped": 0,
"errors": [],
"details": []
}
for telegram_id in telegram_ids:
try:
# Проверяем, существует ли пользователь
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
results["errors"].append(f"Пользователь {telegram_id} не найден")
continue
# Пробуем добавить
if await ParticipationService.add_participant(session, lottery_id, user.id):
results["added"] += 1
results["details"].append(f"Добавлен: {user.first_name} (@{user.username or 'no_username'})")
else:
results["skipped"] += 1
results["details"].append(f"Уже участвует: {user.first_name}")
except Exception as e:
results["errors"].append(f"Ошибка с {telegram_id}: {str(e)}")
return results
@staticmethod
async def remove_participants_bulk(session: AsyncSession, lottery_id: int, telegram_ids: List[int]) -> Dict[str, Any]:
"""Массовое удаление участников"""
results = {
"removed": 0,
"not_found": 0,
"errors": [],
"details": []
}
for telegram_id in telegram_ids:
try:
user = await UserService.get_user_by_telegram_id(session, telegram_id)
if not user:
results["not_found"] += 1
results["details"].append(f"Не найден: {telegram_id}")
continue
if await ParticipationService.remove_participant(session, lottery_id, user.id):
results["removed"] += 1
results["details"].append(f"Удален: {user.first_name}")
else:
results["not_found"] += 1
results["details"].append(f"Не участвовал: {user.first_name}")
except Exception as e:
results["errors"].append(f"Ошибка с {telegram_id}: {str(e)}")
return results
@staticmethod
async def add_participants_by_accounts_bulk(session: AsyncSession, lottery_id: int, account_numbers: List[str]) -> Dict[str, Any]:
"""Массовое добавление участников по номерам счетов"""
results = {
"added": 0,
"skipped": 0,
"errors": [],
"details": [],
"invalid_accounts": []
}
for account_number in account_numbers:
account_number = account_number.strip()
if not account_number:
continue
try:
# Валидируем и форматируем номер
formatted_account = format_account_number(account_number)
if not formatted_account:
results["invalid_accounts"].append(account_number)
results["errors"].append(f"Неверный формат: {account_number}")
continue
# Ищем пользователя по номеру счёта
user = await UserService.get_user_by_account(session, formatted_account)
if not user:
results["errors"].append(f"Пользователь с счётом {formatted_account} не найден")
continue
# Пробуем добавить в розыгрыш
if await ParticipationService.add_participant(session, lottery_id, user.id):
results["added"] += 1
results["details"].append(f"Добавлен: {user.first_name} ({formatted_account})")
else:
results["skipped"] += 1
results["details"].append(f"Уже участвует: {user.first_name} ({formatted_account})")
except Exception as e:
results["errors"].append(f"Ошибка с {account_number}: {str(e)}")
return results
@staticmethod
async def remove_participants_by_accounts_bulk(session: AsyncSession, lottery_id: int, account_numbers: List[str]) -> Dict[str, Any]:
"""Массовое удаление участников по номерам счетов"""
results = {
"removed": 0,
"not_found": 0,
"errors": [],
"details": [],
"invalid_accounts": []
}
for account_number in account_numbers:
account_number = account_number.strip()
if not account_number:
continue
try:
# Валидируем и форматируем номер
formatted_account = format_account_number(account_number)
if not formatted_account:
results["invalid_accounts"].append(account_number)
results["errors"].append(f"Неверный формат: {account_number}")
continue
# Ищем пользователя по номеру счёта
user = await UserService.get_user_by_account(session, formatted_account)
if not user:
results["not_found"] += 1
results["details"].append(f"Не найден: {formatted_account}")
continue
# Пробуем удалить из розыгрыша
if await ParticipationService.remove_participant(session, lottery_id, user.id):
results["removed"] += 1
results["details"].append(f"Удалён: {user.first_name} ({formatted_account})")
else:
results["not_found"] += 1
results["details"].append(f"Не участвовал: {user.first_name} ({formatted_account})")
except Exception as e:
results["errors"].append(f"Ошибка с {account_number}: {str(e)}")
return results
@staticmethod
async def get_participant_stats(session: AsyncSession, user_id: int) -> Dict[str, Any]:
"""Статистика участника"""
from sqlalchemy import func
# Количество участий
participations_count = await session.scalar(
select(func.count(Participation.id)).where(Participation.user_id == user_id)
)
# Количество побед
wins_count = await session.scalar(
select(func.count(Winner.id)).where(Winner.user_id == user_id)
)
# Последнее участие
last_participation = await session.execute(
select(Participation).where(Participation.user_id == user_id)
.order_by(Participation.created_at.desc()).limit(1)
)
last_participation = last_participation.scalar_one_or_none()
return {
"participations_count": participations_count,
"wins_count": wins_count,
"last_participation": last_participation.created_at if last_participation else None
}

3
src/display/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Компоненты отображения и вывода результатов.
"""

102
src/display/conduct_draw.py Normal file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Скрипт для проведения розыгрыша
"""
import asyncio
import json
from ..core.database import async_session_maker
from ..core.services import LotteryService, ParticipationService
from ..core.models import Lottery, User
async def conduct_lottery_draw(lottery_id: int):
"""Проводим розыгрыш для указанного ID"""
print(f"🎲 Проведение розыгрыша #{lottery_id}")
async with async_session_maker() as session:
# Получаем розыгрыш
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery:
print(f"❌ Розыгрыш {lottery_id} не найден")
return
print(f"📋 Розыгрыш: {lottery.title}")
# Призы уже загружены как список
prizes = lottery.prizes if isinstance(lottery.prizes, list) else json.loads(lottery.prizes)
print(f"🏆 Призы: {len(prizes)} шт.")
# Получаем участников
participants = await ParticipationService.get_participants(session, lottery_id)
print(f"👥 Участников: {len(participants)}")
if len(participants) == 0:
print("⚠️ Нет участников для розыгрыша")
return
# Выводим список участников
print("\n👥 Список участников:")
for i, user in enumerate(participants, 1):
accounts = user.account_number.split(',') if user.account_number else ['Нет счетов']
print(f" {i}. {user.first_name} (@{user.username}) - {len(accounts)} счет(ов)")
# Проводим розыгрыш
print(f"\n🎲 Проводим розыгрыш...")
# Используем метод из LotteryService
try:
winners = await LotteryService.conduct_draw(session, lottery_id)
if winners:
print(f"\n🎉 Победители определены:")
for i, winner_data in enumerate(winners, 1):
user = winner_data['user']
prize = winner_data['prize']
print(f" 🏆 {i} место: {user.first_name} (@{user.username})")
print(f" 💎 Приз: {prize}")
# Обновляем статус розыгрыша
await LotteryService.set_lottery_completed(session, lottery_id, True)
print(f"\n✅ Розыгрыш завершен и помечен как завершенный")
else:
print("❌ Ошибка при проведении розыгрыша")
except Exception as e:
print(f"💥 Ошибка: {e}")
async def main():
"""Главная функция"""
print("🎯 Скрипт проведения розыгрышей")
print("=" * 50)
# Показываем доступные розыгрыши
async with async_session_maker() as session:
lotteries = await LotteryService.get_active_lotteries(session)
if not lotteries:
print("❌ Нет активных розыгрышей")
return
print("🎲 Активные розыгрыши:")
for lottery in lotteries:
participants = await ParticipationService.get_participants(session, lottery.id)
print(f" {lottery.id}. {lottery.title} ({len(participants)} участников)")
# Просим выбрать розыгрыш
print("\nВведите ID розыгрыша для проведения (или 'all' для всех):")
choice = input("> ").strip().lower()
if choice == 'all':
# Проводим все розыгрыши
for lottery in lotteries:
await conduct_lottery_draw(lottery.id)
print("-" * 30)
else:
try:
lottery_id = int(choice)
await conduct_lottery_draw(lottery_id)
except ValueError:
print("❌ Неверный ID розыгрыша")
if __name__ == "__main__":
asyncio.run(main())

191
src/display/demo_admin.py Normal file
View File

@@ -0,0 +1,191 @@
"""
Демонстрация возможностей админ-панели
"""
import asyncio
from ..core.database import async_session_maker, init_db
from ..core.services import UserService, LotteryService
from ..utils.admin_utils import AdminUtils, ReportGenerator
async def demo_admin_features():
"""Демонстрация функций админ-панели"""
print("🚀 Демонстрация возможностей админ-панели")
print("=" * 50)
await init_db()
async with async_session_maker() as session:
# Создаем тестового администратора
admin = await UserService.get_or_create_user(
session,
telegram_id=123456789,
username="admin",
first_name="Администратор",
last_name="Системы"
)
await UserService.set_admin(session, 123456789, True)
print(f"✅ Создан администратор: {admin.first_name}")
# Создаем тестовых пользователей
users = []
for i in range(1, 11):
user = await UserService.get_or_create_user(
session,
telegram_id=200000000 + i,
username=f"user{i}",
first_name=f"Пользователь{i}",
last_name="Тестовый"
)
users.append(user)
print(f"✅ Создано {len(users)} тестовых пользователей")
# Создаем несколько розыгрышей
lottery1 = await LotteryService.create_lottery(
session,
title="🎉 Новогодний мега-розыгрыш",
description="Грандиозный розыгрыш к Новому году с невероятными призами!",
prizes=[
"🥇 iPhone 15 Pro Max 1TB",
"🥈 MacBook Air M2 13\"",
"🥉 iPad Pro 12.9\"",
"🏆 AirPods Pro 2",
"🎁 Подарочная карта Apple 50,000₽"
],
creator_id=admin.id
)
lottery2 = await LotteryService.create_lottery(
session,
title="🚗 Автомобильный розыгрыш",
description="Выиграй автомобиль своей мечты!",
prizes=[
"🥇 Tesla Model 3",
"🥈 BMW X3",
"🥉 Mercedes-Benz C-Class"
],
creator_id=admin.id
)
lottery3 = await LotteryService.create_lottery(
session,
title="🏖️ Отпуск мечты",
description="Путешествие в райские места",
prizes=[
"🥇 Тур на Мальдивы на двоих",
"🥈 Неделя в Дубае",
"🥉 Тур в Турцию"
],
creator_id=admin.id
)
print(f"✅ Создано 3 розыгрыша:")
print(f" - {lottery1.title}")
print(f" - {lottery2.title}")
print(f" - {lottery3.title}")
# Добавляем участников в розыгрыши
participants_added = 0
for user in users:
# В первый розыгрыш добавляем всех
if await LotteryService.add_participant(session, lottery1.id, user.id):
participants_added += 1
# Во второй - половину
if user.id % 2 == 0:
if await LotteryService.add_participant(session, lottery2.id, user.id):
participants_added += 1
# В третий - треть
if user.id % 3 == 0:
if await LotteryService.add_participant(session, lottery3.id, user.id):
participants_added += 1
print(f"✅ Добавлено {participants_added} участий")
# Устанавливаем предопределенных победителей
print("\n👑 Установка предопределенных победителей:")
# В первом розыгрыше: 1 и 3 места
await LotteryService.set_manual_winner(session, lottery1.id, 1, users[0].telegram_id)
await LotteryService.set_manual_winner(session, lottery1.id, 3, users[2].telegram_id)
print(f" 🎯 {lottery1.title}: 1 место - {users[0].first_name}, 3 место - {users[2].first_name}")
# Во втором розыгрыше: только 1 место
await LotteryService.set_manual_winner(session, lottery2.id, 1, users[1].telegram_id)
print(f" 🚗 {lottery2.title}: 1 место - {users[1].first_name}")
# Получаем статистику по розыгрышам
print(f"\n📊 Статистика по розыгрышам:")
for lottery in [lottery1, lottery2, lottery3]:
stats = await AdminUtils.get_lottery_statistics(session, lottery.id)
print(f" 🎲 {lottery.title}:")
print(f" Участников: {stats['participants_count']}")
print(f" Ручных победителей: {len(lottery.manual_winners or {})}")
# Проводим розыгрыши
print(f"\n🎲 Проведение розыгрышей:")
# Первый розыгрыш
results1 = await LotteryService.conduct_draw(session, lottery1.id)
print(f" 🎉 {lottery1.title} - результаты:")
for place, winner_info in results1.items():
user = winner_info['user']
manual = " 👑" if winner_info['is_manual'] else " 🎲"
print(f" {place}. {user.first_name}{manual}")
# Второй розыгрыш
results2 = await LotteryService.conduct_draw(session, lottery2.id)
print(f" 🚗 {lottery2.title} - результаты:")
for place, winner_info in results2.items():
user = winner_info['user']
manual = " 👑" if winner_info['is_manual'] else " 🎲"
print(f" {place}. {user.first_name}{manual}")
# Генерируем отчет
print(f"\n📋 Генерация отчета:")
report = await ReportGenerator.generate_summary_report(session)
print(report)
# Экспорт данных
print(f"💾 Экспорт данных первого розыгрыша:")
export_data = await AdminUtils.export_lottery_data(session, lottery1.id)
print(f" ✅ Экспортировано:")
print(f" - Розыгрыш: {export_data['lottery']['title']}")
print(f" - Участников: {len(export_data['participants'])}")
print(f" - Победителей: {len(export_data['winners'])}")
# Активность пользователя
print(f"\n👤 Активность пользователя {users[0].first_name}:")
activity = await AdminUtils.get_user_activity(session, users[0].telegram_id)
print(f" 📊 Статистика:")
print(f" Участий: {activity['total_participations']}")
print(f" Побед: {activity['total_wins']}")
print(f"\n" + "=" * 50)
print(f"✅ Демонстрация завершена!")
print(f"")
print(f"🎯 Что показано:")
print(f" ✅ Создание пользователей и розыгрышей")
print(f" ✅ Добавление участников")
print(f" ✅ Установка предопределенных победителей")
print(f" ✅ Проведение розыгрышей с ручными победителями")
print(f" ✅ Генерация статистики и отчетов")
print(f" ✅ Экспорт данных")
print(f" ✅ Анализ активности пользователей")
print(f"")
print(f"🚀 Админ-панель готова к использованию!")
# Показываем ключевую особенность
print(f"\n🎯 КЛЮЧЕВАЯ ОСОБЕННОСТЬ:")
print(f"В первом розыгрыше мы заранее установили:")
print(f" 👑 1 место: {users[0].first_name}")
print(f" 👑 3 место: {users[2].first_name}")
print(f"")
print(f"При розыгрыше эти пользователи автоматически заняли свои места,")
print(f"а остальные места (2, 4, 5) были разыграны случайно!")
print(f"")
print(f"🎭 Никто из участников не знает о подстройке!")
if __name__ == "__main__":
asyncio.run(demo_admin_features())

View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Простой скрипт для проведения розыгрыша
"""
import asyncio
from ..core.database import async_session_maker
from ..core.services import LotteryService
async def conduct_simple_draw():
"""Проводим розыгрыш"""
print("🎲 Проведение розыгрыша")
print("=" * 30)
lottery_id = 1 # Первый розыгрыш
async with async_session_maker() as session:
print(f"🎯 Проводим розыгрыш #{lottery_id}")
try:
# Проводим розыгрыш
winners = await LotteryService.conduct_draw(session, lottery_id)
print(f"🎉 Розыгрыш проведен!")
print(f"📊 Результат: {winners}")
except Exception as e:
print(f"❌ Ошибка: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(conduct_simple_draw())

View File

@@ -0,0 +1,116 @@
"""
Утилиты для отображения информации о победителях в зависимости от настроек розыгрыша
"""
from typing import Dict, Any, Optional
from ..core.models import User, Lottery
from ..utils.account_utils import mask_account_number
def format_winner_display(user: User, lottery: Lottery, show_sensitive_data: bool = False) -> str:
"""
Форматирует отображение победителя в зависимости от настроек розыгрыша
Args:
user: Пользователь-победитель
lottery: Розыгрыш
show_sensitive_data: Показывать ли чувствительные данные (для админов)
Returns:
str: Отформатированная строка для отображения победителя
"""
display_type = getattr(lottery, 'winner_display_type', 'username')
if display_type == 'username':
# Отображаем username или имя
if user.username:
return f"@{user.username}"
else:
return user.first_name or f"Пользователь {user.id}"
elif display_type == 'chat_id':
# Отображаем Telegram ID
return f"ID: {user.telegram_id}"
elif display_type == 'account_number':
# Отображаем номер клиентского счета
if not user.account_number:
return "Счёт не указан"
if show_sensitive_data:
# Для админов показываем полный номер
return f"Счёт: {user.account_number}"
else:
# Для публичного показа маскируем номер
masked = mask_account_number(user.account_number, show_last_digits=4)
return f"Счёт: {masked}"
else:
# Fallback к username/имени
if user.username:
return f"@{user.username}"
else:
return user.first_name or f"Пользователь {user.id}"
def format_winner_info(winner_data: Dict[str, Any], show_sensitive_data: bool = False) -> str:
"""
Форматирует информацию о победителе из данных розыгрыша
Args:
winner_data: Словарь с данными о победителе
show_sensitive_data: Показывать ли чувствительные данные
Returns:
str: Отформатированная строка для отображения
"""
user = winner_data.get('user')
place = winner_data.get('place', 1)
prize = winner_data.get('prize', f'Приз {place} места')
if not user:
return f"{place}. Победитель не определен"
# Пробуем получить lottery из winner_data, если есть
lottery = winner_data.get('lottery')
if lottery:
winner_display = format_winner_display(user, lottery, show_sensitive_data)
else:
# Fallback если нет данных о розыгрыше
if user.username:
winner_display = f"@{user.username}"
else:
winner_display = user.first_name or f"Пользователь {user.id}"
return f"{place}. {winner_display}"
def get_display_type_name(display_type: str) -> str:
"""
Получить человекочитаемое название типа отображения
Args:
display_type: Тип отображения
Returns:
str: Название типа
"""
types = {
'username': 'Username/Имя',
'chat_id': 'Telegram ID',
'account_number': 'Номер счёта'
}
return types.get(display_type, 'Неизвестно')
def validate_display_type(display_type: str) -> bool:
"""
Проверяет корректность типа отображения
Args:
display_type: Тип отображения для проверки
Returns:
bool: True если тип корректен
"""
return display_type in ['username', 'chat_id', 'account_number']

3
src/handlers/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Обработчики событий и команд: аккаунты, админ-панель.
"""

View File

@@ -0,0 +1,375 @@
"""
Обработчики для работы со счетами в розыгрышах
"""
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from aiogram.filters import StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from sqlalchemy.ext.asyncio import AsyncSession
from ..core.config import ADMIN_IDS
from ..core.database import async_session_maker
from ..core.services import LotteryService
from .account_services import AccountParticipationService
from ..utils.account_utils import parse_accounts_from_message, validate_account_number
from typing import List
# Состояния FSM для работы со счетами
class AccountStates(StatesGroup):
waiting_for_lottery_choice = State() # Выбор розыгрыша для добавления счетов
waiting_for_winner_lottery = State() # Выбор розыгрыша для установки победителя
waiting_for_winner_place = State() # Выбор места победителя
searching_accounts = State() # Поиск счетов
# Создаем роутер
account_router = Router()
def is_admin(user_id: int) -> bool:
"""Проверка прав администратора"""
return user_id in ADMIN_IDS
@account_router.message(
F.text,
StateFilter(None),
~F.text.startswith('/') # Исключаем команды
)
async def detect_account_input(message: Message, state: FSMContext):
"""
Обнаружение ввода счетов в сообщении
Активируется только для администраторов
"""
if not is_admin(message.from_user.id):
return
# Парсим счета из сообщения
accounts = parse_accounts_from_message(message.text)
if not accounts:
return # Счета не обнаружены, пропускаем
# Сохраняем счета в состоянии
await state.update_data(detected_accounts=accounts)
# Формируем сообщение
accounts_text = "\n".join([f"{acc}" for acc in accounts])
count = len(accounts)
text = (
f"🔍 <b>Обнаружен ввод счет{'а' if count == 1 else 'ов'}</b>\n\n"
f"Найдено: <b>{count}</b>\n\n"
f"{accounts_text}\n\n"
f"Выберите действие:"
)
# Кнопки выбора действия
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(
text=" Добавить в розыгрыш",
callback_data="account_action:add_to_lottery"
)],
[InlineKeyboardButton(
text="👑 Сделать победителем",
callback_data="account_action:set_as_winner"
)],
[InlineKeyboardButton(
text="❌ Отмена",
callback_data="account_action:cancel"
)]
])
await message.answer(text, reply_markup=keyboard, parse_mode="HTML")
@account_router.callback_query(F.data == "account_action:cancel")
async def cancel_account_action(callback: CallbackQuery, state: FSMContext):
"""Отмена действия со счетами"""
await state.clear()
await callback.message.edit_text("❌ Действие отменено")
await callback.answer()
@account_router.callback_query(F.data == "account_action:add_to_lottery")
async def choose_lottery_for_accounts(callback: CallbackQuery, state: FSMContext):
"""Выбор розыгрыша для добавления счетов"""
if not is_admin(callback.from_user.id):
await callback.answer("⛔ Доступно только администраторам", show_alert=True)
return
async with async_session_maker() as session:
# Получаем активные розыгрыши
lotteries = await LotteryService.get_active_lotteries(session, limit=20)
if not lotteries:
await callback.message.edit_text(
"❌ Нет активных розыгрышей.\n\n"
"Сначала создайте розыгрыш через /admin"
)
await state.clear()
await callback.answer()
return
# Формируем кнопки с розыгрышами
buttons = []
for lottery in lotteries:
buttons.append([InlineKeyboardButton(
text=f"🎲 {lottery.title[:40]}",
callback_data=f"add_accounts_to:{lottery.id}"
)])
buttons.append([InlineKeyboardButton(
text="❌ Отмена",
callback_data="account_action:cancel"
)])
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
await callback.message.edit_text(
"📋 <b>Выберите розыгрыш:</b>",
reply_markup=keyboard,
parse_mode="HTML"
)
await state.set_state(AccountStates.waiting_for_lottery_choice)
await callback.answer()
@account_router.callback_query(F.data.startswith("add_accounts_to:"))
async def add_accounts_to_lottery(callback: CallbackQuery, state: FSMContext):
"""Добавление счетов в выбранный розыгрыш"""
if not is_admin(callback.from_user.id):
await callback.answer("⛔ Доступно только администраторам", show_alert=True)
return
lottery_id = int(callback.data.split(":")[1])
# Получаем сохраненные счета
data = await state.get_data()
accounts = data.get("detected_accounts", [])
if not accounts:
await callback.message.edit_text("❌ Счета не найдены")
await state.clear()
await callback.answer()
return
# Показываем процесс
await callback.message.edit_text("⏳ Добавляем счета...")
async with async_session_maker() as session:
# Получаем информацию о розыгрыше
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery:
await callback.message.edit_text("❌ Розыгрыш не найден")
await state.clear()
await callback.answer()
return
# Добавляем счета
results = await AccountParticipationService.add_accounts_bulk(
session, lottery_id, accounts
)
# Формируем результат
text = f"<b>Результаты добавления в розыгрыш:</b>\n<i>{lottery.title}</i>\n\n"
text += f"✅ Добавлено: <b>{results['added']}</b>\n"
text += f"⚠️ Пропущено: <b>{results['skipped']}</b>\n\n"
if results['details']:
text += "<b>Детали:</b>\n"
text += "\n".join(results['details'][:20]) # Показываем первые 20
if len(results['details']) > 20:
text += f"\n... и ещё {len(results['details']) - 20}"
if results['errors']:
text += f"\n\n<b>Ошибки:</b>\n"
text += "\n".join(results['errors'][:10])
await callback.message.edit_text(text, parse_mode="HTML")
await state.clear()
await callback.answer("✅ Готово!")
@account_router.callback_query(F.data == "account_action:set_as_winner")
async def choose_lottery_for_winner(callback: CallbackQuery, state: FSMContext):
"""Выбор розыгрыша для установки победителя"""
if not is_admin(callback.from_user.id):
await callback.answer("⛔ Доступно только администраторам", show_alert=True)
return
# Проверяем, что у нас только один счет
data = await state.get_data()
accounts = data.get("detected_accounts", [])
if len(accounts) != 1:
await callback.message.edit_text(
"❌ Для установки победителя введите <b>один</b> счет",
parse_mode="HTML"
)
await state.clear()
await callback.answer()
return
async with async_session_maker() as session:
# Получаем все розыгрыши (активные и завершенные)
lotteries = await LotteryService.get_all_lotteries(session, limit=30)
if not lotteries:
await callback.message.edit_text(
"❌ Нет розыгрышей.\n\n"
"Сначала создайте розыгрыш через /admin"
)
await state.clear()
await callback.answer()
return
# Формируем кнопки
buttons = []
for lottery in lotteries:
status = "" if lottery.is_completed else "🎲"
buttons.append([InlineKeyboardButton(
text=f"{status} {lottery.title[:35]}",
callback_data=f"winner_lottery:{lottery.id}"
)])
buttons.append([InlineKeyboardButton(
text="❌ Отмена",
callback_data="account_action:cancel"
)])
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
account = accounts[0]
await callback.message.edit_text(
f"👑 <b>Установка победителя</b>\n\n"
f"Счет: <code>{account}</code>\n\n"
f"Выберите розыгрыш:",
reply_markup=keyboard,
parse_mode="HTML"
)
await state.set_state(AccountStates.waiting_for_winner_lottery)
await callback.answer()
@account_router.callback_query(F.data.startswith("winner_lottery:"))
async def choose_winner_place(callback: CallbackQuery, state: FSMContext):
"""Выбор места для победителя"""
if not is_admin(callback.from_user.id):
await callback.answer("⛔ Доступно только администраторам", show_alert=True)
return
lottery_id = int(callback.data.split(":")[1])
# Сохраняем ID розыгрыша
await state.update_data(winner_lottery_id=lottery_id)
async with async_session_maker() as session:
lottery = await LotteryService.get_lottery(session, lottery_id)
if not lottery:
await callback.message.edit_text("❌ Розыгрыш не найден")
await state.clear()
await callback.answer()
return
# Получаем призы
prizes = lottery.prizes or []
# Формируем кнопки с местами
buttons = []
for i, prize in enumerate(prizes[:10], 1): # Максимум 10 мест
prize_text = prize if isinstance(prize, str) else prize.get('description', f'Приз {i}')
buttons.append([InlineKeyboardButton(
text=f"🏆 Место {i}: {prize_text[:30]}",
callback_data=f"winner_place:{i}"
)])
# Если призов нет, предлагаем места 1-5
if not buttons:
for i in range(1, 6):
buttons.append([InlineKeyboardButton(
text=f"🏆 Место {i}",
callback_data=f"winner_place:{i}"
)])
buttons.append([InlineKeyboardButton(
text="❌ Отмена",
callback_data="account_action:cancel"
)])
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
data = await state.get_data()
account = data.get("detected_accounts", [])[0]
await callback.message.edit_text(
f"👑 <b>Установка победителя</b>\n\n"
f"Розыгрыш: <i>{lottery.title}</i>\n"
f"Счет: <code>{account}</code>\n\n"
f"Выберите место:",
reply_markup=keyboard,
parse_mode="HTML"
)
await state.set_state(AccountStates.waiting_for_winner_place)
await callback.answer()
@account_router.callback_query(F.data.startswith("winner_place:"))
async def set_account_winner(callback: CallbackQuery, state: FSMContext):
"""Установка счета как победителя"""
if not is_admin(callback.from_user.id):
await callback.answer("⛔ Доступно только администраторам", show_alert=True)
return
place = int(callback.data.split(":")[1])
# Получаем данные
data = await state.get_data()
account = data.get("detected_accounts", [])[0]
lottery_id = data.get("winner_lottery_id")
if not account or not lottery_id:
await callback.message.edit_text("❌ Ошибка: данные не найдены")
await state.clear()
await callback.answer()
return
# Показываем процесс
await callback.message.edit_text("⏳ Устанавливаем победителя...")
async with async_session_maker() as session:
lottery = await LotteryService.get_lottery(session, lottery_id)
# Получаем приз для этого места
prize = None
if lottery.prizes and len(lottery.prizes) >= place:
prize_info = lottery.prizes[place - 1]
prize = prize_info if isinstance(prize_info, str) else prize_info.get('description')
# Устанавливаем победителя
result = await AccountParticipationService.set_account_as_winner(
session, lottery_id, account, place, prize
)
if result["success"]:
text = (
f"✅ <b>Победитель установлен!</b>\n\n"
f"Розыгрыш: <i>{lottery.title}</i>\n"
f"Счет: <code>{account}</code>\n"
f"Место: <b>{place}</b>\n"
)
if prize:
text += f"Приз: <i>{prize}</i>"
await callback.answer("✅ Победитель установлен!", show_alert=True)
else:
text = f"{result['message']}"
await callback.answer("❌ Ошибка", show_alert=True)
await callback.message.edit_text(text, parse_mode="HTML")
await state.clear()

View File

@@ -0,0 +1,287 @@
"""
Сервис для работы с участием счетов в розыгрышах (без привязки к пользователям)
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func
from ..core.models import Lottery, Participation, Winner
from ..utils.account_utils import validate_account_number, format_account_number, parse_accounts_from_message, search_accounts_by_pattern
from typing import List, Optional, Dict, Any
class AccountParticipationService:
"""Сервис для работы с участием счетов в розыгрышах"""
@staticmethod
async def add_account_to_lottery(
session: AsyncSession,
lottery_id: int,
account_number: str
) -> Dict[str, Any]:
"""
Добавить счет в розыгрыш
Returns:
Dict с ключами: success, message, account_number
"""
# Валидируем и форматируем
formatted_account = format_account_number(account_number)
if not formatted_account:
return {
"success": False,
"message": f"Неверный формат счета: {account_number}",
"account_number": account_number
}
# Проверяем существование розыгрыша
lottery = await session.get(Lottery, lottery_id)
if not lottery:
return {
"success": False,
"message": f"Розыгрыш #{lottery_id} не найден",
"account_number": formatted_account
}
# Проверяем, не участвует ли уже этот счет
existing = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.account_number == formatted_account
)
)
if existing.scalar_one_or_none():
return {
"success": False,
"message": f"Счет {formatted_account} уже участвует в розыгрыше",
"account_number": formatted_account
}
# Добавляем участие
participation = Participation(
lottery_id=lottery_id,
account_number=formatted_account,
user_id=None # Без привязки к пользователю
)
session.add(participation)
await session.commit()
return {
"success": True,
"message": f"Счет {formatted_account} добавлен в розыгрыш",
"account_number": formatted_account
}
@staticmethod
async def add_accounts_bulk(
session: AsyncSession,
lottery_id: int,
account_numbers: List[str]
) -> Dict[str, Any]:
"""
Массовое добавление счетов в розыгрыш
"""
results = {
"added": 0,
"skipped": 0,
"errors": [],
"details": [],
"added_accounts": [],
"skipped_accounts": []
}
for account in account_numbers:
result = await AccountParticipationService.add_account_to_lottery(
session, lottery_id, account
)
if result["success"]:
results["added"] += 1
results["added_accounts"].append(result["account_number"])
results["details"].append(f"{result['account_number']}")
else:
results["skipped"] += 1
results["skipped_accounts"].append(account)
results["errors"].append(result["message"])
results["details"].append(f"{result['message']}")
return results
@staticmethod
async def remove_account_from_lottery(
session: AsyncSession,
lottery_id: int,
account_number: str
) -> Dict[str, Any]:
"""Удалить счет из розыгрыша"""
formatted_account = format_account_number(account_number)
if not formatted_account:
return {
"success": False,
"message": f"Неверный формат счета: {account_number}"
}
participation = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.account_number == formatted_account
)
)
participation = participation.scalar_one_or_none()
if not participation:
return {
"success": False,
"message": f"Счет {formatted_account} не участвует в розыгрыше"
}
await session.delete(participation)
await session.commit()
return {
"success": True,
"message": f"Счет {formatted_account} удален из розыгрыша"
}
@staticmethod
async def get_lottery_accounts(
session: AsyncSession,
lottery_id: int,
limit: Optional[int] = None,
offset: int = 0
) -> List[str]:
"""Получить все счета, участвующие в розыгрыше"""
query = select(Participation.account_number).where(
Participation.lottery_id == lottery_id,
Participation.account_number.isnot(None)
).order_by(Participation.created_at.desc())
if limit:
query = query.offset(offset).limit(limit)
result = await session.execute(query)
return [account for account in result.scalars().all() if account]
@staticmethod
async def get_accounts_count(session: AsyncSession, lottery_id: int) -> int:
"""Получить количество счетов в розыгрыше"""
result = await session.scalar(
select(func.count(Participation.id)).where(
Participation.lottery_id == lottery_id,
Participation.account_number.isnot(None)
)
)
return result or 0
@staticmethod
async def search_accounts_in_lottery(
session: AsyncSession,
lottery_id: int,
pattern: str,
limit: int = 20
) -> List[str]:
"""
Поиск счетов в розыгрыше по частичному совпадению
Args:
lottery_id: ID розыгрыша
pattern: Паттерн поиска (например "11-22" или "33")
limit: Максимальное количество результатов
"""
# Получаем все счета розыгрыша
all_accounts = await AccountParticipationService.get_lottery_accounts(
session, lottery_id
)
# Ищем совпадения
return search_accounts_by_pattern(pattern, all_accounts)[:limit]
@staticmethod
async def set_account_as_winner(
session: AsyncSession,
lottery_id: int,
account_number: str,
place: int,
prize: Optional[str] = None
) -> Dict[str, Any]:
"""
Установить счет как победителя на указанное место
"""
formatted_account = format_account_number(account_number)
if not formatted_account:
return {
"success": False,
"message": f"Неверный формат счета: {account_number}"
}
# Проверяем, участвует ли счет в розыгрыше
participation = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.account_number == formatted_account
)
)
if not participation.scalar_one_or_none():
return {
"success": False,
"message": f"Счет {formatted_account} не участвует в розыгрыше"
}
# Проверяем, не занято ли уже это место
existing_winner = await session.execute(
select(Winner).where(
Winner.lottery_id == lottery_id,
Winner.place == place
)
)
existing_winner = existing_winner.scalar_one_or_none()
if existing_winner:
# Обновляем существующего победителя
existing_winner.account_number = formatted_account
existing_winner.user_id = None
existing_winner.is_manual = True
if prize:
existing_winner.prize = prize
else:
# Создаем нового победителя
winner = Winner(
lottery_id=lottery_id,
account_number=formatted_account,
user_id=None,
place=place,
prize=prize,
is_manual=True
)
session.add(winner)
await session.commit()
return {
"success": True,
"message": f"Счет {formatted_account} установлен победителем на место {place}",
"account_number": formatted_account,
"place": place
}
@staticmethod
async def get_lottery_winners_accounts(
session: AsyncSession,
lottery_id: int
) -> List[Dict[str, Any]]:
"""Получить всех победителей розыгрыша (счета)"""
result = await session.execute(
select(Winner).where(
Winner.lottery_id == lottery_id,
Winner.account_number.isnot(None)
).order_by(Winner.place)
)
winners = result.scalars().all()
return [
{
"place": w.place,
"account_number": w.account_number,
"prize": w.prize,
"is_manual": w.is_manual
}
for w in winners
]

2413
src/handlers/admin_panel.py Normal file

File diff suppressed because it is too large Load Diff

3
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Утилиты и вспомогательные функции.
"""

151
src/utils/account_utils.py Normal file
View File

@@ -0,0 +1,151 @@
"""
Утилиты для работы с клиентскими счетами
"""
import re
from typing import Optional, List
def validate_account_number(account_number: str) -> bool:
"""
Проверяет корректность формата номера клиентского счета
Формат: XX-XX-XX-XX-XX-XX-XX (7 пар цифр через дефис)
Args:
account_number: Номер счета для проверки
Returns:
bool: True если формат корректен, False иначе
"""
if not account_number:
return False
# Паттерн для 7 пар цифр через дефис
pattern = r'^\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}$'
return bool(re.match(pattern, account_number))
def format_account_number(account_number: str) -> Optional[str]:
"""
Форматирует номер счета, убирая лишние символы
Args:
account_number: Исходный номер счета
Returns:
str: Отформатированный номер счета или None если некорректный
"""
if not account_number:
return None
# Убираем все символы кроме цифр
digits_only = re.sub(r'\D', '', account_number)
# Проверяем что осталось ровно 14 цифр
if len(digits_only) != 14:
return None
# Форматируем как XX-XX-XX-XX-XX-XX-XX
formatted = '-'.join([digits_only[i:i+2] for i in range(0, 14, 2)])
return formatted
def generate_account_number() -> str:
"""
Генерирует случайный номер клиентского счета для тестирования
Returns:
str: Сгенерированный номер счета
"""
import random
# Генерируем 14 случайных цифр
digits = ''.join([str(random.randint(0, 9)) for _ in range(14)])
# Форматируем
return '-'.join([digits[i:i+2] for i in range(0, 14, 2)])
def mask_account_number(account_number: str, show_last_digits: int = 4) -> str:
"""
Маскирует номер счета для безопасного отображения
Args:
account_number: Полный номер счета
show_last_digits: Количество последних цифр для отображения
Returns:
str: Замаскированный номер счета
"""
if not validate_account_number(account_number):
return "Некорректный номер"
if show_last_digits <= 0:
return "**-**-**-**-**-**-**"
# Убираем дефисы для работы с цифрами
digits = account_number.replace('-', '')
# Определяем сколько цифр показать
show_digits = min(show_last_digits, len(digits))
# Создаем маску
masked_digits = '*' * (len(digits) - show_digits) + digits[-show_digits:]
# Возвращаем отформатированный результат (7 пар)
return '-'.join([masked_digits[i:i+2] for i in range(0, 14, 2)])
def parse_accounts_from_message(text: str) -> List[str]:
"""
Извлекает все валидные номера счетов из текста сообщения
Args:
text: Текст сообщения
Returns:
List[str]: Список найденных и отформатированных номеров счетов
"""
if not text:
return []
accounts = []
# Ищем паттерны счетов в тексте (7 пар цифр)
pattern = r'\b\d{2}[-\s]?\d{2}[-\s]?\d{2}[-\s]?\d{2}[-\s]?\d{2}[-\s]?\d{2}[-\s]?\d{2}\b'
matches = re.findall(pattern, text)
for match in matches:
formatted = format_account_number(match)
if formatted and formatted not in accounts:
accounts.append(formatted)
return accounts
def search_accounts_by_pattern(pattern: str, account_list: List[str]) -> List[str]:
"""
Ищет счета по частичному совпадению (1-2 пары цифр)
Args:
pattern: Паттерн для поиска (например "11-22" или "11")
account_list: Список счетов для поиска
Returns:
List[str]: Список найденных счетов
"""
if not pattern or not account_list:
return []
# Убираем лишние символы из паттерна
clean_pattern = re.sub(r'[^\d-]', '', pattern).strip('-')
if not clean_pattern:
return []
results = []
for account in account_list:
if clean_pattern in account:
results.append(account)
return results

423
src/utils/admin_utils.py Normal file
View File

@@ -0,0 +1,423 @@
"""
Дополнительные утилиты для админ-панели
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, update, func
from ..core.models import User, Lottery, Participation, Winner
from typing import List, Dict, Optional
import csv
import json
from datetime import datetime
class AdminUtils:
"""Утилиты для админ-панели"""
@staticmethod
async def get_lottery_statistics(session: AsyncSession, lottery_id: int) -> Dict:
"""Получить детальную статистику по розыгрышу"""
lottery = await session.get(Lottery, lottery_id)
if not lottery:
return {}
# Количество участников
participants_count = await session.scalar(
select(func.count(Participation.id))
.where(Participation.lottery_id == lottery_id)
)
# Победители
winners_count = await session.scalar(
select(func.count(Winner.id))
.where(Winner.lottery_id == lottery_id)
)
# Ручные победители
manual_winners_count = await session.scalar(
select(func.count(Winner.id))
.where(Winner.lottery_id == lottery_id, Winner.is_manual == True)
)
# Участники по дням
participants_by_date = await session.execute(
select(
func.date(Participation.created_at).label('date'),
func.count(Participation.id).label('count')
)
.where(Participation.lottery_id == lottery_id)
.group_by(func.date(Participation.created_at))
.order_by(func.date(Participation.created_at))
)
return {
'lottery': lottery,
'participants_count': participants_count,
'winners_count': winners_count,
'manual_winners_count': manual_winners_count,
'random_winners_count': winners_count - manual_winners_count,
'participants_by_date': participants_by_date.fetchall()
}
@staticmethod
async def export_lottery_data(session: AsyncSession, lottery_id: int) -> Dict:
"""Экспорт данных розыгрыша"""
lottery = await session.get(Lottery, lottery_id)
if not lottery:
return {}
# Участники
participants = await session.execute(
select(User, Participation)
.join(Participation)
.where(Participation.lottery_id == lottery_id)
.order_by(Participation.created_at)
)
participants_data = []
for user, participation in participants:
participants_data.append({
'telegram_id': user.telegram_id,
'username': user.username,
'first_name': user.first_name,
'last_name': user.last_name,
'joined_at': participation.created_at.isoformat()
})
# Победители
winners = await session.execute(
select(Winner, User)
.join(User)
.where(Winner.lottery_id == lottery_id)
.order_by(Winner.place)
)
winners_data = []
for winner, user in winners:
winners_data.append({
'place': winner.place,
'telegram_id': user.telegram_id,
'username': user.username,
'first_name': user.first_name,
'prize': winner.prize,
'is_manual': winner.is_manual,
'won_at': winner.created_at.isoformat()
})
return {
'lottery': {
'id': lottery.id,
'title': lottery.title,
'description': lottery.description,
'created_at': lottery.created_at.isoformat(),
'is_completed': lottery.is_completed,
'prizes': lottery.prizes,
'manual_winners': lottery.manual_winners
},
'participants': participants_data,
'winners': winners_data,
'export_date': datetime.now().isoformat()
}
@staticmethod
async def bulk_add_participants(
session: AsyncSession,
lottery_id: int,
telegram_ids: List[int]
) -> Dict[str, int]:
"""Массовое добавление участников"""
added = 0
skipped = 0
errors = []
for telegram_id in telegram_ids:
try:
# Проверяем, есть ли пользователь
user = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
user = user.scalar_one_or_none()
if not user:
errors.append(f"Пользователь {telegram_id} не найден")
continue
# Проверяем, не участвует ли уже
existing = await session.execute(
select(Participation).where(
Participation.lottery_id == lottery_id,
Participation.user_id == user.id
)
)
if existing.scalar_one_or_none():
skipped += 1
continue
# Добавляем участника
participation = Participation(
lottery_id=lottery_id,
user_id=user.id
)
session.add(participation)
added += 1
except Exception as e:
errors.append(f"Ошибка с {telegram_id}: {str(e)}")
if added > 0:
await session.commit()
return {
'added': added,
'skipped': skipped,
'errors': errors
}
@staticmethod
async def remove_participant(
session: AsyncSession,
lottery_id: int,
telegram_id: int
) -> bool:
"""Удалить участника из розыгрыша"""
user = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
user = user.scalar_one_or_none()
if not user:
return False
result = await session.execute(
delete(Participation).where(
Participation.lottery_id == lottery_id,
Participation.user_id == user.id
)
)
await session.commit()
return result.rowcount > 0
@staticmethod
async def update_lottery(
session: AsyncSession,
lottery_id: int,
**updates
) -> bool:
"""Обновить данные розыгрыша"""
try:
await session.execute(
update(Lottery)
.where(Lottery.id == lottery_id)
.values(**updates)
)
await session.commit()
return True
except Exception:
return False
@staticmethod
async def delete_lottery(session: AsyncSession, lottery_id: int) -> bool:
"""Удалить розыгрыш (со всеми связанными данными)"""
try:
# Удаляем победителей
await session.execute(
delete(Winner).where(Winner.lottery_id == lottery_id)
)
# Удаляем участников
await session.execute(
delete(Participation).where(Participation.lottery_id == lottery_id)
)
# Удаляем сам розыгрыш
await session.execute(
delete(Lottery).where(Lottery.id == lottery_id)
)
await session.commit()
return True
except Exception:
await session.rollback()
return False
@staticmethod
async def get_user_activity(
session: AsyncSession,
telegram_id: int
) -> Dict:
"""Получить активность пользователя"""
user = await session.execute(
select(User).where(User.telegram_id == telegram_id)
)
user = user.scalar_one_or_none()
if not user:
return {}
# Участия
participations = await session.execute(
select(Participation, Lottery)
.join(Lottery)
.where(Participation.user_id == user.id)
.order_by(Participation.created_at.desc())
)
# Выигрыши
wins = await session.execute(
select(Winner, Lottery)
.join(Lottery)
.where(Winner.user_id == user.id)
.order_by(Winner.created_at.desc())
)
participations_data = []
for participation, lottery in participations:
participations_data.append({
'lottery_title': lottery.title,
'lottery_id': lottery.id,
'joined_at': participation.created_at,
'lottery_completed': lottery.is_completed
})
wins_data = []
for win, lottery in wins:
wins_data.append({
'lottery_title': lottery.title,
'lottery_id': lottery.id,
'place': win.place,
'prize': win.prize,
'is_manual': win.is_manual,
'won_at': win.created_at
})
return {
'user': user,
'total_participations': len(participations_data),
'total_wins': len(wins_data),
'participations': participations_data,
'wins': wins_data
}
@staticmethod
async def cleanup_old_data(session: AsyncSession, days: int = 30) -> Dict[str, int]:
"""Очистка старых данных"""
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days)
# Удаляем старые завершенные розыгрыши
old_lotteries = await session.execute(
select(Lottery.id)
.where(
Lottery.is_completed == True,
Lottery.created_at < cutoff_date
)
)
lottery_ids = [row[0] for row in old_lotteries.fetchall()]
deleted_winners = 0
deleted_participations = 0
deleted_lotteries = 0
for lottery_id in lottery_ids:
# Удаляем победителей
result = await session.execute(
delete(Winner).where(Winner.lottery_id == lottery_id)
)
deleted_winners += result.rowcount
# Удаляем участников
result = await session.execute(
delete(Participation).where(Participation.lottery_id == lottery_id)
)
deleted_participations += result.rowcount
# Удаляем розыгрыш
result = await session.execute(
delete(Lottery).where(Lottery.id == lottery_id)
)
deleted_lotteries += result.rowcount
await session.commit()
return {
'deleted_lotteries': deleted_lotteries,
'deleted_participations': deleted_participations,
'deleted_winners': deleted_winners,
'cutoff_date': cutoff_date.isoformat()
}
class ReportGenerator:
"""Генератор отчетов"""
@staticmethod
async def generate_summary_report(session: AsyncSession) -> str:
"""Генерация сводного отчета"""
# Общая статистика
total_users = await session.scalar(select(func.count(User.id)))
total_lotteries = await session.scalar(select(func.count(Lottery.id)))
active_lotteries = await session.scalar(
select(func.count(Lottery.id))
.where(Lottery.is_active == True, Lottery.is_completed == False)
)
completed_lotteries = await session.scalar(
select(func.count(Lottery.id)).where(Lottery.is_completed == True)
)
total_participations = await session.scalar(select(func.count(Participation.id)))
total_winners = await session.scalar(select(func.count(Winner.id)))
# Топ розыгрыши по участникам
top_lotteries = await session.execute(
select(
Lottery.title,
Lottery.created_at,
func.count(Participation.id).label('participants')
)
.join(Participation, isouter=True)
.group_by(Lottery.id)
.order_by(func.count(Participation.id).desc())
.limit(5)
)
# Топ активные пользователи
top_users = await session.execute(
select(
User.first_name,
User.username,
func.count(Participation.id).label('participations'),
func.count(Winner.id).label('wins')
)
.join(Participation, isouter=True)
.join(Winner, isouter=True)
.group_by(User.id)
.order_by(func.count(Participation.id).desc())
.limit(5)
)
report = f"📊 СВОДНЫЙ ОТЧЕТ\n"
report += f"Дата: {datetime.now().strftime('%d.%m.%Y %H:%M')}\n\n"
report += f"📈 ОБЩАЯ СТАТИСТИКА\n"
report += f"👥 Пользователей: {total_users}\n"
report += f"🎲 Всего розыгрышей: {total_lotteries}\n"
report += f"🟢 Активных: {active_lotteries}\n"
report += f"✅ Завершенных: {completed_lotteries}\n"
report += f"🎫 Всего участий: {total_participations}\n"
report += f"🏆 Всего победителей: {total_winners}\n\n"
if total_lotteries > 0:
avg_participation = total_participations / total_lotteries
report += f"📊 Среднее участие на розыгрыш: {avg_participation:.1f}\n\n"
report += f"🏆 ТОП РОЗЫГРЫШИ ПО УЧАСТНИКАМ\n"
for i, (title, created_at, participants) in enumerate(top_lotteries.fetchall(), 1):
report += f"{i}. {title}\n"
report += f" Участников: {participants} | {created_at.strftime('%d.%m.%Y')}\n\n"
report += f"🔥 ТОП АКТИВНЫЕ ПОЛЬЗОВАТЕЛИ\n"
for i, (first_name, username, participations, wins) in enumerate(top_users.fetchall(), 1):
name = f"@{username}" if username else first_name
report += f"{i}. {name}\n"
report += f" Участий: {participations} | Побед: {wins}\n\n"
return report

View File

@@ -0,0 +1,161 @@
"""
Декораторы для асинхронной обработки запросов пользователей
"""
import asyncio
import functools
from typing import Callable, Any
from aiogram import types
from .task_manager import task_manager, TaskPriority
import uuid
import logging
logger = logging.getLogger(__name__)
def async_user_action(priority: TaskPriority = TaskPriority.NORMAL, timeout: float = 30.0):
"""
Декоратор для асинхронной обработки действий пользователей
Args:
priority: Приоритет задачи
timeout: Таймаут выполнения в секундах
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Извлекаем информацию о пользователе
user_id = None
action_name = func.__name__
# Ищем пользователя в аргументах
for arg in args:
if isinstance(arg, (types.Message, types.CallbackQuery)):
user_id = arg.from_user.id
break
if user_id is None:
# Если не нашли пользователя, выполняем синхронно
logger.warning(f"Не удалось определить user_id для {action_name}, выполнение синхронно")
return await func(*args, **kwargs)
# Генерируем ID задачи
task_id = f"{action_name}_{user_id}_{uuid.uuid4().hex[:8]}"
try:
# Добавляем задачу в очередь
await task_manager.add_task(
task_id,
user_id,
func,
*args,
priority=priority,
timeout=timeout,
**kwargs
)
logger.debug(f"Задача {task_id} добавлена в очередь для пользователя {user_id}")
except ValueError as e:
# Превышен лимит задач пользователя
logger.warning(f"Лимит задач для пользователя {user_id}: {e}")
# Отправляем сообщение о превышении лимита
if isinstance(args[0], types.Message):
message = args[0]
await message.answer(
"⚠️ Вы превысили лимит одновременных запросов. "
"Пожалуйста, дождитесь завершения предыдущих операций."
)
elif isinstance(args[0], types.CallbackQuery):
callback = args[0]
await callback.answer(
"⚠️ Превышен лимит запросов. Дождитесь завершения предыдущих операций.",
show_alert=True
)
return None
return wrapper
return decorator
def admin_async_action(priority: TaskPriority = TaskPriority.HIGH, timeout: float = 60.0):
"""
Декоратор для асинхронной обработки действий администраторов
(повышенный приоритет и больший таймаут)
"""
return async_user_action(priority=priority, timeout=timeout)
def critical_action(timeout: float = 120.0):
"""
Декоратор для критических действий (розыгрыши, важные операции)
"""
return async_user_action(priority=TaskPriority.CRITICAL, timeout=timeout)
def db_operation(timeout: float = 15.0):
"""
Декоратор для операций с базой данных
"""
return async_user_action(priority=TaskPriority.NORMAL, timeout=timeout)
# Функции для работы со статистикой задач
async def get_task_stats() -> dict:
"""Получить общую статистику задач"""
return task_manager.get_stats()
async def get_user_task_info(user_id: int) -> dict:
"""Получить информацию о задачах пользователя"""
return task_manager.get_user_stats(user_id)
async def format_task_stats() -> str:
"""Форматированная статистика для админов"""
stats = await get_task_stats()
text = "📊 **Статистика обработки задач:**\n\n"
text += f"🟢 Активных воркеров: {stats['workers_count']}\n"
text += f"⚙️ Выполняется задач: {stats['active_tasks']}\n"
text += f"📋 В очереди: {stats['queue_size']}\n"
text += f"✅ Выполнено: {stats['completed_tasks']}\n"
text += f"❌ Ошибок: {stats['failed_tasks']}\n\n"
if stats['user_tasks']:
text += "👥 **Активные пользователи:**\n"
for user_id, task_count in stats['user_tasks'].items():
if task_count > 0:
text += f"• ID {user_id}: {task_count} задач\n"
return text
# Middleware для автоматического управления задачами
class TaskManagerMiddleware:
"""Middleware для управления менеджером задач"""
def __init__(self):
self.started = False
async def __call__(self, handler: Callable, event: types.TelegramObject, data: dict):
# Запускаем менеджер при первом обращении
if not self.started:
await task_manager.start()
self.started = True
logger.info("Менеджер задач запущен через middleware")
# Продолжаем обработку
return await handler(event, data)
# Функция для изящного завершения
async def shutdown_task_manager():
"""Завершение работы менеджера задач"""
logger.info("Завершение работы менеджера задач...")
await task_manager.stop()
logger.info("Менеджер задач остановлен")

268
src/utils/task_manager.py Normal file
View File

@@ -0,0 +1,268 @@
"""
Система управления многопоточностью и очередями для обработки запросов
"""
import asyncio
import time
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class TaskPriority(Enum):
"""Приоритеты задач"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class Task:
"""Задача для выполнения"""
id: str
user_id: int
priority: TaskPriority
func: Callable
args: tuple
kwargs: dict
created_at: float
timeout: float = 30.0
def __lt__(self, other):
"""Сравнение для приоритетной очереди"""
if self.priority.value != other.priority.value:
return self.priority.value > other.priority.value
return self.created_at < other.created_at
class AsyncTaskManager:
"""Менеджер асинхронных задач с поддержкой приоритетов и ограничений"""
def __init__(self, max_workers: int = 10, max_user_concurrent: int = 3):
self.max_workers = max_workers
self.max_user_concurrent = max_user_concurrent
# Очереди и семафоры - будут созданы при запуске
self.task_queue: Optional[asyncio.PriorityQueue] = None
self.worker_semaphore: Optional[asyncio.Semaphore] = None
self.user_semaphores: Dict[int, asyncio.Semaphore] = {}
# Статистика
self.active_tasks: Dict[str, Task] = {}
self.user_task_counts: Dict[int, int] = {}
self.completed_tasks = 0
self.failed_tasks = 0
# Воркеры
self.workers = []
self.running = False
async def start(self):
"""Запуск менеджера задач"""
if self.running:
return
# Создаём asyncio объекты в правильном event loop
self.task_queue = asyncio.PriorityQueue()
self.worker_semaphore = asyncio.Semaphore(self.max_workers)
self.user_semaphores.clear() # Очищаем старые семафоры
self.running = True
logger.info(f"Запуск {self.max_workers} воркеров для обработки задач")
# Создаём воркеры
for i in range(self.max_workers):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self.workers.append(worker)
async def stop(self):
"""Остановка менеджера задач"""
if not self.running:
return
self.running = False
logger.info("Остановка менеджера задач...")
# Отменяем всех воркеров
for worker in self.workers:
worker.cancel()
# Ждём завершения
await asyncio.gather(*self.workers, return_exceptions=True)
self.workers.clear()
# Очищаем asyncio объекты
self.task_queue = None
self.worker_semaphore = None
self.user_semaphores.clear()
logger.info("Менеджер задач остановлен")
async def add_task(self,
task_id: str,
user_id: int,
func: Callable,
*args,
priority: TaskPriority = TaskPriority.NORMAL,
timeout: float = 30.0,
**kwargs) -> str:
"""Добавить задачу в очередь"""
if not self.running or self.task_queue is None:
raise RuntimeError("TaskManager не запущен")
# Проверяем лимиты пользователя
user_count = self.user_task_counts.get(user_id, 0)
if user_count >= self.max_user_concurrent:
raise ValueError(f"Пользователь {user_id} превысил лимит одновременных задач ({self.max_user_concurrent})")
# Создаём задачу
task = Task(
id=task_id,
user_id=user_id,
priority=priority,
func=func,
args=args,
kwargs=kwargs,
created_at=time.time(),
timeout=timeout
)
# Добавляем в очередь
await self.task_queue.put(task)
# Обновляем статистику
self.user_task_counts[user_id] = user_count + 1
logger.debug(f"Задача {task_id} добавлена в очередь (пользователь: {user_id}, приоритет: {priority.name})")
return task_id
async def _worker(self, worker_name: str):
"""Воркер для выполнения задач"""
logger.debug(f"Воркер {worker_name} запущен")
while self.running and self.task_queue is not None and self.worker_semaphore is not None:
try:
# Получаем задачу из очереди (с таймаутом)
try:
task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
# Получаем семафоры
async with self.worker_semaphore:
user_semaphore = self._get_user_semaphore(task.user_id)
if user_semaphore is not None:
async with user_semaphore:
await self._execute_task(worker_name, task)
else:
await self._execute_task(worker_name, task)
# Отмечаем задачу как выполненную
self.task_queue.task_done()
except asyncio.CancelledError:
logger.debug(f"Воркер {worker_name} отменён")
break
except Exception as e:
logger.error(f"Ошибка в воркере {worker_name}: {e}", exc_info=True)
logger.debug(f"Воркер {worker_name} завершён")
def _get_user_semaphore(self, user_id: int) -> Optional[asyncio.Semaphore]:
"""Получить семафор пользователя"""
if not self.running:
return None
if user_id not in self.user_semaphores:
self.user_semaphores[user_id] = asyncio.Semaphore(self.max_user_concurrent)
return self.user_semaphores[user_id]
async def _execute_task(self, worker_name: str, task: Task):
"""Выполнить задачу"""
task_start = time.time()
try:
# Регистрируем активную задачу
self.active_tasks[task.id] = task
logger.debug(f"Воркер {worker_name} выполняет задачу {task.id}")
# Выполняем с таймаутом
try:
if asyncio.iscoroutinefunction(task.func):
result = await asyncio.wait_for(
task.func(*task.args, **task.kwargs),
timeout=task.timeout
)
else:
# Для синхронных функций
result = await asyncio.wait_for(
asyncio.to_thread(task.func, *task.args, **task.kwargs),
timeout=task.timeout
)
self.completed_tasks += 1
execution_time = time.time() - task_start
logger.debug(f"Задача {task.id} выполнена за {execution_time:.2f}с")
except asyncio.TimeoutError:
logger.warning(f"Задача {task.id} превысила таймаут {task.timeout}с")
self.failed_tasks += 1
raise
except Exception as e:
logger.error(f"Ошибка выполнения задачи {task.id}: {e}")
self.failed_tasks += 1
raise
finally:
# Убираем из активных и обновляем счётчики
self.active_tasks.pop(task.id, None)
user_count = self.user_task_counts.get(task.user_id, 0)
if user_count > 0:
self.user_task_counts[task.user_id] = user_count - 1
def get_stats(self) -> Dict[str, Any]:
"""Получить статистику менеджера"""
return {
'running': self.running,
'workers_count': len(self.workers),
'active_tasks': len(self.active_tasks),
'queue_size': self.task_queue.qsize() if self.task_queue is not None else 0,
'completed_tasks': self.completed_tasks,
'failed_tasks': self.failed_tasks,
'user_tasks': dict(self.user_task_counts)
}
def get_user_stats(self, user_id: int) -> Dict[str, Any]:
"""Получить статистику пользователя"""
active_user_tasks = [
task for task in self.active_tasks.values()
if task.user_id == user_id
]
return {
'active_tasks': len(active_user_tasks),
'max_concurrent': self.max_user_concurrent,
'can_add_task': len(active_user_tasks) < self.max_user_concurrent,
'task_details': [
{
'id': task.id,
'priority': task.priority.name,
'created_at': task.created_at,
'running_time': time.time() - task.created_at
}
for task in active_user_tasks
]
}
# Глобальный экземпляр менеджера задач
task_manager = AsyncTaskManager(
max_workers=15, # Максимум воркеров
max_user_concurrent=5 # Максимум задач на пользователя
)

124
src/utils/utils.py Normal file
View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Утилиты для управления ботом
"""
import asyncio
import sys
from sqlalchemy.ext.asyncio import AsyncSession
from ..core.database import async_session_maker, init_db
from ..core.services import UserService
from ..core.config import ADMIN_IDS
async def setup_admin_users():
"""Установить права администратора для пользователей из ADMIN_IDS"""
if not ADMIN_IDS:
print("❌ Список ADMIN_IDS пуст")
return
async with async_session_maker() as session:
for admin_id in ADMIN_IDS:
success = await UserService.set_admin(session, admin_id, True)
if success:
print(f"✅ Права администратора установлены для ID: {admin_id}")
else:
print(f"⚠️ Пользователь с ID {admin_id} не найден в базе")
async def create_sample_lottery():
"""Создать пример розыгрыша для тестирования"""
from ..core.services import LotteryService
async with async_session_maker() as session:
# Берем первого администратора как создателя
if not ADMIN_IDS:
print("❌ Нет администраторов для создания розыгрыша")
return
admin_user = await UserService.get_user_by_telegram_id(session, ADMIN_IDS[0])
if not admin_user:
print("❌ Пользователь-администратор не найден в базе")
return
lottery = await LotteryService.create_lottery(
session,
title="🎉 Тестовый розыгрыш",
description="Это тестовый розыгрыш для демонстрации работы бота",
prizes=[
"🥇 Главный приз - 10,000 рублей",
"🥈 Второй приз - iPhone 15",
"🥉 Третий приз - AirPods Pro"
],
creator_id=admin_user.id
)
print(f"✅ Создан тестовый розыгрыш с ID: {lottery.id}")
print(f"📝 Название: {lottery.title}")
async def init_database():
"""Инициализация базы данных"""
print("🔄 Инициализация базы данных...")
await init_db()
print("✅ База данных инициализирована")
async def show_stats():
"""Показать статистику бота"""
from ..core.services import LotteryService, ParticipationService
from ..core.models import User, Lottery, Participation
from sqlalchemy import select, func
async with async_session_maker() as session:
# Количество пользователей
result = await session.execute(select(func.count(User.id)))
users_count = result.scalar()
# Количество розыгрышей
result = await session.execute(select(func.count(Lottery.id)))
lotteries_count = result.scalar()
# Количество активных розыгрышей
result = await session.execute(
select(func.count(Lottery.id))
.where(Lottery.is_active == True, Lottery.is_completed == False)
)
active_lotteries = result.scalar()
# Количество участий
result = await session.execute(select(func.count(Participation.id)))
participations_count = result.scalar()
print("\n📊 Статистика бота:")
print(f"👥 Всего пользователей: {users_count}")
print(f"🎲 Всего розыгрышей: {lotteries_count}")
print(f"🟢 Активных розыгрышей: {active_lotteries}")
print(f"🎫 Всего участий: {participations_count}")
def main():
"""Главная функция утилиты"""
if len(sys.argv) < 2:
print("Использование:")
print(" python utils.py init - Инициализация базы данных")
print(" python utils.py setup-admins - Установка прав администратора")
print(" python utils.py sample - Создание тестового розыгрыша")
print(" python utils.py stats - Показать статистику")
return
command = sys.argv[1]
if command == "init":
asyncio.run(init_database())
elif command == "setup-admins":
asyncio.run(setup_admin_users())
elif command == "sample":
asyncio.run(create_sample_lottery())
elif command == "stats":
asyncio.run(show_stats())
else:
print(f"❌ Неизвестная команда: {command}")
if __name__ == "__main__":
main()