import logging import pymysql from datetime import datetime from urllib.parse import unquote, parse_qs import pytz from django.utils import timezone from django.db import transaction from django.conf import settings from html import unescape import chardet import html from .models import SyncLog, ExternalDBSettings, UserActivityLog, RoomDiscrepancy, ImportedHotel from hotels.models import Reservation, Hotel class DataSyncManager: """ Класс для управления загрузкой, записью и сверкой данных. """ def __init__(self, db_settings_id, use_local_db=False): self.db_settings_id = db_settings_id self.use_local_db = use_local_db # Если True, используем локальную БД self.db_settings = None self.connection = None self.table_name = None # Настройка логирования self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) handler = logging.FileHandler('data_sync.log') handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) def connect_to_db(self): """ Устанавливает соединение с БД в зависимости от настройки. """ try: if self.use_local_db: # Подключаемся к локальной базе данных self.db_settings = settings.LocalDataBase.objects.first() # Получаем настройки первой базы self.table_name = self.db_settings.database self.connection = pymysql.connect( host=self.db_settings.host, port=self.db_settings.port, user=self.db_settings.user, password=self.db_settings.password, database=self.db_settings.database, charset='utf8mb4', use_unicode=True, cursorclass=pymysql.cursors.DictCursor, ) else: # Подключаемся к внешней базе данных self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id) self.table_name = self.db_settings.table_name self.connection = pymysql.connect( host=self.db_settings.host, port=self.db_settings.port, user=self.db_settings.user, password=self.db_settings.password, database=self.db_settings.database, charset='utf8mb4', use_unicode=True, cursorclass=pymysql.cursors.DictCursor, ) except ExternalDBSettings.DoesNotExist: raise ValueError("Настройки подключения не найдены.") except pymysql.MySQLError as e: raise ConnectionError(f"Ошибка подключения к базе данных: {e}") def get_last_saved_record(self): """ Получает последнюю запись из таблицы UserActivityLog. """ last_record = UserActivityLog.objects.order_by('-id').first() if last_record: self.logger.info(f"Последняя запись в UserActivityLog: ID={last_record.id}") return last_record.id self.logger.info("Таблица UserActivityLog пуста.") return None def fetch_new_data(self, last_id=0, limit=100): """ Загружает новые записи из указанной таблицы, которые идут после last_id. """ if not self.connection: self.connect_to_db() cursor = self.connection.cursor(pymysql.cursors.DictCursor) # Используем DictCursor для получения словарей try: # Формируем SQL-запрос if last_id: query = f""" SELECT * FROM `{self.table_name}` WHERE id > {last_id} AND url_parameters IS NOT NULL AND url_parameters != '' ORDER BY id ASC LIMIT {limit}; """ else: query = f""" SELECT * FROM `{self.table_name}` WHERE url_parameters IS NOT NULL AND url_parameters != '' ORDER BY id ASC LIMIT {limit}; """ self.logger.info(f"Выполняется запрос: {query}") cursor.execute(query) # Получаем результаты rows = cursor.fetchall() if not rows: self.logger.info("Нет данных для загрузки.") return {"columns": [], "rows": []} # Получаем названия колонок columns = rows[0].keys() if rows else [] return {"columns": list(columns), "rows": rows} except pymysql.MySQLError as e: self.logger.error(f"Ошибка выполнения запроса: {e}") return {"columns": [], "rows": []} finally: cursor.close() def parse_datetime(self, dt_str, hotel_timezone=None): """ Преобразует строку формата 'YYYY-MM-DD HH:MM:SS' или 'YYYY-MM-DDTHH:MM:SS' в aware datetime. Преобразует время в часовой пояс отеля и корректирует на часовой пояс сервера. """ if dt_str is None: return None if isinstance(dt_str, datetime): if timezone.is_naive(dt_str): return timezone.make_aware(dt_str, timezone.get_default_timezone()) return dt_str for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: # Преобразуем строку в naive datetime naive_dt = datetime.strptime(dt_str, fmt) # Если передан часовой пояс отеля if hotel_timezone: # Переводим время из часового пояса отеля в UTC (если данные из PMS уже UTC+X) tz = pytz.timezone(hotel_timezone) # Часовой пояс отеля aware_dt = tz.localize(naive_dt) # Локализуем в часовой пояс отеля # Теперь приводим время к серверному часовому поясу (например, Московское время) server_tz = timezone.get_default_timezone() # Например, Moscow (UTC+3) return aware_dt.astimezone(server_tz) # Переводим в серверное время # Если часовой пояс отеля не передан, используем серверный часовой пояс по умолчанию return timezone.make_aware(naive_dt, timezone.get_default_timezone()) except ValueError: continue return None def decode_html_entities(self, text): """ Декодирует URL и HTML-сущности в строке. Пытается автоматически декодировать строку в правильную кодировку. """ if text and isinstance(text, str): text = unquote(text) # Декодируем URL text = html.unescape(text) # Расшифровываем HTML сущности # Попробуем определить кодировку и привести строку к utf-8 try: detected = chardet.detect(text.encode()) encoding = detected['encoding'] if encoding and encoding != 'utf-8': text = text.encode(encoding).decode('utf-8', errors='ignore') except Exception as e: self.logger.error(f"Ошибка при обработке кодировки: {e}") return text return text def process_url_parameters(self, url_params): """ Парсит url_parameters и возвращает hotel_name и hotel_id. """ # Проверка корректности данных if not url_params or not isinstance(url_params, str): self.logger.error(f"Ошибка: некорректные url_parameters: {url_params}") return {} # Декодируем параметры URL decoded = unquote(url_params) qs = parse_qs(decoded) # Извлекаем hotel_name и hotel_id_term hotel_name = qs.get('utm_content', [None])[0] # Умолчание: None, если параметр не найден hotel_id_term = qs.get('utm_term', [None])[0] # Умолчание: None, если параметр не найден # Формируем hotel_id hotel_id = f"{hotel_name}_{hotel_id_term}" if hotel_name and hotel_id_term else None # Логирование для отладки self.logger.debug(f"Извлечено из url_parameters: hotel_name={hotel_name}, hotel_id_term={hotel_id_term}, hotel_id={hotel_id}") # Возврат результата return { 'hotel_name': hotel_name, 'hotel_id': hotel_id } def check_and_store_imported_hotel(self, hotel_name, hotel_id): """ Проверяет, есть ли отель с данным ID в таблице ImportedHotel. Если отеля с таким external_id нет, добавляет новый в таблицы ImportedHotel и Hotel. """ if not hotel_id or not hotel_name: return None # Генерация external_id в формате 'hotel_name_hotel_id' external_id = f"{hotel_name}_{hotel_id}" # Проверяем, существует ли запись с таким external_id в ImportedHotel existing_hotel = ImportedHotel.objects.filter(external_id=external_id).first() if existing_hotel: self.logger.info(f"Отель с external_id {external_id} уже существует в ImportedHotel.") else: try: # Создаем новую запись в ImportedHotel with transaction.atomic(): imported_hotel = ImportedHotel.objects.create( external_id=external_id, name=hotel_name, display_name=hotel_name, imported=True # Отмечаем, что отель импортирован ) self.logger.info(f"Отель с external_id {external_id} добавлен в ImportedHotel.") # Создаем новый отель в основной таблице Hotel hotel = Hotel.objects.create( hotel_id=external_id, name=hotel_name, phone=None, email=None, address=None, city=None, timezone="UTC", description="Автоматически импортированный отель", ) self.logger.info(f"Отель с hotel_id {external_id} добавлен в Hotel с флагом is_imported=True.") return hotel except Exception as e: self.logger.error(f"Ошибка при добавлении отеля {hotel_name} с external_id {external_id}: {e}") return None return existing_hotel def write_to_db(self, data): """ Записывает данные в UserActivityLog и при необходимости в ImportedHotel. Записывает лог синхронизации в SyncLog. """ processed_records = 0 received_records = len(data["rows"]) print(f"Received records: {received_records}") for row in data["rows"]: print(f'\n------\n row: {row}\n------\n') # record = dict(zip(data["columns"], row)) # Преобразуем строку в словарь # Получаем url_parameters из записи url_parameters = self.decode_html_entities(row.get("url_parameters", "")) print(f'\n------\n url_parameters: {url_parameters}\n------\n') # Проверка на пустое значение if not url_parameters: print(f"Error: url_parameters is empty in record {row}") continue # Пропускаем запись, если url_parameters отсутствует # Пытаемся извлечь информацию о отеле из url_parameters hotel_name = None hotel_id = None if url_parameters: hotel_info = self.process_url_parameters(url_parameters) hotel_id = hotel_info.get('hotel_id') if not hotel_id: print(f"Error: hotel_id is empty in record {row}") continue # Пропускаем запись, если hotel_id отсутствует # Проверяем, существует ли отель с таким hotel_id hotel = self.check_and_store_imported_hotel(hotel_name=hotel_id, hotel_id=hotel_id) if not hotel: print(f"Error: Could not find or create hotel for hotel_id {hotel_id}") continue # Пропускаем запись, если отель не найден или не создан # Преобразуем дату created = self.parse_datetime(row.get("created")) date_time = self.parse_datetime(row.get("date_time")) # Декодируем все строки, которые могут содержать HTML-сущности referred = self.decode_html_entities(row.get("referred", "")) agent = self.decode_html_entities(row.get("agent", "")) platform = self.decode_html_entities(row.get("platform", "")) version = self.decode_html_entities(row.get("version", "")) model = self.decode_html_entities(row.get("model", "")) device = self.decode_html_entities(row.get("device", "")) UAString = self.decode_html_entities(row.get("UAString", "")) location = self.decode_html_entities(row.get("location", "")) page_title = self.decode_html_entities(row.get("page_title", "")) page_url = self.decode_html_entities(row.get("page_url", "")) # Запись в UserActivityLog UserActivityLog.objects.update_or_create( external_id=row.get("id", None), defaults={ "user_id": row.get("user_id"), "ip": row.get("ip"), "created": created, "timestamp": row.get("timestamp"), "date_time": date_time, "referred": referred, "agent": agent, "platform": platform, "version": version, "model": model, "device": device, "UAString": UAString, "location": location, "page_id": row.get("page_id"), "url_parameters": url_parameters, "page_title": page_title, "type": row.get("type"), "last_counter": row.get("last_counter"), "hits": row.get("hits"), "honeypot": row.get("honeypot"), "reply": row.get("reply"), "page_url": page_url, } ) processed_records += 1 # Логируем обработанные записи print(f"Processed records: {processed_records}") def reconcile_data(self): """ Сверяет данные таблицы user_activity_log с таблицей hotels.reservations и записывает несоответствия в таблицу RoomDiscrepancy. """ discrepancies = [] reservations = Reservation.objects.values("hotel_id", "room_number", "check_in", "check_out") for log in UserActivityLog.objects.all(): for reservation in reservations: if ( log.page_id != reservation["room_number"] or log.created.date() < reservation["check_in"] or log.created.date() > reservation["check_out"] ): discrepancies.append(RoomDiscrepancy( hotel_id=reservation["hotel_id"], room_number=log.page_id, booking_id=f"Log-{log.id}", check_in_date_expected=reservation["check_in"], check_in_date_actual=log.created.date() if log.created else None, discrepancy_type="Mismatch", )) RoomDiscrepancy.objects.bulk_create(discrepancies) def sync(self): try: last_id = self.get_last_saved_record() self.logger.info(f"Синхронизация начата. Последний сохранённый ID: {last_id}") # Загружаем новые данные data = self.fetch_new_data(last_id=last_id) print(f'\n------\n data: {data}\n------\n') if not data["rows"]: self.logger.info("Нет новых данных для синхронизации.") return # Логирование типов данных self.logger.debug(f"Тип первой строки: {type(data['rows'][0])}") first_row_id = data["rows"][0]["id"] if last_id is not None and first_row_id <= last_id: self.logger.info(f"Нет новых записей для синхронизации. Последний ID: {last_id}, первый ID из внешней таблицы: {first_row_id}.") return processed_records = self.write_to_db(data) self.logger.info(f"Синхронизация завершена. Обработано записей: {processed_records}") except Exception as e: self.logger.error(f"Ошибка синхронизации данных: {e}") raise RuntimeError(f"Ошибка синхронизации данных: {e}") finally: if self.connection: self.connection.close() def scheduled_sync(): """ Плановая задача для синхронизации данных. """ db_settings_list = ExternalDBSettings.objects.filter(is_active=True) for db_settings in db_settings_list: sync_manager = DataSyncManager(db_settings.id) sync_manager.sync()