Host our public key, and add proper ways to sign JSON + send requests.

This commit is contained in:
2025-09-10 20:05:38 -04:00
parent 4f17450992
commit 9fd2f84741
4 changed files with 142 additions and 47 deletions

View File

@@ -1,50 +1,107 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import nacl.signing
import hashlib
import base64
import config
import json
import re
import os
vona_version = "1.2.4"
def canonical_json(value):
return json.dumps(
value,
ensure_ascii=False,
separators=(',',':'),
sort_keys=True
separators=(",", ":"),
sort_keys=True,
).encode("UTF-8")
'''
def encode_base64(data: bytes) -> str:
return base64.b64encode(data).decode('utf-8')
def sign_json(json_object, signing_key, signing_name):
signatures = json_object.pop("signatures", {})
unsigned = json_object.pop("unsigned", None)
def sign_json(data):
parts = config.signing_key.split()
base64_key = parts[2]
signed = signing_key.sign(canonical_json(json_object))
signature_base64 = encode_base64(signed)
while len(base64_key) % 4 != 0:
base64_key += "="
key_id = "ed25519:VonaA"
signatures.setdefault(signing_name, {})[key_id] = signature_base64
decoded_key = base64.b64decode(base64_key)
signing_key = nacl.signing.SigningKey(decoded_key)
json_object["signatures"] = signatures
if unsigned is not None:
json_object["unsigned"] = unsigned
signed_message = signing_key.sign(canonical_json(data))
return json_object
'''
signature = signed_message.signature
key_version = parts[1]
signature_base64 = base64.b64encode(signature).decode("utf-8").rstrip("=")
signed_json = {
**data,
"signatures": {
config.server_name: {
f"ed25519:{key_version}": signature_base64,
},
},
}
return signed_json
vona_version = '1.2.3'
def make_event_id():
return re.sub(r'[\/+=]', '_', base64.b64encode(os.urandom(32)).decode('utf-8'))[:44]
return re.sub(r"[\/+=]", "_", base64.b64encode(os.urandom(32)).decode("utf-8"))[:44]
def hash_event(input) -> str:
input.pop('signatures', None)
input.pop('unsigned', None)
input.pop("signatures", None)
input.pop("unsigned", None)
sha256_hash = hashlib.sha256(canonical_json(input)).digest()
base64_encoded = base64.b64encode(sha256_hash)
return base64_encoded.decode().rstrip('=')
return base64_encoded.decode().rstrip("=")
def pubkey() -> str:
private_key = config.signing_key.split()[2]
while len(private_key) % 4 != 0:
private_key += "="
public_key = nacl.signing.SigningKey(base64.b64decode(private_key)).verify_key
return (
public_key.encode(encoder=nacl.encoding.Base64Encoder)
.decode("utf-8")
.rstrip("=")
)
def make_auth_header(destination, method, path, content=None) -> str:
request_json = {
"method": method,
"uri": path,
"origin": config.server_name,
"destination": destination,
}
if content is not None:
request_json["content"] = content
signed_json = sign_json(request_json)
authorization_headers = []
for key, sig in signed_json["signatures"][origin_name].items():
authorization_headers.append(
bytes(
'X-Matrix origin="%s",destination="%s",key="%s",sig="%s"'
% (
config.server_name,
destination,
key,
sig,
)
)
)
return ("Authorization", authorization_headers[0])