1718 lines
67 KiB
Python
1718 lines
67 KiB
Python
#!/usr/bin/env python3
|
|
import slixmpp
|
|
import random
|
|
import asyncio
|
|
import logging
|
|
import configparser
|
|
from enum import Enum
|
|
from typing import Dict, Set, Optional, List, Tuple, Any
|
|
from dataclasses import dataclass, field
|
|
from difflib import SequenceMatcher
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from slixmpp.jid import JID
|
|
from slixmpp.stanza import Message
|
|
|
|
|
|
# -------------------- DATA STRUCTURES & SETTINGS --------------------
|
|
|
|
class Role(Enum):
|
|
# These are the specific jobs players can have
|
|
VILLAGER = "Villager"
|
|
MAFIA = "Mafia"
|
|
DETECTIVE = "Detective"
|
|
DOCTOR = "Doctor"
|
|
|
|
@property
|
|
def team(self) -> str:
|
|
# Used to calculate who wins at the end
|
|
if self == Role.MAFIA:
|
|
return "Mafia"
|
|
return "Village"
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
# The text sent to players explaining what they should do
|
|
descriptions = {
|
|
Role.VILLAGER: "A regular villager. Use your wits to find the Mafia!",
|
|
Role.MAFIA: "You are Mafia! Kill villagers at night without being detected.",
|
|
Role.DETECTIVE: "Investigate one player each night to learn if they are Mafia.",
|
|
Role.DOCTOR: "Protect one player each night from being killed."
|
|
}
|
|
return descriptions.get(self, "Unknown role")
|
|
|
|
|
|
class Phase(Enum):
|
|
# The different stages of the game loop
|
|
LOBBY = "lobby" # Waiting for players to join
|
|
NIGHT = "night" # Mafia is killing, special roles are acting
|
|
DAY = "day" # Everyone is discussing and voting
|
|
ENDED = "ended" # Game over screen
|
|
|
|
|
|
@dataclass
|
|
class Player:
|
|
# Keeps track of a single person in the game
|
|
jid: str # Their chat room address (e.g., room@conference.server/Nickname)
|
|
nick: str # Their display name
|
|
bare_jid: str = "" # Their real address, used to send them private secrets
|
|
role: Optional[Role] = None
|
|
alive: bool = True # False means they are dead and can't vote
|
|
|
|
def __hash__(self):
|
|
# Needed to store players in a list without duplicates
|
|
return hash(self.jid)
|
|
|
|
def __eq__(self, other):
|
|
# Checks if two player objects are actually the same person
|
|
if isinstance(other, Player):
|
|
return self.jid == other.jid
|
|
return False
|
|
|
|
|
|
@dataclass
|
|
class GameConfig:
|
|
# All the settings the host can change with the !config command
|
|
mafia_count: int = 0 # 0 means the bot decides automatically based on player count
|
|
has_detective: bool = False
|
|
has_doctor: bool = True
|
|
day_duration: int = 120 # How long players have to argue (seconds)
|
|
night_duration: int = 60 # How long special roles have to act (seconds)
|
|
auto_switch: bool = True # If True, the timer advances the game automatically
|
|
auto_next: bool = True # If True, game advances immediately when everyone has acted
|
|
reveal_role_on_death: bool = True # Show if the dead person was Mafia or not
|
|
allow_no_kill: bool = True # Allows voting to "skip" instead of killing someone
|
|
min_players: int = 4 # Minimum people needed to start
|
|
|
|
def to_dict(self) -> dict:
|
|
# Converts settings to a format we can easily read/print
|
|
return {
|
|
"mafia_count": self.mafia_count,
|
|
"has_detective": self.has_detective,
|
|
"has_doctor": self.has_doctor,
|
|
"day_duration": self.day_duration,
|
|
"night_duration": self.night_duration,
|
|
"auto_switch": self.auto_switch,
|
|
"auto_next": self.auto_next,
|
|
"reveal_role_on_death": self.reveal_role_on_death,
|
|
"allow_no_kill": self.allow_no_kill,
|
|
"min_players": self.min_players
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> 'GameConfig':
|
|
# Loads settings back from a dictionary
|
|
config = cls()
|
|
for key, value in data.items():
|
|
if hasattr(config, key):
|
|
setattr(config, key, value)
|
|
return config
|
|
|
|
|
|
# -------------------- CORE GAME LOGIC --------------------
|
|
|
|
class MafiaGame:
|
|
# This class handles all the rules, voting, and state for ONE specific chat room
|
|
|
|
def __init__(self, room_jid: str, host_jid: str, host_nick: str):
|
|
self.room_jid = room_jid
|
|
self.host_jid = host_jid
|
|
self.host_nick = host_nick # The person who started the game
|
|
self.players: Dict[str, Player] = {} # List of everyone playing
|
|
self.phase = Phase.LOBBY
|
|
self.day_number = 0
|
|
self.config = GameConfig()
|
|
|
|
# Memory of what happened during the night
|
|
self.mafia_votes: Dict[str, str] = {} # Who the mafia members voted to kill
|
|
self.mafia_kill_target: Optional[str] = None # The final victim
|
|
self.detective_target: Optional[str] = None # Who the detective checked
|
|
self.doctor_target: Optional[str] = None # Who the doctor saved
|
|
self.protected_last_night: Optional[str] = None # Prevents saving same person twice
|
|
|
|
# Memory of what is happening during the day
|
|
self.day_votes: Dict[str, str] = {} # Who voted for whom to be hanged
|
|
|
|
# Timers and Logs
|
|
self.phase_task: Optional[asyncio.Task] = None # The background timer
|
|
self.phase_start_time: float = 0
|
|
self.game_log: List[str] = [] # A history of events
|
|
|
|
def log(self, message: str):
|
|
# Saves an event to the logs (for debugging or history)
|
|
timestamp = time.strftime("%H:%M:%S")
|
|
self.game_log.append(f"[{timestamp}] {message}")
|
|
logging.info(f"[Game {self.room_jid}] {message}")
|
|
|
|
# -------------------- PLAYER MANAGEMENT --------------------
|
|
|
|
def add_player(self, nick: str, jid: str, bare_jid: str = "") -> Tuple[bool, str]:
|
|
# Tries to add a user to the lobby
|
|
if self.phase != Phase.LOBBY:
|
|
return False, "❌ The game is already running. Please wait for it to finish!"
|
|
|
|
if nick in self.players:
|
|
return False, f"❌ {nick} is already on the list!"
|
|
|
|
self.players[nick] = Player(jid=jid, nick=nick, bare_jid=bare_jid)
|
|
self.log(f"{nick} joined the game")
|
|
|
|
return True, f"✅ {nick} has joined! (Total: {len(self.players)})"
|
|
|
|
def remove_player(self, nick: str) -> Tuple[bool, str]:
|
|
# Removes a user from the lobby
|
|
if self.phase != Phase.LOBBY:
|
|
return False, "❌ You can't leave while the game is running (it ruins the balance)!"
|
|
|
|
if nick not in self.players:
|
|
return False, "❌ You aren't in the player list!"
|
|
|
|
del self.players[nick]
|
|
self.log(f"{nick} left the game")
|
|
|
|
return True, f"👋 {nick} has left. (Total: {len(self.players)})"
|
|
|
|
def find_player(self, name: str, alive_only: bool = True) -> Optional[Player]:
|
|
# Smart search: finds a player even if you only type part of their name
|
|
name_lower = name.lower().strip()
|
|
|
|
candidates = self.players.values()
|
|
if alive_only:
|
|
candidates = [p for p in candidates if p.alive]
|
|
|
|
# 1. Exact match (e.g., "John")
|
|
for p in candidates:
|
|
if p.nick.lower() == name_lower:
|
|
return p
|
|
|
|
# 2. Starts with (e.g., "Jo" -> "John")
|
|
for p in candidates:
|
|
if p.nick.lower().startswith(name_lower):
|
|
return p
|
|
|
|
# 3. Contains (e.g., "oh" -> "John")
|
|
for p in candidates:
|
|
if name_lower in p.nick.lower():
|
|
return p
|
|
|
|
# 4. Fuzzy match (handles small typos)
|
|
best_match = None
|
|
best_ratio = 0.6
|
|
|
|
for p in candidates:
|
|
ratio = SequenceMatcher(None, name_lower, p.nick.lower()).ratio()
|
|
if ratio > best_ratio:
|
|
best_ratio = ratio
|
|
best_match = p
|
|
|
|
return best_match
|
|
|
|
def get_player_by_nick(self, nick: str) -> Optional[Player]:
|
|
# Simple lookup by exact nickname
|
|
return self.players.get(nick)
|
|
|
|
def get_alive_players(self) -> List[Player]:
|
|
# Returns a list of everyone who hasn't been killed yet
|
|
return [p for p in self.players.values() if p.alive]
|
|
|
|
def get_dead_players(self) -> List[Player]:
|
|
# Returns a list of the spectators/dead players
|
|
return [p for p in self.players.values() if not p.alive]
|
|
|
|
def get_mafia_members(self) -> List[Player]:
|
|
# Returns the list of active bad guys
|
|
return [p for p in self.players.values() if p.role == Role.MAFIA and p.alive]
|
|
|
|
def get_player_by_role(self, role: Role, alive_only: bool = True) -> Optional[Player]:
|
|
# Finds a specific special role (like the Doctor)
|
|
for p in self.players.values():
|
|
if p.role == role:
|
|
if alive_only and not p.alive:
|
|
continue
|
|
return p
|
|
return None
|
|
|
|
# -------------------- PHASE CONTROL --------------------
|
|
|
|
def can_start(self) -> Tuple[bool, str]:
|
|
# Checks if we have enough people to launch
|
|
player_count = len(self.players)
|
|
|
|
if player_count < self.config.min_players:
|
|
return False, f"❌ Not enough players! Need at least {self.config.min_players}. (Current: {player_count})"
|
|
|
|
return True, ""
|
|
|
|
def assign_roles(self) -> List[Tuple[Player, Role]]:
|
|
# Randomly decides who is Mafia, who is Villager, etc.
|
|
player_count = len(self.players)
|
|
|
|
# Decide how many Mafia to have
|
|
if self.config.mafia_count > 0:
|
|
mafia_count = min(self.config.mafia_count, player_count // 3)
|
|
else:
|
|
# Default rule: About 1/3rd of players are Mafia
|
|
mafia_count = max(1, player_count // 3)
|
|
|
|
nicks = list(self.players.keys())
|
|
random.shuffle(nicks)
|
|
|
|
assignments = []
|
|
role_idx = 0
|
|
|
|
# 1. Pick Mafia
|
|
for i in range(mafia_count):
|
|
player = self.players[nicks[role_idx]]
|
|
player.role = Role.MAFIA
|
|
assignments.append((player, Role.MAFIA))
|
|
role_idx += 1
|
|
|
|
# 2. Pick Detective (if turned on)
|
|
if self.config.has_detective and role_idx < len(nicks):
|
|
player = self.players[nicks[role_idx]]
|
|
player.role = Role.DETECTIVE
|
|
assignments.append((player, Role.DETECTIVE))
|
|
role_idx += 1
|
|
|
|
# 3. Pick Doctor (if turned on)
|
|
if self.config.has_doctor and role_idx < len(nicks):
|
|
player = self.players[nicks[role_idx]]
|
|
player.role = Role.DOCTOR
|
|
assignments.append((player, Role.DOCTOR))
|
|
role_idx += 1
|
|
|
|
# 4. Everyone else is a Villager
|
|
while role_idx < len(nicks):
|
|
player = self.players[nicks[role_idx]]
|
|
player.role = Role.VILLAGER
|
|
assignments.append((player, Role.VILLAGER))
|
|
role_idx += 1
|
|
|
|
self.log(f"Roles assigned: {mafia_count} Mafia, {player_count - mafia_count} Village")
|
|
return assignments
|
|
|
|
def start_game(self) -> Tuple[bool, str, List[Tuple[Player, Role]]]:
|
|
# Moves the game from Lobby to the first Night
|
|
can_start, reason = self.can_start()
|
|
if not can_start:
|
|
return False, reason, []
|
|
|
|
assignments = self.assign_roles()
|
|
self.phase = Phase.NIGHT
|
|
self.day_number = 1
|
|
self.phase_start_time = time.time()
|
|
|
|
self.log("Game started!")
|
|
|
|
return True, self.get_night_start_message(), assignments
|
|
|
|
def start_night(self) -> str:
|
|
# Sets up the Night phase
|
|
self.phase = Phase.NIGHT
|
|
self.phase_start_time = time.time()
|
|
|
|
# Clear all the actions from the previous night
|
|
self.mafia_votes = {}
|
|
self.mafia_kill_target = None
|
|
self.detective_target = None
|
|
self.doctor_target = None
|
|
|
|
self.log(f"Night {self.day_number} started")
|
|
|
|
return self.get_night_start_message()
|
|
|
|
def start_day(self, night_results: str = "") -> str:
|
|
# Sets up the Day phase
|
|
self.phase = Phase.DAY
|
|
self.phase_start_time = time.time()
|
|
|
|
# Clear all votes from yesterday
|
|
self.day_votes = {}
|
|
|
|
self.log(f"Day {self.day_number} started")
|
|
|
|
message = self.get_day_start_message(night_results)
|
|
return message
|
|
|
|
def check_phase_completion(self) -> bool:
|
|
"""
|
|
Checks if everyone has acted.
|
|
If 'auto_next' is on, this returns True when the phase can end early.
|
|
"""
|
|
if not self.config.auto_next:
|
|
return False
|
|
|
|
if self.phase == Phase.DAY:
|
|
# Day ends if every living player has cast a vote
|
|
living_players = len(self.get_alive_players())
|
|
votes_cast = len(self.day_votes)
|
|
return votes_cast >= living_players
|
|
|
|
elif self.phase == Phase.NIGHT:
|
|
# Night ends if Mafia, Doc, and Detective have all done their jobs
|
|
|
|
# 1. Have all living Mafia members voted?
|
|
mafia_members = self.get_mafia_members()
|
|
mafia_done = len(self.mafia_votes) >= len(mafia_members)
|
|
|
|
# 2. Has Detective acted? (If they exist and are alive)
|
|
det = self.get_player_by_role(Role.DETECTIVE)
|
|
det_done = True
|
|
if det and det.alive and self.detective_target is None:
|
|
det_done = False
|
|
|
|
# 3. Has Doctor acted? (If they exist and are alive)
|
|
doc = self.get_player_by_role(Role.DOCTOR)
|
|
doc_done = True
|
|
if doc and doc.alive and self.doctor_target is None:
|
|
doc_done = False
|
|
|
|
return mafia_done and det_done and doc_done
|
|
|
|
return False
|
|
|
|
def get_night_start_message(self) -> str:
|
|
# The text everyone sees when night falls
|
|
alive_count = len(self.get_alive_players())
|
|
mafia_count = len(self.get_mafia_members())
|
|
|
|
msg = f"""🌙 NIGHT {self.day_number} 🌙
|
|
|
|
The village falls asleep... 😴
|
|
|
|
• {alive_count} players alive ({mafia_count} Mafia hiding among you)
|
|
• Mafia: whisper your kill vote
|
|
• Special roles: perform your night actions
|
|
|
|
💀 The night is dark and full of terrors..."""
|
|
|
|
if self.config.auto_switch and self.config.night_duration > 0:
|
|
msg += f"\n⏰ Night ends in {self.config.night_duration} seconds"
|
|
if self.config.auto_next:
|
|
msg += " (or sooner if everyone acts)"
|
|
else:
|
|
msg += "\n⏰ Host: use !mafia next to advance"
|
|
|
|
return msg
|
|
|
|
def get_day_start_message(self, night_results: str = "") -> str:
|
|
# The text everyone sees when the sun rises
|
|
alive = self.get_alive_players()
|
|
alive_count = len(alive)
|
|
|
|
msg = f"""☀️ DAY {self.day_number} ☀️
|
|
|
|
The sun rises over the village...
|
|
|
|
{night_results}
|
|
|
|
• {alive_count} players remain alive
|
|
• Discuss and find the Mafia!
|
|
• Whisper your vote to eliminate a suspect
|
|
|
|
📋 Alive: {', '.join(p.nick for p in alive)}"""
|
|
|
|
if self.config.auto_switch and self.config.day_duration > 0:
|
|
msg += f"\n⏰ Day ends in {self.config.day_duration} seconds"
|
|
if self.config.auto_next:
|
|
msg += " (or sooner if everyone votes)"
|
|
else:
|
|
msg += "\n⏰ Host: use !mafia next to advance"
|
|
|
|
if self.config.allow_no_kill:
|
|
msg += "\n💡 Vote 'skip' to skip elimination"
|
|
|
|
return msg
|
|
|
|
# -------------------- NIGHT ACTION LOGIC --------------------
|
|
|
|
def mafia_vote(self, voter_nick: str, target_name: str) -> Tuple[bool, str, Optional[str], bool]:
|
|
"""
|
|
Handles a Mafia member voting for a kill.
|
|
Returns: (success, reply_msg, broadcast_msg, phase_complete)
|
|
"""
|
|
voter = self.get_player_by_nick(voter_nick)
|
|
|
|
if not voter or voter.role != Role.MAFIA:
|
|
return False, "❌ Only Mafia members can vote to kill!", None, False
|
|
|
|
if not voter.alive:
|
|
return False, "❌ Dead players cannot vote!", None, False
|
|
|
|
if self.phase != Phase.NIGHT:
|
|
return False, "❌ You can only kill during the night!", None, False
|
|
|
|
target = self.find_player(target_name)
|
|
if not target:
|
|
return False, f"❌ Player '{target_name}' not found!", None, False
|
|
|
|
if not target.alive:
|
|
return False, f"❌ {target.nick} is already dead!", None, False
|
|
|
|
if target.role == Role.MAFIA:
|
|
return False, "❌ You cannot kill your fellow Mafia member!", None, False
|
|
|
|
self.mafia_votes[voter_nick] = target.nick
|
|
|
|
# Count current votes to show the mafia team status
|
|
mafia_members = self.get_mafia_members()
|
|
votes_cast = len(self.mafia_votes)
|
|
|
|
vote_counts: Dict[str, int] = {}
|
|
for target_nick in self.mafia_votes.values():
|
|
vote_counts[target_nick] = vote_counts.get(target_nick, 0) + 1
|
|
|
|
vote_summary = ", ".join(f"{v}: {k}" for k, v in vote_counts.items())
|
|
response = f"🗳️ You voted to kill {target.nick}\n"
|
|
response += f"📊 Votes ({votes_cast}/{len(mafia_members)}): {vote_summary}"
|
|
|
|
# Announcement for other mafia members
|
|
announcement = None
|
|
if len(mafia_members) > 1:
|
|
announcement = f"🔪 {voter_nick} voted to kill {target.nick} ({votes_cast}/{len(mafia_members)})"
|
|
|
|
# Check if we can end the night early
|
|
phase_complete = self.check_phase_completion()
|
|
|
|
return True, response, announcement, phase_complete
|
|
|
|
def detective_investigate(self, detective_nick: str, target_name: str) -> Tuple[bool, str, bool]:
|
|
# Handles the Detective checking someone's role
|
|
detective = self.get_player_by_nick(detective_nick)
|
|
|
|
if not detective or detective.role != Role.DETECTIVE:
|
|
return False, "❌ Only the Detective can investigate!", False
|
|
|
|
if not detective.alive:
|
|
return False, "❌ Dead players cannot use abilities!", False
|
|
|
|
if self.phase != Phase.NIGHT:
|
|
return False, "❌ You can only investigate during the night!", False
|
|
|
|
if self.detective_target is not None:
|
|
return False, f"❌ You already investigated {self.detective_target} tonight!", False
|
|
|
|
target = self.find_player(target_name)
|
|
if not target:
|
|
return False, f"❌ Player '{target_name}' not found!", False
|
|
|
|
if not target.alive:
|
|
return False, f"❌ {target.nick} is already dead!", False
|
|
|
|
if target.nick == detective_nick:
|
|
return False, "❌ You cannot investigate yourself!", False
|
|
|
|
self.detective_target = target.nick
|
|
|
|
# Result logic
|
|
is_mafia = target.role == Role.MAFIA
|
|
|
|
if is_mafia:
|
|
result = f"🔍 {target.nick} is MAFIA! 🔴"
|
|
else:
|
|
result = f"🔍 {target.nick} is NOT MAFIA ✅"
|
|
|
|
self.log(f"Detective investigated {target.nick}: {'Mafia' if is_mafia else 'Not Mafia'}")
|
|
|
|
phase_complete = self.check_phase_completion()
|
|
return True, result, phase_complete
|
|
|
|
def doctor_protect(self, doctor_nick: str, target_name: str) -> Tuple[bool, str, bool]:
|
|
# Handles the Doctor trying to save someone
|
|
doctor = self.get_player_by_nick(doctor_nick)
|
|
|
|
if not doctor or doctor.role != Role.DOCTOR:
|
|
return False, "❌ Only the Doctor can protect players!", False
|
|
|
|
if not doctor.alive:
|
|
return False, "❌ Dead players cannot use abilities!", False
|
|
|
|
if self.phase != Phase.NIGHT:
|
|
return False, "❌ You can only protect during the night!", False
|
|
|
|
if self.doctor_target is not None:
|
|
return False, f"❌ You already chose to protect {self.doctor_target} tonight!", False
|
|
|
|
target = self.find_player(target_name)
|
|
if not target:
|
|
return False, f"❌ Player '{target_name}' not found!", False
|
|
|
|
if not target.alive:
|
|
return False, f"❌ {target.nick} is already dead!", False
|
|
|
|
# Rule: Can't save the same person twice in a row
|
|
if self.protected_last_night and target.nick == self.protected_last_night:
|
|
return False, f"❌ You cannot protect {target.nick} two nights in a row!", False
|
|
|
|
self.doctor_target = target.nick
|
|
|
|
self.log(f"Doctor chose to protect {target.nick}")
|
|
|
|
phase_complete = self.check_phase_completion()
|
|
return True, f"🏥 You will protect {target.nick} tonight.", phase_complete
|
|
|
|
def resolve_night(self) -> Tuple[str, Optional[Player], bool]:
|
|
"""
|
|
Calculates what happened during the night.
|
|
Returns: (narrative_text, player_who_died, is_game_over)
|
|
"""
|
|
killed_player = None
|
|
result_parts = []
|
|
|
|
# 1. Decide who Mafia kills (based on majority vote)
|
|
if self.mafia_votes:
|
|
vote_counts: Dict[str, int] = {}
|
|
for target_nick in self.mafia_votes.values():
|
|
vote_counts[target_nick] = vote_counts.get(target_nick, 0) + 1
|
|
|
|
if vote_counts:
|
|
max_votes = max(vote_counts.values())
|
|
# If there is a tie, pick one randomly
|
|
top_targets = [t for t, v in vote_counts.items() if v == max_votes]
|
|
self.mafia_kill_target = random.choice(top_targets)
|
|
|
|
# 2. Update Doctor's history
|
|
if self.doctor_target:
|
|
self.protected_last_night = self.doctor_target
|
|
self.log(f"Doctor protected {self.doctor_target}")
|
|
else:
|
|
self.protected_last_night = None
|
|
|
|
# 3. Apply the kill
|
|
if self.mafia_kill_target:
|
|
target = self.get_player_by_nick(self.mafia_kill_target)
|
|
|
|
if target and target.alive:
|
|
if self.doctor_target == self.mafia_kill_target:
|
|
# Saved by the doc!
|
|
result_parts.append(f"🏥 The Doctor saved someone from death!")
|
|
self.log(f"Doctor saved {self.mafia_kill_target} from Mafia")
|
|
else:
|
|
# Player dies
|
|
target.alive = False
|
|
killed_player = target
|
|
|
|
role_reveal = ""
|
|
if self.config.reveal_role_on_death:
|
|
role_reveal = f" They were a {target.role.value}."
|
|
|
|
result_parts.append(f"💀 {target.nick} was killed by the Mafia!{role_reveal}")
|
|
self.log(f"{target.nick} ({target.role.value}) was killed by Mafia")
|
|
else:
|
|
result_parts.append("🌙 The night passed peacefully... no one died!")
|
|
self.log("No one was killed during the night")
|
|
|
|
result_message = "\n".join(result_parts) if result_parts else "The night was uneventful."
|
|
|
|
# 4. Check if the game is over
|
|
winner = self.check_win_condition()
|
|
game_ended = winner is not None
|
|
|
|
if game_ended:
|
|
result_message += f"\n\n🏆 {winner.upper()} WINS! 🏆"
|
|
self.phase = Phase.ENDED
|
|
|
|
return result_message, killed_player, game_ended
|
|
|
|
# -------------------- DAY ACTION LOGIC --------------------
|
|
|
|
def day_vote(self, voter_nick: str, target_name: str) -> Tuple[bool, str, str, bool]:
|
|
"""
|
|
Handles a player voting to eliminate someone.
|
|
Returns: (success, reply_msg, broadcast_msg, phase_complete)
|
|
"""
|
|
voter = self.get_player_by_nick(voter_nick)
|
|
|
|
if not voter:
|
|
return False, "❌ You're not in this game!", "", False
|
|
|
|
if not voter.alive:
|
|
return False, "❌ Dead players cannot vote!", "", False
|
|
|
|
if self.phase != Phase.DAY:
|
|
return False, "❌ You can only vote during the day!", "", False
|
|
|
|
# Logic for skipping
|
|
if target_name.lower() == "skip":
|
|
if not self.config.allow_no_kill:
|
|
return False, "❌ Skipping elimination is not allowed!", "", False
|
|
|
|
old_vote = self.day_votes.get(voter_nick)
|
|
self.day_votes[voter_nick] = "skip"
|
|
|
|
if old_vote:
|
|
announcement = f"🗳️ {voter_nick} changed vote to SKIP"
|
|
else:
|
|
announcement = f"🗳️ {voter_nick} votes to SKIP"
|
|
|
|
phase_complete = self.check_phase_completion()
|
|
return True, "✅ You voted to skip elimination.", announcement, phase_complete
|
|
|
|
# Logic for voting a specific person
|
|
target = self.find_player(target_name)
|
|
if not target:
|
|
return False, f"❌ Player '{target_name}' not found!", "", False
|
|
|
|
if not target.alive:
|
|
return False, f"❌ {target.nick} is already dead!", "", False
|
|
|
|
if target.nick == voter_nick:
|
|
return False, "❌ You cannot vote for yourself!", "", False
|
|
|
|
old_vote = self.day_votes.get(voter_nick)
|
|
self.day_votes[voter_nick] = target.nick
|
|
|
|
if old_vote:
|
|
announcement = f"🗳️ {voter_nick}: {old_vote} → {target.nick}"
|
|
else:
|
|
announcement = f"🗳️ {voter_nick} votes for {target.nick}"
|
|
|
|
vote_tally = self.get_vote_tally()
|
|
|
|
phase_complete = self.check_phase_completion()
|
|
return True, f"✅ You voted to eliminate {target.nick}\n\n{vote_tally}", announcement, phase_complete
|
|
|
|
def unvote(self, voter_nick: str) -> Tuple[bool, str, str]:
|
|
# Removes a player's vote
|
|
voter = self.get_player_by_nick(voter_nick)
|
|
|
|
if not voter:
|
|
return False, "❌ You're not in this game!", ""
|
|
|
|
if not voter.alive:
|
|
return False, "❌ Dead players cannot vote!", ""
|
|
|
|
if self.phase != Phase.DAY:
|
|
return False, "❌ You can only change votes during the day!", ""
|
|
|
|
if voter_nick not in self.day_votes:
|
|
return False, "❌ You haven't voted yet!", ""
|
|
|
|
old_vote = self.day_votes.pop(voter_nick)
|
|
announcement = f"🗳️ {voter_nick} removed vote (was: {old_vote})"
|
|
|
|
return True, "✅ Your vote has been removed.", announcement
|
|
|
|
def get_vote_tally(self) -> str:
|
|
# Creates a text summary of who voted for whom
|
|
alive_count = len(self.get_alive_players())
|
|
votes_cast = len(self.day_votes)
|
|
majority = (alive_count // 2) + 1
|
|
|
|
vote_counts: Dict[str, List[str]] = {}
|
|
for voter, target in self.day_votes.items():
|
|
if target not in vote_counts:
|
|
vote_counts[target] = []
|
|
vote_counts[target].append(voter)
|
|
|
|
if not vote_counts:
|
|
return f"📊 No votes yet (need {majority} for majority)"
|
|
|
|
# Sort by most votes first
|
|
sorted_targets = sorted(vote_counts.items(), key=lambda x: len(x[1]), reverse=True)
|
|
|
|
lines = [f"📊 Votes ({votes_cast}/{alive_count}, need {majority}):"]
|
|
for target, voters in sorted_targets:
|
|
voter_list = ", ".join(voters)
|
|
lines.append(f" {target}: {len(voters)} ({voter_list})")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def resolve_day(self) -> Tuple[str, Optional[Player], bool]:
|
|
"""
|
|
Calculates who gets eliminated at the end of the day.
|
|
Returns: (narrative, eliminated_player, is_game_over)
|
|
"""
|
|
alive_players = self.get_alive_players()
|
|
alive_count = len(alive_players)
|
|
majority = (alive_count // 2) + 1
|
|
|
|
vote_counts: Dict[str, int] = {}
|
|
for target in self.day_votes.values():
|
|
vote_counts[target] = vote_counts.get(target, 0) + 1
|
|
|
|
eliminated_player = None
|
|
result_parts = []
|
|
|
|
if not vote_counts:
|
|
result_parts.append("📭 No votes cast. No one eliminated.")
|
|
self.log("No votes cast during the day")
|
|
else:
|
|
max_votes = max(vote_counts.values())
|
|
top_targets = [t for t, v in vote_counts.items() if v == max_votes]
|
|
|
|
if "skip" in top_targets and max_votes >= majority:
|
|
result_parts.append("⏭️ Village voted to skip elimination.")
|
|
self.log("Village voted to skip elimination")
|
|
elif max_votes >= majority:
|
|
# Elimination successful
|
|
eliminated_nick = random.choice([t for t in top_targets if t != "skip"])
|
|
target = self.get_player_by_nick(eliminated_nick)
|
|
|
|
if target:
|
|
target.alive = False
|
|
eliminated_player = target
|
|
|
|
role_reveal = ""
|
|
if self.config.reveal_role_on_death:
|
|
role_reveal = f" They were a {target.role.value}!"
|
|
|
|
result_parts.append(f"⚖️ {target.nick} has been eliminated!{role_reveal}")
|
|
self.log(f"{target.nick} ({target.role.value}) was eliminated by vote")
|
|
else:
|
|
# Not enough votes for a majority
|
|
tally = ", ".join(f"{t}: {v}" for t, v in sorted(vote_counts.items(), key=lambda x: -x[1]))
|
|
result_parts.append(f"🤷 No majority! Votes: {tally}")
|
|
result_parts.append("No one eliminated.")
|
|
self.log(f"No majority reached. Votes: {tally}")
|
|
|
|
result_message = "\n".join(result_parts)
|
|
|
|
winner = self.check_win_condition()
|
|
game_ended = winner is not None
|
|
|
|
if game_ended:
|
|
result_message += f"\n\n🏆 {winner.upper()} WINS! 🏆"
|
|
self.phase = Phase.ENDED
|
|
|
|
return result_message, eliminated_player, game_ended
|
|
|
|
# -------------------- END GAME & STATUS --------------------
|
|
|
|
def check_win_condition(self) -> Optional[str]:
|
|
# Checks if either side has won
|
|
alive = self.get_alive_players()
|
|
mafia_alive = len([p for p in alive if p.role == Role.MAFIA])
|
|
village_alive = len(alive) - mafia_alive
|
|
|
|
if mafia_alive == 0:
|
|
self.log("Village wins - all Mafia eliminated!")
|
|
return "Village"
|
|
|
|
if mafia_alive >= village_alive:
|
|
self.log("Mafia wins - numerical parity reached!")
|
|
return "Mafia"
|
|
|
|
return None
|
|
|
|
def get_end_game_summary(self) -> str:
|
|
# Generates the final scoreboard
|
|
lines = ["🏁 GAME OVER 🏁", ""]
|
|
lines.append("Final Roles:")
|
|
|
|
by_role: Dict[Role, List[Player]] = {}
|
|
for p in self.players.values():
|
|
if p.role not in by_role:
|
|
by_role[p.role] = []
|
|
by_role[p.role].append(p)
|
|
|
|
for role in [Role.MAFIA, Role.DETECTIVE, Role.DOCTOR, Role.VILLAGER]:
|
|
if role in by_role:
|
|
players = by_role[role]
|
|
status = [f"{p.nick} ({'💀' if not p.alive else '✅'})" for p in players]
|
|
lines.append(f" {role.value}: {', '.join(status)}")
|
|
|
|
lines.append("")
|
|
lines.append(f"📊 Game lasted {self.day_number} day(s)")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def get_status(self) -> str:
|
|
# Returns current state (used for !status command)
|
|
lines = []
|
|
|
|
if self.phase == Phase.LOBBY:
|
|
lines.append("🎮 Status: LOBBY")
|
|
lines.append(f"👥 Players ({len(self.players)}):")
|
|
for nick in self.players.keys():
|
|
lines.append(f" • {nick}")
|
|
lines.append("")
|
|
lines.append(f"🎯 Host: {self.host_nick}")
|
|
lines.append(f"⚙️ Min players: {self.config.min_players}")
|
|
|
|
elif self.phase == Phase.NIGHT:
|
|
lines.append(f"🌙 NIGHT {self.day_number}")
|
|
alive = self.get_alive_players()
|
|
lines.append(f"👥 {len(alive)} players alive")
|
|
lines.append("Waiting for night actions...")
|
|
|
|
elif self.phase == Phase.DAY:
|
|
lines.append(f"☀️ DAY {self.day_number}")
|
|
alive = self.get_alive_players()
|
|
lines.append(f"👥 {len(alive)} players alive:")
|
|
for p in alive:
|
|
lines.append(f" • {p.nick}")
|
|
lines.append("")
|
|
lines.append(self.get_vote_tally())
|
|
|
|
elif self.phase == Phase.ENDED:
|
|
lines.append(self.get_end_game_summary())
|
|
|
|
return "\n".join(lines)
|
|
|
|
def get_config_display(self) -> str:
|
|
# Returns current settings (used for !config command)
|
|
lines = ["⚙️ Game Configuration:"]
|
|
lines.append(f" Mafia count: {'Auto' if self.config.mafia_count == 0 else self.config.mafia_count}")
|
|
lines.append(f" Detective: {'Yes' if self.config.has_detective else 'No'}")
|
|
lines.append(f" Doctor: {'Yes' if self.config.has_doctor else 'No'}")
|
|
lines.append(f" Auto switch: {'Yes' if self.config.auto_switch else 'No (manual)'}")
|
|
lines.append(f" Auto next (skip wait): {'Yes' if self.config.auto_next else 'No'}")
|
|
if self.config.auto_switch:
|
|
lines.append(f" Day duration: {self.config.day_duration}s")
|
|
lines.append(f" Night duration: {self.config.night_duration}s")
|
|
lines.append(f" Reveal role on death: {'Yes' if self.config.reveal_role_on_death else 'No'}")
|
|
lines.append(f" Allow skip vote: {'Yes' if self.config.allow_no_kill else 'No'}")
|
|
lines.append(f" Min players: {self.config.min_players}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
# -------------------- XMPP BOT CLIENT --------------------
|
|
|
|
class MafiaBot(slixmpp.ClientXMPP):
|
|
# The actual bot that connects to the internet and talks to users
|
|
|
|
def __init__(self, jid: str, password: str, rooms: List[str],
|
|
room_nicknames: Dict[str, str], nickname: str,
|
|
trigger: str = "!mafia", admin_users: Optional[List[str]] = None):
|
|
|
|
super().__init__(jid, password)
|
|
|
|
self.rooms = rooms
|
|
self.room_nicknames = room_nicknames or {}
|
|
self.nickname = nickname
|
|
self.trigger = trigger
|
|
self.admin_users = set(u.lower() for u in (admin_users or []))
|
|
|
|
# State storage for active games
|
|
self.games: Dict[str, MafiaGame] = {} # Key: room_jid
|
|
|
|
# Enable required XMPP extensions
|
|
self.register_plugin('xep_0030') # Service Discovery
|
|
self.register_plugin('xep_0045') # Multi-User Chat
|
|
self.register_plugin('xep_0199') # XMPP Ping
|
|
|
|
# Bind event callbacks
|
|
self.add_event_handler("session_start", self.start)
|
|
self.add_event_handler("groupchat_message", self.handle_groupchat)
|
|
self.add_event_handler("message", self.handle_private_message)
|
|
|
|
async def start(self, event):
|
|
# Called when the bot successfully connects
|
|
self.send_presence()
|
|
await self.get_roster()
|
|
|
|
for room in self.rooms:
|
|
nick = self.room_nicknames.get(room, self.nickname)
|
|
try:
|
|
await self.plugin['xep_0045'].join_muc(room, nick)
|
|
logging.info(f"Joined room: {room} as {nick}")
|
|
except Exception as e:
|
|
logging.error(f"Failed to join {room}: {e}")
|
|
|
|
def get_room_nick(self, room_jid: str) -> str:
|
|
# Helper to find what nickname the bot uses in a specific room
|
|
return self.room_nicknames.get(room_jid, self.nickname)
|
|
|
|
async def send_room_message(self, room_jid: str, message: str):
|
|
# Sends a public message to the MUC
|
|
self.send_message(mto=room_jid, mbody=message, mtype='groupchat')
|
|
|
|
async def send_private_muc_message(self, room_jid: str, nick: str, message: str):
|
|
# Sends a private whisper to a user inside a MUC context
|
|
to_jid = f"{room_jid}/{nick}"
|
|
self.send_message(mto=to_jid, mbody=message, mtype='chat')
|
|
|
|
async def send_direct_message(self, jid: str, message: str):
|
|
# Sends a standard 1-on-1 chat message
|
|
self.send_message(mto=jid, mbody=message, mtype='chat')
|
|
|
|
# -------------------- INCOMING MESSAGE PROCESSING --------------------
|
|
|
|
async def handle_groupchat(self, msg):
|
|
# Processes messages sent to the public chat room
|
|
if msg['type'] != 'groupchat':
|
|
return
|
|
|
|
room_jid = msg['from'].bare
|
|
sender_nick = msg['mucnick']
|
|
bot_nick = self.get_room_nick(room_jid)
|
|
|
|
# Don't let the bot reply to itself
|
|
if sender_nick == bot_nick:
|
|
return
|
|
|
|
body = msg['body'].strip()
|
|
if not body:
|
|
return
|
|
|
|
cmd_text = None
|
|
|
|
# 1. Standard Prefix (e.g. "!mafia join")
|
|
if body.startswith(self.trigger):
|
|
cmd_text = body[len(self.trigger):].strip()
|
|
|
|
# 2. Alias Addressing (e.g. "MafiaBot: join")
|
|
else:
|
|
# Check for: Space, Comma, Dot, Colon etc.
|
|
for character in (" ", ",", ".", ":", ";", "-", "—"):
|
|
prefix = bot_nick + character
|
|
if body.startswith(prefix):
|
|
# Found a match! Remove the name and pass the rest
|
|
cmd_text = body[len(prefix):].strip()
|
|
break
|
|
|
|
if cmd_text is not None:
|
|
await self.handle_command(room_jid, sender_nick, cmd_text, is_private=False)
|
|
|
|
async def handle_private_message(self, msg):
|
|
# Processes direct messages and whispers
|
|
if msg['type'] not in ('chat', 'normal'):
|
|
return
|
|
|
|
from_jid = msg['from']
|
|
body = msg['body'].strip()
|
|
|
|
if not body:
|
|
return
|
|
|
|
# Check if this is a room whisper (room@server/User)
|
|
if '/' in str(from_jid):
|
|
room_jid = from_jid.bare
|
|
sender_nick = from_jid.resource
|
|
|
|
# Make sure we actually know this game
|
|
if room_jid in self.games:
|
|
await self.handle_whisper(room_jid, sender_nick, body, from_jid)
|
|
else:
|
|
# Direct Message (not inside a room)
|
|
sender_bare = from_jid.bare.lower()
|
|
|
|
# Find which game this player belongs to
|
|
for room_jid, game in self.games.items():
|
|
for player in game.players.values():
|
|
if player.bare_jid.lower() == sender_bare:
|
|
await self.handle_whisper(room_jid, player.nick, body, from_jid)
|
|
return
|
|
|
|
# If we can't find them, tell them
|
|
await self.send_direct_message(str(from_jid.bare),
|
|
"You're not in any active Mafia game. Join a game room first!")
|
|
|
|
async def handle_whisper(self, room_jid: str, sender_nick: str, body: str, from_jid: JID):
|
|
# Executes commands sent via private message
|
|
game = self.games.get(room_jid)
|
|
bot_nick = self.get_room_nick(room_jid)
|
|
|
|
if not game:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"No game in this room! Use '!mafia new' to start one.")
|
|
return
|
|
|
|
# 1. Support "MafiaBot: kill" inside whispers too
|
|
# This handles cases where users copy/paste from main chat or get confused.
|
|
cmd_body = body
|
|
for character in (" ", ",", ".", ":", ";", "-", "—"):
|
|
prefix = bot_nick + character
|
|
if body.startswith(prefix):
|
|
cmd_body = body[len(prefix):].strip()
|
|
break
|
|
|
|
# Separate command from arguments
|
|
parts = cmd_body.split(None, 1)
|
|
cmd = parts[0].lower().lstrip('!')
|
|
args = parts[1] if len(parts) > 1 else ""
|
|
|
|
# Route commands to appropriate handlers
|
|
if cmd == "kill":
|
|
await self.handle_mafia_kill(game, room_jid, sender_nick, args)
|
|
elif cmd == "investigate" or cmd == "inv":
|
|
await self.handle_investigate(game, room_jid, sender_nick, args)
|
|
elif cmd == "protect" or cmd == "heal":
|
|
await self.handle_protect(game, room_jid, sender_nick, args)
|
|
elif cmd == "vote":
|
|
await self.handle_vote(game, room_jid, sender_nick, args)
|
|
elif cmd == "unvote":
|
|
await self.handle_unvote(game, room_jid, sender_nick)
|
|
elif cmd == "role":
|
|
await self.handle_role_query(game, room_jid, sender_nick)
|
|
elif cmd == "help":
|
|
await self.send_private_muc_message(room_jid, sender_nick, self.get_whisper_help())
|
|
else:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
f"Unknown command: {cmd}\nUse 'help' for commands.")
|
|
|
|
async def handle_command(self, room_jid: str, sender_nick: str, cmd_text: str, is_private: bool):
|
|
# Executes public commands from the group chat
|
|
parts = cmd_text.split(None, 1)
|
|
|
|
if not parts:
|
|
cmd = "help"
|
|
args = ""
|
|
else:
|
|
cmd = parts[0].lower()
|
|
args = parts[1] if len(parts) > 1 else ""
|
|
|
|
# Route to specific command handlers
|
|
if cmd == "new" or cmd == "create":
|
|
await self.handle_new_game(room_jid, sender_nick)
|
|
elif cmd == "join":
|
|
await self.handle_join(room_jid, sender_nick)
|
|
elif cmd == "leave":
|
|
await self.handle_leave(room_jid, sender_nick)
|
|
elif cmd == "start":
|
|
await self.handle_start(room_jid, sender_nick)
|
|
elif cmd == "end" or cmd == "stop":
|
|
await self.handle_end(room_jid, sender_nick)
|
|
elif cmd == "status":
|
|
await self.handle_status(room_jid)
|
|
elif cmd == "players":
|
|
await self.handle_players(room_jid)
|
|
elif cmd == "config":
|
|
await self.handle_config(room_jid, sender_nick, args)
|
|
elif cmd == "next":
|
|
await self.handle_next_phase(room_jid, sender_nick)
|
|
elif cmd == "help":
|
|
await self.send_private_muc_message(room_jid, sender_nick, self.get_help_message())
|
|
await self.send_room_message(room_jid, f"📖 Help sent to {sender_nick} via whisper.")
|
|
elif cmd == "rules":
|
|
await self.send_private_muc_message(room_jid, sender_nick, self.get_rules_message())
|
|
await self.send_room_message(room_jid, f"📜 Rules sent to {sender_nick} via whisper.")
|
|
else:
|
|
await self.send_room_message(room_jid,
|
|
f"Unknown command: {cmd}. Use '{self.trigger} help' for commands.")
|
|
|
|
# -------------------- COMMAND IMPLEMENTATIONS --------------------
|
|
|
|
async def handle_new_game(self, room_jid: str, sender_nick: str):
|
|
# Logic to initialize a new game instance
|
|
if room_jid in self.games and self.games[room_jid].phase != Phase.ENDED:
|
|
await self.send_room_message(room_jid,
|
|
"❌ A game is already in progress! End it first with '!mafia end'")
|
|
return
|
|
|
|
sender_jid = f"{room_jid}/{sender_nick}"
|
|
self.games[room_jid] = MafiaGame(room_jid, sender_jid, sender_nick)
|
|
|
|
await self.send_room_message(room_jid, f"""🎮 NEW MAFIA GAME 🎮
|
|
|
|
{sender_nick} is hosting a new game!
|
|
|
|
📝 To join: {self.trigger} join (or "{self.nickname}: join")
|
|
⚙️ To configure: {self.trigger} config
|
|
🚀 To start: {self.trigger} start
|
|
|
|
Type '{self.trigger} help' for all commands!""")
|
|
|
|
async def handle_join(self, room_jid: str, sender_nick: str):
|
|
# Logic for a player entering the lobby
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid,
|
|
f"❌ No game exists! Create one with '{self.trigger} new'")
|
|
return
|
|
|
|
sender_jid = f"{room_jid}/{sender_nick}"
|
|
|
|
# Attempt to discover the real JID for secure whispering
|
|
bare_jid = ""
|
|
try:
|
|
roster = self.plugin['xep_0045'].get_roster(room_jid)
|
|
if sender_nick in roster:
|
|
jid_info = roster[sender_nick]
|
|
if hasattr(jid_info, 'jid') and jid_info.jid:
|
|
bare_jid = str(jid_info.jid.bare)
|
|
except:
|
|
pass
|
|
|
|
success, message = game.add_player(sender_nick, sender_jid, bare_jid)
|
|
await self.send_room_message(room_jid, message)
|
|
|
|
async def handle_leave(self, room_jid: str, sender_nick: str):
|
|
# Logic for a player exiting the lobby
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid, "❌ No game exists!")
|
|
return
|
|
|
|
success, message = game.remove_player(sender_nick)
|
|
await self.send_room_message(room_jid, message)
|
|
|
|
async def handle_start(self, room_jid: str, sender_nick: str):
|
|
# Logic to begin the game loop
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid,
|
|
f"❌ No game exists! Create one with '{self.trigger} new'")
|
|
return
|
|
|
|
if sender_nick != game.host_nick:
|
|
await self.send_room_message(room_jid,
|
|
f"❌ Only the host ({game.host_nick}) can start the game!")
|
|
return
|
|
|
|
success, message, assignments = game.start_game()
|
|
|
|
if not success:
|
|
await self.send_room_message(room_jid, message)
|
|
return
|
|
|
|
# Publicly announce start
|
|
await self.send_room_message(room_jid, message)
|
|
|
|
# Privately distribute roles
|
|
for player, role in assignments:
|
|
role_message = f"""🎭 YOUR ROLE 🎭
|
|
|
|
You are a {role.value}!
|
|
|
|
{role.description}"""
|
|
|
|
if role == Role.MAFIA:
|
|
mafia_members = game.get_mafia_members()
|
|
if len(mafia_members) > 1:
|
|
other_mafia = [p.nick for p in mafia_members if p.nick != player.nick]
|
|
role_message += f"\n\n🔪 Fellow Mafia: {', '.join(other_mafia)}"
|
|
role_message += "\n\n📝 Night: whisper '!kill <player>' to vote"
|
|
elif role == Role.DETECTIVE:
|
|
role_message += "\n\n📝 Night: whisper '!investigate <player>'"
|
|
elif role == Role.DOCTOR:
|
|
role_message += "\n\n📝 Night: whisper '!protect <player>'"
|
|
|
|
role_message += "\n📝 Day: whisper '!vote <player>' to eliminate"
|
|
|
|
await self.send_private_muc_message(room_jid, player.nick, role_message)
|
|
|
|
# Init timer if enabled
|
|
if game.config.auto_switch and game.config.night_duration > 0:
|
|
game.phase_task = asyncio.create_task(
|
|
self.phase_timer(room_jid, game.config.night_duration)
|
|
)
|
|
|
|
async def handle_end(self, room_jid: str, sender_nick: str):
|
|
# Forcefully terminates the game
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid, "❌ No game exists!")
|
|
return
|
|
|
|
is_host = sender_nick == game.host_nick
|
|
is_admin = sender_nick.lower() in self.admin_users
|
|
|
|
if not (is_host or is_admin):
|
|
await self.send_room_message(room_jid,
|
|
f"❌ Only the host ({game.host_nick}) can end the game!")
|
|
return
|
|
|
|
if game.phase_task:
|
|
game.phase_task.cancel()
|
|
|
|
game.phase = Phase.ENDED
|
|
|
|
summary = game.get_end_game_summary()
|
|
await self.send_room_message(room_jid, f"🛑 Game ended by {sender_nick}!\n\n{summary}")
|
|
|
|
async def handle_status(self, room_jid: str):
|
|
# Displays game info
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid,
|
|
f"No active game. Create one with '{self.trigger} new'")
|
|
return
|
|
|
|
await self.send_room_message(room_jid, game.get_status())
|
|
|
|
async def handle_players(self, room_jid: str):
|
|
# Lists current participants
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid, "No active game.")
|
|
return
|
|
|
|
if game.phase == Phase.LOBBY:
|
|
players = list(game.players.keys())
|
|
await self.send_room_message(room_jid,
|
|
f"👥 Players ({len(players)}): {', '.join(players) if players else 'None yet'}")
|
|
else:
|
|
alive = [p.nick for p in game.get_alive_players()]
|
|
dead = [p.nick for p in game.get_dead_players()]
|
|
|
|
msg = f"👥 Players:\n"
|
|
msg += f" ✅ Alive ({len(alive)}): {', '.join(alive)}\n"
|
|
msg += f" 💀 Dead ({len(dead)}): {', '.join(dead) if dead else 'None'}"
|
|
|
|
await self.send_room_message(room_jid, msg)
|
|
|
|
async def handle_config(self, room_jid: str, sender_nick: str, args: str):
|
|
# Modifies game settings
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid, "❌ No game exists!")
|
|
return
|
|
|
|
if game.phase != Phase.LOBBY:
|
|
await self.send_room_message(room_jid, "❌ Can only configure during lobby!")
|
|
return
|
|
|
|
if sender_nick != game.host_nick:
|
|
await self.send_room_message(room_jid,
|
|
f"❌ Only the host ({game.host_nick}) can change settings!")
|
|
return
|
|
|
|
if not args:
|
|
await self.send_private_muc_message(room_jid, sender_nick, game.get_config_display())
|
|
await self.send_room_message(room_jid, f"⚙️ Config sent to {sender_nick} via whisper.")
|
|
return
|
|
|
|
parts = args.split(None, 1)
|
|
option = parts[0].lower()
|
|
value = parts[1] if len(parts) > 1 else ""
|
|
|
|
try:
|
|
if option == "mafia":
|
|
game.config.mafia_count = int(value) if value else 0
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Mafia count: {'Auto' if game.config.mafia_count == 0 else game.config.mafia_count}")
|
|
|
|
elif option == "detective":
|
|
game.config.has_detective = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Detective: {'Enabled' if game.config.has_detective else 'Disabled'}")
|
|
|
|
elif option == "doctor":
|
|
game.config.has_doctor = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Doctor: {'Enabled' if game.config.has_doctor else 'Disabled'}")
|
|
|
|
elif option == "auto":
|
|
game.config.auto_switch = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Auto phase switch: {'Enabled' if game.config.auto_switch else 'Disabled'}")
|
|
|
|
elif option == "autonext" or option == "next":
|
|
game.config.auto_next = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Auto next (skip wait): {'Enabled' if game.config.auto_next else 'Disabled'}")
|
|
|
|
elif option == "daytime" or option == "day":
|
|
game.config.day_duration = int(value)
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Day duration: {game.config.day_duration}s")
|
|
|
|
elif option == "nighttime" or option == "night":
|
|
game.config.night_duration = int(value)
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Night duration: {game.config.night_duration}s")
|
|
|
|
elif option == "reveal":
|
|
game.config.reveal_role_on_death = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Reveal role on death: {'Yes' if game.config.reveal_role_on_death else 'No'}")
|
|
|
|
elif option == "skip":
|
|
game.config.allow_no_kill = value.lower() in ('yes', 'true', 'on', '1')
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Allow skip vote: {'Yes' if game.config.allow_no_kill else 'No'}")
|
|
|
|
elif option == "minplayers":
|
|
game.config.min_players = max(4, int(value))
|
|
await self.send_room_message(room_jid,
|
|
f"✅ Minimum players: {game.config.min_players}")
|
|
|
|
else:
|
|
await self.send_room_message(room_jid,
|
|
f"❌ Unknown option: {option}. Options: mafia, detective, doctor, auto, autonext, daytime, nighttime, reveal, skip, minplayers")
|
|
|
|
except ValueError as e:
|
|
await self.send_room_message(room_jid, f"❌ Invalid value: {e}")
|
|
|
|
async def handle_next_phase(self, room_jid: str, sender_nick: str):
|
|
# Manually triggers phase transition
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
await self.send_room_message(room_jid, "❌ No game exists!")
|
|
return
|
|
|
|
is_host = sender_nick == game.host_nick
|
|
is_admin = sender_nick.lower() in self.admin_users
|
|
|
|
if not (is_host or is_admin):
|
|
await self.send_room_message(room_jid,
|
|
f"❌ Only the host ({game.host_nick}) can advance phases!")
|
|
return
|
|
|
|
await self.advance_phase(room_jid)
|
|
|
|
# -------------------- TIMER & SCHEDULING --------------------
|
|
|
|
async def phase_timer(self, room_jid: str, duration: int):
|
|
# Background task that waits and then advances the phase
|
|
try:
|
|
# 1. Wait for half the duration, send warning
|
|
await asyncio.sleep(duration // 2)
|
|
game = self.games.get(room_jid)
|
|
if game and game.phase in (Phase.NIGHT, Phase.DAY):
|
|
remaining = duration - (duration // 2)
|
|
await self.send_room_message(room_jid, f"⏰ {remaining} seconds remaining!")
|
|
|
|
# 2. Wait until 30s remain
|
|
await asyncio.sleep(max(0, remaining - 30))
|
|
game = self.games.get(room_jid)
|
|
if game and game.phase in (Phase.NIGHT, Phase.DAY):
|
|
await self.send_room_message(room_jid, "⏰ 30 seconds remaining!")
|
|
|
|
# 3. Final countdown
|
|
await asyncio.sleep(30)
|
|
|
|
game = self.games.get(room_jid)
|
|
if game and game.phase in (Phase.NIGHT, Phase.DAY):
|
|
await self.advance_phase(room_jid)
|
|
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def advance_phase(self, room_jid: str):
|
|
# Executes the transition between Day and Night
|
|
game = self.games.get(room_jid)
|
|
|
|
if not game:
|
|
return
|
|
|
|
# Stop existing timer
|
|
if game.phase_task:
|
|
game.phase_task.cancel()
|
|
game.phase_task = None
|
|
|
|
if game.phase == Phase.NIGHT:
|
|
# Process night results -> Switch to DAY
|
|
result_message, killed_player, game_ended = game.resolve_night()
|
|
|
|
if game_ended:
|
|
summary = game.get_end_game_summary()
|
|
await self.send_room_message(room_jid, f"{result_message}\n\n{summary}")
|
|
return
|
|
|
|
day_message = game.start_day(result_message)
|
|
await self.send_room_message(room_jid, day_message)
|
|
|
|
# Schedule day timer
|
|
if game.config.auto_switch and game.config.day_duration > 0:
|
|
game.phase_task = asyncio.create_task(
|
|
self.phase_timer(room_jid, game.config.day_duration)
|
|
)
|
|
|
|
elif game.phase == Phase.DAY:
|
|
# Process day results -> Switch to NIGHT
|
|
result_message, eliminated_player, game_ended = game.resolve_day()
|
|
|
|
await self.send_room_message(room_jid, result_message)
|
|
|
|
if game_ended:
|
|
summary = game.get_end_game_summary()
|
|
await self.send_room_message(room_jid, summary)
|
|
return
|
|
|
|
# Start next night
|
|
game.day_number += 1
|
|
night_message = game.start_night()
|
|
await self.send_room_message(room_jid, night_message)
|
|
|
|
# Send private role reminders
|
|
await self.send_night_reminders(room_jid, game)
|
|
|
|
# Schedule night timer
|
|
if game.config.auto_switch and game.config.night_duration > 0:
|
|
game.phase_task = asyncio.create_task(
|
|
self.phase_timer(room_jid, game.config.night_duration)
|
|
)
|
|
|
|
async def send_night_reminders(self, room_jid: str, game: MafiaGame):
|
|
# Notifies players that they need to perform night actions
|
|
|
|
# Notify Mafia
|
|
for mafia in game.get_mafia_members():
|
|
await self.send_private_muc_message(room_jid, mafia.nick,
|
|
"🌙 Night time! Whisper '!kill <player>' to vote for a kill.")
|
|
|
|
# Notify Detective
|
|
detective = game.get_player_by_role(Role.DETECTIVE)
|
|
if detective:
|
|
await self.send_private_muc_message(room_jid, detective.nick,
|
|
"🌙 Night time! Whisper '!investigate <player>' to check them.")
|
|
|
|
# Notify Doctor
|
|
doctor = game.get_player_by_role(Role.DOCTOR)
|
|
if doctor:
|
|
msg = "🌙 Night time! Whisper '!protect <player>' to save them."
|
|
if game.protected_last_night:
|
|
msg += f"\n⚠️ Cannot protect {game.protected_last_night} again."
|
|
await self.send_private_muc_message(room_jid, doctor.nick, msg)
|
|
|
|
# -------------------- SPECIFIC ACTION HANDLERS --------------------
|
|
|
|
async def handle_mafia_kill(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str):
|
|
# Processes mafia kill commands
|
|
if not target_name:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"Usage: !kill <player>")
|
|
return
|
|
|
|
success, response, announcement, phase_complete = game.mafia_vote(sender_nick, target_name)
|
|
await self.send_private_muc_message(room_jid, sender_nick, response)
|
|
|
|
# Relay vote to other mafia
|
|
if announcement:
|
|
for mafia in game.get_mafia_members():
|
|
if mafia.nick != sender_nick:
|
|
await self.send_private_muc_message(room_jid, mafia.nick, announcement)
|
|
|
|
# Check if everyone has acted
|
|
if phase_complete:
|
|
await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...")
|
|
await self.advance_phase(room_jid)
|
|
|
|
async def handle_investigate(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str):
|
|
# Processes detective commands
|
|
if not target_name:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"Usage: !investigate <player>")
|
|
return
|
|
|
|
success, response, phase_complete = game.detective_investigate(sender_nick, target_name)
|
|
await self.send_private_muc_message(room_jid, sender_nick, response)
|
|
|
|
if phase_complete:
|
|
await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...")
|
|
await self.advance_phase(room_jid)
|
|
|
|
async def handle_protect(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str):
|
|
# Processes doctor commands
|
|
if not target_name:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"Usage: !protect <player>")
|
|
return
|
|
|
|
success, response, phase_complete = game.doctor_protect(sender_nick, target_name)
|
|
await self.send_private_muc_message(room_jid, sender_nick, response)
|
|
|
|
if phase_complete:
|
|
await self.send_room_message(room_jid, "⚡ Everyone has acted. Ending phase early...")
|
|
await self.advance_phase(room_jid)
|
|
|
|
async def handle_vote(self, game: MafiaGame, room_jid: str, sender_nick: str, target_name: str):
|
|
# Processes day vote commands
|
|
if not target_name:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"Usage: !vote <player> or !vote skip")
|
|
return
|
|
|
|
success, response, announcement, phase_complete = game.day_vote(sender_nick, target_name)
|
|
await self.send_private_muc_message(room_jid, sender_nick, response)
|
|
|
|
if success and announcement:
|
|
await self.send_room_message(room_jid, announcement)
|
|
|
|
# Announce tally
|
|
tally = game.get_vote_tally()
|
|
await self.send_room_message(room_jid, tally)
|
|
|
|
if phase_complete:
|
|
await self.send_room_message(room_jid, "⚡ Everyone has voted. Ending phase early...")
|
|
await self.advance_phase(room_jid)
|
|
|
|
async def handle_unvote(self, game: MafiaGame, room_jid: str, sender_nick: str):
|
|
# Processes vote retraction
|
|
success, response, announcement = game.unvote(sender_nick)
|
|
await self.send_private_muc_message(room_jid, sender_nick, response)
|
|
|
|
if success and announcement:
|
|
await self.send_room_message(room_jid, announcement)
|
|
|
|
async def handle_role_query(self, game: MafiaGame, room_jid: str, sender_nick: str):
|
|
# Processes requests to view own role
|
|
player = game.get_player_by_nick(sender_nick)
|
|
|
|
if not player:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"❌ You're not in this game!")
|
|
return
|
|
|
|
if game.phase == Phase.LOBBY:
|
|
await self.send_private_muc_message(room_jid, sender_nick,
|
|
"❓ Roles haven't been assigned yet!")
|
|
return
|
|
|
|
role_msg = f"🎭 Your role: {player.role.value}\n\n{player.role.description}"
|
|
|
|
if player.role == Role.MAFIA:
|
|
mafia_members = game.get_mafia_members()
|
|
if len(mafia_members) > 1:
|
|
others = [p.nick for p in mafia_members if p.nick != sender_nick]
|
|
role_msg += f"\n\n🔪 Fellow Mafia: {', '.join(others)}"
|
|
else:
|
|
role_msg += "\n\n🔪 You are the only Mafia member!"
|
|
|
|
await self.send_private_muc_message(room_jid, sender_nick, role_msg)
|
|
|
|
# -------------------- INFORMATIONAL STRINGS --------------------
|
|
|
|
def get_help_message(self) -> str:
|
|
# Returns general help text
|
|
return f"""🎮 MAFIA BOT COMMANDS
|
|
|
|
Addressing:
|
|
You can use the prefix {self.trigger} OR address me by name:
|
|
"{self.nickname}: help", "{self.nickname}, join", etc.
|
|
|
|
Game Setup:
|
|
{self.trigger} new - Create a new game
|
|
{self.trigger} join - Join the game
|
|
{self.trigger} leave - Leave (lobby only)
|
|
{self.trigger} start - Start game (host)
|
|
{self.trigger} end - End game (host)
|
|
|
|
During Game:
|
|
{self.trigger} status - Show status
|
|
{self.trigger} players - List players
|
|
{self.trigger} next - Next phase (host)
|
|
|
|
Config (lobby, host only):
|
|
{self.trigger} config - Show config
|
|
{self.trigger} config mafia <n>
|
|
{self.trigger} config detective yes/no
|
|
{self.trigger} config doctor yes/no
|
|
{self.trigger} config auto yes/no
|
|
{self.trigger} config autonext yes/no
|
|
{self.trigger} config daytime <sec>
|
|
{self.trigger} config nighttime <sec>
|
|
{self.trigger} config reveal yes/no
|
|
{self.trigger} config skip yes/no
|
|
|
|
Info:
|
|
{self.trigger} rules - Game rules
|
|
{self.trigger} help - This help
|
|
|
|
Private commands (whisper):
|
|
!role - Check your role
|
|
!vote <name> - Vote (day)
|
|
!unvote - Remove vote
|
|
!kill <name> - Mafia kill (night)
|
|
!investigate <name> - Detective (night)
|
|
!protect <name> - Doctor (night)"""
|
|
|
|
def get_whisper_help(self) -> str:
|
|
# Returns help text for private messages
|
|
return """🤫 PRIVATE COMMANDS
|
|
|
|
Info:
|
|
!role - Check your role
|
|
!help - This help
|
|
|
|
Day Phase:
|
|
!vote <name> - Vote to eliminate
|
|
!vote skip - Skip elimination
|
|
!unvote - Remove your vote
|
|
|
|
Night Phase:
|
|
!kill <name> - Mafia: vote to kill
|
|
!investigate <name> - Detective: check
|
|
!protect <name> - Doctor: save
|
|
|
|
Tip: You can use partial names."""
|
|
|
|
def get_rules_message(self) -> str:
|
|
# Returns the rules of the game
|
|
return """📜 MAFIA GAME RULES
|
|
|
|
Mafia is a social deduction game where an informed minority (Mafia) battles an uninformed majority (Village).
|
|
|
|
ROLES:
|
|
🏘️ Villager - Find and eliminate the Mafia
|
|
🔪 Mafia - Kill villagers without being caught
|
|
🔍 Detective - Investigate one player per night
|
|
🏥 Doctor - Protect one player per night
|
|
|
|
GAMEPLAY:
|
|
🌙 Night Phase:
|
|
• Mafia secretly votes to kill someone
|
|
• Detective investigates one player
|
|
• Doctor protects one player
|
|
|
|
☀️ Day Phase:
|
|
• The village discusses suspects
|
|
• Players vote to eliminate
|
|
• Majority vote eliminates a player
|
|
|
|
WIN CONDITIONS:
|
|
✅ Village wins if all Mafia eliminated
|
|
❌ Mafia wins if they equal or outnumber villagers
|
|
|
|
TIPS:
|
|
• Watch voting patterns
|
|
• Mafia knows each other, villagers don't
|
|
• Dead players cannot communicate
|
|
• Doctor can't protect same person twice in a row
|
|
|
|
Good luck! 🎲"""
|
|
|
|
|
|
# -------------------- ENTRY POINT --------------------
|
|
|
|
async def main():
|
|
# Bootstrap the application
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s"
|
|
)
|
|
|
|
# Parse external configuration file
|
|
config = configparser.ConfigParser()
|
|
config.read('mafiaconfig.ini')
|
|
|
|
jid = config['XMPP']['jid']
|
|
password = config['XMPP']['password']
|
|
rooms = [r.strip() for r in config['XMPP']['rooms'].split(',')]
|
|
|
|
nickname = config.get('Bot', 'nickname', fallback='MafiaBot')
|
|
trigger = config.get('Bot', 'trigger', fallback='!mafia')
|
|
admin_users = [u.strip() for u in config.get('Bot', 'admin_users', fallback='').split(',') if u.strip()]
|
|
|
|
# Load custom nicknames for specific rooms
|
|
room_nicknames = {}
|
|
for key in config['Bot']:
|
|
if key.startswith('nickname.'):
|
|
room = key.split('.', 1)[1].strip()
|
|
nick = config['Bot'][key].strip()
|
|
if nick:
|
|
room_nicknames[room] = nick
|
|
|
|
bot = MafiaBot(
|
|
jid=jid,
|
|
password=password,
|
|
rooms=rooms,
|
|
room_nicknames=room_nicknames,
|
|
nickname=nickname,
|
|
trigger=trigger,
|
|
admin_users=admin_users
|
|
)
|
|
|
|
bot.connect()
|
|
logging.info("Mafia Bot starting...")
|
|
|
|
await bot.disconnected
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|
|
|