Files
xmpp-poll-bot/pollbot.py
2026-01-16 20:39:47 +00:00

282 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import slixmpp
import asyncio
import logging
import configparser
import re
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional
from pathlib import Path
# Vote markers shown next to each option and accepted as reactions (max 10 options).
# Options 1-9 are keycap emoji sequences: digit + U+FE0F (variation selector-16)
# + U+20E3 (combining enclosing keycap); option 10 is U+1F51F (keycap ten).
# They are spelled with escapes on purpose: the selector/keycap code points are
# invisible in most editors and are easily lost, which leaves bare ASCII digits
# that no emoji reaction will ever compare equal to.
EMOJIS = [f'{digit}\ufe0f\u20e3' for digit in '123456789'] + ['\U0001F51F']
@dataclass
class Poll:
    """A single poll and its current state.

    Instances are serialised with ``dataclasses.asdict`` and rebuilt with
    ``Poll(**data)``, so every field holds a JSON-compatible value. The field
    order matters: polls are also constructed positionally.
    """

    # Short identifier used in commands, e.g. "p3".
    id: str
    # Bare JID the poll was created from; announcements and results go here.
    room: str
    # String form of the creating sender's JID, used for ownership checks.
    creator: str
    # The question text shown in announcements and results.
    question: str
    # Answer options, in display order.
    options: List[str]
    # Maps a voter identifier to the zero-based index of the chosen option.
    votes: Dict[str, int] = field(default_factory=dict)
    # When true, per-option counts are withheld from status output.
    anon: bool = False
    # Optional ISO-8601 start time; a poll with a start time begins unstarted.
    start: Optional[str] = None
    # Optional ISO-8601 time at which the poll is closed automatically.
    end: Optional[str] = None
    # Optional vote count at which the poll is closed automatically.
    max_votes: Optional[int] = None
    # True once results have been published.
    closed: bool = False
    # False while a scheduled poll is still waiting for its start time.
    started: bool = True
class PollBot(slixmpp.ClientXMPP):
    """XMPP bot that runs polls driven by ``!`` chat commands and emoji reactions.

    Polls are kept in memory in ``self.polls`` and persisted as JSON to
    ``store_path`` after every change. A background task started on
    ``session_start`` opens scheduled polls and closes expired ones.
    """

    def __init__(self, jid, password, rooms, nick, store_path='polls.json', loop=None):
        """Set up state, load persisted polls, and register plugins and handlers.

        Args:
            jid: Account JID the bot logs in with.
            password: Account password.
            rooms: Iterable of MUC room JIDs (strings) to join on session start.
            nick: Nickname used in the rooms; also used to ignore our own messages.
            store_path: Path of the JSON file holding polls and the id counter.
            loop: Passed through to ``slixmpp.ClientXMPP``.
        """
        super().__init__(jid, password, loop=loop)
        self.rooms, self.nick, self.store_path = rooms, nick, Path(store_path)
        self.polls: Dict[str, Poll] = {}
        self.cnt = 0
        # Handle of the scheduler task; kept so it is neither garbage-collected
        # nor started twice when the session is re-established.
        self._scheduler_task = None
        self.load_polls()
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0444')
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.on_msg)
        self.add_event_handler("reactions", self.on_reaction)

    # ------------------------------------------------------------------ storage

    def load_polls(self):
        """Load polls and the id counter from ``store_path``.

        A missing file is the normal first-run case and is ignored silently.
        An unreadable or malformed file is logged and leaves the bot with no
        polls, rather than being swallowed without a trace.
        """
        try:
            data = json.loads(self.store_path.read_text())
        except FileNotFoundError:
            return
        except (OSError, ValueError) as exc:  # ValueError covers bad JSON / bad encoding
            logging.warning("Could not read poll store %s: %s", self.store_path, exc)
            return
        try:
            polls = {k: Poll(**v) for k, v in data.get('polls', {}).items()}
            cnt = int(data.get('cnt', 0))
        except (AttributeError, TypeError, ValueError) as exc:
            logging.warning("Poll store %s has an unexpected layout: %s", self.store_path, exc)
            return
        self.polls, self.cnt = polls, cnt

    def save_polls(self):
        """Persist all polls and the id counter to ``store_path``.

        The JSON is written to a sibling temporary file and then moved over the
        target, so an interrupted write cannot leave a truncated store behind.
        """
        payload = json.dumps({'polls': {k: asdict(v) for k, v in self.polls.items()}, 'cnt': self.cnt})
        tmp_path = self.store_path.with_name(self.store_path.name + '.tmp')
        tmp_path.write_text(payload)
        tmp_path.replace(self.store_path)

    # ---------------------------------------------------------------- lifecycle

    async def start(self, event):
        """``session_start`` handler: announce presence, join rooms, start the scheduler."""
        self.send_presence()
        self.get_roster()
        for r in self.rooms:
            self.plugin['xep_0045'].join_muc(r, self.nick)
        # session_start can fire again after a reconnect; run only one scheduler
        # and keep a reference to it (asyncio holds tasks only weakly).
        if self._scheduler_task is None or self._scheduler_task.done():
            self._scheduler_task = asyncio.create_task(self.scheduler())

    async def scheduler(self):
        """Every 30 seconds, open polls whose start time has passed and close expired ones."""
        while True:
            await asyncio.sleep(30)
            now = datetime.now()
            # Iterate over a snapshot: handlers may add or delete polls meanwhile.
            for pid, p in list(self.polls.items()):
                try:
                    self._process_due(pid, p, now)
                except Exception:
                    # One bad poll (e.g. an unparsable stored timestamp) must not
                    # stop scheduling for every other poll.
                    logging.exception("Scheduler failed on poll %s", pid)

    def _process_due(self, pid, p, now):
        """Start and/or auto-close poll ``p`` if its start/end time is at or before ``now``."""
        if not p.started and p.start and now >= datetime.fromisoformat(p.start):
            p.started = True
            self.announce(p)
            self.save_polls()
        if not p.closed and p.end and now >= datetime.fromisoformat(p.end):
            self.close(pid, auto=True)

    # ----------------------------------------------------------------- identity

    def _sender_id(self, msg):
        """Return the string that identifies the person who sent ``msg``.

        For anything coming through a joined room the bare JID is the room
        itself, so the full JID (room/nick) is used there; otherwise every
        occupant would share a single identity. For direct chats the bare JID
        is used so the identity does not change with the client resource.
        """
        sender = msg['from']
        if msg['type'] == 'groupchat' or sender.bare in self.rooms:
            return str(sender)
        return str(sender.bare)

    def _is_creator(self, msg, p):
        """Return True if the sender of ``msg`` matches the stored creator of ``p``."""
        sender = msg['from']
        return p.creator in (str(sender), str(sender.bare))

    @staticmethod
    def _emoji_index(emoji):
        """Map a reaction emoji to its index in ``EMOJIS``, or None if it is not a vote marker.

        The variation selector (U+FE0F) and enclosing keycap (U+20E3) are
        ignored on both sides because clients differ in which of them they send.
        """
        def bare(text):
            return text.replace('\ufe0f', '').replace('\u20e3', '')

        wanted = bare(emoji)
        for i, marker in enumerate(EMOJIS):
            if bare(marker) == wanted:
                return i
        return None

    @staticmethod
    def _tally(p):
        """Return a list with the number of votes for each option of ``p``, in option order."""
        counts = [0] * len(p.options)
        for choice in p.votes.values():
            if isinstance(choice, int) and 0 <= choice < len(counts):
                counts[choice] += 1
        return counts

    # ----------------------------------------------------------------- handlers

    def on_msg(self, msg):
        """``message`` handler: dispatch ``!command args`` to the matching method."""
        # Never act on error/headline stanzas; answering bounces can loop.
        if msg['type'] not in ('chat', 'normal', 'groupchat'):
            return
        # Ignore our own groupchat messages to prevent loops.
        if msg['type'] == 'groupchat' and msg['mucnick'] == self.nick:
            return
        body = msg['body'].strip()
        if not body:
            return
        # In groupchat, 'from'.bare is the room JID; in a direct chat it is the sender.
        room = msg['from'].bare
        parts = body.split(None, 1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ''
        handlers = {
            '!poll': self.create_poll, '!vote': self.vote, '!close': self.close_cmd,
            '!status': self.status, '!list': self.list_polls, '!help': self.help,
            '!delete': self.delete,
        }
        handler = handlers.get(cmd)
        if handler is not None:
            handler(msg, args, room)

    def on_reaction(self, msg):
        """``reactions`` handler: record an emoji reaction as a vote.

        The reaction is applied to the first open poll of the room it was sent
        in that has an option for the emoji. The reacted-to message is not
        inspected, so with several open polls in one room the oldest one wins.
        No reply is sent, to avoid spamming the chat.
        """
        try:
            if msg['type'] == 'groupchat' and msg['mucnick'] == self.nick:
                return
            room = msg['from'].bare
            voter = self._sender_id(msg)
            picks = [self._emoji_index(emoji) for emoji in msg['reactions']['values']]
            picks = [idx for idx in picks if idx is not None]
            for p in self.polls.values():
                if p.closed or not p.started or p.room != room:
                    continue
                for idx in picks:
                    if idx < len(p.options):
                        p.votes[voter] = idx
                        self.save_polls()
                        if p.max_votes and len(p.votes) >= p.max_votes:
                            self.close(p.id, auto=True)
                        return
        except Exception:
            # A malformed reaction must not break the bot, but should be visible.
            logging.exception("Failed to process reaction")

    # ----------------------------------------------------------------- commands

    def create_poll(self, msg, args, room):
        """Handle ``!poll "Question" "A" "B" [flags]``: create and announce or schedule a poll.

        The first quoted string is the question and the next up to ten are the
        options. Flags (``duration=``, ``anon=true``, ``votes=``, ``start=``)
        are read only from the text outside the quotes.
        """
        parts = re.findall(r'"([^"]+)"', args)
        if len(parts) < 3:
            return self.reply(msg, '!poll "Question" "A" "B" [duration=1h/1d] [anon=true] [votes=N] [start=YYYY-MM-DDTHH:MM]')
        q, opts = parts[0], parts[1:11]
        # Drop the quoted segments so a question like "use duration=5m?" is not
        # mistaken for a flag.
        flags = re.sub(r'"[^"]+"', ' ', args)
        anon = 'anon=true' in flags.lower()
        dur = self.parse_dur(flags)
        start = self.parse_time(flags, 'start')
        max_v = self.parse_int(flags, 'votes')
        if start is None and 'start=' in flags:
            return self.reply(msg, 'Invalid start time, use start=YYYY-MM-DDTHH:MM')
        end = None
        if dur:
            if dur > timedelta(days=30):
                return self.reply(msg, 'Max duration: 30 days')
            base = datetime.fromisoformat(start) if start else datetime.now()
            end = (base + dur).isoformat()
        self.cnt += 1
        pid = f"p{self.cnt}"
        # Real JIDs of room occupants are generally not visible, so ownership is
        # tracked by the sender identity as seen by the bot.
        creator = self._sender_id(msg)
        p = Poll(pid, room, creator, q, opts, {}, anon, start, end, max_v, False, start is None)
        self.polls[pid] = p
        self.save_polls()
        if p.started:
            self.announce(p)
        else:
            self.reply(msg, f'Poll {pid} scheduled for {start}')

    def announce(self, p):
        """Send the poll question, options and voting instructions to the poll's room."""
        lines = [f"📊 Poll {p.id}: {p.question}"]
        lines += [f"{EMOJIS[i]} {o}" for i, o in enumerate(p.options)]
        lines.append(f"Vote: !vote {p.id} <1-{len(p.options)}> or react with emoji")
        if p.anon:
            lines.append("🔒 Anonymous voting (Results hidden until close)")
        if p.end:
            lines.append(f"⏰ Closes: {p.end}")
        if p.max_votes:
            lines.append(f"🎯 Closes at {p.max_votes} votes")
        self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat')

    def vote(self, msg, args, room):
        """Handle ``!vote <poll_id> <number>``: record or change the sender's vote."""
        parts = args.split()
        if len(parts) != 2:
            return self.reply(msg, '!vote <poll_id> <number>')
        pid, choice = parts
        p = self.polls.get(pid)
        if p is None:
            return self.reply(msg, 'Poll not found')
        if p.closed:
            return self.reply(msg, 'Poll closed')
        if not p.started:
            return self.reply(msg, 'Poll not started')
        # Explicit range check (not ``assert``, which is removed under ``-O``).
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if not 0 <= idx < len(p.options):
            return self.reply(msg, f'Pick 1-{len(p.options)}')
        voter = self._sender_id(msg)
        changed = voter in p.votes
        p.votes[voter] = idx
        self.save_polls()
        # For anonymous polls, do not leak the choice in the (public) reply.
        if p.anon:
            self.reply(msg, 'Vote recorded (Anonymous)')
        else:
            self.reply(msg, f'{"Changed to" if changed else "Voted"}: {p.options[idx]}')
        if p.max_votes and len(p.votes) >= p.max_votes:
            self.close(pid, auto=True)

    def close_cmd(self, msg, args, room):
        """Handle ``!close <poll_id>``: close the poll if the sender created it."""
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        if not self._is_creator(msg, self.polls[pid]):
            return self.reply(msg, 'Only creator can close')
        self.close(pid)
        self.reply(msg, f'Closed {pid}')

    def close(self, pid, auto=False):
        """Mark poll ``pid`` closed and publish the results to its room.

        Does nothing if the poll is already closed. ``auto`` only adds an
        "(auto-closed)" note to the results. Raises ``KeyError`` for an
        unknown ``pid``.
        """
        p = self.polls[pid]
        if p.closed:
            return
        p.closed = True
        self.save_polls()
        cnt = self._tally(p)
        total = sum(cnt)
        top = max(cnt) if total else 0
        lines = [f"📊 Results - {p.id}: {p.question}"]
        for i, o in enumerate(p.options):
            pct = cnt[i] / total * 100 if total else 0
            filled = int(pct / 5)  # 20-cell bar, one cell per 5%
            bar = '\u2588' * filled + '\u2591' * (20 - filled)
            # Every option sharing the top count is marked, so ties are visible.
            win = ' 🏆' if total and cnt[i] == top else ''
            lines.append(f"{EMOJIS[i]} {o}: {cnt[i]} ({pct:.0f}%) {bar}{win}")
        lines.append(f"📈 Total: {total} vote{'s' if total != 1 else ''}{' (auto-closed)' if auto else ''}")
        self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat')

    def status(self, msg, args, room):
        """Handle ``!status <poll_id>``: reply with the poll's state and current counts.

        For anonymous polls only the number of votes is shown.
        """
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        p = self.polls[pid]
        state = 'closed' if p.closed else 'scheduled' if not p.started else 'open'
        info = [f"Poll {pid}: {p.question}", f"Status: {state}"]
        if p.anon:
            info.append(f"Votes: {len(p.votes)} (anonymous)")
        else:
            cnt = self._tally(p)
            info += [f"{o}: {cnt[i]}" for i, o in enumerate(p.options)]
        if p.end:
            info.append(f"Ends: {p.end}")
        self.reply(msg, '\n'.join(info))

    def list_polls(self, msg, args, room):
        """Handle ``!list``: reply with the polls of ``room`` that are not closed."""
        active = [p for p in self.polls.values() if not p.closed and p.room == room]
        if not active:
            return self.reply(msg, 'No active polls in this room')
        lines = [
            f"{p.id}: {p.question[:50]}{'...' if len(p.question) > 50 else ''} "
            f"[{'scheduled' if not p.started else 'open'}]"
            for p in active
        ]
        self.reply(msg, '\n'.join(lines))

    def delete(self, msg, args, room):
        """Handle ``!delete <poll_id>``: remove the poll if the sender created it."""
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        # Same ownership rule as !close.
        if not self._is_creator(msg, self.polls[pid]):
            return self.reply(msg, 'Only creator can delete')
        del self.polls[pid]
        self.save_polls()
        self.reply(msg, f'Deleted {pid}')

    def help(self, msg, args, room):
        """Handle ``!help``: reply with the command overview."""
        self.reply(msg, '\n'.join((
            '📊 Poll Bot Commands:',
            '!poll "Question" "Opt1" "Opt2" [options] - Create poll',
            'Options: duration=1h/1d anon=true votes=N start=YYYY-MM-DDTHH:MM',
            # Keycap marks are written as escapes so they survive editors that
            # hide or drop combining characters.
            '!vote <id> <num> - Cast vote (or react with 1\u20e32\u20e33\u20e3 emoji)',
            '!close <id> - Close poll & show results',
            '!status <id> - Check poll status',
            '!list - List active polls',
            '!delete <id> - Delete your poll',
        )))

    def reply(self, msg, text):
        """Answer ``msg`` in context: to the room for groupchat, to the sender otherwise."""
        if msg['type'] == 'groupchat':
            # In groupchat the bare JID of the sender address is the room.
            self.send_message(mto=msg['from'].bare, mbody=text, mtype='groupchat')
        else:
            self.send_message(mto=msg['from'], mbody=text, mtype='chat')

    # ------------------------------------------------------------------ parsing

    def parse_dur(self, s):
        """Parse ``duration=<N><m|h|d>`` from ``s`` into a ``timedelta``.

        Returns None when no duration flag is present. A value too large to
        represent yields ``timedelta.max`` so that callers' upper-bound checks
        reject it instead of the command raising.
        """
        m = re.search(r'duration=(\d+)([mhd])', s.lower())
        if not m:
            return None
        unit = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[m[2]]
        try:
            # Build only the requested unit; constructing all three eagerly can
            # overflow on a unit that was never asked for.
            return timedelta(**{unit: int(m[1])})
        except (OverflowError, ValueError):
            return timedelta.max

    def parse_time(self, s, key):
        """Parse ``<key>=YYYY-MM-DDTHH:MM`` from ``s``.

        Returns the timestamp string, or None when the flag is absent or does
        not denote a real date/time (e.g. month 13).
        """
        m = re.search(rf'{re.escape(key)}=(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}:\d{{2}})', s)
        if not m:
            return None
        try:
            datetime.fromisoformat(m[1])
        except ValueError:
            return None
        return m[1]

    def parse_int(self, s, key):
        """Parse ``<key>=<digits>`` from ``s`` into an int, or None when absent or unparsable."""
        m = re.search(rf'{re.escape(key)}=(\d+)', s)
        if not m:
            return None
        try:
            return int(m[1])
        except ValueError:  # absurdly long digit strings exceed int()'s conversion limit
            return None
if __name__ == '__main__':
    async def main():
        """Read ``pollconfig.ini``, start the bot, and run until it disconnects.

        Required: section ``[XMPP]`` with ``jid``, ``password`` and ``rooms``
        (comma-separated). Optional: section ``[Bot]`` with ``nickname``
        (default ``PollBot``) and ``store_path`` (default ``polls.json``).
        """
        logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
        c = configparser.ConfigParser()
        # ConfigParser.read() skips missing files silently; fail with a clear
        # message instead of a later KeyError on the [XMPP] section.
        if not c.read('pollconfig.ini'):
            raise SystemExit('pollconfig.ini not found or unreadable')
        xmpp_cfg = c['XMPP']
        # Skip empty entries so a trailing comma does not produce a join of "".
        rooms = [r.strip() for r in xmpp_cfg['rooms'].split(',') if r.strip()]
        loop = asyncio.get_running_loop()
        bot = PollBot(
            xmpp_cfg['jid'], xmpp_cfg['password'],
            rooms,
            c.get('Bot', 'nickname', fallback='PollBot'),
            c.get('Bot', 'store_path', fallback='polls.json'),
            loop=loop)
        bot.connect()
        logging.info("Poll bot connected...")
        await bot.disconnected

    asyncio.run(main())