#!/usr/bin/env python3 """ Playlist and Library Management Module. This module handles the music library (scanning, metadata extraction), the playback queue, and playlist operations. It differentiates between a static playlist and a priority queue. """ import os import random import asyncio import gc from pathlib import Path from typing import Optional, List, Dict, Tuple, Deque from dataclasses import dataclass, field from enum import Enum from collections import deque import logging try: from mutagen import File as MutagenFile MUTAGEN_AVAILABLE = True except ImportError: MUTAGEN_AVAILABLE = False logging.warning("Mutagen not available - metadata extraction disabled") logger = logging.getLogger(__name__) class PlaybackMode(Enum): SEQUENTIAL = "sequential" RANDOM = "random" REPEAT_ONE = "repeat_one" REPEAT_ALL = "repeat_all" @dataclass(slots=True) class Track: """ Represents a single music track. Uses slots to reduce memory footprint for large libraries. """ path: str filename: str title: str = "" artist: str = "" album: str = "" duration: float = 0.0 format: str = "" def __post_init__(self): if not self.title: self.title = Path(self.filename).stem if not self.format: self.format = Path(self.filename).suffix.lower().lstrip('.') @property def display_name(self) -> str: if self.artist: return f"{self.artist} - {self.title}" return self.title def to_dict(self) -> Dict: return { 'path': self.path, 'filename': self.filename, 'title': self.title, 'artist': self.artist, 'album': self.album, 'duration': self.duration, 'format': self.format } @dataclass class Playlist: """Represents a collection of tracks with a playback state.""" name: str tracks: List[Track] = field(default_factory=list) current_index: int = 0 mode: PlaybackMode = PlaybackMode.SEQUENTIAL def add_track(self, track: Track): self.tracks.append(track) def remove_track(self, index: int) -> Optional[Track]: if 0 <= index < len(self.tracks): return self.tracks.pop(index) return None def clear(self): self.tracks.clear() self.current_index = 0 def shuffle(self): if self.tracks: current = self.current_track random.shuffle(self.tracks) if current: try: self.current_index = self.tracks.index(current) except ValueError: self.current_index = 0 @property def current_track(self) -> Optional[Track]: if 0 <= self.current_index < len(self.tracks): return self.tracks[self.current_index] return None def next_track(self) -> Optional[Track]: """Calculates the next track based on playback mode.""" if not self.tracks: return None if self.mode == PlaybackMode.REPEAT_ONE: return self.current_track if self.mode == PlaybackMode.RANDOM: if len(self.tracks) == 1: return self.tracks[0] available_indices = [i for i in range(len(self.tracks)) if i != self.current_index] if available_indices: self.current_index = random.choice(available_indices) else: self.current_index += 1 if self.current_index >= len(self.tracks): if self.mode == PlaybackMode.REPEAT_ALL: self.current_index = 0 else: return None return self.current_track def previous_track(self) -> Optional[Track]: if not self.tracks: return None self.current_index -= 1 if self.current_index < 0: self.current_index = len(self.tracks) - 1 return self.current_track def goto_track(self, index: int) -> Optional[Track]: if 0 <= index < len(self.tracks): self.current_index = index return self.current_track return None class MusicQueue: """ Priority queue system. Tracks in this queue take precedence over the standard playlist. """ MAX_QUEUE_SIZE = 100 MAX_HISTORY_SIZE = 50 def __init__(self): self._queue: Deque[Track] = deque(maxlen=self.MAX_QUEUE_SIZE) self._history: Deque[Track] = deque(maxlen=self.MAX_HISTORY_SIZE) self._current: Optional[Track] = None @property def current(self) -> Optional[Track]: return self._current @current.setter def current(self, track: Optional[Track]): if self._current: self._history.append(self._current) self._current = track def add(self, track: Track) -> bool: if len(self._queue) >= self.MAX_QUEUE_SIZE: return False self._queue.append(track) return True def add_next(self, track: Track) -> bool: if len(self._queue) >= self.MAX_QUEUE_SIZE: return False self._queue.appendleft(track) return True def pop(self) -> Optional[Track]: if self._queue: return self._queue.popleft() return None def peek(self, count: int = 5) -> List[Track]: return list(self._queue)[:count] def remove(self, index: int) -> Optional[Track]: if 0 <= index < len(self._queue): track = self._queue[index] del self._queue[index] return track return None def clear(self): self._queue.clear() def move(self, from_index: int, to_index: int) -> bool: if not (0 <= from_index < len(self._queue) and 0 <= to_index < len(self._queue)): return False track = self._queue[from_index] del self._queue[from_index] self._queue.insert(to_index, track) return True def shuffle(self): tracks = list(self._queue) random.shuffle(tracks) self._queue.clear() self._queue.extend(tracks) @property def is_empty(self) -> bool: return len(self._queue) == 0 @property def size(self) -> int: return len(self._queue) def get_history(self, count: int = 10) -> List[Track]: return list(self._history)[-count:] def previous(self) -> Optional[Track]: if self._history: track = self._history.pop() if self._current: self._queue.appendleft(self._current) self._current = track return track return None class PlaylistManager: """ Coordinating class for library, playlists, and playback queue. """ SUPPORTED_FORMATS = {'.mp3', '.ogg', '.flac', '.wav', '.m4a', '.opus', '.aac', '.wma'} MAX_LIBRARY_SIZE = 10000 def __init__(self, music_directory: str, supported_formats: Optional[List[str]] = None): self.music_directory = Path(music_directory) self.music_directory.mkdir(parents=True, exist_ok=True) if supported_formats: self.supported_formats = {f'.{fmt.lower().lstrip(".")}' for fmt in supported_formats} else: self.supported_formats = self.SUPPORTED_FORMATS self.library: List[Track] = [] self.playlists: Dict[str, Playlist] = {} self.current_playlist: Optional[Playlist] = None self.queue = MusicQueue() self.playlists['default'] = Playlist('default') self.current_playlist = self.playlists['default'] async def scan_library(self) -> int: """Asynchronously scans the music directory for supported files.""" self.library.clear() gc.collect() count = 0 for root, dirs, files in os.walk(self.music_directory): for filename in files: if count >= self.MAX_LIBRARY_SIZE: logger.warning(f"Library limit reached: {self.MAX_LIBRARY_SIZE}") return count ext = Path(filename).suffix.lower() if ext in self.supported_formats: filepath = os.path.join(root, filename) track = await self._create_track(filepath) if track: self.library.append(track) count += 1 if count % 500 == 0: gc.collect() logger.info(f"Library scanned: {count} tracks") return count async def _create_track(self, filepath: str) -> Optional[Track]: try: filename = os.path.basename(filepath) track = Track(path=filepath, filename=filename) if MUTAGEN_AVAILABLE: await self._extract_metadata(track) return track except Exception as e: logger.error(f"Error creating track from {filepath}: {e}") return None async def _extract_metadata(self, track: Track) -> None: try: audio = MutagenFile(track.path, easy=True) if audio is None: return if hasattr(audio, 'info') and hasattr(audio.info, 'length'): track.duration = audio.info.length if hasattr(audio, 'tags') and audio.tags: track.title = audio.tags.get('title', [track.title])[0] if 'title' in audio.tags else track.title track.artist = audio.tags.get('artist', [''])[0] if 'artist' in audio.tags else '' track.album = audio.tags.get('album', [''])[0] if 'album' in audio.tags else '' except Exception as e: logger.debug(f"Metadata extraction failed for {track.path}: {e}") def search_library(self, query: str, limit: int = 20) -> List[Track]: query = query.lower() results = [] for track in self.library: if len(results) >= limit: break if (query in track.title.lower() or query in track.artist.lower() or query in track.album.lower() or query in track.filename.lower()): results.append(track) return results def add_to_queue(self, track: Track) -> bool: return self.queue.add(track) def add_next_to_queue(self, track: Track) -> bool: return self.queue.add_next(track) def get_queue(self, count: int = 10) -> List[Track]: return self.queue.peek(count) def clear_queue(self): self.queue.clear() def remove_from_queue(self, index: int) -> Optional[Track]: return self.queue.remove(index) def shuffle_queue(self): self.queue.shuffle() def add_to_playlist(self, track: Track, playlist_name: str = 'default') -> bool: if playlist_name not in self.playlists: self.playlists[playlist_name] = Playlist(playlist_name) self.playlists[playlist_name].add_track(track) return True def add_library_to_playlist(self, playlist_name: str = 'default') -> int: if playlist_name not in self.playlists: self.playlists[playlist_name] = Playlist(playlist_name) playlist = self.playlists[playlist_name] count = 0 for track in self.library: playlist.add_track(track) count += 1 return count def get_playlist(self, name: str) -> Optional[Playlist]: return self.playlists.get(name) def set_current_playlist(self, name: str) -> bool: if name in self.playlists: self.current_playlist = self.playlists[name] return True return False def set_playback_mode(self, mode: PlaybackMode): if self.current_playlist: self.current_playlist.mode = mode def get_current_track(self) -> Optional[Track]: """Returns the currently playing track from Queue or Playlist.""" return self.queue.current or (self.current_playlist.current_track if self.current_playlist else None) def next_track(self) -> Optional[Track]: """ Determines the next track to play. Logic: Queue priority > Playlist > None """ queued = self.queue.pop() if queued: self.queue.current = queued return queued if self.current_playlist: track = self.current_playlist.next_track() if track: self.queue.current = track return track return None def previous_track(self) -> Optional[Track]: prev = self.queue.previous() if prev: return prev if self.current_playlist: return self.current_playlist.previous_track() return None def list_tracks(self, start: int = 0, count: int = 20) -> List[Tuple[int, Track]]: if not self.current_playlist: return [] tracks = self.current_playlist.tracks[start:start + count] return [(start + i, track) for i, track in enumerate(tracks)] def get_stats(self) -> Dict: total_duration = sum(t.duration for t in self.library) return { 'library_count': len(self.library), 'library_duration': total_duration, 'playlist_count': len(self.playlists), 'queue_size': self.queue.size, 'current_playlist': self.current_playlist.name if self.current_playlist else None, 'current_playlist_count': len(self.current_playlist.tracks) if self.current_playlist else 0 }