Files
matrix-vona/vona/federation/rooms.py

285 lines
6.7 KiB
Python

import vona.globals as globals
import vona.config as config
# This file is responsible for creating Matrix rooms.
# Room V1/V2
def v1_v2(request, room) -> dict:
event_chain = []
event_ids = [
globals.make_event_id(seed=f"1_{room}"),
globals.make_event_id(seed=f"2_{room}"),
globals.make_event_id(seed=f"3_{room}"),
globals.make_event_id(seed=f"4_{room}"),
globals.make_event_id(seed=f"5_{room}"),
globals.make_event_id(seed=f"6_{room}"),
]
create_event = {
"content": {
"m.federate": True,
"creator": f"@vona:{config.server_name}",
"room_version": globals.room_version_from_id(room)
},
"event_id": event_ids[0],
"origin_server_ts": 1,
"room_id": room,
"sender": f"@vona:{config.server_name}",
"state_key": "",
"depth": 1,
"type": "m.room.create",
"auth_events": [],
"prev_events": []
}
screate_event = globals.hash_and_sign_event(create_event)
event_chain.append(screate_event)
our_join = {
"content": {
"displayname": "Vona",
"avatar_url": f"mxc://{config.server_name}/cat",
"membership": "join"
},
"origin_server_ts": 2,
"sender": f"@vona:{config.server_name}",
"state_key": f"@vona:{config.server_name}",
"type": "m.room.member",
"event_id": event_ids[1],
"room_id": room,
"depth": 2,
"auth_events": [[
screate_event["event_id"],
screate_event["hashes"]
]],
"prev_events": [[
screate_event["event_id"],
screate_event["hashes"]
]]
}
sour_join = globals.hash_and_sign_event(our_join)
event_chain.append(sour_join)
pls = {
"content": {
"users": {
f"@vona:{config.server_name}": "100"
}
},
"origin_server_ts": 3,
"room_id": room,
"sender": f"@vona:{config.server_name}",
"state_key": "",
"type": "m.room.power_levels",
"event_id": event_ids[2],
"depth": 3,
"user_id": f"@vona:{config.server_name}",
"auth_events": [
[
screate_event["event_id"],
screate_event["hashes"]
],
[
sour_join["event_id"],
sour_join["hashes"]
]
],
"prev_events": [[
sour_join["event_id"],
sour_join["hashes"]
]]
}
spls = globals.hash_and_sign_event(pls)
event_chain.append(spls)
join_rule = {
"content": {
"join_rule": "public"
},
"origin_server_ts": 4,
"sender": f"@vona:{config.server_name}",
"state_key": "",
"type": "m.room.join_rules",
"event_id": event_ids[3],
"room_id": room,
"depth": 4,
"auth_events": [
[
screate_event["event_id"],
screate_event["hashes"]
],
[
sour_join["event_id"],
sour_join["hashes"]
],
[
spls["event_id"],
spls["hashes"]
]
],
"prev_events": [[
spls["event_id"],
spls["hashes"]
]]
}
sjoin_rule = globals.hash_and_sign_event(join_rule)
event_chain.append(sjoin_rule)
guest_access = {
"content": {
"guest_access": "forbidden"
},
"origin_server_ts": 5,
"depth": 5,
"sender": f"@vona:{config.server_name}",
"state_key": "",
"type": "m.room.guest_access",
"event_id": event_ids[4],
"room_id": room,
"auth_events": [
[
screate_event["event_id"],
screate_event["hashes"]
],
[
sour_join["event_id"],
sour_join["hashes"]
],
[
spls["event_id"],
spls["hashes"]
]
],
"prev_events": [[
sjoin_rule["event_id"],
sjoin_rule["hashes"]
]]
}
sguest_access = globals.hash_and_sign_event(guest_access)
event_chain.append(sguest_access)
history = {
"content": {
"history_visibility": "shared"
},
"type": "m.room.history_visibility",
"sender": f"@vona:{config.server_name}",
"state_key": "",
"origin_server_ts": 6,
"depth": 6,
"event_id": event_ids[5],
"room_id": room,
"auth_events": [
[
screate_event["event_id"],
screate_event["hashes"]
],
[
sour_join["event_id"],
sour_join["hashes"]
],
[
spls["event_id"],
spls["hashes"]
]
],
"prev_events": [[
sguest_access["event_id"],
sguest_access["hashes"]
]]
}
shistory = globals.hash_and_sign_event(history)
event_chain.append(shistory)
remote_join = request.get_json()
response = {
"auth_chain": event_chain,
"event": remote_join,
"members_omitted": False,
"servers_in_room": [config.server_name],
"state": event_chain
}
return response
# Room V3 to V9
def v3(request, room) -> dict:
initial_response = v1_v2(request, room)
state = list(initial_response["state"])
ver = int(globals.room_version_from_id(room))
events = {}
hash_map = {}
for event in state:
events[event["type"]] = event
del event["event_id"]
del event["hashes"]
del event["signatures"]
# m.room.create doesn't have prev_events or auth_events
events["m.room.create"] = globals.hash_and_sign_event(events["m.room.create"])
hash_map["m.room.create"] = globals.make_ref_hash(events["m.room.create"], ver)
events["m.room.member"]["auth_events"] = [hash_map["m.room.create"]]
events["m.room.member"]["prev_events"] = [hash_map["m.room.create"]]
events["m.room.member"] = globals.hash_and_sign_event(events["m.room.member"])
hash_map["m.room.member"] = globals.make_ref_hash(events["m.room.member"], ver)
events["m.room.power_levels"]["auth_events"] = [
hash_map["m.room.create"],
hash_map["m.room.member"],
]
events["m.room.power_levels"]["prev_events"] = [hash_map["m.room.member"]]
events["m.room.power_levels"] = globals.hash_and_sign_event(events["m.room.power_levels"])
hash_map["m.room.power_levels"] = globals.make_ref_hash(events["m.room.power_levels"], ver)
events["m.room.join_rules"]["auth_events"] = [
hash_map["m.room.create"],
hash_map["m.room.member"],
hash_map["m.room.power_levels"],
]
events["m.room.join_rules"]["prev_events"] = [hash_map["m.room.power_levels"]]
events["m.room.join_rules"] = globals.hash_and_sign_event(events["m.room.join_rules"])
hash_map["m.room.join_rules"] = globals.make_ref_hash(events["m.room.join_rules"], ver)
events["m.room.guest_access"]["auth_events"] = [
hash_map["m.room.create"],
hash_map["m.room.member"],
hash_map["m.room.power_levels"],
]
events["m.room.guest_access"]["prev_events"] = [hash_map["m.room.join_rules"]]
events["m.room.guest_access"] = globals.hash_and_sign_event(events["m.room.guest_access"])
hash_map["m.room.guest_access"] = globals.make_ref_hash(events["m.room.guest_access"], ver)
events["m.room.history_visibility"]["auth_events"] = [
hash_map["m.room.create"],
hash_map["m.room.member"],
hash_map["m.room.power_levels"],
]
events["m.room.history_visibility"]["prev_events"] = [hash_map["m.room.guest_access"]]
events["m.room.history_visibility"] = globals.hash_and_sign_event(events["m.room.history_visibility"])
new_state = []
for event in events:
new_state.append(events[event])
resp = {
"auth_chain": new_state,
"event": initial_response["event"],
"members_omitted": False,
"servers_in_room": [config.server_name],
"state": new_state
}
return resp