-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_bus.go
55 lines (48 loc) · 1.5 KB
/
event_bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package gddd
import (
"context"
"encoding/json"
"fmt"
"strings"
)
// EventHandle is the callback invoked to handle a received domain event.
//
// A non-nil err rejects the event. To have a rejected event requeued,
// return requeue as true.
//
// NOTE(review): err is the first result rather than the last, which differs
// from the usual Go convention; the order is kept so existing handlers still
// compile.
type EventHandle func(ctx context.Context, domainEvent DomainEvent) (err error, requeue bool)
// EventBus is the abstraction used to send domain event messages and to
// register EventHandle callbacks for topics.
//
// NOTE(review): no implementation is visible in this file, so the method
// comments below are derived from the signatures only; confirm the exact
// semantics against a concrete implementation.
type EventBus interface {
// Name returns the name of the bus as a string.
// Presumably this is the value passed to DomainEventMessage.TopicName as
// eventBusName; verify against callers.
Name() string
// Send sends the given event messages and reports a failure through err.
Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)
// Recv associates handle with topic.
// Presumably handle is invoked for each event received on that topic;
// TODO confirm.
Recv(ctx context.Context, topic string, handle EventHandle) (err error)
// Start starts the bus; presumably it must be called before events are
// delivered to handlers. TODO confirm.
Start(ctx context.Context) (err error)
// Shutdown, Await and Close are lifecycle methods. Their precise ordering
// and blocking behavior are not visible here; verify before relying on
// them.
Shutdown()
Await()
Close(ctx context.Context) (err error)
}
// DomainEventMessage is the JSON-serializable envelope of a domain event.
// newDomainEventMessage fills it from a DomainEvent, and Decode fills it from
// JSON bytes.
type DomainEventMessage struct {
// AggregateId is the value reported by DomainEvent.AggregateId().
AggregateId int64 `json:"aggregate_id"`
// AggregateName is the value reported by DomainEvent.AggregateName();
// TopicName derives the topic from it.
AggregateName string `json:"aggregate_name"`
// EventName is the value reported by DomainEvent.EventName().
EventName string `json:"event_name"`
// EventId is the value reported by DomainEvent.EventId().
EventId int64 `json:"event_id"`
// EventBody is the raw body reported by DomainEvent.EventBodyRaw().
// As a []byte, encoding/json represents it as a base64-encoded string.
EventBody []byte `json:"event_body"`
}
// TopicName builds the topic name for this message on the given event bus.
//
// The result is eventBusName, an underscore, and the lower-cased
// AggregateName with a trailing "aggregate" removed. For example, bus "bus"
// and aggregate "UserAggregate" yield "bus_user".
func (msg *DomainEventMessage) TopicName(eventBusName string) string {
	// Normalise the case first so the suffix is stripped regardless of how
	// the aggregate name was capitalised.
	aggregate := strings.ToLower(msg.AggregateName)
	aggregate = strings.TrimSuffix(aggregate, "aggregate")
	return fmt.Sprintf("%s_%s", eventBusName, aggregate)
}
// Decode populates msg from the JSON document in byteData.
//
// It returns whatever error json.Unmarshal reports, or nil on success.
func (msg *DomainEventMessage) Decode(byteData []byte) (err error) {
	return json.Unmarshal(byteData, msg)
}
// newDomainEventMessage converts a DomainEvent into its message envelope.
//
// The raw body is fetched first; if that fails, the zero DomainEventMessage
// is returned together with the error. Otherwise the aggregate id, aggregate
// name, event name and event id are copied from the event alongside the body.
func newDomainEventMessage(event DomainEvent) (msg DomainEventMessage, err error) {
	body, err := event.EventBodyRaw()
	if err != nil {
		return DomainEventMessage{}, err
	}

	msg.AggregateId = event.AggregateId()
	msg.AggregateName = event.AggregateName()
	msg.EventName = event.EventName()
	msg.EventId = event.EventId()
	msg.EventBody = body
	return msg, nil
}