Skip to content
This repository was archived by the owner on May 31, 2025. It is now read-only.

Commit 7e1c6ff

Browse files
committed
Added multithreading
- Mixer creates producer consumer relations: picks a random producer, and then a random consumer to free the memory allocated in the producer.
1 parent 4f42dbf commit 7e1c6ff

File tree

8 files changed

+138
-44
lines changed

8 files changed

+138
-44
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ add_executable(memsetVsMadvise memsetVsMadvise.cpp)
77
set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11)
88
target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags)
99

10-
add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp)
10+
add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp stress_test/ToFreeQueue.cpp)
1111
set_property(TARGET stress PROPERTY CXX_STANDARD 14)
1212
target_link_libraries(stress PRIVATE gflags)

stress_test/Main.cpp

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,58 @@
11
#include <chrono>
22
#include <iostream>
3+
#include <thread>
34
#include <vector>
45

56
#include <gflags/gflags.h>
67

78
#include "Mixer.h"
89

910
DEFINE_int32(num_producers, 100, "number of producers to run");
11+
DEFINE_int32(num_threads, 1, "number of threads to run");
1012

11-
int main(int argc, char **argv) {
13+
using std::make_shared;
14+
using std::shared_ptr;
15+
using std::vector;
16+
17+
void createAndRunMixer(vector<shared_ptr<Producer>> producers, int me,
18+
vector<shared_ptr<ToFreeQueue>> toFreeQueues) {
19+
Mixer m(producers, FLAGS_num_producers, me, toFreeQueues);
20+
m.run();
21+
}
1222

23+
int main(int argc, char **argv) {
1324
gflags::ParseCommandLineFlags(&argc, &argv, true);
1425

15-
vector<std::unique_ptr<Producer>> producers;
16-
producers.push_back(std::move(std::make_unique<SimpleProducer>(8, 100000)));
17-
producers.push_back(std::move(std::make_unique<VectorProducer>(
18-
100000, std::chrono::duration<double>(1.0))));
26+
// Initialize producers
27+
vector<shared_ptr<Producer>> producers;
28+
producers.push_back(make_shared<SimpleProducer>(8, 100000));
29+
producers.push_back(
30+
make_shared<VectorProducer>(100000, std::chrono::duration<double>(1.0)));
31+
32+
// Set up a work queue for each thread
33+
vector<std::thread> threads;
34+
vector<shared_ptr<ToFreeQueue>> toFreeQueues;
35+
for (int i = 0; i < FLAGS_num_threads; i++) {
36+
shared_ptr<ToFreeQueue> toFreeQ = make_shared<ToFreeQueue>();
37+
toFreeQueues.push_back(toFreeQ);
38+
}
39+
40+
for (int i = 0; i < FLAGS_num_threads; i++) {
41+
// each thread gets an arbitrary id given by [i]
42+
threads.push_back(
43+
std::thread(createAndRunMixer, producers, i, toFreeQueues));
44+
}
1945

2046
using namespace std::chrono;
2147

22-
Mixer m(std::move(producers), FLAGS_num_producers);
23-
2448
high_resolution_clock::time_point beginTime = high_resolution_clock::now();
25-
m.run();
49+
for (auto it = begin(threads); it != end(threads); ++it) {
50+
it->join();
51+
}
52+
// Cleanup any remaining memory
53+
for (int i = 0; i < FLAGS_num_threads; i++) {
54+
toFreeQueues[i]->freeIgnoreLifetime();
55+
}
2656
high_resolution_clock::time_point endTime = high_resolution_clock::now();
2757

2858
duration<double> span = duration_cast<duration<double>>(endTime - beginTime);

stress_test/Mixer.cpp

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,33 @@
11
#include "Mixer.h"
22

3-
Mixer::Mixer(vector<unique_ptr<Producer>> producers, int numProducers)
4-
: producers_(move(producers)), producersRemaining_(numProducers),
5-
producerIndexPicker_(0, this->producers_.size() - 1) {}
3+
Mixer::Mixer(vector<shared_ptr<Producer>> producers, int numProducers, int me,
4+
vector<shared_ptr<ToFreeQueue>> toFreeQueues)
5+
: producers_(producers), producersRemaining_(numProducers),
6+
toFreeQueues_(toFreeQueues), me_(me),
7+
producerIdPicker_(0, producers.size() - 1),
8+
consumerIdPicker_(0, toFreeQueues.size() - 1) {}
69

710
// Picks next producer for the mixer to run. Currently uniform random choice
8-
const Producer &Mixer::pick() {
9-
int producerIndex = this->producerIndexPicker_(this->generator_);
11+
const Producer &Mixer::pickProducer() {
12+
int producerIndex = this->producerIdPicker_(this->generator_);
1013
return *(this->producers_[producerIndex]);
1114
}
1215

16+
// Picks next producer for the mixer to run. Currently uniform random choice
17+
ToFreeQueue &Mixer::pickConsumer() {
18+
int consumerIndex = this->consumerIdPicker_(this->generator_);
19+
return *(this->toFreeQueues_[consumerIndex]);
20+
}
21+
1322
void Mixer::run() {
1423
while (this->producersRemaining_ > 0) {
15-
if (this->allocated_.size() > 0 &&
16-
this->allocated_.back().freeAfter() <
17-
std::chrono::high_resolution_clock::now()) {
18-
// deallocate something if it's lifetime has expired
19-
this->allocated_.pop_back();
20-
} else {
21-
// otherwise run a random producer
22-
Allocation a = this->pick().run();
23-
if (!a.isEmpty()) {
24-
this->allocated_.push_back(std::move(a));
25-
sort(rbegin(this->allocated_), rend(this->allocated_));
26-
}
27-
producersRemaining_--;
24+
this->toFreeQueues_[this->me_]->free();
25+
// otherwise run a random producer
26+
Allocation a = this->pickProducer().run();
27+
if (!a.isEmpty()) {
28+
this->pickConsumer().addToFree(std::move(a));
2829
}
30+
producersRemaining_--;
2931
}
30-
// cleanup remaining allocated things immediately, regardless of lifetime
31-
while (!this->allocated_.empty()) {
32-
this->allocated_.pop_back();
33-
}
32+
// Main loop will eventually cleanup memory
3433
}

stress_test/Mixer.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,33 @@
44
#include <vector>
55

66
#include "Producers.h"
7+
#include "ToFreeQueue.h"
78

8-
using std::unique_ptr;
9+
using std::shared_ptr;
910
using std::vector;
1011

1112
class Mixer {
1213
public:
1314
void run();
14-
Mixer(vector<unique_ptr<Producer>> producers, int numProducers);
15+
Mixer(vector<shared_ptr<Producer>> producers, int numProducers, int me,
16+
vector<shared_ptr<ToFreeQueue>> toFreeQueues);
1517

1618
private:
1719
/* maintains reverse-sorted order by lifetime; i.e. [push_back] yields the
1820
* allocation that should be deallocated the soonest */
1921
vector<Allocation> allocated_;
20-
vector<unique_ptr<Producer>> producers_;
22+
vector<shared_ptr<Producer>> producers_;
2123
int producersRemaining_;
22-
const Producer &pick();
23-
std::uniform_int_distribution<int> producerIndexPicker_;
24+
// the thread id that this mixer is running on
25+
int me_;
26+
// work queues for each thread indexed by thread number
27+
vector<shared_ptr<ToFreeQueue>> toFreeQueues_;
28+
// Picks next producer for the mixer to run. Currently uniform random choice.
29+
const Producer &pickProducer();
30+
// Picks a consumer to free memory allocated by a producer. Currently uniform
31+
// random choice.
32+
ToFreeQueue &pickConsumer();
33+
std::uniform_int_distribution<int> producerIdPicker_;
34+
std::uniform_int_distribution<int> consumerIdPicker_;
2435
std::default_random_engine generator_;
2536
};

stress_test/Producers.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ bool Allocation::operator<(const Allocation &that) const {
88
return this->toFree_ < that.toFree_;
99
}
1010

11+
bool Allocation::operator>(const Allocation &that) const {
12+
return !(*this < that);
13+
}
14+
1115
bool Allocation::isEmpty() const { return this->toFree_.size() == 0; }
1216

1317
std::chrono::high_resolution_clock::time_point Allocation::freeAfter() const {
@@ -41,8 +45,8 @@ Allocation SimpleProducer::run() const {
4145
}
4246

4347
void swap(Allocation &a1, Allocation &a2) {
44-
a1.toFree_.swap(a2.toFree_);
45-
std::swap(a1.freeAfter_, a2.freeAfter_);
48+
a1.toFree_.swap(a2.toFree_);
49+
std::swap(a1.freeAfter_, a2.freeAfter_);
4650
}
4751

4852
// Vector Producer

stress_test/Producers.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class Allocation {
77
public:
88
// sorts based on [freeAfter] field
99
bool operator<(const Allocation &that) const;
10+
bool operator>(const Allocation &that) const;
1011
// true iff [this->toFree_] is empty
1112
bool isEmpty() const;
1213
std::chrono::high_resolution_clock::time_point freeAfter() const;
@@ -19,15 +20,15 @@ class Allocation {
1920
Allocation(Allocation const &) = delete;
2021
Allocation &operator=(Allocation const &) = delete;
2122

22-
// must define a move constructor since we deleted the copy constructor
23-
Allocation(Allocation&&) = default;
24-
Allocation& operator=(Allocation&&) = default;
23+
// must define a move constructor since we deleted the copy constructor
24+
Allocation(Allocation &&) = default;
25+
Allocation &operator=(Allocation &&) = default;
2526

26-
// The destructor deallocates the memory in [toFree_]
27+
// The destructor deallocates the memory in [toFree_]
2728
~Allocation();
2829

29-
// needed to sort
30-
friend void swap(Allocation &a1, Allocation &a2);
30+
// needed to sort
31+
friend void swap(Allocation &a1, Allocation &a2);
3132

3233
private:
3334
std::vector<void *> toFree_;

stress_test/ToFreeQueue.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#include "ToFreeQueue.h"
2+
3+
#include <chrono>
4+
#include <mutex>
5+
#include <thread>
6+
7+
void ToFreeQueue::free() {
8+
std::lock_guard<std::mutex> guard(this->lock_);
9+
10+
while (!this->q_.empty() && this->q_.top().freeAfter() <
11+
std::chrono::high_resolution_clock::now()) {
12+
this->q_.pop();
13+
}
14+
}
15+
16+
void ToFreeQueue::freeIgnoreLifetime() {
17+
std::lock_guard<std::mutex> guard(this->lock_);
18+
19+
while (!this->q_.empty()) {
20+
this->q_.pop();
21+
}
22+
}
23+
24+
void ToFreeQueue::addToFree(Allocation a) {
25+
std::lock_guard<std::mutex> guard(this->lock_);
26+
this->q_.push(std::move(a));
27+
}

stress_test/ToFreeQueue.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include <queue>
4+
#include <thread>
5+
6+
#include "Producers.h"
7+
8+
class ToFreeQueue {
9+
public:
10+
// frees all allocations whose lifetime has elapsed
11+
void free();
12+
// free all allocations, even if the lifetime hasn't expired
13+
void freeIgnoreLifetime();
14+
// Add an allocation to be freed after a particular time
15+
void addToFree(Allocation a);
16+
17+
private:
18+
std::mutex lock_;
19+
std::priority_queue<Allocation, std::vector<Allocation>,
20+
std::greater<Allocation>>
21+
q_;
22+
};

0 commit comments

Comments
 (0)