#!/usr/bin/env python3
"""Asynchronous helpers for searching and downloading audio via ``yt-dlp``.

The module shells out to the ``yt-dlp`` (or ``youtube-dl``) executable and
parses its textual / JSON output; it does not import yt-dlp as a library.
"""

import os
import asyncio
import subprocess
import json
import re
from pathlib import Path
from typing import Optional, List, Dict, Callable
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

# Patterns applied to every output line of a download; compiled once here
# instead of on every call.
_PERCENT_RE = re.compile(r'(\d+\.?\d*)%')
_SPEED_RE = re.compile(r'at\s+(\S+/s)')
_ETA_RE = re.compile(r'ETA\s+(\S+)')
_ALREADY_DOWNLOADED_RE = re.compile(
    r'\[download\] (.+) has already been downloaded'
)


@dataclass
class SearchResult:
    """One entry returned by :meth:`YtDlpHandler.search`."""

    id: str
    title: str
    duration: float
    uploader: str
    url: str
    thumbnail: Optional[str] = None
    view_count: int = 0

    @property
    def duration_str(self) -> str:
        """Return the duration as ``M:SS`` or ``H:MM:SS``.

        A falsy duration (``None``/``0``) renders as ``"0:00"``. Negative,
        non-numeric, NaN or infinite durations render as ``"??:??"``.
        """
        try:
            total_seconds = int(self.duration) if self.duration else 0
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float('inf')) -- previously escaped uncaught.
            return "??:??"
        if total_seconds < 0:
            return "??:??"
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def to_dict(self) -> Dict:
        """Return the fields plus the computed ``duration_str`` as a dict."""
        return {
            'id': self.id,
            'title': self.title,
            'duration': self.duration,
            'duration_str': self.duration_str,
            'uploader': self.uploader,
            'url': self.url,
            'thumbnail': self.thumbnail,
            'view_count': self.view_count
        }


@dataclass
class DownloadProgress:
    """Progress snapshot handed to a download's ``progress_callback``."""

    filename: str
    status: str
    percent: float = 0.0
    speed: str = ""
    eta: str = ""
    error: str = ""


class YtDlpHandler:
    """Run ``yt-dlp`` as a subprocess to search, inspect and download audio."""

    def __init__(
        self,
        download_directory: str,
        audio_format: str = "bestaudio",
        max_filesize: int = 50,
        max_search_results: int = 10
    ):
        """Create the download directory and locate the yt-dlp executable.

        Args:
            download_directory: Directory downloads are written to; created
                (including parents) if missing.
            audio_format: Value passed to yt-dlp's ``-f`` option.
            max_filesize: Size limit in MiB; ``0`` or negative disables it.
            max_search_results: Number of results requested per search.

        Raises:
            RuntimeError: If no usable yt-dlp executable is found.
        """
        self.download_directory = Path(download_directory)
        self.download_directory.mkdir(parents=True, exist_ok=True)
        self.audio_format = audio_format
        # Stored in bytes; 0 means "no limit".
        self.max_filesize = max_filesize * 1024 * 1024 if max_filesize > 0 else 0
        self.max_search_results = max_search_results
        self._ytdlp_path = self._find_ytdlp()
        # Keyed by the URL passed to download().
        self._active_downloads: Dict[str, asyncio.subprocess.Process] = {}

    def _find_ytdlp(self) -> str:
        """Return the first candidate executable that answers ``--version``.

        Raises:
            RuntimeError: If none of the candidates can be run successfully.
        """
        for name in ['yt-dlp', 'yt-dlp.exe', 'youtube-dl']:
            try:
                result = subprocess.run(
                    [name, '--version'],
                    capture_output=True,
                    text=True,
                    timeout=15
                )
            except (OSError, subprocess.TimeoutExpired):
                # OSError covers FileNotFoundError and PermissionError; a
                # hung candidate is skipped rather than blocking start-up.
                continue
            if result.returncode == 0:
                logger.info(f"Found yt-dlp: {name} (version {result.stdout.strip()})")
                return name
        raise RuntimeError("yt-dlp not found. Install with: pip install yt-dlp")

    @staticmethod
    def _terminate_process(process) -> None:
        """Send terminate to *process*, tolerating one that already exited."""
        if process.returncode is not None:
            return
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the call.
            pass

    @staticmethod
    async def _kill_and_reap(process) -> None:
        """Kill *process* if it is still running and wait for it to exit."""
        if process is None or process.returncode is not None:
            return
        try:
            process.kill()
        except ProcessLookupError:
            return
        await process.wait()

    @staticmethod
    def _parse_search_entry(data: Dict) -> SearchResult:
        """Build a :class:`SearchResult` from one decoded ``--dump-json`` object.

        Keys that are missing *or* present with a null/empty value fall back
        to the next candidate key or a default.
        """
        duration = data.get('duration')
        if duration is None:
            duration = 0
        else:
            try:
                duration = float(duration)
            except (TypeError, ValueError):
                duration = 0
        return SearchResult(
            id=data.get('id') or '',
            title=data.get('title') or 'Unknown',
            duration=duration,
            uploader=data.get('uploader') or data.get('channel') or 'Unknown',
            url=data.get('url') or data.get('webpage_url') or '',
            thumbnail=data.get('thumbnail'),
            view_count=int(data.get('view_count', 0) or 0)
        )

    async def search(self, query: str, source: str = "youtube") -> List[SearchResult]:
        """Search *source* for *query* and return the parsed results.

        Args:
            query: Free-text search query.
            source: ``"youtube"``, ``"soundcloud"`` or ``"bandcamp"``; any
                other value falls back to the YouTube search prefix.

        Returns:
            The parsed results; an empty list on timeout (60 s) or error.
        """
        search_prefix = {
            "youtube": "ytsearch",
            "soundcloud": "scsearch",
            "bandcamp": "bcsearch"
        }.get(source, "ytsearch")
        search_query = f"{search_prefix}{self.max_search_results}:{query}"
        cmd = [
            self._ytdlp_path,
            search_query,
            '--dump-json',
            '--no-download',
            '--flat-playlist',
            '--ignore-errors',
            '--no-warnings'
        ]
        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=60
            )
            results = []
            # One JSON object per output line.
            for line in stdout.decode(errors='replace').strip().split('\n'):
                if not line:
                    continue
                try:
                    results.append(self._parse_search_entry(json.loads(line)))
                except json.JSONDecodeError:
                    continue
                except Exception as e:
                    logger.debug(f"Error parsing result: {e}")
                    continue
            return results
        except asyncio.TimeoutError:
            logger.error("Search timed out")
            return []
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []
        finally:
            # No-op when the process finished; otherwise (timeout, error,
            # cancellation) make sure the child does not outlive the call.
            await self._kill_and_reap(process)

    @staticmethod
    def _extract_output_path(line: str) -> Optional[str]:
        """Return the file path announced by a yt-dlp output line, if any."""
        if '[download] Destination:' in line:
            return line.split('Destination:', 1)[1].strip()
        if 'has already been downloaded' in line:
            match = _ALREADY_DOWNLOADED_RE.search(line)
            return match.group(1) if match else None
        if '[ExtractAudio] Destination:' in line:
            return line.split('Destination:', 1)[1].strip()
        return None

    async def download(
        self,
        url: str,
        progress_callback: Optional[Callable[[DownloadProgress], None]] = None
    ) -> Optional[str]:
        """Download *url* as Opus audio into the download directory.

        Args:
            url: Media URL handed to yt-dlp; also the key under which the
                running process is tracked for cancellation.
            progress_callback: Called synchronously with a
                :class:`DownloadProgress` for each recognised output line.

        Returns:
            Path of the downloaded file, or ``None`` on failure.

        Raises:
            asyncio.CancelledError: Re-raised after the child process has
                been asked to terminate.
        """
        output_template = str(self.download_directory / "%(title)s.%(ext)s")
        cmd = [
            self._ytdlp_path,
            url,
            '-x',
            '--audio-format', 'opus',
            '--audio-quality', '0',
            '-o', output_template,
            '--no-playlist',
            '--no-warnings',
            '--newline',
            '--restrict-filenames',
        ]
        if self.max_filesize > 0:
            cmd.extend(['--max-filesize', str(self.max_filesize)])
        cmd.extend(['-f', self.audio_format])

        # Bound before the try so the handlers below can always refer to them.
        download_id = url
        process = None
        stderr_task = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            self._active_downloads[download_id] = process
            # Drain stderr concurrently: if it were only read after stdout
            # hit EOF, a child that fills the stderr pipe would block forever.
            stderr_task = asyncio.ensure_future(process.stderr.read())

            output_file = None
            async for raw_line in process.stdout:
                line = raw_line.decode(errors='replace').strip()
                logger.debug(f"yt-dlp: {line}")
                if progress_callback:
                    progress = self._parse_progress(line)
                    if progress:
                        progress_callback(progress)
                announced = self._extract_output_path(line)
                if announced is not None:
                    output_file = announced

            await process.wait()
            stderr_output = await stderr_task

            if process.returncode == 0:
                if not output_file or not os.path.exists(output_file):
                    # Fall back to the most recently modified .opus file.
                    newest = max(
                        self.download_directory.glob('*.opus'),
                        key=lambda x: x.stat().st_mtime,
                        default=None
                    )
                    if newest is not None:
                        output_file = str(newest)
                if output_file and os.path.exists(output_file):
                    logger.info(f"Downloaded: {output_file}")
                    return output_file

            logger.error(f"Download failed: {stderr_output.decode(errors='replace')}")
            return None
        except asyncio.CancelledError:
            # Cleanup happens in ``finally``; listed explicitly so it is never
            # swallowed by the generic handler on older Python versions.
            raise
        except Exception as e:
            logger.error(f"Download error: {e}")
            return None
        finally:
            if stderr_task is not None and not stderr_task.done():
                stderr_task.cancel()
            if process is not None:
                # Still running only on cancellation or an unexpected error.
                self._terminate_process(process)
                # Only drop the entry if it is still ours.
                if self._active_downloads.get(download_id) is process:
                    del self._active_downloads[download_id]

    def _parse_progress(self, line: str) -> Optional[DownloadProgress]:
        """Translate one yt-dlp output line into a :class:`DownloadProgress`.

        Returns ``None`` for lines carrying neither a percentage nor a
        conversion marker.
        """
        percent_match = _PERCENT_RE.search(line)
        if percent_match:
            speed_match = _SPEED_RE.search(line)
            eta_match = _ETA_RE.search(line)
            return DownloadProgress(
                filename="",
                status="downloading",
                percent=float(percent_match.group(1)),
                speed=speed_match.group(1) if speed_match else "",
                eta=eta_match.group(1) if eta_match else ""
            )
        if 'Deleting original file' in line or '[ExtractAudio]' in line:
            return DownloadProgress(
                filename="",
                status="converting",
                percent=100.0
            )
        return None

    async def get_info(self, url: str) -> Optional[Dict]:
        """Return yt-dlp's ``--dump-json`` metadata for *url*.

        Returns ``None`` on a non-zero exit status, timeout (30 s), or any
        error while running yt-dlp or decoding its output.
        """
        cmd = [
            self._ytdlp_path,
            url,
            '--dump-json',
            '--no-download',
            '--no-warnings'
        ]
        process = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=30
            )
            if process.returncode == 0:
                return json.loads(stdout.decode())
        except asyncio.TimeoutError:
            logger.error("Error getting info: timed out")
        except Exception as e:
            logger.error(f"Error getting info: {e}")
        finally:
            # No-op when the process finished normally.
            await self._kill_and_reap(process)
        return None

    def cancel_download(self, url: str) -> bool:
        """Ask the download tracked under *url* to terminate.

        Returns:
            ``True`` if a download was tracked under *url* (even if its
            process had already exited), ``False`` otherwise.
        """
        process = self._active_downloads.get(url)
        if process is None:
            return False
        self._terminate_process(process)
        return True

    def cancel_all_downloads(self):
        """Ask every tracked download to terminate and clear the registry."""
        for process in list(self._active_downloads.values()):
            self._terminate_process(process)
        self._active_downloads.clear()