Files
matrix-vona/vona/globals/__init__.py

381 lines
7.6 KiB
Python

from resolvematrix import ServerResolver
from types import SimpleNamespace
from collections import Counter
import vona.config as config
import nacl.signing
import hashlib
import base64
import random
import httpx
import copy
import json
import re
version = "1.4.4"
def canonical_json(value):
return json.dumps(
value,
ensure_ascii=False,
separators=(",", ":"),
sort_keys=True,
).encode("UTF-8")
def sign_json(data):
parts = config.signing_key.split()
base64_key = parts[2]
while len(base64_key) % 4 != 0:
base64_key += "="
decoded_key = base64.b64decode(base64_key)
signing_key = nacl.signing.SigningKey(decoded_key)
signed_message = signing_key.sign(canonical_json(data))
signature = signed_message.signature
key_version = parts[1]
signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")
signed_json = {
**data,
"signatures": {
config.server_name: {
f"{parts[0]}:{key_version}": signature_base64,
},
},
}
return signed_json
def sign_json_without_discard(data):
parts = config.signing_key.split()
base64_key = parts[2]
while len(base64_key) % 4 != 0:
base64_key += "="
decoded_key = base64.b64decode(base64_key)
signing_key = nacl.signing.SigningKey(decoded_key)
unsigned_keys = {key: data[key] for key in list(data.keys()) if key == "unsigned"}
for key in unsigned_keys:
del data[key]
signed_message = signing_key.sign(canonical_json(data))
signature = signed_message.signature
key_version = parts[1]
signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")
new_signature = {f"ed25519:{key_version}": signature_base64}
if "signatures" in data:
data["signatures"][config.server_name] = {
**data["signatures"].get(config.server_name, {}),
**new_signature,
}
else:
data["signatures"] = {config.server_name: new_signature}
return data
def make_event_id(seed=None):
if seed is not None:
random.seed(seed)
random_bytes = bytearray(random.getrandbits(8) for _ in range(32))
event_id = "$"
event_id += re.sub(
r"[\/+=]",
"_",
base64.b64encode(random_bytes).decode("utf-8"),
).rstrip("=")[:44]
event_id += ":" + config.server_name
return event_id
def event_hash(event_object):
event_object = dict(event_object)
event_object.pop("unsigned", None)
event_object.pop("signatures", None)
event_object.pop("hashes", None)
event_json_bytes = canonical_json(event_object)
return base64.b64encode(
hashlib.sha256(event_json_bytes).digest()
).decode("utf-8").rstrip("=")
def pubkey() -> str:
private_key = config.signing_key.split()[2]
while len(private_key) % 4 != 0:
private_key += "="
public_key = nacl.signing.SigningKey(base64.b64decode(private_key)).verify_key
return (
public_key.encode(encoder=nacl.encoding.Base64Encoder)
.decode("utf-8")
.rstrip("=")
)
def make_auth_header(
destination: str,
method: str,
path: str,
content = None
) -> str:
request_json = {
"method": method,
"uri": path,
"origin": config.server_name,
"destination": destination,
}
if content is not None:
request_json["content"] = content
signed_json = sign_json(request_json)
authorization_headers = []
for key, sig in signed_json["signatures"][config.server_name].items():
authorization_headers.append(
bytes(
'X-Matrix origin="%s",destination="%s",key="%s",sig="%s"'
% (
config.server_name,
destination,
key,
sig,
),
"utf-8",
)
)
return authorization_headers[0].decode("utf-8")
def redact_event(event):
# Returns a redacted event as per
# the algorithm for v1/v2 rooms.
allowed_keys = [
"event_id",
"type",
"room_id",
"sender",
"state_key",
"content",
"hashes",
"signatures",
"depth",
"prev_events",
"prev_state",
"auth_events",
"origin",
"origin_server_ts",
"membership",
]
redacted_event = {k: v for k, v in event.items() if k in allowed_keys}
if "type" in redacted_event and "content" in redacted_event:
event_type = redacted_event["type"]
content_key_rules = {
"m.room.member": ["membership"],
"m.room.create": ["creator"],
"m.room.join_rules": ["join_rule"],
"m.room.power_levels": [
"ban",
"events",
"events_default",
"kick",
"redact",
"state_default",
"users",
"users_default",
],
"m.room.aliases": ["aliases"],
"m.room.history_visibility": ["history_visibility"],
}
if event_type in content_key_rules:
allowed_content_keys = content_key_rules[event_type]
redacted_event["content"] = {
k: v
for k, v in redacted_event["content"].items()
if k in allowed_content_keys
}
else:
redacted_event["content"] = {}
return redacted_event
def hash_and_sign_event(event_object):
content_hash = event_hash(event_object)
event_object["hashes"] = {"sha256": content_hash}
stripped_object = redact_event(event_object)
signed_object = sign_json(stripped_object)
event_object["signatures"] = signed_object["signatures"]
return event_object
def room_version_from_id(room_id):
room_id_no_sigil = room_id.replace("!", "")
hexadecimal_room_id = bytes(room_id_no_sigil, "utf-8").hex()
if "1" not in hexadecimal_room_id and "2" not in hexadecimal_room_id:
# NOTE: v2 if impossible from room ID alone
hexadecimal_room_id = "2" + hexadecimal_room_id[1:]
def remove_chars(s):
return re.sub("[^12]", "", s)
nums = remove_chars(hexadecimal_room_id)
def most_common_character(s):
s = s.replace(" ", "").lower()
counts = Counter(s)
most_common = counts.most_common(1)
return most_common[0] if most_common else None
return most_common_character(nums)[0]
room_dir = {
"chunk": [{
"avatar_url": f"mxc://{config.server_name}/cat",
"guest_can_join": False,
"join_rule": "public",
"name": "Vona",
"num_joined_members": 1,
"room_id": make_event_id().replace("$", "!"),
"room_type": "m.room",
"topic": "",
"world_readable": False,
"via": [config.server_name]
}],
"total_room_count_estimate": 1
}
class http_client:
http = httpx.Client(headers={"User-Agent": f"Vona/{version}"})
resolver = ServerResolver(client=http)
def _resolve(self, target) -> SimpleNamespace:
r = self.resolver.resolve(target)
if r.sni:
sni = r.sni
else:
sni = r.host_header
return SimpleNamespace(
base_url=r.base_url,
host_header=r.host_header,
sni=sni
)
def put(
self,
path: str,
destination: str,
headers: dict = {},
authorize: bool = True,
json: dict = {},
verify: bool = True,
):
resolved = self._resolve(destination)
if authorize:
headers["Authorization"] = make_auth_header(
method="PUT",
destination=destination,
path=path,
content=json,
)
headers["Host"] = resolved.host_header
return self.http.put(
f"{resolved.base_url}{path}",
headers=headers,
extensions={"sni_hostname": resolved.sni},
json=json,
verify=verify,
)
def get(
self,
path: str,
destination: str,
headers: dict = {},
authorize: bool = True,
verify: bool = True,
):
resolved = self._resolve(destination)
if authorize:
headers["Authorization"] = make_auth_header(
method="GET",
destination=destination,
path=path,
)
headers["Host"] = resolved.host_header
return self.http.get(
f"{resolved.base_url}{path}",
headers=headers,
extensions={"sni_hostname": resolved.sni},
verify=verify,
)
def strip_state(l) -> dict:
if not isinstance(l, list):
return l
keys_to_remove = [
"auth_events",
"prev_events",
"signatures",
"hashes",
"depth"
]
new_list = []
for d in l:
if not isinstance(d, dict):
new_list.append(d)
continue
new_dict = {}
for k, v in d.items():
if k in keys_to_remove:
continue
new_dict[k] = strip_state(v)
new_list.append(new_dict)
return new_list