Update concurrentqueue.h #406

Open · wants to merge 1 commit into base: master
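Every change in this diff is the same substitution: an atomic read-modify-write that previously used std::memory_order_release (plus one test_and_set that used std::memory_order_acquire) now uses std::memory_order_acq_rel. An RMW has both a load half and a store half; release orders only the store half, while acq_rel additionally gives the load half acquire semantics. A minimal standalone sketch of what the stronger ordering buys (the names below are illustrative, not taken from the queue):

    #include <atomic>

    std::atomic<int> flag{0};
    int payload = 0; // plain data published through `flag`

    void publisher() {
        payload = 42;
        // The store half is a release: a thread that acquire-reads the value
        // written here also sees payload == 42. With acq_rel the load half is
        // additionally an acquire, so this RMW synchronizes with whatever
        // release operation wrote the value it reads; a plain release RMW
        // gives no such guarantee.
        flag.fetch_add(1, std::memory_order_acq_rel);
    }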
26 changes: 13 additions & 13 deletions concurrentqueue.h
@@ -1493,7 +1493,7 @@ class ConcurrentQueue
assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

// Decrease refcount twice, once for our ref, and once for the list's ref
- head->freeListRefs.fetch_sub(2, std::memory_order_release);
+ head->freeListRefs.fetch_sub(2, std::memory_order_acq_rel);
return head;
}
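The hunk above is the free-list refcount decrement; as the comment says, the count drops by two (our reference plus the list's). Upgrading the fetch_sub to acq_rel matches the classic last-owner pattern: whichever thread brings a reference count to zero must acquire, so that it observes every write the other owners made before dropping their references. A minimal sketch of that pattern, with a hypothetical Node standing in for the queue's block type:

    #include <atomic>

    struct Node {
        std::atomic<int> refs{1};
        int payload = 0;
    };

    void release_ref(Node* n) {
        // With a plain release decrement the RMW's load half is relaxed, so
        // the thread that hits zero gets no acquire edge and may not see
        // payload writes made under other references. acq_rel (or a release
        // decrement followed by an acquire fence) closes that gap.
        if (n->refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            delete n; // safe: synchronized with every other release_ref
        }
    }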

@@ -1529,7 +1529,7 @@ class ConcurrentQueue
node->freeListRefs.store(1, std::memory_order_release);
if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
// Hmm, the add failed, but we can only try again when the refcount goes back to zero
- if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
+ if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_acq_rel) == 1) {
continue;
}
}
@@ -1604,7 +1604,7 @@ class ConcurrentQueue
}
else {
// Increment counter
- auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
+ auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_acq_rel);
assert(prevVal < BLOCK_SIZE);
return prevVal == BLOCK_SIZE - 1;
}
@@ -1627,7 +1627,7 @@ class ConcurrentQueue
}
else {
// Increment counter
- auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
+ auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_acq_rel);
assert(prevVal + count <= BLOCK_SIZE);
return prevVal + count == BLOCK_SIZE;
}
@@ -2044,7 +2044,7 @@ class ConcurrentQueue
}
else {
// Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
- this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
+ this->dequeueOvercommit.fetch_add(1, std::memory_order_acq_rel); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
}
}
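Note that the comment carried over to the new line still says "Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write"; that particular guarantee already holds with release ordering, since a release RMW orders all prior memory operations before its store. What acq_rel adds is an acquire on the load half of the same RMW. A sketch of the two-counter shape involved (hypothetical standalone names, not the queue's members):

    #include <atomic>

    std::atomic<unsigned> optimisticCount{0}; // stand-in
    std::atomic<unsigned> overcommit{0};      // stand-in

    void abort_dequeue() {
        optimisticCount.fetch_add(1, std::memory_order_relaxed);
        // release alone achieves the stated goal: the optimistic increment
        // above is ordered before this RMW's store, so any acquire reader
        // of overcommit also observes it. acq_rel further orders this RMW
        // after the release write whose value it reads.
        overcommit.fetch_add(1, std::memory_order_acq_rel);
    }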

@@ -2261,7 +2261,7 @@ class ConcurrentQueue
if (details::circular_less_than<size_t>(0, actualCount)) {
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
- this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
+ this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_acq_rel);
}

// Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
@@ -2330,7 +2330,7 @@ class ConcurrentQueue
}
else {
// Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
- this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
+ this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_acq_rel);
}
}

@@ -2611,7 +2611,7 @@ class ConcurrentQueue
return true;
}
else {
- this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
+ this->dequeueOvercommit.fetch_add(1, std::memory_order_acq_rel);
}
}

@@ -2793,7 +2793,7 @@ class ConcurrentQueue
if (details::circular_less_than<size_t>(0, actualCount)) {
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
- this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
+ this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_acq_rel);
}

// Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
@@ -2871,7 +2871,7 @@ class ConcurrentQueue
return actualCount;
}
else {
- this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
+ this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_acq_rel);
}
}

@@ -3452,7 +3452,7 @@ class ConcurrentQueue
auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
while (true) {
// NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
- if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
+ if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acq_rel)) {
// We've acquired the resize lock, try to allocate a bigger hash table.
// Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
// we reload implicitProducerHash it must be the most recent version (it only gets changed within this
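This hunk strengthens the implicit-producer hash's resize flag. The surrounding comment notes that the acquire on the winning test_and_set synchronizes with a release fence at the end of the resize block, which is the conventional flag-as-lock shape; acq_rel additionally release-publishes the acquiring thread's earlier writes at the moment it wins the flag. For reference, a sketch of that shape (hypothetical names):

    #include <atomic>

    std::atomic_flag resizeInProgress = ATOMIC_FLAG_INIT; // stand-in

    bool try_lock_resize() {
        // acquire is the usual ordering for taking a lock: it pairs with
        // the previous holder's release, making the protected state visible.
        // acq_rel also publishes this thread's prior writes, which taking a
        // lock does not normally require.
        return !resizeInProgress.test_and_set(std::memory_order_acq_rel);
    }

    void unlock_resize() {
        resizeInProgress.clear(std::memory_order_release);
    }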
@@ -3702,15 +3702,15 @@ template<typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
: itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
- initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
+ initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_acq_rel);
lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
: itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
- initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
+ initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_acq_rel);
lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}
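Both ConsumerToken constructors only need a unique ticket from nextExplicitConsumerId. Uniqueness follows from the RMW's atomicity and holds under any memory order, so the choice between release and acq_rel here changes only how neighboring operations are ordered around the increment, not which IDs are handed out. A sketch of the ticket pattern (standalone name, not the queue's member):

    #include <atomic>
    #include <cstdint>

    std::atomic<std::uint32_t> nextConsumerId{0}; // stand-in

    std::uint32_t take_ticket() {
        // Every caller receives a distinct value regardless of the ordering
        // argument; the ordering governs visibility of surrounding memory
        // operations, not the atomicity of the increment itself.
        return nextConsumerId.fetch_add(1, std::memory_order_relaxed);
    }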
