#!/usr/bin/env python3 import slixmpp import asyncio import logging import configparser import json import random import re import aiohttp import operator import mimetypes import base64 from pathlib import Path from typing import Dict, List, Optional, Any from difflib import get_close_matches from dataclasses import dataclass, field from io import BytesIO from urllib.parse import urlencode, quote from slixmpp.exceptions import IqError, IqTimeout # The MOST ESSENTIAL PART of the code! N_QUOTES = [ "Did you just slap me with that arm?", "Yeah sorry, it's just my head kinda hurts.", "Hey, are you new to our squad?", "You're a little, uh, short for a Disassembly Drone...", "I'm Serial Designation N, nice to meet you!", "I'm kind of the leader of the squad in this city.", "That's not true, everyone tells me I'm useless and terrible.", "Wait! I'm not supposed to tell you that part! Biscuits...", "Well, honesty is the best policy.", "I also can't seem to remember the past 3 hours of my life, ah!, but I'm sure that'll sort itself out.", "Hoo, stuck yourself?", "Just pop it in your mouth. Our saliva neutralizes the nanites, otherwise I'd be constantly disassembling myself.", "Sure! I love doing anything.", "Sweet, uh, I'm open to new things I guess.", "Consider it err... repressed!", "No worries, I'm N, but a whole letter is a lot to remember! Ha haha!", "So obviously a lot of mutual respect there, but secretly I actually kind of have a crush on her.", "YOU CAN'T TELL HER OKAY?!", "J's awesome!", "Hey, let me give you the tour!", "Outside are the corpse wall, thingies, and here are the buttons!", "Beep, boop, bop, bop bop, bop, beep, boop, bop bop, beep...", "More of a one use missile; they never taught us how to land.", "Other than ingesting their WARM, SWEET oil to avoid overheating and dying, I guess I just wanna be useful.", "I was given a job, and I always wanna try my best.", "Oh my! You sure are rebellious! It's kind of exciting...", "...but not as fun as, uh, following the rules.", "You know, I left an-an extremely dangerous weapo- u-uh, excuse, o-outside!", "Hey, fellas!", "Ooh, deal me in, I love Rummy.", "Wait, no, ah, I'm going to murder everyone...", "...Rain check!", "I'm sorry. I really enjoyed our time together, but I can't have you shooting V with that thing.", "You, me, name, remember?", "I get the feeling the company doesn't actually love robots, and, like, we might be robots?", "...I've made a terrible mistake. It's cool how immediately I could tell.", "Th-Thanks J, al-always looking-ng out for me. Yo-you're awesome.", "A-ah, biscuits-s. I-I'm sor-rry. I ruin-ned your card game, that made you have an awk-kward moment with your dad.", "Being rebellious is a lot harder than it loo-oks. Thanks for showing me the ropes.", "Tha-at's super fa-air. Ah, I screwed up.", "Oh uh, J, you're sometimes kinda mean to me, and I wish you weren't.", "Just some constructive criticism!", "(J please.)", "AAAH, MY MIND'S IN A WEIRD PLACE, DON'T READ INTO THIS!", "UZI! I'm so, so sorry, have fun repressing this, heh-", "Huh? Oh!, uh, N! I'm an angsty rebellious Disassembly Drone now.", "Nice to meet you, Mr. Uzi-", "I'd join you if the sun didn't kill me. Hope you're having important character growth or something, though!", "I was the PILOT? That's aweso-! (I crashed and ruined everything...)", "Spaceship pilot. Origin story.", "I do want to be dapper...", "We can't interact with the workers anymore V... we're too dangerous.", "I'm not bringing you for prom murder, V!", "J went holo-spooky-snake-crab and we maybe grew up in a haunted mansion!", "Aren't you worried we have no idea what we even are?", "V, if you're hiding something, we can figure it out together", "Even if we each only have pieces", "No! No time!", "Dapper N~", "I could help", "But you probably don't want me...", "Dapper Buddies!", "This isn't what i expected at all!", "Thank you, Lizzy-!", "V, you kind of suck.", "Welcome, campers! Let's sound off! One, two...", "To the bunks!", "Close one!", "V, we can't hurt Uzi!", "I don't know what you are talking about because you won't tell me!", "What are you so afraid of!?", "I am! Uzi is, she's a kid like us, V! What is wrong with you!?", "Easy there, buddy!", "What did I say about antagonizing her?", "Uzi, you want to tell me what's the matter?", "Before we met, scary stuff was actually...pretty scary, and tonight too, 'cause you weren't with me to make it fun somehow, I kind of forgot what that was like.", "Then we'll stick together!", "Kidding! Just avoid another whole spire!", "Baby steps. Together?", "Us as lizards. J drew herself.", "Don't give me those eyes.", "Ah, we'll ask Tessa, OK? If not... Movie night!", "Hi, Tessa.", "You guys are... locking her up?", "I-I-I told her to say that! And brought her here. I-I just... hate orders...?", "And your fan's dumb...?", "NO! AAAH! STOP!", "Go get it, goobers!", "WOAH! WE WERE BUDDIES! ...darkXwolf17...?", "So, these are memories, and future me is dead at a sleepover?", "'Cause... 'Cause of you? 'Cause you're doing that? OW!", "...'Kay. You kind of seem like an evil ghost witch, though. AH!", "Why? Who are you?", "Ugh, J has it...", "Psst! Tessa!", "We need the basement key. This bird's from the future.", "Who? Our Cyn? Nah, she's cool.", "Not creepy! Sweet!", "DARKXWOLF17! NOO-HO-HO-HO-HOOO!", "Is V... okay? Like, in the future?", "Not dealing with this great, to be immediately honest...! ...Okay, it's gone.", "Whoa, Cyn! For an eldritch... uh, monster thingy—", "I got you.", "T-Tessa...?", "(Chuckling) It is you.", "Why?", "And for Mom backstory. Right, dude?", "(Giggling like a child) It tickles! The bad version, though!", "...Doing great, dude.", "Then Uzi... You can fix her, right? That's why you're here?", "V! Is Uzi–", "''IT SUCKS PRETTY HARD''", "DON'T!", "Uzi, no!", "V, come on!", "NO, NO, NO! V, WE NEED YOU!", "V, PLEASE!", "Uzi, help!", "Use your Solver!", "Oh, no no, it's okay.", "We're not going to hurt you. Okay?", "I deserve this, I deserve this, I deserve this!", "T-Thank you mine shaft trauma ghost, I can hear you fine from there!", "Uzi's dad? What-", "Sorry, ma'am. I won't keep secrets from Uzi anymore...even if you say it might hurt her", "Gave her ungodly eldritch genetics!", "I promise.…to wait until you're ready to tell her yourself?", "You knew about the patch. Yes, or no? One chance.", "Hey.. buddy... Yeah, that should help...", "All I know is.. I need you. To figure things out... together..?", "NORI! I should mention Uzi and I-!", "HahaohaAAAAAAAAAAAAAH!!!!", "UZI!!! THAT WAS YOUR MOM! WAIT, NO! AAAAAHHHHHHH!", "Uzi DON'T YOU DARE!!!", "Heeheehee! Spaceship pilot!", "AAAACK! Sorry, I didn't know- Oh.", "Yeah, I'm kind of, like, actually mad about what you did. But we can -- TALK LATER!!", "V, STOP!", "No more tricks.", "I dunno, I feel like we need a secret handshake or something, right?", "Uzi! Uzi!", "UUUUUUUUUZIIIIIII!!!!", "It's you!", "Whooooo-hoo-hoo-hoo! That's my girlfriend! " ] EMOJIS = ['1️⃣','2️⃣','3️⃣','4️⃣','5️⃣','6️⃣','7️⃣','8️⃣','9️⃣','🔟'] # For dictionary TDK_HEADERS = { 'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'X-Requested-With': 'XMLHttpRequest', 'DNT': '1', 'Sec-GPC': '1', 'Connection': 'keep-alive', 'Referer': 'https://sozluk.gov.tr/', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'sec-ch-ua-platform': '"Android"', 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="137", "Google Chrome";v="137"', 'sec-ch-ua-mobile': '?1', 'Save-Data': 'on', 'Priority': 'u=0' } class DataManager: def __init__(self, filename): self.filename = Path(filename) self.data = self._load() def _load(self): if not self.filename.exists(): return {} try: return json.loads(self.filename.read_text(encoding='utf-8')) except: return {} def save(self): self.filename.write_text(json.dumps(self.data, indent=2), encoding='utf-8') def get(self, key, default=None): return self.data.get(key, default) def set(self, key, value): self.data[key] = value self.save() # Handles image uploads using your xmpp server or catbox/litterbox (default 72 hours removal) class AsyncFileUploader: def __init__(self, client, service='xmpp', api_key=None): self.client = client self.service = service.lower() self.api_key = api_key async def upload(self, data: bytes, filename: str, mime_type: str) -> Optional[str]: try: if self.service == 'xmpp': input_file = BytesIO(data) return await self.client['xep_0363'].upload_file( input_file=input_file, filename=filename, size=len(data), content_type=mime_type, timeout=60 ) elif self.service == 'catbox': url = "https://catbox.moe/user/api.php" form = aiohttp.FormData() form.add_field('reqtype', 'fileupload') if self.api_key: form.add_field('userhash', self.api_key) form.add_field('fileToUpload', data, filename=filename, content_type=mime_type) async with aiohttp.ClientSession() as s: async with s.post(url, data=form) as r: if r.status == 200: return (await r.text()).strip() elif self.service == 'litterbox': url = "https://litterbox.catbox.moe/resources/internals/api.php" form = aiohttp.FormData() form.add_field('reqtype', 'fileupload') form.add_field('time', '72h') form.add_field('fileToUpload', data, filename=filename, content_type=mime_type) async with aiohttp.ClientSession() as s: async with s.post(url, data=form) as r: if r.status == 200: return (await r.text()).strip() except Exception as e: logging.error(f"Upload failed: {e}") return None @dataclass class Poll: id: str room: str creator: str question: str options: List[str] votes: Dict[str, int] = field(default_factory=dict) anon: bool = False max_votes: Optional[int] = None closed: bool = False class PollManager: def __init__(self, bot): self.bot = bot self.polls: Dict[str, Poll] = {} self.cnt = 0 def create(self, room, sender, args): parts = re.findall(r'"([^"]+)"', args) if len(parts) < 3: return f"Usage: {self.bot.trigger}poll \"Question\" \"Opt1\" \"Opt2\" [anon=true] [votes=N]" q, opts = parts[0], parts[1:11] anon = 'anon=true' in args.lower() max_v = None mv_match = re.search(r'votes=(\d+)', args) if mv_match: max_v = int(mv_match.group(1)) self.cnt += 1 pid = str(self.cnt) p = Poll(pid, room, sender, q, opts, {}, anon, max_v, False) self.polls[pid] = p return self._announce(p) def _announce(self, p): lines = [f"📊 **Poll {p.id}**: {p.question}"] for i, o in enumerate(p.options): lines.append(f"{EMOJIS[i]} {o}") lines.append(f"Vote: {self.bot.trigger}vote {p.id} ") if p.anon: lines.append("🔒 Anonymous voting") return "\n".join(lines) def vote(self, sender, args): parts = args.split() if len(parts) < 2: return "Usage: vote " pid, choice = parts[0], parts[1] if pid not in self.polls: return "Poll not found." p = self.polls[pid] if p.closed: return "Poll closed." try: idx = int(choice) - 1 if not (0 <= idx < len(p.options)): raise ValueError p.votes[sender] = idx return "Vote cast." except: return "Invalid option." def close(self, sender, args): pid = args.strip() if pid not in self.polls: return "Poll not found." p = self.polls[pid] if sender != p.creator and not self.bot.is_mod(p.room, sender): return "Only creator can close." p.closed = True return self._results(p) def _results(self, p): counts = [0] * len(p.options) for v in p.votes.values(): if 0 <= v < len(counts): counts[v] += 1 total = sum(counts) lines = [f"📊 **Results** - {p.question}"] for i, o in enumerate(p.options): pct = (counts[i] / total * 100) if total else 0 lines.append(f"{EMOJIS[i]} {o}: {counts[i]} ({pct:.1f}%)") return "\n".join(lines) class UnoGame: def __init__(self, room, bot): self.room = room self.bot = bot self.players = [] self.hands = {} self.deck = [] self.discard = [] self.turn = 0 self.direction = 1 self.started = False self.color = None def join(self, nick): if self.started: return "Started." if nick not in self.players: self.players.append(nick) return f"{nick} joined Uno." return "Already joined." def start(self): if len(self.players) < 2: return "Need 2+ players." self.started = True colors = ['red','blue','green','yellow'] vals = [str(i) for i in range(10)] + ['skip','reverse','+2'] self.deck = [f"{c}_{v}" for c in colors for v in vals] * 2 self.deck += ['wild_wild', 'wild_+4'] * 4 random.shuffle(self.deck) for p in self.players: self.hands[p] = [self.deck.pop() for _ in range(7)] self._pm_hand(p) top = self.deck.pop() while 'wild' in top: self.deck.append(top) random.shuffle(self.deck) top = self.deck.pop() self.discard.append(top) self.color = top.split('_')[0] return f"Uno Started! Top: {self._fmt(top)}. Turn: {self.players[self.turn]}" def play(self, nick, arg): if not self.started or self.players[self.turn] != nick: return hand = self.hands[nick] parts = arg.lower().split() declared_color = None valid_colors = ['red','blue','green','yellow'] for p in parts: if p in valid_colors: declared_color = p sel_card = None for c in hand: c_simple = c.replace('_', ' ').lower() if arg.lower() in c_simple: sel_card = c break if "wild" in parts and "wild" in c_simple: if "+4" in arg and "+4" in c: sel_card = c break elif "+4" not in arg and "+4" not in c: sel_card = c break if not sel_card: return "Card not in hand." top = self.discard[-1] t_col = self.color t_val = top.split('_')[1] s_col, s_val = sel_card.split('_') valid = False if 'wild' in s_col: valid = True if not declared_color: asyncio.create_task(self.bot.send_private_muc(self.room, nick, "⚠️ Please specify a color (e.g. `play wild red`).")) self._pm_hand(nick) return "Specify color (e.g. play wild red)" self.color = declared_color elif s_col == t_col or s_val == t_val: valid = True self.color = s_col if not valid: return "Invalid move." hand.remove(sel_card) self.discard.append(sel_card) msg = f"{nick} played {self._fmt(sel_card)}." if not hand: self.started = False return f"{msg}\n🏆 {nick} Wins!" if len(hand) == 1: msg += " UNO!" if 'skip' in s_val: self._advance() elif 'reverse' in s_val: self.direction *= -1 if len(self.players)==2: self._advance() elif '+2' in s_val: self._advance() self._draw(self.players[self.turn], 2) elif '+4' in s_val: self._advance() self._draw(self.players[self.turn], 4) self._advance() return f"{msg}\nNext: {self.players[self.turn]} (Color: {self.color})" def draw(self, nick): if self.players[self.turn] != nick: return self._draw(nick, 1) self._advance() return f"{nick} drew." def _draw(self, nick, n): for _ in range(n): if not self.deck: self.deck = self.discard[:-1] self.discard = [self.discard[-1]] random.shuffle(self.deck) if self.deck: self.hands[nick].append(self.deck.pop()) self._pm_hand(nick) def _advance(self): self.turn = (self.turn + self.direction) % len(self.players) next_player = self.players[self.turn] self._pm_hand(next_player) def _fmt(self, c): if 'wild' in c: return f"🌈 {c.split('_')[1]}" em = {'red':'🟥','blue':'🟦','green':'🟩','yellow':'🟨'} return f"{em.get(c.split('_')[0],'')} {c.split('_')[1]}" def _pm_hand(self, nick): asyncio.create_task(self.bot.send_private_muc(self.room, nick, " | ".join(self._fmt(c) for c in self.hands[nick]))) # JACK IMPLEMENTATION class MultiplayerBlackjackGame: def __init__(self, room, bot): self.room = room self.bot = bot self.players = [] self.hands = {} self.d_hand = [] self.deck = [] self.turn_idx = 0 self.status = "JOINING" suits = ['♠', '♥', '♦', '♣'] ranks = ['2','3','4','5','6','7','8','9','10','J','Q','K','A'] self.deck = [{'rank': r, 'suit': s} for s in suits for r in ranks] * 2 random.shuffle(self.deck) def join(self, nick): if self.status != "JOINING": return "Game already started." if nick in self.players: return "Already joined." self.players.append(nick) return f"{nick} joined Blackjack." def start(self): if len(self.players) < 1: return "Need players." self.status = "PLAYING" self.turn_idx = 0 for p in self.players: self.hands[p] = [self.deck.pop(), self.deck.pop()] self.d_hand = [self.deck.pop(), self.deck.pop()] return self._status_msg() def hit(self, nick): if self.status != "PLAYING": return "Not playing." if self.players[self.turn_idx] != nick: return "Not your turn." self.hands[nick].append(self.deck.pop()) val = self._calc(self.hands[nick]) res = f"{nick} hits: {self._fmt(self.hands[nick])} ({val})" if val >= 21: res += " (Bust!)" if val > 21 else " (21!)" self._advance_turn() return res def stand(self, nick): if self.status != "PLAYING": return "Not playing." if self.players[self.turn_idx] != nick: return "Not your turn." res = f"{nick} stands." self._advance_turn() return res def _advance_turn(self): self.turn_idx += 1 if self.turn_idx >= len(self.players): self.status = "DEALING" def resolve_game(self): while self._calc(self.d_hand) < 17: self.d_hand.append(self.deck.pop()) d_val = self._calc(self.d_hand) lines = [f"🎰 **Results** (Dealer: {self._fmt(self.d_hand)} - {d_val})"] for p in self.players: p_val = self._calc(self.hands[p]) if p_val > 21: lines.append(f"❌ {p}: {p_val} (Bust)") elif d_val > 21 or p_val > d_val: lines.append(f"✅ {p}: {p_val} (Win)") elif p_val == d_val: lines.append(f"⚖️ {p}: {p_val} (Push)") else: lines.append(f"❌ {p}: {p_val} (Loss)") return "\n".join(lines) def _status_msg(self): if self.status == "PLAYING": curr = self.players[self.turn_idx] msg = f"🃏 **Blackjack**\nDealer: [{self.d_hand[0]['rank']}{self.d_hand[0]['suit']}] [??]\n" for p in self.players: mark = "👉 " if p == curr else "" msg += f"{mark}{p}: {self._fmt(self.hands[p])} ({self._calc(self.hands[p])})\n" return msg return "" def _calc(self, hand): val = 0 aces = 0 for c in hand: if c['rank'] in ['J','Q','K']: val += 10 elif c['rank'] == 'A': val += 11; aces += 1 else: val += int(c['rank']) while val > 21 and aces: val -= 10; aces -= 1 return val def _fmt(self, hand): return " ".join([f"[{c['rank']}{c['suit']}]" for c in hand]) # MAIN CLASS class RevoltBot(slixmpp.ClientXMPP): def __init__(self): self.cfg = configparser.ConfigParser() if not Path('config.ini').exists(): raise FileNotFoundError("config.ini missing") self.cfg.read('config.ini') super().__init__(self.cfg['XMPP']['jid'], self.cfg['XMPP']['password']) self.trigger = self.cfg['Bot']['trigger'] self.nick = self.cfg['Bot']['nickname'] self.rooms = [r.strip() for r in self.cfg['XMPP']['rooms'].split(',')] self.db = DataManager('bot_data.json') self.copypastas = DataManager('copypastas.json') self.warnings = self.db.get('warnings', {}) self.wordlist = set() self.config_moderators = set() if self.cfg.has_option('Bot', 'moderators'): raw_mods = self.cfg['Bot']['moderators'] self.config_moderators = {m.strip().lower() for m in raw_mods.split(',') if m.strip()} # Prefers English -> German -> French -> Espanol for word definitions # Languages are chosen because they are the most spoken languages with Latin alphabet # Configuration allows change this part self.wiktionary_priority = ['en', 'de', 'fr', 'es'] if self.cfg.has_option('Features', 'wiktionary_priority'): raw_p = self.cfg['Features']['wiktionary_priority'] self.wiktionary_priority = [p.strip() for p in raw_p.split(',') if p.strip()] self.uploader = AsyncFileUploader(self, self.cfg['Bot'].get('file_host', 'litterbox'), self.cfg['Bot'].get('file_host_api_key')) self.polls = PollManager(self) self.games = {} self.register_plugin('xep_0030') self.register_plugin('xep_0045') self.register_plugin('xep_0199') self.register_plugin('xep_0363') self.register_plugin('xep_0054') self.register_plugin('xep_0060') self.add_event_handler("session_start", self.start) self.add_event_handler("groupchat_message", self.on_group) self.add_event_handler("message", self.on_pm) # BOT STARTUP async def start(self, event): self.send_presence() await self.get_roster() # Downloads slurs from the database to block them while moderating url = self.cfg['Features'].get('wordlist_url') if url: try: async with aiohttp.ClientSession() as s: async with s.get(url) as r: if r.status == 200: self.wordlist = set((await r.text()).lower().splitlines()) logging.info(f"Loaded {len(self.wordlist)} banned words.") except Exception as e: logging.error(f"Wordlist error: {e}") for room in self.rooms: self.join_room_with_retry(room) def join_room_with_retry(self, room): room_jid = str(slixmpp.JID(room).bare) n = self.cfg['XMPP'].get(f'nickname.{room}', self.nick) try: self.plugin['xep_0045'].join_muc(room_jid, n) logging.info(f"Joined {room_jid} as {n}") except Exception as e: logging.error(f"Could not join {room}: {e}") async def on_group(self, msg): if msg['mucnick'] == self.nick: return room = msg['from'].bare body = msg['body'].strip() mod_enabled = self.cfg['Moderation'].get('moderation_enabled', 'ALL').upper() should_moderate = True if mod_enabled == 'NONE': should_moderate = False elif mod_enabled == 'NON_NSFW' and self.is_nsfw_room(room): should_moderate = False # Warn a user if they use a slur! try: if should_moderate and self.wordlist: if any(re.search(r'\b'+re.escape(w)+r'\b', body.lower()) for w in self.wordlist): await self.warn(room, msg['mucnick'], "Message includes word inside the filter.") return if not body.startswith(self.trigger): return parts = body[len(self.trigger):].split(maxsplit=1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" await self.handle_cmd(room, msg, cmd, args) except Exception as e: logging.error(f"Error handling message: {e}") async def on_pm(self, msg): if msg['type'] == 'error': return if msg['from'].bare in self.rooms and not msg['from'].resource: return if msg['from'].bare == self.boundjid.bare: return if msg['type'] == 'groupchat': return body = msg['body'].strip() if not body: return if body.startswith(self.trigger): parts = body[len(self.trigger):].split(maxsplit=1) cmd = parts[0].lower() if cmd == 'help': await self.send_private_msg_by_jid(msg['from'], await self._generate_help_message(False)) elif cmd == 'dm': await self.send_private_msg_by_jid(msg['from'], "Hi! :3") async def handle_cmd(self, room, msg, cmd, args): sender = msg['mucnick'] # Too lazy to remove cdn links if cmd == 'ihnk': await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20I%20have%20no%20knowledge%20of%20any%20of%20this%20ThIs%20iS%20sO%20bIzZaRe_480p.mp4") elif cmd == 'scary': await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20oh%20oh%20scary%20oh%20oh%20shiver%20me%20timbers_360p.mp4") # MOST ESSENTIAL COMMAND OF THE BOT elif cmd == 'n': await self.send_group(room, random.choice(N_QUOTES)) elif cmd == 'cl': await self.send_group(room, f"Common {args or 'Everyone'} L") elif cmd == 'l': await self.send_group(room, "L") elif cmd == 'test': await self.send_group(room, "Ig it works ™️") elif cmd == 'credits': await self.send_group(room, "Original by Purplebored. Port by agent-n.") elif cmd == 'invite': await self.send_private_muc(room, sender, "Contact the hoster of this instance to add your room.") elif cmd == 'ping': await self.send_group(room, "Pong!") elif cmd == 'video': await self.send_group(room, await self._generate_video_list()) elif cmd in ['calculate', 'calc']: parts = args.split() if len(parts) == 3: try: n1 = float(parts[0]) op = parts[1] n2 = float(parts[2]) ops = {'+':operator.add, '-':operator.sub, '*':operator.mul, '/':operator.truediv} if op in ops: if op == '/' and n2 == 0: await self.send_group(room, "Division by zero is not allowed.") else: await self.send_group(room, f"Result: {ops[op](n1, n2)}") else: await self.send_group(room, "Invalid op.") except: await self.send_group(room, "Error.") else: await self.send_group(room, f"Usage: {self.trigger}calc num + num") elif cmd == 'dm': await self.send_private_muc(room, sender, "Hi :3") # AVATAR DOWNLOAD AND REUPLOAD elif cmd == 'avatar': target_nick = args.strip() if args else sender try: target_jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid') is_muc_occupant = False if not target_jid: target_jid = f"{room}/{target_nick}" is_muc_occupant = True avatar_data, mime_type = await self._get_avatar_data(target_jid, is_muc_occupant=is_muc_occupant) if avatar_data: ext = mimetypes.guess_extension(mime_type) or ".png" filename = f"avatar{ext}" url = await self.uploader.upload(avatar_data, filename, mime_type) if url: await self.send_group(room, f"{url}") else: await self.send_group(room, "Avatar retrieved but upload failed (literally 1984).") else: await self.send_group(room, f"No avatar found for {target_nick}. Please try again later...") except Exception as e: logging.error(f"Avatar error: {e}") await self.send_group(room, "Error fetching avatar.") # WIKTIONARY LOOKUP elif cmd in ['wiki', 'wiktionary']: if not args: await self.send_group(room, "Usage: wiki ") return term = args.strip() url = f"https://en.wiktionary.org/api/rest_v1/page/definition/{quote(term)}?redirect=true" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36', 'Accept': 'application/json; charset=utf-8; profile="https://www.mediawiki.org/wiki/Specs/definition/0.8.0"' } try: data = await self.fetch(url, headers=headers, json=True) if not data or 'error' in data: await self.send_group(room, "Word not found.") return available_codes = list(data.keys()) if 'zh' in available_codes: available_codes.remove('zh') priority_codes = self.wiktionary_priority selected_code = next((c for c in priority_codes if c in available_codes), None) if not selected_code and available_codes: selected_code = available_codes[0] if not selected_code: await self.send_group(room, "No definitions found.") return entries = data[selected_code] lang_name = entries[0].get('language', selected_code) found_defs = [] for entry in entries: pos = entry.get('partOfSpeech', '') for d in entry.get('definitions', []): raw = d.get('definition', '') clean = re.sub(r'<[^>]+>', '', raw).strip() if clean: found_defs.append(f"({pos}) {clean}" if pos else clean) if found_defs: resp = f"📖 **{term}** ({lang_name})\n" + "\n".join(f"- {d}" for d in found_defs[:4]) await self.send_group(room, resp) else: await self.send_group(room, f"No clean definitions found in {lang_name}.") except Exception as e: logging.error(f"Wiki error: {e}") await self.send_group(room, "Error fetching definition.") elif cmd in ['tdk', 'sozluk']: if not args: await self.send_group(room, "Please provide a word to search.") return # Language Society word search (hidden by default to prevent confusion from other dictionary command) url = f"https://sozluk.gov.tr/gts?ara={args.strip()}" try: data = await self.fetch(url, headers=TDK_HEADERS, content_type_fix=True) if data and isinstance(data, list) and len(data) > 0: item = data[0] word = item.get('madde', '') meanings = item.get('anlamlarListe', []) origin = item.get('lisan', '') resp = f"📚 **{word}**" if origin: resp += f" ({origin})" resp += "\n" for idx, m in enumerate(meanings, 1): resp += f"{idx}. {m.get('anlam', '')}\n" await self.send_group(room, resp) else: await self.send_group(room, "Word not found in TDK dictionary.") except Exception as e: logging.error(f"TDK error: {e}") await self.send_group(room, "Error fetching definition.") elif cmd == 'quote': d = await self.fetch("http://api.forismatic.com/api/1.0/?method=getQuote&format=json&lang=en", json=True, content_type_fix=True) if d: try: await self.send_group(room, f"\"{d.get('quoteText','').strip()}\" - {d.get('quoteAuthor','Unknown')}") except: await self.send_group(room, "Quote error hell yeah!") # Since API is broken it uses local shitposts elif cmd == 'shitpost': p = Path('shitposts') if not p.exists(): await self.send_group(room, "Shitpost directory missing.") return images = [f for f in p.iterdir() if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.webp']] if not images: await self.send_group(room, "No shitposts found locally.") return try: target = random.choice(images) data = target.read_bytes() mime, _ = mimetypes.guess_type(target.name) url = await self.uploader.upload(data, target.name, mime or 'application/octet-stream') if url: await self.send_group(room, url) else: await self.send_group(room, "Failed to upload local shitpost.") except Exception as e: logging.error(f"Shitpost error: {e}") await self.send_group(room, "Error sending shitpost.") elif cmd == 'neko': d = await self.fetch("https://nekos.best/api/v2/neko") if d: await self.upload_send(room, d['results'][0]['url']) elif cmd == 'cat': d = await self.fetch("https://api.thecatapi.com/v1/images/search") if d: await self.upload_send(room, d[0]['url']) elif cmd == 'dogfact': d = await self.fetch("https://dogapi.dog/api/v2/facts") if d and d['data']: await self.send_group(room, d['data'][0]['attributes']['body']) elif cmd == 'catfact': d = await self.fetch("https://meowfacts.herokuapp.com/") if d and d['data']: await self.send_group(room, d['data'][0]) elif cmd == 'urban': d = await self.fetch(f"https://api.urbandictionary.com/v0/define?term={args}") if d and d['list']: await self.send_group(room, f"📖 {d['list'][0]['definition'][:300]}...") elif cmd == 'joke': d = await self.fetch("https://v2.jokeapi.dev/joke/Any?type=twopart") if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}") elif cmd == 'pun': d = await self.fetch("https://v2.jokeapi.dev/joke/Pun?type=twopart") if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}") elif cmd == 'fact': d = await self.fetch("https://uselessfacts.jsph.pl/random.json?language=en") if d: await self.send_group(room, d['text']) elif cmd == 'advice': d = await self.fetch("https://api.adviceslip.com/advice", content_type_fix=True) if d: await self.send_group(room, d['slip']['advice']) # say command aka parrot mode. Censors slurs with * character elif cmd == 'say': text = args if self.wordlist: sorted_words = sorted(self.wordlist, key=len, reverse=True) for w in sorted_words: if not w: continue chars = [re.escape(c) for c in w] pattern_str = r"(?i)\b" + r"[\W_]*".join(chars) + r"\b" try: pattern = re.compile(pattern_str) def repl(match): return '*' * len(match.group(0)) text = pattern.sub(repl, text) except: pass await self.send_group(room, f"{sender} says: {text}") elif cmd == 'dice': await self.send_group(room, f"🎲 {random.randint(1,6)}") elif cmd == 'roll': try: limit = int(args.strip()) if limit > 0: await self.send_group(room, f"🎲 {random.randint(1, limit)}") else: await self.send_group(room, "Please pick a valid number.") except: await self.send_group(room, "Please specify a number to roll.") elif cmd == 'coinflip': await self.send_group(room, f"🪙 {random.choice(['Heads','Tails'])}") elif cmd == 'rps': opts = ['rock','paper','scissors'] b = random.choice(opts) u = args.lower().strip() if u not in opts: await self.send_group(room, "rock, paper, or scissors?") else: res = "I won!" if b == u: res = "Tie!" elif (u=='rock' and b=='scissors') or (u=='paper' and b=='rock') or (u=='scissors' and b=='paper'): res = "You won!" await self.send_group(room, f"I chose {b}. {res}") # GIF command finds and sends the searched GIF elif cmd == 'gif': k = self.cfg['Bot'].get('giphy_key') if not k: await self.send_group(room, "Giphy API key not configured.") return try: d = await self.fetch(f"https://api.giphy.com/v1/gifs/search?api_key={k}&q={args}&limit=1") if d and d['data']: url = d['data'][0]['images']['original']['url'] await self.upload_send(room, url) else: await self.send_group(room, f"No GIF found for '{args}'.") except Exception as e: logging.error(f"GIF error: {e}") await self.send_group(room, f"Error searching GIF.") elif cmd in ['kick','ban','mute','warn','unban','unmute']: if not self.is_mod(room, sender): await self.send_group(room, "Not authorized.") return target_nick = args.strip() if not target_nick: await self.send_group(room, f"Usage: {cmd} ") return in_roster = False try: roster = self.plugin['xep_0045'].get_roster(room) if target_nick in roster: in_roster = True except: pass try: if cmd == 'warn': if not in_roster: await self.send_group(room, "User not found.") return await self.warn(room, target_nick, "Manual Warning") elif cmd == 'kick': if not in_roster: await self.send_group(room, "User not found.") return await self.plugin['xep_0045'].set_role(room, target_nick, 'none', reason="Kicked by operator") await self.send_group(room, f"👢 Kicked {target_nick}. They will return I guess but anyways.") elif cmd == 'mute': if not in_roster: await self.send_group(room, "User not found.") return await self.plugin['xep_0045'].set_role(room, target_nick, 'visitor', reason="Muted by operator") await self.send_group(room, f"🤐 Muted (shutted up) {target_nick}.") elif cmd == 'unmute': if not in_roster: await self.send_group(room, "User not found.") return await self.plugin['xep_0045'].set_role(room, target_nick, 'participant', reason="Unmuted by operator") await self.send_group(room, f"🔊 Unmuted {target_nick}. {target_nick} can speak now again!") elif cmd == 'ban': if not in_roster: await self.send_group(room, "User not found.") return jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid') if jid: await self.plugin['xep_0045'].set_affiliation(room, jid=jid, affiliation='outcast', reason="Banned by operator") await self.send_group(room, f"🚫 Banhammer! whomp whomp {target_nick}") else: await self.send_group(room, "Cannot ban (Hidden JID/Anonymous).") elif cmd == 'unban': target_jid = target_nick try: await self.plugin['xep_0045'].set_affiliation(room, jid=target_jid, affiliation='none', reason="Unbanned by operator") await self.send_group(room, f"🕊️ Unbanned {target_jid}. Taste of liberty!") except IqError: await self.send_group(room, f"Failed to unban {target_jid}. Ensure you are using the valid JID.") except IqError as e: await self.send_group(room, f"Permission denied: {e.iq['error']['text'] or 'Bot missing privileges'}") except Exception as e: logging.error(f"Mod command failed: {e}") await self.send_group(room, "Operation failed.") elif cmd == 'poll': res = self.polls.create(room, sender, args) await self.send_group(room, res) elif cmd == 'vote': res = self.polls.vote(sender, args) await self.send_private_muc(room, sender, res) elif cmd == 'close': res = self.polls.close(sender, args) await self.send_group(room, res) elif cmd == 'uno': sub = args.split()[0] if args else '' arg = args[len(sub):].strip() gid = f"uno_{room}" g = self.games.get(gid) if sub == 'join': if not g: g = UnoGame(room, self) self.games[gid] = g await self.send_group(room, g.join(sender)) elif sub == 'start' and g: await self.send_group(room, g.start()) elif sub == 'play' and g: res = g.play(sender, arg) if res: await self.send_group(room, res) if res and "Wins" in res: del self.games[gid] elif sub == 'draw' and g: res = g.draw(sender) if res: await self.send_group(room, res) # JACK!! elif cmd in ['bj', 'blackjack']: gid = f"bj_{room}" sub = args.split()[0].lower() if args else '' g = self.games.get(gid) if not sub: if not g: g = MultiplayerBlackjackGame(room, self) self.games[gid] = g g.join(sender) await self.send_group(room, g.start()) else: if g.status == "JOINING": await self.send_group(room, f"Lobby Open! Type `{self.trigger}bj join` to join or `{self.trigger}bj start` to begin.") else: await self.send_group(room, "Game in progress.") elif sub == 'join': if not g: g = MultiplayerBlackjackGame(room, self) self.games[gid] = g await self.send_group(room, f"Blackjack lobby created by {sender}. Type `{self.trigger}bj join`.") res = g.join(sender) await self.send_group(room, res) elif sub == 'start' and g: await self.send_group(room, g.start()) elif sub == 'hit' and g: res = g.hit(sender) await self.send_group(room, res) if g.status == "DEALING": await self.send_group(room, g.resolve_game()) del self.games[gid] elif sub == 'stand' and g: res = g.stand(sender) await self.send_group(room, res) if g.status == "DEALING": await self.send_group(room, g.resolve_game()) del self.games[gid] else: await self.send_group(room, f"Blackjack: `{self.trigger}bj` (Solo), `{self.trigger}bj join` (Multi), `{self.trigger}bj hit/stand`") # Abbreviation will be removed elif cmd == 'cp' or cmd == 'copypasta': if args.startswith('add'): _, name, txt = args.split(maxsplit=2) d = self.copypastas.get('pastas', {}) d[name] = txt self.copypastas.set('pastas', d) await self.send_group(room, "Added.") elif args == 'list': await self.send_group(room, ", ".join(self.copypastas.get('pastas',{}).keys())) else: txt = self.copypastas.get('pastas',{}).get(args) if txt: await self.send_group(room, txt) elif cmd == 'gpt': if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'): await self.send_group(room, "AI not configured.") return k = self.cfg['AI']['openai_key'] url = self.cfg['AI'].get('openai_api_url') pl = { "model": self.cfg['AI'].get('openai_model'), "messages": [{"role":"system","content":self.cfg['AI'].get('openai_system_prompt')}, {"role":"user","content":args}] } headers = {"Authorization": f"Bearer {k}"} d = await self.fetch(url, method='POST', json_data=pl, headers=headers) if d and d.get('choices'): await self.send_group(room, d['choices'][0]['message']['content']) elif cmd in ['translate', 'tr']: if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'): await self.send_group(room, "AI not configured.") return parts = args.split(maxsplit=1) if len(parts) < 2: await self.send_group(room, f"Usage: {self.trigger}translate ") return target_lang = parts[0] text_to_translate = parts[1] system_prompt = ( f"You are a precise translator. Your sole function is to detect the language of the input text and translate it into {target_lang}. " f"Provide only the {target_lang} translation with no additional commentary, explanations, notes, or meta-information. " "Do not include the source language name, translation notes, or any text beyond the translated output. " f"Maintain the original meaning, tone, and structure as closely as possible while ensuring natural {target_lang} expression." ) k = self.cfg['AI']['openai_key'] url = self.cfg['AI'].get('openai_api_url') pl = { "model": self.cfg['AI'].get('openai_model'), "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": text_to_translate} ] } headers = {"Authorization": f"Bearer {k}"} d = await self.fetch(url, method='POST', json_data=pl, headers=headers) if d and d.get('choices'): await self.send_group(room, d['choices'][0]['message']['content']) else: await self.send_group(room, "Translation failed.") elif cmd in ['r34', 'hentai', 'boobs', 'hboobs']: if not self.is_nsfw_room(room): await self.send_group(room, "NSFW not allowed here. Find a NSFW room or add the room to NSFW list smh") return if cmd == 'r34': tag = args if args else 'all' d = await self.fetch(f"https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit=1&json=1&tags={tag}") if d and len(d) > 0 and 'file_url' in d[0]: await self.upload_send(room, d[0]['file_url']) elif cmd == 'hentai': d = await self.fetch("https://nekobot.xyz/api/image?type=hneko") if d and 'message' in d: await self.upload_send(room, d['message']) elif cmd == 'boobs': d = await self.fetch("https://nekobot.xyz/api/image?type=boobs") if d and 'message' in d: await self.upload_send(room, d['message']) elif cmd == 'hboobs': d = await self.fetch("https://nekobot.xyz/api/image?type=hboobs") if d and 'message' in d: await self.upload_send(room, d['message']) # Same thing with copypasta applies here too elif cmd in ['wa', 'wolfram']: if not args: await self.send_group(room, "Usage: wa ") else: appid = None if self.cfg.has_section('Features'): appid = self.cfg['Features'].get('wolfram_appid') if not appid and self.cfg.has_section('Bot'): appid = self.cfg['Bot'].get('wolfram_appid') if not appid: await self.send_group(room, "WolframAlpha AppID not configured.") else: try: url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={quote(args.strip())}" data = await self.fetch(url, json=False) if data: txt = data.decode('utf-8', errors='ignore').strip() await self.send_group(room, f"Wolfram says: {txt}") else: await self.send_group(room, "No short answer available.") except Exception as e: logging.error(f"Wolfram error: {e}") await self.send_group(room, "Error querying WolframAlpha.") elif cmd == 'help': await self.send_group(room, await self._generate_help_message(self.is_mod(room, sender))) elif cmd == 'mod-help': if self.is_mod(room, sender): await self.send_group(room, await self._generate_mod_help()) elif cmd == 'nsfw-help': await self.send_group(room, await self._generate_nsfw_help()) async def _generate_help_message(self, is_mod): t = self.trigger s = f"#### List of available commands:\n\n#### Practical Commands: \n" \ f"`{t}video` - Sends a list of all available videos\n" \ f"`{t}help` - Displays this command.\n" \ f"`{t}credits` - Displays bot's credit.\n" \ f"`{t}test` - Simple test command you say test bot will response.\n" \ f"`{t}invite` - Info about how to make bot join more rooms.\n" \ f"`{t}calc` {{num}} {{+, - , / , * }} {{num}} - Calculator.\n" \ f"`{t}ping` - tests the bot ping.\n" \ f"`{t}gpt` - Send commands to clankers.\n" \ f"`{t}tr` - Translate text to target language.\n" \ f"`{t}copypasta` - Add or send copypastas.\n" \ f"`{t}wiki` - Ughm acthually lookup.\n" \ f"`{t}wa` - WolframAlpha query, can convert anything to anything.\n" if is_mod: s += f"`{t}mod-help` - Mod commands.\n" s += f"`{t}nsfw-help` - Sends a list of all available nsfw commands.\n" s += f"\n#### Fun Commands: \n" \ f"`{t}dice` - Rolls a random number between 1 and 6.\n" \ f"`{t}roll` {{num}} - Rolls a random number between 1 and num.\n" \ f"`{t}say` - says what the user told it to say!.\n" \ f"`{t}cl` - Common L.\n" \ f"`{t}l` - L.\n" \ f"`{t}dm` - Whispers the user Hi :3.\n" \ f"`{t}rps` - Simple Rock paper scissors game.\n" \ f"`{t}dogfact` - Gives a random dogfact using the Dogfact API!.\n" \ f"`{t}catfact` - Gives a random Catfact using cat fact API\n" \ f"`{t}joke` - Very simple command just gives a joke using the Joke API.\n" \ f"`{t}coinflip` - a Command so easy a child could do it.\n" \ f"`{t}fact` - Gives a random useless fact.\n" \ f"`{t}urban` - uses the urban dictionary to search for the word.\n" \ f"`{t}shitpost` - Sends random shitpost from local library\n" \ f"`{t}cat` - Cat :3 \n" \ f"`{t}neko` - Neko command \n" \ f"`{t}advice` - Gives the user a life Advice \n" \ f"`{t}quote` - Gives a random quote using yet another API. \n" \ f"`{t}gif` - Allows the user to search for gifs using giphy (ratelimited sometimes)\n" \ f"`{t}pun` - Gives user a simple pun. \n" \ f"`{t}avatar` - Sends the user Avatar (Beta) \n" \ f"`{t}poll` - Democracy \n" \ f"`{t}uno` - Four colors\n" \ f"`{t}bj` - I play JACK bro! \n" return s async def _generate_mod_help(self): t = self.trigger return f"## Mod commands:\n" \ f"`{t}kick` - Kick a user.\n" \ f"`{t}ban` - Ban a user.\n" \ f"`{t}mute` - Mute a user.\n" \ f"`{t}warn` - Warn a user.\n" \ f"`{t}unban` - Unban a user.\n" \ f"`{t}unmute` - Unmute a user.\n" async def _generate_nsfw_help(self): t = self.trigger return f"### NSFW commands:\n" \ f"`{t}r34` - Rule34 search. (doesnt work on some ISPs)\n" \ f"`{t}hentai` - Grabs a random hentai image \n" \ f"`{t}boobs` - Grabs a random boobs image \n" \ f"`{t}hboobs` - Grabs a random hboobs image simple. \n" async def _generate_video_list(self): t = self.trigger return f"Videos:\n`{t}ihnk` Sends a I have no knwoledge about any of this video. \n`{t}scary` Sends the oh oh scary oh oh shiver me timbers video" async def fetch(self, url, method='GET', json_data=None, headers=None, json=True, content_type_fix=False): try: async with aiohttp.ClientSession() as s: async with s.request(method, url, json=json_data, headers=headers) as r: if r.status == 200: if json: return await r.json(content_type=None) if content_type_fix else await r.json() else: return await r.read() except Exception as e: logging.error(f"Fetch err: {e}") return None async def upload_send(self, room, url): data = await self.fetch(url, json=False) if data: mime = self._sniff_mime(data) if mime: ext = mime.split('/')[1].replace('jpeg', 'jpg') # Octet Stream is used in bin files with no known type afaik else: mime = "application/octet-stream" ext = "bin" path = url.split('?')[0].lower() if path.endswith('.png'): mime, ext = 'image/png', 'png' elif path.endswith(('.jpg', '.jpeg')): mime, ext = 'image/jpeg', 'jpg' elif path.endswith('.gif'): mime, ext = 'image/gif', 'gif' elif path.endswith('.webp'): mime, ext = 'image/webp', 'webp' filename = f"image.{ext}" l = await self.uploader.upload(data, filename, mime) if l: await self.send_group(room, l) def _sniff_mime(self, data: bytes) -> Optional[str]: if not data or len(data) < 100: return None if data.startswith(b'\x89PNG\r\n\x1a\n'): return 'image/png' if data.startswith(b'\xff\xd8\xff'): return 'image/jpeg' if data.startswith(b'GIF8'): return 'image/gif' if data.startswith(b'RIFF') and data[8:12] == b'WEBP': return 'image/webp' return None # GET AVATAR TO UPLOAD (SOMETIMES BROKEN) async def _get_avatar_data(self, jid_str, is_muc_occupant=False) -> (Optional[bytes], Optional[str]): j = slixmpp.JID(jid_str) target = j.full if is_muc_occupant else j.bare async def try_0153(use_ifrom=False): try: kwargs = {'jid': target} if use_ifrom: kwargs['ifrom'] = self.boundjid.full iq = await self.plugin["xep_0054"].get_vcard(**kwargs) photo = iq["vcard_temp"]["PHOTO"] binval = photo["BINVAL"] type_ = photo["TYPE"] if binval: data = base64.b64decode(binval.replace('\n', '').replace(' ', '').strip()) mime = self._sniff_mime(data) if not mime and type_ and len(type_.split('/')) > 1: mime = type_ if not mime: mime = "image/png" return data, mime except (IqError, Exception) as e: logging.debug(f"XEP-0153 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}") return None async def try_0084(use_ifrom=False): try: kwargs = {'jid': target, 'node': "urn:xmpp:avatar:data", 'max_items': 1} if use_ifrom: kwargs['ifrom'] = self.boundjid.full iq = await self.plugin["xep_0060"].get_items(**kwargs) items = iq["pubsub"]["items"]["substanzas"] if items: data_elem = items[0].xml.find("{urn:xmpp:avatar:data}data") if data_elem is not None and data_elem.text: b64_text = data_elem.text.replace('\n', '').replace(' ', '').strip() data = base64.b64decode(b64_text) mime = self._sniff_mime(data) or "image/png" return data, mime if "data" in items[0]: d = items[0]["data"] if isinstance(d, str): data = base64.b64decode(d) else: data = d return data, "image/png" except (IqError, Exception) as e: logging.debug(f"XEP-0084 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}") return None res = await try_0153() if res: return res res = await try_0084() if res: return res logging.debug(f"Primary avatar methods failed for {target}. Trying fallback logic.") res = await try_0153(use_ifrom=True) if res: return res res = await try_0084(use_ifrom=True) if res: return res return None, None async def warn(self, room, nick, reason): if self.cfg['Moderation'].get('moderation_enabled') == 'NONE': return k = f"{room}/{nick}" c = self.warnings.get(k, 0) + 1 self.warnings[k] = c self.db.set('warnings', self.warnings) await self.send_group(room, f"⚠️ {nick} Warn {c}: {reason}") if c >= int(self.cfg['Moderation'].get('max_warnings',3)): self.warnings[k] = 0; self.db.set('warnings', self.warnings) act = self.cfg['Moderation'].get('warning_action','mute') try: if act == 'kick': await self.plugin['xep_0045'].set_role(room, nick, 'none', reason="Too many warnings") elif act == 'mute': await self.plugin['xep_0045'].set_role(room, nick, 'visitor', reason="Too many warnings") except IqError: await self.send_group(room, f"Cannot enforce warning: Permission denied.") def is_mod(self, room, nick): if nick.lower() in self.config_moderators: return True try: room_roster = self.plugin['xep_0045'].rooms.get(room) if not room_roster: target_bare = slixmpp.JID(room).bare for r_jid in self.plugin['xep_0045'].rooms: if slixmpp.JID(r_jid).bare == target_bare: room_roster = self.plugin['xep_0045'].rooms[r_jid] break if room_roster: if nick in room_roster: user = room_roster[nick] if user['affiliation'] in ('admin', 'owner') or user['role'] == 'moderator': return True except Exception as e: logging.debug(f"is_mod check failed: {e}") return False def is_nsfw_room(self, room): mode = self.cfg['Features'].get('nsfw_mode', 'NONE') if mode == 'ALL': return True if mode == 'NONE': return False allowed = [r.strip() for r in self.cfg['Features'].get('nsfw_rooms', '').split(',')] return str(room) in allowed def find_user(self, room, partial): try: nicks = list(self.plugin['xep_0045'].get_roster(room).keys()) match = get_close_matches(partial, nicks, n=1) return match[0] if match else None except: return None async def send_group(self, room, text): self.send_message(mto=room, mbody=str(text), mtype='groupchat') async def send_private_muc(self, room, nick, text): self.send_message(mto=f"{room}/{nick}", mbody=str(text), mtype='chat') async def send_private_msg_by_jid(self, jid, text): self.send_message(mto=jid, mbody=str(text), mtype='chat') async def main(): logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s") if not Path('config.ini').exists(): print("Config missing. Please make a config. Otherwise how would the bot work?") return bot = RevoltBot() bot.connect() await bot.disconnected if __name__ == "__main__": asyncio.run(main())