Files
Touchh/bot/operations/statistics.py
2024-12-27 14:47:04 +09:00

203 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta, timezone
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import ContextTypes
from asgiref.sync import sync_to_async
from hotels.models import Reservation, Hotel
from users.models import User
import pytz
from pytz import timezone
from bot.utils.pdf_report import generate_pdf_report
from bot.utils.database import get_hotels_for_user, get_hotel_by_name
from datetime import datetime
from django.utils.timezone import make_aware, is_aware, is_naive
import os
import traceback
import logging
from touchh.utils.log import CustomLogger
from django.utils.timezone import localtime
from ..utils.date_utils import ensure_datetime
logger = CustomLogger(name="Statistics.py", log_level="DEBUG").get_logger()
async def statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Вывод списка отелей для статистики."""
query = update.callback_query
user_id = query.from_user.id
await query.answer()
# Получаем пользователя
user = await sync_to_async(User.objects.filter(chat_id=user_id).first)()
if not user:
await query.edit_message_text("Вы не зарегистрированы в системе.")
return
# Получаем отели, связанные с пользователем
user_hotels = await get_hotels_for_user(user)
if not user_hotels:
await query.edit_message_text("У вас нет доступных отелей для статистики.")
return
# Формируем кнопки для выбора отеля
keyboard = [
[InlineKeyboardButton(f'🏨 {hotel.hotel.name}', callback_data=f"stats_hotel_{hotel.hotel.id}")]
for hotel in user_hotels
]
keyboard.append([InlineKeyboardButton("🏠 Главная", callback_data="main_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)
async def stats_select_period(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Выбор периода времени для статистики."""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
context.user_data["selected_hotel"] = hotel_id
keyboard = [
[InlineKeyboardButton("Сегодня", callback_data="stats_period_today")],
[InlineKeyboardButton("Вчера", callback_data="stats_period_yesterday")],
[InlineKeyboardButton("Неделя", callback_data="stats_period_week")],
[InlineKeyboardButton("Этот месяц", callback_data="stats_period_thismonth")],
[InlineKeyboardButton("Прошлый месяц", callback_data="stats_period_lastmonth")],
[InlineKeyboardButton("Этот год", callback_data="stats_period_thisyear")],
[InlineKeyboardButton("Прошлый год", callback_data="stats_period_lastyear")],
[InlineKeyboardButton("🏠 Главная", callback_data="main_menu")],
[InlineKeyboardButton("🔙 Назад", callback_data="statistics")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите период времени:", reply_markup=reply_markup)
async def generate_statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Генерация и отправка статистики."""
query = update.callback_query
await query.answer()
try:
hotel_id = context.user_data.get("selected_hotel")
if not hotel_id:
raise ValueError("ID отеля не найден в user_data")
period = query.data.split("_")[2]
now = ensure_datetime(datetime.utcnow())
start_date, end_date = get_period_dates(period, now)
print(type(start_date), type(end_date))
reservations = await sync_to_async(list)(
Reservation.objects.filter(
hotel_id=hotel_id,
check_in__gte=start_date,
check_in__lte=end_date
).select_related('hotel')
)
if not reservations:
await query.edit_message_text(f"Нет данных для статистики за выбранный период.{start_date} - {end_date}")
return
hotel = await sync_to_async(Hotel.objects.get)(id=hotel_id)
file_path = await generate_pdf_report(hotel.name, reservations, start_date, end_date)
with open(file_path, "rb") as file:
await query.message.reply_document(document=file, filename=f"{hotel.name}_report.pdf")
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
logging.error(f"Ошибка в generate_statistics: {e}", exc_info=True)
await query.edit_message_text(f"Произошла ошибка: {e}")
def get_period_dates(period, now=None):
"""
Возвращает диапазон дат (start_date, end_date) для заданного периода.
:param period: Период (строка: 'today', 'yesterday', 'last_week', 'last_month').
:param now: Текущая дата/время (опционально).
:return: Кортеж (start_date, end_date).
:raises: ValueError, если период не поддерживается.
"""
if now is None:
now = datetime.now(pytz.UTC)
else:
now = ensure_datetime(now) # Приведение now к timezone-aware
if period == "today":
start_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "yesterday":
start_date = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = (now - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "week":
# Последняя неделя: с понедельника предыдущей недели до воскресенья
start_date = (now - timedelta(days=now.weekday() + 7)).replace(hour=0, minute=0, second=0, microsecond=0)
end_date = (start_date + timedelta(days=6)).replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "month":
# Текущий месяц: с первого дня месяца до текущей даты
start_date = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "lastmonth":
# Последний месяц: с первого дня прошлого месяца до последнего дня прошлого месяца
first_day_of_current_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
last_day_of_previous_month = first_day_of_current_month - timedelta(days=1)
start_date = last_day_of_previous_month.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
end_date = last_day_of_previous_month.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "year":
# Текущий год: с первого дня года до текущей даты
start_date = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(hour=23, minute=59, second=59, microsecond=999999)
elif period == "lastyear":
# Последний год: с 1 января предыдущего года до 31 декабря предыдущего года
start_date = now.replace(year=now.year - 1, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
end_date = now.replace(year=now.year - 1, month=12, day=31, hour=23, minute=59, second=59, microsecond=999999)
else:
raise ValueError(f"Неподдерживаемый период: {period}")
# Приводим start_date и end_date к timezone-aware, если это не так
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=pytz.UTC)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=pytz.UTC)
# Приведение дат к локальному времени
return localtime(start_date), localtime(end_date)
async def stats_back(update: Update, context):
"""Возврат к выбору отеля."""
query = update.callback_query
await query.answer()
# Получаем отели, связанные с пользователем
user_id = query.from_user.id
user = await sync_to_async(User.objects.filter(chat_id=user_id).first)()
if not user:
await query.edit_message_text("Ошибка: Пользователь не найден.")
return
hotels = await get_hotels_for_user(user)
if not hotels:
await query.edit_message_text("У вас нет доступных отелей.")
return
# Формируем кнопки для выбора отеля
keyboard = [
[InlineKeyboardButton(hotel.name, callback_data=f"stats_hotel_{hotel.id}")]
for hotel in hotels
]
keyboard.append([InlineKeyboardButton("🏠 Главная", callback_data="main_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)