#!/usr/bin/env python3 import os import asyncio import subprocess import gc import weakref from pathlib import Path from typing import Optional, Dict, Set, Callable, Any from dataclasses import dataclass, field from enum import Enum from aiohttp import web import logging logger = logging.getLogger(__name__) class StreamSource(Enum): FILE = "file" URL = "url" SILENCE = "silence" @dataclass(slots=True) class StreamState: source_type: StreamSource = StreamSource.SILENCE source_path: str = "" title: str = "Silence" artist: str = "" is_playing: bool = False duration: float = 0.0 listeners: int = 0 class AudioStreamer: CHUNK_SIZE = 2048 MAX_BUFFER_SIZE = 30 MAX_CLIENTS = 50 def __init__( self, host: str = "0.0.0.0", port: int = 8000, mount_point: str = "/radio", stream_format: str = "mp3", bitrate: int = 128 ): self.host = host self.port = port self.mount_point = mount_point.rstrip('/') self.stream_format = stream_format self.bitrate = bitrate self.state = StreamState() self._clients: Set[web.StreamResponse] = set() self._ffmpeg_process: Optional[asyncio.subprocess.Process] = None self._stream_task: Optional[asyncio.Task] = None self._app: Optional[web.Application] = None self._runner: Optional[web.AppRunner] = None self._site: Optional[web.TCPSite] = None self.on_track_end: Optional[Callable[[], Any]] = None self.on_error: Optional[Callable[[str], Any]] = None self._audio_buffer: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_BUFFER_SIZE) self._broadcast_task: Optional[asyncio.Task] = None self._shutdown = False self._pending_tasks: Set[asyncio.Task] = set() @property def stream_url(self) -> str: return f"http://{self.host}:{self.port}{self.mount_point}" @property def listener_count(self) -> int: return len(self._clients) @property def current_source(self) -> str: return self.state.source_path async def start(self): self._shutdown = False self._app = web.Application() self._app.router.add_get(self.mount_point, self._handle_stream) self._app.router.add_get(f"{self.mount_point}.m3u", self._handle_playlist) self._app.router.add_get("/status", self._handle_status) self._app.router.add_get("/", self._handle_info) self._runner = web.AppRunner(self._app) await self._runner.setup() self._site = web.TCPSite(self._runner, self.host, self.port) await self._site.start() self._broadcast_task = asyncio.create_task(self._broadcast_loop()) logger.info(f"🔊 HTTP streaming server started at {self.stream_url}") async def stop(self): self._shutdown = True await self._stop_ffmpeg() if self._broadcast_task: self._broadcast_task.cancel() try: await self._broadcast_task except asyncio.CancelledError: pass self._broadcast_task = None for task in self._pending_tasks: task.cancel() try: await task except (asyncio.CancelledError, Exception): pass self._pending_tasks.clear() for client in list(self._clients): try: await client.write_eof() except Exception: pass self._clients.clear() if self._site: await self._site.stop() if self._runner: await self._runner.cleanup() self._clear_buffer() gc.collect() logger.info("🔇 HTTP streaming server stopped") def _clear_buffer(self): count = 0 while not self._audio_buffer.empty(): try: chunk = self._audio_buffer.get_nowait() del chunk count += 1 except asyncio.QueueEmpty: break if count > 0: logger.debug(f"Cleared {count} chunks from buffer") async def play_file(self, filepath: str, title: str = "", artist: str = "") -> bool: if not os.path.exists(filepath): logger.error(f"File not found: {filepath}") return False await self._stop_ffmpeg() self.state.source_type = StreamSource.FILE self.state.source_path = filepath self.state.title = title or Path(filepath).stem self.state.artist = artist self.state.duration = await self._get_duration(filepath) success = await self._start_ffmpeg(filepath) if success: self.state.is_playing = True logger.info(f"▶️ Playing file: {filepath}") return success async def play_url(self, url: str, title: str = "") -> bool: await self._stop_ffmpeg() self.state.source_type = StreamSource.URL self.state.source_path = url self.state.title = title or "Online Radio" self.state.artist = "" self.state.duration = 0.0 success = await self._start_ffmpeg(url) if success: self.state.is_playing = True logger.info(f"📻 Playing URL: {url}") return success async def stop_playback(self): await self._stop_ffmpeg() self.state.is_playing = False self.state.source_type = StreamSource.SILENCE self.state.source_path = "" self.state.title = "Silence" self.state.artist = "" async def _start_ffmpeg(self, source: str) -> bool: codec_args = { 'mp3': ['-c:a', 'libmp3lame', '-b:a', f'{self.bitrate}k'], 'ogg': ['-c:a', 'libvorbis', '-b:a', f'{self.bitrate}k'], 'aac': ['-c:a', 'aac', '-b:a', f'{self.bitrate}k'], 'opus': ['-c:a', 'libopus', '-b:a', f'{self.bitrate}k'] } format_args = codec_args.get(self.stream_format, codec_args['mp3']) if source.startswith(('http://', 'https://', 'rtmp://')): cmd = [ 'ffmpeg', '-reconnect', '1', '-reconnect_streamed', '1', '-reconnect_delay_max', '5', '-re', '-thread_queue_size', '256', '-i', source, '-vn' ] else: cmd = ['ffmpeg', '-re', '-i', source, '-vn'] cmd.extend(format_args) cmd.extend([ '-f', 'mp3' if self.stream_format == 'mp3' else self.stream_format, '-bufsize', '32k', '-' ]) try: self._ffmpeg_process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, limit=1024 * 32 ) self._stream_task = asyncio.create_task(self._read_stream()) return True except Exception as e: logger.error(f"Failed to start ffmpeg: {e}") return False async def _stop_ffmpeg(self): if self._stream_task: self._stream_task.cancel() try: await asyncio.wait_for(self._stream_task, timeout=2.0) except (asyncio.CancelledError, asyncio.TimeoutError): pass self._stream_task = None if self._ffmpeg_process: try: self._ffmpeg_process.terminate() try: await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=2.0) except asyncio.TimeoutError: self._ffmpeg_process.kill() try: await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=1.0) except asyncio.TimeoutError: pass except ProcessLookupError: pass except Exception as e: logger.debug(f"Error stopping ffmpeg: {e}") finally: self._ffmpeg_process = None self._clear_buffer() gc.collect() async def _read_stream(self): try: while not self._shutdown: if not self._ffmpeg_process or self._ffmpeg_process.returncode is not None: break try: chunk = await asyncio.wait_for( self._ffmpeg_process.stdout.read(self.CHUNK_SIZE), timeout=10.0 ) except asyncio.TimeoutError: continue if not chunk: logger.debug('Stream EOF') break dropped = 0 while self._audio_buffer.full(): try: old_chunk = self._audio_buffer.get_nowait() del old_chunk dropped += 1 except asyncio.QueueEmpty: break if dropped > 5: logger.debug(f"Dropped {dropped} old chunks (buffer pressure)") try: self._audio_buffer.put_nowait(chunk) except asyncio.QueueFull: del chunk self.state.is_playing = False if self.on_track_end and not self._shutdown: task = asyncio.create_task(self._safe_callback(self.on_track_end)) self._pending_tasks.add(task) task.add_done_callback(self._pending_tasks.discard) except asyncio.CancelledError: pass except Exception as e: logger.error(f"Stream read error: {e}") if self.on_error: task = asyncio.create_task(self._safe_callback(self.on_error, str(e))) self._pending_tasks.add(task) task.add_done_callback(self._pending_tasks.discard) async def _safe_callback(self, callback: Callable, *args): try: result = callback(*args) if asyncio.iscoroutine(result): await result except Exception as e: logger.error(f"Callback error: {e}") async def _broadcast_loop(self): try: while not self._shutdown: try: chunk = await asyncio.wait_for(self._audio_buffer.get(), timeout=1.0) except asyncio.TimeoutError: continue if not self._clients: del chunk continue disconnected = set() for client in list(self._clients): try: await asyncio.wait_for(client.write(chunk), timeout=1.0) except Exception: disconnected.add(client) self._clients -= disconnected self.state.listeners = len(self._clients) del chunk except asyncio.CancelledError: pass except Exception as e: logger.error(f"Broadcast loop error: {e}") async def _get_duration(self, filepath: str) -> float: try: process = await asyncio.create_subprocess_exec( 'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', filepath, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL ) stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10) return float(stdout.decode().strip()) except Exception: return 0.0 async def _handle_stream(self, request: web.Request) -> web.StreamResponse: if len(self._clients) >= self.MAX_CLIENTS: return web.Response(status=503, text="Server at capacity") content_type = { 'mp3': 'audio/mpeg', 'ogg': 'audio/ogg', 'aac': 'audio/aac', 'opus': 'audio/opus' }.get(self.stream_format, 'audio/mpeg') response = web.StreamResponse( status=200, headers={ 'Content-Type': content_type, 'Cache-Control': 'no-cache, no-store', 'Connection': 'keep-alive', 'icy-name': 'XMPP Radio Bot', } ) await response.prepare(request) self._clients.add(response) self.state.listeners = len(self._clients) try: while not response.task.done() and not self._shutdown: await asyncio.sleep(1) except asyncio.CancelledError: pass finally: self._clients.discard(response) self.state.listeners = len(self._clients) return response async def _handle_playlist(self, request: web.Request) -> web.Response: m3u = f"#EXTM3U\n#EXTINF:-1,XMPP Radio\n{self.stream_url}\n" return web.Response(text=m3u, content_type='audio/x-mpegurl') async def _handle_status(self, request: web.Request) -> web.Response: import json status = { 'is_playing': self.state.is_playing, 'title': self.state.title, 'artist': self.state.artist, 'listeners': self.state.listeners, 'stream_url': self.stream_url } return web.Response(text=json.dumps(status), content_type='application/json') async def _handle_info(self, request: web.Request) -> web.Response: html = f""" XMPP Radio

🎵 XMPP Radio Bot

Now Playing: {self.state.title}

Listeners: {self.state.listeners}

""" return web.Response(text=html, content_type='text/html')