1320 lines
49 KiB
Python
1320 lines
49 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import os
|
||
import sys
|
||
import asyncio
|
||
import configparser
|
||
import logging
|
||
import json
|
||
import gc
|
||
import time
|
||
import weakref
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, List, Set, Any, FrozenSet
|
||
from dataclasses import dataclass
|
||
from collections import OrderedDict
|
||
import xml.etree.ElementTree as ET
|
||
|
||
import slixmpp
|
||
from slixmpp.jid import JID
|
||
from slixmpp.stanza import Iq, Message
|
||
from slixmpp.xmlstream.handler import Callback, CoroutineCallback
|
||
from slixmpp.xmlstream.matcher import MatchXPath
|
||
from slixmpp.plugins import register_plugin
|
||
|
||
from audio_streamer import AudioStreamer
|
||
from playlist_manager import PlaylistManager, PlaybackMode, Track
|
||
from station_parser import StationParser
|
||
from ytdlp_handler import YtDlpHandler, SearchResult
|
||
|
||
try:
    from jingle_webrtc import JingleWebRTCHandler, JingleSession, CallState, AIORTC_AVAILABLE
    JINGLE_AVAILABLE = AIORTC_AVAILABLE
except ImportError:
    # jingle_webrtc is optional.  Bind every name the import would have
    # provided so that later references to them (for example type
    # annotations, which are evaluated when a function is defined) do not
    # raise NameError when the module is missing.
    JINGLE_AVAILABLE = False
    JingleWebRTCHandler = None
    JingleSession = None
    CallState = None
|
||
|
||
try:
|
||
from omemo.storage import Just, Maybe, Nothing, Storage
|
||
from omemo.types import DeviceInformation, JSONType
|
||
from slixmpp_omemo import TrustLevel, XEP_0384
|
||
OMEMO_AVAILABLE = True
|
||
except ImportError:
|
||
OMEMO_AVAILABLE = False
|
||
|
||
# Process-wide logging: INFO level, "time - level - logger - message" lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# XML namespaces used when matching and building Jingle-related stanzas.
NS_JINGLE = "urn:xmpp:jingle:1"                  # Jingle session IQs
NS_JINGLE_MSG = "urn:xmpp:jingle-message:0"      # propose / proceed / retract messages
NS_HINTS = "urn:xmpp:hints"                      # <store/> hint added to "proceed"
|
||
|
||
class LRUCache(OrderedDict):
    """Size-bounded ordered mapping that drops its oldest entry on overflow.

    Recency is refreshed by assignment and by ``get``; a plain ``cache[key]``
    read leaves the ordering untouched.
    """

    def __init__(self, maxsize=25):
        super().__init__()
        self.maxsize = maxsize

    def __setitem__(self, key, value):
        """Store *value* as the newest entry, evicting one entry if over capacity."""
        already_cached = key in self
        if already_cached:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            # The entry just written guarantees the mapping is non-empty.
            self.popitem(last=False)

    def get(self, key, default=None):
        """Return the value for *key* (marking it most recent) or *default*."""
        if key not in self:
            return default
        self.move_to_end(key)
        return self[key]

    def clear_old(self, keep=10):
        """Discard oldest entries until at most *keep* remain."""
        while len(self) > keep:
            stale_key = next(iter(self))
            del self[stale_key]
|
||
|
||
if OMEMO_AVAILABLE:
    class OMEMOStorageImpl(Storage):
        """OMEMO key/value storage persisted as a single JSON file.

        The whole store is held in memory and rewritten to disk after every
        store/delete call.
        """

        def __init__(self, json_file_path: Path) -> None:
            """Load the existing store from *json_file_path*, or start empty."""
            super().__init__()
            self.__json_file_path = json_file_path
            self.__data: Dict[str, Any] = {}
            try:
                with open(self.__json_file_path, encoding="utf8") as f:
                    loaded = json.load(f)
            except FileNotFoundError:
                # First run: no store yet, start with an empty one.
                return
            except Exception as e:
                # Still start empty (best effort, as before), but say so:
                # an unreadable store means previously stored OMEMO data is
                # unavailable and will be replaced on the next write.
                logger.warning(
                    f"Could not load OMEMO store {self.__json_file_path}: {e}; starting empty"
                )
                return
            if isinstance(loaded, dict):
                self.__data = loaded
            else:
                logger.warning(
                    f"OMEMO store {self.__json_file_path} is not a JSON object; starting empty"
                )

        def __flush(self) -> None:
            """Write the in-memory store to disk via a temp file and rename.

            Writing to a sibling file and renaming it over the target means a
            crash mid-write cannot leave a truncated store behind.
            """
            tmp_path = self.__json_file_path.with_name(self.__json_file_path.name + ".tmp")
            with open(tmp_path, "w", encoding="utf8") as f:
                json.dump(self.__data, f)
            os.replace(tmp_path, self.__json_file_path)

        async def _load(self, key: str) -> Maybe[Any]:
            """Return ``Just(value)`` for a stored *key*, else ``Nothing()``."""
            if key in self.__data:
                return Just(self.__data[key])
            return Nothing()

        async def _store(self, key: str, value: Any) -> None:
            """Set *key* to *value* and persist the store."""
            self.__data[key] = value
            self.__flush()

        async def _delete(self, key: str) -> None:
            """Remove *key* if present and persist the store."""
            self.__data.pop(key, None)
            self.__flush()

    class XEP_0384Impl(XEP_0384):
        """XEP_0384 plugin subclass wired to the JSON-file storage above."""

        default_config = {
            "fallback_message": "This message is OMEMO encrypted.",
            "json_file_path": None
        }

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            super().__init__(*args, **kwargs)
            self.__storage: Storage

        def plugin_init(self) -> None:
            """Create the storage backend, then run the base plugin setup.

            Raises:
                Exception: if the ``json_file_path`` option was not configured.
            """
            if not self.json_file_path:
                raise Exception("JSON file path not specified for OMEMO storage.")
            self.__storage = OMEMOStorageImpl(Path(self.json_file_path))
            super().plugin_init()

        @property
        def storage(self) -> Storage:
            return self.__storage

        @property
        def _btbv_enabled(self) -> bool:
            # Always True: blind trust is on for this bot.
            return True

        async def _devices_blindly_trusted(
            self,
            blindly_trusted: FrozenSet[DeviceInformation],
            identifier: Optional[str]
        ) -> None:
            # Nothing to report: the bot has no user to notify.
            pass

        async def _prompt_manual_trust(
            self,
            manually_trusted: FrozenSet[DeviceInformation],
            identifier: Optional[str]
        ) -> None:
            """Mark every device handed in as TRUSTED without asking anyone."""
            session_manager = await self.get_session_manager()
            for device in manually_trusted:
                await session_manager.set_trust(
                    device.bare_jid,
                    device.identity_key,
                    TrustLevel.TRUSTED.value
                )

    register_plugin(XEP_0384Impl)
|
||
|
||
@dataclass(slots=True)
class MessageContext:
    """Where an incoming message came from and how a reply should be sent."""

    # JID taken from the stanza's "from" field.
    sender: JID
    # String form of the sender's bare JID; this is what access checks receive.
    sender_bare: str
    # Stanza type ('chat', 'normal' or 'groupchat'); reused for direct replies.
    mtype: str
    # True when the incoming direct message was OMEMO-decrypted.
    is_encrypted: bool
    # True for group-chat messages; replies then go to room_jid.
    is_muc: bool
    # Bare JID of the room, set only for group-chat messages.
    room_jid: Optional[str] = None
    # Sender's room nickname, set only for group-chat messages.
    nick: Optional[str] = None
|
||
|
||
@dataclass
class BotConfig:
    """Bot settings as produced by load_config(); grouped by config section."""

    # [XMPP]
    jid: str
    password: str
    resource: str
    # [Radio]
    music_directory: str
    playback_mode: str  # 'random' selects random playback; anything else is sequential
    audio_formats: List[str]
    default_volume: int
    # [Stream]
    http_enabled: bool
    stream_host: str
    stream_port: int
    stream_format: str
    bitrate: int
    mount_point: str
    # [Jingle]
    jingle_enabled: bool
    stun_server: str
    # [Stations]: station name -> configured URL
    stations: Dict[str, str]
    # [MUC]: an empty room string means "do not join"
    radio_room: str
    radio_room_nick: str
    management_room: str
    management_room_nick: str
    # [Access]
    control_mode: str  # 'everyone' lets any sender run restricted commands
    allowed_jids: Set[str]
    admin_jids: Set[str]
    # [YtDlp]
    ytdlp_enabled: bool
    ytdlp_format: str
    ytdlp_max_results: int
    ytdlp_max_filesize: int
    ytdlp_download_subdir: str  # joined onto music_directory
    # [OMEMO]
    omemo_enabled: bool
    omemo_store_path: str
|
||
|
||
def load_config(config_file: str = "config.ini") -> BotConfig:
    """Read *config_file* (INI format) and return the resulting BotConfig.

    Every option except ``[XMPP] jid`` and ``[XMPP] password`` has a fallback.

    Raises:
        FileNotFoundError: if *config_file* does not exist.
        configparser.Error: if a required option is missing or malformed.
    """
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"Config file not found: {config_file}")

    c = configparser.ConfigParser()
    c.read(config_file)

    stations = dict(c['Stations']) if 'Stations' in c else {}

    def split_csv(v):
        # Strip whitespace and drop empty items so "flac, opus," yields
        # ['flac', 'opus'] rather than ['flac', ' opus', ''].
        return [item.strip() for item in v.split(',') if item.strip()] if v else []

    def parse_jids(v):
        return set(split_csv(v))

    return BotConfig(
        jid=c.get('XMPP', 'jid'),
        password=c.get('XMPP', 'password'),
        resource=c.get('XMPP', 'resource', fallback='RadioBot'),
        music_directory=c.get('Radio', 'music_directory', fallback='./music'),
        playback_mode=c.get('Radio', 'playback_mode', fallback='random'),
        audio_formats=split_csv(
            c.get('Radio', 'audio_formats', fallback='flac,opus,ogg,wav,mp3')
        ),
        default_volume=c.getint('Radio', 'default_volume', fallback=80),
        http_enabled=c.getboolean('Stream', 'http_enabled', fallback=False),
        stream_host=c.get('Stream', 'stream_host', fallback='0.0.0.0'),
        stream_port=c.getint('Stream', 'stream_port', fallback=8000),
        stream_format=c.get('Stream', 'stream_format', fallback='mp3'),
        # NOTE(review): a fallback of 1 looks unusually low for a bitrate;
        # confirm the unit AudioStreamer expects.
        bitrate=c.getint('Stream', 'bitrate', fallback=1),
        mount_point=c.get('Stream', 'mount_point', fallback='/radio'),
        jingle_enabled=c.getboolean('Jingle', 'enabled', fallback=True),
        stun_server=c.get('Jingle', 'stun_server', fallback='stun:stun.l.google.com:19302'),
        stations=stations,
        radio_room=c.get('MUC', 'radio_room', fallback=''),
        radio_room_nick=c.get('MUC', 'radio_room_nick', fallback='RadioBot'),
        management_room=c.get('MUC', 'management_room', fallback=''),
        management_room_nick=c.get('MUC', 'management_room_nick', fallback='RadioControl'),
        control_mode=c.get('Access', 'control_mode', fallback='admins'),
        allowed_jids=parse_jids(c.get('Access', 'allowed_jids', fallback='')),
        admin_jids=parse_jids(c.get('Access', 'admin_jids', fallback='')),
        ytdlp_enabled=c.getboolean('YtDlp', 'enabled', fallback=True),
        ytdlp_format=c.get('YtDlp', 'format', fallback='bestaudio'),
        ytdlp_max_results=c.getint('YtDlp', 'max_search_results', fallback=10),
        ytdlp_max_filesize=c.getint('YtDlp', 'max_filesize', fallback=50),
        ytdlp_download_subdir=c.get('YtDlp', 'download_subdir', fallback='downloads'),
        omemo_enabled=c.getboolean('OMEMO', 'enabled', fallback=False),
        omemo_store_path=c.get('OMEMO', 'store_path', fallback='radio_omemo_store.json')
    )
|
||
|
||
class RadioBot(slixmpp.ClientXMPP):
|
||
|
||
MEMORY_WARNING_THRESHOLD = 400
|
||
MEMORY_CRITICAL_THRESHOLD = 600
|
||
|
||
def __init__(self, config: BotConfig):
    """Create the bot's components and register plugins, handlers and commands.

    Args:
        config: settings for the XMPP account, playback, streaming, calls,
            rooms, access control, downloads and OMEMO.
    """
    super().__init__(f"{config.jid}/{config.resource}", config.password)
    self.config = config
    self.our_jid = f"{config.jid}/{config.resource}"

    # OMEMO is used only when it is both configured and importable.
    self.omemo_active = config.omemo_enabled and OMEMO_AVAILABLE

    # HTTP streamer exists only when enabled in the config.
    self.streamer = AudioStreamer(
        config.stream_host, config.stream_port, config.mount_point,
        config.stream_format, config.bitrate
    ) if config.http_enabled else None

    self.playlist_manager = PlaylistManager(config.music_directory, config.audio_formats)
    self.playlist_manager.set_playback_mode(
        PlaybackMode.RANDOM if config.playback_mode == 'random' else PlaybackMode.SEQUENTIAL
    )

    self.station_parser = StationParser()

    # Downloads go to a sub-directory of the music directory.
    self.ytdlp = YtDlpHandler(
        os.path.join(config.music_directory, config.ytdlp_download_subdir),
        config.ytdlp_format, config.ytdlp_max_filesize, config.ytdlp_max_results
    ) if config.ytdlp_enabled else None

    # Call handling needs both the config switch and a successful import.
    self.jingle = None
    if config.jingle_enabled and JINGLE_AVAILABLE:
        self.jingle = JingleWebRTCHandler(
            stun_server=config.stun_server,
            send_transport_info_callback=self._send_transport_info,
            on_track_end=self.on_track_end
        )
        logger.info("✅ Jingle enabled with WebRTC")
    elif config.jingle_enabled:
        logger.warning("⚠️ Jingle enabled but 'aiortc' missing")

    self.current_station = None
    self.is_playing_station = False
    self._search_results = LRUCache(maxsize=25)

    # State used by on_track_end / _play_track to serialise track changes.
    self._track_transition_lock = asyncio.Lock()
    self._last_track_end_time = 0

    self._is_transitioning = False

    # Counters for the rapid-skip guard in on_track_end.
    self._skip_count = 0
    self._skip_window_start = 0

    # Tasks created through _create_task, plus the two background loops
    # started in on_start.
    self._managed_tasks: Set[asyncio.Task] = set()
    self._gc_task: Optional[asyncio.Task] = None
    self._memory_monitor_task: Optional[asyncio.Task] = None

    self.register_plugin('xep_0030')
    self.register_plugin('xep_0045')
    self.register_plugin('xep_0115')
    self.register_plugin('xep_0199')

    if self.omemo_active:
        self.register_plugin('xep_0085')
        self.register_plugin('xep_0380')

        # The xep_0384 plugin class is looked up in this module.
        self.register_plugin(
            "xep_0384",
            {"json_file_path": config.omemo_store_path},
            module=sys.modules[__name__]
        )
        logger.info("🔒 OMEMO encryption enabled")
    elif config.omemo_enabled:
        logger.warning("⚠️ OMEMO enabled but libraries missing")

    # Advertise the Jingle features only when calls can be handled.
    if self.jingle:
        for f in [NS_JINGLE, NS_JINGLE_MSG,
                  "urn:xmpp:jingle:apps:rtp:1", "urn:xmpp:jingle:apps:rtp:audio",
                  "urn:xmpp:jingle:transports:ice-udp:1", "urn:xmpp:jingle:apps:dtls:0"]:
            self['xep_0030'].add_feature(f)

    self.add_event_handler("session_start", self.on_start)

    # With OMEMO, raw stanza handlers are used so encrypted messages can be
    # decrypted first; otherwise the plain event handlers are enough.
    if self.omemo_active:
        self.register_handler(CoroutineCallback(
            "DirectMessagesOMEMO",
            MatchXPath(f"{{{self.default_ns}}}message[@type='chat']"),
            self._handle_direct_message
        ))
        self.register_handler(CoroutineCallback(
            "GroupChatOMEMO",
            MatchXPath(f"{{{self.default_ns}}}message[@type='groupchat']"),
            self._handle_muc_message
        ))
    else:
        self.add_event_handler("message", self._handle_plain_message)
        self.add_event_handler("groupchat_message", self._handle_plain_muc_message)

    # Jingle signalling handlers are registered unconditionally; each
    # handler checks self.jingle itself.
    self.register_handler(Callback(
        'JinglePropose',
        MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}propose'),
        self._handle_jingle_propose
    ))

    self.register_handler(Callback(
        'JingleRetract',
        MatchXPath(f'{{jabber:client}}message/{{{NS_JINGLE_MSG}}}retract'),
        self._handle_jingle_retract
    ))

    self.register_handler(Callback(
        'JingleIQ',
        MatchXPath(f'{{jabber:client}}iq/{{{NS_JINGLE}}}jingle'),
        self._handle_jingle_iq
    ))

    # When the Jingle handler exists it was given on_track_end above, so the
    # streamer only reports track ends when it is the sole audio backend.
    if self.streamer and not self.jingle:
        self.streamer.on_track_end = self.on_track_end

    # Command name (without the leading '!') -> coroutine taking (ctx, arg).
    self.commands = {
        'help': self.cmd_help,
        'play': self.cmd_play,
        'stop': self.cmd_stop,
        'next': self.cmd_next,
        'prev': self.cmd_prev,
        'np': self.cmd_np,
        'station': self.cmd_station,
        'stations': self.cmd_stations,
        'list': self.cmd_list,
        'shuffle': self.cmd_shuffle,
        'search': self.cmd_search,
        'sc': self.cmd_sc,
        'bc': self.cmd_bc,
        'lib': self.cmd_library,
        'local': self.cmd_library,
        'dl': self.cmd_dl,
        'scan': self.cmd_scan,
        'url': self.cmd_url,
        'callers': self.cmd_callers,
        'hangup': self.cmd_hangup,
        'queue': self.cmd_queue,
        'q': self.cmd_queue,
        'addq': self.cmd_add_queue,
        'playnext': self.cmd_play_next,
        'clearq': self.cmd_clear_queue,
        'removeq': self.cmd_remove_queue,
        'shuffleq': self.cmd_shuffle_queue,
        'history': self.cmd_history,
        'mem': self.cmd_memory,
    }
|
||
|
||
def _create_task(self, coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep a reference to it until it finishes."""
    scheduled = asyncio.create_task(coro)
    self._managed_tasks.add(scheduled)

    def _forget(finished):
        # Looked up at completion time, like the rest of the bookkeeping.
        self._managed_tasks.discard(finished)

    scheduled.add_done_callback(_forget)
    return scheduled
|
||
|
||
def _cleanup_done_tasks(self):
    """Drop finished tasks from the tracked set and return how many were removed."""
    finished = set(filter(lambda task: task.done(), self._managed_tasks))
    self._managed_tasks.difference_update(finished)
    return len(finished)
|
||
|
||
async def on_start(self, event):
    """Handle ``session_start``: go online, start services, join rooms, play.

    Args:
        event: the event payload passed by the event system (unused).
    """
    logger.info("🚀 Bot Session Started")
    self.send_presence()
    await self.get_roster()

    if self.streamer:
        await self.streamer.start()
        logger.info(f"📡 HTTP Stream: {self.streamer.stream_url}")

    if self.jingle:
        await self.jingle.start_cleanup_task()

    count = await self.playlist_manager.scan_library()
    self.playlist_manager.add_library_to_playlist()
    logger.info(f"🎵 Library loaded: {count} tracks")

    rooms = [(self.config.radio_room, self.config.radio_room_nick),
             (self.config.management_room, self.config.management_room_nick)]
    for room, nick in rooms:
        if not room:
            # An empty room string means "not configured".
            continue
        try:
            await self.plugin['xep_0045'].join_muc(room, nick)
            logger.info(f"📢 Joined MUC: {room}")
        except Exception as e:
            # One failed room must not prevent joining the other.
            logger.error(f"MUC Join Failed ({room}): {e}")

    if self.playlist_manager.library:
        track = self.playlist_manager.next_track()
        if track:
            await self._play_track(track)

    # session_start can fire more than once (e.g. after a reconnect).
    # Start each background loop only if it is not already running, so
    # repeated sessions do not stack duplicate loops and orphan old tasks.
    if self._gc_task is None or self._gc_task.done():
        self._gc_task = asyncio.create_task(self._periodic_gc())
    if self._memory_monitor_task is None or self._memory_monitor_task.done():
        self._memory_monitor_task = asyncio.create_task(self._memory_monitor())
|
||
|
||
async def _periodic_gc(self):
    """Every 120 s trim the search cache, prune finished tasks and run the GC."""
    while True:
        await asyncio.sleep(120)

        try:
            self._search_results.clear_old(keep=10)
            cleaned = self._cleanup_done_tasks()
            # Collect each generation in turn, youngest first.
            for generation in (0, 1, 2):
                gc.collect(generation=generation)
            logger.debug(f"🧹 GC completed, cleaned {cleaned} tasks, remaining: {len(self._managed_tasks)}")
        except Exception as e:
            # Keep the loop alive whatever a single pass raises.
            logger.error(f"GC error: {e}")
|
||
|
||
async def _memory_monitor(self):
    """Every 300 s log memory usage and shed caches/sessions above the thresholds.

    NOTE(review): the figure comes from ``ru_maxrss``, which the ``resource``
    documentation describes as the *maximum* resident set size. If that is a
    high-water mark, a threshold crossed once stays crossed; confirm whether
    current usage was intended.
    """
    try:
        import resource
    except ImportError:
        has_resource = False
        logger.warning("resource module not available, memory monitoring limited")
    else:
        has_resource = True

    while True:
        await asyncio.sleep(300)

        try:
            if not has_resource:
                # Nothing can be measured without the resource module.
                continue

            usage = resource.getrusage(resource.RUSAGE_SELF)
            if sys.platform == 'darwin':
                mem_mb = usage.ru_maxrss / (1024 * 1024)
            else:
                mem_mb = usage.ru_maxrss / 1024

            logger.info(f"📊 Memory usage: {mem_mb:.1f} MB, Tasks: {len(self._managed_tasks)}")

            if mem_mb > self.MEMORY_WARNING_THRESHOLD:
                logger.warning(f"⚠️ High memory usage ({mem_mb:.1f} MB)")
                self._search_results.clear()
                self._cleanup_done_tasks()
                for _ in range(3):
                    gc.collect()

            if mem_mb > self.MEMORY_CRITICAL_THRESHOLD:
                logger.error(f"🚨 Critical memory usage ({mem_mb:.1f} MB), aggressive cleanup")
                if self.jingle and len(self.jingle.sessions) > 5:
                    # Everything except the last five session ids is stopped.
                    surplus = list(self.jingle.sessions.keys())[:-5]
                    for sid in surplus:
                        await self.jingle.stop_session(sid)
                gc.collect()

        except Exception as e:
            logger.error(f"Memory monitor error: {e}")
|
||
|
||
def _get_memory_mb(self) -> float:
    """Return this process's ``ru_maxrss`` in megabytes, or 0.0 without ``resource``."""
    try:
        import resource
    except ImportError:
        return 0.0

    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # The divisor differs on macOS, exactly as in the platform check below.
    if sys.platform == 'darwin':
        return max_rss / (1024 * 1024)
    return max_rss / 1024
|
||
|
||
async def reply(self, ctx: MessageContext, text: str) -> None:
    """Send *text* back to wherever *ctx* came from.

    Group-chat messages are answered in the room; direct messages are
    answered encrypted when the incoming one was encrypted and OMEMO is
    active, otherwise in plain text.
    """
    if ctx.is_muc:
        room_msg = self.make_message(mto=ctx.room_jid, mtype='groupchat')
        room_msg["body"] = text
        room_msg.send()
        return

    use_omemo = ctx.is_encrypted and self.omemo_active
    deliver = self._send_encrypted if use_omemo else self._send_plain
    await deliver(ctx.sender, ctx.mtype, text)
|
||
|
||
async def _send_plain(self, mto: JID, mtype: str, text: str) -> None:
    """Send *text* unencrypted to *mto* as a message of type *mtype*."""
    outgoing = self.make_message(mto=mto, mtype=mtype)
    outgoing["body"] = text
    outgoing.send()
|
||
|
||
async def _send_encrypted(self, mto: JID, mtype: str, text: str) -> None:
    """Send *text* to *mto* OMEMO-encrypted, falling back to plain text.

    Any exception raised while encrypting or sending is logged and the
    text is then sent unencrypted instead.
    """
    try:
        omemo = self["xep_0384"]
        plain = self.make_message(mto=mto, mtype=mtype)
        plain["body"] = text
        recipients: Set[JID] = {JID(mto)}
        encrypted, failures = await omemo.encrypt_message(plain, recipients)

        if failures:
            logger.warning(f"OMEMO encryption errors: {failures}")

        # One stanza per namespace; tag each with its encryption hint.
        for ns, stanza in encrypted.items():
            stanza["eme"]["namespace"] = ns
            stanza["eme"]["name"] = self["xep_0380"].mechanisms[ns]
            stanza.send()
    except Exception as e:
        logger.error(f"Encryption failed, falling back to plaintext: {e}")
    else:
        return

    await self._send_plain(mto, mtype, text)
|
||
|
||
async def _handle_direct_message(self, stanza: Message) -> None:
    """Handle a direct message when OMEMO is active.

    Ignores non-chat types, our own messages and Jingle signalling, then
    decrypts if needed and passes the text on for processing. A decryption
    failure is reported back to the sender.
    """
    try:
        mfrom = stanza["from"]
        mtype = stanza["type"]

        if mtype not in {"chat", "normal"}:
            return

        if mfrom.bare == self.boundjid.bare:
            return

        # Call signalling travels in message stanzas too; it is not chat.
        signalling = ("propose", "proceed", "reject")
        if any(stanza.xml.find(f'{{{NS_JINGLE_MSG}}}{tag}') is not None
               for tag in signalling):
            return

        omemo = self["xep_0384"]
        body = None
        was_encrypted = False

        if omemo.is_encrypted(stanza):
            try:
                decrypted_msg, device_info = await omemo.decrypt_message(stanza)
            except Exception as e:
                logger.error(f"Decryption failed from {mfrom}: {e}")
                failure_ctx = MessageContext(
                    sender=mfrom, sender_bare=str(mfrom.bare),
                    mtype=mtype, is_encrypted=True, is_muc=False
                )
                await self.reply(failure_ctx, f"❌ Could not decrypt: {e}")
                return
            if decrypted_msg.get("body"):
                body = decrypted_msg["body"].strip()
                was_encrypted = True
        elif stanza["body"]:
            body = stanza["body"].strip()

        if not body:
            return

        ctx = MessageContext(
            sender=mfrom, sender_bare=str(mfrom.bare),
            mtype=mtype, is_encrypted=was_encrypted, is_muc=False
        )

        await self._process_direct_message(ctx, body)

    except Exception as e:
        logger.error(f"Error in direct message handler: {e}", exc_info=True)
|
||
|
||
async def _handle_muc_message(self, stanza: Message) -> None:
    """Handle a group-chat message when OMEMO is active.

    Skips messages sent under the bot's own nicknames, decrypts if needed
    and runs the text as a command when it starts with '!'.
    """
    try:
        origin = stanza['from']
        room_jid = str(origin.bare)
        sender_nick = stanza['mucnick']

        # Do not react to our own messages reflected by the room.
        own_nicks = (self.config.radio_room_nick, self.config.management_room_nick)
        if sender_nick in own_nicks:
            return

        omemo = self["xep_0384"]
        body = None

        if omemo.is_encrypted(stanza):
            try:
                decrypted_msg, device_info = await omemo.decrypt_message(stanza)
            except Exception as e:
                logger.error(f"Failed to decrypt MUC message: {e}")
                return
            if decrypted_msg.get("body"):
                body = decrypted_msg["body"].strip()
        elif stanza['body']:
            body = stanza['body'].strip()

        if not body:
            return

        ctx = MessageContext(
            sender=stanza['from'], sender_bare=str(stanza['from'].bare),
            mtype='groupchat', is_encrypted=False, is_muc=True,
            room_jid=room_jid, nick=sender_nick
        )

        if not body.startswith('!'):
            return
        await self._run_command(ctx, body[1:])

    except Exception as e:
        logger.error(f"Error in MUC message handler: {e}", exc_info=True)
|
||
|
||
def _handle_plain_message(self, msg):
    """Handle a direct message when OMEMO is off.

    Only 'chat'/'normal' messages with a non-empty body that are not call
    proposals are processed; the work runs in a tracked task.
    """
    kind = msg['type']
    if kind not in ('chat', 'normal'):
        return

    is_call_proposal = msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose') is not None
    if is_call_proposal:
        return

    text = msg['body'].strip() if msg['body'] else ''
    if not text:
        return

    ctx = MessageContext(
        sender=msg['from'], sender_bare=str(msg['from'].bare),
        mtype=msg['type'], is_encrypted=False, is_muc=False
    )
    self._create_task(self._process_direct_message(ctx, text))
|
||
|
||
async def _handle_plain_muc_message(self, msg):
    """Handle a group-chat message when OMEMO is off.

    Messages under the bot's own nicknames are ignored; a body starting
    with '!' is run as a command.
    """
    own_nicks = (self.config.radio_room_nick, self.config.management_room_nick)
    if msg['mucnick'] in own_nicks:
        return

    text = msg['body'].strip() if msg['body'] else ''
    if not (text and text.startswith('!')):
        return

    ctx = MessageContext(
        sender=msg['from'], sender_bare=str(msg['from'].bare),
        mtype='groupchat', is_encrypted=False, is_muc=True,
        room_jid=str(msg['from'].bare), nick=msg['mucnick']
    )
    await self._run_command(ctx, text[1:])
|
||
|
||
async def _process_direct_message(self, ctx: MessageContext, body: str):
    """Run *body* as a command if it starts with '!', or greet on 'radio'/'help'."""
    if body.startswith('!'):
        await self._run_command(ctx, body[1:])
        return

    wants_intro = body.lower() in ('radio', 'help')
    if wants_intro:
        await self.reply(ctx, "📻 XMPP Radio Bot\nType !help for commands.\nCall me to listen!")
|
||
|
||
async def _run_command(self, ctx: MessageContext, cmdline: str):
    """Parse *cmdline* (text after the '!') and dispatch it to its handler.

    Restricted commands are refused with "Permission Denied" unless the
    sender passes the access check. Unknown commands are ignored. Errors
    raised by a handler are logged and reported back to the sender.

    Args:
        ctx: where the command came from; used for access checks and replies.
        cmdline: command name optionally followed by whitespace and an argument.
    """
    parts = cmdline.split(None, 1)
    if not parts:
        # A bare "!" (or "!" followed only by whitespace) names no command;
        # indexing parts[0] would raise IndexError.
        return

    cmd = parts[0].lower()
    arg = parts[1] if len(parts) > 1 else ""

    needs_auth = {'stop', 'next', 'station', 'dl', 'scan', 'hangup', 'clearq'}
    if cmd in needs_auth and not self._check_access(ctx.sender_bare):
        await self.reply(ctx, "⛔ Permission Denied")
        return

    handler = self.commands.get(cmd)
    if handler is None:
        return

    try:
        await handler(ctx, arg)
    except Exception as e:
        logger.error(f"Command error: {e}", exc_info=True)
        await self.reply(ctx, f"❌ Error: {e}")
|
||
|
||
async def _send_transport_info(self, session: "JingleSession", candidate: Dict):
    """Send one ICE candidate to the peer as a Jingle ``transport-info`` IQ.

    Args:
        session: the call the candidate belongs to; its ``peer_jid`` and
            ``sid`` are read. The annotation is a string so that defining
            this method does not require the optional Jingle import.
        candidate: mapping of candidate attributes. Missing keys get the
            defaults listed below; ``rel-addr`` / ``rel-port`` are optional.
    """
    iq = self.make_iq_set(ito=session.peer_jid)

    jingle = ET.Element('jingle', {
        'xmlns': NS_JINGLE,
        'action': 'transport-info',
        'sid': session.sid
    })
    content = ET.SubElement(jingle, 'content', {
        'creator': 'initiator',
        'name': '0'
    })
    transport = ET.SubElement(content, 'transport', {
        'xmlns': 'urn:xmpp:jingle:transports:ice-udp:1'
    })

    # Attribute name -> default. Every value is passed through str() because
    # ElementTree can only serialise string attribute values.
    defaults = (
        ('component', '1'),
        ('foundation', '1'),
        ('generation', '0'),
        ('id', 'cand-0'),
        ('ip', ''),
        ('port', '0'),
        ('priority', '1'),
        ('protocol', 'udp'),
        ('type', 'host'),
    )
    cand_elem = ET.SubElement(transport, 'candidate', {
        name: str(candidate.get(name, fallback)) for name, fallback in defaults
    })

    for optional in ('rel-addr', 'rel-port'):
        if optional in candidate:
            cand_elem.set(optional, str(candidate[optional]))

    iq.xml.append(jingle)

    try:
        # NOTE(review): the result of send() is not awaited, so this
        # try/except presumably only sees failures raised synchronously;
        # confirm how peer errors/timeouts are meant to be handled.
        iq.send()
        logger.debug(f"📤 Sent transport-info: {candidate.get('type')} {candidate.get('ip')}")
    except Exception as e:
        logger.error(f"Failed to send transport-info: {e}")
|
||
|
||
def _handle_jingle_propose(self, msg):
    """Answer an incoming call proposal with "proceed" and remember it.

    Proposals without an id, and proposals arriving while call handling is
    unavailable, are logged and not answered: sending "proceed" would
    invite a session the bot cannot serve.
    """
    sender = str(msg['from'])
    propose = msg.xml.find(f'{{{NS_JINGLE_MSG}}}propose')
    sid = propose.get('id') if propose is not None else None

    if not sid:
        logger.warning(f"Ignoring call proposal without id from {sender}")
        return

    logger.info(f"📞 Call from {sender}, ID={sid}")

    if not self.jingle:
        logger.warning(f"Jingle unavailable, not accepting call {sid} from {sender}")
        return

    self._send_proceed(sender, sid)
    self.jingle.register_proposed_session(sid, sender)
|
||
|
||
def _handle_jingle_retract(self, msg):
    """Stop the session named by a retracted call proposal."""
    call_id = msg.xml.find(f'{{{NS_JINGLE_MSG}}}retract').get('id')
    logger.info(f"🔴 Call Retracted: {call_id}")
    if not self.jingle:
        return
    # Stopping is asynchronous; run it as a tracked task.
    self._create_task(self.jingle.stop_session(call_id))
|
||
|
||
def _send_proceed(self, to_jid: str, sid: str):
    """Send a "proceed" for call *sid* to *to_jid*, with a store hint attached."""
    msg = self.make_message(mto=to_jid, mtype='chat')
    children = (
        ET.Element(f'{{{NS_JINGLE_MSG}}}proceed', {'id': sid}),
        ET.Element(f'{{{NS_HINTS}}}store'),
    )
    for child in children:
        msg.xml.append(child)
    msg.send()
    logger.info(f"📤 Sent PROCEED to {to_jid}")
|
||
|
||
def _handle_jingle_iq(self, iq):
    """Dispatch a Jingle IQ, or refuse it when call handling is unavailable.

    Without a Jingle handler the IQ is answered with a
    ``service-unavailable`` error carried in the reply's error sub-stanza,
    rather than a bare element placed directly under ``<iq/>``.
    """
    if not self.jingle:
        reply = iq.reply()
        reply['error']['type'] = 'cancel'
        reply['error']['condition'] = 'service-unavailable'
        reply.send()
        return
    # Processing awaits the Jingle handler, so it runs as a tracked task.
    self._create_task(self._process_jingle_iq(iq))
|
||
|
||
async def _process_jingle_iq(self, iq):
    """Act on one Jingle IQ and acknowledge it exactly once.

    Handles ``session-initiate`` (accept the call), ``transport-info``
    (add the peer's ICE candidates) and ``session-terminate`` (stop the
    session). Any other action is simply acknowledged.
    """
    jingle = iq.xml.find(f'{{{NS_JINGLE}}}jingle')
    if jingle is None:
        return

    action = jingle.get('action')
    sid = jingle.get('sid')
    peer = str(iq['from'])

    logger.info(f"📨 Jingle IQ: {action} (sid={sid})")

    # Tracks whether the result reply went out, so the error path below
    # never sends a second reply for the same IQ.
    acked = False

    try:
        if action == 'session-initiate':
            source = self._get_audio_source()
            if source:
                self.jingle.set_audio_source(source)

            session, accept_xml = await self.jingle.handle_session_initiate(jingle, peer, self.our_jid)

            iq.reply().send()
            acked = True

            accept_iq = self.make_iq_set(ito=peer)
            accept_iq.xml.append(ET.fromstring(accept_xml))
            accept_iq.send()
            logger.info("✅ Sent session-accept")

        elif action == 'transport-info':
            session = self.jingle.get_session(sid)
            if session:
                # jingle > content > transport > candidate
                for content in jingle:
                    for child in content:
                        for cand in child:
                            if 'candidate' in cand.tag:
                                await self.jingle.add_ice_candidate(session, cand)
            iq.reply().send()
            acked = True

        elif action == 'session-terminate':
            logger.info(f"🔴 Peer hung up: {sid}")
            await self.jingle.stop_session(sid)
            iq.reply().send()
            acked = True

        else:
            iq.reply().send()
            acked = True

    except Exception as e:
        logger.error(f"Jingle Error: {e}", exc_info=True)
        if not acked:
            # NOTE(review): as before, a failed request is still answered
            # with a plain result; confirm whether an error reply is wanted.
            try:
                iq.reply().send()
            except Exception as reply_error:
                logger.debug(f"Could not acknowledge Jingle IQ: {reply_error}")
|
||
|
||
def _check_access(self, jid):
    """Return True if *jid* may run restricted commands.

    Admins are always allowed; with ``control_mode == 'everyone'`` anybody
    is; otherwise the JID must be in the allowed list. Any resource part is
    ignored and the comparison is case-insensitive, so a configured
    ``Admin@Example.org`` matches a sender reported in lower case.

    Args:
        jid: bare or full JID as a string.
    """
    bare = jid.split('/')[0].lower()

    if bare in {entry.lower() for entry in self.config.admin_jids}:
        return True
    if self.config.control_mode == 'everyone':
        return True
    return bare in {entry.lower() for entry in self.config.allowed_jids}
|
||
|
||
async def _play_track(self, track: Track):
    """Switch playback to *track* on every active audio backend.

    Runs under the track-transition lock. Errors are logged rather than
    raised.
    """
    async with self._track_transition_lock:
        # A library track replaces any station that was playing.
        self.current_station = None
        self.is_playing_station = False

        logger.info(f"🎵 Playing track: {track.display_name}")

        try:
            if self.streamer:
                await self.streamer.play_file(track.path, track.display_name, track.artist)

            if self.jingle:
                await asyncio.sleep(0.1)
                self.jingle.set_audio_source(track.path, force_restart=True)
        except Exception as e:
            logger.error(f"Error playing track: {e}", exc_info=True)
        finally:
            gc.collect(generation=0)
|
||
|
||
async def on_track_end(self):
    """Advance to the next track when the current one finishes.

    Guards, in order: more than five calls inside a ten-second window stop
    playback; a call within two seconds of the last handled one is ignored;
    a call arriving during a transition is ignored; stations never
    auto-advance.
    """
    now = time.time()

    # Open a fresh counting window once the old one is over ten seconds old.
    window_expired = now - self._skip_window_start > 10.0
    if window_expired:
        self._skip_count = 0
        self._skip_window_start = now

    self._skip_count += 1
    if self._skip_count > 5:
        logger.error("🚨 Detected rapid skipping loop. Stopping playback.")
        await self.cmd_stop(None, "force")
        return

    if now - self._last_track_end_time < 2.0:
        logger.debug("Ignoring duplicate track end callback")
        return

    if self._is_transitioning:
        logger.debug("Already transitioning, skipping")
        return

    try:
        self._is_transitioning = True
        self._last_track_end_time = now

        if self.is_playing_station:
            logger.debug("Playing station, not auto-transitioning")
            return

        logger.info("🔄 Track ended, getting next track...")

        upcoming = self.playlist_manager.next_track()
        if not upcoming:
            logger.info("No more tracks in queue/playlist")
        else:
            await asyncio.sleep(0.5)
            await self._play_track(upcoming)
            logger.info(f"▶️ Auto-playing: {upcoming.display_name}")

    except Exception as e:
        logger.error(f"Error in track transition: {e}", exc_info=True)
    finally:
        self._is_transitioning = False
        gc.collect(generation=0)
|
||
|
||
def _get_audio_source(self):
    """Return what should be played to a caller: the streamer's current
    source if there is one, else the current track's path, else None."""
    live_source = self.streamer.current_source if self.streamer else None
    if live_source:
        return live_source

    current = self.playlist_manager.get_current_track()
    if not current:
        return None
    return current.path
|
||
|
||
def _get_np_text(self):
    """Return the "now playing" label: station, current track, streamer title or "Nothing"."""
    if self.is_playing_station:
        return f"📻 {self.current_station}"

    current = self.playlist_manager.get_current_track()
    if current:
        return current.display_name

    streamer = self.streamer
    streamer_busy = streamer and streamer.state.is_playing
    if streamer_busy:
        return streamer.state.title

    return "Nothing"
|
||
|
||
async def cmd_help(self, ctx: MessageContext, arg: str):
    """Reply with the command overview.

    Args:
        ctx: where to send the reply.
        arg: unused.
    """
    # One literal assembled from adjacent strings; sections are separated
    # by blank lines ("\n\n").
    help_text = (
        "🎵 **Radio Bot Commands:**\n\n"
        "**Playback:**\n"
        "!play [query] - Play/Search music\n"
        "!stop / !next / !prev - Control\n"
        "!np - Now playing\n"
        "!shuffle - Shuffle playlist\n\n"
        "**Queue:**\n"
        "!queue (!q) - View queue\n"
        "!addq <query> - Add to queue\n"
        "!playnext <query> - Play next\n"
        "!clearq - Clear queue\n"
        "!removeq <#> - Remove from queue\n"
        "!shuffleq - Shuffle queue\n"
        "!history - Recent tracks\n\n"
        "**Search & Download:**\n"
        "!search <query> - Search YouTube\n"
        "!sc <query> - Search SoundCloud\n"
        "!bc <query> - Search Bandcamp\n"
        "!lib <query> - Search Local Library\n"
        "!dl <url|query|#num> - Download\n\n"
        "**Radio:**\n"
        "!station <name> - Play station\n"
        "!stations - List stations\n"
        "!list [page] - Show playlist\n"
        "!scan - Rescan library\n"
        "!callers - Active calls\n\n"
        "**Unnecessary commands DO NOT USE:**\n"
        "!url - Returns a local URL so nobody needs it\n"
        "!mem - Shows memory usage but it crashes anyways\n"
        "!hangup - Just use your clients disconnect button\n\n"
        "**Version Info and About:**\n"
        "XMPP Radio Bot v0.2 (codename: Branded Pens)"
    )
    await self.reply(ctx, help_text)
|
||
|
||
async def cmd_play(self, ctx: MessageContext, arg: str):
    """Play the first library match for *arg*, or the next track if *arg* is empty.

    Always answers: with the track now playing, or with the reason nothing
    was started.

    Args:
        ctx: where to send the reply.
        arg: optional search text matched against the local library.
    """
    if arg:
        matches = self.playlist_manager.search_library(arg)
        if not matches:
            await self.reply(ctx, "❌ Not found in library")
            return
        track = matches[0]
    else:
        track = self.playlist_manager.next_track()
        if not track:
            # Tell the user instead of returning silently, in line with
            # the feedback given by !next and !prev.
            await self.reply(ctx, "❌ No tracks available")
            return

    await self._play_track(track)
    await self.reply(ctx, f"▶️ {track.display_name}")
|
||
|
||
async def cmd_stop(self, ctx: Optional[MessageContext], arg: str):
    """Stop playback on every backend and clear the station state.

    Args:
        ctx: where to send the confirmation, or None when called
            internally (the rapid-skip guard in on_track_end passes None),
            in which case no reply is sent.
        arg: unused.
    """
    if self.streamer:
        await self.streamer.stop_playback()

    if self.jingle:
        await self.jingle.stop_playback()

    async with self._track_transition_lock:
        self.is_playing_station = False
        self.current_station = None

    # Without a context there is nobody to answer; replying would fail on
    # attribute access against None.
    if ctx is not None:
        await self.reply(ctx, "⏹️ Stopped")
|
||
|
||
async def cmd_next(self, ctx: MessageContext, arg: str):
    """Skip to the next track and report it, or say that there is none."""
    upcoming = self.playlist_manager.next_track()
    if not upcoming:
        await self.reply(ctx, "❌ No next track")
        return

    await self._play_track(upcoming)
    await self.reply(ctx, f"⏭️ {upcoming.display_name}")
|
||
|
||
async def cmd_prev(self, ctx: MessageContext, arg: str):
    """Go back to the previous track and report it, or say that there is none."""
    earlier = self.playlist_manager.previous_track()
    if not earlier:
        await self.reply(ctx, "❌ No previous track")
        return

    await self._play_track(earlier)
    await self.reply(ctx, f"⏮️ {earlier.display_name}")
|
||
|
||
async def cmd_np(self, ctx: MessageContext, arg: str):
    """Reply with the now-playing text, plus the queue length when non-empty."""
    now_playing = self._get_np_text()
    pending = self.playlist_manager.queue.size

    message = f"🎵 Now Playing: {now_playing}"
    if pending > 0:
        message += f"\n📋 Queue: {pending} tracks"

    await self.reply(ctx, message)
|
||
|
||
async def cmd_station(self, ctx: MessageContext, arg: str):
    """Start playing the configured radio station named ``arg``.

    If ``arg`` is empty or is not a key of ``self.config.stations``, the
    list of configured stations is sent instead. Otherwise the station's
    configured address is resolved to a stream URL and handed to every
    enabled output. Any failure is reported back to the sender.
    """
    if not arg or arg not in self.config.stations:
        stations = ', '.join(self.config.stations.keys()) or "None configured"
        await self.reply(ctx, f"📻 Stations: {stations}")
        return

    # Resolution used to run outside any try block, so an exception raised by
    # the resolver escaped without a reply to the sender. It is now treated
    # the same way as an unresolvable station.
    try:
        url, _ = await self.station_parser.resolve_stream_url(self.config.stations[arg])
    except Exception as e:
        logger.error("Error resolving station %s: %s", arg, e)
        url = None

    if not url:
        await self.reply(ctx, "❌ Failed to connect to station")
        return

    try:
        async with self._track_transition_lock:
            self.is_playing_station = True
            self.current_station = arg

        if self.streamer:
            await self.streamer.play_url(url, arg)

        if self.jingle:
            # NOTE(review): the short pause presumably lets the HTTP streamer
            # settle before the call audio source is swapped — confirm.
            await asyncio.sleep(0.1)
            self.jingle.set_audio_source(url, force_restart=True)

        await self.reply(ctx, f"📻 Playing: {arg}")
    except Exception as e:
        logger.error("Error playing station: %s", e)
        await self.reply(ctx, f"❌ Failed: {e}")
|
||
|
||
async def cmd_stations(self, ctx: MessageContext, arg: str):
    """Reply with a comma-separated list of the configured station names."""
    listing = ', '.join(self.config.stations.keys())
    if not listing:
        listing = "None configured"
    await self.reply(ctx, f"📻 Stations: {listing}")
|
||
|
||
async def cmd_list(self, ctx: MessageContext, arg: str):
    """Show one page of the playlist.

    ``arg`` is an optional 1-based page number; blank, non-numeric or
    sub-1 values fall back to page 1. Each page holds up to 15 entries.
    """
    page_size = 15

    try:
        page = max(1, int(arg)) if arg.strip() else 1
    except ValueError:
        page = 1

    start_index = (page - 1) * page_size
    tracks = self.playlist_manager.list_tracks(start_index, page_size)

    if not tracks:
        if page > 1:
            await self.reply(ctx, f"📋 No tracks on page {page}.")
        else:
            await self.reply(ctx, "📋 Playlist is empty")
        return

    entries = list(tracks)
    text = "\n".join(f"{idx+1}. {track.display_name}" for idx, track in entries)
    message = f"📋 Playlist (Page {page}):\n{text}"

    # A short page means the end of the playlist was reached, so pointing
    # the user at a following page would be misleading.
    if len(entries) >= page_size:
        message += f"\n\nType '!list {page+1}' for more."

    await self.reply(ctx, message)
|
||
|
||
async def cmd_shuffle(self, ctx: MessageContext, arg: str):
    """Shuffle the current playlist and confirm, or report that none is loaded."""
    playlist = self.playlist_manager.current_playlist
    if not playlist:
        # This case used to be a silent no-op; the sender now gets feedback.
        await self.reply(ctx, "❌ No playlist loaded")
        return

    playlist.shuffle()
    await self.reply(ctx, "🔀 Playlist shuffled")
|
||
|
||
async def _search_generic(self, ctx: MessageContext, query: str, source: str):
    """Run a yt-dlp search on ``source`` and reply with up to ten results.

    The full result list is stored in ``self._search_results`` under the
    sender's bare JID so a later ``!dl #<number>`` can refer back to it.
    """
    if not self.ytdlp:
        await self.reply(ctx, "❌ yt-dlp is disabled")
        return

    if not query:
        # NOTE(review): the usage text is built from the source name, which
        # may differ from the actual command name — confirm.
        await self.reply(ctx, f"Usage: !{source} <query>")
        return

    await self.reply(ctx, f"🔍 Searching {source}: {query}...")
    hits = await self.ytdlp.search(query, source=source)

    if not hits:
        await self.reply(ctx, "❌ No results found")
        return

    self._search_results[ctx.sender_bare] = hits

    rows = []
    for position, hit in enumerate(hits[:10], start=1):
        rows.append(f"{position}. {hit.title} [{hit.duration_str}]")
    text = "\n".join(rows)

    await self.reply(ctx, f"🔍 Results ({source}):\n{text}\n\nUse !dl #<number> to download")
|
||
|
||
async def cmd_search(self, ctx: MessageContext, arg: str):
    """Search with the "youtube" source via ``_search_generic``."""
    await self._search_generic(ctx, query=arg, source="youtube")
|
||
|
||
async def cmd_sc(self, ctx: MessageContext, arg: str):
    """Search with the "soundcloud" source via ``_search_generic``."""
    await self._search_generic(ctx, query=arg, source="soundcloud")
|
||
|
||
async def cmd_bc(self, ctx: MessageContext, arg: str):
    """Search with the "bandcamp" source via ``_search_generic``."""
    await self._search_generic(ctx, query=arg, source="bandcamp")
|
||
|
||
async def cmd_library(self, ctx: MessageContext, arg: str):
    """Search the local library for ``arg`` and list up to ten matches."""
    if not arg:
        await self.reply(ctx, "Usage: !lib <query>")
        return

    found = self.playlist_manager.search_library(arg, limit=10)
    if not found:
        await self.reply(ctx, "❌ No tracks found in library")
        return

    rows = []
    for number, item in enumerate(found, start=1):
        rows.append(f"{number}. {item.display_name}")
    text = "\n".join(rows)

    await self.reply(ctx, f"📚 Library Results:\n{text}\n\nUse !play <query> to play.")
|
||
|
||
async def cmd_dl(self, ctx: MessageContext, arg: str):
    """Download a track via yt-dlp and add it to the library and playlist.

    ``arg`` may be:
      * a URL (anything starting with ``http`` or ``www``),
      * ``#<number>`` referring to the sender's stored search results,
      * any other text, which is searched and whose first hit is downloaded.
    """
    if not self.ytdlp:
        await self.reply(ctx, "❌ yt-dlp is disabled")
        return
    if not arg:
        await self.reply(ctx, "Usage: !dl <url|query|#num>")
        return

    target_url = None
    title = "Unknown"

    if arg.startswith(('http', 'www')):
        target_url = arg
        title = "URL"
    elif arg.startswith('#') and arg[1:].strip().isdecimal():
        # A numeric "#N" is always a pick from earlier results. It used to
        # fall through to a literal search for "#N" when no results were
        # stored or N was out of range; report the problem instead.
        # Non-numeric text after '#' still falls through to a search.
        if ctx.sender_bare not in self._search_results:
            await self.reply(ctx, "❌ No search results to pick from. Run a search first.")
            return

        results = self._search_results[ctx.sender_bare]
        idx = int(arg[1:]) - 1
        if not 0 <= idx < len(results):
            await self.reply(ctx, "❌ Invalid result number")
            return

        res = results[idx]
        target_url = f"https://youtube.com/watch?v={res.id}" if 'youtube' in res.url else res.url
        title = res.title

    if not target_url:
        await self.reply(ctx, f"🔍 Searching: {arg}...")
        res = await self.ytdlp.search(arg)
        if not res:
            await self.reply(ctx, "❌ No results found")
            return
        target_url = res[0].url
        title = res[0].title
        await self.reply(ctx, f"🎯 Found: {title}")

    await self.reply(ctx, f"⬇️ Downloading: {title}...")
    path = await self.ytdlp.download(target_url)

    if not path:
        await self.reply(ctx, "❌ Download failed")
        return

    track = await self.playlist_manager._create_track(path)
    if track:
        self.playlist_manager.library.append(track)
        self.playlist_manager.add_to_playlist(track)
        await self.reply(ctx, f"✅ Downloaded: {track.display_name}")
    else:
        # The file was downloaded but no track object could be built for it.
        await self.reply(ctx, "✅ Downloaded file")
|
||
|
||
async def cmd_scan(self, ctx: MessageContext, arg: str):
    """Rescan the library, add it to the playlist and report the scan result."""
    manager = self.playlist_manager
    found = await manager.scan_library()
    manager.add_library_to_playlist()
    await self.reply(ctx, f"✅ Found {found} tracks")
|
||
|
||
async def cmd_url(self, ctx: MessageContext, arg: str):
    """Reply with the HTTP stream URL, or say that streaming is disabled."""
    if not self.streamer:
        await self.reply(ctx, "❌ HTTP streaming is disabled")
        return

    await self.reply(ctx, f"🔗 Stream: {self.streamer.stream_url}")
|
||
|
||
async def cmd_callers(self, ctx: MessageContext, arg: str):
    """Reply with the number of active Jingle sessions, if calls are enabled."""
    if not self.jingle:
        await self.reply(ctx, "❌ Jingle/calls are disabled")
        return

    active = self.jingle.get_active_sessions()
    await self.reply(ctx, f"📞 Active callers: {len(active)}")
|
||
|
||
async def cmd_hangup(self, ctx: MessageContext, arg: str):
    """Stop every current Jingle session and report how many were ended."""
    if not self.jingle:
        await self.reply(ctx, "❌ Jingle/calls are disabled")
        return

    # Work on a snapshot: the session mapping may change while sessions
    # are being stopped.
    snapshot = list(self.jingle.sessions.values())
    ended = len(snapshot)
    for session in snapshot:
        await self.jingle.stop_session(session.sid)

    await self.reply(ctx, f"🔴 Ended {ended} call(s)")
|
||
|
||
async def cmd_memory(self, ctx: MessageContext, arg: str):
    """Reply with memory usage and a few internal counters."""
    mem_mb = self._get_memory_mb()
    task_count = len(self._managed_tasks)
    session_count = len(self.jingle.sessions) if self.jingle else 0
    cached = len(self._search_results)

    report = [
        "📊 **Memory Status:**",
        f"• Memory: {mem_mb:.1f} MB",
        f"• Managed Tasks: {task_count}",
        f"• Jingle Sessions: {session_count}",
        f"• Search Cache: {cached}",
    ]

    if self.streamer:
        report.append(f"• HTTP Listeners: {self.streamer.listener_count}")

    await self.reply(ctx, "\n".join(report))
|
||
|
||
async def cmd_queue(self, ctx: MessageContext, arg: str):
    """Reply with the current queue item, up to ten upcoming items and the queue size."""
    upcoming = self.playlist_manager.get_queue(10)
    current = self.playlist_manager.queue.current

    lines = [f"▶️ Now: {current.display_name}"] if current else []

    if upcoming:
        lines.append("\n📋 Up Next:")
        lines.extend(
            f" {position}. {item.display_name}"
            for position, item in enumerate(upcoming, 1)
        )
    else:
        lines.append("\n📋 Queue is empty")

    lines.append(f"\n📊 {self.playlist_manager.queue.size} in queue")
    await self.reply(ctx, "\n".join(lines))
|
||
|
||
async def cmd_add_queue(self, ctx: MessageContext, arg: str):
    """Append the best library match for ``arg`` to the queue."""
    if not arg:
        await self.reply(ctx, "Usage: !addq <query>")
        return

    matches = self.playlist_manager.search_library(arg, limit=1)
    if not matches:
        await self.reply(ctx, "❌ Not found in library. Try !dl first.")
        return

    chosen = matches[0]
    # A falsy result from add_to_queue is reported as a full queue.
    if not self.playlist_manager.add_to_queue(chosen):
        await self.reply(ctx, "❌ Queue is full")
        return

    await self.reply(ctx, f"➕ Added to queue: {chosen.display_name}")
|
||
|
||
async def cmd_play_next(self, ctx: MessageContext, arg: str):
    """Add the best library match for ``arg`` via ``add_next_to_queue``."""
    if not arg:
        await self.reply(ctx, "Usage: !playnext <query>")
        return

    matches = self.playlist_manager.search_library(arg, limit=1)
    if not matches:
        await self.reply(ctx, "❌ Not found in library")
        return

    chosen = matches[0]
    # A falsy result from add_next_to_queue is reported as a full queue.
    if not self.playlist_manager.add_next_to_queue(chosen):
        await self.reply(ctx, "❌ Queue is full")
        return

    await self.reply(ctx, f"⏭️ Playing next: {chosen.display_name}")
|
||
|
||
async def cmd_clear_queue(self, ctx: MessageContext, arg: str):
    """Clear the queue and confirm to the sender."""
    manager = self.playlist_manager
    manager.clear_queue()
    await self.reply(ctx, "🗑️ Queue cleared")
|
||
|
||
async def cmd_remove_queue(self, ctx: MessageContext, arg: str):
    """Remove the queue entry at the 1-based position given in ``arg``.

    Replies with the removed track's name, or with an error when the
    argument is missing, not a number, or not a valid position.
    """
    if not arg:
        await self.reply(ctx, "Usage: !removeq <number>")
        return

    try:
        idx = int(arg) - 1
        if idx < 0:
            # Positions are 1-based. A value below 1 would otherwise become
            # a negative index, which Python sequences read as counting from
            # the end — presumably removing an unintended entry.
            await self.reply(ctx, "❌ Invalid index")
            return

        track = self.playlist_manager.remove_from_queue(idx)
        if track:
            await self.reply(ctx, f"🗑️ Removed: {track.display_name}")
        else:
            await self.reply(ctx, "❌ Invalid index")
    except ValueError:
        await self.reply(ctx, "❌ Invalid number")
|
||
|
||
async def cmd_shuffle_queue(self, ctx: MessageContext, arg: str):
    """Shuffle the queue and confirm to the sender."""
    manager = self.playlist_manager
    manager.shuffle_queue()
    await self.reply(ctx, "🔀 Queue shuffled")
|
||
|
||
async def cmd_history(self, ctx: MessageContext, arg: str):
    """Reply with up to ten history entries, listed in reverse of the stored order."""
    recent = self.playlist_manager.queue.get_history(10)
    if not recent:
        await self.reply(ctx, "📜 No history yet")
        return

    lines = ["📜 Recently Played:"]
    lines.extend(
        f" {position}. {item.display_name}"
        for position, item in enumerate(reversed(recent), 1)
    )
    await self.reply(ctx, "\n".join(lines))
|
||
|
||
async def _cancel_and_wait(task, timeout: float = 2.0) -> None:
    """Cancel ``task`` (if any) and wait up to ``timeout`` seconds for it to end."""
    if not task:
        return
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=timeout)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        # Expected outcomes of cancelling: the task ended or did not end in time.
        pass


async def main():
    """Parse the command line, start the bot and tear it down on exit.

    Options:
      -c/--config   path to the config file (default ``config.ini``)
      -v/--verbose  switch the root logger to DEBUG
    """
    import argparse

    parser = argparse.ArgumentParser(description="XMPP Radio Bot")
    parser.add_argument('-c', '--config', default='config.ini', help='Config file path')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    config = load_config(args.config)
    bot = RadioBot(config)
    bot.connect()

    try:
        await bot.disconnected
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except asyncio.CancelledError:
        # On Python 3.11+, asyncio.run() turns Ctrl-C into a cancellation of
        # the main task, so KeyboardInterrupt is not raised in here. Log the
        # shutdown and re-raise so the runner still sees the cancellation.
        logger.info("Shutting down...")
        raise
    finally:
        await _cancel_and_wait(bot._gc_task)
        await _cancel_and_wait(bot._memory_monitor_task)

        # Snapshot first: the collection may change as tasks finish.
        pending = list(bot._managed_tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        bot._managed_tasks.clear()

        # Each teardown step is guarded so a failure in one does not skip
        # the remaining ones.
        if bot.streamer:
            try:
                await bot.streamer.stop()
            except Exception:
                logger.exception("Error stopping HTTP streamer")

        if bot.jingle:
            try:
                await bot.jingle.end_all_sessions()
            except Exception:
                logger.exception("Error ending Jingle sessions")

        bot._search_results.clear()

        # NOTE(review): three passes kept from the original — presumably to
        # also collect objects released by finalizers of earlier passes.
        for _ in range(3):
            gc.collect()

        logger.info("👋 Bot stopped cleanly")
|
||
|
||
# Script entry point: run the bot's async main() on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(main())