Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added emplace variants of enqueue and try_enqueue #275

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 63 additions & 18 deletions concurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,18 @@ class ConcurrentQueue
else return inner_enqueue<CanAlloc>(std::move(item));
}

// Enqueues a single item (by constructing it in-place from arguments).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
template<typename... Args>
inline bool enqueue_emplace(Args&&... args)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
}

// Enqueues a single item (by copying it) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
Expand All @@ -1027,6 +1039,17 @@ class ConcurrentQueue
return inner_enqueue<CanAlloc>(token, std::move(item));
}

// Enqueues a single item (by constructing it in-place from arguments) using an explicit
// producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
template<typename... Args>
inline bool enqueue_token_emplace(producer_token_t const& token, Args&&... args)
{
return inner_enqueue<CanAlloc>(token, std::forward<Args>(args)...);
}

// Enqueues several items.
// Allocates memory if required. Only fails if memory allocation fails (or
// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Expand Down Expand Up @@ -1074,6 +1097,18 @@ class ConcurrentQueue
else return inner_enqueue<CannotAlloc>(std::move(item));
}

// Enqueues a single item (by constructing it in-place from arguments).
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Thread-safe.
template<typename... Args>
inline bool try_enqueue_emplace(Args&&... args)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
}

// Enqueues a single item (by copying it) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
Expand All @@ -1090,6 +1125,16 @@ class ConcurrentQueue
return inner_enqueue<CannotAlloc>(token, std::move(item));
}

// Enqueues a single item (by constructing it in-place from arguments) using an explicit
// producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
template<typename... Args>
inline bool try_enqueue_token_emplace(producer_token_t const& token, Args&&... args)
{
return inner_enqueue<CannotAlloc>(token, std::forward<Args>(args)...);
}

// Enqueues several items.
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
Expand Down Expand Up @@ -1363,17 +1408,17 @@ class ConcurrentQueue
// Queue methods
///////////////////////////////

template<AllocationMode canAlloc, typename U>
inline bool inner_enqueue(producer_token_t const& token, U&& element)
template<AllocationMode canAlloc, typename... Args>
inline bool inner_enqueue(producer_token_t const& token, Args&&... args)
{
return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<Args>(args)...);
}

template<AllocationMode canAlloc, typename U>
inline bool inner_enqueue(U&& element)
template<AllocationMode canAlloc, typename... Args>
inline bool inner_enqueue(Args&&... args)
{
auto producer = get_or_add_implicit_producer();
return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<Args>(args)...);
}

template<AllocationMode canAlloc, typename It>
Expand Down Expand Up @@ -1845,8 +1890,8 @@ class ConcurrentQueue
}
}

template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
template<AllocationMode allocMode, typename... Args>
inline bool enqueue(Args&&... args)
{
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
Expand Down Expand Up @@ -1912,11 +1957,11 @@ class ConcurrentQueue
++pr_blockIndexSlotsUsed;
}

MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<Args>(args)...))) {
// The constructor may throw. We want the element not to appear in the queue in
// that case (without corrupting the queue):
MOODYCAMEL_TRY {
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<Args>(args)...);
}
MOODYCAMEL_CATCH (...) {
// Revert change to the current block, but leave the new block available
Expand All @@ -1938,14 +1983,14 @@ class ConcurrentQueue
blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);

MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<Args>(args)...))) {
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
}

// Enqueue
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<Args>(args)...);

this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
Expand Down Expand Up @@ -2483,8 +2528,8 @@ class ConcurrentQueue
}
}

template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
template<AllocationMode allocMode, typename... Args>
inline bool enqueue(Args&&... args)
{
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
Expand Down Expand Up @@ -2516,10 +2561,10 @@ class ConcurrentQueue
#endif
newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<Args>(args)...))) {
// May throw, try to insert now before we publish the fact that we have this new block
MOODYCAMEL_TRY {
new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
new ((*newBlock)[currentTailIndex]) T(std::forward<Args>(args)...);
}
MOODYCAMEL_CATCH (...) {
rewind_block_index_tail();
Expand All @@ -2534,14 +2579,14 @@ class ConcurrentQueue

this->tailBlock = newBlock;

MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<Args>(args)...))) {
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
}

// Enqueue
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<Args>(args)...);

this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
Expand Down
83 changes: 83 additions & 0 deletions tests/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,29 @@ struct Moveable {
#endif
};

struct Emplaceable {
Emplaceable() : moved(false), copied(false), copyData(0), moveData(0) {}
Emplaceable(const Copyable& copyData, Moveable&& moveData) : moved(false), copied(false), copyData(copyData), moveData(std::move(moveData)) { }
Emplaceable(Emplaceable&& o) MOODYCAMEL_NOEXCEPT : moved(true), copied(o.copied), copyData(o.copyData), moveData(std::move(o.moveData)) { }
void operator=(Emplaceable&& o) MOODYCAMEL_NOEXCEPT { moved = true; copied = o.copied; copyData = o.copyData; moveData = std::move(o.moveData); }
bool moved;
bool copied;
Copyable copyData;
Moveable moveData;

#if defined(_MSC_VER) && _MSC_VER < 1800
// VS2012's std::is_nothrow_[move_]constructible is broken, so the queue never attempts to
// move objects with that compiler. In this case, we don't know whether it's really a copy
// or not being done, so give the benefit of the doubt (given the tests pass on other platforms)
// and assume it would have done a move if it could have (don't set copied to true).
Emplaceable(Emplaceable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(o.copied), copyData(o.copyData), moveData(o.moveData) { }
void operator=(Emplaceable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = o.copied; copyData = o.copyData; moveData = o.moveData; }
#else
Emplaceable(Emplaceable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(true), copyData(o.copyData), moveData(o.moveData) { }
void operator=(Emplaceable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = true; copyData = o.copyData; moveData = o.moveData; }
#endif
};

struct ThrowingMovable {
static std::atomic<int>& ctorCount() { static std::atomic<int> c; return c; }
static std::atomic<int>& destroyCount() { static std::atomic<int> c; return c; }
Expand Down Expand Up @@ -3310,6 +3333,20 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue_emplace(Args&&...)
{
ConcurrentQueue<Emplaceable, Traits> q;
ASSERT_OR_FAIL(q.enqueue_emplace(Copyable(1234), Moveable(12345)));
Emplaceable item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.copyData.id == 1234);
ASSERT_OR_FAIL(item.moveData.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(item.moveData.moved);
ASSERT_OR_FAIL(!item.moveData.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
Moveable original(12345);
Expand Down Expand Up @@ -3356,6 +3393,21 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
// enqueue_emplace(Token, Args&&...)
{
ConcurrentQueue<Emplaceable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.enqueue_token_emplace(t, Copyable(1234), Moveable(12345)));
Emplaceable item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.copyData.id == 1234);
ASSERT_OR_FAIL(item.moveData.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(item.moveData.moved);
ASSERT_OR_FAIL(!item.moveData.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}
{
ConcurrentQueue<Moveable, Traits> q;
ProducerToken t(q);
Expand Down Expand Up @@ -3423,6 +3475,21 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
ASSERT_OR_FAIL(!q.try_dequeue(item));
}

// try_enqueue_emplace(Args&&...)
{
ConcurrentQueue<Emplaceable, Traits> q;
ASSERT_OR_FAIL(q.try_enqueue_emplace(Copyable(1234), Moveable(12345)));
Emplaceable item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.copyData.id == 1234);
ASSERT_OR_FAIL(item.moveData.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(item.moveData.moved);
ASSERT_OR_FAIL(!item.moveData.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}

// try_enqueue(Token, T const&)
{
ConcurrentQueue<Copyable, Traits> q;
Expand Down Expand Up @@ -3471,6 +3538,22 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
ASSERT_OR_FAIL(!q.try_dequeue(item));
}

// try_enqueue_emplace(Token, Args&&...)
{
ConcurrentQueue<Emplaceable, Traits> q;
ProducerToken t(q);
ASSERT_OR_FAIL(q.try_enqueue_token_emplace(t, Copyable(1234), Moveable(12345)));
Emplaceable item;
ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item.copyData.id == 1234);
ASSERT_OR_FAIL(item.moveData.id == 12345);
ASSERT_OR_FAIL(item.moved);
ASSERT_OR_FAIL(!item.copied);
ASSERT_OR_FAIL(item.moveData.moved);
ASSERT_OR_FAIL(!item.moveData.copied);
ASSERT_OR_FAIL(!q.try_dequeue(item));
}

// enqueue_bulk(It itemFirst, size_t count)
{
ConcurrentQueue<Copyable, Traits> q;
Expand Down