from datetime import datetime
from html import unescape
from urllib.parse import unquote, parse_qs

import pymysql
from django.db import transaction
from django.utils import timezone

from .models import ExternalDBSettings, UserActivityLog, RoomDiscrepancy, ImportedHotel
from hotels.models import Reservation, Hotel


class DataSyncManager:
    """Load rows from an external MySQL table, store them and reconcile them.

    The manager is bound to one ``ExternalDBSettings`` record (by primary key).
    ``sync()`` is the usual entry point: it connects, fetches rows, writes them
    to ``UserActivityLog`` (and ``ImportedHotel`` where needed) and finally
    records mismatches against ``Reservation`` in ``RoomDiscrepancy``.
    """

    # Columns of the external table whose values are HTML-unescaped before
    # being stored unchanged under the same key in UserActivityLog.
    _HTML_TEXT_FIELDS = (
        "referred",
        "agent",
        "platform",
        "version",
        "model",
        "device",
        "UAString",
        "location",
        "page_title",
        "page_url",
    )

    def __init__(self, db_settings_id):
        """Remember which settings record to use; nothing is connected yet.

        Args:
            db_settings_id: primary key of the ``ExternalDBSettings`` row.
        """
        self.db_settings_id = db_settings_id
        self.db_settings = None
        self.connection = None
        self.table_name = None

    def connect_to_db(self):
        """Open a connection to the external database and read the table name.

        Raises:
            ValueError: no ``ExternalDBSettings`` row with the stored id.
            ConnectionError: PyMySQL failed to connect.
        """
        try:
            self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
            self.table_name = self.db_settings.table_name
            self.connection = pymysql.connect(
                host=self.db_settings.host,
                port=self.db_settings.port,
                user=self.db_settings.user,
                password=self.db_settings.password,
                database=self.db_settings.database,
                charset='utf8mb4',
                use_unicode=True
            )
        except ExternalDBSettings.DoesNotExist as e:
            raise ValueError("Настройки подключения не найдены.") from e
        except pymysql.MySQLError as e:
            raise ConnectionError(f"Ошибка подключения к базе данных: {e}") from e

    def fetch_data(self, limit=100):
        """Fetch up to ``limit`` rows from the configured table.

        Connects first if there is no open connection.

        Args:
            limit: maximum number of rows; must be convertible to ``int``.

        Returns:
            dict with ``"columns"`` (list of column names) and ``"rows"``
            (the rows as returned by the cursor's ``fetchall()``).

        Raises:
            RuntimeError: ``limit`` is not an integer or the query failed.
        """
        if not self.connection:
            self.connect_to_db()

        # LIMIT is interpolated into the statement, so force it to an int
        # instead of trusting whatever the caller passed in.
        try:
            row_limit = int(limit)
        except (TypeError, ValueError) as e:
            raise RuntimeError(f"Ошибка выполнения запроса: {e}") from e

        # Identifiers cannot be bound as parameters; doubling backticks keeps
        # the table name inside its quoted identifier.
        table = str(self.table_name).replace("`", "``")

        with self.connection.cursor() as cursor:
            try:
                cursor.execute(f"SELECT * FROM `{table}` LIMIT {row_limit};")
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
            except pymysql.MySQLError as e:
                raise RuntimeError(f"Ошибка выполнения запроса: {e}") from e
        return {"columns": columns, "rows": rows}

    def parse_datetime(self, dt_str):
        """Convert a timestamp into an aware ``datetime``.

        Accepts ``None`` (returned as is), a ``datetime`` (made aware in the
        default time zone if naive) or a string formatted as
        ``'YYYY-MM-DD HH:MM:SS'`` or ``'YYYY-MM-DDTHH:MM:SS'``.

        Returns:
            An aware ``datetime``, or ``None`` when the input is ``None`` or a
            string that matches neither format.
        """
        if dt_str is None:
            return None

        default_tz = timezone.get_default_timezone()
        if isinstance(dt_str, datetime):
            if timezone.is_naive(dt_str):
                return timezone.make_aware(dt_str, default_tz)
            return dt_str

        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
            try:
                naive_dt = datetime.strptime(dt_str, fmt)
            except ValueError:
                continue
            return timezone.make_aware(naive_dt, default_tz)
        return None

    def decode_html_entities(self, text):
        """Return ``text`` with HTML entities decoded.

        Empty values and non-string values are returned unchanged.
        """
        if text and isinstance(text, str):
            return unescape(text)
        return text

    def process_url_parameters(self, url_params):
        """Extract the hotel name and id from a query string.

        ``utm_content`` is read as the hotel name and ``utm_term`` as the
        hotel id.

        Args:
            url_params: query string such as ``"utm_content=A&utm_term=1"``.

        Returns:
            ``{}`` for empty input, otherwise a dict with ``'hotel_name'`` and
            ``'hotel_id'`` (each ``None`` when the parameter is absent).
        """
        if not url_params:
            return {}

        # parse_qs percent-decodes values itself. Decoding beforehand would
        # turn an encoded '&', '=' or '+' inside a value into a separator
        # (or a space) and corrupt the result.
        qs = parse_qs(url_params)
        return {
            'hotel_name': qs.get('utm_content', [None])[0],
            'hotel_id': qs.get('utm_term', [None])[0],
        }

    def check_and_store_imported_hotel(self, hotel_name, hotel_id):
        """Store the hotel in ``ImportedHotel`` if the main DB does not have it.

        A hotel counts as missing when ``hotel_id`` is not a decimal number or
        no ``Hotel`` with that id exists. Nothing happens when either argument
        is empty.
        """
        if not hotel_id or not hotel_name:
            return

        hotel_id = str(hotel_id)
        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # parse, so e.g. a superscript digit cannot raise ValueError here.
        if hotel_id.isdecimal():
            hotel_exists = Hotel.objects.filter(id=int(hotel_id)).exists()
        else:
            # Not a number: treat as absent from the main database.
            hotel_exists = False

        if not hotel_exists:
            ImportedHotel.objects.update_or_create(
                external_id=hotel_id,
                defaults={
                    'name': hotel_name
                }
            )

    @transaction.atomic
    def write_to_db(self, data):
        """Upsert fetched rows into ``UserActivityLog``.

        Each row is keyed by its external ``id`` (stored as a string). Text
        columns are HTML-unescaped, ``created``/``date_time`` are parsed into
        aware datetimes, and hotels referenced in ``url_parameters`` are
        passed to ``check_and_store_imported_hotel``. The whole batch runs in
        one transaction.

        Args:
            data: dict with ``"columns"`` and ``"rows"`` as produced by
                ``fetch_data``.
        """
        for row in data["rows"]:
            record = dict(zip(data["columns"], row))

            external_id = record.get("id", None)
            if external_id is not None:
                external_id = str(external_id)

            text_values = {
                field: self.decode_html_entities(record.get(field, ""))
                for field in self._HTML_TEXT_FIELDS
            }
            url_parameters = self.decode_html_entities(record.get("url_parameters", ""))

            hotel_info = self.process_url_parameters(url_parameters)
            if hotel_info.get('hotel_name') and hotel_info.get('hotel_id'):
                self.check_and_store_imported_hotel(
                    hotel_name=hotel_info['hotel_name'],
                    hotel_id=hotel_info['hotel_id']
                )

            # A NULL column arrives as None, which unquote() cannot handle.
            if url_parameters:
                url_parameters = unquote(url_parameters)

            UserActivityLog.objects.update_or_create(
                external_id=external_id,
                defaults={
                    "user_id": record.get("user_id"),
                    "ip": record.get("ip"),
                    "created": self.parse_datetime(record.get("created")),
                    "timestamp": record.get("timestamp"),
                    "date_time": self.parse_datetime(record.get("date_time")),
                    **text_values,
                    "page_id": record.get("page_id"),
                    "url_parameters": url_parameters,
                    "type": record.get("type"),
                    "last_counter": record.get("last_counter"),
                    "hits": record.get("hits"),
                    "honeypot": record.get("honeypot"),
                    "reply": record.get("reply"),
                }
            )

    def reconcile_data(self):
        """Compare ``UserActivityLog`` with reservations and store mismatches.

        Every log entry is checked against every reservation. A
        ``RoomDiscrepancy`` is created for each pair where the log's
        ``page_id`` differs from the reservation's ``room_number`` or the
        log's creation date lies outside ``check_in``..``check_out``. A log
        without a ``created`` value is always recorded as a mismatch, with
        ``check_in_date_actual`` left empty.
        """
        reservations = list(
            Reservation.objects.values("hotel_id", "room_number", "check_in", "check_out")
        )

        discrepancies = []
        for log in UserActivityLog.objects.all():
            # NOTE(review): .date() on an aware datetime uses the value's own
            # time zone, not necessarily the local one - confirm this is intended.
            log_date = log.created.date() if log.created else None
            for reservation in reservations:
                matches = (
                    log.page_id == reservation["room_number"]
                    and log_date is not None
                    and reservation["check_in"] <= log_date <= reservation["check_out"]
                )
                if matches:
                    continue
                discrepancies.append(RoomDiscrepancy(
                    hotel_id=reservation["hotel_id"],
                    room_number=log.page_id,
                    booking_id=f"Log-{log.id}",
                    check_in_date_expected=reservation["check_in"],
                    check_in_date_actual=log_date,
                    discrepancy_type="Mismatch",
                ))

        RoomDiscrepancy.objects.bulk_create(discrepancies)

    def sync(self):
        """Run the full cycle: connect, fetch, write and reconcile.

        The external connection is always closed afterwards.

        Raises:
            RuntimeError: any step failed; the original exception is chained
                as ``__cause__``.
        """
        try:
            self.connect_to_db()
            data = self.fetch_data()
            self.write_to_db(data)
            self.reconcile_data()
        except Exception as e:
            raise RuntimeError(f"Ошибка синхронизации данных: {e}") from e
        finally:
            if self.connection:
                self.connection.close()
                # Drop the closed handle so a later fetch_data() reconnects
                # instead of reusing a dead connection.
                self.connection = None


def scheduled_sync():
    """Scheduled task: run ``DataSyncManager.sync()`` for every active settings row.

    An exception from one sync propagates and stops the remaining ones.
    """
    db_settings_list = ExternalDBSettings.objects.filter(is_active=True)
    for db_settings in db_settings_list:
        sync_manager = DataSyncManager(db_settings.id)
        sync_manager.sync()