253 lines
12 KiB
Python
253 lines
12 KiB
Python
from django.contrib import admin
|
||
from django.urls import path
|
||
from django.http import JsonResponse
|
||
from django.shortcuts import redirect, get_object_or_404
|
||
from django.contrib import messages
|
||
from django.db import transaction
|
||
from antifroud.models import UserActivityLog, ExternalDBSettings, RoomDiscrepancy, ImportedHotel
|
||
from hotels.models import Hotel
|
||
import pymysql
|
||
import logging
|
||
from django.urls import reverse
|
||
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@admin.register(ExternalDBSettings)
class ExternalDBSettingsAdmin(admin.ModelAdmin):
    """Admin for external MySQL connection settings.

    Besides the standard admin pages, this class exposes three JSON
    endpoints (``test-connection/``, ``fetch-tables/`` and
    ``fetch-table-data/``). Each one reads the id of an
    ``ExternalDBSettings`` record from the ``db_id`` query parameter and
    connects to the MySQL server described by that record.
    """

    change_form_template = "antifroud/admin/external_db_settings_change_form.html"
    list_display = ("name", "host", "port", "user", "database", "table_name", "is_active", "created_at", "updated_at")
    search_fields = ("name", "host", "user", "database")
    list_filter = ("is_active", "created_at", "updated_at")
    readonly_fields = ("created_at", "updated_at")

    def add_view(self, request, form_url='', extra_context=None):
        """Create a placeholder settings record and redirect to its change form.

        A new row is created every time this view is opened, so the
        permission check that the stock add view would perform is done
        explicitly here before anything is written.

        Raises:
            PermissionDenied: if the user lacks the "add" permission.
        """
        # Local import keeps this block self-contained.
        from django.core.exceptions import PermissionDenied

        if not self.has_add_permission(request):
            raise PermissionDenied

        new_instance = ExternalDBSettings.objects.create(
            name="Новая настройка",  # default display name
            host="",
            port=3306,
            user="",
            password="",
            is_active=False
        )
        return redirect(reverse('admin:antifroud_externaldbsettings_change', args=(new_instance.id,)))

    def get_urls(self):
        """Return the admin URLs with the custom JSON endpoints placed first."""
        urls = super().get_urls()
        custom_urls = [
            path('test-connection/', self.admin_site.admin_view(self.test_connection), name='test_connection'),
            path('fetch-tables/', self.admin_site.admin_view(self.fetch_tables), name='fetch_tables'),
            path('fetch-table-data/', self.admin_site.admin_view(self.fetch_table_data), name='fetch_table_data'),
        ]
        # Custom routes go first so they are matched before the stock admin patterns.
        return custom_urls + urls

    @staticmethod
    def _open_connection(db_settings):
        """Open a PyMySQL connection using the fields of a settings record."""
        return pymysql.connect(
            host=db_settings.host,
            port=db_settings.port,
            user=db_settings.user,
            password=db_settings.password,
            database=db_settings.database
        )

    def test_connection(self, request):
        """Try to connect with the stored settings and report the outcome as JSON.

        Responds with HTTP 400 when ``db_id`` or the credentials are missing,
        404 when the settings record does not exist, and 500 on a MySQL or
        unexpected error.
        """
        db_id = request.GET.get('db_id')
        if not db_id:
            return JsonResponse({"status": "error", "message": "ID подключения отсутствует."}, status=400)
        try:
            db_settings = ExternalDBSettings.objects.get(id=db_id)
            if not db_settings.user or not db_settings.password:
                return JsonResponse({"status": "error", "message": "Имя пользователя или пароль не указаны."}, status=400)

            connection = self._open_connection(db_settings)
            connection.close()
            return JsonResponse({"status": "success", "message": "Подключение успешно установлено."})
        except ExternalDBSettings.DoesNotExist:
            return JsonResponse({"status": "error", "message": "Настройки подключения не найдены."}, status=404)
        except pymysql.MySQLError as e:
            return JsonResponse({"status": "error", "message": f"Ошибка MySQL: {str(e)}"}, status=500)
        except Exception as e:
            logger.exception("Unexpected error while testing connection for db_id=%s", db_id)
            return JsonResponse({"status": "error", "message": f"Неизвестная ошибка: {str(e)}"}, status=500)

    def fetch_tables(self, request):
        """Return the table names of the configured database as JSON.

        Any failure is reported as ``{"status": "error", "message": ...}``
        with the default HTTP status, as before.
        """
        db_id = request.GET.get('db_id')
        try:
            db_settings = ExternalDBSettings.objects.get(id=db_id)
            connection = self._open_connection(db_settings)
            try:
                cursor = connection.cursor()
                cursor.execute("SHOW TABLES;")
                tables = [row[0] for row in cursor.fetchall()]
            finally:
                # Close the connection even when the query fails.
                connection.close()
            return JsonResponse({"status": "success", "tables": tables})
        except Exception as e:
            logger.exception("Failed to fetch tables for db_id=%s", db_id)
            return JsonResponse({"status": "error", "message": str(e)})

    def fetch_table_data(self, request):
        """Return the column names and up to 10 rows of one table as JSON.

        Reads ``db_id`` and ``table_name`` from the query string. Any failure
        is reported as ``{"status": "error", "message": ...}`` with the
        default HTTP status, as before.
        """
        db_id = request.GET.get('db_id')
        table_name = request.GET.get('table_name')
        try:
            if not table_name:
                return JsonResponse({"status": "error", "message": "Имя таблицы не указано."})

            db_settings = ExternalDBSettings.objects.get(id=db_id)
            connection = self._open_connection(db_settings)
            try:
                cursor = connection.cursor()
                # table_name comes from the request and identifiers cannot be
                # bound as query parameters, so double any backtick to keep the
                # value inside the quoted identifier.
                quoted_name = table_name.replace("`", "``")
                cursor.execute(f"SELECT * FROM `{quoted_name}` LIMIT 10;")
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
            finally:
                # Close the connection even when the query fails.
                connection.close()
            return JsonResponse({"status": "success", "columns": columns, "rows": rows})
        except Exception as e:
            logger.exception("Failed to fetch data for db_id=%s table=%r", db_id, table_name)
            return JsonResponse({"status": "error", "message": str(e)})
|
||
|
||
|
||
@admin.register(UserActivityLog)
class UserActivityLogAdmin(admin.ModelAdmin):
    """Admin configuration for ``UserActivityLog`` records."""

    # Columns shown in the change list.
    list_display = (
        "id",
        "timestamp",
        "date_time",
        "page_id",
        "url_parameters",
        "created",
        "page_title",
        "type",
        "hits",
    )
    # Fields covered by the admin search box.
    search_fields = (
        "page_title",
        "url_parameters",
    )
    # Sidebar filters.
    list_filter = (
        "type",
        "created",
    )
    # Shown on the form but not editable.
    readonly_fields = (
        "created",
        "timestamp",
    )
|
||
|
||
|
||
@admin.register(RoomDiscrepancy)
class RoomDiscrepancyAdmin(admin.ModelAdmin):
    """Admin configuration for ``RoomDiscrepancy`` records."""

    # Columns shown in the change list.
    list_display = (
        "hotel",
        "room_number",
        "booking_id",
        "check_in_date_expected",
        "check_in_date_actual",
        "discrepancy_type",
        "created_at",
    )
    # Fields covered by the admin search box; hotel is searched by its name.
    search_fields = (
        "hotel__name",
        "room_number",
        "booking_id",
    )
    # Sidebar filters.
    list_filter = (
        "discrepancy_type",
        "created_at",
    )
    # Shown on the form but not editable.
    readonly_fields = ("created_at",)
|
||
|
||
|
||
# @admin.register(ImportedHotel)
|
||
# class ImportedHotelAdmin(admin.ModelAdmin):
|
||
# change_list_template = "antifroud/admin/imported_hotels.html"
|
||
# list_display = ("external_id", "display_name", "name", "created", "updated", "imported")
|
||
# search_fields = ("name", "display_name", "external_id")
|
||
# list_filter = ("imported", "created", "updated")
|
||
# actions = ['mark_as_imported', 'delete_selected_hotels_action']
|
||
|
||
# def get_urls(self):
|
||
# urls = super().get_urls()
|
||
# custom_urls = [
|
||
# path('import_selected_hotels/', self.import_selected_hotels, name='antifroud_importedhotels_import_selected_hotels'),
|
||
# path('delete_selected_hotels/', self.delete_selected_hotels, name='delete_selected_hotels'),
|
||
# path('edit_hotel/', self.edit_hotel, name='edit_hotel'),
|
||
# path('delete_hotel/', self.delete_hotel, name='delete_hotel'),
|
||
# ]
|
||
# return custom_urls + urls
|
||
|
||
# @transaction.atomic
|
||
# def import_selected_hotels(self, request): # Метод теперь правильно принимает request
|
||
# if request.method == 'POST':
|
||
# selected_hotels = request.POST.getlist('hotels')
|
||
# if selected_hotels:
|
||
# # Обновление статуса импорта для выбранных отелей
|
||
# ImportedHotel.objects.filter(id__in=selected_hotels).update(imported=True)
|
||
# return JsonResponse({'success': True})
|
||
# else:
|
||
# return JsonResponse({'success': False})
|
||
# return JsonResponse({'success': False})
|
||
|
||
# @transaction.atomic
|
||
# def delete_selected_hotels(self, request):
|
||
# if request.method == 'POST':
|
||
# selected = request.POST.get('selected', '')
|
||
# if selected:
|
||
# external_ids = selected.split(',')
|
||
# deleted_count, _ = ImportedHotel.objects.filter(external_id__in=external_ids).delete()
|
||
# messages.success(request, f"Удалено отелей: {deleted_count}")
|
||
# else:
|
||
# messages.warning(request, "Не выбрано ни одного отеля для удаления.")
|
||
# return redirect('admin:antifroud_importedhotel_changelist')
|
||
|
||
# @transaction.atomic
|
||
# def delete_hotel(self, request):
|
||
# if request.method == 'POST':
|
||
# hotel_id = request.POST.get('hotel_id')
|
||
# imported_hotel = get_object_or_404(ImportedHotel, id=hotel_id)
|
||
# imported_hotel.delete()
|
||
# messages.success(request, f"Отель {imported_hotel.name} успешно удалён.")
|
||
# return redirect('admin:antifroud_importedhotel_changelist')
|
||
|
||
# def delete_selected_hotels_action(self, request, queryset):
|
||
# deleted_count, _ = queryset.delete()
|
||
# self.message_user(request, f'{deleted_count} отелей было удалено.')
|
||
# delete_selected_hotels_action.short_description = "Удалить выбранные отели"
|
||
|
||
# def mark_as_imported(self, request, queryset):
|
||
# updated = queryset.update(imported=True)
|
||
# self.message_user(request, f"Отмечено как импортированное: {updated}", messages.SUCCESS)
|
||
# mark_as_imported.short_description = "Отметить выбранные как импортированные"
|
||
|
||
# def edit_hotel(self, request):
|
||
# if request.method == 'POST':
|
||
# hotel_id = request.POST.get('hotel_id')
|
||
# display_name = request.POST.get('display_name')
|
||
# original_name = request.POST.get('original_name')
|
||
# imported = request.POST.get('imported') == 'True'
|
||
|
||
# imported_hotel = get_object_or_404(ImportedHotel, id=hotel_id)
|
||
# imported_hotel.display_name = display_name
|
||
# imported_hotel.name = original_name
|
||
# imported_hotel.imported = imported
|
||
# imported_hotel.save()
|
||
|
||
# messages.success(request, f"Отель {imported_hotel.name} успешно обновлён.")
|
||
# return redirect('admin:antifroud_importedhotel_changelist')
|
||
# return redirect('admin:antifroud_importedhotel_changelist')
|
||
|
||
from .views import import_selected_hotels
|
||
# Регистрируем admin класс для ImportedHotel
|
||
@admin.register(ImportedHotel)
class ImportedHotelAdmin(admin.ModelAdmin):
    """Admin for ``ImportedHotel`` records.

    Uses a custom change-list template and adds three extra URLs
    (``import_selected_hotels/``, ``delete_selected_hotels/`` and
    ``delete_hotel/<hotel_id>/``) as well as two bulk actions.
    """

    change_list_template = "antifroud/admin/import_hotels.html"
    list_display = ("external_id", "display_name", "name", "created", "updated", "imported")
    search_fields = ("name", "display_name", "external_id")
    list_filter = ("name", "display_name", "external_id")
    actions = ['mark_as_imported', 'delete_selected_hotels_action']

    def get_urls(self):
        """Return the admin URLs with the custom hotel endpoints placed first."""
        urls = super().get_urls()
        # admin_view applies the admin login/staff check to the custom views;
        # without it they are reachable without that check.
        protect = self.admin_site.admin_view
        custom_urls = [
            path('import_selected_hotels/', protect(import_selected_hotels), name='antifroud_importedhotels_import_selected_hotels'),
            path('delete_selected_hotels/', protect(self.delete_selected_hotels), name='delete_selected_hotels'),
            path('delete_hotel/<int:hotel_id>/', protect(self.delete_hotel), name='delete_hotel'),
        ]
        return custom_urls + urls

    @transaction.atomic
    def delete_selected_hotels(self, request):
        """Delete hotels listed in the POST field ``selected`` and redirect to the change list.

        ``selected`` is a comma-separated list of ``external_id`` values.
        Requests with any other method are redirected without changes.
        """
        if request.method == 'POST':
            selected = request.POST.get('selected', '')
            if selected:
                # Ignore surrounding whitespace and empty entries such as "1, 2,".
                external_ids = [part.strip() for part in selected.split(',') if part.strip()]
                deleted_count, _ = ImportedHotel.objects.filter(external_id__in=external_ids).delete()
                messages.success(request, f"Удалено отелей: {deleted_count}")
            else:
                messages.warning(request, "Не выбрано ни одного отеля для удаления.")
        return redirect('admin:antifroud_importedhotel_changelist')

    # Bulk action. It must not share a name with the URL view above: the later
    # definition would replace the view and the name listed in ``actions``
    # would not resolve.
    def delete_selected_hotels_action(self, request, queryset):
        """Admin action: delete every hotel in the selected queryset."""
        deleted_count, _ = queryset.delete()
        self.message_user(request, f'{deleted_count} отелей было удалено.')
    delete_selected_hotels_action.short_description = "Удалить выбранные отели"

    def mark_as_imported(self, request, queryset):
        """Admin action: set ``imported=True`` on every selected hotel."""
        updated = queryset.update(imported=True)
        self.message_user(request, f"Отмечено как импортированное: {updated}", messages.SUCCESS)
    mark_as_imported.short_description = "Отметить выбранные как импортированные"

    @transaction.atomic
    def delete_hotel(self, request, hotel_id):
        """Delete one hotel by primary key and redirect to the change list.

        Responds with 404 when no hotel has the given id.
        """
        # NOTE(review): this deletes on any HTTP method, including GET.
        # Consider restricting it to POST once the template is confirmed
        # to submit a form.
        imported_hotel = get_object_or_404(ImportedHotel, id=hotel_id)
        imported_hotel.delete()
        # The in-memory instance still carries its name after deletion.
        messages.success(request, f"Отель {imported_hotel.name} успешно удалён.")
        return redirect('admin:antifroud_importedhotel_changelist')