Skip to content

Commit

Permalink
eventBus -> eventProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
xiandong-italki committed Oct 17, 2022
1 parent 0c4440d commit 21bc452
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 111 deletions.
17 changes: 17 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type DomainEvent interface {
initEventId()
}

type RegularEvent interface {
}

type SampleDomainEvent struct {
aggregateId int64
aggregateName string
Expand Down Expand Up @@ -75,3 +78,17 @@ func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error) {
}
return
}

func newSampleDomainEvent(eventMessage DomainEventMessage, change aggregateChange) (domainEvent SampleDomainEvent, err error) {
domainEvent = SampleDomainEvent{
aggregateId: eventMessage.AggregateId,
aggregateName: eventMessage.AggregateName,
eventId: eventMessage.EventId,
eventName: eventMessage.EventName,
}
err = json.Unmarshal(eventMessage.EventBody, &change)
if err != nil {
return
}
return
}
2 changes: 1 addition & 1 deletion event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

// EventHandle handle event, if return err is not nil, it will reject event,
// if want to requeue, then make the requeue be true
type EventHandle func(ctx context.Context, eventDecoder EventDecoder) (err error, requeue bool)
type EventHandle func(ctx context.Context, domainEvent DomainEvent) (err error, requeue bool)

type EventBus interface {
Name() string
Expand Down
8 changes: 2 additions & 6 deletions event_bus_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gddd

import (
"context"
"encoding/json"
"sync"
)

Expand Down Expand Up @@ -115,11 +114,8 @@ func (bus *LocalEventBus) Start(ctx context.Context) (err error) {
if !has {
continue
}
p, jsonErr := json.Marshal(event.value)
if jsonErr != nil {
continue
}
_, _ = handle(ctx, NewJsonEventDecoder(p))
domainEvent := event.value.(DomainEvent)
_, _ = handle(ctx, domainEvent)
bus.eventCount.Done()
}
}(ctx, bus)
Expand Down
180 changes: 105 additions & 75 deletions event_bus_rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,32 @@ import (
"strings"
)

type RocketMqEventBusConfig struct {
type RocketMqEventProducerConfig struct {
DomainName string
SubDomainName string
NameServers []string
EventStore *EventStore
}

func NewRocketMqEventBus(ctx context.Context, config RocketMqEventBusConfig) (bus EventBus, err error) {
type RocketmqEventProducer struct {
Name string
Brokers []string
Producer rocketmq.TransactionProducer
}

func NewRocketMqEventProducer(ctx context.Context, config RocketMqEventProducerConfig) (eventProducer RocketmqEventProducer, err error) {
domainName := strings.TrimSpace(config.DomainName)
if domainName == "" {
err = fmt.Errorf("new rocketmq event bus failed, DomainName is empty")
err = fmt.Errorf("new rocketmq event producer failed, DomainName is empty")
return
}
subDomainName := strings.TrimSpace(config.SubDomainName)
if subDomainName == "" {
err = fmt.Errorf("new rocketmq event bus failed, SubDomainName is empty")
err = fmt.Errorf("new rocketmq event producer failed, SubDomainName is empty")
return
}
if config.NameServers == nil || len(config.NameServers) == 0 {
err = fmt.Errorf("new rocketmq event bus failed, NameServers is nil or empty")
err = fmt.Errorf("new rocketmq event producer failed, NameServers is nil or empty")
return
}
p, err := rocketmq.NewTransactionProducer(
Expand All @@ -42,54 +48,39 @@ func NewRocketMqEventBus(ctx context.Context, config RocketMqEventBusConfig) (bu
fmt.Printf("new producer error: %s\n", err.Error())
panic(err)
}
err = p.Start()
eventProducer = RocketmqEventProducer{
Name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName),
Brokers: config.NameServers,
Producer: p,
}
eventProducer.Producer = p
return
}

func (p *RocketmqEventProducer) Start() {
err := p.Producer.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
panic(err)
}
}

// consumer
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)),
)
func (p *RocketmqEventProducer) Stop() {
err := p.Producer.Shutdown()
if err != nil {
fmt.Printf("new consumer error: %s\n", err.Error())
fmt.Printf("stop producer error: %s\n", err.Error())
panic(err)
}

consumers := make(map[string]EventHandle)
bus0 := &rocketmqEventBus{
name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName),
//running: NewAtomicSwitch(),
brokers: config.NameServers,
producer: p,
consumer: c,
consumers: consumers,
}
bus = bus0
return
}

type rocketmqEventBus struct {
name string
brokers []string
producer rocketmq.TransactionProducer
consumer rocketmq.PushConsumer
consumers map[string]EventHandle
}

func (bus *rocketmqEventBus) Name() string {
return bus.name
}

func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) {
if bus.producer == nil {
err = fmt.Errorf("rocketmq event bus send event failed, producer is nil")
func (p *RocketmqEventProducer) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) {
if p.Producer == nil {
err = fmt.Errorf("rocketmq event producer send event failed, Producer is nil")
return
}
if eventMessages == nil || len(eventMessages) == 0 {
err = fmt.Errorf("rocketmq event bus send event failed, eventMessages is nil or empty")
err = fmt.Errorf("rocketmq event producer send event failed, eventMessages is nil or empty")
return
}
msgs := make([]*primitive.Message, 0, 1)
Expand All @@ -99,10 +90,10 @@ func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEv
if err != nil {
return
}
msg := primitive.NewMessage(eventMessage.TopicName(bus.name), messageBody)
msg := primitive.NewMessage(eventMessage.TopicName(p.Name), messageBody)
msgs = append(msgs, msg)
}
res, err := bus.producer.SendMessageInTransaction(ctx, msgs...)
res, err := p.Producer.SendMessageInTransaction(ctx, msgs...)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
Expand All @@ -111,47 +102,86 @@ func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEv
return
}

func (bus *rocketmqEventBus) Recv(_ context.Context, topic string, handle EventHandle) (err error) {
topic = strings.TrimSpace(topic)
if topic == "" {
err = fmt.Errorf("rocketmq event bus recv event failed, topic is empty")
return
}
if handle == nil {
err = fmt.Errorf("rocketmq event bus recv event failed, handle is nil")
return
}
bus.consumers[topic] = handle
return
type RocketMqEventConsumerConfig struct {
DomainName string
GroupName string
NameServers []string
}

func (bus *rocketmqEventBus) Start(ctx context.Context) (err error) {
go func() {
for {
err = bus.consumer.Subscribe("DefaultCluster", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
//这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Printf("consume error: %s\n", err.Error())
}
}
}()
return nil
type RocketMqEventConsumer struct {
DomainName string
GroupName string
Consumer rocketmq.PushConsumer
}

func (bus *rocketmqEventBus) Shutdown() {
func NewRocketMqEventConsumer(ctx context.Context, config RocketMqEventConsumerConfig) (eventConsumer *RocketMqEventConsumer, err error) {
eventConsumer.DomainName = config.DomainName
eventConsumer.GroupName = config.GroupName
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(eventConsumer.GroupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)),
)
eventConsumer.Consumer = c
return
}

func (bus *rocketmqEventBus) Await() {
return
func (c *RocketMqEventConsumer) Start() {
err := c.Consumer.Start()
if err != nil {
fmt.Printf("RocketMqEventConsumer start error: %v\n", err)
panic(err)
}
}

func (bus *rocketmqEventBus) Close(ctx context.Context) (err error) {
return
func (c *RocketMqEventConsumer) Stop() {
err := c.Consumer.Shutdown()
if err != nil {
fmt.Printf("RocketMqEventConsumer stop error: %v\n", err)
panic(err)
}
}

func (c *RocketMqEventConsumer) Subscribe(topicName string, change aggregateChange, eventHandle EventHandle) {
topicName = strings.TrimSpace(topicName)
if topicName == "" {
err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, topicName is empty")
panic(err)
}
if change == nil {
err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, change is nil")
panic(err)
}
if eventHandle == nil {
err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, eventHandle is nil")
panic(err)
}
err := c.Consumer.Subscribe(topicName, consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
if topicName == msg.Message.Topic {
domainEventMessage := new(DomainEventMessage)
err := json.Unmarshal(msg.Message.Body, domainEventMessage)
if err != nil {
return consumer.ConsumeRetryLater, err
}
if getAggregateChangeName(change) == domainEventMessage.EventName {
var event SampleDomainEvent
// TODO: change is read only?
newChange := change
event, err = newSampleDomainEvent(*domainEventMessage, newChange)
if err != nil {
return 0, err
}
err, _ = eventHandle(context.TODO(), &event)
if err != nil {
return consumer.ConsumeRetryLater, err
}
}
}
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
panic(nil)
}
}
36 changes: 14 additions & 22 deletions event_bus_rocketmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,30 @@ import (
)

func TestNewRocketMqEventBus(t *testing.T) {
config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventBus(context.TODO(), config)
config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventProducer(context.TODO(), config)
if err != nil {
t.Errorf("NewRocketMqEventBus() error = %v", err)
t.Errorf("NewRocketMqEventProducer() error = %v", err)
return
}
fmt.Println(gotBus)
}

func Test_rocketmqEventBus_Start(t *testing.T) {
config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventBus(context.TODO(), config)
config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventProducer(context.TODO(), config)
if err != nil {
t.Errorf("NewRocketMqEventBus() error = %v", err)
return
}
err = gotBus.Start(context.TODO())
if err != nil {
t.Errorf("start error = %v", err)
t.Errorf("NewRocketMqEventProducer() error = %v", err)
return
}
gotBus.Start()
}

func Test_rocketmqEventBus_Send(t *testing.T) {
config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventBus(context.TODO(), config)
config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventProducer(context.TODO(), config)
if err != nil {
t.Errorf("NewRocketMqEventBus() error = %v", err)
t.Errorf("NewRocketMqEventProducer() error = %v", err)
return
}
event := SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 1, eventName: "BookCreated", eventBody: BookCreated{Book: "aa", Price: 2, CreateTime: time.Now().UTC()}}
Expand All @@ -48,16 +44,12 @@ func Test_rocketmqEventBus_Send(t *testing.T) {
}

func Test_rocketmqEventBus_Recv(t *testing.T) {
config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventBus(context.TODO(), config)
if err != nil {
t.Errorf("NewRocketMqEventBus() error = %v", err)
return
}
err = gotBus.Start(context.TODO())
config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil}
gotBus, err := NewRocketMqEventProducer(context.TODO(), config)
if err != nil {
t.Errorf("start error = %v", err)
t.Errorf("NewRocketMqEventProducer() error = %v", err)
return
}
gotBus.Start()
select {}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/apache/rocketmq-client-go/v2 v2.1.0
github.com/bwmarrin/snowflake v0.3.0
github.com/google/uuid v1.3.0
github.com/jinzhu/copier v0.3.5
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/testify v1.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
Expand Down
Loading

0 comments on commit 21bc452

Please sign in to comment.