278 lines
12 KiB
Python
278 lines
12 KiB
Python
"""
|
||
Тест новой функциональности: клиентские счета и многопоточность
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
|
||
# Добавляем путь к проекту
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from src.core.database import init_db, async_session_maker
|
||
from src.core.services import UserService, LotteryService
|
||
from src.utils.account_utils import generate_account_number, validate_account_number, mask_account_number
|
||
from src.utils.task_manager import task_manager, TaskPriority
|
||
from src.display.winner_display import format_winner_display, validate_display_type
|
||
from src.utils.async_decorators import get_task_stats, format_task_stats
|
||
import time
|
||
|
||
|
||
async def test_account_functionality():
|
||
"""Тест функций работы со счетами"""
|
||
print("🧪 Тестирование функций работы со счетами")
|
||
print("=" * 50)
|
||
|
||
async with async_session_maker() as session:
|
||
# Создаём тестовых пользователей
|
||
users_data = [
|
||
{
|
||
'telegram_id': 777000001,
|
||
'username': 'client1',
|
||
'first_name': 'Клиент',
|
||
'last_name': 'Первый',
|
||
'account': '12-34-56-78-90-12-34-56'
|
||
},
|
||
{
|
||
'telegram_id': 777000002,
|
||
'username': 'client2',
|
||
'first_name': 'Клиент',
|
||
'last_name': 'Второй',
|
||
'account': '11-22-33-44-55-66-77-88'
|
||
},
|
||
{
|
||
'telegram_id': 777000003,
|
||
'username': 'client3',
|
||
'first_name': 'Клиент',
|
||
'last_name': 'Третий',
|
||
'account': None # Без счёта
|
||
}
|
||
]
|
||
|
||
created_users = []
|
||
|
||
for user_data in users_data:
|
||
# Создаём пользователя
|
||
user = await UserService.get_or_create_user(
|
||
session,
|
||
telegram_id=user_data['telegram_id'],
|
||
username=user_data['username'],
|
||
first_name=user_data['first_name'],
|
||
last_name=user_data['last_name']
|
||
)
|
||
|
||
# Устанавливаем номер счёта, если есть
|
||
if user_data['account']:
|
||
success = await UserService.set_account_number(
|
||
session, user.telegram_id, user_data['account']
|
||
)
|
||
if success:
|
||
print(f"✅ Создан пользователь {user.first_name} со счётом {user_data['account']}")
|
||
else:
|
||
print(f"❌ Ошибка установки счёта для {user.first_name}")
|
||
else:
|
||
print(f"✅ Создан пользователь {user.first_name} без счёта")
|
||
|
||
created_users.append(user)
|
||
|
||
# Тесты валидации
|
||
print("\n📋 Тестирование валидации номеров счетов:")
|
||
|
||
test_numbers = [
|
||
("12-34-56-78-90-12-34-56", True), # Корректный
|
||
("12345678901234567890123456", False), # Без дефисов
|
||
("12-34-56-78-90-12-34", False), # Короткий
|
||
("12-34-56-78-90-12-34-56-78", False), # Длинный
|
||
("ab-cd-ef-gh-ij-kl-mn-op", False), # Буквы
|
||
("", False) # Пустой
|
||
]
|
||
|
||
for number, should_be_valid in test_numbers:
|
||
is_valid = validate_account_number(number)
|
||
status = "✅" if is_valid == should_be_valid else "❌"
|
||
print(f"{status} {number} -> {'Корректный' if is_valid else 'Некорректный'}")
|
||
|
||
# Тест маскирования
|
||
print("\n🎭 Тестирование маскирования номеров:")
|
||
test_account = "12-34-56-78-90-12-34-56"
|
||
for digits in [4, 6, 8]:
|
||
masked = mask_account_number(test_account, show_last_digits=digits)
|
||
print(f"Показать последние {digits} цифр: {masked}")
|
||
|
||
# Тест поиска по счёту
|
||
print("\n🔍 Тестирование поиска по номеру счёта:")
|
||
|
||
async with async_session_maker() as session:
|
||
# Полный номер
|
||
user = await UserService.get_user_by_account(session, "12-34-56-78-90-12-34-56")
|
||
if user:
|
||
print(f"✅ Найден пользователь по полному номеру: {user.first_name}")
|
||
|
||
# Поиск по части номера
|
||
users = await UserService.search_by_account(session, "12-34")
|
||
print(f"🔍 Найдено пользователей по шаблону '12-34': {len(users)}")
|
||
|
||
for user in users:
|
||
if user.account_number:
|
||
masked = mask_account_number(user.account_number, show_last_digits=6)
|
||
print(f" • {user.first_name}: {masked}")
|
||
|
||
|
||
async def test_display_types():
|
||
"""Тест различных типов отображения победителей"""
|
||
print("\n🎨 Тестирование типов отображения победителей")
|
||
print("=" * 50)
|
||
|
||
async with async_session_maker() as session:
|
||
# Создаём розыгрыш
|
||
lottery = await LotteryService.create_lottery(
|
||
session,
|
||
title="Тест отображения победителей",
|
||
description="Розыгрыш для тестирования различных типов отображения",
|
||
prizes=["Первый приз", "Второй приз"],
|
||
creator_id=1
|
||
)
|
||
|
||
# Получаем пользователя со счётом
|
||
user = await UserService.get_user_by_account(session, "12-34-56-78-90-12-34-56")
|
||
|
||
if user and lottery:
|
||
print(f"👤 Тестируем отображение для пользователя: {user.first_name}")
|
||
print(f"💳 Номер счёта: {user.account_number}")
|
||
|
||
# Тестируем разные типы отображения
|
||
display_types = ['username', 'chat_id', 'account_number']
|
||
|
||
for display_type in display_types:
|
||
# Устанавливаем тип отображения
|
||
success = await LotteryService.set_winner_display_type(
|
||
session, lottery.id, display_type
|
||
)
|
||
|
||
if success:
|
||
# Получаем обновлённый розыгрыш
|
||
updated_lottery = await LotteryService.get_lottery(session, lottery.id)
|
||
|
||
# Тестируем отображение
|
||
public_display = format_winner_display(user, updated_lottery, show_sensitive_data=False)
|
||
admin_display = format_winner_display(user, updated_lottery, show_sensitive_data=True)
|
||
|
||
print(f"\n📺 Тип отображения: {display_type}")
|
||
print(f" 👥 Публичное: {public_display}")
|
||
print(f" 🔧 Админское: {admin_display}")
|
||
else:
|
||
print(f"❌ Ошибка установки типа отображения {display_type}")
|
||
else:
|
||
print("❌ Не удалось создать тестовые данные")
|
||
|
||
|
||
async def test_multithreading():
|
||
"""Тест многопоточности"""
|
||
print("\n⚡ Тестирование системы многопоточности")
|
||
print("=" * 50)
|
||
|
||
# Запускаем менеджер задач
|
||
await task_manager.start()
|
||
|
||
# Функция-заглушка для тестирования
|
||
async def test_task(task_name: str, duration: float, user_id_arg: int):
|
||
print(f"🔄 Запуск задачи {task_name} для пользователя {user_id_arg}")
|
||
await asyncio.sleep(duration)
|
||
print(f"✅ Завершена задача {task_name} для пользователя {user_id_arg}")
|
||
return f"Результат {task_name}"
|
||
|
||
# Добавляем задачи разных приоритетов
|
||
tasks = [
|
||
("critical_task", 2.0, 100, TaskPriority.CRITICAL),
|
||
("high_task_1", 1.5, 200, TaskPriority.HIGH),
|
||
("normal_task_1", 1.0, 300, TaskPriority.NORMAL),
|
||
("normal_task_2", 1.2, 300, TaskPriority.NORMAL),
|
||
("high_task_2", 0.8, 200, TaskPriority.HIGH),
|
||
("low_task", 0.5, 400, TaskPriority.LOW)
|
||
]
|
||
|
||
print(f"📤 Добавляем {len(tasks)} задач...")
|
||
|
||
for task_name, duration, user_id, priority in tasks:
|
||
try:
|
||
task_id = await task_manager.add_task(
|
||
task_id=f"{task_name}_{int(time.time())}",
|
||
user_id=user_id,
|
||
func=test_task,
|
||
priority=priority,
|
||
timeout=10.0,
|
||
task_name=task_name,
|
||
duration=duration,
|
||
user_id_arg=user_id
|
||
)
|
||
print(f" ➕ Добавлена задача {task_name} (приоритет: {priority.name})")
|
||
except ValueError as e:
|
||
print(f" ❌ Ошибка добавления задачи {task_name}: {e}")
|
||
|
||
# Показываем статистику
|
||
print("\n📊 Начальная статистика:")
|
||
stats_text = await format_task_stats()
|
||
print(stats_text)
|
||
|
||
# Ждём выполнения задач
|
||
print("\n⏳ Ожидание выполнения задач...")
|
||
await asyncio.sleep(5.0)
|
||
|
||
# Показываем финальную статистику
|
||
print("\n📈 Финальная статистика:")
|
||
final_stats = await format_task_stats()
|
||
print(final_stats)
|
||
|
||
# Тестируем лимиты пользователя
|
||
print("\n🚧 Тестирование лимитов пользователя:")
|
||
|
||
# Пробуем добавить много задач одному пользователю
|
||
user_id = 999
|
||
added_tasks = 0
|
||
|
||
for i in range(8): # Больше чем лимит (5)
|
||
try:
|
||
await task_manager.add_task(
|
||
task_id=f"limit_test_{i}_{int(time.time())}",
|
||
user_id=user_id,
|
||
func=test_task,
|
||
priority=TaskPriority.NORMAL,
|
||
task_name=f"limit_test_{i}",
|
||
duration=0.1,
|
||
user_id_arg=user_id
|
||
)
|
||
added_tasks += 1
|
||
except ValueError as e:
|
||
print(f" 🛑 Лимит достигнут после {added_tasks} задач: {e}")
|
||
break
|
||
|
||
# Ждём немного и останавливаем менеджер
|
||
await asyncio.sleep(2.0)
|
||
await task_manager.stop()
|
||
|
||
|
||
async def main():
|
||
"""Главная функция теста"""
|
||
print("🚀 Запуск тестирования новой функциональности")
|
||
print("=" * 60)
|
||
|
||
# Инициализируем базу данных
|
||
await init_db()
|
||
|
||
try:
|
||
# Запускаем тесты
|
||
await test_account_functionality()
|
||
await test_display_types()
|
||
await test_multithreading()
|
||
|
||
print("\n🎉 Все тесты завершены успешно!")
|
||
print("=" * 60)
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ Ошибка во время тестирования: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|