Files
xmpp-radio-tower/station_parser.py
2025-12-17 17:16:36 +00:00

175 lines
6.0 KiB
Python

#!/usr/bin/env python3
import re
import asyncio
import aiohttp
from urllib.parse import urljoin, urlparse
from typing import Optional, List, Dict, Tuple
import logging
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class StationParser:
    """Fetch radio-station playlists and resolve them to stream URLs.

    Understands M3U / M3U8 (including HLS master playlists) and PLS
    playlists.  A URL is treated as a playlist purely by the extension of
    its *path*, so query strings and fragments do not affect detection.
    """

    DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"

    # Path extensions (lower-case) that mark a URL as a playlist.
    _PLAYLIST_EXTENSIONS = ('.m3u', '.m3u8', '.pls')
    # Entries starting with one of these are never joined onto the base URL.
    _ABSOLUTE_SCHEMES = ('http://', 'https://', 'rtmp://', 'rtsp://')
    _STREAM_INF_PREFIX = '#EXT-X-STREAM-INF:'

    # Duration may be an integer or a decimal; the remainder (attributes,
    # comma, title) is split by _split_extinf so quoted commas are honoured.
    _EXTINF_RE = re.compile(r'#EXTINF:\s*(-?\d+(?:\.\d+)?)(.*)')
    _PLS_KEY_RE = re.compile(r'(file|title|length)(\d+)')
    _ATTRIBUTE_RE = re.compile(r'([A-Z0-9-]+)=(?:"([^"]*)"|([^,]*))')

    def __init__(self, timeout: int = 10, user_agent: Optional[str] = None):
        """Create a parser.

        Args:
            timeout: Total timeout, in seconds, applied to each HTTP request.
            user_agent: Value for the ``User-Agent`` request header; defaults
                to ``DEFAULT_USER_AGENT`` when omitted.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = user_agent if user_agent is not None else self.DEFAULT_USER_AGENT

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download ``url`` and return the response body as text.

        Returns:
            The decoded body for an HTTP 200 response, otherwise ``None``
            (non-200 status, timeout, client error or undecodable body).
            Every failure is logged.
        """
        headers = {"User-Agent": self.user_agent}
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url, headers=headers, allow_redirects=True) as response:
                    if response.status == 200:
                        return await response.text()
                    logger.warning("Failed to fetch %s: HTTP %s", url, response.status)
        except asyncio.TimeoutError:
            logger.error("Timeout fetching %s", url)
        except aiohttp.ClientError as e:
            logger.error("Error fetching %s: %s", url, e)
        except UnicodeDecodeError as e:
            # A body that cannot be decoded as text is not a usable playlist.
            logger.error("Could not decode response from %s: %s", url, e)
        return None

    def _playlist_extension(self, url: str) -> Optional[str]:
        """Return the playlist extension of the URL's path, or ``None``.

        Only the path component is inspected, so ``list.pls?sid=1`` is
        recognised as ``.pls``.  A URL that ``urlparse`` rejects is treated
        as "not a playlist".
        """
        try:
            path = urlparse(url.lower()).path
        except ValueError:
            return None
        for extension in self._PLAYLIST_EXTENSIONS:
            if path.endswith(extension):
                return extension
        return None

    def is_playlist_url(self, url: str) -> bool:
        """Return ``True`` when the URL path ends in .m3u, .m3u8 or .pls."""
        return self._playlist_extension(url) is not None

    @staticmethod
    def _split_extinf(rest: str) -> Optional[Tuple[str, str]]:
        """Split the text after an EXTINF duration into (attributes, title).

        The split happens at the first comma that is not inside double
        quotes.  If the quotes are unbalanced and no such comma exists, the
        last comma is used instead.  Returns ``None`` when there is no comma.
        """
        in_quotes = False
        for index, char in enumerate(rest):
            if char == '"':
                in_quotes = not in_quotes
            elif char == ',' and not in_quotes:
                return rest[:index].strip(), rest[index + 1:].strip()
        head, separator, tail = rest.rpartition(',')
        if separator:
            return head.strip(), tail.strip()
        return None

    def _parse_extinf(self, line: str) -> Optional[Dict[str, str]]:
        """Parse one ``#EXTINF:`` line; return ``None`` if it is malformed."""
        match = self._EXTINF_RE.match(line)
        if match is None:
            return None
        parts = self._split_extinf(match.group(2))
        if parts is None:
            return None
        attributes, title = parts
        return {
            'duration': match.group(1),
            'attributes': attributes,
            'title': title
        }

    def parse_m3u(self, content: str, base_url: str = "") -> List[Dict[str, str]]:
        """Parse M3U / M3U8 text into a list of stream entries.

        Args:
            content: Raw playlist text.
            base_url: URL of the playlist; when given, entries that do not
                start with http://, https://, rtmp:// or rtsp:// are joined
                onto it.

        Returns:
            One dict per URI line, in file order.  Each has ``url`` plus the
            metadata of the immediately preceding ``#EXTINF`` line
            (``duration``, ``attributes``, ``title``) or
            ``#EXT-X-STREAM-INF`` line (``bandwidth``, ``resolution``,
            ``codecs``), if any.
        """
        streams: List[Dict[str, str]] = []
        pending: Dict[str, str] = {}
        # A leading BOM would otherwise turn "#EXTM3U" into a bogus URL line.
        for raw_line in content.lstrip('\ufeff').strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith('#EXTINF:'):
                parsed = self._parse_extinf(line)
                if parsed is not None:
                    pending = parsed
                continue
            if line.startswith(self._STREAM_INF_PREFIX):
                attrs = self._parse_attributes(line[len(self._STREAM_INF_PREFIX):])
                pending = {
                    'bandwidth': attrs.get('BANDWIDTH', ''),
                    'resolution': attrs.get('RESOLUTION', ''),
                    'codecs': attrs.get('CODECS', '')
                }
                continue
            if line.startswith('#'):
                # "#EXTM3U", other tags and plain comments carry no stream.
                continue
            url = line
            if base_url and not url.startswith(self._ABSOLUTE_SCHEMES):
                url = urljoin(base_url, url)
            entry = {'url': url}
            entry.update(pending)
            streams.append(entry)
            # Metadata applies only to the URI line that follows it.
            pending = {}
        return streams

    def parse_pls(self, content: str) -> List[Dict[str, str]]:
        """Parse PLS text into a list of stream entries.

        ``FileN`` / ``TitleN`` / ``LengthN`` keys are matched
        case-insensitively and grouped by ``N``; whitespace around the key
        and value is ignored.

        Returns:
            Dicts with ``url``, ``title`` (default ``''``) and ``duration``
            (default ``'-1'``), ordered by entry number.  Entries without a
            non-empty ``FileN`` value are omitted.
        """
        entries: Dict[str, Dict[str, str]] = {}
        for raw_line in content.lstrip('\ufeff').split('\n'):
            key, separator, value = raw_line.strip().partition('=')
            if not separator:
                continue
            match = self._PLS_KEY_RE.fullmatch(key.strip().lower())
            if match is None:
                continue
            field, num = match.groups()
            entries.setdefault(num, {})[field] = value.strip()
        streams: List[Dict[str, str]] = []
        for num in sorted(entries, key=int):
            entry = entries[num]
            if entry.get('file'):
                streams.append({
                    'url': entry['file'],
                    'title': entry.get('title', ''),
                    'duration': entry.get('length', '-1')
                })
        return streams

    def _parse_attributes(self, attr_string: str) -> Dict[str, str]:
        """Parse a ``KEY=value,KEY="quoted, value"`` attribute list.

        Keys consist of upper-case letters, digits and ``-``.  Quoted values
        are returned without their quotes and may contain commas; unquoted
        values run up to the next comma and are whitespace-stripped.
        """
        attrs: Dict[str, str] = {}
        for match in self._ATTRIBUTE_RE.finditer(attr_string):
            key, quoted, bare = match.groups()
            # ``quoted`` is None exactly when the unquoted alternative matched.
            attrs[key] = quoted if quoted is not None else bare.strip()
        return attrs

    @staticmethod
    def _bandwidth_of(stream: Dict[str, str]) -> int:
        """Return the entry's bandwidth as an int; 0 if missing or not numeric."""
        try:
            return int(stream.get('bandwidth', 0))
        except (TypeError, ValueError):
            return 0

    async def resolve_stream_url(self, url: str, max_depth: int = 5) -> Tuple[Optional[str], Optional[Dict]]:
        """Follow playlists from ``url`` until a direct stream URL is found.

        Args:
            url: Stream or playlist URL.
            max_depth: Maximum number of playlists fetched in a chain; this
                stops self-referencing playlists from recursing forever.

        Returns:
            ``(stream_url, info)``.  For a non-playlist URL this is the URL
            itself with ``{'original_url': url}``.  An HLS media playlist
            (one containing ``#EXT-X-TARGETDURATION``) is returned as-is with
            ``is_hls`` set.  For ``.m3u8`` playlists the entry with the
            highest bandwidth is chosen, otherwise the first entry.
            ``(None, None)`` means the playlist could not be fetched, held no
            entries, or nesting exceeded ``max_depth``.
        """
        extension = self._playlist_extension(url)
        if extension is None:
            return url, {'original_url': url}
        if max_depth <= 0:
            logger.warning("Playlist nesting too deep, giving up at %s", url)
            return None, None
        content = await self.fetch_url(url)
        if not content:
            return None, None
        if '#EXT-X-TARGETDURATION' in content:
            logger.info("Detected HLS Media Playlist: %s", url)
            return url, {'original_url': url, 'is_hls': True}
        if extension == '.pls':
            streams = self.parse_pls(content)
        else:
            streams = self.parse_m3u(content, url)
        if not streams:
            return None, None
        best_stream = streams[0]
        if extension == '.m3u8':
            streams_with_bandwidth = [s for s in streams if s.get('bandwidth')]
            if streams_with_bandwidth:
                best_stream = max(streams_with_bandwidth, key=self._bandwidth_of)
        stream_url = best_stream['url']
        if self._playlist_extension(stream_url) is not None:
            return await self.resolve_stream_url(stream_url, max_depth - 1)
        return stream_url, best_stream

    async def get_stream_info(self, url: str) -> Optional[Dict]:
        """Fetch ``url`` and describe it.

        Returns:
            ``None`` when the fetch fails or the body is empty; otherwise a
            dict with ``url``, ``is_playlist`` and ``streams`` (the parsed
            entries for playlist URLs, an empty list for anything else).
        """
        # NOTE(review): the body is downloaded even for non-playlist URLs,
        # where it only serves as a reachability check - confirm this is
        # intended for long-running audio streams.
        content = await self.fetch_url(url)
        if not content:
            return None
        extension = self._playlist_extension(url)
        if extension == '.pls':
            streams = self.parse_pls(content)
        elif extension is not None:
            streams = self.parse_m3u(content, url)
        else:
            streams = []
        return {
            'url': url,
            'is_playlist': extension is not None,
            'streams': streams
        }