data_reader.py
import csv
import os
import struct
import sys

import boto3
import botocore
from boto3.dynamodb.conditions import Key
from botocore.config import Config
from _CONST import _Const
# Unused ROS imports:
# import rospy
# from rf_msgs.msg import Wifi
# from std_msgs.msg import String
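
# _Const is expected (from its usage below) to supply: AWS_ACCESS_KEY_ID,
# AWS_SECRET_ACCESS_KEY, SERVER_AREA, DB_NAME, PARTITION_KEY,
# PARTITION_KEY_VALUE, SORT_KEY, SORT_KEY_LOWER_BOUND, SORT_KEY_UPPER_BOUND,
# BINARY_FILES_FOLDER, COL_PER_ROW, and the CSV_FOR_NEEDED_FILES /
# DOWNLOAD_NEEDED_FILES flags.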

def write_csv(items):
    # Write the queried entries to a CSV file named after the sort-key range
    with open(f"csi_data_from_{CONST.SORT_KEY_LOWER_BOUND}_to_{CONST.SORT_KEY_UPPER_BOUND}.csv",
              'w', newline='') as csvfile:
        fieldnames = list(items[0].keys())
        csvwriter = csv.DictWriter(csvfile, delimiter=',', fieldnames=fieldnames)
        csvwriter.writeheader()
        csvwriter.writerows(items)

def get_DB_items():
    # Query DynamoDB for every entry in the configured partition and sort-key range
    aws_config = Config(region_name=CONST.SERVER_AREA)
    dynamodb = boto3.resource(
        'dynamodb',
        config=aws_config,
        aws_access_key_id=CONST.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=CONST.AWS_SECRET_ACCESS_KEY,
    )
    table = dynamodb.Table(CONST.DB_NAME)
    response = table.query(
        KeyConditionExpression=Key(CONST.PARTITION_KEY).eq(CONST.PARTITION_KEY_VALUE)
        & Key(CONST.SORT_KEY).between(CONST.SORT_KEY_LOWER_BOUND, CONST.SORT_KEY_UPPER_BOUND)
    )
    return response
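
# Note: a single table.query call returns at most 1 MB of data. If the key
# range can exceed that, the standard boto3 pattern is to follow
# LastEvaluatedKey until the query is exhausted. A sketch (the function name
# is ours, not the original's; it returns the item list directly):
def get_DB_items_paginated():
    aws_config = Config(region_name=CONST.SERVER_AREA)
    table = boto3.resource(
        'dynamodb',
        config=aws_config,
        aws_access_key_id=CONST.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=CONST.AWS_SECRET_ACCESS_KEY,
    ).Table(CONST.DB_NAME)
    key_condition = (Key(CONST.PARTITION_KEY).eq(CONST.PARTITION_KEY_VALUE)
                     & Key(CONST.SORT_KEY).between(CONST.SORT_KEY_LOWER_BOUND,
                                                   CONST.SORT_KEY_UPPER_BOUND))
    items = []
    kwargs = {'KeyConditionExpression': key_condition}
    while True:
        page = table.query(**kwargs)
        items.extend(page['Items'])
        if 'LastEvaluatedKey' not in page:
            return items
        # Resume the query from where the previous page stopped
        kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']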

def create_file_list(items):
    # Map every referenced binary file to its S3 bucket
    binary_files_dict = {}
    for entry in items:
        file_name = entry['file_name']
        if file_name not in binary_files_dict:
            binary_files_dict[file_name] = entry['bucket_name']
    if CONST.CSV_FOR_NEEDED_FILES:
        # Record the needed binary files and their S3 buckets in a CSV
        with open('needed_binary_files.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            # Write the header
            writer.writerow(['file_name', 'bucket_name'])
            # Write each file name and its corresponding S3 bucket name
            for file_name, bucket_name in binary_files_dict.items():
                writer.writerow([file_name, bucket_name])
    return binary_files_dict

# Download missing files into the target folder and return a list of the files
# that do not exist (DNE) in S3
def download_needed_files(binary_files_dict):
    # Configure the AWS S3 client
    aws_config = Config(region_name=CONST.SERVER_AREA)
    s3 = boto3.client(
        's3',
        config=aws_config,
        aws_access_key_id=CONST.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=CONST.AWS_SECRET_ACCESS_KEY,
    )
    # List the files already present in the local binary-data folder
    directory_path = CONST.BINARY_FILES_FOLDER
    entries = os.listdir(directory_path)
    file_names = [entry for entry in entries
                  if os.path.isfile(os.path.join(directory_path, entry))]
    DNE_files = []
    print("The following files already exist locally and will be skipped:")
    print(file_names)
    for binary_file in binary_files_dict:
        if binary_file not in file_names:
            bucket_name = binary_files_dict[binary_file]
            try:
                # Check that the object exists in the bucket before downloading
                s3.head_object(Bucket=bucket_name, Key=binary_file)
                print(f'downloading file {binary_file}')
                s3.download_file(
                    bucket_name, binary_file,
                    Filename=os.path.join(CONST.BINARY_FILES_FOLDER, binary_file))
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == '404':
                    print(f"The file with key '{binary_file}' does not exist in the bucket '{bucket_name}'.")
                    DNE_files.append(binary_file)
                else:
                    # Re-raise anything other than a missing object
                    print(f"An error occurred: {e}")
                    raise
    print(f"\nDownloading complete.\nFiles not found:\n{DNE_files}")
    return DNE_files
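
# Design note: the head_object probe costs one extra request per file. Since
# s3.download_file itself raises botocore.exceptions.ClientError with a '404'
# error code when the key is missing, an alternative (a sketch, not the
# original behavior) is to skip the probe and catch the error directly:
#
#     try:
#         s3.download_file(bucket_name, binary_file,
#                          Filename=os.path.join(CONST.BINARY_FILES_FOLDER,
#                                                binary_file))
#     except botocore.exceptions.ClientError as e:
#         if e.response['Error']['Code'] == '404':
#             DNE_files.append(binary_file)
#         else:
#             raise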

# Read a binary file and return its contents as a list of doubles
def read_floats_from_file(filename):
    with open(f'{CONST.BINARY_FILES_FOLDER}/{filename}', 'rb') as file:
        binary_data = file.read()
    num_floats = len(binary_data) // struct.calcsize('d')
    return list(struct.unpack(f'{num_floats}d', binary_data))
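
# Reading the whole file for every entry costs O(file size) per row. A sketch
# of a seek-based variant (the function name is ours; it assumes, as
# add_csi_to_entries below does, that each row is CONST.COL_PER_ROW
# consecutive doubles):
def read_row_from_file(filename, row_index):
    row_bytes = CONST.COL_PER_ROW * struct.calcsize('d')
    with open(f'{CONST.BINARY_FILES_FOLDER}/{filename}', 'rb') as file:
        # Jump straight to the requested row instead of reading everything
        file.seek(row_index * row_bytes)
        data = file.read(row_bytes)
    return list(struct.unpack(f'{CONST.COL_PER_ROW}d', data))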

# If the required binary file was not downloaded, the csi fields in the entry
# are set to None
def add_csi_to_entries(entries_list):
    # List the locally available binary files once, up front
    dirs = os.listdir(CONST.BINARY_FILES_FOLDER)
    existed_files = [f for f in dirs
                     if os.path.isfile(os.path.join(CONST.BINARY_FILES_FOLDER, f))]
    for entry in entries_list:
        file_name = entry['file_name']
        offset = int(entry['offset_in_file'])
        if file_name in existed_files:
            # Each row holds COL_PER_ROW doubles: the real parts followed by
            # the imaginary parts
            row = read_floats_from_file(file_name)[offset * CONST.COL_PER_ROW:
                                                   (offset + 1) * CONST.COL_PER_ROW]
            half = CONST.COL_PER_ROW // 2
            entry['csi_real'] = row[:half]
            entry['csi_imag'] = row[half:]
            print(f"inserted to entry: {entry[CONST.PARTITION_KEY]}, {entry[CONST.SORT_KEY]}")
        else:
            entry['csi_real'], entry['csi_imag'] = None, None
    return entries_list

if __name__ == '__main__':
    # Initialize configuration
    CONST = _Const(os)
    print("AWS_ACCESS_KEY_ID: " + CONST.AWS_ACCESS_KEY_ID)
    # The secret access key is deliberately not echoed to stdout
    print("DB: " + CONST.DB_NAME + '\n')
    response = get_DB_items()
    # items is a list of queried entries from the DB, where each entry is a dict
    items = response['Items']
    if not items:
        print("no items found")
        sys.exit(0)
    binary_files_dict = create_file_list(items)
    if CONST.DOWNLOAD_NEEDED_FILES:
        download_needed_files(binary_files_dict)
    # TODO:
    items = add_csi_to_entries(items)
    write_csv(items)
    print(f'{len(items)} items found.')
    # for each in items:
    #     print(each, end='\n\n\n\n')
    # response.pop('Items')
    # print(response)