#!/usr/bin/env python3 """ Main Application Entry Point. This module initializes the XMPP bot, loads configuration, manages plugins (OMEMO, MUC, Jingle), and orchestrates the interaction between the audio streamer, playlist manager, and XMPP interface. """ import os import sys import asyncio import configparser import logging import json import gc import time import weakref from pathlib import Path from typing import Optional, Dict, List, Set, Any, FrozenSet from dataclasses import dataclass from collections import OrderedDict import xml.etree.ElementTree as ET import slixmpp from slixmpp.jid import JID from slixmpp.stanza import Iq, Message from slixmpp.xmlstream.handler import Callback, CoroutineCallback from slixmpp.xmlstream.matcher import MatchXPath from slixmpp.plugins import register_plugin from audio_streamer import AudioStreamer from playlist_manager import PlaylistManager, PlaybackMode, Track from station_parser import StationParser from ytdlp_handler import YtDlpHandler, SearchResult try: from jingle_webrtc import JingleWebRTCHandler, JingleSession, CallState, AIORTC_AVAILABLE JINGLE_AVAILABLE = AIORTC_AVAILABLE except ImportError: JINGLE_AVAILABLE = False JingleWebRTCHandler = None try: from omemo.storage import Just, Maybe, Nothing, Storage from omemo.types import DeviceInformation, JSONType from slixmpp_omemo import TrustLevel, XEP_0384 OMEMO_AVAILABLE = True except ImportError: OMEMO_AVAILABLE = False logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s' ) logger = logging.getLogger(__name__) NS_JINGLE = "urn:xmpp:jingle:1" NS_JINGLE_MSG = "urn:xmpp:jingle-message:0" NS_HINTS = "urn:xmpp:hints" class LRUCache(OrderedDict): """Simple LRU Cache implementation for storing search results.""" def __init__(self, maxsize=25): super().__init__() self.maxsize = maxsize def __setitem__(self, key, value): if key in self: self.move_to_end(key) super().__setitem__(key, value) if len(self) > self.maxsize: oldest = next(iter(self)) del self[oldest] def get(self, key, default=None): if key in self: self.move_to_end(key) return self[key] return default def clear_old(self, keep=10): while len(self) > keep: oldest = next(iter(self)) del self[oldest] if OMEMO_AVAILABLE: class OMEMOStorageImpl(Storage): """File-based storage backend for OMEMO keys.""" def __init__(self, json_file_path: Path) -> None: super().__init__() self.__json_file_path = json_file_path self.__data: Dict[str, Any] = {} try: with open(self.__json_file_path, encoding="utf8") as f: self.__data = json.load(f) except Exception: pass async def _load(self, key: str) -> Maybe[Any]: if key in self.__data: return Just(self.__data[key]) return Nothing() async def _store(self, key: str, value: Any) -> None: self.__data[key] = value with open(self.__json_file_path, "w", encoding="utf8") as f: json.dump(self.__data, f) async def _delete(self, key: str) -> None: self.__data.pop(key, None) with open(self.__json_file_path, "w", encoding="utf8") as f: json.dump(self.__data, f) class XEP_0384Impl(XEP_0384): """OMEMO Plugin Configuration.""" default_config = { "fallback_message": "This message is OMEMO encrypted.", "json_file_path": None } def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.__storage: Storage def plugin_init(self) -> None: if not self.json_file_path: raise Exception("JSON file path not specified for OMEMO storage.") self.__storage = OMEMOStorageImpl(Path(self.json_file_path)) super().plugin_init() @property def storage(self) -> Storage: return self.__storage @property def _btbv_enabled(self) -> bool: return True async def _devices_blindly_trusted( self, blindly_trusted: FrozenSet[DeviceInformation], identifier: Optional[str] ) -> None: pass async def _prompt_manual_trust( self, manually_trusted: FrozenSet[DeviceInformation], identifier: Optional[str] ) -> None: session_manager = await self.get_session_manager() for device in manually_trusted: await session_manager.set_trust( device.bare_jid, device.identity_key, TrustLevel.TRUSTED.value ) register_plugin(XEP_0384Impl) @dataclass(slots=True) class MessageContext: """Encapsulates context about an incoming message for command handling.""" sender: JID sender_bare: str mtype: str is_encrypted: bool is_muc: bool room_jid: Optional[str] = None nick: Optional[str] = None @dataclass class BotConfig: jid: str password: str resource: str music_directory: str playback_mode: str audio_formats: List[str] default_volume: int http_enabled: bool stream_host: str stream_port: int stream_format: str bitrate: int mount_point: str jingle_enabled: bool stun_server: str stations: Dict[str, str] radio_room: str radio_room_nick: str management_room: str management_room_nick: str control_mode: str allowed_jids: Set[str] admin_jids: Set[str] ytdlp_enabled: bool ytdlp_format: str ytdlp_max_results: int ytdlp_max_filesize: int ytdlp_download_subdir: str omemo_enabled: bool omemo_store_path: str def load_config(config_file: str = "config.ini") -> BotConfig: if not os.path.exists(config_file): raise FileNotFoundError(f"Config file not found: {config_file}") c = configparser.ConfigParser() c.read(config_file) stations = dict(c['Stations']) if 'Stations' in c else {} def parse_jids(v): return {j.strip() for j in v.split(',') if j.strip()} if v else set() return BotConfig( jid=c.get('XMPP', 'jid'), password=c.get('XMPP', 'password'), resource=c.get('XMPP', 'resource', fallback='RadioBot'), music_directory=c.get('Radio', 'music_directory', fallback='./music'), playback_mode=c.get('Radio', 'playback_mode', fallback='random'), audio_formats=c.get('Radio', 'audio_formats', fallback='flac,opus,ogg,wav,mp3').split(','), default_volume=c.getint('Radio', 'default_volume', fallback=80), http_enabled=c.getboolean('Stream', 'http_enabled', fallback=False), stream_host=c.get('Stream', 'stream_host', fallback='0.0.0.0'), stream_port=c.getint('Stream', 'stream_port', fallback=8000), stream_format=c.get('Stream', 'stream_format', fallback='mp3'), bitrate=c.getint('Stream', 'bitrate', fallback=1), mount_point=c.get('Stream', 'mount_point', fallback='/radio'), jingle_enabled=c.getboolean('Jingle', 'enabled', fallback=True), stun_server=c.get('Jingle', 'stun_server', fallback='stun:stun.l.google.com:19302'), stations=stations, radio_room=c.get('MUC', 'radio_room', fallback=''), radio_room_nick=c.get('MUC', 'radio_room_nick', fallback='RadioBot'), management_room=c.get('MUC', 'management_room', fallback=''), management_room_nick=c.get('MUC', 'management_room_nick', fallback='RadioControl'), control_mode=c.get('Access', 'control_mode', fallback='admins'), allowed_jids=parse_jids(c.get('Access', 'allowed_jids', fallback='')), admin_jids=parse_jids(c.get('Access', 'admin_jids', fallback='')), ytdlp_enabled=c.getboolean('YtDlp', 'enabled', fallback=True), ytdlp_format=c.get('YtDlp', 'format', fallback='bestaudio'), ytdlp_max_results=c.getint('YtDlp', 'max_search_results', fallback=10), ytdlp_max_filesize=c.getint('YtDlp', 'max_filesize', fallback=50), ytdlp_download_subdir=c.get('YtDlp', 'download_subdir', fallback='downloads'), omemo_enabled=c.getboolean('OMEMO', 'enabled', fallback=False), omemo_store_path=c.get('OMEMO', 'store_path', fallback='radio_omemo_store.json') ) class RadioBot(slixmpp.ClientXMPP): """ Core XMPP Bot logic. Handles: - Connection and Authentication - OMEMO and Plaintext message routing - Command processing - Integration with PlaylistManager, AudioStreamer, and JingleWebRTCHandler - Task management and memory monitoring """ MEMORY_WARNING_THRESHOLD = 400 MEMORY_CRITICAL_THRESHOLD = 600 def __init__(self, config: BotConfig): super().__init__(f"{config.jid}/{config.resource}", config.password) self.config = config self.our_jid = f"{config.jid}/{config.resource}" self.omemo_active = config.omemo_enabled and OMEMO_AVAILABLE self.streamer = AudioStreamer( config.stream_host, config.stream_port, config.mount_point, config.stream_format, config.bitrate ) if config.http_enabled else None self.playlist_manager = PlaylistManager(config.music_directory, config.audio_formats) self.playlist_manager.set_playback_mode( PlaybackMode.RANDOM if config.playback_mode == 'random' else PlaybackMode.SEQUENTIAL ) self.station_parser = StationParser() self.ytdlp = YtDlpHandler( os.path.join(config.music_directory, config.ytdlp_download_subdir), config.ytdlp_format, config.ytdlp_max_filesize, config.ytdlp_max_results ) if config.ytdlp_enabled else None self.jingle = None if config.jingle_enabled and JINGLE_AVAILABLE: self.jingle = JingleWebRTCHandler( stun_server=config.stun_server, send_transport_info_callback=self._send_transport_info, on_track_end=self.on_track_end ) logger.info("Jingle enabled with WebRTC") elif config.jingle_enabled: logger.warning("Jingle enabled but 'aiortc' missing") self.current_station = None self.is_playing_station = False self._search_results = LRUCache(maxsize=25) self._track_transition_lock = asyncio.Lock() self._last_track_end_time = 0 self._is_transitioning = False # Rapid skip detection variables self._skip_count = 0 self._skip_window_start = 0 # Task management set to allow proper cleanup on shutdown self._managed_tasks: Set[asyncio.Task] = set() self._gc_task: Optional[asyncio.Task] = None self._memory_monitor_task: Optional[asyncio.Task] = None self.register_plugin('xep_0030') # Service Discovery self.register_plugin('xep_0045') # MUC self.register_plugin('xep_0115') # Entity Capabilities self.register_plugin('xep_0199') # Ping if self.omemo_active: self.register_plugin('xep_0085') self.register_plugin('xep_0380') self.register_plugin( "xep_0384", {"json_file_path": config.omemo_store_path}, module=sys.modules[__name__] ) logger.info("OMEMO encryption enabled") elif config.omemo_enabled: logger.warning("OMEMO enabled but libraries missing") if self.jingle: for f in [NS_JINGLE, NS_JINGLE_MSG, "urn:xmpp:jingle:apps:rtp:1", "urn:xmpp:jingle:apps:rtp:audio", "urn:xmpp:jingle:transports:ice-udp:1", "urn:xmpp:jingle:apps:dtls:0"]: self['xep_0030'].add_feature(f) self.add_event_handler("session_start", self.on_start) if self.omemo_active: self.register_handler(CoroutineCallback( "DirectMessagesOMEMO", MatchXPath(f"{{{self.default_ns}}}message[@type='chat']"), self._handle_direct_message )) self.register_handler(CoroutineCallback( "GroupChatOMEMO", MatchXPath(f"{{{self.default_ns}}}message[@type='groupchat']"), self._handle_muc_message )) else: self.add_event_handler("message", self._handle_plain_message) self.add_event_handler("groupchat_message", self._handle_plain_muc_message) # Jingle Message Initiation (XEP-0353) self.register_handler(Callback( 'JinglePropose', MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}propose'), self._handle_jingle_propose )) self.register_handler(Callback( 'JingleRetract', MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}retract'), self._handle_jingle_retract )) # Standard Jingle IQs self.register_handler(Callback( 'JingleIQ', MatchXPath(f'{{jabber:client}}iq/{{{NS_JINGLE}}}jingle'), self._handle_jingle_iq )) if self.streamer and not self.jingle: self.streamer.on_track_end = self.on_track_end self.commands = { 'help': self.cmd_help, 'play': self.cmd_play, 'stop': self.cmd_stop, 'next': self.cmd_next, 'prev': self.cmd_prev, 'np': self.cmd_np, 'station': self.cmd_station, 'stations': self.cmd_stations, 'list': self.cmd_list, 'shuffle': self.cmd_shuffle, 'search': self.cmd_search, 'dl': self.cmd_dl, 'scan': self.cmd_scan, 'url': self.cmd_url, 'callers': self.cmd_callers, 'hangup': self.cmd_hangup, 'queue': self.cmd_queue, 'q': self.cmd_queue, 'addq': self.cmd_add_queue, 'playnext': self.cmd_play_next, 'clearq': self.cmd_clear_queue, 'removeq': self.cmd_remove_queue, 'shuffleq': self.cmd_shuffle_queue, 'history': self.cmd_history, 'mem': self.cmd_memory, } def _create_task(self, coro) -> asyncio.Task: """Helper to create asyncio tasks and track them for cleanup.""" task = asyncio.create_task(coro) self._managed_tasks.add(task) task.add_done_callback(lambda t: self._managed_tasks.discard(t)) return task def _cleanup_done_tasks(self): """Removes completed tasks from the tracking set.""" done_tasks = {t for t in self._managed_tasks if t.done()} self._managed_tasks -= done_tasks return len(done_tasks) async def on_start(self, event): """Called when XMPP session is established.""" logger.info("Bot Session Started") self.send_presence() await self.get_roster() if self.streamer: await self.streamer.start() logger.info(f"HTTP Stream: {self.streamer.stream_url}") if self.jingle: await self.jingle.start_cleanup_task() count = await self.playlist_manager.scan_library() self.playlist_manager.add_library_to_playlist() logger.info(f"Library loaded: {count} tracks") for room, nick in [(self.config.radio_room, self.config.radio_room_nick), (self.config.management_room, self.config.management_room_nick)]: if room: try: await self.plugin['xep_0045'].join_muc(room, nick) logger.info(f"Joined MUC: {room}") except Exception as e: logger.error(f"MUC Join Failed ({room}): {e}") if self.playlist_manager.library: track = self.playlist_manager.next_track() if track: await self._play_track(track) self._gc_task = asyncio.create_task(self._periodic_gc()) self._memory_monitor_task = asyncio.create_task(self._memory_monitor()) async def _periodic_gc(self): """Aggressive garbage collection to handle heavy media object churn.""" while True: await asyncio.sleep(120) try: self._search_results.clear_old(keep=10) cleaned = self._cleanup_done_tasks() gc.collect(generation=0) gc.collect(generation=1) gc.collect(generation=2) logger.debug(f"GC completed, cleaned {cleaned} tasks, remaining: {len(self._managed_tasks)}") except Exception as e: logger.error(f"GC error: {e}") async def _memory_monitor(self): """Monitors RSS memory usage and triggers emergency cleanup if critical.""" try: import resource has_resource = True except ImportError: has_resource = False logger.warning("resource module not available, memory monitoring limited") while True: await asyncio.sleep(300) try: if has_resource: usage = resource.getrusage(resource.RUSAGE_SELF) # Convert to MB (Linux returns KB, macOS returns bytes) mem_mb = usage.ru_maxrss / 1024 if sys.platform == 'darwin': mem_mb = usage.ru_maxrss / (1024 * 1024) logger.info(f"Memory usage: {mem_mb:.1f} MB, Tasks: {len(self._managed_tasks)}") if mem_mb > self.MEMORY_WARNING_THRESHOLD: logger.warning(f"High memory usage ({mem_mb:.1f} MB)") self._search_results.clear() self._cleanup_done_tasks() gc.collect() gc.collect() gc.collect() if mem_mb > self.MEMORY_CRITICAL_THRESHOLD: logger.error(f"Critical memory usage ({mem_mb:.1f} MB), aggressive cleanup") # Terminate oldest calls to save memory if self.jingle and len(self.jingle.sessions) > 5: oldest_sessions = list(self.jingle.sessions.keys())[:-5] for sid in oldest_sessions: await self.jingle.stop_session(sid) gc.collect() except Exception as e: logger.error(f"Memory monitor error: {e}") def _get_memory_mb(self) -> float: try: import resource usage = resource.getrusage(resource.RUSAGE_SELF) mem_mb = usage.ru_maxrss / 1024 if sys.platform == 'darwin': mem_mb = usage.ru_maxrss / (1024 * 1024) return mem_mb except ImportError: return 0.0 async def reply(self, ctx: MessageContext, text: str) -> None: """Sends a reply matching the context (MUC vs Direct, OMEMO vs Plain).""" if ctx.is_muc: msg = self.make_message(mto=ctx.room_jid, mtype='groupchat') msg["body"] = text msg.send() elif ctx.is_encrypted and self.omemo_active: await self._send_encrypted(ctx.sender, ctx.mtype, text) else: await self._send_plain(ctx.sender, ctx.mtype, text) async def _send_plain(self, mto: JID, mtype: str, text: str) -> None: msg = self.make_message(mto=mto, mtype=mtype) msg["body"] = text msg.send() async def _send_encrypted(self, mto: JID, mtype: str, text: str) -> None: try: xep_0384 = self["xep_0384"] msg = self.make_message(mto=mto, mtype=mtype) msg["body"] = text encrypt_for: Set[JID] = {JID(mto)} messages, errors = await xep_0384.encrypt_message(msg, encrypt_for) if errors: logger.warning(f"OMEMO encryption errors: {errors}") for namespace, message in messages.items(): message["eme"]["namespace"] = namespace message["eme"]["name"] = self["xep_0380"].mechanisms[namespace] message.send() return except Exception as e: logger.error(f"Encryption failed, falling back to plaintext: {e}") await self._send_plain(mto, mtype, text) async def _handle_direct_message(self, stanza: Message) -> None: """Handles OMEMO-aware direct messages.""" try: mfrom = stanza["from"] mtype = stanza["type"] if mtype not in {"chat", "normal"}: return if mfrom.bare == self.boundjid.bare: return # Ignore Jingle signal messages treated as chat if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}propose') is not None: return if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}proceed') is not None: return if stanza.xml.find(f'{{{NS_JINGLE_MSG}}}reject') is not None: return xep_0384 = self["xep_0384"] namespace = xep_0384.is_encrypted(stanza) body = None is_encrypted = False if namespace: try: decrypted_msg, device_info = await xep_0384.decrypt_message(stanza) if decrypted_msg.get("body"): body = decrypted_msg["body"].strip() is_encrypted = True except Exception as e: logger.error(f"Decryption failed from {mfrom}: {e}") ctx = MessageContext( sender=mfrom, sender_bare=str(mfrom.bare), mtype=mtype, is_encrypted=True, is_muc=False ) await self.reply(ctx, f"Could not decrypt: {e}") return else: if stanza["body"]: body = stanza["body"].strip() if not body: return ctx = MessageContext( sender=mfrom, sender_bare=str(mfrom.bare), mtype=mtype, is_encrypted=is_encrypted, is_muc=False ) await self._process_direct_message(ctx, body) except Exception as e: logger.error(f"Error in direct message handler: {e}", exc_info=True) async def _handle_muc_message(self, stanza: Message) -> None: try: room_jid = str(stanza['from'].bare) sender_nick = stanza['mucnick'] if sender_nick == self.config.radio_room_nick: return if sender_nick == self.config.management_room_nick: return xep_0384 = self["xep_0384"] namespace = xep_0384.is_encrypted(stanza) body = None if namespace: try: decrypted_msg, device_info = await xep_0384.decrypt_message(stanza) if decrypted_msg.get("body"): body = decrypted_msg["body"].strip() except Exception as e: logger.error(f"Failed to decrypt MUC message: {e}") return else: if stanza['body']: body = stanza['body'].strip() if not body: return ctx = MessageContext( sender=stanza['from'], sender_bare=str(stanza['from'].bare), mtype='groupchat', is_encrypted=False, is_muc=True, room_jid=room_jid, nick=sender_nick ) if body.startswith('!'): await self._run_command(ctx, body[1:]) except Exception as e: logger.error(f"Error in MUC message handler: {e}", exc_info=True) def _handle_plain_message(self, msg): if msg['type'] not in ('chat', 'normal'): return if msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose') is not None: return body = msg['body'].strip() if msg['body'] else '' if body: ctx = MessageContext( sender=msg['from'], sender_bare=str(msg['from'].bare), mtype=msg['type'], is_encrypted=False, is_muc=False ) self._create_task(self._process_direct_message(ctx, body)) async def _handle_plain_muc_message(self, msg): if msg['mucnick'] == self.config.radio_room_nick: return if msg['mucnick'] == self.config.management_room_nick: return body = msg['body'].strip() if msg['body'] else '' if body and body.startswith('!'): ctx = MessageContext( sender=msg['from'], sender_bare=str(msg['from'].bare), mtype='groupchat', is_encrypted=False, is_muc=True, room_jid=str(msg['from'].bare), nick=msg['mucnick'] ) await self._run_command(ctx, body[1:]) async def _process_direct_message(self, ctx: MessageContext, body: str): if body.startswith('!'): await self._run_command(ctx, body[1:]) elif body.lower() in ('radio', 'help'): await self.reply(ctx, "XMPP Radio Bot\nType !help for commands.\nCall me to listen!") async def _run_command(self, ctx: MessageContext, cmdline: str): """Parses and executes bot commands.""" parts = cmdline.split(None, 1) cmd = parts[0].lower() arg = parts[1] if len(parts) > 1 else "" needs_auth = {'stop', 'next', 'station', 'dl', 'scan', 'hangup', 'clearq'} if cmd in needs_auth and not self._check_access(ctx.sender_bare): await self.reply(ctx, "Permission Denied") return if cmd in self.commands: try: await self.commands[cmd](ctx, arg) except Exception as e: logger.error(f"Command error: {e}", exc_info=True) await self.reply(ctx, f"Error: {e}") async def _send_transport_info(self, session: JingleSession, candidate: Dict): """Sends an ICE candidate to the peer via Jingle transport-info.""" iq = self.make_iq_set(ito=session.peer_jid) jingle = ET.Element('jingle', { 'xmlns': NS_JINGLE, 'action': 'transport-info', 'sid': session.sid }) content = ET.SubElement(jingle, 'content', { 'creator': 'initiator', 'name': '0' }) transport = ET.SubElement(content, 'transport', { 'xmlns': 'urn:xmpp:jingle:transports:ice-udp:1' }) cand_elem = ET.SubElement(transport, 'candidate', { 'component': str(candidate.get('component', '1')), 'foundation': str(candidate.get('foundation', '1')), 'generation': str(candidate.get('generation', '0')), 'id': candidate.get('id', 'cand-0'), 'ip': candidate.get('ip', ''), 'port': str(candidate.get('port', '0')), 'priority': str(candidate.get('priority', '1')), 'protocol': candidate.get('protocol', 'udp'), 'type': candidate.get('type', 'host') }) if 'rel-addr' in candidate: cand_elem.set('rel-addr', candidate['rel-addr']) if 'rel-port' in candidate: cand_elem.set('rel-port', str(candidate['rel-port'])) iq.xml.append(jingle) try: iq.send() logger.debug(f"Sent transport-info: {candidate.get('type')} {candidate.get('ip')}") except Exception as e: logger.error(f"Failed to send transport-info: {e}") def _handle_jingle_propose(self, msg): """Handles incoming XEP-0353 Jingle Message initiation.""" sender = str(msg['from']) propose = msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose') sid = propose.get('id') logger.info(f"Call from {sender}, ID={sid}") self._send_proceed(sender, sid) if self.jingle: self.jingle.register_proposed_session(sid, sender) def _handle_jingle_retract(self, msg): retract = msg.xml.find(f'{{{NS_JINGLE_MSG}}}retract') sid = retract.get('id') logger.info(f"Call Retracted: {sid}") if self.jingle: self._create_task(self.jingle.stop_session(sid)) def _send_proceed(self, to_jid: str, sid: str): msg = self.make_message(mto=to_jid, mtype='chat') proceed = ET.Element(f'{{{NS_JINGLE_MSG}}}proceed', {'id': sid}) msg.xml.append(proceed) msg.xml.append(ET.Element(f'{{{NS_HINTS}}}store')) msg.send() logger.info(f"Sent PROCEED to {to_jid}") def _handle_jingle_iq(self, iq): if not self.jingle: iq.reply().error().set_payload(ET.Element('service-unavailable')).send() return self._create_task(self._process_jingle_iq(iq)) async def _process_jingle_iq(self, iq): jingle = iq.xml.find(f'{{{NS_JINGLE}}}jingle') if jingle is None: return action = jingle.get('action') sid = jingle.get('sid') peer = str(iq['from']) logger.info(f"Jingle IQ: {action} (sid={sid})") try: if action == 'session-initiate': source = self._get_audio_source() if source: self.jingle.set_audio_source(source) session, accept_xml = await self.jingle.handle_session_initiate(jingle, peer, self.our_jid) iq.reply().send() accept_iq = self.make_iq_set(ito=peer) accept_iq.xml.append(ET.fromstring(accept_xml)) accept_iq.send() logger.info("Sent session-accept") elif action == 'transport-info': session = self.jingle.get_session(sid) if session: for content in jingle: for child in content: for cand in child: if 'candidate' in cand.tag: await self.jingle.add_ice_candidate(session, cand) iq.reply().send() elif action == 'session-terminate': logger.info(f"Peer hung up: {sid}") await self.jingle.stop_session(sid) iq.reply().send() else: iq.reply().send() except Exception as e: logger.error(f"Jingle Error: {e}", exc_info=True) try: iq.reply().send() except: pass def _check_access(self, jid): jid = jid.split('/')[0] if jid in self.config.admin_jids: return True if self.config.control_mode == 'everyone': return True return jid in self.config.allowed_jids async def _play_track(self, track: Track): """Starts playback for both HTTP and Jingle subsystems.""" async with self._track_transition_lock: self.is_playing_station = False self.current_station = None logger.info(f"Playing track: {track.display_name}") try: if self.streamer: await self.streamer.play_file( track.path, track.display_name, track.artist ) if self.jingle: # Slight delay ensures files handle gets released if necessary await asyncio.sleep(0.1) self.jingle.set_audio_source(track.path, force_restart=True) except Exception as e: logger.error(f"Error playing track: {e}", exc_info=True) finally: gc.collect(generation=0) async def on_track_end(self): """Callback for when a track finishes. Handles auto-advance logic.""" current_time = time.time() # Detect rapid skipping loop (usually corrupt files) if current_time - self._skip_window_start > 10.0: self._skip_count = 0 self._skip_window_start = current_time self._skip_count += 1 if self._skip_count > 5: logger.error("Detected rapid skipping loop. Stopping playback.") await self.cmd_stop(None, "force") return if current_time - self._last_track_end_time < 2.0: logger.debug("Ignoring duplicate track end callback") return if self._is_transitioning: logger.debug("Already transitioning, skipping") return try: self._is_transitioning = True self._last_track_end_time = current_time if self.is_playing_station: logger.debug("Playing station, not auto-transitioning") return logger.info("Track ended, getting next track...") t = self.playlist_manager.next_track() if t: await asyncio.sleep(0.5) await self._play_track(t) logger.info(f"Auto-playing: {t.display_name}") else: logger.info("No more tracks in queue/playlist") except Exception as e: logger.error(f"Error in track transition: {e}", exc_info=True) finally: self._is_transitioning = False gc.collect(generation=0) def _get_audio_source(self): if self.streamer and self.streamer.current_source: return self.streamer.current_source t = self.playlist_manager.get_current_track() return t.path if t else None def _get_np_text(self): if self.is_playing_station: return f"Station: {self.current_station}" t = self.playlist_manager.get_current_track() if t: return t.display_name if self.streamer and self.streamer.state.is_playing: return self.streamer.state.title return "Nothing" async def cmd_help(self, ctx: MessageContext, arg: str): help_text = ( "Radio Bot Commands:\n\n" "Playback:\n" "!play [query] - Play/Search music\n" "!stop / !next / !prev - Control\n" "!np - Now playing\n" "!shuffle - Shuffle playlist\n\n" "Queue:\n" "!queue (!q) - View queue\n" "!addq - Add to queue\n" "!playnext - Play next\n" "!clearq - Clear queue\n" "!removeq <#> - Remove from queue\n" "!shuffleq - Shuffle queue\n" "!history - Recent tracks\n\n" "Download:\n" "!search - Search YouTube\n" "!dl - Download\n\n" "Radio:\n" "!station - Play station\n" "!stations - List stations\n" "!list [page] - Show playlist\n" "!scan - Rescan library\n" "!callers - Active calls\n\n" "Unnecessary commands DO NOT USE:\n" "!url - Returns a local URL so nobody needs it\n" "!mem - Shows memory usage but it crashes anyways\n" "!hangup - Just use your clients disconnect button\n\n" "Version Info and About:\n" "XMPP Radio Bot v0.1 (codename: Absolute Solver)" ) await self.reply(ctx, help_text) async def cmd_play(self, ctx: MessageContext, arg: str): if arg: res = self.playlist_manager.search_library(arg) if res: await self._play_track(res[0]) await self.reply(ctx, f"Playing: {res[0].display_name}") else: await self.reply(ctx, "Not found in library") else: t = self.playlist_manager.next_track() if t: await self._play_track(t) await self.reply(ctx, f"Playing: {t.display_name}") async def cmd_stop(self, ctx: MessageContext, arg: str): if self.streamer: await self.streamer.stop_playback() if self.jingle: await self.jingle.stop_playback() async with self._track_transition_lock: self.is_playing_station = False self.current_station = None await self.reply(ctx, "Stopped") async def cmd_next(self, ctx: MessageContext, arg: str): t = self.playlist_manager.next_track() if t: await self._play_track(t) await self.reply(ctx, f"Next: {t.display_name}") else: await self.reply(ctx, "No next track") async def cmd_prev(self, ctx: MessageContext, arg: str): t = self.playlist_manager.previous_track() if t: await self._play_track(t) await self.reply(ctx, f"Previous: {t.display_name}") else: await self.reply(ctx, "No previous track") async def cmd_np(self, ctx: MessageContext, arg: str): np_text = self._get_np_text() queue_size = self.playlist_manager.queue.size queue_info = f"\nQueue: {queue_size} tracks" if queue_size > 0 else "" await self.reply(ctx, f"Now Playing: {np_text}{queue_info}") async def cmd_station(self, ctx: MessageContext, arg: str): if not arg or arg not in self.config.stations: stations = ', '.join(self.config.stations.keys()) or "None configured" await self.reply(ctx, f"Stations: {stations}") return url, _ = await self.station_parser.resolve_stream_url(self.config.stations[arg]) if url: try: async with self._track_transition_lock: self.is_playing_station = True self.current_station = arg if self.streamer: await self.streamer.play_url(url, arg) if self.jingle: await asyncio.sleep(0.1) self.jingle.set_audio_source(url, force_restart=True) await self.reply(ctx, f"Playing: {arg}") except Exception as e: logger.error(f"Error playing station: {e}") await self.reply(ctx, f"Failed: {e}") else: await self.reply(ctx, "Failed to connect to station") async def cmd_stations(self, ctx: MessageContext, arg: str): stations = ', '.join(self.config.stations.keys()) or "None configured" await self.reply(ctx, f"Stations: {stations}") async def cmd_list(self, ctx: MessageContext, arg: str): page_size = 15 try: page = int(arg) if arg.strip() else 1 if page < 1: page = 1 except ValueError: page = 1 start_index = (page - 1) * page_size tracks = self.playlist_manager.list_tracks(start_index, page_size) if tracks: text = "\n".join([f"{idx+1}. {track.display_name}" for idx, track in tracks]) await self.reply(ctx, f"Playlist (Page {page}):\n{text}\n\nType '!list {page+1}' for more.") else: if page > 1: await self.reply(ctx, f"No tracks on page {page}.") else: await self.reply(ctx, "Playlist is empty") async def cmd_shuffle(self, ctx: MessageContext, arg: str): if self.playlist_manager.current_playlist: self.playlist_manager.current_playlist.shuffle() await self.reply(ctx, "Playlist shuffled") async def cmd_search(self, ctx: MessageContext, arg: str): if not self.ytdlp: await self.reply(ctx, "yt-dlp is disabled") return if not arg: await self.reply(ctx, "Usage: !search ") return await self.reply(ctx, f"Searching: {arg}...") res = await self.ytdlp.search(arg) if res: self._search_results[ctx.sender_bare] = res text = "\n".join([f"{i+1}. {r.title} [{r.duration_str}]" for i, r in enumerate(res[:5])]) await self.reply(ctx, f"Results:\n{text}\n\nUse !dl # to download") else: await self.reply(ctx, "No results found") async def cmd_dl(self, ctx: MessageContext, arg: str): if not self.ytdlp: await self.reply(ctx, "yt-dlp is disabled") return if not arg: await self.reply(ctx, "Usage: !dl ") return target_url = None title = "Unknown" if arg.startswith(('http', 'www')): target_url = arg title = "URL" elif arg.startswith('#') and ctx.sender_bare in self._search_results: try: idx = int(arg[1:]) - 1 results = self._search_results[ctx.sender_bare] if 0 <= idx < len(results): res = results[idx] target_url = f"https://youtube.com/watch?v={res.id}" title = res.title except ValueError: pass if not target_url: await self.reply(ctx, f"Searching: {arg}...") res = await self.ytdlp.search(arg) if res: target_url = f"https://youtube.com/watch?v={res[0].id}" title = res[0].title await self.reply(ctx, f"Found: {title}") else: await self.reply(ctx, "No results found") return await self.reply(ctx, f"Downloading: {title}...") path = await self.ytdlp.download(target_url) if path: track = await self.playlist_manager._create_track(path) if track: self.playlist_manager.library.append(track) self.playlist_manager.add_to_playlist(track) await self.reply(ctx, f"Downloaded: {track.display_name}") else: await self.reply(ctx, "Downloaded file") else: await self.reply(ctx, "Download failed") async def cmd_scan(self, ctx: MessageContext, arg: str): c = await self.playlist_manager.scan_library() self.playlist_manager.add_library_to_playlist() await self.reply(ctx, f"Found {c} tracks") async def cmd_url(self, ctx: MessageContext, arg: str): if self.streamer: await self.reply(ctx, f"Stream: {self.streamer.stream_url}") else: await self.reply(ctx, "HTTP streaming is disabled") async def cmd_callers(self, ctx: MessageContext, arg: str): if self.jingle: sessions = self.jingle.get_active_sessions() await self.reply(ctx, f"Active callers: {len(sessions)}") else: await self.reply(ctx, "Jingle/calls are disabled") async def cmd_hangup(self, ctx: MessageContext, arg: str): if self.jingle: count = len(self.jingle.sessions) for s in list(self.jingle.sessions.values()): await self.jingle.stop_session(s.sid) await self.reply(ctx, f"Ended {count} call(s)") else: await self.reply(ctx, "Jingle/calls are disabled") async def cmd_memory(self, ctx: MessageContext, arg: str): mem_mb = self._get_memory_mb() tasks = len(self._managed_tasks) sessions = len(self.jingle.sessions) if self.jingle else 0 cache_size = len(self._search_results) status = f"Memory Status:\n• Memory: {mem_mb:.1f} MB\n• Managed Tasks: {tasks}\n• Jingle Sessions: {sessions}\n• Search Cache: {cache_size}" if self.streamer: status += f"\n• HTTP Listeners: {self.streamer.listener_count}" await self.reply(ctx, status) async def cmd_queue(self, ctx: MessageContext, arg: str): queue = self.playlist_manager.get_queue(10) current = self.playlist_manager.queue.current lines = [] if current: lines.append(f"Now: {current.display_name}") if queue: lines.append("\nUp Next:") for i, track in enumerate(queue, 1): lines.append(f" {i}. {track.display_name}") else: lines.append("\nQueue is empty") lines.append(f"\n{self.playlist_manager.queue.size} in queue") await self.reply(ctx, "\n".join(lines)) async def cmd_add_queue(self, ctx: MessageContext, arg: str): if not arg: await self.reply(ctx, "Usage: !addq ") return results = self.playlist_manager.search_library(arg, limit=1) if results: track = results[0] if self.playlist_manager.add_to_queue(track): await self.reply(ctx, f"Added to queue: {track.display_name}") else: await self.reply(ctx, "Queue is full") else: await self.reply(ctx, "Not found in library. Try !dl first.") async def cmd_play_next(self, ctx: MessageContext, arg: str): if not arg: await self.reply(ctx, "Usage: !playnext ") return results = self.playlist_manager.search_library(arg, limit=1) if results: track = results[0] if self.playlist_manager.add_next_to_queue(track): await self.reply(ctx, f"Playing next: {track.display_name}") else: await self.reply(ctx, "Queue is full") else: await self.reply(ctx, "Not found in library") async def cmd_clear_queue(self, ctx: MessageContext, arg: str): self.playlist_manager.clear_queue() await self.reply(ctx, "Queue cleared") async def cmd_remove_queue(self, ctx: MessageContext, arg: str): if not arg: await self.reply(ctx, "Usage: !removeq ") return try: idx = int(arg) - 1 track = self.playlist_manager.remove_from_queue(idx) if track: await self.reply(ctx, f"Removed: {track.display_name}") else: await self.reply(ctx, "Invalid index") except ValueError: await self.reply(ctx, "Invalid number") async def cmd_shuffle_queue(self, ctx: MessageContext, arg: str): self.playlist_manager.shuffle_queue() await self.reply(ctx, "Queue shuffled") async def cmd_history(self, ctx: MessageContext, arg: str): history = self.playlist_manager.queue.get_history(10) if history: lines = ["Recently Played:"] for i, track in enumerate(reversed(history), 1): lines.append(f" {i}. {track.display_name}") await self.reply(ctx, "\n".join(lines)) else: await self.reply(ctx, "No history yet") async def main(): """Main execution function; handles CLI args and shutdown signals.""" import argparse p = argparse.ArgumentParser(description="XMPP Radio Bot") p.add_argument('-c', '--config', default='config.ini', help='Config file path') p.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging') args = p.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) config = load_config(args.config) bot = RadioBot(config) bot.connect() try: await bot.disconnected except KeyboardInterrupt: logger.info("Shutting down...") finally: # Graceful cleanup of background tasks if bot._gc_task: bot._gc_task.cancel() try: await asyncio.wait_for(bot._gc_task, timeout=2.0) except (asyncio.CancelledError, asyncio.TimeoutError): pass if bot._memory_monitor_task: bot._memory_monitor_task.cancel() try: await asyncio.wait_for(bot._memory_monitor_task, timeout=2.0) except (asyncio.CancelledError, asyncio.TimeoutError): pass for task in list(bot._managed_tasks): task.cancel() if bot._managed_tasks: await asyncio.gather(*bot._managed_tasks, return_exceptions=True) bot._managed_tasks.clear() if bot.streamer: await bot.streamer.stop() if bot.jingle: await bot.jingle.end_all_sessions() bot._search_results.clear() gc.collect() gc.collect() gc.collect() logger.info("Bot stopped cleanly") if __name__ == '__main__': asyncio.run(main())