Add playlist_manager.py

This commit is contained in:
2025-12-13 18:30:46 +00:00
parent 12a9f859aa
commit ee9fc9120a

441
playlist_manager.py Normal file
View File

@@ -0,0 +1,441 @@
#!/usr/bin/env python3
"""
Playlist and Library Management Module.
This module handles the music library (scanning, metadata extraction),
the playback queue, and playlist operations. It differentiates between
a static playlist and a priority queue.
"""
import os
import random
import asyncio
import gc
from pathlib import Path
from typing import Optional, List, Dict, Tuple, Deque
from dataclasses import dataclass, field
from enum import Enum
from collections import deque
import logging
try:
from mutagen import File as MutagenFile
MUTAGEN_AVAILABLE = True
except ImportError:
MUTAGEN_AVAILABLE = False
logging.warning("Mutagen not available - metadata extraction disabled")
logger = logging.getLogger(__name__)
class PlaybackMode(Enum):
SEQUENTIAL = "sequential"
RANDOM = "random"
REPEAT_ONE = "repeat_one"
REPEAT_ALL = "repeat_all"
@dataclass(slots=True)
class Track:
"""
Represents a single music track.
Uses slots to reduce memory footprint for large libraries.
"""
path: str
filename: str
title: str = ""
artist: str = ""
album: str = ""
duration: float = 0.0
format: str = ""
def __post_init__(self):
if not self.title:
self.title = Path(self.filename).stem
if not self.format:
self.format = Path(self.filename).suffix.lower().lstrip('.')
@property
def display_name(self) -> str:
if self.artist:
return f"{self.artist} - {self.title}"
return self.title
def to_dict(self) -> Dict:
return {
'path': self.path,
'filename': self.filename,
'title': self.title,
'artist': self.artist,
'album': self.album,
'duration': self.duration,
'format': self.format
}
@dataclass
class Playlist:
"""Represents a collection of tracks with a playback state."""
name: str
tracks: List[Track] = field(default_factory=list)
current_index: int = 0
mode: PlaybackMode = PlaybackMode.SEQUENTIAL
def add_track(self, track: Track):
self.tracks.append(track)
def remove_track(self, index: int) -> Optional[Track]:
if 0 <= index < len(self.tracks):
return self.tracks.pop(index)
return None
def clear(self):
self.tracks.clear()
self.current_index = 0
def shuffle(self):
if self.tracks:
current = self.current_track
random.shuffle(self.tracks)
if current:
try:
self.current_index = self.tracks.index(current)
except ValueError:
self.current_index = 0
@property
def current_track(self) -> Optional[Track]:
if 0 <= self.current_index < len(self.tracks):
return self.tracks[self.current_index]
return None
def next_track(self) -> Optional[Track]:
"""Calculates the next track based on playback mode."""
if not self.tracks:
return None
if self.mode == PlaybackMode.REPEAT_ONE:
return self.current_track
if self.mode == PlaybackMode.RANDOM:
if len(self.tracks) == 1:
return self.tracks[0]
available_indices = [i for i in range(len(self.tracks)) if i != self.current_index]
if available_indices:
self.current_index = random.choice(available_indices)
else:
self.current_index += 1
if self.current_index >= len(self.tracks):
if self.mode == PlaybackMode.REPEAT_ALL:
self.current_index = 0
else:
return None
return self.current_track
def previous_track(self) -> Optional[Track]:
if not self.tracks:
return None
self.current_index -= 1
if self.current_index < 0:
self.current_index = len(self.tracks) - 1
return self.current_track
def goto_track(self, index: int) -> Optional[Track]:
if 0 <= index < len(self.tracks):
self.current_index = index
return self.current_track
return None
class MusicQueue:
"""
Priority queue system.
Tracks in this queue take precedence over the standard playlist.
"""
MAX_QUEUE_SIZE = 100
MAX_HISTORY_SIZE = 50
def __init__(self):
self._queue: Deque[Track] = deque(maxlen=self.MAX_QUEUE_SIZE)
self._history: Deque[Track] = deque(maxlen=self.MAX_HISTORY_SIZE)
self._current: Optional[Track] = None
@property
def current(self) -> Optional[Track]:
return self._current
@current.setter
def current(self, track: Optional[Track]):
if self._current:
self._history.append(self._current)
self._current = track
def add(self, track: Track) -> bool:
if len(self._queue) >= self.MAX_QUEUE_SIZE:
return False
self._queue.append(track)
return True
def add_next(self, track: Track) -> bool:
if len(self._queue) >= self.MAX_QUEUE_SIZE:
return False
self._queue.appendleft(track)
return True
def pop(self) -> Optional[Track]:
if self._queue:
return self._queue.popleft()
return None
def peek(self, count: int = 5) -> List[Track]:
return list(self._queue)[:count]
def remove(self, index: int) -> Optional[Track]:
if 0 <= index < len(self._queue):
track = self._queue[index]
del self._queue[index]
return track
return None
def clear(self):
self._queue.clear()
def move(self, from_index: int, to_index: int) -> bool:
if not (0 <= from_index < len(self._queue) and 0 <= to_index < len(self._queue)):
return False
track = self._queue[from_index]
del self._queue[from_index]
self._queue.insert(to_index, track)
return True
def shuffle(self):
tracks = list(self._queue)
random.shuffle(tracks)
self._queue.clear()
self._queue.extend(tracks)
@property
def is_empty(self) -> bool:
return len(self._queue) == 0
@property
def size(self) -> int:
return len(self._queue)
def get_history(self, count: int = 10) -> List[Track]:
return list(self._history)[-count:]
def previous(self) -> Optional[Track]:
if self._history:
track = self._history.pop()
if self._current:
self._queue.appendleft(self._current)
self._current = track
return track
return None
class PlaylistManager:
"""
Coordinating class for library, playlists, and playback queue.
"""
SUPPORTED_FORMATS = {'.mp3', '.ogg', '.flac', '.wav', '.m4a', '.opus', '.aac', '.wma'}
MAX_LIBRARY_SIZE = 10000
def __init__(self, music_directory: str, supported_formats: Optional[List[str]] = None):
self.music_directory = Path(music_directory)
self.music_directory.mkdir(parents=True, exist_ok=True)
if supported_formats:
self.supported_formats = {f'.{fmt.lower().lstrip(".")}' for fmt in supported_formats}
else:
self.supported_formats = self.SUPPORTED_FORMATS
self.library: List[Track] = []
self.playlists: Dict[str, Playlist] = {}
self.current_playlist: Optional[Playlist] = None
self.queue = MusicQueue()
self.playlists['default'] = Playlist('default')
self.current_playlist = self.playlists['default']
async def scan_library(self) -> int:
"""Asynchronously scans the music directory for supported files."""
self.library.clear()
gc.collect()
count = 0
for root, dirs, files in os.walk(self.music_directory):
for filename in files:
if count >= self.MAX_LIBRARY_SIZE:
logger.warning(f"Library limit reached: {self.MAX_LIBRARY_SIZE}")
return count
ext = Path(filename).suffix.lower()
if ext in self.supported_formats:
filepath = os.path.join(root, filename)
track = await self._create_track(filepath)
if track:
self.library.append(track)
count += 1
if count % 500 == 0:
gc.collect()
logger.info(f"Library scanned: {count} tracks")
return count
async def _create_track(self, filepath: str) -> Optional[Track]:
try:
filename = os.path.basename(filepath)
track = Track(path=filepath, filename=filename)
if MUTAGEN_AVAILABLE:
await self._extract_metadata(track)
return track
except Exception as e:
logger.error(f"Error creating track from {filepath}: {e}")
return None
async def _extract_metadata(self, track: Track) -> None:
try:
audio = MutagenFile(track.path, easy=True)
if audio is None:
return
if hasattr(audio, 'info') and hasattr(audio.info, 'length'):
track.duration = audio.info.length
if hasattr(audio, 'tags') and audio.tags:
track.title = audio.tags.get('title', [track.title])[0] if 'title' in audio.tags else track.title
track.artist = audio.tags.get('artist', [''])[0] if 'artist' in audio.tags else ''
track.album = audio.tags.get('album', [''])[0] if 'album' in audio.tags else ''
except Exception as e:
logger.debug(f"Metadata extraction failed for {track.path}: {e}")
def search_library(self, query: str, limit: int = 20) -> List[Track]:
query = query.lower()
results = []
for track in self.library:
if len(results) >= limit:
break
if (query in track.title.lower() or
query in track.artist.lower() or
query in track.album.lower() or
query in track.filename.lower()):
results.append(track)
return results
def add_to_queue(self, track: Track) -> bool:
return self.queue.add(track)
def add_next_to_queue(self, track: Track) -> bool:
return self.queue.add_next(track)
def get_queue(self, count: int = 10) -> List[Track]:
return self.queue.peek(count)
def clear_queue(self):
self.queue.clear()
def remove_from_queue(self, index: int) -> Optional[Track]:
return self.queue.remove(index)
def shuffle_queue(self):
self.queue.shuffle()
def add_to_playlist(self, track: Track, playlist_name: str = 'default') -> bool:
if playlist_name not in self.playlists:
self.playlists[playlist_name] = Playlist(playlist_name)
self.playlists[playlist_name].add_track(track)
return True
def add_library_to_playlist(self, playlist_name: str = 'default') -> int:
if playlist_name not in self.playlists:
self.playlists[playlist_name] = Playlist(playlist_name)
playlist = self.playlists[playlist_name]
count = 0
for track in self.library:
playlist.add_track(track)
count += 1
return count
def get_playlist(self, name: str) -> Optional[Playlist]:
return self.playlists.get(name)
def set_current_playlist(self, name: str) -> bool:
if name in self.playlists:
self.current_playlist = self.playlists[name]
return True
return False
def set_playback_mode(self, mode: PlaybackMode):
if self.current_playlist:
self.current_playlist.mode = mode
def get_current_track(self) -> Optional[Track]:
"""Returns the currently playing track from Queue or Playlist."""
return self.queue.current or (self.current_playlist.current_track if self.current_playlist else None)
def next_track(self) -> Optional[Track]:
"""
Determines the next track to play.
Logic: Queue priority > Playlist > None
"""
queued = self.queue.pop()
if queued:
self.queue.current = queued
return queued
if self.current_playlist:
track = self.current_playlist.next_track()
if track:
self.queue.current = track
return track
return None
def previous_track(self) -> Optional[Track]:
prev = self.queue.previous()
if prev:
return prev
if self.current_playlist:
return self.current_playlist.previous_track()
return None
def list_tracks(self, start: int = 0, count: int = 20) -> List[Tuple[int, Track]]:
if not self.current_playlist:
return []
tracks = self.current_playlist.tracks[start:start + count]
return [(start + i, track) for i, track in enumerate(tracks)]
def get_stats(self) -> Dict:
total_duration = sum(t.duration for t in self.library)
return {
'library_count': len(self.library),
'library_duration': total_duration,
'playlist_count': len(self.playlists),
'queue_size': self.queue.size,
'current_playlist': self.current_playlist.name if self.current_playlist else None,
'current_playlist_count': len(self.current_playlist.tracks) if self.current_playlist else 0
}