- Кнопки 'убрать фон' для всех элементов: профиль, группы, ссылки - Кнопка 'сбросить настройки интерфейса' с подтверждением - Django app export_import с полным API для бэкапа и восстановления - Экспорт: создание ZIP архивов с данными профиля и медиафайлами - Импорт: селективная загрузка групп, ссылок, стилей, медиа - Обработка мультипарт форм, Django транзакции, управление ошибками - Полное тестирование: экспорт → импорт данных между пользователями - API эндпоинты: /api/export/, /api/import/, превью архивов - Готовая система для производственного развертывания
732 lines
28 KiB
Python
732 lines
28 KiB
Python
import json
|
||
import zipfile
|
||
import tempfile
|
||
import os
|
||
from pathlib import Path
|
||
from django.http import HttpResponse, Http404
|
||
from django.conf import settings
|
||
from django.shortcuts import get_object_or_404
|
||
from django.core.files.storage import default_storage
|
||
from django.core.files.base import ContentFile
|
||
from django.db import transaction
|
||
from rest_framework import status
|
||
from rest_framework.decorators import api_view, permission_classes
|
||
from rest_framework.permissions import IsAuthenticated
|
||
from rest_framework.response import Response
|
||
from django.utils import timezone
|
||
import shutil
|
||
|
||
from .models import ExportTask, ImportTask
|
||
from users.models import User
|
||
from links.models import LinkGroup, Link
|
||
from customization.models import DesignSettings
|
||
|
||
|
||
@api_view(['POST'])
|
||
@permission_classes([IsAuthenticated])
|
||
def create_export(request):
|
||
"""Создание задачи экспорта профиля"""
|
||
|
||
# Получаем параметры экспорта
|
||
include_groups = request.data.get('include_groups', True)
|
||
include_links = request.data.get('include_links', True)
|
||
include_styles = request.data.get('include_styles', True)
|
||
include_media = request.data.get('include_media', True)
|
||
|
||
# Создаем задачу экспорта
|
||
export_task = ExportTask.objects.create(
|
||
user=request.user,
|
||
include_groups=include_groups,
|
||
include_links=include_links,
|
||
include_styles=include_styles,
|
||
include_media=include_media,
|
||
)
|
||
|
||
try:
|
||
# Обновляем статус
|
||
export_task.status = 'processing'
|
||
export_task.save()
|
||
|
||
# Создаем архив с данными профиля
|
||
export_file_path = _create_profile_archive(export_task)
|
||
|
||
# Сохраняем путь к файлу в задаче
|
||
with open(export_file_path, 'rb') as f:
|
||
export_task.export_file.save(
|
||
f'profile_export_{export_task.user.username}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.zip',
|
||
ContentFile(f.read()),
|
||
save=True
|
||
)
|
||
|
||
# Удаляем временный файл
|
||
os.remove(export_file_path)
|
||
|
||
export_task.status = 'completed'
|
||
export_task.save()
|
||
|
||
return Response({
|
||
'task_id': export_task.id,
|
||
'status': export_task.status,
|
||
'download_url': f'/api/export/{export_task.id}/download/',
|
||
'message': 'Экспорт профиля завершен успешно'
|
||
})
|
||
|
||
except Exception as e:
|
||
export_task.status = 'failed'
|
||
export_task.error_message = str(e)
|
||
export_task.save()
|
||
|
||
return Response({
|
||
'error': 'Ошибка при создании экспорта',
|
||
'details': str(e)
|
||
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||
|
||
|
||
@api_view(['GET'])
|
||
@permission_classes([IsAuthenticated])
|
||
def download_export(request, task_id):
|
||
"""Скачивание файла экспорта"""
|
||
|
||
export_task = get_object_or_404(ExportTask, id=task_id, user=request.user)
|
||
|
||
if export_task.status != 'completed' or not export_task.export_file:
|
||
return Response({
|
||
'error': 'Файл экспорта недоступен'
|
||
}, status=status.HTTP_404_NOT_FOUND)
|
||
|
||
try:
|
||
response = HttpResponse(
|
||
export_task.export_file.read(),
|
||
content_type='application/zip'
|
||
)
|
||
response['Content-Disposition'] = f'attachment; filename="profile_export_{request.user.username}.zip"'
|
||
return response
|
||
|
||
except FileNotFoundError:
|
||
raise Http404("Файл экспорта не найден")
|
||
|
||
|
||
@api_view(['GET'])
|
||
@permission_classes([IsAuthenticated])
|
||
def export_status(request, task_id):
|
||
"""Получение статуса задачи экспорта"""
|
||
|
||
export_task = get_object_or_404(ExportTask, id=task_id, user=request.user)
|
||
|
||
return Response({
|
||
'task_id': export_task.id,
|
||
'status': export_task.status,
|
||
'created_at': export_task.created_at,
|
||
'updated_at': export_task.updated_at,
|
||
'error_message': export_task.error_message,
|
||
'download_url': f'/api/export/{export_task.id}/download/' if export_task.status == 'completed' else None
|
||
})
|
||
|
||
|
||
def _create_profile_archive(export_task):
|
||
"""Создание архива с данными профиля"""
|
||
|
||
user = export_task.user
|
||
|
||
# Создаем временную директорию
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
profile_dir = Path(temp_dir) / 'profile_export'
|
||
profile_dir.mkdir()
|
||
|
||
# Создаем структуру данных для экспорта
|
||
export_data = {
|
||
'export_info': {
|
||
'username': user.username,
|
||
'export_date': timezone.now().isoformat(),
|
||
'export_options': {
|
||
'include_groups': export_task.include_groups,
|
||
'include_links': export_task.include_links,
|
||
'include_styles': export_task.include_styles,
|
||
'include_media': export_task.include_media,
|
||
}
|
||
},
|
||
'user_data': {
|
||
'username': user.username,
|
||
'email': user.email,
|
||
'first_name': user.first_name,
|
||
'last_name': user.last_name,
|
||
'bio': getattr(user, 'bio', ''),
|
||
'avatar': user.avatar.url if user.avatar else None,
|
||
'cover': user.cover.url if user.cover else None,
|
||
},
|
||
'groups': [],
|
||
'links': [],
|
||
'design_settings': {},
|
||
}
|
||
|
||
# Экспорт групп
|
||
if export_task.include_groups:
|
||
for group in LinkGroup.objects.filter(owner=user):
|
||
group_data = {
|
||
'id': group.id,
|
||
'title': group.title,
|
||
'description': group.description,
|
||
'image': group.image.url if group.image else None,
|
||
'is_active': group.is_active,
|
||
'is_public': group.is_public,
|
||
'is_featured': group.is_featured,
|
||
'created_at': group.created_at.isoformat(),
|
||
'order': group.order,
|
||
}
|
||
export_data['groups'].append(group_data)
|
||
|
||
# Экспорт ссылок
|
||
if export_task.include_links:
|
||
for link in Link.objects.filter(group__owner=user):
|
||
link_data = {
|
||
'id': link.id,
|
||
'group_id': link.group.id,
|
||
'title': link.title,
|
||
'url': link.url,
|
||
'description': link.description,
|
||
'image': link.image.url if link.image else None,
|
||
'is_active': link.is_active,
|
||
'is_public': link.is_public,
|
||
'is_featured': link.is_featured,
|
||
'created_at': link.created_at.isoformat(),
|
||
'order': link.order,
|
||
}
|
||
export_data['links'].append(link_data)
|
||
|
||
# Экспорт настроек дизайна
|
||
if export_task.include_styles:
|
||
try:
|
||
design_settings = DesignSettings.objects.get(user=user)
|
||
export_data['design_settings'] = {
|
||
'background_image': design_settings.background_image.url if design_settings.background_image else None,
|
||
'theme_color': design_settings.theme_color,
|
||
'dashboard_layout': design_settings.dashboard_layout,
|
||
'groups_default_expanded': design_settings.groups_default_expanded,
|
||
'show_group_icons': design_settings.show_group_icons,
|
||
'show_link_icons': design_settings.show_link_icons,
|
||
'dashboard_background_color': design_settings.dashboard_background_color,
|
||
'font_family': design_settings.font_family,
|
||
'custom_css': design_settings.custom_css,
|
||
'header_text_color': design_settings.header_text_color,
|
||
'group_text_color': design_settings.group_text_color,
|
||
'link_text_color': design_settings.link_text_color,
|
||
'cover_overlay_enabled': design_settings.cover_overlay_enabled,
|
||
'cover_overlay_color': design_settings.cover_overlay_color,
|
||
'cover_overlay_opacity': design_settings.cover_overlay_opacity,
|
||
'group_overlay_enabled': design_settings.group_overlay_enabled,
|
||
'group_overlay_color': design_settings.group_overlay_color,
|
||
'group_overlay_opacity': design_settings.group_overlay_opacity,
|
||
'show_groups_title': design_settings.show_groups_title,
|
||
'group_description_text_color': design_settings.group_description_text_color,
|
||
'body_font_family': design_settings.body_font_family,
|
||
'heading_font_family': design_settings.heading_font_family,
|
||
'template_id': design_settings.template_id,
|
||
'link_overlay_enabled': design_settings.link_overlay_enabled,
|
||
'link_overlay_color': design_settings.link_overlay_color,
|
||
'link_overlay_opacity': design_settings.link_overlay_opacity,
|
||
}
|
||
except DesignSettings.DoesNotExist:
|
||
export_data['design_settings'] = {}
|
||
|
||
# Сохраняем данные в JSON
|
||
json_file = profile_dir / 'profile_data.json'
|
||
with open(json_file, 'w', encoding='utf-8') as f:
|
||
json.dump(export_data, f, indent=2, ensure_ascii=False)
|
||
|
||
# Копируем медиафайлы
|
||
if export_task.include_media:
|
||
media_dir = profile_dir / 'media'
|
||
media_dir.mkdir()
|
||
|
||
# Копируем файлы пользователя
|
||
_copy_user_media_files(user, media_dir, export_data)
|
||
|
||
# Создаем ZIP архив
|
||
archive_path = temp_dir + '.zip'
|
||
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||
for root, dirs, files in os.walk(profile_dir):
|
||
for file in files:
|
||
file_path = Path(root) / file
|
||
arc_name = file_path.relative_to(profile_dir)
|
||
zipf.write(file_path, arc_name)
|
||
|
||
return archive_path
|
||
|
||
|
||
def _copy_user_media_files(user, media_dir, export_data):
|
||
"""Копирование медиафайлов пользователя"""
|
||
|
||
media_root = Path(settings.MEDIA_ROOT)
|
||
|
||
# Функция для копирования файла
|
||
def copy_file_if_exists(url, subdir):
|
||
if url and url.startswith('/storage/'):
|
||
file_path = media_root / url[9:] # убираем /storage/
|
||
if file_path.exists():
|
||
target_dir = media_dir / subdir
|
||
target_dir.mkdir(exist_ok=True, parents=True)
|
||
shutil.copy2(file_path, target_dir / file_path.name)
|
||
|
||
# Аватар и обложка пользователя
|
||
copy_file_if_exists(export_data['user_data']['avatar'], 'avatars')
|
||
copy_file_if_exists(export_data['user_data']['cover'], 'avatars')
|
||
|
||
# Фоновые изображения в настройках дизайна
|
||
if export_data['design_settings'].get('background_image'):
|
||
copy_file_if_exists(export_data['design_settings']['background_image'], 'customization')
|
||
|
||
# Изображения групп
|
||
for group in export_data['groups']:
|
||
copy_file_if_exists(group.get('image'), 'link_groups')
|
||
|
||
# Изображения ссылок
|
||
for link in export_data['links']:
|
||
copy_file_if_exists(link.get('image'), 'links')
|
||
|
||
|
||
@api_view(['GET'])
|
||
def export_list(request):
|
||
"""Получение списка задач экспорта пользователя"""
|
||
|
||
# Для тестирования - простой ответ
|
||
if not request.user.is_authenticated:
|
||
return Response({
|
||
'message': 'Export API доступен',
|
||
'authenticated': False
|
||
})
|
||
|
||
export_tasks = ExportTask.objects.filter(user=request.user)
|
||
|
||
tasks_data = []
|
||
for task in export_tasks:
|
||
tasks_data.append({
|
||
'id': task.id,
|
||
'status': task.status,
|
||
'created_at': task.created_at,
|
||
'updated_at': task.updated_at,
|
||
'include_groups': task.include_groups,
|
||
'include_links': task.include_links,
|
||
'include_styles': task.include_styles,
|
||
'include_media': task.include_media,
|
||
'download_url': f'/api/export/{task.id}/download/' if task.status == 'completed' else None,
|
||
'error_message': task.error_message,
|
||
})
|
||
|
||
return Response({
|
||
'tasks': tasks_data,
|
||
'count': len(tasks_data)
|
||
})
|
||
|
||
|
||
@api_view(['POST'])
|
||
@permission_classes([IsAuthenticated])
|
||
def create_import(request):
|
||
"""Создание задачи импорта профиля"""
|
||
|
||
# Проверяем наличие файла
|
||
if 'import_file' not in request.FILES:
|
||
return Response({
|
||
'error': 'Файл для импорта не предоставлен'
|
||
}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
import_file = request.FILES['import_file']
|
||
|
||
# Проверяем тип файла
|
||
if not import_file.name.endswith('.zip'):
|
||
return Response({
|
||
'error': 'Поддерживаются только ZIP архивы'
|
||
}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
# Получаем параметры импорта из POST данных (для multipart/form-data)
|
||
def get_bool_param(name, default=True):
|
||
value = request.data.get(name, request.POST.get(name, str(default)))
|
||
return str(value).lower() in ('true', '1', 'yes', 'on')
|
||
|
||
import_groups = get_bool_param('import_groups', True)
|
||
import_links = get_bool_param('import_links', True)
|
||
import_styles = get_bool_param('import_styles', True)
|
||
import_media = get_bool_param('import_media', True)
|
||
overwrite_existing = get_bool_param('overwrite_existing', False)
|
||
|
||
# Создаем задачу импорта
|
||
try:
|
||
import_task = ImportTask.objects.create(
|
||
user=request.user,
|
||
import_file=import_file,
|
||
import_groups=import_groups,
|
||
import_links=import_links,
|
||
import_styles=import_styles,
|
||
import_media=import_media,
|
||
overwrite_existing=overwrite_existing,
|
||
)
|
||
|
||
# Обновляем статус
|
||
import_task.status = 'processing'
|
||
import_task.save()
|
||
|
||
# Выполняем импорт
|
||
_process_import(import_task)
|
||
|
||
import_task.status = 'completed'
|
||
import_task.save()
|
||
|
||
return Response({
|
||
'task_id': import_task.id,
|
||
'status': import_task.status,
|
||
'imported_groups_count': import_task.imported_groups_count,
|
||
'imported_links_count': import_task.imported_links_count,
|
||
'imported_media_count': import_task.imported_media_count,
|
||
'message': 'Импорт профиля завершен успешно'
|
||
})
|
||
|
||
except Exception as e:
|
||
# Если задача была создана, обновляем её статус
|
||
if 'import_task' in locals():
|
||
import_task.status = 'failed'
|
||
import_task.error_message = str(e)
|
||
import_task.save()
|
||
|
||
return Response({
|
||
'error': 'Ошибка при импорте',
|
||
'details': str(e)
|
||
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||
|
||
|
||
@api_view(['GET'])
|
||
@permission_classes([IsAuthenticated])
|
||
def import_status(request, task_id):
|
||
"""Получение статуса задачи импорта"""
|
||
|
||
import_task = get_object_or_404(ImportTask, id=task_id, user=request.user)
|
||
|
||
return Response({
|
||
'task_id': import_task.id,
|
||
'status': import_task.status,
|
||
'created_at': import_task.created_at,
|
||
'updated_at': import_task.updated_at,
|
||
'import_groups': import_task.import_groups,
|
||
'import_links': import_task.import_links,
|
||
'import_styles': import_task.import_styles,
|
||
'import_media': import_task.import_media,
|
||
'overwrite_existing': import_task.overwrite_existing,
|
||
'imported_groups_count': import_task.imported_groups_count,
|
||
'imported_links_count': import_task.imported_links_count,
|
||
'imported_media_count': import_task.imported_media_count,
|
||
'error_message': import_task.error_message,
|
||
})
|
||
|
||
|
||
@api_view(['GET'])
|
||
@permission_classes([IsAuthenticated])
|
||
def import_list(request):
|
||
"""Получение списка задач импорта пользователя"""
|
||
|
||
import_tasks = ImportTask.objects.filter(user=request.user)
|
||
|
||
tasks_data = []
|
||
for task in import_tasks:
|
||
tasks_data.append({
|
||
'id': task.id,
|
||
'status': task.status,
|
||
'created_at': task.created_at,
|
||
'updated_at': task.updated_at,
|
||
'import_groups': task.import_groups,
|
||
'import_links': task.import_links,
|
||
'import_styles': task.import_styles,
|
||
'import_media': task.import_media,
|
||
'overwrite_existing': task.overwrite_existing,
|
||
'imported_groups_count': task.imported_groups_count,
|
||
'imported_links_count': task.imported_links_count,
|
||
'imported_media_count': task.imported_media_count,
|
||
'error_message': task.error_message,
|
||
})
|
||
|
||
return Response({
|
||
'tasks': tasks_data,
|
||
'count': len(tasks_data)
|
||
})
|
||
|
||
|
||
@api_view(['POST'])
|
||
@permission_classes([IsAuthenticated])
|
||
def preview_import(request):
|
||
"""Предварительный просмотр содержимого архива импорта"""
|
||
|
||
if 'import_file' not in request.FILES:
|
||
return Response({
|
||
'error': 'Файл для импорта не предоставлен'
|
||
}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
import_file = request.FILES['import_file']
|
||
|
||
if not import_file.name.endswith('.zip'):
|
||
return Response({
|
||
'error': 'Поддерживаются только ZIP архивы'
|
||
}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
try:
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
archive_path = Path(temp_dir) / 'preview.zip'
|
||
|
||
# Сохраняем файл
|
||
with open(archive_path, 'wb') as f:
|
||
for chunk in import_file.chunks():
|
||
f.write(chunk)
|
||
|
||
# Извлекаем архив
|
||
extract_dir = Path(temp_dir) / 'extracted'
|
||
with zipfile.ZipFile(archive_path, 'r') as zipf:
|
||
zipf.extractall(extract_dir)
|
||
|
||
# Читаем данные профиля
|
||
profile_data_path = extract_dir / 'profile_data.json'
|
||
if not profile_data_path.exists():
|
||
return Response({
|
||
'error': 'Файл profile_data.json не найден в архиве'
|
||
}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
with open(profile_data_path, 'r', encoding='utf-8') as f:
|
||
profile_data = json.load(f)
|
||
|
||
# Формируем превью
|
||
preview = {
|
||
'export_info': profile_data.get('export_info', {}),
|
||
'user_data': profile_data.get('user_data', {}),
|
||
'groups_count': len(profile_data.get('groups', [])),
|
||
'links_count': len(profile_data.get('links', [])),
|
||
'has_design_settings': bool(profile_data.get('design_settings')),
|
||
'media_files': _count_media_files(extract_dir),
|
||
'groups_preview': profile_data.get('groups', [])[:5], # Первые 5 групп для превью
|
||
'links_preview': profile_data.get('links', [])[:10], # Первые 10 ссылок для превью
|
||
}
|
||
|
||
return Response(preview)
|
||
|
||
except Exception as e:
|
||
return Response({
|
||
'error': 'Ошибка при обработке архива',
|
||
'details': str(e)
|
||
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||
|
||
|
||
def _count_media_files(extract_dir):
|
||
"""Подсчет медиафайлов в архиве"""
|
||
|
||
media_dir = extract_dir / 'media'
|
||
if not media_dir.exists():
|
||
return {}
|
||
|
||
counts = {}
|
||
for category in ['avatars', 'customization', 'link_groups', 'links']:
|
||
category_dir = media_dir / category
|
||
if category_dir.exists():
|
||
counts[category] = len([f for f in category_dir.iterdir() if f.is_file()])
|
||
else:
|
||
counts[category] = 0
|
||
|
||
return counts
|
||
|
||
|
||
def _process_import(import_task):
|
||
"""Обработка импорта профиля"""
|
||
|
||
user = import_task.user
|
||
|
||
# Создаем временную директорию для извлечения архива
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
archive_path = Path(temp_dir) / 'import.zip'
|
||
|
||
# Сохраняем файл во временную директорию
|
||
with open(archive_path, 'wb') as f:
|
||
for chunk in import_task.import_file.chunks():
|
||
f.write(chunk)
|
||
|
||
# Извлекаем архив
|
||
extract_dir = Path(temp_dir) / 'extracted'
|
||
with zipfile.ZipFile(archive_path, 'r') as zipf:
|
||
zipf.extractall(extract_dir)
|
||
|
||
# Читаем данные профиля
|
||
profile_data_path = extract_dir / 'profile_data.json'
|
||
if not profile_data_path.exists():
|
||
raise Exception('Файл profile_data.json не найден в архиве')
|
||
|
||
with open(profile_data_path, 'r', encoding='utf-8') as f:
|
||
profile_data = json.load(f)
|
||
|
||
# Импортируем данные в транзакции
|
||
with transaction.atomic():
|
||
_import_profile_data(import_task, profile_data, extract_dir)
|
||
|
||
|
||
def _import_profile_data(import_task, profile_data, extract_dir):
|
||
"""Импорт данных профиля"""
|
||
|
||
user = import_task.user
|
||
|
||
# Импорт групп
|
||
if import_task.import_groups and 'groups' in profile_data:
|
||
groups_count = _import_groups(user, profile_data['groups'], import_task.overwrite_existing)
|
||
import_task.imported_groups_count = groups_count
|
||
|
||
# Импорт ссылок
|
||
if import_task.import_links and 'links' in profile_data:
|
||
links_count = _import_links(user, profile_data['links'], profile_data.get('groups', []), import_task.overwrite_existing)
|
||
import_task.imported_links_count = links_count
|
||
|
||
# Импорт настроек дизайна
|
||
if import_task.import_styles and 'design_settings' in profile_data:
|
||
_import_design_settings(user, profile_data['design_settings'], import_task.overwrite_existing)
|
||
|
||
# Импорт медиафайлов
|
||
if import_task.import_media:
|
||
media_count = _import_media_files(user, extract_dir, import_task.overwrite_existing)
|
||
import_task.imported_media_count = media_count
|
||
|
||
import_task.save()
|
||
|
||
|
||
def _import_groups(user, groups_data, overwrite_existing):
|
||
"""Импорт групп ссылок"""
|
||
|
||
imported_count = 0
|
||
|
||
for group_data in groups_data:
|
||
# Проверяем существование группы по названию
|
||
existing_group = LinkGroup.objects.filter(
|
||
owner=user,
|
||
title=group_data['title']
|
||
).first()
|
||
|
||
if existing_group and not overwrite_existing:
|
||
continue # Пропускаем если группа существует и перезапись отключена
|
||
|
||
# Создаем или обновляем группу
|
||
group_defaults = {
|
||
'description': group_data.get('description', ''),
|
||
'is_active': group_data.get('is_active', True),
|
||
'is_public': group_data.get('is_public', False),
|
||
'is_featured': group_data.get('is_featured', False),
|
||
'order': group_data.get('order', 0),
|
||
}
|
||
|
||
group, created = LinkGroup.objects.update_or_create(
|
||
owner=user,
|
||
title=group_data['title'],
|
||
defaults=group_defaults
|
||
)
|
||
|
||
imported_count += 1
|
||
|
||
return imported_count
|
||
|
||
|
||
def _import_links(user, links_data, groups_data, overwrite_existing):
|
||
"""Импорт ссылок"""
|
||
|
||
imported_count = 0
|
||
|
||
# Создаем словарь соответствия старых ID групп к новым объектам
|
||
group_mapping = {}
|
||
for group_data in groups_data:
|
||
group = LinkGroup.objects.filter(
|
||
owner=user,
|
||
title=group_data['title']
|
||
).first()
|
||
if group:
|
||
group_mapping[group_data['id']] = group
|
||
|
||
for link_data in links_data:
|
||
# Находим группу для ссылки
|
||
old_group_id = link_data.get('group_id')
|
||
if old_group_id not in group_mapping:
|
||
continue # Пропускаем если группа не найдена
|
||
|
||
target_group = group_mapping[old_group_id]
|
||
|
||
# Проверяем существование ссылки по URL и группе
|
||
existing_link = Link.objects.filter(
|
||
group=target_group,
|
||
url=link_data['url']
|
||
).first()
|
||
|
||
if existing_link and not overwrite_existing:
|
||
continue # Пропускаем если ссылка существует и перезапись отключена
|
||
|
||
# Создаем или обновляем ссылку
|
||
link_defaults = {
|
||
'title': link_data.get('title', ''),
|
||
'description': link_data.get('description', ''),
|
||
'is_active': link_data.get('is_active', True),
|
||
'is_public': link_data.get('is_public', False),
|
||
'is_featured': link_data.get('is_featured', False),
|
||
'order': link_data.get('order', 0),
|
||
}
|
||
|
||
link, created = Link.objects.update_or_create(
|
||
group=target_group,
|
||
url=link_data['url'],
|
||
defaults=link_defaults
|
||
)
|
||
|
||
imported_count += 1
|
||
|
||
return imported_count
|
||
|
||
|
||
def _import_design_settings(user, design_data, overwrite_existing):
|
||
"""Импорт настроек дизайна"""
|
||
|
||
if not design_data:
|
||
return
|
||
|
||
# Получаем или создаем настройки дизайна
|
||
design_settings, created = DesignSettings.objects.get_or_create(
|
||
user=user,
|
||
defaults={}
|
||
)
|
||
|
||
if not created and not overwrite_existing:
|
||
return # Пропускаем если настройки существуют и перезапись отключена
|
||
|
||
# Обновляем настройки
|
||
for field, value in design_data.items():
|
||
if field != 'background_image' and hasattr(design_settings, field):
|
||
setattr(design_settings, field, value)
|
||
|
||
design_settings.save()
|
||
|
||
|
||
def _import_media_files(user, extract_dir, overwrite_existing):
|
||
"""Импорт медиафайлов"""
|
||
|
||
imported_count = 0
|
||
media_dir = extract_dir / 'media'
|
||
|
||
if not media_dir.exists():
|
||
return imported_count
|
||
|
||
# Создаем соответствующие директории в медиа
|
||
user_media_root = Path(settings.MEDIA_ROOT)
|
||
|
||
# Импорт файлов по категориям
|
||
for category in ['avatars', 'customization', 'link_groups', 'links']:
|
||
category_dir = media_dir / category
|
||
if not category_dir.exists():
|
||
continue
|
||
|
||
target_dir = user_media_root / category
|
||
target_dir.mkdir(exist_ok=True, parents=True)
|
||
|
||
# Копируем файлы
|
||
for file_path in category_dir.iterdir():
|
||
if file_path.is_file():
|
||
target_file = target_dir / file_path.name
|
||
|
||
if target_file.exists() and not overwrite_existing:
|
||
continue
|
||
|
||
shutil.copy2(file_path, target_file)
|
||
imported_count += 1
|
||
|
||
return imported_count |