forked from purplebored/Chrobry-Revolt
1506 lines
64 KiB
Python
1506 lines
64 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import slixmpp
|
||
import asyncio
|
||
import logging
|
||
import configparser
|
||
import json
|
||
import random
|
||
import re
|
||
import aiohttp
|
||
import operator
|
||
import mimetypes
|
||
import base64
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Any
|
||
from difflib import get_close_matches
|
||
from dataclasses import dataclass, field
|
||
from io import BytesIO
|
||
from urllib.parse import urlencode, quote
|
||
from slixmpp.exceptions import IqError, IqTimeout
|
||
|
||
# The MOST ESSENTIAL PART of the code!
|
||
N_QUOTES = [
|
||
"Did you just slap me with that arm?",
|
||
"Yeah sorry, it's just my head kinda hurts.",
|
||
"Hey, are you new to our squad?",
|
||
"You're a little, uh, short for a Disassembly Drone...",
|
||
"I'm Serial Designation N, nice to meet you!",
|
||
"I'm kind of the leader of the squad in this city.",
|
||
"That's not true, everyone tells me I'm useless and terrible.",
|
||
"Wait! I'm not supposed to tell you that part! Biscuits...",
|
||
"Well, honesty is the best policy.",
|
||
"I also can't seem to remember the past 3 hours of my life, ah!, but I'm sure that'll sort itself out.",
|
||
"Hoo, stuck yourself?",
|
||
"Just pop it in your mouth. Our saliva neutralizes the nanites, otherwise I'd be constantly disassembling myself.",
|
||
"Sure! I love doing anything.",
|
||
"Sweet, uh, I'm open to new things I guess.",
|
||
"Consider it err... repressed!",
|
||
"No worries, I'm N, but a whole letter is a lot to remember! Ha haha!",
|
||
"So obviously a lot of mutual respect there, but secretly I actually kind of have a crush on her.",
|
||
"YOU CAN'T TELL HER OKAY?!",
|
||
"J's awesome!",
|
||
"Hey, let me give you the tour!",
|
||
"Outside are the corpse wall, thingies, and here are the buttons!",
|
||
"Beep, boop, bop, bop bop, bop, beep, boop, bop bop, beep...",
|
||
"More of a one use missile; they never taught us how to land.",
|
||
"Other than ingesting their WARM, SWEET oil to avoid overheating and dying, I guess I just wanna be useful.",
|
||
"I was given a job, and I always wanna try my best.",
|
||
"Oh my! You sure are rebellious! It's kind of exciting...",
|
||
"...but not as fun as, uh, following the rules.",
|
||
"You know, I left an-an extremely dangerous weapo- u-uh, excuse, o-outside!",
|
||
"Hey, fellas!",
|
||
"Ooh, deal me in, I love Rummy.",
|
||
"Wait, no, ah, I'm going to murder everyone...",
|
||
"...Rain check!",
|
||
"I'm sorry. I really enjoyed our time together, but I can't have you shooting V with that thing.",
|
||
"You, me, name, remember?",
|
||
"I get the feeling the company doesn't actually love robots, and, like, we might be robots?",
|
||
"...I've made a terrible mistake. It's cool how immediately I could tell.",
|
||
"Th-Thanks J, al-always looking-ng out for me. Yo-you're awesome.",
|
||
"A-ah, biscuits-s. I-I'm sor-rry. I ruin-ned your card game, that made you have an awk-kward moment with your dad.",
|
||
"Being rebellious is a lot harder than it loo-oks. Thanks for showing me the ropes.",
|
||
"Tha-at's super fa-air. Ah, I screwed up.",
|
||
"Oh uh, J, you're sometimes kinda mean to me, and I wish you weren't.",
|
||
"Just some constructive criticism!",
|
||
"(J please.)",
|
||
"AAAH, MY MIND'S IN A WEIRD PLACE, DON'T READ INTO THIS!",
|
||
"UZI! I'm so, so sorry, have fun repressing this, heh-",
|
||
"Huh? Oh!, uh, N! I'm an angsty rebellious Disassembly Drone now.",
|
||
"Nice to meet you, Mr. Uzi-",
|
||
"I'd join you if the sun didn't kill me. Hope you're having important character growth or something, though!",
|
||
"I was the PILOT? That's aweso-! (I crashed and ruined everything...)",
|
||
"Spaceship pilot. Origin story.",
|
||
"I do want to be dapper...",
|
||
"We can't interact with the workers anymore V... we're too dangerous.",
|
||
"I'm not bringing you for prom murder, V!",
|
||
"J went holo-spooky-snake-crab and we maybe grew up in a haunted mansion!",
|
||
"Aren't you worried we have no idea what we even are?",
|
||
"V, if you're hiding something, we can figure it out together",
|
||
"Even if we each only have pieces",
|
||
"No! No time!",
|
||
"Dapper N~",
|
||
"I could help",
|
||
"But you probably don't want me...",
|
||
"Dapper Buddies!",
|
||
"This isn't what i expected at all!",
|
||
"Thank you, Lizzy-!",
|
||
"V, you kind of suck.",
|
||
"Welcome, campers! Let's sound off! One, two...",
|
||
"To the bunks!",
|
||
"Close one!",
|
||
"V, we can't hurt Uzi!",
|
||
"I don't know what you are talking about because you won't tell me!",
|
||
"What are you so afraid of!?",
|
||
"I am! Uzi is, she's a kid like us, V! What is wrong with you!?",
|
||
"Easy there, buddy!",
|
||
"What did I say about antagonizing her?",
|
||
"Uzi, you want to tell me what's the matter?",
|
||
"Before we met, scary stuff was actually...pretty scary, and tonight too, 'cause you weren't with me to make it fun somehow, I kind of forgot what that was like.",
|
||
"Then we'll stick together!",
|
||
"Kidding! Just avoid another whole spire!",
|
||
"Baby steps. Together?",
|
||
"Us as lizards. J drew herself.",
|
||
"Don't give me those eyes.",
|
||
"Ah, we'll ask Tessa, OK? If not... Movie night!",
|
||
"Hi, Tessa.",
|
||
"You guys are... locking her up?",
|
||
"I-I-I told her to say that! And brought her here. I-I just... hate orders...?",
|
||
"And your fan's dumb...?",
|
||
"NO! AAAH! STOP!",
|
||
"Go get it, goobers!",
|
||
"WOAH! WE WERE BUDDIES! ...darkXwolf17...?",
|
||
"So, these are memories, and future me is dead at a sleepover?",
|
||
"'Cause... 'Cause of you? 'Cause you're doing that? OW!",
|
||
"...'Kay. You kind of seem like an evil ghost witch, though. AH!",
|
||
"Why? Who are you?",
|
||
"Ugh, J has it...",
|
||
"Psst! Tessa!",
|
||
"We need the basement key. This bird's from the future.",
|
||
"Who? Our Cyn? Nah, she's cool.",
|
||
"Not creepy! Sweet!",
|
||
"DARKXWOLF17! NOO-HO-HO-HO-HOOO!",
|
||
"Is V... okay? Like, in the future?",
|
||
"Not dealing with this great, to be immediately honest...! ...Okay, it's gone.",
|
||
"Whoa, Cyn! For an eldritch... uh, monster thingy—",
|
||
"I got you.",
|
||
"T-Tessa...?",
|
||
"(Chuckling) It is you.",
|
||
"Why?",
|
||
"And for Mom backstory. Right, dude?",
|
||
"(Giggling like a child) It tickles! The bad version, though!",
|
||
"...Doing great, dude.",
|
||
"Then Uzi... You can fix her, right? That's why you're here?",
|
||
"V! Is Uzi–",
|
||
"''IT SUCKS PRETTY HARD''",
|
||
"DON'T!",
|
||
"Uzi, no!",
|
||
"V, come on!",
|
||
"NO, NO, NO! V, WE NEED YOU!",
|
||
"V, PLEASE!",
|
||
"Uzi, help!",
|
||
"Use your Solver!",
|
||
"Oh, no no, it's okay.",
|
||
"We're not going to hurt you. Okay?",
|
||
"I deserve this, I deserve this, I deserve this!",
|
||
"T-Thank you mine shaft trauma ghost, I can hear you fine from there!",
|
||
"Uzi's dad? What-",
|
||
"Sorry, ma'am. I won't keep secrets from Uzi anymore...even if you say it might hurt her",
|
||
"Gave her ungodly eldritch genetics!",
|
||
"I promise.…to wait until you're ready to tell her yourself?",
|
||
"You knew about the patch. Yes, or no? One chance.",
|
||
"Hey.. buddy... Yeah, that should help...",
|
||
"All I know is.. I need you. To figure things out... together..?",
|
||
"NORI! I should mention Uzi and I-!",
|
||
"HahaohaAAAAAAAAAAAAAH!!!!",
|
||
"UZI!!! THAT WAS YOUR MOM! WAIT, NO! AAAAAHHHHHHH!",
|
||
"Uzi DON'T YOU DARE!!!",
|
||
"Heeheehee! Spaceship pilot!",
|
||
"AAAACK! Sorry, I didn't know- Oh.",
|
||
"Yeah, I'm kind of, like, actually mad about what you did. But we can -- TALK LATER!!",
|
||
"V, STOP!",
|
||
"No more tricks.",
|
||
"I dunno, I feel like we need a secret handshake or something, right?",
|
||
"Uzi! Uzi!",
|
||
"UUUUUUUUUZIIIIIII!!!!",
|
||
"It's you!",
|
||
"Whooooo-hoo-hoo-hoo! That's my girlfriend! "
|
||
]
|
||
|
||
EMOJIS = ['1️⃣','2️⃣','3️⃣','4️⃣','5️⃣','6️⃣','7️⃣','8️⃣','9️⃣','🔟']
|
||
|
||
# For dictionary
|
||
TDK_HEADERS = {
|
||
'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
|
||
'Accept': 'application/json, text/javascript, */*; q=0.01',
|
||
'Accept-Language': 'en-US,en;q=0.5',
|
||
'Accept-Encoding': 'gzip, deflate, br, zstd',
|
||
'X-Requested-With': 'XMLHttpRequest',
|
||
'DNT': '1',
|
||
'Sec-GPC': '1',
|
||
'Connection': 'keep-alive',
|
||
'Referer': 'https://sozluk.gov.tr/',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
'sec-ch-ua-platform': '"Android"',
|
||
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="137", "Google Chrome";v="137"',
|
||
'sec-ch-ua-mobile': '?1',
|
||
'Save-Data': 'on',
|
||
'Priority': 'u=0'
|
||
}
|
||
|
||
class DataManager:
|
||
def __init__(self, filename):
|
||
self.filename = Path(filename)
|
||
self.data = self._load()
|
||
|
||
def _load(self):
|
||
if not self.filename.exists(): return {}
|
||
try: return json.loads(self.filename.read_text(encoding='utf-8'))
|
||
except: return {}
|
||
|
||
def save(self):
|
||
self.filename.write_text(json.dumps(self.data, indent=2), encoding='utf-8')
|
||
|
||
def get(self, key, default=None):
|
||
return self.data.get(key, default)
|
||
|
||
def set(self, key, value):
|
||
self.data[key] = value
|
||
self.save()
|
||
|
||
# Handles image uploads using your xmpp server or catbox/litterbox (default 72 hours removal)
|
||
class AsyncFileUploader:
|
||
def __init__(self, client, service='xmpp', api_key=None):
|
||
self.client = client
|
||
self.service = service.lower()
|
||
self.api_key = api_key
|
||
|
||
async def upload(self, data: bytes, filename: str, mime_type: str) -> Optional[str]:
|
||
try:
|
||
if self.service == 'xmpp':
|
||
input_file = BytesIO(data)
|
||
return await self.client['xep_0363'].upload_file(
|
||
input_file=input_file, filename=filename, size=len(data),
|
||
content_type=mime_type, timeout=60
|
||
)
|
||
elif self.service == 'catbox':
|
||
url = "https://catbox.moe/user/api.php"
|
||
form = aiohttp.FormData()
|
||
form.add_field('reqtype', 'fileupload')
|
||
if self.api_key: form.add_field('userhash', self.api_key)
|
||
form.add_field('fileToUpload', data, filename=filename, content_type=mime_type)
|
||
async with aiohttp.ClientSession() as s:
|
||
async with s.post(url, data=form) as r:
|
||
if r.status == 200: return (await r.text()).strip()
|
||
elif self.service == 'litterbox':
|
||
url = "https://litterbox.catbox.moe/resources/internals/api.php"
|
||
form = aiohttp.FormData()
|
||
form.add_field('reqtype', 'fileupload')
|
||
form.add_field('time', '72h')
|
||
form.add_field('fileToUpload', data, filename=filename, content_type=mime_type)
|
||
async with aiohttp.ClientSession() as s:
|
||
async with s.post(url, data=form) as r:
|
||
if r.status == 200: return (await r.text()).strip()
|
||
except Exception as e:
|
||
logging.error(f"Upload failed: {e}")
|
||
return None
|
||
|
||
@dataclass
|
||
class Poll:
|
||
id: str
|
||
room: str
|
||
creator: str
|
||
question: str
|
||
options: List[str]
|
||
votes: Dict[str, int] = field(default_factory=dict)
|
||
anon: bool = False
|
||
max_votes: Optional[int] = None
|
||
closed: bool = False
|
||
|
||
class PollManager:
|
||
def __init__(self, bot):
|
||
self.bot = bot
|
||
self.polls: Dict[str, Poll] = {}
|
||
self.cnt = 0
|
||
|
||
def create(self, room, sender, args):
|
||
parts = re.findall(r'"([^"]+)"', args)
|
||
if len(parts) < 3:
|
||
return f"Usage: {self.bot.trigger}poll \"Question\" \"Opt1\" \"Opt2\" [anon=true] [votes=N]"
|
||
|
||
q, opts = parts[0], parts[1:11]
|
||
anon = 'anon=true' in args.lower()
|
||
max_v = None
|
||
mv_match = re.search(r'votes=(\d+)', args)
|
||
if mv_match: max_v = int(mv_match.group(1))
|
||
|
||
self.cnt += 1
|
||
pid = str(self.cnt)
|
||
p = Poll(pid, room, sender, q, opts, {}, anon, max_v, False)
|
||
self.polls[pid] = p
|
||
|
||
return self._announce(p)
|
||
|
||
def _announce(self, p):
|
||
lines = [f"📊 **Poll {p.id}**: {p.question}"]
|
||
for i, o in enumerate(p.options):
|
||
lines.append(f"{EMOJIS[i]} {o}")
|
||
lines.append(f"Vote: {self.bot.trigger}vote {p.id} <number>")
|
||
if p.anon: lines.append("🔒 Anonymous voting")
|
||
return "\n".join(lines)
|
||
|
||
def vote(self, sender, args):
|
||
parts = args.split()
|
||
if len(parts) < 2: return "Usage: vote <id> <number>"
|
||
pid, choice = parts[0], parts[1]
|
||
|
||
if pid not in self.polls: return "Poll not found."
|
||
p = self.polls[pid]
|
||
if p.closed: return "Poll closed."
|
||
|
||
try:
|
||
idx = int(choice) - 1
|
||
if not (0 <= idx < len(p.options)): raise ValueError
|
||
p.votes[sender] = idx
|
||
return "Vote cast."
|
||
except:
|
||
return "Invalid option."
|
||
|
||
def close(self, sender, args):
|
||
pid = args.strip()
|
||
if pid not in self.polls: return "Poll not found."
|
||
p = self.polls[pid]
|
||
if sender != p.creator and not self.bot.is_mod(p.room, sender):
|
||
return "Only creator can close."
|
||
|
||
p.closed = True
|
||
return self._results(p)
|
||
|
||
def _results(self, p):
|
||
counts = [0] * len(p.options)
|
||
for v in p.votes.values():
|
||
if 0 <= v < len(counts): counts[v] += 1
|
||
|
||
total = sum(counts)
|
||
lines = [f"📊 **Results** - {p.question}"]
|
||
for i, o in enumerate(p.options):
|
||
pct = (counts[i] / total * 100) if total else 0
|
||
lines.append(f"{EMOJIS[i]} {o}: {counts[i]} ({pct:.1f}%)")
|
||
return "\n".join(lines)
|
||
|
||
class UnoGame:
|
||
def __init__(self, room, bot):
|
||
self.room = room
|
||
self.bot = bot
|
||
self.players = []
|
||
self.hands = {}
|
||
self.deck = []
|
||
self.discard = []
|
||
self.turn = 0
|
||
self.direction = 1
|
||
self.started = False
|
||
self.color = None
|
||
|
||
def join(self, nick):
|
||
if self.started: return "Started."
|
||
if nick not in self.players:
|
||
self.players.append(nick)
|
||
return f"{nick} joined Uno."
|
||
return "Already joined."
|
||
|
||
def start(self):
|
||
if len(self.players) < 2: return "Need 2+ players."
|
||
self.started = True
|
||
colors = ['red','blue','green','yellow']
|
||
vals = [str(i) for i in range(10)] + ['skip','reverse','+2']
|
||
self.deck = [f"{c}_{v}" for c in colors for v in vals] * 2
|
||
self.deck += ['wild_wild', 'wild_+4'] * 4
|
||
random.shuffle(self.deck)
|
||
|
||
for p in self.players:
|
||
self.hands[p] = [self.deck.pop() for _ in range(7)]
|
||
self._pm_hand(p)
|
||
|
||
top = self.deck.pop()
|
||
while 'wild' in top:
|
||
self.deck.append(top)
|
||
random.shuffle(self.deck)
|
||
top = self.deck.pop()
|
||
|
||
self.discard.append(top)
|
||
self.color = top.split('_')[0]
|
||
return f"Uno Started! Top: {self._fmt(top)}. Turn: {self.players[self.turn]}"
|
||
|
||
def play(self, nick, arg):
|
||
if not self.started or self.players[self.turn] != nick: return
|
||
hand = self.hands[nick]
|
||
|
||
parts = arg.lower().split()
|
||
declared_color = None
|
||
valid_colors = ['red','blue','green','yellow']
|
||
|
||
for p in parts:
|
||
if p in valid_colors:
|
||
declared_color = p
|
||
|
||
sel_card = None
|
||
for c in hand:
|
||
c_simple = c.replace('_', ' ').lower()
|
||
if arg.lower() in c_simple:
|
||
sel_card = c
|
||
break
|
||
if "wild" in parts and "wild" in c_simple:
|
||
if "+4" in arg and "+4" in c:
|
||
sel_card = c
|
||
break
|
||
elif "+4" not in arg and "+4" not in c:
|
||
sel_card = c
|
||
break
|
||
|
||
if not sel_card: return "Card not in hand."
|
||
|
||
top = self.discard[-1]
|
||
t_col = self.color
|
||
t_val = top.split('_')[1]
|
||
s_col, s_val = sel_card.split('_')
|
||
|
||
valid = False
|
||
if 'wild' in s_col:
|
||
valid = True
|
||
if not declared_color:
|
||
asyncio.create_task(self.bot.send_private_muc(self.room, nick, "⚠️ Please specify a color (e.g. `play wild red`)."))
|
||
self._pm_hand(nick)
|
||
return "Specify color (e.g. play wild red)"
|
||
self.color = declared_color
|
||
elif s_col == t_col or s_val == t_val:
|
||
valid = True
|
||
self.color = s_col
|
||
|
||
if not valid: return "Invalid move."
|
||
|
||
hand.remove(sel_card)
|
||
self.discard.append(sel_card)
|
||
|
||
msg = f"{nick} played {self._fmt(sel_card)}."
|
||
if not hand:
|
||
self.started = False
|
||
return f"{msg}\n🏆 {nick} Wins!"
|
||
if len(hand) == 1: msg += " UNO!"
|
||
|
||
if 'skip' in s_val: self._advance()
|
||
elif 'reverse' in s_val:
|
||
self.direction *= -1
|
||
if len(self.players)==2: self._advance()
|
||
elif '+2' in s_val:
|
||
self._advance()
|
||
self._draw(self.players[self.turn], 2)
|
||
elif '+4' in s_val:
|
||
self._advance()
|
||
self._draw(self.players[self.turn], 4)
|
||
|
||
self._advance()
|
||
return f"{msg}\nNext: {self.players[self.turn]} (Color: {self.color})"
|
||
|
||
def draw(self, nick):
|
||
if self.players[self.turn] != nick: return
|
||
self._draw(nick, 1)
|
||
self._advance()
|
||
return f"{nick} drew."
|
||
|
||
def _draw(self, nick, n):
|
||
for _ in range(n):
|
||
if not self.deck:
|
||
self.deck = self.discard[:-1]
|
||
self.discard = [self.discard[-1]]
|
||
random.shuffle(self.deck)
|
||
if self.deck: self.hands[nick].append(self.deck.pop())
|
||
self._pm_hand(nick)
|
||
|
||
def _advance(self):
|
||
self.turn = (self.turn + self.direction) % len(self.players)
|
||
next_player = self.players[self.turn]
|
||
self._pm_hand(next_player)
|
||
|
||
def _fmt(self, c):
|
||
if 'wild' in c: return f"🌈 {c.split('_')[1]}"
|
||
em = {'red':'🟥','blue':'🟦','green':'🟩','yellow':'🟨'}
|
||
return f"{em.get(c.split('_')[0],'')} {c.split('_')[1]}"
|
||
|
||
def _pm_hand(self, nick):
|
||
asyncio.create_task(self.bot.send_private_muc(self.room, nick, " | ".join(self._fmt(c) for c in self.hands[nick])))
|
||
|
||
# JACK IMPLEMENTATION
|
||
class MultiplayerBlackjackGame:
|
||
def __init__(self, room, bot):
|
||
self.room = room
|
||
self.bot = bot
|
||
self.players = []
|
||
self.hands = {}
|
||
self.d_hand = []
|
||
self.deck = []
|
||
self.turn_idx = 0
|
||
self.status = "JOINING"
|
||
|
||
suits = ['♠', '♥', '♦', '♣']
|
||
ranks = ['2','3','4','5','6','7','8','9','10','J','Q','K','A']
|
||
self.deck = [{'rank': r, 'suit': s} for s in suits for r in ranks] * 2
|
||
random.shuffle(self.deck)
|
||
|
||
def join(self, nick):
|
||
if self.status != "JOINING": return "Game already started."
|
||
if nick in self.players: return "Already joined."
|
||
self.players.append(nick)
|
||
return f"{nick} joined Blackjack."
|
||
|
||
def start(self):
|
||
if len(self.players) < 1: return "Need players."
|
||
self.status = "PLAYING"
|
||
self.turn_idx = 0
|
||
|
||
for p in self.players:
|
||
self.hands[p] = [self.deck.pop(), self.deck.pop()]
|
||
self.d_hand = [self.deck.pop(), self.deck.pop()]
|
||
|
||
return self._status_msg()
|
||
|
||
def hit(self, nick):
|
||
if self.status != "PLAYING": return "Not playing."
|
||
if self.players[self.turn_idx] != nick: return "Not your turn."
|
||
|
||
self.hands[nick].append(self.deck.pop())
|
||
val = self._calc(self.hands[nick])
|
||
|
||
res = f"{nick} hits: {self._fmt(self.hands[nick])} ({val})"
|
||
if val >= 21:
|
||
res += " (Bust!)" if val > 21 else " (21!)"
|
||
self._advance_turn()
|
||
|
||
return res
|
||
|
||
def stand(self, nick):
|
||
if self.status != "PLAYING": return "Not playing."
|
||
if self.players[self.turn_idx] != nick: return "Not your turn."
|
||
|
||
res = f"{nick} stands."
|
||
self._advance_turn()
|
||
return res
|
||
|
||
def _advance_turn(self):
|
||
self.turn_idx += 1
|
||
if self.turn_idx >= len(self.players):
|
||
self.status = "DEALING"
|
||
|
||
def resolve_game(self):
|
||
while self._calc(self.d_hand) < 17:
|
||
self.d_hand.append(self.deck.pop())
|
||
|
||
d_val = self._calc(self.d_hand)
|
||
lines = [f"🎰 **Results** (Dealer: {self._fmt(self.d_hand)} - {d_val})"]
|
||
|
||
for p in self.players:
|
||
p_val = self._calc(self.hands[p])
|
||
if p_val > 21:
|
||
lines.append(f"❌ {p}: {p_val} (Bust)")
|
||
elif d_val > 21 or p_val > d_val:
|
||
lines.append(f"✅ {p}: {p_val} (Win)")
|
||
elif p_val == d_val:
|
||
lines.append(f"⚖️ {p}: {p_val} (Push)")
|
||
else:
|
||
lines.append(f"❌ {p}: {p_val} (Loss)")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _status_msg(self):
|
||
if self.status == "PLAYING":
|
||
curr = self.players[self.turn_idx]
|
||
msg = f"🃏 **Blackjack**\nDealer: [{self.d_hand[0]['rank']}{self.d_hand[0]['suit']}] [??]\n"
|
||
for p in self.players:
|
||
mark = "👉 " if p == curr else ""
|
||
msg += f"{mark}{p}: {self._fmt(self.hands[p])} ({self._calc(self.hands[p])})\n"
|
||
return msg
|
||
return ""
|
||
|
||
def _calc(self, hand):
|
||
val = 0
|
||
aces = 0
|
||
for c in hand:
|
||
if c['rank'] in ['J','Q','K']: val += 10
|
||
elif c['rank'] == 'A': val += 11; aces += 1
|
||
else: val += int(c['rank'])
|
||
while val > 21 and aces: val -= 10; aces -= 1
|
||
return val
|
||
|
||
def _fmt(self, hand):
|
||
return " ".join([f"[{c['rank']}{c['suit']}]" for c in hand])
|
||
|
||
# MAIN CLASS
|
||
class RevoltBot(slixmpp.ClientXMPP):
|
||
def __init__(self):
|
||
self.cfg = configparser.ConfigParser()
|
||
if not Path('config.ini').exists(): raise FileNotFoundError("config.ini missing")
|
||
self.cfg.read('config.ini')
|
||
|
||
super().__init__(self.cfg['XMPP']['jid'], self.cfg['XMPP']['password'])
|
||
|
||
self.trigger = self.cfg['Bot']['trigger']
|
||
self.nick = self.cfg['Bot']['nickname']
|
||
self.rooms = [r.strip() for r in self.cfg['XMPP']['rooms'].split(',')]
|
||
|
||
self.db = DataManager('bot_data.json')
|
||
self.copypastas = DataManager('copypastas.json')
|
||
self.warnings = self.db.get('warnings', {})
|
||
self.wordlist = set()
|
||
|
||
self.config_moderators = set()
|
||
if self.cfg.has_option('Bot', 'moderators'):
|
||
raw_mods = self.cfg['Bot']['moderators']
|
||
self.config_moderators = {m.strip().lower() for m in raw_mods.split(',') if m.strip()}
|
||
|
||
# Prefers English -> German -> French -> Espanol for word definitions
|
||
# Languages are chosen because they are the most spoken languages with Latin alphabet
|
||
# Configuration allows change this part
|
||
self.wiktionary_priority = ['en', 'de', 'fr', 'es']
|
||
if self.cfg.has_option('Features', 'wiktionary_priority'):
|
||
raw_p = self.cfg['Features']['wiktionary_priority']
|
||
self.wiktionary_priority = [p.strip() for p in raw_p.split(',') if p.strip()]
|
||
|
||
self.uploader = AsyncFileUploader(self,
|
||
self.cfg['Bot'].get('file_host', 'litterbox'),
|
||
self.cfg['Bot'].get('file_host_api_key'))
|
||
|
||
self.polls = PollManager(self)
|
||
self.games = {}
|
||
|
||
|
||
self.register_plugin('xep_0030')
|
||
self.register_plugin('xep_0045')
|
||
self.register_plugin('xep_0199')
|
||
self.register_plugin('xep_0363')
|
||
self.register_plugin('xep_0054')
|
||
self.register_plugin('xep_0060')
|
||
|
||
self.add_event_handler("session_start", self.start)
|
||
self.add_event_handler("groupchat_message", self.on_group)
|
||
self.add_event_handler("message", self.on_pm)
|
||
|
||
# BOT STARTUP
|
||
async def start(self, event):
|
||
self.send_presence()
|
||
await self.get_roster()
|
||
|
||
# Downloads slurs from the database to block them while moderating
|
||
url = self.cfg['Features'].get('wordlist_url')
|
||
if url:
|
||
try:
|
||
async with aiohttp.ClientSession() as s:
|
||
async with s.get(url) as r:
|
||
if r.status == 200:
|
||
self.wordlist = set((await r.text()).lower().splitlines())
|
||
logging.info(f"Loaded {len(self.wordlist)} banned words.")
|
||
except Exception as e: logging.error(f"Wordlist error: {e}")
|
||
|
||
for room in self.rooms:
|
||
self.join_room_with_retry(room)
|
||
|
||
def join_room_with_retry(self, room):
|
||
room_jid = str(slixmpp.JID(room).bare)
|
||
n = self.cfg['XMPP'].get(f'nickname.{room}', self.nick)
|
||
try:
|
||
self.plugin['xep_0045'].join_muc(room_jid, n)
|
||
logging.info(f"Joined {room_jid} as {n}")
|
||
except Exception as e:
|
||
logging.error(f"Could not join {room}: {e}")
|
||
|
||
|
||
|
||
async def on_group(self, msg):
|
||
if msg['mucnick'] == self.nick: return
|
||
room = msg['from'].bare
|
||
body = msg['body'].strip()
|
||
|
||
mod_enabled = self.cfg['Moderation'].get('moderation_enabled', 'ALL').upper()
|
||
should_moderate = True
|
||
if mod_enabled == 'NONE': should_moderate = False
|
||
elif mod_enabled == 'NON_NSFW' and self.is_nsfw_room(room): should_moderate = False
|
||
|
||
# Warn a user if they use a slur!
|
||
try:
|
||
if should_moderate and self.wordlist:
|
||
if any(re.search(r'\b'+re.escape(w)+r'\b', body.lower()) for w in self.wordlist):
|
||
await self.warn(room, msg['mucnick'], "Message includes word inside the filter.")
|
||
return
|
||
|
||
if not body.startswith(self.trigger): return
|
||
|
||
parts = body[len(self.trigger):].split(maxsplit=1)
|
||
cmd = parts[0].lower()
|
||
args = parts[1] if len(parts) > 1 else ""
|
||
|
||
await self.handle_cmd(room, msg, cmd, args)
|
||
except Exception as e:
|
||
logging.error(f"Error handling message: {e}")
|
||
|
||
async def on_pm(self, msg):
|
||
if msg['type'] == 'error': return
|
||
if msg['from'].bare in self.rooms and not msg['from'].resource: return
|
||
if msg['from'].bare == self.boundjid.bare: return
|
||
if msg['type'] == 'groupchat': return
|
||
|
||
body = msg['body'].strip()
|
||
if not body: return
|
||
|
||
if body.startswith(self.trigger):
|
||
parts = body[len(self.trigger):].split(maxsplit=1)
|
||
cmd = parts[0].lower()
|
||
if cmd == 'help':
|
||
await self.send_private_msg_by_jid(msg['from'], await self._generate_help_message(False))
|
||
elif cmd == 'dm':
|
||
await self.send_private_msg_by_jid(msg['from'], "Hi! :3")
|
||
|
||
async def handle_cmd(self, room, msg, cmd, args):
|
||
sender = msg['mucnick']
|
||
|
||
# Too lazy to remove cdn links
|
||
if cmd == 'ihnk':
|
||
await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20I%20have%20no%20knowledge%20of%20any%20of%20this%20ThIs%20iS%20sO%20bIzZaRe_480p.mp4")
|
||
elif cmd == 'scary':
|
||
await self.send_group(room, "https://cdn.purplebored.pl/uploads/y2mate.com%20-%20oh%20oh%20scary%20oh%20oh%20shiver%20me%20timbers_360p.mp4")
|
||
# MOST ESSENTIAL COMMAND OF THE BOT
|
||
elif cmd == 'n':
|
||
await self.send_group(room, random.choice(N_QUOTES))
|
||
elif cmd == 'cl':
|
||
await self.send_group(room, f"Common {args or 'Everyone'} L")
|
||
elif cmd == 'l':
|
||
await self.send_group(room, "L")
|
||
elif cmd == 'test':
|
||
await self.send_group(room, "Ig it works ™️")
|
||
elif cmd == 'credits':
|
||
await self.send_group(room, "Original by Purplebored. Port by agent-n.")
|
||
elif cmd == 'invite':
|
||
await self.send_private_muc(room, sender, "Contact the hoster of this instance to add your room.")
|
||
elif cmd == 'ping':
|
||
await self.send_group(room, "Pong!")
|
||
elif cmd == 'video':
|
||
await self.send_group(room, await self._generate_video_list())
|
||
|
||
elif cmd in ['calculate', 'calc']:
|
||
parts = args.split()
|
||
if len(parts) == 3:
|
||
try:
|
||
n1 = float(parts[0])
|
||
op = parts[1]
|
||
n2 = float(parts[2])
|
||
ops = {'+':operator.add, '-':operator.sub, '*':operator.mul, '/':operator.truediv}
|
||
if op in ops:
|
||
if op == '/' and n2 == 0: await self.send_group(room, "Division by zero is not allowed.")
|
||
else: await self.send_group(room, f"Result: {ops[op](n1, n2)}")
|
||
else: await self.send_group(room, "Invalid op.")
|
||
except: await self.send_group(room, "Error.")
|
||
else: await self.send_group(room, f"Usage: {self.trigger}calc num + num")
|
||
elif cmd == 'dm':
|
||
await self.send_private_muc(room, sender, "Hi :3")
|
||
|
||
|
||
# AVATAR DOWNLOAD AND REUPLOAD
|
||
elif cmd == 'avatar':
|
||
target_nick = args.strip() if args else sender
|
||
try:
|
||
target_jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid')
|
||
is_muc_occupant = False
|
||
|
||
if not target_jid:
|
||
target_jid = f"{room}/{target_nick}"
|
||
is_muc_occupant = True
|
||
|
||
avatar_data, mime_type = await self._get_avatar_data(target_jid, is_muc_occupant=is_muc_occupant)
|
||
|
||
if avatar_data:
|
||
ext = mimetypes.guess_extension(mime_type) or ".png"
|
||
filename = f"avatar{ext}"
|
||
url = await self.uploader.upload(avatar_data, filename, mime_type)
|
||
if url: await self.send_group(room, f"{url}")
|
||
else: await self.send_group(room, "Avatar retrieved but upload failed (literally 1984).")
|
||
else:
|
||
await self.send_group(room, f"No avatar found for {target_nick}. Please try again later...")
|
||
except Exception as e:
|
||
logging.error(f"Avatar error: {e}")
|
||
await self.send_group(room, "Error fetching avatar.")
|
||
|
||
# WIKTIONARY LOOKUP
|
||
elif cmd in ['wiki', 'wiktionary']:
|
||
if not args:
|
||
await self.send_group(room, "Usage: wiki <word>")
|
||
return
|
||
|
||
term = args.strip()
|
||
url = f"https://en.wiktionary.org/api/rest_v1/page/definition/{quote(term)}?redirect=true"
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
|
||
'Accept': 'application/json; charset=utf-8; profile="https://www.mediawiki.org/wiki/Specs/definition/0.8.0"'
|
||
}
|
||
|
||
try:
|
||
data = await self.fetch(url, headers=headers, json=True)
|
||
|
||
if not data or 'error' in data:
|
||
await self.send_group(room, "Word not found.")
|
||
return
|
||
|
||
available_codes = list(data.keys())
|
||
if 'zh' in available_codes: available_codes.remove('zh')
|
||
|
||
priority_codes = self.wiktionary_priority
|
||
selected_code = next((c for c in priority_codes if c in available_codes), None)
|
||
|
||
if not selected_code and available_codes:
|
||
selected_code = available_codes[0]
|
||
|
||
if not selected_code:
|
||
await self.send_group(room, "No definitions found.")
|
||
return
|
||
|
||
entries = data[selected_code]
|
||
lang_name = entries[0].get('language', selected_code)
|
||
found_defs = []
|
||
|
||
for entry in entries:
|
||
pos = entry.get('partOfSpeech', '')
|
||
for d in entry.get('definitions', []):
|
||
raw = d.get('definition', '')
|
||
clean = re.sub(r'<[^>]+>', '', raw).strip()
|
||
if clean:
|
||
found_defs.append(f"({pos}) {clean}" if pos else clean)
|
||
|
||
if found_defs:
|
||
resp = f"📖 **{term}** ({lang_name})\n" + "\n".join(f"- {d}" for d in found_defs[:4])
|
||
await self.send_group(room, resp)
|
||
else:
|
||
await self.send_group(room, f"No clean definitions found in {lang_name}.")
|
||
|
||
except Exception as e:
|
||
logging.error(f"Wiki error: {e}")
|
||
await self.send_group(room, "Error fetching definition.")
|
||
elif cmd in ['tdk', 'sozluk']:
|
||
if not args:
|
||
await self.send_group(room, "Please provide a word to search.")
|
||
return
|
||
|
||
# Language Society word search (hidden by default to prevent confusion from other dictionary command)
|
||
url = f"https://sozluk.gov.tr/gts?ara={args.strip()}"
|
||
try:
|
||
|
||
data = await self.fetch(url, headers=TDK_HEADERS, content_type_fix=True)
|
||
if data and isinstance(data, list) and len(data) > 0:
|
||
item = data[0]
|
||
word = item.get('madde', '')
|
||
meanings = item.get('anlamlarListe', [])
|
||
origin = item.get('lisan', '')
|
||
|
||
resp = f"📚 **{word}**"
|
||
if origin: resp += f" ({origin})"
|
||
resp += "\n"
|
||
|
||
for idx, m in enumerate(meanings, 1):
|
||
resp += f"{idx}. {m.get('anlam', '')}\n"
|
||
|
||
await self.send_group(room, resp)
|
||
else:
|
||
await self.send_group(room, "Word not found in TDK dictionary.")
|
||
except Exception as e:
|
||
logging.error(f"TDK error: {e}")
|
||
await self.send_group(room, "Error fetching definition.")
|
||
|
||
|
||
elif cmd == 'quote':
|
||
d = await self.fetch("http://api.forismatic.com/api/1.0/?method=getQuote&format=json&lang=en", json=True, content_type_fix=True)
|
||
if d:
|
||
try:
|
||
await self.send_group(room, f"\"{d.get('quoteText','').strip()}\" - {d.get('quoteAuthor','Unknown')}")
|
||
except: await self.send_group(room, "Quote error hell yeah!")
|
||
|
||
|
||
# Since API is broken it uses local shitposts
|
||
elif cmd == 'shitpost':
|
||
p = Path('shitposts')
|
||
if not p.exists():
|
||
await self.send_group(room, "Shitpost directory missing.")
|
||
return
|
||
|
||
images = [f for f in p.iterdir() if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.webp']]
|
||
if not images:
|
||
await self.send_group(room, "No shitposts found locally.")
|
||
return
|
||
|
||
try:
|
||
target = random.choice(images)
|
||
data = target.read_bytes()
|
||
mime, _ = mimetypes.guess_type(target.name)
|
||
url = await self.uploader.upload(data, target.name, mime or 'application/octet-stream')
|
||
if url: await self.send_group(room, url)
|
||
else: await self.send_group(room, "Failed to upload local shitpost.")
|
||
except Exception as e:
|
||
logging.error(f"Shitpost error: {e}")
|
||
await self.send_group(room, "Error sending shitpost.")
|
||
|
||
elif cmd == 'neko':
|
||
d = await self.fetch("https://nekos.best/api/v2/neko")
|
||
if d: await self.upload_send(room, d['results'][0]['url'])
|
||
elif cmd == 'cat':
|
||
d = await self.fetch("https://api.thecatapi.com/v1/images/search")
|
||
if d: await self.upload_send(room, d[0]['url'])
|
||
elif cmd == 'dogfact':
|
||
d = await self.fetch("https://dogapi.dog/api/v2/facts")
|
||
if d and d['data']: await self.send_group(room, d['data'][0]['attributes']['body'])
|
||
elif cmd == 'catfact':
|
||
d = await self.fetch("https://meowfacts.herokuapp.com/")
|
||
if d and d['data']: await self.send_group(room, d['data'][0])
|
||
elif cmd == 'urban':
|
||
d = await self.fetch(f"https://api.urbandictionary.com/v0/define?term={args}")
|
||
if d and d['list']: await self.send_group(room, f"📖 {d['list'][0]['definition'][:300]}...")
|
||
elif cmd == 'joke':
|
||
d = await self.fetch("https://v2.jokeapi.dev/joke/Any?type=twopart")
|
||
if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}")
|
||
elif cmd == 'pun':
|
||
d = await self.fetch("https://v2.jokeapi.dev/joke/Pun?type=twopart")
|
||
if d and not d.get('error'): await self.send_group(room, f"{d['setup']}\n{d['delivery']}")
|
||
elif cmd == 'fact':
|
||
d = await self.fetch("https://uselessfacts.jsph.pl/random.json?language=en")
|
||
if d: await self.send_group(room, d['text'])
|
||
elif cmd == 'advice':
|
||
d = await self.fetch("https://api.adviceslip.com/advice", content_type_fix=True)
|
||
if d: await self.send_group(room, d['slip']['advice'])
|
||
|
||
# say command aka parrot mode. Censors slurs with * character
|
||
elif cmd == 'say':
|
||
text = args
|
||
if self.wordlist:
|
||
sorted_words = sorted(self.wordlist, key=len, reverse=True)
|
||
for w in sorted_words:
|
||
if not w: continue
|
||
chars = [re.escape(c) for c in w]
|
||
pattern_str = r"(?i)\b" + r"[\W_]*".join(chars) + r"\b"
|
||
try:
|
||
pattern = re.compile(pattern_str)
|
||
def repl(match):
|
||
return '*' * len(match.group(0))
|
||
text = pattern.sub(repl, text)
|
||
except: pass
|
||
|
||
await self.send_group(room, f"{sender} says: {text}")
|
||
|
||
elif cmd == 'dice':
|
||
await self.send_group(room, f"🎲 {random.randint(1,6)}")
|
||
elif cmd == 'roll':
|
||
try:
|
||
limit = int(args.strip())
|
||
if limit > 0:
|
||
await self.send_group(room, f"🎲 {random.randint(1, limit)}")
|
||
else:
|
||
await self.send_group(room, "Please pick a valid number.")
|
||
except:
|
||
await self.send_group(room, "Please specify a number to roll.")
|
||
|
||
elif cmd == 'coinflip':
|
||
await self.send_group(room, f"🪙 {random.choice(['Heads','Tails'])}")
|
||
elif cmd == 'rps':
|
||
opts = ['rock','paper','scissors']
|
||
b = random.choice(opts)
|
||
u = args.lower().strip()
|
||
if u not in opts:
|
||
await self.send_group(room, "rock, paper, or scissors?")
|
||
else:
|
||
res = "I won!"
|
||
if b == u: res = "Tie!"
|
||
elif (u=='rock' and b=='scissors') or (u=='paper' and b=='rock') or (u=='scissors' and b=='paper'):
|
||
res = "You won!"
|
||
await self.send_group(room, f"I chose {b}. {res}")
|
||
|
||
|
||
# GIF command finds and sends the searched GIF
|
||
elif cmd == 'gif':
|
||
k = self.cfg['Bot'].get('giphy_key')
|
||
if not k:
|
||
await self.send_group(room, "Giphy API key not configured.")
|
||
return
|
||
try:
|
||
d = await self.fetch(f"https://api.giphy.com/v1/gifs/search?api_key={k}&q={args}&limit=1")
|
||
if d and d['data']:
|
||
url = d['data'][0]['images']['original']['url']
|
||
await self.upload_send(room, url)
|
||
else:
|
||
await self.send_group(room, f"No GIF found for '{args}'.")
|
||
except Exception as e:
|
||
logging.error(f"GIF error: {e}")
|
||
await self.send_group(room, f"Error searching GIF.")
|
||
|
||
|
||
elif cmd in ['kick','ban','mute','warn','unban','unmute']:
|
||
if not self.is_mod(room, sender):
|
||
await self.send_group(room, "Not authorized.")
|
||
return
|
||
|
||
target_nick = args.strip()
|
||
if not target_nick:
|
||
await self.send_group(room, f"Usage: {cmd} <nick/jid>")
|
||
return
|
||
|
||
in_roster = False
|
||
try:
|
||
roster = self.plugin['xep_0045'].get_roster(room)
|
||
if target_nick in roster: in_roster = True
|
||
except: pass
|
||
|
||
try:
|
||
if cmd == 'warn':
|
||
if not in_roster:
|
||
await self.send_group(room, "User not found.")
|
||
return
|
||
await self.warn(room, target_nick, "Manual Warning")
|
||
|
||
elif cmd == 'kick':
|
||
if not in_roster:
|
||
await self.send_group(room, "User not found.")
|
||
return
|
||
await self.plugin['xep_0045'].set_role(room, target_nick, 'none', reason="Kicked by operator")
|
||
await self.send_group(room, f"👢 Kicked {target_nick}. They will return I guess but anyways.")
|
||
|
||
elif cmd == 'mute':
|
||
if not in_roster:
|
||
await self.send_group(room, "User not found.")
|
||
return
|
||
await self.plugin['xep_0045'].set_role(room, target_nick, 'visitor', reason="Muted by operator")
|
||
await self.send_group(room, f"🤐 Muted (shutted up) {target_nick}.")
|
||
|
||
elif cmd == 'unmute':
|
||
if not in_roster:
|
||
await self.send_group(room, "User not found.")
|
||
return
|
||
await self.plugin['xep_0045'].set_role(room, target_nick, 'participant', reason="Unmuted by operator")
|
||
await self.send_group(room, f"🔊 Unmuted {target_nick}. {target_nick} can speak now again!")
|
||
|
||
elif cmd == 'ban':
|
||
if not in_roster:
|
||
await self.send_group(room, "User not found.")
|
||
return
|
||
jid = self.plugin['xep_0045'].get_jid_property(room, target_nick, 'jid')
|
||
if jid:
|
||
await self.plugin['xep_0045'].set_affiliation(room, jid=jid, affiliation='outcast', reason="Banned by operator")
|
||
await self.send_group(room, f"🚫 Banhammer! whomp whomp {target_nick}")
|
||
else:
|
||
await self.send_group(room, "Cannot ban (Hidden JID/Anonymous).")
|
||
|
||
elif cmd == 'unban':
|
||
target_jid = target_nick
|
||
try:
|
||
await self.plugin['xep_0045'].set_affiliation(room, jid=target_jid, affiliation='none', reason="Unbanned by operator")
|
||
await self.send_group(room, f"🕊️ Unbanned {target_jid}. Taste of liberty!")
|
||
except IqError:
|
||
await self.send_group(room, f"Failed to unban {target_jid}. Ensure you are using the valid JID.")
|
||
|
||
except IqError as e:
|
||
await self.send_group(room, f"Permission denied: {e.iq['error']['text'] or 'Bot missing privileges'}")
|
||
except Exception as e:
|
||
logging.error(f"Mod command failed: {e}")
|
||
await self.send_group(room, "Operation failed.")
|
||
|
||
|
||
elif cmd == 'poll':
|
||
res = self.polls.create(room, sender, args)
|
||
await self.send_group(room, res)
|
||
elif cmd == 'vote':
|
||
res = self.polls.vote(sender, args)
|
||
await self.send_private_muc(room, sender, res)
|
||
elif cmd == 'close':
|
||
res = self.polls.close(sender, args)
|
||
await self.send_group(room, res)
|
||
|
||
elif cmd == 'uno':
|
||
sub = args.split()[0] if args else ''
|
||
arg = args[len(sub):].strip()
|
||
gid = f"uno_{room}"
|
||
g = self.games.get(gid)
|
||
if sub == 'join':
|
||
if not g:
|
||
g = UnoGame(room, self)
|
||
self.games[gid] = g
|
||
await self.send_group(room, g.join(sender))
|
||
elif sub == 'start' and g:
|
||
await self.send_group(room, g.start())
|
||
elif sub == 'play' and g:
|
||
res = g.play(sender, arg)
|
||
if res: await self.send_group(room, res)
|
||
if res and "Wins" in res: del self.games[gid]
|
||
elif sub == 'draw' and g:
|
||
res = g.draw(sender)
|
||
if res: await self.send_group(room, res)
|
||
|
||
# JACK!!
|
||
elif cmd in ['bj', 'blackjack']:
|
||
gid = f"bj_{room}"
|
||
sub = args.split()[0].lower() if args else ''
|
||
|
||
g = self.games.get(gid)
|
||
|
||
if not sub:
|
||
if not g:
|
||
g = MultiplayerBlackjackGame(room, self)
|
||
self.games[gid] = g
|
||
g.join(sender)
|
||
await self.send_group(room, g.start())
|
||
else:
|
||
if g.status == "JOINING":
|
||
await self.send_group(room, f"Lobby Open! Type `{self.trigger}bj join` to join or `{self.trigger}bj start` to begin.")
|
||
else:
|
||
await self.send_group(room, "Game in progress.")
|
||
|
||
elif sub == 'join':
|
||
if not g:
|
||
g = MultiplayerBlackjackGame(room, self)
|
||
self.games[gid] = g
|
||
await self.send_group(room, f"Blackjack lobby created by {sender}. Type `{self.trigger}bj join`.")
|
||
|
||
res = g.join(sender)
|
||
await self.send_group(room, res)
|
||
|
||
elif sub == 'start' and g:
|
||
await self.send_group(room, g.start())
|
||
|
||
elif sub == 'hit' and g:
|
||
res = g.hit(sender)
|
||
await self.send_group(room, res)
|
||
if g.status == "DEALING":
|
||
await self.send_group(room, g.resolve_game())
|
||
del self.games[gid]
|
||
|
||
elif sub == 'stand' and g:
|
||
res = g.stand(sender)
|
||
await self.send_group(room, res)
|
||
if g.status == "DEALING":
|
||
await self.send_group(room, g.resolve_game())
|
||
del self.games[gid]
|
||
|
||
else:
|
||
await self.send_group(room, f"Blackjack: `{self.trigger}bj` (Solo), `{self.trigger}bj join` (Multi), `{self.trigger}bj hit/stand`")
|
||
|
||
# Abbreviation will be removed
|
||
elif cmd == 'cp' or cmd == 'copypasta':
|
||
if args.startswith('add'):
|
||
_, name, txt = args.split(maxsplit=2)
|
||
d = self.copypastas.get('pastas', {})
|
||
d[name] = txt
|
||
self.copypastas.set('pastas', d)
|
||
await self.send_group(room, "Added.")
|
||
elif args == 'list':
|
||
await self.send_group(room, ", ".join(self.copypastas.get('pastas',{}).keys()))
|
||
else:
|
||
txt = self.copypastas.get('pastas',{}).get(args)
|
||
if txt: await self.send_group(room, txt)
|
||
|
||
|
||
elif cmd == 'gpt':
|
||
if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'):
|
||
await self.send_group(room, "AI not configured.")
|
||
return
|
||
|
||
k = self.cfg['AI']['openai_key']
|
||
url = self.cfg['AI'].get('openai_api_url')
|
||
pl = {
|
||
"model": self.cfg['AI'].get('openai_model'),
|
||
"messages": [{"role":"system","content":self.cfg['AI'].get('openai_system_prompt')},
|
||
{"role":"user","content":args}]
|
||
}
|
||
headers = {"Authorization": f"Bearer {k}"}
|
||
d = await self.fetch(url, method='POST', json_data=pl, headers=headers)
|
||
if d and d.get('choices'): await self.send_group(room, d['choices'][0]['message']['content'])
|
||
|
||
|
||
|
||
|
||
elif cmd in ['translate', 'tr']:
|
||
if not self.cfg.has_section('AI') or not self.cfg['AI'].get('openai_key'):
|
||
await self.send_group(room, "AI not configured.")
|
||
return
|
||
|
||
parts = args.split(maxsplit=1)
|
||
if len(parts) < 2:
|
||
await self.send_group(room, f"Usage: {self.trigger}translate <Language> <Text>")
|
||
return
|
||
|
||
target_lang = parts[0]
|
||
text_to_translate = parts[1]
|
||
|
||
system_prompt = (
|
||
f"You are a precise translator. Your sole function is to detect the language of the input text and translate it into {target_lang}. "
|
||
f"Provide only the {target_lang} translation with no additional commentary, explanations, notes, or meta-information. "
|
||
"Do not include the source language name, translation notes, or any text beyond the translated output. "
|
||
f"Maintain the original meaning, tone, and structure as closely as possible while ensuring natural {target_lang} expression."
|
||
)
|
||
|
||
k = self.cfg['AI']['openai_key']
|
||
url = self.cfg['AI'].get('openai_api_url')
|
||
pl = {
|
||
"model": self.cfg['AI'].get('openai_model'),
|
||
"messages": [
|
||
{"role": "system", "content": system_prompt},
|
||
{"role": "user", "content": text_to_translate}
|
||
]
|
||
}
|
||
headers = {"Authorization": f"Bearer {k}"}
|
||
|
||
d = await self.fetch(url, method='POST', json_data=pl, headers=headers)
|
||
if d and d.get('choices'):
|
||
await self.send_group(room, d['choices'][0]['message']['content'])
|
||
else:
|
||
await self.send_group(room, "Translation failed.")
|
||
elif cmd in ['r34', 'hentai', 'boobs', 'hboobs']:
|
||
if not self.is_nsfw_room(room):
|
||
await self.send_group(room, "NSFW not allowed here. Find a NSFW room or add the room to NSFW list smh")
|
||
return
|
||
|
||
if cmd == 'r34':
|
||
tag = args if args else 'all'
|
||
d = await self.fetch(f"https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit=1&json=1&tags={tag}")
|
||
if d and len(d) > 0 and 'file_url' in d[0]: await self.upload_send(room, d[0]['file_url'])
|
||
|
||
elif cmd == 'hentai':
|
||
d = await self.fetch("https://nekobot.xyz/api/image?type=hneko")
|
||
if d and 'message' in d: await self.upload_send(room, d['message'])
|
||
|
||
elif cmd == 'boobs':
|
||
d = await self.fetch("https://nekobot.xyz/api/image?type=boobs")
|
||
if d and 'message' in d: await self.upload_send(room, d['message'])
|
||
|
||
elif cmd == 'hboobs':
|
||
d = await self.fetch("https://nekobot.xyz/api/image?type=hboobs")
|
||
if d and 'message' in d: await self.upload_send(room, d['message'])
|
||
|
||
|
||
# Same thing with copypasta applies here too
|
||
elif cmd in ['wa', 'wolfram']:
|
||
if not args:
|
||
await self.send_group(room, "Usage: wa <natural language query>")
|
||
else:
|
||
appid = None
|
||
if self.cfg.has_section('Features'):
|
||
appid = self.cfg['Features'].get('wolfram_appid')
|
||
if not appid and self.cfg.has_section('Bot'):
|
||
appid = self.cfg['Bot'].get('wolfram_appid')
|
||
|
||
if not appid:
|
||
await self.send_group(room, "WolframAlpha AppID not configured.")
|
||
else:
|
||
try:
|
||
url = f"http://api.wolframalpha.com/v1/result?appid={appid}&i={quote(args.strip())}"
|
||
data = await self.fetch(url, json=False)
|
||
if data:
|
||
txt = data.decode('utf-8', errors='ignore').strip()
|
||
await self.send_group(room, f"Wolfram says: {txt}")
|
||
else:
|
||
await self.send_group(room, "No short answer available.")
|
||
except Exception as e:
|
||
logging.error(f"Wolfram error: {e}")
|
||
await self.send_group(room, "Error querying WolframAlpha.")
|
||
|
||
elif cmd == 'help':
|
||
await self.send_group(room, await self._generate_help_message(self.is_mod(room, sender)))
|
||
elif cmd == 'mod-help':
|
||
if self.is_mod(room, sender): await self.send_group(room, await self._generate_mod_help())
|
||
elif cmd == 'nsfw-help':
|
||
await self.send_group(room, await self._generate_nsfw_help())
|
||
|
||
async def _generate_help_message(self, is_mod):
|
||
t = self.trigger
|
||
s = f"#### List of available commands:\n\n#### Practical Commands: \n" \
|
||
f"`{t}video` - Sends a list of all available videos\n" \
|
||
f"`{t}help` - Displays this command.\n" \
|
||
f"`{t}credits` - Displays bot's credit.\n" \
|
||
f"`{t}test` - Simple test command you say test bot will response.\n" \
|
||
f"`{t}invite` - Info about how to make bot join more rooms.\n" \
|
||
f"`{t}calc` {{num}} {{+, - , / , * }} {{num}} - Calculator.\n" \
|
||
f"`{t}ping` - tests the bot ping.\n" \
|
||
f"`{t}gpt` - Send commands to clankers.\n" \
|
||
f"`{t}tr` <language> <text> - Translate text to target language.\n" \
|
||
f"`{t}copypasta` - Add or send copypastas.\n" \
|
||
f"`{t}wiki` - Ughm acthually lookup.\n" \
|
||
f"`{t}wa` - WolframAlpha query, can convert anything to anything.\n"
|
||
|
||
if is_mod: s += f"`{t}mod-help` - Mod commands.\n"
|
||
s += f"`{t}nsfw-help` - Sends a list of all available nsfw commands.\n"
|
||
|
||
s += f"\n#### Fun Commands: \n" \
|
||
f"`{t}dice` - Rolls a random number between 1 and 6.\n" \
|
||
f"`{t}roll` {{num}} - Rolls a random number between 1 and num.\n" \
|
||
f"`{t}say` - says what the user told it to say!.\n" \
|
||
f"`{t}cl` - Common L.\n" \
|
||
f"`{t}l` - L.\n" \
|
||
f"`{t}dm` - Whispers the user Hi :3.\n" \
|
||
f"`{t}rps` - Simple Rock paper scissors game.\n" \
|
||
f"`{t}dogfact` - Gives a random dogfact using the Dogfact API!.\n" \
|
||
f"`{t}catfact` - Gives a random Catfact using cat fact API\n" \
|
||
f"`{t}joke` - Very simple command just gives a joke using the Joke API.\n" \
|
||
f"`{t}coinflip` - a Command so easy a child could do it.\n" \
|
||
f"`{t}fact` - Gives a random useless fact.\n" \
|
||
f"`{t}urban` - uses the urban dictionary to search for the word.\n" \
|
||
f"`{t}shitpost` - Sends random shitpost from local library\n" \
|
||
f"`{t}cat` - Cat :3 \n" \
|
||
f"`{t}neko` - Neko command \n" \
|
||
f"`{t}advice` - Gives the user a life Advice \n" \
|
||
f"`{t}quote` - Gives a random quote using yet another API. \n" \
|
||
f"`{t}gif` - Allows the user to search for gifs using giphy (ratelimited sometimes)\n" \
|
||
f"`{t}pun` - Gives user a simple pun. \n" \
|
||
f"`{t}avatar` - Sends the user Avatar (Beta) \n" \
|
||
f"`{t}poll` - Democracy \n" \
|
||
f"`{t}uno` - Four colors\n" \
|
||
f"`{t}bj` - I play JACK bro! \n"
|
||
return s
|
||
|
||
async def _generate_mod_help(self):
|
||
t = self.trigger
|
||
return f"## Mod commands:\n" \
|
||
f"`{t}kick` - Kick a user.\n" \
|
||
f"`{t}ban` - Ban a user.\n" \
|
||
f"`{t}mute` - Mute a user.\n" \
|
||
f"`{t}warn` - Warn a user.\n" \
|
||
f"`{t}unban` - Unban a user.\n" \
|
||
f"`{t}unmute` - Unmute a user.\n"
|
||
|
||
async def _generate_nsfw_help(self):
|
||
t = self.trigger
|
||
return f"### NSFW commands:\n" \
|
||
f"`{t}r34` - Rule34 search. (doesnt work on some ISPs)\n" \
|
||
f"`{t}hentai` - Grabs a random hentai image \n" \
|
||
f"`{t}boobs` - Grabs a random boobs image \n" \
|
||
f"`{t}hboobs` - Grabs a random hboobs image simple. \n"
|
||
|
||
async def _generate_video_list(self):
|
||
t = self.trigger
|
||
return f"Videos:\n`{t}ihnk` Sends a I have no knwoledge about any of this video. \n`{t}scary` Sends the oh oh scary oh oh shiver me timbers video"
|
||
|
||
async def fetch(self, url, method='GET', json_data=None, headers=None, json=True, content_type_fix=False):
|
||
try:
|
||
async with aiohttp.ClientSession() as s:
|
||
async with s.request(method, url, json=json_data, headers=headers) as r:
|
||
if r.status == 200:
|
||
if json:
|
||
return await r.json(content_type=None) if content_type_fix else await r.json()
|
||
else:
|
||
return await r.read()
|
||
except Exception as e: logging.error(f"Fetch err: {e}")
|
||
return None
|
||
|
||
async def upload_send(self, room, url):
|
||
data = await self.fetch(url, json=False)
|
||
if data:
|
||
mime = self._sniff_mime(data)
|
||
if mime:
|
||
ext = mime.split('/')[1].replace('jpeg', 'jpg')
|
||
# Octet Stream is used in bin files with no known type afaik
|
||
else:
|
||
mime = "application/octet-stream"
|
||
ext = "bin"
|
||
path = url.split('?')[0].lower()
|
||
if path.endswith('.png'): mime, ext = 'image/png', 'png'
|
||
elif path.endswith(('.jpg', '.jpeg')): mime, ext = 'image/jpeg', 'jpg'
|
||
elif path.endswith('.gif'): mime, ext = 'image/gif', 'gif'
|
||
elif path.endswith('.webp'): mime, ext = 'image/webp', 'webp'
|
||
|
||
filename = f"image.{ext}"
|
||
l = await self.uploader.upload(data, filename, mime)
|
||
if l: await self.send_group(room, l)
|
||
|
||
def _sniff_mime(self, data: bytes) -> Optional[str]:
|
||
if not data or len(data) < 100:
|
||
return None
|
||
|
||
if data.startswith(b'\x89PNG\r\n\x1a\n'): return 'image/png'
|
||
if data.startswith(b'\xff\xd8\xff'): return 'image/jpeg'
|
||
if data.startswith(b'GIF8'): return 'image/gif'
|
||
if data.startswith(b'RIFF') and data[8:12] == b'WEBP': return 'image/webp'
|
||
|
||
return None
|
||
|
||
# GET AVATAR TO UPLOAD (SOMETIMES BROKEN)
|
||
async def _get_avatar_data(self, jid_str, is_muc_occupant=False) -> (Optional[bytes], Optional[str]):
|
||
j = slixmpp.JID(jid_str)
|
||
target = j.full if is_muc_occupant else j.bare
|
||
|
||
async def try_0153(use_ifrom=False):
|
||
try:
|
||
kwargs = {'jid': target}
|
||
if use_ifrom: kwargs['ifrom'] = self.boundjid.full
|
||
|
||
iq = await self.plugin["xep_0054"].get_vcard(**kwargs)
|
||
photo = iq["vcard_temp"]["PHOTO"]
|
||
binval = photo["BINVAL"]
|
||
type_ = photo["TYPE"]
|
||
|
||
if binval:
|
||
data = base64.b64decode(binval.replace('\n', '').replace(' ', '').strip())
|
||
mime = self._sniff_mime(data)
|
||
if not mime and type_ and len(type_.split('/')) > 1: mime = type_
|
||
if not mime: mime = "image/png"
|
||
return data, mime
|
||
except (IqError, Exception) as e:
|
||
logging.debug(f"XEP-0153 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}")
|
||
return None
|
||
|
||
async def try_0084(use_ifrom=False):
|
||
try:
|
||
kwargs = {'jid': target, 'node': "urn:xmpp:avatar:data", 'max_items': 1}
|
||
if use_ifrom: kwargs['ifrom'] = self.boundjid.full
|
||
|
||
iq = await self.plugin["xep_0060"].get_items(**kwargs)
|
||
items = iq["pubsub"]["items"]["substanzas"]
|
||
if items:
|
||
data_elem = items[0].xml.find("{urn:xmpp:avatar:data}data")
|
||
if data_elem is not None and data_elem.text:
|
||
b64_text = data_elem.text.replace('\n', '').replace(' ', '').strip()
|
||
data = base64.b64decode(b64_text)
|
||
mime = self._sniff_mime(data) or "image/png"
|
||
return data, mime
|
||
|
||
if "data" in items[0]:
|
||
d = items[0]["data"]
|
||
if isinstance(d, str): data = base64.b64decode(d)
|
||
else: data = d
|
||
return data, "image/png"
|
||
except (IqError, Exception) as e:
|
||
logging.debug(f"XEP-0084 ({'ifrom' if use_ifrom else 'std'}) failed for {target}: {e}")
|
||
return None
|
||
|
||
|
||
res = await try_0153()
|
||
if res: return res
|
||
|
||
res = await try_0084()
|
||
if res: return res
|
||
|
||
logging.debug(f"Primary avatar methods failed for {target}. Trying fallback logic.")
|
||
res = await try_0153(use_ifrom=True)
|
||
if res: return res
|
||
|
||
res = await try_0084(use_ifrom=True)
|
||
if res: return res
|
||
|
||
return None, None
|
||
|
||
async def warn(self, room, nick, reason):
|
||
if self.cfg['Moderation'].get('moderation_enabled') == 'NONE': return
|
||
|
||
k = f"{room}/{nick}"
|
||
c = self.warnings.get(k, 0) + 1
|
||
self.warnings[k] = c
|
||
self.db.set('warnings', self.warnings)
|
||
await self.send_group(room, f"⚠️ {nick} Warn {c}: {reason}")
|
||
if c >= int(self.cfg['Moderation'].get('max_warnings',3)):
|
||
self.warnings[k] = 0; self.db.set('warnings', self.warnings)
|
||
act = self.cfg['Moderation'].get('warning_action','mute')
|
||
try:
|
||
if act == 'kick':
|
||
await self.plugin['xep_0045'].set_role(room, nick, 'none', reason="Too many warnings")
|
||
elif act == 'mute':
|
||
await self.plugin['xep_0045'].set_role(room, nick, 'visitor', reason="Too many warnings")
|
||
except IqError:
|
||
await self.send_group(room, f"Cannot enforce warning: Permission denied.")
|
||
|
||
def is_mod(self, room, nick):
|
||
if nick.lower() in self.config_moderators:
|
||
return True
|
||
|
||
try:
|
||
room_roster = self.plugin['xep_0045'].rooms.get(room)
|
||
if not room_roster:
|
||
target_bare = slixmpp.JID(room).bare
|
||
for r_jid in self.plugin['xep_0045'].rooms:
|
||
if slixmpp.JID(r_jid).bare == target_bare:
|
||
room_roster = self.plugin['xep_0045'].rooms[r_jid]
|
||
break
|
||
|
||
if room_roster:
|
||
if nick in room_roster:
|
||
user = room_roster[nick]
|
||
if user['affiliation'] in ('admin', 'owner') or user['role'] == 'moderator':
|
||
return True
|
||
except Exception as e:
|
||
logging.debug(f"is_mod check failed: {e}")
|
||
|
||
return False
|
||
|
||
def is_nsfw_room(self, room):
|
||
mode = self.cfg['Features'].get('nsfw_mode', 'NONE')
|
||
if mode == 'ALL': return True
|
||
if mode == 'NONE': return False
|
||
allowed = [r.strip() for r in self.cfg['Features'].get('nsfw_rooms', '').split(',')]
|
||
return str(room) in allowed
|
||
|
||
def find_user(self, room, partial):
|
||
try:
|
||
nicks = list(self.plugin['xep_0045'].get_roster(room).keys())
|
||
match = get_close_matches(partial, nicks, n=1)
|
||
return match[0] if match else None
|
||
except: return None
|
||
|
||
async def send_group(self, room, text):
|
||
self.send_message(mto=room, mbody=str(text), mtype='groupchat')
|
||
|
||
async def send_private_muc(self, room, nick, text):
|
||
self.send_message(mto=f"{room}/{nick}", mbody=str(text), mtype='chat')
|
||
|
||
async def send_private_msg_by_jid(self, jid, text):
|
||
self.send_message(mto=jid, mbody=str(text), mtype='chat')
|
||
|
||
async def main():
|
||
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
|
||
if not Path('config.ini').exists():
|
||
print("Config missing. Please make a config. Otherwise how would the bot work?")
|
||
return
|
||
bot = RevoltBot()
|
||
bot.connect()
|
||
await bot.disconnected
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|
||
|