-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprotocol_wrapper.py
155 lines (119 loc) · 6.44 KB
/
protocol_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from protocol import ExProtocol
import json
class ProtocolWrapper:
    """Convenience facade over ``ExProtocol`` for one peer of a connection.

    The wrapper drives the proof-of-work (PoW) handshake and afterwards
    sends and receives JSON payloads over the established connection.  It
    remembers the two values the protocol calls need between steps:

    * ``private_key`` -- stored by ``create_handshake_request`` (initiator)
      or ``perform_handshake_response`` (responder).
    * ``connection_id`` -- stored once the handshake has produced one; it is
      the key used to look up the connection in ``self.protocol.connections``.

    Initiator call order: ``create_handshake_request`` ->
    ``complete_handshake_request`` -> ``complete_handshake``.
    Responder call order: ``respond_handshake`` ->
    ``perform_handshake_response``.

    Every failure detected by the wrapper itself is raised as a plain
    ``Exception``; errors raised inside ``ExProtocol`` propagate unchanged.
    """

    def __init__(self):
        self.protocol = ExProtocol()
        # Both stay None until the corresponding handshake step has run.
        self.connection_id = None
        self.private_key = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _require_private_key(self):
        """Return the stored private key, raising if no handshake was started."""
        if self.private_key is None:
            raise Exception("No private key available. Please start a handshake first.")
        return self.private_key

    def _active_connection(self):
        """Return the protocol connection object for ``self.connection_id``."""
        if not self.connection_id:
            raise Exception("No active connection. Please initiate a handshake first.")
        return self.protocol.connections[self.connection_id]

    def _decrypt_json(self, encrypted_packet, expected_flag, label):
        """Decrypt a packet, verify its type flag and JSON-decode its payload.

        ``label`` ("data" or "response") is only used to build the error text.
        Returns ``(decoded_payload, header_dict, packet_uuid)``.
        """
        packet = self._active_connection().decrypt_packet(encrypted_packet)
        if packet is None:
            raise Exception(f"Failed to decrypt {label} packet.")
        if packet.packet_type != expected_flag:
            raise Exception(f"Invalid flag for {label} packet.")
        # The sender's header names the text encoding of the payload.
        text = packet.payload.decode(packet.header_dict['encoding'])
        return json.loads(text), packet.header_dict, packet.packet_uuid

    # ------------------------------------------------------------------
    # Handshake
    # ------------------------------------------------------------------
    def create_handshake_request(self):
        """Start a handshake as initiator and return the PoW request.

        Stores the private key returned by the protocol for the later steps.

        Raises:
            Exception: if the protocol returned no request or no private key.
        """
        pow_request, self.private_key = self.protocol.initiate_handshake_request()
        if not pow_request:
            raise Exception("Failed to initiate handshake request.")
        if self.private_key is None:
            raise Exception("Private key was not generated.")
        return pow_request

    def respond_handshake(self, pow_request):
        """Answer a peer's PoW request and return the PoW challenge.

        Raises:
            Exception: if the protocol returned no challenge.
        """
        # The protocol also returns the peer's public key bytes; the wrapper
        # has no use for them, so they are discarded.
        pow_challenge, _ = self.protocol.create_pow_challenge(pow_request)
        if not pow_challenge:
            raise Exception("Failed to create PoW challenge.")
        return pow_challenge

    def complete_handshake_request(self, pow_challenge):
        """Complete the PoW request for ``pow_challenge`` and return the handshake request.

        Raises:
            Exception: if no private key has been stored yet (the handshake
                was never started) or the protocol returned no request.
        """
        private_key = self._require_private_key()
        handshake_request = self.protocol.complete_pow_request(pow_challenge, private_key)
        if not handshake_request:
            raise Exception("Failed to complete handshake request.")
        return handshake_request

    def perform_handshake_response(self, handshake_request):
        """Process a peer's handshake request as responder and return the response.

        Stores the private key and connection ID returned by the protocol and
        prints the connection ID in hex.

        Raises:
            Exception: if the response, private key or connection ID is missing.
        """
        response, self.private_key, self.connection_id = (
            self.protocol.perform_handshake_response(handshake_request)
        )
        if not response:
            raise Exception("Failed to create handshake response.")
        if self.private_key is None:
            raise Exception("Private key was not generated.")
        if not self.connection_id:
            raise Exception("Connection ID was not generated.")
        print(f"Connection ID established: {self.connection_id.hex()}")
        return response

    def complete_handshake(self, response):
        """Finish the handshake as initiator using the responder's ``response``.

        Stores the resulting connection ID and prints it in hex.

        Raises:
            Exception: if no private key has been stored yet or the protocol
                returned no connection ID.
        """
        private_key = self._require_private_key()
        self.connection_id = self.protocol.complete_handshake(response, private_key)
        if not self.connection_id:
            raise Exception("Failed to complete handshake.")
        print(f"Connection ID established: {self.connection_id.hex()}")

    # ------------------------------------------------------------------
    # Data exchange
    # ------------------------------------------------------------------
    def send_data(self, data, header=None):
        """Build an encrypted data packet for ``data``.

        A ``dict`` is JSON-encoded as UTF-8 bytes; any other value is handed
        to the connection unchanged.

        Returns:
            ``(encrypted_message, packet_uuid)`` as produced by the connection.

        Raises:
            Exception: if there is no active connection or encryption failed.
        """
        # Check the connection first so a missing handshake is reported
        # before any serialization work is attempted.
        connection = self._active_connection()
        if isinstance(data, dict):
            data = json.dumps(data).encode('utf-8')
        encrypted_message, packet_uuid = connection.create_data_packet(data, header)
        if encrypted_message is None:
            raise Exception("Failed to encrypt message.")
        return encrypted_message, packet_uuid

    def send_response(self, data, original_packet_uuid, header=None):
        """Build an encrypted response packet answering ``original_packet_uuid``.

        ``data`` is always JSON-encoded as UTF-8 bytes.

        Returns:
            The encrypted message produced by the connection.

        Raises:
            Exception: if there is no active connection or encryption failed.
        """
        connection = self._active_connection()
        response_data = json.dumps(data).encode('utf-8')
        encrypted_message = connection.create_response_packet(
            response_data, original_packet_uuid, header
        )
        if encrypted_message is None:
            raise Exception("Failed to encrypt message.")
        return encrypted_message

    def decrypt_data(self, encrypted_data_packet):
        """Decrypt a data packet and return ``(data, header_dict, packet_uuid)``.

        Raises:
            Exception: if there is no active connection, decryption failed, or
                the packet is not flagged ``ExProtocol.DATA_FLAG``.
        """
        return self._decrypt_json(encrypted_data_packet, ExProtocol.DATA_FLAG, "data")

    def decrypt_response(self, encrypted_response_packet):
        """Decrypt a response packet and return ``(response, header_dict, packet_uuid)``.

        Raises:
            Exception: if there is no active connection, decryption failed, or
                the packet is not flagged ``ExProtocol.RESPONSE_FLAG``.
        """
        return self._decrypt_json(encrypted_response_packet, ExProtocol.RESPONSE_FLAG, "response")
# Example usage
def main():
    """Run an in-process demo: two wrappers handshake, then exchange one request/response."""
    print("=== Testing ProtocolWrapper with PoW ===")

    # One wrapper per peer: Node A initiates, Node B responds.
    node_a = ProtocolWrapper()
    node_b = ProtocolWrapper()

    # --- Handshake -----------------------------------------------------
    # A opens with a PoW request.
    opening_request = node_a.create_handshake_request()
    print("Node A created PoW request")

    # B answers with a PoW challenge.
    challenge = node_b.respond_handshake(opening_request)
    print("Node B created PoW challenge")

    # A solves the challenge and produces the actual handshake request.
    solved_request = node_a.complete_handshake_request(challenge)
    print("Node A completed handshake request with PoW solution")

    # B processes the handshake request and replies.
    handshake_reply = node_b.perform_handshake_response(solved_request)
    print("Node B created handshake response")

    # A finishes the handshake with B's reply.
    node_a.complete_handshake(handshake_reply)

    # --- Request from A to B --------------------------------------------
    outgoing = {"action": "get_data"}
    request_packet, uuid_sent = node_a.send_data(outgoing)
    print("Node A sends encrypted request")

    incoming, incoming_header, uuid_at_b = node_b.decrypt_data(request_packet)
    print("Request Header:", incoming_header)
    print("Node B received request:", incoming)

    # --- Response from B to A -------------------------------------------
    answer = {"data": "Here is your data"}
    response_packet = node_b.send_response(answer, original_packet_uuid=uuid_at_b)
    print("Node B sends encrypted response")

    decoded_answer, answer_header, uuid_at_a = node_a.decrypt_response(response_packet)
    print("Response Header:", answer_header)
    print("Node A received response:", decoded_answer)

    # The same packet UUID must be seen at every hop of the round trip.
    assert uuid_at_a == uuid_at_b == uuid_sent
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()