from resolvematrix import ServerResolver from types import SimpleNamespace from collections import Counter import vona.config as config import nacl.encoding import nacl.signing import time as ti import hashlib import base64 import random import httpx import json import copy import re version = "1.5.0" def canonical_json(value: dict | list) -> bytes: return json.dumps( value, ensure_ascii=False, separators=(",", ":"), sort_keys=True, ).encode("UTF-8") def sign_json(data: dict) -> dict: parts = config.signing_key.split() base64_key = parts[2] while len(base64_key) % 4 != 0: base64_key += "=" decoded_key = base64.b64decode(base64_key) signing_key = nacl.signing.SigningKey(decoded_key) signed_message = signing_key.sign(canonical_json(data)) signature = signed_message.signature key_version = parts[1] signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=") signed_json = { **data, "signatures": { config.server_name: { f"{parts[0]}:{key_version}": signature_base64, }, }, } return signed_json def sign_json_without_discard(data: dict) -> dict: parts = config.signing_key.split() base64_key = parts[2] while len(base64_key) % 4 != 0: base64_key += "=" decoded_key = base64.b64decode(base64_key) signing_key = nacl.signing.SigningKey(decoded_key) unsigned_keys = {key: data[key] for key in list(data.keys()) if key == "unsigned"} for key in unsigned_keys: del data[key] signed_message = signing_key.sign(canonical_json(data)) signature = signed_message.signature key_version = parts[1] signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=") new_signature = {f"ed25519:{key_version}": signature_base64} if "signatures" in data: data["signatures"][config.server_name] = { **data["signatures"].get(config.server_name, {}), **new_signature, } else: data["signatures"] = {config.server_name: new_signature} return data def make_event_id(seed: str | int | None = None) -> str: if seed is not None: random.seed(seed) random_bytes = bytearray(random.getrandbits(8) for _ in range(32)) event_id = "$" event_id += re.sub( r"[\/+=]", "_", base64.b64encode(random_bytes).decode("utf-8"), ).rstrip("=")[:44] event_id += ":" + config.server_name return event_id def event_hash(event_object: dict) -> str: event_object = dict(event_object) event_object.pop("unsigned", None) event_object.pop("signatures", None) event_object.pop("hashes", None) event_json_bytes = canonical_json(event_object) return base64.b64encode( hashlib.sha256(event_json_bytes).digest() ).decode("utf-8").rstrip("=") def pubkey() -> str: private_key = config.signing_key.split()[2] while len(private_key) % 4 != 0: private_key += "=" public_key = nacl.signing.SigningKey(base64.b64decode(private_key)).verify_key return ( public_key.encode(encoder=nacl.encoding.Base64Encoder) .decode("utf-8") .rstrip("=") ) def make_auth_header( destination: str, method: str, path: str, content = None ) -> str: request_json = { "method": method, "uri": path, "origin": config.server_name, "destination": destination, } if content is not None: request_json["content"] = content signed_json = sign_json(request_json) authorization_headers = [] for key, sig in signed_json["signatures"][config.server_name].items(): authorization_headers.append( bytes( 'X-Matrix origin="%s",destination="%s",key="%s",sig="%s"' % ( config.server_name, destination, key, sig, ), "utf-8", ) ) return authorization_headers[0].decode("utf-8") def redact_event( event: dict, for_event_id: bool = False, room_ver: int = 1, ) -> dict: # Returns a redacted event as per # the algorithm for v1 to v11 rooms. allowed_keys = [ "event_id", "type", "room_id", "sender", "state_key", "content", "hashes", "depth", "prev_events", "auth_events", "origin_server_ts", ] if not for_event_id: allowed_keys.append("signatures") if room_ver < 11: allowed_keys.append("origin") allowed_keys.append("membership") allowed_keys.append("prev_state") redacted_event = {k: v for k, v in event.items() if k in allowed_keys} if "type" in redacted_event and "content" in redacted_event: event_type = redacted_event["type"] content_key_rules = { "m.room.member": ["membership"], "m.room.create": ["creator"], "m.room.join_rules": ["join_rule"], "m.room.power_levels": [ "ban", "events", "events_default", "kick", "redact", "state_default", "users", "users_default", ], "m.room.aliases": ["aliases"], "m.room.history_visibility": ["history_visibility"], } if room_ver >= 6: del content_key_rules["m.room.aliases"] if room_ver >= 8: content_key_rules["m.room.join_rules"].append("allow") if room_ver >= 9: content_key_rules["m.room.member"].append("join_authorised_via_users_server") if room_ver >= 11: content_key_rules["m.room.redaction"] = ["redacts"] del content_key_rules["m.room.create"] # All keys will be permitted if event_type in content_key_rules: allowed_content_keys = content_key_rules[event_type] if ( room_ver >= 11 and "third_party_invite" in redacted_event and "signed" in redacted_event["third_party_invite"] ): third_party_invite_signature = copy.deepcopy(redacted_event["third_party_invite"]["signed"]) else: third_party_invite_signature = None redacted_event["content"] = { k: v for k, v in redacted_event["content"].items() if k in allowed_content_keys } if third_party_invite_signature: redacted_event["content"]["third_party_invite"] = { "signed": third_party_invite_signature } else: if room_ver >= 11 and event_type == "m.room.create": pass else: redacted_event["content"] = {} return redacted_event def hash_and_sign_event( event_object: dict, room_ver: int = 1, ) -> dict: content_hash = event_hash(event_object) event_object["hashes"] = {"sha256": content_hash} stripped_object = redact_event( event=event_object, for_event_id=False, room_ver=room_ver, ) signed_object = sign_json(stripped_object) event_object["signatures"] = signed_object["signatures"] return event_object def make_ref_hash( event: dict, room_ver: int = 3, ) -> str: stripped = redact_event( event=event, for_event_id=True, room_ver=room_ver, ) evt_bytes = canonical_json(stripped) evt_hash = base64.b64encode( hashlib.sha256(evt_bytes).digest() ).decode("utf-8").rstrip("=") if room_ver > 3: while "+" in evt_hash: evt_hash = evt_hash.replace("+", "-") while "/" in evt_hash: evt_hash = evt_hash.replace("/", "_") return "$" + evt_hash def room_version_from_id(room) -> str: room_id_no_sigil = ( room .replace("!", "") .replace(f":{config.server_name}", "") ) hexadecimal_room_id = bytes(room_id_no_sigil, "utf-8").hex() versions = [str(i) for i in range(0, 10)] versions.append("a") if not any(ver in hexadecimal_room_id for ver in versions): hexadecimal_room_id = "2" + hexadecimal_room_id[1:] def remove_chars(s): return re.sub(f"[^{''.join(versions)}]", "", s) nums = remove_chars(hexadecimal_room_id) def most_common_character(s): s = s.replace(" ", "").lower() counts = Counter(s) most_common = counts.most_common(1) actual_most_common = most_common[0] if most_common else ("4",) if actual_most_common[0] == "0": return ("10",) elif actual_most_common[0] == "a": return ("11",) return actual_most_common return str(most_common_character(nums)[0]) room_dir = { "chunk": [{ "avatar_url": f"mxc://{config.server_name}/cat", "guest_can_join": False, "join_rule": "public", "name": "Vona", "num_joined_members": 1, "room_id": make_event_id().replace("$", "!"), "room_type": "m.room", "topic": "", "world_readable": False, "via": [config.server_name] }], "total_room_count_estimate": 1 } class http_client: http = httpx.Client(headers={"User-Agent": f"Vona/{version}"}) resolver = ServerResolver(client=http) def _resolve(self, target) -> SimpleNamespace: r = self.resolver.resolve(target) if r.sni: sni = r.sni else: sni = r.host_header return SimpleNamespace( base_url=r.base_url, host_header=r.host_header, sni=sni ) def put( self, path: str, destination: str, headers: dict = {}, authorize: bool = True, json: dict = {}, ): resolved = self._resolve(destination) if authorize: headers["Authorization"] = make_auth_header( method="PUT", destination=destination, path=path, content=json, ) headers["Host"] = resolved.host_header return self.http.put( f"{resolved.base_url}{path}", headers=headers, extensions={"sni_hostname": resolved.sni}, json=json, ) def get( self, path: str, destination: str, headers: dict = {}, authorize: bool = True, ): resolved = self._resolve(destination) if authorize: headers["Authorization"] = make_auth_header( method="GET", destination=destination, path=path, ) headers["Host"] = resolved.host_header return self.http.get( f"{resolved.base_url}{path}", headers=headers, extensions={"sni_hostname": resolved.sni}, ) def strip_state(state_events) -> list: if not isinstance(state_events, list): return state_events keys_to_remove = [ "auth_events", "prev_events", "signatures", "hashes", "depth" ] new_list = [] for d in state_events: if not isinstance(d, dict): new_list.append(d) continue if "room_id" in d: ver = int(room_version_from_id(d["room_id"])) else: ver = 4 event_id = make_ref_hash(d, ver) new_dict = {} for k, v in d.items(): if k in keys_to_remove: continue new_dict[k] = strip_state(v) new_dict["event_id"] = event_id new_list.append(new_dict) return new_list def time() -> int: return int(str(ti.time() * 1000).split(".")[0])