Skip to content

Commit

Permalink
SmartRedis and SmartSim logging added as well as pre-run checks (#37)
Browse files Browse the repository at this point in the history
* update logging PR

* small logging changes

* pushing edits, stopped by auto tests

* updated smartredis logging for all cpp files

* throughput smartredis logging complete

* halfway through inference cpp for smartredis

* cpp-inference completed

* data aggregation completed

* updating smartsim logging

* last push

* addressing Bill's comments

* addressing Bill's comments, final for today

* pushing updates

* rereview

* pushing changes

* pushing for bill review

---------

Co-authored-by: Amanda Richardson <[email protected]>
  • Loading branch information
amandarichardsonn and amandarichardsonn authored Jul 3, 2023
1 parent 748d1c1 commit 9e610b1
Show file tree
Hide file tree
Showing 16 changed files with 591 additions and 206 deletions.
58 changes: 43 additions & 15 deletions cpp-data-aggregation/aggregation_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,67 @@ int get_iterations() {
void run_aggregation_consumer(std::ofstream& timing_file,
int list_length)
{
//Initializing rank
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::string context("Data Aggregation MPI Consumer Rank: " + std::to_string(rank));
log_data(context, LLDebug, "Initialized rank");

//Indicate Client creation
if (rank == 0)
log_data(context, LLInfo, "Connecting clients");
std::cout << "Connecting clients" << std::endl;

// Connect the client and save connection time
double constructor_start = MPI_Wtime();
SmartRedis::Client client(true);
SmartRedis::Client client(true, context);
double constructor_end = MPI_Wtime();
double delta_t = constructor_end - constructor_start;
timing_file << rank << "," << "client()" << ","
<< delta_t << "\n";
//print rank # storing client() for debugging
log_data(context, LLDebug, "client() time stored");

// Allocate arrays to hold timings
std::vector<double> get_list_times;

// Retrieve the number of iterations to run
int iterations = get_iterations();
log_data(context, LLDebug, "Running with iterations: " + std::to_string(iterations));

// Block to make sure all clients are connected
MPI_Barrier(MPI_COMM_WORLD);

log_data(context, LLDebug, "Iteration loop starting...");

// Retrieve rank-local loop start time
double loop_start = MPI_Wtime();

// Perform dataset aggregation retrieval
for (int i=0; i<iterations; i++) {
log_data(context, LLDebug, "Running iteration: " + std::to_string(i));

// Create aggregation list name
std::string list_name = "iteration_" + std::to_string(i);

if (rank == 0) {
std::cout << "Consuming list " << i << std::endl;
log_data(context, LLInfo, "Consuming list " + std::to_string(i));
}

// Have rank 0 check that the aggregation list is full
if(rank == 0) {
bool list_is_ready = client.poll_list_length(list_name,
list_length,
5, 100000);
if(!list_is_ready)
throw std::runtime_error("There was an error in the "\
if(!list_is_ready) {
std::string list_size_error = "There was an error in the "\
"aggregation scaling test. "\
"The list never reached size of " +
std::to_string(list_length));
std::to_string(list_length);
log_error(context, LLDebug, list_size_error);
throw std::runtime_error(list_size_error);
}
}

// Have all ranks wait until the aggregation list is full
Expand All @@ -67,6 +82,7 @@ void run_aggregation_consumer(std::ofstream& timing_file,
std::vector<SmartRedis::DataSet> result =
client.get_datasets_from_list(list_name);
double get_list_end = MPI_Wtime();
log_data(context, LLDebug, "get_list completed");
delta_t = get_list_end - get_list_start;
get_list_times.push_back(delta_t);

Expand All @@ -76,11 +92,12 @@ void run_aggregation_consumer(std::ofstream& timing_file,
// Delete the list so the producer knows the list has been consumed
if (rank == 0) {
client.delete_list(list_name);
log_data(context, LLDebug, "Data Agg List " + list_name + " is deleted");
}
}

// Compute loop execution time
double loop_end = MPI_Wtime();
log_data(context, LLDebug, "All iterations complete");
delta_t = loop_end - loop_start;

// Write aggregation times to file
Expand All @@ -93,6 +110,7 @@ void run_aggregation_consumer(std::ofstream& timing_file,
timing_file << rank << "," << "loop_time" << ","
<< delta_t << "\n";

log_data(context, LLDebug, "Data written to files");
// Flush the output stream
timing_file << std::flush;

Expand All @@ -103,39 +121,49 @@ int main(int argc, char* argv[]) {

MPI_Init(&argc, &argv);

//initializing rank
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

std::string context("Data Aggregation Tests Consumer Rank: " + std::to_string(rank));
log_data(context, LLDebug, "Rank initialized");
log_data(context, LLDebug, "Starting Data Aggregation tests");
double main_start = MPI_Wtime();

// Get command line arguments
if(argc==1)
throw std::runtime_error("The expected list length must be "
"passed in.");

if(argc==1) {
std::string list_length_error = "The expected list length must be passed in.";
log_error(context, LLInfo, list_length_error);
throw std::runtime_error(list_length_error);
}
std::string s_list_length(argv[1]);
int list_length = std::stoi(s_list_length);

if(rank==0)
if(rank==0) {
log_data(context, LLInfo, "Running aggregate scaling test consumer.");
std::cout << "Running aggregate scaling test consumer." << std::endl;

}
// Open Timing file
std::ofstream timing_file;
timing_file.open("rank_" + std::to_string(rank) + "_timing.csv");

// Run the aggregation scaling study
run_aggregation_consumer(timing_file, list_length);

if(rank==0)
std::cout << "Finished aggregation scaling consumer." << std::endl;

// Save time it took to run main function
double main_end = MPI_Wtime();

//Indicate test end to user
if(rank==0) {
log_data(context, LLInfo, "Finished aggregation scaling consumer.");
std::cout << "Finished aggregation scaling consumer." << std::endl;
}
//Logging total Data Agg time to file
double delta_t = main_end - main_start;
timing_file << rank << "," << "main()" << ","
<< delta_t << std::endl << std::flush;

MPI_Finalize();

return 0;
}
}
56 changes: 36 additions & 20 deletions cpp-data-aggregation/aggregation_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ int get_iterations() {
void run_aggregation_production(size_t n_bytes,
size_t tensors_per_dataset)
{
//Initializing rank
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::string context("Data Aggregation MPI Producer Rank: " + std::to_string(rank));
log_data(context, LLDebug, "Initialized rank");

if (rank == 0)
//Indicate Client creation
if (rank == 0) {
log_data(context, LLInfo, "Connecting clients");
std::cout << "Connecting clients" << std::endl;

}
// Connect a client for each MPI rank
SmartRedis::Client client(true);
SmartRedis::Client client(true, context);

// Block for all clients to be connected
MPI_Barrier(MPI_COMM_WORLD);
Expand All @@ -32,6 +37,7 @@ void run_aggregation_production(size_t n_bytes,

// Get the number of iterations to perform
int iterations = get_iterations();
log_data(context, LLDebug, "Running with iterations: " + std::to_string(iterations));

// Put the datasets into the database. The raw
// dataset data can be re-used between iterations
Expand Down Expand Up @@ -59,6 +65,7 @@ void run_aggregation_production(size_t n_bytes,
// A new list is created for each iteration
// to measure dataset aggregation throughput
for (int i = 0; i < iterations; i++) {
log_data(context, LLDebug, "Running iteration: " + std::to_string(i));

// Set the list name (not MPI rank dependent)
std::string list_name = "iteration_" + std::to_string(i);
Expand All @@ -84,39 +91,48 @@ int main(int argc, char* argv[]) {

MPI_Init(&argc, &argv);

//initializing rank
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::string context("Data Aggregation Tests Producer Rank: " + std::to_string(rank));
log_data(context, LLDebug, "Rank initialized");

// Get command line arguments
if(argc==1)
throw std::runtime_error("The number tensor size in "\
if(argc==1) {
std::string tensor_size_error = "The number tensor size in "\
"bytes must be provided as "\
"a command line argument.");

if(argc==2)
throw std::runtime_error("The number of tensors per "\
"a command line argument.";
log_error(context, LLDebug, tensor_size_error);
throw std::runtime_error(tensor_size_error);
}
if(argc==2) {
std::string tensor_per_error = "The number of tensors per "\
"dataset must be provided as "\
"a command line argument.");

"a command line argument.";
log_error(context, LLDebug, tensor_per_error);
throw std::runtime_error(tensor_per_error);
}
std::string s_bytes(argv[1]);
int n_bytes = std::stoi(s_bytes);

std::string s_tensors_per_dataset(argv[2]);
int tensors_per_dataset = std::stoi(s_tensors_per_dataset);

if(rank==0)
std::cout << "Running aggregate scaling producer test with "\
"tensor size of " << n_bytes <<
" bytes and "<< tensors_per_dataset <<
" tensors per dataset." << std::endl;

if(rank==0) {
std::string tensor_text = "Running aggregate scaling producer test with tensor size of ";
tensor_text += std::to_string(n_bytes) + " bytes and " + std::to_string(tensors_per_dataset);
tensor_text += " tensors per dataset.";
log_data(context, LLInfo, tensor_text);
std::cout << tensor_text << std::endl;
}
// Run the dataset and aggregation list production
run_aggregation_production(n_bytes, tensors_per_dataset);

if(rank==0)
if(rank==0) {
log_data(context, LLInfo, "Finished data aggregation production.");
std::cout << "Finished data aggregation production." << std::endl;

}
MPI_Finalize();

return 0;
}
}
Loading

0 comments on commit 9e610b1

Please sign in to comment.