diff --git a/antifroud/data_sync.py b/antifroud/data_sync.py index 4c3c87f6..7cf38b2c 100644 --- a/antifroud/data_sync.py +++ b/antifroud/data_sync.py @@ -1,235 +1,174 @@ + import logging import pymysql from datetime import datetime from urllib.parse import unquote, parse_qs -import pytz from django.utils import timezone -from django.conf import settings -import chardet import html from hotels.models import Room, Hotel from .models import UserActivityLog, ExternalDBSettings - +from touchh.utils.log import CustomLogger +from concurrent.futures import ThreadPoolExecutor, TimeoutError +from decouple import config class DatabaseConnector: - """ - Класс для подключения к внешней базе данных. - """ def __init__(self, db_settings_id): self.db_settings_id = db_settings_id + self.logger = CustomLogger(name="DatabaseConnector", log_level="DEBUG").get_logger() self.connection = None - self.logger = self.setup_logger() - self.db_settings = None + self.db_settings = self.get_db_settings() - def setup_logger(self): - default_level = logging.INFO # Уровень по умолчанию - level_name = getattr(settings, "DATA_SYNC_LOG_LEVEL", "INFO").upper() - log_level = getattr(logging, level_name, default_level) - logger.setLevel(log_level) - - # Настройка обработчика для файла - handler = logging.FileHandler("data_sync.log") - handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) - handler.setLevel(log_level) - - # Удаляем старые обработчики, чтобы избежать дублирования - if logger.hasHandlers(): - logger.handlers.clear() - logger.addHandler(handler) - - # Сообщение о текущем уровне логирования - logger.info(f"Уровень логирования установлен: {logging.getLevelName(log_level)}") - - logger.addHandler(handler) - return logger + def get_db_settings(self): + try: + settings = ExternalDBSettings.objects.get(id=self.db_settings_id) + self.logger.info(f"Retrieved DB settings: {settings}") + return { + "host": settings.host, + "port": settings.port, + "user": settings.user, + "password": settings.password, + "database": settings.database, + "table_name": settings.table_name + + } + except ExternalDBSettings.DoesNotExist: + self.logger.error(f"Settings with ID {self.db_settings_id} not found.") + raise ValueError("Invalid db_settings_id") + except Exception as e: + self.logger.error(f"Error retrieving settings: {e}") + raise def connect(self): - """Подключение к базе данных.""" try: - self.db_settings = ExternalDBSettings.objects.get(id=self.db_settings_id) + self.logger.info(f"Connecting to DB with settings: {self.db_settings}") self.connection = pymysql.connect( - host=self.db_settings.host, - port=self.db_settings.port, - user=self.db_settings.user, - password=self.db_settings.password, - database=self.db_settings.database, + host=self.db_settings["host"], + port=self.db_settings["port"], + user=self.db_settings["user"], + password=self.db_settings["password"], + database=self.db_settings["database"], charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, ) - self.logger.info("Подключение к базе данных успешно установлено.") + self.logger.info("Database connection established successfully.") + except pymysql.err.OperationalError as e: + self.logger.error(f"Operational error during DB connection: {e}") + raise ConnectionError(f"Operational error: {e}") except Exception as e: - self.logger.error(f"Ошибка подключения к БД: {e}") + self.logger.error(f"Unexpected database connection error: {e}") raise ConnectionError(e) + def execute_query(self, query): + try: + with self.connection.cursor() as cursor: + cursor.execute(query) + return cursor.fetchall() + except Exception as e: + self.logger.error(f"Query execution error: {e}") + return [] + def close(self): - """Закрывает соединение с базой данных.""" if self.connection: self.connection.close() - self.logger.info("Соединение с базой данных закрыто.") - - def execute_query(self, query): - """Выполнение запроса и возврат результатов.""" - with self.connection.cursor() as cursor: - cursor.execute(query) - return cursor.fetchall() + self.logger.info("Database connection closed.") class DataProcessor: - """ - Обрабатывает и сохраняет данные. - """ def __init__(self, logger): self.logger = logger def decode_html_entities(self, text): - """Декодирует URL и HTML-сущности.""" if text and isinstance(text, str): - text = unquote(text) - text = html.unescape(text) - try: - detected = chardet.detect(text.encode()) - encoding = detected['encoding'] - if encoding and encoding != 'utf-8': - text = text.encode(encoding).decode('utf-8', errors='ignore') - except Exception as e: - self.logger.error(f"Ошибка кодировки: {e}") + return html.unescape(unquote(text)) return text def parse_datetime(self, dt_str): - """Преобразует строку даты в aware datetime.""" try: if isinstance(dt_str, datetime): return timezone.make_aware(dt_str) if timezone.is_naive(dt_str) else dt_str if dt_str: return timezone.make_aware(datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")) except Exception as e: - self.logger.error(f"Ошибка парсинга даты: {e}") + self.logger.error(f"Datetime parsing error: {e}") return None + def url_parameters_parser(self, url_parameters): - """ - Парсит строку URL-параметров и возвращает словарь с ключами и значениями. - - Пример входа: - url_parameters = "utm_medium=qr-3&utm_content=chisto-pozhit&utm_term=101" - - Возвращает: - { - "utm_medium": "qr-3", - "utm_content": "chisto-pozhit", - "utm_term": "101" - } - """ try: if not url_parameters: return {} - - # Декодируем параметры URL decoded_params = unquote(url_parameters) parsed_params = parse_qs(decoded_params) - - # Преобразуем список значений в строку - result = {key: value[0] for key, value in parsed_params.items()} - return result - + return {key: value[0] for key, value in parsed_params.items()} except Exception as e: - self.logger.error(f"Ошибка парсинга URL-параметров: {e}") + self.logger.error(f"URL parameters parsing error: {e}") return {} + class HotelRoomManager: - """ - Управляет созданием отелей и номеров. - """ def __init__(self, logger): self.logger = logger def get_or_create_hotel(self, hotel_id, page_title): - """ - Создает или получает отель. - - :param hotel_id: Значение из utm_content (индекс отеля) - :param page_title: Название отеля из поля page_title - """ if not hotel_id: - self.logger.warning("Пропущено создание отеля: отсутствует hotel_id.") + self.logger.warning("Hotel creation skipped: missing hotel_id.") return None - # Создаем или получаем отель с hotel_id и устанавливаем name по page_title hotel, created = Hotel.objects.get_or_create( hotel_id=hotel_id, defaults={ "name": html.unescape(page_title) or f"Отель {hotel_id}", - "description": "Автоматически добавленный отель" + "description": "Автоматически созданный отель", } ) if created: - self.logger.info(f"Создан отель '{hotel.name}' с hotel_id: {hotel_id}") + self.logger.info(f"Hotel '{hotel.name}' created with hotel_id: {hotel_id}") else: - self.logger.info(f"Отель '{hotel.name}' уже существует с hotel_id: {hotel_id}") + self.logger.info(f"Hotel '{hotel.name}' already exists with hotel_id: {hotel_id}") return hotel def get_or_create_room(self, hotel, room_number): - """ - Создает или получает номер отеля. - - :param hotel: Экземпляр модели Hotel - :param room_number: Номер комнаты из utm_term - """ if not hotel: - self.logger.warning("Пропущено создание номера: отсутствует отель.") + self.logger.warning("Room creation skipped: missing hotel.") return None if not room_number: - self.logger.warning(f"Пропущено создание номера: отсутствует room_number для отеля {hotel.name}.") + self.logger.warning(f"Room creation skipped: missing room_number for hotel {hotel.name}.") return None - # Генерация уникального external_id на основе hotel_id и room_number external_id = f"{hotel.hotel_id}_{room_number}".lower() - # Создаем или получаем номер room, created = Room.objects.get_or_create( hotel=hotel, number=room_number, defaults={ - "number": room_number, # Используем room_number как название номера "external_id": external_id, - "description": "Автоматически добавленный номер" + "description": "Автоматически созданная комната", } ) if created: - self.logger.info(f"Создан номер '{room.number}' (external_id: {external_id}) в отеле '{hotel.name}'") + self.logger.info(f"Room '{room.number}' (external_id: {external_id}) created in hotel '{hotel.name}'") else: - self.logger.info(f"Номер '{room.number}' уже существует в отеле '{hotel.name}'") + self.logger.info(f"Room '{room.number}' already exists in hotel '{hotel.name}'") return room + class DataSyncManager: - """ - Главный класс для синхронизации данных. - """ def __init__(self, db_settings_id): - self.logger = self.setup_logger() + self.logger = CustomLogger(name="DataSyncManager", log_level="DEBUG").get_logger() self.db_connector = DatabaseConnector(db_settings_id) + self.db_settings = self.db_connector.db_settings # Сохраняем настройки базы данных self.data_processor = DataProcessor(self.logger) self.hotel_manager = HotelRoomManager(self.logger) - def setup_logger(self): - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) - handler = logging.FileHandler("data_sync.log") - handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) - logger.addHandler(handler) - return logger - def get_last_saved_record(self): - """Получает ID последней записи.""" record = UserActivityLog.objects.order_by("-id").first() return record.id if record else 0 def fetch_new_data(self, last_id): - """Получает новые данные из БД.""" query = f""" - SELECT * FROM `{self.db_connector.db_settings.table_name}` + SELECT * FROM `{self.db_settings.get('table_name')}` WHERE id > {last_id} AND url_parameters IS NOT NULL AND url_parameters LIKE '%utm_medium%' @@ -237,100 +176,85 @@ class DataSyncManager: ORDER BY id ASC LIMIT 1000; """ - self.logger.info(f"Запрос на получение новых данных отправлен. \n Содержание запроса: {query}") + + self.logger.info(f"Fetching new data with query: {query}") return self.db_connector.execute_query(query) def process_and_save_data(self, rows): - """ - Обрабатывает и сохраняет данные из внешней базы данных. - """ for row in rows: try: - # Декодирование URL-параметров url_params = self.data_processor.decode_html_entities(row.get("url_parameters", "")) params = self.data_processor.url_parameters_parser(url_params) - timestamp = params.get("timestamp") - date_time = params.get("date_time") - # Извлечение данных + hotel_id = params.get("utm_content") room_number = params.get("utm_term") - page_title = row.get("page_title") # Название отеля из page_title + page_title = row.get("page_title") external_id = row.get("id") + hits = row.get("hits") or 0 - # Создание отеля и комнаты hotel = self.hotel_manager.get_or_create_hotel(hotel_id, page_title) room = self.hotel_manager.get_or_create_room(hotel, room_number) page_url = row.get("page_url") - # Заполнение записи - UserActivityLog.objects.update_or_create( - external_id=external_id, - defaults={ - "user_id": row.get("user_id") or 0, - "timestamp": row.get("timestamp"), - "date_time": row.get("date_time"), - "ip": row.get("ip") or "0.0.0.0", - "created": self.data_processor.parse_datetime(row.get("created")) or timezone.now(), - "url_parameters": url_params, - "page_id": room.id if room else None, - "page_title": html.unescape(page_title), - "hits": row.get("hits") or 0, - "page_url": html.unescape(page_url), - } - ) - self.logger.info(f"Запись ID {external_id} успешно обработана.") + + if hits != 0 and page_title is not None: + UserActivityLog.objects.update_or_create( + external_id=external_id, + defaults={ + "user_id": row.get("user_id") or 0, + "timestamp": row.get("timestamp"), + "date_time": row.get("date_time"), + "ip": row.get("ip") or "0.0.0.0", + "created": self.data_processor.parse_datetime(row.get("created")) or timezone.now(), + "url_parameters": url_params, + "page_id": room.id if room else None, + "page_title": html.unescape(page_title), + "hits": hits, + "page_url": html.unescape(page_url), + } + ) + else: + self.logger.warning("Invalid data for UserActivityLog.") + self.logger.info(f"Record ID {external_id} processed successfully.") except Exception as e: - self.logger.error(f"Ошибка при обработке записи ID {row.get('id')}: {e}") - - + self.logger.error(f"Error processing record ID {row.get('id')}: {e}") def sync(self): - """Запускает процесс синхронизации.""" self.db_connector.connect() try: last_id = self.get_last_saved_record() rows = self.fetch_new_data(last_id) self.process_and_save_data(rows) - self.logger.info("Синхронизация завершена.") + self.logger.info("Sync completed.") finally: self.db_connector.close() -import logging -from concurrent.futures import ThreadPoolExecutor -from .models import ExternalDBSettings - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) def scheduled_sync(): - """ - Планировщик синхронизации для всех активных подключений. - Каждое подключение обрабатывается отдельно. - """ - logger.info("Запуск планировщика синхронизации.") + logger = CustomLogger(name="DatabaseSyncScheduler", log_level="DEBUG").get_logger() + logger.info("Starting scheduled sync.") - # Получаем все активные настройки подключения active_db_settings = ExternalDBSettings.objects.filter(is_active=True) if not active_db_settings.exists(): - logger.warning("Не найдено активных подключений для синхронизации.") + logger.warning("No active database connections found.") return - logger.info(f"Найдено активных подключений: {len(active_db_settings)}") + logger.info(f"Found {len(active_db_settings)} active database connections.") def sync_task(db_settings): - """ - Выполняет синхронизацию для одного подключения. - """ try: - logger.info(f"Начало синхронизации для подключения: {db_settings.name} (ID={db_settings.id})") + logger.info(f"Syncing connection: {db_settings.name} (ID={db_settings.id})") sync_manager = DataSyncManager(db_settings.id) sync_manager.sync() - logger.info(f"Синхронизация успешно завершена для подключения: {db_settings.name}") + logger.info(f"Sync completed for connection: {db_settings}") except Exception as e: - logger.error(f"Ошибка синхронизации для подключения {db_settings.name}: {e}") + logger.error(f"Error syncing connection {db_settings}: {e}") - # Параллельное выполнение задач синхронизации - with ThreadPoolExecutor(max_workers=5) as executor: # Максимальное количество потоков = 5 - for db_settings in active_db_settings: - executor.submit(sync_task, db_settings) + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(sync_task, db_settings) for db_settings in active_db_settings] + for future in futures: + try: + future.result(timeout=300) + except TimeoutError: + logger.error("Sync task timed out.") - logger.info("Планировщик синхронизации завершил работу.") + logger.info("Scheduled sync completed.") diff --git a/bot/management/commands/run_bot.py b/bot/management/commands/run_bot.py index 38cd3400..212f8332 100644 --- a/bot/management/commands/run_bot.py +++ b/bot/management/commands/run_bot.py @@ -7,6 +7,7 @@ from telegram.ext import Application from bot.utils.bot_setup import setup_bot from scheduler.tasks import load_tasks_to_scheduler from settings.models import TelegramSettings +from touchh.utils.log import CustomLogger class Command(BaseCommand): help = "Запуск Telegram бота и планировщика" diff --git a/manage.py b/manage.py index 915feb02..27349e86 100755 --- a/manage.py +++ b/manage.py @@ -6,7 +6,7 @@ import logging # Настройка логирования logging.basicConfig( - level=logging.INFO, # Уровень логирования (можно DEBUG для полной информации) + level=logging.ERROR, # Уровень логирования (можно DEBUG для полной информации) format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("bot.log"), # Логи будут записываться в файл bot.log diff --git a/requierments.txt b/requierments.txt index 6c77e403..1b4fee9e 100644 --- a/requierments.txt +++ b/requierments.txt @@ -66,4 +66,4 @@ user-agents==2.2.0 yarl==1.18.3 mysqlclient chardet -decouple \ No newline at end of file +python-decouple \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index e00e4155..00000000 --- a/requirements.txt +++ /dev/null @@ -1,109 +0,0 @@ -ace_tools==0.0 -aiohappyeyeballs==2.4.4 -aiohttp==3.11.10 -aiosignal==1.3.1 -anyio==4.6.2.post1 -APScheduler==3.11.0 -asgiref==3.8.1 -async-timeout==5.0.1 -attrs==24.2.0 -certifi==2024.8.30 -charset-normalizer==3.4.0 -Django==5.1.4 -django-filter==24.3 -django-jazzmin==3.0.1 -django-jet==1.0.8 -et_xmlfile==2.0.0 -exceptiongroup==1.2.2 -fpdf==1.7.2 -frozenlist==1.5.0 -geoip2==4.8.1 -h11==0.14.0 -httpcore==1.0.7 -httpx==0.28.0 -idna==3.10 -jsonschema==4.23.0 -jsonschema-specifications==2024.10.1 -maxminddb==2.6.2 -multidict==6.1.0 -numpy==2.1.3 -openpyxl==3.1.5 -pandas==2.2.3 -pathspec==0.12.1 -pillow==11.0.0 -propcache==0.2.1 -PyMySQL==1.1.1 -python-dateutil==2.9.0.post0 -python-dotenv==1.0.1 -python-telegram-bot==21.8 -pytz==2024.2 -PyYAML==6.0.2 -referencing==0.35.1 -requests==2.32.3 -rpds-py==0.22.3 -six==1.17.0 -sniffio==1.3.1 -sqlparse==0.5.2 -typing_extensions==4.12.2 -tzdata==2024.2 -tzlocal==5.2 -ua-parser==1.0.0 -ua-parser-builtins==0.18.0.post1 -urllib3==2.2.3 -user-agents==2.2.0 -yarl==1.18.3 -ace_tools==0.0 -aiohappyeyeballs==2.4.4 -aiohttp==3.11.10 -aiosignal==1.3.1 -anyio==4.6.2.post1 -APScheduler==3.11.0 -asgiref==3.8.1 -async-timeout==5.0.1 -attrs==24.2.0 -certifi==2024.8.30 -charset-normalizer==3.4.0 -Django==5.1.4 -django-filter==24.3 -django-health-check==3.18.3 -django-jazzmin==3.0.1 -django-jet==1.0.8 -et_xmlfile==2.0.0 -exceptiongroup==1.2.2 -fpdf==1.7.2 -frozenlist==1.5.0 -geoip2==4.8.1 -h11==0.14.0 -httpcore==1.0.7 -httpx==0.28.0 -idna==3.10 -jsonschema==4.23.0 -jsonschema-specifications==2024.10.1 -maxminddb==2.6.2 -multidict==6.1.0 -numpy==2.1.3 -openpyxl==3.1.5 -pandas==2.2.3 -pathspec==0.12.1 -pillow==11.0.0 -propcache==0.2.1 -PyMySQL==1.1.1 -python-dateutil==2.9.0.post0 -python-dotenv==1.0.1 -python-telegram-bot==21.8 -pytz==2024.2 -PyYAML==6.0.2 -referencing==0.35.1 -requests==2.32.3 -rpds-py==0.22.3 -six==1.17.0 -sniffio==1.3.1 -sqlparse==0.5.2 -typing_extensions==4.12.2 -tzdata==2024.2 -tzlocal==5.2 -ua-parser==1.0.0 -ua-parser-builtins==0.18.0.post1 -urllib3==2.2.3 -user-agents==2.2.0 -yarl==1.18.3 diff --git a/touchh/settings.py b/touchh/settings.py index 843688ed..621f8931 100644 --- a/touchh/settings.py +++ b/touchh/settings.py @@ -131,7 +131,7 @@ LOGGING = { 'disable_existing_loggers': False, 'handlers': { 'file': { - 'level': 'WARNING', + 'level': os.getenv("LOG_LEVEL"), 'class': 'logging.FileHandler', 'filename': 'import_hotels.log', # Лог будет записываться в этот файл }, @@ -139,12 +139,12 @@ LOGGING = { 'loggers': { 'django': { 'handlers': ['file'], - 'level': 'WARNING', + 'level': os.getenv("LOG_LEVEL"), 'propagate': True, }, 'antifroud': { 'handlers': ['file'], - 'level': 'WARNING', + 'level': os.getenv("LOG_LEVEL"), 'propagate': True, }, }, diff --git a/touchh/utils/__init__.py b/touchh/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/touchh/utils/log.py b/touchh/utils/log.py new file mode 100644 index 00000000..bd8234d7 --- /dev/null +++ b/touchh/utils/log.py @@ -0,0 +1,55 @@ +import logging +import os +from dotenv import load_dotenv + +# Загрузка переменных из .env +load_dotenv() + +class CustomLogger: + """ + Универсальный логгер для использования в проекте. + """ + + def __init__(self, name, log_level=None): + """ + Инициализирует логгер. + + :param name: Имя логгера (обычно имя функции или класса) + :param log_level: Уровень логирования (по умолчанию из .env) + """ + self.logger = logging.getLogger(name) + + # Уровень логирования по умолчанию из .env + default_level = os.getenv("LOG_LEVEL", "INFO").upper() + self.log_level = getattr(logging, log_level.upper(), getattr(logging, default_level, logging.INFO)) + + self.setup_logger() + + def setup_logger(self): + """ + Настраивает логгер с обработчиком и форматом. + """ + self.logger.setLevel(self.log_level) + + # Удаляем старые обработчики, чтобы избежать дублирования + if self.logger.hasHandlers(): + self.logger.handlers.clear() + + # Добавляем обработчик для файла + file_handler = logging.FileHandler("project.log") + file_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")) + file_handler.setLevel(self.log_level) + + # Добавляем обработчик для консоли + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s")) + console_handler.setLevel(self.log_level) + + self.logger.addHandler(file_handler) + self.logger.addHandler(console_handler) + + def get_logger(self): + """ + Возвращает настроенный логгер. + """ + return self.logger