Files
Touchh/antifroud/data_sync.py
2024-12-17 11:49:26 +09:00

420 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import pymysql
from datetime import datetime
from urllib.parse import unquote, parse_qs
import pytz
from django.utils import timezone
from django.db import transaction
from django.conf import settings
from html import unescape
import chardet
import html
from .models import SyncLog, ExternalDBSettings, UserActivityLog, RoomDiscrepancy, ImportedHotel
from hotels.models import Reservation, Hotel
class DataSyncManager:
"""
Класс для управления загрузкой, записью и сверкой данных.
"""
def __init__(self, db_settings_id, use_local_db=False):
self.db_settings_id = db_settings_id
self.use_local_db = use_local_db # Если True, используем локальную БД
self.db_settings = None
self.connection = None
self.table_name = None
# Настройка логирования
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.WARNING)
handler = logging.FileHandler('data_sync.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def connect_to_db(self):
"""
Устанавливает соединение с БД в зависимости от настройки.
"""
try:
if self.use_local_db:
# Подключаемся к локальной базе данных
self.db_settings = settings.LocalDataBase.objects.first() # Получаем настройки первой базы
self.table_name = self.db_settings.database
self.connection = pymysql.connect(
host=self.db_settings.host,
port=self.db_settings.port,
user=self.db_settings.user,
password=self.db_settings.password,
database=self.db_settings.database,
charset='utf8mb4',
use_unicode=True,
cursorclass=pymysql.cursors.DictCursor,
)
else:
# Подключаемся к внешней базе данных
self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
self.table_name = self.db_settings.table_name
self.connection = pymysql.connect(
host=self.db_settings.host,
port=self.db_settings.port,
user=self.db_settings.user,
password=self.db_settings.password,
database=self.db_settings.database,
charset='utf8mb4',
use_unicode=True,
cursorclass=pymysql.cursors.DictCursor,
)
except ExternalDBSettings.DoesNotExist:
raise ValueError("Настройки подключения не найдены.")
except pymysql.MySQLError as e:
raise ConnectionError(f"Ошибка подключения к базе данных: {e}")
def get_last_saved_record(self):
"""
Получает последнюю запись из таблицы UserActivityLog.
"""
last_record = UserActivityLog.objects.order_by('-id').first()
if last_record:
self.logger.info(f"Последняя запись в UserActivityLog: ID={last_record.id}")
return last_record.id
self.logger.info("Таблица UserActivityLog пуста.")
return None
def fetch_new_data(self, last_id=0, limit=100):
"""
Загружает новые записи из указанной таблицы, которые идут после last_id.
"""
if not self.connection:
self.connect_to_db()
cursor = self.connection.cursor(pymysql.cursors.DictCursor) # Используем DictCursor для получения словарей
try:
# Формируем SQL-запрос
if last_id:
query = f"""
SELECT * FROM `{self.table_name}`
WHERE id > {last_id} AND url_parameters IS NOT NULL AND url_parameters != ''
ORDER BY id ASC
LIMIT {limit};
"""
else:
query = f"""
SELECT * FROM `{self.table_name}`
WHERE url_parameters IS NOT NULL AND url_parameters != ''
ORDER BY id ASC
LIMIT {limit};
"""
self.logger.info(f"Выполняется запрос: {query}")
cursor.execute(query)
# Получаем результаты
rows = cursor.fetchall()
if not rows:
self.logger.info("Нет данных для загрузки.")
return {"columns": [], "rows": []}
# Получаем названия колонок
columns = rows[0].keys() if rows else []
return {"columns": list(columns), "rows": rows}
except pymysql.MySQLError as e:
self.logger.error(f"Ошибка выполнения запроса: {e}")
return {"columns": [], "rows": []}
finally:
cursor.close()
def parse_datetime(self, dt_str, hotel_timezone=None):
"""
Преобразует строку формата 'YYYY-MM-DD HH:MM:SS' или 'YYYY-MM-DDTHH:MM:SS' в aware datetime.
Преобразует время в часовой пояс отеля и корректирует на часовой пояс сервера.
"""
if dt_str is None:
return None
if isinstance(dt_str, datetime):
if timezone.is_naive(dt_str):
return timezone.make_aware(dt_str, timezone.get_default_timezone())
return dt_str
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
# Преобразуем строку в naive datetime
naive_dt = datetime.strptime(dt_str, fmt)
# Если передан часовой пояс отеля
if hotel_timezone:
# Переводим время из часового пояса отеля в UTC (если данные из PMS уже UTC+X)
tz = pytz.timezone(hotel_timezone) # Часовой пояс отеля
aware_dt = tz.localize(naive_dt) # Локализуем в часовой пояс отеля
# Теперь приводим время к серверному часовому поясу (например, Московское время)
server_tz = timezone.get_default_timezone() # Например, Moscow (UTC+3)
return aware_dt.astimezone(server_tz) # Переводим в серверное время
# Если часовой пояс отеля не передан, используем серверный часовой пояс по умолчанию
return timezone.make_aware(naive_dt, timezone.get_default_timezone())
except ValueError:
continue
return None
def decode_html_entities(self, text):
"""
Декодирует URL и HTML-сущности в строке.
Пытается автоматически декодировать строку в правильную кодировку.
"""
if text and isinstance(text, str):
text = unquote(text) # Декодируем URL
text = html.unescape(text) # Расшифровываем HTML сущности
# Попробуем определить кодировку и привести строку к utf-8
try:
detected = chardet.detect(text.encode())
encoding = detected['encoding']
if encoding and encoding != 'utf-8':
text = text.encode(encoding).decode('utf-8', errors='ignore')
except Exception as e:
self.logger.error(f"Ошибка при обработке кодировки: {e}")
return text
return text
def process_url_parameters(self, url_params):
"""
Парсит url_parameters и возвращает hotel_name и hotel_id.
"""
# Проверка корректности данных
if not url_params or not isinstance(url_params, str):
self.logger.error(f"Ошибка: некорректные url_parameters: {url_params}")
return {}
# Декодируем параметры URL
decoded = unquote(url_params)
qs = parse_qs(decoded)
# Извлекаем hotel_name и hotel_id_term
hotel_name = qs.get('utm_content', [None])[0] # Умолчание: None, если параметр не найден
hotel_id_term = qs.get('utm_term', [None])[0] # Умолчание: None, если параметр не найден
# Формируем hotel_id
hotel_id = f"{hotel_name}_{hotel_id_term}" if hotel_name and hotel_id_term else None
# Логирование для отладки
self.logger.debug(f"Извлечено из url_parameters: hotel_name={hotel_name}, hotel_id_term={hotel_id_term}, hotel_id={hotel_id}")
# Возврат результата
return {
'hotel_name': hotel_name,
'hotel_id': hotel_id
}
def check_and_store_imported_hotel(self, hotel_name, hotel_id):
"""
Проверяет, есть ли отель с данным ID в таблице ImportedHotel.
Если отеля с таким external_id нет, добавляет новый в таблицы ImportedHotel и Hotel.
"""
if not hotel_id or not hotel_name:
return None
# Генерация external_id в формате 'hotel_name_hotel_id'
external_id = f"{hotel_name}"
# Проверяем, существует ли запись с таким external_id в ImportedHotel
existing_hotel = ImportedHotel.objects.filter(external_id=external_id).first()
if existing_hotel:
self.logger.info(f"Отель с external_id {external_id} уже существует в ImportedHotel.")
else:
try:
# Создаем новую запись в ImportedHotel
with transaction.atomic():
imported_hotel = ImportedHotel.objects.create(
external_id=external_id,
name=hotel_name,
display_name=hotel_name,
imported=True # Отмечаем, что отель импортирован
)
self.logger.info(f"Отель с external_id {external_id} добавлен в ImportedHotel.")
# Создаем новый отель в основной таблице Hotel
hotel = Hotel.objects.create(
hotel_id=external_id,
name=hotel_name,
phone=None,
email=None,
address=None,
city=None,
timezone="UTC",
description="Автоматически импортированный отель",
)
self.logger.info(f"Отель с hotel_id {external_id} добавлен в Hotel с флагом is_imported=True.")
return hotel
except Exception as e:
self.logger.error(f"Ошибка при добавлении отеля {hotel_name} с external_id {external_id}: {e}")
return None
return existing_hotel
def write_to_db(self, data):
"""
Записывает данные в UserActivityLog и при необходимости в ImportedHotel.
Записывает лог синхронизации в SyncLog.
"""
processed_records = 0
received_records = len(data["rows"])
print(f"Received records: {received_records}")
for row in data["rows"]:
print(f'\n------\n row: {row}\n------\n')
# record = dict(zip(data["columns"], row)) # Преобразуем строку в словарь
# Получаем url_parameters из записи
url_parameters = self.decode_html_entities(row.get("url_parameters", ""))
print(f'\n------\n url_parameters: {url_parameters}\n------\n')
# Проверка на пустое значение
if not url_parameters:
print(f"Error: url_parameters is empty in record {row}")
continue # Пропускаем запись, если url_parameters отсутствует
# Пытаемся извлечь информацию о отеле из url_parameters
hotel_name = None
hotel_id = None
if url_parameters:
hotel_info = self.process_url_parameters(url_parameters)
hotel_id = hotel_info.get('hotel_id')
if not hotel_id:
print(f"Error: hotel_id is empty in record {row}")
continue # Пропускаем запись, если hotel_id отсутствует
# Проверяем, существует ли отель с таким hotel_id
hotel = self.check_and_store_imported_hotel(hotel_name=hotel_id, hotel_id=hotel_id)
if not hotel:
print(f"Error: Could not find or create hotel for hotel_id {hotel_id}")
continue # Пропускаем запись, если отель не найден или не создан
# Преобразуем дату
created = self.parse_datetime(row.get("created"))
date_time = self.parse_datetime(row.get("date_time"))
# Декодируем все строки, которые могут содержать HTML-сущности
referred = self.decode_html_entities(row.get("referred", ""))
agent = self.decode_html_entities(row.get("agent", ""))
platform = self.decode_html_entities(row.get("platform", ""))
version = self.decode_html_entities(row.get("version", ""))
model = self.decode_html_entities(row.get("model", ""))
device = self.decode_html_entities(row.get("device", ""))
UAString = self.decode_html_entities(row.get("UAString", ""))
location = self.decode_html_entities(row.get("location", ""))
page_title = self.decode_html_entities(row.get("page_title", ""))
page_url = self.decode_html_entities(row.get("page_url", ""))
# Запись в UserActivityLog
UserActivityLog.objects.update_or_create(
external_id=row.get("id", None),
defaults={
"user_id": row.get("user_id"),
"ip": row.get("ip"),
"created": created,
"timestamp": row.get("timestamp"),
"date_time": date_time,
"referred": referred,
"agent": agent,
"platform": platform,
"version": version,
"model": model,
"device": device,
"UAString": UAString,
"location": location,
"page_id": row.get("page_id"),
"url_parameters": url_parameters,
"page_title": page_title,
"type": row.get("type"),
"last_counter": row.get("last_counter"),
"hits": row.get("hits"),
"honeypot": row.get("honeypot"),
"reply": row.get("reply"),
"page_url": page_url,
}
)
processed_records += 1
# Логируем обработанные записи
print(f"Processed records: {processed_records}")
def reconcile_data(self):
"""
Сверяет данные таблицы user_activity_log с таблицей hotels.reservations
и записывает несоответствия в таблицу RoomDiscrepancy.
"""
discrepancies = []
reservations = Reservation.objects.values("hotel_id", "room_number", "check_in", "check_out")
for log in UserActivityLog.objects.all():
for reservation in reservations:
if (
log.page_id != reservation["room_number"] or
log.created.date() < reservation["check_in"] or
log.created.date() > reservation["check_out"]
):
discrepancies.append(RoomDiscrepancy(
hotel_id=reservation["hotel_id"],
room_number=log.page_id,
booking_id=f"Log-{log.id}",
check_in_date_expected=reservation["check_in"],
check_in_date_actual=log.created.date() if log.created else None,
discrepancy_type="Mismatch",
))
RoomDiscrepancy.objects.bulk_create(discrepancies)
def sync(self):
try:
last_id = self.get_last_saved_record()
self.logger.info(f"Синхронизация начата. Последний сохранённый ID: {last_id}")
# Загружаем новые данные
data = self.fetch_new_data(last_id=last_id)
print(f'\n------\n data: {data}\n------\n')
if not data["rows"]:
self.logger.info("Нет новых данных для синхронизации.")
return
# Логирование типов данных
self.logger.debug(f"Тип первой строки: {type(data['rows'][0])}")
first_row_id = data["rows"][0]["id"]
if last_id is not None and first_row_id <= last_id:
self.logger.info(f"Нет новых записей для синхронизации. Последний ID: {last_id}, первый ID из внешней таблицы: {first_row_id}.")
return
processed_records = self.write_to_db(data)
self.logger.info(f"Синхронизация завершена. Обработано записей: {processed_records}")
except Exception as e:
self.logger.error(f"Ошибка синхронизации данных: {e}")
raise RuntimeError(f"Ошибка синхронизации данных: {e}")
finally:
if self.connection:
self.connection.close()
def scheduled_sync():
"""
Плановая задача для синхронизации данных.
"""
db_settings_list = ExternalDBSettings.objects.filter(is_active=True)
for db_settings in db_settings_list:
sync_manager = DataSyncManager(db_settings.id)
sync_manager.sync()