Files
Touchh/bot/operations/statistics.py

206 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta, timezone
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import ContextTypes
from asgiref.sync import sync_to_async
from hotels.models import Reservation, Hotel
from users.models import User
from bot.utils.pdf_report import generate_pdf_report
from bot.utils.database import get_hotels_for_user, get_hotel_by_name
from datetime import datetime
from django.utils.timezone import make_aware, is_aware, is_naive
import os
import traceback
async def statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Вывод списка отелей для статистики."""
query = update.callback_query
user_id = query.from_user.id
await query.answer()
# Получаем пользователя
user = await sync_to_async(User.objects.filter(chat_id=user_id).first)()
if not user:
await query.edit_message_text("Вы не зарегистрированы в системе.")
return
# Получаем отели, связанные с пользователем
user_hotels = await get_hotels_for_user(user)
if not user_hotels:
await query.edit_message_text("У вас нет доступных отелей для статистики.")
return
# Формируем кнопки для выбора отеля
keyboard = [
[InlineKeyboardButton(hotel.hotel.name, callback_data=f"stats_hotel_{hotel.hotel.id}")]
for hotel in user_hotels
]
keyboard.append([InlineKeyboardButton("🏠 Главная", callback_data="main_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)
async def stats_select_period(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Выбор периода времени для статистики."""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
context.user_data["selected_hotel"] = hotel_id
keyboard = [
[InlineKeyboardButton("День", callback_data="stats_period_day")],
[InlineKeyboardButton("Неделя", callback_data="stats_period_week")],
[InlineKeyboardButton("Месяц", callback_data="stats_period_month")],
[InlineKeyboardButton("Все время", callback_data="stats_period_all")],
[InlineKeyboardButton("🏠 Главная", callback_data="main_menu")],
[InlineKeyboardButton("🔙 Назад", callback_data="statistics")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите период времени:", reply_markup=reply_markup)
def ensure_datetime(value):
# print(f"statistics.py [DEBUG] ensure_datetime: Received value: {value} ({type(value)})")
"""
Ensure that the given value is a timezone-aware datetime object.
If the given value is a string, it is assumed to be in the format
'%Y-%m-%d %H:%M:%S'. If the given value is a naive datetime object,
it is converted to a timezone-aware datetime object using
django.utils.timezone.make_aware.
:param value: The value to be converted
:type value: str or datetime
:return: A timezone-aware datetime object
:rtype: datetime
"""
if isinstance(value, str):
value = datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
if isinstance(value, datetime) and is_naive(value):
value = make_aware(value)
# print(f"statistics.py [DEBUG] ensure_datetime: Returning value: {value} ({type(value)})")
return value
async def generate_statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Генерация и отправка статистики."""
query = update.callback_query
await query.answer()
try:
hotel_id = context.user_data.get("selected_hotel")
if not hotel_id:
raise ValueError(f"ID отеля не найден в user_data: {context.user_data}")
period = query.data.split("_")[2]
now = datetime.utcnow().replace(tzinfo=timezone.utc)
print(type(now))
print(type(period))
start_date, end_date = get_period_dates(period, now)
try:
# Получаем бронирования
reservations = await sync_to_async(list)(
Reservation.objects.filter(
hotel_id=hotel_id,
check_in__gte=start_date,
check_in__lte=end_date
).select_related('hotel')
)
except Exception as e:
raise RuntimeError(f"statistics.py Ошибка при выборке бронирований: {e}") from e
if not reservations:
await query.edit_message_text("statistics.py Нет данных для статистики за выбранный период.")
return
try:
# Получаем данные об отеле
hotel = await sync_to_async(Hotel.objects.get)(id=hotel_id)
except Hotel.DoesNotExist:
raise RuntimeError(f"statistics.py Отель с ID {hotel_id} не найден")
except Exception as e:
raise RuntimeError(f"statistics.py Ошибка при выборке отеля: {e}") from e
try:
# Генерация отчета
file_path = await generate_pdf_report(hotel.name, reservations, start_date, end_date)
except Exception as e:
raise RuntimeError(f"statistics.py [ERROR] Ошибка при генерации PDF-отчета: {e}") from e
try:
# Отправка файла через Telegram
with open(file_path, "rb") as file:
await query.message.reply_document(document=file, filename=f"{hotel.name}_report.pdf")
except Exception as e:
raise RuntimeError(f"Ошибка при отправке PDF-файла: {e}") from e
# Удаляем временный файл
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
# Логируем стек вызовов для детального анализа
error_trace = traceback.format_exc()
await query.edit_message_text(f"Произошла ошибка: {str(e)}")
def get_period_dates(period, now):
if period == "day":
start_date = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "week":
start_date = (now - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "month":
start_date = (now - timedelta(days=30)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "all":
start_date = (now - timedelta(days=1500)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
else:
start_date = (now - timedelta(days=1500)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
return start_date, end_date
async def stats_back(update: Update, context):
"""Возврат к выбору отеля."""
query = update.callback_query
await query.answer()
# Получаем отели, связанные с пользователем
user_id = query.from_user.id
user = await sync_to_async(User.objects.filter(chat_id=user_id).first)()
if not user:
await query.edit_message_text("Ошибка: Пользователь не найден.")
return
hotels = await get_hotels_for_user(user)
if not hotels:
await query.edit_message_text("У вас нет доступных отелей.")
return
# Формируем кнопки для выбора отеля
keyboard = [
[InlineKeyboardButton(hotel.name, callback_data=f"stats_hotel_{hotel.id}")]
for hotel in hotels
]
keyboard.append([InlineKeyboardButton("🏠 Главная", callback_data="main_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)