#!/usr/bin/env python3
"""
YouTube-DLP Integration Module.

Provides async wrappers for searching and downloading content via yt-dlp.
Includes parsing logic for yt-dlp's JSON output and progress updates.
"""

import os
import asyncio
import subprocess
import json
import re
from pathlib import Path
from typing import Optional, List, Dict, Callable
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

# Patterns applied to every line yt-dlp prints; compiled once at import time.
_PERCENT_RE = re.compile(r'(\d+\.?\d*)%')
_SPEED_RE = re.compile(r'at\s+(\S+/s)')
_ETA_RE = re.compile(r'ETA\s+(\S+)')
_ALREADY_DOWNLOADED_RE = re.compile(
    r'\[download\] (.+) has already been downloaded'
)

# Placeholder shown when a duration cannot be rendered.
_UNKNOWN_DURATION = "??:??"


@dataclass
class SearchResult:
    """Represents a single search result from yt-dlp."""
    id: str
    title: str
    duration: float
    uploader: str
    url: str
    thumbnail: Optional[str] = None
    view_count: int = 0

    @property
    def duration_str(self) -> str:
        """Format the duration (seconds) as ``M:SS`` or ``H:MM:SS``.

        A missing or zero duration renders as ``0:00``. Negative,
        non-numeric, NaN or infinite durations render as ``??:??``.
        """
        try:
            total_seconds = int(self.duration) if self.duration else 0
        except (TypeError, ValueError, OverflowError):
            # int(nan) raises ValueError, int(inf) raises OverflowError.
            return _UNKNOWN_DURATION

        if total_seconds < 0:
            return _UNKNOWN_DURATION

        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def to_dict(self) -> Dict:
        """Return the result as a plain dict, including ``duration_str``."""
        return {
            'id': self.id,
            'title': self.title,
            'duration': self.duration,
            'duration_str': self.duration_str,
            'uploader': self.uploader,
            'url': self.url,
            'thumbnail': self.thumbnail,
            'view_count': self.view_count
        }


@dataclass
class DownloadProgress:
    """Real-time status of an active download.

    ``status`` is ``"downloading"`` or ``"converting"`` when produced by
    ``YtDlpHandler._parse_progress``; ``percent`` is in the range yt-dlp
    prints (0-100), and ``speed`` / ``eta`` are yt-dlp's own strings.
    """
    filename: str
    status: str
    percent: float = 0.0
    speed: str = ""
    eta: str = ""
    error: str = ""


class YtDlpHandler:
    """Manages yt-dlp subprocesses."""

    def __init__(
        self,
        download_directory: str,
        audio_format: str = "bestaudio",
        max_filesize: int = 50,
        max_search_results: int = 10
    ):
        """Create the download directory and locate the yt-dlp binary.

        Args:
            download_directory: Directory downloads are written to; created
                (with parents) if missing.
            audio_format: Value passed to yt-dlp's ``-f`` option.
            max_filesize: Size limit in MiB; ``0`` or less disables it.
            max_search_results: Number of results requested per search.

        Raises:
            RuntimeError: If no yt-dlp executable can be run.
        """
        self.download_directory = Path(download_directory)
        self.download_directory.mkdir(parents=True, exist_ok=True)

        self.audio_format = audio_format
        # Stored in bytes; 0 means "no limit".
        self.max_filesize = max_filesize * 1024 * 1024 if max_filesize > 0 else 0
        self.max_search_results = max_search_results

        self._ytdlp_path = self._find_ytdlp()
        # Keyed by the URL passed to download().
        self._active_downloads: Dict[str, asyncio.subprocess.Process] = {}

    def _find_ytdlp(self) -> str:
        """Locates the yt-dlp binary in the system path.

        Returns:
            The first candidate name whose ``--version`` call exits with 0.

        Raises:
            RuntimeError: If none of the candidates can be executed.
        """
        for name in ['yt-dlp', 'yt-dlp.exe', 'youtube-dl']:
            try:
                result = subprocess.run(
                    [name, '--version'],
                    capture_output=True,
                    text=True,
                    timeout=15
                )
            except (OSError, subprocess.TimeoutExpired):
                # Missing, not executable, or hung: try the next candidate.
                continue

            if result.returncode == 0:
                logger.info(
                    "Found yt-dlp: %s (version %s)",
                    name, result.stdout.strip()
                )
                return name

        raise RuntimeError("yt-dlp not found. Install with: pip install yt-dlp")

    @staticmethod
    def _stop_process(process, *, force: bool = False) -> bool:
        """Signal *process* to stop if it is still running.

        Args:
            process: A subprocess handle, or ``None``.
            force: Use ``kill()`` instead of ``terminate()``.

        Returns:
            True if a signal was sent, False if there was nothing to stop.
        """
        if process is None or process.returncode is not None:
            return False
        try:
            if force:
                process.kill()
            else:
                process.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            return False
        return True

    @staticmethod
    def _parse_search_line(line: str) -> Optional[SearchResult]:
        """Convert one line of ``--dump-json`` output into a SearchResult.

        Returns ``None`` for lines that are not valid JSON or that cannot
        be mapped onto a SearchResult.
        """
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return None

        try:
            duration = data.get('duration')
            if duration is None:
                duration = 0
            else:
                try:
                    duration = float(duration)
                except (TypeError, ValueError):
                    duration = 0

            # ``or`` fallbacks: a key can be present with a null value, in
            # which case dict.get()'s default would not be used.
            return SearchResult(
                id=data.get('id') or '',
                title=data.get('title') or 'Unknown',
                duration=duration,
                uploader=data.get('uploader') or data.get('channel') or 'Unknown',
                url=data.get('url') or data.get('webpage_url') or '',
                thumbnail=data.get('thumbnail'),
                view_count=int(data.get('view_count', 0) or 0)
            )
        except Exception as e:
            # Best effort: one malformed entry must not discard the rest.
            logger.debug("Error parsing result: %s", e)
            return None

    @staticmethod
    def _extract_output_path(line: str) -> Optional[str]:
        """Return the file path announced by a yt-dlp output line, if any."""
        if '[download] Destination:' in line:
            return line.split('Destination:', 1)[1].strip()
        if 'has already been downloaded' in line:
            match = _ALREADY_DOWNLOADED_RE.search(line)
            return match.group(1) if match else None
        if '[ExtractAudio] Destination:' in line:
            return line.split('Destination:', 1)[1].strip()
        return None

    async def search(self, query: str, source: str = "youtube") -> List[SearchResult]:
        """Performs a non-download search using yt-dlp's internal search operators.

        Args:
            query: Free-text search terms.
            source: ``"youtube"``, ``"soundcloud"`` or ``"bandcamp"``; any
                other value falls back to the YouTube search prefix.

        Returns:
            The parsed results, or an empty list on timeout or error.
        """
        search_prefix = {
            "youtube": "ytsearch",
            "soundcloud": "scsearch",
            "bandcamp": "bcsearch"
        }.get(source, "ytsearch")

        search_query = f"{search_prefix}{self.max_search_results}:{query}"

        cmd = [
            self._ytdlp_path,
            search_query,
            '--dump-json',
            '--no-download',
            '--flat-playlist',
            '--ignore-errors',
            '--no-warnings'
        ]

        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=60
            )
        except asyncio.TimeoutError:
            logger.error("Search timed out")
            return []
        except Exception as e:
            logger.error("Search error: %s", e)
            return []
        finally:
            # No-op after a normal exit; on timeout or cancellation this
            # keeps the child from running on unattended.
            self._stop_process(process, force=True)

        results = []
        for line in stdout.decode(errors='replace').strip().split('\n'):
            if not line:
                continue
            parsed = self._parse_search_line(line)
            if parsed is not None:
                results.append(parsed)
        return results

    async def download(
        self,
        url: str,
        progress_callback: Optional[Callable[[DownloadProgress], None]] = None
    ) -> Optional[str]:
        """Downloads audio and converts it to Opus.

        Args:
            url: Media URL handed to yt-dlp.
            progress_callback: Called synchronously with a DownloadProgress
                for every output line that carries progress information.

        Returns:
            Path of the resulting file, or ``None`` if the download failed.

        Raises:
            asyncio.CancelledError: Re-raised after the subprocess has been
                asked to terminate.
        """
        output_template = str(self.download_directory / "%(title)s.%(ext)s")

        cmd = [
            self._ytdlp_path,
            '-x',
            '--audio-format', 'opus',
            '--audio-quality', '0',
            '-o', output_template,
            '--no-playlist',
            '--no-warnings',
            '--newline',
            '--restrict-filenames',
        ]

        if self.max_filesize > 0:
            cmd.extend(['--max-filesize', str(self.max_filesize)])

        # "--" ends option parsing, so a URL beginning with "-" cannot be
        # interpreted as a yt-dlp option.
        cmd.extend(['-f', self.audio_format, '--', url])

        # Bound before the try block so the cleanup below can always use them.
        download_id = url
        process = None
        stderr_task = None

        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            self._active_downloads[download_id] = process

            # Drain stderr concurrently: if its pipe filled up while only
            # stdout was being read, the child would block forever.
            stderr_task = asyncio.ensure_future(process.stderr.read())

            output_file = None

            async for raw_line in process.stdout:
                line = raw_line.decode(errors='replace').strip()
                logger.debug("yt-dlp: %s", line)

                if progress_callback:
                    progress = self._parse_progress(line)
                    if progress:
                        progress_callback(progress)

                # Later lines win: the post-conversion path replaces the
                # path of the originally downloaded file.
                destination = self._extract_output_path(line)
                if destination:
                    output_file = destination

            await process.wait()
            stderr_output = await stderr_task

            if process.returncode == 0:
                if not output_file or not os.path.exists(output_file):
                    # Fallback: newest .opus file in the download directory.
                    # NOTE(review): this can pick an unrelated file when
                    # yt-dlp exits 0 without writing anything.
                    newest = max(
                        self.download_directory.glob('*.opus'),
                        key=lambda candidate: candidate.stat().st_mtime,
                        default=None
                    )
                    if newest is not None:
                        output_file = str(newest)

                if output_file and os.path.exists(output_file):
                    logger.info("Downloaded: %s", output_file)
                    return output_file

            logger.error(
                "Download failed: %s",
                stderr_output.decode(errors='replace')
            )
            return None

        except asyncio.CancelledError:
            # Cleanup happens in ``finally``; cancellation must propagate.
            raise
        except Exception as e:
            logger.error("Download error: %s", e)
            return None
        finally:
            # No-op once the process has exited normally.
            self._stop_process(process)
            if stderr_task is not None and not stderr_task.done():
                stderr_task.cancel()
            # Only drop the entry if it is still ours; a newer download of
            # the same URL may have replaced it.
            if (
                process is not None
                and self._active_downloads.get(download_id) is process
            ):
                del self._active_downloads[download_id]

    def _parse_progress(self, line: str) -> Optional[DownloadProgress]:
        """Parses standard yt-dlp stdout progress lines.

        Returns a ``"downloading"`` progress for any line containing a
        percentage, a ``"converting"`` progress for audio-extraction
        lines, and ``None`` otherwise.
        """
        percent_match = _PERCENT_RE.search(line)

        if percent_match:
            speed_match = _SPEED_RE.search(line)
            eta_match = _ETA_RE.search(line)
            return DownloadProgress(
                filename="",
                status="downloading",
                percent=float(percent_match.group(1)),
                speed=speed_match.group(1) if speed_match else "",
                eta=eta_match.group(1) if eta_match else ""
            )

        if 'Deleting original file' in line or '[ExtractAudio]' in line:
            return DownloadProgress(
                filename="",
                status="converting",
                percent=100.0
            )

        return None

    async def get_info(self, url: str) -> Optional[Dict]:
        """Fetch yt-dlp's JSON metadata for *url* without downloading.

        Returns:
            The decoded JSON document, or ``None`` if yt-dlp exits with a
            non-zero status, times out (30 s) or prints unparsable output.
        """
        cmd = [
            self._ytdlp_path,
            '--dump-json',
            '--no-download',
            '--no-warnings',
            # "--" keeps a URL beginning with "-" from being read as an option.
            '--',
            url
        ]

        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=30
            )

            if process.returncode == 0:
                return json.loads(stdout.decode(errors='replace'))

        except Exception as e:
            logger.error("Error getting info: %s", e)
        finally:
            # Stops the child if the wait timed out or was cancelled.
            self._stop_process(process, force=True)

        return None

    def cancel_download(self, url: str) -> bool:
        """Ask the download started for *url* to terminate.

        Returns:
            True if a terminate signal was sent; False if no download is
            tracked for *url* or its process had already exited.
        """
        return self._stop_process(self._active_downloads.get(url))

    def cancel_all_downloads(self):
        """Ask every tracked download to terminate and forget them all."""
        for process in list(self._active_downloads.values()):
            self._stop_process(process)
        self._active_downloads.clear()