From 12a9f859aac6e754174f27ba4d3d33fffa9295d9 Mon Sep 17 00:00:00 2001 From: just n Date: Sat, 13 Dec 2025 18:29:29 +0000 Subject: [PATCH] Add bot.py --- bot.py | 1348 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1348 insertions(+) create mode 100644 bot.py diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..b5c5839 --- /dev/null +++ b/bot.py @@ -0,0 +1,1348 @@ +#!/usr/bin/env python3 +""" +Main Application Entry Point. + +This module initializes the XMPP bot, loads configuration, manages plugins +(OMEMO, MUC, Jingle), and orchestrates the interaction between the audio +streamer, playlist manager, and XMPP interface. +""" + +import os +import sys +import asyncio +import configparser +import logging +import json +import gc +import time +import weakref +from pathlib import Path +from typing import Optional, Dict, List, Set, Any, FrozenSet +from dataclasses import dataclass +from collections import OrderedDict +import xml.etree.ElementTree as ET + +import slixmpp +from slixmpp.jid import JID +from slixmpp.stanza import Iq, Message +from slixmpp.xmlstream.handler import Callback, CoroutineCallback +from slixmpp.xmlstream.matcher import MatchXPath +from slixmpp.plugins import register_plugin + +from audio_streamer import AudioStreamer +from playlist_manager import PlaylistManager, PlaybackMode, Track +from station_parser import StationParser +from ytdlp_handler import YtDlpHandler, SearchResult + +try: + from jingle_webrtc import JingleWebRTCHandler, JingleSession, CallState, AIORTC_AVAILABLE + JINGLE_AVAILABLE = AIORTC_AVAILABLE +except ImportError: + JINGLE_AVAILABLE = False + JingleWebRTCHandler = None + +try: + from omemo.storage import Just, Maybe, Nothing, Storage + from omemo.types import DeviceInformation, JSONType + from slixmpp_omemo import TrustLevel, XEP_0384 + OMEMO_AVAILABLE = True +except ImportError: + OMEMO_AVAILABLE = False + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(name)s - %(message)s' +) +logger = logging.getLogger(__name__) + +NS_JINGLE = "urn:xmpp:jingle:1" +NS_JINGLE_MSG = "urn:xmpp:jingle-message:0" +NS_HINTS = "urn:xmpp:hints" + + +class LRUCache(OrderedDict): + """Simple LRU Cache implementation for storing search results.""" + def __init__(self, maxsize=25): + super().__init__() + self.maxsize = maxsize + + def __setitem__(self, key, value): + if key in self: + self.move_to_end(key) + super().__setitem__(key, value) + if len(self) > self.maxsize: + oldest = next(iter(self)) + del self[oldest] + + def get(self, key, default=None): + if key in self: + self.move_to_end(key) + return self[key] + return default + + def clear_old(self, keep=10): + while len(self) > keep: + oldest = next(iter(self)) + del self[oldest] + + +if OMEMO_AVAILABLE: + class OMEMOStorageImpl(Storage): + """File-based storage backend for OMEMO keys.""" + + def __init__(self, json_file_path: Path) -> None: + super().__init__() + self.__json_file_path = json_file_path + self.__data: Dict[str, Any] = {} + try: + with open(self.__json_file_path, encoding="utf8") as f: + self.__data = json.load(f) + except Exception: + pass + + async def _load(self, key: str) -> Maybe[Any]: + if key in self.__data: + return Just(self.__data[key]) + return Nothing() + + async def _store(self, key: str, value: Any) -> None: + self.__data[key] = value + with open(self.__json_file_path, "w", encoding="utf8") as f: + json.dump(self.__data, f) + + async def _delete(self, key: str) -> None: + self.__data.pop(key, None) + with open(self.__json_file_path, "w", encoding="utf8") as f: + json.dump(self.__data, f) + + + class XEP_0384Impl(XEP_0384): + """OMEMO Plugin Configuration.""" + + default_config = { + "fallback_message": "This message is OMEMO encrypted.", + "json_file_path": None + } + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.__storage: Storage + + def plugin_init(self) -> None: + if not self.json_file_path: + raise Exception("JSON file path not specified for OMEMO storage.") + self.__storage = OMEMOStorageImpl(Path(self.json_file_path)) + super().plugin_init() + + @property + def storage(self) -> Storage: + return self.__storage + + @property + def _btbv_enabled(self) -> bool: + return True + + async def _devices_blindly_trusted( + self, + blindly_trusted: FrozenSet[DeviceInformation], + identifier: Optional[str] + ) -> None: + pass + + async def _prompt_manual_trust( + self, + manually_trusted: FrozenSet[DeviceInformation], + identifier: Optional[str] + ) -> None: + session_manager = await self.get_session_manager() + for device in manually_trusted: + await session_manager.set_trust( + device.bare_jid, + device.identity_key, + TrustLevel.TRUSTED.value + ) + + register_plugin(XEP_0384Impl) + + +@dataclass(slots=True) +class MessageContext: + """Encapsulates context about an incoming message for command handling.""" + sender: JID + sender_bare: str + mtype: str + is_encrypted: bool + is_muc: bool + room_jid: Optional[str] = None + nick: Optional[str] = None + + +@dataclass +class BotConfig: + jid: str + password: str + resource: str + music_directory: str + playback_mode: str + audio_formats: List[str] + default_volume: int + http_enabled: bool + stream_host: str + stream_port: int + stream_format: str + bitrate: int + mount_point: str + jingle_enabled: bool + stun_server: str + stations: Dict[str, str] + radio_room: str + radio_room_nick: str + management_room: str + management_room_nick: str + control_mode: str + allowed_jids: Set[str] + admin_jids: Set[str] + ytdlp_enabled: bool + ytdlp_format: str + ytdlp_max_results: int + ytdlp_max_filesize: int + ytdlp_download_subdir: str + omemo_enabled: bool + omemo_store_path: str + + +def load_config(config_file: str = "config.ini") -> BotConfig: + if not os.path.exists(config_file): + raise FileNotFoundError(f"Config file not found: {config_file}") + + c = configparser.ConfigParser() + c.read(config_file) + + stations = dict(c['Stations']) if 'Stations' in c else {} + + def parse_jids(v): + return {j.strip() for j in v.split(',') if j.strip()} if v else set() + + return BotConfig( + jid=c.get('XMPP', 'jid'), + password=c.get('XMPP', 'password'), + resource=c.get('XMPP', 'resource', fallback='RadioBot'), + music_directory=c.get('Radio', 'music_directory', fallback='./music'), + playback_mode=c.get('Radio', 'playback_mode', fallback='random'), + audio_formats=c.get('Radio', 'audio_formats', fallback='flac,opus,ogg,wav,mp3').split(','), + default_volume=c.getint('Radio', 'default_volume', fallback=80), + http_enabled=c.getboolean('Stream', 'http_enabled', fallback=False), + stream_host=c.get('Stream', 'stream_host', fallback='0.0.0.0'), + stream_port=c.getint('Stream', 'stream_port', fallback=8000), + stream_format=c.get('Stream', 'stream_format', fallback='mp3'), + bitrate=c.getint('Stream', 'bitrate', fallback=1), + mount_point=c.get('Stream', 'mount_point', fallback='/radio'), + jingle_enabled=c.getboolean('Jingle', 'enabled', fallback=True), + stun_server=c.get('Jingle', 'stun_server', fallback='stun:stun.l.google.com:19302'), + stations=stations, + radio_room=c.get('MUC', 'radio_room', fallback=''), + radio_room_nick=c.get('MUC', 'radio_room_nick', fallback='RadioBot'), + management_room=c.get('MUC', 'management_room', fallback=''), + management_room_nick=c.get('MUC', 'management_room_nick', fallback='RadioControl'), + control_mode=c.get('Access', 'control_mode', fallback='admins'), + allowed_jids=parse_jids(c.get('Access', 'allowed_jids', fallback='')), + admin_jids=parse_jids(c.get('Access', 'admin_jids', fallback='')), + ytdlp_enabled=c.getboolean('YtDlp', 'enabled', fallback=True), + ytdlp_format=c.get('YtDlp', 'format', fallback='bestaudio'), + ytdlp_max_results=c.getint('YtDlp', 'max_search_results', fallback=10), + ytdlp_max_filesize=c.getint('YtDlp', 'max_filesize', fallback=50), + ytdlp_download_subdir=c.get('YtDlp', 'download_subdir', fallback='downloads'), + omemo_enabled=c.getboolean('OMEMO', 'enabled', fallback=False), + omemo_store_path=c.get('OMEMO', 'store_path', fallback='radio_omemo_store.json') + ) + + +class RadioBot(slixmpp.ClientXMPP): + """ + Core XMPP Bot logic. + + Handles: + - Connection and Authentication + - OMEMO and Plaintext message routing + - Command processing + - Integration with PlaylistManager, AudioStreamer, and JingleWebRTCHandler + - Task management and memory monitoring + """ + + MEMORY_WARNING_THRESHOLD = 400 + MEMORY_CRITICAL_THRESHOLD = 600 + + def __init__(self, config: BotConfig): + super().__init__(f"{config.jid}/{config.resource}", config.password) + self.config = config + self.our_jid = f"{config.jid}/{config.resource}" + + self.omemo_active = config.omemo_enabled and OMEMO_AVAILABLE + + self.streamer = AudioStreamer( + config.stream_host, config.stream_port, config.mount_point, + config.stream_format, config.bitrate + ) if config.http_enabled else None + + self.playlist_manager = PlaylistManager(config.music_directory, config.audio_formats) + self.playlist_manager.set_playback_mode( + PlaybackMode.RANDOM if config.playback_mode == 'random' else PlaybackMode.SEQUENTIAL + ) + + self.station_parser = StationParser() + + self.ytdlp = YtDlpHandler( + os.path.join(config.music_directory, config.ytdlp_download_subdir), + config.ytdlp_format, config.ytdlp_max_filesize, config.ytdlp_max_results + ) if config.ytdlp_enabled else None + + self.jingle = None + if config.jingle_enabled and JINGLE_AVAILABLE: + self.jingle = JingleWebRTCHandler( + stun_server=config.stun_server, + send_transport_info_callback=self._send_transport_info, + on_track_end=self.on_track_end + ) + logger.info("Jingle enabled with WebRTC") + elif config.jingle_enabled: + logger.warning("Jingle enabled but 'aiortc' missing") + + self.current_station = None + self.is_playing_station = False + self._search_results = LRUCache(maxsize=25) + + self._track_transition_lock = asyncio.Lock() + self._last_track_end_time = 0 + + self._is_transitioning = False + + # Rapid skip detection variables + self._skip_count = 0 + self._skip_window_start = 0 + + # Task management set to allow proper cleanup on shutdown + self._managed_tasks: Set[asyncio.Task] = set() + self._gc_task: Optional[asyncio.Task] = None + self._memory_monitor_task: Optional[asyncio.Task] = None + + self.register_plugin('xep_0030') # Service Discovery + self.register_plugin('xep_0045') # MUC + self.register_plugin('xep_0115') # Entity Capabilities + self.register_plugin('xep_0199') # Ping + + if self.omemo_active: + self.register_plugin('xep_0085') + self.register_plugin('xep_0380') + + self.register_plugin( + "xep_0384", + {"json_file_path": config.omemo_store_path}, + module=sys.modules[__name__] + ) + logger.info("OMEMO encryption enabled") + elif config.omemo_enabled: + logger.warning("OMEMO enabled but libraries missing") + + if self.jingle: + for f in [NS_JINGLE, NS_JINGLE_MSG, + "urn:xmpp:jingle:apps:rtp:1", "urn:xmpp:jingle:apps:rtp:audio", + "urn:xmpp:jingle:transports:ice-udp:1", "urn:xmpp:jingle:apps:dtls:0"]: + self['xep_0030'].add_feature(f) + + self.add_event_handler("session_start", self.on_start) + + if self.omemo_active: + self.register_handler(CoroutineCallback( + "DirectMessagesOMEMO", + MatchXPath(f"{{{self.default_ns}}}message[@type='chat']"), + self._handle_direct_message + )) + self.register_handler(CoroutineCallback( + "GroupChatOMEMO", + MatchXPath(f"{{{self.default_ns}}}message[@type='groupchat']"), + self._handle_muc_message + )) + else: + self.add_event_handler("message", self._handle_plain_message) + self.add_event_handler("groupchat_message", self._handle_plain_muc_message) + + # Jingle Message Initiation (XEP-0353) + self.register_handler(Callback( + 'JinglePropose', + MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}propose'), + self._handle_jingle_propose + )) + + self.register_handler(Callback( + 'JingleRetract', + MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}retract'), + self._handle_jingle_retract + )) + + # Standard Jingle IQs + self.register_handler(Callback( + 'JingleIQ', + MatchXPath(f'{{jabber:client}}iq/{{{NS_JINGLE}}}jingle'), + self._handle_jingle_iq + )) + + if self.streamer and not self.jingle: + self.streamer.on_track_end = self.on_track_end + + self.commands = { + 'help': self.cmd_help, + 'play': self.cmd_play, + 'stop': self.cmd_stop, + 'next': self.cmd_next, + 'prev': self.cmd_prev, + 'np': self.cmd_np, + 'station': self.cmd_station, + 'stations': self.cmd_stations, + 'list': self.cmd_list, + 'shuffle': self.cmd_shuffle, + 'search': self.cmd_search, + 'dl': self.cmd_dl, + 'scan': self.cmd_scan, + 'url': self.cmd_url, + 'callers': self.cmd_callers, + 'hangup': self.cmd_hangup, + 'queue': self.cmd_queue, + 'q': self.cmd_queue, + 'addq': self.cmd_add_queue, + 'playnext': self.cmd_play_next, + 'clearq': self.cmd_clear_queue, + 'removeq': self.cmd_remove_queue, + 'shuffleq': self.cmd_shuffle_queue, + 'history': self.cmd_history, + 'mem': self.cmd_memory, + } + + def _create_task(self, coro) -> asyncio.Task: + """Helper to create asyncio tasks and track them for cleanup.""" + task = asyncio.create_task(coro) + self._managed_tasks.add(task) + task.add_done_callback(lambda t: self._managed_tasks.discard(t)) + return task + + def _cleanup_done_tasks(self): + """Removes completed tasks from the tracking set.""" + done_tasks = {t for t in self._managed_tasks if t.done()} + self._managed_tasks -= done_tasks + return len(done_tasks) + + async def on_start(self, event): + """Called when XMPP session is established.""" + logger.info("Bot Session Started") + self.send_presence() + await self.get_roster() + + if self.streamer: + await self.streamer.start() + logger.info(f"HTTP Stream: {self.streamer.stream_url}") + + if self.jingle: + await self.jingle.start_cleanup_task() + + count = await self.playlist_manager.scan_library() + self.playlist_manager.add_library_to_playlist() + logger.info(f"Library loaded: {count} tracks") + + for room, nick in [(self.config.radio_room, self.config.radio_room_nick), + (self.config.management_room, self.config.management_room_nick)]: + if room: + try: + await self.plugin['xep_0045'].join_muc(room, nick) + logger.info(f"Joined MUC: {room}") + except Exception as e: + logger.error(f"MUC Join Failed ({room}): {e}") + + if self.playlist_manager.library: + track = self.playlist_manager.next_track() + if track: + await self._play_track(track) + + self._gc_task = asyncio.create_task(self._periodic_gc()) + self._memory_monitor_task = asyncio.create_task(self._memory_monitor()) + + async def _periodic_gc(self): + """Aggressive garbage collection to handle heavy media object churn.""" + while True: + await asyncio.sleep(120) + + try: + self._search_results.clear_old(keep=10) + + cleaned = self._cleanup_done_tasks() + + gc.collect(generation=0) + gc.collect(generation=1) + gc.collect(generation=2) + + logger.debug(f"GC completed, cleaned {cleaned} tasks, remaining: {len(self._managed_tasks)}") + + except Exception as e: + logger.error(f"GC error: {e}") + + async def _memory_monitor(self): + """Monitors RSS memory usage and triggers emergency cleanup if critical.""" + try: + import resource + has_resource = True + except ImportError: + has_resource = False + logger.warning("resource module not available, memory monitoring limited") + + while True: + await asyncio.sleep(300) + + try: + if has_resource: + usage = resource.getrusage(resource.RUSAGE_SELF) + # Convert to MB (Linux returns KB, macOS returns bytes) + mem_mb = usage.ru_maxrss / 1024 + if sys.platform == 'darwin': + mem_mb = usage.ru_maxrss / (1024 * 1024) + + logger.info(f"Memory usage: {mem_mb:.1f} MB, Tasks: {len(self._managed_tasks)}") + + if mem_mb > self.MEMORY_WARNING_THRESHOLD: + logger.warning(f"High memory usage ({mem_mb:.1f} MB)") + + self._search_results.clear() + self._cleanup_done_tasks() + + gc.collect() + gc.collect() + gc.collect() + + if mem_mb > self.MEMORY_CRITICAL_THRESHOLD: + logger.error(f"Critical memory usage ({mem_mb:.1f} MB), aggressive cleanup") + + # Terminate oldest calls to save memory + if self.jingle and len(self.jingle.sessions) > 5: + oldest_sessions = list(self.jingle.sessions.keys())[:-5] + for sid in oldest_sessions: + await self.jingle.stop_session(sid) + + gc.collect() + + except Exception as e: + logger.error(f"Memory monitor error: {e}") + + def _get_memory_mb(self) -> float: + try: + import resource + usage = resource.getrusage(resource.RUSAGE_SELF) + mem_mb = usage.ru_maxrss / 1024 + if sys.platform == 'darwin': + mem_mb = usage.ru_maxrss / (1024 * 1024) + return mem_mb + except ImportError: + return 0.0 + + async def reply(self, ctx: MessageContext, text: str) -> None: + """Sends a reply matching the context (MUC vs Direct, OMEMO vs Plain).""" + if ctx.is_muc: + msg = self.make_message(mto=ctx.room_jid, mtype='groupchat') + msg["body"] = text + msg.send() + elif ctx.is_encrypted and self.omemo_active: + await self._send_encrypted(ctx.sender, ctx.mtype, text) + else: + await self._send_plain(ctx.sender, ctx.mtype, text) + + async def _send_plain(self, mto: JID, mtype: str, text: str) -> None: + msg = self.make_message(mto=mto, mtype=mtype) + msg["body"] = text + msg.send() + + async def _send_encrypted(self, mto: JID, mtype: str, text: str) -> None: + try: + xep_0384 = self["xep_0384"] + msg = self.make_message(mto=mto, mtype=mtype) + msg["body"] = text + encrypt_for: Set[JID] = {JID(mto)} + messages, errors = await xep_0384.encrypt_message(msg, encrypt_for) + + if errors: + logger.warning(f"OMEMO encryption errors: {errors}") + + for namespace, message in messages.items(): + message["eme"]["namespace"] = namespace + message["eme"]["name"] = self["xep_0380"].mechanisms[namespace] + message.send() + return + + except Exception as e: + logger.error(f"Encryption failed, falling back to plaintext: {e}") + await self._send_plain(mto, mtype, text) + + async def _handle_direct_message(self, stanza: Message) -> None: + """Handles OMEMO-aware direct messages.""" + try: + mfrom = stanza["from"] + mtype = stanza["type"] + + if mtype not in {"chat", "normal"}: + return + + if mfrom.bare == self.boundjid.bare: + return + + # Ignore Jingle signal messages treated as chat + if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}propose') is not None: + return + if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}proceed') is not None: + return + if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}reject') is not None: + return + + xep_0384 = self["xep_0384"] + namespace = xep_0384.is_encrypted(stanza) + + body = None + is_encrypted = False + + if namespace: + try: + decrypted_msg, device_info = await xep_0384.decrypt_message(stanza) + if decrypted_msg.get("body"): + body = decrypted_msg["body"].strip() + is_encrypted = True + except Exception as e: + logger.error(f"Decryption failed from {mfrom}: {e}") + ctx = MessageContext( + sender=mfrom, sender_bare=str(mfrom.bare), + mtype=mtype, is_encrypted=True, is_muc=False + ) + await self.reply(ctx, f"Could not decrypt: {e}") + return + else: + if stanza["body"]: + body = stanza["body"].strip() + + if not body: + return + + ctx = MessageContext( + sender=mfrom, sender_bare=str(mfrom.bare), + mtype=mtype, is_encrypted=is_encrypted, is_muc=False + ) + + await self._process_direct_message(ctx, body) + + except Exception as e: + logger.error(f"Error in direct message handler: {e}", exc_info=True) + + async def _handle_muc_message(self, stanza: Message) -> None: + try: + room_jid = str(stanza['from'].bare) + sender_nick = stanza['mucnick'] + + if sender_nick == self.config.radio_room_nick: + return + if sender_nick == self.config.management_room_nick: + return + + xep_0384 = self["xep_0384"] + namespace = xep_0384.is_encrypted(stanza) + + body = None + + if namespace: + try: + decrypted_msg, device_info = await xep_0384.decrypt_message(stanza) + if decrypted_msg.get("body"): + body = decrypted_msg["body"].strip() + except Exception as e: + logger.error(f"Failed to decrypt MUC message: {e}") + return + else: + if stanza['body']: + body = stanza['body'].strip() + + if not body: + return + + ctx = MessageContext( + sender=stanza['from'], sender_bare=str(stanza['from'].bare), + mtype='groupchat', is_encrypted=False, is_muc=True, + room_jid=room_jid, nick=sender_nick + ) + + if body.startswith('!'): + await self._run_command(ctx, body[1:]) + + except Exception as e: + logger.error(f"Error in MUC message handler: {e}", exc_info=True) + + def _handle_plain_message(self, msg): + if msg['type'] not in ('chat', 'normal'): + return + if msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose') is not None: + return + + body = msg['body'].strip() if msg['body'] else '' + if body: + ctx = MessageContext( + sender=msg['from'], sender_bare=str(msg['from'].bare), + mtype=msg['type'], is_encrypted=False, is_muc=False + ) + self._create_task(self._process_direct_message(ctx, body)) + + async def _handle_plain_muc_message(self, msg): + if msg['mucnick'] == self.config.radio_room_nick: + return + if msg['mucnick'] == self.config.management_room_nick: + return + + body = msg['body'].strip() if msg['body'] else '' + if body and body.startswith('!'): + ctx = MessageContext( + sender=msg['from'], sender_bare=str(msg['from'].bare), + mtype='groupchat', is_encrypted=False, is_muc=True, + room_jid=str(msg['from'].bare), nick=msg['mucnick'] + ) + await self._run_command(ctx, body[1:]) + + async def _process_direct_message(self, ctx: MessageContext, body: str): + if body.startswith('!'): + await self._run_command(ctx, body[1:]) + elif body.lower() in ('radio', 'help'): + await self.reply(ctx, "XMPP Radio Bot\nType !help for commands.\nCall me to listen!") + + async def _run_command(self, ctx: MessageContext, cmdline: str): + """Parses and executes bot commands.""" + parts = cmdline.split(None, 1) + cmd = parts[0].lower() + arg = parts[1] if len(parts) > 1 else "" + + needs_auth = {'stop', 'next', 'station', 'dl', 'scan', 'hangup', 'clearq'} + if cmd in needs_auth and not self._check_access(ctx.sender_bare): + await self.reply(ctx, "Permission Denied") + return + + if cmd in self.commands: + try: + await self.commands[cmd](ctx, arg) + except Exception as e: + logger.error(f"Command error: {e}", exc_info=True) + await self.reply(ctx, f"Error: {e}") + + async def _send_transport_info(self, session: JingleSession, candidate: Dict): + """Sends an ICE candidate to the peer via Jingle transport-info.""" + iq = self.make_iq_set(ito=session.peer_jid) + + jingle = ET.Element('jingle', { + 'xmlns': NS_JINGLE, + 'action': 'transport-info', + 'sid': session.sid + }) + + content = ET.SubElement(jingle, 'content', { + 'creator': 'initiator', + 'name': '0' + }) + + transport = ET.SubElement(content, 'transport', { + 'xmlns': 'urn:xmpp:jingle:transports:ice-udp:1' + }) + + cand_elem = ET.SubElement(transport, 'candidate', { + 'component': str(candidate.get('component', '1')), + 'foundation': str(candidate.get('foundation', '1')), + 'generation': str(candidate.get('generation', '0')), + 'id': candidate.get('id', 'cand-0'), + 'ip': candidate.get('ip', ''), + 'port': str(candidate.get('port', '0')), + 'priority': str(candidate.get('priority', '1')), + 'protocol': candidate.get('protocol', 'udp'), + 'type': candidate.get('type', 'host') + }) + + if 'rel-addr' in candidate: + cand_elem.set('rel-addr', candidate['rel-addr']) + if 'rel-port' in candidate: + cand_elem.set('rel-port', str(candidate['rel-port'])) + + iq.xml.append(jingle) + + try: + iq.send() + logger.debug(f"Sent transport-info: {candidate.get('type')} {candidate.get('ip')}") + except Exception as e: + logger.error(f"Failed to send transport-info: {e}") + + def _handle_jingle_propose(self, msg): + """Handles incoming XEP-0353 Jingle Message initiation.""" + sender = str(msg['from']) + propose = msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose') + sid = propose.get('id') + + logger.info(f"Call from {sender}, ID={sid}") + self._send_proceed(sender, sid) + + if self.jingle: + self.jingle.register_proposed_session(sid, sender) + + def _handle_jingle_retract(self, msg): + retract = msg.xml.find(f'{{{NS_JINGLE_MSG}}}retract') + sid = retract.get('id') + logger.info(f"Call Retracted: {sid}") + if self.jingle: + self._create_task(self.jingle.stop_session(sid)) + + def _send_proceed(self, to_jid: str, sid: str): + msg = self.make_message(mto=to_jid, mtype='chat') + proceed = ET.Element(f'{{{NS_JINGLE_MSG}}}proceed', {'id': sid}) + msg.xml.append(proceed) + msg.xml.append(ET.Element(f'{{{NS_HINTS}}}store')) + msg.send() + logger.info(f"Sent PROCEED to {to_jid}") + + def _handle_jingle_iq(self, iq): + if not self.jingle: + iq.reply().error().set_payload(ET.Element('service-unavailable')).send() + return + self._create_task(self._process_jingle_iq(iq)) + + async def _process_jingle_iq(self, iq): + jingle = iq.xml.find(f'{{{NS_JINGLE}}}jingle') + if jingle is None: + return + + action = jingle.get('action') + sid = jingle.get('sid') + peer = str(iq['from']) + + logger.info(f"Jingle IQ: {action} (sid={sid})") + + try: + if action == 'session-initiate': + source = self._get_audio_source() + if source: + self.jingle.set_audio_source(source) + + session, accept_xml = await self.jingle.handle_session_initiate(jingle, peer, self.our_jid) + + iq.reply().send() + + accept_iq = self.make_iq_set(ito=peer) + accept_iq.xml.append(ET.fromstring(accept_xml)) + accept_iq.send() + logger.info("Sent session-accept") + + elif action == 'transport-info': + session = self.jingle.get_session(sid) + if session: + for content in jingle: + for child in content: + for cand in child: + if 'candidate' in cand.tag: + await self.jingle.add_ice_candidate(session, cand) + iq.reply().send() + + elif action == 'session-terminate': + logger.info(f"Peer hung up: {sid}") + await self.jingle.stop_session(sid) + iq.reply().send() + + else: + iq.reply().send() + + except Exception as e: + logger.error(f"Jingle Error: {e}", exc_info=True) + try: + iq.reply().send() + except: + pass + + def _check_access(self, jid): + jid = jid.split('/')[0] + if jid in self.config.admin_jids: + return True + if self.config.control_mode == 'everyone': + return True + return jid in self.config.allowed_jids + + async def _play_track(self, track: Track): + """Starts playback for both HTTP and Jingle subsystems.""" + async with self._track_transition_lock: + self.is_playing_station = False + self.current_station = None + + logger.info(f"Playing track: {track.display_name}") + + try: + if self.streamer: + await self.streamer.play_file( + track.path, + track.display_name, + track.artist + ) + + if self.jingle: + # Slight delay ensures files handle gets released if necessary + await asyncio.sleep(0.1) + self.jingle.set_audio_source(track.path, force_restart=True) + + except Exception as e: + logger.error(f"Error playing track: {e}", exc_info=True) + finally: + gc.collect(generation=0) + + async def on_track_end(self): + """Callback for when a track finishes. Handles auto-advance logic.""" + current_time = time.time() + + # Detect rapid skipping loop (usually corrupt files) + if current_time - self._skip_window_start > 10.0: + self._skip_count = 0 + self._skip_window_start = current_time + + self._skip_count += 1 + if self._skip_count > 5: + logger.error("Detected rapid skipping loop. Stopping playback.") + await self.cmd_stop(None, "force") + return + + if current_time - self._last_track_end_time < 2.0: + logger.debug("Ignoring duplicate track end callback") + return + + if self._is_transitioning: + logger.debug("Already transitioning, skipping") + return + + try: + self._is_transitioning = True + self._last_track_end_time = current_time + + if self.is_playing_station: + logger.debug("Playing station, not auto-transitioning") + return + + logger.info("Track ended, getting next track...") + + t = self.playlist_manager.next_track() + if t: + await asyncio.sleep(0.5) + await self._play_track(t) + logger.info(f"Auto-playing: {t.display_name}") + else: + logger.info("No more tracks in queue/playlist") + + except Exception as e: + logger.error(f"Error in track transition: {e}", exc_info=True) + finally: + self._is_transitioning = False + gc.collect(generation=0) + + def _get_audio_source(self): + if self.streamer and self.streamer.current_source: + return self.streamer.current_source + t = self.playlist_manager.get_current_track() + return t.path if t else None + + def _get_np_text(self): + if self.is_playing_station: + return f"Station: {self.current_station}" + + t = self.playlist_manager.get_current_track() + if t: + return t.display_name + + if self.streamer and self.streamer.state.is_playing: + return self.streamer.state.title + + return "Nothing" + + async def cmd_help(self, ctx: MessageContext, arg: str): + help_text = ( + "Radio Bot Commands:\n\n" + "Playback:\n" + "!play [query] - Play/Search music\n" + "!stop / !next / !prev - Control\n" + "!np - Now playing\n" + "!shuffle - Shuffle playlist\n\n" + "Queue:\n" + "!queue (!q) - View queue\n" + "!addq - Add to queue\n" + "!playnext - Play next\n" + "!clearq - Clear queue\n" + "!removeq <#> - Remove from queue\n" + "!shuffleq - Shuffle queue\n" + "!history - Recent tracks\n\n" + "Download:\n" + "!search - Search YouTube\n" + "!dl - Download\n\n" + "Radio:\n" + "!station - Play station\n" + "!stations - List stations\n" + "!list [page] - Show playlist\n" + "!scan - Rescan library\n" + "!callers - Active calls\n\n" + "Unnecessary commands DO NOT USE:\n" + "!url - Returns a local URL so nobody needs it\n" + "!mem - Shows memory usage but it crashes anyways\n" + "!hangup - Just use your clients disconnect button\n\n" + "Version Info and About:\n" + "XMPP Radio Bot v0.1 (codename: Absolute Solver)" + ) + await self.reply(ctx, help_text) + + async def cmd_play(self, ctx: MessageContext, arg: str): + if arg: + res = self.playlist_manager.search_library(arg) + if res: + await self._play_track(res[0]) + await self.reply(ctx, f"Playing: {res[0].display_name}") + else: + await self.reply(ctx, "Not found in library") + else: + t = self.playlist_manager.next_track() + if t: + await self._play_track(t) + await self.reply(ctx, f"Playing: {t.display_name}") + + async def cmd_stop(self, ctx: MessageContext, arg: str): + if self.streamer: + await self.streamer.stop_playback() + + if self.jingle: + await self.jingle.stop_playback() + + async with self._track_transition_lock: + self.is_playing_station = False + self.current_station = None + + await self.reply(ctx, "Stopped") + + async def cmd_next(self, ctx: MessageContext, arg: str): + t = self.playlist_manager.next_track() + if t: + await self._play_track(t) + await self.reply(ctx, f"Next: {t.display_name}") + else: + await self.reply(ctx, "No next track") + + async def cmd_prev(self, ctx: MessageContext, arg: str): + t = self.playlist_manager.previous_track() + if t: + await self._play_track(t) + await self.reply(ctx, f"Previous: {t.display_name}") + else: + await self.reply(ctx, "No previous track") + + async def cmd_np(self, ctx: MessageContext, arg: str): + np_text = self._get_np_text() + queue_size = self.playlist_manager.queue.size + queue_info = f"\nQueue: {queue_size} tracks" if queue_size > 0 else "" + await self.reply(ctx, f"Now Playing: {np_text}{queue_info}") + + async def cmd_station(self, ctx: MessageContext, arg: str): + if not arg or arg not in self.config.stations: + stations = ', '.join(self.config.stations.keys()) or "None configured" + await self.reply(ctx, f"Stations: {stations}") + return + + url, _ = await self.station_parser.resolve_stream_url(self.config.stations[arg]) + if url: + try: + async with self._track_transition_lock: + self.is_playing_station = True + self.current_station = arg + + if self.streamer: + await self.streamer.play_url(url, arg) + + if self.jingle: + await asyncio.sleep(0.1) + self.jingle.set_audio_source(url, force_restart=True) + + await self.reply(ctx, f"Playing: {arg}") + except Exception as e: + logger.error(f"Error playing station: {e}") + await self.reply(ctx, f"Failed: {e}") + else: + await self.reply(ctx, "Failed to connect to station") + + async def cmd_stations(self, ctx: MessageContext, arg: str): + stations = ', '.join(self.config.stations.keys()) or "None configured" + await self.reply(ctx, f"Stations: {stations}") + + async def cmd_list(self, ctx: MessageContext, arg: str): + page_size = 15 + + try: + page = int(arg) if arg.strip() else 1 + if page < 1: + page = 1 + except ValueError: + page = 1 + + start_index = (page - 1) * page_size + tracks = self.playlist_manager.list_tracks(start_index, page_size) + + if tracks: + text = "\n".join([f"{idx+1}. {track.display_name}" for idx, track in tracks]) + await self.reply(ctx, f"Playlist (Page {page}):\n{text}\n\nType '!list {page+1}' for more.") + else: + if page > 1: + await self.reply(ctx, f"No tracks on page {page}.") + else: + await self.reply(ctx, "Playlist is empty") + + async def cmd_shuffle(self, ctx: MessageContext, arg: str): + if self.playlist_manager.current_playlist: + self.playlist_manager.current_playlist.shuffle() + await self.reply(ctx, "Playlist shuffled") + + async def cmd_search(self, ctx: MessageContext, arg: str): + if not self.ytdlp: + await self.reply(ctx, "yt-dlp is disabled") + return + if not arg: + await self.reply(ctx, "Usage: !search ") + return + + await self.reply(ctx, f"Searching: {arg}...") + res = await self.ytdlp.search(arg) + if res: + self._search_results[ctx.sender_bare] = res + text = "\n".join([f"{i+1}. {r.title} [{r.duration_str}]" for i, r in enumerate(res[:5])]) + await self.reply(ctx, f"Results:\n{text}\n\nUse !dl # to download") + else: + await self.reply(ctx, "No results found") + + async def cmd_dl(self, ctx: MessageContext, arg: str): + if not self.ytdlp: + await self.reply(ctx, "yt-dlp is disabled") + return + if not arg: + await self.reply(ctx, "Usage: !dl ") + return + + target_url = None + title = "Unknown" + + if arg.startswith(('http', 'www')): + target_url = arg + title = "URL" + elif arg.startswith('#') and ctx.sender_bare in self._search_results: + try: + idx = int(arg[1:]) - 1 + results = self._search_results[ctx.sender_bare] + if 0 <= idx < len(results): + res = results[idx] + target_url = f"https://youtube.com/watch?v={res.id}" + title = res.title + except ValueError: + pass + + if not target_url: + await self.reply(ctx, f"Searching: {arg}...") + res = await self.ytdlp.search(arg) + if res: + target_url = f"https://youtube.com/watch?v={res[0].id}" + title = res[0].title + await self.reply(ctx, f"Found: {title}") + else: + await self.reply(ctx, "No results found") + return + + await self.reply(ctx, f"Downloading: {title}...") + path = await self.ytdlp.download(target_url) + + if path: + track = await self.playlist_manager._create_track(path) + if track: + self.playlist_manager.library.append(track) + self.playlist_manager.add_to_playlist(track) + await self.reply(ctx, f"Downloaded: {track.display_name}") + else: + await self.reply(ctx, "Downloaded file") + else: + await self.reply(ctx, "Download failed") + + async def cmd_scan(self, ctx: MessageContext, arg: str): + c = await self.playlist_manager.scan_library() + self.playlist_manager.add_library_to_playlist() + await self.reply(ctx, f"Found {c} tracks") + + async def cmd_url(self, ctx: MessageContext, arg: str): + if self.streamer: + await self.reply(ctx, f"Stream: {self.streamer.stream_url}") + else: + await self.reply(ctx, "HTTP streaming is disabled") + + async def cmd_callers(self, ctx: MessageContext, arg: str): + if self.jingle: + sessions = self.jingle.get_active_sessions() + await self.reply(ctx, f"Active callers: {len(sessions)}") + else: + await self.reply(ctx, "Jingle/calls are disabled") + + async def cmd_hangup(self, ctx: MessageContext, arg: str): + if self.jingle: + count = len(self.jingle.sessions) + for s in list(self.jingle.sessions.values()): + await self.jingle.stop_session(s.sid) + await self.reply(ctx, f"Ended {count} call(s)") + else: + await self.reply(ctx, "Jingle/calls are disabled") + + async def cmd_memory(self, ctx: MessageContext, arg: str): + mem_mb = self._get_memory_mb() + tasks = len(self._managed_tasks) + sessions = len(self.jingle.sessions) if self.jingle else 0 + cache_size = len(self._search_results) + + status = f"Memory Status:\n• Memory: {mem_mb:.1f} MB\n• Managed Tasks: {tasks}\n• Jingle Sessions: {sessions}\n• Search Cache: {cache_size}" + + if self.streamer: + status += f"\n• HTTP Listeners: {self.streamer.listener_count}" + + await self.reply(ctx, status) + + async def cmd_queue(self, ctx: MessageContext, arg: str): + queue = self.playlist_manager.get_queue(10) + current = self.playlist_manager.queue.current + + lines = [] + if current: + lines.append(f"Now: {current.display_name}") + + if queue: + lines.append("\nUp Next:") + for i, track in enumerate(queue, 1): + lines.append(f" {i}. {track.display_name}") + else: + lines.append("\nQueue is empty") + + lines.append(f"\n{self.playlist_manager.queue.size} in queue") + await self.reply(ctx, "\n".join(lines)) + + async def cmd_add_queue(self, ctx: MessageContext, arg: str): + if not arg: + await self.reply(ctx, "Usage: !addq ") + return + + results = self.playlist_manager.search_library(arg, limit=1) + if results: + track = results[0] + if self.playlist_manager.add_to_queue(track): + await self.reply(ctx, f"Added to queue: {track.display_name}") + else: + await self.reply(ctx, "Queue is full") + else: + await self.reply(ctx, "Not found in library. Try !dl first.") + + async def cmd_play_next(self, ctx: MessageContext, arg: str): + if not arg: + await self.reply(ctx, "Usage: !playnext ") + return + + results = self.playlist_manager.search_library(arg, limit=1) + if results: + track = results[0] + if self.playlist_manager.add_next_to_queue(track): + await self.reply(ctx, f"Playing next: {track.display_name}") + else: + await self.reply(ctx, "Queue is full") + else: + await self.reply(ctx, "Not found in library") + + async def cmd_clear_queue(self, ctx: MessageContext, arg: str): + self.playlist_manager.clear_queue() + await self.reply(ctx, "Queue cleared") + + async def cmd_remove_queue(self, ctx: MessageContext, arg: str): + if not arg: + await self.reply(ctx, "Usage: !removeq ") + return + + try: + idx = int(arg) - 1 + track = self.playlist_manager.remove_from_queue(idx) + if track: + await self.reply(ctx, f"Removed: {track.display_name}") + else: + await self.reply(ctx, "Invalid index") + except ValueError: + await self.reply(ctx, "Invalid number") + + async def cmd_shuffle_queue(self, ctx: MessageContext, arg: str): + self.playlist_manager.shuffle_queue() + await self.reply(ctx, "Queue shuffled") + + async def cmd_history(self, ctx: MessageContext, arg: str): + history = self.playlist_manager.queue.get_history(10) + if history: + lines = ["Recently Played:"] + for i, track in enumerate(reversed(history), 1): + lines.append(f" {i}. {track.display_name}") + await self.reply(ctx, "\n".join(lines)) + else: + await self.reply(ctx, "No history yet") + + +async def main(): + """Main execution function; handles CLI args and shutdown signals.""" + import argparse + p = argparse.ArgumentParser(description="XMPP Radio Bot") + p.add_argument('-c', '--config', default='config.ini', help='Config file path') + p.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging') + args = p.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + config = load_config(args.config) + bot = RadioBot(config) + bot.connect() + + try: + await bot.disconnected + except KeyboardInterrupt: + logger.info("Shutting down...") + finally: + # Graceful cleanup of background tasks + if bot._gc_task: + bot._gc_task.cancel() + try: + await asyncio.wait_for(bot._gc_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + + if bot._memory_monitor_task: + bot._memory_monitor_task.cancel() + try: + await asyncio.wait_for(bot._memory_monitor_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + + for task in list(bot._managed_tasks): + task.cancel() + + if bot._managed_tasks: + await asyncio.gather(*bot._managed_tasks, return_exceptions=True) + bot._managed_tasks.clear() + + if bot.streamer: + await bot.streamer.stop() + + if bot.jingle: + await bot.jingle.end_all_sessions() + + bot._search_results.clear() + + gc.collect() + gc.collect() + gc.collect() + + logger.info("Bot stopped cleanly") + + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file