Skip to content

Commit 65ba378

Browse files
authored
Adds events/broadcaster (#110)
* Adds events/broadcaster Adds a generic buffered dmessage broadcaster which will relay a typed message to a dynamic set of subscribers. Signed-off-by: joshvanl <[email protected]> * Lint Signed-off-by: joshvanl <[email protected]> * Review comments Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 30e2c24 commit 65ba378

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

events/broadcaster/broadcaster.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package broadcaster
15+
16+
import (
17+
"context"
18+
"sync"
19+
"sync/atomic"
20+
)
21+
22+
const bufferSize = 10
23+
24+
type eventCh[T any] struct {
25+
id uint64
26+
ch chan<- T
27+
closeEventCh chan struct{}
28+
}
29+
30+
type Broadcaster[T any] struct {
31+
eventChs []*eventCh[T]
32+
currentID uint64
33+
34+
lock sync.Mutex
35+
wg sync.WaitGroup
36+
closeCh chan struct{}
37+
closed atomic.Bool
38+
}
39+
40+
// New creates a new Broadcaster with the given interval and key type.
41+
func New[T any]() *Broadcaster[T] {
42+
return &Broadcaster[T]{
43+
closeCh: make(chan struct{}),
44+
}
45+
}
46+
47+
// Subscribe adds a new event channel subscriber. If the batcher is closed, the
48+
// subscriber is silently dropped.
49+
func (b *Broadcaster[T]) Subscribe(ctx context.Context, ch ...chan<- T) {
50+
b.lock.Lock()
51+
defer b.lock.Unlock()
52+
for _, c := range ch {
53+
b.subscribe(ctx, c)
54+
}
55+
}
56+
57+
func (b *Broadcaster[T]) subscribe(ctx context.Context, ch chan<- T) {
58+
if b.closed.Load() {
59+
return
60+
}
61+
62+
id := b.currentID
63+
b.currentID++
64+
bufferedCh := make(chan T, bufferSize)
65+
closeEventCh := make(chan struct{})
66+
b.eventChs = append(b.eventChs, &eventCh[T]{
67+
id: id,
68+
ch: bufferedCh,
69+
closeEventCh: closeEventCh,
70+
})
71+
72+
b.wg.Add(1)
73+
go func() {
74+
defer func() {
75+
close(closeEventCh)
76+
77+
b.lock.Lock()
78+
for i, eventCh := range b.eventChs {
79+
if eventCh.id == id {
80+
b.eventChs = append(b.eventChs[:i], b.eventChs[i+1:]...)
81+
break
82+
}
83+
}
84+
b.lock.Unlock()
85+
b.wg.Done()
86+
}()
87+
88+
for {
89+
select {
90+
case <-ctx.Done():
91+
return
92+
case <-b.closeCh:
93+
return
94+
case ch <- <-bufferedCh:
95+
}
96+
}
97+
}()
98+
}
99+
100+
// Broadcast sends the given value to all subscribers.
101+
func (b *Broadcaster[T]) Broadcast(value T) {
102+
b.lock.Lock()
103+
defer b.lock.Unlock()
104+
if b.closed.Load() {
105+
return
106+
}
107+
for _, ev := range b.eventChs {
108+
select {
109+
case <-ev.closeEventCh:
110+
case ev.ch <- value:
111+
case <-b.closeCh:
112+
}
113+
}
114+
}
115+
116+
// Close closes the Broadcaster. It blocks until all events have been sent to
117+
// the subscribers. The Broadcaster will be a no-op after this call.
118+
func (b *Broadcaster[T]) Close() {
119+
defer b.wg.Wait()
120+
b.lock.Lock()
121+
if b.closed.CompareAndSwap(false, true) {
122+
close(b.closeCh)
123+
}
124+
b.lock.Unlock()
125+
}

0 commit comments

Comments
 (0)