Skip to content

Commit

Permalink
add event_bus_dtm.go
Browse files Browse the repository at this point in the history
  • Loading branch information
xiandong-italki committed Mar 23, 2023
1 parent 32337ef commit ec728d8
Show file tree
Hide file tree
Showing 9 changed files with 699 additions and 31 deletions.
12 changes: 12 additions & 0 deletions event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/wuyazi/gddd/proto"
"strings"
)

Expand Down Expand Up @@ -53,3 +54,14 @@ func newDomainEventMessage(event DomainEvent) (msg DomainEventMessage, err error
}
return
}

func newDomainEventMessageProto(msg0 DomainEventMessage) (msg proto.DomainEventMessageProto) {
msg = proto.DomainEventMessageProto{
AggregateId: msg0.AggregateId,
AggregateName: msg0.AggregateName,
EventName: msg0.EventName,
EventId: msg0.EventId,
EventBody: msg0.EventBody,
}
return
}
181 changes: 181 additions & 0 deletions event_bus_dtm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package gddd

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/dtm-labs/client/dtmcli"
"github.com/dtm-labs/client/dtmgrpc"
"github.com/lithammer/shortuuid/v3"
"strings"
)

type DtmEventProducerConfig struct {
DomainName string
SubDomainName string
NameServers []string
EventStore *EventStore
}

type DtmEventProducer struct {
Name string
Brokers []string
Producer *dtmcli.Msg
EventStore EventStore
}

func NewDtmEventProducer(ctx context.Context, config DtmEventProducerConfig) (eventProducer DtmEventProducer, err error) {
domainName := strings.TrimSpace(config.DomainName)
if domainName == "" {
err = fmt.Errorf("new dtm event producer failed, DomainName is empty")
return
}
subDomainName := strings.TrimSpace(config.SubDomainName)
if subDomainName == "" {
err = fmt.Errorf("new dtm event producer failed, SubDomainName is empty")
return
}
if config.NameServers == nil || len(config.NameServers) == 0 {
err = fmt.Errorf("new dtm event producer failed, NameServers is nil or empty")
return
}
eventProducer = DtmEventProducer{
Name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName),
Brokers: config.NameServers,
Producer: nil,
EventStore: *config.EventStore,
}
return
}

func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEventMessage) error {
storedEvents := make([]StoredEvent, 0, 1)
for _, eventMessage := range eventsMessages {
storedEvent, err := newJsonStoredEvent(eventMessage.AggregateId, eventMessage.AggregateName, eventMessage.EventId, eventMessage.EventName, eventMessage.EventBody)
if err != nil {
return fmt.Errorf("newJsonStoredEvent error")
}
storedEvents = append(storedEvents, storedEvent)
}
storeEventsErr := es.StoreEvents(ctx, storedEvents)
if storeEventsErr != nil {
return fmt.Errorf("storeEventsErr error")
}
return nil
}

func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) {
if eventMessages == nil || len(eventMessages) == 0 {
err = fmt.Errorf("dtm event producer send event failed, eventMessages is nil or empty")
return
}
//dtmMsg := dtmcli.NewMsg("http://localhost:36789/api/dtmsvr", shortuuid.New())
dtmMsg := dtmgrpc.NewMsgGrpc("localhost:36790", shortuuid.New())
for _, eventMessage := range eventMessages {
var messageBody []byte
messageBody, err = json.Marshal(eventMessage)
if err != nil {
return
}
fmt.Errorf("%+v", messageBody)
//dtmMsg = dtmMsg.Add("http://localhost:8081/api/busi/TransIn", &messageBody)
msg := newDomainEventMessageProto(eventMessage)
dtmMsg = dtmMsg.Add("localhost:8080/proto.userQuery/insertUser", &msg)
}
err = dtmMsg.DoAndSubmitDB("localhost:8081/busi.Busi/QueryPreparedB", p.EventStore.GetDB(ctx), func(tx *sql.Tx) error {
// TODO use tx
return ExecuteLocalTransaction(ctx, p.EventStore, eventMessages)
})
if err != nil {
return err
}
return
}

type DtmEventConsumerConfig struct {
DomainName string
GroupName string
NameServers []string
}

type DtmEventConsumer struct {
DomainName string
GroupName string
Consumer rocketmq.PushConsumer
}

func NewDtmEventConsumer(ctx context.Context, config DtmEventConsumerConfig) (eventConsumer *DtmEventConsumer, err error) {
eventConsumer.DomainName = config.DomainName
eventConsumer.GroupName = config.GroupName
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(eventConsumer.GroupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)),
)
eventConsumer.Consumer = c
return
}

func (c *DtmEventConsumer) Start() {
err := c.Consumer.Start()
if err != nil {
fmt.Printf("DtmEventConsumer start error: %v\n", err)
panic(err)
}
}

func (c *DtmEventConsumer) Stop() {
err := c.Consumer.Shutdown()
if err != nil {
fmt.Printf("DtmEventConsumer stop error: %v\n", err)
panic(err)
}
}

func (c *DtmEventConsumer) Subscribe(topicName string, change aggregateChange, eventHandle EventHandle) {
topicName = strings.TrimSpace(topicName)
if topicName == "" {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, topicName is empty")
panic(err)
}
if change == nil {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, change is nil")
panic(err)
}
if eventHandle == nil {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, eventHandle is nil")
panic(err)
}
err := c.Consumer.Subscribe(topicName, consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
if topicName == msg.Message.Topic {
domainEventMessage := new(DomainEventMessage)
err := json.Unmarshal(msg.Message.Body, domainEventMessage)
if err != nil {
return consumer.ConsumeRetryLater, err
}
if getAggregateChangeName(change) == domainEventMessage.EventName {
var event SampleDomainEvent
// TODO: change is read only?
newChange := change
event, err = newSampleDomainEvent(*domainEventMessage, newChange)
if err != nil {
return 0, err
}
err, _ = eventHandle(context.TODO(), &event)
if err != nil {
return consumer.ConsumeRetryLater, err
}
}
}
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
panic(nil)
}
}
2 changes: 2 additions & 0 deletions event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package gddd

import (
"context"
"database/sql"
"time"
)

type EventStore interface {
Name() (name string)
GetDB(ctx context.Context) *sql.DB
InitDomainEventStoreTable(ctx context.Context, aggregateName string)
ReadEvents(ctx context.Context, aggregateName string, aggregateId int64, lastEventId int64) (events []StoredEvent, err error)
StoreEvents(ctx context.Context, events []StoredEvent) (err error)
Expand Down
4 changes: 4 additions & 0 deletions event_store_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type mysqlEventStore struct {
snapshotsChan chan Aggregate
}

func (es *mysqlEventStore) GetDB(ctx context.Context) *sql.DB {
return es.db
}

func (es *mysqlEventStore) listenSnapshotsChan(ctx context.Context) {
go func(ctx context.Context, es *mysqlEventStore) {
stopped := false
Expand Down
54 changes: 43 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,65 @@ module github.com/wuyazi/gddd
go 1.18

require (
github.com/apache/rocketmq-client-go/v2 v2.1.0
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
github.com/bwmarrin/snowflake v0.3.0
github.com/jinzhu/copier v0.3.5
github.com/valyala/bytebufferpool v1.0.0
)

require (
github.com/BurntSushi/toml v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dtm-labs/client v1.18.7 // indirect
github.com/dtm-labs/dtmdriver v0.0.6 // indirect
github.com/dtm-labs/logger v0.0.1 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/golang/mock v1.3.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/smartystreets/assertions v1.1.0 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/tidwall/gjson v1.13.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.uber.org/atomic v1.5.1 // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.9.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f // indirect
golang.org/x/net v0.0.0-20211029224645-99673261e6eb // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/apache/rocketmq-client-go/v2 => ../../../github.com/wuyazi/rocketmq-client-go

replace github.com/dtm-labs/dtm => ../../../github.com/wuyazi/dtm
Loading

0 comments on commit ec728d8

Please sign in to comment.