-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpeer_receiver_moonSatellite.py
130 lines (109 loc) · 4.78 KB
/
peer_receiver_moonSatellite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import socket
import threading
import csv
import json
import argparse
import os
import ast
# Main contributors: Rahul, Rasika, Weilong & Pradosh
def create_directory_if_not_exists(directory):
    """Create ``directory`` (with any missing parents) unless the path exists.

    Nothing is done when ``os.path.exists(directory)`` is already true.
    A ``FileExistsError`` raised by ``os.makedirs`` is ignored; any other
    exception propagates to the caller.
    """
    if os.path.exists(directory):
        return
    try:
        os.makedirs(directory)
    except FileExistsError:
        # The path appeared after the existence check; nothing left to do.
        return
def save_data_csv(peer_name, sensor_name, date, hour, data,
                  base_dir="/users/pgrad/singhr6/group24/data"):
    """Write one sensor payload to a CSV file under ``base_dir``.

    The file is ``<base_dir>/<peer_name>/<date>/<hour>/<peer_name>_<sensor_name>.csv``
    and is overwritten on every call.  Each key of the payload becomes a
    column; columns shorter than the longest one are padded with empty cells.

    Args:
        peer_name: Name of the sending peer (directory and file-name prefix).
        sensor_name: Name of the sensor (file-name suffix).
        date: Directory component below ``peer_name``.
        hour: Directory component below ``date``.
        data: Text of a mapping from column name to a sequence of values,
            either JSON or a Python literal.
        base_dir: Root directory for all stored data.  Defaults to the
            location the original deployment used.

    Raises:
        ValueError: If ``data`` does not describe a mapping, cannot be parsed,
            or the resulting path would fall outside ``base_dir``.
        SyntaxError: If ``data`` is neither JSON nor a valid Python literal.
        TypeError: If a column value has no length.
        OSError: If the directory or file cannot be created.
    """
    # Prefer JSON (handles null/true/false); fall back to Python-literal
    # syntax, which is what the original implementation accepted.
    try:
        data_dict = json.loads(data)
    except ValueError:
        data_dict = ast.literal_eval(data)
    if not isinstance(data_dict, dict):
        raise ValueError("payload must be a mapping of column name to values")

    # Validate everything before touching the filesystem so that a bad
    # payload never truncates a previously written file.
    columns = list(data_dict.values())
    row_count = max((len(column) for column in columns), default=0)

    # The path components originate from network input: refuse anything that
    # would resolve outside the data root (e.g. "..", absolute paths).
    root = os.path.realpath(base_dir)
    path_to_save = os.path.realpath(os.path.join(root, peer_name, date, hour))
    file_name = f"{peer_name}_{sensor_name}.csv"
    if (os.path.commonpath([root, path_to_save]) != root
            or os.path.basename(file_name) != file_name):
        raise ValueError("refusing to write outside of the data directory")

    os.makedirs(path_to_save, exist_ok=True)
    with open(os.path.join(path_to_save, file_name), 'w', newline='',
              encoding='utf-8') as csvfile:
        csvwriter = csv.writer(csvfile)
        # Header row: one column per payload key.
        csvwriter.writerow(list(data_dict))
        for index in range(row_count):
            # None is written by the csv module as an empty cell.
            csvwriter.writerow([
                column[index] if index < len(column) else None
                for column in columns
            ])
def check_message_correctness(input_data):
    """Return True when ``input_data`` has the expected single-payload shape.

    The message is accepted only if it contains exactly one ``{``, exactly
    one ``}`` and exactly one ``:{`` substring; otherwise False is returned.
    """
    required_once = ('{', '}', ':{')
    return all(input_data.count(marker) == 1 for marker in required_once)
def get_data(input_data):
    """Split a raw peer message into its routing fields and payload.

    The first two colon-separated fields are taken as peer name and sensor
    name.  If the sensor name contains one of the known device names, that
    device becomes the peer name and the ``<device>_`` prefix is removed from
    the sensor name.  The payload is the text following ``:{`` (with the
    brace restored), and date/hour are cut out of the first value that
    follows ``Timestamp": ["`` in the payload.

    Returns:
        dict with the keys ``peer_name``, ``sensor_name``, ``message``,
        ``date`` and ``hour`` (all strings).

    Raises:
        IndexError: If any of the expected separators is missing.
    """
    known_devices = ("Curiosity_Rover", "Mars_Rover", "Lander_Module", "Mars_Satellite", "Moon_Satellite", "Earth")

    header_fields = input_data.split(":")
    peer_name = header_fields[0]
    sensor_name = header_fields[1]

    # Every device is checked (no early exit), each time against the
    # possibly already-shortened sensor name.
    for known in known_devices:
        if known not in sensor_name:
            continue
        peer_name = known
        sensor_name = sensor_name.replace(known + "_", "")

    payload = "{" + input_data.split(":{")[1]

    # First timestamp entry, expected to look like "<date> <hour>:<rest>".
    first_timestamp = payload.split('Timestamp": ["')[1].split('",')[0]
    stamp_parts = first_timestamp.split(" ")
    date = stamp_parts[0]
    hour = stamp_parts[1].split(":")[0]

    return {
        'peer_name': peer_name,
        'sensor_name': sensor_name,
        'message': payload,
        'date': date,
        'hour': hour,
    }
def receive_data(sock):
    """Read once from ``sock`` and return the bytes decoded as UTF-8 text.

    A single ``recv`` call of at most 90000 bytes is made; an empty string
    is returned when ``recv`` yields no bytes.
    """
    return sock.recv(90000).decode('utf-8')
def handle_peer(peer_socket, addr):
    """Serve one connected peer until it disconnects or an error occurs.

    Each received message is checked with ``check_message_correctness``.
    Well-formed messages are parsed with ``get_data`` and stored with
    ``save_data_csv``; in both cases a textual acknowledgement echoing the
    message is sent back.  The socket is closed when the function returns.

    Args:
        peer_socket: Connected socket for this peer.
        addr: Peer address, used only for log output.
    """
    print(f"Accepted connection from {addr}")
    with peer_socket as sock:
        while True:
            try:
                message = receive_data(sock)
                print(f"Received message: {message}")
                if not message:
                    # Empty read: the peer has nothing more to send.
                    break
                if check_message_correctness(message):
                    data_dict = get_data(message)
                    save_data_csv(
                        data_dict['peer_name'],
                        data_dict['sensor_name'],
                        data_dict['date'],
                        data_dict['hour'],
                        data_dict['message'],
                    )
                    response_message = f"Peer received your message: {message}"
                else:
                    response_message = f"Peer received an invalid message: {message}"
                # sendall, not send: the reply echoes the whole message and
                # send() is allowed to transmit only part of the buffer.
                sock.sendall(response_message.encode('utf-8'))
            except ConnectionResetError:
                print(f"Connection forcibly closed by {addr}")
                break
            except Exception as e:
                # Thread boundary: report the failure and drop the connection
                # instead of letting the handler thread die with a traceback.
                print(f"An exception occurred: {e}")
                break
def start_peer(host, port):
    """Listen on ``host``:``port`` and serve every connection in its own thread.

    The function loops forever accepting connections; each accepted socket
    is handed to ``handle_peer`` on a daemon thread.  The listening socket is
    closed if the loop is left through an exception (including
    ``KeyboardInterrupt``).

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # The context manager guarantees the listening socket is released even
    # when bind/listen/accept raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as peer_socket:
        peer_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        peer_socket.bind((host, port))
        peer_socket.listen()
        print(f"Peer is listening for connections on {host}:{port}")
        while True:
            peer_client_socket, addr = peer_socket.accept()
            peer_handler = threading.Thread(
                target=handle_peer,
                args=(peer_client_socket, addr),
                daemon=True,
            )
            peer_handler.start()
if __name__ == "__main__":
    # Command-line entry point: read --host/--port and run the receiver.
    cli = argparse.ArgumentParser(description="Peer Receiver Server")
    cli.add_argument("--host", default='0.0.0.0', help="Host IP address")
    cli.add_argument("--port", type=int, default=33338, help="Port number")
    options = cli.parse_args()
    start_peer(options.host, options.port)