Files
xmpp-radio-tower/station_parser.py
2025-12-13 18:28:18 +00:00

197 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
Radio Station Playlist Parser.
Supports parsing of M3U, M3U8, and PLS playlist formats to resolve
actual stream URLs. Handles recursive playlists and HLS stream detection.
"""
import re
import asyncio
import aiohttp
from urllib.parse import urljoin, urlparse
from typing import Optional, List, Dict, Tuple
import logging
logger = logging.getLogger(__name__)
class StationParser:
    """Parses playlist files to extract the underlying media stream URL.

    Supported formats are M3U/M3U8 (including ``#EXT-X-STREAM-INF`` variant
    lists) and PLS. Playlist type is always decided from the *path* of the
    URL, so query strings such as ``listen.pls?sid=1`` are handled.
    """

    # Lower-case path suffixes that identify a playlist URL.
    PLAYLIST_EXTENSIONS = ('.m3u', '.m3u8', '.pls')

    # Maximum number of playlists followed by resolve_stream_url() before
    # giving up; guards against playlists that reference each other forever.
    MAX_PLAYLIST_DEPTH = 8

    # Entries starting with one of these are already absolute and are never
    # joined against the playlist's own URL.
    _ABSOLUTE_SCHEMES = ('http://', 'https://', 'rtmp://', 'rtsp://')

    # "#EXTINF:<duration> [attributes],<title>". The strict form ends the
    # attribute section at the first comma that is not inside double quotes,
    # so titles containing commas stay intact.
    _EXTINF_RE = re.compile(
        r'#EXTINF:(-?\d+(?:\.\d+)?)(?:\s+((?:[^,"]|"[^"]*")*))?,(.*)'
    )
    # Fallback for lines with unbalanced quotes: attributes run up to the
    # last comma (the historical behaviour).
    _EXTINF_LENIENT_RE = re.compile(
        r'#EXTINF:(-?\d+(?:\.\d+)?)(?:\s+(.+))?,(.*)'
    )
    _PLS_KEY_RE = re.compile(r'(file|title|length)(\d+)')
    _ATTRIBUTE_RE = re.compile(r'([A-Z0-9-]+)=(?:"([^"]*)"|([^,]*))')

    def __init__(self, timeout: int = 10):
        """Create a parser.

        Args:
            timeout: Total time budget, in seconds, for each HTTP request.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = "XMPP-RadioBot/1.0"

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download ``url`` and return its body as text.

        Returns:
            The response body for an HTTP 200 answer, otherwise ``None``
            (non-200 status, timeout, or client error; each is logged).
        """
        headers = {"User-Agent": self.user_agent}
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    url, headers=headers, allow_redirects=True
                ) as response:
                    if response.status == 200:
                        # Playlists in the wild are often mis-labelled or
                        # contain stray bytes; a replacement character in a
                        # title is better than an unhandled decode error.
                        return await response.text(errors='replace')
                    logger.warning(
                        "Failed to fetch %s: HTTP %s", url, response.status
                    )
        except asyncio.TimeoutError:
            logger.error("Timeout fetching %s", url)
        except aiohttp.ClientError as e:
            logger.error("Error fetching %s: %s", url, e)
        return None

    @staticmethod
    def _url_path(url: str) -> str:
        """Return the lower-cased path component of ``url``."""
        return urlparse(url.lower()).path

    def is_playlist_url(self, url: str) -> bool:
        """Checks if a URL points to a supported playlist format."""
        return self._url_path(url).endswith(self.PLAYLIST_EXTENSIONS)

    def parse_m3u(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parses M3U/M3U8 content.

        Args:
            content: Raw playlist text.
            base_url: URL of the playlist itself; when given, relative
                entries are resolved against it.

        Returns:
            One dict per entry, always containing ``url``. Entries preceded
            by ``#EXTINF`` also carry ``duration``, ``attributes`` and
            ``title``; entries preceded by ``#EXT-X-STREAM-INF`` carry
            ``bandwidth``, ``resolution`` and ``codecs``.
        """
        streams: List[Dict[str, str]] = []
        pending_info: Dict[str, str] = {}
        stream_inf_tag = '#EXT-X-STREAM-INF:'

        # A leading BOM would otherwise hide the '#' of "#EXTM3U" and turn
        # the header line into a bogus stream entry.
        for raw_line in content.lstrip('\ufeff').splitlines():
            line = raw_line.strip()
            if not line:
                continue

            if line.startswith('#EXTINF:'):
                match = (self._EXTINF_RE.match(line)
                         or self._EXTINF_LENIENT_RE.match(line))
                if match:
                    pending_info = {
                        'duration': match.group(1),
                        'attributes': (match.group(2) or '').strip(),
                        'title': match.group(3).strip(),
                    }
                continue

            # Handle M3U8 stream attributes (bandwidth, resolution)
            if line.startswith(stream_inf_tag):
                attrs = self._parse_attributes(line[len(stream_inf_tag):])
                pending_info = {
                    'bandwidth': attrs.get('BANDWIDTH', ''),
                    'resolution': attrs.get('RESOLUTION', ''),
                    'codecs': attrs.get('CODECS', ''),
                }
                continue

            # Any other tag or comment (including "#EXTM3U") is ignored.
            if line.startswith('#'):
                continue

            url = line
            if base_url and not url.startswith(self._ABSOLUTE_SCHEMES):
                url = urljoin(base_url, url)

            streams.append({'url': url, **pending_info})
            # Metadata applies only to the entry that directly follows it.
            pending_info = {}

        return streams

    def parse_pls(self, content: str) -> List[Dict[str, str]]:
        """Parses PLS INI-style content.

        Keys are matched case-insensitively and whitespace around both key
        and value is ignored. Entries are returned ordered by their numeric
        index; entries without a non-empty ``FileN`` value are skipped.

        Returns:
            One dict per entry with ``url``, ``title`` (default ``''``) and
            ``duration`` (default ``'-1'``).
        """
        entries: Dict[str, Dict[str, str]] = {}
        for raw_line in content.lstrip('\ufeff').splitlines():
            key, separator, value = raw_line.partition('=')
            if not separator:
                continue
            match = self._PLS_KEY_RE.fullmatch(key.strip().lower())
            if match:
                field, num = match.groups()
                entries.setdefault(num, {})[field] = value.strip()

        streams: List[Dict[str, str]] = []
        for num in sorted(entries, key=int):
            entry = entries[num]
            file_url = entry.get('file')
            if file_url:
                streams.append({
                    'url': file_url,
                    'title': entry.get('title', ''),
                    'duration': entry.get('length', '-1'),
                })
        return streams

    def _parse_attributes(self, attr_string: str) -> Dict[str, str]:
        """Helper to parse KEY=value / KEY="value" attributes in M3U8 tags.

        Quoted values may contain commas and may be empty; unquoted values
        end at the next comma and are stripped of surrounding whitespace.
        """
        attrs: Dict[str, str] = {}
        for match in self._ATTRIBUTE_RE.finditer(attr_string):
            key, quoted, bare = match.groups()
            attrs[key] = quoted if quoted is not None else bare.strip()
        return attrs

    def _parse_playlist(self, content: str, url: str) -> List[Dict[str, str]]:
        """Parse ``content`` with the parser matching the path of ``url``."""
        if self._url_path(url).endswith('.pls'):
            return self.parse_pls(content)
        return self.parse_m3u(content, url)

    @staticmethod
    def _bandwidth(stream: Dict[str, str]) -> int:
        """Return the stream's bandwidth as an int, or 0 if not numeric."""
        try:
            return int(stream.get('bandwidth', 0))
        except (TypeError, ValueError):
            return 0

    def _select_stream(self, streams: List[Dict[str, str]], url: str) -> Dict[str, str]:
        """Pick the entry to follow from a non-empty list of streams.

        Defaults to the first stream, but prefers the highest bandwidth
        variant when ``url`` is an ``.m3u8`` playlist that declares any.
        """
        best_stream = streams[0]
        if self._url_path(url).endswith('.m3u8'):
            with_bandwidth = [s for s in streams if s.get('bandwidth')]
            if with_bandwidth:
                best_stream = max(with_bandwidth, key=self._bandwidth)
        return best_stream

    async def resolve_stream_url(self, url: str) -> Tuple[Optional[str], Optional[Dict]]:
        """
        Follows nested playlists until a raw stream is found.

        A URL that is not a playlist is returned unchanged without any
        network access. At most ``MAX_PLAYLIST_DEPTH`` playlists are
        followed, and a playlist that is reached twice aborts resolution.

        Returns:
            Tuple containing the resolved URL and its metadata, or
            ``(None, None)`` when a playlist cannot be fetched, contains no
            entries, or the nesting loops / exceeds the depth limit.
        """
        if not self.is_playlist_url(url):
            return url, {'original_url': url}

        visited = set()
        current = url
        for _ in range(self.MAX_PLAYLIST_DEPTH):
            if current in visited:
                logger.warning("Playlist loop detected at %s", current)
                return None, None
            visited.add(current)

            content = await self.fetch_url(current)
            if not content:
                return None, None

            # An HLS *media* playlist (the one listing segments) is returned
            # as-is, since ffmpeg plays these directly. Master playlists
            # fall through so that a variant can be selected below.
            if '#EXT-X-TARGETDURATION' in content:
                logger.info("Detected HLS Media Playlist: %s", current)
                return current, {'original_url': current, 'is_hls': True}

            streams = self._parse_playlist(content, current)
            if not streams:
                return None, None

            best_stream = self._select_stream(streams, current)
            stream_url = best_stream['url']

            if not self.is_playlist_url(stream_url):
                return stream_url, best_stream
            # The entry is itself a playlist (nested playlists): follow it.
            current = stream_url

        logger.warning(
            "Giving up on %s: more than %d nested playlists",
            url, self.MAX_PLAYLIST_DEPTH,
        )
        return None, None

    async def get_stream_info(self, url: str) -> Optional[Dict]:
        """Fetch ``url`` and describe it.

        The URL is fetched whether or not it looks like a playlist.

        Returns:
            ``None`` when the fetch yields no content; otherwise a dict with
            ``url``, ``is_playlist`` and ``streams`` (the parsed entries for
            playlist URLs, an empty list otherwise).
        """
        content = await self.fetch_url(url)
        if not content:
            return None

        is_playlist = self.is_playlist_url(url)
        info = {
            'url': url,
            'is_playlist': is_playlist,
            'streams': [],
        }
        if is_playlist:
            info['streams'] = self._parse_playlist(content, url)
        return info