Files
matrix-vona/vona/globals/__init__.py
2025-10-25 23:53:17 -04:00

402 lines
8.1 KiB
Python

from resolvematrix import ServerResolver
from types import SimpleNamespace
from collections import Counter
import vona.config as config
import nacl.signing
import hashlib
import base64
import random
import httpx
import json
import re
# Version string of this server; interpolated into the outbound HTTP
# client's User-Agent header ("Vona/<version>").
version = "1.5.0"
def canonical_json(value):
    """Serialize ``value`` to compact, key-sorted JSON and return UTF-8 bytes.

    Object keys are sorted, no whitespace is emitted between tokens, and
    non-ASCII characters are written verbatim instead of being escaped.
    """
    text = json.dumps(
        value,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("UTF-8")
def sign_json(data):
    """Sign ``data`` with the server signing key and return a signed copy.

    ``config.signing_key`` is read as whitespace-separated fields:
    ``<algorithm> <key version> <unpadded base64 key>``.

    The signature is computed over the canonical JSON of ``data`` with any
    top-level "signatures" and "unsigned" members left out, as the Matrix
    JSON signing algorithm requires. Previously those members were signed
    along with the payload, which produces a signature that verifiers
    (who strip both members first) cannot validate.

    Returns a new dict holding every key of ``data`` (including "unsigned",
    if present) with "signatures" set to a mapping that contains only this
    server's signature; pre-existing signatures are discarded. ``data``
    itself is not modified.
    """
    parts = config.signing_key.split()
    key_version = parts[1]

    # The stored key is unpadded base64; restore padding before decoding.
    base64_key = parts[2]
    base64_key += "=" * (-len(base64_key) % 4)
    signing_key = nacl.signing.SigningKey(base64.b64decode(base64_key))

    # Only the payload proper is covered by the signature.
    payload = {
        key: value
        for key, value in data.items()
        if key not in ("signatures", "unsigned")
    }
    signature = signing_key.sign(canonical_json(payload)).signature
    signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")

    return {
        **data,
        "signatures": {
            config.server_name: {
                f"{parts[0]}:{key_version}": signature_base64,
            },
        },
    }
def sign_json_without_discard(data):
    """Add this server's signature to ``data`` in place, keeping existing ones.

    ``config.signing_key`` is read as whitespace-separated fields; the second
    is the key version and the third the unpadded base64 key.

    The signature covers the canonical JSON of ``data`` without its top-level
    "signatures" and "unsigned" members, per the Matrix JSON signing
    algorithm. Two defects of the earlier implementation are fixed here:

    * "unsigned" was deleted from the caller's dict and never put back;
      it is now left untouched.
    * existing "signatures" were included in the signed bytes, so the
      result could not be verified once any signature was already present.

    Returns the same ``data`` object, whose "signatures" mapping now holds
    an ``ed25519:<key version>`` entry under ``config.server_name`` alongside
    whatever signatures were already there.
    """
    parts = config.signing_key.split()
    key_version = parts[1]

    # The stored key is unpadded base64; restore padding before decoding.
    base64_key = parts[2]
    base64_key += "=" * (-len(base64_key) % 4)
    signing_key = nacl.signing.SigningKey(base64.b64decode(base64_key))

    # Sign a filtered copy so the caller's dict keeps all of its members.
    payload = {
        key: value
        for key, value in data.items()
        if key not in ("signatures", "unsigned")
    }
    signature = signing_key.sign(canonical_json(payload)).signature
    signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")

    signatures = data.setdefault("signatures", {})
    signatures[config.server_name] = {
        **signatures.get(config.server_name, {}),
        f"ed25519:{key_version}": signature_base64,
    }
    return data
def make_event_id(seed=None):
    """Return an identifier of the form ``$<44 chars>:<server name>``.

    The local part is the base64 encoding of 32 random bytes with every
    "/", "+" and "=" replaced by "_".

    seed: when given, the bytes come from a private ``random.Random(seed)``
        so the same seed always yields the same ID. The process-wide
        random generator is no longer reseeded as a side effect (the
        generated ID for a given seed is unchanged). When ``None``, the
        shared module-level generator is used.
    """
    rng = random if seed is None else random.Random(seed)
    random_bytes = bytearray(rng.getrandbits(8) for _ in range(32))

    # 32 bytes always encode to exactly 44 base64 characters, the last of
    # which is "=" padding; the substitution turns it into "_", so no
    # further stripping or truncation is needed.
    encoded = base64.b64encode(random_bytes).decode("utf-8")
    local_part = re.sub(r"[/+=]", "_", encoded)

    return "$" + local_part + ":" + config.server_name
def event_hash(event_object):
    """Return the unpadded base64 SHA-256 of an event's canonical JSON.

    The "unsigned", "signatures" and "hashes" members are left out of the
    hashed data. The argument itself is not modified; a copy is filtered.
    """
    excluded = ("unsigned", "signatures", "hashes")
    hashable = {
        name: value
        for name, value in dict(event_object).items()
        if name not in excluded
    }
    digest = hashlib.sha256(canonical_json(hashable)).digest()
    encoded = base64.b64encode(digest).decode("utf-8")
    return encoded.rstrip("=")
def pubkey() -> str:
    """Return the server's public verify key as unpadded base64.

    The private key is the third whitespace-separated field of
    ``config.signing_key``, stored as unpadded base64.
    """
    encoded_private = config.signing_key.split()[2]
    # Restore the base64 padding that the stored key omits.
    encoded_private += "=" * (-len(encoded_private) % 4)

    signing_key = nacl.signing.SigningKey(base64.b64decode(encoded_private))
    encoded_public = signing_key.verify_key.encode(
        encoder=nacl.encoding.Base64Encoder
    )
    return encoded_public.decode("utf-8").rstrip("=")
def make_auth_header(
    destination: str,
    method: str,
    path: str,
    content = None
) -> str:
    """Build the ``X-Matrix`` Authorization header value for a request.

    A JSON object describing the request (method, uri, origin, destination
    and, when ``content`` is not ``None``, the body) is signed with
    ``sign_json``; the first signature listed for ``config.server_name`` is
    formatted into the header string that is returned.
    """
    request_json = dict(
        method=method,
        uri=path,
        origin=config.server_name,
        destination=destination,
    )
    if content is not None:
        request_json["content"] = content

    own_signatures = sign_json(request_json)["signatures"][config.server_name]
    header_values = [
        (
            f'X-Matrix origin="{config.server_name}",'
            f'destination="{destination}",'
            f'key="{key_id}",'
            f'sig="{signature}"'
        ).encode("utf-8")
        for key_id, signature in own_signatures.items()
    ]
    return header_values[0].decode("utf-8")
def redact_event(
    event: dict,
    for_event_id: bool = False,
):
    """Return a redacted copy of ``event`` per the v1/v2 room algorithm.

    Only a fixed set of top-level keys survives. "signatures" is kept
    unless ``for_event_id`` is true. When the result has both "type" and
    "content", the content is reduced to the keys permitted for that event
    type, or to an empty dict for any other type. ``event`` is not modified.
    """
    top_level_keys = {
        "event_id",
        "type",
        "room_id",
        "sender",
        "state_key",
        "content",
        "hashes",
        "depth",
        "prev_events",
        "prev_state",
        "auth_events",
        "origin",
        "origin_server_ts",
        "membership",
    }
    if not for_event_id:
        top_level_keys.add("signatures")

    # Content keys that survive redaction, per event type.
    preserved_content = {
        "m.room.member": ("membership",),
        "m.room.create": ("creator",),
        "m.room.join_rules": ("join_rule",),
        "m.room.power_levels": (
            "ban",
            "events",
            "events_default",
            "kick",
            "redact",
            "state_default",
            "users",
            "users_default",
        ),
        "m.room.aliases": ("aliases",),
        "m.room.history_visibility": ("history_visibility",),
    }

    pruned = {}
    for name, value in event.items():
        if name in top_level_keys:
            pruned[name] = value

    # Content is only touched when both members are present.
    if "type" not in pruned or "content" not in pruned:
        return pruned

    keep = preserved_content.get(pruned["type"])
    if keep is None:
        pruned["content"] = {}
    else:
        pruned["content"] = {
            name: value
            for name, value in pruned["content"].items()
            if name in keep
        }
    return pruned
def hash_and_sign_event(event_object):
    """Attach a content hash and this server's signature to an event.

    ``event_object`` is modified in place and also returned: "hashes" is set
    to the SHA-256 content hash, then the redacted form of the event is
    signed and the resulting "signatures" mapping is stored on the event.
    """
    event_object["hashes"] = {"sha256": event_hash(event_object)}
    # The signature is taken over the redacted event, not the full one.
    signed_redaction = sign_json(redact_event(event_object))
    event_object["signatures"] = signed_redaction["signatures"]
    return event_object
def make_ref_hash(
    event: dict,
    room_ver: int = 3,
):
    """Return ``"$"`` plus the unpadded base64 SHA-256 of the redacted event.

    The event is redacted with signatures removed before hashing. For
    ``room_ver`` greater than 3 the URL-safe alphabet is used ("+" becomes
    "-" and "/" becomes "_").
    """
    redacted = redact_event(event, True)
    digest = hashlib.sha256(canonical_json(redacted)).digest()
    encoded = base64.b64encode(digest).decode("utf-8").rstrip("=")
    if room_ver > 3:
        encoded = encoded.translate(str.maketrans("+/", "-_"))
    return "$" + encoded
def room_version_from_id(room):
    """Derive a single digit ("1"-"9") from a room ID and return it as a string.

    The "!" sigil and the ``:<server name>`` suffix are removed, the
    remainder is hex-encoded, and the most frequent digit between 1 and 9 in
    that hex string is returned (the earliest-seen digit wins ties). If the
    hex string contains none of those digits, its first character is
    replaced by "2", which makes the result "2".
    """
    digits = "123456789"

    local_part = room.replace("!", "").replace(f":{config.server_name}", "")
    hex_id = local_part.encode("utf-8").hex()

    if all(digit not in hex_id for digit in digits):
        hex_id = "2" + hex_id[1:]

    # Keep only the characters 1-9; hex letters and zeros are discarded.
    candidates = "".join(ch for ch in hex_id if ch in digits)

    ranked = Counter(candidates).most_common(1)
    winner = ranked[0][0] if ranked else "2"
    return str(winner)
# Room directory data listing a single room on this server. The dict is
# built once when the module is imported, so the randomly generated room ID
# below is fixed for the lifetime of the process.
room_dir = {
"chunk": [{
"avatar_url": f"mxc://{config.server_name}/cat",
"guest_can_join": False,
"join_rule": "public",
"name": "Vona",
"num_joined_members": 1,
# Reuses the event-ID generator and swaps the "$" sigil for "!".
"room_id": make_event_id().replace("$", "!"),
"room_type": "m.room",
"topic": "",
"world_readable": False,
"via": [config.server_name]
}],
"total_room_count_estimate": 1
}
class http_client:
    """HTTP client for requests to other servers.

    Each request resolves ``destination`` through ``ServerResolver``, sets
    the ``Host`` header and TLS SNI hostname from the result, and (unless
    ``authorize`` is false) adds an ``Authorization`` header produced by
    ``make_auth_header``.
    """

    # Class attributes: a single httpx client and resolver shared by every
    # instance of this class.
    http = httpx.Client(headers={"User-Agent": f"Vona/{version}"})
    resolver = ServerResolver(client=http)

    def _resolve(self, target) -> SimpleNamespace:
        """Resolve ``target`` and return its base_url, host_header and sni.

        When the resolver reports no SNI value, the host header is used.
        """
        r = self.resolver.resolve(target)
        return SimpleNamespace(
            base_url=r.base_url,
            host_header=r.host_header,
            sni=r.sni or r.host_header,
        )

    @staticmethod
    def _build_headers(headers, host_header, authorization=None) -> dict:
        """Return a fresh header dict for one request.

        A new dict is created on every call so that neither a shared default
        nor the caller's own mapping is mutated; otherwise an Authorization
        header from one request could leak into later, unauthorized ones.
        """
        merged = dict(headers) if headers else {}
        if authorization is not None:
            merged["Authorization"] = authorization
        merged["Host"] = host_header
        return merged

    def put(
        self,
        path: str,
        destination: str,
        headers: "dict | None" = None,
        authorize: bool = True,
        # Never mutated here; the default stays an empty object so that a
        # PUT without an explicit body still sends (and signs) "{}".
        json: dict = {},
    ):
        """Send a PUT with a JSON body to ``path`` on ``destination``.

        headers: extra request headers; copied, never modified.
        authorize: when true, sign the request (including ``json`` as its
            content) and attach the Authorization header.
        json: request body.

        Returns the response object from the underlying httpx client.
        """
        resolved = self._resolve(destination)
        authorization = None
        if authorize:
            authorization = make_auth_header(
                method="PUT",
                destination=destination,
                path=path,
                content=json,
            )
        return self.http.put(
            f"{resolved.base_url}{path}",
            headers=self._build_headers(
                headers, resolved.host_header, authorization
            ),
            extensions={"sni_hostname": resolved.sni},
            json=json,
        )

    def get(
        self,
        path: str,
        destination: str,
        headers: "dict | None" = None,
        authorize: bool = True,
    ):
        """Send a GET to ``path`` on ``destination``.

        headers: extra request headers; copied, never modified.
        authorize: when true, sign the request and attach the
            Authorization header.

        Returns the response object from the underlying httpx client.
        """
        resolved = self._resolve(destination)
        authorization = None
        if authorize:
            authorization = make_auth_header(
                method="GET",
                destination=destination,
                path=path,
            )
        return self.http.get(
            f"{resolved.base_url}{path}",
            headers=self._build_headers(
                headers, resolved.host_header, authorization
            ),
            extensions={"sni_hostname": resolved.sni},
        )
def strip_state(state_events) -> list:
    """Return a copy of a list of events without federation-only keys.

    For each dict in the list, "auth_events", "prev_events", "signatures",
    "hashes" and "depth" are dropped. Values that are themselves lists are
    processed the same way recursively. Non-dict list items are kept
    unchanged, and a non-list argument is returned unchanged.
    """
    if not isinstance(state_events, list):
        return state_events

    dropped = ("auth_events", "prev_events", "signatures", "hashes", "depth")
    return [
        {
            name: strip_state(value)
            for name, value in item.items()
            if name not in dropped
        }
        if isinstance(item, dict)
        else item
        for item in state_events
    ]