-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_bus_local.go
138 lines (118 loc) · 2.79 KB
/
event_bus_local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package gddd
import (
"context"
"sync"
)
// localEventbusGroups is the registry of local event bus groups. For each
// group name it stores the channel events are delivered on and the WaitGroup
// that is incremented for every event sent to that group.
type localEventbusGroups struct {
// mutex is locked by addGroup, removeGroup and send around every access to
// the two maps below.
mutex *sync.Mutex
// groups maps a group name to the channel its events are written to.
groups map[string]chan *localEvent
// eventCountMap maps a group name to the WaitGroup that send increments
// once per event delivered to that group.
eventCountMap map[string]*sync.WaitGroup
}
// addGroup stores the delivery channel ch and the event counter count under
// the group called name. An entry already stored under the same name is
// overwritten. Both maps are updated while the registry mutex is held.
func (g *localEventbusGroups) addGroup(name string, ch chan *localEvent, count *sync.WaitGroup) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	g.groups[name], g.eventCountMap[name] = ch, count
}
// removeGroup unregisters the group called name by deleting its channel and
// its event counter while holding the registry mutex. Removing a name that is
// not registered is a no-op.
//
// The registry itself stays alive when its last group is removed. Resetting
// the package-level pointer to nil here was unsafe: the write was not
// synchronized with the reads in NewLocalEventBus, and any bus that later
// called Send or Shutdown dereferenced a nil registry and panicked. An empty
// registry is cheap and NewLocalEventBus simply reuses it.
func (g *localEventbusGroups) removeGroup(name string) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	delete(g.groups, name)
	delete(g.eventCountMap, name)
}
// send wraps event in a localEvent tagged with topic and delivers the same
// *localEvent to the channel of every registered group. The registry mutex is
// held for the whole fan-out, so a full group channel blocks send (and every
// other registry operation) until that channel has room again.
func (g *localEventbusGroups) send(topic string, event interface{}) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	e := &localEvent{
		topic: topic,
		value: event,
	}
	for name, events := range g.groups {
		// Count the event before it becomes visible to the consumer. If the
		// send happened first, the consumer could call Done before this Add
		// ran, driving the WaitGroup counter negative, which panics.
		g.eventCountMap[name].Add(1)
		events <- e
	}
}
// _localEventbusGroups is the package-level group registry used by the local
// event buses in this file. NewLocalEventBus allocates it on demand when it
// finds the variable nil.
var _localEventbusGroups *localEventbusGroups
// NewLocalEventBus builds a LocalEventBus for the group called name and
// registers it with the package-level registry, creating the registry first
// when it does not exist yet.
//
// The bus gets its own event channel with a buffer of 1024 events, its own
// WaitGroup for counting events, and an empty topic-to-handler table. The
// check-and-create of the registry is not guarded by any lock.
func NewLocalEventBus(name string) (bus EventBus) {
	registry := _localEventbusGroups
	if registry == nil {
		registry = &localEventbusGroups{
			mutex:         new(sync.Mutex),
			groups:        map[string]chan *localEvent{},
			eventCountMap: map[string]*sync.WaitGroup{},
		}
		_localEventbusGroups = registry
	}

	local := &LocalEventBus{
		eventCount: new(sync.WaitGroup),
		name:       name,
		eventsCh:   make(chan *localEvent, 1024),
		consumers:  map[string]EventHandle{},
	}
	registry.addGroup(local.name, local.eventsCh, local.eventCount)
	return local
}
// localEvent is the envelope written to a group's channel: the payload
// together with the topic it was sent under.
type localEvent struct {
// topic is the key Start uses to look up the handler in a bus's consumers.
topic string
// value is the payload passed to send; Start type-asserts it to DomainEvent
// before invoking the handler.
value interface{}
}
// LocalEventBus is the in-process event bus value built by NewLocalEventBus.
type LocalEventBus struct {
// eventCount is the WaitGroup registered in the group registry for this bus;
// the goroutine launched by Start calls Done on it.
eventCount *sync.WaitGroup
// name is the group name given to NewLocalEventBus; it is returned by Name
// and passed to TopicName when sending.
name string
// eventsCh is the channel Start receives events from and Shutdown closes.
eventsCh chan *localEvent
// consumers maps a topic to the handler registered for it through Recv.
consumers map[string]EventHandle
}
// Name returns the bus's name field, which NewLocalEventBus sets to the group
// name it was given.
func (bus *LocalEventBus) Name() string {
	name := bus.name
	return name
}
// Send publishes every message in eventMessages, in order, through the
// package-level group registry. Each message is sent under the topic returned
// by its TopicName method for this bus's name. ctx is not used.
//
// Send always returns nil. Calling it without messages is a no-op.
func (bus *LocalEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) {
	// The registry pointer can be nil, for example on a bus that was not
	// created through NewLocalEventBus or after the registry has been torn
	// down. With no registry there is no group to deliver to, so treat it like
	// an empty registry instead of dereferencing nil and panicking.
	registry := _localEventbusGroups
	if registry == nil {
		return nil
	}
	for _, eventMessage := range eventMessages {
		registry.send(eventMessage.TopicName(bus.name), eventMessage)
	}
	return nil
}
// Recv registers handle as the consumer for topic, replacing any handler that
// was registered for the same topic before. A call with an empty topic or a
// nil handle is silently ignored. ctx is not used and the returned error is
// always nil.
//
// The consumers map is written without any locking.
func (bus *LocalEventBus) Recv(ctx context.Context, topic string, handle EventHandle) (err error) {
	if topic != "" && handle != nil {
		bus.consumers[topic] = handle
	}
	return nil
}
// Start launches the consumer goroutine of the bus and returns nil
// immediately. The goroutine receives events from eventsCh until that channel
// is closed and, for each event whose topic has a registered handler, invokes
// the handler with ctx and the event payload.
//
// ctx is only forwarded to the handlers; cancelling it does not stop the
// goroutine.
func (bus *LocalEventBus) Start(ctx context.Context) (err error) {
	go func(ctx context.Context, bus *LocalEventBus) {
		for event := range bus.eventsCh {
			if handle, has := bus.consumers[event.topic]; has {
				// A payload that is not a DomainEvent is skipped: an unchecked
				// assertion would panic in this goroutine and take the whole
				// process down.
				if domainEvent, ok := event.value.(DomainEvent); ok {
					// The handler's result and error are dropped because this
					// goroutine has no caller to report them to.
					_, _ = handle(ctx, domainEvent)
				}
			}
			// Every event received here was counted when it was sent, so it
			// must be marked done even when no handler consumed it; otherwise
			// the counter can never return to zero.
			bus.eventCount.Done()
		}
	}(ctx, bus)
	return nil
}
// Shutdown unregisters the bus from the package-level group registry and then
// closes its event channel, which lets the goroutine launched by Start drain
// the remaining events and exit.
//
// Shutdown must be called at most once per bus: a second call closes an
// already closed channel and panics.
func (bus *LocalEventBus) Shutdown() {
	// Unregister before closing. removeGroup and send take the same registry
	// mutex, so once removeGroup has returned no sender can still be writing
	// to this channel. Closing first would let a concurrent send hit a closed
	// channel and panic.
	if registry := _localEventbusGroups; registry != nil {
		registry.removeGroup(bus.name)
	}
	close(bus.eventsCh)
}
// Await is currently a no-op: its body is empty, so it returns immediately
// and does not wait on eventCount.
func (bus *LocalEventBus) Await() {
}
// Close stops the bus by calling Shutdown followed by Await. ctx is not used
// and the returned error is always nil.
func (bus *LocalEventBus) Close(ctx context.Context) (err error) {
	bus.Shutdown() // unregister the bus and close its channel
	bus.Await()    // then give Await its turn
	return nil
}