326 lines
13 KiB
Python
326 lines
13 KiB
Python
|
||
import logging
|
||
import pymysql
|
||
from datetime import datetime
|
||
from urllib.parse import unquote, parse_qs
|
||
from django.utils import timezone
|
||
import html
|
||
from hotels.models import Room, Hotel
|
||
from .models import UserActivityLog, ExternalDBSettings, SyncLog
|
||
from touchh.utils.log import CustomLogger
|
||
from concurrent.futures import ThreadPoolExecutor, TimeoutError
|
||
from decouple import config
|
||
from django.db.models import F
|
||
|
||
class DatabaseConnector:
    """Wrap a PyMySQL connection built from an ``ExternalDBSettings`` record.

    The settings row is resolved in ``__init__``; the connection itself is only
    opened by :meth:`connect` and released by :meth:`close`.
    """

    def __init__(self, db_settings_id):
        """
        :param db_settings_id: Primary key of the ``ExternalDBSettings`` row to use.
        :raises ValueError: if no settings row with that id exists.
        """
        self.db_settings_id = db_settings_id
        self.logger = CustomLogger(name="DatabaseConnector", log_level="WARNING").get_logger()
        # PyMySQL connection; populated by connect(), reset to None by close().
        self.connection = None
        self.db_settings = self.get_db_settings()

    def get_db_settings(self):
        """Load the connection parameters for ``self.db_settings_id``.

        :return: dict with ``host``, ``port``, ``user``, ``password``,
            ``database`` and ``table_name`` copied from the settings row.
        :raises ValueError: if the settings row does not exist.
        """
        try:
            settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
            self.logger.info(f"Retrieved DB settings: {settings}")
            return {
                "host": settings.host,
                "port": settings.port,
                "user": settings.user,
                "password": settings.password,
                "database": settings.database,
                "table_name": settings.table_name,
            }
        except ExternalDBSettings.DoesNotExist as e:
            self.logger.error(f"Settings with ID {self.db_settings_id} not found.")
            raise ValueError("Invalid db_settings_id") from e
        except Exception as e:
            self.logger.error(f"Error retrieving settings: {e}")
            raise

    def connect(self):
        """Open the PyMySQL connection; rows are returned as dicts (DictCursor).

        :raises ConnectionError: if connecting fails for any reason.
        """
        try:
            self.connection = pymysql.connect(
                host=self.db_settings["host"],
                port=self.db_settings["port"],
                user=self.db_settings["user"],
                password=self.db_settings["password"],
                database=self.db_settings["database"],
                charset="utf8mb4",
                cursorclass=pymysql.cursors.DictCursor,
            )
            self.logger.info("Database connection established successfully.")
        except pymysql.err.OperationalError as e:
            self.logger.error(f"Operational error during DB connection: {e}")
            raise ConnectionError(f"Operational error: {e}") from e
        except Exception as e:
            self.logger.error(f"Unexpected database connection error: {e}")
            raise ConnectionError(e) from e

    def execute_query(self, query, params=None):
        """Run ``query`` and return every result row.

        :param query: SQL text. When ``params`` is given, use PyMySQL
            ``%s`` / ``%(name)s`` placeholders (a literal ``%`` must then be ``%%``).
        :param params: Optional tuple/list/dict bound by the driver. ``None``
            (the default) sends ``query`` unchanged, exactly as before.
        :return: the rows from ``cursor.fetchall()``, or ``[]`` on any error.
            Failures are logged, not raised.
        """
        if self.connection is None:
            self.logger.error("Query execution error: not connected (call connect() first).")
            return []
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()
        except Exception as e:
            self.logger.error(f"Query execution error: {e}")
            return []

    def close(self):
        """Close the connection if one is open; safe to call more than once."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            # Drop the reference even if close() failed, so a second call does not
            # hit PyMySQL's "Already closed" error.
            self.connection = None
        self.logger.info("Database connection closed.")
|
||
|
||
|
||
class DataProcessor:
    """Value-normalisation helpers for rows read from the external table.

    Every helper reports problems through the injected ``logger`` and returns a
    fallback value instead of raising.
    """

    def __init__(self, logger):
        # Logger supplied by the owner of this processor.
        self.logger = logger

    def decode_html_entities(self, text):
        """Percent-decode ``text``, then unescape its HTML entities.

        Falsy input is returned unchanged, after logging a warning.
        """
        if not text:
            self.logger.warning("Empty text received for decoding HTML entities.")
            return text
        url_decoded = unquote(text)
        return html.unescape(url_decoded)

    def parse_datetime(self, dt_str):
        """Convert ``dt_str`` to an aware datetime.

        Accepts a ``datetime`` (passed through ``timezone.make_aware`` when naive)
        or a ``"%Y-%m-%d %H:%M:%S"`` string. Returns ``None`` for falsy input or
        on any error, which is logged.
        """
        try:
            if isinstance(dt_str, datetime):
                return dt_str if not timezone.is_naive(dt_str) else timezone.make_aware(dt_str)
            if not dt_str:
                return None
            parsed = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
            return timezone.make_aware(parsed)
        except Exception as e:
            self.logger.error(f"Datetime parsing error: {e}")
        return None

    def parse_url_parameters(self, url_parameters):
        """Parse a URL query string into a flat dict.

        The string is percent-decoded once before parsing. For repeated keys
        only the first value is kept. Blank values are dropped (``parse_qs``
        default).

        :param url_parameters: Raw query string, e.g. ``"utm_content=1&utm_term=2"``.
        :return: Dict mapping each parameter name to its first value; ``{}`` for
            empty input or on error.
        """
        try:
            if not url_parameters:
                return {}
            multi_valued = parse_qs(unquote(url_parameters))
            return {name: values[0] for name, values in multi_valued.items()}
        except Exception as e:
            self.logger.error(f"Error parsing URL parameters: {e}")
            return {}
|
||
|
||
|
||
class HotelRoomManager:
    """Resolve the hotels and rooms referenced by synced records, creating missing ones."""

    def __init__(self, logger):
        self.logger = logger

    def get_or_create_hotel(self, hotel_id, page_title):
        """Return the ``Hotel`` with ``hotel_id``, creating it if needed.

        :param hotel_id: External hotel id; falsy values are rejected.
        :param page_title: Page title used (HTML-unescaped) as a new hotel's name.
            May be ``None`` or empty, in which case a generated name is used.
        :return: The ``Hotel`` instance, or ``None`` when ``hotel_id`` is missing.
        """
        if not hotel_id:
            self.logger.warning("Hotel creation skipped: missing hotel_id.")
            return None

        # page_title may be NULL in the source row, and html.unescape(None) raises
        # TypeError; fall back to the generated name instead.
        title = html.unescape(page_title) if page_title else ""
        hotel, created = Hotel.objects.get_or_create(
            hotel_id=hotel_id,
            defaults={
                "name": title or f"Отель {hotel_id}",
                "description": "Автоматически созданный отель",
            },
        )
        if created:
            self.logger.info(f"Hotel '{hotel.name}' created with hotel_id: {hotel_id}")
        else:
            self.logger.info(f"Hotel '{hotel.name}' already exists with hotel_id: {hotel_id}")
        return hotel

    def get_or_create_room(self, hotel, room_number):
        """Return the room ``room_number`` of ``hotel``, creating it if missing.

        :param hotel: Owning ``Hotel``; falsy values are rejected.
        :param room_number: Room number; falsy values are rejected.
        :return: The ``Room`` instance, or ``None`` when an argument is missing or
            the lookup/creation fails (the error is logged).
        """
        if not hotel:
            self.logger.warning("Room creation skipped: missing hotel.")
            return None

        if not room_number:
            self.logger.warning(f"Room creation skipped: missing room_number for hotel {hotel.name}.")
            return None

        try:
            # Reuse an existing room (the first one if several match).
            room = Room.objects.filter(hotel=hotel, number=room_number).first()
            if room:
                self.logger.info(f"Room '{room_number}' already exists in hotel '{hotel.name}'.")
                return room

            # No such room yet: create it with a lower-cased "<hotel_id>_<number>" external id.
            room = Room.objects.create(
                hotel=hotel,
                number=room_number,
                external_id=f"{hotel.hotel_id}_{room_number}".lower(),
                description="Automatically added room",
            )
            self.logger.info(f"Room '{room.number}' created in hotel '{hotel.name}'.")
            return room

        except Exception as e:
            self.logger.error(f"Error creating room '{room_number}' in hotel '{hotel.name}': {e}")
            return None
|
||
|
||
class DataSyncManager:
    """Synchronise one external MySQL activity table into ``UserActivityLog``.

    For each fetched row, the hotel (``utm_content``) and room (``utm_term``)
    named in its URL parameters are looked up or created. The row is then
    upserted by its external id, and per-hotel ``SyncLog`` counters are refreshed.
    """

    def __init__(self, db_settings_id):
        """
        :param db_settings_id: Primary key of the ``ExternalDBSettings`` row to sync.
        """
        self.logger = CustomLogger(name="DataSyncManager", log_level="DEBUG").get_logger()
        self.db_connector = DatabaseConnector(db_settings_id)
        # Keep the resolved settings: fetch_new_data() needs table_name.
        self.db_settings = self.db_connector.db_settings
        self.data_processor = DataProcessor(self.logger)
        self.hotel_manager = HotelRoomManager(self.logger)

    def get_last_saved_record(self):
        """Return the largest external row id already stored locally, or 0.

        The result is used as the ``WHERE id > ...`` cursor against the external
        table, so it must be an external id. Rows are stored with
        ``external_id=row["id"]``. The local primary key grows independently
        (filtered-out external rows never get one), so using it as the cursor
        can re-read the same window of already-synced rows indefinitely.
        """
        last_external_id = (
            UserActivityLog.objects.exclude(external_id__isnull=True)
            .order_by("-external_id")
            .values_list("external_id", flat=True)
            .first()
        )
        # NOTE(review): assumes external_id is a numeric field, so the ordering is
        # numeric rather than lexicographic — confirm against the model. Ids from
        # different ExternalDBSettings sources are not distinguished here.
        return int(last_external_id) if last_external_id is not None else 0

    def fetch_new_data(self, last_id):
        """Fetch new rows from the external table for synchronisation.

        Only rows with ``id > last_id``, non-NULL ``page_url`` and URL parameters
        containing ``utm_medium`` are returned: at most 1000, ordered by id.

        :param last_id: Last processed external id.
        :return: Rows returned by the connector, or ``[]`` on error.
        """
        try:
            # The table name comes from admin-editable settings and is interpolated
            # as an identifier, so double any backtick; last_id is forced to int.
            table_name = str(self.db_settings.get("table_name")).replace("`", "``")
            query = f"""
            SELECT * FROM `{table_name}`
            WHERE id > {int(last_id)}
            AND url_parameters IS NOT NULL
            AND url_parameters LIKE '%utm_medium%'
            AND page_url IS NOT NULL
            ORDER BY id ASC
            LIMIT 1000;
            """
            self.logger.info(f"Fetching new data with query: {query}")
            rows = self.db_connector.execute_query(query)
            self.logger.info(f"Fetched {len(rows)} records from the database.")
            return rows
        except Exception as e:
            self.logger.error(f"Error fetching data: {e}")
            return []

    def update_sync_log(self, hotel, recieved_records, processed_records):
        """Create or update the ``SyncLog`` row for ``hotel``.

        Failures are logged and swallowed, so one hotel's log cannot abort the sync.

        :param hotel: ``Hotel`` the counters belong to.
        :param recieved_records: Number of records received.
        :param processed_records: Number of records saved successfully.
        """
        self.logger.info(f"Attempting to update sync log for hotel: {hotel.name}")
        try:
            _, created = SyncLog.objects.update_or_create(
                hotel=hotel,
                defaults={
                    "recieved_records": recieved_records,
                    "processed_records": processed_records,
                    "created": timezone.now(),  # refresh the timestamp on every sync
                },
            )
            if created:
                self.logger.info(f"Sync log created for hotel '{hotel.name}'.")
            else:
                self.logger.info(f"Sync log updated for hotel '{hotel.name}'.")
        except Exception as e:
            self.logger.error(f"Error updating sync log for hotel '{hotel.name}': {e}")
        # No recursive self-call here: the previous trailing call to
        # update_sync_log() recursed until RecursionError.

    def process_and_save_data(self, rows):
        """Upsert fetched rows into ``UserActivityLog`` and refresh ``SyncLog`` per hotel.

        Rows without URL parameters, ``utm_content`` (hotel id) or ``utm_term``
        (room number) are skipped with a warning. An exception on one row is
        logged and the batch continues.

        :param rows: Row dicts returned by :meth:`fetch_new_data`.
        """
        # hotel.id -> (hotel, number of rows saved for it). Keeping the instance
        # avoids re-querying each hotel when the sync logs are written.
        processed_by_hotel = {}

        for row in rows:
            try:
                url_parameters = row.get("url_parameters")
                if not url_parameters:
                    self.logger.warning(f"Skipping record with missing URL parameters: {row}")
                    continue

                parsed_params = self.data_processor.parse_url_parameters(url_parameters)
                hotel_id = parsed_params.get("utm_content")
                room_number = parsed_params.get("utm_term")

                if not hotel_id or not room_number:
                    self.logger.warning(f"Skipping record with missing data: hotel_id={hotel_id}, room_number={room_number}")
                    continue

                hotel = self.hotel_manager.get_or_create_hotel(hotel_id, row.get("page_title"))
                if not hotel:
                    self.logger.warning(f"Skipping record: Failed to create or retrieve hotel with ID {hotel_id}")
                    continue

                room = self.hotel_manager.get_or_create_room(hotel, room_number)
                if not room:
                    self.logger.warning(f"Skipping record: Failed to create or retrieve room {room_number} in hotel {hotel.name}")
                    continue

                # Upsert keyed on the external row id, so re-fetched rows are updated, not duplicated.
                UserActivityLog.objects.update_or_create(
                    external_id=row.get("id"),
                    defaults={
                        "user_id": row.get("user_id") or 0,
                        "ip": row.get("ip") or "0.0.0.0",
                        "created": self.data_processor.parse_datetime(row.get("created")),
                        "timestamp": row.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "date_time": self.data_processor.parse_datetime(row.get("date_time")),
                        "url_parameters": parsed_params,
                        "page_title": self.data_processor.decode_html_entities(row.get("page_title")) or "Untitled",
                        "page_url": row.get("page_url") or "",
                        "page_id": row.get("page_id") or 0,
                        "hits": row.get("hits") or 0,
                    },
                )
                self.logger.info(f"Record ID {row.get('id')} processed successfully.")

                _, processed = processed_by_hotel.get(hotel.id, (hotel, 0))
                processed_by_hotel[hotel.id] = (hotel, processed + 1)

            except Exception as e:
                self.logger.error(f"Error processing record ID {row.get('id')}: {e}")

        for hotel, processed in processed_by_hotel.values():
            # NOTE(review): "received" is the size of the whole batch, not the number
            # of rows for this hotel — confirm that is the intended metric.
            self.update_sync_log(hotel, recieved_records=len(rows), processed_records=processed)

    def sync(self):
        """Run one pass: connect, fetch rows after the last synced id, save them, disconnect.

        :raises ConnectionError: if the external database connection fails.
        """
        self.db_connector.connect()
        try:
            last_id = self.get_last_saved_record()
            rows = self.fetch_new_data(last_id)
            self.process_and_save_data(rows)
            self.logger.info("Sync completed.")
        finally:
            self.db_connector.close()
|
||
|
||
|
||
def scheduled_sync():
    """Sync every active ``ExternalDBSettings`` connection in parallel.

    Each connection runs ``DataSyncManager(...).sync()`` in a worker thread, at
    most 10 at a time. Per-connection errors and timeouts are logged; nothing is
    raised to the caller. The log level comes from the ``SCHEDULED_SYNC_LOG_LEVEL``
    environment variable (default ``ERROR``).
    """
    import os

    from django.db import connections

    logger = CustomLogger(
        name="DatabaseSyncScheduler",
        log_level=os.getenv("SCHEDULED_SYNC_LOG_LEVEL", default="ERROR"),
    ).get_logger()
    logger.info("Starting scheduled sync.")

    # Materialise the queryset once instead of running separate EXISTS and SELECT queries.
    active_db_settings = list(ExternalDBSettings.objects.filter(is_active=True))
    if not active_db_settings:
        logger.warning("No active database connections found.")
        return

    logger.info(f"Found {len(active_db_settings)} active database connections.")

    def sync_task(db_settings):
        """Sync one connection, logging (not raising) any failure."""
        try:
            logger.info(f"Syncing connection: {db_settings.name} (ID={db_settings.id})")
            sync_manager = DataSyncManager(db_settings.id)
            sync_manager.sync()
            logger.info(f"Sync completed for connection: {db_settings}")
        except Exception as e:
            logger.error(f"Error syncing connection {db_settings}: {e}")
        finally:
            # Django keeps one DB connection per thread and only closes it
            # automatically at the end of a request; close this worker's explicitly.
            try:
                connections.close_all()
            except Exception as e:
                logger.error(f"Error closing DB connections after syncing {db_settings}: {e}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(sync_task, db_settings): db_settings for db_settings in active_db_settings}
        for future, db_settings in futures.items():
            try:
                future.result(timeout=300)
            except TimeoutError:
                # result() timing out does not cancel the worker; the executor
                # still waits for it when the with-block exits.
                logger.error(f"Sync task timed out for connection: {db_settings}")

    logger.info("Scheduled sync completed.")
|