diff --git a/chrobry.py b/chrobry.py new file mode 100644 index 0000000..58a1745 --- /dev/null +++ b/chrobry.py @@ -0,0 +1,1505 @@ +#!/usr/bin/env python3 + +import slixmpp +import asyncio +import logging +import configparser +import json +import random +import re +import aiohttp +import operator +import mimetypes +import base64 +from pathlib import Path +from typing import Dict, List, Optional, Any +from difflib import get_close_matches +from dataclasses import dataclass, field +from io import BytesIO +from urllib.parse import urlencode, quote +from slixmpp.exceptions import IqError, IqTimeout + +# The MOST ESSENTIAL PART of the code! +N_QUOTES = [ + "Did you just slap me with that arm?", + "Yeah sorry, it's just my head kinda hurts.", + "Hey, are you new to our squad?", + "You're a little, uh, short for a Disassembly Drone...", + "I'm Serial Designation N, nice to meet you!", + "I'm kind of the leader of the squad in this city.", + "That's not true, everyone tells me I'm useless and terrible.", + "Wait! I'm not supposed to tell you that part! Biscuits...", + "Well, honesty is the best policy.", + "I also can't seem to remember the past 3 hours of my life, ah!, but I'm sure that'll sort itself out.", + "Hoo, stuck yourself?", + "Just pop it in your mouth. Our saliva neutralizes the nanites, otherwise I'd be constantly disassembling myself.", + "Sure! I love doing anything.", + "Sweet, uh, I'm open to new things I guess.", + "Consider it err... repressed!", + "No worries, I'm N, but a whole letter is a lot to remember! Ha haha!", + "So obviously a lot of mutual respect there, but secretly I actually kind of have a crush on her.", + "YOU CAN'T TELL HER OKAY?!", + "J's awesome!", + "Hey, let me give you the tour!", + "Outside are the corpse wall, thingies, and here are the buttons!", + "Beep, boop, bop, bop bop, bop, beep, boop, bop bop, beep...", + "More of a one use missile; they never taught us how to land.", + "Other than ingesting their WARM, SWEET oil to avoid overheating and dying, I guess I just wanna be useful.", + "I was given a job, and I always wanna try my best.", + "Oh my! You sure are rebellious! It's kind of exciting...", + "...but not as fun as, uh, following the rules.", + "You know, I left an-an extremely dangerous weapo- u-uh, excuse, o-outside!", + "Hey, fellas!", + "Ooh, deal me in, I love Rummy.", + "Wait, no, ah, I'm going to murder everyone...", + "...Rain check!", + "I'm sorry. I really enjoyed our time together, but I can't have you shooting V with that thing.", + "You, me, name, remember?", + "I get the feeling the company doesn't actually love robots, and, like, we might be robots?", + "...I've made a terrible mistake. It's cool how immediately I could tell.", + "Th-Thanks J, al-always looking-ng out for me. Yo-you're awesome.", + "A-ah, biscuits-s. I-I'm sor-rry. I ruin-ned your card game, that made you have an awk-kward moment with your dad.", + "Being rebellious is a lot harder than it loo-oks. Thanks for showing me the ropes.", + "Tha-at's super fa-air. Ah, I screwed up.", + "Oh uh, J, you're sometimes kinda mean to me, and I wish you weren't.", + "Just some constructive criticism!", + "(J please.)", + "AAAH, MY MIND'S IN A WEIRD PLACE, DON'T READ INTO THIS!", + "UZI! I'm so, so sorry, have fun repressing this, heh-", + "Huh? Oh!, uh, N! I'm an angsty rebellious Disassembly Drone now.", + "Nice to meet you, Mr. Uzi-", + "I'd join you if the sun didn't kill me. Hope you're having important character growth or something, though!", + "I was the PILOT? That's aweso-! (I crashed and ruined everything...)", + "Spaceship pilot. Origin story.", + "I do want to be dapper...", + "We can't interact with the workers anymore V... we're too dangerous.", + "I'm not bringing you for prom murder, V!", + "J went holo-spooky-snake-crab and we maybe grew up in a haunted mansion!", + "Aren't you worried we have no idea what we even are?", + "V, if you're hiding something, we can figure it out together", + "Even if we each only have pieces", + "No! No time!", + "Dapper N~", + "I could help", + "But you probably don't want me...", + "Dapper Buddies!", + "This isn't what i expected at all!", + "Thank you, Lizzy-!", + "V, you kind of suck.", + "Welcome, campers! Let's sound off! One, two...", + "To the bunks!", + "Close one!", + "V, we can't hurt Uzi!", + "I don't know what you are talking about because you won't tell me!", + "What are you so afraid of!?", + "I am! Uzi is, she's a kid like us, V! What is wrong with you!?", + "Easy there, buddy!", + "What did I say about antagonizing her?", + "Uzi, you want to tell me what's the matter?", + "Before we met, scary stuff was actually...pretty scary, and tonight too, 'cause you weren't with me to make it fun somehow, I kind of forgot what that was like.", + "Then we'll stick together!", + "Kidding! Just avoid another whole spire!", + "Baby steps. Together?", + "Us as lizards. J drew herself.", + "Don't give me those eyes.", + "Ah, we'll ask Tessa, OK? If not... Movie night!", + "Hi, Tessa.", + "You guys are... locking her up?", + "I-I-I told her to say that! And brought her here. I-I just... hate orders...?", + "And your fan's dumb...?", + "NO! AAAH! STOP!", + "Go get it, goobers!", + "WOAH! WE WERE BUDDIES! ...darkXwolf17...?", + "So, these are memories, and future me is dead at a sleepover?", + "'Cause... 'Cause of you? 'Cause you're doing that? OW!", + "...'Kay. You kind of seem like an evil ghost witch, though. AH!", + "Why? Who are you?", + "Ugh, J has it...", + "Psst! Tessa!", + "We need the basement key. This bird's from the future.", + "Who? Our Cyn? Nah, she's cool.", + "Not creepy! Sweet!", + "DARKXWOLF17! NOO-HO-HO-HO-HOOO!", + "Is V... okay? Like, in the future?", + "Not dealing with this great, to be immediately honest...! ...Okay, it's gone.", + "Whoa, Cyn! For an eldritch... uh, monster thingy—", + "I got you.", + "T-Tessa...?", + "(Chuckling) It is you.", + "Why?", + "And for Mom backstory. Right, dude?", + "(Giggling like a child) It tickles! The bad version, though!", + "...Doing great, dude.", + "Then Uzi... You can fix her, right? That's why you're here?", + "V! Is Uzi–", + "''IT SUCKS PRETTY HARD''", + "DON'T!", + "Uzi, no!", + "V, come on!", + "NO, NO, NO! V, WE NEED YOU!", + "V, PLEASE!", + "Uzi, help!", + "Use your Solver!", + "Oh, no no, it's okay.", + "We're not going to hurt you. Okay?", + "I deserve this, I deserve this, I deserve this!", + "T-Thank you mine shaft trauma ghost, I can hear you fine from there!", + "Uzi's dad? What-", + "Sorry, ma'am. I won't keep secrets from Uzi anymore...even if you say it might hurt her", + "Gave her ungodly eldritch genetics!", + "I promise.…to wait until you're ready to tell her yourself?", + "You knew about the patch. Yes, or no? One chance.", + "Hey.. buddy... Yeah, that should help...", + "All I know is.. I need you. To figure things out... together..?", + "NORI! I should mention Uzi and I-!", + "HahaohaAAAAAAAAAAAAAH!!!!", + "UZI!!! THAT WAS YOUR MOM! WAIT, NO! AAAAAHHHHHHH!", + "Uzi DON'T YOU DARE!!!", + "Heeheehee! Spaceship pilot!", + "AAAACK! Sorry, I didn't know- Oh.", + "Yeah, I'm kind of, like, actually mad about what you did. But we can -- TALK LATER!!", + "V, STOP!", + "No more tricks.", + "I dunno, I feel like we need a secret handshake or something, right?", + "Uzi! Uzi!", + "UUUUUUUUUZIIIIIII!!!!", + "It's you!", + "Whooooo-hoo-hoo-hoo! That's my girlfriend! " +] + +EMOJIS = ['1️⃣','2️⃣','3️⃣','4️⃣','5️⃣','6️⃣','7️⃣','8️⃣','9️⃣','🔟'] + +# For dictionary +TDK_HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36', + 'Accept': 'application/json, text/javascript, */*; q=0.01', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate, br, zstd', + 'X-Requested-With': 'XMLHttpRequest', + 'DNT': '1', + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + 'Referer': 'https://sozluk.gov.tr/', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'sec-ch-ua-platform': '"Android"', + 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="137", "Google Chrome";v="137"', + 'sec-ch-ua-mobile': '?1', + 'Save-Data': 'on', + 'Priority': 'u=0' +} + +class DataManager: + def __init__(self, filename): + self.filename = Path(filename) + self.data = self._load() + + def _load(self): + if not self.filename.exists(): return {} + try: return json.loads(self.filename.read_text(encoding='utf-8')) + except: return {} + + def save(self): + self.filename.write_text(json.dumps(self.data, indent=2), encoding='utf-8') + + def get(self, key, default=None): + return self.data.get(key, default) + + def set(self, key, value): + self.data[key] = value + self.save() + +# Handles image uploads using your xmpp server or catbox/litterbox (default 72 hours removal) +class AsyncFileUploader: + def __init__(self, client, service='xmpp', api_key=None): + self.client = client + self.service = service.lower() + self.api_key = api_key + + async def upload(self, data: bytes, filename: str, mime_type: str) -> Optional[str]: + try: + if self.service == 'xmpp': + input_file = BytesIO(data) + return await self.client['xep_0363'].upload_file( + input_file=input_file, filename=filename, size=len(data), + content_type=mime_type, timeout=60 + ) + elif self.service == 'catbox': + url = "https://catbox.moe/user/api.php" + form = aiohttp.FormData() + form.add_field('reqtype', 'fileupload') + if self.api_key: form.add_field('userhash', self.api_key) + form.add_field('fileToUpload', data, filename=filename, content_type=mime_type) + async with aiohttp.ClientSession() as s: + async with s.post(url, data=form) as r: + if r.status == 200: return (await r.text()).strip() + elif self.service == 'litterbox': + url = "https://litterbox.catbox.moe/resources/internals/api.php" + form = aiohttp.FormData() + form.add_field('reqtype', 'fileupload') + form.add_field('time', '72h') + form.add_field('fileToUpload', data, filename=filename, content_type=mime_type) + async with aiohttp.ClientSession() as s: + async with s.post(url, data=form) as r: + if r.status == 200: return (await r.text()).strip() + except Exception as e: + logging.error(f"Upload failed: {e}") + return None + +@dataclass +class Poll: + id: str + room: str + creator: str + question: str + options: List[str] + votes: Dict[str, int] = field(default_factory=dict) + anon: bool = False + max_votes: Optional[int] = None + closed: bool = False + +class PollManager: + def __init__(self, bot): + self.bot = bot + self.polls: Dict[str, Poll] = {} + self.cnt = 0 + + def create(self, room, sender, args): + parts = re.findall(r'"([^"]+)"', args) + if len(parts) < 3: + return f"Usage: {self.bot.trigger}poll \"Question\" \"Opt1\" \"Opt2\" [anon=true] [votes=N]" + + q, opts = parts[0], parts[1:11] + anon = 'anon=true' in args.lower() + max_v = None + mv_match = re.search(r'votes=(\d+)', args) + if mv_match: max_v = int(mv_match.group(1)) + + self.cnt += 1 + pid = str(self.cnt) + p = Poll(pid, room, sender, q, opts, {}, anon, max_v, False) + self.polls[pid] = p + + return self._announce(p) + + def _announce(self, p): + lines = [f"📊 **Poll {p.id}**: {p.question}"] + for i, o in enumerate(p.options): + lines.append(f"{EMOJIS[i]} {o}") + lines.append(f"Vote: {self.bot.trigger}vote {p.id} ") + if p.anon: lines.append("🔒 Anonymous voting") + return "\n".join(lines) + + def vote(self, sender, args): + parts = args.split() + if len(parts) < 2: return "Usage: vote " + pid, choice = parts[0], parts[1] + + if pid not in self.polls: return "Poll not found." + p = self.polls[pid] + if p.closed: return "Poll closed." + + try: + idx = int(choice) - 1 + if not (0 <= idx < len(p.options)): raise ValueError + p.votes[sender] = idx + return "Vote cast." + except: + return "Invalid option." + + def close(self, sender, args): + pid = args.strip() + if pid not in self.polls: return "Poll not found." + p = self.polls[pid] + if sender != p.creator and not self.bot.is_mod(p.room, sender): + return "Only creator can close." + + p.closed = True + return self._results(p) + + def _results(self, p): + counts = [0] * len(p.options) + for v in p.votes.values(): + if 0 <= v < len(counts): counts[v] += 1 + + total = sum(counts) + lines = [f"📊 **Results** - {p.question}"] + for i, o in enumerate(p.options): + pct = (counts[i] / total * 100) if total else 0 + lines.append(f"{EMOJIS[i]} {o}: {counts[i]} ({pct:.1f}%)") + return "\n".join(lines) + +class UnoGame: + def __init__(self, room, bot): + self.room = room + self.bot = bot + self.players = [] + self.hands = {} + self.deck = [] + self.discard = [] + self.turn = 0 + self.direction = 1 + self.started = False + self.color = None + + def join(self, nick): + if self.started: return "Started." + if nick not in self.players: + self.players.append(nick) + return f"{nick} joined Uno." + return "Already joined." + + def start(self): + if len(self.players) < 2: return "Need 2+ players." + self.started = True + colors = ['red','blue','green','yellow'] + vals = [str(i) for i in range(10)] + ['skip','reverse','+2'] + self.deck = [f"{c}_{v}" for c in colors for v in vals] * 2 + self.deck += ['wild_wild', 'wild_+4'] * 4 + random.shuffle(self.deck) + + for p in self.players: + self.hands[p] = [self.deck.pop() for _ in range(7)] + self._pm_hand(p) + + top = self.deck.pop() + while 'wild' in top: + self.deck.append(top) + random.shuffle(self.deck) + top = self.deck.pop() + + self.discard.append(top) + self.color = top.split('_')[0] + return f"Uno Started! Top: {self._fmt(top)}. Turn: {self.players[self.turn]}" + + def play(self, nick, arg): + if not self.started or self.players[self.turn] != nick: return + hand = self.hands[nick] + + parts = arg.lower().split() + declared_color = None + valid_colors = ['red','blue','green','yellow'] + + for p in parts: + if p in valid_colors: + declared_color = p + + sel_card = None + for c in hand: + c_simple = c.replace('_', ' ').lower() + if arg.lower() in c_simple: + sel_card = c + break + if "wild" in parts and "wild" in c_simple: + if "+4" in arg and "+4" in c: + sel_card = c + break + elif "+4" not in arg and "+4" not in c: + sel_card = c + break + + if not sel_card: return "Card not in hand." + + top = self.discard[-1] + t_col = self.color + t_val = top.split('_')[1] + s_col, s_val = sel_card.split('_') + + valid = False + if 'wild' in s_col: + valid = True + if not declared_color: + asyncio.create_task(self.bot.send_private_muc(self.room, nick, "⚠️ Please specify a color (e.g. `play wild red`).")) + self._pm_hand(nick) + return "Specify color (e.g. play wild red)" + self.color = declared_color + elif s_col == t_col or s_val == t_val: + valid = True + self.color = s_col + + if not valid: return "Invalid move." + + hand.remove(sel_card) + self.discard.append(sel_card) + + msg = f"{nick} played {self._fmt(sel_card)}." + if not hand: + self.started = False + return f"{msg}\n🏆 {nick} Wins!" + if len(hand) == 1: msg += " UNO!" + + if 'skip' in s_val: self._advance() + elif 'reverse' in s_val: + self.direction *= -1 + if len(self.players)==2: self._advance() + elif '+2' in s_val: + self._advance() + self._draw(self.players[self.turn], 2) + elif '+4' in s_val: + self._advance() + self._draw(self.players[self.turn], 4) + + self._advance() + return f"{msg}\nNext: {self.players[self.turn]} (Color: {self.color})" + + def draw(self, nick): + if self.players[self.turn] != nick: return + self._draw(nick, 1) + self._advance() + return f"{nick} drew." + + def _draw(self, nick, n): + for _ in range(n): + if not self.deck: + self.deck = self.discard[:-1] + self.discard = [self.discard[-1]] + random.shuffle(self.deck) + if self.deck: self.hands[nick].append(self.deck.pop()) + self._pm_hand(nick) + + def _advance(self): + self.turn = (self.turn + self.direction) % len(self.players) + next_player = self.players[self.turn] + self._pm_hand(next_player) + + def _fmt(self, c): + if 'wild' in c: return f"🌈 {c.split('_')[1]}" + em = {'red':'🟥','blue':'🟦','green':'🟩','yellow':'🟨'} + return f"{em.get(c.split('_')[0],'')} {c.split('_')[1]}" + + def _pm_hand(self, nick): + asyncio.create_task(self.bot.send_private_muc(self.room, nick, " | ".join(self._fmt(c) for c in self.hands[nick]))) + +# JACK IMPLEMENTATION +class MultiplayerBlackjackGame: + def __init__(self, room, bot): + self.room = room + self.bot = bot + self.players = [] + self.hands = {} + self.d_hand = [] + self.deck = [] + self.turn_idx = 0 + self.status = "JOINING" + + suits = ['♠', '♥', '♦', '♣'] + ranks = ['2','3','4','5','6','7','8','9','10','J','Q','K','A'] + self.deck = [{'rank': r, 'suit': s} for s in suits for r in ranks] * 2 + random.shuffle(self.deck) + + def join(self, nick): + if self.status != "JOINING": return "Game already started." + if nick in self.players: return "Already joined." + self.players.append(nick) + return f"{nick} joined Blackjack." + + def start(self): + if len(self.players) < 1: return "Need players." + self.status = "PLAYING" + self.turn_idx = 0 + + for p in self.players: + self.hands[p] = [self.deck.pop(), self.deck.pop()] + self.d_hand = [self.deck.pop(), self.deck.pop()] + + return self._status_msg() + + def hit(self, nick): + if self.status != "PLAYING": return "Not playing." + if self.players[self.turn_idx] != nick: return "Not your turn." + + self.hands[nick].append(self.deck.pop()) + val = self._calc(self.hands[nick]) + + res = f"{nick} hits: {self._fmt(self.hands[nick])} ({val})" + if val >= 21: + res += " (Bust!)" if val > 21 else " (21!)" + self._advance_turn() + + return res + + def stand(self, nick): + if self.status != "PLAYING": return "Not playing." + if self.players[self.turn_idx] != nick: return "Not your turn." + + res = f"{nick} stands." + self._advance_turn() + return res + + def _advance_turn(self): + self.turn_idx += 1 + if self.turn_idx >= len(self.players): + self.status = "DEALING" + + def resolve_game(self): + while self._calc(self.d_hand) < 17: + self.d_hand.append(self.deck.pop()) + + d_val = self._calc(self.d_hand) + lines = [f"🎰 **Results** (Dealer: {self._fmt(self.d_hand)} - {d_val})"] + + for p in self.players: + p_val = self._calc(self.hands[p]) + if p_val > 21: + lines.append(f"❌ {p}: {p_val} (Bust)") + elif d_val > 21 or p_val > d_val: + lines.append(f"✅ {p}: {p_val} (Win)") + elif p_val == d_val: + lines.append(f"⚖️ {p}: {p_val} (Push)") + else: + lines.append(f"❌ {p}: {p_val} (Loss)") + + return "\n".join(lines) + + def _status_msg(self): + if self.status == "PLAYING": + curr = self.players[self.turn_idx] + msg = f"🃏 **Blackjack**\nDealer: [{self.d_hand[0]['rank']}{self.d_hand[0]['suit']}] [??]\n" + for p in self.players: + mark = "👉 " if p == curr else "" + msg += f"{mark}{p}: {self._fmt(self.hands[p])} ({self._calc(self.hands[p])})\n" + return msg + return "" + + def _calc(self, hand): + val = 0 + aces = 0 + for c in hand: + if c['rank'] in ['J','Q','K']: val += 10 + elif c['rank'] == 'A': val += 11; aces += 1 + else: val += int(c['rank']) + while val > 21 and aces: val -= 10; aces -= 1 + return val + + def _fmt(self, hand): + return " ".join([f"[{c['rank']}{c['suit']}]" for c in hand]) + +# MAIN CLASS +class RevoltBot(slixmpp.ClientXMPP): + def __init__(self): + self.cfg = configparser.ConfigParser() + if not Path('config.ini').exists(): raise FileNotFoundError("config.ini missing") + self.cfg.read('config.ini') + + super().__init__(self.cfg['XMPP']['jid'], self.cfg['XMPP']['password']) + + self.trigger = self.cfg['Bot']['trigger'] + self.nick = self.cfg['Bot']['nickname'] + self.rooms = [r.strip() for r in self.cfg['XMPP']['rooms'].split(',')] + + self.db = DataManager('bot_data.json') + self.copypastas = DataManager('copypastas.json') + self.warnings = self.db.get('warnings', {}) + self.wordlist = set() + + self.config_moderators = set() + if self.cfg.has_option('Bot', 'moderators'): + raw_mods = self.cfg['Bot']['moderators'] + self.config_moderators = {m.strip().lower() for m in raw_mods.split(',') if m.strip()} + + # Prefers English -> German -> French -> Espanol for word definitions + # Languages are chosen because they are the most spoken languages with Latin alphabet + # Configuration allows change this part + self.wiktionary_priority = ['en', 'de', 'fr', 'es'] + if self.cfg.has_option('Features', 'wiktionary_priority'): + raw_p = self.cfg['Features']['wiktionary_priority'] + self.wiktionary_priority = [p.strip() for p in raw_p.split(',') if p.strip()] + + self.uploader = AsyncFileUploader(self, + self.cfg['Bot'].get('file_host', 'litterbox'), + self.cfg['Bot'].get('file_host_api_key')) + + self.polls = PollManager(self) + self.games = {} + + + self.register_plugin('xep_0030') + self.register_plugin('xep_0045') + self.register_plugin('xep_0199') + self.register_plugin('xep_0363') + self.register_plugin('xep_0054') + self.register_plugin('xep_0060') + + self.add_event_handler("session_start", self.start) + self.add_event_handler("groupchat_message", self.on_group) + self.add_event_handler("message", self.on_pm) + +# BOT STARTUP + async def start(self, event): + self.send_presence() + await self.get_roster() + +# Downloads slurs from the database to block them while moderating + url = self.cfg['Features'].get('wordlist_url') + if url: + try: + async with aiohttp.ClientSession() as s: + async with s.get(url) as r: + if r.status == 200: + self.wordlist = set((await r.text()).lower().splitlines()) + logging.info(f"Loaded {len(self.wordlist)} banned words.") + except Exception as e: logging.error(f"Wordlist error: {e}") + + for room in self.rooms: + self.join_room_with_retry(room) + + def join_room_with_retry(self, room): + room_jid = str(slixmpp.JID(room).bare) + n = self.cfg['XMPP'].get(f'nickname.{room}', self.nick) + try: + self.plugin['xep_0045'].join_muc(room_jid, n) + logging.info(f"Joined {room_jid} as {n}") + except Exception as e: + logging.error(f"Could not join {room}: {e}") + + + + async def on_group(self, msg): + if msg['mucnick'] == self.nick: return + room = msg['from'].bare + body = msg['body'].strip() + + mod_enabled = self.cfg['Moderation'].get('moderation_enabled', 'ALL').upper() + should_moderate = True + if mod_enabled == 'NONE': should_moderate = False + elif mod_enabled == 'NON_NSFW' and self.is_nsfw_room(room): should_moderate = False + +# Warn a user if they use a slur! + try: + if should_moderate and self.wordlist: + if any(re.search(r'\b'+re.escape(w)+r'\b', body.lower()) for w in self.wordlist): + await self.warn(room, msg['mucnick'], "Message includes word inside the filter.") + return + + if not body.startswith(self.trigger): return + + parts = body[len(self.trigger):].split(maxsplit=1) + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + await self.handle_cmd(room, msg, cmd, args) + except Exception as e: + logging.error(f"Error handling message: {e}") + + async def on_pm(self, msg): + if msg['type'] == 'error': return + if msg['from'].bare in self.rooms and not msg['from'].resource: return + if msg['from'].bare == self.boundjid.bare: return + if msg['type'] == 'groupchat': return + + body = msg['body'].strip() + if not body: return + + if body.startswith(self.trigger): + parts = body[len(self.trigger):].split(maxsplit=1) + cmd = parts[0].lower() + if cmd == 'help': + await self.send_private_msg_by_jid(msg['from'], await self._generate_help_message(False)) + elif cmd == 'dm': + await self.send_private_msg_by_jid(msg['from'], "Hi! :3") + + async def handle_cmd(self, room, msg, cmd, args): + sender = msg['mucnick'] + +# Too lazy to remove cdn links + if cmd == 'ihnk': + await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20I%20have%20no%20knowledge%20of%20any%20of%20this%20ThIs%20iS%20sO%20bIzZaRe_480p.mp4") + elif cmd == 'scary': + await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20oh%20oh%20scary%20oh%20oh%20shiver%20me%20timbers_360p.mp4") +# MOST ESSENTIAL COMMAND OF THE BOT + elif cmd == 'n': + await self.send_group(room, random.choice(N_QUOTES)) + elif cmd == 'cl': + await self.send_group(room, f"Common {args or 'Everyone'} L") + elif cmd == 'l': + await self.send_group(room, "L") + elif cmd == 'test': + await self.send_group(room, "Ig it works ™️") + elif cmd == 'credits': + await self.send_group(room, "Original by Purplebored. Port by agent-n.") + elif cmd == 'invite': + await self.send_private_muc(room, sender, "Contact the hoster of this instance to add your room.") + elif cmd == 'ping': + await self.send_group(room, "Pong!") + elif cmd == 'video': + await self.send_group(room, await self._generate_video_list()) + + elif cmd in ['calculate', 'calc']: + parts = args.split() + if len(parts) == 3: + try: + n1 = float(parts[0]) + op = parts[1] + n2 = float(parts[2]) + ops = {'+':operator.add, '-':operator.sub, '*':operator.mul, '/':operator.truediv} + if op in ops: + if op == '/' and n2 == 0: await self.send_group(room, "Division by zero is not allowed.") + else: await self.send_group(room, f"Result: {ops[op](n1, n2)}") + else: await self.send_group(room, "Invalid op.") + except: await self.send_group(room, "Error.") + else: await self.send_group(room, f"Usage: {self.trigger}calc num + num") + elif cmd == 'dm': + await self.send_private_muc(room, sender, "Hi :3") + + +# AVATAR DOWNLOAD AND REUPLOAD + elif cmd == 'avatar': + target_nick = args.strip() if args else sender + try: + target_jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid') + is_muc_occupant = False + + if not target_jid: + target_jid = f"{room}/{target_nick}" + is_muc_occupant = True + + avatar_data, mime_type = await self._get_avatar_data(target_jid, is_muc_occupant=is_muc_occupant) + + if avatar_data: + ext = mimetypes.guess_extension(mime_type) or ".png" + filename = f"avatar{ext}" + url = await self.uploader.upload(avatar_data, filename, mime_type) + if url: await self.send_group(room, f"{url}") + else: await self.send_group(room, "Avatar retrieved but upload failed (literally 1984).") + else: + await self.send_group(room, f"No avatar found for {target_nick}. Please try again later...") + except Exception as e: + logging.error(f"Avatar error: {e}") + await self.send_group(room, "Error fetching avatar.") + +# WIKTIONARY LOOKUP + elif cmd in ['wiki', 'wiktionary']: + if not args: + await self.send_group(room, "Usage: wiki ") + return + + term = args.strip() + url = f"https://en.wiktionary.org/api/rest_v1/page/definition/{quote(term)}?redirect=true" + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36', + 'Accept': 'application/json; charset=utf-8; profile="https://www.mediawiki.org/wiki/Specs/definition/0.8.0"' + } + + try: + data = await self.fetch(url, headers=headers, json=True) + + if not data or 'error' in data: + await self.send_group(room, "Word not found.") + return + + available_codes = list(data.keys()) + if 'zh' in available_codes: available_codes.remove('zh') + + priority_codes = self.wiktionary_priority + selected_code = next((c for c in priority_codes if c in available_codes), None) + + if not selected_code and available_codes: + selected_code = available_codes[0] + + if not selected_code: + await self.send_group(room, "No definitions found.") + return + + entries = data[selected_code] + lang_name = entries[0].get('language', selected_code) + found_defs = [] + + for entry in entries: + pos = entry.get('partOfSpeech', '') + for d in entry.get('definitions', []): + raw = d.get('definition', '') + clean = re.sub(r'<[^>]+>', '', raw).strip() + if clean: + found_defs.append(f"({pos}) {clean}" if pos else clean) + + if found_defs: + resp = f"📖 **{term}** ({lang_name})\n" + "\n".join(f"- {d}" for d in found_defs[:4]) + await self.send_group(room, resp) + else: + await self.send_group(room, f"No clean definitions found in {lang_name}.") + + except Exception as e: + logging.error(f"Wiki error: {e}") + await self.send_group(room, "Error fetching definition.") + elif cmd in ['tdk', 'sozluk']: + if not args: + await self.send_group(room, "Please provide a word to search.") + return + +# Language Society word search (hidden by default to prevent confusion from other dictionary command) + url = f"https://sozluk.gov.tr/gts?ara={args.strip()}" + try: + + data = await self.fetch(url, headers=TDK_HEADERS, content_type_fix=True) + if data and isinstance(data, list) and len(data) > 0: + item = data[0] + word = item.get('madde', '') + meanings = item.get('anlamlarListe', []) + origin = item.get('lisan', '') + + resp = f"📚 **{word}**" + if origin: resp += f" ({origin})" + resp += "\n" + + for idx, m in enumerate(meanings, 1): + resp += f"{idx}. {m.get('anlam', '')}\n" + + await self.send_group(room, resp) + else: + await self.send_group(room, "Word not found in TDK dictionary.") + except Exception as e: + logging.error(f"TDK error: {e}") + await self.send_group(room, "Error fetching definition.") + + + elif cmd == 'quote': + d = await self.fetch("http://api.forismatic.com/api/1.0/?method=getQuote&format=json&lang=en", json=True, content_type_fix=True) + if d: + try: + await self.send_group(room, f"\"{d.get('quoteText','').strip()}\" - {d.get('quoteAuthor','Unknown')}") + except: await self.send_group(room, "Quote error hell yeah!") + + +# Since API is broken it uses local shitposts + elif cmd == 'shitpost': + p = Path('shitposts') + if not p.exists(): + await self.send_group(room, "Shitpost directory missing.") + return + + images = [f for f in p.iterdir() if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.webp']] + if not images: + await self.send_group(room, "No shitposts found locally.") + return + + try: + target = random.choice(images) + data = target.read_bytes() + mime, _ = mimetypes.guess_type(target.name) + url = await self.uploader.upload(data, target.name, mime or 'application/octet-stream') + if url: await self.send_group(room, url) + else: await self.send_group(room, "Failed to upload local shitpost.") + except Exception as e: + logging.error(f"Shitpost error: {e}") + await self.send_group(room, "Error sending shitpost.") + + elif cmd == 'neko': + d = await self.fetch("https://nekos.best/api/v2/neko") + if d: await self.upload_send(room, d['results'][0]['url']) + elif cmd == 'cat': + d = await self.fetch("https://api.thecatapi.com/v1/images/search") + if d: await self.upload_send(room, d[0]['url']) + elif cmd == 'dogfact': + d = await self.fetch("https://dogapi.dog/api/v2/facts") + if d and d['data']: await self.send_group(room, d['data'][0]['attributes']['body']) + elif cmd == 'catfact': + d = await self.fetch("https://meowfacts.herokuapp.com/") + if d and d['data']: await self.send_group(room, d['data'][0]) + elif cmd == 'urban': + d = await self.fetch(f"https://api.urbandictionary.com/v0/define?term={args}") + if d and d['list']: await self.send_group(room, f"📖 {d['list'][0]['definition'][:300]}...") + elif cmd == 'joke': + d = await self.fetch("https://v2.jokeapi.dev/joke/Any?type=twopart") + if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}") + elif cmd == 'pun': + d = await self.fetch("https://v2.jokeapi.dev/joke/Pun?type=twopart") + if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}") + elif cmd == 'fact': + d = await self.fetch("https://uselessfacts.jsph.pl/random.json?language=en") + if d: await self.send_group(room, d['text']) + elif cmd == 'advice': + d = await self.fetch("https://api.adviceslip.com/advice", content_type_fix=True) + if d: await self.send_group(room, d['slip']['advice']) + +# say command aka parrot mode. Censors slurs with * character + elif cmd == 'say': + text = args + if self.wordlist: + sorted_words = sorted(self.wordlist, key=len, reverse=True) + for w in sorted_words: + if not w: continue + chars = [re.escape(c) for c in w] + pattern_str = r"(?i)\b" + r"[\W_]*".join(chars) + r"\b" + try: + pattern = re.compile(pattern_str) + def repl(match): + return '*' * len(match.group(0)) + text = pattern.sub(repl, text) + except: pass + + await self.send_group(room, f"{sender} says: {text}") + + elif cmd == 'dice': + await self.send_group(room, f"🎲 {random.randint(1,6)}") + elif cmd == 'roll': + try: + limit = int(args.strip()) + if limit > 0: + await self.send_group(room, f"🎲 {random.randint(1, limit)}") + else: + await self.send_group(room, "Please pick a valid number.") + except: + await self.send_group(room, "Please specify a number to roll.") + + elif cmd == 'coinflip': + await self.send_group(room, f"🪙 {random.choice(['Heads','Tails'])}") + elif cmd == 'rps': + opts = ['rock','paper','scissors'] + b = random.choice(opts) + u = args.lower().strip() + if u not in opts: + await self.send_group(room, "rock, paper, or scissors?") + else: + res = "I won!" + if b == u: res = "Tie!" + elif (u=='rock' and b=='scissors') or (u=='paper' and b=='rock') or (u=='scissors' and b=='paper'): + res = "You won!" + await self.send_group(room, f"I chose {b}. {res}") + + +# GIF command finds and sends the searched GIF + elif cmd == 'gif': + k = self.cfg['Bot'].get('giphy_key') + if not k: + await self.send_group(room, "Giphy API key not configured.") + return + try: + d = await self.fetch(f"https://api.giphy.com/v1/gifs/search?api_key={k}&q={args}&limit=1") + if d and d['data']: + url = d['data'][0]['images']['original']['url'] + await self.upload_send(room, url) + else: + await self.send_group(room, f"No GIF found for '{args}'.") + except Exception as e: + logging.error(f"GIF error: {e}") + await self.send_group(room, f"Error searching GIF.") + + + elif cmd in ['kick','ban','mute','warn','unban','unmute']: + if not self.is_mod(room, sender): + await self.send_group(room, "Not authorized.") + return + + target_nick = args.strip() + if not target_nick: + await self.send_group(room, f"Usage: {cmd} ") + return + + in_roster = False + try: + roster = self.plugin['xep_0045'].get_roster(room) + if target_nick in roster: in_roster = True + except: pass + + try: + if cmd == 'warn': + if not in_roster: + await self.send_group(room, "User not found.") + return + await self.warn(room, target_nick, "Manual Warning") + + elif cmd == 'kick': + if not in_roster: + await self.send_group(room, "User not found.") + return + await self.plugin['xep_0045'].set_role(room, target_nick, 'none', reason="Kicked by operator") + await self.send_group(room, f"👢 Kicked {target_nick}. They will return I guess but anyways.") + + elif cmd == 'mute': + if not in_roster: + await self.send_group(room, "User not found.") + return + await self.plugin['xep_0045'].set_role(room, target_nick, 'visitor', reason="Muted by operator") + await self.send_group(room, f"🤐 Muted (shutted up) {target_nick}.") + + elif cmd == 'unmute': + if not in_roster: + await self.send_group(room, "User not found.") + return + await self.plugin['xep_0045'].set_role(room, target_nick, 'participant', reason="Unmuted by operator") + await self.send_group(room, f"🔊 Unmuted {target_nick}. {target_nick} can speak now again!") + + elif cmd == 'ban': + if not in_roster: + await self.send_group(room, "User not found.") + return + jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid') + if jid: + await self.plugin['xep_0045'].set_affiliation(room, jid=jid, affiliation='outcast', reason="Banned by operator") + await self.send_group(room, f"🚫 Banhammer! whomp whomp {target_nick}") + else: + await self.send_group(room, "Cannot ban (Hidden JID/Anonymous).") + + elif cmd == 'unban': + target_jid = target_nick + try: + await self.plugin['xep_0045'].set_affiliation(room, jid=target_jid, affiliation='none', reason="Unbanned by operator") + await self.send_group(room, f"🕊️ Unbanned {target_jid}. Taste of liberty!") + except IqError: + await self.send_group(room, f"Failed to unban {target_jid}. Ensure you are using the valid JID.") + + except IqError as e: + await self.send_group(room, f"Permission denied: {e.iq['error']['text'] or 'Bot missing privileges'}") + except Exception as e: + logging.error(f"Mod command failed: {e}") + await self.send_group(room, "Operation failed.") + + + elif cmd == 'poll': + res = self.polls.create(room, sender, args) + await self.send_group(room, res) + elif cmd == 'vote': + res = self.polls.vote(sender, args) + await self.send_private_muc(room, sender, res) + elif cmd == 'close': + res = self.polls.close(sender, args) + await self.send_group(room, res) + + elif cmd == 'uno': + sub = args.split()[0] if args else '' + arg = args[len(sub):].strip() + gid = f"uno_{room}" + g = self.games.get(gid) + if sub == 'join': + if not g: + g = UnoGame(room, self) + self.games[gid] = g + await self.send_group(room, g.join(sender)) + elif sub == 'start' and g: + await self.send_group(room, g.start()) + elif sub == 'play' and g: + res = g.play(sender, arg) + if res: await self.send_group(room, res) + if res and "Wins" in res: del self.games[gid] + elif sub == 'draw' and g: + res = g.draw(sender) + if res: await self.send_group(room, res) + +# JACK!! + elif cmd in ['bj', 'blackjack']: + gid = f"bj_{room}" + sub = args.split()[0].lower() if args else '' + + g = self.games.get(gid) + + if not sub: + if not g: + g = MultiplayerBlackjackGame(room, self) + self.games[gid] = g + g.join(sender) + await self.send_group(room, g.start()) + else: + if g.status == "JOINING": + await self.send_group(room, f"Lobby Open! Type `{self.trigger}bj join` to join or `{self.trigger}bj start` to begin.") + else: + await self.send_group(room, "Game in progress.") + + elif sub == 'join': + if not g: + g = MultiplayerBlackjackGame(room, self) + self.games[gid] = g + await self.send_group(room, f"Blackjack lobby created by {sender}. Type `{self.trigger}bj join`.") + + res = g.join(sender) + await self.send_group(room, res) + + elif sub == 'start' and g: + await self.send_group(room, g.start()) + + elif sub == 'hit' and g: + res = g.hit(sender) + await self.send_group(room, res) + if g.status == "DEALING": + await self.send_group(room, g.resolve_game()) + del self.games[gid] + + elif sub == 'stand' and g: + res = g.stand(sender) + await self.send_group(room, res) + if g.status == "DEALING": + await self.send_group(room, g.resolve_game()) + del self.games[gid] + + else: + await self.send_group(room, f"Blackjack: `{self.trigger}bj` (Solo), `{self.trigger}bj join` (Multi), `{self.trigger}bj hit/stand`") + +# Abbreviation will be removed + elif cmd == 'cp' or cmd == 'copypasta': + if args.startswith('add'): + _, name, txt = args.split(maxsplit=2) + d = self.copypastas.get('pastas', {}) + d[name] = txt + self.copypastas.set('pastas', d) + await self.send_group(room, "Added.") + elif args == 'list': + await self.send_group(room, ", ".join(self.copypastas.get('pastas',{}).keys())) + else: + txt = self.copypastas.get('pastas',{}).get(args) + if txt: await self.send_group(room, txt) + + + elif cmd == 'gpt': + if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'): + await self.send_group(room, "AI not configured.") + return + + k = self.cfg['AI']['openai_key'] + url = self.cfg['AI'].get('openai_api_url') + pl = { + "model": self.cfg['AI'].get('openai_model'), + "messages": [{"role":"system","content":self.cfg['AI'].get('openai_system_prompt')}, + {"role":"user","content":args}] + } + headers = {"Authorization": f"Bearer {k}"} + d = await self.fetch(url, method='POST', json_data=pl, headers=headers) + if d and d.get('choices'): await self.send_group(room, d['choices'][0]['message']['content']) + + + + + elif cmd in ['translate', 'tr']: + if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'): + await self.send_group(room, "AI not configured.") + return + + parts = args.split(maxsplit=1) + if len(parts) < 2: + await self.send_group(room, f"Usage: {self.trigger}translate ") + return + + target_lang = parts[0] + text_to_translate = parts[1] + + system_prompt = ( + f"You are a precise translator. Your sole function is to detect the language of the input text and translate it into {target_lang}. " + f"Provide only the {target_lang} translation with no additional commentary, explanations, notes, or meta-information. " + "Do not include the source language name, translation notes, or any text beyond the translated output. " + f"Maintain the original meaning, tone, and structure as closely as possible while ensuring natural {target_lang} expression." + ) + + k = self.cfg['AI']['openai_key'] + url = self.cfg['AI'].get('openai_api_url') + pl = { + "model": self.cfg['AI'].get('openai_model'), + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": text_to_translate} + ] + } + headers = {"Authorization": f"Bearer {k}"} + + d = await self.fetch(url, method='POST', json_data=pl, headers=headers) + if d and d.get('choices'): + await self.send_group(room, d['choices'][0]['message']['content']) + else: + await self.send_group(room, "Translation failed.") + elif cmd in ['r34', 'hentai', 'boobs', 'hboobs']: + if not self.is_nsfw_room(room): + await self.send_group(room, "NSFW not allowed here. Find a NSFW room or add the room to NSFW list smh") + return + + if cmd == 'r34': + tag = args if args else 'all' + d = await self.fetch(f"https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit=1&json=1&tags={tag}") + if d and len(d) > 0 and 'file_url' in d[0]: await self.upload_send(room, d[0]['file_url']) + + elif cmd == 'hentai': + d = await self.fetch("https://nekobot.xyz/api/image?type=hneko") + if d and 'message' in d: await self.upload_send(room, d['message']) + + elif cmd == 'boobs': + d = await self.fetch("https://nekobot.xyz/api/image?type=boobs") + if d and 'message' in d: await self.upload_send(room, d['message']) + + elif cmd == 'hboobs': + d = await self.fetch("https://nekobot.xyz/api/image?type=hboobs") + if d and 'message' in d: await self.upload_send(room, d['message']) + + +# Same thing with copypasta applies here too + elif cmd in ['wa', 'wolfram']: + if not args: + await self.send_group(room, "Usage: wa ") + else: + appid = None + if self.cfg.has_section('Features'): + appid = self.cfg['Features'].get('wolfram_appid') + if not appid and self.cfg.has_section('Bot'): + appid = self.cfg['Bot'].get('wolfram_appid') + + if not appid: + await self.send_group(room, "WolframAlpha AppID not configured.") + else: + try: + url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={quote(args.strip())}" + data = await self.fetch(url, json=False) + if data: + txt = data.decode('utf-8', errors='ignore').strip() + await self.send_group(room, f"Wolfram says: {txt}") + else: + await self.send_group(room, "No short answer available.") + except Exception as e: + logging.error(f"Wolfram error: {e}") + await self.send_group(room, "Error querying WolframAlpha.") + + elif cmd == 'help': + await self.send_group(room, await self._generate_help_message(self.is_mod(room, sender))) + elif cmd == 'mod-help': + if self.is_mod(room, sender): await self.send_group(room, await self._generate_mod_help()) + elif cmd == 'nsfw-help': + await self.send_group(room, await self._generate_nsfw_help()) + + async def _generate_help_message(self, is_mod): + t = self.trigger + s = f"#### List of available commands:\n\n#### Practical Commands: \n" \ + f"`{t}video` - Sends a list of all available videos\n" \ + f"`{t}help` - Displays this command.\n" \ + f"`{t}credits` - Displays bot's credit.\n" \ + f"`{t}test` - Simple test command you say test bot will response.\n" \ + f"`{t}invite` - Info about how to make bot join more rooms.\n" \ + f"`{t}calc` {{num}} {{+, - , / , * }} {{num}} - Calculator.\n" \ + f"`{t}ping` - tests the bot ping.\n" \ + f"`{t}gpt` - Send commands to clankers.\n" \ + f"`{t}tr` - Translate text to target language.\n" \ + f"`{t}copypasta` - Add or send copypastas.\n" \ + f"`{t}wiki` - Ughm acthually lookup.\n" \ + f"`{t}wa` - WolframAlpha query, can convert anything to anything.\n" + + if is_mod: s += f"`{t}mod-help` - Mod commands.\n" + s += f"`{t}nsfw-help` - Sends a list of all available nsfw commands.\n" + + s += f"\n#### Fun Commands: \n" \ + f"`{t}dice` - Rolls a random number between 1 and 6.\n" \ + f"`{t}roll` {{num}} - Rolls a random number between 1 and num.\n" \ + f"`{t}say` - says what the user told it to say!.\n" \ + f"`{t}cl` - Common L.\n" \ + f"`{t}l` - L.\n" \ + f"`{t}dm` - Whispers the user Hi :3.\n" \ + f"`{t}rps` - Simple Rock paper scissors game.\n" \ + f"`{t}dogfact` - Gives a random dogfact using the Dogfact API!.\n" \ + f"`{t}catfact` - Gives a random Catfact using cat fact API\n" \ + f"`{t}joke` - Very simple command just gives a joke using the Joke API.\n" \ + f"`{t}coinflip` - a Command so easy a child could do it.\n" \ + f"`{t}fact` - Gives a random useless fact.\n" \ + f"`{t}urban` - uses the urban dictionary to search for the word.\n" \ + f"`{t}shitpost` - Sends random shitpost from local library\n" \ + f"`{t}cat` - Cat :3 \n" \ + f"`{t}neko` - Neko command \n" \ + f"`{t}advice` - Gives the user a life Advice \n" \ + f"`{t}quote` - Gives a random quote using yet another API. \n" \ + f"`{t}gif` - Allows the user to search for gifs using giphy (ratelimited sometimes)\n" \ + f"`{t}pun` - Gives user a simple pun. \n" \ + f"`{t}avatar` - Sends the user Avatar (Beta) \n" \ + f"`{t}poll` - Democracy \n" \ + f"`{t}uno` - Four colors\n" \ + f"`{t}bj` - I play JACK bro! \n" + return s + + async def _generate_mod_help(self): + t = self.trigger + return f"## Mod commands:\n" \ + f"`{t}kick` - Kick a user.\n" \ + f"`{t}ban` - Ban a user.\n" \ + f"`{t}mute` - Mute a user.\n" \ + f"`{t}warn` - Warn a user.\n" \ + f"`{t}unban` - Unban a user.\n" \ + f"`{t}unmute` - Unmute a user.\n" + + async def _generate_nsfw_help(self): + t = self.trigger + return f"### NSFW commands:\n" \ + f"`{t}r34` - Rule34 search. (doesnt work on some ISPs)\n" \ + f"`{t}hentai` - Grabs a random hentai image \n" \ + f"`{t}boobs` - Grabs a random boobs image \n" \ + f"`{t}hboobs` - Grabs a random hboobs image simple. \n" + + async def _generate_video_list(self): + t = self.trigger + return f"Videos:\n`{t}ihnk` Sends a I have no knwoledge about any of this video. \n`{t}scary` Sends the oh oh scary oh oh shiver me timbers video" + + async def fetch(self, url, method='GET', json_data=None, headers=None, json=True, content_type_fix=False): + try: + async with aiohttp.ClientSession() as s: + async with s.request(method, url, json=json_data, headers=headers) as r: + if r.status == 200: + if json: + return await r.json(content_type=None) if content_type_fix else await r.json() + else: + return await r.read() + except Exception as e: logging.error(f"Fetch err: {e}") + return None + + async def upload_send(self, room, url): + data = await self.fetch(url, json=False) + if data: + mime = self._sniff_mime(data) + if mime: + ext = mime.split('/')[1].replace('jpeg', 'jpg') +# Octet Stream is used in bin files with no known type afaik + else: + mime = "application/octet-stream" + ext = "bin" + path = url.split('?')[0].lower() + if path.endswith('.png'): mime, ext = 'image/png', 'png' + elif path.endswith(('.jpg', '.jpeg')): mime, ext = 'image/jpeg', 'jpg' + elif path.endswith('.gif'): mime, ext = 'image/gif', 'gif' + elif path.endswith('.webp'): mime, ext = 'image/webp', 'webp' + + filename = f"image.{ext}" + l = await self.uploader.upload(data, filename, mime) + if l: await self.send_group(room, l) + + def _sniff_mime(self, data: bytes) -> Optional[str]: + if not data or len(data) < 100: + return None + + if data.startswith(b'\x89PNG\r\n\x1a\n'): return 'image/png' + if data.startswith(b'\xff\xd8\xff'): return 'image/jpeg' + if data.startswith(b'GIF8'): return 'image/gif' + if data.startswith(b'RIFF') and data[8:12] == b'WEBP': return 'image/webp' + + return None + +# GET AVATAR TO UPLOAD (SOMETIMES BROKEN) + async def _get_avatar_data(self, jid_str, is_muc_occupant=False) -> (Optional[bytes], Optional[str]): + j = slixmpp.JID(jid_str) + target = j.full if is_muc_occupant else j.bare + + async def try_0153(use_ifrom=False): + try: + kwargs = {'jid': target} + if use_ifrom: kwargs['ifrom'] = self.boundjid.full + + iq = await self.plugin["xep_0054"].get_vcard(**kwargs) + photo = iq["vcard_temp"]["PHOTO"] + binval = photo["BINVAL"] + type_ = photo["TYPE"] + + if binval: + data = base64.b64decode(binval.replace('\n', '').replace(' ', '').strip()) + mime = self._sniff_mime(data) + if not mime and type_ and len(type_.split('/')) > 1: mime = type_ + if not mime: mime = "image/png" + return data, mime + except (IqError, Exception) as e: + logging.debug(f"XEP-0153 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}") + return None + + async def try_0084(use_ifrom=False): + try: + kwargs = {'jid': target, 'node': "urn:xmpp:avatar:data", 'max_items': 1} + if use_ifrom: kwargs['ifrom'] = self.boundjid.full + + iq = await self.plugin["xep_0060"].get_items(**kwargs) + items = iq["pubsub"]["items"]["substanzas"] + if items: + data_elem = items[0].xml.find("{urn:xmpp:avatar:data}data") + if data_elem is not None and data_elem.text: + b64_text = data_elem.text.replace('\n', '').replace(' ', '').strip() + data = base64.b64decode(b64_text) + mime = self._sniff_mime(data) or "image/png" + return data, mime + + if "data" in items[0]: + d = items[0]["data"] + if isinstance(d, str): data = base64.b64decode(d) + else: data = d + return data, "image/png" + except (IqError, Exception) as e: + logging.debug(f"XEP-0084 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}") + return None + + + res = await try_0153() + if res: return res + + res = await try_0084() + if res: return res + + logging.debug(f"Primary avatar methods failed for {target}. Trying fallback logic.") + res = await try_0153(use_ifrom=True) + if res: return res + + res = await try_0084(use_ifrom=True) + if res: return res + + return None, None + + async def warn(self, room, nick, reason): + if self.cfg['Moderation'].get('moderation_enabled') == 'NONE': return + + k = f"{room}/{nick}" + c = self.warnings.get(k, 0) + 1 + self.warnings[k] = c + self.db.set('warnings', self.warnings) + await self.send_group(room, f"⚠️ {nick} Warn {c}: {reason}") + if c >= int(self.cfg['Moderation'].get('max_warnings',3)): + self.warnings[k] = 0; self.db.set('warnings', self.warnings) + act = self.cfg['Moderation'].get('warning_action','mute') + try: + if act == 'kick': + await self.plugin['xep_0045'].set_role(room, nick, 'none', reason="Too many warnings") + elif act == 'mute': + await self.plugin['xep_0045'].set_role(room, nick, 'visitor', reason="Too many warnings") + except IqError: + await self.send_group(room, f"Cannot enforce warning: Permission denied.") + + def is_mod(self, room, nick): + if nick.lower() in self.config_moderators: + return True + + try: + room_roster = self.plugin['xep_0045'].rooms.get(room) + if not room_roster: + target_bare = slixmpp.JID(room).bare + for r_jid in self.plugin['xep_0045'].rooms: + if slixmpp.JID(r_jid).bare == target_bare: + room_roster = self.plugin['xep_0045'].rooms[r_jid] + break + + if room_roster: + if nick in room_roster: + user = room_roster[nick] + if user['affiliation'] in ('admin', 'owner') or user['role'] == 'moderator': + return True + except Exception as e: + logging.debug(f"is_mod check failed: {e}") + + return False + + def is_nsfw_room(self, room): + mode = self.cfg['Features'].get('nsfw_mode', 'NONE') + if mode == 'ALL': return True + if mode == 'NONE': return False + allowed = [r.strip() for r in self.cfg['Features'].get('nsfw_rooms', '').split(',')] + return str(room) in allowed + + def find_user(self, room, partial): + try: + nicks = list(self.plugin['xep_0045'].get_roster(room).keys()) + match = get_close_matches(partial, nicks, n=1) + return match[0] if match else None + except: return None + + async def send_group(self, room, text): + self.send_message(mto=room, mbody=str(text), mtype='groupchat') + + async def send_private_muc(self, room, nick, text): + self.send_message(mto=f"{room}/{nick}", mbody=str(text), mtype='chat') + + async def send_private_msg_by_jid(self, jid, text): + self.send_message(mto=jid, mbody=str(text), mtype='chat') + +async def main(): + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s") + if not Path('config.ini').exists(): + print("Config missing. Please make a config. Otherwise how would the bot work?") + return + bot = RevoltBot() + bot.connect() + await bot.disconnected + +if __name__ == "__main__": + asyncio.run(main()) +