Files
matrix-vona/vona/globals/__init__.py

257 lines
5.5 KiB
Python

from collections import Counter
import vona.config as config
import nacl.signing
import hashlib
import base64
import random
import httpx
import copy
import json
import re
version = "1.4.3"
http_client = httpx.Client(headers={"User-Agent": f"Vona/{version}"})
def canonical_json(value):
return json.dumps(
value,
ensure_ascii=False,
separators=(",", ":"),
sort_keys=True,
).encode("UTF-8")
def sign_json(data):
parts = config.signing_key.split()
base64_key = parts[2]
while len(base64_key) % 4 != 0:
base64_key += "="
decoded_key = base64.b64decode(base64_key)
signing_key = nacl.signing.SigningKey(decoded_key)
signed_message = signing_key.sign(canonical_json(data))
signature = signed_message.signature
key_version = parts[1]
signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")
signed_json = {
**data,
"signatures": {
config.server_name: {
f"{parts[0]}:{key_version}": signature_base64,
},
},
}
return signed_json
def sign_json_without_discard(data):
parts = config.signing_key.split()
base64_key = parts[2]
while len(base64_key) % 4 != 0:
base64_key += "="
decoded_key = base64.b64decode(base64_key)
signing_key = nacl.signing.SigningKey(decoded_key)
unsigned_keys = {key: data[key] for key in list(data.keys()) if key == "unsigned"}
for key in unsigned_keys:
del data[key]
signed_message = signing_key.sign(canonical_json(data))
signature = signed_message.signature
key_version = parts[1]
signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")
new_signature = {f"ed25519:{key_version}": signature_base64}
if "signatures" in data:
data["signatures"][config.server_name] = {
**data["signatures"].get(config.server_name, {}),
**new_signature,
}
else:
data["signatures"] = {config.server_name: new_signature}
return data
def make_event_id(seed=None):
if seed is not None:
random.seed(seed)
random_bytes = bytearray(random.getrandbits(8) for _ in range(32))
event_id = "$"
event_id += re.sub(
r"[\/+=]",
"_",
base64.b64encode(random_bytes).decode("utf-8"),
).rstrip("=")[:44]
event_id += ":" + config.server_name
return event_id
def event_hash(event_object):
event_object = dict(event_object)
event_object.pop("unsigned", None)
event_object.pop("signatures", None)
event_object.pop("hashes", None)
event_json_bytes = canonical_json(event_object)
return base64.b64encode(
hashlib.sha256(event_json_bytes).digest()
).decode("utf-8").rstrip("=")
def pubkey() -> str:
private_key = config.signing_key.split()[2]
while len(private_key) % 4 != 0:
private_key += "="
public_key = nacl.signing.SigningKey(base64.b64decode(private_key)).verify_key
return (
public_key.encode(encoder=nacl.encoding.Base64Encoder)
.decode("utf-8")
.rstrip("=")
)
def make_auth_header(destination, method, path, content=None) -> str:
request_json = {
"method": method,
"uri": path,
"origin": config.server_name,
"destination": destination,
}
if content is not None:
request_json["content"] = content
signed_json = sign_json(request_json)
authorization_headers = []
for key, sig in signed_json["signatures"][config.server_name].items():
authorization_headers.append(
bytes(
'X-Matrix origin="%s",destination="%s",key="%s",sig="%s"'
% (
config.server_name,
destination,
key,
sig,
),
"utf-8",
)
)
return authorization_headers[0].decode("utf-8")
def redact_event(event):
# Returns a redacted event as per
# the algorithm for v1/v2 rooms.
allowed_keys = [
"event_id",
"type",
"room_id",
"sender",
"state_key",
"content",
"hashes",
"signatures",
"depth",
"prev_events",
"prev_state",
"auth_events",
"origin",
"origin_server_ts",
"membership",
]
redacted_event = {k: v for k, v in event.items() if k in allowed_keys}
if "type" in redacted_event and "content" in redacted_event:
event_type = redacted_event["type"]
content_key_rules = {
"m.room.member": ["membership"],
"m.room.create": ["creator"],
"m.room.join_rules": ["join_rule"],
"m.room.power_levels": [
"ban",
"events",
"events_default",
"kick",
"redact",
"state_default",
"users",
"users_default",
],
"m.room.aliases": ["aliases"],
"m.room.history_visibility": ["history_visibility"],
}
if event_type in content_key_rules:
allowed_content_keys = content_key_rules[event_type]
redacted_event["content"] = {
k: v
for k, v in redacted_event["content"].items()
if k in allowed_content_keys
}
else:
redacted_event["content"] = {}
return redacted_event
def hash_and_sign_event(event_object):
content_hash = event_hash(event_object)
event_object["hashes"] = {"sha256": content_hash}
stripped_object = redact_event(event_object)
signed_object = sign_json(stripped_object)
event_object["signatures"] = signed_object["signatures"]
return event_object
def room_version_from_id(room_id):
room_id_no_sigil = room_id.replace("!", "")
hexadecimal_room_id = bytes(room_id_no_sigil, "utf-8").hex()
if "1" not in hexadecimal_room_id and "2" not in hexadecimal_room_id:
# NOTE: v2 if impossible from room ID alone
hexadecimal_room_id = "2" + hexadecimal_room_id[1:]
def remove_chars(s):
return re.sub("[^12]", "", s)
nums = remove_chars(hexadecimal_room_id)
def most_common_character(s):
s = s.replace(" ", "").lower()
counts = Counter(s)
most_common = counts.most_common(1)
return most_common[0] if most_common else None
return most_common_character(nums)[0]