From 21bc452b03f1ed0c8fbd7722b1dab3acd8c2096f Mon Sep 17 00:00:00 2001 From: xiandong Date: Mon, 17 Oct 2022 22:58:02 +0800 Subject: [PATCH] eventBus -> eventProducer --- event.go | 17 ++++ event_bus.go | 2 +- event_bus_local.go | 8 +- event_bus_rocketmq.go | 180 +++++++++++++++++++++---------------- event_bus_rocketmq_test.go | 36 +++----- go.mod | 1 + go.sum | 2 + repository.go | 11 +-- 8 files changed, 146 insertions(+), 111 deletions(-) diff --git a/event.go b/event.go index 59c0ae5..7598a5e 100644 --- a/event.go +++ b/event.go @@ -22,6 +22,9 @@ type DomainEvent interface { initEventId() } +type RegularEvent interface { +} + type SampleDomainEvent struct { aggregateId int64 aggregateName string @@ -75,3 +78,17 @@ func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error) { } return } + +func newSampleDomainEvent(eventMessage DomainEventMessage, change aggregateChange) (domainEvent SampleDomainEvent, err error) { + domainEvent = SampleDomainEvent{ + aggregateId: eventMessage.AggregateId, + aggregateName: eventMessage.AggregateName, + eventId: eventMessage.EventId, + eventName: eventMessage.EventName, + } + err = json.Unmarshal(eventMessage.EventBody, &change) + if err != nil { + return + } + return +} diff --git a/event_bus.go b/event_bus.go index d9600b0..0d44773 100644 --- a/event_bus.go +++ b/event_bus.go @@ -9,7 +9,7 @@ import ( // EventHandle handle event, if return err is not nil, it will reject event, // if want to requeue, then make the requeue be true -type EventHandle func(ctx context.Context, eventDecoder EventDecoder) (err error, requeue bool) +type EventHandle func(ctx context.Context, domainEvent DomainEvent) (err error, requeue bool) type EventBus interface { Name() string diff --git a/event_bus_local.go b/event_bus_local.go index c6b7300..2c7ee68 100644 --- a/event_bus_local.go +++ b/event_bus_local.go @@ -2,7 +2,6 @@ package gddd import ( "context" - "encoding/json" "sync" ) @@ -115,11 +114,8 @@ func (bus *LocalEventBus) Start(ctx context.Context) (err error) { if !has { continue } - p, jsonErr := json.Marshal(event.value) - if jsonErr != nil { - continue - } - _, _ = handle(ctx, NewJsonEventDecoder(p)) + domainEvent := event.value.(DomainEvent) + _, _ = handle(ctx, domainEvent) bus.eventCount.Done() } }(ctx, bus) diff --git a/event_bus_rocketmq.go b/event_bus_rocketmq.go index 7ef9fa2..43bd811 100644 --- a/event_bus_rocketmq.go +++ b/event_bus_rocketmq.go @@ -11,26 +11,32 @@ import ( "strings" ) -type RocketMqEventBusConfig struct { +type RocketMqEventProducerConfig struct { DomainName string SubDomainName string NameServers []string EventStore *EventStore } -func NewRocketMqEventBus(ctx context.Context, config RocketMqEventBusConfig) (bus EventBus, err error) { +type RocketmqEventProducer struct { + Name string + Brokers []string + Producer rocketmq.TransactionProducer +} + +func NewRocketMqEventProducer(ctx context.Context, config RocketMqEventProducerConfig) (eventProducer RocketmqEventProducer, err error) { domainName := strings.TrimSpace(config.DomainName) if domainName == "" { - err = fmt.Errorf("new rocketmq event bus failed, DomainName is empty") + err = fmt.Errorf("new rocketmq event producer failed, DomainName is empty") return } subDomainName := strings.TrimSpace(config.SubDomainName) if subDomainName == "" { - err = fmt.Errorf("new rocketmq event bus failed, SubDomainName is empty") + err = fmt.Errorf("new rocketmq event producer failed, SubDomainName is empty") return } if config.NameServers == nil || len(config.NameServers) == 0 { - err = fmt.Errorf("new rocketmq event bus failed, NameServers is nil or empty") + err = fmt.Errorf("new rocketmq event producer failed, NameServers is nil or empty") return } p, err := rocketmq.NewTransactionProducer( @@ -42,54 +48,39 @@ func NewRocketMqEventBus(ctx context.Context, config RocketMqEventBusConfig) (bu fmt.Printf("new producer error: %s\n", err.Error()) panic(err) } - err = p.Start() + eventProducer = RocketmqEventProducer{ + Name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName), + Brokers: config.NameServers, + Producer: p, + } + eventProducer.Producer = p + return +} + +func (p *RocketmqEventProducer) Start() { + err := p.Producer.Start() if err != nil { fmt.Printf("start producer error: %s\n", err.Error()) panic(err) } +} - // consumer - c, _ := rocketmq.NewPushConsumer( - consumer.WithGroupName("testGroup"), - consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)), - ) +func (p *RocketmqEventProducer) Stop() { + err := p.Producer.Shutdown() if err != nil { - fmt.Printf("new consumer error: %s\n", err.Error()) + fmt.Printf("stop producer error: %s\n", err.Error()) panic(err) } - - consumers := make(map[string]EventHandle) - bus0 := &rocketmqEventBus{ - name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName), - //running: NewAtomicSwitch(), - brokers: config.NameServers, - producer: p, - consumer: c, - consumers: consumers, - } - bus = bus0 return } -type rocketmqEventBus struct { - name string - brokers []string - producer rocketmq.TransactionProducer - consumer rocketmq.PushConsumer - consumers map[string]EventHandle -} - -func (bus *rocketmqEventBus) Name() string { - return bus.name -} - -func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) { - if bus.producer == nil { - err = fmt.Errorf("rocketmq event bus send event failed, producer is nil") +func (p *RocketmqEventProducer) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) { + if p.Producer == nil { + err = fmt.Errorf("rocketmq event producer send event failed, Producer is nil") return } if eventMessages == nil || len(eventMessages) == 0 { - err = fmt.Errorf("rocketmq event bus send event failed, eventMessages is nil or empty") + err = fmt.Errorf("rocketmq event producer send event failed, eventMessages is nil or empty") return } msgs := make([]*primitive.Message, 0, 1) @@ -99,10 +90,10 @@ func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEv if err != nil { return } - msg := primitive.NewMessage(eventMessage.TopicName(bus.name), messageBody) + msg := primitive.NewMessage(eventMessage.TopicName(p.Name), messageBody) msgs = append(msgs, msg) } - res, err := bus.producer.SendMessageInTransaction(ctx, msgs...) + res, err := p.Producer.SendMessageInTransaction(ctx, msgs...) if err != nil { fmt.Printf("send message error: %s\n", err) } else { @@ -111,47 +102,86 @@ func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEv return } -func (bus *rocketmqEventBus) Recv(_ context.Context, topic string, handle EventHandle) (err error) { - topic = strings.TrimSpace(topic) - if topic == "" { - err = fmt.Errorf("rocketmq event bus recv event failed, topic is empty") - return - } - if handle == nil { - err = fmt.Errorf("rocketmq event bus recv event failed, handle is nil") - return - } - bus.consumers[topic] = handle - return +type RocketMqEventConsumerConfig struct { + DomainName string + GroupName string + NameServers []string } -func (bus *rocketmqEventBus) Start(ctx context.Context) (err error) { - go func() { - for { - err = bus.consumer.Subscribe("DefaultCluster", consumer.MessageSelector{}, func(ctx context.Context, - msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { - for i := range msgs { - fmt.Printf("subscribe callback: %v \n", msgs[i]) - } - //这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater - return consumer.ConsumeSuccess, nil - }) - if err != nil { - fmt.Printf("consume error: %s\n", err.Error()) - } - } - }() - return nil +type RocketMqEventConsumer struct { + DomainName string + GroupName string + Consumer rocketmq.PushConsumer } -func (bus *rocketmqEventBus) Shutdown() { +func NewRocketMqEventConsumer(ctx context.Context, config RocketMqEventConsumerConfig) (eventConsumer *RocketMqEventConsumer, err error) { + eventConsumer.DomainName = config.DomainName + eventConsumer.GroupName = config.GroupName + c, err := rocketmq.NewPushConsumer( + consumer.WithGroupName(eventConsumer.GroupName), + consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)), + ) + eventConsumer.Consumer = c return } -func (bus *rocketmqEventBus) Await() { - return +func (c *RocketMqEventConsumer) Start() { + err := c.Consumer.Start() + if err != nil { + fmt.Printf("RocketMqEventConsumer start error: %v\n", err) + panic(err) + } } -func (bus *rocketmqEventBus) Close(ctx context.Context) (err error) { - return +func (c *RocketMqEventConsumer) Stop() { + err := c.Consumer.Shutdown() + if err != nil { + fmt.Printf("RocketMqEventConsumer stop error: %v\n", err) + panic(err) + } +} + +func (c *RocketMqEventConsumer) Subscribe(topicName string, change aggregateChange, eventHandle EventHandle) { + topicName = strings.TrimSpace(topicName) + if topicName == "" { + err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, topicName is empty") + panic(err) + } + if change == nil { + err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, change is nil") + panic(err) + } + if eventHandle == nil { + err := fmt.Errorf("RocketMqEventConsumer subscribe event failed, eventHandle is nil") + panic(err) + } + err := c.Consumer.Subscribe(topicName, consumer.MessageSelector{}, + func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for _, msg := range msgs { + if topicName == msg.Message.Topic { + domainEventMessage := new(DomainEventMessage) + err := json.Unmarshal(msg.Message.Body, domainEventMessage) + if err != nil { + return consumer.ConsumeRetryLater, err + } + if getAggregateChangeName(change) == domainEventMessage.EventName { + var event SampleDomainEvent + // TODO: change is read only? + newChange := change + event, err = newSampleDomainEvent(*domainEventMessage, newChange) + if err != nil { + return 0, err + } + err, _ = eventHandle(context.TODO(), &event) + if err != nil { + return consumer.ConsumeRetryLater, err + } + } + } + } + return consumer.ConsumeSuccess, nil + }) + if err != nil { + panic(nil) + } } diff --git a/event_bus_rocketmq_test.go b/event_bus_rocketmq_test.go index 6a49ff5..39052db 100644 --- a/event_bus_rocketmq_test.go +++ b/event_bus_rocketmq_test.go @@ -8,34 +8,30 @@ import ( ) func TestNewRocketMqEventBus(t *testing.T) { - config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} - gotBus, err := NewRocketMqEventBus(context.TODO(), config) + config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventProducer(context.TODO(), config) if err != nil { - t.Errorf("NewRocketMqEventBus() error = %v", err) + t.Errorf("NewRocketMqEventProducer() error = %v", err) return } fmt.Println(gotBus) } func Test_rocketmqEventBus_Start(t *testing.T) { - config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} - gotBus, err := NewRocketMqEventBus(context.TODO(), config) + config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventProducer(context.TODO(), config) if err != nil { - t.Errorf("NewRocketMqEventBus() error = %v", err) - return - } - err = gotBus.Start(context.TODO()) - if err != nil { - t.Errorf("start error = %v", err) + t.Errorf("NewRocketMqEventProducer() error = %v", err) return } + gotBus.Start() } func Test_rocketmqEventBus_Send(t *testing.T) { - config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} - gotBus, err := NewRocketMqEventBus(context.TODO(), config) + config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventProducer(context.TODO(), config) if err != nil { - t.Errorf("NewRocketMqEventBus() error = %v", err) + t.Errorf("NewRocketMqEventProducer() error = %v", err) return } event := SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 1, eventName: "BookCreated", eventBody: BookCreated{Book: "aa", Price: 2, CreateTime: time.Now().UTC()}} @@ -48,16 +44,12 @@ func Test_rocketmqEventBus_Send(t *testing.T) { } func Test_rocketmqEventBus_Recv(t *testing.T) { - config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} - gotBus, err := NewRocketMqEventBus(context.TODO(), config) - if err != nil { - t.Errorf("NewRocketMqEventBus() error = %v", err) - return - } - err = gotBus.Start(context.TODO()) + config := RocketMqEventProducerConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventProducer(context.TODO(), config) if err != nil { - t.Errorf("start error = %v", err) + t.Errorf("NewRocketMqEventProducer() error = %v", err) return } + gotBus.Start() select {} } diff --git a/go.mod b/go.mod index 431f344..85b5f93 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/apache/rocketmq-client-go/v2 v2.1.0 github.com/bwmarrin/snowflake v0.3.0 + github.com/google/uuid v1.3.0 github.com/jinzhu/copier v0.3.5 github.com/sirupsen/logrus v1.8.1 // indirect github.com/stretchr/testify v1.7.0 // indirect diff --git a/go.sum b/go.sum index a1e40ba..3abe816 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3 github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= diff --git a/repository.go b/repository.go index 0312749..0a365d5 100644 --- a/repository.go +++ b/repository.go @@ -39,20 +39,17 @@ func NewRepository(ctx context.Context, config *RepositoryConfig) (r *Repository if esErr != nil { panic(fmt.Errorf("aggregates repository create failed, new event store failed, err: %w", esErr)) } - rocketMqConfig := RocketMqEventBusConfig{ + rocketMqConfig := RocketMqEventProducerConfig{ DomainName: config.DomainName, SubDomainName: config.SubDomainName, NameServers: config.RocketMqEventBusNameServers, EventStore: &es, } - eb, ebErr := NewRocketMqEventBus(ctx, rocketMqConfig) + eb, ebErr := NewRocketMqEventProducer(ctx, rocketMqConfig) if ebErr != nil { panic(fmt.Errorf("aggregates repository create failed, new rocketMq eventBus failed, err: %w", ebErr)) } - ebStartErr := eb.Start(context.TODO()) - if ebStartErr != nil { - panic(fmt.Errorf("aggregates repository create failed, start rocketMq eventBus failed, err: %w", ebStartErr)) - } + eb.Start() r = &Repository{ es: es, eb: eb, @@ -64,7 +61,7 @@ func NewRepository(ctx context.Context, config *RepositoryConfig) (r *Repository type Repository struct { //noCopy noCopy es EventStore - eb EventBus + eb RocketmqEventProducer saveListener RepositorySaveListener }