Files
Touchh/bot/handlers.py
2024-12-07 13:04:46 +09:00

604 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import ContextTypes
from users.models import User
from hotels.models import Hotel, UserHotel, Reservation
from users.models import NotificationSettings
from asgiref.sync import sync_to_async
import smtplib
from hotels.models import PMSIntegrationLog
import requests
from email.mime.text import MIMEText
from django.core.mail import send_mail
from datetime import datetime, timedelta
from fpdf import FPDF
import os
# --- Вспомогательные функции ---
async def get_user_from_chat_id(chat_id):
"""Получение пользователя из базы по chat_id."""
return await sync_to_async(User.objects.filter(chat_id=chat_id).first)()
async def get_hotel_by_id(hotel_id):
"""Получение отеля по ID."""
return await sync_to_async(Hotel.objects.filter(id=hotel_id).first)()
# --- Обработчики команд ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /start с проверкой chat_id"""
user_id = update.message.from_user.id
print(f"Пользователь {user_id} вызвал команду /start")
user = await get_user_from_chat_id(user_id)
if user:
keyboard = [
[InlineKeyboardButton("📊 Статистика", callback_data="stats")],
[InlineKeyboardButton("🏨 Управление отелями", callback_data="manage_hotels")],
[InlineKeyboardButton("👤 Пользователи", callback_data="manage_users")],
[InlineKeyboardButton("⚙️ Настройки", callback_data="settings")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("Выберите действие:", reply_markup=reply_markup)
else:
print(f"Пользователь {user_id} не зарегистрирован.")
await update.message.reply_text("Вы не зарегистрированы в системе. Обратитесь к администратору.")
async def manage_hotels(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Вывод списка отелей, связанных с пользователем"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
print(f"Пользователь {user_id} выбрал 'Управление отелями'")
user = await get_user_from_chat_id(user_id)
if not user:
print(f"Пользователь {user_id} не зарегистрирован.")
await query.edit_message_text("Вы не зарегистрированы в системе.")
return
user_hotels = await sync_to_async(list)(
UserHotel.objects.filter(user=user).select_related("hotel")
)
if not user_hotels:
print(f"У пользователя {user_id} нет связанных отелей.")
await query.edit_message_text("У вас нет связанных отелей.")
return
keyboard = [
[InlineKeyboardButton(f"🏨 {hotel.hotel.name}", callback_data=f"hotel_{hotel.hotel.id}")]
for hotel in user_hotels
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)
# --- Обработчики кнопок ---
async def handle_button_click(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик всех нажатий кнопок"""
query = update.callback_query
await query.answer()
if query.data == "stats":
await statistics(update, context) # Добавляем вызов функции для обработки статистики
elif query.data.startswith("stats_hotel_"):
await stats_select_period(update, context)
elif query.data.startswith("stats_period_"):
await generate_statistics(update, context)
elif query.data == "manage_hotels":
await manage_hotels(update, context)
elif query.data.startswith("hotel_"):
await hotel_actions(update, context)
elif query.data.startswith("delete_hotel_"):
await delete_hotel(update, context)
elif query.data.startswith("check_pms_"):
await check_pms(update, context)
elif query.data.startswith("setup_rooms_"):
await setup_rooms(update, context)
elif query.data == "settings":
await settings_menu(update, context)
elif query.data == "toggle_telegram":
await toggle_telegram(update, context)
elif query.data == "toggle_email":
await toggle_email(update, context)
elif query.data == "set_notification_time":
await set_notification_time(update, context)
elif query.data == "current_settings":
await show_current_settings(update, context)
else:
print(f"Неизвестный callback_data: {query.data}")
await query.edit_message_text("Команда не распознана.")
async def hotel_actions(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик действий для выбранного отеля"""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[1])
print(f"Пользователь {query.from_user.id} выбрал отель с ID {hotel_id}")
hotel = await get_hotel_by_id(hotel_id)
if not hotel:
print(f"Отель с ID {hotel_id} не найден.")
await query.edit_message_text("Отель не найден.")
return
keyboard = [
[InlineKeyboardButton("🗑️ Удалить отель", callback_data=f"delete_hotel_{hotel_id}")],
[InlineKeyboardButton("🔗 Проверить интеграцию с PMS", callback_data=f"check_pms_{hotel_id}")],
[InlineKeyboardButton("🛏️ Настроить номера", callback_data=f"setup_rooms_{hotel_id}")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(f"Управление отелем: {hotel.name}", reply_markup=reply_markup)
async def delete_hotel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Удаление отеля"""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
print(f"Пользователь {query.from_user.id} выбрал удаление отеля с ID {hotel_id}")
hotel = await get_hotel_by_id(hotel_id)
if hotel:
hotel_name = hotel.name
await sync_to_async(hotel.delete)()
print(f"Отель {hotel_name} удалён.")
await query.edit_message_text(f"Отель {hotel_name} успешно удалён.")
else:
print(f"Отель с ID {hotel_id} не найден.")
await query.edit_message_text("Отель не найден.")
async def check_pms(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Проверить интеграцию с PMS"""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
print(f"Пользователь {query.from_user.id} проверяет интеграцию PMS для отеля с ID {hotel_id}")
# Асинхронно получаем отель
hotel = await sync_to_async(Hotel.objects.select_related('api', 'pms').get)(id=hotel_id)
if not hotel:
print(f"Отель с ID {hotel_id} не найден.")
await query.edit_message_text("Отель не найден.")
return
# Асинхронно извлекаем связанные данные
api_name = hotel.api.name if hotel.api else "Не настроен"
pms_name = hotel.pms.name if hotel.pms else "Не указана"
# Формируем сообщение
status_message = (
f"Отель: {hotel.name}\n"
f"PMS система: {pms_name}\n"
f"API: {api_name}"
)
await query.edit_message_text(status_message)
async def setup_rooms(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Настроить номера отеля"""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
print(f"Пользователь {query.from_user.id} настраивает номера для отеля с ID {hotel_id}")
hotel = await get_hotel_by_id(hotel_id)
if not hotel:
print(f"Отель с ID {hotel_id} не найден.")
await query.edit_message_text("Отель не найден.")
return
await query.edit_message_text(f"Настройка номеров для отеля: {hotel.name}")
async def settings_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Меню настроек уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
telegram_status = "" if settings.telegram_enabled else ""
email_status = "" if settings.email_enabled else ""
keyboard = [
[InlineKeyboardButton(f"{telegram_status} Уведомления в Telegram", callback_data="toggle_telegram")],
[InlineKeyboardButton(f"{email_status} Уведомления по Email", callback_data="toggle_email")],
[InlineKeyboardButton("🕒 Настроить время уведомлений", callback_data="set_notification_time")],
[InlineKeyboardButton("📋 Показать текущие настройки", callback_data="current_settings")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Настройки уведомлений:", reply_markup=reply_markup)
async def toggle_telegram(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Переключение состояния Telegram-уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
settings.telegram_enabled = not settings.telegram_enabled
await sync_to_async(settings.save)()
print(f"Пользователь {user_id} переключил Telegram-уведомления: {settings.telegram_enabled}")
await settings_menu(update, context)
async def toggle_email(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Переключение состояния Email-уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
settings.email_enabled = not settings.email_enabled
await sync_to_async(settings.save)()
print(f"Пользователь {user_id} переключил Email-уведомления: {settings.email_enabled}")
await settings_menu(update, context)
async def set_notification_time(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Настройка времени уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
await query.edit_message_text("Введите новое время для уведомлений в формате HH:MM (например, 08:30):")
context.user_data["set_time"] = True
print(f"Пользователь {user_id} настраивает время уведомлений.")
async def handle_notification_time(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка ввода времени уведомлений."""
if context.user_data.get("set_time"):
user_id = update.message.from_user.id
new_time = update.message.text
try:
hour, minute = map(int, new_time.split(":"))
user = await get_user_from_chat_id(user_id)
if user:
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
settings.notification_time = f"{hour:02}:{minute:02}"
await sync_to_async(settings.save)()
print(f"Пользователь {user_id} установил новое время уведомлений: {new_time}")
await update.message.reply_text(f"Время уведомлений обновлено на {new_time}.")
except ValueError:
print(f"Пользователь {user_id} ввёл некорректное время: {new_time}")
await update.message.reply_text("Неверный формат. Введите время в формате HH:MM.")
finally:
context.user_data["set_time"] = False
async def settings_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Меню настроек уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
telegram_status = "" if settings.telegram_enabled else ""
email_status = "" if settings.email_enabled else ""
keyboard = [
[InlineKeyboardButton(f"{telegram_status} Уведомления в Telegram", callback_data="toggle_telegram")],
[InlineKeyboardButton(f"{email_status} Уведомления по Email", callback_data="toggle_email")],
[InlineKeyboardButton("🕒 Настроить время уведомлений", callback_data="set_notification_time")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Настройки уведомлений:", reply_markup=reply_markup)
async def send_telegram_notification(user, message):
"""Отправка уведомления через Telegram."""
# bot = Bot(token="ВАШ_ТОКЕН")
if user.chat_id:
try:
await bot.send_message(chat_id=user.chat_id, text=message)
print(f"Telegram-уведомление отправлено пользователю {user.chat_id}: {message}")
except Exception as e:
print(f"Ошибка отправки Telegram-уведомления пользователю {user.chat_id}: {e}")
def send_email_notification(user, message):
"""Отправка уведомления через Email."""
if user.email:
try:
send_mail(
subject="Уведомление от системы",
message=message,
from_email="noreply@yourdomain.com",
recipient_list=[user.email],
fail_silently=False,
)
print(f"Email-уведомление отправлено на {user.email}: {message}")
except Exception as e:
print(f"Ошибка отправки Email-уведомления пользователю {user.email}: {e}")
async def schedule_notifications():
"""Планировщик уведомлений."""
print("Запуск планировщика уведомлений...")
now = datetime.now().strftime("%H:%M")
users = await sync_to_async(list)(User.objects.all())
for user in users:
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
if settings.notification_time == now:
message = "Это ваше уведомление от системы."
if settings.telegram_enabled:
await send_telegram_notification(user, message)
if settings.email_enabled:
send_email_notification(user, message)
async def show_current_settings(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отображение текущих настроек уведомлений."""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
user = await get_user_from_chat_id(user_id)
if not user:
await query.edit_message_text("Вы не зарегистрированы.")
return
settings, _ = await sync_to_async(NotificationSettings.objects.get_or_create)(user=user)
telegram_status = "✅ Включены" if settings.telegram_enabled else "❌ Выключены"
email_status = "✅ Включены" if settings.email_enabled else "❌ Выключены"
notification_time = settings.notification_time or "Не установлено"
message = (
f"📋 Ваши настройки уведомлений:\n"
f"🔔 Telegram: {telegram_status}\n"
f"📧 Email: {email_status}\n"
f"🕒 Время: {notification_time}"
)
await query.edit_message_text(message)
async def check_pms_integration(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
hotel = await sync_to_async(Hotel.objects.get)(id=hotel_id)
pms_settings = hotel.pms.parser_settings # Настройки из связанной PMSConfiguration
try:
# Выполняем запрос к PMS
response = requests.post(
url=pms_settings["url"],
headers={
"Authorization": f"Bearer {hotel.api.api_key}",
"Content-Type": "application/json",
},
json={
"from": "2024-01-01T00:00:00Z",
"until": "2024-01-10T00:00:00Z",
"pagination": {"from": 0, "count": 10},
},
)
# Проверяем результат
if response.status_code == 200:
await sync_to_async(PMSIntegrationLog.objects.create)(
hotel=hotel,
status="success",
message="Интеграция успешно проверена.",
)
await query.edit_message_text(f"Интеграция с PMS для отеля '{hotel.name}' успешна.")
else:
await sync_to_async(PMSIntegrationLog.objects.create)(
hotel=hotel,
status="error",
message=f"Ошибка: {response.status_code}",
)
await query.edit_message_text(f"Ошибка интеграции с PMS для отеля '{hotel.name}': {response.status_code}")
except Exception as e:
await sync_to_async(PMSIntegrationLog.objects.create)(
hotel=hotel,
status="error",
message=str(e),
)
await query.edit_message_text(f"Произошла ошибка: {str(e)}")
async def get_hotels_for_user(user_id):
"""Получение отелей, связанных с пользователем."""
user = await sync_to_async(User.objects.filter(chat_id=user_id).first)()
if not user:
return []
return await sync_to_async(list)(
Hotel.objects.filter(userhotel__user=user).distinct()
)
async def get_reservations(hotel_id, start_date=None, end_date=None):
"""Получение статистики бронирований по отелю с гостями."""
query = Reservation.objects.filter(hotel_id=hotel_id)
if start_date:
query = query.filter(check_in__gte=start_date)
if end_date:
query = query.filter(check_out__lte=end_date)
reservations = await sync_to_async(list)(query.prefetch_related('guests'))
return reservations
def generate_pdf_report(hotel_name, reservations, start_date, end_date):
"""Генерация PDF отчета."""
pdf = FPDF()
pdf.add_page()
# Укажите путь к шрифту
font_path = os.path.join("bot", "fonts", "OpenSans-Regular.ttf")
if not os.path.exists(font_path):
raise FileNotFoundError(f"Шрифт {font_path} не найден. Убедитесь, что он находится в указанной папке.")
pdf.add_font("OpenSans", "", font_path, uni=True)
pdf.set_font("OpenSans", size=12)
# Заголовки
title = f"Отчет по заселениям: {hotel_name}"
period = (
f"Период: {start_date.strftime('%d.%m.%Y')} - {end_date.strftime('%d.%m.%Y')}"
if start_date and end_date
else "Период: Все время"
)
pdf.cell(0, 10, txt=title, ln=True, align="C")
pdf.cell(0, 10, txt=period, ln=True, align="C")
pdf.ln(10)
# Ширины колонок
page_width = pdf.w - 20 # Учитываем отступы
col_widths = [page_width * 0.2, # Дата заезда
page_width * 0.2, # Дата выезда
page_width * 0.15, # Номер
page_width * 0.25, # Тип комнаты
page_width * 0.1, # Цена
page_width * 0.1] # Скидка
# Заголовки таблицы
pdf.set_font("OpenSans", size=10)
headers = ["Дата заезда", "Дата выезда", "Номер", "Тип комнаты", "Цена", "Скидка"]
for width, header in zip(col_widths, headers):
pdf.cell(width, 10, header, border=1, align="C")
pdf.ln()
# Инициализация сумм
total_price = 0
total_discount = 0
# Добавление данных
for res in reservations:
guests = ", ".join([guest.name for guest in res.guests.all()])
price = res.price or 0
discount = res.discount or 0
total_price += price
total_discount += discount
pdf.cell(col_widths[0], 10, res.check_in.strftime('%d.%m.%Y'), border=1)
pdf.cell(col_widths[1], 10, res.check_out.strftime('%d.%m.%Y'), border=1)
pdf.cell(col_widths[2], 10, res.room_number, border=1)
pdf.cell(col_widths[3], 10, res.room_type, border=1)
pdf.cell(col_widths[4], 10, f"{price:.2f}", border=1, align="R")
pdf.cell(col_widths[5], 10, f"{discount:.2f}", border=1, align="R")
pdf.ln()
pdf.set_font("OpenSans", size=8)
# pdf.multi_cell(page_width, 5, f"Гости: {guests}", border=0)
pdf.set_font("OpenSans", size=10)
# Итоги
pdf.ln(5)
pdf.set_font("OpenSans", size=12)
pdf.cell(0, 10, "Итоги:", ln=True)
pdf.cell(0, 10, f"Общая сумма цен: {total_price:.2f}", ln=True)
pdf.cell(0, 10, f"Общая сумма скидок: {total_discount:.2f}", ln=True)
# Сохранение файла
file_path = os.path.join("reports", f"{hotel_name}_report.pdf")
os.makedirs(os.path.dirname(file_path), exist_ok=True)
pdf.output(file_path)
return file_path
# --- Обработчики ---
async def statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Вывод списка отелей для статистики."""
query = update.callback_query
user_id = query.from_user.id
await query.answer()
hotels = await get_hotels_for_user(user_id)
if not hotels:
await query.edit_message_text("У вас нет доступных отелей для статистики.")
return
keyboard = [[InlineKeyboardButton(hotel.name, callback_data=f"stats_hotel_{hotel.id}")] for hotel in hotels]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите отель:", reply_markup=reply_markup)
async def stats_select_period(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Выбор периода времени для статистики."""
query = update.callback_query
await query.answer()
hotel_id = int(query.data.split("_")[2])
context.user_data["selected_hotel"] = hotel_id
keyboard = [
[InlineKeyboardButton("Неделя", callback_data="stats_period_week")],
[InlineKeyboardButton("Месяц", callback_data="stats_period_month")],
[InlineKeyboardButton("Все время", callback_data="stats_period_all")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("Выберите период времени:", reply_markup=reply_markup)
async def generate_statistics(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Генерация и отправка статистики."""
query = update.callback_query
await query.answer()
hotel_id = context.user_data["selected_hotel"]
period = query.data.split("_")[2]
now = datetime.now()
if period == "week":
start_date = now - timedelta(days=7)
end_date = now
elif period == "month":
start_date = now - timedelta(days=30)
end_date = now
else:
start_date = None
end_date = None
reservations = await get_reservations(hotel_id, start_date, end_date)
hotel = await sync_to_async(Hotel.objects.get)(id=hotel_id)
file_path = generate_pdf_report(hotel.name, reservations, start_date, end_date)
# Отправляем PDF файл пользователю
with open(file_path, "rb") as file:
await query.message.reply_document(document=file, filename=os.path.basename(file_path))