Files
Touchh/antifroud/data_sync.py
2024-12-13 22:25:11 +09:00

244 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pymysql
from django.db import transaction
from django.utils import timezone
from datetime import datetime
from html import unescape
from urllib.parse import unquote, parse_qs
from .models import ExternalDBSettings, UserActivityLog, RoomDiscrepancy, ImportedHotel
from hotels.models import Reservation, Hotel
class DataSyncManager:
    """
    Load rows from an external MySQL table, upsert them into UserActivityLog
    (and ImportedHotel when needed), and reconcile them against Reservation.
    """

    # String layouts accepted by parse_datetime(), tried in this order.
    DATETIME_FORMATS = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S.%f",
    )

    # Text columns that are HTML-unescaped before being stored (default "").
    TEXT_FIELDS = (
        "referred",
        "agent",
        "platform",
        "version",
        "model",
        "device",
        "UAString",
        "location",
        "page_title",
        "page_url",
    )

    # Columns copied into UserActivityLog unchanged.
    PASSTHROUGH_FIELDS = (
        "user_id",
        "ip",
        "timestamp",
        "page_id",
        "type",
        "last_counter",
        "hits",
        "honeypot",
        "reply",
    )

    def __init__(self, db_settings_id):
        """
        Args:
            db_settings_id: primary key of the ExternalDBSettings row to use.
        """
        self.db_settings_id = db_settings_id
        self.db_settings = None  # ExternalDBSettings instance, set by connect_to_db()
        self.connection = None  # pymysql connection, set by connect_to_db()
        self.table_name = None  # external table name, set by connect_to_db()

    def _close_connection(self):
        """Close the external connection if it is still open, then forget it."""
        connection, self.connection = self.connection, None
        # Checking .open avoids pymysql's "Already closed" error on a second close.
        if connection is not None and connection.open:
            connection.close()

    def connect_to_db(self):
        """
        Open a connection to the external database and remember its table name.

        Any connection this manager already holds is closed first, so repeated
        calls do not leak sockets.

        Raises:
            ValueError: no ExternalDBSettings row with ``db_settings_id`` exists.
            ConnectionError: pymysql failed to connect.
        """
        self._close_connection()
        try:
            self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id)
        except ExternalDBSettings.DoesNotExist as e:
            raise ValueError("Настройки подключения не найдены.") from e
        self.table_name = self.db_settings.table_name
        try:
            self.connection = pymysql.connect(
                host=self.db_settings.host,
                port=self.db_settings.port,
                user=self.db_settings.user,
                password=self.db_settings.password,
                database=self.db_settings.database,
                charset='utf8mb4',
                use_unicode=True
            )
        except pymysql.MySQLError as e:
            raise ConnectionError(f"Ошибка подключения к базе данных: {e}") from e

    def fetch_data(self, limit=100):
        """
        Fetch rows from the configured external table.

        Connects first if there is no open connection.

        Args:
            limit: maximum number of rows to return, or ``None`` for all rows.
                The value is coerced with ``int()`` so it cannot inject SQL.

        Returns:
            dict with ``"columns"`` (list of column names) and ``"rows"``
            (the result of ``cursor.fetchall()``).

        Raises:
            RuntimeError: the query failed on the MySQL side.
        """
        if self.connection is None or not self.connection.open:
            self.connect_to_db()
        # Identifiers cannot be bound as query parameters: quote the table name
        # and double any embedded backtick so it cannot escape the quoting.
        table = "`{}`".format(str(self.table_name).replace("`", "``"))
        query = f"SELECT * FROM {table}"
        if limit is not None:
            query += f" LIMIT {int(limit)}"
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(query + ";")
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
            except pymysql.MySQLError as e:
                raise RuntimeError(f"Ошибка выполнения запроса: {e}") from e
        return {"columns": columns, "rows": rows}

    def parse_datetime(self, dt_str):
        """
        Convert a value from the external table into an aware datetime.

        Accepts a ``datetime`` (made aware in Django's default timezone when
        naive) or a string in one of ``DATETIME_FORMATS``.

        Returns:
            An aware datetime, or ``None`` for ``None``, unparseable strings
            and any other value type.
        """
        if dt_str is None:
            return None
        if isinstance(dt_str, datetime):
            if timezone.is_naive(dt_str):
                return timezone.make_aware(dt_str, timezone.get_default_timezone())
            return dt_str
        if not isinstance(dt_str, str):
            # strptime() would raise TypeError here (e.g. for a date object) and
            # abort the whole write_to_db() transaction.
            return None
        text = dt_str.strip()
        for fmt in self.DATETIME_FORMATS:
            try:
                naive_dt = datetime.strptime(text, fmt)
            except ValueError:
                continue
            return timezone.make_aware(naive_dt, timezone.get_default_timezone())
        return None

    def decode_html_entities(self, text):
        """
        Unescape HTML entities in a string (e.g. ``&amp;`` -> ``&``).

        Non-string and empty values are returned unchanged.
        """
        if text and isinstance(text, str):
            return unescape(text)
        return text

    def process_url_parameters(self, url_params):
        """
        Extract hotel information from a URL query string.

        ``utm_content`` holds the hotel name and ``utm_term`` the hotel ID.
        The string is first parsed as an ordinary query string. ``parse_qs``
        already percent-decodes values, so encoded ``&``/``=``/``+`` inside a
        value stay intact. Only when neither key is found is the string
        percent-decoded once more and re-parsed. That second pass covers input
        whose separators were themselves encoded
        (``utm_content%3D...%26utm_term%3D...``).

        Returns:
            ``{}`` for empty input; otherwise a dict with ``hotel_name`` and
            ``hotel_id`` (each ``None`` when absent).
        """
        if not url_params:
            return {}
        if isinstance(url_params, bytes):
            url_params = url_params.decode("utf-8", "replace")
        # A leading "?" would otherwise become part of the first key name.
        query = url_params.lstrip("?")
        qs = parse_qs(query)
        if "utm_content" not in qs and "utm_term" not in qs:
            qs = parse_qs(unquote(query))
        return {
            'hotel_name': qs.get('utm_content', [None])[0],
            'hotel_id': qs.get('utm_term', [None])[0],
        }

    def check_and_store_imported_hotel(self, hotel_name, hotel_id):
        """
        Record a hotel in ImportedHotel when it is not present in the main DB.

        The hotel counts as present only when ``hotel_id`` is a decimal number
        and a Hotel with that primary key exists. Otherwise ImportedHotel is
        upserted by ``external_id`` (the ID as a string) with
        ``name = hotel_name``. Does nothing when either argument is empty.
        """
        if not hotel_id or not hotel_name:
            return
        hotel_id = str(hotel_id)
        # isdecimal(), unlike isdigit(), rejects characters such as "²" that
        # int() cannot convert and that would otherwise raise ValueError.
        hotel_exists = (
            hotel_id.isdecimal()
            and Hotel.objects.filter(id=int(hotel_id)).exists()
        )
        if not hotel_exists:
            ImportedHotel.objects.update_or_create(
                external_id=hotel_id,
                defaults={'name': hotel_name},
            )

    @transaction.atomic
    def write_to_db(self, data):
        """
        Upsert fetched rows into UserActivityLog, and into ImportedHotel as needed.

        Runs in one transaction: an error on any row rolls back the whole batch.

        Args:
            data: mapping with ``"columns"`` and ``"rows"``, as returned by
                ``fetch_data``.
        """
        columns = data["columns"]
        for row in data["rows"]:
            record = dict(zip(columns, row))

            external_id = record.get("id")
            if external_id is None:
                # Without a source ID the upsert key would be NULL: every such
                # row would overwrite the same record, or make update_or_create
                # raise MultipleObjectsReturned. Skip it instead.
                continue

            defaults = {
                field: self.decode_html_entities(record.get(field, ""))
                for field in self.TEXT_FIELDS
            }
            defaults.update(
                {field: record.get(field) for field in self.PASSTHROUGH_FIELDS}
            )
            defaults["created"] = self.parse_datetime(record.get("created"))
            defaults["date_time"] = self.parse_datetime(record.get("date_time"))

            url_parameters = self.decode_html_entities(record.get("url_parameters", ""))
            hotel_info = self.process_url_parameters(url_parameters)
            if hotel_info.get('hotel_name') and hotel_info.get('hotel_id'):
                self.check_and_store_imported_hotel(
                    hotel_name=hotel_info['hotel_name'],
                    hotel_id=hotel_info['hotel_id'],
                )
            # The stored copy is percent-decoded. A NULL column is kept as-is,
            # because unquote(None) raises TypeError.
            if isinstance(url_parameters, (str, bytes)):
                url_parameters = unquote(url_parameters)
            defaults["url_parameters"] = url_parameters

            UserActivityLog.objects.update_or_create(
                external_id=str(external_id),
                defaults=defaults,
            )

    def reconcile_data(self, batch_size=1000):
        """
        Compare UserActivityLog entries with Reservation rows and store the
        mismatches as RoomDiscrepancy records.

        A (log, reservation) pair is a mismatch when the log's ``page_id``
        differs from the reservation's ``room_number``, when the log has no
        ``created`` value, or when the log's ``created`` date falls outside
        ``[check_in, check_out]``. One RoomDiscrepancy is created per
        mismatching pair.

        NOTE(review): every pair is compared, so a log is reported against
        every reservation it does not match. Output therefore grows as
        logs x reservations, and it is re-inserted on every run (no dedupe).
        This looks unintended; confirm the desired reconciliation rule.

        Args:
            batch_size: number of rows per INSERT passed to ``bulk_create``.
        """
        # Materialize once; the inner loop walks this list for every log.
        reservations = list(
            Reservation.objects.values("hotel_id", "room_number", "check_in", "check_out")
        )
        discrepancies = []
        for log in UserActivityLog.objects.all().iterator():
            log_date = log.created.date() if log.created else None
            for reservation in reservations:
                if (
                    log.page_id == reservation["room_number"]
                    and log_date is not None
                    and reservation["check_in"] <= log_date <= reservation["check_out"]
                ):
                    continue
                discrepancies.append(RoomDiscrepancy(
                    hotel_id=reservation["hotel_id"],
                    room_number=log.page_id,
                    booking_id=f"Log-{log.id}",
                    check_in_date_expected=reservation["check_in"],
                    check_in_date_actual=log_date,
                    discrepancy_type="Mismatch",
                ))
        RoomDiscrepancy.objects.bulk_create(discrepancies, batch_size=batch_size)

    def sync(self):
        """
        Run a full cycle: connect, fetch, write, reconcile.

        The external connection is always closed afterwards.

        NOTE(review): fetch_data() is called with its default limit of 100 and
        no ORDER BY/offset, so each run reads at most 100 rows in whatever order
        the server returns them. Confirm whether an incremental sync is
        intended.

        Raises:
            RuntimeError: wraps, and is chained to, any error from the steps above.
        """
        try:
            self.connect_to_db()
            data = self.fetch_data()
            self.write_to_db(data)
            self.reconcile_data()
        except Exception as e:
            raise RuntimeError(f"Ошибка синхронизации данных: {e}") from e
        finally:
            self._close_connection()
def scheduled_sync():
    """
    Scheduled job: run DataSyncManager.sync() for every active ExternalDBSettings.

    A failure for one settings row is logged and does not stop the others.
    Once every active row has been attempted, a RuntimeError listing the
    failed settings IDs is raised, chained to the first failure, so the
    scheduler still sees the job as failed.
    """
    import logging  # local import keeps the module-level import block unchanged

    logger = logging.getLogger(__name__)
    failures = []
    active_ids = ExternalDBSettings.objects.filter(is_active=True).values_list("id", flat=True)
    for settings_id in active_ids:
        try:
            DataSyncManager(settings_id).sync()
        except Exception as exc:  # scheduler boundary: isolate each source
            logger.exception("Ошибка синхронизации данных для настроек id=%s", settings_id)
            failures.append((settings_id, exc))
    if failures:
        failed_ids = ", ".join(str(settings_id) for settings_id, _ in failures)
        raise RuntimeError(
            f"Ошибка синхронизации данных для настроек с id: {failed_ids}"
        ) from failures[0][1]