"""Interactively join a Matrix room over federation.

The script prompts for a username, a room ID and two server names, then:

1. sends a ``make_join`` request to obtain a join event template,
2. fills in ``origin_server_ts`` (and ``event_id`` for room versions 1 and 2),
3. hashes and signs the event via ``globals.hash_and_sign_event``,
4. sends the signed event with ``send_join``.

Two server names are asked for: the Authorization header is built with the
"unresolved" name, while the HTTPS request itself is sent to the "resolved"
name.

The process exits with status 1 when input cannot be read or when either
federation request fails.
"""

import json
import sys
import time
import urllib.parse

import httpx

# NOTE: this alias shadows the builtin ``globals()``; kept as in the original.
import vona.globals as globals
import vona.config as config

http_client = globals.http_client

# Upper bound on how many literal "\n" sequences are expanded per input.
MAX_NEWLINE_ESCAPES = 32

# Query string advertising room versions 1 through 12 to the remote server.
ROOM_VERSION_QUERY = "&".join(f"ver={version}" for version in range(1, 13))


def get_user_input(prompt):
    """Prompt the user and return the answer percent-encoded for use in a URL.

    Literal ``\\n`` sequences typed by the user (backslash followed by ``n``)
    are turned into real newlines before encoding, at most
    ``MAX_NEWLINE_ESCAPES`` times.

    Args:
        prompt: Text shown to the user by ``input()``.

    Returns:
        The percent-encoded answer, or ``None`` if reading the input failed
        (the error is printed).
    """
    try:
        # The count is passed positionally: ``str.replace`` only accepts
        # ``count`` as a keyword argument from Python 3.13 onwards, and on
        # older versions the keyword form made every call fail.
        raw = input(prompt).replace("\\n", "\n", MAX_NEWLINE_ESCAPES)
        return urllib.parse.quote(raw)
    except Exception as e:
        print(f"Error reading input: {e}")
        return None


username = get_user_input("Username:\n\t")
room_id = get_user_input("Room ID:\n\t")

# Without this check a failed read would put the text "None" into the
# signed request path.
if username is None or room_id is None:
    sys.exit(1)

try:
    unresolved_server_name = input("\nServer name before resolve:\n\t")
    resolved_server_name = input("Server name after resolve:\n\t")
except Exception as e:
    print(f"Error reading server names: {e}")
    sys.exit(1)

try:
    # Build the path once so the signed path and the requested path are
    # guaranteed to be identical.
    make_join_path = (
        f"/_matrix/federation/v1/make_join/{room_id}"
        f"/%40{username}%3A{config.server_name}?{ROOM_VERSION_QUERY}"
    )

    make_join_auth = globals.make_auth_header(
        unresolved_server_name,
        "GET",
        make_join_path,
    )

    print("\nSending make_join request..")

    make_join_response = http_client.get(
        f"https://{resolved_server_name}{make_join_path}",
        headers={"Authorization": make_join_auth},
    )

    make_join_response.raise_for_status()
    make_join = make_join_response.json()
except httpx.HTTPStatusError as e:
    print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
    sys.exit(1)
except json.JSONDecodeError:
    print("Failed to decode JSON response.")
    sys.exit(1)
except Exception as e:
    print(f"An error occurred: {e}")
    sys.exit(1)

join_event = make_join.get("event", {})

if make_join.get("room_version") in ["1", "2"]:
    # NOTE: if we always make it opaque then Synapse will 500 lmao
    join_event["event_id"] = globals.make_event_id()

timestamp = input("\nTimestamp (leave blank for now):\n\t")
try:
    join_event["origin_server_ts"] = int(timestamp)
except ValueError:
    # Blank (or otherwise non-integer) input: use the current time in
    # milliseconds since the epoch.
    join_event["origin_server_ts"] = int(time.time() * 1000)

signed_join = globals.hash_and_sign_event(join_event)

# Shared by the auth header and the request URL so they cannot diverge.
send_join_path = (
    f"/_matrix/federation/v2/send_join/{room_id}"
    "/%24doesntmatter?omit_members=true"
)

try:
    send_join_auth = globals.make_auth_header(
        unresolved_server_name,
        "PUT",
        send_join_path,
        signed_join,
    )

    send_join_response = http_client.put(
        f"https://{resolved_server_name}{send_join_path}",
        headers={"Authorization": send_join_auth},
        json=signed_join,
    )

    send_join_response.raise_for_status()
    print("\nSuccess :)")
except httpx.HTTPStatusError as e:
    print(
        f"HTTP error occurred during send_join: {e.response.status_code} - {e.response.text}"
    )
    # Report failure through the exit status, as the make_join step does.
    sys.exit(1)
except Exception as e:
    print(f"An error occurred during send_join: {e}")
    sys.exit(1)