Add audio_streamer.py
This commit is contained in:
446
audio_streamer.py
Normal file
446
audio_streamer.py
Normal file
@@ -0,0 +1,446 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Audio Streamer Module.
|
||||||
|
|
||||||
|
This module provides an HTTP streaming server that distributes audio to
|
||||||
|
connected clients (like Shoutcast/Icecast). It pipes audio from an FFMPEG
|
||||||
|
process into an asyncio buffer and serves it via aiohttp.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
import subprocess
|
||||||
|
import gc
|
||||||
|
import weakref
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, Dict, Set, Callable, Any
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
from aiohttp import web
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class StreamSource(Enum):
|
||||||
|
FILE = "file"
|
||||||
|
URL = "url"
|
||||||
|
SILENCE = "silence"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class StreamState:
|
||||||
|
"""Holds metadata about the current stream."""
|
||||||
|
source_type: StreamSource = StreamSource.SILENCE
|
||||||
|
source_path: str = ""
|
||||||
|
title: str = "Silence"
|
||||||
|
artist: str = ""
|
||||||
|
is_playing: bool = False
|
||||||
|
duration: float = 0.0
|
||||||
|
listeners: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
class AudioStreamer:
|
||||||
|
"""
|
||||||
|
Manages the FFMPEG subprocess and HTTP server for audio streaming.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
_audio_buffer: Queue holding audio chunks from FFMPEG.
|
||||||
|
_clients: Set of active HTTP response objects to write to.
|
||||||
|
"""
|
||||||
|
|
||||||
|
CHUNK_SIZE = 2048
|
||||||
|
MAX_BUFFER_SIZE = 30
|
||||||
|
MAX_CLIENTS = 50
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str = "0.0.0.0",
|
||||||
|
port: int = 8000,
|
||||||
|
mount_point: str = "/radio",
|
||||||
|
stream_format: str = "mp3",
|
||||||
|
bitrate: int = 128
|
||||||
|
):
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.mount_point = mount_point.rstrip('/')
|
||||||
|
self.stream_format = stream_format
|
||||||
|
self.bitrate = bitrate
|
||||||
|
|
||||||
|
self.state = StreamState()
|
||||||
|
self._clients: Set[web.StreamResponse] = set()
|
||||||
|
self._ffmpeg_process: Optional[asyncio.subprocess.Process] = None
|
||||||
|
self._stream_task: Optional[asyncio.Task] = None
|
||||||
|
self._app: Optional[web.Application] = None
|
||||||
|
self._runner: Optional[web.AppRunner] = None
|
||||||
|
self._site: Optional[web.TCPSite] = None
|
||||||
|
|
||||||
|
self.on_track_end: Optional[Callable[[], Any]] = None
|
||||||
|
self.on_error: Optional[Callable[[str], Any]] = None
|
||||||
|
|
||||||
|
self._audio_buffer: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_BUFFER_SIZE)
|
||||||
|
self._broadcast_task: Optional[asyncio.Task] = None
|
||||||
|
self._shutdown = False
|
||||||
|
|
||||||
|
self._pending_tasks: Set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_url(self) -> str:
|
||||||
|
return f"http://{self.host}:{self.port}{self.mount_point}"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def listener_count(self) -> int:
|
||||||
|
return len(self._clients)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_source(self) -> str:
|
||||||
|
return self.state.source_path
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
"""Initializes the web app and starts the broadcast loop."""
|
||||||
|
self._shutdown = False
|
||||||
|
self._app = web.Application()
|
||||||
|
self._app.router.add_get(self.mount_point, self._handle_stream)
|
||||||
|
self._app.router.add_get(f"{self.mount_point}.m3u", self._handle_playlist)
|
||||||
|
self._app.router.add_get("/status", self._handle_status)
|
||||||
|
self._app.router.add_get("/", self._handle_info)
|
||||||
|
|
||||||
|
self._runner = web.AppRunner(self._app)
|
||||||
|
await self._runner.setup()
|
||||||
|
|
||||||
|
self._site = web.TCPSite(self._runner, self.host, self.port)
|
||||||
|
await self._site.start()
|
||||||
|
|
||||||
|
self._broadcast_task = asyncio.create_task(self._broadcast_loop())
|
||||||
|
logger.info(f"HTTP streaming server started at {self.stream_url}")
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
"""Stops FFMPEG, disconnects clients, and shuts down the server."""
|
||||||
|
self._shutdown = True
|
||||||
|
await self._stop_ffmpeg()
|
||||||
|
|
||||||
|
if self._broadcast_task:
|
||||||
|
self._broadcast_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._broadcast_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
self._broadcast_task = None
|
||||||
|
|
||||||
|
# Cleanup pending callback tasks
|
||||||
|
for task in self._pending_tasks:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
await task
|
||||||
|
except (asyncio.CancelledError, Exception):
|
||||||
|
pass
|
||||||
|
self._pending_tasks.clear()
|
||||||
|
|
||||||
|
for client in list(self._clients):
|
||||||
|
try:
|
||||||
|
await client.write_eof()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._clients.clear()
|
||||||
|
|
||||||
|
if self._site:
|
||||||
|
await self._site.stop()
|
||||||
|
if self._runner:
|
||||||
|
await self._runner.cleanup()
|
||||||
|
|
||||||
|
self._clear_buffer()
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
logger.info("HTTP streaming server stopped")
|
||||||
|
|
||||||
|
def _clear_buffer(self):
|
||||||
|
"""Drains the audio buffer to prevent memory bloat."""
|
||||||
|
count = 0
|
||||||
|
while not self._audio_buffer.empty():
|
||||||
|
try:
|
||||||
|
chunk = self._audio_buffer.get_nowait()
|
||||||
|
del chunk
|
||||||
|
count += 1
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
break
|
||||||
|
if count > 0:
|
||||||
|
logger.debug(f"Cleared {count} chunks from buffer")
|
||||||
|
|
||||||
|
async def play_file(self, filepath: str, title: str = "", artist: str = "") -> bool:
|
||||||
|
"""Starts streaming a local file."""
|
||||||
|
if not os.path.exists(filepath):
|
||||||
|
logger.error(f"File not found: {filepath}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
await self._stop_ffmpeg()
|
||||||
|
|
||||||
|
self.state.source_type = StreamSource.FILE
|
||||||
|
self.state.source_path = filepath
|
||||||
|
self.state.title = title or Path(filepath).stem
|
||||||
|
self.state.artist = artist
|
||||||
|
self.state.duration = await self._get_duration(filepath)
|
||||||
|
|
||||||
|
success = await self._start_ffmpeg(filepath)
|
||||||
|
if success:
|
||||||
|
self.state.is_playing = True
|
||||||
|
logger.info(f"Playing file: {filepath}")
|
||||||
|
|
||||||
|
return success
|
||||||
|
|
||||||
|
async def play_url(self, url: str, title: str = "") -> bool:
|
||||||
|
"""Starts streaming from a remote URL."""
|
||||||
|
await self._stop_ffmpeg()
|
||||||
|
|
||||||
|
self.state.source_type = StreamSource.URL
|
||||||
|
self.state.source_path = url
|
||||||
|
self.state.title = title or "Online Radio"
|
||||||
|
self.state.artist = ""
|
||||||
|
self.state.duration = 0.0
|
||||||
|
|
||||||
|
success = await self._start_ffmpeg(url)
|
||||||
|
if success:
|
||||||
|
self.state.is_playing = True
|
||||||
|
logger.info(f"Playing URL: {url}")
|
||||||
|
|
||||||
|
return success
|
||||||
|
|
||||||
|
async def stop_playback(self):
|
||||||
|
await self._stop_ffmpeg()
|
||||||
|
self.state.is_playing = False
|
||||||
|
self.state.source_type = StreamSource.SILENCE
|
||||||
|
self.state.source_path = ""
|
||||||
|
self.state.title = "Silence"
|
||||||
|
self.state.artist = ""
|
||||||
|
|
||||||
|
async def _start_ffmpeg(self, source: str) -> bool:
|
||||||
|
"""Constructs and executes the FFMPEG command to transcode audio."""
|
||||||
|
codec_args = {
|
||||||
|
'mp3': ['-c:a', 'libmp3lame', '-b:a', f'{self.bitrate}k'],
|
||||||
|
'ogg': ['-c:a', 'libvorbis', '-b:a', f'{self.bitrate}k'],
|
||||||
|
'aac': ['-c:a', 'aac', '-b:a', f'{self.bitrate}k'],
|
||||||
|
'opus': ['-c:a', 'libopus', '-b:a', f'{self.bitrate}k']
|
||||||
|
}
|
||||||
|
|
||||||
|
format_args = codec_args.get(self.stream_format, codec_args['mp3'])
|
||||||
|
|
||||||
|
# Optimize inputs for network streams
|
||||||
|
if source.startswith(('http://', 'https://', 'rtmp://')):
|
||||||
|
cmd = [
|
||||||
|
'ffmpeg', '-reconnect', '1', '-reconnect_streamed', '1',
|
||||||
|
'-reconnect_delay_max', '5', '-re',
|
||||||
|
'-thread_queue_size', '256',
|
||||||
|
'-i', source, '-vn'
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
cmd = ['ffmpeg', '-re', '-i', source, '-vn']
|
||||||
|
|
||||||
|
cmd.extend(format_args)
|
||||||
|
cmd.extend([
|
||||||
|
'-f', 'mp3' if self.stream_format == 'mp3' else self.stream_format,
|
||||||
|
'-bufsize', '32k',
|
||||||
|
'-'
|
||||||
|
])
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._ffmpeg_process = await asyncio.create_subprocess_exec(
|
||||||
|
*cmd,
|
||||||
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
stderr=asyncio.subprocess.DEVNULL,
|
||||||
|
limit=1024 * 32
|
||||||
|
)
|
||||||
|
self._stream_task = asyncio.create_task(self._read_stream())
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to start ffmpeg: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def _stop_ffmpeg(self):
|
||||||
|
"""Terminates the FFMPEG process."""
|
||||||
|
if self._stream_task:
|
||||||
|
self._stream_task.cancel()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._stream_task, timeout=2.0)
|
||||||
|
except (asyncio.CancelledError, asyncio.TimeoutError):
|
||||||
|
pass
|
||||||
|
self._stream_task = None
|
||||||
|
|
||||||
|
if self._ffmpeg_process:
|
||||||
|
try:
|
||||||
|
self._ffmpeg_process.terminate()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=2.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self._ffmpeg_process.kill()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._ffmpeg_process.wait(), timeout=1.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error stopping ffmpeg: {e}")
|
||||||
|
finally:
|
||||||
|
self._ffmpeg_process = None
|
||||||
|
|
||||||
|
self._clear_buffer()
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
async def _read_stream(self):
|
||||||
|
"""Reads stdout from FFMPEG and pushes to the broadcast buffer."""
|
||||||
|
try:
|
||||||
|
while not self._shutdown:
|
||||||
|
if not self._ffmpeg_process or self._ffmpeg_process.returncode is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
chunk = await asyncio.wait_for(
|
||||||
|
self._ffmpeg_process.stdout.read(self.CHUNK_SIZE),
|
||||||
|
timeout=10.0
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not chunk:
|
||||||
|
logger.debug('Stream EOF')
|
||||||
|
break
|
||||||
|
|
||||||
|
# Drop frames if buffer is full to keep stream "live"
|
||||||
|
dropped = 0
|
||||||
|
while self._audio_buffer.full():
|
||||||
|
try:
|
||||||
|
old_chunk = self._audio_buffer.get_nowait()
|
||||||
|
del old_chunk
|
||||||
|
dropped += 1
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
break
|
||||||
|
|
||||||
|
if dropped > 5:
|
||||||
|
logger.debug(f"Dropped {dropped} old chunks (buffer pressure)")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._audio_buffer.put_nowait(chunk)
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
del chunk
|
||||||
|
|
||||||
|
self.state.is_playing = False
|
||||||
|
|
||||||
|
# Fire track end callback safely
|
||||||
|
if self.on_track_end and not self._shutdown:
|
||||||
|
task = asyncio.create_task(self._safe_callback(self.on_track_end))
|
||||||
|
self._pending_tasks.add(task)
|
||||||
|
task.add_done_callback(self._pending_tasks.discard)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Stream read error: {e}")
|
||||||
|
if self.on_error:
|
||||||
|
task = asyncio.create_task(self._safe_callback(self.on_error, str(e)))
|
||||||
|
self._pending_tasks.add(task)
|
||||||
|
task.add_done_callback(self._pending_tasks.discard)
|
||||||
|
|
||||||
|
async def _safe_callback(self, callback: Callable, *args):
|
||||||
|
try:
|
||||||
|
result = callback(*args)
|
||||||
|
if asyncio.iscoroutine(result):
|
||||||
|
await result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Callback error: {e}")
|
||||||
|
|
||||||
|
async def _broadcast_loop(self):
|
||||||
|
"""Reads from the buffer and writes to all connected HTTP clients."""
|
||||||
|
try:
|
||||||
|
while not self._shutdown:
|
||||||
|
try:
|
||||||
|
chunk = await asyncio.wait_for(self._audio_buffer.get(), timeout=1.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not self._clients:
|
||||||
|
del chunk
|
||||||
|
continue
|
||||||
|
|
||||||
|
disconnected = set()
|
||||||
|
for client in list(self._clients):
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(client.write(chunk), timeout=1.0)
|
||||||
|
except Exception:
|
||||||
|
disconnected.add(client)
|
||||||
|
|
||||||
|
self._clients -= disconnected
|
||||||
|
self.state.listeners = len(self._clients)
|
||||||
|
|
||||||
|
del chunk
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Broadcast loop error: {e}")
|
||||||
|
|
||||||
|
async def _get_duration(self, filepath: str) -> float:
|
||||||
|
"""Uses ffprobe to determine file duration."""
|
||||||
|
try:
|
||||||
|
process = await asyncio.create_subprocess_exec(
|
||||||
|
'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
|
||||||
|
'-of', 'default=noprint_wrappers=1:nokey=1', filepath,
|
||||||
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
stderr=asyncio.subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10)
|
||||||
|
return float(stdout.decode().strip())
|
||||||
|
except Exception:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
async def _handle_stream(self, request: web.Request) -> web.StreamResponse:
|
||||||
|
"""Handles HTTP GET requests for the audio stream."""
|
||||||
|
if len(self._clients) >= self.MAX_CLIENTS:
|
||||||
|
return web.Response(status=503, text="Server at capacity")
|
||||||
|
|
||||||
|
content_type = {
|
||||||
|
'mp3': 'audio/mpeg', 'ogg': 'audio/ogg',
|
||||||
|
'aac': 'audio/aac', 'opus': 'audio/opus'
|
||||||
|
}.get(self.stream_format, 'audio/mpeg')
|
||||||
|
|
||||||
|
response = web.StreamResponse(
|
||||||
|
status=200,
|
||||||
|
headers={
|
||||||
|
'Content-Type': content_type,
|
||||||
|
'Cache-Control': 'no-cache, no-store',
|
||||||
|
'Connection': 'keep-alive',
|
||||||
|
'icy-name': 'XMPP Radio Bot',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
await response.prepare(request)
|
||||||
|
|
||||||
|
self._clients.add(response)
|
||||||
|
self.state.listeners = len(self._clients)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not response.task.done() and not self._shutdown:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._clients.discard(response)
|
||||||
|
self.state.listeners = len(self._clients)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
async def _handle_playlist(self, request: web.Request) -> web.Response:
|
||||||
|
m3u = f"#EXTM3U\n#EXTINF:-1,XMPP Radio\n{self.stream_url}\n"
|
||||||
|
return web.Response(text=m3u, content_type='audio/x-mpegurl')
|
||||||
|
|
||||||
|
async def _handle_status(self, request: web.Request) -> web.Response:
|
||||||
|
import json
|
||||||
|
status = {
|
||||||
|
'is_playing': self.state.is_playing,
|
||||||
|
'title': self.state.title,
|
||||||
|
'artist': self.state.artist,
|
||||||
|
'listeners': self.state.listeners,
|
||||||
|
'stream_url': self.stream_url
|
||||||
|
}
|
||||||
|
return web.Response(text=json.dumps(status), content_type='application/json')
|
||||||
|
|
||||||
|
async def _handle_info(self, request: web.Request) -> web.Response:
|
||||||
|
html = f"<!DOCTYPE html><html><head><title>XMPP Radio</title></head><body><h1>XMPP Radio Bot</h1><p><b>Now Playing:</b> {self.state.title}</p><p><b>Listeners:</b> {self.state.listeners}</p><audio controls autoplay><source src=\"{self.mount_point}\" type=\"audio/mpeg\"></audio></body></html>"
|
||||||
|
return web.Response(text=html, content_type='text/html')
|
||||||
Reference in New Issue
Block a user