import json import zipfile import tempfile import os from pathlib import Path from django.http import HttpResponse, Http404 from django.conf import settings from django.shortcuts import get_object_or_404 from django.core.files.storage import default_storage from django.core.files.base import ContentFile from django.db import transaction from rest_framework import status from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from django.utils import timezone import shutil from .models import ExportTask, ImportTask from users.models import User from links.models import LinkGroup, Link from customization.models import DesignSettings @api_view(['POST']) @permission_classes([IsAuthenticated]) def create_export(request): """Создание задачи экспорта профиля""" # Получаем параметры экспорта include_groups = request.data.get('include_groups', True) include_links = request.data.get('include_links', True) include_styles = request.data.get('include_styles', True) include_media = request.data.get('include_media', True) # Создаем задачу экспорта export_task = ExportTask.objects.create( user=request.user, include_groups=include_groups, include_links=include_links, include_styles=include_styles, include_media=include_media, ) try: # Обновляем статус export_task.status = 'processing' export_task.save() # Создаем архив с данными профиля export_file_path = _create_profile_archive(export_task) # Сохраняем путь к файлу в задаче with open(export_file_path, 'rb') as f: export_task.export_file.save( f'profile_export_{export_task.user.username}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.zip', ContentFile(f.read()), save=True ) # Удаляем временный файл os.remove(export_file_path) export_task.status = 'completed' export_task.save() return Response({ 'task_id': export_task.id, 'status': export_task.status, 'download_url': f'/api/export/{export_task.id}/download/', 'message': 'Экспорт профиля завершен успешно' }) except Exception as e: export_task.status = 'failed' export_task.error_message = str(e) export_task.save() return Response({ 'error': 'Ошибка при создании экспорта', 'details': str(e) }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) @permission_classes([IsAuthenticated]) def download_export(request, task_id): """Скачивание файла экспорта""" export_task = get_object_or_404(ExportTask, id=task_id, user=request.user) if export_task.status != 'completed' or not export_task.export_file: return Response({ 'error': 'Файл экспорта недоступен' }, status=status.HTTP_404_NOT_FOUND) try: response = HttpResponse( export_task.export_file.read(), content_type='application/zip' ) response['Content-Disposition'] = f'attachment; filename="profile_export_{request.user.username}.zip"' return response except FileNotFoundError: raise Http404("Файл экспорта не найден") @api_view(['GET']) @permission_classes([IsAuthenticated]) def export_status(request, task_id): """Получение статуса задачи экспорта""" export_task = get_object_or_404(ExportTask, id=task_id, user=request.user) return Response({ 'task_id': export_task.id, 'status': export_task.status, 'created_at': export_task.created_at, 'updated_at': export_task.updated_at, 'error_message': export_task.error_message, 'download_url': f'/api/export/{export_task.id}/download/' if export_task.status == 'completed' else None }) def _create_profile_archive(export_task): """Создание архива с данными профиля""" user = export_task.user # Создаем временную директорию with tempfile.TemporaryDirectory() as temp_dir: profile_dir = Path(temp_dir) / 'profile_export' profile_dir.mkdir() # Создаем структуру данных для экспорта export_data = { 'export_info': { 'username': user.username, 'export_date': timezone.now().isoformat(), 'export_options': { 'include_groups': export_task.include_groups, 'include_links': export_task.include_links, 'include_styles': export_task.include_styles, 'include_media': export_task.include_media, } }, 'user_data': { 'username': user.username, 'email': user.email, 'first_name': user.first_name, 'last_name': user.last_name, 'bio': getattr(user, 'bio', ''), 'avatar': user.avatar.url if user.avatar else None, 'cover': user.cover.url if user.cover else None, }, 'groups': [], 'links': [], 'design_settings': {}, } # Экспорт групп if export_task.include_groups: for group in LinkGroup.objects.filter(owner=user): group_data = { 'id': group.id, 'title': group.title, 'description': group.description, 'image': group.image.url if group.image else None, 'is_active': group.is_active, 'is_public': group.is_public, 'is_featured': group.is_featured, 'created_at': group.created_at.isoformat(), 'order': group.order, } export_data['groups'].append(group_data) # Экспорт ссылок if export_task.include_links: for link in Link.objects.filter(group__owner=user): link_data = { 'id': link.id, 'group_id': link.group.id, 'title': link.title, 'url': link.url, 'description': link.description, 'image': link.image.url if link.image else None, 'is_active': link.is_active, 'is_public': link.is_public, 'is_featured': link.is_featured, 'created_at': link.created_at.isoformat(), 'order': link.order, } export_data['links'].append(link_data) # Экспорт настроек дизайна if export_task.include_styles: try: design_settings = DesignSettings.objects.get(user=user) export_data['design_settings'] = { 'background_image': design_settings.background_image.url if design_settings.background_image else None, 'theme_color': design_settings.theme_color, 'dashboard_layout': design_settings.dashboard_layout, 'groups_default_expanded': design_settings.groups_default_expanded, 'show_group_icons': design_settings.show_group_icons, 'show_link_icons': design_settings.show_link_icons, 'dashboard_background_color': design_settings.dashboard_background_color, 'font_family': design_settings.font_family, 'custom_css': design_settings.custom_css, 'header_text_color': design_settings.header_text_color, 'group_text_color': design_settings.group_text_color, 'link_text_color': design_settings.link_text_color, 'cover_overlay_enabled': design_settings.cover_overlay_enabled, 'cover_overlay_color': design_settings.cover_overlay_color, 'cover_overlay_opacity': design_settings.cover_overlay_opacity, 'group_overlay_enabled': design_settings.group_overlay_enabled, 'group_overlay_color': design_settings.group_overlay_color, 'group_overlay_opacity': design_settings.group_overlay_opacity, 'show_groups_title': design_settings.show_groups_title, 'group_description_text_color': design_settings.group_description_text_color, 'body_font_family': design_settings.body_font_family, 'heading_font_family': design_settings.heading_font_family, 'template_id': design_settings.template_id, 'link_overlay_enabled': design_settings.link_overlay_enabled, 'link_overlay_color': design_settings.link_overlay_color, 'link_overlay_opacity': design_settings.link_overlay_opacity, } except DesignSettings.DoesNotExist: export_data['design_settings'] = {} # Сохраняем данные в JSON json_file = profile_dir / 'profile_data.json' with open(json_file, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, ensure_ascii=False) # Копируем медиафайлы if export_task.include_media: media_dir = profile_dir / 'media' media_dir.mkdir() # Копируем файлы пользователя _copy_user_media_files(user, media_dir, export_data) # Создаем ZIP архив archive_path = temp_dir + '.zip' with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(profile_dir): for file in files: file_path = Path(root) / file arc_name = file_path.relative_to(profile_dir) zipf.write(file_path, arc_name) return archive_path def _copy_user_media_files(user, media_dir, export_data): """Копирование медиафайлов пользователя""" media_root = Path(settings.MEDIA_ROOT) # Функция для копирования файла def copy_file_if_exists(url, subdir): if url and url.startswith('/storage/'): file_path = media_root / url[9:] # убираем /storage/ if file_path.exists(): target_dir = media_dir / subdir target_dir.mkdir(exist_ok=True, parents=True) shutil.copy2(file_path, target_dir / file_path.name) # Аватар и обложка пользователя copy_file_if_exists(export_data['user_data']['avatar'], 'avatars') copy_file_if_exists(export_data['user_data']['cover'], 'avatars') # Фоновые изображения в настройках дизайна if export_data['design_settings'].get('background_image'): copy_file_if_exists(export_data['design_settings']['background_image'], 'customization') # Изображения групп for group in export_data['groups']: copy_file_if_exists(group.get('image'), 'link_groups') # Изображения ссылок for link in export_data['links']: copy_file_if_exists(link.get('image'), 'links') @api_view(['GET']) def export_list(request): """Получение списка задач экспорта пользователя""" # Для тестирования - простой ответ if not request.user.is_authenticated: return Response({ 'message': 'Export API доступен', 'authenticated': False }) export_tasks = ExportTask.objects.filter(user=request.user) tasks_data = [] for task in export_tasks: tasks_data.append({ 'id': task.id, 'status': task.status, 'created_at': task.created_at, 'updated_at': task.updated_at, 'include_groups': task.include_groups, 'include_links': task.include_links, 'include_styles': task.include_styles, 'include_media': task.include_media, 'download_url': f'/api/export/{task.id}/download/' if task.status == 'completed' else None, 'error_message': task.error_message, }) return Response({ 'tasks': tasks_data, 'count': len(tasks_data) }) @api_view(['POST']) @permission_classes([IsAuthenticated]) def create_import(request): """Создание задачи импорта профиля""" # Проверяем наличие файла if 'import_file' not in request.FILES: return Response({ 'error': 'Файл для импорта не предоставлен' }, status=status.HTTP_400_BAD_REQUEST) import_file = request.FILES['import_file'] # Проверяем тип файла if not import_file.name.endswith('.zip'): return Response({ 'error': 'Поддерживаются только ZIP архивы' }, status=status.HTTP_400_BAD_REQUEST) # Получаем параметры импорта из POST данных (для multipart/form-data) def get_bool_param(name, default=True): value = request.data.get(name, request.POST.get(name, str(default))) return str(value).lower() in ('true', '1', 'yes', 'on') import_groups = get_bool_param('import_groups', True) import_links = get_bool_param('import_links', True) import_styles = get_bool_param('import_styles', True) import_media = get_bool_param('import_media', True) overwrite_existing = get_bool_param('overwrite_existing', False) # Создаем задачу импорта try: import_task = ImportTask.objects.create( user=request.user, import_file=import_file, import_groups=import_groups, import_links=import_links, import_styles=import_styles, import_media=import_media, overwrite_existing=overwrite_existing, ) # Обновляем статус import_task.status = 'processing' import_task.save() # Выполняем импорт _process_import(import_task) import_task.status = 'completed' import_task.save() return Response({ 'task_id': import_task.id, 'status': import_task.status, 'imported_groups_count': import_task.imported_groups_count, 'imported_links_count': import_task.imported_links_count, 'imported_media_count': import_task.imported_media_count, 'message': 'Импорт профиля завершен успешно' }) except Exception as e: # Если задача была создана, обновляем её статус if 'import_task' in locals(): import_task.status = 'failed' import_task.error_message = str(e) import_task.save() return Response({ 'error': 'Ошибка при импорте', 'details': str(e) }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) @permission_classes([IsAuthenticated]) def import_status(request, task_id): """Получение статуса задачи импорта""" import_task = get_object_or_404(ImportTask, id=task_id, user=request.user) return Response({ 'task_id': import_task.id, 'status': import_task.status, 'created_at': import_task.created_at, 'updated_at': import_task.updated_at, 'import_groups': import_task.import_groups, 'import_links': import_task.import_links, 'import_styles': import_task.import_styles, 'import_media': import_task.import_media, 'overwrite_existing': import_task.overwrite_existing, 'imported_groups_count': import_task.imported_groups_count, 'imported_links_count': import_task.imported_links_count, 'imported_media_count': import_task.imported_media_count, 'error_message': import_task.error_message, }) @api_view(['GET']) @permission_classes([IsAuthenticated]) def import_list(request): """Получение списка задач импорта пользователя""" import_tasks = ImportTask.objects.filter(user=request.user) tasks_data = [] for task in import_tasks: tasks_data.append({ 'id': task.id, 'status': task.status, 'created_at': task.created_at, 'updated_at': task.updated_at, 'import_groups': task.import_groups, 'import_links': task.import_links, 'import_styles': task.import_styles, 'import_media': task.import_media, 'overwrite_existing': task.overwrite_existing, 'imported_groups_count': task.imported_groups_count, 'imported_links_count': task.imported_links_count, 'imported_media_count': task.imported_media_count, 'error_message': task.error_message, }) return Response({ 'tasks': tasks_data, 'count': len(tasks_data) }) @api_view(['POST']) @permission_classes([IsAuthenticated]) def preview_import(request): """Предварительный просмотр содержимого архива импорта""" if 'import_file' not in request.FILES: return Response({ 'error': 'Файл для импорта не предоставлен' }, status=status.HTTP_400_BAD_REQUEST) import_file = request.FILES['import_file'] if not import_file.name.endswith('.zip'): return Response({ 'error': 'Поддерживаются только ZIP архивы' }, status=status.HTTP_400_BAD_REQUEST) try: with tempfile.TemporaryDirectory() as temp_dir: archive_path = Path(temp_dir) / 'preview.zip' # Сохраняем файл with open(archive_path, 'wb') as f: for chunk in import_file.chunks(): f.write(chunk) # Извлекаем архив extract_dir = Path(temp_dir) / 'extracted' with zipfile.ZipFile(archive_path, 'r') as zipf: zipf.extractall(extract_dir) # Читаем данные профиля profile_data_path = extract_dir / 'profile_data.json' if not profile_data_path.exists(): return Response({ 'error': 'Файл profile_data.json не найден в архиве' }, status=status.HTTP_400_BAD_REQUEST) with open(profile_data_path, 'r', encoding='utf-8') as f: profile_data = json.load(f) # Формируем превью preview = { 'export_info': profile_data.get('export_info', {}), 'user_data': profile_data.get('user_data', {}), 'groups_count': len(profile_data.get('groups', [])), 'links_count': len(profile_data.get('links', [])), 'has_design_settings': bool(profile_data.get('design_settings')), 'media_files': _count_media_files(extract_dir), 'groups_preview': profile_data.get('groups', [])[:5], # Первые 5 групп для превью 'links_preview': profile_data.get('links', [])[:10], # Первые 10 ссылок для превью } return Response(preview) except Exception as e: return Response({ 'error': 'Ошибка при обработке архива', 'details': str(e) }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def _count_media_files(extract_dir): """Подсчет медиафайлов в архиве""" media_dir = extract_dir / 'media' if not media_dir.exists(): return {} counts = {} for category in ['avatars', 'customization', 'link_groups', 'links']: category_dir = media_dir / category if category_dir.exists(): counts[category] = len([f for f in category_dir.iterdir() if f.is_file()]) else: counts[category] = 0 return counts def _process_import(import_task): """Обработка импорта профиля""" user = import_task.user # Создаем временную директорию для извлечения архива with tempfile.TemporaryDirectory() as temp_dir: archive_path = Path(temp_dir) / 'import.zip' # Сохраняем файл во временную директорию with open(archive_path, 'wb') as f: for chunk in import_task.import_file.chunks(): f.write(chunk) # Извлекаем архив extract_dir = Path(temp_dir) / 'extracted' with zipfile.ZipFile(archive_path, 'r') as zipf: zipf.extractall(extract_dir) # Читаем данные профиля profile_data_path = extract_dir / 'profile_data.json' if not profile_data_path.exists(): raise Exception('Файл profile_data.json не найден в архиве') with open(profile_data_path, 'r', encoding='utf-8') as f: profile_data = json.load(f) # Импортируем данные в транзакции with transaction.atomic(): _import_profile_data(import_task, profile_data, extract_dir) def _import_profile_data(import_task, profile_data, extract_dir): """Импорт данных профиля""" user = import_task.user # Импорт групп if import_task.import_groups and 'groups' in profile_data: groups_count = _import_groups(user, profile_data['groups'], import_task.overwrite_existing) import_task.imported_groups_count = groups_count # Импорт ссылок if import_task.import_links and 'links' in profile_data: links_count = _import_links(user, profile_data['links'], profile_data.get('groups', []), import_task.overwrite_existing) import_task.imported_links_count = links_count # Импорт настроек дизайна if import_task.import_styles and 'design_settings' in profile_data: _import_design_settings(user, profile_data['design_settings'], import_task.overwrite_existing) # Импорт медиафайлов if import_task.import_media: media_count = _import_media_files(user, extract_dir, import_task.overwrite_existing) import_task.imported_media_count = media_count import_task.save() def _import_groups(user, groups_data, overwrite_existing): """Импорт групп ссылок""" imported_count = 0 for group_data in groups_data: # Проверяем существование группы по названию existing_group = LinkGroup.objects.filter( owner=user, title=group_data['title'] ).first() if existing_group and not overwrite_existing: continue # Пропускаем если группа существует и перезапись отключена # Создаем или обновляем группу group_defaults = { 'description': group_data.get('description', ''), 'is_active': group_data.get('is_active', True), 'is_public': group_data.get('is_public', False), 'is_featured': group_data.get('is_featured', False), 'order': group_data.get('order', 0), } group, created = LinkGroup.objects.update_or_create( owner=user, title=group_data['title'], defaults=group_defaults ) imported_count += 1 return imported_count def _import_links(user, links_data, groups_data, overwrite_existing): """Импорт ссылок""" imported_count = 0 # Создаем словарь соответствия старых ID групп к новым объектам group_mapping = {} for group_data in groups_data: group = LinkGroup.objects.filter( owner=user, title=group_data['title'] ).first() if group: group_mapping[group_data['id']] = group for link_data in links_data: # Находим группу для ссылки old_group_id = link_data.get('group_id') if old_group_id not in group_mapping: continue # Пропускаем если группа не найдена target_group = group_mapping[old_group_id] # Проверяем существование ссылки по URL и группе existing_link = Link.objects.filter( group=target_group, url=link_data['url'] ).first() if existing_link and not overwrite_existing: continue # Пропускаем если ссылка существует и перезапись отключена # Создаем или обновляем ссылку link_defaults = { 'title': link_data.get('title', ''), 'description': link_data.get('description', ''), 'is_active': link_data.get('is_active', True), 'is_public': link_data.get('is_public', False), 'is_featured': link_data.get('is_featured', False), 'order': link_data.get('order', 0), } link, created = Link.objects.update_or_create( group=target_group, url=link_data['url'], defaults=link_defaults ) imported_count += 1 return imported_count def _import_design_settings(user, design_data, overwrite_existing): """Импорт настроек дизайна""" if not design_data: return # Получаем или создаем настройки дизайна design_settings, created = DesignSettings.objects.get_or_create( user=user, defaults={} ) if not created and not overwrite_existing: return # Пропускаем если настройки существуют и перезапись отключена # Обновляем настройки for field, value in design_data.items(): if field != 'background_image' and hasattr(design_settings, field): setattr(design_settings, field, value) design_settings.save() def _import_media_files(user, extract_dir, overwrite_existing): """Импорт медиафайлов""" imported_count = 0 media_dir = extract_dir / 'media' if not media_dir.exists(): return imported_count # Создаем соответствующие директории в медиа user_media_root = Path(settings.MEDIA_ROOT) # Импорт файлов по категориям for category in ['avatars', 'customization', 'link_groups', 'links']: category_dir = media_dir / category if not category_dir.exists(): continue target_dir = user_media_root / category target_dir.mkdir(exist_ok=True, parents=True) # Копируем файлы for file_path in category_dir.iterdir(): if file_path.is_file(): target_file = target_dir / file_path.name if target_file.exists() and not overwrite_existing: continue shutil.copy2(file_path, target_file) imported_count += 1 return imported_count