#!/usr/bin/env python3
import slixmpp
|
||
import asyncio
|
||
import logging
|
||
import configparser
|
||
import re
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from dataclasses import dataclass, field, asdict
|
||
from typing import Dict, List, Optional
|
||
from pathlib import Path
|
||
|
||
# Keycap emoji used as option labels: EMOJIS[i] is printed next to option i + 1
# in announcements and results, and a reaction with EMOJIS[i] counts as a vote
# for that option. The list has ten entries; create_poll keeps at most ten options.
EMOJIS = ['1️⃣','2️⃣','3️⃣','4️⃣','5️⃣','6️⃣','7️⃣','8️⃣','9️⃣','🔟']
|
||
|
||
@dataclass
class Poll:
    """A single poll and its current state.

    Instances are round-tripped through JSON with ``asdict(poll)`` and
    ``Poll(**data)``, so every field holds a JSON-compatible value and the
    field order must stay stable for positional construction.
    """

    # Poll identifier, e.g. "p1"; also the key in the bot's poll table.
    id: str
    # Bare JID the poll was created from; announcements and results go there.
    room: str
    # Identity string of the sender who created the poll.
    creator: str
    # Question text shown in announcements and results.
    question: str
    # Answer options, in display order.
    options: List[str]
    # Maps a voter identity string to the 0-based index of the chosen option.
    votes: Dict[str, int] = field(default_factory=dict)
    # When True, per-option counts are withheld from status output.
    anon: bool = False
    # ISO-8601 start time for a scheduled poll, or None for an immediate one.
    start: Optional[str] = None
    # ISO-8601 time at which the poll is closed automatically, or None.
    end: Optional[str] = None
    # Close automatically once this many votes are recorded, or None.
    max_votes: Optional[int] = None
    # True once results have been published.
    closed: bool = False
    # False while a scheduled poll is still waiting for its start time.
    started: bool = True
|
||
|
||
class PollBot(slixmpp.ClientXMPP):
    """XMPP bot that runs multiple-choice polls.

    Commands (``!poll``, ``!vote``, ``!close``, ``!status``, ``!list``,
    ``!help``, ``!delete``) are read from incoming messages, and votes can
    also be cast with keycap-emoji reactions. Polls live in ``self.polls``
    and are written to a JSON file after every change.

    Identity: a sender is identified by ``room@server/nick`` when the
    message comes from one of the configured rooms and by the bare JID
    otherwise (see ``_sender_id``).
    NOTE(review): nick-based identity means a participant who changes
    nickname can vote again; real JIDs are not read here.
    """

    _log = logging.getLogger(__name__)

    _USAGE = '!poll "Question" "A" "B" [duration=1h/1d] [anon=true] [votes=N] [start=YYYY-MM-DDTHH:MM]'

    def __init__(self, jid, password, rooms, nick, store_path='polls.json', loop=None):
        """Set up state, load stored polls and register plugins/handlers.

        Args:
            jid: Account JID used to log in.
            password: Account password.
            rooms: Room JIDs (strings) to join on session start.
            nick: Nickname used in the rooms; also used to ignore our own
                groupchat messages.
            store_path: Path of the JSON file holding polls and the counter.
            loop: Passed through to the slixmpp base class.
        """
        super().__init__(jid, password, loop=loop)
        self.rooms = rooms
        self.nick = nick
        self.store_path = Path(store_path)
        self.polls: Dict[str, Poll] = {}
        self.cnt = 0
        # Reference to the background scheduler task (see start()).
        self._scheduler_task = None
        self.load_polls()
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0444')
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.on_msg)
        self.add_event_handler("reactions", self.on_reaction)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def load_polls(self):
        """Load polls and the id counter from ``self.store_path``.

        A missing file is the normal first-run case and is ignored. An
        unreadable or malformed file is logged and leaves the current
        (empty) state untouched instead of being silently swallowed.
        """
        try:
            data = json.loads(self.store_path.read_text(encoding='utf-8'))
        except FileNotFoundError:
            return
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and decoding errors.
            self._log.warning("Could not read poll store %s: %s", self.store_path, exc)
            return
        try:
            polls = {k: Poll(**v) for k, v in data.get('polls', {}).items()}
            cnt = int(data.get('cnt', 0))
        except (AttributeError, TypeError, ValueError) as exc:
            self._log.warning("Poll store %s has an unexpected layout: %s", self.store_path, exc)
            return
        self.polls, self.cnt = polls, cnt

    def save_polls(self):
        """Write all polls and the id counter to ``self.store_path``.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the target, so an interrupted write cannot truncate the store.
        """
        payload = json.dumps({
            'polls': {pid: asdict(poll) for pid, poll in self.polls.items()},
            'cnt': self.cnt,
        })
        tmp_path = self.store_path.with_name(self.store_path.name + '.tmp')
        tmp_path.write_text(payload, encoding='utf-8')
        tmp_path.replace(self.store_path)

    # ------------------------------------------------------------------
    # Session / background work
    # ------------------------------------------------------------------

    async def start(self, event):
        """Handle ``session_start``: announce presence, join rooms, start scheduler."""
        self.send_presence()
        self.get_roster()
        for room in self.rooms:
            self.plugin['xep_0045'].join_muc(room, self.nick)
        # Keep a reference to the task and never run two schedulers at once.
        # NOTE(review): presumably session_start fires again after a
        # reconnect; without this guard each one would add another loop.
        if self._scheduler_task is None or self._scheduler_task.done():
            self._scheduler_task = asyncio.create_task(self.scheduler())

    async def scheduler(self):
        """Every 30 seconds, start due scheduled polls and close expired ones."""
        while True:
            await asyncio.sleep(30)
            now = datetime.now()
            # Iterate over a snapshot: handlers may add/delete polls meanwhile.
            for pid, poll in list(self.polls.items()):
                try:
                    self._run_schedule(pid, poll, now)
                except Exception:
                    # One bad poll (e.g. a corrupt timestamp in the store)
                    # must not kill the loop for all the others.
                    self._log.exception("Scheduler failed for poll %s", pid)

    def _run_schedule(self, pid, poll, now):
        """Apply start/end deadlines of one poll for the time ``now``."""
        if (not poll.closed and not poll.started and poll.start
                and now >= datetime.fromisoformat(poll.start)):
            poll.started = True
            self.announce(poll)
            self.save_polls()
        if not poll.closed and poll.end and now >= datetime.fromisoformat(poll.end):
            self.close(pid, auto=True)

    # ------------------------------------------------------------------
    # Incoming stanzas
    # ------------------------------------------------------------------

    def on_msg(self, msg):
        """Dispatch a ``!command`` found at the start of a message body."""
        # Never answer error bounces or other non-chat stanzas.
        if msg['type'] not in ('chat', 'normal', 'groupchat'):
            return
        # Ignore our own groupchat messages to prevent loops.
        if msg['type'] == 'groupchat' and msg['mucnick'] == self.nick:
            return

        body = msg['body'].strip()
        if not body:
            return

        # In groupchat the bare 'from' is the room JID; in 1:1 chat it is
        # the sender's bare JID.
        room = str(msg['from'].bare)

        parts = body.split(None, 1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ''

        handlers = {
            '!poll': self.create_poll,
            '!vote': self.vote,
            '!close': self.close_cmd,
            '!status': self.status,
            '!list': self.list_polls,
            '!help': self.help,
            '!delete': self.delete,
        }
        handler = handlers.get(cmd)
        if handler is not None:
            handler(msg, args, room)

    def on_reaction(self, msg):
        """Record a keycap-emoji reaction as a vote.

        The reaction is not matched against a specific announcement
        message; it is applied to the first open poll *from the same room
        or chat* for which the emoji maps to a valid option. No reply is
        sent, to avoid spamming the chat.
        """
        try:
            reactions = msg['reactions']['values']
            origin = str(msg['from'].bare)
            voter = self._sender_id(msg)
            for poll in self.polls.values():
                if poll.closed or not poll.started or poll.room != origin:
                    continue
                for emoji in reactions:
                    if emoji not in EMOJIS:
                        continue
                    idx = EMOJIS.index(emoji)
                    if idx >= len(poll.options):
                        continue
                    poll.votes[voter] = idx
                    self.save_polls()
                    if poll.max_votes and len(poll.votes) >= poll.max_votes:
                        self.close(poll.id, auto=True)
                    return
        except Exception:
            # Reactions are best-effort, but failures should be visible.
            self._log.exception("Failed to process reaction")

    # ------------------------------------------------------------------
    # Identity helpers
    # ------------------------------------------------------------------

    def _sender_id(self, msg):
        """Return the string that identifies the sender of ``msg``.

        For stanzas coming from a configured room (groupchat or a private
        message relayed by the room) the bare JID is the room itself, so
        the full ``room/nick`` address is used. For direct chats the bare
        JID is used so that a changing resource does not change identity.
        """
        sender = msg['from']
        if msg['type'] == 'groupchat' or str(sender.bare) in self.rooms:
            return str(sender)
        return str(sender.bare)

    def _is_creator(self, msg, poll):
        """Return True if ``msg`` comes from the creator of ``poll``.

        The full address is accepted as well so that polls stored with a
        full-JID creator keep working.
        """
        return poll.creator in (str(msg['from']), self._sender_id(msg))

    def _send_to_poll_room(self, poll, text):
        """Send ``text`` to where the poll lives, with a matching message type."""
        mtype = 'groupchat' if poll.room in self.rooms else 'chat'
        self.send_message(mto=poll.room, mbody=text, mtype=mtype)

    @staticmethod
    def _tally(poll):
        """Return the number of votes per option, in option order."""
        choices = list(poll.votes.values())
        return [choices.count(i) for i in range(len(poll.options))]

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def create_poll(self, msg, args, room):
        """Handle ``!poll "Question" "A" "B" [flags]``.

        The first quoted string is the question, the next up to ten are
        the options. Flags (``duration=``, ``anon=true``, ``votes=``,
        ``start=``) are only read from the text outside the quotes.
        """
        quoted = re.findall(r'"([^"]+)"', args)
        if len(quoted) < 3:
            return self.reply(msg, self._USAGE)
        question, options = quoted[0], quoted[1:11]

        # Drop quoted text so that a question like "Is anon=true ok?" does
        # not switch flags on.
        flags = re.sub(r'"[^"]*"', ' ', args)
        anon = 'anon=true' in flags.lower()
        dur = self.parse_dur(flags)
        start = self.parse_time(flags, 'start')
        max_votes = self.parse_int(flags, 'votes')

        if start is None and 'start=' in flags:
            return self.reply(msg, 'Invalid start time (use start=YYYY-MM-DDTHH:MM)')

        end = None
        if dur:
            if dur > timedelta(days=30):
                return self.reply(msg, 'Max duration: 30 days')
            base = datetime.fromisoformat(start) if start else datetime.now()
            try:
                end = (base + dur).isoformat()
            except OverflowError:
                return self.reply(msg, 'Invalid start time (use start=YYYY-MM-DDTHH:MM)')

        self.cnt += 1
        pid = f"p{self.cnt}"
        poll = Poll(
            id=pid,
            room=room,
            creator=self._sender_id(msg),
            question=question,
            options=options,
            votes={},
            anon=anon,
            start=start,
            end=end,
            max_votes=max_votes,
            closed=False,
            started=start is None,  # polls with a start time wait for the scheduler
        )
        self.polls[pid] = poll
        self.save_polls()
        if poll.started:
            self.announce(poll)
        else:
            self.reply(msg, f'Poll {pid} scheduled for {start}')

    def announce(self, p):
        """Post the poll question, options and voting instructions."""
        lines = [f"📊 Poll {p.id}: {p.question}"]
        lines += [f"{EMOJIS[i]} {o}" for i, o in enumerate(p.options)]
        lines.append(f"Vote: !vote {p.id} <1-{len(p.options)}> or react with emoji")
        if p.anon:
            lines.append("🔒 Anonymous voting (Results hidden until close)")
        if p.end:
            lines.append(f"⏰ Closes: {p.end}")
        if p.max_votes:
            lines.append(f"🎯 Closes at {p.max_votes} votes")
        self._send_to_poll_room(p, '\n'.join(lines))

    def vote(self, msg, args, room):
        """Handle ``!vote <poll_id> <number>``; a repeat vote replaces the old one."""
        parts = args.split()
        if len(parts) != 2:
            return self.reply(msg, '!vote <poll_id> <number>')
        pid, choice = parts
        poll = self.polls.get(pid)
        if poll is None:
            return self.reply(msg, 'Poll not found')
        if poll.closed:
            return self.reply(msg, 'Poll closed')
        if not poll.started:
            return self.reply(msg, 'Poll not started')

        # Explicit range check: an assert would vanish under ``python -O``.
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if not 0 <= idx < len(poll.options):
            return self.reply(msg, f'Pick 1-{len(poll.options)}')

        voter = self._sender_id(msg)
        changed = voter in poll.votes
        poll.votes[voter] = idx
        self.save_polls()

        # If anon, don't leak the choice in the public reply.
        if poll.anon:
            self.reply(msg, 'Vote recorded (Anonymous)')
        else:
            self.reply(msg, f'{"Changed to" if changed else "Voted"}: {poll.options[idx]}')

        if poll.max_votes and len(poll.votes) >= poll.max_votes:
            self.close(pid, auto=True)

    def close_cmd(self, msg, args, room):
        """Handle ``!close <id>``; only the poll's creator may close it."""
        pid = args.strip()
        poll = self.polls.get(pid)
        if poll is None:
            return self.reply(msg, 'Not found')
        if not self._is_creator(msg, poll):
            return self.reply(msg, 'Only creator can close')
        self.close(pid)
        self.reply(msg, f'Closed {pid}')

    def close(self, pid, auto=False):
        """Mark poll ``pid`` closed and publish its results.

        Does nothing if the poll is already closed. ``auto`` only adds an
        "(auto-closed)" note to the result message.

        Raises:
            KeyError: if ``pid`` is not a known poll.
        """
        p = self.polls[pid]
        if p.closed:
            return
        p.closed = True
        self.save_polls()

        counts = self._tally(p)
        total = sum(counts)
        best = max(counts) if total else 0
        lines = [f"📊 Results - {p.id}: {p.question}"]
        for i, option in enumerate(p.options):
            pct = counts[i] / total * 100 if total else 0
            filled = int(pct / 5)  # 20-cell bar, 5% per cell
            bar = '▓' * filled + '░' * (20 - filled)
            # Every option sharing the top count is marked, so ties are visible.
            win = ' 🏆' if total and counts[i] == best else ''
            lines.append(f"{EMOJIS[i]} {option}: {counts[i]} ({pct:.0f}%) {bar}{win}")
        lines.append(f"📈 Total: {total} vote{'s' if total!=1 else ''}{' (auto-closed)' if auto else ''}")
        self._send_to_poll_room(p, '\n'.join(lines))

    def status(self, msg, args, room):
        """Handle ``!status <id>``: report state and (if allowed) counts."""
        pid = args.strip()
        poll = self.polls.get(pid)
        if poll is None:
            return self.reply(msg, 'Not found')
        if poll.closed:
            state = 'closed'
        elif not poll.started:
            state = 'scheduled'
        else:
            state = 'open'
        info = [f"Poll {pid}: {poll.question}", f"Status: {state}"]
        # Anonymous polls hide the breakdown only until they are closed;
        # the close message publishes the counts anyway.
        if poll.anon and not poll.closed:
            info.append(f"Votes: {len(poll.votes)} (anonymous)")
        else:
            counts = self._tally(poll)
            info += [f"{o}: {counts[i]}" for i, o in enumerate(poll.options)]
        if poll.end:
            info.append(f"Ends: {poll.end}")
        self.reply(msg, '\n'.join(info))

    def list_polls(self, msg, args, room):
        """Handle ``!list``: show the not-yet-closed polls of this room/chat."""
        active = [p for p in self.polls.values() if not p.closed and p.room == room]
        if not active:
            return self.reply(msg, 'No active polls in this room')
        lines = []
        for p in active:
            title = p.question[:50] + ('...' if len(p.question) > 50 else '')
            state = 'scheduled' if not p.started else 'open'
            lines.append(f"{p.id}: {title} [{state}]")
        self.reply(msg, '\n'.join(lines))

    def delete(self, msg, args, room):
        """Handle ``!delete <id>``; only the poll's creator may delete it."""
        pid = args.strip()
        poll = self.polls.get(pid)
        if poll is None:
            return self.reply(msg, 'Not found')
        # Same ownership rule as !close.
        if not self._is_creator(msg, poll):
            return self.reply(msg, 'Only creator can delete')
        del self.polls[pid]
        self.save_polls()
        self.reply(msg, f'Deleted {pid}')

    def help(self, msg, args, room):
        """Handle ``!help``: reply with the command overview."""
        self.reply(msg, '\n'.join([
            '📊 Poll Bot Commands:',
            '!poll "Question" "Opt1" "Opt2" [options] - Create poll',
            'Options: duration=1h/1d anon=true votes=N start=YYYY-MM-DDTHH:MM',
            '!vote <id> <num> - Cast vote (or react with 1️⃣2️⃣3️⃣ emoji)',
            '!close <id> - Close poll & show results',
            '!status <id> - Check poll status',
            '!list - List active polls',
            '!delete <id> - Delete your poll',
        ]))

    def reply(self, msg, text):
        """Answer ``msg`` in its own context (room for groupchat, sender otherwise)."""
        if msg['type'] == 'groupchat':
            # In groupchat, reply to the bare JID (the room).
            self.send_message(mto=msg['from'].bare, mbody=text, mtype='groupchat')
        else:
            # In normal chat, reply to the sender.
            self.send_message(mto=msg['from'], mbody=text, mtype='chat')

    # ------------------------------------------------------------------
    # Flag parsing
    # ------------------------------------------------------------------

    def parse_dur(self, s):
        """Parse ``duration=<n><m|h|d>`` from ``s`` (case-insensitive).

        Returns:
            A ``timedelta``, or None when no duration flag is present.
            Values too large for ``timedelta`` yield ``timedelta.max`` so
            that the caller's upper-bound check rejects them.
        """
        m = re.search(r'duration=(\d+)([mhd])', s.lower())
        if not m:
            return None
        unit = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[m[2]]
        try:
            # Only the requested unit is built, so a large minute count is
            # not rejected just because it would overflow as days.
            return timedelta(**{unit: int(m[1])})
        except (OverflowError, ValueError):
            return timedelta.max

    def parse_time(self, s, key):
        """Parse ``<key>=YYYY-MM-DDTHH:MM`` from ``s``.

        Returns:
            The timestamp string, or None when the flag is absent or is
            not a real date/time (e.g. month 13), so stored values are
            always accepted by ``datetime.fromisoformat``.
        """
        m = re.search(rf'{re.escape(key)}=(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}:\d{{2}})', s)
        if not m:
            return None
        try:
            datetime.fromisoformat(m[1])
        except ValueError:
            return None
        return m[1]

    def parse_int(self, s, key):
        """Parse ``<key>=<digits>`` from ``s``; return the int or None."""
        m = re.search(rf'{re.escape(key)}=(\d+)', s)
        if not m:
            return None
        try:
            return int(m[1])
        except ValueError:
            # Absurdly long digit strings can exceed the interpreter's
            # int-conversion limit; treat them as "not given".
            return None
|
||
|
||
if __name__ == '__main__':
    async def main():
        """Read ``pollconfig.ini``, start the bot and wait until it disconnects.

        Expected config: section ``XMPP`` with ``jid``, ``password`` and a
        comma-separated ``rooms`` list; optional section ``Bot`` with
        ``nickname`` (default ``PollBot``) and ``store_path`` (default
        ``polls.json``).
        """
        logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
        c = configparser.ConfigParser()
        # ConfigParser.read() skips unreadable files silently and returns the
        # list of files it parsed; fail clearly instead of with a KeyError.
        if not c.read('pollconfig.ini'):
            raise SystemExit("Cannot read pollconfig.ini")

        loop = asyncio.get_running_loop()

        # Drop empty entries such as the one produced by a trailing comma.
        rooms = [r.strip() for r in c['XMPP']['rooms'].split(',') if r.strip()]

        bot = PollBot(
            c['XMPP']['jid'], c['XMPP']['password'],
            rooms,
            c.get('Bot', 'nickname', fallback='PollBot'),
            c.get('Bot', 'store_path', fallback='polls.json'),
            loop=loop)

        bot.connect()
        logging.info("Poll bot connected...")

        await bot.disconnected

    asyncio.run(main())
|
||
|