197 lines
6.9 KiB
Python
197 lines
6.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Radio Station Playlist Parser.
|
|
|
|
Supports parsing of M3U, M3U8, and PLS playlist formats to resolve
|
|
actual stream URLs. Handles recursive playlists and HLS stream detection.
|
|
"""
|
|
|
|
import re
|
|
import asyncio
|
|
import aiohttp
|
|
from urllib.parse import urljoin, urlparse
|
|
from typing import Optional, List, Dict, Tuple
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StationParser:
    """Parse playlist files to extract the underlying media stream URL.

    Supported formats are M3U / M3U8 (including HLS master playlists that
    list variants via ``#EXT-X-STREAM-INF``) and PLS. Playlists that point
    to other playlists are followed until a non-playlist URL is reached.
    """

    # File extensions (matched against the URL *path*) treated as playlists.
    PLAYLIST_EXTENSIONS = ('.m3u', '.m3u8', '.pls')

    # "#EXTINF:<duration>[ <attributes>],<title>"; the duration may be an
    # integer (classic M3U, often -1) or a decimal number (HLS).
    _EXTINF_RE = re.compile(r'#EXTINF:(-?\d+(?:\.\d+)?)(?:\s+(.+))?,(.+)')

    # PLS entry keys such as "file1", "title12", "length3" (already lowercased).
    _PLS_KEY_RE = re.compile(r'(file|title|length)(\d+)')

    # NAME=value or NAME="quoted,value" pairs of an M3U8 attribute list.
    _ATTRIBUTE_RE = re.compile(r'([A-Z0-9-]+)=(?:"([^"]*)"|([^,]*))')

    def __init__(self, timeout: int = 10):
        """Create a parser.

        Args:
            timeout: Total time budget, in seconds, for each HTTP request.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = "XMPP-RadioBot/1.0"

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download ``url`` and return its body as text.

        Returns:
            The decoded response body on HTTP 200, otherwise ``None``
            (non-200 status, timeout, client error, or undecodable body).
            Failures are logged rather than raised.
        """
        headers = {"User-Agent": self.user_agent}
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url, headers=headers, allow_redirects=True) as response:
                    if response.status == 200:
                        return await response.text()
                    logger.warning(f"Failed to fetch {url}: HTTP {response.status}")
        except asyncio.TimeoutError:
            logger.error(f"Timeout fetching {url}")
        except aiohttp.ClientError as e:
            logger.error(f"Error fetching {url}: {e}")
        except UnicodeDecodeError as e:
            # The body did not match its declared/detected charset; honour the
            # Optional[str] contract instead of leaking the decode error.
            logger.error(f"Error decoding {url}: {e}")
        return None

    def _playlist_extension(self, url: str) -> str:
        """Return the playlist extension of the URL path, or '' if none.

        Only the path component is inspected, so query strings and fragments
        (``list.pls?sid=1``) do not hide the extension.
        """
        path = urlparse(url).path.lower()
        for extension in self.PLAYLIST_EXTENSIONS:
            if path.endswith(extension):
                return extension
        return ''

    def is_playlist_url(self, url: str) -> bool:
        """Check whether a URL points to a supported playlist format."""
        return bool(self._playlist_extension(url))

    def parse_m3u(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parse M3U/M3U8 content into a list of stream entries.

        Args:
            content: Raw playlist text.
            base_url: URL of the playlist itself; when given, relative entries
                are resolved against it.

        Returns:
            One dict per URL line, in file order. Each dict has a ``'url'``
            key plus the metadata of the directly preceding ``#EXTINF``
            (``duration``, ``attributes``, ``title``) or
            ``#EXT-X-STREAM-INF`` (``bandwidth``, ``resolution``, ``codecs``)
            directive, if any.
        """
        streams = []
        current_info = {}

        # A UTF-8 BOM would hide the leading '#' of the first directive and
        # turn the "#EXTM3U" header into a bogus stream URL.
        for raw_line in content.lstrip('\ufeff').split('\n'):
            line = raw_line.strip()

            if not line or line.startswith('#EXTM3U'):
                continue

            if line.startswith('#EXTINF:'):
                match = self._EXTINF_RE.match(line)
                if match:
                    current_info = {
                        'duration': match.group(1),
                        'attributes': match.group(2) or '',
                        'title': match.group(3).strip(),
                    }
                continue

            # HLS variant description (bandwidth, resolution, codecs).
            if line.startswith('#EXT-X-STREAM-INF:'):
                attrs = self._parse_attributes(line[len('#EXT-X-STREAM-INF:'):])
                current_info = {
                    'bandwidth': attrs.get('BANDWIDTH', ''),
                    'resolution': attrs.get('RESOLUTION', ''),
                    'codecs': attrs.get('CODECS', ''),
                }
                continue

            # Any other directive or comment carries no stream information.
            if line.startswith('#'):
                continue

            url = line
            if base_url and not url.startswith(('http://', 'https://', 'rtmp://', 'rtsp://')):
                url = urljoin(base_url, url)

            stream_entry = {'url': url}
            stream_entry.update(current_info)
            streams.append(stream_entry)
            # Metadata applies only to the URL line that follows it.
            current_info = {}

        return streams

    def parse_pls(self, content: str) -> List[Dict[str, str]]:
        """Parse PLS (INI-style) content into a list of stream entries.

        Keys are matched case-insensitively and whitespace around ``=`` is
        ignored. Entries without a non-empty ``FileN`` value are skipped.

        Returns:
            Dicts with ``'url'``, ``'title'`` (default ``''``) and
            ``'duration'`` (default ``'-1'``), ordered by entry number.
        """
        entries: Dict[int, Dict[str, str]] = {}

        for raw_line in content.split('\n'):
            key, separator, value = raw_line.partition('=')
            if not separator:
                continue

            match = self._PLS_KEY_RE.fullmatch(key.strip().lower())
            if match is None:
                continue

            field, number = match.groups()
            # Integer keys make "File01" and "File1" refer to the same entry.
            entries.setdefault(int(number), {})[field] = value.strip()

        streams = []
        for number in sorted(entries):
            entry = entries[number]
            if entry.get('file'):
                streams.append({
                    'url': entry['file'],
                    'title': entry.get('title', ''),
                    'duration': entry.get('length', '-1'),
                })

        return streams

    def _parse_attributes(self, attr_string: str) -> Dict[str, str]:
        """Parse the ``NAME=value`` attribute list of an M3U8 tag.

        Quoted values may contain commas and are returned without their
        quotes; unquoted values run up to the next comma.
        """
        attrs = {}
        for match in self._ATTRIBUTE_RE.finditer(attr_string):
            name, quoted, bare = match.groups()
            attrs[name] = quoted if quoted is not None else bare.strip()
        return attrs

    def _parse_playlist(self, content: str, url: str) -> List[Dict[str, str]]:
        """Parse ``content`` with the parser matching the extension of ``url``."""
        if self._playlist_extension(url) == '.pls':
            return self.parse_pls(content)
        return self.parse_m3u(content, url)

    @staticmethod
    def _bandwidth_of(stream: Dict[str, str]) -> int:
        """Return the stream's bandwidth as an int, or 0 when not numeric."""
        try:
            return int(stream.get('bandwidth', 0))
        except (TypeError, ValueError):
            return 0

    def _select_best_stream(self, streams: List[Dict[str, str]], url: str) -> Dict[str, str]:
        """Pick the entry to play from a non-empty list of parsed streams.

        Defaults to the first entry; for ``.m3u8`` playlists the variant with
        the highest declared bandwidth wins when any bandwidth is declared.
        """
        if self._playlist_extension(url) == '.m3u8':
            streams_with_bandwidth = [s for s in streams if s.get('bandwidth')]
            if streams_with_bandwidth:
                return max(streams_with_bandwidth, key=self._bandwidth_of)
        return streams[0]

    async def resolve_stream_url(
        self, url: str, max_depth: int = 10
    ) -> Tuple[Optional[str], Optional[Dict]]:
        """Follow playlists until a raw stream URL is found.

        Args:
            url: Stream or playlist URL.
            max_depth: Maximum number of playlists fetched in a row; guards
                against playlists that reference each other endlessly.

        Returns:
            ``(resolved_url, metadata)``. For a non-playlist ``url`` the
            metadata is ``{'original_url': url}``; for an HLS media playlist
            it additionally has ``'is_hls': True``; otherwise it is the
            selected playlist entry. ``(None, None)`` is returned when a
            playlist cannot be fetched, is empty, or nesting does not end.
        """
        if not self.is_playlist_url(url):
            return url, {'original_url': url}

        current = url
        visited = set()

        for _ in range(max_depth):
            if current in visited:
                logger.warning(f"Playlist loop detected at {current}")
                return None, None
            visited.add(current)

            content = await self.fetch_url(current)
            if not content:
                return None, None

            # An HLS media playlist (segment list) is itself the stream:
            # hand its URL back unchanged so the player (e.g. ffmpeg) can
            # consume it directly.
            if '#EXT-X-TARGETDURATION' in content:
                logger.info(f"Detected HLS Media Playlist: {current}")
                return current, {'original_url': current, 'is_hls': True}

            streams = self._parse_playlist(content, current)
            if not streams:
                return None, None

            best_stream = self._select_best_stream(streams, current)
            stream_url = best_stream['url']

            if not self.is_playlist_url(stream_url):
                return stream_url, best_stream

            # Nested playlist: resolve it on the next iteration.
            current = stream_url

        logger.warning(f"Playlist nesting exceeds {max_depth} levels: {url}")
        return None, None

    async def get_stream_info(self, url: str) -> Optional[Dict]:
        """Fetch ``url`` and describe it.

        Returns:
            ``None`` when the URL cannot be fetched; otherwise a dict with
            ``'url'``, ``'is_playlist'`` and ``'streams'`` (the parsed
            entries for playlists, an empty list otherwise).
        """
        # NOTE(review): the body is downloaded even for non-playlist URLs, so
        # a continuous raw stream will likely run into the request timeout.
        # Kept as-is because callers may rely on the fetch as a reachability
        # check - confirm before changing.
        content = await self.fetch_url(url)
        if not content:
            return None

        is_playlist = self.is_playlist_url(url)
        return {
            'url': url,
            'is_playlist': is_playlist,
            'streams': self._parse_playlist(content, url) if is_playlist else [],
        }