#!/usr/bin/env python3 """ Audio Streamer Module. This module provides an HTTP streaming server that distributes audio to connected clients (like Shoutcast/Icecast). It pipes audio from an FFMPEG process into an asyncio buffer and serves it via aiohttp. """ import os import asyncio import subprocess import gc import weakref from pathlib import Path from typing import Optional, Dict, Set, Callable, Any from dataclasses import dataclass, field from enum import Enum from aiohttp import web import logging logger = logging.getLogger(__name__) class StreamSource(Enum): FILE = "file" URL = "url" SILENCE = "silence" @dataclass(slots=True) class StreamState: """Holds metadata about the current stream.""" source_type: StreamSource = StreamSource.SILENCE source_path: str = "" title: str = "Silence" artist: str = "" is_playing: bool = False duration: float = 0.0 listeners: int = 0 class AudioStreamer: """ Manages the FFMPEG subprocess and HTTP server for audio streaming. Attributes: _audio_buffer: Queue holding audio chunks from FFMPEG. _clients: Set of active HTTP response objects to write to. """ CHUNK_SIZE = 2048 MAX_BUFFER_SIZE = 30 MAX_CLIENTS = 50 def __init__( self, host: str = "0.0.0.0", port: int = 8000, mount_point: str = "/radio", stream_format: str = "mp3", bitrate: int = 128 ): self.host = host self.port = port self.mount_point = mount_point.rstrip('/') self.stream_format = stream_format self.bitrate = bitrate self.state = StreamState() self._clients: Set[web.StreamResponse] = set() self._ffmpeg_process: Optional[asyncio.subprocess.Process] = None self._stream_task: Optional[asyncio.Task] = None self._app: Optional[web.Application] = None self._runner: Optional[web.AppRunner] = None self._site: Optional[web.TCPSite] = None self.on_track_end: Optional[Callable[[], Any]] = None self.on_error: Optional[Callable[[str], Any]] = None self._audio_buffer: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_BUFFER_SIZE) self._broadcast_task: Optional[asyncio.Task] = None self._shutdown = False self._pending_tasks: Set[asyncio.Task] = set() @property def stream_url(self) -> str: return f"http://{self.host}:{self.port}{self.mount_point}" @property def listener_count(self) -> int: return len(self._clients) @property def current_source(self) -> str: return self.state.source_path async def start(self): """Initializes the web app and starts the broadcast loop.""" self._shutdown = False self._app = web.Application() self._app.router.add_get(self.mount_point, self._handle_stream) self._app.router.add_get(f"{self.mount_point}.m3u", self._handle_playlist) self._app.router.add_get("/status", self._handle_status) self._app.router.add_get("/", self._handle_info) self._runner = web.AppRunner(self._app) await self._runner.setup() self._site = web.TCPSite(self._runner, self.host, self.port) await self._site.start() self._broadcast_task = asyncio.create_task(self._broadcast_loop()) logger.info(f"HTTP streaming server started at {self.stream_url}") async def stop(self): """Stops FFMPEG, disconnects clients, and shuts down the server.""" self._shutdown = True await self._stop_ffmpeg() if self._broadcast_task: self._broadcast_task.cancel() try: await self._broadcast_task except asyncio.CancelledError: pass self._broadcast_task = None # Cleanup pending callback tasks for task in self._pending_tasks: task.cancel() try: await task except (asyncio.CancelledError, Exception): pass self._pending_tasks.clear() for client in list(self._clients): try: await client.write_eof() except Exception: pass self._clients.clear() if self._site: await self._site.stop() if self._runner: await self._runner.cleanup() self._clear_buffer() gc.collect() logger.info("HTTP streaming server stopped") def _clear_buffer(self): """Drains the audio buffer to prevent memory bloat.""" count = 0 while not self._audio_buffer.empty(): try: chunk = self._audio_buffer.get_nowait() del chunk count += 1 except asyncio.QueueEmpty: break if count > 0: logger.debug(f"Cleared {count} chunks from buffer") async def play_file(self, filepath: str, title: str = "", artist: str = "") -> bool: """Starts streaming a local file.""" if not os.path.exists(filepath): logger.error(f"File not found: {filepath}") return False await self._stop_ffmpeg() self.state.source_type = StreamSource.FILE self.state.source_path = filepath self.state.title = title or Path(filepath).stem self.state.artist = artist self.state.duration = await self._get_duration(filepath) success = await self._start_ffmpeg(filepath) if success: self.state.is_playing = True logger.info(f"Playing file: {filepath}") return success async def play_url(self, url: str, title: str = "") -> bool: """Starts streaming from a remote URL.""" await self._stop_ffmpeg() self.state.source_type = StreamSource.URL self.state.source_path = url self.state.title = title or "Online Radio" self.state.artist = "" self.state.duration = 0.0 success = await self._start_ffmpeg(url) if success: self.state.is_playing = True logger.info(f"Playing URL: {url}") return success async def stop_playback(self): await self._stop_ffmpeg() self.state.is_playing = False self.state.source_type = StreamSource.SILENCE self.state.source_path = "" self.state.title = "Silence" self.state.artist = "" async def _start_ffmpeg(self, source: str) -> bool: """Constructs and executes the FFMPEG command to transcode audio.""" codec_args = { 'mp3': ['-c:a', 'libmp3lame', '-b:a', f'{self.bitrate}k'], 'ogg': ['-c:a', 'libvorbis', '-b:a', f'{self.bitrate}k'], 'aac': ['-c:a', 'aac', '-b:a', f'{self.bitrate}k'], 'opus': ['-c:a', 'libopus', '-b:a', f'{self.bitrate}k'] } format_args = codec_args.get(self.stream_format, codec_args['mp3']) # Optimize inputs for network streams if source.startswith(('http://', 'https://', 'rtmp://')): cmd = [ 'ffmpeg', '-reconnect', '1', '-reconnect_streamed', '1', '-reconnect_delay_max', '5', '-re', '-thread_queue_size', '256', '-i', source, '-vn' ] else: cmd = ['ffmpeg', '-re', '-i', source, '-vn'] cmd.extend(format_args) cmd.extend([ '-f', 'mp3' if self.stream_format == 'mp3' else self.stream_format, '-bufsize', '32k', '-' ]) try: self._ffmpeg_process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, limit=1024 * 32 ) self._stream_task = asyncio.create_task(self._read_stream()) return True except Exception as e: logger.error(f"Failed to start ffmpeg: {e}") return False async def _stop_ffmpeg(self): """Terminates the FFMPEG process.""" if self._stream_task: self._stream_task.cancel() try: await asyncio.wait_for(self._stream_task, timeout=2.0) except (asyncio.CancelledError, asyncio.TimeoutError): pass self._stream_task = None if self._ffmpeg_process: try: self._ffmpeg_process.terminate() try: await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=2.0) except asyncio.TimeoutError: self._ffmpeg_process.kill() try: await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=1.0) except asyncio.TimeoutError: pass except ProcessLookupError: pass except Exception as e: logger.debug(f"Error stopping ffmpeg: {e}") finally: self._ffmpeg_process = None self._clear_buffer() gc.collect() async def _read_stream(self): """Reads stdout from FFMPEG and pushes to the broadcast buffer.""" try: while not self._shutdown: if not self._ffmpeg_process or self._ffmpeg_process.returncode is not None: break try: chunk = await asyncio.wait_for( self._ffmpeg_process.stdout.read(self.CHUNK_SIZE), timeout=10.0 ) except asyncio.TimeoutError: continue if not chunk: logger.debug('Stream EOF') break # Drop frames if buffer is full to keep stream "live" dropped = 0 while self._audio_buffer.full(): try: old_chunk = self._audio_buffer.get_nowait() del old_chunk dropped += 1 except asyncio.QueueEmpty: break if dropped > 5: logger.debug(f"Dropped {dropped} old chunks (buffer pressure)") try: self._audio_buffer.put_nowait(chunk) except asyncio.QueueFull: del chunk self.state.is_playing = False # Fire track end callback safely if self.on_track_end and not self._shutdown: task = asyncio.create_task(self._safe_callback(self.on_track_end)) self._pending_tasks.add(task) task.add_done_callback(self._pending_tasks.discard) except asyncio.CancelledError: pass except Exception as e: logger.error(f"Stream read error: {e}") if self.on_error: task = asyncio.create_task(self._safe_callback(self.on_error, str(e))) self._pending_tasks.add(task) task.add_done_callback(self._pending_tasks.discard) async def _safe_callback(self, callback: Callable, *args): try: result = callback(*args) if asyncio.iscoroutine(result): await result except Exception as e: logger.error(f"Callback error: {e}") async def _broadcast_loop(self): """Reads from the buffer and writes to all connected HTTP clients.""" try: while not self._shutdown: try: chunk = await asyncio.wait_for(self._audio_buffer.get(), timeout=1.0) except asyncio.TimeoutError: continue if not self._clients: del chunk continue disconnected = set() for client in list(self._clients): try: await asyncio.wait_for(client.write(chunk), timeout=1.0) except Exception: disconnected.add(client) self._clients -= disconnected self.state.listeners = len(self._clients) del chunk except asyncio.CancelledError: pass except Exception as e: logger.error(f"Broadcast loop error: {e}") async def _get_duration(self, filepath: str) -> float: """Uses ffprobe to determine file duration.""" try: process = await asyncio.create_subprocess_exec( 'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', filepath, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL ) stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10) return float(stdout.decode().strip()) except Exception: return 0.0 async def _handle_stream(self, request: web.Request) -> web.StreamResponse: """Handles HTTP GET requests for the audio stream.""" if len(self._clients) >= self.MAX_CLIENTS: return web.Response(status=503, text="Server at capacity") content_type = { 'mp3': 'audio/mpeg', 'ogg': 'audio/ogg', 'aac': 'audio/aac', 'opus': 'audio/opus' }.get(self.stream_format, 'audio/mpeg') response = web.StreamResponse( status=200, headers={ 'Content-Type': content_type, 'Cache-Control': 'no-cache, no-store', 'Connection': 'keep-alive', 'icy-name': 'XMPP Radio Bot', } ) await response.prepare(request) self._clients.add(response) self.state.listeners = len(self._clients) try: while not response.task.done() and not self._shutdown: await asyncio.sleep(1) except asyncio.CancelledError: pass finally: self._clients.discard(response) self.state.listeners = len(self._clients) return response async def _handle_playlist(self, request: web.Request) -> web.Response: m3u = f"#EXTM3U\n#EXTINF:-1,XMPP Radio\n{self.stream_url}\n" return web.Response(text=m3u, content_type='audio/x-mpegurl') async def _handle_status(self, request: web.Request) -> web.Response: import json status = { 'is_playing': self.state.is_playing, 'title': self.state.title, 'artist': self.state.artist, 'listeners': self.state.listeners, 'stream_url': self.stream_url } return web.Response(text=json.dumps(status), content_type='application/json') async def _handle_info(self, request: web.Request) -> web.Response: html = f"
Now Playing: {self.state.title}
Listeners: {self.state.listeners}
" return web.Response(text=html, content_type='text/html')