import html
import logging
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from datetime import datetime
from urllib.parse import unquote, parse_qs

import pymysql
from decouple import config
from django.db.models import F
from django.utils import timezone

from hotels.models import Room, Hotel
from touchh.utils.log import CustomLogger
from .models import UserActivityLog, ExternalDBSettings, SyncLog


class DatabaseConnector:
    """Thin wrapper around a PyMySQL connection configured from ExternalDBSettings."""

    def __init__(self, db_settings_id):
        """Load the connection settings for ``db_settings_id``; does not connect yet.

        :raises ValueError: if no ExternalDBSettings row has this ID.
        """
        self.db_settings_id = db_settings_id
        self.logger = CustomLogger(name="DatabaseConnector", log_level="DEBUG").get_logger()
        self.connection = None
        self.db_settings = self.get_db_settings()

    def get_db_settings(self):
        """Return the external DB settings as a plain dict.

        :return: dict with keys host, port, user, password, database, table_name.
        :raises ValueError: if the settings row does not exist.
        """
        try:
            settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
            self.logger.info(f"Retrieved DB settings: {settings}")
            return {
                "host": settings.host,
                "port": settings.port,
                "user": settings.user,
                "password": settings.password,
                "database": settings.database,
                "table_name": settings.table_name,
            }
        except ExternalDBSettings.DoesNotExist as exc:
            self.logger.error(f"Settings with ID {self.db_settings_id} not found.")
            raise ValueError("Invalid db_settings_id") from exc
        except Exception as e:
            self.logger.error(f"Error retrieving settings: {e}")
            raise

    def connect(self):
        """Open the PyMySQL connection (rows are returned as dicts).

        :raises ConnectionError: on any failure; the original error is chained.
        """
        try:
            self.connection = pymysql.connect(
                host=self.db_settings["host"],
                port=self.db_settings["port"],
                user=self.db_settings["user"],
                password=self.db_settings["password"],
                database=self.db_settings["database"],
                charset="utf8mb4",
                cursorclass=pymysql.cursors.DictCursor,
            )
            self.logger.info("Database connection established successfully.")
        except pymysql.err.OperationalError as e:
            self.logger.error(f"Operational error during DB connection: {e}")
            raise ConnectionError(f"Operational error: {e}") from e
        except Exception as e:
            self.logger.error(f"Unexpected database connection error: {e}")
            raise ConnectionError(e) from e

    def execute_query(self, query, params=None):
        """Run ``query`` and return all rows.

        :param query: SQL text; use ``%s`` placeholders when ``params`` is given.
        :param params: optional sequence/dict of values bound by the driver.
        :return: fetched rows, or ``[]`` if execution failed (the error is logged).
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()
        except Exception as e:
            # Best-effort by design: callers treat an empty result as "nothing to do".
            self.logger.error(f"Query execution error: {e}")
            return []

    def close(self):
        """Close the connection if one is open; safe to call more than once."""
        if self.connection:
            self.connection.close()
            # Drop the handle so a repeated close() is a no-op.
            self.connection = None
            self.logger.info("Database connection closed.")


class DataProcessor:
    """Stateless helpers for decoding and parsing raw column values."""

    def __init__(self, logger):
        self.logger = logger

    def decode_html_entities(self, text):
        """URL-unquote ``text`` and then unescape HTML entities.

        Falsy input is returned unchanged after logging a warning.
        """
        if not text:
            self.logger.warning("Empty text received for decoding HTML entities.")
            return text
        return html.unescape(unquote(text))

    def parse_datetime(self, dt_str):
        """Convert a datetime or a ``YYYY-MM-DD HH:MM:SS`` string to an aware datetime.

        :return: aware datetime, or ``None`` for empty input or on a parse error.
        """
        try:
            if isinstance(dt_str, datetime):
                if timezone.is_naive(dt_str):
                    return timezone.make_aware(dt_str)
                return dt_str
            if dt_str:
                return timezone.make_aware(datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S"))
        except Exception as e:
            self.logger.error(f"Datetime parsing error: {e}")
        return None

    def parse_url_parameters(self, url_parameters):
        """
        Parse a URL-parameter string into a dict.

        :param url_parameters: String with URL parameters.
        :return: Dict mapping each key to its first value; ``{}`` on empty input or error.
        """
        try:
            if not url_parameters:
                return {}
            decoded_params = unquote(url_parameters)
            parsed_params = parse_qs(decoded_params)
            return {key: value[0] for key, value in parsed_params.items()}
        except Exception as e:
            self.logger.error(f"Error parsing URL parameters: {e}")
            return {}


class HotelRoomManager:
    """Looks up Hotel and Room rows, creating them when they do not exist."""

    def __init__(self, logger):
        self.logger = logger

    def get_or_create_hotel(self, hotel_id, page_title):
        """Return the Hotel with ``hotel_id``, creating it if needed.

        :param hotel_id: external hotel identifier; falsy values are rejected.
        :param page_title: HTML-escaped title used as the name of a new hotel;
            may be ``None`` or empty, in which case a generated name is used.
        :return: the Hotel instance, or ``None`` when ``hotel_id`` is missing.
        """
        if not hotel_id:
            self.logger.warning("Hotel creation skipped: missing hotel_id.")
            return None

        # html.unescape() raises TypeError on None, so guard before decoding.
        decoded_title = html.unescape(page_title) if page_title else ""
        hotel, created = Hotel.objects.get_or_create(
            hotel_id=hotel_id,
            defaults={
                "name": decoded_title or f"Отель {hotel_id}",
                "description": "Автоматически созданный отель",
            },
        )
        if created:
            self.logger.info(f"Hotel '{hotel.name}' created with hotel_id: {hotel_id}")
        else:
            self.logger.info(f"Hotel '{hotel.name}' already exists with hotel_id: {hotel_id}")
        return hotel

    def get_or_create_room(self, hotel, room_number):
        """Return the Room ``room_number`` of ``hotel``, creating it if needed.

        :return: the Room instance, or ``None`` if an argument is missing or
            the lookup/creation failed (the error is logged).
        """
        if not hotel:
            self.logger.warning("Room creation skipped: missing hotel.")
            return None
        if not room_number:
            self.logger.warning(f"Room creation skipped: missing room_number for hotel {hotel.name}.")
            return None

        try:
            # Check whether the room already exists.
            room = Room.objects.filter(hotel=hotel, number=room_number).first()
            if room:
                self.logger.info(f"Room '{room_number}' already exists in hotel '{hotel.name}'.")
                return room

            # Create the room when it was not found.
            room = Room.objects.create(
                hotel=hotel,
                number=room_number,
                external_id=f"{hotel.hotel_id}_{room_number}".lower(),
                description="Automatically added room",
            )
            self.logger.info(f"Room '{room.number}' created in hotel '{hotel.name}'.")
            return room
        except Exception as e:
            self.logger.error(f"Error creating room '{room_number}' in hotel '{hotel.name}': {e}")
            return None


class DataSyncManager:
    """Pulls new activity rows from an external MySQL table into UserActivityLog."""

    def __init__(self, db_settings_id):
        self.logger = CustomLogger(name="DataSyncManager", log_level="DEBUG").get_logger()
        self.db_connector = DatabaseConnector(db_settings_id)
        self.db_settings = self.db_connector.db_settings  # keep the DB settings at hand
        self.data_processor = DataProcessor(self.logger)
        self.hotel_manager = HotelRoomManager(self.logger)

    def get_last_saved_record(self):
        """Return the highest local UserActivityLog id, or 0 when the table is empty."""
        # NOTE(review): this is the local primary key, but fetch_new_data() compares
        # it with the external table's `id`, and rows are stored under `external_id`.
        # Confirm the two id spaces are meant to coincide.
        record = UserActivityLog.objects.order_by("-id").first()
        return record.id if record else 0

    def fetch_new_data(self, last_id):
        """
        Fetch new rows from the external table for synchronization.

        :param last_id: Last processed ID; only rows with a greater id are returned.
        :return: List of rows (at most 1000), or ``[]`` on error.
        """
        # Identifiers cannot be bound as parameters; double any backtick so the
        # configured table name cannot break out of the quoted identifier.
        table_name = str(self.db_settings.get('table_name')).replace("`", "``")
        query = f"""
            SELECT * FROM `{table_name}`
            WHERE id > %s
              AND url_parameters IS NOT NULL
              AND url_parameters LIKE %s
              AND page_url IS NOT NULL
            ORDER BY id ASC
            LIMIT 1000;
        """
        # Values are bound by the driver instead of being formatted into the SQL.
        params = (last_id, "%utm_medium%")
        self.logger.info(f"Fetching new data with query: {query} (params: {params})")
        try:
            rows = self.db_connector.execute_query(query, params)
            self.logger.info(f"Fetched {len(rows)} records from the database.")
            return rows
        except Exception as e:
            self.logger.error(f"Error fetching data: {e}")
            return []

    def update_sync_log(self, hotel, recieved_records, processed_records):
        """Add the given counters to the hotel's SyncLog row, creating it if absent.

        Errors are logged and not re-raised.
        """
        try:
            log, created = SyncLog.objects.get_or_create(hotel=hotel)
            if created:
                log.recieved_records = recieved_records
                log.processed_records = processed_records
            else:
                log.recieved_records += recieved_records
                log.processed_records += processed_records
            log.save()
            self.logger.info(f"Sync log updated for hotel '{hotel.name}'.")
        except Exception as e:
            self.logger.error(f"Error updating sync log for hotel '{hotel.name}': {e}")

    def process_and_save_data(self, rows):
        """
        Process and store rows received from the external source.

        :param rows: List of row dicts fetched from the external database.
        """
        # NOTE(review): only the first row per (hotel_id, room_number) in a batch
        # is stored; later rows for the same pair are skipped as duplicates.
        seen_entries = set()

        for row in rows:
            # Read and decode the URL parameters.
            url_parameters = row.get("url_parameters")
            if not url_parameters:
                self.logger.warning(f"Skipping record with missing URL parameters: {row}")
                continue

            parsed_params = self.data_processor.parse_url_parameters(url_parameters)
            hotel_id = parsed_params.get("utm_content")  # hotel_id comes from utm_content
            room_number = parsed_params.get("utm_term")  # room_number comes from utm_term

            if not hotel_id or not room_number:
                self.logger.warning(f"Skipping record with missing data: hotel_id={hotel_id}, room_number={room_number}")
                continue

            # Duplicate check within this batch.
            if (hotel_id, room_number) in seen_entries:
                self.logger.warning(f"Duplicate record skipped: hotel_id={hotel_id}, room_number={room_number}")
                continue
            seen_entries.add((hotel_id, room_number))

            try:
                # Get or create the hotel.
                hotel = self.hotel_manager.get_or_create_hotel(hotel_id, row.get("page_title"))
                if not hotel:
                    self.logger.warning(f"Skipping record: Failed to create or retrieve hotel with ID {hotel_id}")
                    continue

                # Get or create the room.
                room = self.hotel_manager.get_or_create_room(hotel, room_number)
                if not room:
                    self.logger.warning(f"Skipping record: Failed to create or retrieve room {room_number} in hotel {hotel.name}")
                    continue

                # Create or update the user-activity record keyed by the external id.
                UserActivityLog.objects.update_or_create(
                    external_id=row.get("id"),
                    defaults={
                        "user_id": row.get("user_id") or 0,
                        "ip": row.get("ip") or "0.0.0.0",
                        "created": self.data_processor.parse_datetime(row.get("created")),
                        "timestamp": row.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "date_time": self.data_processor.parse_datetime(row.get("date_time")),
                        "url_parameters": parsed_params,
                        "page_title": self.data_processor.decode_html_entities(row.get("page_title")) or "Untitled",
                        "page_url": row.get("page_url") or "",
                        "hits": row.get("hits") or 0,
                    },
                )
                self.logger.info(f"Record ID {row.get('id')} processed successfully.")
            except Exception as e:
                # One bad row must not abort the rest of the batch.
                self.logger.error(f"Error processing record ID {row.get('id')}: {e}")

        self.logger.info(f"Data processing completed. Processed {len(seen_entries)} unique records.")

    def sync(self):
        """Run one sync pass: connect, fetch rows after the last saved id, store them.

        The external connection is always closed, even when processing fails.
        """
        self.db_connector.connect()
        try:
            last_id = self.get_last_saved_record()
            rows = self.fetch_new_data(last_id)
            self.process_and_save_data(rows)
            self.logger.info("Sync completed.")
        finally:
            self.db_connector.close()


def scheduled_sync():
    """Sync every active ExternalDBSettings entry, up to five at a time."""
    logger = CustomLogger(name="DatabaseSyncScheduler", log_level="ERROR").get_logger()
    logger.info("Starting scheduled sync.")

    # Evaluate the queryset once so the emptiness check, the logged count and
    # the submitted tasks all refer to the same set of rows.
    active_db_settings = list(ExternalDBSettings.objects.filter(is_active=True))
    if not active_db_settings:
        logger.warning("No active database connections found.")
        return

    logger.info(f"Found {len(active_db_settings)} active database connections.")

    def sync_task(db_settings):
        # Failures are logged per connection so one bad source does not stop the others.
        try:
            logger.info(f"Syncing connection: {db_settings.name} (ID={db_settings.id})")
            sync_manager = DataSyncManager(db_settings.id)
            sync_manager.sync()
            logger.info(f"Sync completed for connection: {db_settings}")
        except Exception as e:
            logger.error(f"Error syncing connection {db_settings}: {e}")

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(sync_task, db_settings) for db_settings in active_db_settings]
        for future in futures:
            try:
                future.result(timeout=300)
            except TimeoutError:
                # The timeout only stops waiting here; leaving the `with` block
                # still waits for the worker to finish.
                logger.error("Sync task timed out.")

    logger.info("Scheduled sync completed.")