-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtrigger.go
71 lines (57 loc) · 1.79 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright (c) 2022, Janoš Guljaš <[email protected]>
// All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package feed
import (
"sync"
)
// Trigger is a topic-keyed notifier for consumers that only need to know that
// something happened on a topic, not how many times or what exactly. Each
// subscription is represented by a channel of empty structs stored under its
// topic.
type Trigger[T comparable] struct {
	// mu guards channels.
	mu sync.RWMutex
	// channels maps every topic to the notification channels of its
	// subscriptions.
	channels map[T][]chan struct{}
}
// NewTrigger returns a ready-to-use Trigger with no subscriptions, whose
// topics are values of type T.
func NewTrigger[T comparable]() *Trigger[T] {
	t := new(Trigger[T])
	// The map must be allocated up front: Subscribe writes into it.
	t.channels = map[T][]chan struct{}{}
	return t
}
// Subscribe registers a new subscription on topic. The returned channel
// yields an empty struct once at least one Trigger call has been made on the
// same topic. The returned cancel function removes the subscription again by
// calling unsubscribe with the same topic and channel.
func (t *Trigger[T]) Subscribe(topic T) (c <-chan struct{}, cancel func()) {
	// A buffer of one lets a notification be stored while the subscriber is
	// not currently receiving.
	ch := make(chan struct{}, 1)
	cancel = func() { t.unsubscribe(topic, ch) }

	t.mu.Lock()
	defer t.mu.Unlock()

	subs := t.channels[topic]
	t.channels[topic] = append(subs, ch)
	return ch, cancel
}
// unsubscribe removes the subscription identified by channel c from topic and
// closes that channel. It does nothing if c is not subscribed on topic, so
// invoking the same cancel function more than once is harmless. When the last
// subscription of a topic is removed, the topic itself is dropped from the
// map so that short-lived topics do not accumulate as empty entries.
func (t *Trigger[T]) unsubscribe(topic T, c <-chan struct{}) {
	t.mu.Lock()
	defer t.mu.Unlock()

	subs := t.channels[topic]
	for i, ch := range subs {
		if ch != c {
			continue
		}

		// Shift the tail left over the removed element and clear the vacated
		// slot so the backing array does not keep a stale channel reference.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil

		if last == 0 {
			delete(t.channels, topic)
		} else {
			t.channels[topic] = subs[:last]
		}

		close(ch)
		// Every subscription has its own channel, so there is at most one
		// match; stop instead of iterating over the modified slice.
		return
	}
}
// Trigger notifies all subscriptions on the provided topic. The call never
// blocks: a notification is handed to each subscription's channel only if the
// channel can accept it immediately, otherwise that send is skipped. The
// returned integer is the number of subscriptions registered on the topic.
func (t *Trigger[T]) Trigger(topic T) (n int) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	subs := t.channels[topic]
	for i := range subs {
		select {
		case subs[i] <- struct{}{}:
		default:
			// The channel cannot take another value right now; skip it
			// rather than block.
		}
	}
	return len(subs)
}