Files
Chrobry-XMPP/chrobry.py
2026-01-12 15:49:36 +00:00

1506 lines
64 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import slixmpp
import asyncio
import logging
import configparser
import json
import random
import re
import aiohttp
import operator
import mimetypes
import base64
from pathlib import Path
from typing import Dict, List, Optional, Any
from difflib import get_close_matches
from dataclasses import dataclass, field
from io import BytesIO
from urllib.parse import urlencode, quote
from slixmpp.exceptions import IqError, IqTimeout
# The MOST ESSENTIAL PART of the code!
N_QUOTES = [
"Did you just slap me with that arm?",
"Yeah sorry, it's just my head kinda hurts.",
"Hey, are you new to our squad?",
"You're a little, uh, short for a Disassembly Drone...",
"I'm Serial Designation N, nice to meet you!",
"I'm kind of the leader of the squad in this city.",
"That's not true, everyone tells me I'm useless and terrible.",
"Wait! I'm not supposed to tell you that part! Biscuits...",
"Well, honesty is the best policy.",
"I also can't seem to remember the past 3 hours of my life, ah!, but I'm sure that'll sort itself out.",
"Hoo, stuck yourself?",
"Just pop it in your mouth. Our saliva neutralizes the nanites, otherwise I'd be constantly disassembling myself.",
"Sure! I love doing anything.",
"Sweet, uh, I'm open to new things I guess.",
"Consider it err... repressed!",
"No worries, I'm N, but a whole letter is a lot to remember! Ha haha!",
"So obviously a lot of mutual respect there, but secretly I actually kind of have a crush on her.",
"YOU CAN'T TELL HER OKAY?!",
"J's awesome!",
"Hey, let me give you the tour!",
"Outside are the corpse wall, thingies, and here are the buttons!",
"Beep, boop, bop, bop bop, bop, beep, boop, bop bop, beep...",
"More of a one use missile; they never taught us how to land.",
"Other than ingesting their WARM, SWEET oil to avoid overheating and dying, I guess I just wanna be useful.",
"I was given a job, and I always wanna try my best.",
"Oh my! You sure are rebellious! It's kind of exciting...",
"...but not as fun as, uh, following the rules.",
"You know, I left an-an extremely dangerous weapo- u-uh, excuse, o-outside!",
"Hey, fellas!",
"Ooh, deal me in, I love Rummy.",
"Wait, no, ah, I'm going to murder everyone...",
"...Rain check!",
"I'm sorry. I really enjoyed our time together, but I can't have you shooting V with that thing.",
"You, me, name, remember?",
"I get the feeling the company doesn't actually love robots, and, like, we might be robots?",
"...I've made a terrible mistake. It's cool how immediately I could tell.",
"Th-Thanks J, al-always looking-ng out for me. Yo-you're awesome.",
"A-ah, biscuits-s. I-I'm sor-rry. I ruin-ned your card game, that made you have an awk-kward moment with your dad.",
"Being rebellious is a lot harder than it loo-oks. Thanks for showing me the ropes.",
"Tha-at's super fa-air. Ah, I screwed up.",
"Oh uh, J, you're sometimes kinda mean to me, and I wish you weren't.",
"Just some constructive criticism!",
"(J please.)",
"AAAH, MY MIND'S IN A WEIRD PLACE, DON'T READ INTO THIS!",
"UZI! I'm so, so sorry, have fun repressing this, heh-",
"Huh? Oh!, uh, N! I'm an angsty rebellious Disassembly Drone now.",
"Nice to meet you, Mr. Uzi-",
"I'd join you if the sun didn't kill me. Hope you're having important character growth or something, though!",
"I was the PILOT? That's aweso-! (I crashed and ruined everything...)",
"Spaceship pilot. Origin story.",
"I do want to be dapper...",
"We can't interact with the workers anymore V... we're too dangerous.",
"I'm not bringing you for prom murder, V!",
"J went holo-spooky-snake-crab and we maybe grew up in a haunted mansion!",
"Aren't you worried we have no idea what we even are?",
"V, if you're hiding something, we can figure it out together",
"Even if we each only have pieces",
"No! No time!",
"Dapper N~",
"I could help",
"But you probably don't want me...",
"Dapper Buddies!",
"This isn't what i expected at all!",
"Thank you, Lizzy-!",
"V, you kind of suck.",
"Welcome, campers! Let's sound off! One, two...",
"To the bunks!",
"Close one!",
"V, we can't hurt Uzi!",
"I don't know what you are talking about because you won't tell me!",
"What are you so afraid of!?",
"I am! Uzi is, she's a kid like us, V! What is wrong with you!?",
"Easy there, buddy!",
"What did I say about antagonizing her?",
"Uzi, you want to tell me what's the matter?",
"Before we met, scary stuff was actually...pretty scary, and tonight too, 'cause you weren't with me to make it fun somehow, I kind of forgot what that was like.",
"Then we'll stick together!",
"Kidding! Just avoid another whole spire!",
"Baby steps. Together?",
"Us as lizards. J drew herself.",
"Don't give me those eyes.",
"Ah, we'll ask Tessa, OK? If not... Movie night!",
"Hi, Tessa.",
"You guys are... locking her up?",
"I-I-I told her to say that! And brought her here. I-I just... hate orders...?",
"And your fan's dumb...?",
"NO! AAAH! STOP!",
"Go get it, goobers!",
"WOAH! WE WERE BUDDIES! ...darkXwolf17...?",
"So, these are memories, and future me is dead at a sleepover?",
"'Cause... 'Cause of you? 'Cause you're doing that? OW!",
"...'Kay. You kind of seem like an evil ghost witch, though. AH!",
"Why? Who are you?",
"Ugh, J has it...",
"Psst! Tessa!",
"We need the basement key. This bird's from the future.",
"Who? Our Cyn? Nah, she's cool.",
"Not creepy! Sweet!",
"DARKXWOLF17! NOO-HO-HO-HO-HOOO!",
"Is V... okay? Like, in the future?",
"Not dealing with this great, to be immediately honest...! ...Okay, it's gone.",
"Whoa, Cyn! For an eldritch... uh, monster thingy—",
"I got you.",
"T-Tessa...?",
"(Chuckling) It is you.",
"Why?",
"And for Mom backstory. Right, dude?",
"(Giggling like a child) It tickles! The bad version, though!",
"...Doing great, dude.",
"Then Uzi... You can fix her, right? That's why you're here?",
"V! Is Uzi",
"''IT SUCKS PRETTY HARD''",
"DON'T!",
"Uzi, no!",
"V, come on!",
"NO, NO, NO! V, WE NEED YOU!",
"V, PLEASE!",
"Uzi, help!",
"Use your Solver!",
"Oh, no no, it's okay.",
"We're not going to hurt you. Okay?",
"I deserve this, I deserve this, I deserve this!",
"T-Thank you mine shaft trauma ghost, I can hear you fine from there!",
"Uzi's dad? What-",
"Sorry, ma'am. I won't keep secrets from Uzi anymore...even if you say it might hurt her",
"Gave her ungodly eldritch genetics!",
"I promise.…to wait until you're ready to tell her yourself?",
"You knew about the patch. Yes, or no? One chance.",
"Hey.. buddy... Yeah, that should help...",
"All I know is.. I need you. To figure things out... together..?",
"NORI! I should mention Uzi and I-!",
"HahaohaAAAAAAAAAAAAAH!!!!",
"UZI!!! THAT WAS YOUR MOM! WAIT, NO! AAAAAHHHHHHH!",
"Uzi DON'T YOU DARE!!!",
"Heeheehee! Spaceship pilot!",
"AAAACK! Sorry, I didn't know- Oh.",
"Yeah, I'm kind of, like, actually mad about what you did. But we can -- TALK LATER!!",
"V, STOP!",
"No more tricks.",
"I dunno, I feel like we need a secret handshake or something, right?",
"Uzi! Uzi!",
"UUUUUUUUUZIIIIIII!!!!",
"It's you!",
"Whooooo-hoo-hoo-hoo! That's my girlfriend! "
]
EMOJIS = ['1','2','3','4','5','6','7','8','9','🔟']
# For dictionary
TDK_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'X-Requested-With': 'XMLHttpRequest',
'DNT': '1',
'Sec-GPC': '1',
'Connection': 'keep-alive',
'Referer': 'https://sozluk.gov.tr/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'sec-ch-ua-platform': '"Android"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="137", "Google Chrome";v="137"',
'sec-ch-ua-mobile': '?1',
'Save-Data': 'on',
'Priority': 'u=0'
}
class DataManager:
def __init__(self, filename):
self.filename = Path(filename)
self.data = self._load()
def _load(self):
if not self.filename.exists(): return {}
try: return json.loads(self.filename.read_text(encoding='utf-8'))
except: return {}
def save(self):
self.filename.write_text(json.dumps(self.data, indent=2), encoding='utf-8')
def get(self, key, default=None):
return self.data.get(key, default)
def set(self, key, value):
self.data[key] = value
self.save()
# Handles image uploads using your xmpp server or catbox/litterbox (default 72 hours removal)
class AsyncFileUploader:
def __init__(self, client, service='xmpp', api_key=None):
self.client = client
self.service = service.lower()
self.api_key = api_key
async def upload(self, data: bytes, filename: str, mime_type: str) -> Optional[str]:
try:
if self.service == 'xmpp':
input_file = BytesIO(data)
return await self.client['xep_0363'].upload_file(
input_file=input_file, filename=filename, size=len(data),
content_type=mime_type, timeout=60
)
elif self.service == 'catbox':
url = "https://catbox.moe/user/api.php"
form = aiohttp.FormData()
form.add_field('reqtype', 'fileupload')
if self.api_key: form.add_field('userhash', self.api_key)
form.add_field('fileToUpload', data, filename=filename, content_type=mime_type)
async with aiohttp.ClientSession() as s:
async with s.post(url, data=form) as r:
if r.status == 200: return (await r.text()).strip()
elif self.service == 'litterbox':
url = "https://litterbox.catbox.moe/resources/internals/api.php"
form = aiohttp.FormData()
form.add_field('reqtype', 'fileupload')
form.add_field('time', '72h')
form.add_field('fileToUpload', data, filename=filename, content_type=mime_type)
async with aiohttp.ClientSession() as s:
async with s.post(url, data=form) as r:
if r.status == 200: return (await r.text()).strip()
except Exception as e:
logging.error(f"Upload failed: {e}")
return None
@dataclass
class Poll:
id: str
room: str
creator: str
question: str
options: List[str]
votes: Dict[str, int] = field(default_factory=dict)
anon: bool = False
max_votes: Optional[int] = None
closed: bool = False
class PollManager:
def __init__(self, bot):
self.bot = bot
self.polls: Dict[str, Poll] = {}
self.cnt = 0
def create(self, room, sender, args):
parts = re.findall(r'"([^"]+)"', args)
if len(parts) < 3:
return f"Usage: {self.bot.trigger}poll \"Question\" \"Opt1\" \"Opt2\" [anon=true] [votes=N]"
q, opts = parts[0], parts[1:11]
anon = 'anon=true' in args.lower()
max_v = None
mv_match = re.search(r'votes=(\d+)', args)
if mv_match: max_v = int(mv_match.group(1))
self.cnt += 1
pid = str(self.cnt)
p = Poll(pid, room, sender, q, opts, {}, anon, max_v, False)
self.polls[pid] = p
return self._announce(p)
def _announce(self, p):
lines = [f"📊 **Poll {p.id}**: {p.question}"]
for i, o in enumerate(p.options):
lines.append(f"{EMOJIS[i]} {o}")
lines.append(f"Vote: {self.bot.trigger}vote {p.id} <number>")
if p.anon: lines.append("🔒 Anonymous voting")
return "\n".join(lines)
def vote(self, sender, args):
parts = args.split()
if len(parts) < 2: return "Usage: vote <id> <number>"
pid, choice = parts[0], parts[1]
if pid not in self.polls: return "Poll not found."
p = self.polls[pid]
if p.closed: return "Poll closed."
try:
idx = int(choice) - 1
if not (0 <= idx < len(p.options)): raise ValueError
p.votes[sender] = idx
return "Vote cast."
except:
return "Invalid option."
def close(self, sender, args):
pid = args.strip()
if pid not in self.polls: return "Poll not found."
p = self.polls[pid]
if sender != p.creator and not self.bot.is_mod(p.room, sender):
return "Only creator can close."
p.closed = True
return self._results(p)
def _results(self, p):
counts = [0] * len(p.options)
for v in p.votes.values():
if 0 <= v < len(counts): counts[v] += 1
total = sum(counts)
lines = [f"📊 **Results** - {p.question}"]
for i, o in enumerate(p.options):
pct = (counts[i] / total * 100) if total else 0
lines.append(f"{EMOJIS[i]} {o}: {counts[i]} ({pct:.1f}%)")
return "\n".join(lines)
class UnoGame:
def __init__(self, room, bot):
self.room = room
self.bot = bot
self.players = []
self.hands = {}
self.deck = []
self.discard = []
self.turn = 0
self.direction = 1
self.started = False
self.color = None
def join(self, nick):
if self.started: return "Started."
if nick not in self.players:
self.players.append(nick)
return f"{nick} joined Uno."
return "Already joined."
def start(self):
if len(self.players) < 2: return "Need 2+ players."
self.started = True
colors = ['red','blue','green','yellow']
vals = [str(i) for i in range(10)] + ['skip','reverse','+2']
self.deck = [f"{c}_{v}" for c in colors for v in vals] * 2
self.deck += ['wild_wild', 'wild_+4'] * 4
random.shuffle(self.deck)
for p in self.players:
self.hands[p] = [self.deck.pop() for _ in range(7)]
self._pm_hand(p)
top = self.deck.pop()
while 'wild' in top:
self.deck.append(top)
random.shuffle(self.deck)
top = self.deck.pop()
self.discard.append(top)
self.color = top.split('_')[0]
return f"Uno Started! Top: {self._fmt(top)}. Turn: {self.players[self.turn]}"
def play(self, nick, arg):
if not self.started or self.players[self.turn] != nick: return
hand = self.hands[nick]
parts = arg.lower().split()
declared_color = None
valid_colors = ['red','blue','green','yellow']
for p in parts:
if p in valid_colors:
declared_color = p
sel_card = None
for c in hand:
c_simple = c.replace('_', ' ').lower()
if arg.lower() in c_simple:
sel_card = c
break
if "wild" in parts and "wild" in c_simple:
if "+4" in arg and "+4" in c:
sel_card = c
break
elif "+4" not in arg and "+4" not in c:
sel_card = c
break
if not sel_card: return "Card not in hand."
top = self.discard[-1]
t_col = self.color
t_val = top.split('_')[1]
s_col, s_val = sel_card.split('_')
valid = False
if 'wild' in s_col:
valid = True
if not declared_color:
asyncio.create_task(self.bot.send_private_muc(self.room, nick, "⚠️ Please specify a color (e.g. `play wild red`)."))
self._pm_hand(nick)
return "Specify color (e.g. play wild red)"
self.color = declared_color
elif s_col == t_col or s_val == t_val:
valid = True
self.color = s_col
if not valid: return "Invalid move."
hand.remove(sel_card)
self.discard.append(sel_card)
msg = f"{nick} played {self._fmt(sel_card)}."
if not hand:
self.started = False
return f"{msg}\n🏆 {nick} Wins!"
if len(hand) == 1: msg += " UNO!"
if 'skip' in s_val: self._advance()
elif 'reverse' in s_val:
self.direction *= -1
if len(self.players)==2: self._advance()
elif '+2' in s_val:
self._advance()
self._draw(self.players[self.turn], 2)
elif '+4' in s_val:
self._advance()
self._draw(self.players[self.turn], 4)
self._advance()
return f"{msg}\nNext: {self.players[self.turn]} (Color: {self.color})"
def draw(self, nick):
if self.players[self.turn] != nick: return
self._draw(nick, 1)
self._advance()
return f"{nick} drew."
def _draw(self, nick, n):
for _ in range(n):
if not self.deck:
self.deck = self.discard[:-1]
self.discard = [self.discard[-1]]
random.shuffle(self.deck)
if self.deck: self.hands[nick].append(self.deck.pop())
self._pm_hand(nick)
def _advance(self):
self.turn = (self.turn + self.direction) % len(self.players)
next_player = self.players[self.turn]
self._pm_hand(next_player)
def _fmt(self, c):
if 'wild' in c: return f"🌈 {c.split('_')[1]}"
em = {'red':'🟥','blue':'🟦','green':'🟩','yellow':'🟨'}
return f"{em.get(c.split('_')[0],'')} {c.split('_')[1]}"
def _pm_hand(self, nick):
asyncio.create_task(self.bot.send_private_muc(self.room, nick, " | ".join(self._fmt(c) for c in self.hands[nick])))
# JACK IMPLEMENTATION
class MultiplayerBlackjackGame:
def __init__(self, room, bot):
self.room = room
self.bot = bot
self.players = []
self.hands = {}
self.d_hand = []
self.deck = []
self.turn_idx = 0
self.status = "JOINING"
suits = ['', '', '', '']
ranks = ['2','3','4','5','6','7','8','9','10','J','Q','K','A']
self.deck = [{'rank': r, 'suit': s} for s in suits for r in ranks] * 2
random.shuffle(self.deck)
def join(self, nick):
if self.status != "JOINING": return "Game already started."
if nick in self.players: return "Already joined."
self.players.append(nick)
return f"{nick} joined Blackjack."
def start(self):
if len(self.players) < 1: return "Need players."
self.status = "PLAYING"
self.turn_idx = 0
for p in self.players:
self.hands[p] = [self.deck.pop(), self.deck.pop()]
self.d_hand = [self.deck.pop(), self.deck.pop()]
return self._status_msg()
def hit(self, nick):
if self.status != "PLAYING": return "Not playing."
if self.players[self.turn_idx] != nick: return "Not your turn."
self.hands[nick].append(self.deck.pop())
val = self._calc(self.hands[nick])
res = f"{nick} hits: {self._fmt(self.hands[nick])} ({val})"
if val >= 21:
res += " (Bust!)" if val > 21 else " (21!)"
self._advance_turn()
return res
def stand(self, nick):
if self.status != "PLAYING": return "Not playing."
if self.players[self.turn_idx] != nick: return "Not your turn."
res = f"{nick} stands."
self._advance_turn()
return res
def _advance_turn(self):
self.turn_idx += 1
if self.turn_idx >= len(self.players):
self.status = "DEALING"
def resolve_game(self):
while self._calc(self.d_hand) < 17:
self.d_hand.append(self.deck.pop())
d_val = self._calc(self.d_hand)
lines = [f"🎰 **Results** (Dealer: {self._fmt(self.d_hand)} - {d_val})"]
for p in self.players:
p_val = self._calc(self.hands[p])
if p_val > 21:
lines.append(f"{p}: {p_val} (Bust)")
elif d_val > 21 or p_val > d_val:
lines.append(f"{p}: {p_val} (Win)")
elif p_val == d_val:
lines.append(f"⚖️ {p}: {p_val} (Push)")
else:
lines.append(f"{p}: {p_val} (Loss)")
return "\n".join(lines)
def _status_msg(self):
if self.status == "PLAYING":
curr = self.players[self.turn_idx]
msg = f"🃏 **Blackjack**\nDealer: [{self.d_hand[0]['rank']}{self.d_hand[0]['suit']}] [??]\n"
for p in self.players:
mark = "👉 " if p == curr else ""
msg += f"{mark}{p}: {self._fmt(self.hands[p])} ({self._calc(self.hands[p])})\n"
return msg
return ""
def _calc(self, hand):
val = 0
aces = 0
for c in hand:
if c['rank'] in ['J','Q','K']: val += 10
elif c['rank'] == 'A': val += 11; aces += 1
else: val += int(c['rank'])
while val > 21 and aces: val -= 10; aces -= 1
return val
def _fmt(self, hand):
return " ".join([f"[{c['rank']}{c['suit']}]" for c in hand])
# MAIN CLASS
class RevoltBot(slixmpp.ClientXMPP):
def __init__(self):
self.cfg = configparser.ConfigParser()
if not Path('config.ini').exists(): raise FileNotFoundError("config.ini missing")
self.cfg.read('config.ini')
super().__init__(self.cfg['XMPP']['jid'], self.cfg['XMPP']['password'])
self.trigger = self.cfg['Bot']['trigger']
self.nick = self.cfg['Bot']['nickname']
self.rooms = [r.strip() for r in self.cfg['XMPP']['rooms'].split(',')]
self.db = DataManager('bot_data.json')
self.copypastas = DataManager('copypastas.json')
self.warnings = self.db.get('warnings', {})
self.wordlist = set()
self.config_moderators = set()
if self.cfg.has_option('Bot', 'moderators'):
raw_mods = self.cfg['Bot']['moderators']
self.config_moderators = {m.strip().lower() for m in raw_mods.split(',') if m.strip()}
# Prefers English -> German -> French -> Espanol for word definitions
# Languages are chosen because they are the most spoken languages with Latin alphabet
# Configuration allows change this part
self.wiktionary_priority = ['en', 'de', 'fr', 'es']
if self.cfg.has_option('Features', 'wiktionary_priority'):
raw_p = self.cfg['Features']['wiktionary_priority']
self.wiktionary_priority = [p.strip() for p in raw_p.split(',') if p.strip()]
self.uploader = AsyncFileUploader(self,
self.cfg['Bot'].get('file_host', 'litterbox'),
self.cfg['Bot'].get('file_host_api_key'))
self.polls = PollManager(self)
self.games = {}
self.register_plugin('xep_0030')
self.register_plugin('xep_0045')
self.register_plugin('xep_0199')
self.register_plugin('xep_0363')
self.register_plugin('xep_0054')
self.register_plugin('xep_0060')
self.add_event_handler("session_start", self.start)
self.add_event_handler("groupchat_message", self.on_group)
self.add_event_handler("message", self.on_pm)
# BOT STARTUP
async def start(self, event):
self.send_presence()
await self.get_roster()
# Downloads slurs from the database to block them while moderating
url = self.cfg['Features'].get('wordlist_url')
if url:
try:
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
if r.status == 200:
self.wordlist = set((await r.text()).lower().splitlines())
logging.info(f"Loaded {len(self.wordlist)} banned words.")
except Exception as e: logging.error(f"Wordlist error: {e}")
for room in self.rooms:
self.join_room_with_retry(room)
def join_room_with_retry(self, room):
room_jid = str(slixmpp.JID(room).bare)
n = self.cfg['XMPP'].get(f'nickname.{room}', self.nick)
try:
self.plugin['xep_0045'].join_muc(room_jid, n)
logging.info(f"Joined {room_jid} as {n}")
except Exception as e:
logging.error(f"Could not join {room}: {e}")
async def on_group(self, msg):
if msg['mucnick'] == self.nick: return
room = msg['from'].bare
body = msg['body'].strip()
mod_enabled = self.cfg['Moderation'].get('moderation_enabled', 'ALL').upper()
should_moderate = True
if mod_enabled == 'NONE': should_moderate = False
elif mod_enabled == 'NON_NSFW' and self.is_nsfw_room(room): should_moderate = False
# Warn a user if they use a slur!
try:
if should_moderate and self.wordlist:
if any(re.search(r'\b'+re.escape(w)+r'\b', body.lower()) for w in self.wordlist):
await self.warn(room, msg['mucnick'], "Message includes word inside the filter.")
return
if not body.startswith(self.trigger): return
parts = body[len(self.trigger):].split(maxsplit=1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
await self.handle_cmd(room, msg, cmd, args)
except Exception as e:
logging.error(f"Error handling message: {e}")
async def on_pm(self, msg):
if msg['type'] == 'error': return
if msg['from'].bare in self.rooms and not msg['from'].resource: return
if msg['from'].bare == self.boundjid.bare: return
if msg['type'] == 'groupchat': return
body = msg['body'].strip()
if not body: return
if body.startswith(self.trigger):
parts = body[len(self.trigger):].split(maxsplit=1)
cmd = parts[0].lower()
if cmd == 'help':
await self.send_private_msg_by_jid(msg['from'], await self._generate_help_message(False))
elif cmd == 'dm':
await self.send_private_msg_by_jid(msg['from'], "Hi! :3")
async def handle_cmd(self, room, msg, cmd, args):
sender = msg['mucnick']
# Too lazy to remove cdn links
if cmd == 'ihnk':
await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20I%20have%20no%20knowledge%20of%20any%20of%20this%20ThIs%20iS%20sO%20bIzZaRe_480p.mp4")
elif cmd == 'scary':
await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20oh%20oh%20scary%20oh%20oh%20shiver%20me%20timbers_360p.mp4")
# MOST ESSENTIAL COMMAND OF THE BOT
elif cmd == 'n':
await self.send_group(room, random.choice(N_QUOTES))
elif cmd == 'cl':
await self.send_group(room, f"Common {args or 'Everyone'} L")
elif cmd == 'l':
await self.send_group(room, "L")
elif cmd == 'test':
await self.send_group(room, "Ig it works ™️")
elif cmd == 'credits':
await self.send_group(room, "Original by Purplebored. Port by agent-n.")
elif cmd == 'invite':
await self.send_private_muc(room, sender, "Contact the hoster of this instance to add your room.")
elif cmd == 'ping':
await self.send_group(room, "Pong!")
elif cmd == 'video':
await self.send_group(room, await self._generate_video_list())
elif cmd in ['calculate', 'calc']:
parts = args.split()
if len(parts) == 3:
try:
n1 = float(parts[0])
op = parts[1]
n2 = float(parts[2])
ops = {'+':operator.add, '-':operator.sub, '*':operator.mul, '/':operator.truediv}
if op in ops:
if op == '/' and n2 == 0: await self.send_group(room, "Division by zero is not allowed.")
else: await self.send_group(room, f"Result: {ops[op](n1, n2)}")
else: await self.send_group(room, "Invalid op.")
except: await self.send_group(room, "Error.")
else: await self.send_group(room, f"Usage: {self.trigger}calc num + num")
elif cmd == 'dm':
await self.send_private_muc(room, sender, "Hi :3")
# AVATAR DOWNLOAD AND REUPLOAD
elif cmd == 'avatar':
target_nick = args.strip() if args else sender
try:
target_jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid')
is_muc_occupant = False
if not target_jid:
target_jid = f"{room}/{target_nick}"
is_muc_occupant = True
avatar_data, mime_type = await self._get_avatar_data(target_jid, is_muc_occupant=is_muc_occupant)
if avatar_data:
ext = mimetypes.guess_extension(mime_type) or ".png"
filename = f"avatar{ext}"
url = await self.uploader.upload(avatar_data, filename, mime_type)
if url: await self.send_group(room, f"{url}")
else: await self.send_group(room, "Avatar retrieved but upload failed (literally 1984).")
else:
await self.send_group(room, f"No avatar found for {target_nick}. Please try again later...")
except Exception as e:
logging.error(f"Avatar error: {e}")
await self.send_group(room, "Error fetching avatar.")
# WIKTIONARY LOOKUP
elif cmd in ['wiki', 'wiktionary']:
if not args:
await self.send_group(room, "Usage: wiki <word>")
return
term = args.strip()
url = f"https://en.wiktionary.org/api/rest_v1/page/definition/{quote(term)}?redirect=true"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'application/json; charset=utf-8; profile="https://www.mediawiki.org/wiki/Specs/definition/0.8.0"'
}
try:
data = await self.fetch(url, headers=headers, json=True)
if not data or 'error' in data:
await self.send_group(room, "Word not found.")
return
available_codes = list(data.keys())
if 'zh' in available_codes: available_codes.remove('zh')
priority_codes = self.wiktionary_priority
selected_code = next((c for c in priority_codes if c in available_codes), None)
if not selected_code and available_codes:
selected_code = available_codes[0]
if not selected_code:
await self.send_group(room, "No definitions found.")
return
entries = data[selected_code]
lang_name = entries[0].get('language', selected_code)
found_defs = []
for entry in entries:
pos = entry.get('partOfSpeech', '')
for d in entry.get('definitions', []):
raw = d.get('definition', '')
clean = re.sub(r'<[^>]+>', '', raw).strip()
if clean:
found_defs.append(f"({pos}) {clean}" if pos else clean)
if found_defs:
resp = f"📖 **{term}** ({lang_name})\n" + "\n".join(f"- {d}" for d in found_defs[:4])
await self.send_group(room, resp)
else:
await self.send_group(room, f"No clean definitions found in {lang_name}.")
except Exception as e:
logging.error(f"Wiki error: {e}")
await self.send_group(room, "Error fetching definition.")
elif cmd in ['tdk', 'sozluk']:
if not args:
await self.send_group(room, "Please provide a word to search.")
return
# Language Society word search (hidden by default to prevent confusion from other dictionary command)
url = f"https://sozluk.gov.tr/gts?ara={args.strip()}"
try:
data = await self.fetch(url, headers=TDK_HEADERS, content_type_fix=True)
if data and isinstance(data, list) and len(data) > 0:
item = data[0]
word = item.get('madde', '')
meanings = item.get('anlamlarListe', [])
origin = item.get('lisan', '')
resp = f"📚 **{word}**"
if origin: resp += f" ({origin})"
resp += "\n"
for idx, m in enumerate(meanings, 1):
resp += f"{idx}. {m.get('anlam', '')}\n"
await self.send_group(room, resp)
else:
await self.send_group(room, "Word not found in TDK dictionary.")
except Exception as e:
logging.error(f"TDK error: {e}")
await self.send_group(room, "Error fetching definition.")
elif cmd == 'quote':
d = await self.fetch("http://api.forismatic.com/api/1.0/?method=getQuote&format=json&lang=en", json=True, content_type_fix=True)
if d:
try:
await self.send_group(room, f"\"{d.get('quoteText','').strip()}\" - {d.get('quoteAuthor','Unknown')}")
except: await self.send_group(room, "Quote error hell yeah!")
# Since API is broken it uses local shitposts
elif cmd == 'shitpost':
p = Path('shitposts')
if not p.exists():
await self.send_group(room, "Shitpost directory missing.")
return
images = [f for f in p.iterdir() if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.webp']]
if not images:
await self.send_group(room, "No shitposts found locally.")
return
try:
target = random.choice(images)
data = target.read_bytes()
mime, _ = mimetypes.guess_type(target.name)
url = await self.uploader.upload(data, target.name, mime or 'application/octet-stream')
if url: await self.send_group(room, url)
else: await self.send_group(room, "Failed to upload local shitpost.")
except Exception as e:
logging.error(f"Shitpost error: {e}")
await self.send_group(room, "Error sending shitpost.")
elif cmd == 'neko':
d = await self.fetch("https://nekos.best/api/v2/neko")
if d: await self.upload_send(room, d['results'][0]['url'])
elif cmd == 'cat':
d = await self.fetch("https://api.thecatapi.com/v1/images/search")
if d: await self.upload_send(room, d[0]['url'])
elif cmd == 'dogfact':
d = await self.fetch("https://dogapi.dog/api/v2/facts")
if d and d['data']: await self.send_group(room, d['data'][0]['attributes']['body'])
elif cmd == 'catfact':
d = await self.fetch("https://meowfacts.herokuapp.com/")
if d and d['data']: await self.send_group(room, d['data'][0])
elif cmd == 'urban':
d = await self.fetch(f"https://api.urbandictionary.com/v0/define?term={args}")
if d and d['list']: await self.send_group(room, f"📖 {d['list'][0]['definition'][:300]}...")
elif cmd == 'joke':
d = await self.fetch("https://v2.jokeapi.dev/joke/Any?type=twopart")
if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}")
elif cmd == 'pun':
d = await self.fetch("https://v2.jokeapi.dev/joke/Pun?type=twopart")
if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}")
elif cmd == 'fact':
d = await self.fetch("https://uselessfacts.jsph.pl/random.json?language=en")
if d: await self.send_group(room, d['text'])
elif cmd == 'advice':
d = await self.fetch("https://api.adviceslip.com/advice", content_type_fix=True)
if d: await self.send_group(room, d['slip']['advice'])
# say command aka parrot mode. Censors slurs with * character
elif cmd == 'say':
text = args
if self.wordlist:
sorted_words = sorted(self.wordlist, key=len, reverse=True)
for w in sorted_words:
if not w: continue
chars = [re.escape(c) for c in w]
pattern_str = r"(?i)\b" + r"[\W_]*".join(chars) + r"\b"
try:
pattern = re.compile(pattern_str)
def repl(match):
return '*' * len(match.group(0))
text = pattern.sub(repl, text)
except: pass
await self.send_group(room, f"{sender} says: {text}")
elif cmd == 'dice':
await self.send_group(room, f"🎲 {random.randint(1,6)}")
elif cmd == 'roll':
try:
limit = int(args.strip())
if limit > 0:
await self.send_group(room, f"🎲 {random.randint(1, limit)}")
else:
await self.send_group(room, "Please pick a valid number.")
except:
await self.send_group(room, "Please specify a number to roll.")
elif cmd == 'coinflip':
await self.send_group(room, f"🪙 {random.choice(['Heads','Tails'])}")
elif cmd == 'rps':
opts = ['rock','paper','scissors']
b = random.choice(opts)
u = args.lower().strip()
if u not in opts:
await self.send_group(room, "rock, paper, or scissors?")
else:
res = "I won!"
if b == u: res = "Tie!"
elif (u=='rock' and b=='scissors') or (u=='paper' and b=='rock') or (u=='scissors' and b=='paper'):
res = "You won!"
await self.send_group(room, f"I chose {b}. {res}")
# GIF command finds and sends the searched GIF
elif cmd == 'gif':
k = self.cfg['Bot'].get('giphy_key')
if not k:
await self.send_group(room, "Giphy API key not configured.")
return
try:
d = await self.fetch(f"https://api.giphy.com/v1/gifs/search?api_key={k}&q={args}&limit=1")
if d and d['data']:
url = d['data'][0]['images']['original']['url']
await self.upload_send(room, url)
else:
await self.send_group(room, f"No GIF found for '{args}'.")
except Exception as e:
logging.error(f"GIF error: {e}")
await self.send_group(room, f"Error searching GIF.")
elif cmd in ['kick','ban','mute','warn','unban','unmute']:
if not self.is_mod(room, sender):
await self.send_group(room, "Not authorized.")
return
target_nick = args.strip()
if not target_nick:
await self.send_group(room, f"Usage: {cmd} <nick/jid>")
return
in_roster = False
try:
roster = self.plugin['xep_0045'].get_roster(room)
if target_nick in roster: in_roster = True
except: pass
try:
if cmd == 'warn':
if not in_roster:
await self.send_group(room, "User not found.")
return
await self.warn(room, target_nick, "Manual Warning")
elif cmd == 'kick':
if not in_roster:
await self.send_group(room, "User not found.")
return
await self.plugin['xep_0045'].set_role(room, target_nick, 'none', reason="Kicked by operator")
await self.send_group(room, f"👢 Kicked {target_nick}. They will return I guess but anyways.")
elif cmd == 'mute':
if not in_roster:
await self.send_group(room, "User not found.")
return
await self.plugin['xep_0045'].set_role(room, target_nick, 'visitor', reason="Muted by operator")
await self.send_group(room, f"🤐 Muted (shutted up) {target_nick}.")
elif cmd == 'unmute':
if not in_roster:
await self.send_group(room, "User not found.")
return
await self.plugin['xep_0045'].set_role(room, target_nick, 'participant', reason="Unmuted by operator")
await self.send_group(room, f"🔊 Unmuted {target_nick}. {target_nick} can speak now again!")
elif cmd == 'ban':
if not in_roster:
await self.send_group(room, "User not found.")
return
jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid')
if jid:
await self.plugin['xep_0045'].set_affiliation(room, jid=jid, affiliation='outcast', reason="Banned by operator")
await self.send_group(room, f"🚫 Banhammer! whomp whomp {target_nick}")
else:
await self.send_group(room, "Cannot ban (Hidden JID/Anonymous).")
elif cmd == 'unban':
target_jid = target_nick
try:
await self.plugin['xep_0045'].set_affiliation(room, jid=target_jid, affiliation='none', reason="Unbanned by operator")
await self.send_group(room, f"🕊️ Unbanned {target_jid}. Taste of liberty!")
except IqError:
await self.send_group(room, f"Failed to unban {target_jid}. Ensure you are using the valid JID.")
except IqError as e:
await self.send_group(room, f"Permission denied: {e.iq['error']['text'] or 'Bot missing privileges'}")
except Exception as e:
logging.error(f"Mod command failed: {e}")
await self.send_group(room, "Operation failed.")
elif cmd == 'poll':
res = self.polls.create(room, sender, args)
await self.send_group(room, res)
elif cmd == 'vote':
res = self.polls.vote(sender, args)
await self.send_private_muc(room, sender, res)
elif cmd == 'close':
res = self.polls.close(sender, args)
await self.send_group(room, res)
elif cmd == 'uno':
sub = args.split()[0] if args else ''
arg = args[len(sub):].strip()
gid = f"uno_{room}"
g = self.games.get(gid)
if sub == 'join':
if not g:
g = UnoGame(room, self)
self.games[gid] = g
await self.send_group(room, g.join(sender))
elif sub == 'start' and g:
await self.send_group(room, g.start())
elif sub == 'play' and g:
res = g.play(sender, arg)
if res: await self.send_group(room, res)
if res and "Wins" in res: del self.games[gid]
elif sub == 'draw' and g:
res = g.draw(sender)
if res: await self.send_group(room, res)
# JACK!!
elif cmd in ['bj', 'blackjack']:
gid = f"bj_{room}"
sub = args.split()[0].lower() if args else ''
g = self.games.get(gid)
if not sub:
if not g:
g = MultiplayerBlackjackGame(room, self)
self.games[gid] = g
g.join(sender)
await self.send_group(room, g.start())
else:
if g.status == "JOINING":
await self.send_group(room, f"Lobby Open! Type `{self.trigger}bj join` to join or `{self.trigger}bj start` to begin.")
else:
await self.send_group(room, "Game in progress.")
elif sub == 'join':
if not g:
g = MultiplayerBlackjackGame(room, self)
self.games[gid] = g
await self.send_group(room, f"Blackjack lobby created by {sender}. Type `{self.trigger}bj join`.")
res = g.join(sender)
await self.send_group(room, res)
elif sub == 'start' and g:
await self.send_group(room, g.start())
elif sub == 'hit' and g:
res = g.hit(sender)
await self.send_group(room, res)
if g.status == "DEALING":
await self.send_group(room, g.resolve_game())
del self.games[gid]
elif sub == 'stand' and g:
res = g.stand(sender)
await self.send_group(room, res)
if g.status == "DEALING":
await self.send_group(room, g.resolve_game())
del self.games[gid]
else:
await self.send_group(room, f"Blackjack: `{self.trigger}bj` (Solo), `{self.trigger}bj join` (Multi), `{self.trigger}bj hit/stand`")
# Abbreviation will be removed
elif cmd == 'cp' or cmd == 'copypasta':
if args.startswith('add'):
_, name, txt = args.split(maxsplit=2)
d = self.copypastas.get('pastas', {})
d[name] = txt
self.copypastas.set('pastas', d)
await self.send_group(room, "Added.")
elif args == 'list':
await self.send_group(room, ", ".join(self.copypastas.get('pastas',{}).keys()))
else:
txt = self.copypastas.get('pastas',{}).get(args)
if txt: await self.send_group(room, txt)
elif cmd == 'gpt':
if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'):
await self.send_group(room, "AI not configured.")
return
k = self.cfg['AI']['openai_key']
url = self.cfg['AI'].get('openai_api_url')
pl = {
"model": self.cfg['AI'].get('openai_model'),
"messages": [{"role":"system","content":self.cfg['AI'].get('openai_system_prompt')},
{"role":"user","content":args}]
}
headers = {"Authorization": f"Bearer {k}"}
d = await self.fetch(url, method='POST', json_data=pl, headers=headers)
if d and d.get('choices'): await self.send_group(room, d['choices'][0]['message']['content'])
elif cmd in ['translate', 'tr']:
if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'):
await self.send_group(room, "AI not configured.")
return
parts = args.split(maxsplit=1)
if len(parts) < 2:
await self.send_group(room, f"Usage: {self.trigger}translate <Language> <Text>")
return
target_lang = parts[0]
text_to_translate = parts[1]
system_prompt = (
f"You are a precise translator. Your sole function is to detect the language of the input text and translate it into {target_lang}. "
f"Provide only the {target_lang} translation with no additional commentary, explanations, notes, or meta-information. "
"Do not include the source language name, translation notes, or any text beyond the translated output. "
f"Maintain the original meaning, tone, and structure as closely as possible while ensuring natural {target_lang} expression."
)
k = self.cfg['AI']['openai_key']
url = self.cfg['AI'].get('openai_api_url')
pl = {
"model": self.cfg['AI'].get('openai_model'),
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": text_to_translate}
]
}
headers = {"Authorization": f"Bearer {k}"}
d = await self.fetch(url, method='POST', json_data=pl, headers=headers)
if d and d.get('choices'):
await self.send_group(room, d['choices'][0]['message']['content'])
else:
await self.send_group(room, "Translation failed.")
elif cmd in ['r34', 'hentai', 'boobs', 'hboobs']:
if not self.is_nsfw_room(room):
await self.send_group(room, "NSFW not allowed here. Find a NSFW room or add the room to NSFW list smh")
return
if cmd == 'r34':
tag = args if args else 'all'
d = await self.fetch(f"https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit=1&json=1&tags={tag}")
if d and len(d) > 0 and 'file_url' in d[0]: await self.upload_send(room, d[0]['file_url'])
elif cmd == 'hentai':
d = await self.fetch("https://nekobot.xyz/api/image?type=hneko")
if d and 'message' in d: await self.upload_send(room, d['message'])
elif cmd == 'boobs':
d = await self.fetch("https://nekobot.xyz/api/image?type=boobs")
if d and 'message' in d: await self.upload_send(room, d['message'])
elif cmd == 'hboobs':
d = await self.fetch("https://nekobot.xyz/api/image?type=hboobs")
if d and 'message' in d: await self.upload_send(room, d['message'])
# Same thing with copypasta applies here too
elif cmd in ['wa', 'wolfram']:
if not args:
await self.send_group(room, "Usage: wa <natural language query>")
else:
appid = None
if self.cfg.has_section('Features'):
appid = self.cfg['Features'].get('wolfram_appid')
if not appid and self.cfg.has_section('Bot'):
appid = self.cfg['Bot'].get('wolfram_appid')
if not appid:
await self.send_group(room, "WolframAlpha AppID not configured.")
else:
try:
url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={quote(args.strip())}"
data = await self.fetch(url, json=False)
if data:
txt = data.decode('utf-8', errors='ignore').strip()
await self.send_group(room, f"Wolfram says: {txt}")
else:
await self.send_group(room, "No short answer available.")
except Exception as e:
logging.error(f"Wolfram error: {e}")
await self.send_group(room, "Error querying WolframAlpha.")
elif cmd == 'help':
await self.send_group(room, await self._generate_help_message(self.is_mod(room, sender)))
elif cmd == 'mod-help':
if self.is_mod(room, sender): await self.send_group(room, await self._generate_mod_help())
elif cmd == 'nsfw-help':
await self.send_group(room, await self._generate_nsfw_help())
async def _generate_help_message(self, is_mod):
t = self.trigger
s = f"#### List of available commands:\n\n#### Practical Commands: \n" \
f"`{t}video` - Sends a list of all available videos\n" \
f"`{t}help` - Displays this command.\n" \
f"`{t}credits` - Displays bot's credit.\n" \
f"`{t}test` - Simple test command you say test bot will response.\n" \
f"`{t}invite` - Info about how to make bot join more rooms.\n" \
f"`{t}calc` {{num}} {{+, - , / , * }} {{num}} - Calculator.\n" \
f"`{t}ping` - tests the bot ping.\n" \
f"`{t}gpt` - Send commands to clankers.\n" \
f"`{t}tr` <language> <text> - Translate text to target language.\n" \
f"`{t}copypasta` - Add or send copypastas.\n" \
f"`{t}wiki` - Ughm acthually lookup.\n" \
f"`{t}wa` - WolframAlpha query, can convert anything to anything.\n"
if is_mod: s += f"`{t}mod-help` - Mod commands.\n"
s += f"`{t}nsfw-help` - Sends a list of all available nsfw commands.\n"
s += f"\n#### Fun Commands: \n" \
f"`{t}dice` - Rolls a random number between 1 and 6.\n" \
f"`{t}roll` {{num}} - Rolls a random number between 1 and num.\n" \
f"`{t}say` - says what the user told it to say!.\n" \
f"`{t}cl` - Common L.\n" \
f"`{t}l` - L.\n" \
f"`{t}dm` - Whispers the user Hi :3.\n" \
f"`{t}rps` - Simple Rock paper scissors game.\n" \
f"`{t}dogfact` - Gives a random dogfact using the Dogfact API!.\n" \
f"`{t}catfact` - Gives a random Catfact using cat fact API\n" \
f"`{t}joke` - Very simple command just gives a joke using the Joke API.\n" \
f"`{t}coinflip` - a Command so easy a child could do it.\n" \
f"`{t}fact` - Gives a random useless fact.\n" \
f"`{t}urban` - uses the urban dictionary to search for the word.\n" \
f"`{t}shitpost` - Sends random shitpost from local library\n" \
f"`{t}cat` - Cat :3 \n" \
f"`{t}neko` - Neko command \n" \
f"`{t}advice` - Gives the user a life Advice \n" \
f"`{t}quote` - Gives a random quote using yet another API. \n" \
f"`{t}gif` - Allows the user to search for gifs using giphy (ratelimited sometimes)\n" \
f"`{t}pun` - Gives user a simple pun. \n" \
f"`{t}avatar` - Sends the user Avatar (Beta) \n" \
f"`{t}poll` - Democracy \n" \
f"`{t}uno` - Four colors\n" \
f"`{t}bj` - I play JACK bro! \n"
return s
async def _generate_mod_help(self):
t = self.trigger
return f"## Mod commands:\n" \
f"`{t}kick` - Kick a user.\n" \
f"`{t}ban` - Ban a user.\n" \
f"`{t}mute` - Mute a user.\n" \
f"`{t}warn` - Warn a user.\n" \
f"`{t}unban` - Unban a user.\n" \
f"`{t}unmute` - Unmute a user.\n"
async def _generate_nsfw_help(self):
t = self.trigger
return f"### NSFW commands:\n" \
f"`{t}r34` - Rule34 search. (doesnt work on some ISPs)\n" \
f"`{t}hentai` - Grabs a random hentai image \n" \
f"`{t}boobs` - Grabs a random boobs image \n" \
f"`{t}hboobs` - Grabs a random hboobs image simple. \n"
async def _generate_video_list(self):
t = self.trigger
return f"Videos:\n`{t}ihnk` Sends a I have no knwoledge about any of this video. \n`{t}scary` Sends the oh oh scary oh oh shiver me timbers video"
async def fetch(self, url, method='GET', json_data=None, headers=None, json=True, content_type_fix=False):
try:
async with aiohttp.ClientSession() as s:
async with s.request(method, url, json=json_data, headers=headers) as r:
if r.status == 200:
if json:
return await r.json(content_type=None) if content_type_fix else await r.json()
else:
return await r.read()
except Exception as e: logging.error(f"Fetch err: {e}")
return None
async def upload_send(self, room, url):
data = await self.fetch(url, json=False)
if data:
mime = self._sniff_mime(data)
if mime:
ext = mime.split('/')[1].replace('jpeg', 'jpg')
# Octet Stream is used in bin files with no known type afaik
else:
mime = "application/octet-stream"
ext = "bin"
path = url.split('?')[0].lower()
if path.endswith('.png'): mime, ext = 'image/png', 'png'
elif path.endswith(('.jpg', '.jpeg')): mime, ext = 'image/jpeg', 'jpg'
elif path.endswith('.gif'): mime, ext = 'image/gif', 'gif'
elif path.endswith('.webp'): mime, ext = 'image/webp', 'webp'
filename = f"image.{ext}"
l = await self.uploader.upload(data, filename, mime)
if l: await self.send_group(room, l)
def _sniff_mime(self, data: bytes) -> Optional[str]:
if not data or len(data) < 100:
return None
if data.startswith(b'\x89PNG\r\n\x1a\n'): return 'image/png'
if data.startswith(b'\xff\xd8\xff'): return 'image/jpeg'
if data.startswith(b'GIF8'): return 'image/gif'
if data.startswith(b'RIFF') and data[8:12] == b'WEBP': return 'image/webp'
return None
# GET AVATAR TO UPLOAD (SOMETIMES BROKEN)
async def _get_avatar_data(self, jid_str, is_muc_occupant=False) -> (Optional[bytes], Optional[str]):
j = slixmpp.JID(jid_str)
target = j.full if is_muc_occupant else j.bare
async def try_0153(use_ifrom=False):
try:
kwargs = {'jid': target}
if use_ifrom: kwargs['ifrom'] = self.boundjid.full
iq = await self.plugin["xep_0054"].get_vcard(**kwargs)
photo = iq["vcard_temp"]["PHOTO"]
binval = photo["BINVAL"]
type_ = photo["TYPE"]
if binval:
data = base64.b64decode(binval.replace('\n', '').replace(' ', '').strip())
mime = self._sniff_mime(data)
if not mime and type_ and len(type_.split('/')) > 1: mime = type_
if not mime: mime = "image/png"
return data, mime
except (IqError, Exception) as e:
logging.debug(f"XEP-0153 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}")
return None
async def try_0084(use_ifrom=False):
try:
kwargs = {'jid': target, 'node': "urn:xmpp:avatar:data", 'max_items': 1}
if use_ifrom: kwargs['ifrom'] = self.boundjid.full
iq = await self.plugin["xep_0060"].get_items(**kwargs)
items = iq["pubsub"]["items"]["substanzas"]
if items:
data_elem = items[0].xml.find("{urn:xmpp:avatar:data}data")
if data_elem is not None and data_elem.text:
b64_text = data_elem.text.replace('\n', '').replace(' ', '').strip()
data = base64.b64decode(b64_text)
mime = self._sniff_mime(data) or "image/png"
return data, mime
if "data" in items[0]:
d = items[0]["data"]
if isinstance(d, str): data = base64.b64decode(d)
else: data = d
return data, "image/png"
except (IqError, Exception) as e:
logging.debug(f"XEP-0084 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}")
return None
res = await try_0153()
if res: return res
res = await try_0084()
if res: return res
logging.debug(f"Primary avatar methods failed for {target}. Trying fallback logic.")
res = await try_0153(use_ifrom=True)
if res: return res
res = await try_0084(use_ifrom=True)
if res: return res
return None, None
async def warn(self, room, nick, reason):
if self.cfg['Moderation'].get('moderation_enabled') == 'NONE': return
k = f"{room}/{nick}"
c = self.warnings.get(k, 0) + 1
self.warnings[k] = c
self.db.set('warnings', self.warnings)
await self.send_group(room, f"⚠️ {nick} Warn {c}: {reason}")
if c >= int(self.cfg['Moderation'].get('max_warnings',3)):
self.warnings[k] = 0; self.db.set('warnings', self.warnings)
act = self.cfg['Moderation'].get('warning_action','mute')
try:
if act == 'kick':
await self.plugin['xep_0045'].set_role(room, nick, 'none', reason="Too many warnings")
elif act == 'mute':
await self.plugin['xep_0045'].set_role(room, nick, 'visitor', reason="Too many warnings")
except IqError:
await self.send_group(room, f"Cannot enforce warning: Permission denied.")
def is_mod(self, room, nick):
if nick.lower() in self.config_moderators:
return True
try:
room_roster = self.plugin['xep_0045'].rooms.get(room)
if not room_roster:
target_bare = slixmpp.JID(room).bare
for r_jid in self.plugin['xep_0045'].rooms:
if slixmpp.JID(r_jid).bare == target_bare:
room_roster = self.plugin['xep_0045'].rooms[r_jid]
break
if room_roster:
if nick in room_roster:
user = room_roster[nick]
if user['affiliation'] in ('admin', 'owner') or user['role'] == 'moderator':
return True
except Exception as e:
logging.debug(f"is_mod check failed: {e}")
return False
def is_nsfw_room(self, room):
mode = self.cfg['Features'].get('nsfw_mode', 'NONE')
if mode == 'ALL': return True
if mode == 'NONE': return False
allowed = [r.strip() for r in self.cfg['Features'].get('nsfw_rooms', '').split(',')]
return str(room) in allowed
def find_user(self, room, partial):
try:
nicks = list(self.plugin['xep_0045'].get_roster(room).keys())
match = get_close_matches(partial, nicks, n=1)
return match[0] if match else None
except: return None
async def send_group(self, room, text):
self.send_message(mto=room, mbody=str(text), mtype='groupchat')
async def send_private_muc(self, room, nick, text):
self.send_message(mto=f"{room}/{nick}", mbody=str(text), mtype='chat')
async def send_private_msg_by_jid(self, jid, text):
self.send_message(mto=jid, mbody=str(text), mtype='chat')
async def main():
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
if not Path('config.ini').exists():
print("Config missing. Please make a config. Otherwise how would the bot work?")
return
bot = RevoltBot()
bot.connect()
await bot.disconnected
if __name__ == "__main__":
asyncio.run(main())