From ee9fc9120afae168e773efa7ab7b3d613d7a0a98 Mon Sep 17 00:00:00 2001 From: just n Date: Sat, 13 Dec 2025 18:30:46 +0000 Subject: [PATCH] Add playlist_manager.py --- playlist_manager.py | 441 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 playlist_manager.py diff --git a/playlist_manager.py b/playlist_manager.py new file mode 100644 index 0000000..d93cc5b --- /dev/null +++ b/playlist_manager.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python3 +""" +Playlist and Library Management Module. + +This module handles the music library (scanning, metadata extraction), +the playback queue, and playlist operations. It differentiates between +a static playlist and a priority queue. +""" + +import os +import random +import asyncio +import gc +from pathlib import Path +from typing import Optional, List, Dict, Tuple, Deque +from dataclasses import dataclass, field +from enum import Enum +from collections import deque +import logging + +try: + from mutagen import File as MutagenFile + MUTAGEN_AVAILABLE = True +except ImportError: + MUTAGEN_AVAILABLE = False + logging.warning("Mutagen not available - metadata extraction disabled") + +logger = logging.getLogger(__name__) + + +class PlaybackMode(Enum): + SEQUENTIAL = "sequential" + RANDOM = "random" + REPEAT_ONE = "repeat_one" + REPEAT_ALL = "repeat_all" + + +@dataclass(slots=True) +class Track: + """ + Represents a single music track. + + Uses slots to reduce memory footprint for large libraries. + """ + path: str + filename: str + title: str = "" + artist: str = "" + album: str = "" + duration: float = 0.0 + format: str = "" + + def __post_init__(self): + if not self.title: + self.title = Path(self.filename).stem + if not self.format: + self.format = Path(self.filename).suffix.lower().lstrip('.') + + @property + def display_name(self) -> str: + if self.artist: + return f"{self.artist} - {self.title}" + return self.title + + def to_dict(self) -> Dict: + return { + 'path': self.path, + 'filename': self.filename, + 'title': self.title, + 'artist': self.artist, + 'album': self.album, + 'duration': self.duration, + 'format': self.format + } + + +@dataclass +class Playlist: + """Represents a collection of tracks with a playback state.""" + name: str + tracks: List[Track] = field(default_factory=list) + current_index: int = 0 + mode: PlaybackMode = PlaybackMode.SEQUENTIAL + + def add_track(self, track: Track): + self.tracks.append(track) + + def remove_track(self, index: int) -> Optional[Track]: + if 0 <= index < len(self.tracks): + return self.tracks.pop(index) + return None + + def clear(self): + self.tracks.clear() + self.current_index = 0 + + def shuffle(self): + if self.tracks: + current = self.current_track + random.shuffle(self.tracks) + if current: + try: + self.current_index = self.tracks.index(current) + except ValueError: + self.current_index = 0 + + @property + def current_track(self) -> Optional[Track]: + if 0 <= self.current_index < len(self.tracks): + return self.tracks[self.current_index] + return None + + def next_track(self) -> Optional[Track]: + """Calculates the next track based on playback mode.""" + if not self.tracks: + return None + + if self.mode == PlaybackMode.REPEAT_ONE: + return self.current_track + + if self.mode == PlaybackMode.RANDOM: + if len(self.tracks) == 1: + return self.tracks[0] + + available_indices = [i for i in range(len(self.tracks)) if i != self.current_index] + if available_indices: + self.current_index = random.choice(available_indices) + else: + self.current_index += 1 + if self.current_index >= len(self.tracks): + if self.mode == PlaybackMode.REPEAT_ALL: + self.current_index = 0 + else: + return None + + return self.current_track + + def previous_track(self) -> Optional[Track]: + if not self.tracks: + return None + + self.current_index -= 1 + if self.current_index < 0: + self.current_index = len(self.tracks) - 1 + + return self.current_track + + def goto_track(self, index: int) -> Optional[Track]: + if 0 <= index < len(self.tracks): + self.current_index = index + return self.current_track + return None + + +class MusicQueue: + """ + Priority queue system. + + Tracks in this queue take precedence over the standard playlist. + """ + MAX_QUEUE_SIZE = 100 + MAX_HISTORY_SIZE = 50 + + def __init__(self): + self._queue: Deque[Track] = deque(maxlen=self.MAX_QUEUE_SIZE) + self._history: Deque[Track] = deque(maxlen=self.MAX_HISTORY_SIZE) + self._current: Optional[Track] = None + + @property + def current(self) -> Optional[Track]: + return self._current + + @current.setter + def current(self, track: Optional[Track]): + if self._current: + self._history.append(self._current) + self._current = track + + def add(self, track: Track) -> bool: + if len(self._queue) >= self.MAX_QUEUE_SIZE: + return False + self._queue.append(track) + return True + + def add_next(self, track: Track) -> bool: + if len(self._queue) >= self.MAX_QUEUE_SIZE: + return False + self._queue.appendleft(track) + return True + + def pop(self) -> Optional[Track]: + if self._queue: + return self._queue.popleft() + return None + + def peek(self, count: int = 5) -> List[Track]: + return list(self._queue)[:count] + + def remove(self, index: int) -> Optional[Track]: + if 0 <= index < len(self._queue): + track = self._queue[index] + del self._queue[index] + return track + return None + + def clear(self): + self._queue.clear() + + def move(self, from_index: int, to_index: int) -> bool: + if not (0 <= from_index < len(self._queue) and 0 <= to_index < len(self._queue)): + return False + + track = self._queue[from_index] + del self._queue[from_index] + self._queue.insert(to_index, track) + return True + + def shuffle(self): + tracks = list(self._queue) + random.shuffle(tracks) + self._queue.clear() + self._queue.extend(tracks) + + @property + def is_empty(self) -> bool: + return len(self._queue) == 0 + + @property + def size(self) -> int: + return len(self._queue) + + def get_history(self, count: int = 10) -> List[Track]: + return list(self._history)[-count:] + + def previous(self) -> Optional[Track]: + if self._history: + track = self._history.pop() + if self._current: + self._queue.appendleft(self._current) + self._current = track + return track + return None + + +class PlaylistManager: + """ + Coordinating class for library, playlists, and playback queue. + """ + SUPPORTED_FORMATS = {'.mp3', '.ogg', '.flac', '.wav', '.m4a', '.opus', '.aac', '.wma'} + MAX_LIBRARY_SIZE = 10000 + + def __init__(self, music_directory: str, supported_formats: Optional[List[str]] = None): + self.music_directory = Path(music_directory) + self.music_directory.mkdir(parents=True, exist_ok=True) + + if supported_formats: + self.supported_formats = {f'.{fmt.lower().lstrip(".")}' for fmt in supported_formats} + else: + self.supported_formats = self.SUPPORTED_FORMATS + + self.library: List[Track] = [] + self.playlists: Dict[str, Playlist] = {} + self.current_playlist: Optional[Playlist] = None + + self.queue = MusicQueue() + + self.playlists['default'] = Playlist('default') + self.current_playlist = self.playlists['default'] + + async def scan_library(self) -> int: + """Asynchronously scans the music directory for supported files.""" + self.library.clear() + gc.collect() + + count = 0 + + for root, dirs, files in os.walk(self.music_directory): + for filename in files: + if count >= self.MAX_LIBRARY_SIZE: + logger.warning(f"Library limit reached: {self.MAX_LIBRARY_SIZE}") + return count + + ext = Path(filename).suffix.lower() + if ext in self.supported_formats: + filepath = os.path.join(root, filename) + track = await self._create_track(filepath) + if track: + self.library.append(track) + count += 1 + + if count % 500 == 0: + gc.collect() + + logger.info(f"Library scanned: {count} tracks") + return count + + async def _create_track(self, filepath: str) -> Optional[Track]: + try: + filename = os.path.basename(filepath) + track = Track(path=filepath, filename=filename) + + if MUTAGEN_AVAILABLE: + await self._extract_metadata(track) + + return track + except Exception as e: + logger.error(f"Error creating track from {filepath}: {e}") + return None + + async def _extract_metadata(self, track: Track) -> None: + try: + audio = MutagenFile(track.path, easy=True) + if audio is None: + return + + if hasattr(audio, 'info') and hasattr(audio.info, 'length'): + track.duration = audio.info.length + + if hasattr(audio, 'tags') and audio.tags: + track.title = audio.tags.get('title', [track.title])[0] if 'title' in audio.tags else track.title + track.artist = audio.tags.get('artist', [''])[0] if 'artist' in audio.tags else '' + track.album = audio.tags.get('album', [''])[0] if 'album' in audio.tags else '' + + except Exception as e: + logger.debug(f"Metadata extraction failed for {track.path}: {e}") + + def search_library(self, query: str, limit: int = 20) -> List[Track]: + query = query.lower() + results = [] + + for track in self.library: + if len(results) >= limit: + break + if (query in track.title.lower() or + query in track.artist.lower() or + query in track.album.lower() or + query in track.filename.lower()): + results.append(track) + + return results + + def add_to_queue(self, track: Track) -> bool: + return self.queue.add(track) + + def add_next_to_queue(self, track: Track) -> bool: + return self.queue.add_next(track) + + def get_queue(self, count: int = 10) -> List[Track]: + return self.queue.peek(count) + + def clear_queue(self): + self.queue.clear() + + def remove_from_queue(self, index: int) -> Optional[Track]: + return self.queue.remove(index) + + def shuffle_queue(self): + self.queue.shuffle() + + def add_to_playlist(self, track: Track, playlist_name: str = 'default') -> bool: + if playlist_name not in self.playlists: + self.playlists[playlist_name] = Playlist(playlist_name) + self.playlists[playlist_name].add_track(track) + return True + + def add_library_to_playlist(self, playlist_name: str = 'default') -> int: + if playlist_name not in self.playlists: + self.playlists[playlist_name] = Playlist(playlist_name) + + playlist = self.playlists[playlist_name] + count = 0 + + for track in self.library: + playlist.add_track(track) + count += 1 + + return count + + def get_playlist(self, name: str) -> Optional[Playlist]: + return self.playlists.get(name) + + def set_current_playlist(self, name: str) -> bool: + if name in self.playlists: + self.current_playlist = self.playlists[name] + return True + return False + + def set_playback_mode(self, mode: PlaybackMode): + if self.current_playlist: + self.current_playlist.mode = mode + + def get_current_track(self) -> Optional[Track]: + """Returns the currently playing track from Queue or Playlist.""" + return self.queue.current or (self.current_playlist.current_track if self.current_playlist else None) + + def next_track(self) -> Optional[Track]: + """ + Determines the next track to play. + Logic: Queue priority > Playlist > None + """ + queued = self.queue.pop() + if queued: + self.queue.current = queued + return queued + + if self.current_playlist: + track = self.current_playlist.next_track() + if track: + self.queue.current = track + return track + + return None + + def previous_track(self) -> Optional[Track]: + prev = self.queue.previous() + if prev: + return prev + + if self.current_playlist: + return self.current_playlist.previous_track() + + return None + + def list_tracks(self, start: int = 0, count: int = 20) -> List[Tuple[int, Track]]: + if not self.current_playlist: + return [] + + tracks = self.current_playlist.tracks[start:start + count] + return [(start + i, track) for i, track in enumerate(tracks)] + + def get_stats(self) -> Dict: + total_duration = sum(t.duration for t in self.library) + + return { + 'library_count': len(self.library), + 'library_duration': total_duration, + 'playlist_count': len(self.playlists), + 'queue_size': self.queue.size, + 'current_playlist': self.current_playlist.name if self.current_playlist else None, + 'current_playlist_count': len(self.current_playlist.tracks) if self.current_playlist else 0 + } \ No newline at end of file