From 0bd32f3aa4983baf960f6ab889662c738cdbaf27 Mon Sep 17 00:00:00 2001 From: just n Date: Fri, 16 Jan 2026 20:39:47 +0000 Subject: [PATCH] Add pollbot.py --- pollbot.py | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 pollbot.py diff --git a/pollbot.py b/pollbot.py new file mode 100644 index 0000000..670634c --- /dev/null +++ b/pollbot.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +import slixmpp +import asyncio +import logging +import configparser +import re +import json +from datetime import datetime, timedelta +from dataclasses import dataclass, field, asdict +from typing import Dict, List, Optional +from pathlib import Path + +EMOJIS = ['1️⃣','2️⃣','3️⃣','4️⃣','5️⃣','6️⃣','7️⃣','8️⃣','9️⃣','🔟'] + +@dataclass +class Poll: + id: str + room: str + creator: str + question: str + options: List[str] + votes: Dict[str, int] = field(default_factory=dict) + anon: bool = False + start: Optional[str] = None + end: Optional[str] = None + max_votes: Optional[int] = None + closed: bool = False + started: bool = True + +class PollBot(slixmpp.ClientXMPP): + def __init__(self, jid, password, rooms, nick, store_path='polls.json', loop=None): + super().__init__(jid, password, loop=loop) + self.rooms, self.nick, self.store_path = rooms, nick, Path(store_path) + self.polls: Dict[str, Poll] = {} + self.cnt = 0 + self.load_polls() + self.register_plugin('xep_0045') + self.register_plugin('xep_0444') + self.add_event_handler("session_start", self.start) + self.add_event_handler("message", self.on_msg) + self.add_event_handler("reactions", self.on_reaction) + + def load_polls(self): + try: + data = json.loads(self.store_path.read_text()) + self.polls = {k: Poll(**v) for k, v in data.get('polls', {}).items()} + self.cnt = data.get('cnt', 0) + except: pass + + def save_polls(self): + self.store_path.write_text(json.dumps({'polls': {k: asdict(v) for k, v in self.polls.items()}, 'cnt': self.cnt})) + + async def start(self, event): + self.send_presence() + self.get_roster() + for r in self.rooms: + self.plugin['xep_0045'].join_muc(r, self.nick) + asyncio.create_task(self.scheduler()) + + async def scheduler(self): + while True: + await asyncio.sleep(30) + now = datetime.now() + for pid, p in list(self.polls.items()): + if not p.started and p.start and now >= datetime.fromisoformat(p.start): + p.started = True + self.announce(p) + self.save_polls() + if not p.closed and p.end and now >= datetime.fromisoformat(p.end): + self.close(pid, auto=True) + + def on_msg(self, msg): + # CHANGED: Don't ignore groupchat, but ignore our own messages to prevent loops + if msg['type'] == 'groupchat' and msg['mucnick'] == self.nick: return + + body = msg['body'].strip() + if not body: return + + # Room logic: In groupchat, 'from'.bare is the room JID. In chat, it's the sender. + room = msg['from'].bare + + parts = body.split(None, 1) + cmd, args = parts[0].lower(), parts[1] if len(parts) > 1 else '' + + handlers = {'!poll': self.create_poll, '!vote': self.vote, '!close': self.close_cmd, + '!status': self.status, '!list': self.list_polls, '!help': self.help, '!delete': self.delete} + if cmd in handlers: + handlers[cmd](msg, args, room) + + def on_reaction(self, msg): + try: + reactions = msg['reactions']['values'] + sender = str(msg['from'].bare) + # Reaction parsing remains mostly the same, though reactions in MUC usually handle themselves + for p in self.polls.values(): + if p.closed or not p.started: continue + for emoji in reactions: + if emoji in EMOJIS: + idx = EMOJIS.index(emoji) + if idx < len(p.options): + p.votes[sender] = idx + self.save_polls() + # We don't reply to reactions to avoid spamming the chat flow + if p.max_votes and len(p.votes) >= p.max_votes: + self.close(p.id, auto=True) + return + except: pass + + def create_poll(self, msg, args, room): + parts = re.findall(r'"([^"]+)"', args) + if len(parts) < 3: + return self.reply(msg, '!poll "Question" "A" "B" [duration=1h/1d] [anon=true] [votes=N] [start=YYYY-MM-DDTHH:MM]') + q, opts = parts[0], parts[1:11] + anon = 'anon=true' in args.lower() + dur, start, max_v = self.parse_dur(args), self.parse_time(args, 'start'), self.parse_int(args, 'votes') + end = None + if dur: + if dur > timedelta(days=30): return self.reply(msg, 'Max duration: 30 days') + base = datetime.fromisoformat(start) if start else datetime.now() + end = (base + dur).isoformat() + self.cnt += 1 + pid = f"p{self.cnt}" + # For groupchat messages, msg['from'] includes the resource (nickname), + # so we strip it to get the user JID only if possible, or use the full from address. + # In standard MUCs without XEP-0084/others, getting the real JID is tricky if not moderator. + # We will use the string representation of 'from' for tracking ownership. + creator = str(msg['from']) + + p = Poll(pid, room, creator, q, opts, {}, anon, start, end, max_v, False, start is None) + self.polls[pid] = p + self.save_polls() + if p.started: self.announce(p) + else: self.reply(msg, f'Poll {pid} scheduled for {start}') + + def announce(self, p): + lines = [f"📊 Poll {p.id}: {p.question}"] + [f"{EMOJIS[i]} {o}" for i, o in enumerate(p.options)] + lines.append(f"Vote: !vote {p.id} <1-{len(p.options)}> or react with emoji") + if p.anon: lines.append("🔒 Anonymous voting (Results hidden until close)") + if p.end: lines.append(f"⏰ Closes: {p.end}") + if p.max_votes: lines.append(f"🎯 Closes at {p.max_votes} votes") + self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat') + + def vote(self, msg, args, room): + parts = args.split() + if len(parts) != 2: return self.reply(msg, '!vote ') + pid, choice = parts + if pid not in self.polls: return self.reply(msg, 'Poll not found') + p = self.polls[pid] + if p.closed: return self.reply(msg, 'Poll closed') + if not p.started: return self.reply(msg, 'Poll not started') + try: + idx = int(choice) - 1 + assert 0 <= idx < len(p.options) + except: return self.reply(msg, f'Pick 1-{len(p.options)}') + + voter = str(msg['from'].bare) # Use bare JID so it works for PMs and MUCs + changed = voter in p.votes + p.votes[voter] = idx + self.save_polls() + + # If anon, don't leak the choice in the public reply + if p.anon: + self.reply(msg, 'Vote recorded (Anonymous)') + else: + self.reply(msg, f'{"Changed to" if changed else "Voted"}: {p.options[idx]}') + + if p.max_votes and len(p.votes) >= p.max_votes: + self.close(pid, auto=True) + + def close_cmd(self, msg, args, room): + pid = args.strip() + if pid not in self.polls: return self.reply(msg, 'Not found') + p = self.polls[pid] + # In MUC, simple string comparison might fail if nick/resource varies. + # Simple auth check: + if str(msg['from']) != p.creator and str(msg['from'].bare) != p.creator: + return self.reply(msg, 'Only creator can close') + self.close(pid) + self.reply(msg, f'Closed {pid}') + + def close(self, pid, auto=False): + p = self.polls[pid] + if p.closed: return + p.closed = True + self.save_polls() + cnt = [sum(1 for v in p.votes.values() if v == i) for i in range(len(p.options))] + total = sum(cnt) + lines = [f"📊 Results - {p.id}: {p.question}"] + winner_idx = cnt.index(max(cnt)) if total else -1 + for i, o in enumerate(p.options): + pct = cnt[i]/total*100 if total else 0 + bar = '▓' * int(pct/5) + '░' * (20-int(pct/5)) + win = ' 🏆' if i == winner_idx and total else '' + lines.append(f"{EMOJIS[i]} {o}: {cnt[i]} ({pct:.0f}%) {bar}{win}") + lines.append(f"📈 Total: {total} vote{'s' if total!=1 else ''}{' (auto-closed)' if auto else ''}") + self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat') + + def status(self, msg, args, room): + pid = args.strip() + if pid not in self.polls: return self.reply(msg, 'Not found') + p = self.polls[pid] + info = [f"Poll {pid}: {p.question}", f"Status: {'closed' if p.closed else 'scheduled' if not p.started else 'open'}"] + if p.anon: + info.append(f"Votes: {len(p.votes)} (anonymous)") + else: + cnt = [sum(1 for v in p.votes.values() if v == i) for i in range(len(p.options))] + info += [f"{o}: {cnt[i]}" for i, o in enumerate(p.options)] + if p.end: info.append(f"Ends: {p.end}") + self.reply(msg, '\n'.join(info)) + + def list_polls(self, msg, args, room): + # Filter polls by the current room + active = [p for p in self.polls.values() if not p.closed and p.room == room] + if not active: return self.reply(msg, 'No active polls in this room') + lines = [f"{p.id}: {p.question[:50]}{'...' if len(p.question)>50 else ''} [{'scheduled' if not p.started else 'open'}]" for p in active] + self.reply(msg, '\n'.join(lines)) + + def delete(self, msg, args, room): + pid = args.strip() + if pid not in self.polls: return self.reply(msg, 'Not found') + if str(msg['from']) != self.polls[pid].creator: return self.reply(msg, 'Only creator can delete') + del self.polls[pid] + self.save_polls() + self.reply(msg, f'Deleted {pid}') + + def help(self, msg, args, room): + self.reply(msg, '''📊 Poll Bot Commands: +!poll "Question" "Opt1" "Opt2" [options] - Create poll + Options: duration=1h/1d anon=true votes=N start=YYYY-MM-DDTHH:MM +!vote - Cast vote (or react with 1️⃣2️⃣3️⃣ emoji) +!close - Close poll & show results +!status - Check poll status +!list - List active polls +!delete - Delete your poll''') + + def reply(self, msg, text): + # CHANGED: Detect message type to reply in context + mtype = msg['type'] + if mtype == 'groupchat': + # In groupchat, reply to the bare JID (the room) + self.send_message(mto=msg['from'].bare, mbody=text, mtype='groupchat') + else: + # In normal chat, reply to the sender + self.send_message(mto=msg['from'], mbody=text, mtype='chat') + + def parse_dur(self, s): + m = re.search(r'duration=(\d+)([mhd])', s.lower()) + if not m: return None + v, u = int(m[1]), m[2] + return {'m': timedelta(minutes=v), 'h': timedelta(hours=v), 'd': timedelta(days=v)}[u] + + def parse_time(self, s, key): + m = re.search(rf'{key}=(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}:\d{{2}})', s) + return m[1] if m else None + + def parse_int(self, s, key): + m = re.search(rf'{key}=(\d+)', s) + return int(m[1]) if m else None + +if __name__ == '__main__': + async def main(): + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') + c = configparser.ConfigParser() + c.read('pollconfig.ini') + + loop = asyncio.get_running_loop() + + bot = PollBot( + c['XMPP']['jid'], c['XMPP']['password'], + [r.strip() for r in c['XMPP']['rooms'].split(',')], + c.get('Bot', 'nickname', fallback='PollBot'), + c.get('Bot', 'store_path', fallback='polls.json'), + loop=loop) + + bot.connect() + logging.info("Poll bot connected...") + + await bot.disconnected + + asyncio.run(main()) +