#!/usr/bin/env python3
"""
Radio Station Playlist Parser.

Supports parsing of M3U, M3U8, and PLS playlist formats to resolve
actual stream URLs. Handles recursive playlists and HLS stream detection.
"""

import asyncio
import logging
import re
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

logger = logging.getLogger(__name__)


class StationParser:
    """Parses playlist files to extract the underlying media stream URL."""

    # File extensions (matched against the URL *path*) treated as playlists.
    PLAYLIST_EXTENSIONS = ('.m3u', '.m3u8', '.pls')
    # Entries starting with one of these schemes are already absolute.
    ABSOLUTE_SCHEMES = ('http://', 'https://', 'rtmp://', 'rtsp://')
    # Upper bound on playlist-in-playlist nesting; stops self-referencing
    # playlists from recursing forever.
    MAX_PLAYLIST_DEPTH = 5

    _EXTINF_PREFIX = '#EXTINF:'
    _STREAM_INF_PREFIX = '#EXT-X-STREAM-INF:'
    _DURATION_RE = re.compile(r'-?\d+(?:\.\d+)?')
    _ATTRIBUTE_RE = re.compile(r'([A-Z0-9-]+)=(?:"([^"]*)"|([^,]*))')
    _PLS_KEY_RE = re.compile(r'(file|title|length)(\d+)')

    def __init__(self, timeout: int = 10):
        """Create a parser whose HTTP requests time out after *timeout* seconds."""
        # aiohttp is imported lazily so the pure parsing helpers stay usable
        # (and testable) in environments without the HTTP client installed.
        import aiohttp

        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = "XMPP-RadioBot/1.0"

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download *url* and return its body as text.

        Returns None on a non-200 status, a timeout, or a client error;
        the failure is logged rather than raised.
        """
        import aiohttp  # Lazy import, same reason as in __init__.

        headers = {"User-Agent": self.user_agent}
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url, headers=headers, allow_redirects=True) as response:
                    if response.status == 200:
                        # A wrong or missing charset must not abort parsing:
                        # undecodable bytes are replaced instead of raising.
                        return await response.text(errors="replace")
                    logger.warning("Failed to fetch %s: HTTP %s", url, response.status)
        except asyncio.TimeoutError:
            logger.error("Timeout fetching %s", url)
        except aiohttp.ClientError as e:
            logger.error("Error fetching %s: %s", url, e)
        return None

    @staticmethod
    def _url_path(url: str) -> str:
        """Return the lower-cased path component of *url* (no query/fragment)."""
        return urlparse(url.lower()).path

    def is_playlist_url(self, url: str) -> bool:
        """Checks if a URL points to a supported playlist format."""
        return self._url_path(url).endswith(self.PLAYLIST_EXTENSIONS)

    def _absolutize(self, url: str, base_url: str) -> str:
        """Resolve *url* against *base_url* unless it is already absolute."""
        if base_url and not url.lower().startswith(self.ABSOLUTE_SCHEMES):
            return urljoin(base_url, url)
        return url

    @staticmethod
    def _split_unquoted_comma(text: str) -> Tuple[str, str]:
        """Split *text* at the first comma that is not inside double quotes."""
        in_quotes = False
        for index, char in enumerate(text):
            if char == '"':
                in_quotes = not in_quotes
            elif char == ',' and not in_quotes:
                return text[:index], text[index + 1:]
        # No unquoted comma (or an unbalanced quote): fall back to the first
        # comma so malformed lines are still handled leniently.
        head, _, tail = text.partition(',')
        return head, tail

    def _parse_extinf(self, payload: str) -> Optional[Dict[str, str]]:
        """Parse the text after '#EXTINF:'; return None if it has no valid duration."""
        head, title = self._split_unquoted_comma(payload)
        parts = head.split(None, 1)
        if not parts or not self._DURATION_RE.fullmatch(parts[0]):
            return None
        return {
            'duration': parts[0],
            'attributes': parts[1].strip() if len(parts) > 1 else '',
            'title': title.strip(),
        }

    def parse_m3u(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parses M3U/M3U8 content.

        Returns one dict per stream line with a 'url' key plus whatever the
        preceding '#EXTINF' (duration/attributes/title) or
        '#EXT-X-STREAM-INF' (bandwidth/resolution/codecs) tag supplied.
        Relative entries are resolved against *base_url* when it is given.
        """
        streams = []
        current_info: Dict[str, str] = {}

        # A leading BOM would otherwise hide the '#EXTM3U' header and turn it
        # into a bogus stream entry.
        for raw_line in content.lstrip('\ufeff').splitlines():
            line = raw_line.strip()

            if not line or line.startswith('#EXTM3U'):
                continue

            if line.startswith(self._EXTINF_PREFIX):
                info = self._parse_extinf(line[len(self._EXTINF_PREFIX):])
                if info is not None:
                    current_info = info
                continue

            # Handle M3U8 stream attributes (bandwidth, resolution)
            if line.startswith(self._STREAM_INF_PREFIX):
                attrs = self._parse_attributes(line[len(self._STREAM_INF_PREFIX):])
                current_info = {
                    'bandwidth': attrs.get('BANDWIDTH', ''),
                    'resolution': attrs.get('RESOLUTION', ''),
                    'codecs': attrs.get('CODECS', ''),
                }
                continue

            if line.startswith('#'):
                continue

            stream_entry = {'url': self._absolutize(line, base_url)}
            stream_entry.update(current_info)
            streams.append(stream_entry)
            # Metadata applies only to the stream line that follows it.
            current_info = {}

        return streams

    def parse_pls(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parses PLS INI-style content.

        Returns entries ordered by their numeric index, each with 'url',
        'title' and 'duration' keys. Relative 'FileN' values are resolved
        against *base_url* when it is given.
        """
        entries: Dict[int, Dict[str, str]] = {}

        for raw_line in content.lstrip('\ufeff').splitlines():
            key, separator, value = raw_line.partition('=')
            if not separator:
                continue

            # fullmatch: keys such as 'File1Extra' are not playlist entries.
            match = self._PLS_KEY_RE.fullmatch(key.strip().lower())
            if match:
                field, num = match.groups()
                entries.setdefault(int(num), {})[field] = value.strip()

        streams = []
        for num in sorted(entries):
            entry = entries[num]
            if entry.get('file'):
                streams.append({
                    'url': self._absolutize(entry['file'], base_url),
                    'title': entry.get('title', ''),
                    'duration': entry.get('length', '-1'),
                })

        return streams

    def _parse_attributes(self, attr_string: str) -> Dict[str, str]:
        """Helper to parse key="value" attributes in M3U8 tags."""
        attrs = {}
        for match in self._ATTRIBUTE_RE.finditer(attr_string):
            key, quoted, bare = match.groups()
            # `quoted` may legitimately be the empty string (KEY=""), so test
            # against None rather than truthiness.
            attrs[key] = quoted if quoted is not None else bare.strip()
        return attrs

    def _parse_playlist(self, content: str, url: str) -> List[Dict[str, str]]:
        """Dispatch to the PLS or M3U parser based on the path of *url*."""
        if self._url_path(url).endswith('.pls'):
            return self.parse_pls(content, url)
        return self.parse_m3u(content, url)

    async def resolve_stream_url(
        self, url: str, _depth: int = 0
    ) -> Tuple[Optional[str], Optional[Dict]]:
        """
        Recursively resolves a URL until a raw stream is found.

        Args:
            url: Playlist or stream URL to resolve.
            _depth: Internal recursion counter; callers should not pass it.

        Returns:
            Tuple containing the resolved URL and its metadata, or
            (None, None) when the playlist cannot be fetched, is empty, or
            is nested deeper than MAX_PLAYLIST_DEPTH.
        """
        if not self.is_playlist_url(url):
            return url, {'original_url': url}

        if _depth >= self.MAX_PLAYLIST_DEPTH:
            logger.warning(
                "Giving up on %s: playlists nested deeper than %d levels",
                url, self.MAX_PLAYLIST_DEPTH,
            )
            return None, None

        content = await self.fetch_url(url)
        if not content:
            return None, None

        # '#EXT-X-TARGETDURATION' only appears in HLS *media* playlists (lists
        # of segments). Return the URL itself, as ffmpeg handles these.
        if '#EXT-X-TARGETDURATION' in content:
            logger.info("Detected HLS Media Playlist: %s", url)
            return url, {'original_url': url, 'is_hls': True}

        streams = self._parse_playlist(content, url)
        if not streams:
            return None, None

        # Default to the first stream, but prefer higher bandwidth for
        # adaptive streams. Non-numeric bandwidth values are ignored.
        ranked = [s for s in streams if s.get('bandwidth', '').isdigit()]
        if ranked:
            best_stream = max(ranked, key=lambda s: int(s['bandwidth']))
        else:
            best_stream = streams[0]

        stream_url = best_stream['url']

        # Recurse if the result is another playlist (nested playlists).
        if self.is_playlist_url(stream_url):
            return await self.resolve_stream_url(stream_url, _depth + 1)

        return stream_url, best_stream

    async def get_stream_info(self, url: str) -> Optional[Dict]:
        """Fetch *url* and describe it.

        Returns a dict with 'url', 'is_playlist' and 'streams' (parsed
        entries, empty for non-playlist URLs), or None if the fetch failed.
        """
        # NOTE(review): the body is fetched even for non-playlist URLs; for a
        # live stream that presumably ends in a timeout -- confirm intent.
        content = await self.fetch_url(url)
        if not content:
            return None

        is_playlist = self.is_playlist_url(url)
        return {
            'url': url,
            'is_playlist': is_playlist,
            'streams': self._parse_playlist(content, url) if is_playlist else [],
        }