Files
new_lottery_bot/scripts/examples.py
2025-11-16 12:36:02 +09:00

165 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Примеры использования API сервисов для тестирования
"""
import asyncio
from database import async_session_maker, init_db
from services import UserService, LotteryService, ParticipationService
async def example_usage():
"""Пример использования всех основных функций бота"""
print("🔄 Инициализация базы данных...")
await init_db()
async with async_session_maker() as session:
print("\n👤 Создание пользователей...")
# Создаем администратора
admin = await UserService.get_or_create_user(
session,
telegram_id=123456789,
username="admin_user",
first_name="Администратор",
last_name="Бота"
)
await UserService.set_admin(session, 123456789, True)
print(f"✅ Создан админ: {admin.first_name} (@{admin.username})")
# Создаем обычных пользователей
users = []
for i in range(1, 6):
user = await UserService.get_or_create_user(
session,
telegram_id=100000000 + i,
username=f"user{i}",
first_name=f"Пользователь{i}",
last_name="Тестовый"
)
users.append(user)
print(f"✅ Создан пользователь: {user.first_name}")
print(f"\n🎲 Создание розыгрыша...")
# Создаем розыгрыш
lottery = await LotteryService.create_lottery(
session,
title="🎉 Новогодний розыгрыш 2025",
description="Грандиозный новогодний розыгрыш с крутыми призами!",
prizes=[
"🥇 iPhone 15 Pro Max",
"🥈 MacBook Air M2",
"🥉 AirPods Pro",
"🏆 PlayStation 5",
"🎁 Подарочный сертификат 10,000₽"
],
creator_id=admin.id
)
print(f"✅ Создан розыгрыш: {lottery.title}")
print(f"\n🎫 Регистрация участников...")
# Добавляем участников
for user in users:
success = await LotteryService.add_participant(session, lottery.id, user.id)
if success:
print(f"{user.first_name} присоединился к розыгрышу")
print(f"\n👑 Установка ручного победителя...")
# Устанавливаем ручного победителя на 1 место
manual_winner = users[0] # Первый пользователь
success = await LotteryService.set_manual_winner(
session, lottery.id, 1, manual_winner.telegram_id
)
if success:
print(f"{manual_winner.first_name} установлен как победитель 1 места")
# Устанавливаем еще одного ручного победителя на 3 место
manual_winner2 = users[2] # Третий пользователь
success = await LotteryService.set_manual_winner(
session, lottery.id, 3, manual_winner2.telegram_id
)
if success:
print(f"{manual_winner2.first_name} установлен как победитель 3 места")
print(f"\n📊 Статистика перед розыгрышем...")
participants_count = await ParticipationService.get_participants_count(
session, lottery.id
)
print(f"👥 Участников в розыгрыше: {participants_count}")
print(f"\n🎲 Проведение розыгрыша...")
# Проводим розыгрыш
results = await LotteryService.conduct_draw(session, lottery.id)
print(f"\n🏆 Результаты розыгрыша:")
print(f"🎯 Розыгрыш: {lottery.title}")
print("=" * 50)
for place, winner_info in results.items():
user = winner_info['user']
prize = winner_info['prize']
is_manual = winner_info['is_manual']
manual_indicator = "👑 (установлен вручную)" if is_manual else "🎲 (случайный)"
print(f"{place}. {user.first_name} (@{user.username}) {manual_indicator}")
print(f" 🎁 {prize}")
print()
print("=" * 50)
print("✅ Розыгрыш завершен успешно!")
print(f"\n📈 Финальная статистика...")
# Получаем победителей из базы данных
winners = await LotteryService.get_winners(session, lottery.id)
print(f"🏆 Сохранено победителей в базе: {len(winners)}")
# Проверяем участия пользователей
for user in users[:2]: # Проверяем первых двух
participations = await ParticipationService.get_user_participations(
session, user.id
)
print(f"📝 {user.first_name} участвует в {len(participations)} розыгрышах")
async def test_edge_cases():
"""Тестирование граничных случаев"""
print("\n🧪 Тестирование граничных случаев...")
async with async_session_maker() as session:
# Попытка добавить участника дважды
user = await UserService.get_user_by_telegram_id(session, 100000001)
lottery = (await LotteryService.get_active_lotteries(session))[0]
success = await LotteryService.add_participant(session, lottery.id, user.id)
print(f"Повторное добавление участника: {'❌ Заблокировано' if not success else '⚠️ Разрешено'}")
# Попытка установить ручного победителя для несуществующего пользователя
success = await LotteryService.set_manual_winner(session, lottery.id, 10, 999999999)
print(f"Установка несуществующего победителя: {'❌ Заблокировано' if not success else '⚠️ Разрешено'}")
# Попытка провести розыгрыш завершенной лотереи
results = await LotteryService.conduct_draw(session, lottery.id)
print(f"Повторный розыгрыш: {'❌ Заблокирован' if not results else '⚠️ Разрешен'}")
if __name__ == "__main__":
print("🚀 Запуск примера использования бота для розыгрышей")
print("=" * 60)
try:
asyncio.run(example_usage())
asyncio.run(test_edge_cases())
print("\n" + "=" * 60)
print("Все тесты выполнены успешно!")
print("🎉 Бот готов к использованию!")
except Exception as e:
print(f"\n❌ Ошибка во время выполнения: {e}")
import traceback
traceback.print_exc()