#!/usr/bin/env python3
"""XMPP poll bot built on slixmpp.

The bot joins the configured multi-user chat rooms, listens for ``!``-prefixed
commands (``!poll``, ``!vote``, ``!close``, ``!status``, ``!list``,
``!delete``, ``!help``) and for XEP-0444 emoji reactions, and persists all
polls to a JSON file so they survive restarts.
"""
import asyncio
import configparser
import json
import logging
import re
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import slixmpp

logger = logging.getLogger(__name__)

# Reaction emoji for options 1..10; list position == zero-based option index.
EMOJIS = ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣', '6️⃣', '7️⃣', '8️⃣', '9️⃣', '🔟']

# Message types that may carry commands. Anything else (notably 'error'
# bounces, whose body can echo one of our own usage replies) is ignored so the
# bot can never end up answering itself in a loop.
COMMAND_MESSAGE_TYPES = ('chat', 'normal', 'groupchat')

POLL_USAGE = ('!poll "Question" "A" "B" [duration=1h/1d] [anon=true] '
              '[votes=N] [start=YYYY-MM-DDTHH:MM]')
VOTE_USAGE = '!vote <id> <choice>'
START_FORMAT_ERROR = 'Invalid start time, use start=YYYY-MM-DDTHH:MM'

HELP_TEXT = '\n'.join([
    '📊 Poll Bot Commands:',
    '!poll "Question" "Opt1" "Opt2" [options] - Create poll',
    '  Options: duration=1h/1d anon=true votes=N start=YYYY-MM-DDTHH:MM',
    '!vote <id> <choice> - Cast vote (or react with 1️⃣2️⃣3️⃣ emoji)',
    '!close <id> - Close poll & show results',
    '!status <id> - Check poll status',
    '!list - List active polls',
    '!delete <id> - Delete your poll',
])


@dataclass
class Poll:
    """A single poll and its current state, as persisted to the JSON store."""

    id: str  # "p<N>", N taken from the bot's running counter
    room: str  # bare JID the poll was created from; results are sent here
    creator: str  # string form of the creating message's 'from' address
    question: str
    options: List[str]
    votes: Dict[str, int] = field(default_factory=dict)  # voter id -> 0-based option index
    anon: bool = False  # hide per-option counts in !status
    start: Optional[str] = None  # ISO timestamp of a scheduled start, if any
    end: Optional[str] = None  # ISO timestamp at which the poll auto-closes
    max_votes: Optional[int] = None  # auto-close once this many voters have voted
    closed: bool = False
    started: bool = True  # False while waiting for a scheduled start


class PollBot(slixmpp.ClientXMPP):
    """XMPP client that runs polls in chats and multi-user chat rooms."""

    def __init__(self, jid, password, rooms, nick, store_path='polls.json', loop=None):
        """Set up plugins, event handlers and load persisted polls.

        Args:
            jid: JID the bot logs in as.
            password: Password for ``jid``.
            rooms: Iterable of room JIDs to join on session start.
            nick: Nickname used inside the rooms.
            store_path: Path of the JSON file polls are persisted to.
            loop: Event loop, forwarded to ``slixmpp.ClientXMPP``.
        """
        super().__init__(jid, password, loop=loop)
        self.rooms, self.nick, self.store_path = rooms, nick, Path(store_path)
        self.polls: Dict[str, Poll] = {}
        self.cnt = 0
        # Keep a reference to the scheduler task: asyncio only holds weak
        # references to tasks, and this also lets start() avoid spawning a
        # second scheduler when the session is re-established.
        self._scheduler_task: Optional[asyncio.Task] = None
        self.load_polls()
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0444')
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.on_msg)
        self.add_event_handler("reactions", self.on_reaction)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def load_polls(self):
        """Load polls and the id counter from ``store_path``.

        A missing file is normal on first run. An unreadable or malformed
        file is logged and the bot starts with no polls rather than crashing.
        """
        try:
            data = json.loads(self.store_path.read_text(encoding='utf-8'))
            polls = {k: Poll(**v) for k, v in data.get('polls', {}).items()}
            cnt = data.get('cnt', 0)
        except FileNotFoundError:
            return
        except (OSError, ValueError, TypeError, AttributeError):
            # ValueError covers json.JSONDecodeError; TypeError covers records
            # whose keys do not match the Poll fields.
            logger.exception("Could not load polls from %s; starting empty",
                             self.store_path)
            return
        self.polls, self.cnt = polls, cnt

    def save_polls(self):
        """Write all polls and the id counter to ``store_path``.

        The data is written to a sibling temp file and renamed over the
        target so an interrupted write cannot leave a truncated store.
        """
        payload = json.dumps({
            'polls': {k: asdict(v) for k, v in self.polls.items()},
            'cnt': self.cnt,
        })
        tmp_path = self.store_path.with_name(self.store_path.name + '.tmp')
        tmp_path.write_text(payload, encoding='utf-8')
        tmp_path.replace(self.store_path)

    # ------------------------------------------------------------------
    # Session / scheduling
    # ------------------------------------------------------------------

    async def start(self, event):
        """Handle ``session_start``: go online, join rooms, start the scheduler."""
        self.send_presence()
        self.get_roster()
        for r in self.rooms:
            self.plugin['xep_0045'].join_muc(r, self.nick)
        if self._scheduler_task is None or self._scheduler_task.done():
            self._scheduler_task = asyncio.create_task(self.scheduler())

    async def scheduler(self):
        """Every 30 seconds, start scheduled polls and close expired ones."""
        while True:
            await asyncio.sleep(30)
            self._run_due_transitions(datetime.now())

    def _run_due_transitions(self, now):
        """Apply start/close transitions that are due at ``now``."""
        for pid, p in list(self.polls.items()):
            # Guard each poll separately so one bad record (e.g. an
            # unparsable timestamp) cannot stop the others or kill the task.
            try:
                if not p.started and p.start and now >= datetime.fromisoformat(p.start):
                    p.started = True
                    self.announce(p)
                    self.save_polls()
                if not p.closed and p.end and now >= datetime.fromisoformat(p.end):
                    self.close(pid, auto=True)
            except Exception:
                logger.exception("Scheduler failed while processing poll %s", pid)

    # ------------------------------------------------------------------
    # Incoming stanzas
    # ------------------------------------------------------------------

    def on_msg(self, msg):
        """Dispatch an incoming message to the matching ``!command`` handler."""
        if msg['type'] not in COMMAND_MESSAGE_TYPES:
            return
        # Ignore our own groupchat messages to prevent loops.
        if msg['type'] == 'groupchat' and msg['mucnick'] == self.nick:
            return
        body = msg['body'].strip()
        if not body:
            return
        # In groupchat, 'from'.bare is the room JID. In chat, it's the sender.
        room = msg['from'].bare
        parts = body.split(None, 1)
        cmd, args = parts[0].lower(), parts[1] if len(parts) > 1 else ''
        handlers = {
            '!poll': self.create_poll,
            '!vote': self.vote,
            '!close': self.close_cmd,
            '!status': self.status,
            '!list': self.list_polls,
            '!help': self.help,
            '!delete': self.delete,
        }
        if cmd in handlers:
            handlers[cmd](msg, args, room)

    def on_reaction(self, msg):
        """Record a vote from a number-emoji reaction.

        The vote goes to the first open poll of the room the reaction came
        from that has an option for the chosen number. No reply is sent, to
        avoid spamming the chat.
        """
        try:
            reactions = msg['reactions']['values']
            # Sort so the outcome does not depend on set iteration order when
            # several number emoji are present; the lowest number wins.
            picks = sorted(EMOJIS.index(e) for e in reactions if e in EMOJIS)
            if not picks:
                return
            idx = picks[0]
            room = msg['from'].bare
            voter = self._sender_id(msg)
            for p in self.polls.values():
                if p.closed or not p.started or p.room != room:
                    continue
                if idx < len(p.options):
                    p.votes[voter] = idx
                    self.save_polls()
                    if p.max_votes and len(p.votes) >= p.max_votes:
                        self.close(p.id, auto=True)
                    return
        except Exception:
            # Reactions are best-effort; never let a malformed one propagate.
            logger.exception("Failed to process reaction")

    def _sender_id(self, msg):
        """Return the identity under which ``msg``'s sender votes.

        Inside a room the bare JID is the room itself, shared by every
        occupant, so the full ``room/nick`` address is used there. For
        direct chats the bare JID is used so a change of resource does not
        create a second voter.
        """
        sender = msg['from']
        if msg['type'] == 'groupchat' or sender.bare in self.rooms:
            return str(sender)
        return str(sender.bare)

    def _is_creator(self, msg, p):
        """Return True if ``msg`` comes from the address that created ``p``."""
        return p.creator in (str(msg['from']), str(msg['from'].bare))

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def create_poll(self, msg, args, room):
        """Handle ``!poll``: parse the arguments, store and announce the poll."""
        parts = re.findall(r'"([^"]+)"', args)
        if len(parts) < 3:
            return self.reply(msg, POLL_USAGE)
        q, opts = parts[0], parts[1:11]  # at most len(EMOJIS) options

        # Look for flags only outside the quoted strings, so text such as
        # "anon=true" inside the question is not mistaken for a flag.
        flags = re.sub(r'"[^"]*"', ' ', args)
        anon = 'anon=true' in flags.lower()
        dur = self.parse_dur(flags)
        start = self.parse_time(flags, 'start')
        max_v = self.parse_int(flags, 'votes')

        start_at = None
        if start:
            # The regex only checks the shape; reject impossible dates here
            # instead of failing later in the scheduler.
            try:
                start_at = datetime.fromisoformat(start)
            except ValueError:
                return self.reply(msg, START_FORMAT_ERROR)

        end = None
        if dur:
            if dur > timedelta(days=30):
                return self.reply(msg, 'Max duration: 30 days')
            try:
                end = ((start_at or datetime.now()) + dur).isoformat()
            except OverflowError:
                return self.reply(msg, START_FORMAT_ERROR)

        self.cnt += 1
        pid = f"p{self.cnt}"
        # In a room the real JID is usually not visible, so ownership is
        # tracked by the string form of the full 'from' address.
        creator = str(msg['from'])
        p = Poll(pid, room, creator, q, opts, {}, anon, start, end, max_v, False, start is None)
        self.polls[pid] = p
        self.save_polls()
        if p.started:
            self.announce(p)
        else:
            self.reply(msg, f'Poll {pid} scheduled for {start}')

    def announce(self, p):
        """Send the poll question, options and voting hints to the poll's room."""
        lines = [f"📊 Poll {p.id}: {p.question}"]
        lines += [f"{EMOJIS[i]} {o}" for i, o in enumerate(p.options)]
        lines.append(f"Vote: !vote {p.id} <1-{len(p.options)}> or react with emoji")
        if p.anon:
            lines.append("🔒 Anonymous voting (Results hidden until close)")
        if p.end:
            lines.append(f"⏰ Closes: {p.end}")
        if p.max_votes:
            lines.append(f"🎯 Closes at {p.max_votes} votes")
        self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat')

    def vote(self, msg, args, room):
        """Handle ``!vote <id> <choice>``; a repeated vote replaces the earlier one."""
        parts = args.split()
        if len(parts) != 2:
            return self.reply(msg, VOTE_USAGE)
        pid, choice = parts
        if pid not in self.polls:
            return self.reply(msg, 'Poll not found')
        p = self.polls[pid]
        if p.closed:
            return self.reply(msg, 'Poll closed')
        if not p.started:
            return self.reply(msg, 'Poll not started')
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if not 0 <= idx < len(p.options):
            return self.reply(msg, f'Pick 1-{len(p.options)}')

        voter = self._sender_id(msg)
        changed = voter in p.votes
        p.votes[voter] = idx
        self.save_polls()
        # If anon, don't leak the choice in the public reply.
        if p.anon:
            self.reply(msg, 'Vote recorded (Anonymous)')
        else:
            self.reply(msg, f'{"Changed to" if changed else "Voted"}: {p.options[idx]}')
        if p.max_votes and len(p.votes) >= p.max_votes:
            self.close(pid, auto=True)

    def close_cmd(self, msg, args, room):
        """Handle ``!close <id>``; only the poll's creator may close it."""
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        p = self.polls[pid]
        if not self._is_creator(msg, p):
            return self.reply(msg, 'Only creator can close')
        self.close(pid)
        self.reply(msg, f'Closed {pid}')

    def close(self, pid, auto=False):
        """Mark poll ``pid`` closed and post the results to its room.

        Does nothing if the poll is already closed. ``auto`` only adds an
        "(auto-closed)" note to the results.
        """
        p = self.polls[pid]
        if p.closed:
            return
        p.closed = True
        self.save_polls()

        tally = Counter(p.votes.values())
        cnt = [tally[i] for i in range(len(p.options))]
        total = sum(cnt)
        top = max(cnt) if total else 0
        lines = [f"📊 Results - {p.id}: {p.question}"]
        for i, o in enumerate(p.options):
            pct = cnt[i] / total * 100 if total else 0
            filled = int(pct / 5)  # 20-cell bar, 5% per cell
            bar = '▓' * filled + '░' * (20 - filled)
            # Every option tied for the lead gets the trophy.
            win = ' 🏆' if total and cnt[i] == top else ''
            lines.append(f"{EMOJIS[i]} {o}: {cnt[i]} ({pct:.0f}%) {bar}{win}")
        lines.append(f"📈 Total: {total} vote{'s' if total != 1 else ''}"
                     f"{' (auto-closed)' if auto else ''}")
        self.send_message(mto=p.room, mbody='\n'.join(lines), mtype='groupchat')

    def status(self, msg, args, room):
        """Handle ``!status <id>``: reply with the poll's state and vote counts."""
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        p = self.polls[pid]
        state = 'closed' if p.closed else 'scheduled' if not p.started else 'open'
        info = [f"Poll {pid}: {p.question}", f"Status: {state}"]
        if p.anon:
            info.append(f"Votes: {len(p.votes)} (anonymous)")
        else:
            tally = Counter(p.votes.values())
            info += [f"{o}: {tally[i]}" for i, o in enumerate(p.options)]
        if p.end:
            info.append(f"Ends: {p.end}")
        self.reply(msg, '\n'.join(info))

    def list_polls(self, msg, args, room):
        """Handle ``!list``: reply with the non-closed polls of the current room."""
        active = [p for p in self.polls.values() if not p.closed and p.room == room]
        if not active:
            return self.reply(msg, 'No active polls in this room')
        lines = [
            f"{p.id}: {p.question[:50]}{'...' if len(p.question) > 50 else ''} "
            f"[{'scheduled' if not p.started else 'open'}]"
            for p in active
        ]
        self.reply(msg, '\n'.join(lines))

    def delete(self, msg, args, room):
        """Handle ``!delete <id>``; only the poll's creator may delete it."""
        pid = args.strip()
        if pid not in self.polls:
            return self.reply(msg, 'Not found')
        if not self._is_creator(msg, self.polls[pid]):
            return self.reply(msg, 'Only creator can delete')
        del self.polls[pid]
        self.save_polls()
        self.reply(msg, f'Deleted {pid}')

    def help(self, msg, args, room):
        """Handle ``!help``: reply with the command overview."""
        self.reply(msg, HELP_TEXT)

    def reply(self, msg, text):
        """Answer ``msg`` in context: to the room for groupchat, else to the sender."""
        if msg['type'] == 'groupchat':
            # In groupchat, reply to the bare JID (the room).
            self.send_message(mto=msg['from'].bare, mbody=text, mtype='groupchat')
        else:
            self.send_message(mto=msg['from'], mbody=text, mtype='chat')

    # ------------------------------------------------------------------
    # Argument parsing
    # ------------------------------------------------------------------

    def parse_dur(self, s):
        """Parse ``duration=<N><m|h|d>`` from ``s``.

        Returns:
            The duration as a ``timedelta``, ``None`` if no duration flag is
            present, or ``timedelta.max`` if the number is too large to
            represent (so callers' upper-bound checks reject it).
        """
        m = re.search(r'\bduration=(\d+)([mhd])', s.lower())
        if not m:
            return None
        unit = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[m[2]]
        try:
            return timedelta(**{unit: int(m[1])})
        except OverflowError:
            return timedelta.max

    def parse_time(self, s, key):
        """Return the ``YYYY-MM-DDTHH:MM`` text of ``key=...`` in ``s``, or None.

        Only the shape is checked; the caller must validate the date itself.
        """
        m = re.search(rf'\b{re.escape(key)}=(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}:\d{{2}})', s)
        return m[1] if m else None

    def parse_int(self, s, key):
        """Return the integer value of ``key=<digits>`` in ``s``, or None."""
        m = re.search(rf'\b{re.escape(key)}=(\d+)', s)
        return int(m[1]) if m else None


if __name__ == '__main__':
    async def main():
        """Read ``pollconfig.ini``, connect the bot and run until disconnected."""
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)s: %(message)s')
        c = configparser.ConfigParser()
        # ConfigParser.read() silently skips missing files; fail with a clear
        # message instead of a KeyError on the first lookup.
        if not c.read('pollconfig.ini') or 'XMPP' not in c:
            raise SystemExit("pollconfig.ini is missing or has no [XMPP] section")
        loop = asyncio.get_running_loop()
        bot = PollBot(
            c['XMPP']['jid'],
            c['XMPP']['password'],
            [r.strip() for r in c['XMPP']['rooms'].split(',') if r.strip()],
            c.get('Bot', 'nickname', fallback='PollBot'),
            c.get('Bot', 'store_path', fallback='polls.json'),
            loop=loop)
        bot.connect()
        logging.info("Poll bot connected...")
        await bot.disconnected

    asyncio.run(main())