Files
links/backend/export_import/views.py
Andrey K. Choi d78c296e5a Добавлен полнофункциональный экспорт/импорт профилей
- Кнопки 'убрать фон' для всех элементов: профиль, группы, ссылки
- Кнопка 'сбросить настройки интерфейса' с подтверждением
- Django app export_import с полным API для бэкапа и восстановления
- Экспорт: создание ZIP архивов с данными профиля и медиафайлами
- Импорт: селективная загрузка групп, ссылок, стилей, медиа
- Обработка мультипарт форм, Django транзакции, управление ошибками
- Полное тестирование: экспорт → импорт данных между пользователями
- API эндпоинты: /api/export/, /api/import/, превью архивов
- Готовая система для производственного развертывания
2025-11-09 14:28:45 +09:00

732 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import zipfile
import tempfile
import os
from pathlib import Path
from django.http import HttpResponse, Http404
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.db import transaction
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from django.utils import timezone
import shutil
from .models import ExportTask, ImportTask
from users.models import User
from links.models import LinkGroup, Link
from customization.models import DesignSettings
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def create_export(request):
"""Создание задачи экспорта профиля"""
# Получаем параметры экспорта
include_groups = request.data.get('include_groups', True)
include_links = request.data.get('include_links', True)
include_styles = request.data.get('include_styles', True)
include_media = request.data.get('include_media', True)
# Создаем задачу экспорта
export_task = ExportTask.objects.create(
user=request.user,
include_groups=include_groups,
include_links=include_links,
include_styles=include_styles,
include_media=include_media,
)
try:
# Обновляем статус
export_task.status = 'processing'
export_task.save()
# Создаем архив с данными профиля
export_file_path = _create_profile_archive(export_task)
# Сохраняем путь к файлу в задаче
with open(export_file_path, 'rb') as f:
export_task.export_file.save(
f'profile_export_{export_task.user.username}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.zip',
ContentFile(f.read()),
save=True
)
# Удаляем временный файл
os.remove(export_file_path)
export_task.status = 'completed'
export_task.save()
return Response({
'task_id': export_task.id,
'status': export_task.status,
'download_url': f'/api/export/{export_task.id}/download/',
'message': 'Экспорт профиля завершен успешно'
})
except Exception as e:
export_task.status = 'failed'
export_task.error_message = str(e)
export_task.save()
return Response({
'error': 'Ошибка при создании экспорта',
'details': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def download_export(request, task_id):
"""Скачивание файла экспорта"""
export_task = get_object_or_404(ExportTask, id=task_id, user=request.user)
if export_task.status != 'completed' or not export_task.export_file:
return Response({
'error': 'Файл экспорта недоступен'
}, status=status.HTTP_404_NOT_FOUND)
try:
response = HttpResponse(
export_task.export_file.read(),
content_type='application/zip'
)
response['Content-Disposition'] = f'attachment; filename="profile_export_{request.user.username}.zip"'
return response
except FileNotFoundError:
raise Http404("Файл экспорта не найден")
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def export_status(request, task_id):
"""Получение статуса задачи экспорта"""
export_task = get_object_or_404(ExportTask, id=task_id, user=request.user)
return Response({
'task_id': export_task.id,
'status': export_task.status,
'created_at': export_task.created_at,
'updated_at': export_task.updated_at,
'error_message': export_task.error_message,
'download_url': f'/api/export/{export_task.id}/download/' if export_task.status == 'completed' else None
})
def _create_profile_archive(export_task):
"""Создание архива с данными профиля"""
user = export_task.user
# Создаем временную директорию
with tempfile.TemporaryDirectory() as temp_dir:
profile_dir = Path(temp_dir) / 'profile_export'
profile_dir.mkdir()
# Создаем структуру данных для экспорта
export_data = {
'export_info': {
'username': user.username,
'export_date': timezone.now().isoformat(),
'export_options': {
'include_groups': export_task.include_groups,
'include_links': export_task.include_links,
'include_styles': export_task.include_styles,
'include_media': export_task.include_media,
}
},
'user_data': {
'username': user.username,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
'bio': getattr(user, 'bio', ''),
'avatar': user.avatar.url if user.avatar else None,
'cover': user.cover.url if user.cover else None,
},
'groups': [],
'links': [],
'design_settings': {},
}
# Экспорт групп
if export_task.include_groups:
for group in LinkGroup.objects.filter(owner=user):
group_data = {
'id': group.id,
'title': group.title,
'description': group.description,
'image': group.image.url if group.image else None,
'is_active': group.is_active,
'is_public': group.is_public,
'is_featured': group.is_featured,
'created_at': group.created_at.isoformat(),
'order': group.order,
}
export_data['groups'].append(group_data)
# Экспорт ссылок
if export_task.include_links:
for link in Link.objects.filter(group__owner=user):
link_data = {
'id': link.id,
'group_id': link.group.id,
'title': link.title,
'url': link.url,
'description': link.description,
'image': link.image.url if link.image else None,
'is_active': link.is_active,
'is_public': link.is_public,
'is_featured': link.is_featured,
'created_at': link.created_at.isoformat(),
'order': link.order,
}
export_data['links'].append(link_data)
# Экспорт настроек дизайна
if export_task.include_styles:
try:
design_settings = DesignSettings.objects.get(user=user)
export_data['design_settings'] = {
'background_image': design_settings.background_image.url if design_settings.background_image else None,
'theme_color': design_settings.theme_color,
'dashboard_layout': design_settings.dashboard_layout,
'groups_default_expanded': design_settings.groups_default_expanded,
'show_group_icons': design_settings.show_group_icons,
'show_link_icons': design_settings.show_link_icons,
'dashboard_background_color': design_settings.dashboard_background_color,
'font_family': design_settings.font_family,
'custom_css': design_settings.custom_css,
'header_text_color': design_settings.header_text_color,
'group_text_color': design_settings.group_text_color,
'link_text_color': design_settings.link_text_color,
'cover_overlay_enabled': design_settings.cover_overlay_enabled,
'cover_overlay_color': design_settings.cover_overlay_color,
'cover_overlay_opacity': design_settings.cover_overlay_opacity,
'group_overlay_enabled': design_settings.group_overlay_enabled,
'group_overlay_color': design_settings.group_overlay_color,
'group_overlay_opacity': design_settings.group_overlay_opacity,
'show_groups_title': design_settings.show_groups_title,
'group_description_text_color': design_settings.group_description_text_color,
'body_font_family': design_settings.body_font_family,
'heading_font_family': design_settings.heading_font_family,
'template_id': design_settings.template_id,
'link_overlay_enabled': design_settings.link_overlay_enabled,
'link_overlay_color': design_settings.link_overlay_color,
'link_overlay_opacity': design_settings.link_overlay_opacity,
}
except DesignSettings.DoesNotExist:
export_data['design_settings'] = {}
# Сохраняем данные в JSON
json_file = profile_dir / 'profile_data.json'
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
# Копируем медиафайлы
if export_task.include_media:
media_dir = profile_dir / 'media'
media_dir.mkdir()
# Копируем файлы пользователя
_copy_user_media_files(user, media_dir, export_data)
# Создаем ZIP архив
archive_path = temp_dir + '.zip'
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(profile_dir):
for file in files:
file_path = Path(root) / file
arc_name = file_path.relative_to(profile_dir)
zipf.write(file_path, arc_name)
return archive_path
def _copy_user_media_files(user, media_dir, export_data):
"""Копирование медиафайлов пользователя"""
media_root = Path(settings.MEDIA_ROOT)
# Функция для копирования файла
def copy_file_if_exists(url, subdir):
if url and url.startswith('/storage/'):
file_path = media_root / url[9:] # убираем /storage/
if file_path.exists():
target_dir = media_dir / subdir
target_dir.mkdir(exist_ok=True, parents=True)
shutil.copy2(file_path, target_dir / file_path.name)
# Аватар и обложка пользователя
copy_file_if_exists(export_data['user_data']['avatar'], 'avatars')
copy_file_if_exists(export_data['user_data']['cover'], 'avatars')
# Фоновые изображения в настройках дизайна
if export_data['design_settings'].get('background_image'):
copy_file_if_exists(export_data['design_settings']['background_image'], 'customization')
# Изображения групп
for group in export_data['groups']:
copy_file_if_exists(group.get('image'), 'link_groups')
# Изображения ссылок
for link in export_data['links']:
copy_file_if_exists(link.get('image'), 'links')
@api_view(['GET'])
def export_list(request):
"""Получение списка задач экспорта пользователя"""
# Для тестирования - простой ответ
if not request.user.is_authenticated:
return Response({
'message': 'Export API доступен',
'authenticated': False
})
export_tasks = ExportTask.objects.filter(user=request.user)
tasks_data = []
for task in export_tasks:
tasks_data.append({
'id': task.id,
'status': task.status,
'created_at': task.created_at,
'updated_at': task.updated_at,
'include_groups': task.include_groups,
'include_links': task.include_links,
'include_styles': task.include_styles,
'include_media': task.include_media,
'download_url': f'/api/export/{task.id}/download/' if task.status == 'completed' else None,
'error_message': task.error_message,
})
return Response({
'tasks': tasks_data,
'count': len(tasks_data)
})
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def create_import(request):
"""Создание задачи импорта профиля"""
# Проверяем наличие файла
if 'import_file' not in request.FILES:
return Response({
'error': 'Файл для импорта не предоставлен'
}, status=status.HTTP_400_BAD_REQUEST)
import_file = request.FILES['import_file']
# Проверяем тип файла
if not import_file.name.endswith('.zip'):
return Response({
'error': 'Поддерживаются только ZIP архивы'
}, status=status.HTTP_400_BAD_REQUEST)
# Получаем параметры импорта из POST данных (для multipart/form-data)
def get_bool_param(name, default=True):
value = request.data.get(name, request.POST.get(name, str(default)))
return str(value).lower() in ('true', '1', 'yes', 'on')
import_groups = get_bool_param('import_groups', True)
import_links = get_bool_param('import_links', True)
import_styles = get_bool_param('import_styles', True)
import_media = get_bool_param('import_media', True)
overwrite_existing = get_bool_param('overwrite_existing', False)
# Создаем задачу импорта
try:
import_task = ImportTask.objects.create(
user=request.user,
import_file=import_file,
import_groups=import_groups,
import_links=import_links,
import_styles=import_styles,
import_media=import_media,
overwrite_existing=overwrite_existing,
)
# Обновляем статус
import_task.status = 'processing'
import_task.save()
# Выполняем импорт
_process_import(import_task)
import_task.status = 'completed'
import_task.save()
return Response({
'task_id': import_task.id,
'status': import_task.status,
'imported_groups_count': import_task.imported_groups_count,
'imported_links_count': import_task.imported_links_count,
'imported_media_count': import_task.imported_media_count,
'message': 'Импорт профиля завершен успешно'
})
except Exception as e:
# Если задача была создана, обновляем её статус
if 'import_task' in locals():
import_task.status = 'failed'
import_task.error_message = str(e)
import_task.save()
return Response({
'error': 'Ошибка при импорте',
'details': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def import_status(request, task_id):
"""Получение статуса задачи импорта"""
import_task = get_object_or_404(ImportTask, id=task_id, user=request.user)
return Response({
'task_id': import_task.id,
'status': import_task.status,
'created_at': import_task.created_at,
'updated_at': import_task.updated_at,
'import_groups': import_task.import_groups,
'import_links': import_task.import_links,
'import_styles': import_task.import_styles,
'import_media': import_task.import_media,
'overwrite_existing': import_task.overwrite_existing,
'imported_groups_count': import_task.imported_groups_count,
'imported_links_count': import_task.imported_links_count,
'imported_media_count': import_task.imported_media_count,
'error_message': import_task.error_message,
})
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def import_list(request):
"""Получение списка задач импорта пользователя"""
import_tasks = ImportTask.objects.filter(user=request.user)
tasks_data = []
for task in import_tasks:
tasks_data.append({
'id': task.id,
'status': task.status,
'created_at': task.created_at,
'updated_at': task.updated_at,
'import_groups': task.import_groups,
'import_links': task.import_links,
'import_styles': task.import_styles,
'import_media': task.import_media,
'overwrite_existing': task.overwrite_existing,
'imported_groups_count': task.imported_groups_count,
'imported_links_count': task.imported_links_count,
'imported_media_count': task.imported_media_count,
'error_message': task.error_message,
})
return Response({
'tasks': tasks_data,
'count': len(tasks_data)
})
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def preview_import(request):
"""Предварительный просмотр содержимого архива импорта"""
if 'import_file' not in request.FILES:
return Response({
'error': 'Файл для импорта не предоставлен'
}, status=status.HTTP_400_BAD_REQUEST)
import_file = request.FILES['import_file']
if not import_file.name.endswith('.zip'):
return Response({
'error': 'Поддерживаются только ZIP архивы'
}, status=status.HTTP_400_BAD_REQUEST)
try:
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = Path(temp_dir) / 'preview.zip'
# Сохраняем файл
with open(archive_path, 'wb') as f:
for chunk in import_file.chunks():
f.write(chunk)
# Извлекаем архив
extract_dir = Path(temp_dir) / 'extracted'
with zipfile.ZipFile(archive_path, 'r') as zipf:
zipf.extractall(extract_dir)
# Читаем данные профиля
profile_data_path = extract_dir / 'profile_data.json'
if not profile_data_path.exists():
return Response({
'error': 'Файл profile_data.json не найден в архиве'
}, status=status.HTTP_400_BAD_REQUEST)
with open(profile_data_path, 'r', encoding='utf-8') as f:
profile_data = json.load(f)
# Формируем превью
preview = {
'export_info': profile_data.get('export_info', {}),
'user_data': profile_data.get('user_data', {}),
'groups_count': len(profile_data.get('groups', [])),
'links_count': len(profile_data.get('links', [])),
'has_design_settings': bool(profile_data.get('design_settings')),
'media_files': _count_media_files(extract_dir),
'groups_preview': profile_data.get('groups', [])[:5], # Первые 5 групп для превью
'links_preview': profile_data.get('links', [])[:10], # Первые 10 ссылок для превью
}
return Response(preview)
except Exception as e:
return Response({
'error': 'Ошибка при обработке архива',
'details': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _count_media_files(extract_dir):
"""Подсчет медиафайлов в архиве"""
media_dir = extract_dir / 'media'
if not media_dir.exists():
return {}
counts = {}
for category in ['avatars', 'customization', 'link_groups', 'links']:
category_dir = media_dir / category
if category_dir.exists():
counts[category] = len([f for f in category_dir.iterdir() if f.is_file()])
else:
counts[category] = 0
return counts
def _process_import(import_task):
"""Обработка импорта профиля"""
user = import_task.user
# Создаем временную директорию для извлечения архива
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = Path(temp_dir) / 'import.zip'
# Сохраняем файл во временную директорию
with open(archive_path, 'wb') as f:
for chunk in import_task.import_file.chunks():
f.write(chunk)
# Извлекаем архив
extract_dir = Path(temp_dir) / 'extracted'
with zipfile.ZipFile(archive_path, 'r') as zipf:
zipf.extractall(extract_dir)
# Читаем данные профиля
profile_data_path = extract_dir / 'profile_data.json'
if not profile_data_path.exists():
raise Exception('Файл profile_data.json не найден в архиве')
with open(profile_data_path, 'r', encoding='utf-8') as f:
profile_data = json.load(f)
# Импортируем данные в транзакции
with transaction.atomic():
_import_profile_data(import_task, profile_data, extract_dir)
def _import_profile_data(import_task, profile_data, extract_dir):
"""Импорт данных профиля"""
user = import_task.user
# Импорт групп
if import_task.import_groups and 'groups' in profile_data:
groups_count = _import_groups(user, profile_data['groups'], import_task.overwrite_existing)
import_task.imported_groups_count = groups_count
# Импорт ссылок
if import_task.import_links and 'links' in profile_data:
links_count = _import_links(user, profile_data['links'], profile_data.get('groups', []), import_task.overwrite_existing)
import_task.imported_links_count = links_count
# Импорт настроек дизайна
if import_task.import_styles and 'design_settings' in profile_data:
_import_design_settings(user, profile_data['design_settings'], import_task.overwrite_existing)
# Импорт медиафайлов
if import_task.import_media:
media_count = _import_media_files(user, extract_dir, import_task.overwrite_existing)
import_task.imported_media_count = media_count
import_task.save()
def _import_groups(user, groups_data, overwrite_existing):
"""Импорт групп ссылок"""
imported_count = 0
for group_data in groups_data:
# Проверяем существование группы по названию
existing_group = LinkGroup.objects.filter(
owner=user,
title=group_data['title']
).first()
if existing_group and not overwrite_existing:
continue # Пропускаем если группа существует и перезапись отключена
# Создаем или обновляем группу
group_defaults = {
'description': group_data.get('description', ''),
'is_active': group_data.get('is_active', True),
'is_public': group_data.get('is_public', False),
'is_featured': group_data.get('is_featured', False),
'order': group_data.get('order', 0),
}
group, created = LinkGroup.objects.update_or_create(
owner=user,
title=group_data['title'],
defaults=group_defaults
)
imported_count += 1
return imported_count
def _import_links(user, links_data, groups_data, overwrite_existing):
"""Импорт ссылок"""
imported_count = 0
# Создаем словарь соответствия старых ID групп к новым объектам
group_mapping = {}
for group_data in groups_data:
group = LinkGroup.objects.filter(
owner=user,
title=group_data['title']
).first()
if group:
group_mapping[group_data['id']] = group
for link_data in links_data:
# Находим группу для ссылки
old_group_id = link_data.get('group_id')
if old_group_id not in group_mapping:
continue # Пропускаем если группа не найдена
target_group = group_mapping[old_group_id]
# Проверяем существование ссылки по URL и группе
existing_link = Link.objects.filter(
group=target_group,
url=link_data['url']
).first()
if existing_link and not overwrite_existing:
continue # Пропускаем если ссылка существует и перезапись отключена
# Создаем или обновляем ссылку
link_defaults = {
'title': link_data.get('title', ''),
'description': link_data.get('description', ''),
'is_active': link_data.get('is_active', True),
'is_public': link_data.get('is_public', False),
'is_featured': link_data.get('is_featured', False),
'order': link_data.get('order', 0),
}
link, created = Link.objects.update_or_create(
group=target_group,
url=link_data['url'],
defaults=link_defaults
)
imported_count += 1
return imported_count
def _import_design_settings(user, design_data, overwrite_existing):
"""Импорт настроек дизайна"""
if not design_data:
return
# Получаем или создаем настройки дизайна
design_settings, created = DesignSettings.objects.get_or_create(
user=user,
defaults={}
)
if not created and not overwrite_existing:
return # Пропускаем если настройки существуют и перезапись отключена
# Обновляем настройки
for field, value in design_data.items():
if field != 'background_image' and hasattr(design_settings, field):
setattr(design_settings, field, value)
design_settings.save()
def _import_media_files(user, extract_dir, overwrite_existing):
"""Импорт медиафайлов"""
imported_count = 0
media_dir = extract_dir / 'media'
if not media_dir.exists():
return imported_count
# Создаем соответствующие директории в медиа
user_media_root = Path(settings.MEDIA_ROOT)
# Импорт файлов по категориям
for category in ['avatars', 'customization', 'link_groups', 'links']:
category_dir = media_dir / category
if not category_dir.exists():
continue
target_dir = user_media_root / category
target_dir.mkdir(exist_ok=True, parents=True)
# Копируем файлы
for file_path in category_dir.iterdir():
if file_path.is_file():
target_file = target_dir / file_path.name
if target_file.exists() and not overwrite_existing:
continue
shutil.copy2(file_path, target_file)
imported_count += 1
return imported_count