// main.cpp
#include <iostream>
#include <vector>
#include <random>
#include <algorithm>
#include <cstdint>
#include <mpi.h>
#define MACHINE_ROOT_ID 0
#define MSG_SORT_TAG 0
#define DATA_TYPE std::uint32_t
#define SEND_TYPE MPI_UINT32_T
// Stores info about the current machine (MPI rank) and the cluster size
struct Machine {
    static bool IsMaster() noexcept {
        return m_MachineID == MACHINE_ROOT_ID;
    }

    inline static int m_MachineID;   // Current process machine ID
    inline static int m_MachineSize; // Number of concurrent machines
    inline static constexpr std::uint32_t m_SizeArray = 100;
};
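// Note: the inline static data members above require C++17 or later.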
// Fill the array with random values in the range [1, array.size()]
template <typename Type, typename = std::enable_if_t<std::is_integral_v<Type>>>
void Fill(std::vector<Type>& array) {
    std::random_device random_device;
    std::mt19937 random_engine(random_device());
    std::uniform_int_distribution<int> distribution(1, static_cast<int>(array.size()));
    for(auto& value : array) {
        value = static_cast<Type>(distribution(random_engine));
    }
}
// Sort the array with bubble sort (exits early once a pass makes no swaps)
template <typename Type, typename = std::enable_if_t<std::is_integral_v<Type>>>
void Sort(std::vector<Type>& array)
{
    for(std::size_t i = 0; i < array.size(); ++i)
    {
        bool isFinish = true;
        for(std::size_t j = 0; j < array.size() - (i + 1); ++j)
        {
            if(array[j] > array[j + 1]) {
                isFinish = false;
                std::swap(array[j], array[j + 1]);
            }
        }
        if(isFinish) {
            break;
        }
    }
}
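// Example trace for {3, 1, 2}: the first pass swaps to {1, 3, 2} and then {1, 2, 3};
// the second pass makes no swaps, so isFinish stays true and the loop breaks early.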
// Entry point for the Master machine.
// If there are no cluster machines, the Master sorts the array itself;
// otherwise the array is split and sent to the clusters for sorting.
void Master()
{
    // Prepare the array and fill it with random values
    std::vector<DATA_TYPE> array;
    array.resize(Machine::m_SizeArray);
    Fill(array);
    for(auto value : array) {
        std::cout << "[Master] ID: " << Machine::m_MachineID << " fill value: " << value << " to array" << std::endl;
    }
    // Check the number of machines
    if(Machine::m_MachineSize == 1)
    {
        std::cout << "[Info] Cluster machines not found, using Master machine for sorting!" << std::endl;
        std::cout << "----- [Start sorting with Master] -----" << std::endl;
        Sort(array);
        for(auto value : array) {
            std::cout << "[Master] ID: " << Machine::m_MachineID << " sorted value: " << value << std::endl;
        }
    } else {
        std::cout << std::endl;
        std::cout << "----- [Start sorting with Clusters: " << (Machine::m_MachineSize - 1) << "] -----" << std::endl;
        // Calculate how the array is distributed across the clusters
        const std::uint32_t taskPerProcess = static_cast<std::uint32_t>(array.size()) / (Machine::m_MachineSize - 1);
        const std::uint32_t taskRemainder = static_cast<std::uint32_t>(array.size()) - (Machine::m_MachineSize - 1) * taskPerProcess;
        std::cout << "[Info] TaskPerProcess: " << taskPerProcess << " | taskRemainder: " << taskRemainder << std::endl;
        // Send the tasks to the Clusters for sorting (the last Cluster also gets the remainder)
        std::uint32_t offset = 0;
        for(int i = 1; i < Machine::m_MachineSize; ++i)
        {
            const bool isLast = (i == (Machine::m_MachineSize - 1));
            if(isLast) {
                std::cout << "[debug] send size: " << taskPerProcess + taskRemainder << std::endl;
                MPI_Send(&array[offset], taskPerProcess + taskRemainder, SEND_TYPE, i, MSG_SORT_TAG, MPI_COMM_WORLD);
            } else {
                std::cout << "[debug] send size: " << taskPerProcess << std::endl;
                MPI_Send(&array[offset], taskPerProcess, SEND_TYPE, i, MSG_SORT_TAG, MPI_COMM_WORLD);
            }
            offset += taskPerProcess;
        }
        std::cout << std::endl;
        for(auto value : array) {
            std::cout << "[Master] ID: " << Machine::m_MachineID << " before sort: " << value << std::endl;
        }
        std::cout << std::endl;
        // -------- [Recv from clusters] --------
        // MPI status and a vector of vectors to store the sorted data from the clusters
        MPI_Status status;
        std::vector<std::vector<DATA_TYPE>> data;
        data.resize(Machine::m_MachineSize - 1);
        // Receive the sorted chunks from the clusters and fill the data vectors
        for(int i = 1; i < Machine::m_MachineSize; ++i)
        {
            offset = i - 1;
            const bool isLast = (i == (Machine::m_MachineSize - 1));
            if(isLast) {
                data[offset].resize(taskPerProcess + taskRemainder);
            } else {
                data[offset].resize(taskPerProcess);
            }
            // Per the MPI standard, status.MPI_ERROR is only set by multi-completion
            // calls, so check the return code of MPI_Recv instead.
            const int recvResult = MPI_Recv(data[offset].data(), static_cast<int>(data[offset].size()), SEND_TYPE, i, MSG_SORT_TAG, MPI_COMM_WORLD, &status);
            if(recvResult != MPI_SUCCESS) {
                std::cout << "[Master] ID: " << Machine::m_MachineID << " recv data from ClusterID: " << i << " error: " << recvResult << std::endl;
                continue;
            }
            std::int32_t dataSize = 0;
            MPI_Get_count(&status, SEND_TYPE, &dataSize);
            std::cout << "[Master] ID: " << Machine::m_MachineID << " recv size: " << dataSize << " from ClusterID: " << status.MPI_SOURCE << std::endl;
        }
        // Output the sorted data.
        // The sorted chunks are stored in "data", where index 0 corresponds to Cluster ID 1.
        std::cout << std::endl;
        std::cout << "----------[Results]----------" << std::endl;
        for(std::size_t i = 0; i < data.size(); ++i)
        {
            const auto& vec = data[i];
            std::cout << "--[Result from Cluster ID: " << i + 1 << "]--" << std::endl;
            for(auto value : vec) {
                std::cout << "Sorted value: " << value << std::endl;
            }
            std::cout << std::endl;
        }
        std::cout << std::endl;
    }
}
// Entry point for a Cluster machine
void Cluster() {
    MPI_Status status;
    std::vector<DATA_TYPE> array;
    {
        // Calculate the size of this machine's chunk of the array
        const bool isLast = (Machine::m_MachineID == (Machine::m_MachineSize - 1));
        const std::uint32_t taskPerProcess = Machine::m_SizeArray / (Machine::m_MachineSize - 1);
        const std::uint32_t taskRemainder = Machine::m_SizeArray - (Machine::m_MachineSize - 1) * taskPerProcess;
        if(isLast) {
            array.resize(taskPerProcess + taskRemainder);
        } else {
            array.resize(taskPerProcess);
        }
    }
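    // Note: this split must match the Master's computation exactly, otherwise the
    // MPI_Recv count below would not match the size sent by the Master
    // (e.g. 100 elements across 3 clusters -> chunks of 33, 33 and 34).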
    // Receive this machine's chunk of the array from the Master
    const int recvResult = MPI_Recv(array.data(), static_cast<int>(array.size()), SEND_TYPE, MACHINE_ROOT_ID, MSG_SORT_TAG, MPI_COMM_WORLD, &status);
    std::cout << std::endl;
    if(recvResult != MPI_SUCCESS) {
        std::cout << "[Cluster] ID: " << Machine::m_MachineID << " MPI_Recv error: " << recvResult << std::endl;
    }
    /* Debug output: received chunk
    std::int32_t dataSize = 0;
    MPI_Get_count(&status, SEND_TYPE, &dataSize);
    std::cout << "[Cluster] ID: " << Machine::m_MachineID << " recv data size: " << dataSize << std::endl;
    for(auto value : array) {
        std::cout << "[Cluster] ID: " << Machine::m_MachineID << " recv data value: " << value << std::endl;
    }
    std::cout << std::endl; */
    Sort(array);
    /* Debug output: sorted chunk
    for(auto value : array) {
        std::cout << "[Cluster] ID: " << Machine::m_MachineID << " sorted value: " << value << std::endl;
    } */
    //std::cout << "[Cluster] ID: " << Machine::m_MachineID << " send data size: " << array.size() << " to MasterID: " << status.MPI_SOURCE << std::endl;
    // Send the sorted result back to the Master machine
    MPI_Send(array.data(), static_cast<int>(array.size()), SEND_TYPE, status.MPI_SOURCE, MSG_SORT_TAG, MPI_COMM_WORLD);
}
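// Example build/run (a sketch, assuming Open MPI's compiler wrapper and the
// output name "sort" — adjust for your environment):
//   mpic++ -std=c++17 main.cpp -o sort
//   mpirun -np 4 ./sort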
int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &Machine::m_MachineID);
    MPI_Comm_size(MPI_COMM_WORLD, &Machine::m_MachineSize);
    std::cout << (Machine::IsMaster() ? "[Master]" : "[Cluster]") << " ID: " << Machine::m_MachineID << " initialized";
    if(Machine::IsMaster() && Machine::m_MachineSize > 1) {
        std::cout << ", clusters: " << (Machine::m_MachineSize - 1);
    }
    std::cout << std::endl;
    if(Machine::IsMaster()) {
        Master();
    } else {
        Cluster();
    }
    MPI_Finalize();
    return 0;
}