Files
quiz_test/src/database/database.py
Andrey K. Choi fcf27c1639
Some checks reported errors
continuous-integration/drone/push Build encountered an error
devops
2025-09-11 08:02:35 +09:00

449 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Dict, List, Optional, Tuple, Union
import aiosqlite
class DatabaseManager:
    """Async SQLite persistence layer for the quiz application.

    Every public method opens its own short-lived ``aiosqlite`` connection
    to ``db_path``. Apart from ``init_database``, public methods do not
    raise: failures are logged and reported through the return value
    (``False``, ``None`` or an empty list, depending on the method).
    """

    # DDL executed by ``init_database``. Every statement uses
    # ``IF NOT EXISTS`` so running it against an existing database is safe.
    _SCHEMA_STATEMENTS = (
        # Users table
        """
        CREATE TABLE IF NOT EXISTS users (
            user_id INTEGER PRIMARY KEY,
            username TEXT,
            first_name TEXT,
            last_name TEXT,
            language_code TEXT DEFAULT 'ru',
            registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_guest BOOLEAN DEFAULT TRUE,
            total_questions INTEGER DEFAULT 0,
            correct_answers INTEGER DEFAULT 0
        )
        """,
        # Tests table
        """
        CREATE TABLE IF NOT EXISTS tests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            level INTEGER,
            category TEXT,
            csv_file TEXT,
            created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT TRUE
        )
        """,
        # Questions table
        """
        CREATE TABLE IF NOT EXISTS questions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_id INTEGER,
            question TEXT NOT NULL,
            option1 TEXT NOT NULL,
            option2 TEXT NOT NULL,
            option3 TEXT NOT NULL,
            option4 TEXT NOT NULL,
            correct_answer INTEGER NOT NULL,
            FOREIGN KEY (test_id) REFERENCES tests (id)
        )
        """,
        # Results table
        """
        CREATE TABLE IF NOT EXISTS results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            test_id INTEGER,
            mode TEXT, -- 'guest' or 'test'
            questions_asked INTEGER,
            correct_answers INTEGER,
            total_time INTEGER,
            start_time TIMESTAMP,
            end_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            score REAL,
            FOREIGN KEY (user_id) REFERENCES users (user_id),
            FOREIGN KEY (test_id) REFERENCES tests (id)
        )
        """,
        # Active sessions table (at most one row per user)
        """
        CREATE TABLE IF NOT EXISTS active_sessions (
            user_id INTEGER PRIMARY KEY,
            test_id INTEGER,
            current_question INTEGER DEFAULT 0,
            correct_count INTEGER DEFAULT 0,
            questions_data TEXT, -- JSON с вопросами сессии
            start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            mode TEXT,
            FOREIGN KEY (user_id) REFERENCES users (user_id),
            FOREIGN KEY (test_id) REFERENCES tests (id)
        )
        """,
    )

    def __init__(self, db_path: str):
        """Remember the location of the SQLite database.

        Args:
            db_path: Path passed to ``aiosqlite.connect`` on every call.
        """
        self.db_path = db_path

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _rows_to_dicts(description, rows) -> List[Dict]:
        """Convert fetched rows into dicts keyed by column name.

        Args:
            description: Cursor description; the first item of each entry
                is the column name.
            rows: Iterable of row tuples, in column order.

        Returns:
            One dict per row, in the order the rows were given.
        """
        columns = [column[0] for column in description]
        return [dict(zip(columns, row)) for row in rows]

    async def _fetch_one(self, query: str, params: Tuple = ()) -> Optional[Dict]:
        """Run ``query`` and return its first row as a dict, or ``None``.

        Exceptions are not handled here; the public callers do that.
        """
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(query, params)
            row = await cursor.fetchone()
            if row is None:
                return None
            return self._rows_to_dicts(cursor.description, [row])[0]

    async def _fetch_all(self, query: str, params: Tuple = ()) -> List[Dict]:
        """Run ``query`` and return all of its rows as a list of dicts.

        Exceptions are not handled here; the public callers do that.
        """
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(query, params)
            rows = await cursor.fetchall()
            return self._rows_to_dicts(cursor.description, rows)

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    async def init_database(self):
        """Create all tables that do not exist yet.

        Unlike the other public methods, errors are not caught here and
        propagate to the caller.
        """
        async with aiosqlite.connect(self.db_path) as db:
            for statement in self._SCHEMA_STATEMENTS:
                await db.execute(statement)
            await db.commit()
            logging.info("Database initialized successfully")

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    async def register_user(
        self,
        user_id: int,
        username: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        language_code: str = "ru",
        is_guest: bool = True,
    ) -> bool:
        """Create a user, or refresh the profile of an existing one.

        An existing row keeps its ``registration_date`` and its accumulated
        ``total_questions`` / ``correct_answers``; only the profile columns
        supplied here are overwritten.

        Returns:
            ``True`` on success, ``False`` if the statement failed.
        """
        try:
            async with aiosqlite.connect(self.db_path) as db:
                # An upsert is used instead of INSERT OR REPLACE: REPLACE
                # deletes the old row first, which would silently reset the
                # user's statistics and registration date.
                await db.execute(
                    """
                    INSERT INTO users
                    (user_id, username, first_name, last_name, language_code, is_guest)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(user_id) DO UPDATE SET
                        username = excluded.username,
                        first_name = excluded.first_name,
                        last_name = excluded.last_name,
                        language_code = excluded.language_code,
                        is_guest = excluded.is_guest
                    """,
                    (user_id, username, first_name, last_name, language_code, is_guest),
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error registering user {user_id}: {e}")
            return False

    async def get_user(self, user_id: int) -> Optional[Dict]:
        """Return the ``users`` row for ``user_id`` as a dict.

        Returns:
            The row keyed by column name, or ``None`` when the user does not
            exist or the query failed.
        """
        try:
            return await self._fetch_one(
                "SELECT * FROM users WHERE user_id = ?", (user_id,)
            )
        except Exception as e:
            logging.error(f"Error getting user {user_id}: {e}")
            return None

    # ------------------------------------------------------------------
    # Tests and questions
    # ------------------------------------------------------------------

    async def add_test(
        self, name: str, description: str, level: int, category: str, csv_file: str
    ) -> Optional[int]:
        """Insert a new test.

        Returns:
            The id of the inserted row, or ``None`` if the insert failed.
        """
        try:
            async with aiosqlite.connect(self.db_path) as db:
                cursor = await db.execute(
                    """
                    INSERT INTO tests (name, description, level, category, csv_file)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (name, description, level, category, csv_file),
                )
                await db.commit()
                return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error adding test: {e}")
            return None

    async def get_tests_by_category(self, category: Optional[str] = None) -> List[Dict]:
        """List active tests, optionally restricted to one category.

        Args:
            category: Category to filter on. ``None`` or an empty string
                selects every category.

        Returns:
            Active tests as dicts, ordered by level (or by category, then
            level, when no category is given). Empty list on error.
        """
        try:
            if category:
                return await self._fetch_all(
                    "SELECT * FROM tests WHERE category = ? AND is_active = TRUE ORDER BY level",
                    (category,),
                )
            return await self._fetch_all(
                "SELECT * FROM tests WHERE is_active = TRUE ORDER BY category, level"
            )
        except Exception as e:
            logging.error(f"Error getting tests: {e}")
            return []

    async def add_questions_to_test(self, test_id: int, questions: List[Dict]) -> bool:
        """Attach a batch of questions to a test.

        Args:
            test_id: Id of the test the questions belong to.
            questions: Dicts with the keys ``question``, ``option1`` ..
                ``option4`` and ``correct_answer``.

        Returns:
            ``True`` when every question was stored. ``False`` when any
            question is malformed or the insert failed; in that case nothing
            is committed.
        """
        try:
            # Build all parameter tuples up front so a malformed question is
            # detected before the database is touched.
            rows = [
                (
                    test_id,
                    q["question"],
                    q["option1"],
                    q["option2"],
                    q["option3"],
                    q["option4"],
                    q["correct_answer"],
                )
                for q in questions
            ]
            async with aiosqlite.connect(self.db_path) as db:
                await db.executemany(
                    """
                    INSERT INTO questions
                    (test_id, question, option1, option2, option3, option4, correct_answer)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    rows,
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error adding questions to test {test_id}: {e}")
            return False

    async def get_random_questions(self, test_id: int, count: int = 10) -> List[Dict]:
        """Pick up to ``count`` questions of a test in random order.

        Returns:
            Question rows as dicts; empty list on error.
        """
        try:
            return await self._fetch_all(
                """
                SELECT * FROM questions WHERE test_id = ?
                ORDER BY RANDOM() LIMIT ?
                """,
                (test_id, count),
            )
        except Exception as e:
            logging.error(f"Error getting random questions: {e}")
            return []

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    async def start_session(
        self, user_id: int, test_id: int, questions: List[Dict], mode: str
    ) -> bool:
        """Start a quiz session, replacing any session the user already has.

        The question list is stored as JSON; progress counters and the start
        time fall back to their column defaults.

        Returns:
            ``True`` on success, ``False`` on error.
        """
        try:
            # Same encoding as update_session_questions, so the stored text
            # does not change format after the first update.
            questions_json = json.dumps(questions, ensure_ascii=False)
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute(
                    """
                    INSERT OR REPLACE INTO active_sessions
                    (user_id, test_id, questions_data, mode)
                    VALUES (?, ?, ?, ?)
                    """,
                    (user_id, test_id, questions_json, mode),
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error starting session: {e}")
            return False

    async def get_active_session(self, user_id: int) -> Optional[Dict]:
        """Return the user's active session with its questions decoded.

        Returns:
            The ``active_sessions`` row as a dict whose ``questions_data``
            entry has been parsed from JSON, or ``None`` when there is no
            session or reading/decoding failed.
        """
        try:
            session = await self._fetch_one(
                "SELECT * FROM active_sessions WHERE user_id = ?", (user_id,)
            )
            if session is None:
                return None
            session["questions_data"] = json.loads(session["questions_data"])
            return session
        except Exception as e:
            logging.error(f"Error getting active session: {e}")
            return None

    async def update_session_progress(
        self, user_id: int, question_num: int, correct_count: int
    ) -> bool:
        """Store the current question number and correct-answer count.

        Returns:
            ``True`` if the statement ran (even when no session row matched),
            ``False`` on error.
        """
        try:
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute(
                    """
                    UPDATE active_sessions
                    SET current_question = ?, correct_count = ?
                    WHERE user_id = ?
                    """,
                    (question_num, correct_count, user_id),
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error updating session progress: {e}")
            return False

    async def update_session_questions(
        self, user_id: int, questions_data: list
    ) -> bool:
        """Overwrite the session's stored questions (e.g. after shuffling).

        Returns:
            ``True`` if the statement ran (even when no session row matched),
            ``False`` on error.
        """
        try:
            questions_json = json.dumps(questions_data, ensure_ascii=False)
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute(
                    """
                    UPDATE active_sessions
                    SET questions_data = ?
                    WHERE user_id = ?
                    """,
                    (questions_json, user_id),
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error updating session questions: {e}")
            return False

    async def finish_session(self, user_id: int, score: float) -> bool:
        """Close the user's active session and record its result.

        Inserts a ``results`` row, adds the session's totals to the user's
        statistics and deletes the session. The three statements share one
        connection and are committed together.

        Returns:
            ``True`` when the result was stored, ``False`` when the user has
            no active session or an error occurred.
        """
        try:
            # Read the session before opening the write connection so only
            # one connection is open at a time.
            session = await self.get_active_session(user_id)
            if not session:
                return False

            questions_asked = len(session["questions_data"])
            correct_count = session["correct_count"]

            async with aiosqlite.connect(self.db_path) as db:
                # Save the result, carrying over the session's start time so
                # results.start_time is not left NULL.
                await db.execute(
                    """
                    INSERT INTO results
                    (user_id, test_id, mode, questions_asked, correct_answers,
                     start_time, score)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        user_id,
                        session["test_id"],
                        session["mode"],
                        questions_asked,
                        correct_count,
                        session["start_time"],
                        score,
                    ),
                )
                # Update the user's cumulative statistics.
                await db.execute(
                    """
                    UPDATE users
                    SET total_questions = total_questions + ?,
                        correct_answers = correct_answers + ?
                    WHERE user_id = ?
                    """,
                    (questions_asked, correct_count, user_id),
                )
                # Remove the now-finished session.
                await db.execute(
                    "DELETE FROM active_sessions WHERE user_id = ?", (user_id,)
                )
                await db.commit()
                return True
        except Exception as e:
            logging.error(f"Error finishing session: {e}")
            return False

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    async def get_user_stats(self, user_id: int) -> Optional[Dict]:
        """Aggregate a user's totals and result history.

        Returns:
            A dict with ``total_questions``, ``correct_answers``,
            ``sessions_completed``, ``best_score``, ``average_score``,
            ``guest_sessions`` and ``test_sessions``; ``None`` when the user
            does not exist or the query failed. The LEFT JOIN keeps users
            without results, for whom the score aggregates are NULL.
        """
        try:
            return await self._fetch_one(
                """
                SELECT
                    u.total_questions,
                    u.correct_answers,
                    COUNT(r.id) as sessions_completed,
                    MAX(r.score) as best_score,
                    AVG(r.score) as average_score,
                    COUNT(CASE WHEN r.mode = 'guest' THEN 1 END) as guest_sessions,
                    COUNT(CASE WHEN r.mode = 'test' THEN 1 END) as test_sessions
                FROM users u
                LEFT JOIN results r ON u.user_id = r.user_id
                WHERE u.user_id = ?
                GROUP BY u.user_id
                """,
                (user_id,),
            )
        except Exception as e:
            logging.error(f"Error getting user stats: {e}")
            return None

    async def get_recent_results(self, user_id: int, limit: int = 5) -> List[Dict]:
        """Return the user's latest results, newest first.

        Returns:
            Up to ``limit`` dicts with ``mode``, ``questions_asked``,
            ``correct_answers``, ``score``, ``end_time``, ``test_name`` and
            ``level``; empty list on error. The test columns are NULL when
            the result has no matching test row.
        """
        try:
            return await self._fetch_all(
                """
                SELECT
                    r.mode,
                    r.questions_asked,
                    r.correct_answers,
                    r.score,
                    r.end_time,
                    t.name as test_name,
                    t.level
                FROM results r
                LEFT JOIN tests t ON r.test_id = t.id
                WHERE r.user_id = ?
                ORDER BY r.end_time DESC
                LIMIT ?
                """,
                (user_id, limit),
            )
        except Exception as e:
            logging.error(f"Error getting recent results: {e}")
            return []

    async def get_category_stats(self, user_id: int) -> List[Dict]:
        """Summarise the user's results per test category.

        Returns:
            One dict per category with ``category``, ``attempts``,
            ``avg_score``, ``best_score``, ``total_questions`` and
            ``correct_answers``, most-attempted category first; empty list
            on error. Results without a matching test row are excluded by
            the inner join.
        """
        try:
            return await self._fetch_all(
                """
                SELECT
                    t.category,
                    COUNT(r.id) as attempts,
                    AVG(r.score) as avg_score,
                    MAX(r.score) as best_score,
                    SUM(r.questions_asked) as total_questions,
                    SUM(r.correct_answers) as correct_answers
                FROM results r
                JOIN tests t ON r.test_id = t.id
                WHERE r.user_id = ?
                GROUP BY t.category
                ORDER BY attempts DESC
                """,
                (user_id,),
            )
        except Exception as e:
            logging.error(f"Error getting category stats: {e}")
            return []