# Files
# matrix-vona/vona/globals/__init__.py
#
# 469 lines
# 9.6 KiB
# Python

from resolvematrix import ServerResolver
from types import SimpleNamespace
from collections import Counter
import vona.config as config
import nacl.signing
import time as ti
import hashlib
import base64
import random
import httpx
import json
import copy
import re
# Version string of this package; it is interpolated into the User-Agent
# header of the shared HTTP client defined further down in this module.
version = "1.5.0"
def canonical_json(value):
    """Serialise ``value`` to compact, key-sorted JSON and return UTF-8 bytes.

    Keys are sorted, no whitespace is emitted between tokens, and non-ASCII
    characters are written literally rather than as ``\\uXXXX`` escapes.
    """
    text = json.dumps(
        value,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("UTF-8")
def sign_json(data):
    """Return a copy of ``data`` carrying only this server's signature.

    ``config.signing_key`` is split on whitespace; the first field is used as
    the algorithm name, the second as the key version and the third as the
    unpadded base64 key material handed to ``nacl.signing.SigningKey``.

    The signature is computed over the canonical JSON of ``data`` with the
    ``signatures`` and ``unsigned`` members left out, as the Matrix
    "Signing JSON" algorithm requires. Signing over pre-existing signatures
    would produce a signature no verifier can reproduce.

    Any ``signatures`` already present in ``data`` are replaced (discarded)
    in the returned object; every other key, including ``unsigned``, is
    carried over unchanged. ``data`` itself is not modified.
    """
    parts = config.signing_key.split()

    # The key is stored without base64 padding; restore it before decoding.
    base64_key = parts[2]
    base64_key += "=" * (-len(base64_key) % 4)
    signing_key = nacl.signing.SigningKey(base64.b64decode(base64_key))

    # Only the signable portion of the object goes into the signed bytes.
    signable = {
        key: value
        for key, value in data.items()
        if key not in ("signatures", "unsigned")
    }
    signature = signing_key.sign(canonical_json(signable)).signature
    signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")

    return {
        **data,
        "signatures": {
            config.server_name: {
                f"{parts[0]}:{parts[1]}": signature_base64,
            },
        },
    }
def sign_json_without_discard(data):
    """Add this server's signature to ``data`` in place, keeping existing ones.

    Unlike ``sign_json``, signatures from other servers (and other keys of
    this server) that are already present under ``data["signatures"]`` are
    preserved; the new ``ed25519:<key version>`` entry is merged in.

    The signature is computed over the canonical JSON of ``data`` without
    its ``signatures`` and ``unsigned`` members, as the Matrix "Signing JSON"
    algorithm requires, so a counter-signature stays verifiable.

    Side effects: ``data`` is mutated and returned. The ``unsigned`` member
    is removed from ``data`` and is not put back.
    NOTE(review): dropping ``unsigned`` from the caller's dict is inherited
    behaviour; confirm callers rely on it before changing it.
    """
    parts = config.signing_key.split()

    # The key is stored without base64 padding; restore it before decoding.
    base64_key = parts[2]
    base64_key += "=" * (-len(base64_key) % 4)
    signing_key = nacl.signing.SigningKey(base64.b64decode(base64_key))

    data.pop("unsigned", None)

    # Existing signatures must not be part of the signed bytes.
    signable = {key: value for key, value in data.items() if key != "signatures"}
    signature = signing_key.sign(canonical_json(signable)).signature
    signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")

    new_signature = {f"ed25519:{parts[1]}": signature_base64}
    if "signatures" in data:
        data["signatures"][config.server_name] = {
            **data["signatures"].get(config.server_name, {}),
            **new_signature,
        }
    else:
        data["signatures"] = {config.server_name: new_signature}
    return data
def make_event_id(seed=None):
    """Build an identifier of the form ``$<opaque>:<server name>``.

    The opaque part is the base64 encoding of 32 bytes drawn from the
    module-level ``random`` generator, with ``/``, ``+`` and ``=`` replaced
    by ``_``.

    When ``seed`` is not ``None`` the module-level ``random`` generator is
    re-seeded with it first, which makes the result reproducible for that
    seed and also affects later users of ``random``.
    """
    if seed is not None:
        random.seed(seed)

    raw = bytes([random.getrandbits(8) for _ in range(32)])
    encoded = base64.b64encode(raw).decode("utf-8")
    opaque = re.sub(r"[\/+=]", "_", encoded).rstrip("=")[:44]
    return "".join(("$", opaque, ":", config.server_name))
def event_hash(event_object):
    """Return the unpadded base64 SHA-256 of an event's canonical JSON.

    The ``unsigned``, ``signatures`` and ``hashes`` members are left out of
    the hashed object. The argument itself is not modified.
    """
    excluded = ("unsigned", "signatures", "hashes")
    hashable = {
        key: value
        for key, value in dict(event_object).items()
        if key not in excluded
    }
    digest = hashlib.sha256(canonical_json(hashable)).digest()
    encoded = base64.b64encode(digest).decode("utf-8")
    return encoded.rstrip("=")
def pubkey() -> str:
    """Return this server's public verify key as unpadded base64.

    The private key material is the third whitespace-separated field of
    ``config.signing_key``; the verify key is derived from it.
    """
    seed_b64 = config.signing_key.split()[2]
    # Stored without padding; add back what base64 decoding needs.
    seed_b64 += "=" * (-len(seed_b64) % 4)

    verify_key = nacl.signing.SigningKey(base64.b64decode(seed_b64)).verify_key
    encoded = verify_key.encode(encoder=nacl.encoding.Base64Encoder)
    return encoded.decode("utf-8").rstrip("=")
def make_auth_header(
    destination: str,
    method: str,
    path: str,
    content = None
) -> str:
    """Build the ``X-Matrix`` Authorization header value for a request.

    A JSON object describing the request (method, uri, origin, destination
    and, when ``content`` is not ``None``, the body) is signed with
    ``sign_json``. The header is formatted from the first signature this
    server produced for it.
    """
    request_json = {
        "method": method,
        "uri": path,
        "origin": config.server_name,
        "destination": destination,
    }
    if content is not None:
        request_json["content"] = content

    own_signatures = sign_json(request_json)["signatures"][config.server_name]
    header_values = [
        'X-Matrix origin="%s",destination="%s",key="%s",sig="%s"'
        % (config.server_name, destination, key_id, signature)
        for key_id, signature in own_signatures.items()
    ]
    return header_values[0]
def redact_event(
    event: dict,
    for_event_id: bool = False,
    room_ver: int = 1,
):
    """Return a redacted copy of ``event`` per the v1 to v11 room algorithms.

    Args:
        event: The event to redact. It is not modified.
        for_event_id: When true, ``signatures`` is stripped as well (used
            when the result feeds a reference hash).
        room_ver: Numeric room version selecting which top-level and
            ``content`` keys survive.

    Returns:
        A new dict holding only the permitted top-level keys. If both
        ``type`` and ``content`` are present, ``content`` is reduced to the
        keys permitted for that event type (or emptied for types with no
        rule). From v11 on, ``m.room.create`` content is kept whole.
    """
    allowed_keys = {
        "event_id",
        "type",
        "room_id",
        "sender",
        "state_key",
        "content",
        "hashes",
        "depth",
        "prev_events",
        "auth_events",
        "origin_server_ts",
    }
    if not for_event_id:
        allowed_keys.add("signatures")
    if room_ver < 11:
        # v11 stopped preserving these legacy top-level keys.
        allowed_keys.update(("origin", "membership", "prev_state"))

    redacted_event = {k: v for k, v in event.items() if k in allowed_keys}
    if "type" not in redacted_event or "content" not in redacted_event:
        return redacted_event

    event_type = redacted_event["type"]
    content = redacted_event["content"]

    content_key_rules = {
        "m.room.member": ["membership"],
        "m.room.create": ["creator"],
        "m.room.join_rules": ["join_rule"],
        "m.room.power_levels": [
            "ban",
            "events",
            "events_default",
            "kick",
            "redact",
            "state_default",
            "users",
            "users_default",
        ],
        "m.room.aliases": ["aliases"],
        "m.room.history_visibility": ["history_visibility"],
    }
    if room_ver >= 6:
        del content_key_rules["m.room.aliases"]
    if room_ver >= 8:
        content_key_rules["m.room.join_rules"].append("allow")
    if room_ver >= 9:
        content_key_rules["m.room.member"].append("join_authorised_via_users_server")
    if room_ver >= 11:
        content_key_rules["m.room.redaction"] = ["redacts"]
        # v11 additionally preserves the ``invite`` power level.
        content_key_rules["m.room.power_levels"].append("invite")
        del content_key_rules["m.room.create"]  # All keys will be permitted

    if room_ver >= 11 and event_type == "m.room.create":
        # Content is left untouched for v11+ create events.
        return redacted_event

    if event_type not in content_key_rules:
        redacted_event["content"] = {}
        return redacted_event

    allowed_content_keys = content_key_rules[event_type]
    new_content = {k: v for k, v in content.items() if k in allowed_content_keys}

    if room_ver >= 11 and event_type == "m.room.member":
        # v11 keeps only the ``signed`` part of content.third_party_invite.
        # It lives inside ``content``, not at the top level of the event.
        third_party_invite = content.get("third_party_invite")
        if isinstance(third_party_invite, dict) and "signed" in third_party_invite:
            new_content["third_party_invite"] = {
                "signed": copy.deepcopy(third_party_invite["signed"])
            }

    redacted_event["content"] = new_content
    return redacted_event
def hash_and_sign_event(
    event_object: dict,
    room_ver: int = 1,
):
    """Attach a content hash and this server's signature to an event.

    ``event_object`` is updated in place and returned: ``hashes`` is set to
    the SHA-256 content hash, then ``signatures`` is set to the signatures
    produced by signing the redacted form of the event.
    """
    event_object["hashes"] = {"sha256": event_hash(event_object)}

    # The signature covers the redacted event, not the full one.
    redacted = redact_event(
        event=event_object,
        for_event_id=False,
        room_ver=room_ver,
    )
    event_object["signatures"] = sign_json(redacted)["signatures"]
    return event_object
def make_ref_hash(
    event: dict,
    room_ver: int = 3,
):
    """Return ``$`` followed by the event's reference hash.

    The event is redacted with ``for_event_id=True``, serialised as
    canonical JSON, hashed with SHA-256 and encoded as unpadded base64.
    For ``room_ver`` above 3 the URL-safe alphabet is used (``+`` becomes
    ``-`` and ``/`` becomes ``_``).
    """
    redacted = redact_event(
        event=event,
        for_event_id=True,
        room_ver=room_ver,
    )
    digest = hashlib.sha256(canonical_json(redacted)).digest()
    reference = base64.b64encode(digest).decode("utf-8").rstrip("=")
    if room_ver > 3:
        reference = reference.translate(str.maketrans("+/", "-_"))
    return "$" + reference
def room_version_from_id(room):
    """Derive a room version string from the characters of a room ID.

    The ``!`` sigil and the ``:<server name>`` suffix are removed, the rest
    is hex-encoded, and the most frequent character among ``0``-``9`` and
    ``a`` in that hex string decides the result: ``0`` maps to ``"10"``,
    ``a`` maps to ``"11"``, any other digit is returned as is.

    If the hex string contains none of those characters, its first
    character is replaced by ``2`` before counting.
    """
    local_part = room.replace("!", "").replace(f":{config.server_name}", "")
    hex_id = local_part.encode("utf-8").hex()

    version_chars = "0123456789a"
    if not any(ch in hex_id for ch in version_chars):
        hex_id = "2" + hex_id[1:]

    # Keep only the characters that can name a version.
    candidates = "".join(ch for ch in hex_id if ch in version_chars)

    ranked = Counter(candidates).most_common(1)
    winner = ranked[0][0] if ranked else "4"
    if winner == "0":
        return "10"
    if winner == "a":
        return "11"
    return str(winner)
# Room directory payload with a single hard-coded entry. It is built once
# when this module is imported.
room_dir = {
    "chunk": [{
        "avatar_url": f"mxc://{config.server_name}/cat",
        "guest_can_join": False,
        "join_rule": "public",
        "name": "Vona",
        "num_joined_members": 1,
        # A freshly generated event ID with its "$" sigil swapped for "!".
        "room_id": make_event_id().replace("$", "!"),
        "room_type": "m.room",
        "topic": "",
        "world_readable": False,
        "via": [config.server_name]
    }],
    "total_room_count_estimate": 1
}
class http_client:
    """Thin wrapper for sending (optionally signed) requests to other servers.

    The destination server name is resolved with ``ServerResolver``; the
    request is sent to the resolved base URL with the resolved ``Host``
    header and TLS SNI name.
    """

    # Created once at class definition and shared by all instances.
    http = httpx.Client(headers={"User-Agent": f"Vona/{version}"})
    resolver = ServerResolver(client=http)

    def _resolve(self, target) -> SimpleNamespace:
        """Resolve ``target``; fall back to the host header when no SNI is given."""
        r = self.resolver.resolve(target)
        return SimpleNamespace(
            base_url=r.base_url,
            host_header=r.host_header,
            sni=r.sni if r.sni else r.host_header,
        )

    def _request_headers(self, headers, host_header, authorization=None) -> dict:
        """Return a fresh header dict; the caller's ``headers`` is never mutated."""
        request_headers = dict(headers) if headers else {}
        if authorization is not None:
            request_headers["Authorization"] = authorization
        request_headers["Host"] = host_header
        return request_headers

    def put(
        self,
        path: str,
        destination: str,
        headers: "dict | None" = None,
        authorize: bool = True,
        json: "dict | None" = None,
    ):
        """Send a PUT request with a JSON body to ``destination``.

        Args:
            path: Request path appended to the resolved base URL.
            destination: Server name to resolve and address.
            headers: Extra headers; copied, never modified.
            authorize: When true, an ``Authorization`` header signed over
                the method, path and body is added.
            json: Request body; defaults to an empty object.

        Returns:
            The response returned by ``httpx.Client.put``.
        """
        resolved = self._resolve(destination)
        # A fresh dict per call instead of a shared mutable default.
        body = {} if json is None else json

        authorization = None
        if authorize:
            authorization = make_auth_header(
                method="PUT",
                destination=destination,
                path=path,
                content=body,
            )

        return self.http.put(
            f"{resolved.base_url}{path}",
            headers=self._request_headers(headers, resolved.host_header, authorization),
            extensions={"sni_hostname": resolved.sni},
            json=body,
        )

    def get(
        self,
        path: str,
        destination: str,
        headers: "dict | None" = None,
        authorize: bool = True,
    ):
        """Send a GET request to ``destination``.

        Args:
            path: Request path appended to the resolved base URL.
            destination: Server name to resolve and address.
            headers: Extra headers; copied, never modified.
            authorize: When true, an ``Authorization`` header signed over
                the method and path is added.

        Returns:
            The response returned by ``httpx.Client.get``.
        """
        resolved = self._resolve(destination)

        authorization = None
        if authorize:
            authorization = make_auth_header(
                method="GET",
                destination=destination,
                path=path,
            )

        return self.http.get(
            f"{resolved.base_url}{path}",
            headers=self._request_headers(headers, resolved.host_header, authorization),
            extensions={"sni_hostname": resolved.sni},
        )
def strip_state(state_events) -> list:
    """Strip federation-only keys from a list of events and add event IDs.

    Non-list input is returned unchanged. Within a list, non-dict items are
    kept as they are. Each dict item is copied without its ``auth_events``,
    ``prev_events``, ``signatures``, ``hashes`` and ``depth`` keys, its
    remaining values are processed recursively, and ``event_id`` is set to
    the reference hash of the original dict. The room version used for that
    hash comes from ``room_id`` when present and is 4 otherwise.
    """
    if not isinstance(state_events, list):
        return state_events

    dropped_keys = {"auth_events", "prev_events", "signatures", "hashes", "depth"}
    stripped = []
    for entry in state_events:
        if not isinstance(entry, dict):
            stripped.append(entry)
            continue

        ver = int(room_version_from_id(entry["room_id"])) if "room_id" in entry else 4
        # Hash the untouched entry before any keys are removed.
        event_id = make_ref_hash(entry, ver)

        cleaned = {
            key: strip_state(value)
            for key, value in entry.items()
            if key not in dropped_keys
        }
        cleaned["event_id"] = event_id
        stripped.append(cleaned)
    return stripped
def time() -> int:
    """Return the current Unix time in whole milliseconds (truncated)."""
    return int(ti.time() * 1000)