Skip to content

Commit

Permalink
Mutexmap (#95)
Browse files Browse the repository at this point in the history
* Adds Mutex Map

Signed-off-by: Elena Kolevska <[email protected]>

* Adds an atomic map

Signed-off-by: Elena Kolevska <[email protected]>

* More work on atomic map and mutex map

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes, improvements and more tests

Signed-off-by: Elena Kolevska <[email protected]>

* Updates interface

Signed-off-by: Elena Kolevska <[email protected]>

* Linter

Signed-off-by: Elena Kolevska <[email protected]>

* Refactors atomic map to use generics

Signed-off-by: Elena Kolevska <[email protected]>

* cleanups

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Co-authored-by: Cassie Coyle <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* small reorg

Signed-off-by: Elena Kolevska <[email protected]>

* Adds ItemCount()

Signed-off-by: Elena Kolevska <[email protected]>

* Removes atomicmap in favour of haxmap

Signed-off-by: Elena Kolevska <[email protected]>

* formats fix and adds comment

Signed-off-by: Elena Kolevska <[email protected]>

* Update concurrency/mutexmap.go

Co-authored-by: Josh van Leeuwen <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Uses built in `clear`

Signed-off-by: Elena Kolevska <[email protected]>

* Revert "Removes atomicmap in favour of haxmap"

This reverts commit 20ca9ad.

Signed-off-by: Elena Kolevska <[email protected]>

* Uses clear() for atomic map too

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Cassie Coyle <[email protected]>
Co-authored-by: Josh van Leeuwen <[email protected]>
  • Loading branch information
3 people authored May 23, 2024
1 parent ccffb60 commit 106329e
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
**/.DS_Store
.idea
.vscode
.vs
111 changes: 111 additions & 0 deletions concurrency/atomicmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package concurrency

import (
"sync"

"golang.org/x/exp/constraints"
)

type AtomicValue[T constraints.Integer] struct {
lock sync.RWMutex
value T
}

func (a *AtomicValue[T]) Load() T {
a.lock.RLock()
defer a.lock.RUnlock()
return a.value
}

func (a *AtomicValue[T]) Store(v T) {
a.lock.Lock()
defer a.lock.Unlock()
a.value = v
}

func (a *AtomicValue[T]) Add(v T) T {
a.lock.Lock()
defer a.lock.Unlock()
a.value += v
return a.value
}

type AtomicMap[K comparable, T constraints.Integer] interface {
Get(key K) (*AtomicValue[T], bool)
GetOrCreate(key K, createT T) *AtomicValue[T]
Delete(key K)
ForEach(fn func(key K, value *AtomicValue[T]))
Clear()
}

type atomicMap[K comparable, T constraints.Integer] struct {
lock sync.RWMutex
items map[K]*AtomicValue[T]
}

func NewAtomicMap[K comparable, T constraints.Integer]() AtomicMap[K, T] {
return &atomicMap[K, T]{
items: make(map[K]*AtomicValue[T]),
}
}

func (a *atomicMap[K, T]) Get(key K) (*AtomicValue[T], bool) {
a.lock.RLock()
defer a.lock.RUnlock()

item, ok := a.items[key]
if !ok {
return nil, false
}
return item, true
}

func (a *atomicMap[K, T]) GetOrCreate(key K, createT T) *AtomicValue[T] {
a.lock.RLock()
item, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
// Double-check the key exists to avoid race condition
item, ok = a.items[key]
if !ok {
item = &AtomicValue[T]{value: createT}
a.items[key] = item
}
a.lock.Unlock()
}
return item
}

func (a *atomicMap[K, T]) Delete(key K) {
a.lock.Lock()
delete(a.items, key)
a.lock.Unlock()
}

func (a *atomicMap[K, T]) ForEach(fn func(key K, value *AtomicValue[T])) {
a.lock.RLock()
defer a.lock.RUnlock()
for k, v := range a.items {
fn(k, v)
}
}

func (a *atomicMap[K, T]) Clear() {
a.lock.Lock()
defer a.lock.Unlock()
clear(a.items)
}
79 changes: 79 additions & 0 deletions concurrency/atomicmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package concurrency

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAtomicMapInt32_New_Get_Delete(t *testing.T) {
m := NewAtomicMap[string, int32]().(*atomicMap[string, int32])

require.NotNil(t, m)
require.NotNil(t, m.items)
require.Empty(t, m.items)

t.Run("basic operations", func(t *testing.T) {
key := "key1"
value := int32(10)

// Initially, the key should not exist
_, ok := m.Get(key)
require.False(t, ok)

// Add a value and check it
m.GetOrCreate(key, 0).Store(value)
result, ok := m.Get(key)
require.True(t, ok)
assert.Equal(t, value, result.Load())

// Delete the key and check it no longer exists
m.Delete(key)
_, ok = m.Get(key)
require.False(t, ok)
})

t.Run("concurrent access multiple keys", func(t *testing.T) {
var wg sync.WaitGroup
keys := []string{"key1", "key2", "key3"}
iterations := 100

wg.Add(len(keys) * 2)
for _, key := range keys {
go func(k string) {
defer wg.Done()
for i := 0; i < iterations; i++ {
m.GetOrCreate(k, 0).Add(1)
}
}(key)
go func(k string) {
defer wg.Done()
for i := 0; i < iterations; i++ {
m.GetOrCreate(k, 0).Add(-1)
}
}(key)
}
wg.Wait()

for _, key := range keys {
val, ok := m.Get(key)
require.True(t, ok)
require.Equal(t, int32(0), val.Load())
}
})
}
119 changes: 119 additions & 0 deletions concurrency/mutexmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package concurrency

import (
"sync"
)

// MutexMap is an interface that defines a thread-safe map with keys of type T associated to
// read-write mutexes (sync.RWMutex), allowing for granular locking on a per-key basis.
// This can be useful for scenarios where fine-grained concurrency control is needed.
//
// Methods:
// - Lock(key T): Acquires an exclusive lock on the mutex associated with the given key.
// - Unlock(key T): Releases the exclusive lock on the mutex associated with the given key.
// - RLock(key T): Acquires a read lock on the mutex associated with the given key.
// - RUnlock(key T): Releases the read lock on the mutex associated with the given key.
// - Delete(key T): Removes the mutex associated with the given key from the map.
// - Clear(): Removes all mutexes from the map.
// - ItemCount() int: Returns the number of items (mutexes) in the map.
type MutexMap[T comparable] interface {
Lock(key T)
Unlock(key T)
RLock(key T)
RUnlock(key T)
Delete(key T)
Clear()
ItemCount() int
}

type mutexMap[T comparable] struct {
lock sync.RWMutex
items map[T]*sync.RWMutex
}

func NewMutexMap[T comparable]() MutexMap[T] {
return &mutexMap[T]{
items: make(map[T]*sync.RWMutex),
}
}

func (a *mutexMap[T]) Lock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
mutex, ok = a.items[key]
if !ok {
mutex = &sync.RWMutex{}
a.items[key] = mutex
}
a.lock.Unlock()
}
mutex.Lock()
}

func (a *mutexMap[T]) Unlock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if ok {
mutex.Unlock()
}
}

func (a *mutexMap[T]) RLock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
mutex, ok = a.items[key]
if !ok {
mutex = &sync.RWMutex{}
a.items[key] = mutex
}
a.lock.Unlock()
}
mutex.Lock()
}

func (a *mutexMap[T]) RUnlock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if ok {
mutex.Unlock()
}
}

func (a *mutexMap[T]) Delete(key T) {
a.lock.Lock()
delete(a.items, key)
a.lock.Unlock()
}

func (a *mutexMap[T]) Clear() {
a.lock.Lock()
clear(a.items)
a.lock.Unlock()
}

func (a *mutexMap[T]) ItemCount() int {
a.lock.Lock()
defer a.lock.Unlock()
return len(a.items)
}
Loading

0 comments on commit 106329e

Please sign in to comment.