From 3202857b321e99a23879e8ed854c1027d8a6e86c Mon Sep 17 00:00:00 2001 From: just n Date: Fri, 16 Jan 2026 20:35:18 +0000 Subject: [PATCH] Add latex.py --- latex.py | 988 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 988 insertions(+) create mode 100644 latex.py diff --git a/latex.py b/latex.py new file mode 100644 index 0000000..3b80e01 --- /dev/null +++ b/latex.py @@ -0,0 +1,988 @@ +#!/usr/bin/env python3 +import slixmpp +import requests +import json +import time +import configparser +import asyncio +import logging +from pathlib import Path +from typing import Any, Dict, FrozenSet, Optional, Set +import re +import base64 +from io import BytesIO +import tempfile +import subprocess +import os + +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from matplotlib import mathtext + +from slixmpp.jid import JID +from slixmpp.stanza import Message +from slixmpp.xmlstream.handler import CoroutineCallback +from slixmpp.xmlstream.matcher import MatchXPath +from slixmpp.plugins import register_plugin + +from omemo.storage import Just, Maybe, Nothing, Storage +from omemo.types import DeviceInformation, JSONType +from slixmpp_omemo import TrustLevel, XEP_0384 + + +class ImageUploader: + """Handles uploading images to various third-party hosts""" + + def __init__(self, service='catbox', api_key=None): + self.service = service.lower() + self.api_key = api_key + + def upload(self, image_data: bytes, filename: str = "latex.png") -> Optional[str]: + """Upload image and return URL""" + try: + if self.service == 'catbox': + return self._upload_catbox(image_data, filename) + elif self.service == 'litterbox': + return self._upload_litterbox(image_data, filename) + elif self.service == '0x0': + return self._upload_0x0(image_data, filename) + elif self.service == 'imgur': + return self._upload_imgur(image_data) + elif self.service == 'imgbb': + return self._upload_imgbb(image_data) + elif self.service == 'envs': + return self._upload_envs(image_data, filename) + elif self.service == 'uguu': + return self._upload_uguu(image_data, filename) + else: + logging.error(f"Unknown image hosting service: {self.service}") + return None + except Exception as e: + logging.error(f"Image upload failed: {e}") + return None + + def _upload_catbox(self, image_data: bytes, filename: str) -> Optional[str]: + """Upload to catbox.moe (permanent, no key required)""" + url = "https://catbox.moe/user/api.php" + files = { + 'fileToUpload': (filename, BytesIO(image_data), 'image/png') + } + data = { + 'reqtype': 'fileupload' + } + if self.api_key: + data['userhash'] = self.api_key + + response = requests.post(url, files=files, data=data, timeout=30) + response.raise_for_status() + + result = response.text.strip() + if result.startswith('https://'): + logging.info(f"Uploaded to catbox: {result}") + return result + else: + logging.error(f"Catbox upload failed: {result}") + return None + + def _upload_litterbox(self, image_data: bytes, filename: str, time: str = "72h") -> Optional[str]: + """Upload to litterbox.catbox.moe (temporary, no key required) + + time options: 1h, 12h, 24h, 72h + """ + url = "https://litterbox.catbox.moe/resources/internals/api.php" + files = { + 'fileToUpload': (filename, BytesIO(image_data), 'image/png') + } + data = { + 'reqtype': 'fileupload', + 'time': time + } + + response = requests.post(url, files=files, data=data, timeout=30) + response.raise_for_status() + + result = response.text.strip() + if result.startswith('https://'): + logging.info(f"Uploaded to litterbox: {result}") + return result + else: + logging.error(f"Litterbox upload failed: {result}") + return None + + def _upload_0x0(self, image_data: bytes, filename: str) -> Optional[str]: + """Upload to 0x0.st (temporary, no key required)""" + url = "https://0x0.st" + files = { + 'file': (filename, BytesIO(image_data), 'image/png') + } + + response = requests.post(url, files=files, timeout=30) + response.raise_for_status() + + result = response.text.strip() + if result.startswith('https://'): + logging.info(f"Uploaded to 0x0.st: {result}") + return result + else: + logging.error(f"0x0.st upload failed: {result}") + return None + + def _upload_imgur(self, image_data: bytes) -> Optional[str]: + """Upload to imgur.com (requires API key)""" + if not self.api_key: + logging.error("Imgur requires an API key (Client-ID)") + return None + + url = "https://api.imgur.com/3/image" + headers = { + 'Authorization': f'Client-ID {self.api_key}' + } + data = { + 'image': base64.b64encode(image_data).decode('utf-8'), + 'type': 'base64' + } + + response = requests.post(url, headers=headers, data=data, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('success'): + link = result['data']['link'] + logging.info(f"Uploaded to imgur: {link}") + return link + else: + logging.error(f"Imgur upload failed: {result}") + return None + + def _upload_imgbb(self, image_data: bytes) -> Optional[str]: + """Upload to imgbb.com (requires API key)""" + if not self.api_key: + logging.error("imgbb requires an API key") + return None + + url = "https://api.imgbb.com/1/upload" + data = { + 'key': self.api_key, + 'image': base64.b64encode(image_data).decode('utf-8') + } + + response = requests.post(url, data=data, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('success'): + link = result['data']['url'] + logging.info(f"Uploaded to imgbb: {link}") + return link + else: + logging.error(f"imgbb upload failed: {result}") + return None + + def _upload_envs(self, image_data: bytes, filename: str) -> Optional[str]: + """Upload to envs.sh (temporary, no key required)""" + url = "https://envs.sh" + files = { + 'file': (filename, BytesIO(image_data), 'image/png') + } + + response = requests.post(url, files=files, timeout=30) + response.raise_for_status() + + result = response.text.strip() + if result.startswith('https://'): + logging.info(f"Uploaded to envs.sh: {result}") + return result + else: + logging.error(f"envs.sh upload failed: {result}") + return None + + def _upload_uguu(self, image_data: bytes, filename: str) -> Optional[str]: + """Upload to uguu.se (temporary 48h, no key required)""" + url = "https://uguu.se/upload.php" + files = { + 'files[]': (filename, BytesIO(image_data), 'image/png') + } + + response = requests.post(url, files=files, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('success') and result.get('files'): + link = result['files'][0]['url'] + logging.info(f"Uploaded to uguu.se: {link}") + return link + else: + logging.error(f"uguu.se upload failed: {result}") + return None + + +class StorageImpl(Storage): + + def __init__(self, json_file_path: Path) -> None: + super().__init__() + self.__json_file_path = json_file_path + self.__data: Dict[str, JSONType] = {} + try: + with open(self.__json_file_path, encoding="utf8") as f: + self.__data = json.load(f) + except Exception: + pass + + async def _load(self, key: str) -> Maybe[JSONType]: + if key in self.__data: + return Just(self.__data[key]) + return Nothing() + + async def _store(self, key: str, value: JSONType) -> None: + self.__data[key] = value + with open(self.__json_file_path, "w", encoding="utf8") as f: + json.dump(self.__data, f) + + async def _delete(self, key: str) -> None: + self.__data.pop(key, None) + with open(self.__json_file_path, "w", encoding="utf8") as f: + json.dump(self.__data, f) + + +class PluginCouldNotLoad(Exception): + pass + + +class XEP_0384Impl(XEP_0384): + + default_config = { + "fallback_message": "This message is OMEMO encrypted.", + "json_file_path": None + } + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.__storage: Storage + + def plugin_init(self) -> None: + if not self.json_file_path: + raise PluginCouldNotLoad("JSON file path not specified.") + self.__storage = StorageImpl(Path(self.json_file_path)) + super().plugin_init() + + @property + def storage(self) -> Storage: + return self.__storage + + @property + def _btbv_enabled(self) -> bool: + return True + + async def _devices_blindly_trusted( + self, + blindly_trusted: FrozenSet[DeviceInformation], + identifier: Optional[str] + ) -> None: + logging.info(f"[{identifier}] Devices trusted blindly: {blindly_trusted}") + + async def _prompt_manual_trust( + self, + manually_trusted: FrozenSet[DeviceInformation], + identifier: Optional[str] + ) -> None: + session_manager = await self.get_session_manager() + for device in manually_trusted: + logging.info(f"Auto-trusting device: {device}") + await session_manager.set_trust( + device.bare_jid, + device.identity_key, + TrustLevel.TRUSTED.value + ) + + +register_plugin(XEP_0384Impl) + + +class LaTeXBot(slixmpp.ClientXMPP): + + def __init__(self, jid, password, rooms, room_nicknames, trigger, mentions, + max_length, nickname, privileged_users, + quote_reply=True, mention_reply=True, + allow_dm=True, dm_mode='whitelist', dm_list=None, + enable_omemo=True, omemo_store_path="omemo_store.json", + join_retry_attempts=5, join_retry_delay=10, + use_full_latex=False, + default_font_size=20, default_dpi=150, + image_host='catbox', image_host_api_key=None, + loop=None): + + super().__init__(jid, password, loop=loop, sasl_mech='PLAIN') + self.enable_direct_tls = True + + self.rooms = rooms + self.room_nicknames = room_nicknames or {} + self.trigger = trigger + self.mentions = mentions + self.max_length = max_length + self.nickname = nickname + + self.privileged_users = {u.lower() for u in privileged_users} + + self.quote_reply = quote_reply + self.mention_reply = mention_reply + + self.allow_dm = allow_dm + self.dm_mode = dm_mode.lower() + self.dm_list = {x.lower() for x in (dm_list or [])} + + self.enable_omemo = enable_omemo + self.omemo_store_path = omemo_store_path + + self.join_retry_attempts = join_retry_attempts + self.join_retry_delay = join_retry_delay + + self.use_full_latex = use_full_latex + self.default_font_size = default_font_size + self.default_dpi = default_dpi + + # Image uploader + self.uploader = ImageUploader(service=image_host, api_key=image_host_api_key) + + # Check if full LaTeX is available + self.latex_available = self._check_latex_available() + if self.use_full_latex and not self.latex_available: + logging.warning("Full LaTeX not available, falling back to matplotlib") + self.use_full_latex = False + + self.register_plugin('xep_0030') + self.register_plugin('xep_0045') + self.register_plugin('xep_0199') + self.register_plugin('xep_0066') # Out of Band Data + + if self.enable_omemo: + self.register_plugin('xep_0085') + self.register_plugin('xep_0380') + + import sys + self.register_plugin( + "xep_0384", + {"json_file_path": self.omemo_store_path}, + module=sys.modules[__name__] + ) + logging.info("OMEMO support enabled") + + self.add_event_handler("session_start", self.start) + self.add_event_handler("groupchat_message", self.groupchat_message) + + if self.enable_omemo: + self.register_handler(CoroutineCallback( + "DirectMessages", + MatchXPath(f"{{{self.default_ns}}}message[@type='chat']"), + self.direct_message_async + )) + else: + self.add_event_handler("message", self.direct_message) + + def _check_latex_available(self): + """Check if pdflatex and convert (ImageMagick) are available""" + try: + subprocess.run(['pdflatex', '--version'], capture_output=True, check=True) + subprocess.run(['convert', '--version'], capture_output=True, check=True) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + return False + + def start(self, event): + self.send_presence() + self.get_roster() + + for room in self.rooms: + nick = self.room_nicknames.get(room, self.nickname) + self.join_room_with_retry(room, nick) + + def join_room_with_retry(self, room, nick): + for attempt in range(self.join_retry_attempts): + try: + self.plugin['xep_0045'].join_muc(room, nick) + logging.info(f"Successfully joined room: {room} as {nick}") + return True + except Exception as e: + logging.error(f"Failed to join {room} (attempt {attempt+1}/{self.join_retry_attempts}): {e}") + if attempt < self.join_retry_attempts - 1: + logging.info(f"Retrying in {self.join_retry_delay} seconds...") + time.sleep(self.join_retry_delay) + else: + logging.error(f"Failed to join {room} after {self.join_retry_attempts} attempts") + return False + + def sanitize_input(self, text): + if not isinstance(text, str): + return "" + text = text.replace('\x00', '') + text = ''.join(char for char in text if char.isprintable() or char in '\n\r\t') + return text.strip() + + def extract_latex(self, body): + """Extract LaTeX code from message body""" + latex = body + + # Check for code blocks + code_block_match = re.search(r'```(?:latex|tex)?\s*([\s\S]*?)```', latex) + if code_block_match: + return code_block_match.group(1).strip() + + # Check for inline code + inline_match = re.search(r'`([^`]+)`', latex) + if inline_match: + return inline_match.group(1).strip() + + # Check for $$ delimiters (display math) + display_match = re.search(r'\$\$([\s\S]*?)\$\$', latex) + if display_match: + return display_match.group(1).strip() + + # Check for single $ delimiters (inline math) + inline_math_match = re.search(r'\$([^\$]+)\$', latex) + if inline_math_match: + return inline_math_match.group(1).strip() + + # Return as-is + return latex.strip() + + def render_latex_matplotlib(self, latex_code, font_size=None, dpi=None): + """Render LaTeX using matplotlib's mathtext or usetex""" + if font_size is None: + font_size = self.default_font_size + if dpi is None: + dpi = self.default_dpi + + try: + fig, ax = plt.subplots(figsize=(0.01, 0.01)) + ax.axis('off') + + # Wrap in display math if needed + if not latex_code.startswith('$') and not latex_code.startswith('\\'): + latex_code = f'${latex_code}$' + elif not latex_code.startswith('$'): + latex_code = f'${latex_code}$' + + text = ax.text(0, 0, latex_code, fontsize=font_size, + ha='left', va='bottom', + usetex=self.latex_available) + + fig.canvas.draw() + + renderer = fig.canvas.get_renderer() + bbox = text.get_window_extent(renderer) + + plt.close(fig) + + width = (bbox.width / dpi) + 0.4 + height = (bbox.height / dpi) + 0.4 + + fig, ax = plt.subplots(figsize=(max(width, 1), max(height, 0.5))) + ax.axis('off') + ax.set_xlim(0, 1) + ax.set_ylim(0, 1) + + ax.text(0.5, 0.5, latex_code, fontsize=font_size, + ha='center', va='center', + usetex=self.latex_available) + + buf = BytesIO() + plt.savefig(buf, format='png', bbox_inches='tight', + pad_inches=0.1, dpi=dpi, facecolor='white', + edgecolor='none') + plt.close(fig) + buf.seek(0) + return buf.read() + + except Exception as e: + logging.error(f"Matplotlib rendering failed: {e}") + try: + fig, ax = plt.subplots(figsize=(10, 2)) + ax.axis('off') + + if not latex_code.startswith('$'): + latex_code = f'${latex_code}$' + + ax.text(0.5, 0.5, latex_code, fontsize=font_size, + ha='center', va='center', transform=ax.transAxes, + usetex=False) + + buf = BytesIO() + plt.savefig(buf, format='png', bbox_inches='tight', + pad_inches=0.1, dpi=dpi, facecolor='white') + plt.close(fig) + buf.seek(0) + return buf.read() + except Exception as e2: + logging.error(f"Fallback rendering also failed: {e2}") + return None + + def render_latex_full(self, latex_code, dpi=None): + """Render LaTeX using full LaTeX installation""" + if dpi is None: + dpi = self.default_dpi + + try: + with tempfile.TemporaryDirectory() as tmpdir: + tex_file = Path(tmpdir) / "formula.tex" + + if '\\documentclass' in latex_code: + tex_content = latex_code + elif '\\begin{' in latex_code: + tex_content = f"""\\documentclass[preview,border=5pt]{{standalone}} +\\usepackage{{amsmath,amssymb,amsfonts,amsbsy}} +\\usepackage{{mathtools}} +\\usepackage{{physics}} +\\usepackage{{siunitx}} +\\usepackage{{chemformula}} +\\usepackage{{tikz}} +\\usepackage{{pgfplots}} +\\pgfplotsset{{compat=1.18}} +\\begin{{document}} +{latex_code} +\\end{{document}} +""" + else: + tex_content = f"""\\documentclass[preview,border=5pt]{{standalone}} +\\usepackage{{amsmath,amssymb,amsfonts,amsbsy}} +\\usepackage{{mathtools}} +\\usepackage{{physics}} +\\begin{{document}} +$\\displaystyle {latex_code}$ +\\end{{document}} +""" + tex_file.write_text(tex_content) + + result = subprocess.run( + ['pdflatex', '-interaction=nonstopmode', '-halt-on-error', str(tex_file)], + cwd=tmpdir, + capture_output=True, + timeout=30 + ) + + pdf_file = Path(tmpdir) / "formula.pdf" + if not pdf_file.exists(): + error_log = (Path(tmpdir) / "formula.log").read_text() + logging.error(f"LaTeX compilation failed: {error_log[-1000:]}") + return None + + png_file = Path(tmpdir) / "formula.png" + subprocess.run( + ['convert', '-density', str(dpi), str(pdf_file), + '-quality', '100', '-flatten', str(png_file)], + capture_output=True, + timeout=30 + ) + + if png_file.exists(): + return png_file.read_bytes() + + logging.error("PNG conversion failed") + return None + + except subprocess.TimeoutExpired: + logging.error("LaTeX rendering timed out") + return None + except Exception as e: + logging.error(f"Full LaTeX rendering failed: {e}") + return None + + def render_latex(self, latex_code, font_size=None, dpi=None): + """Main rendering method""" + if self.use_full_latex and self.latex_available: + result = self.render_latex_full(latex_code, dpi) + if result: + return result + logging.warning("Full LaTeX failed, falling back to matplotlib") + + return self.render_latex_matplotlib(latex_code, font_size, dpi) + + def upload_image(self, image_data: bytes) -> Optional[str]: + """Upload image using configured uploader""" + return self.uploader.upload(image_data) + + def extract_quoted_text(self, body): + lines = body.split('\n') + quoted = [] + non_quoted = [] + + for line in lines: + stripped = line.strip() + if stripped.startswith('>'): + quoted.append(stripped[1:].strip()) + else: + non_quoted.append(line) + + return '\n'.join(quoted), '\n'.join(non_quoted) + + def is_replying_to_bot(self, msg): + room_jid = msg['from'].bare + bot_nick = self.room_nicknames.get(room_jid, self.nickname) + body = msg['body'] + + quoted_text, _ = self.extract_quoted_text(body) + if quoted_text: + quoted_lines = quoted_text.split('\n') + for line in quoted_lines: + line = line.strip() + while line.startswith('>'): + line = line[1:].strip() + if line.lower().startswith(f"{bot_nick.lower()}:"): + return True + if line.lower().startswith(f"{bot_nick.lower()} "): + return True + return False + + def parse_render_options(self, text): + """Parse optional render parameters from the text""" + options = { + 'font_size': self.default_font_size, + 'dpi': self.default_dpi, + 'use_full': self.use_full_latex + } + + size_match = re.search(r'--(?:font)?size[=\s](\d+)', text) + if size_match: + options['font_size'] = int(size_match.group(1)) + text = re.sub(r'--(?:font)?size[=\s]\d+', '', text) + + dpi_match = re.search(r'--dpi[=\s](\d+)', text) + if dpi_match: + options['dpi'] = int(dpi_match.group(1)) + text = re.sub(r'--dpi[=\s]\d+', '', text) + + if '--full' in text: + options['use_full'] = True + text = text.replace('--full', '') + + if '--simple' in text: + options['use_full'] = False + text = text.replace('--simple', '') + + return text.strip(), options + + def direct_message(self, msg): + if not self.allow_dm: + return + + if msg['type'] not in ('chat', 'normal'): + return + + if msg['from'].bare == self.boundjid.bare: + return + + sender = msg['from'].bare.lower() + + if self.dm_mode == 'whitelist' and sender not in self.dm_list: + return + if self.dm_mode == 'blacklist' and sender in self.dm_list: + return + + body = self.sanitize_input(msg['body']) + if not body or len(body) > self.max_length: + return + + logging.info(f"Direct message from {sender}: {body[:50]}...") + + body, options = self.parse_render_options(body) + latex_code = self.extract_latex(body) + + if not latex_code: + msg.reply("Please provide LaTeX code to render. Example: $E = mc^2$").send() + return + + old_use_full = self.use_full_latex + self.use_full_latex = options['use_full'] + image_data = self.render_latex(latex_code, options['font_size'], options['dpi']) + self.use_full_latex = old_use_full + + if image_data: + url = self.upload_image(image_data) + if url: + reply = msg.reply(url) + reply['oob']['url'] = url + reply.send() + logging.info(f"Sent rendered LaTeX to {sender}") + else: + msg.reply("Failed to upload rendered image").send() + else: + msg.reply(f"Failed to render LaTeX: {latex_code[:100]}").send() + + async def direct_message_async(self, stanza: Message) -> None: + if not self.allow_dm: + return + + mfrom = stanza["from"] + mtype = stanza["type"] + + if mtype not in {"chat", "normal"}: + return + + if mfrom.bare == self.boundjid.bare: + return + + sender = mfrom.bare.lower() + + if self.dm_mode == 'whitelist' and sender not in self.dm_list: + return + if self.dm_mode == 'blacklist' and sender in self.dm_list: + return + + xep_0384 = self["xep_0384"] + namespace = xep_0384.is_encrypted(stanza) + + body = None + is_encrypted = False + + if namespace: + logging.debug(f"Encrypted message received from {mfrom}") + try: + decrypted_msg, device_info = await xep_0384.decrypt_message(stanza) + if decrypted_msg.get("body"): + body = self.sanitize_input(decrypted_msg["body"]) + is_encrypted = True + logging.info(f"Decrypted message from {sender}: {body[:50]}...") + except Exception as e: + logging.error(f"Decryption failed: {e}") + await self._plain_reply(mfrom, mtype, f"Error decrypting message: {e}") + return + else: + if stanza["body"]: + body = self.sanitize_input(stanza["body"]) + logging.info(f"Plaintext message from {sender}: {body[:50]}...") + + if not body or len(body) > self.max_length: + return + + body, options = self.parse_render_options(body) + latex_code = self.extract_latex(body) + + if not latex_code: + reply = "Please provide LaTeX code to render. Example: $E = mc^2$" + if is_encrypted: + await self._encrypted_reply(mfrom, mtype, reply) + else: + await self._plain_reply(mfrom, mtype, reply) + return + + loop = asyncio.get_running_loop() + old_use_full = self.use_full_latex + self.use_full_latex = options['use_full'] + image_data = await loop.run_in_executor( + None, + self.render_latex, + latex_code, + options['font_size'], + options['dpi'] + ) + self.use_full_latex = old_use_full + + if image_data: + url = await loop.run_in_executor(None, self.upload_image, image_data) + if url: + if is_encrypted: + await self._encrypted_reply(mfrom, mtype, url) + else: + await self._plain_reply_with_oob(mfrom, mtype, url) + logging.info(f"Sent rendered LaTeX to {sender}") + else: + error = "Failed to upload rendered image" + if is_encrypted: + await self._encrypted_reply(mfrom, mtype, error) + else: + await self._plain_reply(mfrom, mtype, error) + else: + error = f"Failed to render LaTeX: {latex_code[:100]}" + if is_encrypted: + await self._encrypted_reply(mfrom, mtype, error) + else: + await self._plain_reply(mfrom, mtype, error) + + async def _plain_reply(self, mto: JID, mtype: str, reply_text: str) -> None: + msg = self.make_message(mto=mto, mtype=mtype) + msg["body"] = reply_text + msg.send() + + async def _plain_reply_with_oob(self, mto: JID, mtype: str, url: str) -> None: + msg = self.make_message(mto=mto, mtype=mtype) + msg["body"] = url + msg['oob']['url'] = url + msg.send() + + async def _encrypted_reply(self, mto: JID, mtype: str, reply_text: str) -> None: + xep_0384 = self["xep_0384"] + + msg = self.make_message(mto=mto, mtype=mtype) + msg["body"] = reply_text + + encrypt_for: Set[JID] = {JID(mto)} + + try: + messages, encryption_errors = await xep_0384.encrypt_message(msg, encrypt_for) + + if encryption_errors: + logging.warning(f"Encryption errors: {encryption_errors}") + + for namespace, message in messages.items(): + message["eme"]["namespace"] = namespace + message["eme"]["name"] = self["xep_0380"].mechanisms[namespace] + message.send() + logging.debug(f"Sent encrypted message to {mto}") + except Exception as e: + logging.error(f"Failed to send encrypted reply: {e}") + await self._plain_reply(mto, mtype, f"Error encrypting reply: {e}") + + def groupchat_message(self, msg): + if msg['type'] != 'groupchat': + return + + room_jid = msg['from'].bare + bot_nick = self.room_nicknames.get(room_jid, self.nickname) + + if msg['mucnick'] == bot_nick: + return + + body = self.sanitize_input(msg['body']) + + if len(body) > self.max_length: + return + + sender_nick = msg['mucnick'] + sender_lower = sender_nick.lower() + is_privileged = sender_lower in self.privileged_users + + query = None + is_reply = self.is_replying_to_bot(msg) + + if body.startswith(self.trigger): + query = body[len(self.trigger):].strip() + + elif self.mentions and f"@{bot_nick}" in body: + query = body.replace(f"@{bot_nick}", "").strip() + + elif self.mentions and body.lower().startswith(f"{bot_nick.lower()}:"): + query = body[len(bot_nick)+1:].strip() + + elif self.mentions and body.lower().startswith(f"{bot_nick.lower()},"): + query = body[len(bot_nick)+1:].strip() + + elif self.mentions and body.lower().startswith(f"{bot_nick.lower()} "): + query = body[len(bot_nick)+1:].strip() + + elif is_reply: + _, non_quoted = self.extract_quoted_text(body) + query = non_quoted.strip() + logging.info(f"Detected reply to bot from {sender_nick}") + + elif is_privileged and body: + query = body + + if query: + logging.info(f"LaTeX request from {sender_nick} in {room_jid}: {query[:50]}...") + + query, options = self.parse_render_options(query) + latex_code = self.extract_latex(query) + + if not latex_code: + response = f"{sender_nick}: Please provide LaTeX code. Example: $E = mc^2$" + msg.reply(response).send() + return + + old_use_full = self.use_full_latex + self.use_full_latex = options['use_full'] + image_data = self.render_latex(latex_code, options['font_size'], options['dpi']) + self.use_full_latex = old_use_full + + if image_data: + url = self.upload_image(image_data) + if url: + response = url + if self.mention_reply: + response = f"{sender_nick}: {response}" + + if self.quote_reply: + short_latex = latex_code[:50] + ("..." if len(latex_code) > 50 else "") + quoted = f"> {short_latex}" + response = f"{quoted}\n{response}" + + reply = msg.reply(response) + reply['oob']['url'] = url + reply.send() + logging.info(f"Sent rendered LaTeX in {room_jid}") + else: + msg.reply(f"{sender_nick}: Failed to upload rendered image").send() + else: + msg.reply(f"{sender_nick}: Failed to render LaTeX").send() + + +if __name__ == '__main__': + config = configparser.ConfigParser() + config.read('latexconfig.ini') + + jid = config['XMPP']['jid'] + password = config['XMPP']['password'] + rooms = [r.strip() for r in config['XMPP']['rooms'].split(',') if r.strip()] + + trigger = config.get('Bot', 'trigger', fallback='!latex') + mentions = config.getboolean('Bot', 'mentions', fallback=True) + max_length = config.getint('Bot', 'max_length', fallback=4000) + nickname = config.get('Bot', 'nickname', fallback='LaTeXBot') + privileged_users = [u.strip() for u in config.get('Bot', 'privileged_users', fallback='').split(',') if u.strip()] + + quote_reply = config.getboolean('Bot', 'quote_reply', fallback=True) + mention_reply = config.getboolean('Bot', 'mention_reply', fallback=True) + + allow_dm = config.getboolean('Bot', 'allow_dm', fallback=True) + dm_mode = config.get('Bot', 'dm_mode', fallback='none') + dm_list = [x.strip() for x in config.get('Bot', 'dm_list', fallback='').split(',') if x.strip()] + + enable_omemo = config.getboolean('Bot', 'enable_omemo', fallback=True) + omemo_store_path = config.get('Bot', 'omemo_store_path', fallback='omemo_store.json') + + join_retry_attempts = config.getint('Bot', 'join_retry_attempts', fallback=5) + join_retry_delay = config.getint('Bot', 'join_retry_delay', fallback=10) + + # LaTeX-specific settings + use_full_latex = config.getboolean('Bot', 'use_full_latex', fallback=False) + default_font_size = config.getint('Bot', 'default_font_size', fallback=20) + default_dpi = config.getint('Bot', 'default_dpi', fallback=150) + + # Image hosting settings + image_host = config.get('Bot', 'image_host', fallback='catbox') + image_host_api_key = config.get('Bot', 'image_host_api_key', fallback='') + + room_nicknames = {} + for key in config['Bot']: + if key.startswith('nickname.'): + room = key.split('.', 1)[1].strip() + nick = config['Bot'][key].strip() + if nick: + room_nicknames[room] = nick + + async def main(): + logging.basicConfig(level=logging.DEBUG, format="%(levelname)-8s %(message)s") + + loop = asyncio.get_running_loop() + + bot = LaTeXBot( + jid, password, rooms, room_nicknames, trigger, mentions, + max_length, nickname, privileged_users, + quote_reply=quote_reply, mention_reply=mention_reply, + allow_dm=allow_dm, dm_mode=dm_mode, dm_list=dm_list, + enable_omemo=enable_omemo, omemo_store_path=omemo_store_path, + join_retry_attempts=join_retry_attempts, + join_retry_delay=join_retry_delay, + use_full_latex=use_full_latex, + default_font_size=default_font_size, + default_dpi=default_dpi, + image_host=image_host, + image_host_api_key=image_host_api_key, + loop=loop + ) + + bot.connect() + logging.info("LaTeX Bot connected, waiting for messages...") + + await bot.disconnected + + asyncio.run(main()) +