Skip to content

Prevent memory leak at high throughput #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/consumer_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "sample.h"
#include "send_buffer.h"
#include <chrono>
#include <thread>

using namespace lsl;

Expand All @@ -23,30 +22,34 @@ consumer_queue::~consumer_queue() {
}

void consumer_queue::push_sample(const sample_p &sample) {
// acquire lock for more predictable behavior and avoid race condition with pop_sample()
std::unique_lock<std::mutex> lk(mut_);
// if the buffer is full, drop oldest samples
while (!buffer_.push(sample)) {
sample_p dummy;
buffer_.pop(dummy);
}
cv_.notify_one();
}

sample_p consumer_queue::pop_sample(double timeout) {
sample_p result;
if (timeout <= 0.0) {
std::unique_lock<std::mutex> lk(mut_);
buffer_.pop(result);
} else {
std::unique_lock<std::mutex> lk(mut_);
if (!buffer_.pop(result)) {
// turn timeout into the point in time at which we give up
timeout += lsl::lsl_clock();
do {
if (lsl::lsl_clock() >= timeout) break;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} while (!buffer_.pop(result));
// release lock, wait for a new sample until the thread calling push_sample delivers one, or until timeout
std::chrono::duration<double> sec(timeout);
cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); });
}
}
return result;
}

uint32_t consumer_queue::flush() noexcept {
std::unique_lock<std::mutex> lk(mut_);
uint32_t n = 0;
while (buffer_.pop()) n++;
return n;
Expand Down
5 changes: 5 additions & 0 deletions src/consumer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "common.h"
#include "forward.h"
#include <boost/lockfree/spsc_queue.hpp>
#include <mutex>
#include <condition_variable>

namespace lsl {
/**
Expand Down Expand Up @@ -52,6 +54,9 @@ class consumer_queue {
private:
send_buffer_p registry_; // optional consumer registry
buffer_type buffer_; // the sample buffer
// used to wait for new samples
std::mutex mut_;
std::condition_variable cv_;
};

} // namespace lsl
Expand Down
21 changes: 15 additions & 6 deletions src/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,10 @@ class sample {
void operator delete(void *x) {
// delete the underlying memory only if it wasn't allocated in the factory's storage area
sample *s = (sample *)x;
if (s && !(s->factory_ &&
(((char *)s) >= s->factory_->storage_ &&
((char *)s) <= s->factory_->storage_ + s->factory_->storage_size_)))
if (s && !s->is_from_factory())
delete[](char *) x;
}

/// Test for equality with another sample.
bool operator==(const sample &rhs) const noexcept;

Expand Down Expand Up @@ -368,16 +366,27 @@ class sample {
;
}

/// Test if the sample wasn't allocated in the factory's storage area
bool is_from_factory(void) {
return (factory_ && (((char *)this) >= factory_->storage_ &&
((char *)this) <= factory_->storage_ + factory_->storage_size_));
}

/// Increment ref count.
friend void intrusive_ptr_add_ref(sample *s) {
s->refcount_.fetch_add(1, std::memory_order_relaxed);
}

/// Decrement ref count and reclaim if unreferenced.
/// Decrement ref count, reclaim if unreferenced and belong to factory's storage, delete otherwise to avoid memory leaks
friend void intrusive_ptr_release(sample *s) {
if (s->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
s->factory_->reclaim_sample(s);
if (s->is_from_factory()) {
s->factory_->reclaim_sample(s);
}
else {
delete s;
}
}
}
};
Expand Down