Files
Touchh/antifroud/check_fraud.py
2024-12-17 21:39:33 +09:00

160 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import timedelta
from urllib.parse import parse_qs
from django.utils import timezone
from django.db.models import Q
from hotels.models import Reservation, Hotel
from .models import UserActivityLog, ViolationLog
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReservationChecker:
"""
Класс для проверки несоответствий между бронированиями и логами заселения.
"""
def __init__(self):
"""
Инициализация времени проверки и списка нарушений.
"""
self.start_time = timezone.now() - timedelta(days=30)
self.end_time = timezone.now()
self.violations = []
def log_info(self, message):
"""Логирование информационных сообщений."""
logger.info(message)
def log_warning(self, message):
"""Логирование предупреждений."""
logger.warning(message)
def log_error(self, message):
"""Логирование ошибок."""
logger.error(message)
def fetch_user_logs(self):
"""
Извлекает записи из UserActivityLog за последние 12 часов.
"""
print(f"Fetching user logs from {self.start_time} to {self.end_time}")
user_logs = UserActivityLog.objects.filter(created__range=(self.start_time, self.end_time))
print(f"Found {user_logs.count()} logs for analysis.")
return user_logs
def fetch_hotels(self, hotel_ids):
"""
Извлекает отели по hotel_id из логов.
"""
hotels = {hotel.hotel_id: hotel for hotel in Hotel.objects.filter(hotel_id__in=hotel_ids)}
self.log_info(f"Найдено {len(hotels)} отелей для сверки.")
return hotels
def find_violations(self, user_logs, hotels):
"""
Сопоставляет логи активности с бронированиями и фиксирует нарушения.
"""
for log in user_logs:
if not log.url_parameters:
self.log_warning(f"Пропущена запись ID {log.id}: отсутствуют URL-параметры.")
continue
# Парсинг параметров URL
params = parse_qs(log.url_parameters)
hotel_id = params.get("utm_content", [None])[0]
room_number = params.get("utm_term", [None])[0]
print(f"Processing log ID {log.id} with hotel ID {hotel_id} and room number {room_number}")
if not hotel_id or not room_number:
self.log_warning(f"Пропущена запись ID {log.id}: некорректные параметры URL.")
continue
if hotel_id not in hotels:
self.log_warning(f"Пропущена запись ID {log.id}: отель с ID {hotel_id} не найден.")
continue
hotel = hotels[hotel_id]
log_time = timezone.localtime(log.created)
# Проверка существования бронирования
matching_reservations = Reservation.objects.filter(
hotel=hotel,
room_number=room_number,
check_in__lte=log_time,
check_out__gte=log_time
)
print(f"Found {matching_reservations.count()} matching reservations")
if not matching_reservations.exists():
violation_details = (
f"Не найдено бронирование для номера {room_number} в отеле '{hotel.name}' на {log_time}."
)
# Добавляем нарушение, если его ещё нет в базе
if not ViolationLog.objects.filter(
hotel=hotel,
room_number=room_number,
violation_type="missed",
violation_details=violation_details
).exists():
self.violations.append(ViolationLog(
hotel=hotel,
room_number=room_number,
violation_type="missed",
violation_details=violation_details
))
self.log_warning(f"Зафиксировано нарушение: {violation_details}")
def save_violations(self):
"""
Сохраняет найденные нарушения в базу данных.
"""
if self.violations:
ViolationLog.objects.bulk_create(self.violations)
self.log_info(f"Создано {len(self.violations)} записей в ViolationLog.")
else:
self.log_info("Нарушений не обнаружено.")
def run_check(self):
"""
Основной метод для запуска проверки.
"""
self.log_info(f"Запуск проверки бронирований с {self.start_time} по {self.end_time}.")
try:
# Получаем логи пользователей
user_logs = self.fetch_user_logs()
# Извлекаем hotel_id из логов
hotel_ids = set()
for log in user_logs:
if log.url_parameters:
params = parse_qs(log.url_parameters)
hotel_id = params.get("utm_content", [None])[0]
if hotel_id:
hotel_ids.add(hotel_id)
# Предзагружаем отели
hotels = self.fetch_hotels(hotel_ids)
# Сравниваем логи с бронированиями
self.find_violations(user_logs, hotels)
# Сохраняем результаты
self.save_violations()
except Exception as e:
self.log_error(f"Произошла ошибка при выполнении проверки: {e}")
self.log_info("Проверка бронирований завершена.")
# Функция для запуска проверки из планировщика
def run_reservation_check():
"""
Функция для запуска проверки бронирований.
"""
checker = ReservationChecker()
checker.run_check()