Add station_parser.py
This commit is contained in:
197
station_parser.py
Normal file
197
station_parser.py
Normal file
@@ -0,0 +1,197 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Radio Station Playlist Parser.
|
||||
|
||||
Supports parsing of M3U, M3U8, and PLS playlist formats to resolve the
|
||||
actual stream URLs. Handles recursive playlists and HLS stream detection.
|
||||
"""
|
||||
|
||||
import re
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from urllib.parse import urljoin, urlparse
|
||||
from typing import Optional, List, Dict, Tuple
|
||||
import logging
|
||||
|
||||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StationParser:
    """Parses playlist files to extract the underlying media stream URL.

    Supports M3U/M3U8 (including HLS variant lists) and PLS playlists.
    The ``parse_*`` methods are pure text parsers; ``fetch_url``,
    ``resolve_stream_url`` and ``get_stream_info`` perform HTTP requests.
    """

    # Lower-case extensions, matched against the URL *path*, that mark a playlist.
    PLAYLIST_EXTENSIONS = ('.m3u', '.m3u8', '.pls')

    # Upper bound on nested playlists followed by resolve_stream_url().
    MAX_PLAYLIST_DEPTH = 5

    # Leading duration of an #EXTINF tag; HLS allows decimal durations.
    _EXTINF_DURATION_RE = re.compile(r'\s*(-?\d+(?:\.\d+)?)')
    # PLS keys of interest, e.g. "file1", "title12", "length3".
    _PLS_KEY_RE = re.compile(r'(file|title|length)(\d+)')
    # NAME=value or NAME="value" pairs of an M3U8 attribute list.
    _ATTRIBUTE_RE = re.compile(r'([A-Z0-9-]+)=(?:"([^"]*)"|([^,]*))')

    def __init__(self, timeout: int = 10):
        """Create a parser.

        Args:
            timeout: Total time budget, in seconds, for each HTTP request.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = "XMPP-RadioBot/1.0"

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download ``url`` and return the response body as text.

        Redirects are followed. Failures are logged rather than raised.

        Returns:
            The decoded body on HTTP 200, otherwise ``None`` (non-200 status,
            timeout, or client/connection error).
        """
        headers = {"User-Agent": self.user_agent}
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url, headers=headers, allow_redirects=True) as response:
                    if response.status == 200:
                        # Playlists in the wild are often mislabelled; replace
                        # undecodable bytes instead of raising UnicodeDecodeError.
                        return await response.text(errors='replace')
                    logger.warning("Failed to fetch %s: HTTP %s", url, response.status)
        except asyncio.TimeoutError:
            logger.error("Timeout fetching %s", url)
        except aiohttp.ClientError as e:
            logger.error("Error fetching %s: %s", url, e)
        return None

    def _playlist_extension(self, url: str) -> str:
        """Return the playlist extension of the URL path, or ``''`` if none.

        Only the path component is inspected, so query strings and fragments
        (``listen.pls?sid=1``) do not hide the extension.
        """
        path = urlparse(url).path.lower()
        for extension in self.PLAYLIST_EXTENSIONS:
            if path.endswith(extension):
                return extension
        return ''

    def is_playlist_url(self, url: str) -> bool:
        """Checks if a URL points to a supported playlist format."""
        return bool(self._playlist_extension(url))

    def _parse_extinf(self, line: str) -> Optional[Dict[str, str]]:
        """Split an ``#EXTINF:`` line into duration, attributes and title.

        The title starts after the first comma that is not inside double
        quotes, so both quoted attribute values and titles may contain commas.

        Returns:
            A dict with ``duration``, ``attributes`` and ``title`` keys, or
            ``None`` when the line has no numeric duration or no separator.
        """
        body = line[len('#EXTINF:'):]
        duration = self._EXTINF_DURATION_RE.match(body)
        if duration is None:
            return None

        rest = body[duration.end():]
        in_quotes = False
        for index, char in enumerate(rest):
            if char == '"':
                in_quotes = not in_quotes
            elif char == ',' and not in_quotes:
                return {
                    'duration': duration.group(1),
                    'attributes': rest[:index].strip(),
                    'title': rest[index + 1:].strip(),
                }
        return None

    def parse_m3u(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parses M3U/M3U8 content.

        Args:
            content: Raw playlist text.
            base_url: URL of the playlist itself; when given, entries that do
                not start with a known scheme are resolved against it.

        Returns:
            One dict per entry, in file order. Each has a ``url`` key plus the
            metadata of the ``#EXTINF`` (``duration``, ``attributes``,
            ``title``) or ``#EXT-X-STREAM-INF`` (``bandwidth``, ``resolution``,
            ``codecs``) tag directly preceding it, if any.
        """
        streams: List[Dict[str, str]] = []
        pending: Dict[str, str] = {}

        # A UTF-8 BOM is not whitespace; left in place it would turn the
        # "#EXTM3U" header into a bogus first stream entry.
        for raw_line in content.lstrip('\ufeff').split('\n'):
            line = raw_line.strip()

            if not line or line.startswith('#EXTM3U'):
                continue

            if line.startswith('#EXTINF:'):
                info = self._parse_extinf(line)
                if info is not None:
                    pending = info
                continue

            # Handle M3U8 stream attributes (bandwidth, resolution)
            if line.startswith('#EXT-X-STREAM-INF:'):
                attrs = self._parse_attributes(line[len('#EXT-X-STREAM-INF:'):])
                pending = {
                    'bandwidth': attrs.get('BANDWIDTH', ''),
                    'resolution': attrs.get('RESOLUTION', ''),
                    'codecs': attrs.get('CODECS', ''),
                }
                continue

            if line.startswith('#'):
                # Any other tag or comment.
                continue

            url = line
            if base_url and not url.startswith(('http://', 'https://', 'rtmp://', 'rtsp://')):
                url = urljoin(base_url, url)

            entry = {'url': url}
            entry.update(pending)
            streams.append(entry)
            # Tag metadata applies to the next URI line only.
            pending = {}

        return streams

    def parse_pls(self, content: str) -> List[Dict[str, str]]:
        """Parses PLS INI-style content.

        Keys are case-insensitive and whitespace around ``=`` is ignored.

        Returns:
            One dict per numbered entry that has a non-empty ``FileN`` value,
            ordered by entry number, with ``url``, ``title`` (default ``''``)
            and ``duration`` (default ``'-1'``) keys.
        """
        entries: Dict[int, Dict[str, str]] = {}

        for raw_line in content.lstrip('\ufeff').split('\n'):
            key, separator, value = raw_line.strip().partition('=')
            if not separator:
                continue

            # fullmatch: reject keys that merely start with a known name.
            match = self._PLS_KEY_RE.fullmatch(key.strip().lower())
            if match is None:
                continue

            field, number = match.groups()
            entries.setdefault(int(number), {})[field] = value.strip()

        streams = []
        for number in sorted(entries):
            entry = entries[number]
            if entry.get('file'):
                streams.append({
                    'url': entry['file'],
                    'title': entry.get('title', ''),
                    'duration': entry.get('length', '-1'),
                })

        return streams

    def _parse_attributes(self, attr_string: str) -> Dict[str, str]:
        """Helper to parse key="value" attributes in M3U8 tags.

        Quoted values are returned without their quotes (and may be empty);
        unquoted values run up to the next comma and are whitespace-stripped.
        """
        attrs = {}
        for match in self._ATTRIBUTE_RE.finditer(attr_string):
            name, quoted, bare = match.groups()
            # "is not None" keeps an empty quoted value ('') distinct from
            # "the quoted alternative did not match".
            attrs[name] = quoted if quoted is not None else bare.strip()
        return attrs

    @staticmethod
    def _bandwidth_of(stream: Dict[str, str]) -> int:
        """Return the stream's bandwidth as an int, or 0 if it is not numeric."""
        try:
            return int(stream.get('bandwidth', 0))
        except (TypeError, ValueError):
            return 0

    async def resolve_stream_url(self, url: str) -> Tuple[Optional[str], Optional[Dict]]:
        """
        Recursively resolves a URL until a raw stream is found.

        Non-playlist URLs are returned unchanged without any network access.
        Nested playlists are followed up to ``MAX_PLAYLIST_DEPTH`` levels and
        playlist loops are detected.

        Returns:
            Tuple containing the resolved URL and its metadata, or
            ``(None, None)`` when the playlist cannot be fetched, contains no
            entries, or nesting is cyclic / too deep.
        """
        return await self._resolve(url, set())

    async def _resolve(self, url: str, visited: set) -> Tuple[Optional[str], Optional[Dict]]:
        """Resolve ``url``; ``visited`` holds the playlist URLs already fetched."""
        extension = self._playlist_extension(url)
        if not extension:
            return url, {'original_url': url}

        if url in visited or len(visited) >= self.MAX_PLAYLIST_DEPTH:
            logger.warning("Playlist loop or nesting too deep, giving up at %s", url)
            return None, None
        visited.add(url)

        content = await self.fetch_url(url)
        if not content:
            return None, None

        # Return the URL immediately if it's an HLS media playlist (segment
        # list), as ffmpeg handles these.
        if '#EXT-X-TARGETDURATION' in content:
            logger.info("Detected HLS Media Playlist: %s", url)
            return url, {'original_url': url, 'is_hls': True}

        if extension == '.pls':
            streams = self.parse_pls(content)
        else:
            streams = self.parse_m3u(content, url)

        if not streams:
            return None, None

        # Default to the first stream, but prefer higher bandwidth for adaptive streams.
        best_stream = streams[0]
        if extension == '.m3u8':
            variants = [s for s in streams if s.get('bandwidth')]
            if variants:
                best_stream = max(variants, key=self._bandwidth_of)

        stream_url = best_stream['url']

        # Recurse if the result is another playlist (nested playlists).
        if self._playlist_extension(stream_url):
            return await self._resolve(stream_url, visited)

        return stream_url, best_stream

    async def get_stream_info(self, url: str) -> Optional[Dict]:
        """Fetch ``url`` and describe it without following nested playlists.

        NOTE(review): the body is fetched even for non-playlist URLs; for an
        endless live stream this presumably runs until the client timeout and
        yields ``None`` - confirm whether that is intended.

        Returns:
            ``None`` when the URL cannot be fetched (or its body is empty),
            otherwise a dict with ``url``, ``is_playlist`` and ``streams``
            (the parsed entries; empty for non-playlist URLs).
        """
        content = await self.fetch_url(url)
        if not content:
            return None

        extension = self._playlist_extension(url)
        if extension == '.pls':
            streams = self.parse_pls(content)
        elif extension:
            streams = self.parse_m3u(content, url)
        else:
            streams = []

        return {
            'url': url,
            'is_playlist': bool(extension),
            'streams': streams,
        }
|
||||
Reference in New Issue
Block a user