Add latex.py

This commit is contained in:
2026-01-16 20:35:18 +00:00
parent dcfc8a6598
commit 3202857b32

988
latex.py Normal file
View File

@@ -0,0 +1,988 @@
#!/usr/bin/env python3
import slixmpp
import requests
import json
import time
import configparser
import asyncio
import logging
from pathlib import Path
from typing import Any, Dict, FrozenSet, Optional, Set
import re
import base64
from io import BytesIO
import tempfile
import subprocess
import os
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import mathtext
from slixmpp.jid import JID
from slixmpp.stanza import Message
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import MatchXPath
from slixmpp.plugins import register_plugin
from omemo.storage import Just, Maybe, Nothing, Storage
from omemo.types import DeviceInformation, JSONType
from slixmpp_omemo import TrustLevel, XEP_0384
class ImageUploader:
"""Handles uploading images to various third-party hosts"""
def __init__(self, service='catbox', api_key=None):
self.service = service.lower()
self.api_key = api_key
def upload(self, image_data: bytes, filename: str = "latex.png") -> Optional[str]:
"""Upload image and return URL"""
try:
if self.service == 'catbox':
return self._upload_catbox(image_data, filename)
elif self.service == 'litterbox':
return self._upload_litterbox(image_data, filename)
elif self.service == '0x0':
return self._upload_0x0(image_data, filename)
elif self.service == 'imgur':
return self._upload_imgur(image_data)
elif self.service == 'imgbb':
return self._upload_imgbb(image_data)
elif self.service == 'envs':
return self._upload_envs(image_data, filename)
elif self.service == 'uguu':
return self._upload_uguu(image_data, filename)
else:
logging.error(f"Unknown image hosting service: {self.service}")
return None
except Exception as e:
logging.error(f"Image upload failed: {e}")
return None
def _upload_catbox(self, image_data: bytes, filename: str) -> Optional[str]:
"""Upload to catbox.moe (permanent, no key required)"""
url = "https://catbox.moe/user/api.php"
files = {
'fileToUpload': (filename, BytesIO(image_data), 'image/png')
}
data = {
'reqtype': 'fileupload'
}
if self.api_key:
data['userhash'] = self.api_key
response = requests.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
result = response.text.strip()
if result.startswith('https://'):
logging.info(f"Uploaded to catbox: {result}")
return result
else:
logging.error(f"Catbox upload failed: {result}")
return None
def _upload_litterbox(self, image_data: bytes, filename: str, time: str = "72h") -> Optional[str]:
"""Upload to litterbox.catbox.moe (temporary, no key required)
time options: 1h, 12h, 24h, 72h
"""
url = "https://litterbox.catbox.moe/resources/internals/api.php"
files = {
'fileToUpload': (filename, BytesIO(image_data), 'image/png')
}
data = {
'reqtype': 'fileupload',
'time': time
}
response = requests.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
result = response.text.strip()
if result.startswith('https://'):
logging.info(f"Uploaded to litterbox: {result}")
return result
else:
logging.error(f"Litterbox upload failed: {result}")
return None
def _upload_0x0(self, image_data: bytes, filename: str) -> Optional[str]:
"""Upload to 0x0.st (temporary, no key required)"""
url = "https://0x0.st"
files = {
'file': (filename, BytesIO(image_data), 'image/png')
}
response = requests.post(url, files=files, timeout=30)
response.raise_for_status()
result = response.text.strip()
if result.startswith('https://'):
logging.info(f"Uploaded to 0x0.st: {result}")
return result
else:
logging.error(f"0x0.st upload failed: {result}")
return None
def _upload_imgur(self, image_data: bytes) -> Optional[str]:
"""Upload to imgur.com (requires API key)"""
if not self.api_key:
logging.error("Imgur requires an API key (Client-ID)")
return None
url = "https://api.imgur.com/3/image"
headers = {
'Authorization': f'Client-ID {self.api_key}'
}
data = {
'image': base64.b64encode(image_data).decode('utf-8'),
'type': 'base64'
}
response = requests.post(url, headers=headers, data=data, timeout=30)
response.raise_for_status()
result = response.json()
if result.get('success'):
link = result['data']['link']
logging.info(f"Uploaded to imgur: {link}")
return link
else:
logging.error(f"Imgur upload failed: {result}")
return None
def _upload_imgbb(self, image_data: bytes) -> Optional[str]:
"""Upload to imgbb.com (requires API key)"""
if not self.api_key:
logging.error("imgbb requires an API key")
return None
url = "https://api.imgbb.com/1/upload"
data = {
'key': self.api_key,
'image': base64.b64encode(image_data).decode('utf-8')
}
response = requests.post(url, data=data, timeout=30)
response.raise_for_status()
result = response.json()
if result.get('success'):
link = result['data']['url']
logging.info(f"Uploaded to imgbb: {link}")
return link
else:
logging.error(f"imgbb upload failed: {result}")
return None
def _upload_envs(self, image_data: bytes, filename: str) -> Optional[str]:
"""Upload to envs.sh (temporary, no key required)"""
url = "https://envs.sh"
files = {
'file': (filename, BytesIO(image_data), 'image/png')
}
response = requests.post(url, files=files, timeout=30)
response.raise_for_status()
result = response.text.strip()
if result.startswith('https://'):
logging.info(f"Uploaded to envs.sh: {result}")
return result
else:
logging.error(f"envs.sh upload failed: {result}")
return None
def _upload_uguu(self, image_data: bytes, filename: str) -> Optional[str]:
"""Upload to uguu.se (temporary 48h, no key required)"""
url = "https://uguu.se/upload.php"
files = {
'files[]': (filename, BytesIO(image_data), 'image/png')
}
response = requests.post(url, files=files, timeout=30)
response.raise_for_status()
result = response.json()
if result.get('success') and result.get('files'):
link = result['files'][0]['url']
logging.info(f"Uploaded to uguu.se: {link}")
return link
else:
logging.error(f"uguu.se upload failed: {result}")
return None
class StorageImpl(Storage):
def __init__(self, json_file_path: Path) -> None:
super().__init__()
self.__json_file_path = json_file_path
self.__data: Dict[str, JSONType] = {}
try:
with open(self.__json_file_path, encoding="utf8") as f:
self.__data = json.load(f)
except Exception:
pass
async def _load(self, key: str) -> Maybe[JSONType]:
if key in self.__data:
return Just(self.__data[key])
return Nothing()
async def _store(self, key: str, value: JSONType) -> None:
self.__data[key] = value
with open(self.__json_file_path, "w", encoding="utf8") as f:
json.dump(self.__data, f)
async def _delete(self, key: str) -> None:
self.__data.pop(key, None)
with open(self.__json_file_path, "w", encoding="utf8") as f:
json.dump(self.__data, f)
class PluginCouldNotLoad(Exception):
pass
class XEP_0384Impl(XEP_0384):
default_config = {
"fallback_message": "This message is OMEMO encrypted.",
"json_file_path": None
}
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.__storage: Storage
def plugin_init(self) -> None:
if not self.json_file_path:
raise PluginCouldNotLoad("JSON file path not specified.")
self.__storage = StorageImpl(Path(self.json_file_path))
super().plugin_init()
@property
def storage(self) -> Storage:
return self.__storage
@property
def _btbv_enabled(self) -> bool:
return True
async def _devices_blindly_trusted(
self,
blindly_trusted: FrozenSet[DeviceInformation],
identifier: Optional[str]
) -> None:
logging.info(f"[{identifier}] Devices trusted blindly: {blindly_trusted}")
async def _prompt_manual_trust(
self,
manually_trusted: FrozenSet[DeviceInformation],
identifier: Optional[str]
) -> None:
session_manager = await self.get_session_manager()
for device in manually_trusted:
logging.info(f"Auto-trusting device: {device}")
await session_manager.set_trust(
device.bare_jid,
device.identity_key,
TrustLevel.TRUSTED.value
)
register_plugin(XEP_0384Impl)
class LaTeXBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, rooms, room_nicknames, trigger, mentions,
max_length, nickname, privileged_users,
quote_reply=True, mention_reply=True,
allow_dm=True, dm_mode='whitelist', dm_list=None,
enable_omemo=True, omemo_store_path="omemo_store.json",
join_retry_attempts=5, join_retry_delay=10,
use_full_latex=False,
default_font_size=20, default_dpi=150,
image_host='catbox', image_host_api_key=None,
loop=None):
super().__init__(jid, password, loop=loop, sasl_mech='PLAIN')
self.enable_direct_tls = True
self.rooms = rooms
self.room_nicknames = room_nicknames or {}
self.trigger = trigger
self.mentions = mentions
self.max_length = max_length
self.nickname = nickname
self.privileged_users = {u.lower() for u in privileged_users}
self.quote_reply = quote_reply
self.mention_reply = mention_reply
self.allow_dm = allow_dm
self.dm_mode = dm_mode.lower()
self.dm_list = {x.lower() for x in (dm_list or [])}
self.enable_omemo = enable_omemo
self.omemo_store_path = omemo_store_path
self.join_retry_attempts = join_retry_attempts
self.join_retry_delay = join_retry_delay
self.use_full_latex = use_full_latex
self.default_font_size = default_font_size
self.default_dpi = default_dpi
# Image uploader
self.uploader = ImageUploader(service=image_host, api_key=image_host_api_key)
# Check if full LaTeX is available
self.latex_available = self._check_latex_available()
if self.use_full_latex and not self.latex_available:
logging.warning("Full LaTeX not available, falling back to matplotlib")
self.use_full_latex = False
self.register_plugin('xep_0030')
self.register_plugin('xep_0045')
self.register_plugin('xep_0199')
self.register_plugin('xep_0066') # Out of Band Data
if self.enable_omemo:
self.register_plugin('xep_0085')
self.register_plugin('xep_0380')
import sys
self.register_plugin(
"xep_0384",
{"json_file_path": self.omemo_store_path},
module=sys.modules[__name__]
)
logging.info("OMEMO support enabled")
self.add_event_handler("session_start", self.start)
self.add_event_handler("groupchat_message", self.groupchat_message)
if self.enable_omemo:
self.register_handler(CoroutineCallback(
"DirectMessages",
MatchXPath(f"{{{self.default_ns}}}message[@type='chat']"),
self.direct_message_async
))
else:
self.add_event_handler("message", self.direct_message)
def _check_latex_available(self):
"""Check if pdflatex and convert (ImageMagick) are available"""
try:
subprocess.run(['pdflatex', '--version'], capture_output=True, check=True)
subprocess.run(['convert', '--version'], capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def start(self, event):
self.send_presence()
self.get_roster()
for room in self.rooms:
nick = self.room_nicknames.get(room, self.nickname)
self.join_room_with_retry(room, nick)
def join_room_with_retry(self, room, nick):
for attempt in range(self.join_retry_attempts):
try:
self.plugin['xep_0045'].join_muc(room, nick)
logging.info(f"Successfully joined room: {room} as {nick}")
return True
except Exception as e:
logging.error(f"Failed to join {room} (attempt {attempt+1}/{self.join_retry_attempts}): {e}")
if attempt < self.join_retry_attempts - 1:
logging.info(f"Retrying in {self.join_retry_delay} seconds...")
time.sleep(self.join_retry_delay)
else:
logging.error(f"Failed to join {room} after {self.join_retry_attempts} attempts")
return False
def sanitize_input(self, text):
if not isinstance(text, str):
return ""
text = text.replace('\x00', '')
text = ''.join(char for char in text if char.isprintable() or char in '\n\r\t')
return text.strip()
def extract_latex(self, body):
"""Extract LaTeX code from message body"""
latex = body
# Check for code blocks
code_block_match = re.search(r'```(?:latex|tex)?\s*([\s\S]*?)```', latex)
if code_block_match:
return code_block_match.group(1).strip()
# Check for inline code
inline_match = re.search(r'`([^`]+)`', latex)
if inline_match:
return inline_match.group(1).strip()
# Check for $$ delimiters (display math)
display_match = re.search(r'\$\$([\s\S]*?)\$\$', latex)
if display_match:
return display_match.group(1).strip()
# Check for single $ delimiters (inline math)
inline_math_match = re.search(r'\$([^\$]+)\$', latex)
if inline_math_match:
return inline_math_match.group(1).strip()
# Return as-is
return latex.strip()
def render_latex_matplotlib(self, latex_code, font_size=None, dpi=None):
"""Render LaTeX using matplotlib's mathtext or usetex"""
if font_size is None:
font_size = self.default_font_size
if dpi is None:
dpi = self.default_dpi
try:
fig, ax = plt.subplots(figsize=(0.01, 0.01))
ax.axis('off')
# Wrap in display math if needed
if not latex_code.startswith('$') and not latex_code.startswith('\\'):
latex_code = f'${latex_code}$'
elif not latex_code.startswith('$'):
latex_code = f'${latex_code}$'
text = ax.text(0, 0, latex_code, fontsize=font_size,
ha='left', va='bottom',
usetex=self.latex_available)
fig.canvas.draw()
renderer = fig.canvas.get_renderer()
bbox = text.get_window_extent(renderer)
plt.close(fig)
width = (bbox.width / dpi) + 0.4
height = (bbox.height / dpi) + 0.4
fig, ax = plt.subplots(figsize=(max(width, 1), max(height, 0.5)))
ax.axis('off')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.text(0.5, 0.5, latex_code, fontsize=font_size,
ha='center', va='center',
usetex=self.latex_available)
buf = BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight',
pad_inches=0.1, dpi=dpi, facecolor='white',
edgecolor='none')
plt.close(fig)
buf.seek(0)
return buf.read()
except Exception as e:
logging.error(f"Matplotlib rendering failed: {e}")
try:
fig, ax = plt.subplots(figsize=(10, 2))
ax.axis('off')
if not latex_code.startswith('$'):
latex_code = f'${latex_code}$'
ax.text(0.5, 0.5, latex_code, fontsize=font_size,
ha='center', va='center', transform=ax.transAxes,
usetex=False)
buf = BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight',
pad_inches=0.1, dpi=dpi, facecolor='white')
plt.close(fig)
buf.seek(0)
return buf.read()
except Exception as e2:
logging.error(f"Fallback rendering also failed: {e2}")
return None
def render_latex_full(self, latex_code, dpi=None):
"""Render LaTeX using full LaTeX installation"""
if dpi is None:
dpi = self.default_dpi
try:
with tempfile.TemporaryDirectory() as tmpdir:
tex_file = Path(tmpdir) / "formula.tex"
if '\\documentclass' in latex_code:
tex_content = latex_code
elif '\\begin{' in latex_code:
tex_content = f"""\\documentclass[preview,border=5pt]{{standalone}}
\\usepackage{{amsmath,amssymb,amsfonts,amsbsy}}
\\usepackage{{mathtools}}
\\usepackage{{physics}}
\\usepackage{{siunitx}}
\\usepackage{{chemformula}}
\\usepackage{{tikz}}
\\usepackage{{pgfplots}}
\\pgfplotsset{{compat=1.18}}
\\begin{{document}}
{latex_code}
\\end{{document}}
"""
else:
tex_content = f"""\\documentclass[preview,border=5pt]{{standalone}}
\\usepackage{{amsmath,amssymb,amsfonts,amsbsy}}
\\usepackage{{mathtools}}
\\usepackage{{physics}}
\\begin{{document}}
$\\displaystyle {latex_code}$
\\end{{document}}
"""
tex_file.write_text(tex_content)
result = subprocess.run(
['pdflatex', '-interaction=nonstopmode', '-halt-on-error', str(tex_file)],
cwd=tmpdir,
capture_output=True,
timeout=30
)
pdf_file = Path(tmpdir) / "formula.pdf"
if not pdf_file.exists():
error_log = (Path(tmpdir) / "formula.log").read_text()
logging.error(f"LaTeX compilation failed: {error_log[-1000:]}")
return None
png_file = Path(tmpdir) / "formula.png"
subprocess.run(
['convert', '-density', str(dpi), str(pdf_file),
'-quality', '100', '-flatten', str(png_file)],
capture_output=True,
timeout=30
)
if png_file.exists():
return png_file.read_bytes()
logging.error("PNG conversion failed")
return None
except subprocess.TimeoutExpired:
logging.error("LaTeX rendering timed out")
return None
except Exception as e:
logging.error(f"Full LaTeX rendering failed: {e}")
return None
def render_latex(self, latex_code, font_size=None, dpi=None):
"""Main rendering method"""
if self.use_full_latex and self.latex_available:
result = self.render_latex_full(latex_code, dpi)
if result:
return result
logging.warning("Full LaTeX failed, falling back to matplotlib")
return self.render_latex_matplotlib(latex_code, font_size, dpi)
def upload_image(self, image_data: bytes) -> Optional[str]:
"""Upload image using configured uploader"""
return self.uploader.upload(image_data)
def extract_quoted_text(self, body):
lines = body.split('\n')
quoted = []
non_quoted = []
for line in lines:
stripped = line.strip()
if stripped.startswith('>'):
quoted.append(stripped[1:].strip())
else:
non_quoted.append(line)
return '\n'.join(quoted), '\n'.join(non_quoted)
def is_replying_to_bot(self, msg):
room_jid = msg['from'].bare
bot_nick = self.room_nicknames.get(room_jid, self.nickname)
body = msg['body']
quoted_text, _ = self.extract_quoted_text(body)
if quoted_text:
quoted_lines = quoted_text.split('\n')
for line in quoted_lines:
line = line.strip()
while line.startswith('>'):
line = line[1:].strip()
if line.lower().startswith(f"{bot_nick.lower()}:"):
return True
if line.lower().startswith(f"{bot_nick.lower()} "):
return True
return False
def parse_render_options(self, text):
"""Parse optional render parameters from the text"""
options = {
'font_size': self.default_font_size,
'dpi': self.default_dpi,
'use_full': self.use_full_latex
}
size_match = re.search(r'--(?:font)?size[=\s](\d+)', text)
if size_match:
options['font_size'] = int(size_match.group(1))
text = re.sub(r'--(?:font)?size[=\s]\d+', '', text)
dpi_match = re.search(r'--dpi[=\s](\d+)', text)
if dpi_match:
options['dpi'] = int(dpi_match.group(1))
text = re.sub(r'--dpi[=\s]\d+', '', text)
if '--full' in text:
options['use_full'] = True
text = text.replace('--full', '')
if '--simple' in text:
options['use_full'] = False
text = text.replace('--simple', '')
return text.strip(), options
def direct_message(self, msg):
if not self.allow_dm:
return
if msg['type'] not in ('chat', 'normal'):
return
if msg['from'].bare == self.boundjid.bare:
return
sender = msg['from'].bare.lower()
if self.dm_mode == 'whitelist' and sender not in self.dm_list:
return
if self.dm_mode == 'blacklist' and sender in self.dm_list:
return
body = self.sanitize_input(msg['body'])
if not body or len(body) > self.max_length:
return
logging.info(f"Direct message from {sender}: {body[:50]}...")
body, options = self.parse_render_options(body)
latex_code = self.extract_latex(body)
if not latex_code:
msg.reply("Please provide LaTeX code to render. Example: $E = mc^2$").send()
return
old_use_full = self.use_full_latex
self.use_full_latex = options['use_full']
image_data = self.render_latex(latex_code, options['font_size'], options['dpi'])
self.use_full_latex = old_use_full
if image_data:
url = self.upload_image(image_data)
if url:
reply = msg.reply(url)
reply['oob']['url'] = url
reply.send()
logging.info(f"Sent rendered LaTeX to {sender}")
else:
msg.reply("Failed to upload rendered image").send()
else:
msg.reply(f"Failed to render LaTeX: {latex_code[:100]}").send()
async def direct_message_async(self, stanza: Message) -> None:
if not self.allow_dm:
return
mfrom = stanza["from"]
mtype = stanza["type"]
if mtype not in {"chat", "normal"}:
return
if mfrom.bare == self.boundjid.bare:
return
sender = mfrom.bare.lower()
if self.dm_mode == 'whitelist' and sender not in self.dm_list:
return
if self.dm_mode == 'blacklist' and sender in self.dm_list:
return
xep_0384 = self["xep_0384"]
namespace = xep_0384.is_encrypted(stanza)
body = None
is_encrypted = False
if namespace:
logging.debug(f"Encrypted message received from {mfrom}")
try:
decrypted_msg, device_info = await xep_0384.decrypt_message(stanza)
if decrypted_msg.get("body"):
body = self.sanitize_input(decrypted_msg["body"])
is_encrypted = True
logging.info(f"Decrypted message from {sender}: {body[:50]}...")
except Exception as e:
logging.error(f"Decryption failed: {e}")
await self._plain_reply(mfrom, mtype, f"Error decrypting message: {e}")
return
else:
if stanza["body"]:
body = self.sanitize_input(stanza["body"])
logging.info(f"Plaintext message from {sender}: {body[:50]}...")
if not body or len(body) > self.max_length:
return
body, options = self.parse_render_options(body)
latex_code = self.extract_latex(body)
if not latex_code:
reply = "Please provide LaTeX code to render. Example: $E = mc^2$"
if is_encrypted:
await self._encrypted_reply(mfrom, mtype, reply)
else:
await self._plain_reply(mfrom, mtype, reply)
return
loop = asyncio.get_running_loop()
old_use_full = self.use_full_latex
self.use_full_latex = options['use_full']
image_data = await loop.run_in_executor(
None,
self.render_latex,
latex_code,
options['font_size'],
options['dpi']
)
self.use_full_latex = old_use_full
if image_data:
url = await loop.run_in_executor(None, self.upload_image, image_data)
if url:
if is_encrypted:
await self._encrypted_reply(mfrom, mtype, url)
else:
await self._plain_reply_with_oob(mfrom, mtype, url)
logging.info(f"Sent rendered LaTeX to {sender}")
else:
error = "Failed to upload rendered image"
if is_encrypted:
await self._encrypted_reply(mfrom, mtype, error)
else:
await self._plain_reply(mfrom, mtype, error)
else:
error = f"Failed to render LaTeX: {latex_code[:100]}"
if is_encrypted:
await self._encrypted_reply(mfrom, mtype, error)
else:
await self._plain_reply(mfrom, mtype, error)
async def _plain_reply(self, mto: JID, mtype: str, reply_text: str) -> None:
msg = self.make_message(mto=mto, mtype=mtype)
msg["body"] = reply_text
msg.send()
async def _plain_reply_with_oob(self, mto: JID, mtype: str, url: str) -> None:
msg = self.make_message(mto=mto, mtype=mtype)
msg["body"] = url
msg['oob']['url'] = url
msg.send()
async def _encrypted_reply(self, mto: JID, mtype: str, reply_text: str) -> None:
xep_0384 = self["xep_0384"]
msg = self.make_message(mto=mto, mtype=mtype)
msg["body"] = reply_text
encrypt_for: Set[JID] = {JID(mto)}
try:
messages, encryption_errors = await xep_0384.encrypt_message(msg, encrypt_for)
if encryption_errors:
logging.warning(f"Encryption errors: {encryption_errors}")
for namespace, message in messages.items():
message["eme"]["namespace"] = namespace
message["eme"]["name"] = self["xep_0380"].mechanisms[namespace]
message.send()
logging.debug(f"Sent encrypted message to {mto}")
except Exception as e:
logging.error(f"Failed to send encrypted reply: {e}")
await self._plain_reply(mto, mtype, f"Error encrypting reply: {e}")
def groupchat_message(self, msg):
if msg['type'] != 'groupchat':
return
room_jid = msg['from'].bare
bot_nick = self.room_nicknames.get(room_jid, self.nickname)
if msg['mucnick'] == bot_nick:
return
body = self.sanitize_input(msg['body'])
if len(body) > self.max_length:
return
sender_nick = msg['mucnick']
sender_lower = sender_nick.lower()
is_privileged = sender_lower in self.privileged_users
query = None
is_reply = self.is_replying_to_bot(msg)
if body.startswith(self.trigger):
query = body[len(self.trigger):].strip()
elif self.mentions and f"@{bot_nick}" in body:
query = body.replace(f"@{bot_nick}", "").strip()
elif self.mentions and body.lower().startswith(f"{bot_nick.lower()}:"):
query = body[len(bot_nick)+1:].strip()
elif self.mentions and body.lower().startswith(f"{bot_nick.lower()},"):
query = body[len(bot_nick)+1:].strip()
elif self.mentions and body.lower().startswith(f"{bot_nick.lower()} "):
query = body[len(bot_nick)+1:].strip()
elif is_reply:
_, non_quoted = self.extract_quoted_text(body)
query = non_quoted.strip()
logging.info(f"Detected reply to bot from {sender_nick}")
elif is_privileged and body:
query = body
if query:
logging.info(f"LaTeX request from {sender_nick} in {room_jid}: {query[:50]}...")
query, options = self.parse_render_options(query)
latex_code = self.extract_latex(query)
if not latex_code:
response = f"{sender_nick}: Please provide LaTeX code. Example: $E = mc^2$"
msg.reply(response).send()
return
old_use_full = self.use_full_latex
self.use_full_latex = options['use_full']
image_data = self.render_latex(latex_code, options['font_size'], options['dpi'])
self.use_full_latex = old_use_full
if image_data:
url = self.upload_image(image_data)
if url:
response = url
if self.mention_reply:
response = f"{sender_nick}: {response}"
if self.quote_reply:
short_latex = latex_code[:50] + ("..." if len(latex_code) > 50 else "")
quoted = f"> {short_latex}"
response = f"{quoted}\n{response}"
reply = msg.reply(response)
reply['oob']['url'] = url
reply.send()
logging.info(f"Sent rendered LaTeX in {room_jid}")
else:
msg.reply(f"{sender_nick}: Failed to upload rendered image").send()
else:
msg.reply(f"{sender_nick}: Failed to render LaTeX").send()
if __name__ == '__main__':
config = configparser.ConfigParser()
config.read('latexconfig.ini')
jid = config['XMPP']['jid']
password = config['XMPP']['password']
rooms = [r.strip() for r in config['XMPP']['rooms'].split(',') if r.strip()]
trigger = config.get('Bot', 'trigger', fallback='!latex')
mentions = config.getboolean('Bot', 'mentions', fallback=True)
max_length = config.getint('Bot', 'max_length', fallback=4000)
nickname = config.get('Bot', 'nickname', fallback='LaTeXBot')
privileged_users = [u.strip() for u in config.get('Bot', 'privileged_users', fallback='').split(',') if u.strip()]
quote_reply = config.getboolean('Bot', 'quote_reply', fallback=True)
mention_reply = config.getboolean('Bot', 'mention_reply', fallback=True)
allow_dm = config.getboolean('Bot', 'allow_dm', fallback=True)
dm_mode = config.get('Bot', 'dm_mode', fallback='none')
dm_list = [x.strip() for x in config.get('Bot', 'dm_list', fallback='').split(',') if x.strip()]
enable_omemo = config.getboolean('Bot', 'enable_omemo', fallback=True)
omemo_store_path = config.get('Bot', 'omemo_store_path', fallback='omemo_store.json')
join_retry_attempts = config.getint('Bot', 'join_retry_attempts', fallback=5)
join_retry_delay = config.getint('Bot', 'join_retry_delay', fallback=10)
# LaTeX-specific settings
use_full_latex = config.getboolean('Bot', 'use_full_latex', fallback=False)
default_font_size = config.getint('Bot', 'default_font_size', fallback=20)
default_dpi = config.getint('Bot', 'default_dpi', fallback=150)
# Image hosting settings
image_host = config.get('Bot', 'image_host', fallback='catbox')
image_host_api_key = config.get('Bot', 'image_host_api_key', fallback='')
room_nicknames = {}
for key in config['Bot']:
if key.startswith('nickname.'):
room = key.split('.', 1)[1].strip()
nick = config['Bot'][key].strip()
if nick:
room_nicknames[room] = nick
async def main():
logging.basicConfig(level=logging.DEBUG, format="%(levelname)-8s %(message)s")
loop = asyncio.get_running_loop()
bot = LaTeXBot(
jid, password, rooms, room_nicknames, trigger, mentions,
max_length, nickname, privileged_users,
quote_reply=quote_reply, mention_reply=mention_reply,
allow_dm=allow_dm, dm_mode=dm_mode, dm_list=dm_list,
enable_omemo=enable_omemo, omemo_store_path=omemo_store_path,
join_retry_attempts=join_retry_attempts,
join_retry_delay=join_retry_delay,
use_full_latex=use_full_latex,
default_font_size=default_font_size,
default_dpi=default_dpi,
image_host=image_host,
image_host_api_key=image_host_api_key,
loop=loop
)
bot.connect()
logging.info("LaTeX Bot connected, waiting for messages...")
await bot.disconnected
asyncio.run(main())