-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.cpp
313 lines (273 loc) · 10.7 KB
/
stream.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#include<thread>
#include<vector>
#include<chrono>
#include<atomic>
#include<string>
#include<iostream>
#include<cmath>
#include<math.h>
#include <assert.h>
#ifdef GEM5_ANNOTATION
#include<gem5/m5ops.h>
#endif
// Verification summary produced when a kernel's output array is compared
// against its expected values.
struct error_stat {
    double total_error;  // running sum of |expected - actual| over all elements
    size_t error_count;  // how many elements were not exactly equal
};
// setup global variables
// Number of worker threads each kernel's arrays are split across. Starts as
// the sentinel -1 (converted to unsigned, i.e. UINT_MAX) until main() sets it.
unsigned int total_supported_threads = static_cast<unsigned int>(-1);
// Every worker increments this once after finishing its chunk.
std::atomic<unsigned int> barrier_variable_v2(0);
// std::atomic<size_t> part;
size_t part = 0; // has to be reset before invoking each kernel

// Half-open index range [begin, end) of the arrays owned by one worker.
struct chunk_range {
    size_t begin;
    size_t end;
};

// Returns the slice of a `size`-element array that worker `idx` processes.
// Every worker gets size / total_supported_threads elements, and the last
// worker additionally takes the size % total_supported_threads leftovers.
// That way the whole array is covered even when size is not a multiple of
// the thread count; previously those trailing elements were never computed.
static chunk_range chunk_for(size_t size, size_t idx) {
    // guard against a zero thread count so the division below is defined.
    const size_t workers =
        total_supported_threads == 0 ? 1 : total_supported_threads;
    const size_t chunk_size = size / workers;
    chunk_range range;
    range.begin = idx * chunk_size;
    range.end = (idx + 1 >= workers) ? size : range.begin + chunk_size;
    return range;
}

// declare all the kernel functions here
// COPY: A[i] = B[i] over this worker's chunk.
void kernel_copy(double *A, double *B, size_t size, size_t idx) {
    const chunk_range range = chunk_for(size, idx);
    for (size_t i = range.begin; i < range.end; i++)
        A[i] = B[i];
    barrier_variable_v2++;
}

// SCALE: A[i] = alpha * B[i] over this worker's chunk.
void kernel_scale(double alpha, double *A, double *B, size_t size,
        size_t idx) {
    const chunk_range range = chunk_for(size, idx);
    for (size_t i = range.begin; i < range.end; i++)
        A[i] = alpha * B[i];
    barrier_variable_v2++;
}

// SUM: A[i] = B[i] + C[i] over this worker's chunk.
void kernel_sum(double* A, double* B, double* C, size_t size, size_t idx) {
    const chunk_range range = chunk_for(size, idx);
    for (size_t i = range.begin; i < range.end; i++)
        A[i] = B[i] + C[i];
    barrier_variable_v2++;
}

// TRIAD: A[i] = B[i] + alpha * C[i] over this worker's chunk.
void kernel_triad(double alpha, double* A, double* B, double* C,
        size_t size, size_t idx) {
    const chunk_range range = chunk_for(size, idx);
    for (size_t i = range.begin; i < range.end; i++)
        A[i] = B[i] + alpha * C[i];
    barrier_variable_v2++;
}
// declare all the utility functions here
// Compares a computed array against the expected values element by element.
//   a    : the array produced by the parallel kernel
//   b    : the ground truth (the expected array)
//   size : number of elements in both arrays
// Returns the sum of absolute differences and the number of elements that
// are not exactly equal.
struct error_stat total_error(double* a, double *b, size_t size) {
    error_stat et;
    et.total_error = 0;
    et.error_count = 0;
    // size_t index: an int index is a signed/unsigned mismatch and overflows
    // (undefined behavior) once size exceeds INT_MAX elements.
    for(size_t i = 0 ; i < size ; i++) {
        // std::fabs names the floating-point overload explicitly instead of
        // relying on global abs(double) overloads from <math.h>.
        et.total_error += std::fabs(b[i] - a[i]);
        if(a[i] != b[i])
            et.error_count++;
    }
    return et;
}
void stats_printer(std::string kernel_name, double data_size_in_gb,
double time_copy, error_stat error_stats) {
double mult_factor = 3;
if(kernel_name == "COPY" || kernel_name == "SCALE")
mult_factor = 2;
std::cout << "STREAM info: " << kernel_name << " Kernel";
std::cout << "\t\tBandwidth : " <<
(mult_factor * data_size_in_gb * 1e6)/time_copy << " GB/s";
std::cout << "\tTime: " << time_copy/1e6 << " s";
std::cout << "\t\tTotal Error: " << error_stats.total_error;
std::cout << "\tError Count: " << error_stats.error_count << std::endl;
}
int main(int argc, char *argv[]) {
if(argc < 2) {
std::cout
<< "Usage: $./stream <size of each array> <optional: num threads>"
<< std::endl;
return -1;
}
// if the user specified the number of threads to use, then override the
// number of total_supported threads by the user specified number.
// otherwise use the number of threads supported by the hardware.
total_supported_threads = std::thread::hardware_concurrency();;
if(argc == 3)
// number of threads is also provided by the user
total_supported_threads = atoi(argv[2]);
// size of the array is supplied by the user. this has to be present.
size_t array_size = (size_t)atol(argv[1]);
// warn the user that the size of the array has to be a multiple of the
// number of threads. I am too lazy to pad the arry to avoid this error.
if(array_size % (size_t)total_supported_threads != 0)
std::cout << "STREAM warn: Expect errors. The size of the array has"
<< " to be a multiple of the total number of threads!"
<< std::endl;
// calculate total size of the array and print this info on the terminal.
double data_size_in_gb =
array_size * sizeof(double) / 1024.0 / 1024.0 / 1024.0;
std::cout << "STREAM info: Array Size : " << data_size_in_gb << " GB"
<< std::endl;
std::cout << "STREAM info: Total Memory Occupied : " <<
4 * data_size_in_gb << " GB" << std::endl;
// allocate all the arrays.
double *a = new double[array_size];
double *b = new double[array_size];
double *c = new double[array_size];
double alpha = 1000;
// setup the values in all these arrays
for(size_t i = 0 ; i < array_size ; i++) {
a[i] = 1;
b[i] = 2;
c[i] = 3;
}
// setup threads here. sanity check for the number of threads. if not, then
// crash the program
assert(total_supported_threads != -1);
std::cout << "STREAM info: Total Number of Threads : " <<
total_supported_threads << std::endl;
// end of overall stats print. this looks good.
std::cout << std::endl;
// now allocate memory for each of the threads
std::thread* stream_threads = new std::thread[total_supported_threads];
// warmup : do all the kernels without timing and in serial
//
std::cout << "STREAM info: NOT warming up!\n" << std::endl;
//
// idk how to do this. so skipping this for now.
// setup some initial numbers for uniformity
part = 0;
barrier_variable_v2 = 0;
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
// index will be multiplied to find the next index
stream_threads[i] = std::thread(kernel_copy, c, a, array_size, i);
// setup initial timers for the actual kernel calls
auto start = std::chrono::steady_clock::now();
#ifdef GEM5_ANNOTATION
m5_work_begin(0,0);
#endif
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i].join();
// there has to be a barrier for the stats to be accurate.
while(barrier_variable_v2 != total_supported_threads);
#ifdef GEM5_ANNOTATION
m5_work_end(0,0);
#endif
auto end = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::microseconds>(
end - start
).count();
double time_copy = std::chrono::duration<double>(elapsed).count();
// verify and print the stats of this kernel.
stats_printer(
"COPY", data_size_in_gb, time_copy, total_error(c, a, array_size));
// -------------------------- end of copy kernel ----------------------- //
// reset the part counter for the new kernel to run.
part = 0;
barrier_variable_v2 = 0;
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i] = std::thread(
kernel_scale, alpha, b, c, array_size, i);
// setup the initial timers to keep a track of time.
start = std::chrono::steady_clock::now();
#ifdef GEM5_ANNOTATION
m5_work_begin(0,0);
#endif
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i].join();
// there has to be a barrier for the stats to be accurate.
while(barrier_variable_v2 != total_supported_threads);
#ifdef GEM5_ANNOTATION
m5_work_end(0,0);
#endif
end = std::chrono::steady_clock::now();
elapsed =
std::chrono::duration_cast<std::chrono::microseconds>(
end - start
).count();
time_copy = std::chrono::duration<double>(elapsed).count();
// verify the array. print stats --------------------------------------- //
// create another array which has the expected results
double *expected_array = new double[array_size];
for(size_t i = 0 ; i < array_size ; i++)
expected_array[i] = alpha * a[i];
// verify and print the stats of this kernel.
stats_printer(
"SCALE", data_size_in_gb, time_copy, total_error(
expected_array, b, array_size
)
);
delete expected_array;
// ------------------------- end of scale kernel ----------------------- //
// reset the part counter for the new kernel to run.
part = 0;
barrier_variable_v2 = 0;
// create this new kernel
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i] = std::thread(kernel_sum, c, a, b, array_size, i);
// setup the initial timers to keep a track of time.
start = std::chrono::steady_clock::now();
#ifdef GEM5_ANNOTATION
m5_work_begin(0,0);
#endif
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i].join();
// there has to be a barrier for the stats to be accurate.
while(barrier_variable_v2 != total_supported_threads);
#ifdef GEM5_ANNOTATION
m5_work_end(0,0);
#endif
end = std::chrono::steady_clock::now();
elapsed =
std::chrono::duration_cast<std::chrono::microseconds>(
end - start
).count();
time_copy = std::chrono::duration<double>(elapsed).count();
// verify the array. print stats
// create another array which has the expected results
expected_array = new double[array_size];
for(size_t i = 0 ; i < array_size ; i++)
expected_array[i] = a[i] + b[i];
// verify and print the stats of this kernel.
stats_printer(
"SUM ", data_size_in_gb, time_copy, total_error(
expected_array, c, array_size
)
);
delete expected_array;
// -------------------------- end of sum kernel ------------------------ //
// reset the part counter for the new kernel to run.
part = 0;
barrier_variable_v2 = 0;
// create this new kernel
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i] =
std::thread(kernel_triad, alpha, a, b, c, array_size, i);
// setup the initial timers to keep a track of time.
start = std::chrono::steady_clock::now();
#ifdef GEM5_ANNOTATION
m5_work_begin(0,0);
#endif
for(unsigned int i = 0 ; i < total_supported_threads ; i++)
stream_threads[i].join();
// there has to be a barrier for the stats to be accurate.
while(barrier_variable_v2 != total_supported_threads);
#ifdef GEM5_ANNOTATION
m5_work_end(0,0);
#endif
end = std::chrono::steady_clock::now();
elapsed =
std::chrono::duration_cast<std::chrono::microseconds>(
end - start
).count();
time_copy = std::chrono::duration<double>(elapsed).count();
// verify the array. print stats
// create another array which has the expected results
expected_array = new double[array_size];
for(size_t i = 0 ; i < array_size ; i++)
expected_array[i] = b[i] + alpha * c[i];
// verify and print the stats of this kernel.
stats_printer(
"TRIAD", data_size_in_gb, time_copy, total_error(
expected_array, a, array_size
)
);
delete expected_array;
// -------------------------- end of triad kernel ---------------------- //
return 0;
}