299 lines
14 KiB
Python
299 lines
14 KiB
Python
import os
|
||
import re
|
||
import yt_dlp
|
||
import time
|
||
import random
|
||
from tqdm import tqdm
|
||
from colorama import Fore, Style, init
|
||
from config import Config
|
||
from fake_useragent import UserAgent
|
||
|
||
# Инициализируем colorama для кроссплатформенной работы с цветами
|
||
init(autoreset=True)
|
||
|
||
class YouTubeDownloader:
|
||
"""Основной класс для скачивания видео с YouTube"""
|
||
|
||
def __init__(self, config=None):
|
||
self.config = config or Config()
|
||
self.progress_bar = None
|
||
|
||
def validate_url(self, url):
|
||
"""Проверяет корректность YouTube URL"""
|
||
youtube_regex = re.compile(
|
||
r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
|
||
r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
|
||
)
|
||
return youtube_regex.match(url) is not None
|
||
|
||
def progress_hook(self, d):
|
||
"""Хук для отображения прогресса загрузки"""
|
||
if d['status'] == 'downloading':
|
||
if self.progress_bar is None:
|
||
total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
|
||
if total_bytes:
|
||
self.progress_bar = tqdm(
|
||
total=total_bytes,
|
||
unit='B',
|
||
unit_scale=True,
|
||
desc=f"{Fore.BLUE}Скачивание{Style.RESET_ALL}"
|
||
)
|
||
|
||
if self.progress_bar and 'downloaded_bytes' in d:
|
||
downloaded = d['downloaded_bytes']
|
||
# Используем hasattr для проверки custom атрибута
|
||
if hasattr(self.progress_bar, 'last_downloaded'):
|
||
self.progress_bar.update(downloaded - getattr(self.progress_bar, 'last_downloaded', 0))
|
||
else:
|
||
self.progress_bar.update(downloaded)
|
||
# Устанавливаем custom атрибут
|
||
setattr(self.progress_bar, 'last_downloaded', downloaded)
|
||
|
||
elif d['status'] == 'finished':
|
||
if self.progress_bar:
|
||
self.progress_bar.close()
|
||
self.progress_bar = None
|
||
print(f"{Fore.GREEN}✓ Загрузка завершена: {d['filename']}{Style.RESET_ALL}")
|
||
|
||
def get_video_info(self, url):
|
||
"""Получает информацию о видео без загрузки"""
|
||
ydl_opts = {
|
||
'quiet': True,
|
||
'no_warnings': True,
|
||
}
|
||
|
||
try:
|
||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||
info = ydl.extract_info(url, download=False)
|
||
return {
|
||
'title': info.get('title', 'Неизвестное название'),
|
||
'duration': info.get('duration', 0),
|
||
'uploader': info.get('uploader', 'Неизвестный автор'),
|
||
'view_count': info.get('view_count', 0),
|
||
'upload_date': info.get('upload_date', ''),
|
||
'description': info.get('description', ''),
|
||
'formats': info.get('formats', [])
|
||
}
|
||
except Exception as e:
|
||
print(f"{Fore.RED}Ошибка получения информации о видео: {str(e)}{Style.RESET_ALL}")
|
||
return None
|
||
|
||
def download_video(self, url, quality='best', audio_only=False, output_dir=None):
|
||
"""Скачивает видео по URL"""
|
||
if not self.validate_url(url):
|
||
print(f"{Fore.RED}Ошибка: Некорректный URL YouTube{Style.RESET_ALL}")
|
||
return False
|
||
|
||
# Получаем информацию о видео
|
||
video_info = self.get_video_info(url)
|
||
if not video_info:
|
||
return False
|
||
|
||
print(f"{Fore.CYAN}Название: {video_info['title']}{Style.RESET_ALL}")
|
||
print(f"{Fore.CYAN}Автор: {video_info['uploader']}{Style.RESET_ALL}")
|
||
|
||
if video_info['duration']:
|
||
duration_str = f"{video_info['duration']//60}:{video_info['duration']%60:02d}"
|
||
print(f"{Fore.CYAN}Длительность: {duration_str}{Style.RESET_ALL}")
|
||
|
||
# Определяем папку для загрузки
|
||
output_path = output_dir or self.config.create_output_directory()
|
||
|
||
# Настройки для yt-dlp с улучшенным обходом блокировок
|
||
ua = UserAgent()
|
||
ydl_opts = {
|
||
'outtmpl': os.path.join(str(output_path), '%(title)s.%(ext)s'),
|
||
'progress_hooks': [self.progress_hook],
|
||
'writeinfojson': self.config.get('add_metadata', True),
|
||
'writesubtitles': self.config.get('download_subtitles', False),
|
||
'writeautomaticsub': self.config.get('download_subtitles', False),
|
||
# Настройки для обхода блокировок
|
||
'http_headers': {
|
||
'User-Agent': ua.random,
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||
'Accept-Language': 'en-US,en;q=0.5',
|
||
'Accept-Encoding': 'gzip, deflate',
|
||
'DNT': '1',
|
||
'Connection': 'keep-alive',
|
||
'Upgrade-Insecure-Requests': '1',
|
||
},
|
||
'extractor_retries': 3,
|
||
'file_access_retries': 3,
|
||
'fragment_retries': 3,
|
||
'retry_sleep_functions': {
|
||
'http': lambda n: min(4 * 2**n, 30),
|
||
'fragment': lambda n: min(4 * 2**n, 30),
|
||
'file_access': lambda n: min(4 * 2**n, 30),
|
||
},
|
||
'sleep_interval_requests': 1,
|
||
'sleep_interval': 0,
|
||
'max_sleep_interval': 5,
|
||
}
|
||
|
||
# Настройки качества и формата
|
||
if audio_only:
|
||
ydl_opts.update({
|
||
'format': 'bestaudio/best',
|
||
'postprocessors': [{
|
||
'key': 'FFmpegExtractAudio',
|
||
'preferredcodec': self.config.get('audio_format', 'mp3'),
|
||
'preferredquality': '192',
|
||
}],
|
||
})
|
||
print(f"{Fore.YELLOW}Режим: только аудио ({self.config.get('audio_format', 'mp3')}){Style.RESET_ALL}")
|
||
else:
|
||
if quality == 'best':
|
||
format_string = 'best[ext=mp4]/best'
|
||
elif quality.endswith('p'):
|
||
height = quality[:-1]
|
||
format_string = f'best[height<={height}][ext=mp4]/best[height<={height}]/best[ext=mp4]/best'
|
||
else:
|
||
format_string = 'best[ext=mp4]/best'
|
||
|
||
ydl_opts['format'] = format_string
|
||
print(f"{Fore.YELLOW}Качество: {quality}{Style.RESET_ALL}")
|
||
|
||
# Загрузка субтитров
|
||
if self.config.get('download_subtitles', False):
|
||
ydl_opts['subtitleslangs'] = self.config.get('subtitle_languages', ['ru', 'en'])
|
||
|
||
try:
|
||
# Первая попытка с обычными настройками
|
||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||
print(f"{Fore.BLUE}Начинаю загрузку...{Style.RESET_ALL}")
|
||
ydl.download([url])
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
print(f"{Fore.YELLOW}Первая попытка неудачна: {error_msg}{Style.RESET_ALL}")
|
||
|
||
# Если ошибка 403 или блокировка, пробуем альтернативные настройки
|
||
if "403" in error_msg or "Forbidden" in error_msg or "throttl" in error_msg or "HTTP Error" in error_msg:
|
||
return self._retry_download_with_fallback(url, ydl_opts)
|
||
else:
|
||
print(f"{Fore.RED}Ошибка загрузки: {error_msg}{Style.RESET_ALL}")
|
||
return False
|
||
|
||
def _retry_download_with_fallback(self, url, base_opts):
|
||
"""Повторная попытка загрузки с альтернативными настройками"""
|
||
fallback_strategies = [
|
||
{
|
||
'name': 'Альтернативный User-Agent',
|
||
'opts': {
|
||
**base_opts,
|
||
'http_headers': {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||
}
|
||
}
|
||
},
|
||
{
|
||
'name': 'Без метаданных',
|
||
'opts': {
|
||
**{k: v for k, v in base_opts.items() if k not in ['writeinfojson', 'writesubtitles', 'writeautomaticsub']},
|
||
'writeinfojson': False,
|
||
'writesubtitles': False,
|
||
'writeautomaticsub': False
|
||
}
|
||
},
|
||
{
|
||
'name': 'Только аудио формат',
|
||
'opts': {
|
||
**base_opts,
|
||
'format': 'bestaudio[ext=m4a]/bestaudio/best[height<=480]'
|
||
}
|
||
}
|
||
]
|
||
|
||
for i, strategy in enumerate(fallback_strategies, 1):
|
||
print(f"{Fore.CYAN}Попытка {i+1}: {strategy['name']}{Style.RESET_ALL}")
|
||
try:
|
||
# Добавляем задержку между попытками
|
||
time.sleep(random.uniform(2, 5))
|
||
|
||
with yt_dlp.YoutubeDL(strategy['opts']) as ydl:
|
||
ydl.download([url])
|
||
print(f"{Fore.GREEN}✓ Успешно загружено с настройкой: {strategy['name']}{Style.RESET_ALL}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.RED}✗ {strategy['name']}: {str(e)}{Style.RESET_ALL}")
|
||
continue
|
||
|
||
print(f"{Fore.RED}Все попытки загрузки неудачны{Style.RESET_ALL}")
|
||
return False
|
||
|
||
def download_playlist(self, url, quality='best', audio_only=False, output_dir=None):
|
||
"""Скачивает плейлист"""
|
||
if not self.validate_url(url):
|
||
print(f"{Fore.RED}Ошибка: Некорректный URL{Style.RESET_ALL}")
|
||
return False
|
||
|
||
output_path = output_dir or self.config.create_output_directory()
|
||
|
||
ydl_opts = {
|
||
'outtmpl': os.path.join(str(output_path), '%(playlist_title)s/%(playlist_index)02d - %(title)s.%(ext)s'),
|
||
'progress_hooks': [self.progress_hook],
|
||
'writeinfojson': self.config.get('add_metadata', True),
|
||
'noplaylist': False, # Включаем загрузку плейлиста
|
||
}
|
||
|
||
# Настройки для аудио или видео
|
||
if audio_only:
|
||
ydl_opts.update({
|
||
'format': 'bestaudio/best',
|
||
'postprocessors': [{
|
||
'key': 'FFmpegExtractAudio',
|
||
'preferredcodec': self.config.get('audio_format', 'mp3'),
|
||
'preferredquality': '192',
|
||
}],
|
||
})
|
||
else:
|
||
if quality == 'best':
|
||
ydl_opts['format'] = 'best[ext=mp4]/best'
|
||
elif quality.endswith('p'):
|
||
height = quality[:-1]
|
||
ydl_opts['format'] = f'best[height<={height}][ext=mp4]/best[height<={height}]/best[ext=mp4]/best'
|
||
else:
|
||
ydl_opts['format'] = 'best[ext=mp4]/best'
|
||
|
||
try:
|
||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||
# Получаем информацию о плейлисте
|
||
playlist_info = ydl.extract_info(url, download=False)
|
||
if 'entries' in playlist_info:
|
||
print(f"{Fore.CYAN}Плейлист: {playlist_info.get('title', 'Неизвестный плейлист')}{Style.RESET_ALL}")
|
||
print(f"{Fore.CYAN}Количество видео: {len(playlist_info['entries'])}{Style.RESET_ALL}")
|
||
|
||
print(f"{Fore.BLUE}Начинаю загрузку плейлиста...{Style.RESET_ALL}")
|
||
ydl.download([url])
|
||
return True
|
||
else:
|
||
# Это одиночное видео, а не плейлист
|
||
return self.download_video(url, quality, audio_only, output_dir)
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.RED}Ошибка загрузки плейлиста: {str(e)}{Style.RESET_ALL}")
|
||
return False
|
||
|
||
def get_available_formats(self, url):
|
||
"""Получает список доступных форматов для видео"""
|
||
try:
|
||
with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
|
||
info = ydl.extract_info(url, download=False)
|
||
formats = []
|
||
if info and isinstance(info, dict) and 'formats' in info:
|
||
formats_list = info.get('formats')
|
||
if formats_list:
|
||
for f in formats_list:
|
||
if f.get('height'): # Только видео форматы
|
||
formats.append({
|
||
'quality': f"{f.get('height')}p",
|
||
'ext': f.get('ext', ''),
|
||
'filesize': f.get('filesize', 0),
|
||
'format_id': f.get('format_id', '')
|
||
})
|
||
return formats
|
||
except Exception as e:
|
||
print(f"{Fore.RED}Ошибка получения форматов: {str(e)}{Style.RESET_ALL}")
|
||
return [] |