Files
Touchh/antifroud/check_fraud.py
2024-12-18 10:49:40 +09:00

148 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import timedelta
from urllib.parse import parse_qs
from django.utils import timezone
from django.db.models import Q
from hotels.models import Reservation, Hotel
from .models import UserActivityLog, ViolationLog
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ReservationChecker:
"""
Класс для проверки несоответствий между бронированиями и логами заселения.
"""
def __init__(self):
"""
Инициализация времени проверки и списка нарушений.
"""
self.start_time = timezone.now() - timedelta(days=30)
self.end_time = timezone.now()
self.violations = []
def log_info(self, message):
logger.info(message)
def log_warning(self, message):
logger.warning(message)
def log_error(self, message):
logger.error(message)
def fetch_user_logs(self):
""" Извлекает записи из UserActivityLog за период. """
user_logs = UserActivityLog.objects.filter(created__range=(self.start_time, self.end_time))
self.log_info(f"Найдено {user_logs.count()} логов активности для анализа.")
return user_logs
def fetch_reservations(self):
""" Извлекает бронирования за период. """
reservations = Reservation.objects.filter(
Q(check_in__lte=self.end_time) & Q(check_out__gte=self.start_time)
)
self.log_info(f"Найдено {reservations.count()} бронирований для анализа.")
return reservations
def find_violations(self):
"""
Сравнивает записи бронирований и логи активности для выявления нарушений:
- Сканирование QR без бронирования.
- Бронь со статусом "заселен" без сканирования QR.
- Раннее заселение.
"""
user_logs = self.fetch_user_logs()
reservations = self.fetch_reservations()
# Сопоставляем записи
log_lookup = {} # Словарь: (hotel_id, room_number) -> список логов
for log in user_logs:
params = parse_qs(log.url_parameters or "")
hotel_id = params.get("utm_content", [None])[0]
room_number = params.get("utm_term", [None])[0]
if hotel_id and room_number:
key = (hotel_id, room_number)
log_lookup.setdefault(key, []).append(log)
for reservation in reservations:
key = (reservation.hotel.hotel_id, reservation.room_number)
logs = log_lookup.get(key, [])
# Бронь со статусом "заселен" без сканирования QR
if reservation.status == "заселен" and not logs:
self.record_violation(
hotel=reservation.hotel,
room_number=reservation.room_number,
violation_type="no_qr_scan",
details=f"Бронирование для номера {reservation.room_number} в отеле '{reservation.hotel.name}' "
f"не имеет записи сканирования QR."
)
# Раннее заселение
for log in logs:
if log.created < reservation.check_in:
self.record_violation(
hotel=reservation.hotel,
room_number=reservation.room_number,
violation_type="early_check_in",
details=f"Раннее заселение: сканирование QR {log.created} раньше времени заезда "
f"{reservation.check_in} для номера {reservation.room_number}."
)
# Сканирование QR без бронирования
for (hotel_id, room_number), logs in log_lookup.items():
matching_reservations = reservations.filter(
hotel__hotel_id=hotel_id,
room_number=room_number
)
if not matching_reservations.exists():
for log in logs:
self.record_violation(
hotel=Hotel.objects.filter(hotel_id=hotel_id).first(),
room_number=room_number,
violation_type="no_reservation",
details=f"Сканирование QR {log.created} для номера {room_number} в отеле с ID '{hotel_id}' "
f"не соответствует ни одному бронированию."
)
def record_violation(self, hotel, room_number, violation_type, details):
"""
Записывает нарушение в список для последующего сохранения.
"""
if hotel:
self.violations.append(ViolationLog(
hotel=hotel,
room_number=room_number,
violation_type=violation_type,
violation_details=details
))
self.log_warning(f"Зафиксировано нарушение: {details}")
def save_violations(self):
""" Сохраняет найденные нарушения в базу данных. """
if self.violations:
ViolationLog.objects.bulk_create(self.violations)
self.log_info(f"Создано {len(self.violations)} записей в ViolationLog.")
else:
self.log_info("Нарушений не обнаружено.")
def run_check(self):
""" Основной метод для запуска проверки. """
self.log_info(f"Запуск проверки с {self.start_time} по {self.end_time}.")
try:
self.find_violations()
self.save_violations()
except Exception as e:
self.log_error(f"Ошибка при выполнении проверки: {e}")
self.log_info("Проверка завершена.")
# Функция для запуска из планировщика
def run_reservation_check():
checker = ReservationChecker()
checker.run_check()