-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils_ipynb.py
206 lines (178 loc) · 7.73 KB
/
utils_ipynb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
def generate_datetime_list(start_datetime, increase, num_steps, offset=0, reverse=False):
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
# Ensure start_datetime is a pandas Timestamp and has time components
if not isinstance(start_datetime, pd.Timestamp):
raise ValueError("start_datetime must be a pandas Timestamp.")
# Set time to 00:00:00 if start_datetime does not include hours, minutes, and seconds
if start_datetime.hour == 0 and start_datetime.minute == 0 and start_datetime.second == 0:
start_datetime = start_datetime.replace(hour=0, minute=0, second=0)
# Extract the number and unit from 'increase'
if increase[:-1].isdigit():
n = int(increase[:-1])
unit = increase[-1]
else:
n = 1 # Default increment if no number is provided
unit = increase
# Determine the increment based on the unit
if unit == 'H':
increment = timedelta(hours=n)
elif unit == 'T':
increment = timedelta(minutes=n)
elif unit == 'D':
increment = timedelta(days=n)
elif unit == 'M':
increment = relativedelta(months=n)
elif unit == 'A-DEC':
increment = relativedelta(years=n)
elif unit == 'W-SUN':
increment = relativedelta(weeks=n)
elif unit == 'm':
increment = relativedelta(microseconds=n)
else:
raise ValueError("Invalid increase value. Must be in ['H', 'T', 'D', 'M', 'A-DEC', 'W-SUN'] with optional 'n' prefix.")
# Generate the list of datetime values
datetime_list = []
if reverse:
for i in range(num_steps):
datetime_list.insert(0, start_datetime - (offset + i) * increment)
else:
for i in range(num_steps):
datetime_list.append(start_datetime + (offset + i) * increment)
return datetime_list
def get_pems04_data(duration, pred_hrz, sampling_rate=None, house_id=1, batch_id=0, batch_number=10, data_df=None):
"""
Extracts weather data from the 'Salesforce/lotsa_data' dataset, 'subseasonal' subset.
Parameters:
- duration: The duration of the data in hours.
- pred_hrz: The prediction horizon in hours.
- sampling_rate: Sampling rate in seconds. If None, the original sampling rate from the dataset is used.
- house_id: The ID of the house (defaults to 1).
- batch_id: Batch ID to extract from data (defaults to 0).
- batch_number: Number of batches (defaults to 10).
Returns:
- data: The extracted data for the specified duration.
- test_data: The test data for the prediction horizon.
"""
assert batch_id <= 307
dataset = data_df['train']
dataset_pd = dataset.to_pandas()
start_datetime = dataset_pd['start'][batch_id]
increase = dataset_pd['freq'][0]
len_data = int(duration * 3600 / (sampling_rate))
len_gt = int(pred_hrz * 3600 / (sampling_rate))
tot_len = len_gt+len_data
datetime_list = generate_datetime_list(start_datetime, increase, num_steps=tot_len)
data = dataset_pd['target'][batch_id][0][:tot_len]
data_all = np.stack([data, datetime_list]).T
return data_all[:len_data,:], data_all[len_data: tot_len, :]
def plot_pred(org_data, median, _dir='None', gt=None, low=None, high=None, forecast_index=None, title=None):
# print("Visualizing prediction data...")
if forecast_index is None:
forecast_index = range(len(org_data), len(org_data) + len(median))
plt.figure(figsize=(8, 4))
is_imp = np.isnan(np.float32(org_data)).any()
if is_imp:
nan_idx = np.where(np.isnan(np.float32(org_data)))[0]
plt.plot(org_data, color="royalblue", label="historical data")
if is_imp:
plt.plot(nan_idx, median, color="tomato", label="median forecast")
else:
plt.plot(forecast_index, median, color="tomato", label="median forecast")
if low is not None and high is not None:
plt.fill_between(forecast_index, low, high, color="tomato", alpha=0.3, label="80% prediction interval")
if gt is not None:
if is_imp:
plt.plot(nan_idx, gt, color="green", label="gt")
else:
plt.plot(range(len(org_data), len(org_data) + len(gt)), gt, color="green", label="gt")
plt.legend()
plt.grid()
directory = f"./{_dir}"
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
if title is not None:
plt.savefig(f'{directory}/{title}.jpg')
plt.show()
plt.close()
def calc_normalized_rmse(y_true, y_pred):
"""
Calculate the normalized Root Mean Square Error (RMSE) between two arrays.
Parameters:
y_true (array-like): The ground truth values.
y_pred (array-like): The predicted values.
Returns:
float: The normalized RMSE.
"""
# Convert inputs to numpy arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)
# Calculate RMSE
rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
# Normalize RMSE
range_of_data = np.max(y_true) - np.min(y_true)
normalized_rmse = rmse / (range_of_data+1e-20)
return normalized_rmse
def calculate_metrics(gt, forecast):
gt = np.float64(gt)
if np.sum(np.isnan(forecast)) >= 1 or np.sum(np.isnan(gt)) >= 1:
# pdb.set_trace()
print('NaN detected...')
# mse = mean_squared_error(gt, forecast)
mse = np.mean((gt - forecast) ** 2)
rmse = np.sqrt(mse)
normalized_rmse = calc_normalized_rmse(gt, forecast)
return mse, rmse, normalized_rmse
def save_results_for_model(model_name, data, test_data, forecast, duration, pred_hrz, mode, occupancy, batch_id, directory):
"""
Save model predictions and evaluation metrics to a CSV file.
Parameters:
- model_name: str, name of the model
- data: numpy array, input data used for prediction (what get_data functions returns)
- test_data: numpy array, ground truth data for validation (what get_data functions returns)
- forecast: numpy array, predictions made by the model
- duration: int, duration of the data being analyzed
- pred_hrz: int, prediction horizon
- mode: str, mode of operation (e.g., 'off', 'heat')
- occupancy: str, occupancy status ('occupied' or 'unoccupied')
- batch_id: int, identifier for the current batch
- directory: str, path to the directory where results should be saved
Returns:
None
"""
mse, rmse, normalized_rmse = calculate_metrics(test_data[:, 0], forecast)
value_data = data[:, 0]
timestamp_data = data[:, -1]
ground_truth = test_data[:, 0]
timestamp_forecast = test_data[:, -1]
result = {
'batch_id': batch_id,
'MSE': mse,
'RMSE': rmse,
'Norm_RMSE': normalized_rmse,
'Data': value_data.tolist(),
'Timestamp_data': timestamp_data.tolist(),
'GroundTruth': ground_truth.tolist(),
'Timestamp_forecast': timestamp_forecast.tolist(),
'Forecast': forecast.tolist(),
'Duration': duration,
'PredictionHorizon': pred_hrz,
'Mode': mode,
'Occupancy': occupancy,
'Model': model_name
}
# Load existing results if they exist
file_path = f"{directory}/results_{mode}_{occupancy}/{duration}_{pred_hrz}/{model_name}.csv"
os.makedirs(os.path.dirname(file_path), exist_ok=True)
try:
df = pd.read_csv(file_path)
except FileNotFoundError:
df = pd.DataFrame(columns=['batch_id', 'MSE', 'RMSE', 'Norm_RMSE', 'Mode', 'Occupancy', 'Duration', 'PredictionHorizon', 'Data', 'Timestamp_data', 'GroundTruth', 'Timestamp_forecast', 'Forecast', 'Model'])
# Append the new result
df = pd.concat([df, pd.DataFrame([result])], ignore_index=True)
# Save the results back to the CSV file
df.to_csv(file_path, index=False)