337 lines
14 KiB
Python
337 lines
14 KiB
Python
import logging
|
||
import pymysql
|
||
from datetime import datetime
|
||
from urllib.parse import unquote, parse_qs
|
||
import pytz
|
||
from django.utils import timezone
|
||
from django.conf import settings
|
||
import chardet
|
||
import html
|
||
from hotels.models import Room, Hotel
|
||
from .models import UserActivityLog, ExternalDBSettings
|
||
|
||
|
||
class DatabaseConnector:
|
||
"""
|
||
Класс для подключения к внешней базе данных.
|
||
"""
|
||
def __init__(self, db_settings_id):
|
||
self.db_settings_id = db_settings_id
|
||
self.connection = None
|
||
self.logger = self.setup_logger()
|
||
self.db_settings = None
|
||
|
||
def setup_logger(self):
|
||
default_level = logging.INFO # Уровень по умолчанию
|
||
level_name = getattr(settings, "DATA_SYNC_LOG_LEVEL", "INFO").upper()
|
||
log_level = getattr(logging, level_name, default_level)
|
||
logger.setLevel(log_level)
|
||
|
||
# Настройка обработчика для файла
|
||
handler = logging.FileHandler("data_sync.log")
|
||
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
|
||
handler.setLevel(log_level)
|
||
|
||
# Удаляем старые обработчики, чтобы избежать дублирования
|
||
if logger.hasHandlers():
|
||
logger.handlers.clear()
|
||
logger.addHandler(handler)
|
||
|
||
# Сообщение о текущем уровне логирования
|
||
logger.info(f"Уровень логирования установлен: {logging.getLevelName(log_level)}")
|
||
|
||
logger.addHandler(handler)
|
||
return logger
|
||
|
||
def connect(self):
|
||
"""Подключение к базе данных."""
|
||
try:
|
||
self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
|
||
self.connection = pymysql.connect(
|
||
host=self.db_settings.host,
|
||
port=self.db_settings.port,
|
||
user=self.db_settings.user,
|
||
password=self.db_settings.password,
|
||
database=self.db_settings.database,
|
||
charset="utf8mb4",
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
)
|
||
self.logger.info("Подключение к базе данных успешно установлено.")
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка подключения к БД: {e}")
|
||
raise ConnectionError(e)
|
||
|
||
def close(self):
|
||
"""Закрывает соединение с базой данных."""
|
||
if self.connection:
|
||
self.connection.close()
|
||
self.logger.info("Соединение с базой данных закрыто.")
|
||
|
||
def execute_query(self, query):
|
||
"""Выполнение запроса и возврат результатов."""
|
||
with self.connection.cursor() as cursor:
|
||
cursor.execute(query)
|
||
return cursor.fetchall()
|
||
|
||
|
||
class DataProcessor:
|
||
"""
|
||
Обрабатывает и сохраняет данные.
|
||
"""
|
||
def __init__(self, logger):
|
||
self.logger = logger
|
||
|
||
def decode_html_entities(self, text):
|
||
"""Декодирует URL и HTML-сущности."""
|
||
if text and isinstance(text, str):
|
||
text = unquote(text)
|
||
text = html.unescape(text)
|
||
try:
|
||
detected = chardet.detect(text.encode())
|
||
encoding = detected['encoding']
|
||
if encoding and encoding != 'utf-8':
|
||
text = text.encode(encoding).decode('utf-8', errors='ignore')
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка кодировки: {e}")
|
||
return text
|
||
|
||
def parse_datetime(self, dt_str):
|
||
"""Преобразует строку даты в aware datetime."""
|
||
try:
|
||
if isinstance(dt_str, datetime):
|
||
return timezone.make_aware(dt_str) if timezone.is_naive(dt_str) else dt_str
|
||
if dt_str:
|
||
return timezone.make_aware(datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S"))
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка парсинга даты: {e}")
|
||
return None
|
||
def url_parameters_parser(self, url_parameters):
|
||
"""
|
||
Парсит строку URL-параметров и возвращает словарь с ключами и значениями.
|
||
|
||
Пример входа:
|
||
url_parameters = "utm_medium=qr-3&utm_content=chisto-pozhit&utm_term=101"
|
||
|
||
Возвращает:
|
||
{
|
||
"utm_medium": "qr-3",
|
||
"utm_content": "chisto-pozhit",
|
||
"utm_term": "101"
|
||
}
|
||
"""
|
||
try:
|
||
if not url_parameters:
|
||
return {}
|
||
|
||
# Декодируем параметры URL
|
||
decoded_params = unquote(url_parameters)
|
||
parsed_params = parse_qs(decoded_params)
|
||
|
||
# Преобразуем список значений в строку
|
||
result = {key: value[0] for key, value in parsed_params.items()}
|
||
return result
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка парсинга URL-параметров: {e}")
|
||
return {}
|
||
|
||
class HotelRoomManager:
|
||
"""
|
||
Управляет созданием отелей и номеров.
|
||
"""
|
||
def __init__(self, logger):
|
||
self.logger = logger
|
||
|
||
def get_or_create_hotel(self, hotel_id, page_title):
|
||
"""
|
||
Создает или получает отель.
|
||
|
||
:param hotel_id: Значение из utm_content (индекс отеля)
|
||
:param page_title: Название отеля из поля page_title
|
||
"""
|
||
if not hotel_id:
|
||
self.logger.warning("Пропущено создание отеля: отсутствует hotel_id.")
|
||
return None
|
||
|
||
# Создаем или получаем отель с hotel_id и устанавливаем name по page_title
|
||
hotel, created = Hotel.objects.get_or_create(
|
||
hotel_id=hotel_id,
|
||
defaults={
|
||
"name": html.unescape(page_title) or f"Отель {hotel_id}",
|
||
"description": "Автоматически добавленный отель"
|
||
}
|
||
)
|
||
if created:
|
||
self.logger.info(f"Создан отель '{hotel.name}' с hotel_id: {hotel_id}")
|
||
else:
|
||
self.logger.info(f"Отель '{hotel.name}' уже существует с hotel_id: {hotel_id}")
|
||
return hotel
|
||
|
||
def get_or_create_room(self, hotel, room_number):
|
||
"""
|
||
Создает или получает номер отеля.
|
||
|
||
:param hotel: Экземпляр модели Hotel
|
||
:param room_number: Номер комнаты из utm_term
|
||
"""
|
||
if not hotel:
|
||
self.logger.warning("Пропущено создание номера: отсутствует отель.")
|
||
return None
|
||
|
||
if not room_number:
|
||
self.logger.warning(f"Пропущено создание номера: отсутствует room_number для отеля {hotel.name}.")
|
||
return None
|
||
|
||
# Генерация уникального external_id на основе hotel_id и room_number
|
||
external_id = f"{hotel.hotel_id}_{room_number}".lower()
|
||
|
||
# Создаем или получаем номер
|
||
room, created = Room.objects.get_or_create(
|
||
hotel=hotel,
|
||
number=room_number,
|
||
defaults={
|
||
"number": room_number, # Используем room_number как название номера
|
||
"external_id": external_id,
|
||
"description": "Автоматически добавленный номер"
|
||
}
|
||
)
|
||
|
||
if created:
|
||
self.logger.info(f"Создан номер '{room.number}' (external_id: {external_id}) в отеле '{hotel.name}'")
|
||
else:
|
||
self.logger.info(f"Номер '{room.number}' уже существует в отеле '{hotel.name}'")
|
||
|
||
return room
|
||
|
||
class DataSyncManager:
|
||
"""
|
||
Главный класс для синхронизации данных.
|
||
"""
|
||
def __init__(self, db_settings_id):
|
||
self.logger = self.setup_logger()
|
||
self.db_connector = DatabaseConnector(db_settings_id)
|
||
self.data_processor = DataProcessor(self.logger)
|
||
self.hotel_manager = HotelRoomManager(self.logger)
|
||
|
||
def setup_logger(self):
|
||
logger = logging.getLogger(__name__)
|
||
logger.setLevel(logging.DEBUG)
|
||
handler = logging.FileHandler("data_sync.log")
|
||
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
|
||
logger.addHandler(handler)
|
||
return logger
|
||
|
||
def get_last_saved_record(self):
|
||
"""Получает ID последней записи."""
|
||
record = UserActivityLog.objects.order_by("-id").first()
|
||
return record.id if record else 0
|
||
|
||
def fetch_new_data(self, last_id):
|
||
"""Получает новые данные из БД."""
|
||
query = f"""
|
||
SELECT * FROM `{self.db_connector.db_settings.table_name}`
|
||
WHERE id > {last_id}
|
||
AND url_parameters IS NOT NULL
|
||
AND url_parameters LIKE '%utm_medium%'
|
||
AND page_url IS NOT NULL
|
||
ORDER BY id ASC
|
||
LIMIT 1000;
|
||
"""
|
||
self.logger.info(f"Запрос на получение новых данных отправлен. \n Содержание запроса: {query}")
|
||
return self.db_connector.execute_query(query)
|
||
|
||
def process_and_save_data(self, rows):
|
||
"""
|
||
Обрабатывает и сохраняет данные из внешней базы данных.
|
||
"""
|
||
for row in rows:
|
||
try:
|
||
# Декодирование URL-параметров
|
||
url_params = self.data_processor.decode_html_entities(row.get("url_parameters", ""))
|
||
params = self.data_processor.url_parameters_parser(url_params)
|
||
timestamp = params.get("timestamp")
|
||
date_time = params.get("date_time")
|
||
# Извлечение данных
|
||
hotel_id = params.get("utm_content")
|
||
room_number = params.get("utm_term")
|
||
page_title = row.get("page_title") # Название отеля из page_title
|
||
external_id = row.get("id")
|
||
|
||
# Создание отеля и комнаты
|
||
hotel = self.hotel_manager.get_or_create_hotel(hotel_id, page_title)
|
||
room = self.hotel_manager.get_or_create_room(hotel, room_number)
|
||
page_url = row.get("page_url")
|
||
# Заполнение записи
|
||
UserActivityLog.objects.update_or_create(
|
||
external_id=external_id,
|
||
defaults={
|
||
"user_id": row.get("user_id") or 0,
|
||
"timestamp": row.get("timestamp"),
|
||
"date_time": row.get("date_time"),
|
||
"ip": row.get("ip") or "0.0.0.0",
|
||
"created": self.data_processor.parse_datetime(row.get("created")) or timezone.now(),
|
||
"url_parameters": url_params,
|
||
"page_id": room.id if room else None,
|
||
"page_title": html.unescape(page_title),
|
||
"hits": row.get("hits") or 0,
|
||
"page_url": html.unescape(page_url),
|
||
}
|
||
)
|
||
self.logger.info(f"Запись ID {external_id} успешно обработана.")
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при обработке записи ID {row.get('id')}: {e}")
|
||
|
||
|
||
|
||
def sync(self):
|
||
"""Запускает процесс синхронизации."""
|
||
self.db_connector.connect()
|
||
try:
|
||
last_id = self.get_last_saved_record()
|
||
rows = self.fetch_new_data(last_id)
|
||
self.process_and_save_data(rows)
|
||
self.logger.info("Синхронизация завершена.")
|
||
finally:
|
||
self.db_connector.close()
|
||
|
||
|
||
import logging
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from .models import ExternalDBSettings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
logger.setLevel(logging.DEBUG)
|
||
def scheduled_sync():
|
||
"""
|
||
Планировщик синхронизации для всех активных подключений.
|
||
Каждое подключение обрабатывается отдельно.
|
||
"""
|
||
logger.info("Запуск планировщика синхронизации.")
|
||
|
||
# Получаем все активные настройки подключения
|
||
active_db_settings = ExternalDBSettings.objects.filter(is_active=True)
|
||
if not active_db_settings.exists():
|
||
logger.warning("Не найдено активных подключений для синхронизации.")
|
||
return
|
||
|
||
logger.info(f"Найдено активных подключений: {len(active_db_settings)}")
|
||
|
||
def sync_task(db_settings):
|
||
"""
|
||
Выполняет синхронизацию для одного подключения.
|
||
"""
|
||
try:
|
||
logger.info(f"Начало синхронизации для подключения: {db_settings.name} (ID={db_settings.id})")
|
||
sync_manager = DataSyncManager(db_settings.id)
|
||
sync_manager.sync()
|
||
logger.info(f"Синхронизация успешно завершена для подключения: {db_settings.name}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка синхронизации для подключения {db_settings.name}: {e}")
|
||
|
||
# Параллельное выполнение задач синхронизации
|
||
with ThreadPoolExecutor(max_workers=5) as executor: # Максимальное количество потоков = 5
|
||
for db_settings in active_db_settings:
|
||
executor.submit(sync_task, db_settings)
|
||
|
||
logger.info("Планировщик синхронизации завершил работу.")
|