420 lines
20 KiB
Python
420 lines
20 KiB
Python
import logging
|
||
import pymysql
|
||
from datetime import datetime
|
||
from urllib.parse import unquote, parse_qs
|
||
import pytz
|
||
from django.utils import timezone
|
||
from django.db import transaction
|
||
from django.conf import settings
|
||
from html import unescape
|
||
import chardet
|
||
import html
|
||
from .models import SyncLog, ExternalDBSettings, UserActivityLog, RoomDiscrepancy, ImportedHotel
|
||
from hotels.models import Reservation, Hotel
|
||
|
||
class DataSyncManager:
|
||
"""
|
||
Класс для управления загрузкой, записью и сверкой данных.
|
||
"""
|
||
|
||
def __init__(self, db_settings_id, use_local_db=False):
|
||
self.db_settings_id = db_settings_id
|
||
self.use_local_db = use_local_db # Если True, используем локальную БД
|
||
self.db_settings = None
|
||
self.connection = None
|
||
self.table_name = None
|
||
|
||
# Настройка логирования
|
||
self.logger = logging.getLogger(__name__)
|
||
self.logger.setLevel(logging.DEBUG)
|
||
handler = logging.FileHandler('data_sync.log')
|
||
handler.setLevel(logging.DEBUG)
|
||
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||
handler.setFormatter(formatter)
|
||
self.logger.addHandler(handler)
|
||
|
||
def connect_to_db(self):
|
||
"""
|
||
Устанавливает соединение с БД в зависимости от настройки.
|
||
"""
|
||
try:
|
||
if self.use_local_db:
|
||
# Подключаемся к локальной базе данных
|
||
self.db_settings = settings.LocalDataBase.objects.first() # Получаем настройки первой базы
|
||
self.table_name = self.db_settings.database
|
||
self.connection = pymysql.connect(
|
||
host=self.db_settings.host,
|
||
port=self.db_settings.port,
|
||
user=self.db_settings.user,
|
||
password=self.db_settings.password,
|
||
database=self.db_settings.database,
|
||
charset='utf8mb4',
|
||
use_unicode=True,
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
)
|
||
else:
|
||
# Подключаемся к внешней базе данных
|
||
self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
|
||
self.table_name = self.db_settings.table_name
|
||
self.connection = pymysql.connect(
|
||
host=self.db_settings.host,
|
||
port=self.db_settings.port,
|
||
user=self.db_settings.user,
|
||
password=self.db_settings.password,
|
||
database=self.db_settings.database,
|
||
charset='utf8mb4',
|
||
use_unicode=True,
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
)
|
||
|
||
except ExternalDBSettings.DoesNotExist:
|
||
raise ValueError("Настройки подключения не найдены.")
|
||
except pymysql.MySQLError as e:
|
||
raise ConnectionError(f"Ошибка подключения к базе данных: {e}")
|
||
def get_last_saved_record(self):
|
||
"""
|
||
Получает последнюю запись из таблицы UserActivityLog.
|
||
"""
|
||
last_record = UserActivityLog.objects.order_by('-id').first()
|
||
if last_record:
|
||
self.logger.info(f"Последняя запись в UserActivityLog: ID={last_record.id}")
|
||
return last_record.id
|
||
self.logger.info("Таблица UserActivityLog пуста.")
|
||
return None
|
||
|
||
def fetch_new_data(self, last_id=0, limit=100):
|
||
"""
|
||
Загружает новые записи из указанной таблицы, которые идут после last_id.
|
||
"""
|
||
if not self.connection:
|
||
self.connect_to_db()
|
||
|
||
cursor = self.connection.cursor(pymysql.cursors.DictCursor) # Используем DictCursor для получения словарей
|
||
try:
|
||
# Формируем SQL-запрос
|
||
if last_id:
|
||
query = f"""
|
||
SELECT * FROM `{self.table_name}`
|
||
WHERE id > {last_id} AND url_parameters IS NOT NULL AND url_parameters != ''
|
||
ORDER BY id ASC
|
||
LIMIT {limit};
|
||
"""
|
||
else:
|
||
query = f"""
|
||
SELECT * FROM `{self.table_name}`
|
||
WHERE url_parameters IS NOT NULL AND url_parameters != ''
|
||
ORDER BY id ASC
|
||
LIMIT {limit};
|
||
"""
|
||
|
||
self.logger.info(f"Выполняется запрос: {query}")
|
||
cursor.execute(query)
|
||
|
||
# Получаем результаты
|
||
rows = cursor.fetchall()
|
||
if not rows:
|
||
self.logger.info("Нет данных для загрузки.")
|
||
return {"columns": [], "rows": []}
|
||
|
||
# Получаем названия колонок
|
||
columns = rows[0].keys() if rows else []
|
||
return {"columns": list(columns), "rows": rows}
|
||
|
||
except pymysql.MySQLError as e:
|
||
self.logger.error(f"Ошибка выполнения запроса: {e}")
|
||
return {"columns": [], "rows": []}
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def parse_datetime(self, dt_str, hotel_timezone=None):
|
||
"""
|
||
Преобразует строку формата 'YYYY-MM-DD HH:MM:SS' или 'YYYY-MM-DDTHH:MM:SS' в aware datetime.
|
||
Преобразует время в часовой пояс отеля и корректирует на часовой пояс сервера.
|
||
"""
|
||
if dt_str is None:
|
||
return None
|
||
|
||
if isinstance(dt_str, datetime):
|
||
if timezone.is_naive(dt_str):
|
||
return timezone.make_aware(dt_str, timezone.get_default_timezone())
|
||
return dt_str
|
||
|
||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||
try:
|
||
# Преобразуем строку в naive datetime
|
||
naive_dt = datetime.strptime(dt_str, fmt)
|
||
|
||
# Если передан часовой пояс отеля
|
||
if hotel_timezone:
|
||
# Переводим время из часового пояса отеля в UTC (если данные из PMS уже UTC+X)
|
||
tz = pytz.timezone(hotel_timezone) # Часовой пояс отеля
|
||
aware_dt = tz.localize(naive_dt) # Локализуем в часовой пояс отеля
|
||
|
||
# Теперь приводим время к серверному часовому поясу (например, Московское время)
|
||
server_tz = timezone.get_default_timezone() # Например, Moscow (UTC+3)
|
||
return aware_dt.astimezone(server_tz) # Переводим в серверное время
|
||
|
||
# Если часовой пояс отеля не передан, используем серверный часовой пояс по умолчанию
|
||
return timezone.make_aware(naive_dt, timezone.get_default_timezone())
|
||
|
||
except ValueError:
|
||
continue
|
||
|
||
return None
|
||
|
||
def decode_html_entities(self, text):
|
||
"""
|
||
Декодирует URL и HTML-сущности в строке.
|
||
Пытается автоматически декодировать строку в правильную кодировку.
|
||
"""
|
||
if text and isinstance(text, str):
|
||
text = unquote(text) # Декодируем URL
|
||
text = html.unescape(text) # Расшифровываем HTML сущности
|
||
|
||
# Попробуем определить кодировку и привести строку к utf-8
|
||
try:
|
||
detected = chardet.detect(text.encode())
|
||
encoding = detected['encoding']
|
||
if encoding and encoding != 'utf-8':
|
||
text = text.encode(encoding).decode('utf-8', errors='ignore')
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при обработке кодировки: {e}")
|
||
|
||
return text
|
||
return text
|
||
|
||
def process_url_parameters(self, url_params):
|
||
"""
|
||
Парсит url_parameters и возвращает hotel_name и hotel_id.
|
||
"""
|
||
# Проверка корректности данных
|
||
if not url_params or not isinstance(url_params, str):
|
||
self.logger.error(f"Ошибка: некорректные url_parameters: {url_params}")
|
||
return {}
|
||
|
||
# Декодируем параметры URL
|
||
decoded = unquote(url_params)
|
||
qs = parse_qs(decoded)
|
||
|
||
# Извлекаем hotel_name и hotel_id_term
|
||
hotel_name = qs.get('utm_content', [None])[0] # Умолчание: None, если параметр не найден
|
||
hotel_id_term = qs.get('utm_term', [None])[0] # Умолчание: None, если параметр не найден
|
||
|
||
# Формируем hotel_id
|
||
hotel_id = f"{hotel_name}_{hotel_id_term}" if hotel_name and hotel_id_term else None
|
||
|
||
# Логирование для отладки
|
||
self.logger.debug(f"Извлечено из url_parameters: hotel_name={hotel_name}, hotel_id_term={hotel_id_term}, hotel_id={hotel_id}")
|
||
|
||
# Возврат результата
|
||
return {
|
||
'hotel_name': hotel_name,
|
||
'hotel_id': hotel_id
|
||
}
|
||
|
||
|
||
def check_and_store_imported_hotel(self, hotel_name, hotel_id):
|
||
"""
|
||
Проверяет, есть ли отель с данным ID в таблице ImportedHotel.
|
||
Если отеля с таким external_id нет, добавляет новый в таблицы ImportedHotel и Hotel.
|
||
"""
|
||
if not hotel_id or not hotel_name:
|
||
return None
|
||
|
||
# Генерация external_id в формате 'hotel_name_hotel_id'
|
||
external_id = f"{hotel_name}_{hotel_id}"
|
||
|
||
# Проверяем, существует ли запись с таким external_id в ImportedHotel
|
||
existing_hotel = ImportedHotel.objects.filter(external_id=external_id).first()
|
||
|
||
if existing_hotel:
|
||
self.logger.info(f"Отель с external_id {external_id} уже существует в ImportedHotel.")
|
||
else:
|
||
try:
|
||
# Создаем новую запись в ImportedHotel
|
||
with transaction.atomic():
|
||
imported_hotel = ImportedHotel.objects.create(
|
||
external_id=external_id,
|
||
name=hotel_name,
|
||
display_name=hotel_name,
|
||
imported=True # Отмечаем, что отель импортирован
|
||
)
|
||
self.logger.info(f"Отель с external_id {external_id} добавлен в ImportedHotel.")
|
||
|
||
# Создаем новый отель в основной таблице Hotel
|
||
hotel = Hotel.objects.create(
|
||
hotel_id=external_id,
|
||
name=hotel_name,
|
||
phone=None,
|
||
email=None,
|
||
address=None,
|
||
city=None,
|
||
timezone="UTC",
|
||
description="Автоматически импортированный отель",
|
||
|
||
)
|
||
self.logger.info(f"Отель с hotel_id {external_id} добавлен в Hotel с флагом is_imported=True.")
|
||
return hotel
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при добавлении отеля {hotel_name} с external_id {external_id}: {e}")
|
||
return None
|
||
|
||
return existing_hotel
|
||
|
||
def write_to_db(self, data):
|
||
"""
|
||
Записывает данные в UserActivityLog и при необходимости в ImportedHotel.
|
||
Записывает лог синхронизации в SyncLog.
|
||
"""
|
||
processed_records = 0
|
||
received_records = len(data["rows"])
|
||
|
||
print(f"Received records: {received_records}")
|
||
|
||
for row in data["rows"]:
|
||
print(f'\n------\n row: {row}\n------\n')
|
||
# record = dict(zip(data["columns"], row)) # Преобразуем строку в словарь
|
||
# Получаем url_parameters из записи
|
||
url_parameters = self.decode_html_entities(row.get("url_parameters", ""))
|
||
print(f'\n------\n url_parameters: {url_parameters}\n------\n')
|
||
|
||
# Проверка на пустое значение
|
||
if not url_parameters:
|
||
print(f"Error: url_parameters is empty in record {row}")
|
||
continue # Пропускаем запись, если url_parameters отсутствует
|
||
|
||
# Пытаемся извлечь информацию о отеле из url_parameters
|
||
hotel_name = None
|
||
hotel_id = None
|
||
|
||
if url_parameters:
|
||
hotel_info = self.process_url_parameters(url_parameters)
|
||
hotel_id = hotel_info.get('hotel_id')
|
||
|
||
if not hotel_id:
|
||
print(f"Error: hotel_id is empty in record {row}")
|
||
continue # Пропускаем запись, если hotel_id отсутствует
|
||
|
||
# Проверяем, существует ли отель с таким hotel_id
|
||
hotel = self.check_and_store_imported_hotel(hotel_name=hotel_id, hotel_id=hotel_id)
|
||
|
||
if not hotel:
|
||
print(f"Error: Could not find or create hotel for hotel_id {hotel_id}")
|
||
continue # Пропускаем запись, если отель не найден или не создан
|
||
|
||
# Преобразуем дату
|
||
created = self.parse_datetime(row.get("created"))
|
||
date_time = self.parse_datetime(row.get("date_time"))
|
||
|
||
# Декодируем все строки, которые могут содержать HTML-сущности
|
||
referred = self.decode_html_entities(row.get("referred", ""))
|
||
agent = self.decode_html_entities(row.get("agent", ""))
|
||
platform = self.decode_html_entities(row.get("platform", ""))
|
||
version = self.decode_html_entities(row.get("version", ""))
|
||
model = self.decode_html_entities(row.get("model", ""))
|
||
device = self.decode_html_entities(row.get("device", ""))
|
||
UAString = self.decode_html_entities(row.get("UAString", ""))
|
||
location = self.decode_html_entities(row.get("location", ""))
|
||
page_title = self.decode_html_entities(row.get("page_title", ""))
|
||
page_url = self.decode_html_entities(row.get("page_url", ""))
|
||
|
||
# Запись в UserActivityLog
|
||
UserActivityLog.objects.update_or_create(
|
||
external_id=row.get("id", None),
|
||
defaults={
|
||
"user_id": row.get("user_id"),
|
||
"ip": row.get("ip"),
|
||
"created": created,
|
||
"timestamp": row.get("timestamp"),
|
||
"date_time": date_time,
|
||
"referred": referred,
|
||
"agent": agent,
|
||
"platform": platform,
|
||
"version": version,
|
||
"model": model,
|
||
"device": device,
|
||
"UAString": UAString,
|
||
"location": location,
|
||
"page_id": row.get("page_id"),
|
||
"url_parameters": url_parameters,
|
||
"page_title": page_title,
|
||
"type": row.get("type"),
|
||
"last_counter": row.get("last_counter"),
|
||
"hits": row.get("hits"),
|
||
"honeypot": row.get("honeypot"),
|
||
"reply": row.get("reply"),
|
||
"page_url": page_url,
|
||
}
|
||
)
|
||
|
||
processed_records += 1
|
||
|
||
# Логируем обработанные записи
|
||
print(f"Processed records: {processed_records}")
|
||
|
||
|
||
def reconcile_data(self):
|
||
"""
|
||
Сверяет данные таблицы user_activity_log с таблицей hotels.reservations
|
||
и записывает несоответствия в таблицу RoomDiscrepancy.
|
||
"""
|
||
discrepancies = []
|
||
reservations = Reservation.objects.values("hotel_id", "room_number", "check_in", "check_out")
|
||
|
||
for log in UserActivityLog.objects.all():
|
||
for reservation in reservations:
|
||
if (
|
||
log.page_id != reservation["room_number"] or
|
||
log.created.date() < reservation["check_in"] or
|
||
log.created.date() > reservation["check_out"]
|
||
):
|
||
discrepancies.append(RoomDiscrepancy(
|
||
hotel_id=reservation["hotel_id"],
|
||
room_number=log.page_id,
|
||
booking_id=f"Log-{log.id}",
|
||
check_in_date_expected=reservation["check_in"],
|
||
check_in_date_actual=log.created.date() if log.created else None,
|
||
discrepancy_type="Mismatch",
|
||
))
|
||
|
||
RoomDiscrepancy.objects.bulk_create(discrepancies)
|
||
|
||
def sync(self):
|
||
try:
|
||
last_id = self.get_last_saved_record()
|
||
self.logger.info(f"Синхронизация начата. Последний сохранённый ID: {last_id}")
|
||
|
||
# Загружаем новые данные
|
||
data = self.fetch_new_data(last_id=last_id)
|
||
print(f'\n------\n data: {data}\n------\n')
|
||
if not data["rows"]:
|
||
self.logger.info("Нет новых данных для синхронизации.")
|
||
return
|
||
|
||
# Логирование типов данных
|
||
self.logger.debug(f"Тип первой строки: {type(data['rows'][0])}")
|
||
|
||
first_row_id = data["rows"][0]["id"]
|
||
if last_id is not None and first_row_id <= last_id:
|
||
self.logger.info(f"Нет новых записей для синхронизации. Последний ID: {last_id}, первый ID из внешней таблицы: {first_row_id}.")
|
||
return
|
||
|
||
processed_records = self.write_to_db(data)
|
||
self.logger.info(f"Синхронизация завершена. Обработано записей: {processed_records}")
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка синхронизации данных: {e}")
|
||
raise RuntimeError(f"Ошибка синхронизации данных: {e}")
|
||
finally:
|
||
if self.connection:
|
||
self.connection.close()
|
||
|
||
def scheduled_sync():
|
||
"""
|
||
Плановая задача для синхронизации данных.
|
||
"""
|
||
db_settings_list = ExternalDBSettings.objects.filter(is_active=True)
|
||
for db_settings in db_settings_list:
|
||
sync_manager = DataSyncManager(db_settings.id)
|
||
sync_manager.sync()
|