diff --git a/audio_streamer.py b/audio_streamer.py new file mode 100644 index 0000000..8a1aa0c --- /dev/null +++ b/audio_streamer.py @@ -0,0 +1,446 @@ +#!/usr/bin/env python3 +""" +Audio Streamer Module. + +This module provides an HTTP streaming server that distributes audio to +connected clients (like Shoutcast/Icecast). It pipes audio from an FFMPEG +process into an asyncio buffer and serves it via aiohttp. +""" + +import os +import asyncio +import subprocess +import gc +import weakref +from pathlib import Path +from typing import Optional, Dict, Set, Callable, Any +from dataclasses import dataclass, field +from enum import Enum +from aiohttp import web +import logging + +logger = logging.getLogger(__name__) + + +class StreamSource(Enum): + FILE = "file" + URL = "url" + SILENCE = "silence" + + +@dataclass(slots=True) +class StreamState: + """Holds metadata about the current stream.""" + source_type: StreamSource = StreamSource.SILENCE + source_path: str = "" + title: str = "Silence" + artist: str = "" + is_playing: bool = False + duration: float = 0.0 + listeners: int = 0 + + +class AudioStreamer: + """ + Manages the FFMPEG subprocess and HTTP server for audio streaming. + + Attributes: + _audio_buffer: Queue holding audio chunks from FFMPEG. + _clients: Set of active HTTP response objects to write to. + """ + + CHUNK_SIZE = 2048 + MAX_BUFFER_SIZE = 30 + MAX_CLIENTS = 50 + + def __init__( + self, + host: str = "0.0.0.0", + port: int = 8000, + mount_point: str = "/radio", + stream_format: str = "mp3", + bitrate: int = 128 + ): + self.host = host + self.port = port + self.mount_point = mount_point.rstrip('/') + self.stream_format = stream_format + self.bitrate = bitrate + + self.state = StreamState() + self._clients: Set[web.StreamResponse] = set() + self._ffmpeg_process: Optional[asyncio.subprocess.Process] = None + self._stream_task: Optional[asyncio.Task] = None + self._app: Optional[web.Application] = None + self._runner: Optional[web.AppRunner] = None + self._site: Optional[web.TCPSite] = None + + self.on_track_end: Optional[Callable[[], Any]] = None + self.on_error: Optional[Callable[[str], Any]] = None + + self._audio_buffer: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_BUFFER_SIZE) + self._broadcast_task: Optional[asyncio.Task] = None + self._shutdown = False + + self._pending_tasks: Set[asyncio.Task] = set() + + @property + def stream_url(self) -> str: + return f"http://{self.host}:{self.port}{self.mount_point}" + + @property + def listener_count(self) -> int: + return len(self._clients) + + @property + def current_source(self) -> str: + return self.state.source_path + + async def start(self): + """Initializes the web app and starts the broadcast loop.""" + self._shutdown = False + self._app = web.Application() + self._app.router.add_get(self.mount_point, self._handle_stream) + self._app.router.add_get(f"{self.mount_point}.m3u", self._handle_playlist) + self._app.router.add_get("/status", self._handle_status) + self._app.router.add_get("/", self._handle_info) + + self._runner = web.AppRunner(self._app) + await self._runner.setup() + + self._site = web.TCPSite(self._runner, self.host, self.port) + await self._site.start() + + self._broadcast_task = asyncio.create_task(self._broadcast_loop()) + logger.info(f"HTTP streaming server started at {self.stream_url}") + + async def stop(self): + """Stops FFMPEG, disconnects clients, and shuts down the server.""" + self._shutdown = True + await self._stop_ffmpeg() + + if self._broadcast_task: + self._broadcast_task.cancel() + try: + await self._broadcast_task + except asyncio.CancelledError: + pass + self._broadcast_task = None + + # Cleanup pending callback tasks + for task in self._pending_tasks: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + self._pending_tasks.clear() + + for client in list(self._clients): + try: + await client.write_eof() + except Exception: + pass + self._clients.clear() + + if self._site: + await self._site.stop() + if self._runner: + await self._runner.cleanup() + + self._clear_buffer() + gc.collect() + + logger.info("HTTP streaming server stopped") + + def _clear_buffer(self): + """Drains the audio buffer to prevent memory bloat.""" + count = 0 + while not self._audio_buffer.empty(): + try: + chunk = self._audio_buffer.get_nowait() + del chunk + count += 1 + except asyncio.QueueEmpty: + break + if count > 0: + logger.debug(f"Cleared {count} chunks from buffer") + + async def play_file(self, filepath: str, title: str = "", artist: str = "") -> bool: + """Starts streaming a local file.""" + if not os.path.exists(filepath): + logger.error(f"File not found: {filepath}") + return False + + await self._stop_ffmpeg() + + self.state.source_type = StreamSource.FILE + self.state.source_path = filepath + self.state.title = title or Path(filepath).stem + self.state.artist = artist + self.state.duration = await self._get_duration(filepath) + + success = await self._start_ffmpeg(filepath) + if success: + self.state.is_playing = True + logger.info(f"Playing file: {filepath}") + + return success + + async def play_url(self, url: str, title: str = "") -> bool: + """Starts streaming from a remote URL.""" + await self._stop_ffmpeg() + + self.state.source_type = StreamSource.URL + self.state.source_path = url + self.state.title = title or "Online Radio" + self.state.artist = "" + self.state.duration = 0.0 + + success = await self._start_ffmpeg(url) + if success: + self.state.is_playing = True + logger.info(f"Playing URL: {url}") + + return success + + async def stop_playback(self): + await self._stop_ffmpeg() + self.state.is_playing = False + self.state.source_type = StreamSource.SILENCE + self.state.source_path = "" + self.state.title = "Silence" + self.state.artist = "" + + async def _start_ffmpeg(self, source: str) -> bool: + """Constructs and executes the FFMPEG command to transcode audio.""" + codec_args = { + 'mp3': ['-c:a', 'libmp3lame', '-b:a', f'{self.bitrate}k'], + 'ogg': ['-c:a', 'libvorbis', '-b:a', f'{self.bitrate}k'], + 'aac': ['-c:a', 'aac', '-b:a', f'{self.bitrate}k'], + 'opus': ['-c:a', 'libopus', '-b:a', f'{self.bitrate}k'] + } + + format_args = codec_args.get(self.stream_format, codec_args['mp3']) + + # Optimize inputs for network streams + if source.startswith(('http://', 'https://', 'rtmp://')): + cmd = [ + 'ffmpeg', '-reconnect', '1', '-reconnect_streamed', '1', + '-reconnect_delay_max', '5', '-re', + '-thread_queue_size', '256', + '-i', source, '-vn' + ] + else: + cmd = ['ffmpeg', '-re', '-i', source, '-vn'] + + cmd.extend(format_args) + cmd.extend([ + '-f', 'mp3' if self.stream_format == 'mp3' else self.stream_format, + '-bufsize', '32k', + '-' + ]) + + try: + self._ffmpeg_process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + limit=1024 * 32 + ) + self._stream_task = asyncio.create_task(self._read_stream()) + return True + except Exception as e: + logger.error(f"Failed to start ffmpeg: {e}") + return False + + async def _stop_ffmpeg(self): + """Terminates the FFMPEG process.""" + if self._stream_task: + self._stream_task.cancel() + try: + await asyncio.wait_for(self._stream_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + self._stream_task = None + + if self._ffmpeg_process: + try: + self._ffmpeg_process.terminate() + try: + await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=2.0) + except asyncio.TimeoutError: + self._ffmpeg_process.kill() + try: + await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=1.0) + except asyncio.TimeoutError: + pass + except ProcessLookupError: + pass + except Exception as e: + logger.debug(f"Error stopping ffmpeg: {e}") + finally: + self._ffmpeg_process = None + + self._clear_buffer() + gc.collect() + + async def _read_stream(self): + """Reads stdout from FFMPEG and pushes to the broadcast buffer.""" + try: + while not self._shutdown: + if not self._ffmpeg_process or self._ffmpeg_process.returncode is not None: + break + + try: + chunk = await asyncio.wait_for( + self._ffmpeg_process.stdout.read(self.CHUNK_SIZE), + timeout=10.0 + ) + except asyncio.TimeoutError: + continue + + if not chunk: + logger.debug('Stream EOF') + break + + # Drop frames if buffer is full to keep stream "live" + dropped = 0 + while self._audio_buffer.full(): + try: + old_chunk = self._audio_buffer.get_nowait() + del old_chunk + dropped += 1 + except asyncio.QueueEmpty: + break + + if dropped > 5: + logger.debug(f"Dropped {dropped} old chunks (buffer pressure)") + + try: + self._audio_buffer.put_nowait(chunk) + except asyncio.QueueFull: + del chunk + + self.state.is_playing = False + + # Fire track end callback safely + if self.on_track_end and not self._shutdown: + task = asyncio.create_task(self._safe_callback(self.on_track_end)) + self._pending_tasks.add(task) + task.add_done_callback(self._pending_tasks.discard) + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Stream read error: {e}") + if self.on_error: + task = asyncio.create_task(self._safe_callback(self.on_error, str(e))) + self._pending_tasks.add(task) + task.add_done_callback(self._pending_tasks.discard) + + async def _safe_callback(self, callback: Callable, *args): + try: + result = callback(*args) + if asyncio.iscoroutine(result): + await result + except Exception as e: + logger.error(f"Callback error: {e}") + + async def _broadcast_loop(self): + """Reads from the buffer and writes to all connected HTTP clients.""" + try: + while not self._shutdown: + try: + chunk = await asyncio.wait_for(self._audio_buffer.get(), timeout=1.0) + except asyncio.TimeoutError: + continue + + if not self._clients: + del chunk + continue + + disconnected = set() + for client in list(self._clients): + try: + await asyncio.wait_for(client.write(chunk), timeout=1.0) + except Exception: + disconnected.add(client) + + self._clients -= disconnected + self.state.listeners = len(self._clients) + + del chunk + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Broadcast loop error: {e}") + + async def _get_duration(self, filepath: str) -> float: + """Uses ffprobe to determine file duration.""" + try: + process = await asyncio.create_subprocess_exec( + 'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', filepath, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL + ) + stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10) + return float(stdout.decode().strip()) + except Exception: + return 0.0 + + async def _handle_stream(self, request: web.Request) -> web.StreamResponse: + """Handles HTTP GET requests for the audio stream.""" + if len(self._clients) >= self.MAX_CLIENTS: + return web.Response(status=503, text="Server at capacity") + + content_type = { + 'mp3': 'audio/mpeg', 'ogg': 'audio/ogg', + 'aac': 'audio/aac', 'opus': 'audio/opus' + }.get(self.stream_format, 'audio/mpeg') + + response = web.StreamResponse( + status=200, + headers={ + 'Content-Type': content_type, + 'Cache-Control': 'no-cache, no-store', + 'Connection': 'keep-alive', + 'icy-name': 'XMPP Radio Bot', + } + ) + await response.prepare(request) + + self._clients.add(response) + self.state.listeners = len(self._clients) + + try: + while not response.task.done() and not self._shutdown: + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + finally: + self._clients.discard(response) + self.state.listeners = len(self._clients) + + return response + + async def _handle_playlist(self, request: web.Request) -> web.Response: + m3u = f"#EXTM3U\n#EXTINF:-1,XMPP Radio\n{self.stream_url}\n" + return web.Response(text=m3u, content_type='audio/x-mpegurl') + + async def _handle_status(self, request: web.Request) -> web.Response: + import json + status = { + 'is_playing': self.state.is_playing, + 'title': self.state.title, + 'artist': self.state.artist, + 'listeners': self.state.listeners, + 'stream_url': self.stream_url + } + return web.Response(text=json.dumps(status), content_type='application/json') + + async def _handle_info(self, request: web.Request) -> web.Response: + html = f"
Now Playing: {self.state.title}
Listeners: {self.state.listeners}
" + return web.Response(text=html, content_type='text/html') \ No newline at end of file