geoip integration
This commit is contained in:
@@ -6,10 +6,11 @@ from urllib.parse import unquote, parse_qs
|
||||
from django.utils import timezone
|
||||
import html
|
||||
from hotels.models import Room, Hotel
|
||||
from .models import UserActivityLog, ExternalDBSettings
|
||||
from .models import UserActivityLog, ExternalDBSettings, SyncLog
|
||||
from touchh.utils.log import CustomLogger
|
||||
from concurrent.futures import ThreadPoolExecutor, TimeoutError
|
||||
from decouple import config
|
||||
from django.db.models import F
|
||||
|
||||
class DatabaseConnector:
|
||||
def __init__(self, db_settings_id):
|
||||
@@ -40,8 +41,8 @@ class DatabaseConnector:
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self.logger.info(f"Connecting to DB with settings: {self.db_settings}")
|
||||
self.connection = pymysql.connect(
|
||||
|
||||
host=self.db_settings["host"],
|
||||
port=self.db_settings["port"],
|
||||
user=self.db_settings["user"],
|
||||
@@ -78,9 +79,10 @@ class DataProcessor:
|
||||
self.logger = logger
|
||||
|
||||
def decode_html_entities(self, text):
|
||||
if text and isinstance(text, str):
|
||||
return html.unescape(unquote(text))
|
||||
return text
|
||||
if not text:
|
||||
self.logger.warning("Empty text received for decoding HTML entities.")
|
||||
return html.unescape(unquote(text)) if text else text
|
||||
|
||||
|
||||
def parse_datetime(self, dt_str):
|
||||
try:
|
||||
@@ -92,7 +94,13 @@ class DataProcessor:
|
||||
self.logger.error(f"Datetime parsing error: {e}")
|
||||
return None
|
||||
|
||||
def url_parameters_parser(self, url_parameters):
|
||||
def parse_url_parameters(self, url_parameters):
|
||||
"""
|
||||
Парсит строку URL-параметров в словарь.
|
||||
|
||||
:param url_parameters: Строка с URL-параметрами.
|
||||
:return: Словарь с распарсенными параметрами.
|
||||
"""
|
||||
try:
|
||||
if not url_parameters:
|
||||
return {}
|
||||
@@ -100,7 +108,7 @@ class DataProcessor:
|
||||
parsed_params = parse_qs(decoded_params)
|
||||
return {key: value[0] for key, value in parsed_params.items()}
|
||||
except Exception as e:
|
||||
self.logger.error(f"URL parameters parsing error: {e}")
|
||||
self.logger.error(f"Error parsing URL parameters: {e}")
|
||||
return {}
|
||||
|
||||
|
||||
@@ -135,24 +143,26 @@ class HotelRoomManager:
|
||||
self.logger.warning(f"Room creation skipped: missing room_number for hotel {hotel.name}.")
|
||||
return None
|
||||
|
||||
external_id = f"{hotel.hotel_id}_{room_number}".lower()
|
||||
try:
|
||||
# Проверяем существование комнаты
|
||||
room = Room.objects.filter(hotel=hotel, number=room_number).first()
|
||||
if room:
|
||||
self.logger.info(f"Room '{room_number}' already exists in hotel '{hotel.name}'.")
|
||||
return room
|
||||
|
||||
room, created = Room.objects.get_or_create(
|
||||
hotel=hotel,
|
||||
number=room_number,
|
||||
defaults={
|
||||
"external_id": external_id,
|
||||
"description": "Автоматически созданная комната",
|
||||
}
|
||||
)
|
||||
|
||||
if created:
|
||||
self.logger.info(f"Room '{room.number}' (external_id: {external_id}) created in hotel '{hotel.name}'")
|
||||
else:
|
||||
self.logger.info(f"Room '{room.number}' already exists in hotel '{hotel.name}'")
|
||||
|
||||
return room
|
||||
# Создаем комнату, если она не найдена
|
||||
room = Room.objects.create(
|
||||
hotel=hotel,
|
||||
number=room_number,
|
||||
external_id=f"{hotel.hotel_id}_{room_number}".lower(),
|
||||
description="Automatically added room",
|
||||
)
|
||||
self.logger.info(f"Room '{room.number}' created in hotel '{hotel.name}'.")
|
||||
return room
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error creating room '{room_number}' in hotel '{hotel.name}': {e}")
|
||||
return None
|
||||
|
||||
class DataSyncManager:
|
||||
def __init__(self, db_settings_id):
|
||||
@@ -167,6 +177,12 @@ class DataSyncManager:
|
||||
return record.id if record else 0
|
||||
|
||||
def fetch_new_data(self, last_id):
|
||||
"""
|
||||
Извлекает новые данные из таблицы для синхронизации.
|
||||
|
||||
:param last_id: Последний обработанный ID.
|
||||
:return: Список строк, полученных из базы данных.
|
||||
"""
|
||||
query = f"""
|
||||
SELECT * FROM `{self.db_settings.get('table_name')}`
|
||||
WHERE id > {last_id}
|
||||
@@ -176,48 +192,94 @@ class DataSyncManager:
|
||||
ORDER BY id ASC
|
||||
LIMIT 1000;
|
||||
"""
|
||||
|
||||
self.logger.info(f"Fetching new data with query: {query}")
|
||||
return self.db_connector.execute_query(query)
|
||||
|
||||
|
||||
try:
|
||||
rows = self.db_connector.execute_query(query)
|
||||
self.logger.info(f"Fetched {len(rows)} records from the database.")
|
||||
return rows
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error fetching data: {e}")
|
||||
return []
|
||||
def update_sync_log(self, hotel, recieved_records, processed_records):
    """
    Record sync counters for a hotel in its SyncLog row.

    Fetches the SyncLog row for ``hotel`` (creating it if absent). A freshly
    created row takes the given counts as-is; an existing row has the given
    counts added to its stored totals. Any exception is logged and swallowed,
    so the method returns None in every case.

    :param hotel: Hotel object the log row is keyed on; its ``name`` is used
        in log messages.
    :param recieved_records: Number of records received in this run.
    :param processed_records: Number of records processed in this run.
    """
    try:
        entry, is_new = SyncLog.objects.get_or_create(hotel=hotel)

        if is_new:
            # New row: the counters start at this run's values.
            total_recieved = recieved_records
            total_processed = processed_records
        else:
            # Existing row: accumulate on top of the stored totals.
            total_recieved = entry.recieved_records + recieved_records
            total_processed = entry.processed_records + processed_records

        entry.recieved_records = total_recieved
        entry.processed_records = total_processed
        entry.save()
        self.logger.info(f"Sync log updated for hotel '{hotel.name}'.")
    except Exception as e:
        self.logger.error(f"Error updating sync log for hotel '{hotel.name}': {e}")
|
||||
|
||||
def process_and_save_data(self, rows):
|
||||
"""
|
||||
Обрабатывает и сохраняет данные из внешнего источника.
|
||||
|
||||
:param rows: Список строк данных, полученных из базы данных.
|
||||
"""
|
||||
seen_entries = set()
|
||||
|
||||
for row in rows:
|
||||
# Получение и декодирование URL-параметров
|
||||
url_parameters = row.get("url_parameters")
|
||||
if not url_parameters:
|
||||
self.logger.warning(f"Skipping record with missing URL parameters: {row}")
|
||||
continue
|
||||
|
||||
parsed_params = self.data_processor.parse_url_parameters(url_parameters)
|
||||
hotel_id = parsed_params.get("utm_content") # Извлекаем hotel_id из параметров
|
||||
room_number = parsed_params.get("utm_term") # Извлекаем room_number из параметров
|
||||
|
||||
if not hotel_id or not room_number:
|
||||
self.logger.warning(f"Skipping record with missing data: hotel_id={hotel_id}, room_number={room_number}")
|
||||
continue
|
||||
|
||||
# Проверка на дубликаты
|
||||
if (hotel_id, room_number) in seen_entries:
|
||||
self.logger.warning(f"Duplicate record skipped: hotel_id={hotel_id}, room_number={room_number}")
|
||||
continue
|
||||
|
||||
seen_entries.add((hotel_id, room_number))
|
||||
|
||||
try:
|
||||
url_params = self.data_processor.decode_html_entities(row.get("url_parameters", ""))
|
||||
params = self.data_processor.url_parameters_parser(url_params)
|
||||
# Получение или создание отеля
|
||||
hotel = self.hotel_manager.get_or_create_hotel(hotel_id, row.get("page_title"))
|
||||
if not hotel:
|
||||
self.logger.warning(f"Skipping record: Failed to create or retrieve hotel with ID {hotel_id}")
|
||||
continue
|
||||
|
||||
hotel_id = params.get("utm_content")
|
||||
room_number = params.get("utm_term")
|
||||
page_title = row.get("page_title")
|
||||
external_id = row.get("id")
|
||||
hits = row.get("hits") or 0
|
||||
|
||||
hotel = self.hotel_manager.get_or_create_hotel(hotel_id, page_title)
|
||||
# Получение или создание комнаты
|
||||
room = self.hotel_manager.get_or_create_room(hotel, room_number)
|
||||
page_url = row.get("page_url")
|
||||
if not room:
|
||||
self.logger.warning(f"Skipping record: Failed to create or retrieve room {room_number} in hotel {hotel.name}")
|
||||
continue
|
||||
|
||||
if hits != 0 and page_title is not None:
|
||||
UserActivityLog.objects.update_or_create(
|
||||
external_id=external_id,
|
||||
defaults={
|
||||
"user_id": row.get("user_id") or 0,
|
||||
"timestamp": row.get("timestamp"),
|
||||
"date_time": row.get("date_time"),
|
||||
"ip": row.get("ip") or "0.0.0.0",
|
||||
"created": self.data_processor.parse_datetime(row.get("created")) or timezone.now(),
|
||||
"url_parameters": url_params,
|
||||
"page_id": room.id if room else None,
|
||||
"page_title": html.unescape(page_title),
|
||||
"hits": hits,
|
||||
"page_url": html.unescape(page_url),
|
||||
}
|
||||
)
|
||||
else:
|
||||
self.logger.warning("Invalid data for UserActivityLog.")
|
||||
self.logger.info(f"Record ID {external_id} processed successfully.")
|
||||
# Создание или обновление записи активности пользователя
|
||||
UserActivityLog.objects.update_or_create(
|
||||
external_id=row.get("id"),
|
||||
defaults={
|
||||
"user_id": row.get("user_id") or 0,
|
||||
"ip": row.get("ip") or "0.0.0.0",
|
||||
"created": self.data_processor.parse_datetime(row.get("created")),
|
||||
"timestamp": row.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"date_time": self.data_processor.parse_datetime(row.get("date_time")),
|
||||
"url_parameters": parsed_params,
|
||||
"page_title": self.data_processor.decode_html_entities(row.get("page_title")) or "Untitled",
|
||||
"page_url": row.get("page_url") or "",
|
||||
"hits": row.get("hits") or 0,
|
||||
}
|
||||
)
|
||||
self.logger.info(f"Record ID {row.get('id')} processed successfully.")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error processing record ID {row.get('id')}: {e}")
|
||||
|
||||
self.logger.info(f"Data processing completed. Processed {len(seen_entries)} unique records.")
|
||||
|
||||
|
||||
def sync(self):
|
||||
self.db_connector.connect()
|
||||
try:
|
||||
@@ -230,7 +292,7 @@ class DataSyncManager:
|
||||
|
||||
|
||||
def scheduled_sync():
|
||||
logger = CustomLogger(name="DatabaseSyncScheduler", log_level="DEBUG").get_logger()
|
||||
logger = CustomLogger(name="DatabaseSyncScheduler", log_level="ERROR").get_logger()
|
||||
logger.info("Starting scheduled sync.")
|
||||
|
||||
active_db_settings = ExternalDBSettings.objects.filter(is_active=True)
|
||||
@@ -256,5 +318,5 @@ def scheduled_sync():
|
||||
future.result(timeout=300)
|
||||
except TimeoutError:
|
||||
logger.error("Sync task timed out.")
|
||||
|
||||
|
||||
logger.info("Scheduled sync completed.")
|
||||
|
||||
Reference in New Issue
Block a user