from resolvematrix import ServerResolver
from types import SimpleNamespace
from collections import Counter
from typing import Optional
import vona.config as config
import nacl.encoding
import nacl.signing
import hashlib
import base64
import random
import httpx
import copy
import json
import re

version = "1.4.4"

# Top-level event keys kept by redact_event().
_REDACTION_ALLOWED_KEYS = frozenset({
    "event_id",
    "type",
    "room_id",
    "sender",
    "state_key",
    "content",
    "hashes",
    "signatures",
    "depth",
    "prev_events",
    "prev_state",
    "auth_events",
    "origin",
    "origin_server_ts",
    "membership",
})

# Per event type, the content keys kept by redact_event().
_REDACTION_CONTENT_KEYS = {
    "m.room.member": frozenset({"membership"}),
    "m.room.create": frozenset({"creator"}),
    "m.room.join_rules": frozenset({"join_rule"}),
    "m.room.power_levels": frozenset({
        "ban",
        "events",
        "events_default",
        "kick",
        "redact",
        "state_default",
        "users",
        "users_default",
    }),
    "m.room.aliases": frozenset({"aliases"}),
    "m.room.history_visibility": frozenset({"history_visibility"}),
}

# Keys dropped from every dict by strip_state().
_STATE_KEYS_TO_STRIP = frozenset({
    "auth_events",
    "prev_events",
    "signatures",
    "hashes",
    "depth",
})


def canonical_json(value):
    """Serialise *value* to compact, key-sorted JSON encoded as UTF-8 bytes."""
    return json.dumps(
        value,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    ).encode("UTF-8")


def _unpadded_base64(raw):
    """Return the base64 text of *raw* with trailing ``=`` padding removed."""
    return base64.b64encode(raw).decode("utf-8").rstrip("=")


def _load_signing_key():
    """Parse ``config.signing_key`` into ``(prefix, version, SigningKey)``.

    The configured value is split on whitespace: field 0 is the key-ID
    prefix, field 1 the key version and field 2 the base64 key material,
    which may be stored without padding.
    """
    parts = config.signing_key.split()
    base64_key = parts[2]
    # Restore the "=" padding that b64decode requires.
    base64_key += "=" * (-len(base64_key) % 4)
    signing_key = nacl.signing.SigningKey(base64.b64decode(base64_key))
    return parts[0], parts[1], signing_key


def sign_json(data):
    """Return a copy of *data* signed by this server.

    The signature is computed over ``canonical_json(data)`` exactly as
    passed in. Any existing ``"signatures"`` entry is replaced in the
    returned dict by one containing only this server's signature. *data*
    itself is not modified.
    """
    key_prefix, key_version, signing_key = _load_signing_key()
    signature = signing_key.sign(canonical_json(data)).signature

    return {
        **data,
        "signatures": {
            config.server_name: {
                f"{key_prefix}:{key_version}": _unpadded_base64(signature),
            },
        },
    }


def sign_json_without_discard(data):
    """Add this server's signature to *data* in place and return it.

    Unlike ``sign_json``, signatures already present (from this or other
    servers) are kept and the new one is merged in. The ``"unsigned"`` key
    is removed from *data* before signing and is not put back.
    """
    _, key_version, signing_key = _load_signing_key()

    # NOTE(review): "unsigned" is stripped permanently; the original code
    # saved its value but never restored it - confirm this is intended.
    data.pop("unsigned", None)

    signature = signing_key.sign(canonical_json(data)).signature
    new_signature = {f"ed25519:{key_version}": _unpadded_base64(signature)}

    if "signatures" in data:
        data["signatures"][config.server_name] = {
            **data["signatures"].get(config.server_name, {}),
            **new_signature,
        }
    else:
        data["signatures"] = {config.server_name: new_signature}

    return data


def make_event_id(seed=None):
    """Build an identifier of the form ``$<44 chars>:<server_name>``.

    The local part is the base64 encoding of 32 random bytes with ``/``,
    ``+`` and ``=`` replaced by ``_``. When *seed* is given, the result is
    deterministic for that seed.
    """
    # A private generator keeps seeded calls from reseeding the shared
    # module-level RNG, which would make later unseeded IDs predictable.
    rng = random.Random(seed) if seed is not None else random
    random_bytes = bytes(rng.getrandbits(8) for _ in range(32))

    encoded = base64.b64encode(random_bytes).decode("utf-8")
    local_part = re.sub(r"[\/+=]", "_", encoded)[:44]

    return f"${local_part}:{config.server_name}"


def event_hash(event_object):
    """Return the unpadded base64 SHA-256 of *event_object*.

    The hash is taken over the canonical JSON of a copy with the
    ``unsigned``, ``signatures`` and ``hashes`` keys removed. The input
    is left untouched.
    """
    hashable = dict(event_object)
    for key in ("unsigned", "signatures", "hashes"):
        hashable.pop(key, None)

    digest = hashlib.sha256(canonical_json(hashable)).digest()
    return _unpadded_base64(digest)


def pubkey() -> str:
    """Return the verify key of the configured signing key as unpadded base64."""
    _, _, signing_key = _load_signing_key()
    return (
        signing_key.verify_key.encode(encoder=nacl.encoding.Base64Encoder)
        .decode("utf-8")
        .rstrip("=")
    )


def make_auth_header(
    destination: str, method: str, path: str, content=None
) -> str:
    """Build the ``X-Matrix`` Authorization header value for a request.

    A JSON object describing the request (method, uri, origin,
    destination and, when *content* is not ``None``, the content) is
    signed with ``sign_json`` and the resulting key ID and signature are
    embedded in the header.
    """
    request_json = {
        "method": method,
        "uri": path,
        "origin": config.server_name,
        "destination": destination,
    }
    if content is not None:
        request_json["content"] = content

    signed_json = sign_json(request_json)

    # sign_json yields exactly one signature for this server.
    key, sig = next(iter(signed_json["signatures"][config.server_name].items()))
    return (
        f'X-Matrix origin="{config.server_name}",'
        f'destination="{destination}",key="{key}",sig="{sig}"'
    )


def redact_event(event):
    """Return a redacted copy of *event* per the algorithm for v1/v2 rooms.

    Only the allowed top-level keys are kept. When both ``type`` and
    ``content`` are present, ``content`` is reduced to the keys allowed
    for that event type, or emptied for any other type.
    """
    redacted_event = {
        k: v for k, v in event.items() if k in _REDACTION_ALLOWED_KEYS
    }

    if "type" in redacted_event and "content" in redacted_event:
        allowed_content_keys = _REDACTION_CONTENT_KEYS.get(
            redacted_event["type"]
        )
        if allowed_content_keys is None:
            redacted_event["content"] = {}
        else:
            redacted_event["content"] = {
                k: v
                for k, v in redacted_event["content"].items()
                if k in allowed_content_keys
            }

    return redacted_event


def hash_and_sign_event(event_object):
    """Attach ``hashes`` and ``signatures`` to *event_object* in place.

    The content hash covers the full event; the signature is computed
    over the redacted form of the event (including the new hash). The
    same, mutated, object is returned.
    """
    event_object["hashes"] = {"sha256": event_hash(event_object)}
    signed_object = sign_json(redact_event(event_object))
    event_object["signatures"] = signed_object["signatures"]
    return event_object


def room_version_from_id(room):
    """Derive ``"1"`` or ``"2"`` from a room ID string.

    The ID (with ``!`` removed) is hex-encoded and the more frequent of
    the digits ``1`` and ``2`` in that hex string is returned; on a tie
    the digit that appears first wins. If neither digit occurs, ``"1"``
    is returned.
    """
    hex_id = room.replace("!", "").encode("utf-8").hex()

    if "1" not in hex_id and "2" not in hex_id:
        # Force a "1" so there is always something to count.
        hex_id = "1" + hex_id[1:]

    digits = re.sub("[^12]", "", hex_id)
    return Counter(digits).most_common(1)[0][0]


# Static room-directory payload; the room ID is generated once at import.
room_dir = {
    "chunk": [{
        "avatar_url": f"mxc://{config.server_name}/cat",
        "guest_can_join": False,
        "join_rule": "public",
        "name": "Vona",
        "num_joined_members": 1,
        "room_id": make_event_id().replace("$", "!"),
        "room_type": "m.room",
        "topic": "",
        "world_readable": False,
        "via": [config.server_name]
    }],
    "total_room_count_estimate": 1
}


class http_client:
    """HTTP client that resolves a server name before each request.

    Requests are sent to the resolved base URL with the resolved ``Host``
    header and SNI hostname, and are signed with ``make_auth_header``
    unless ``authorize`` is false.
    """

    # Shared by all instances; created when the class is defined.
    http = httpx.Client(headers={"User-Agent": f"Vona/{version}"})
    resolver = ServerResolver(client=http)

    def _resolve(self, target) -> SimpleNamespace:
        """Resolve *target*, falling back to the host header for SNI."""
        r = self.resolver.resolve(target)
        return SimpleNamespace(
            base_url=r.base_url,
            host_header=r.host_header,
            sni=r.sni or r.host_header,
        )

    def put(
        self,
        path: str,
        destination: str,
        headers: Optional[dict] = None,
        authorize: bool = True,
        json: Optional[dict] = None,
    ):
        """Send a PUT request for *path* to *destination*.

        *headers* is copied, never modified. When *json* is ``None``
        (the default) an empty JSON object is signed and sent.
        """
        resolved = self._resolve(destination)

        # Per-call copy: a shared or caller-owned dict must not collect
        # the Authorization/Host headers of an earlier request.
        request_headers = dict(headers) if headers else {}
        body = {} if json is None else json

        if authorize:
            request_headers["Authorization"] = make_auth_header(
                method="PUT",
                destination=destination,
                path=path,
                content=body,
            )

        request_headers["Host"] = resolved.host_header

        return self.http.put(
            f"{resolved.base_url}{path}",
            headers=request_headers,
            extensions={"sni_hostname": resolved.sni},
            json=body,
        )

    def get(
        self,
        path: str,
        destination: str,
        headers: Optional[dict] = None,
        authorize: bool = True,
    ):
        """Send a GET request for *path* to *destination*.

        *headers* is copied, never modified.
        """
        resolved = self._resolve(destination)

        # Per-call copy: a shared or caller-owned dict must not collect
        # the Authorization/Host headers of an earlier request.
        request_headers = dict(headers) if headers else {}

        if authorize:
            request_headers["Authorization"] = make_auth_header(
                method="GET",
                destination=destination,
                path=path,
            )

        request_headers["Host"] = resolved.host_header

        return self.http.get(
            f"{resolved.base_url}{path}",
            headers=request_headers,
            extensions={"sni_hostname": resolved.sni},
        )


def strip_state(l):
    """Return a copy of list *l* with federation-only keys removed.

    Each dict in the list loses its ``auth_events``, ``prev_events``,
    ``signatures``, ``hashes`` and ``depth`` keys, and its remaining
    values are processed the same way. Non-dict items are kept as they
    are, and a non-list argument is returned unchanged.
    """
    if not isinstance(l, list):
        return l

    return [
        {
            k: strip_state(v)
            for k, v in d.items()
            if k not in _STATE_KEYS_TO_STRIP
        }
        if isinstance(d, dict)
        else d
        for d in l
    ]