from collections import Counter
import vona.config as config
import nacl.encoding
import nacl.signing
import hashlib
import base64
import random
import httpx
import copy
import json
import re

version = "1.4.3"

# Shared HTTP client; every request it sends carries the Vona User-Agent.
http_client = httpx.Client(headers={"User-Agent": f"Vona/{version}"})

# Top-level event keys that survive redaction (room versions 1 and 2).
_REDACTION_EVENT_KEYS = frozenset({
    "event_id",
    "type",
    "room_id",
    "sender",
    "state_key",
    "content",
    "hashes",
    "signatures",
    "depth",
    "prev_events",
    "prev_state",
    "auth_events",
    "origin",
    "origin_server_ts",
    "membership",
})

# Per event type: the ``content`` keys that survive redaction.
_REDACTION_CONTENT_KEYS = {
    "m.room.member": frozenset({"membership"}),
    "m.room.create": frozenset({"creator"}),
    "m.room.join_rules": frozenset({"join_rule"}),
    "m.room.power_levels": frozenset({
        "ban",
        "events",
        "events_default",
        "kick",
        "redact",
        "state_default",
        "users",
        "users_default",
    }),
    "m.room.aliases": frozenset({"aliases"}),
    "m.room.history_visibility": frozenset({"history_visibility"}),
}

# Keys that must not be covered by a JSON signature.
_UNSIGNED_KEYS = ("signatures", "unsigned")


def canonical_json(value):
    """Return ``value`` as compact, key-sorted JSON encoded to UTF-8 bytes.

    Non-ASCII characters are emitted verbatim rather than ``\\u`` escaped.
    """
    return json.dumps(
        value,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    ).encode("UTF-8")


def _decode_unpadded_base64(text):
    """Decode base64 ``text`` whose trailing ``=`` padding may be missing."""
    return base64.b64decode(text + "=" * (-len(text) % 4))


def _load_signing_key():
    """Parse ``config.signing_key`` into ``(algorithm, version, SigningKey)``.

    The configured value is split on whitespace into three fields:
    algorithm name, key version, and the base64 encoded key seed.

    Raises:
        IndexError: if the configured value has fewer than three fields.
    """
    parts = config.signing_key.split()
    seed = _decode_unpadded_base64(parts[2])
    return parts[0], parts[1], nacl.signing.SigningKey(seed)


def _signature_for(signing_key, data):
    """Return the unpadded base64 signature of ``data``.

    The ``signatures`` and ``unsigned`` members are left out of the signed
    payload: verifiers strip both before checking, so a signature that
    covered them could never be validated. ``data`` itself is not modified.
    """
    payload = {
        key: value for key, value in data.items() if key not in _UNSIGNED_KEYS
    }
    signature = signing_key.sign(canonical_json(payload)).signature
    return base64.b64encode(signature).decode("utf-8").rstrip("=")


def sign_json(data):
    """Return a copy of ``data`` signed by this server.

    Any ``signatures`` already present in ``data`` are replaced: the result
    carries only this server's signature. ``data`` is not mutated.
    """
    algorithm, key_version, signing_key = _load_signing_key()
    return {
        **data,
        "signatures": {
            config.server_name: {
                f"{algorithm}:{key_version}": _signature_for(signing_key, data),
            },
        },
    }


def sign_json_without_discard(data):
    """Add this server's signature to ``data`` in place and return it.

    Unlike ``sign_json``, signatures already present (from other servers or
    other keys of this server) are kept and the new one is merged in.

    NOTE: the ``unsigned`` member is removed from ``data`` and is not put
    back, so the caller's dict loses it.
    """
    _, key_version, signing_key = _load_signing_key()
    data.pop("unsigned", None)

    new_signature = {
        f"ed25519:{key_version}": _signature_for(signing_key, data),
    }

    if "signatures" in data:
        data["signatures"][config.server_name] = {
            **data["signatures"].get(config.server_name, {}),
            **new_signature,
        }
    else:
        data["signatures"] = {config.server_name: new_signature}

    return data


def make_event_id(seed=None):
    """Return an event ID of the form ``$<44 chars>:<server_name>``.

    Args:
        seed: when given, the ID is derived deterministically from it.
            A private generator is used so the process-wide ``random``
            state is left untouched; otherwise later unseeded IDs would
            become predictable.
    """
    rng = random if seed is None else random.Random(seed)
    random_bytes = bytes(rng.getrandbits(8) for _ in range(32))
    encoded = base64.b64encode(random_bytes).decode("utf-8")
    # "/", "+" and the "=" padding are all mapped to "_"; 32 bytes always
    # encode to exactly 44 characters.
    local_part = re.sub(r"[\/+=]", "_", encoded)[:44]
    return "$" + local_part + ":" + config.server_name


def event_hash(event_object):
    """Return the unpadded base64 SHA-256 hash of an event.

    ``unsigned``, ``signatures`` and ``hashes`` are excluded from the hashed
    JSON. ``event_object`` is not modified.
    """
    hashable = {
        key: value
        for key, value in dict(event_object).items()
        if key not in ("unsigned", "signatures", "hashes")
    }
    digest = hashlib.sha256(canonical_json(hashable)).digest()
    return base64.b64encode(digest).decode("utf-8").rstrip("=")


def pubkey() -> str:
    """Return this server's public verify key as unpadded base64."""
    _, _, signing_key = _load_signing_key()
    encoded = signing_key.verify_key.encode(encoder=nacl.encoding.Base64Encoder)
    return encoded.decode("utf-8").rstrip("=")


def make_auth_header(destination, method, path, content=None) -> str:
    """Build the ``X-Matrix`` Authorization header value for a request.

    Args:
        destination: server name the request is sent to.
        method: HTTP method of the request.
        path: request URI.
        content: JSON body, included in the signed object when not ``None``.
    """
    request_json = {
        "method": method,
        "uri": path,
        "origin": config.server_name,
        "destination": destination,
    }
    if content is not None:
        request_json["content"] = content

    signatures = sign_json(request_json)["signatures"][config.server_name]
    # sign_json always yields exactly one signature for this server.
    key, sig = next(iter(signatures.items()))
    return (
        f'X-Matrix origin="{config.server_name}",'
        f'destination="{destination}",key="{key}",sig="{sig}"'
    )


def redact_event(event):
    """Return a redacted copy of ``event``.

    Follows the redaction algorithm for v1/v2 rooms: only allow-listed
    top-level keys are kept, and ``content`` is reduced to the keys allowed
    for the event's type (emptied for any other type). ``event`` is not
    modified.
    """
    redacted_event = {
        key: value
        for key, value in event.items()
        if key in _REDACTION_EVENT_KEYS
    }

    if "type" in redacted_event and "content" in redacted_event:
        event_type = redacted_event["type"]
        if event_type in _REDACTION_CONTENT_KEYS:
            kept_keys = _REDACTION_CONTENT_KEYS[event_type]
            redacted_event["content"] = {
                key: value
                for key, value in redacted_event["content"].items()
                if key in kept_keys
            }
        else:
            redacted_event["content"] = {}

    return redacted_event


def hash_and_sign_event(event_object):
    """Add ``hashes`` and ``signatures`` to ``event_object`` in place.

    The content hash is computed over the full event, the signature over
    its redacted form. Any signatures already on the event are replaced by
    this server's signature. Returns the same (mutated) object.
    """
    event_object["hashes"] = {"sha256": event_hash(event_object)}
    signed_object = sign_json(redact_event(event_object))
    event_object["signatures"] = signed_object["signatures"]
    return event_object


def room_version_from_id(room_id):
    """Derive a room version, ``"1"`` or ``"2"``, from a room ID.

    Every ``!`` is dropped, the remainder is hex encoded as UTF-8, and
    whichever of the digits 1 and 2 occurs more often in that hex string
    wins; on a tie the digit that appears first wins.
    """
    hexadecimal_room_id = room_id.replace("!", "").encode("utf-8").hex()
    digits = re.sub("[^12]", "", hexadecimal_room_id)
    if not digits:
        # NOTE: v2 if it cannot be determined from the room ID alone
        return "2"
    return Counter(digits).most_common(1)[0][0]


# Static public room directory response advertising a single room. Its room
# ID is generated once, when this module is imported.
room_dir = {
    "chunk": [{
        "avatar_url": f"mxc://{config.server_name}/cat",
        "guest_can_join": False,
        "join_rule": "public",
        "name": "Vona",
        "num_joined_members": 1,
        "room_id": make_event_id().replace("$", "!"),
        "room_type": "m.room",
        "topic": "",
        "world_readable": False,
        "via": [config.server_name]
    }],
    "total_room_count_estimate": 1
}