init commit

This commit is contained in:
2025-09-11 07:34:50 +09:00
commit 5ddc540f9e
36 changed files with 5103 additions and 0 deletions

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Src package

553
src/bot.py Normal file
View File

@@ -0,0 +1,553 @@
#!/usr/bin/env python3
"""
Исправленная версия бота с правильным HTML форматированием
"""
import asyncio
import logging
import random
from aiogram import Bot, Dispatcher, F
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InaccessibleMessage
from aiogram.utils.keyboard import InlineKeyboardBuilder
import sys
import os
# Добавляем путь к проекту
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from config.config import config
from src.database.database import DatabaseManager
from src.services.csv_service import CSVQuizLoader, QuizGenerator
# Настройка логирования
logging.basicConfig(level=logging.INFO)
class QuizStates(StatesGroup):
choosing_mode = State()
choosing_category = State()
choosing_level = State()
in_quiz = State()
class QuizBot:
def __init__(self):
self.bot = Bot(token=config.bot_token)
self.dp = Dispatcher(storage=MemoryStorage())
self.db = DatabaseManager(config.database_path)
self.csv_loader = CSVQuizLoader(config.csv_data_path)
# Регистрируем обработчики
self.setup_handlers()
def setup_handlers(self):
"""Регистрация всех обработчиков"""
# Команды
self.dp.message(Command("start"))(self.start_command)
self.dp.message(Command("help"))(self.help_command)
self.dp.message(Command("stats"))(self.stats_command)
self.dp.message(Command("stop"))(self.stop_command)
# Callback обработчики
self.dp.callback_query(F.data == "guest_mode")(self.guest_mode_handler)
self.dp.callback_query(F.data == "test_mode")(self.test_mode_handler)
self.dp.callback_query(F.data.startswith("category_"))(self.category_handler)
self.dp.callback_query(F.data.startswith("level_"))(self.level_handler)
self.dp.callback_query(F.data.startswith("answer_"))(self.answer_handler)
self.dp.callback_query(F.data == "next_question")(self.next_question)
self.dp.callback_query(F.data == "stats")(self.stats_callback_handler)
self.dp.callback_query(F.data == "back_to_menu")(self.back_to_menu)
async def start_command(self, message: Message, state: FSMContext):
"""Обработка команды /start"""
user = message.from_user
# Регистрируем пользователя
await self.db.register_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=user.language_code or 'ru'
)
await state.set_state(QuizStates.choosing_mode)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🎯 Гостевой режим (QUIZ)", callback_data="guest_mode")],
[InlineKeyboardButton(text="📚 Тестирование по материалам", callback_data="test_mode")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")],
])
await message.answer(
f"👋 Добро пожаловать в Quiz Bot, {user.first_name}!\n\n"
"🎯 <b>Гостевой режим</b> - быстрая викторина для развлечения\n"
"📚 <b>Тестирование</b> - серьезное изучение материалов с результатами\n\n"
"Выберите режим работы:",
reply_markup=keyboard,
parse_mode='HTML'
)
async def help_command(self, message: Message):
"""Обработка команды /help"""
help_text = """🤖 <b>Команды бота:</b>
/start - Главное меню
/help - Справка
/stats - Ваша статистика
/stop - Остановить текущий тест
🎯 <b>Гостевой режим:</b>
• Быстрые викторины
• Показ правильных ответов
• Развлекательная атмосфера
• 5 случайных вопросов
📚 <b>Режим тестирования:</b>
• Серьезное тестирование знаний
• Без показа правильных ответов
• Рандомные варианты ответов
• 10 вопросов, детальная статистика
📊 <b>Доступные категории:</b>
• Корейский язык (уровни 1-5)
• Более 120 уникальных вопросов"""
await message.answer(help_text, parse_mode='HTML')
async def stats_command(self, message: Message):
"""Обработка команды /stats"""
user_stats = await self.db.get_user_stats(message.from_user.id)
if not user_stats or user_stats['total_questions'] == 0:
await message.answer("📊 У вас пока нет статистики. Пройдите первый тест!")
return
accuracy = (user_stats['correct_answers'] / user_stats['total_questions']) * 100 if user_stats['total_questions'] > 0 else 0
stats_text = f"""📊 <b>Ваша статистика:</b>
Всего вопросов: {user_stats['total_questions']}
✅ Правильных ответов: {user_stats['correct_answers']}
📈 Точность: {accuracy:.1f}%
🎯 Завершенных сессий: {user_stats['sessions_completed'] or 0}
🏆 Лучший результат: {user_stats['best_score'] or 0:.1f}%
📊 Средний балл: {user_stats['average_score'] or 0:.1f}%"""
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")]
])
await message.answer(stats_text, reply_markup=keyboard, parse_mode='HTML')
async def stop_command(self, message: Message):
"""Остановка текущего теста"""
session = await self.db.get_active_session(message.from_user.id)
if session:
await self.db.finish_session(message.from_user.id, 0)
await message.answer("❌ Текущий тест остановлен.")
else:
await message.answer("У вас нет активного теста.")
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")]
])
await message.answer("Что бы вы хотели сделать дальше?", reply_markup=keyboard)
async def guest_mode_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора гостевого режима"""
await state.update_data(mode='guest')
await state.set_state(QuizStates.choosing_category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🇰🇷 Корейский язык", callback_data="category_korean")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
"🎯 <b>Гостевой режим</b>\n\nВыберите категорию для викторины:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def test_mode_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора режима тестирования"""
await state.update_data(mode='test')
await state.set_state(QuizStates.choosing_category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🇰🇷 Корейский язык", callback_data="category_korean")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
"📚 <b>Режим тестирования</b>\n\nВыберите категорию для серьезного изучения:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def category_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора категории"""
category = callback.data.split("_")[1]
await state.update_data(category=category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🥉 Уровень 1 (начальный)", callback_data="level_1")],
[InlineKeyboardButton(text="🥈 Уровень 2 (базовый)", callback_data="level_2")],
[InlineKeyboardButton(text="🥇 Уровень 3 (средний)", callback_data="level_3")],
[InlineKeyboardButton(text="🏆 Уровень 4 (продвинутый)", callback_data="level_4")],
[InlineKeyboardButton(text="💎 Уровень 5 (эксперт)", callback_data="level_5")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
f"🇰🇷 <b>Корейский язык</b>\n\nВыберите уровень сложности:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def level_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора уровня"""
level = int(callback.data.split("_")[1])
data = await state.get_data()
# Загружаем вопросы
filename = f"{data['category']}_level_{level}.csv"
questions = await self.csv_loader.load_questions_from_csv(filename)
if not questions:
await callback.message.edit_text(
"❌ Вопросы для этого уровня пока недоступны.",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
)
await callback.answer()
return
# Определяем количество вопросов
questions_count = 5 if data['mode'] == 'guest' else 10
# Берем случайные вопросы
selected_questions = random.sample(questions, min(questions_count, len(questions)))
# Создаем тестовую запись в БД
test_id = await self.db.add_test(
name=f"{data['category'].title()} Level {level}",
description=f"Тест по {data['category']} языку, уровень {level}",
level=level,
category=data['category'],
csv_file=filename
)
# Начинаем сессию
await self.db.start_session(
user_id=callback.from_user.id,
test_id=test_id or 1,
questions=selected_questions,
mode=data['mode']
)
await state.set_state(QuizStates.in_quiz)
await self.show_question_safe(callback, callback.from_user.id, 0)
await callback.answer()
def shuffle_answers(self, question_data: dict) -> dict:
"""Перемешивает варианты ответов и обновляет правильный ответ"""
options = [
question_data['option1'],
question_data['option2'],
question_data['option3'],
question_data['option4']
]
correct_answer_text = options[question_data['correct_answer'] - 1]
# Перемешиваем варианты
random.shuffle(options)
# Находим новую позицию правильного ответа
new_correct_position = options.index(correct_answer_text) + 1
# Обновляем данные вопроса
shuffled_question = question_data.copy()
shuffled_question['option1'] = options[0]
shuffled_question['option2'] = options[1]
shuffled_question['option3'] = options[2]
shuffled_question['option4'] = options[3]
shuffled_question['correct_answer'] = new_correct_position
return shuffled_question
async def show_question_safe(self, callback: CallbackQuery, user_id: int, question_index: int):
"""Безопасный показ вопроса через callback"""
session = await self.db.get_active_session(user_id)
if not session or question_index >= len(session['questions_data']):
return
question = session['questions_data'][question_index]
# Перемешиваем варианты ответов только в тестовом режиме
if session['mode'] == 'test':
question = self.shuffle_answers(question)
session['questions_data'][question_index] = question
await self.db.update_session_questions(user_id, session['questions_data'])
total_questions = len(session['questions_data'])
# Создаем клавиатуру с ответами
keyboard_builder = InlineKeyboardBuilder()
for i in range(1, 5):
keyboard_builder.add(InlineKeyboardButton(
text=f"{i}. {question[f'option{i}']}",
callback_data=f"answer_{i}"
))
keyboard_builder.adjust(1)
question_text = (
f"❓ <b>Вопрос {question_index + 1}/{total_questions}</b>\n\n"
f"<b>{question['question']}</b>"
)
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
async def answer_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка ответа на вопрос"""
answer = int(callback.data.split("_")[1])
user_id = callback.from_user.id
session = await self.db.get_active_session(user_id)
if not session:
await callback.answer("❌ Сессия не найдена")
return
current_q_index = session['current_question']
question = session['questions_data'][current_q_index]
is_correct = answer == question['correct_answer']
mode = session['mode']
# Обновляем счетчик правильных ответов
if is_correct:
session['correct_count'] += 1
# Обновляем прогресс в базе
await self.db.update_session_progress(
user_id, current_q_index + 1, session['correct_count']
)
# Проверяем, есть ли еще вопросы
if current_q_index + 1 >= len(session['questions_data']):
# Тест завершен
score = (session['correct_count'] / len(session['questions_data'])) * 100
await self.db.finish_session(user_id, score)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")]
])
# Разный текст для разных режимов
if mode == 'test':
final_text = (
f"🎉 <b>Тест завершен!</b>\n\n"
f"📊 Результат: {session['correct_count']}/{len(session['questions_data'])}\n"
f"📈 Точность: {score:.1f}%\n"
f"🏆 Оценка: {self.get_grade(score)}\n\n"
f"💡 Результат сохранен в вашей статистике"
)
else:
result_text = "✅ Правильно!" if is_correct else f"❌ Неправильно. Правильный ответ: {question['correct_answer']}"
final_text = (
f"{result_text}\n\n"
f"🎉 <b>Викторина завершена!</b>\n\n"
f"📊 Результат: {session['correct_count']}/{len(session['questions_data'])}\n"
f"📈 Точность: {score:.1f}%\n"
f"🏆 Оценка: {self.get_grade(score)}"
)
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(final_text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, final_text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, final_text, reply_markup=keyboard, parse_mode='HTML')
else:
# Есть еще вопросы
if mode == 'test':
# В тестовом режиме сразу переходим к следующему вопросу
await self.show_question_safe(callback, callback.from_user.id, current_q_index + 1)
else:
# В гостевом режиме показываем результат и кнопку "Следующий"
result_text = "✅ Правильно!" if is_correct else f"❌ Неправильно. Правильный ответ: {question['correct_answer']}"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➡️ Следующий вопрос", callback_data="next_question")]
])
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(result_text, reply_markup=keyboard)
except Exception:
await self.bot.send_message(callback.from_user.id, result_text, reply_markup=keyboard)
else:
await self.bot.send_message(callback.from_user.id, result_text, reply_markup=keyboard)
await callback.answer()
async def next_question(self, callback: CallbackQuery):
"""Переход к следующему вопросу"""
session = await self.db.get_active_session(callback.from_user.id)
if session:
await self.show_question_safe(callback, callback.from_user.id, session['current_question'])
await callback.answer()
async def stats_callback_handler(self, callback: CallbackQuery):
"""Обработчик кнопки статистики через callback"""
user_stats = await self.db.get_user_stats(callback.from_user.id)
if not user_stats or user_stats['total_questions'] == 0:
stats_text = "📊 У вас пока нет статистики. Пройдите первый тест!"
else:
accuracy = (user_stats['correct_answers'] / user_stats['total_questions']) * 100 if user_stats['total_questions'] > 0 else 0
# Получаем дополнительную статистику
recent_results = await self.db.get_recent_results(callback.from_user.id, 3)
category_stats = await self.db.get_category_stats(callback.from_user.id)
best_score = user_stats['best_score'] or 0
avg_score = user_stats['average_score'] or 0
stats_text = f"""📊 <b>Ваша статистика:</b>
📈 <b>Общие показатели:</b>
Всего вопросов: {user_stats['total_questions']}
✅ Правильных ответов: {user_stats['correct_answers']}
🎯 Общая точность: {accuracy:.1f}%
🎪 Завершенных сессий: {user_stats['sessions_completed'] or 0}
🏆 Лучший результат: {best_score:.1f}%
📊 Средний балл: {avg_score:.1f}%
🎮 <b>По режимам:</b>
🎯 Гостевые викторины: {user_stats['guest_sessions'] or 0}
📚 Серьезные тесты: {user_stats['test_sessions'] or 0}"""
# Добавляем статистику по категориям
if category_stats:
stats_text += "\n\n🏷️ <b>По категориям:</b>"
for cat_stat in category_stats[:2]:
cat_accuracy = (cat_stat['correct_answers'] / cat_stat['total_questions']) * 100 if cat_stat['total_questions'] > 0 else 0
stats_text += f"\n📖 {cat_stat['category']}: {cat_stat['attempts']} попыток, {cat_accuracy:.1f}% точность"
# Добавляем последние результаты
if recent_results:
stats_text += "\n\n📈 <b>Последние результаты:</b>"
for result in recent_results:
mode_emoji = "🎯" if result['mode'] == 'guest' else "📚"
stats_text += f"\n{mode_emoji} {result['score']:.1f}% ({result['correct_answers']}/{result['questions_asked']})"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")],
[InlineKeyboardButton(text="🔄 Обновить статистику", callback_data="stats")]
])
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(stats_text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, stats_text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, stats_text, reply_markup=keyboard, parse_mode='HTML')
await callback.answer()
async def back_to_menu(self, callback: CallbackQuery, state: FSMContext):
"""Возврат в главное меню"""
await state.clear()
user = callback.from_user
# Регистрируем пользователя (если еще не зарегистрирован)
await self.db.register_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=user.language_code or 'ru'
)
await state.set_state(QuizStates.choosing_mode)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🎯 Гостевой режим (QUIZ)", callback_data="guest_mode")],
[InlineKeyboardButton(text="📚 Тестирование по материалам", callback_data="test_mode")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")],
])
text = (f"👋 Добро пожаловать в Quiz Bot, {user.first_name}!\n\n"
"🎯 <b>Гостевой режим</b> - быстрая викторина для развлечения\n"
"📚 <b>Тестирование</b> - серьезное изучение материалов с результатами\n\n"
"Выберите режим работы:")
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, text, reply_markup=keyboard, parse_mode='HTML')
await callback.answer()
def get_grade(self, score: float) -> str:
"""Получение оценки по проценту правильных ответов"""
if score >= 90:
return "Отлично! 🌟"
elif score >= 70:
return "Хорошо! 👍"
elif score >= 50:
return "Удовлетворительно 📚"
else:
return "Нужно подтянуть знания 📖"
async def start(self):
"""Запуск бота"""
# Проверяем токен
if not config.bot_token or config.bot_token in ['your_bot_token_here', 'test_token_for_demo_purposes']:
print("❌ Ошибка: не настроен BOT_TOKEN в файле .env")
return False
# Инициализируем базу данных
await self.db.init_database()
print("✅ Bot starting...")
print(f"🗄️ Database: {config.database_path}")
print(f"📁 CSV files: {config.csv_data_path}")
try:
await self.dp.start_polling(self.bot)
except Exception as e:
logging.error(f"Error starting bot: {e}")
return False
async def main():
"""Главная функция"""
bot = QuizBot()
await bot.start()
if __name__ == "__main__":
asyncio.run(main())

390
src/bot_backup.py Normal file
View File

@@ -0,0 +1,390 @@
import aiosqlite
import logging
from typing import List, Dict, Optional, Tuple, Union
import json
class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
async def init_database(self):
"""Инициализация базы данных и создание таблиц"""
async with aiosqlite.connect(self.db_path) as db:
# Таблица пользователей
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
language_code TEXT DEFAULT 'ru',
registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_guest BOOLEAN DEFAULT TRUE,
total_questions INTEGER DEFAULT 0,
correct_answers INTEGER DEFAULT 0
)
""")
# Таблица тестов
await db.execute("""
CREATE TABLE IF NOT EXISTS tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
level INTEGER,
category TEXT,
csv_file TEXT,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
)
""")
# Таблица вопросов
await db.execute("""
CREATE TABLE IF NOT EXISTS questions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_id INTEGER,
question TEXT NOT NULL,
option1 TEXT NOT NULL,
option2 TEXT NOT NULL,
option3 TEXT NOT NULL,
option4 TEXT NOT NULL,
correct_answer INTEGER NOT NULL,
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица результатов
await db.execute("""
CREATE TABLE IF NOT EXISTS results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
test_id INTEGER,
mode TEXT, -- 'guest' or 'test'
questions_asked INTEGER,
correct_answers INTEGER,
total_time INTEGER,
start_time TIMESTAMP,
end_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
score REAL,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица активных сессий
await db.execute("""
CREATE TABLE IF NOT EXISTS active_sessions (
user_id INTEGER PRIMARY KEY,
test_id INTEGER,
current_question INTEGER DEFAULT 0,
correct_count INTEGER DEFAULT 0,
questions_data TEXT, -- JSON с вопросами сессии
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
mode TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
await db.commit()
logging.info("Database initialized successfully")
async def register_user(self, user_id: int, username: Optional[str] = None,
first_name: Optional[str] = None, last_name: Optional[str] = None,
language_code: str = 'ru', is_guest: bool = True) -> bool:
"""Регистрация нового пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT OR REPLACE INTO users
(user_id, username, first_name, last_name, language_code, is_guest)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, username, first_name, last_name, language_code, is_guest))
await db.commit()
return True
except Exception as e:
logging.error(f"Error registering user {user_id}: {e}")
return False
async def get_user(self, user_id: int) -> Optional[Dict]:
"""Получение данных пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM users WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
logging.error(f"Error getting user {user_id}: {e}")
return None
async def add_test(self, name: str, description: str, level: int,
category: str, csv_file: str) -> Optional[int]:
"""Добавление нового теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
INSERT INTO tests (name, description, level, category, csv_file)
VALUES (?, ?, ?, ?, ?)
""", (name, description, level, category, csv_file))
await db.commit()
return cursor.lastrowid
except Exception as e:
logging.error(f"Error adding test: {e}")
return None
async def get_tests_by_category(self, category: Optional[str] = None) -> List[Dict]:
"""Получение тестов по категории"""
try:
async with aiosqlite.connect(self.db_path) as db:
if category:
cursor = await db.execute(
"SELECT * FROM tests WHERE category = ? AND is_active = TRUE ORDER BY level",
(category,)
)
else:
cursor = await db.execute(
"SELECT * FROM tests WHERE is_active = TRUE ORDER BY category, level"
)
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting tests: {e}")
return []
async def add_questions_to_test(self, test_id: int, questions: List[Dict]) -> bool:
"""Добавление вопросов к тесту"""
try:
async with aiosqlite.connect(self.db_path) as db:
for q in questions:
await db.execute("""
INSERT INTO questions
(test_id, question, option1, option2, option3, option4, correct_answer)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (test_id, q['question'], q['option1'], q['option2'],
q['option3'], q['option4'], q['correct_answer']))
await db.commit()
return True
except Exception as e:
logging.error(f"Error adding questions to test {test_id}: {e}")
return False
async def get_random_questions(self, test_id: int, count: int = 10) -> List[Dict]:
"""Получение случайных вопросов из теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT * FROM questions WHERE test_id = ?
ORDER BY RANDOM() LIMIT ?
""", (test_id, count))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting random questions: {e}")
return []
async def start_session(self, user_id: int, test_id: int,
questions: List[Dict], mode: str) -> bool:
"""Начало новой сессии викторины"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions)
await db.execute("""
INSERT OR REPLACE INTO active_sessions
(user_id, test_id, questions_data, mode)
VALUES (?, ?, ?, ?)
""", (user_id, test_id, questions_json, mode))
await db.commit()
return True
except Exception as e:
logging.error(f"Error starting session: {e}")
return False
async def get_active_session(self, user_id: int) -> Optional[Dict]:
"""Получение активной сессии пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM active_sessions WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
session = dict(zip(columns, row))
session['questions_data'] = json.loads(session['questions_data'])
return session
return None
except Exception as e:
logging.error(f"Error getting active session: {e}")
return None
async def update_session_progress(self, user_id: int, question_num: int,
correct_count: int) -> bool:
"""Обновление прогресса сессии"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
UPDATE active_sessions
SET current_question = ?, correct_count = ?
WHERE user_id = ?
""", (question_num, correct_count, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session progress: {e}")
return False
async def update_session_questions(self, user_id: int, questions_data: list) -> bool:
"""Обновление данных вопросов в сессии (например, после перемешивания)"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions_data, ensure_ascii=False)
await db.execute("""
UPDATE active_sessions
SET questions_data = ?
WHERE user_id = ?
""", (questions_json, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session questions: {e}")
return False
async def finish_session(self, user_id: int, score: float) -> bool:
"""Завершение сессии и сохранение результатов"""
try:
async with aiosqlite.connect(self.db_path) as db:
# Получаем данные сессии
session = await self.get_active_session(user_id)
if not session:
return False
# Сохраняем результат
await db.execute("""
INSERT INTO results
(user_id, test_id, mode, questions_asked, correct_answers, score)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, session['test_id'], session['mode'],
len(session['questions_data']), session['correct_count'], score))
# Обновляем статистику пользователя
await db.execute("""
UPDATE users
SET total_questions = total_questions + ?,
correct_answers = correct_answers + ?
WHERE user_id = ?
""", (len(session['questions_data']), session['correct_count'], user_id))
# Удаляем активную сессию
await db.execute("DELETE FROM active_sessions WHERE user_id = ?", (user_id,))
await db.commit()
return True
except Exception as e:
logging.error(f"Error finishing session: {e}")
return False
async def get_user_stats(self, user_id: int) -> Optional[Dict]:
"""Получение статистики пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
u.total_questions,
u.correct_answers,
COUNT(r.id) as sessions_completed,
MAX(r.score) as best_score,
AVG(r.score) as average_score,
COUNT(CASE WHEN r.mode = 'guest' THEN 1 END) as guest_sessions,
COUNT(CASE WHEN r.mode = 'test' THEN 1 END) as test_sessions
FROM users u
LEFT JOIN results r ON u.user_id = r.user_id
WHERE u.user_id = ?
GROUP BY u.user_id
""", (user_id,))
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
stats = dict(zip(columns, row))
return stats
return None
except Exception as e:
logging.error(f"Error getting user stats: {e}")
return None
return None
async def get_recent_results(self, user_id: int, limit: int = 5) -> List[Dict]:
"""Получение последних результатов пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
r.mode,
r.questions_asked,
r.correct_answers,
r.score,
r.end_time,
t.name as test_name,
t.level
FROM results r
LEFT JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
ORDER BY r.end_time DESC
LIMIT ?
""", (user_id, limit))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting recent results: {e}")
return []
async def get_category_stats(self, user_id: int) -> List[Dict]:
"""Получение статистики по категориям"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
t.category,
COUNT(r.id) as attempts,
AVG(r.score) as avg_score,
MAX(r.score) as best_score,
SUM(r.questions_asked) as total_questions,
SUM(r.correct_answers) as correct_answers
FROM results r
JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
GROUP BY t.category
ORDER BY attempts DESC
""", (user_id,))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting category stats: {e}")
return []
AVG(r.score) as average_score,
MAX(r.score) as best_score
FROM users u
LEFT JOIN results r ON u.user_id = r.user_id
WHERE u.user_id = ?
GROUP BY u.user_id
""", (user_id,))
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
logging.error(f"Error getting user stats: {e}")
return None

553
src/bot_fixed.py Normal file
View File

@@ -0,0 +1,553 @@
#!/usr/bin/env python3
"""
Исправленная версия бота с правильным HTML форматированием
"""
import asyncio
import logging
import random
from aiogram import Bot, Dispatcher, F
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InaccessibleMessage
from aiogram.utils.keyboard import InlineKeyboardBuilder
import sys
import os
# Добавляем путь к проекту
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from config.config import config
from src.database.database import DatabaseManager
from src.services.csv_service import CSVQuizLoader, QuizGenerator
# Настройка логирования
logging.basicConfig(level=logging.INFO)
class QuizStates(StatesGroup):
choosing_mode = State()
choosing_category = State()
choosing_level = State()
in_quiz = State()
class QuizBot:
def __init__(self):
self.bot = Bot(token=config.bot_token)
self.dp = Dispatcher(storage=MemoryStorage())
self.db = DatabaseManager(config.database_path)
self.csv_loader = CSVQuizLoader(config.csv_data_path)
# Регистрируем обработчики
self.setup_handlers()
def setup_handlers(self):
"""Регистрация всех обработчиков"""
# Команды
self.dp.message(Command("start"))(self.start_command)
self.dp.message(Command("help"))(self.help_command)
self.dp.message(Command("stats"))(self.stats_command)
self.dp.message(Command("stop"))(self.stop_command)
# Callback обработчики
self.dp.callback_query(F.data == "guest_mode")(self.guest_mode_handler)
self.dp.callback_query(F.data == "test_mode")(self.test_mode_handler)
self.dp.callback_query(F.data.startswith("category_"))(self.category_handler)
self.dp.callback_query(F.data.startswith("level_"))(self.level_handler)
self.dp.callback_query(F.data.startswith("answer_"))(self.answer_handler)
self.dp.callback_query(F.data == "next_question")(self.next_question)
self.dp.callback_query(F.data == "stats")(self.stats_callback_handler)
self.dp.callback_query(F.data == "back_to_menu")(self.back_to_menu)
async def start_command(self, message: Message, state: FSMContext):
"""Обработка команды /start"""
user = message.from_user
# Регистрируем пользователя
await self.db.register_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=user.language_code or 'ru'
)
await state.set_state(QuizStates.choosing_mode)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🎯 Гостевой режим (QUIZ)", callback_data="guest_mode")],
[InlineKeyboardButton(text="📚 Тестирование по материалам", callback_data="test_mode")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")],
])
await message.answer(
f"👋 Добро пожаловать в Quiz Bot, {user.first_name}!\n\n"
"🎯 <b>Гостевой режим</b> - быстрая викторина для развлечения\n"
"📚 <b>Тестирование</b> - серьезное изучение материалов с результатами\n\n"
"Выберите режим работы:",
reply_markup=keyboard,
parse_mode='HTML'
)
async def help_command(self, message: Message):
"""Обработка команды /help"""
help_text = """🤖 <b>Команды бота:</b>
/start - Главное меню
/help - Справка
/stats - Ваша статистика
/stop - Остановить текущий тест
🎯 <b>Гостевой режим:</b>
• Быстрые викторины
• Показ правильных ответов
• Развлекательная атмосфера
• 5 случайных вопросов
📚 <b>Режим тестирования:</b>
• Серьезное тестирование знаний
• Без показа правильных ответов
• Рандомные варианты ответов
• 10 вопросов, детальная статистика
📊 <b>Доступные категории:</b>
• Корейский язык (уровни 1-5)
• Более 120 уникальных вопросов"""
await message.answer(help_text, parse_mode='HTML')
async def stats_command(self, message: Message):
"""Обработка команды /stats"""
user_stats = await self.db.get_user_stats(message.from_user.id)
if not user_stats or user_stats['total_questions'] == 0:
await message.answer("📊 У вас пока нет статистики. Пройдите первый тест!")
return
accuracy = (user_stats['correct_answers'] / user_stats['total_questions']) * 100 if user_stats['total_questions'] > 0 else 0
stats_text = f"""📊 <b>Ваша статистика:</b>
Всего вопросов: {user_stats['total_questions']}
✅ Правильных ответов: {user_stats['correct_answers']}
📈 Точность: {accuracy:.1f}%
🎯 Завершенных сессий: {user_stats['sessions_completed'] or 0}
🏆 Лучший результат: {user_stats['best_score'] or 0:.1f}%
📊 Средний балл: {user_stats['average_score'] or 0:.1f}%"""
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")]
])
await message.answer(stats_text, reply_markup=keyboard, parse_mode='HTML')
async def stop_command(self, message: Message):
"""Остановка текущего теста"""
session = await self.db.get_active_session(message.from_user.id)
if session:
await self.db.finish_session(message.from_user.id, 0)
await message.answer("❌ Текущий тест остановлен.")
else:
await message.answer("У вас нет активного теста.")
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")]
])
await message.answer("Что бы вы хотели сделать дальше?", reply_markup=keyboard)
async def guest_mode_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора гостевого режима"""
await state.update_data(mode='guest')
await state.set_state(QuizStates.choosing_category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🇰🇷 Корейский язык", callback_data="category_korean")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
"🎯 <b>Гостевой режим</b>\n\nВыберите категорию для викторины:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def test_mode_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора режима тестирования"""
await state.update_data(mode='test')
await state.set_state(QuizStates.choosing_category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🇰🇷 Корейский язык", callback_data="category_korean")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
"📚 <b>Режим тестирования</b>\n\nВыберите категорию для серьезного изучения:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def category_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора категории"""
category = callback.data.split("_")[1]
await state.update_data(category=category)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🥉 Уровень 1 (начальный)", callback_data="level_1")],
[InlineKeyboardButton(text="🥈 Уровень 2 (базовый)", callback_data="level_2")],
[InlineKeyboardButton(text="🥇 Уровень 3 (средний)", callback_data="level_3")],
[InlineKeyboardButton(text="🏆 Уровень 4 (продвинутый)", callback_data="level_4")],
[InlineKeyboardButton(text="💎 Уровень 5 (эксперт)", callback_data="level_5")],
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
await callback.message.edit_text(
f"🇰🇷 <b>Корейский язык</b>\n\nВыберите уровень сложности:",
reply_markup=keyboard,
parse_mode='HTML'
)
await callback.answer()
async def level_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка выбора уровня"""
level = int(callback.data.split("_")[1])
data = await state.get_data()
# Загружаем вопросы
filename = f"{data['category']}_level_{level}.csv"
questions = await self.csv_loader.load_questions_from_file(filename)
if not questions:
await callback.message.edit_text(
"❌ Вопросы для этого уровня пока недоступны.",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔙 Назад", callback_data="back_to_menu")]
])
)
await callback.answer()
return
# Определяем количество вопросов
questions_count = 5 if data['mode'] == 'guest' else 10
# Берем случайные вопросы
selected_questions = random.sample(questions, min(questions_count, len(questions)))
# Создаем тестовую запись в БД
test_id = await self.db.add_test(
name=f"{data['category'].title()} Level {level}",
description=f"Тест по {data['category']} языку, уровень {level}",
level=level,
category=data['category'],
csv_file=filename
)
# Начинаем сессию
await self.db.start_session(
user_id=callback.from_user.id,
test_id=test_id or 1,
questions=selected_questions,
mode=data['mode']
)
await state.set_state(QuizStates.in_quiz)
await self.show_question_safe(callback, callback.from_user.id, 0)
await callback.answer()
def shuffle_answers(self, question_data: dict) -> dict:
"""Перемешивает варианты ответов и обновляет правильный ответ"""
options = [
question_data['option1'],
question_data['option2'],
question_data['option3'],
question_data['option4']
]
correct_answer_text = options[question_data['correct_answer'] - 1]
# Перемешиваем варианты
random.shuffle(options)
# Находим новую позицию правильного ответа
new_correct_position = options.index(correct_answer_text) + 1
# Обновляем данные вопроса
shuffled_question = question_data.copy()
shuffled_question['option1'] = options[0]
shuffled_question['option2'] = options[1]
shuffled_question['option3'] = options[2]
shuffled_question['option4'] = options[3]
shuffled_question['correct_answer'] = new_correct_position
return shuffled_question
async def show_question_safe(self, callback: CallbackQuery, user_id: int, question_index: int):
"""Безопасный показ вопроса через callback"""
session = await self.db.get_active_session(user_id)
if not session or question_index >= len(session['questions_data']):
return
question = session['questions_data'][question_index]
# Перемешиваем варианты ответов только в тестовом режиме
if session['mode'] == 'test':
question = self.shuffle_answers(question)
session['questions_data'][question_index] = question
await self.db.update_session_questions(user_id, session['questions_data'])
total_questions = len(session['questions_data'])
# Создаем клавиатуру с ответами
keyboard_builder = InlineKeyboardBuilder()
for i in range(1, 5):
keyboard_builder.add(InlineKeyboardButton(
text=f"{i}. {question[f'option{i}']}",
callback_data=f"answer_{i}"
))
keyboard_builder.adjust(1)
question_text = (
f"❓ <b>Вопрос {question_index + 1}/{total_questions}</b>\n\n"
f"<b>{question['question']}</b>"
)
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, question_text, reply_markup=keyboard_builder.as_markup(), parse_mode='HTML')
async def answer_handler(self, callback: CallbackQuery, state: FSMContext):
"""Обработка ответа на вопрос"""
answer = int(callback.data.split("_")[1])
user_id = callback.from_user.id
session = await self.db.get_active_session(user_id)
if not session:
await callback.answer("❌ Сессия не найдена")
return
current_q_index = session['current_question']
question = session['questions_data'][current_q_index]
is_correct = answer == question['correct_answer']
mode = session['mode']
# Обновляем счетчик правильных ответов
if is_correct:
session['correct_count'] += 1
# Обновляем прогресс в базе
await self.db.update_session_progress(
user_id, current_q_index + 1, session['correct_count']
)
# Проверяем, есть ли еще вопросы
if current_q_index + 1 >= len(session['questions_data']):
# Тест завершен
score = (session['correct_count'] / len(session['questions_data'])) * 100
await self.db.finish_session(user_id, score)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")]
])
# Разный текст для разных режимов
if mode == 'test':
final_text = (
f"🎉 <b>Тест завершен!</b>\n\n"
f"📊 Результат: {session['correct_count']}/{len(session['questions_data'])}\n"
f"📈 Точность: {score:.1f}%\n"
f"🏆 Оценка: {self.get_grade(score)}\n\n"
f"💡 Результат сохранен в вашей статистике"
)
else:
result_text = "✅ Правильно!" if is_correct else f"❌ Неправильно. Правильный ответ: {question['correct_answer']}"
final_text = (
f"{result_text}\n\n"
f"🎉 <b>Викторина завершена!</b>\n\n"
f"📊 Результат: {session['correct_count']}/{len(session['questions_data'])}\n"
f"📈 Точность: {score:.1f}%\n"
f"🏆 Оценка: {self.get_grade(score)}"
)
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(final_text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, final_text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, final_text, reply_markup=keyboard, parse_mode='HTML')
else:
# Есть еще вопросы
if mode == 'test':
# В тестовом режиме сразу переходим к следующему вопросу
await self.show_question_safe(callback, callback.from_user.id, current_q_index + 1)
else:
# В гостевом режиме показываем результат и кнопку "Следующий"
result_text = "✅ Правильно!" if is_correct else f"❌ Неправильно. Правильный ответ: {question['correct_answer']}"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➡️ Следующий вопрос", callback_data="next_question")]
])
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(result_text, reply_markup=keyboard)
except Exception:
await self.bot.send_message(callback.from_user.id, result_text, reply_markup=keyboard)
else:
await self.bot.send_message(callback.from_user.id, result_text, reply_markup=keyboard)
await callback.answer()
async def next_question(self, callback: CallbackQuery):
"""Переход к следующему вопросу"""
session = await self.db.get_active_session(callback.from_user.id)
if session:
await self.show_question_safe(callback, callback.from_user.id, session['current_question'])
await callback.answer()
async def stats_callback_handler(self, callback: CallbackQuery):
"""Обработчик кнопки статистики через callback"""
user_stats = await self.db.get_user_stats(callback.from_user.id)
if not user_stats or user_stats['total_questions'] == 0:
stats_text = "📊 У вас пока нет статистики. Пройдите первый тест!"
else:
accuracy = (user_stats['correct_answers'] / user_stats['total_questions']) * 100 if user_stats['total_questions'] > 0 else 0
# Получаем дополнительную статистику
recent_results = await self.db.get_recent_results(callback.from_user.id, 3)
category_stats = await self.db.get_category_stats(callback.from_user.id)
best_score = user_stats['best_score'] or 0
avg_score = user_stats['average_score'] or 0
stats_text = f"""📊 <b>Ваша статистика:</b>
📈 <b>Общие показатели:</b>
Всего вопросов: {user_stats['total_questions']}
✅ Правильных ответов: {user_stats['correct_answers']}
🎯 Общая точность: {accuracy:.1f}%
🎪 Завершенных сессий: {user_stats['sessions_completed'] or 0}
🏆 Лучший результат: {best_score:.1f}%
📊 Средний балл: {avg_score:.1f}%
🎮 <b>По режимам:</b>
🎯 Гостевые викторины: {user_stats['guest_sessions'] or 0}
📚 Серьезные тесты: {user_stats['test_sessions'] or 0}"""
# Добавляем статистику по категориям
if category_stats:
stats_text += "\n\n🏷️ <b>По категориям:</b>"
for cat_stat in category_stats[:2]:
cat_accuracy = (cat_stat['correct_answers'] / cat_stat['total_questions']) * 100 if cat_stat['total_questions'] > 0 else 0
stats_text += f"\n📖 {cat_stat['category']}: {cat_stat['attempts']} попыток, {cat_accuracy:.1f}% точность"
# Добавляем последние результаты
if recent_results:
stats_text += "\n\n📈 <b>Последние результаты:</b>"
for result in recent_results:
mode_emoji = "🎯" if result['mode'] == 'guest' else "📚"
stats_text += f"\n{mode_emoji} {result['score']:.1f}% ({result['correct_answers']}/{result['questions_asked']})"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")],
[InlineKeyboardButton(text="🔄 Обновить статистику", callback_data="stats")]
])
# Безопасная отправка сообщения
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(stats_text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, stats_text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, stats_text, reply_markup=keyboard, parse_mode='HTML')
await callback.answer()
async def back_to_menu(self, callback: CallbackQuery, state: FSMContext):
"""Возврат в главное меню"""
await state.clear()
user = callback.from_user
# Регистрируем пользователя (если еще не зарегистрирован)
await self.db.register_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=user.language_code or 'ru'
)
await state.set_state(QuizStates.choosing_mode)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🎯 Гостевой режим (QUIZ)", callback_data="guest_mode")],
[InlineKeyboardButton(text="📚 Тестирование по материалам", callback_data="test_mode")],
[InlineKeyboardButton(text="📊 Моя статистика", callback_data="stats")],
])
text = (f"👋 Добро пожаловать в Quiz Bot, {user.first_name}!\n\n"
"🎯 <b>Гостевой режим</b> - быстрая викторина для развлечения\n"
"📚 <b>Тестирование</b> - серьезное изучение материалов с результатами\n\n"
"Выберите режим работы:")
if callback.message and not isinstance(callback.message, InaccessibleMessage):
try:
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML')
except Exception:
await self.bot.send_message(callback.from_user.id, text, reply_markup=keyboard, parse_mode='HTML')
else:
await self.bot.send_message(callback.from_user.id, text, reply_markup=keyboard, parse_mode='HTML')
await callback.answer()
def get_grade(self, score: float) -> str:
"""Получение оценки по проценту правильных ответов"""
if score >= 90:
return "Отлично! 🌟"
elif score >= 70:
return "Хорошо! 👍"
elif score >= 50:
return "Удовлетворительно 📚"
else:
return "Нужно подтянуть знания 📖"
async def start(self):
"""Запуск бота"""
# Проверяем токен
if not config.bot_token or config.bot_token in ['your_bot_token_here', 'test_token_for_demo_purposes']:
print("❌ Ошибка: не настроен BOT_TOKEN в файле .env")
return False
# Инициализируем базу данных
await self.db.init_db()
print("✅ Bot starting...")
print(f"🗄️ Database: {config.database_path}")
print(f"📁 CSV files: {config.csv_data_path}")
try:
await self.dp.start_polling(self.bot)
except Exception as e:
logging.error(f"Error starting bot: {e}")
return False
async def main():
"""Главная функция"""
bot = QuizBot()
await bot.start()
if __name__ == "__main__":
asyncio.run(main())

1
src/database/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Database package

374
src/database/database.py Normal file
View File

@@ -0,0 +1,374 @@
import aiosqlite
import logging
from typing import List, Dict, Optional, Tuple, Union
import json
class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
async def init_database(self):
"""Инициализация базы данных и создание таблиц"""
async with aiosqlite.connect(self.db_path) as db:
# Таблица пользователей
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
language_code TEXT DEFAULT 'ru',
registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_guest BOOLEAN DEFAULT TRUE,
total_questions INTEGER DEFAULT 0,
correct_answers INTEGER DEFAULT 0
)
""")
# Таблица тестов
await db.execute("""
CREATE TABLE IF NOT EXISTS tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
level INTEGER,
category TEXT,
csv_file TEXT,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
)
""")
# Таблица вопросов
await db.execute("""
CREATE TABLE IF NOT EXISTS questions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_id INTEGER,
question TEXT NOT NULL,
option1 TEXT NOT NULL,
option2 TEXT NOT NULL,
option3 TEXT NOT NULL,
option4 TEXT NOT NULL,
correct_answer INTEGER NOT NULL,
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица результатов
await db.execute("""
CREATE TABLE IF NOT EXISTS results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
test_id INTEGER,
mode TEXT, -- 'guest' or 'test'
questions_asked INTEGER,
correct_answers INTEGER,
total_time INTEGER,
start_time TIMESTAMP,
end_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
score REAL,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица активных сессий
await db.execute("""
CREATE TABLE IF NOT EXISTS active_sessions (
user_id INTEGER PRIMARY KEY,
test_id INTEGER,
current_question INTEGER DEFAULT 0,
correct_count INTEGER DEFAULT 0,
questions_data TEXT, -- JSON с вопросами сессии
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
mode TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
await db.commit()
logging.info("Database initialized successfully")
async def register_user(self, user_id: int, username: Optional[str] = None,
first_name: Optional[str] = None, last_name: Optional[str] = None,
language_code: str = 'ru', is_guest: bool = True) -> bool:
"""Регистрация нового пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT OR REPLACE INTO users
(user_id, username, first_name, last_name, language_code, is_guest)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, username, first_name, last_name, language_code, is_guest))
await db.commit()
return True
except Exception as e:
logging.error(f"Error registering user {user_id}: {e}")
return False
async def get_user(self, user_id: int) -> Optional[Dict]:
"""Получение данных пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM users WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
logging.error(f"Error getting user {user_id}: {e}")
return None
async def add_test(self, name: str, description: str, level: int,
category: str, csv_file: str) -> Optional[int]:
"""Добавление нового теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
INSERT INTO tests (name, description, level, category, csv_file)
VALUES (?, ?, ?, ?, ?)
""", (name, description, level, category, csv_file))
await db.commit()
return cursor.lastrowid
except Exception as e:
logging.error(f"Error adding test: {e}")
return None
async def get_tests_by_category(self, category: Optional[str] = None) -> List[Dict]:
"""Получение тестов по категории"""
try:
async with aiosqlite.connect(self.db_path) as db:
if category:
cursor = await db.execute(
"SELECT * FROM tests WHERE category = ? AND is_active = TRUE ORDER BY level",
(category,)
)
else:
cursor = await db.execute(
"SELECT * FROM tests WHERE is_active = TRUE ORDER BY category, level"
)
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting tests: {e}")
return []
async def add_questions_to_test(self, test_id: int, questions: List[Dict]) -> bool:
"""Добавление вопросов к тесту"""
try:
async with aiosqlite.connect(self.db_path) as db:
for q in questions:
await db.execute("""
INSERT INTO questions
(test_id, question, option1, option2, option3, option4, correct_answer)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (test_id, q['question'], q['option1'], q['option2'],
q['option3'], q['option4'], q['correct_answer']))
await db.commit()
return True
except Exception as e:
logging.error(f"Error adding questions to test {test_id}: {e}")
return False
async def get_random_questions(self, test_id: int, count: int = 10) -> List[Dict]:
"""Получение случайных вопросов из теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT * FROM questions WHERE test_id = ?
ORDER BY RANDOM() LIMIT ?
""", (test_id, count))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting random questions: {e}")
return []
async def start_session(self, user_id: int, test_id: int,
questions: List[Dict], mode: str) -> bool:
"""Начало новой сессии викторины"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions)
await db.execute("""
INSERT OR REPLACE INTO active_sessions
(user_id, test_id, questions_data, mode)
VALUES (?, ?, ?, ?)
""", (user_id, test_id, questions_json, mode))
await db.commit()
return True
except Exception as e:
logging.error(f"Error starting session: {e}")
return False
async def get_active_session(self, user_id: int) -> Optional[Dict]:
"""Получение активной сессии пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM active_sessions WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
session = dict(zip(columns, row))
session['questions_data'] = json.loads(session['questions_data'])
return session
return None
except Exception as e:
logging.error(f"Error getting active session: {e}")
return None
async def update_session_progress(self, user_id: int, question_num: int,
correct_count: int) -> bool:
"""Обновление прогресса сессии"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
UPDATE active_sessions
SET current_question = ?, correct_count = ?
WHERE user_id = ?
""", (question_num, correct_count, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session progress: {e}")
return False
async def update_session_questions(self, user_id: int, questions_data: list) -> bool:
"""Обновление данных вопросов в сессии (например, после перемешивания)"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions_data, ensure_ascii=False)
await db.execute("""
UPDATE active_sessions
SET questions_data = ?
WHERE user_id = ?
""", (questions_json, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session questions: {e}")
return False
async def finish_session(self, user_id: int, score: float) -> bool:
"""Завершение сессии и сохранение результатов"""
try:
async with aiosqlite.connect(self.db_path) as db:
# Получаем данные сессии
session = await self.get_active_session(user_id)
if not session:
return False
# Сохраняем результат
await db.execute("""
INSERT INTO results
(user_id, test_id, mode, questions_asked, correct_answers, score)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, session['test_id'], session['mode'],
len(session['questions_data']), session['correct_count'], score))
# Обновляем статистику пользователя
await db.execute("""
UPDATE users
SET total_questions = total_questions + ?,
correct_answers = correct_answers + ?
WHERE user_id = ?
""", (len(session['questions_data']), session['correct_count'], user_id))
# Удаляем активную сессию
await db.execute("DELETE FROM active_sessions WHERE user_id = ?", (user_id,))
await db.commit()
return True
except Exception as e:
logging.error(f"Error finishing session: {e}")
return False
async def get_user_stats(self, user_id: int) -> Optional[Dict]:
"""Получение статистики пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
u.total_questions,
u.correct_answers,
COUNT(r.id) as sessions_completed,
MAX(r.score) as best_score,
AVG(r.score) as average_score,
COUNT(CASE WHEN r.mode = 'guest' THEN 1 END) as guest_sessions,
COUNT(CASE WHEN r.mode = 'test' THEN 1 END) as test_sessions
FROM users u
LEFT JOIN results r ON u.user_id = r.user_id
WHERE u.user_id = ?
GROUP BY u.user_id
""", (user_id,))
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
stats = dict(zip(columns, row))
return stats
return None
except Exception as e:
logging.error(f"Error getting user stats: {e}")
return None
async def get_recent_results(self, user_id: int, limit: int = 5) -> List[Dict]:
"""Получение последних результатов пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
r.mode,
r.questions_asked,
r.correct_answers,
r.score,
r.end_time,
t.name as test_name,
t.level
FROM results r
LEFT JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
ORDER BY r.end_time DESC
LIMIT ?
""", (user_id, limit))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting recent results: {e}")
return []
async def get_category_stats(self, user_id: int) -> List[Dict]:
"""Получение статистики по категориям"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
t.category,
COUNT(r.id) as attempts,
AVG(r.score) as avg_score,
MAX(r.score) as best_score,
SUM(r.questions_asked) as total_questions,
SUM(r.correct_answers) as correct_answers
FROM results r
JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
GROUP BY t.category
ORDER BY attempts DESC
""", (user_id,))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting category stats: {e}")
return []

View File

@@ -0,0 +1,390 @@
import aiosqlite
import logging
from typing import List, Dict, Optional, Tuple, Union
import json
class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
async def init_database(self):
"""Инициализация базы данных и создание таблиц"""
async with aiosqlite.connect(self.db_path) as db:
# Таблица пользователей
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
language_code TEXT DEFAULT 'ru',
registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_guest BOOLEAN DEFAULT TRUE,
total_questions INTEGER DEFAULT 0,
correct_answers INTEGER DEFAULT 0
)
""")
# Таблица тестов
await db.execute("""
CREATE TABLE IF NOT EXISTS tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
level INTEGER,
category TEXT,
csv_file TEXT,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
)
""")
# Таблица вопросов
await db.execute("""
CREATE TABLE IF NOT EXISTS questions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_id INTEGER,
question TEXT NOT NULL,
option1 TEXT NOT NULL,
option2 TEXT NOT NULL,
option3 TEXT NOT NULL,
option4 TEXT NOT NULL,
correct_answer INTEGER NOT NULL,
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица результатов
await db.execute("""
CREATE TABLE IF NOT EXISTS results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
test_id INTEGER,
mode TEXT, -- 'guest' or 'test'
questions_asked INTEGER,
correct_answers INTEGER,
total_time INTEGER,
start_time TIMESTAMP,
end_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
score REAL,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
# Таблица активных сессий
await db.execute("""
CREATE TABLE IF NOT EXISTS active_sessions (
user_id INTEGER PRIMARY KEY,
test_id INTEGER,
current_question INTEGER DEFAULT 0,
correct_count INTEGER DEFAULT 0,
questions_data TEXT, -- JSON с вопросами сессии
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
mode TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (test_id) REFERENCES tests (id)
)
""")
await db.commit()
logging.info("Database initialized successfully")
async def register_user(self, user_id: int, username: Optional[str] = None,
first_name: Optional[str] = None, last_name: Optional[str] = None,
language_code: str = 'ru', is_guest: bool = True) -> bool:
"""Регистрация нового пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT OR REPLACE INTO users
(user_id, username, first_name, last_name, language_code, is_guest)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, username, first_name, last_name, language_code, is_guest))
await db.commit()
return True
except Exception as e:
logging.error(f"Error registering user {user_id}: {e}")
return False
async def get_user(self, user_id: int) -> Optional[Dict]:
"""Получение данных пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM users WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
logging.error(f"Error getting user {user_id}: {e}")
return None
async def add_test(self, name: str, description: str, level: int,
category: str, csv_file: str) -> Optional[int]:
"""Добавление нового теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
INSERT INTO tests (name, description, level, category, csv_file)
VALUES (?, ?, ?, ?, ?)
""", (name, description, level, category, csv_file))
await db.commit()
return cursor.lastrowid
except Exception as e:
logging.error(f"Error adding test: {e}")
return None
async def get_tests_by_category(self, category: Optional[str] = None) -> List[Dict]:
"""Получение тестов по категории"""
try:
async with aiosqlite.connect(self.db_path) as db:
if category:
cursor = await db.execute(
"SELECT * FROM tests WHERE category = ? AND is_active = TRUE ORDER BY level",
(category,)
)
else:
cursor = await db.execute(
"SELECT * FROM tests WHERE is_active = TRUE ORDER BY category, level"
)
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting tests: {e}")
return []
async def add_questions_to_test(self, test_id: int, questions: List[Dict]) -> bool:
"""Добавление вопросов к тесту"""
try:
async with aiosqlite.connect(self.db_path) as db:
for q in questions:
await db.execute("""
INSERT INTO questions
(test_id, question, option1, option2, option3, option4, correct_answer)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (test_id, q['question'], q['option1'], q['option2'],
q['option3'], q['option4'], q['correct_answer']))
await db.commit()
return True
except Exception as e:
logging.error(f"Error adding questions to test {test_id}: {e}")
return False
async def get_random_questions(self, test_id: int, count: int = 10) -> List[Dict]:
"""Получение случайных вопросов из теста"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT * FROM questions WHERE test_id = ?
ORDER BY RANDOM() LIMIT ?
""", (test_id, count))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting random questions: {e}")
return []
async def start_session(self, user_id: int, test_id: int,
questions: List[Dict], mode: str) -> bool:
"""Начало новой сессии викторины"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions)
await db.execute("""
INSERT OR REPLACE INTO active_sessions
(user_id, test_id, questions_data, mode)
VALUES (?, ?, ?, ?)
""", (user_id, test_id, questions_json, mode))
await db.commit()
return True
except Exception as e:
logging.error(f"Error starting session: {e}")
return False
async def get_active_session(self, user_id: int) -> Optional[Dict]:
"""Получение активной сессии пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM active_sessions WHERE user_id = ?", (user_id,)
)
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
session = dict(zip(columns, row))
session['questions_data'] = json.loads(session['questions_data'])
return session
return None
except Exception as e:
logging.error(f"Error getting active session: {e}")
return None
async def update_session_progress(self, user_id: int, question_num: int,
correct_count: int) -> bool:
"""Обновление прогресса сессии"""
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
UPDATE active_sessions
SET current_question = ?, correct_count = ?
WHERE user_id = ?
""", (question_num, correct_count, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session progress: {e}")
return False
async def update_session_questions(self, user_id: int, questions_data: list) -> bool:
"""Обновление данных вопросов в сессии (например, после перемешивания)"""
try:
async with aiosqlite.connect(self.db_path) as db:
questions_json = json.dumps(questions_data, ensure_ascii=False)
await db.execute("""
UPDATE active_sessions
SET questions_data = ?
WHERE user_id = ?
""", (questions_json, user_id))
await db.commit()
return True
except Exception as e:
logging.error(f"Error updating session questions: {e}")
return False
async def finish_session(self, user_id: int, score: float) -> bool:
"""Завершение сессии и сохранение результатов"""
try:
async with aiosqlite.connect(self.db_path) as db:
# Получаем данные сессии
session = await self.get_active_session(user_id)
if not session:
return False
# Сохраняем результат
await db.execute("""
INSERT INTO results
(user_id, test_id, mode, questions_asked, correct_answers, score)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, session['test_id'], session['mode'],
len(session['questions_data']), session['correct_count'], score))
# Обновляем статистику пользователя
await db.execute("""
UPDATE users
SET total_questions = total_questions + ?,
correct_answers = correct_answers + ?
WHERE user_id = ?
""", (len(session['questions_data']), session['correct_count'], user_id))
# Удаляем активную сессию
await db.execute("DELETE FROM active_sessions WHERE user_id = ?", (user_id,))
await db.commit()
return True
except Exception as e:
logging.error(f"Error finishing session: {e}")
return False
async def get_user_stats(self, user_id: int) -> Optional[Dict]:
"""Получение статистики пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
u.total_questions,
u.correct_answers,
COUNT(r.id) as sessions_completed,
MAX(r.score) as best_score,
AVG(r.score) as average_score,
COUNT(CASE WHEN r.mode = 'guest' THEN 1 END) as guest_sessions,
COUNT(CASE WHEN r.mode = 'test' THEN 1 END) as test_sessions
FROM users u
LEFT JOIN results r ON u.user_id = r.user_id
WHERE u.user_id = ?
GROUP BY u.user_id
""", (user_id,))
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
stats = dict(zip(columns, row))
return stats
return None
except Exception as e:
logging.error(f"Error getting user stats: {e}")
return None
return None
async def get_recent_results(self, user_id: int, limit: int = 5) -> List[Dict]:
"""Получение последних результатов пользователя"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
r.mode,
r.questions_asked,
r.correct_answers,
r.score,
r.end_time,
t.name as test_name,
t.level
FROM results r
LEFT JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
ORDER BY r.end_time DESC
LIMIT ?
""", (user_id, limit))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting recent results: {e}")
return []
async def get_category_stats(self, user_id: int) -> List[Dict]:
"""Получение статистики по категориям"""
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT
t.category,
COUNT(r.id) as attempts,
AVG(r.score) as avg_score,
MAX(r.score) as best_score,
SUM(r.questions_asked) as total_questions,
SUM(r.correct_answers) as correct_answers
FROM results r
JOIN tests t ON r.test_id = t.id
WHERE r.user_id = ?
GROUP BY t.category
ORDER BY attempts DESC
""", (user_id,))
rows = await cursor.fetchall()
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logging.error(f"Error getting category stats: {e}")
return []
AVG(r.score) as average_score,
MAX(r.score) as best_score
FROM users u
LEFT JOIN results r ON u.user_id = r.user_id
WHERE u.user_id = ?
GROUP BY u.user_id
""", (user_id,))
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
logging.error(f"Error getting user stats: {e}")
return None

1
src/handlers/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Handlers package

1
src/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Services package

967
src/services/csv_service.py Normal file
View File

@@ -0,0 +1,967 @@
import pandas as pd
import os
import logging
from typing import List, Dict
import aiofiles
import csv
class CSVQuizLoader:
"""Класс для загрузки викторин из CSV файлов"""
def __init__(self, csv_data_path: str):
self.csv_data_path = csv_data_path
async def load_questions_from_csv(self, filename: str) -> List[Dict]:
"""
Загрузка вопросов из CSV файла
Формат: Вопрос, Ответ1, Ответ2, Ответ3, Ответ4, Правильный_ответ
"""
filepath = os.path.join(self.csv_data_path, filename)
questions = []
try:
async with aiofiles.open(filepath, mode='r', encoding='utf-8') as file:
content = await file.read()
# Парсим CSV построчно с более надежным парсером
lines = content.strip().split('\n')
# Пропускаем заголовок
if lines:
lines = lines[1:]
for line_num, line in enumerate(lines, 2):
try:
# Используем csv.reader для одной строки
reader = csv.reader([line], quotechar='"', delimiter=',', skipinitialspace=True)
row = next(reader)
if len(row) >= 6:
# Проверяем, что последний элемент - число
correct_answer_str = row[5].strip()
try:
correct_answer = int(correct_answer_str)
except ValueError:
logging.error(f"Invalid correct_answer '{correct_answer_str}' in {filename} line {line_num}")
continue
if correct_answer not in [1, 2, 3, 4]:
logging.error(f"Correct answer must be 1-4, got {correct_answer} in {filename} line {line_num}")
continue
question_data = {
'question': row[0].strip(),
'option1': row[1].strip(),
'option2': row[2].strip(),
'option3': row[3].strip(),
'option4': row[4].strip(),
'correct_answer': correct_answer
}
questions.append(question_data)
else:
logging.warning(f"Not enough columns in {filename} line {line_num}: {len(row)} columns")
except Exception as row_error:
logging.error(f"Error parsing row {line_num} in {filename}: {row_error}")
continue
except Exception as e:
logging.error(f"Error loading CSV {filename}: {e}")
return []
return questions
async def validate_csv_format(self, filename: str) -> bool:
"""Проверка формата CSV файла"""
try:
questions = await self.load_questions_from_csv(filename)
if not questions:
return False
# Проверяем каждый вопрос
for q in questions:
if (not q['question'] or
not all([q['option1'], q['option2'], q['option3'], q['option4']]) or
q['correct_answer'] not in [1, 2, 3, 4]):
return False
return True
except Exception as e:
logging.error(f"CSV validation error: {e}")
return False
def get_available_csv_files(self) -> List[str]:
"""Получение списка доступных CSV файлов"""
try:
files = []
for filename in os.listdir(self.csv_data_path):
if filename.endswith('.csv'):
files.append(filename)
return files
except Exception as e:
logging.error(f"Error getting CSV files: {e}")
return []
class QuizGenerator:
"""Генератор тестовых данных для викторин"""
@staticmethod
def generate_korean_level_1() -> List[Dict]:
"""Генерация вопросов корейского языка уровень 1"""
return [
{
'question': 'Как сказать "привет" на корейском?',
'option1': '안녕하세요',
'option2': '감사합니다',
'option3': '죄송합니다',
'option4': '안녕히 가세요',
'correct_answer': 1
},
{
'question': 'Что означает "감사합니다"?',
'option1': 'До свидания',
'option2': 'Спасибо',
'option3': 'Извините',
'option4': 'Пожалуйста',
'correct_answer': 2
},
{
'question': 'Как сказать "да" по-корейски?',
'option1': '아니요',
'option2': '',
'option3': '모르겠어요',
'option4': '괜찮아요',
'correct_answer': 2
},
{
'question': 'Что означает "이름"?',
'option1': 'Возраст',
'option2': 'Имя',
'option3': 'Адрес',
'option4': 'Работа',
'correct_answer': 2
},
{
'question': 'Как спросить "Как дела?" на корейском?',
'option1': '어떻게 지내세요?',
'option2': '몇 살이에요?',
'option3': '어디에 살아요?',
'option4': '뭘 해요?',
'correct_answer': 1
},
{
'question': 'Что означает ""?',
'option1': 'Огонь',
'option2': 'Вода',
'option3': 'Земля',
'option4': 'Воздух',
'correct_answer': 2
},
{
'question': 'Как сказать "нет" по-корейски?',
'option1': '',
'option2': '아니요',
'option3': '좋아요',
'option4': '싫어요',
'correct_answer': 2
},
{
'question': 'Что означает "학생"?',
'option1': 'Учитель',
'option2': 'Студент',
'option3': 'Врач',
'option4': 'Повар',
'correct_answer': 2
},
{
'question': 'Как сказать "один" по-корейски?',
'option1': '하나',
'option2': '',
'option3': '',
'option4': '',
'correct_answer': 1
},
{
'question': 'Что означает ""?',
'option1': 'Школа',
'option2': 'Дом',
'option3': 'Больница',
'option4': 'Магазин',
'correct_answer': 2
},
{
'question': 'Как спросить "Сколько это стоит?"',
'option1': '얼마예요?',
'option2': '뭐예요?',
'option3': '어디예요?',
'option4': '언제예요?',
'correct_answer': 1
},
{
'question': 'Что означает "먹다"?',
'option1': 'Пить',
'option2': 'Есть',
'option3': 'Спать',
'option4': 'Идти',
'correct_answer': 2
},
{
'question': 'Как сказать "красивый" по-корейски?',
'option1': '예쁘다',
'option2': '큰다',
'option3': '작다',
'option4': '좋다',
'correct_answer': 1
},
{
'question': 'Что означает "시간"?',
'option1': 'День',
'option2': 'Время',
'option3': 'Год',
'option4': 'Месяц',
'correct_answer': 2
},
{
'question': 'Как сказать "семья" по-корейски?',
'option1': '친구',
'option2': '가족',
'option3': '선생님',
'option4': '동생',
'correct_answer': 2
},
{
'question': 'Что означает "좋아하다"?',
'option1': 'Ненавидеть',
'option2': 'Любить/нравиться',
'option3': 'Знать',
'option4': 'Понимать',
'correct_answer': 2
},
{
'question': 'Как сказать "книга" по-корейски?',
'option1': '',
'option2': '',
'option3': '종이',
'option4': '연필',
'correct_answer': 2
},
{
'question': 'Что означает "오다"?',
'option1': 'Уходить',
'option2': 'Приходить',
'option3': 'Стоять',
'option4': 'Сидеть',
'correct_answer': 2
},
{
'question': 'Как спросить "Где туалет?"',
'option1': '화장실이 어디예요?',
'option2': '학교가 어디예요?',
'option3': '집이 어디예요?',
'option4': '병원이 어디예요?',
'correct_answer': 1
},
{
'question': 'Что означает "친구"?',
'option1': 'Враг',
'option2': 'Друг',
'option3': 'Учитель',
'option4': 'Родитель',
'correct_answer': 2
}
]
@staticmethod
def generate_korean_level_2() -> List[Dict]:
"""Генерация вопросов корейского языка уровень 2"""
return [
{
'question': 'Как сказать "Я изучаю корейский язык"?',
'option1': '저는 한국어를 배워요',
'option2': '저는 영어를 배워요',
'option3': '저는 일본어를 배워요',
'option4': '저는 중국어를 배워요',
'correct_answer': 1
},
{
'question': 'Что означает "날씨"?',
'option1': 'Время',
'option2': 'Погода',
'option3': 'Место',
'option4': 'Люди',
'correct_answer': 2
},
{
'question': 'Как сказать "Вчера я ходил в магазин"?',
'option1': '어제 가게에 갔어요',
'option2': '오늘 가게에 갔어요',
'option3': '내일 가게에 갈 거예요',
'option4': '지금 가게에 가요',
'correct_answer': 1
},
{
'question': 'Что означает "음식"?',
'option1': 'Напиток',
'option2': 'Еда',
'option3': 'Одежда',
'option4': 'Деньги',
'correct_answer': 2
},
{
'question': 'Как спросить "Во сколько открывается магазин?"',
'option1': '가게가 몇 시에 열어요?',
'option2': '가게가 어디에 있어요?',
'option3': '가게에서 뭘 팔아요?',
'option4': '가게가 언제 문을 닫아요?',
'correct_answer': 1
},
{
'question': 'Что означает "여행"?',
'option1': 'Работа',
'option2': 'Путешествие',
'option3': 'Учеба',
'option4': 'Отдых',
'correct_answer': 2
},
{
'question': 'Как сказать "Мне нужно купить билет"?',
'option1': '표를 사야 해요',
'option2': '표를 팔아야 해요',
'option3': '표를 잃어버렸어요',
'option4': '표가 없어요',
'correct_answer': 1
},
{
'question': 'Что означает "건강"?',
'option1': 'Болезнь',
'option2': 'Здоровье',
'option3': 'Лекарство',
'option4': 'Больница',
'correct_answer': 2
},
{
'question': 'Как сказать "Сегодня очень жарко"?',
'option1': '오늘 너무 더워요',
'option2': '오늘 너무 추워요',
'option3': '오늘 비가 와요',
'option4': '오늘 눈이 와요',
'correct_answer': 1
},
{
'question': 'Что означает "약속"?',
'option1': 'Встреча',
'option2': 'Обещание/назначенная встреча',
'option3': 'Работа',
'option4': 'Дом',
'correct_answer': 2
},
{
'question': 'Как спросить "Можно ли здесь курить?"',
'option1': '여기서 담배를 피워도 돼요?',
'option2': '여기서 음식을 먹어도 돼요?',
'option3': '여기서 사진을 찍어도 돼요?',
'option4': '여기서 전화해도 돼요?',
'correct_answer': 1
},
{
'question': 'Что означает "교통"?',
'option1': 'Дорога',
'option2': 'Транспорт',
'option3': 'Машина',
'option4': 'Автобус',
'correct_answer': 2
},
{
'question': 'Как сказать "Я опаздываю на работу"?',
'option1': '회사에 늦어요',
'option2': '회사에 일찍 가요',
'option3': '회사에서 쉬어요',
'option4': '회사에 안 가요',
'correct_answer': 1
},
{
'question': 'Что означает "문화"?',
'option1': 'История',
'option2': 'Культура',
'option3': 'Искусство',
'option4': 'Музыка',
'correct_answer': 2
},
{
'question': 'Как спросить "Сколько времени займет дорога?"',
'option1': '얼마나 걸려요?',
'option2': '얼마예요?',
'option3': '몇 개예요?',
'option4': '언제예요?',
'correct_answer': 1
},
{
'question': 'Что означает "경험"?',
'option1': 'Знания',
'option2': 'Опыт',
'option3': 'Умения',
'option4': 'Образование',
'correct_answer': 2
},
{
'question': 'Как сказать "Я хочу изучать корейскую культуру"?',
'option1': '한국 문화를 공부하고 싶어요',
'option2': '한국 음식을 먹고 싶어요',
'option3': '한국에 가고 싶어요',
'option4': '한국 친구를 만나고 싶어요',
'correct_answer': 1
},
{
'question': 'Что означает "예약"?',
'option1': 'Отмена',
'option2': 'Бронирование',
'option3': 'Покупка',
'option4': 'Продажа',
'correct_answer': 2
},
{
'question': 'Как сказать "Извините, я не понимаю"?',
'option1': '죄송해요, 이해 못 해요',
'option2': '죄송해요, 모르겠어요',
'option3': '죄송해요, 못 들었어요',
'option4': '죄송해요, 바빠요',
'correct_answer': 1
},
{
'question': 'Что означает "관심"?',
'option1': 'Скука',
'option2': 'Интерес',
'option3': 'Усталость',
'option4': 'Радость',
'correct_answer': 2
}
]
@staticmethod
def generate_korean_level_3() -> List[Dict]:
"""Генерация вопросов корейского языка уровень 3"""
return [
{
'question': 'Как правильно сказать "Я должен был прийти вчера"?',
'option1': '어제 와야 했어요',
'option2': '어제 왔어요',
'option3': '어제 올 거예요',
'option4': '어제 오고 싶었어요',
'correct_answer': 1
},
{
'question': 'Что означает "끝나다"?',
'option1': 'Начинаться',
'option2': 'Заканчиваться',
'option3': 'Продолжаться',
'option4': 'Останавливаться',
'correct_answer': 2
},
{
'question': 'Как сказать "Хотя дождь идет, я все равно пойду"?',
'option1': '비가 와도 갈 거예요',
'option2': '비가 와서 안 갈 거예요',
'option3': '비가 오면 갈 거예요',
'option4': '비가 오니까 갈 거예요',
'correct_answer': 1
},
{
'question': 'Что означает "준비하다"?',
'option1': 'Убирать',
'option2': 'Готовить(ся)',
'option3': 'Заканчивать',
'option4': 'Начинать',
'correct_answer': 2
},
{
'question': 'Как спросить "Не могли бы вы мне помочь?"',
'option1': '도와주실 수 있어요?',
'option2': '도와주세요',
'option3': '도와줘야 해요',
'option4': '도와주고 싶어요',
'correct_answer': 1
},
{
'question': 'Что означает "복잡하다"?',
'option1': 'Простой',
'option2': 'Сложный',
'option3': 'Легкий',
'option4': 'Понятный',
'correct_answer': 2
},
{
'question': 'Как сказать "Чем больше изучаю, тем интереснее становится"?',
'option1': '공부할수록 재미있어져요',
'option2': '공부하면 재미있어요',
'option3': '공부해서 재미있어요',
'option4': '공부하니까 재미있어요',
'correct_answer': 1
},
{
'question': 'Что означает "발표하다"?',
'option1': 'Слушать',
'option2': 'Презентовать',
'option3': 'Записывать',
'option4': 'Читать',
'correct_answer': 2
},
{
'question': 'Как сказать "Я привык к корейской еде"?',
'option1': '한국 음식에 익숙해졌어요',
'option2': '한국 음식을 좋아해요',
'option3': '한국 음식을 먹어요',
'option4': '한국 음식이 맛있어요',
'correct_answer': 1
},
{
'question': 'Что означает "놀라다"?',
'option1': 'Радоваться',
'option2': 'Удивляться',
'option3': 'Грустить',
'option4': 'Злиться',
'correct_answer': 2
},
{
'question': 'Как сказать "Если бы я знал корейский лучше..."?',
'option1': '한국어를 더 잘 알았다면...',
'option2': '한국어를 더 잘 알아요',
'option3': '한국어를 더 잘 알고 싶어요',
'option4': '한국어를 더 잘 배워요',
'correct_answer': 1
},
{
'question': 'Что означает "실수하다"?',
'option1': 'Успешно делать',
'option2': 'Ошибаться',
'option3': 'Исправлять',
'option4': 'Проверять',
'correct_answer': 2
},
{
'question': 'Как сказать "По-видимому, он не придет"?',
'option1': '아마 안 올 것 같아요',
'option2': '분명히 안 와요',
'option3': '꼭 안 와요',
'option4': '절대 안 와요',
'correct_answer': 1
},
{
'question': 'Что означает "통역하다"?',
'option1': 'Изучать',
'option2': 'Переводить (устно)',
'option3': 'Говорить',
'option4': 'Слушать',
'correct_answer': 2
},
{
'question': 'Как сказать "Я так и думал"?',
'option1': '그럴 줄 알았어요',
'option2': '그렇게 생각해요',
'option3': '그런 것 같아요',
'option4': '그러면 좋겠어요',
'correct_answer': 1
},
{
'question': 'Что означает "주의하다"?',
'option1': 'Игнорировать',
'option2': 'Быть внимательным',
'option3': 'Забывать',
'option4': 'Расслабляться',
'correct_answer': 2
},
{
'question': 'Как сказать "Мне стало лучше после отдыха"?',
'option1': '쉬고 나서 나아졌어요',
'option2': '쉬어서 좋아요',
'option3': '쉬고 싶어요',
'option4': '쉬면서 좋아요',
'correct_answer': 1
},
{
'question': 'Что означает "전달하다"?',
'option1': 'Получать',
'option2': 'Передавать',
'option3': 'Хранить',
'option4': 'Терять',
'correct_answer': 2
},
{
'question': 'Как сказать "Несмотря на трудности, продолжу"?',
'option1': '어려워도 계속할 거예요',
'option2': '어려우면 그만할 거예요',
'option3': '어려우니까 못 해요',
'option4': '어려워서 포기해요',
'correct_answer': 1
},
{
'question': 'Что означает "기대하다"?',
'option1': 'Бояться',
'option2': 'Ожидать',
'option3': 'Избегать',
'option4': 'Отказываться',
'correct_answer': 2
}
]
@staticmethod
def generate_korean_level_4() -> List[Dict]:
"""Генерация вопросов корейского языка уровень 4"""
return [
{
'question': 'Как правильно выразить "Говорят, что он очень умный"?',
'option1': '그가 아주 똑똑하다고 해요',
'option2': '그는 아주 똑똑해요',
'option3': '그가 아주 똑똑할 거예요',
'option4': '그는 아주 똑똑하고 싶어해요',
'correct_answer': 1
},
{
'question': 'Что означает "억지로"?',
'option1': 'Добровольно',
'option2': 'Принудительно',
'option3': 'Естественно',
'option4': 'Случайно',
'correct_answer': 2
},
{
'question': 'Как сказать "Было бы хорошо, если бы ты пришел"?',
'option1': '네가 왔으면 좋겠어요',
'option2': '네가 와서 좋아요',
'option3': '네가 올 거예요',
'option4': '네가 오면 돼요',
'correct_answer': 1
},
{
'question': 'Что означает "상당히"?',
'option1': 'Немного',
'option2': 'Довольно, значительно',
'option3': 'Совсем нет',
'option4': 'Только',
'correct_answer': 2
},
{
'question': 'Как выразить "Не только..., но и..."?',
'option1': '뿐만 아니라',
'option2': '그리고',
'option3': '하지만',
'option4': '그래서',
'correct_answer': 1
},
{
'question': 'Что означает "겸손하다"?',
'option1': 'Гордиться',
'option2': 'Быть скромным',
'option3': 'Хвастаться',
'option4': 'Завидовать',
'correct_answer': 2
},
{
'question': 'Как сказать "По сравнению с прошлым годом"?',
'option1': '작년에 비해서',
'option2': '작년부터',
'option3': '작년처럼',
'option4': '작년까지',
'correct_answer': 1
},
{
'question': 'Что означает "숙제하다"?',
'option1': 'Отдыхать',
'option2': 'Делать домашнее задание',
'option3': 'Играть',
'option4': 'Работать',
'correct_answer': 2
},
{
'question': 'Как выразить "Чем дальше, тем труднее становится"?',
'option1': '갈수록 어려워져요',
'option2': '가면 어려워요',
'option3': '가서 어려워요',
'option4': '가니까 어려워요',
'correct_answer': 1
},
{
'question': 'Что означает "포기하다"?',
'option1': 'Продолжать',
'option2': 'Сдаваться',
'option3': 'Начинать',
'option4': 'Повторять',
'correct_answer': 2
},
{
'question': 'Как сказать "Я делаю вид, что не знаю"?',
'option1': '모르는 척해요',
'option2': '정말 몰라요',
'option3': '알고 싶지 않아요',
'option4': '알려주지 않아요',
'correct_answer': 1
},
{
'question': 'Что означает "극복하다"?',
'option1': 'Избегать',
'option2': 'Преодолевать',
'option3': 'Создавать',
'option4': 'Ухудшать',
'correct_answer': 2
},
{
'question': 'Как выразить "В зависимости от обстоятельств"?',
'option1': '상황에 따라서',
'option2': '상황을 위해서',
'option3': '상황과 같이',
'option4': '상황을 통해서',
'correct_answer': 1
},
{
'question': 'Что означает "절약하다"?',
'option1': 'Тратить',
'option2': 'Экономить',
'option3': 'Зарабатывать',
'option4': 'Терять',
'correct_answer': 2
},
{
'question': 'Как сказать "Стоит попробовать"?',
'option1': '해 볼 만해요',
'option2': '하고 싶어요',
'option3': '해야 돼요',
'option4': '할 수 있어요',
'correct_answer': 1
},
{
'question': 'Что означает "원래"?',
'option1': 'Сейчас',
'option2': 'Изначально',
'option3': 'Потом',
'option4': 'Никогда',
'correct_answer': 2
},
{
'question': 'Как выразить "Я думаю о том, чтобы поехать в Корею"?',
'option1': '한국에 갈까 생각하고 있어요',
'option2': '한국에 가고 싶어요',
'option3': '한국에 갈 거예요',
'option4': '한국에 가야 해요',
'correct_answer': 1
},
{
'question': 'Что означает "신경 쓰다"?',
'option1': 'Игнорировать',
'option2': 'Беспокоиться о чем-то',
'option3': 'Забывать',
'option4': 'Отдыхать',
'correct_answer': 2
},
{
'question': 'Как сказать "Кажется, что будет дождь"?',
'option1': '비가 올 것 같아요',
'option2': '비가 와요',
'option3': '비가 왔어요',
'option4': '비가 오면 좋겠어요',
'correct_answer': 1
},
{
'question': 'Что означает "감동하다"?',
'option1': 'Скучать',
'option2': 'Быть тронутым',
'option3': 'Злиться',
'option4': 'Волноваться',
'correct_answer': 2
}
]
@staticmethod
def generate_korean_level_5() -> List[Dict]:
"""Генерация вопросов корейского языка уровень 5"""
return [
{
'question': 'Как правильно выразить "Если бы я не опоздал, я бы встретил его"?',
'option1': '늦지 않았더라면 그를 만났을 텐데요',
'option2': '늦지 않으면 그를 만날 거예요',
'option3': '늦지 않아서 그를 만났어요',
'option4': '늦지 않았으니까 그를 만나요',
'correct_answer': 1
},
{
'question': 'Что означает "간접적으로"?',
'option1': 'Прямо',
'option2': 'Косвенно',
'option3': 'Быстро',
'option4': 'Медленно',
'correct_answer': 2
},
{
'question': 'Как выразить "Хоть я и не эксперт, но думаю..."?',
'option1': '전문가는 아니지만 제 생각에는...',
'option2': '전문가라서 제 생각에는...',
'option3': '전문가가 되고 싶어서...',
'option4': '전문가처럼 생각해요',
'correct_answer': 1
},
{
'question': 'Что означает "의존하다"?',
'option1': 'Быть независимым',
'option2': 'Зависеть от чего-то',
'option3': 'Помогать',
'option4': 'Противостоять',
'correct_answer': 2
},
{
'question': 'Как сказать "По мере того как время идет"?',
'option1': '시간이 흘러가면서',
'option2': '시간이 있으면서',
'option3': '시간을 보내면서',
'option4': '시간이 부족해서',
'correct_answer': 1
},
{
'question': 'Что означает "추상적이다"?',
'option1': 'Конкретный',
'option2': 'Абстрактный',
'option3': 'Простой',
'option4': 'Реальный',
'correct_answer': 2
},
{
'question': 'Как выразить "Не то чтобы я не хотел, просто у меня нет времени"?',
'option1': '하기 싫은 건 아니고 시간이 없을 뿐이에요',
'option2': '하기 싫어서 시간이 없어요',
'option3': '시간이 없어서 하기 싫어요',
'option4': '하고 싶지만 시간이 없어요',
'correct_answer': 1
},
{
'question': 'Что означает "편견"?',
'option1': 'Объективность',
'option2': 'Предрассудок',
'option3': 'Понимание',
'option4': 'Знание',
'correct_answer': 2
},
{
'question': 'Как сказать "Чем больше думаю об этом, тем страннее кажется"?',
'option1': '생각하면 할수록 이상해요',
'option2': '생각해서 이상해요',
'option3': '생각하니까 이상해요',
'option4': '생각하면 이상할 거예요',
'correct_answer': 1
},
{
'question': 'Что означает "현실적이다"?',
'option1': 'Нереалистичный',
'option2': 'Реалистичный',
'option3': 'Фантастичный',
'option4': 'Идеалистичный',
'correct_answer': 2
},
{
'question': 'Как выразить "Даже если бы я попытался объяснить..."?',
'option1': '아무리 설명하려고 해도...',
'option2': '설명해서...',
'option3': '설명하니까...',
'option4': '설명하면...',
'correct_answer': 1
},
{
'question': 'Что означает "적극적으로"?',
'option1': 'Пассивно',
'option2': 'Активно',
'option3': 'Медленно',
'option4': 'Осторожно',
'correct_answer': 2
},
{
'question': 'Как сказать "В том случае, если случится проблема"?',
'option1': '문제가 생길 경우에는',
'option2': '문제가 생겨서',
'option3': '문제가 생기니까',
'option4': '문제가 생기면서',
'correct_answer': 1
},
{
'question': 'Что означает "객관적이다"?',
'option1': 'Субъективный',
'option2': 'Объективный',
'option3': 'Личный',
'option4': 'Эмоциональный',
'correct_answer': 2
},
{
'question': 'Как выразить "Я склонен думать, что..."?',
'option1': '...라고 생각하는 편이에요',
'option2': '...라고 생각해요',
'option3': '...라고 알아요',
'option4': '...라고 느껴요',
'correct_answer': 1
},
{
'question': 'Что означает "혁신적이다"?',
'option1': 'Традиционный',
'option2': 'Инновационный',
'option3': 'Старомодный',
'option4': 'Обычный',
'correct_answer': 2
},
{
'question': 'Как сказать "Несмотря на то что я много работал, результат не очень хороший"?',
'option1': '많이 노력했는데도 불구하고 결과가 좋지 않아요',
'option2': '많이 노력해서 결과가 좋아요',
'option3': '많이 노력하면 결과가 좋을 거예요',
'option4': '많이 노력하니까 결과가 좋아요',
'correct_answer': 1
},
{
'question': 'Что означает "효율적이다"?',
'option1': 'Неэффективный',
'option2': 'Эффективный',
'option3': 'Медленный',
'option4': 'Бесполезный',
'correct_answer': 2
},
{
'question': 'Как выразить "По-видимому, это не так просто, как казалось"?',
'option1': '생각보다 간단하지 않은 것 같아요',
'option2': '생각해서 간단하지 않아요',
'option3': '생각하니까 간단해요',
'option4': '생각하면 간단할 거예요',
'correct_answer': 1
},
{
'question': 'Что означает "체계적으로"?',
'option1': 'Хаотично',
'option2': 'Систематично',
'option3': 'Случайно',
'option4': 'Быстро',
'correct_answer': 2
}
]
@staticmethod
async def create_all_korean_csv_files(data_path: str):
"""Создание всех CSV файлов с корейскими тестами"""
levels_data = {
1: QuizGenerator.generate_korean_level_1(),
2: QuizGenerator.generate_korean_level_2(),
3: QuizGenerator.generate_korean_level_3(),
4: QuizGenerator.generate_korean_level_4(),
5: QuizGenerator.generate_korean_level_5(),
}
for level, questions in levels_data.items():
filename = f"korean_level_{level}.csv"
filepath = os.path.join(data_path, filename)
# Создаем CSV файл
async with aiofiles.open(filepath, mode='w', encoding='utf-8', newline='') as file:
await file.write("Вопрос,Ответ1,Ответ2,Ответ3,Ответ4,Правильный_ответ\n")
for q in questions:
# Экранируем кавычки в текстах
question = q["question"].replace('"', '""')
option1 = q["option1"].replace('"', '""')
option2 = q["option2"].replace('"', '""')
option3 = q["option3"].replace('"', '""')
option4 = q["option4"].replace('"', '""')
line = f'"{question}","{option1}","{option2}","{option3}","{option4}",{q["correct_answer"]}\n'
await file.write(line)

1
src/utils/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Utils package