From 130d3f6bb2b1897bfe5ad7f647254db351385b5d Mon Sep 17 00:00:00 2001 From: just n Date: Thu, 15 Jan 2026 20:42:13 +0000 Subject: [PATCH] Add mafia.py --- mafia.py | 1717 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1717 insertions(+) create mode 100644 mafia.py diff --git a/mafia.py b/mafia.py new file mode 100644 index 0000000..12a3e56 --- /dev/null +++ b/mafia.py @@ -0,0 +1,1717 @@ +#!/usr/bin/env python3 +import slixmpp +import random +import asyncio +import logging +import configparser +from enum import Enum +from typing import Dict, Set, Optional, List, Tuple, Any +from dataclasses import dataclass, field +from difflib import SequenceMatcher +import time +import json +from pathlib import Path + +from slixmpp.jid import JID +from slixmpp.stanza import Message + + +# -------------------- DATA STRUCTURES & SETTINGS -------------------- + +class Role(Enum): + # These are the specific jobs players can have + VILLAGER = "Villager" + MAFIA = "Mafia" + DETECTIVE = "Detective" + DOCTOR = "Doctor" + + @property + def team(self) -> str: + # Used to calculate who wins at the end + if self == Role.MAFIA: + return "Mafia" + return "Village" + + @property + def description(self) -> str: + # The text sent to players explaining what they should do + descriptions = { + Role.VILLAGER: "A regular villager. Use your wits to find the Mafia!", + Role.MAFIA: "You are Mafia! Kill villagers at night without being detected.", + Role.DETECTIVE: "Investigate one player each night to learn if they are Mafia.", + Role.DOCTOR: "Protect one player each night from being killed." + } + return descriptions.get(self, "Unknown role") + + +class Phase(Enum): + # The different stages of the game loop + LOBBY = "lobby" # Waiting for players to join + NIGHT = "night" # Mafia is killing, special roles are acting + DAY = "day" # Everyone is discussing and voting + ENDED = "ended" # Game over screen + + +@dataclass +class Player: + # Keeps track of a single person in the game + jid: str # Their chat room address (e.g., room@conference.server/Nickname) + nick: str # Their display name + bare_jid: str = "" # Their real address, used to send them private secrets + role: Optional[Role] = None + alive: bool = True # False means they are dead and can't vote + + def __hash__(self): + # Needed to store players in a list without duplicates + return hash(self.jid) + + def __eq__(self, other): + # Checks if two player objects are actually the same person + if isinstance(other, Player): + return self.jid == other.jid + return False + + +@dataclass +class GameConfig: + # All the settings the host can change with the !config command + mafia_count: int = 0 # 0 means the bot decides automatically based on player count + has_detective: bool = False + has_doctor: bool = True + day_duration: int = 120 # How long players have to argue (seconds) + night_duration: int = 60 # How long special roles have to act (seconds) + auto_switch: bool = True # If True, the timer advances the game automatically + auto_next: bool = True # If True, game advances immediately when everyone has acted + reveal_role_on_death: bool = True # Show if the dead person was Mafia or not + allow_no_kill: bool = True # Allows voting to "skip" instead of killing someone + min_players: int = 4 # Minimum people needed to start + + def to_dict(self) -> dict: + # Converts settings to a format we can easily read/print + return { + "mafia_count": self.mafia_count, + "has_detective": self.has_detective, + "has_doctor": self.has_doctor, + "day_duration": self.day_duration, + "night_duration": self.night_duration, + "auto_switch": self.auto_switch, + "auto_next": self.auto_next, + "reveal_role_on_death": self.reveal_role_on_death, + "allow_no_kill": self.allow_no_kill, + "min_players": self.min_players + } + + @classmethod + def from_dict(cls, data: dict) -> 'GameConfig': + # Loads settings back from a dictionary + config = cls() + for key, value in data.items(): + if hasattr(config, key): + setattr(config, key, value) + return config + + +# -------------------- CORE GAME LOGIC -------------------- + +class MafiaGame: + # This class handles all the rules, voting, and state for ONE specific chat room + + def __init__(self, room_jid: str, host_jid: str, host_nick: str): + self.room_jid = room_jid + self.host_jid = host_jid + self.host_nick = host_nick # The person who started the game + self.players: Dict[str, Player] = {} # List of everyone playing + self.phase = Phase.LOBBY + self.day_number = 0 + self.config = GameConfig() + + # Memory of what happened during the night + self.mafia_votes: Dict[str, str] = {} # Who the mafia members voted to kill + self.mafia_kill_target: Optional[str] = None # The final victim + self.detective_target: Optional[str] = None # Who the detective checked + self.doctor_target: Optional[str] = None # Who the doctor saved + self.protected_last_night: Optional[str] = None # Prevents saving same person twice + + # Memory of what is happening during the day + self.day_votes: Dict[str, str] = {} # Who voted for whom to be hanged + + # Timers and Logs + self.phase_task: Optional[asyncio.Task] = None # The background timer + self.phase_start_time: float = 0 + self.game_log: List[str] = [] # A history of events + + def log(self, message: str): + # Saves an event to the logs (for debugging or history) + timestamp = time.strftime("%H:%M:%S") + self.game_log.append(f"[{timestamp}] {message}") + logging.info(f"[Game {self.room_jid}] {message}") + + # -------------------- PLAYER MANAGEMENT -------------------- + + def add_player(self, nick: str, jid: str, bare_jid: str = "") -> Tuple[bool, str]: + # Tries to add a user to the lobby + if self.phase != Phase.LOBBY: + return False, "❌ The game is already running. Please wait for it to finish!" + + if nick in self.players: + return False, f"❌ {nick} is already on the list!" + + self.players[nick] = Player(jid=jid, nick=nick, bare_jid=bare_jid) + self.log(f"{nick} joined the game") + + return True, f"✅ {nick} has joined! (Total: {len(self.players)})" + + def remove_player(self, nick: str) -> Tuple[bool, str]: + # Removes a user from the lobby + if self.phase != Phase.LOBBY: + return False, "❌ You can't leave while the game is running (it ruins the balance)!" + + if nick not in self.players: + return False, "❌ You aren't in the player list!" + + del self.players[nick] + self.log(f"{nick} left the game") + + return True, f"👋 {nick} has left. (Total: {len(self.players)})" + + def find_player(self, name: str, alive_only: bool = True) -> Optional[Player]: + # Smart search: finds a player even if you only type part of their name + name_lower = name.lower().strip() + + candidates = self.players.values() + if alive_only: + candidates = [p for p in candidates if p.alive] + + # 1. Exact match (e.g., "John") + for p in candidates: + if p.nick.lower() == name_lower: + return p + + # 2. Starts with (e.g., "Jo" -> "John") + for p in candidates: + if p.nick.lower().startswith(name_lower): + return p + + # 3. Contains (e.g., "oh" -> "John") + for p in candidates: + if name_lower in p.nick.lower(): + return p + + # 4. Fuzzy match (handles small typos) + best_match = None + best_ratio = 0.6 + + for p in candidates: + ratio = SequenceMatcher(None, name_lower, p.nick.lower()).ratio() + if ratio > best_ratio: + best_ratio = ratio + best_match = p + + return best_match + + def get_player_by_nick(self, nick: str) -> Optional[Player]: + # Simple lookup by exact nickname + return self.players.get(nick) + + def get_alive_players(self) -> List[Player]: + # Returns a list of everyone who hasn't been killed yet + return [p for p in self.players.values() if p.alive] + + def get_dead_players(self) -> List[Player]: + # Returns a list of the spectators/dead players + return [p for p in self.players.values() if not p.alive] + + def get_mafia_members(self) -> List[Player]: + # Returns the list of active bad guys + return [p for p in self.players.values() if p.role == Role.MAFIA and p.alive] + + def get_player_by_role(self, role: Role, alive_only: bool = True) -> Optional[Player]: + # Finds a specific special role (like the Doctor) + for p in self.players.values(): + if p.role == role: + if alive_only and not p.alive: + continue + return p + return None + + # -------------------- PHASE CONTROL -------------------- + + def can_start(self) -> Tuple[bool, str]: + # Checks if we have enough people to launch + player_count = len(self.players) + + if player_count < self.config.min_players: + return False, f"❌ Not enough players! Need at least {self.config.min_players}. (Current: {player_count})" + + return True, "" + + def assign_roles(self) -> List[Tuple[Player, Role]]: + # Randomly decides who is Mafia, who is Villager, etc. + player_count = len(self.players) + + # Decide how many Mafia to have + if self.config.mafia_count > 0: + mafia_count = min(self.config.mafia_count, player_count // 3) + else: + # Default rule: About 1/3rd of players are Mafia + mafia_count = max(1, player_count // 3) + + nicks = list(self.players.keys()) + random.shuffle(nicks) + + assignments = [] + role_idx = 0 + + # 1. Pick Mafia + for i in range(mafia_count): + player = self.players[nicks[role_idx]] + player.role = Role.MAFIA + assignments.append((player, Role.MAFIA)) + role_idx += 1 + + # 2. Pick Detective (if turned on) + if self.config.has_detective and role_idx < len(nicks): + player = self.players[nicks[role_idx]] + player.role = Role.DETECTIVE + assignments.append((player, Role.DETECTIVE)) + role_idx += 1 + + # 3. Pick Doctor (if turned on) + if self.config.has_doctor and role_idx < len(nicks): + player = self.players[nicks[role_idx]] + player.role = Role.DOCTOR + assignments.append((player, Role.DOCTOR)) + role_idx += 1 + + # 4. Everyone else is a Villager + while role_idx < len(nicks): + player = self.players[nicks[role_idx]] + player.role = Role.VILLAGER + assignments.append((player, Role.VILLAGER)) + role_idx += 1 + + self.log(f"Roles assigned: {mafia_count} Mafia, {player_count - mafia_count} Village") + return assignments + + def start_game(self) -> Tuple[bool, str, List[Tuple[Player, Role]]]: + # Moves the game from Lobby to the first Night + can_start, reason = self.can_start() + if not can_start: + return False, reason, [] + + assignments = self.assign_roles() + self.phase = Phase.NIGHT + self.day_number = 1 + self.phase_start_time = time.time() + + self.log("Game started!") + + return True, self.get_night_start_message(), assignments + + def start_night(self) -> str: + # Sets up the Night phase + self.phase = Phase.NIGHT + self.phase_start_time = time.time() + + # Clear all the actions from the previous night + self.mafia_votes = {} + self.mafia_kill_target = None + self.detective_target = None + self.doctor_target = None + + self.log(f"Night {self.day_number} started") + + return self.get_night_start_message() + + def start_day(self, night_results: str = "") -> str: + # Sets up the Day phase + self.phase = Phase.DAY + self.phase_start_time = time.time() + + # Clear all votes from yesterday + self.day_votes = {} + + self.log(f"Day {self.day_number} started") + + message = self.get_day_start_message(night_results) + return message + + def check_phase_completion(self) -> bool: + """ + Checks if everyone has acted. + If 'auto_next' is on, this returns True when the phase can end early. + """ + if not self.config.auto_next: + return False + + if self.phase == Phase.DAY: + # Day ends if every living player has cast a vote + living_players = len(self.get_alive_players()) + votes_cast = len(self.day_votes) + return votes_cast >= living_players + + elif self.phase == Phase.NIGHT: + # Night ends if Mafia, Doc, and Detective have all done their jobs + + # 1. Have all living Mafia members voted? + mafia_members = self.get_mafia_members() + mafia_done = len(self.mafia_votes) >= len(mafia_members) + + # 2. Has Detective acted? (If they exist and are alive) + det = self.get_player_by_role(Role.DETECTIVE) + det_done = True + if det and det.alive and self.detective_target is None: + det_done = False + + # 3. Has Doctor acted? (If they exist and are alive) + doc = self.get_player_by_role(Role.DOCTOR) + doc_done = True + if doc and doc.alive and self.doctor_target is None: + doc_done = False + + return mafia_done and det_done and doc_done + + return False + + def get_night_start_message(self) -> str: + # The text everyone sees when night falls + alive_count = len(self.get_alive_players()) + mafia_count = len(self.get_mafia_members()) + + msg = f"""🌙 NIGHT {self.day_number} 🌙 + +The village falls asleep... 😴 + +• {alive_count} players alive ({mafia_count} Mafia hiding among you) +• Mafia: whisper your kill vote +• Special roles: perform your night actions + +💀 The night is dark and full of terrors...""" + + if self.config.auto_switch and self.config.night_duration > 0: + msg += f"\n⏰ Night ends in {self.config.night_duration} seconds" + if self.config.auto_next: + msg += " (or sooner if everyone acts)" + else: + msg += "\n⏰ Host: use !mafia next to advance" + + return msg + + def get_day_start_message(self, night_results: str = "") -> str: + # The text everyone sees when the sun rises + alive = self.get_alive_players() + alive_count = len(alive) + + msg = f"""☀️ DAY {self.day_number} ☀️ + +The sun rises over the village... + +{night_results} + +• {alive_count} players remain alive +• Discuss and find the Mafia! +• Whisper your vote to eliminate a suspect + +📋 Alive: {', '.join(p.nick for p in alive)}""" + + if self.config.auto_switch and self.config.day_duration > 0: + msg += f"\n⏰ Day ends in {self.config.day_duration} seconds" + if self.config.auto_next: + msg += " (or sooner if everyone votes)" + else: + msg += "\n⏰ Host: use !mafia next to advance" + + if self.config.allow_no_kill: + msg += "\n💡 Vote 'skip' to skip elimination" + + return msg + + # -------------------- NIGHT ACTION LOGIC -------------------- + + def mafia_vote(self, voter_nick: str, target_name: str) -> Tuple[bool, str, Optional[str], bool]: + """ + Handles a Mafia member voting for a kill. + Returns: (success, reply_msg, broadcast_msg, phase_complete) + """ + voter = self.get_player_by_nick(voter_nick) + + if not voter or voter.role != Role.MAFIA: + return False, "❌ Only Mafia members can vote to kill!", None, False + + if not voter.alive: + return False, "❌ Dead players cannot vote!", None, False + + if self.phase != Phase.NIGHT: + return False, "❌ You can only kill during the night!", None, False + + target = self.find_player(target_name) + if not target: + return False, f"❌ Player '{target_name}' not found!", None, False + + if not target.alive: + return False, f"❌ {target.nick} is already dead!", None, False + + if target.role == Role.MAFIA: + return False, "❌ You cannot kill your fellow Mafia member!", None, False + + self.mafia_votes[voter_nick] = target.nick + + # Count current votes to show the mafia team status + mafia_members = self.get_mafia_members() + votes_cast = len(self.mafia_votes) + + vote_counts: Dict[str, int] = {} + for target_nick in self.mafia_votes.values(): + vote_counts[target_nick] = vote_counts.get(target_nick, 0) + 1 + + vote_summary = ", ".join(f"{v}: {k}" for k, v in vote_counts.items()) + response = f"🗳️ You voted to kill {target.nick}\n" + response += f"📊 Votes ({votes_cast}/{len(mafia_members)}): {vote_summary}" + + # Announcement for other mafia members + announcement = None + if len(mafia_members) > 1: + announcement = f"🔪 {voter_nick} voted to kill {target.nick} ({votes_cast}/{len(mafia_members)})" + + # Check if we can end the night early + phase_complete = self.check_phase_completion() + + return True, response, announcement, phase_complete + + def detective_investigate(self, detective_nick: str, target_name: str) -> Tuple[bool, str, bool]: + # Handles the Detective checking someone's role + detective = self.get_player_by_nick(detective_nick) + + if not detective or detective.role != Role.DETECTIVE: + return False, "❌ Only the Detective can investigate!", False + + if not detective.alive: + return False, "❌ Dead players cannot use abilities!", False + + if self.phase != Phase.NIGHT: + return False, "❌ You can only investigate during the night!", False + + if self.detective_target is not None: + return False, f"❌ You already investigated {self.detective_target} tonight!", False + + target = self.find_player(target_name) + if not target: + return False, f"❌ Player '{target_name}' not found!", False + + if not target.alive: + return False, f"❌ {target.nick} is already dead!", False + + if target.nick == detective_nick: + return False, "❌ You cannot investigate yourself!", False + + self.detective_target = target.nick + + # Result logic + is_mafia = target.role == Role.MAFIA + + if is_mafia: + result = f"🔍 {target.nick} is MAFIA! 🔴" + else: + result = f"🔍 {target.nick} is NOT MAFIA ✅" + + self.log(f"Detective investigated {target.nick}: {'Mafia' if is_mafia else 'Not Mafia'}") + + phase_complete = self.check_phase_completion() + return True, result, phase_complete + + def doctor_protect(self, doctor_nick: str, target_name: str) -> Tuple[bool, str, bool]: + # Handles the Doctor trying to save someone + doctor = self.get_player_by_nick(doctor_nick) + + if not doctor or doctor.role != Role.DOCTOR: + return False, "❌ Only the Doctor can protect players!", False + + if not doctor.alive: + return False, "❌ Dead players cannot use abilities!", False + + if self.phase != Phase.NIGHT: + return False, "❌ You can only protect during the night!", False + + if self.doctor_target is not None: + return False, f"❌ You already chose to protect {self.doctor_target} tonight!", False + + target = self.find_player(target_name) + if not target: + return False, f"❌ Player '{target_name}' not found!", False + + if not target.alive: + return False, f"❌ {target.nick} is already dead!", False + + # Rule: Can't save the same person twice in a row + if self.protected_last_night and target.nick == self.protected_last_night: + return False, f"❌ You cannot protect {target.nick} two nights in a row!", False + + self.doctor_target = target.nick + + self.log(f"Doctor chose to protect {target.nick}") + + phase_complete = self.check_phase_completion() + return True, f"🏥 You will protect {target.nick} tonight.", phase_complete + + def resolve_night(self) -> Tuple[str, Optional[Player], bool]: + """ + Calculates what happened during the night. + Returns: (narrative_text, player_who_died, is_game_over) + """ + killed_player = None + result_parts = [] + + # 1. Decide who Mafia kills (based on majority vote) + if self.mafia_votes: + vote_counts: Dict[str, int] = {} + for target_nick in self.mafia_votes.values(): + vote_counts[target_nick] = vote_counts.get(target_nick, 0) + 1 + + if vote_counts: + max_votes = max(vote_counts.values()) + # If there is a tie, pick one randomly + top_targets = [t for t, v in vote_counts.items() if v == max_votes] + self.mafia_kill_target = random.choice(top_targets) + + # 2. Update Doctor's history + if self.doctor_target: + self.protected_last_night = self.doctor_target + self.log(f"Doctor protected {self.doctor_target}") + else: + self.protected_last_night = None + + # 3. Apply the kill + if self.mafia_kill_target: + target = self.get_player_by_nick(self.mafia_kill_target) + + if target and target.alive: + if self.doctor_target == self.mafia_kill_target: + # Saved by the doc! + result_parts.append(f"🏥 The Doctor saved someone from death!") + self.log(f"Doctor saved {self.mafia_kill_target} from Mafia") + else: + # Player dies + target.alive = False + killed_player = target + + role_reveal = "" + if self.config.reveal_role_on_death: + role_reveal = f" They were a {target.role.value}." + + result_parts.append(f"💀 {target.nick} was killed by the Mafia!{role_reveal}") + self.log(f"{target.nick} ({target.role.value}) was killed by Mafia") + else: + result_parts.append("🌙 The night passed peacefully... no one died!") + self.log("No one was killed during the night") + + result_message = "\n".join(result_parts) if result_parts else "The night was uneventful." + + # 4. Check if the game is over + winner = self.check_win_condition() + game_ended = winner is not None + + if game_ended: + result_message += f"\n\n🏆 {winner.upper()} WINS! 🏆" + self.phase = Phase.ENDED + + return result_message, killed_player, game_ended + + # -------------------- DAY ACTION LOGIC -------------------- + + def day_vote(self, voter_nick: str, target_name: str) -> Tuple[bool, str, str, bool]: + """ + Handles a player voting to eliminate someone. + Returns: (success, reply_msg, broadcast_msg, phase_complete) + """ + voter = self.get_player_by_nick(voter_nick) + + if not voter: + return False, "❌ You're not in this game!", "", False + + if not voter.alive: + return False, "❌ Dead players cannot vote!", "", False + + if self.phase != Phase.DAY: + return False, "❌ You can only vote during the day!", "", False + + # Logic for skipping + if target_name.lower() == "skip": + if not self.config.allow_no_kill: + return False, "❌ Skipping elimination is not allowed!", "", False + + old_vote = self.day_votes.get(voter_nick) + self.day_votes[voter_nick] = "skip" + + if old_vote: + announcement = f"🗳️ {voter_nick} changed vote to SKIP" + else: + announcement = f"🗳️ {voter_nick} votes to SKIP" + + phase_complete = self.check_phase_completion() + return True, "✅ You voted to skip elimination.", announcement, phase_complete + + # Logic for voting a specific person + target = self.find_player(target_name) + if not target: + return False, f"❌ Player '{target_name}' not found!", "", False + + if not target.alive: + return False, f"❌ {target.nick} is already dead!", "", False + + if target.nick == voter_nick: + return False, "❌ You cannot vote for yourself!", "", False + + old_vote = self.day_votes.get(voter_nick) + self.day_votes[voter_nick] = target.nick + + if old_vote: + announcement = f"🗳️ {voter_nick}: {old_vote} → {target.nick}" + else: + announcement = f"🗳️ {voter_nick} votes for {target.nick}" + + vote_tally = self.get_vote_tally() + + phase_complete = self.check_phase_completion() + return True, f"✅ You voted to eliminate {target.nick}\n\n{vote_tally}", announcement, phase_complete + + def unvote(self, voter_nick: str) -> Tuple[bool, str, str]: + # Removes a player's vote + voter = self.get_player_by_nick(voter_nick) + + if not voter: + return False, "❌ You're not in this game!", "" + + if not voter.alive: + return False, "❌ Dead players cannot vote!", "" + + if self.phase != Phase.DAY: + return False, "❌ You can only change votes during the day!", "" + + if voter_nick not in self.day_votes: + return False, "❌ You haven't voted yet!", "" + + old_vote = self.day_votes.pop(voter_nick) + announcement = f"🗳️ {voter_nick} removed vote (was: {old_vote})" + + return True, "✅ Your vote has been removed.", announcement + + def get_vote_tally(self) -> str: + # Creates a text summary of who voted for whom + alive_count = len(self.get_alive_players()) + votes_cast = len(self.day_votes) + majority = (alive_count // 2) + 1 + + vote_counts: Dict[str, List[str]] = {} + for voter, target in self.day_votes.items(): + if target not in vote_counts: + vote_counts[target] = [] + vote_counts[target].append(voter) + + if not vote_counts: + return f"📊 No votes yet (need {majority} for majority)" + + # Sort by most votes first + sorted_targets = sorted(vote_counts.items(), key=lambda x: len(x[1]), reverse=True) + + lines = [f"📊 Votes ({votes_cast}/{alive_count}, need {majority}):"] + for target, voters in sorted_targets: + voter_list = ", ".join(voters) + lines.append(f" {target}: {len(voters)} ({voter_list})") + + return "\n".join(lines) + + def resolve_day(self) -> Tuple[str, Optional[Player], bool]: + """ + Calculates who gets eliminated at the end of the day. + Returns: (narrative, eliminated_player, is_game_over) + """ + alive_players = self.get_alive_players() + alive_count = len(alive_players) + majority = (alive_count // 2) + 1 + + vote_counts: Dict[str, int] = {} + for target in self.day_votes.values(): + vote_counts[target] = vote_counts.get(target, 0) + 1 + + eliminated_player = None + result_parts = [] + + if not vote_counts: + result_parts.append("📭 No votes cast. No one eliminated.") + self.log("No votes cast during the day") + else: + max_votes = max(vote_counts.values()) + top_targets = [t for t, v in vote_counts.items() if v == max_votes] + + if "skip" in top_targets and max_votes >= majority: + result_parts.append("⏭️ Village voted to skip elimination.") + self.log("Village voted to skip elimination") + elif max_votes >= majority: + # Elimination successful + eliminated_nick = random.choice([t for t in top_targets if t != "skip"]) + target = self.get_player_by_nick(eliminated_nick) + + if target: + target.alive = False + eliminated_player = target + + role_reveal = "" + if self.config.reveal_role_on_death: + role_reveal = f" They were a {target.role.value}!" + + result_parts.append(f"⚖️ {target.nick} has been eliminated!{role_reveal}") + self.log(f"{target.nick} ({target.role.value}) was eliminated by vote") + else: + # Not enough votes for a majority + tally = ", ".join(f"{t}: {v}" for t, v in sorted(vote_counts.items(), key=lambda x: -x[1])) + result_parts.append(f"🤷 No majority! Votes: {tally}") + result_parts.append("No one eliminated.") + self.log(f"No majority reached. Votes: {tally}") + + result_message = "\n".join(result_parts) + + winner = self.check_win_condition() + game_ended = winner is not None + + if game_ended: + result_message += f"\n\n🏆 {winner.upper()} WINS! 🏆" + self.phase = Phase.ENDED + + return result_message, eliminated_player, game_ended + + # -------------------- END GAME & STATUS -------------------- + + def check_win_condition(self) -> Optional[str]: + # Checks if either side has won + alive = self.get_alive_players() + mafia_alive = len([p for p in alive if p.role == Role.MAFIA]) + village_alive = len(alive) - mafia_alive + + if mafia_alive == 0: + self.log("Village wins - all Mafia eliminated!") + return "Village" + + if mafia_alive >= village_alive: + self.log("Mafia wins - numerical parity reached!") + return "Mafia" + + return None + + def get_end_game_summary(self) -> str: + # Generates the final scoreboard + lines = ["🏁 GAME OVER 🏁", ""] + lines.append("Final Roles:") + + by_role: Dict[Role, List[Player]] = {} + for p in self.players.values(): + if p.role not in by_role: + by_role[p.role] = [] + by_role[p.role].append(p) + + for role in [Role.MAFIA, Role.DETECTIVE, Role.DOCTOR, Role.VILLAGER]: + if role in by_role: + players = by_role[role] + status = [f"{p.nick} ({'💀' if not p.alive else '✅'})" for p in players] + lines.append(f" {role.value}: {', '.join(status)}") + + lines.append("") + lines.append(f"📊 Game lasted {self.day_number} day(s)") + + return "\n".join(lines) + + def get_status(self) -> str: + # Returns current state (used for !status command) + lines = [] + + if self.phase == Phase.LOBBY: + lines.append("🎮 Status: LOBBY") + lines.append(f"👥 Players ({len(self.players)}):") + for nick in self.players.keys(): + lines.append(f" • {nick}") + lines.append("") + lines.append(f"🎯 Host: {self.host_nick}") + lines.append(f"⚙️ Min players: {self.config.min_players}") + + elif self.phase == Phase.NIGHT: + lines.append(f"🌙 NIGHT {self.day_number}") + alive = self.get_alive_players() + lines.append(f"👥 {len(alive)} players alive") + lines.append("Waiting for night actions...") + + elif self.phase == Phase.DAY: + lines.append(f"☀️ DAY {self.day_number}") + alive = self.get_alive_players() + lines.append(f"👥 {len(alive)} players alive:") + for p in alive: + lines.append(f" • {p.nick}") + lines.append("") + lines.append(self.get_vote_tally()) + + elif self.phase == Phase.ENDED: + lines.append(self.get_end_game_summary()) + + return "\n".join(lines) + + def get_config_display(self) -> str: + # Returns current settings (used for !config command) + lines = ["⚙️ Game Configuration:"] + lines.append(f" Mafia count: {'Auto' if self.config.mafia_count == 0 else self.config.mafia_count}") + lines.append(f" Detective: {'Yes' if self.config.has_detective else 'No'}") + lines.append(f" Doctor: {'Yes' if self.config.has_doctor else 'No'}") + lines.append(f" Auto switch: {'Yes' if self.config.auto_switch else 'No (manual)'}") + lines.append(f" Auto next (skip wait): {'Yes' if self.config.auto_next else 'No'}") + if self.config.auto_switch: + lines.append(f" Day duration: {self.config.day_duration}s") + lines.append(f" Night duration: {self.config.night_duration}s") + lines.append(f" Reveal role on death: {'Yes' if self.config.reveal_role_on_death else 'No'}") + lines.append(f" Allow skip vote: {'Yes' if self.config.allow_no_kill else 'No'}") + lines.append(f" Min players: {self.config.min_players}") + return "\n".join(lines) + + +# -------------------- XMPP BOT CLIENT -------------------- + +class MafiaBot(slixmpp.ClientXMPP): + # The actual bot that connects to the internet and talks to users + + def __init__(self, jid: str, password: str, rooms: List[str], + room_nicknames: Dict[str, str], nickname: str, + trigger: str = "!mafia", admin_users: Optional[List[str]] = None): + + super().__init__(jid, password) + + self.rooms = rooms + self.room_nicknames = room_nicknames or {} + self.nickname = nickname + self.trigger = trigger + self.admin_users = set(u.lower() for u in (admin_users or [])) + + # State storage for active games + self.games: Dict[str, MafiaGame] = {} # Key: room_jid + + # Enable required XMPP extensions + self.register_plugin('xep_0030') # Service Discovery + self.register_plugin('xep_0045') # Multi-User Chat + self.register_plugin('xep_0199') # XMPP Ping + + # Bind event callbacks + self.add_event_handler("session_start", self.start) + self.add_event_handler("groupchat_message", self.handle_groupchat) + self.add_event_handler("message", self.handle_private_message) + + async def start(self, event): + # Called when the bot successfully connects + self.send_presence() + await self.get_roster() + + for room in self.rooms: + nick = self.room_nicknames.get(room, self.nickname) + try: + await self.plugin['xep_0045'].join_muc(room, nick) + logging.info(f"Joined room: {room} as {nick}") + except Exception as e: + logging.error(f"Failed to join {room}: {e}") + + def get_room_nick(self, room_jid: str) -> str: + # Helper to find what nickname the bot uses in a specific room + return self.room_nicknames.get(room_jid, self.nickname) + + async def send_room_message(self, room_jid: str, message: str): + # Sends a public message to the MUC + self.send_message(mto=room_jid, mbody=message, mtype='groupchat') + + async def send_private_muc_message(self, room_jid: str, nick: str, message: str): + # Sends a private whisper to a user inside a MUC context + to_jid = f"{room_jid}/{nick}" + self.send_message(mto=to_jid, mbody=message, mtype='chat') + + async def send_direct_message(self, jid: str, message: str): + # Sends a standard 1-on-1 chat message + self.send_message(mto=jid, mbody=message, mtype='chat') + + # -------------------- INCOMING MESSAGE PROCESSING -------------------- + + async def handle_groupchat(self, msg): + # Processes messages sent to the public chat room + if msg['type'] != 'groupchat': + return + + room_jid = msg['from'].bare + sender_nick = msg['mucnick'] + bot_nick = self.get_room_nick(room_jid) + + # Don't let the bot reply to itself + if sender_nick == bot_nick: + return + + body = msg['body'].strip() + if not body: + return + + cmd_text = None + + # 1. Standard Prefix (e.g. "!mafia join") + if body.startswith(self.trigger): + cmd_text = body[len(self.trigger):].strip() + + # 2. Alias Addressing (e.g. "MafiaBot: join") + else: + # Check for: Space, Comma, Dot, Colon etc. + for character in (" ", ",", ".", ":", ";", "-", "—"): + prefix = bot_nick + character + if body.startswith(prefix): + # Found a match! Remove the name and pass the rest + cmd_text = body[len(prefix):].strip() + break + + if cmd_text is not None: + await self.handle_command(room_jid, sender_nick, cmd_text, is_private=False) + + async def handle_private_message(self, msg): + # Processes direct messages and whispers + if msg['type'] not in ('chat', 'normal'): + return + + from_jid = msg['from'] + body = msg['body'].strip() + + if not body: + return + + # Check if this is a room whisper (room@server/User) + if '/' in str(from_jid): + room_jid = from_jid.bare + sender_nick = from_jid.resource + + # Make sure we actually know this game + if room_jid in self.games: + await self.handle_whisper(room_jid, sender_nick, body, from_jid) + else: + # Direct Message (not inside a room) + sender_bare = from_jid.bare.lower() + + # Find which game this player belongs to + for room_jid, game in self.games.items(): + for player in game.players.values(): + if player.bare_jid.lower() == sender_bare: + await self.handle_whisper(room_jid, player.nick, body, from_jid) + return + + # If we can't find them, tell them + await self.send_direct_message(str(from_jid.bare), + "You're not in any active Mafia game. Join a game room first!") + + async def handle_whisper(self, room_jid: str, sender_nick: str, body: str, from_jid: JID): + # Executes commands sent via private message + game = self.games.get(room_jid) + bot_nick = self.get_room_nick(room_jid) + + if not game: + await self.send_private_muc_message(room_jid, sender_nick, + "No game in this room! Use '!mafia new' to start one.") + return + + # 1. Support "MafiaBot: kill" inside whispers too + # This handles cases where users copy/paste from main chat or get confused. + cmd_body = body + for character in (" ", ",", ".", ":", ";", "-", "—"): + prefix = bot_nick + character + if body.startswith(prefix): + cmd_body = body[len(prefix):].strip() + break + + # Separate command from arguments + parts = cmd_body.split(None, 1) + cmd = parts[0].lower().lstrip('!') + args = parts[1] if len(parts) > 1 else "" + + # Route commands to appropriate handlers + if cmd == "kill": + await self.handle_mafia_kill(game, room_jid, sender_nick, args) + elif cmd == "investigate" or cmd == "inv": + await self.handle_investigate(game, room_jid, sender_nick, args) + elif cmd == "protect" or cmd == "heal": + await self.handle_protect(game, room_jid, sender_nick, args) + elif cmd == "vote": + await self.handle_vote(game, room_jid, sender_nick, args) + elif cmd == "unvote": + await self.handle_unvote(game, room_jid, sender_nick) + elif cmd == "role": + await self.handle_role_query(game, room_jid, sender_nick) + elif cmd == "help": + await self.send_private_muc_message(room_jid, sender_nick, self.get_whisper_help()) + else: + await self.send_private_muc_message(room_jid, sender_nick, + f"Unknown command: {cmd}\nUse 'help' for commands.") + + async def handle_command(self, room_jid: str, sender_nick: str, cmd_text: str, is_private: bool): + # Executes public commands from the group chat + parts = cmd_text.split(None, 1) + + if not parts: + cmd = "help" + args = "" + else: + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + # Route to specific command handlers + if cmd == "new" or cmd == "create": + await self.handle_new_game(room_jid, sender_nick) + elif cmd == "join": + await self.handle_join(room_jid, sender_nick) + elif cmd == "leave": + await self.handle_leave(room_jid, sender_nick) + elif cmd == "start": + await self.handle_start(room_jid, sender_nick) + elif cmd == "end" or cmd == "stop": + await self.handle_end(room_jid, sender_nick) + elif cmd == "status": + await self.handle_status(room_jid) + elif cmd == "players": + await self.handle_players(room_jid) + elif cmd == "config": + await self.handle_config(room_jid, sender_nick, args) + elif cmd == "next": + await self.handle_next_phase(room_jid, sender_nick) + elif cmd == "help": + await self.send_private_muc_message(room_jid, sender_nick, self.get_help_message()) + await self.send_room_message(room_jid, f"📖 Help sent to {sender_nick} via whisper.") + elif cmd == "rules": + await self.send_private_muc_message(room_jid, sender_nick, self.get_rules_message()) + await self.send_room_message(room_jid, f"📜 Rules sent to {sender_nick} via whisper.") + else: + await self.send_room_message(room_jid, + f"Unknown command: {cmd}. Use '{self.trigger} help' for commands.") + + # -------------------- COMMAND IMPLEMENTATIONS -------------------- + + async def handle_new_game(self, room_jid: str, sender_nick: str): + # Logic to initialize a new game instance + if room_jid in self.games and self.games[room_jid].phase != Phase.ENDED: + await self.send_room_message(room_jid, + "❌ A game is already in progress! End it first with '!mafia end'") + return + + sender_jid = f"{room_jid}/{sender_nick}" + self.games[room_jid] = MafiaGame(room_jid, sender_jid, sender_nick) + + await self.send_room_message(room_jid, f"""🎮 NEW MAFIA GAME 🎮 + +{sender_nick} is hosting a new game! + +📝 To join: {self.trigger} join (or "{self.nickname}: join") +⚙️ To configure: {self.trigger} config +🚀 To start: {self.trigger} start + +Type '{self.trigger} help' for all commands!""") + + async def handle_join(self, room_jid: str, sender_nick: str): + # Logic for a player entering the lobby + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, + f"❌ No game exists! Create one with '{self.trigger} new'") + return + + sender_jid = f"{room_jid}/{sender_nick}" + + # Attempt to discover the real JID for secure whispering + bare_jid = "" + try: + roster = self.plugin['xep_0045'].get_roster(room_jid) + if sender_nick in roster: + jid_info = roster[sender_nick] + if hasattr(jid_info, 'jid') and jid_info.jid: + bare_jid = str(jid_info.jid.bare) + except: + pass + + success, message = game.add_player(sender_nick, sender_jid, bare_jid) + await self.send_room_message(room_jid, message) + + async def handle_leave(self, room_jid: str, sender_nick: str): + # Logic for a player exiting the lobby + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, "❌ No game exists!") + return + + success, message = game.remove_player(sender_nick) + await self.send_room_message(room_jid, message) + + async def handle_start(self, room_jid: str, sender_nick: str): + # Logic to begin the game loop + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, + f"❌ No game exists! Create one with '{self.trigger} new'") + return + + if sender_nick != game.host_nick: + await self.send_room_message(room_jid, + f"❌ Only the host ({game.host_nick}) can start the game!") + return + + success, message, assignments = game.start_game() + + if not success: + await self.send_room_message(room_jid, message) + return + + # Publicly announce start + await self.send_room_message(room_jid, message) + + # Privately distribute roles + for player, role in assignments: + role_message = f"""🎭 YOUR ROLE 🎭 + +You are a {role.value}! + +{role.description}""" + + if role == Role.MAFIA: + mafia_members = game.get_mafia_members() + if len(mafia_members) > 1: + other_mafia = [p.nick for p in mafia_members if p.nick != player.nick] + role_message += f"\n\n🔪 Fellow Mafia: {', '.join(other_mafia)}" + role_message += "\n\n📝 Night: whisper '!kill ' to vote" + elif role == Role.DETECTIVE: + role_message += "\n\n📝 Night: whisper '!investigate '" + elif role == Role.DOCTOR: + role_message += "\n\n📝 Night: whisper '!protect '" + + role_message += "\n📝 Day: whisper '!vote ' to eliminate" + + await self.send_private_muc_message(room_jid, player.nick, role_message) + + # Init timer if enabled + if game.config.auto_switch and game.config.night_duration > 0: + game.phase_task = asyncio.create_task( + self.phase_timer(room_jid, game.config.night_duration) + ) + + async def handle_end(self, room_jid: str, sender_nick: str): + # Forcefully terminates the game + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, "❌ No game exists!") + return + + is_host = sender_nick == game.host_nick + is_admin = sender_nick.lower() in self.admin_users + + if not (is_host or is_admin): + await self.send_room_message(room_jid, + f"❌ Only the host ({game.host_nick}) can end the game!") + return + + if game.phase_task: + game.phase_task.cancel() + + game.phase = Phase.ENDED + + summary = game.get_end_game_summary() + await self.send_room_message(room_jid, f"🛑 Game ended by {sender_nick}!\n\n{summary}") + + async def handle_status(self, room_jid: str): + # Displays game info + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, + f"No active game. Create one with '{self.trigger} new'") + return + + await self.send_room_message(room_jid, game.get_status()) + + async def handle_players(self, room_jid: str): + # Lists current participants + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, "No active game.") + return + + if game.phase == Phase.LOBBY: + players = list(game.players.keys()) + await self.send_room_message(room_jid, + f"👥 Players ({len(players)}): {', '.join(players) if players else 'None yet'}") + else: + alive = [p.nick for p in game.get_alive_players()] + dead = [p.nick for p in game.get_dead_players()] + + msg = f"👥 Players:\n" + msg += f" ✅ Alive ({len(alive)}): {', '.join(alive)}\n" + msg += f" 💀 Dead ({len(dead)}): {', '.join(dead) if dead else 'None'}" + + await self.send_room_message(room_jid, msg) + + async def handle_config(self, room_jid: str, sender_nick: str, args: str): + # Modifies game settings + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, "❌ No game exists!") + return + + if game.phase != Phase.LOBBY: + await self.send_room_message(room_jid, "❌ Can only configure during lobby!") + return + + if sender_nick != game.host_nick: + await self.send_room_message(room_jid, + f"❌ Only the host ({game.host_nick}) can change settings!") + return + + if not args: + await self.send_private_muc_message(room_jid, sender_nick, game.get_config_display()) + await self.send_room_message(room_jid, f"⚙️ Config sent to {sender_nick} via whisper.") + return + + parts = args.split(None, 1) + option = parts[0].lower() + value = parts[1] if len(parts) > 1 else "" + + try: + if option == "mafia": + game.config.mafia_count = int(value) if value else 0 + await self.send_room_message(room_jid, + f"✅ Mafia count: {'Auto' if game.config.mafia_count == 0 else game.config.mafia_count}") + + elif option == "detective": + game.config.has_detective = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Detective: {'Enabled' if game.config.has_detective else 'Disabled'}") + + elif option == "doctor": + game.config.has_doctor = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Doctor: {'Enabled' if game.config.has_doctor else 'Disabled'}") + + elif option == "auto": + game.config.auto_switch = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Auto phase switch: {'Enabled' if game.config.auto_switch else 'Disabled'}") + + elif option == "autonext" or option == "next": + game.config.auto_next = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Auto next (skip wait): {'Enabled' if game.config.auto_next else 'Disabled'}") + + elif option == "daytime" or option == "day": + game.config.day_duration = int(value) + await self.send_room_message(room_jid, + f"✅ Day duration: {game.config.day_duration}s") + + elif option == "nighttime" or option == "night": + game.config.night_duration = int(value) + await self.send_room_message(room_jid, + f"✅ Night duration: {game.config.night_duration}s") + + elif option == "reveal": + game.config.reveal_role_on_death = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Reveal role on death: {'Yes' if game.config.reveal_role_on_death else 'No'}") + + elif option == "skip": + game.config.allow_no_kill = value.lower() in ('yes', 'true', 'on', '1') + await self.send_room_message(room_jid, + f"✅ Allow skip vote: {'Yes' if game.config.allow_no_kill else 'No'}") + + elif option == "minplayers": + game.config.min_players = max(4, int(value)) + await self.send_room_message(room_jid, + f"✅ Minimum players: {game.config.min_players}") + + else: + await self.send_room_message(room_jid, + f"❌ Unknown option: {option}. Options: mafia, detective, doctor, auto, autonext, daytime, nighttime, reveal, skip, minplayers") + + except ValueError as e: + await self.send_room_message(room_jid, f"❌ Invalid value: {e}") + + async def handle_next_phase(self, room_jid: str, sender_nick: str): + # Manually triggers phase transition + game = self.games.get(room_jid) + + if not game: + await self.send_room_message(room_jid, "❌ No game exists!") + return + + is_host = sender_nick == game.host_nick + is_admin = sender_nick.lower() in self.admin_users + + if not (is_host or is_admin): + await self.send_room_message(room_jid, + f"❌ Only the host ({game.host_nick}) can advance phases!") + return + + await self.advance_phase(room_jid) + + # -------------------- TIMER & SCHEDULING -------------------- + + async def phase_timer(self, room_jid: str, duration: int): + # Background task that waits and then advances the phase + try: + # 1. Wait for half the duration, send warning + await asyncio.sleep(duration // 2) + game = self.games.get(room_jid) + if game and game.phase in (Phase.NIGHT, Phase.DAY): + remaining = duration - (duration // 2) + await self.send_room_message(room_jid, f"⏰ {remaining} seconds remaining!") + + # 2. Wait until 30s remain + await asyncio.sleep(max(0, remaining - 30)) + game = self.games.get(room_jid) + if game and game.phase in (Phase.NIGHT, Phase.DAY): + await self.send_room_message(room_jid, "⏰ 30 seconds remaining!") + + # 3. Final countdown + await asyncio.sleep(30) + + game = self.games.get(room_jid) + if game and game.phase in (Phase.NIGHT, Phase.DAY): + await self.advance_phase(room_jid) + + except asyncio.CancelledError: + pass + + async def advance_phase(self, room_jid: str): + # Executes the transition between Day and Night + game = self.games.get(room_jid) + + if not game: + return + + # Stop existing timer + if game.phase_task: + game.phase_task.cancel() + game.phase_task = None + + if game.phase == Phase.NIGHT: + # Process night results -> Switch to DAY + result_message, killed_player, game_ended = game.resolve_night() + + if game_ended: + summary = game.get_end_game_summary() + await self.send_room_message(room_jid, f"{result_message}\n\n{summary}") + return + + day_message = game.start_day(result_message) + await self.send_room_message(room_jid, day_message) + + # Schedule day timer + if game.config.auto_switch and game.config.day_duration > 0: + game.phase_task = asyncio.create_task( + self.phase_timer(room_jid, game.config.day_duration) + ) + + elif game.phase == Phase.DAY: + # Process day results -> Switch to NIGHT + result_message, eliminated_player, game_ended = game.resolve_day() + + await self.send_room_message(room_jid, result_message) + + if game_ended: + summary = game.get_end_game_summary() + await self.send_room_message(room_jid, summary) + return + + # Start next night + game.day_number += 1 + night_message = game.start_night() + await self.send_room_message(room_jid, night_message) + + # Send private role reminders + await self.send_night_reminders(room_jid, game) + + # Schedule night timer + if game.config.auto_switch and game.config.night_duration > 0: + game.phase_task = asyncio.create_task( + self.phase_timer(room_jid, game.config.night_duration) + ) + + async def send_night_reminders(self, room_jid: str, game: MafiaGame): + # Notifies players that they need to perform night actions + + # Notify Mafia + for mafia in game.get_mafia_members(): + await self.send_private_muc_message(room_jid, mafia.nick, + "🌙 Night time! Whisper '!kill ' to vote for a kill.") + + # Notify Detective + detective = game.get_player_by_role(Role.DETECTIVE) + if detective: + await self.send_private_muc_message(room_jid, detective.nick, + "🌙 Night time! Whisper '!investigate ' to check them.") + + # Notify Doctor + doctor = game.get_player_by_role(Role.DOCTOR) + if doctor: + msg = "🌙 Night time! Whisper '!protect ' to save them." + if game.protected_last_night: + msg += f"\n⚠️ Cannot protect {game.protected_last_night} again." + await self.send_private_muc_message(room_jid, doctor.nick, msg) + + # -------------------- SPECIFIC ACTION HANDLERS -------------------- + + async def handle_mafia_kill(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str): + # Processes mafia kill commands + if not target_name: + await self.send_private_muc_message(room_jid, sender_nick, + "Usage: !kill ") + return + + success, response, announcement, phase_complete = game.mafia_vote(sender_nick, target_name) + await self.send_private_muc_message(room_jid, sender_nick, response) + + # Relay vote to other mafia + if announcement: + for mafia in game.get_mafia_members(): + if mafia.nick != sender_nick: + await self.send_private_muc_message(room_jid, mafia.nick, announcement) + + # Check if everyone has acted + if phase_complete: + await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...") + await self.advance_phase(room_jid) + + async def handle_investigate(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str): + # Processes detective commands + if not target_name: + await self.send_private_muc_message(room_jid, sender_nick, + "Usage: !investigate ") + return + + success, response, phase_complete = game.detective_investigate(sender_nick, target_name) + await self.send_private_muc_message(room_jid, sender_nick, response) + + if phase_complete: + await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...") + await self.advance_phase(room_jid) + + async def handle_protect(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str): + # Processes doctor commands + if not target_name: + await self.send_private_muc_message(room_jid, sender_nick, + "Usage: !protect ") + return + + success, response, phase_complete = game.doctor_protect(sender_nick, target_name) + await self.send_private_muc_message(room_jid, sender_nick, response) + + if phase_complete: + await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...") + await self.advance_phase(room_jid) + + async def handle_vote(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str): + # Processes day vote commands + if not target_name: + await self.send_private_muc_message(room_jid, sender_nick, + "Usage: !vote or !vote skip") + return + + success, response, announcement, phase_complete = game.day_vote(sender_nick, target_name) + await self.send_private_muc_message(room_jid, sender_nick, response) + + if success and announcement: + await self.send_room_message(room_jid, announcement) + + # Announce tally + tally = game.get_vote_tally() + await self.send_room_message(room_jid, tally) + + if phase_complete: + await self.send_room_message(room_jid, "⚡ Everyone has voted. Ending phase early...") + await self.advance_phase(room_jid) + + async def handle_unvote(self, game: MafiaGame, room_jid: str, sender_nick: str): + # Processes vote retraction + success, response, announcement = game.unvote(sender_nick) + await self.send_private_muc_message(room_jid, sender_nick, response) + + if success and announcement: + await self.send_room_message(room_jid, announcement) + + async def handle_role_query(self, game: MafiaGame, room_jid: str, sender_nick: str): + # Processes requests to view own role + player = game.get_player_by_nick(sender_nick) + + if not player: + await self.send_private_muc_message(room_jid, sender_nick, + "❌ You're not in this game!") + return + + if game.phase == Phase.LOBBY: + await self.send_private_muc_message(room_jid, sender_nick, + "❓ Roles haven't been assigned yet!") + return + + role_msg = f"🎭 Your role: {player.role.value}\n\n{player.role.description}" + + if player.role == Role.MAFIA: + mafia_members = game.get_mafia_members() + if len(mafia_members) > 1: + others = [p.nick for p in mafia_members if p.nick != sender_nick] + role_msg += f"\n\n🔪 Fellow Mafia: {', '.join(others)}" + else: + role_msg += "\n\n🔪 You are the only Mafia member!" + + await self.send_private_muc_message(room_jid, sender_nick, role_msg) + + # -------------------- INFORMATIONAL STRINGS -------------------- + + def get_help_message(self) -> str: + # Returns general help text + return f"""🎮 MAFIA BOT COMMANDS + +Addressing: + You can use the prefix {self.trigger} OR address me by name: + "{self.nickname}: help", "{self.nickname}, join", etc. + +Game Setup: + {self.trigger} new - Create a new game + {self.trigger} join - Join the game + {self.trigger} leave - Leave (lobby only) + {self.trigger} start - Start game (host) + {self.trigger} end - End game (host) + +During Game: + {self.trigger} status - Show status + {self.trigger} players - List players + {self.trigger} next - Next phase (host) + +Config (lobby, host only): + {self.trigger} config - Show config + {self.trigger} config mafia + {self.trigger} config detective yes/no + {self.trigger} config doctor yes/no + {self.trigger} config auto yes/no + {self.trigger} config autonext yes/no + {self.trigger} config daytime + {self.trigger} config nighttime + {self.trigger} config reveal yes/no + {self.trigger} config skip yes/no + +Info: + {self.trigger} rules - Game rules + {self.trigger} help - This help + +Private commands (whisper): + !role - Check your role + !vote - Vote (day) + !unvote - Remove vote + !kill - Mafia kill (night) + !investigate - Detective (night) + !protect - Doctor (night)""" + + def get_whisper_help(self) -> str: + # Returns help text for private messages + return """🤫 PRIVATE COMMANDS + +Info: + !role - Check your role + !help - This help + +Day Phase: + !vote - Vote to eliminate + !vote skip - Skip elimination + !unvote - Remove your vote + +Night Phase: + !kill - Mafia: vote to kill + !investigate - Detective: check + !protect - Doctor: save + +Tip: You can use partial names.""" + + def get_rules_message(self) -> str: + # Returns the rules of the game + return """📜 MAFIA GAME RULES + +Mafia is a social deduction game where an informed minority (Mafia) battles an uninformed majority (Village). + +ROLES: +🏘️ Villager - Find and eliminate the Mafia +🔪 Mafia - Kill villagers without being caught +🔍 Detective - Investigate one player per night +🏥 Doctor - Protect one player per night + +GAMEPLAY: +🌙 Night Phase: + • Mafia secretly votes to kill someone + • Detective investigates one player + • Doctor protects one player + +☀️ Day Phase: + • The village discusses suspects + • Players vote to eliminate + • Majority vote eliminates a player + +WIN CONDITIONS: +✅ Village wins if all Mafia eliminated +❌ Mafia wins if they equal or outnumber villagers + +TIPS: +• Watch voting patterns +• Mafia knows each other, villagers don't +• Dead players cannot communicate +• Doctor can't protect same person twice in a row + +Good luck! 🎲""" + + +# -------------------- ENTRY POINT -------------------- + +async def main(): + # Bootstrap the application + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s" + ) + + # Parse external configuration file + config = configparser.ConfigParser() + config.read('mafiaconfig.ini') + + jid = config['XMPP']['jid'] + password = config['XMPP']['password'] + rooms = [r.strip() for r in config['XMPP']['rooms'].split(',')] + + nickname = config.get('Bot', 'nickname', fallback='MafiaBot') + trigger = config.get('Bot', 'trigger', fallback='!mafia') + admin_users = [u.strip() for u in config.get('Bot', 'admin_users', fallback='').split(',') if u.strip()] + + # Load custom nicknames for specific rooms + room_nicknames = {} + for key in config['Bot']: + if key.startswith('nickname.'): + room = key.split('.', 1)[1].strip() + nick = config['Bot'][key].strip() + if nick: + room_nicknames[room] = nick + + bot = MafiaBot( + jid=jid, + password=password, + rooms=rooms, + room_nicknames=room_nicknames, + nickname=nickname, + trigger=trigger, + admin_users=admin_users + ) + + bot.connect() + logging.info("Mafia Bot starting...") + + await bot.disconnected + + +if __name__ == '__main__': + asyncio.run(main()) +