Add ytdlp_handler.py
This commit is contained in:
337
ytdlp_handler.py
Normal file
337
ytdlp_handler.py
Normal file
@@ -0,0 +1,337 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
YouTube-DLP Integration Module.
|
||||
|
||||
Provides async wrappers for searching and downloading content via yt-dlp.
|
||||
Includes parsing logic for yt-dlp's JSON output and progress updates.
|
||||
"""
|
||||
|
||||
import os
|
||||
import asyncio
|
||||
import subprocess
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Callable
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class SearchResult:
    """Represents a single search result from yt-dlp.

    Field values are taken from the JSON object yt-dlp prints for each hit.
    """

    id: str
    title: str
    duration: float  # length in seconds
    uploader: str
    url: str
    thumbnail: Optional[str] = None
    view_count: int = 0

    @property
    def duration_str(self) -> str:
        """Format the duration as ``H:MM:SS``, or ``M:SS`` when under an hour.

        A missing/zero duration renders as ``"0:00"``. A negative duration,
        or one that cannot be converted to whole seconds (wrong type,
        non-numeric text, NaN, infinity), renders as ``"??:??"``.
        """
        try:
            total_seconds = int(self.duration) if self.duration else 0
        except (TypeError, ValueError, OverflowError):
            # OverflowError is what int() raises for +/- infinity.
            return "??:??"

        if total_seconds < 0:
            return "??:??"

        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)

        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def to_dict(self) -> Dict:
        """Return all fields plus the formatted ``duration_str`` as a dict."""
        return {
            'id': self.id,
            'title': self.title,
            'duration': self.duration,
            'duration_str': self.duration_str,
            'uploader': self.uploader,
            'url': self.url,
            'thumbnail': self.thumbnail,
            'view_count': self.view_count
        }
|
||||
|
||||
|
||||
@dataclass
class DownloadProgress:
    """Snapshot of the state of a download that is currently running.

    Only ``filename`` and ``status`` are required; the remaining fields
    default to "nothing known yet".
    """

    # Required fields.
    filename: str
    status: str

    # Optional details, empty/zero until reported.
    percent: float = 0.0
    speed: str = ""
    eta: str = ""
    error: str = ""
|
||||
|
||||
|
||||
class YtDlpHandler:
    """Runs yt-dlp as a subprocess to search for and download audio.

    Running downloads are tracked by URL in ``_active_downloads`` so they can
    be cancelled from outside the coroutine that started them.
    """

    def __init__(
        self,
        download_directory: str,
        audio_format: str = "bestaudio",
        max_filesize: int = 50,
        max_search_results: int = 10
    ):
        """Prepare the download directory and locate the yt-dlp executable.

        Args:
            download_directory: Directory for downloaded files; created
                (including parents) if it does not exist.
            audio_format: Format selector passed to yt-dlp via ``-f``.
            max_filesize: Size limit in MiB; ``0`` or a negative value
                disables the limit.
            max_search_results: Number of results requested per search.

        Raises:
            RuntimeError: If no yt-dlp/youtube-dl executable can be run.
        """
        self.download_directory = Path(download_directory)
        self.download_directory.mkdir(parents=True, exist_ok=True)

        self.audio_format = audio_format
        # Kept in bytes because that is what is handed to --max-filesize.
        self.max_filesize = max_filesize * 1024 * 1024 if max_filesize > 0 else 0
        self.max_search_results = max_search_results

        self._ytdlp_path = self._find_ytdlp()
        self._active_downloads: Dict[str, asyncio.subprocess.Process] = {}

    def _find_ytdlp(self) -> str:
        """Return the first candidate executable that answers ``--version``.

        Raises:
            RuntimeError: If none of the candidates can be executed.
        """
        for name in ['yt-dlp', 'yt-dlp.exe', 'youtube-dl']:
            try:
                result = subprocess.run(
                    [name, '--version'],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
            except (OSError, subprocess.TimeoutExpired):
                # Missing, not executable, or hung: try the next candidate.
                continue

            if result.returncode == 0:
                logger.info(f"Found yt-dlp: {name} (version {result.stdout.strip()})")
                return name

        raise RuntimeError("yt-dlp not found. Install with: pip install yt-dlp")

    @staticmethod
    async def _stop_process(
        process: Optional[asyncio.subprocess.Process],
        grace: float = 5.0
    ) -> None:
        """Terminate *process* if it is still running and wait for it to exit.

        Falls back to ``kill()`` when the process ignores the terminate
        request for longer than *grace* seconds. Does nothing for ``None``
        or for a process that has already exited.
        """
        if process is None or process.returncode is not None:
            return

        try:
            process.terminate()
        except ProcessLookupError:
            return  # exited between the check and the signal

        try:
            await asyncio.wait_for(process.wait(), timeout=grace)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                return
            await process.wait()

    async def search(self, query: str, source: str = "youtube") -> "List[SearchResult]":
        """Search without downloading, using yt-dlp's search prefixes.

        Args:
            query: Free-text search query.
            source: ``"youtube"``, ``"soundcloud"`` or ``"bandcamp"``; any
                other value falls back to the YouTube prefix.

        Returns:
            The parsed results; an empty list on timeout (60 s) or error.
        """
        # NOTE(review): confirm that yt-dlp actually provides "bcsearch".
        search_prefix = {
            "youtube": "ytsearch",
            "soundcloud": "scsearch",
            "bandcamp": "bcsearch"
        }.get(source, "ytsearch")

        search_query = f"{search_prefix}{self.max_search_results}:{query}"

        cmd = [
            self._ytdlp_path,
            search_query,
            '--dump-json',
            '--no-download',
            '--flat-playlist',
            '--ignore-errors',
            '--no-warnings'
        ]

        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=60
            )
            return self._parse_search_results(stdout)

        except asyncio.TimeoutError:
            logger.error("Search timed out")
            return []
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []
        finally:
            # A timed-out search would otherwise leave yt-dlp running.
            await self._stop_process(process)

    def _parse_search_results(self, stdout: bytes) -> "List[SearchResult]":
        """Parse yt-dlp's one-JSON-object-per-line output, skipping bad lines."""
        results = []
        for line in stdout.decode(errors="replace").strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            try:
                results.append(self._result_from_json(data))
            except Exception as e:
                logger.debug(f"Error parsing result: {e}")
        return results

    @staticmethod
    def _result_from_json(data: Dict) -> "SearchResult":
        """Build a SearchResult from one decoded yt-dlp JSON object."""
        duration = data.get('duration')
        try:
            duration = float(duration) if duration is not None else 0
        except (TypeError, ValueError):
            duration = 0

        # Use ``or`` rather than .get() defaults: a key that is present but
        # null must still fall through to the next candidate.
        return SearchResult(
            id=data.get('id', ''),
            title=data.get('title') or 'Unknown',
            duration=duration,
            uploader=data.get('uploader') or data.get('channel') or 'Unknown',
            url=data.get('url') or data.get('webpage_url') or '',
            thumbnail=data.get('thumbnail'),
            view_count=int(data.get('view_count', 0) or 0)
        )

    async def download(
        self,
        url: str,
        progress_callback: "Optional[Callable[[DownloadProgress], None]]" = None
    ) -> Optional[str]:
        """Download audio from *url* and have yt-dlp convert it to Opus.

        Args:
            url: Media URL; also used as the key for cancelling the download.
            progress_callback: Called synchronously with a DownloadProgress
                for every output line that carries progress information.

        Returns:
            Path of the resulting file, or ``None`` if the download failed.

        Raises:
            asyncio.CancelledError: Re-raised after the subprocess is stopped.
        """
        output_template = str(self.download_directory / "%(title)s.%(ext)s")

        cmd = [
            self._ytdlp_path,
            url,
            '-x',
            '--audio-format', 'opus',
            '--audio-quality', '0',
            '-o', output_template,
            '--no-playlist',
            '--no-warnings',
            '--newline',
            '--restrict-filenames',
        ]

        if self.max_filesize > 0:
            cmd.extend(['--max-filesize', str(self.max_filesize)])

        cmd.extend(['-f', self.audio_format])

        # Bound before the try block so every handler can rely on them.
        download_id = url
        process = None
        stderr_task = None

        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            self._active_downloads[download_id] = process

            # Drain stderr while stdout is being read; otherwise a full
            # stderr pipe blocks yt-dlp and this loop never finishes.
            stderr_task = asyncio.ensure_future(process.stderr.read())

            output_file = None

            async for raw_line in process.stdout:
                line = raw_line.decode(errors="replace").strip()
                logger.debug(f"yt-dlp: {line}")

                if progress_callback:
                    progress = self._parse_progress(line)
                    if progress:
                        progress_callback(progress)

                # Later lines (e.g. the converted file) override earlier ones.
                output_file = self._parse_destination(line) or output_file

            await process.wait()
            stderr_output = await stderr_task

            if process.returncode == 0:
                if not output_file or not os.path.exists(output_file):
                    # Fallback: check most recent file in download dir
                    recent_files = sorted(
                        self.download_directory.glob('*.opus'),
                        key=lambda x: x.stat().st_mtime,
                        reverse=True
                    )
                    if recent_files:
                        output_file = str(recent_files[0])

                if output_file and os.path.exists(output_file):
                    logger.info(f"Downloaded: {output_file}")
                    return output_file

            logger.error(f"Download failed: {stderr_output.decode(errors='replace')}")
            return None

        except asyncio.CancelledError:
            raise  # the subprocess is stopped in ``finally``
        except Exception as e:
            logger.error(f"Download error: {e}")
            return None
        finally:
            # Drop the entry only if it is still ours; a later download of
            # the same URL may have replaced it.
            if process is not None and self._active_downloads.get(download_id) is process:
                del self._active_downloads[download_id]
            await self._stop_process(process)
            if stderr_task is not None and not stderr_task.done():
                stderr_task.cancel()

    @staticmethod
    def _parse_destination(line: str) -> Optional[str]:
        """Return the file path announced by a yt-dlp output line, if any."""
        if '[download] Destination:' in line or '[ExtractAudio] Destination:' in line:
            return line.split('Destination:', 1)[1].strip()

        if 'has already been downloaded' in line:
            match = re.search(r'\[download\] (.+) has already been downloaded', line)
            if match:
                return match.group(1)

        return None

    def _parse_progress(self, line: str) -> "Optional[DownloadProgress]":
        """Parse one yt-dlp stdout line into a DownloadProgress.

        Returns a "downloading" update for ``[download] NN.N% ...`` lines, a
        "converting" update for audio-extraction lines, and ``None`` for
        everything else.
        """
        # Anchored to the tag so a '%' inside a title or filename is not
        # mistaken for progress.
        percent_match = re.match(r'\s*\[download\]\s+(\d+\.?\d*)%', line)

        if percent_match:
            speed_match = re.search(r'at\s+(\S+/s)', line)
            eta_match = re.search(r'ETA\s+(\S+)', line)
            return DownloadProgress(
                filename="",
                status="downloading",
                percent=float(percent_match.group(1)),
                speed=speed_match.group(1) if speed_match else "",
                eta=eta_match.group(1) if eta_match else ""
            )

        if 'Deleting original file' in line or '[ExtractAudio]' in line:
            return DownloadProgress(
                filename="",
                status="converting",
                percent=100.0
            )

        return None

    async def get_info(self, url: str) -> Optional[Dict]:
        """Return yt-dlp's metadata JSON for *url* without downloading.

        Returns:
            The decoded JSON object, or ``None`` on a non-zero exit status,
            timeout (30 s) or any other error.
        """
        cmd = [
            self._ytdlp_path,
            url,
            '--dump-json',
            '--no-download',
            '--no-warnings'
        ]

        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=30
            )

            if process.returncode == 0:
                return json.loads(stdout.decode())

        except asyncio.TimeoutError:
            # str(TimeoutError()) is empty, so log something meaningful.
            logger.error("Error getting info: timed out")
        except Exception as e:
            logger.error(f"Error getting info: {e}")
        finally:
            # Do not leave yt-dlp running after a timeout.
            await self._stop_process(process)

        return None

    def cancel_download(self, url: str) -> bool:
        """Ask the download registered under *url* to terminate.

        Returns:
            ``True`` if a running process was signalled; ``False`` if no
            download is registered for *url* or it had already exited.
        """
        process = self._active_downloads.get(url)
        if process is None:
            return False

        try:
            process.terminate()
        except ProcessLookupError:
            return False  # already exited; nothing left to cancel
        return True

    def cancel_all_downloads(self):
        """Terminate every tracked download and empty the registry."""
        for process in list(self._active_downloads.values()):
            try:
                process.terminate()
            except ProcessLookupError:
                # One finished process must not stop the rest from being
                # cancelled.
                pass
        self._active_downloads.clear()
|
||||
Reference in New Issue
Block a user