diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..db67165 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# goland +.idea \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..be186e9 --- /dev/null +++ b/README.md @@ -0,0 +1,39 @@ + +## 执行顺序 + +``` +| Command | +| Repository | Aggregate | +| | EventBus | +| EventStore | +``` + +## Command(命令) + +命令 + +## Aggregate(聚合) + +用于描述业务,并产生事件 +定义实体,和事件,所有实体更改只能通过事件 + +## Repository(仓库) + +用于存储数据的接口 + +## EventBus(事件总线) + +用于传输事件 + +## EventStore(事件存储) + +用于存储事件,并可回溯(EventSourcing) + +## TODO + +- [ ] 事件消费 +- [ ] 事件溯源 +- [ ] go-zero 适配 +- [ ] 归档:将每日的事件归档 +- [ ] 唯一索引:创建用户时,邮箱唯一 +- [ ] 文档 diff --git a/aggregate.go b/aggregate.go new file mode 100644 index 0000000..1655c66 --- /dev/null +++ b/aggregate.go @@ -0,0 +1,46 @@ +package gddd + +import "context" + +func NewAggregateId() (id int64) { + id = node.Generate().Int64() + return +} + +func ApplyAggregateChange(ctx context.Context, aggregate Aggregate, change aggregateChange) { + aggregate.Apply(aggregate, change) +} + +type Aggregate interface { + InitId() + Identifier() (id int64) + Apply(agg Aggregate, event aggregateChange) + Applied() (events []DomainEvent) +} + +type AbstractAggregate struct { + Id int64 `json:"id"` + lifecycle aggregateLifecycle +} + +func (a *AbstractAggregate) InitId() { + if a.Id == 0 { + a.Id = NewAggregateId() + } + return +} + +func (a *AbstractAggregate) Identifier() (id int64) { + id = a.Id + return +} + +func (a *AbstractAggregate) Apply(agg Aggregate, aggChange aggregateChange) { + a.lifecycle.apply(agg, aggChange) + return +} + +func (a *AbstractAggregate) Applied() (events []DomainEvent) { + events = a.lifecycle.getDomainEvents() + return +} diff --git a/aggregate_lifecycle.go b/aggregate_lifecycle.go new file mode 100644 index 0000000..5d5b410 --- /dev/null +++ b/aggregate_lifecycle.go @@ -0,0 +1,55 @@ +package gddd + +import ( + "errors" + "strings" + "sync" +) + +type aggregateLifecycle struct { + mutex sync.Mutex + domainEvents []DomainEvent +} + +type aggregateChange interface{} + +func (c *aggregateLifecycle) apply(agg Aggregate, aggChange aggregateChange) { + c.mutex.Lock() + if aggChange == nil { + c.mutex.Unlock() + panic(errors.New("aggregate apply failed, aggregateChange is nil")) + return + } + eventName := strings.TrimSpace(getAggregateChangeName(aggChange)) + if eventName == "" { + c.mutex.Unlock() + panic(errors.New("aggregate apply failed, eventName is empty")) + return + } + domainEvent := SampleDomainEvent{ + aggregateId: agg.Identifier(), + aggregateName: getAggregateName(agg), + eventName: eventName, + eventBody: aggChange, + } + domainEvent.initEventId() + err := handleAppliedDomainEvent(agg, &domainEvent) + if err != nil { + c.mutex.Unlock() + panic(errors.New("aggregate apply failed, apply domain event failed")) + return + } + c.domainEvents = append(c.domainEvents, &domainEvent) + c.mutex.Unlock() + return +} + +func (c *aggregateLifecycle) getDomainEvents() []DomainEvent { + return c.domainEvents +} + +func (c *aggregateLifecycle) cleanDomainEvents() { + c.mutex.Lock() + c.domainEvents = make([]DomainEvent, 0, 1) + c.mutex.Unlock() +} diff --git a/command_bus.go b/command_bus.go new file mode 100644 index 0000000..0aa346a --- /dev/null +++ b/command_bus.go @@ -0,0 +1,12 @@ +package gddd + +import "context" + +type Command interface { +} + +type AbstractCommand struct { + AggregateId string +} + +type CommandHandle func(ctx context.Context, command Command) (result interface{}, err error) diff --git a/common.go b/common.go new file mode 100644 index 0000000..2318b0b --- /dev/null +++ b/common.go @@ -0,0 +1,58 @@ +package gddd + +import ( + "fmt" + "github.com/bwmarrin/snowflake" + "reflect" + "time" +) + +var node *snowflake.Node + +func init() { + // init snowflake + var nodeErr error + node, nodeErr = snowflake.NewNode(NodeNumber) + if nodeErr != nil { + fmt.Println(nodeErr) + return + } +} + +func getAggregateName(a Aggregate) string { + rt := reflect.TypeOf(a) + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + if rt.Kind() != reflect.Struct { + panic("bad aggregate type") + } + return rt.Name() +} + +func getAggregateChangeName(e aggregateChange) string { + rt := reflect.TypeOf(e) + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + if rt.Kind() != reflect.Struct { + panic("bad aggregateChange type") + } + return rt.Name() +} + +func getEventName(e DomainEvent) string { + rt := reflect.TypeOf(e) + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + if rt.Kind() != reflect.Struct { + panic("bad event type") + } + return rt.Name() +} + +func NodeTime(snowflakeId int64) time.Time { + milliTimeStamp := (snowflakeId >> (snowflake.NodeBits + snowflake.StepBits)) + snowflake.Epoch + return time.Unix(milliTimeStamp/1000, milliTimeStamp%1000*1000000) +} diff --git a/common_test.go b/common_test.go new file mode 100644 index 0000000..105014c --- /dev/null +++ b/common_test.go @@ -0,0 +1,13 @@ +package gddd + +import ( + "fmt" + "github.com/bwmarrin/snowflake" + "testing" +) + +func TestNodeTime(t *testing.T) { + node, _ = snowflake.NewNode(1) + time0 := NodeTime(node.Generate().Int64()) + fmt.Println("time is: ", time0) +} diff --git a/context.go b/context.go new file mode 100644 index 0000000..29d590c --- /dev/null +++ b/context.go @@ -0,0 +1,16 @@ +package gddd + +import ( + "context" + "database/sql" +) + +type Context struct { + ctx context.Context + tx *sql.Tx +} + +func (c *Context) Apply(aggregate Aggregate, change aggregateChange) error { + ApplyAggregateChange(c.ctx, aggregate, change) + return nil +} diff --git a/event.go b/event.go new file mode 100644 index 0000000..59c0ae5 --- /dev/null +++ b/event.go @@ -0,0 +1,77 @@ +package gddd + +import ( + "encoding/json" + "fmt" + "time" +) + +func NewDomainEventId() (id int64) { + id = node.Generate().Int64() + return +} + +type DomainEvent interface { + AggregateId() int64 + AggregateName() string + EventId() int64 + EventName() string + EventBody() interface{} + EventBodyRaw() ([]byte, error) + EventCreateTime() time.Time + initEventId() +} + +type SampleDomainEvent struct { + aggregateId int64 + aggregateName string + eventId int64 + eventName string + eventBody interface{} +} + +func (s *SampleDomainEvent) initEventId() { + if s.eventId == 0 { + s.eventId = NewDomainEventId() + } + return +} + +func (s *SampleDomainEvent) AggregateId() (id int64) { + id = s.aggregateId + return +} + +func (s *SampleDomainEvent) AggregateName() (name string) { + name = s.aggregateName + return +} + +func (s *SampleDomainEvent) EventId() (id int64) { + id = s.eventId + return +} + +func (s *SampleDomainEvent) EventName() (name string) { + name = s.eventName + return +} + +func (s *SampleDomainEvent) EventBody() (body interface{}) { + body = s.eventBody + return +} + +func (s *SampleDomainEvent) EventCreateTime() (createTime time.Time) { + createTime = NodeTime(s.eventId) + return +} + +func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error) { + bodyRaw, err = json.Marshal(s.eventBody) + if err != nil { + err = fmt.Errorf("marshal domain event failed, %v", err) + return + } + return +} diff --git a/event_bus.go b/event_bus.go new file mode 100644 index 0000000..d9600b0 --- /dev/null +++ b/event_bus.go @@ -0,0 +1,55 @@ +package gddd + +import ( + "context" + "encoding/json" + "fmt" + "strings" +) + +// EventHandle handle event, if return err is not nil, it will reject event, +// if want to requeue, then make the requeue be true +type EventHandle func(ctx context.Context, eventDecoder EventDecoder) (err error, requeue bool) + +type EventBus interface { + Name() string + Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) + Recv(ctx context.Context, topic string, handle EventHandle) (err error) + Start(ctx context.Context) (err error) + Shutdown() + Await() + Close(ctx context.Context) (err error) +} + +type DomainEventMessage struct { + AggregateId int64 `json:"aggregate_id"` + AggregateName string `json:"aggregate_name"` + EventName string `json:"event_name"` + EventId int64 `json:"event_id"` + EventBody []byte `json:"event_body"` +} + +func (msg *DomainEventMessage) TopicName(eventBusName string) string { + topic := fmt.Sprintf("%s_%s", eventBusName, strings.TrimSuffix(strings.ToLower(msg.AggregateName), "aggregate")) + return topic +} + +func (msg *DomainEventMessage) Decode(byteData []byte) (err error) { + err = json.Unmarshal(byteData, msg) + return +} + +func newDomainEventMessage(event DomainEvent) (msg DomainEventMessage, err error) { + eventBodyRaw, err := event.EventBodyRaw() + if err != nil { + return + } + msg = DomainEventMessage{ + AggregateId: event.AggregateId(), + AggregateName: event.AggregateName(), + EventName: event.EventName(), + EventId: event.EventId(), + EventBody: eventBodyRaw, + } + return +} diff --git a/event_bus_encode.go b/event_bus_encode.go new file mode 100644 index 0000000..9048ac7 --- /dev/null +++ b/event_bus_encode.go @@ -0,0 +1,51 @@ +package gddd + +import ( + "encoding/json" + "errors" +) + +type EventDecoder interface { + RAW() (raw []byte) + Decode(v interface{}) (err error) +} + +func NewRawEventDecoder(raw []byte) *RawEventDecoder { + return &RawEventDecoder{ + raw: raw, + } +} + +type RawEventDecoder struct { + raw []byte +} + +func (d *RawEventDecoder) RAW() (raw []byte) { + raw = d.raw + return +} + +func (d *RawEventDecoder) Decode(v interface{}) (err error) { + err = errors.New("event decoder failed, it is native, please call decoder.RAW to get event bytes") + return +} + +func NewJsonEventDecoder(raw []byte) *JsonEventDecoder { + return &JsonEventDecoder{ + raw: raw, + } +} + +type JsonEventDecoder struct { + raw json.RawMessage +} + +func (d *JsonEventDecoder) RAW() (raw []byte) { + raw = d.raw + return +} + +func (d *JsonEventDecoder) Decode(v interface{}) (err error) { + err = json.Unmarshal(d.raw, v) + return +} diff --git a/event_bus_local.go b/event_bus_local.go new file mode 100644 index 0000000..c6b7300 --- /dev/null +++ b/event_bus_local.go @@ -0,0 +1,142 @@ +package gddd + +import ( + "context" + "encoding/json" + "sync" +) + +type localEventbusGroups struct { + mutex *sync.Mutex + groups map[string]chan *localEvent + eventCountMap map[string]*sync.WaitGroup +} + +func (g *localEventbusGroups) addGroup(name string, ch chan *localEvent, count *sync.WaitGroup) { + g.mutex.Lock() + defer g.mutex.Unlock() + g.eventCountMap[name] = count + g.groups[name] = ch +} + +func (g *localEventbusGroups) removeGroup(name string) { + g.mutex.Lock() + defer g.mutex.Unlock() + delete(g.groups, name) + delete(g.eventCountMap, name) + if len(g.groups) == 0 { + _localEventbusGroups = nil + } +} + +func (g *localEventbusGroups) send(topic string, event interface{}) { + g.mutex.Lock() + defer g.mutex.Unlock() + e := &localEvent{ + topic: topic, + value: event, + } + for name, events := range g.groups { + events <- e + g.eventCountMap[name].Add(1) + } +} + +var _localEventbusGroups *localEventbusGroups + +func NewLocalEventBus(name string) (bus EventBus) { + if _localEventbusGroups == nil { + _localEventbusGroups = &localEventbusGroups{ + mutex: &sync.Mutex{}, + groups: make(map[string]chan *localEvent), + eventCountMap: make(map[string]*sync.WaitGroup), + } + } + eventsCh := make(chan *localEvent, 1024) + eventCount := &sync.WaitGroup{} + _localEventbusGroups.addGroup(name, eventsCh, eventCount) + bus = &LocalEventBus{ + eventCount: eventCount, + name: name, + eventsCh: eventsCh, + consumers: make(map[string]EventHandle), + } + + return +} + +type localEvent struct { + topic string + value interface{} +} + +type LocalEventBus struct { + eventCount *sync.WaitGroup + name string + eventsCh chan *localEvent + consumers map[string]EventHandle +} + +func (bus *LocalEventBus) Name() string { + return bus.name +} + +func (bus *LocalEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) { + if eventMessages == nil { + return + } + for _, eventMessage := range eventMessages { + _localEventbusGroups.send(eventMessage.TopicName(bus.name), eventMessage) + } + return +} + +func (bus *LocalEventBus) Recv(ctx context.Context, topic string, handle EventHandle) (err error) { + + if topic == "" { + return + } + if handle == nil { + return + } + bus.consumers[topic] = handle + return +} + +func (bus *LocalEventBus) Start(ctx context.Context) (err error) { + + go func(ctx context.Context, bus *LocalEventBus) { + for { + event, ok := <-bus.eventsCh + if !ok { + break + } + handle, has := bus.consumers[event.topic] + if !has { + continue + } + p, jsonErr := json.Marshal(event.value) + if jsonErr != nil { + continue + } + _, _ = handle(ctx, NewJsonEventDecoder(p)) + bus.eventCount.Done() + } + }(ctx, bus) + return +} + +func (bus *LocalEventBus) Shutdown() { + close(bus.eventsCh) + _localEventbusGroups.removeGroup(bus.name) +} + +func (bus *LocalEventBus) Await() { + +} + +func (bus *LocalEventBus) Close(ctx context.Context) (err error) { + bus.Shutdown() + bus.Await() + return +} diff --git a/event_bus_rocketmq.go b/event_bus_rocketmq.go new file mode 100644 index 0000000..7ef9fa2 --- /dev/null +++ b/event_bus_rocketmq.go @@ -0,0 +1,157 @@ +package gddd + +import ( + "context" + "encoding/json" + "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + "strings" +) + +type RocketMqEventBusConfig struct { + DomainName string + SubDomainName string + NameServers []string + EventStore *EventStore +} + +func NewRocketMqEventBus(ctx context.Context, config RocketMqEventBusConfig) (bus EventBus, err error) { + domainName := strings.TrimSpace(config.DomainName) + if domainName == "" { + err = fmt.Errorf("new rocketmq event bus failed, DomainName is empty") + return + } + subDomainName := strings.TrimSpace(config.SubDomainName) + if subDomainName == "" { + err = fmt.Errorf("new rocketmq event bus failed, SubDomainName is empty") + return + } + if config.NameServers == nil || len(config.NameServers) == 0 { + err = fmt.Errorf("new rocketmq event bus failed, NameServers is nil or empty") + return + } + p, err := rocketmq.NewTransactionProducer( + NewDemoListener(config.EventStore), + producer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)), + producer.WithRetry(1), + ) + if err != nil { + fmt.Printf("new producer error: %s\n", err.Error()) + panic(err) + } + err = p.Start() + if err != nil { + fmt.Printf("start producer error: %s\n", err.Error()) + panic(err) + } + + // consumer + c, _ := rocketmq.NewPushConsumer( + consumer.WithGroupName("testGroup"), + consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)), + ) + if err != nil { + fmt.Printf("new consumer error: %s\n", err.Error()) + panic(err) + } + + consumers := make(map[string]EventHandle) + bus0 := &rocketmqEventBus{ + name: fmt.Sprintf("%s_%s", config.DomainName, config.SubDomainName), + //running: NewAtomicSwitch(), + brokers: config.NameServers, + producer: p, + consumer: c, + consumers: consumers, + } + bus = bus0 + return +} + +type rocketmqEventBus struct { + name string + brokers []string + producer rocketmq.TransactionProducer + consumer rocketmq.PushConsumer + consumers map[string]EventHandle +} + +func (bus *rocketmqEventBus) Name() string { + return bus.name +} + +func (bus *rocketmqEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) { + if bus.producer == nil { + err = fmt.Errorf("rocketmq event bus send event failed, producer is nil") + return + } + if eventMessages == nil || len(eventMessages) == 0 { + err = fmt.Errorf("rocketmq event bus send event failed, eventMessages is nil or empty") + return + } + msgs := make([]*primitive.Message, 0, 1) + for _, eventMessage := range eventMessages { + var messageBody []byte + messageBody, err = json.Marshal(eventMessage) + if err != nil { + return + } + msg := primitive.NewMessage(eventMessage.TopicName(bus.name), messageBody) + msgs = append(msgs, msg) + } + res, err := bus.producer.SendMessageInTransaction(ctx, msgs...) + if err != nil { + fmt.Printf("send message error: %s\n", err) + } else { + fmt.Printf("send message success: result=%s\n", res.String()) + } + return +} + +func (bus *rocketmqEventBus) Recv(_ context.Context, topic string, handle EventHandle) (err error) { + topic = strings.TrimSpace(topic) + if topic == "" { + err = fmt.Errorf("rocketmq event bus recv event failed, topic is empty") + return + } + if handle == nil { + err = fmt.Errorf("rocketmq event bus recv event failed, handle is nil") + return + } + bus.consumers[topic] = handle + return +} + +func (bus *rocketmqEventBus) Start(ctx context.Context) (err error) { + go func() { + for { + err = bus.consumer.Subscribe("DefaultCluster", consumer.MessageSelector{}, func(ctx context.Context, + msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for i := range msgs { + fmt.Printf("subscribe callback: %v \n", msgs[i]) + } + //这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater + return consumer.ConsumeSuccess, nil + }) + if err != nil { + fmt.Printf("consume error: %s\n", err.Error()) + } + } + }() + return nil +} + +func (bus *rocketmqEventBus) Shutdown() { + return +} + +func (bus *rocketmqEventBus) Await() { + return +} + +func (bus *rocketmqEventBus) Close(ctx context.Context) (err error) { + return +} diff --git a/event_bus_rocketmq_demo_listener.go b/event_bus_rocketmq_demo_listener.go new file mode 100644 index 0000000..f78741f --- /dev/null +++ b/event_bus_rocketmq_demo_listener.go @@ -0,0 +1,69 @@ +package gddd + +import ( + "context" + "github.com/apache/rocketmq-client-go/v2/primitive" + "sync" +) + +type DemoListener struct { + es *EventStore + localTrans *sync.Map + transactionIndex int32 +} + +func NewDemoListener(eventStore *EventStore) *DemoListener { + return &DemoListener{ + es: eventStore, + localTrans: new(sync.Map), + } +} + +// ExecuteLocalTransaction 在 SendMessageInTransaction 方法调用 +// 如果返回 primitive.UnknowState 那么 brocker 就会调用 CheckLocalTransaction 方法检查消息状态 +// 如果返回 primitive.CommitMessageState 和 primitive.RollbackMessageState 则不会调用 CheckLocalTransaction +func (dl *DemoListener) ExecuteLocalTransaction(ctx context.Context, msgs ...*primitive.Message) primitive.LocalTransactionState { + storedEvents := make([]StoredEvent, 0, 1) + for _, msg := range msgs { + eventMessage := new(DomainEventMessage) + err := eventMessage.Decode(msg.Body) + if err != nil { + return primitive.RollbackMessageState + } + var storedEvent StoredEvent + storedEvent, err = newJsonStoredEvent(eventMessage.AggregateId, eventMessage.AggregateName, eventMessage.EventId, eventMessage.EventName, eventMessage.EventBody) + if err != nil { + return primitive.RollbackMessageState + } + storedEvents = append(storedEvents, storedEvent) + } + storeEventsErr := (*dl.es).StoreEvents(ctx, storedEvents) + if storeEventsErr != nil { + //err := fmt.Errorf("aggregates repository save failed, store domain events failed, %v", storeEventsErr) + return primitive.RollbackMessageState + } + return primitive.CommitMessageState +} + +func (dl *DemoListener) CheckLocalTransaction(msges ...*primitive.MessageExt) primitive.LocalTransactionState { + storedEvents := make([]StoredEvent, 0, 1) + for _, msge := range msges { + eventMessage := new(DomainEventMessage) + err := eventMessage.Decode(msge.Message.Body) + if err != nil { + return primitive.RollbackMessageState + } + var storedEvent StoredEvent + storedEvent, err = newJsonStoredEvent(eventMessage.AggregateId, eventMessage.AggregateName, eventMessage.EventId, eventMessage.EventName, eventMessage.EventBody) + if err != nil { + return primitive.RollbackMessageState + } + storedEvents = append(storedEvents, storedEvent) + } + err := (*dl.es).CheckEvents(context.TODO(), storedEvents) + if err != nil { + // todo + return primitive.RollbackMessageState + } + return primitive.CommitMessageState +} diff --git a/event_bus_rocketmq_test.go b/event_bus_rocketmq_test.go new file mode 100644 index 0000000..6a49ff5 --- /dev/null +++ b/event_bus_rocketmq_test.go @@ -0,0 +1,63 @@ +package gddd + +import ( + "context" + "fmt" + "testing" + "time" +) + +func TestNewRocketMqEventBus(t *testing.T) { + config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventBus(context.TODO(), config) + if err != nil { + t.Errorf("NewRocketMqEventBus() error = %v", err) + return + } + fmt.Println(gotBus) +} + +func Test_rocketmqEventBus_Start(t *testing.T) { + config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventBus(context.TODO(), config) + if err != nil { + t.Errorf("NewRocketMqEventBus() error = %v", err) + return + } + err = gotBus.Start(context.TODO()) + if err != nil { + t.Errorf("start error = %v", err) + return + } +} + +func Test_rocketmqEventBus_Send(t *testing.T) { + config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventBus(context.TODO(), config) + if err != nil { + t.Errorf("NewRocketMqEventBus() error = %v", err) + return + } + event := SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 1, eventName: "BookCreated", eventBody: BookCreated{Book: "aa", Price: 2, CreateTime: time.Now().UTC()}} + msg, _ := newDomainEventMessage(&event) + err = gotBus.Send(context.TODO(), msg) + if err != nil { + t.Errorf("send error = %v", err) + return + } +} + +func Test_rocketmqEventBus_Recv(t *testing.T) { + config := RocketMqEventBusConfig{"apple", "community", []string{"127.0.0.1:9876"}, nil} + gotBus, err := NewRocketMqEventBus(context.TODO(), config) + if err != nil { + t.Errorf("NewRocketMqEventBus() error = %v", err) + return + } + err = gotBus.Start(context.TODO()) + if err != nil { + t.Errorf("start error = %v", err) + return + } + select {} +} diff --git a/event_sourcing.go b/event_sourcing.go new file mode 100644 index 0000000..1a179df --- /dev/null +++ b/event_sourcing.go @@ -0,0 +1,67 @@ +package gddd + +import ( + "encoding/json" + "fmt" + "github.com/jinzhu/copier" +) + +func handleAppliedDomainEventRecursively(aggregate Aggregate, events []DomainEvent) (err error) { + if events == nil || len(events) == 0 { + return + } + for _, event := range events { + err = handleAppliedDomainEvent(aggregate, event) + if err != nil { + return + } + } + return +} + +func handleStoredEventRecursively(aggregate Aggregate, events []StoredEvent) (err error) { + if events == nil || len(events) == 0 { + return + } + aggregateName := getAggregateName(aggregate) + aggregateId := aggregate.Identifier() + + for _, event := range events { + eventId := event.EventId() + if eventId == 0 { + err = fmt.Errorf("aggregates handle recursively stored domain events failed, domain event id is 0, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + eventName := event.EventName() + if eventName == "" { + err = fmt.Errorf("aggregates handle recursively stored domain events failed, domain event name is empty, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + eventBody := event.EventBodyRaw() + if eventBody == nil { + err = fmt.Errorf("aggregates handle recursively stored domain events failed, domain event data is nil, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + err = handleStoredEvent(aggregate, event) + if err != nil { + return + } + } + return +} + +func handleAppliedDomainEvent(aggregate Aggregate, event DomainEvent) (err error) { + err = copier.CopyWithOption(aggregate, event.EventBody(), copier.Option{IgnoreEmpty: false, DeepCopy: true}) + if err != nil { + return + } + return +} + +func handleStoredEvent(aggregate Aggregate, event StoredEvent) (err error) { + err = json.Unmarshal(event.EventBodyRaw(), aggregate) + if err != nil { + return + } + return +} diff --git a/event_sourcing_test.go b/event_sourcing_test.go new file mode 100644 index 0000000..110de9d --- /dev/null +++ b/event_sourcing_test.go @@ -0,0 +1,64 @@ +package gddd + +import ( + "fmt" + "testing" + "time" +) + +type BookAggregate struct { + AbstractAggregate + Book string `json:"book"` + Price int64 `json:"price"` + CreateTime time.Time `json:"create_time"` +} + +type BookCreated struct { + Book string `json:"book"` + Price int64 `json:"price"` + CreateTime time.Time `json:"create_time"` +} + +type BookChangedPrice struct { + Price int64 `json:"price"` +} + +func Test_handleAggregate(t *testing.T) { + time_, _ := time.Parse("2006-01-02T15:04:05", "2022-01-19T23:59:59") + type args struct { + aggregate Aggregate + event DomainEvent + } + tests := []struct { + name string + args args + }{ + { + "BookCreated", + args{ + aggregate: &BookAggregate{AbstractAggregate: AbstractAggregate{Id: 1}}, + event: &SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 1, eventName: "BookCreated", eventBody: BookCreated{Book: "aa", Price: 2, CreateTime: time.Now().UTC()}}, + }, + }, + { + "BookChangedPrice", + args{ + aggregate: &BookAggregate{AbstractAggregate: AbstractAggregate{Id: 1}, Book: "aa", Price: 2, CreateTime: time_}, + event: &SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 2, eventName: "BookChangedPrice", eventBody: BookChangedPrice{Price: 3}}, + }, + }, + { + "BookChangedPrice", + args{ + aggregate: &BookAggregate{AbstractAggregate: AbstractAggregate{Id: 1}, Book: "aa", Price: 3, CreateTime: time_}, + event: &SampleDomainEvent{aggregateId: 1, aggregateName: "book", eventId: 2, eventName: "BookChangedPrice", eventBody: BookChangedPrice{Price: 4}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handleAppliedDomainEvent(tt.args.aggregate, tt.args.event) + fmt.Println("--", tt.args.aggregate) + }) + } +} diff --git a/event_store.go b/event_store.go new file mode 100644 index 0000000..ed53ea1 --- /dev/null +++ b/event_store.go @@ -0,0 +1,68 @@ +package gddd + +import ( + "context" + "time" +) + +type EventStore interface { + Name() (name string) + InitDomainEventStoreTable(ctx context.Context, aggregateName string) + ReadEvents(ctx context.Context, aggregateName string, aggregateId int64, lastEventId int64) (events []StoredEvent, err error) + StoreEvents(ctx context.Context, events []StoredEvent) (err error) + CheckEvents(ctx context.Context, events []StoredEvent) (err error) + MakeSnapshot(ctx context.Context, aggregate Aggregate) (err error) + LoadSnapshot(ctx context.Context, aggregateId int64, aggregate Aggregate) (lastEventId int64, err error) +} + +type StoredEvent interface { + AggregateId() int64 + AggregateName() string + EventId() int64 + EventName() string + EventBodyRaw() []byte + EventCreateTime() time.Time +} + +type jsonStoredEvent struct { + aggregateId int64 + aggregateName string + eventId int64 + eventName string + eventBodyRaw []byte +} + +func newJsonStoredEvent(aggregateId int64, aggregateName string, eventId int64, eventName string, eventByte []byte) (event *jsonStoredEvent, err error) { + event = &jsonStoredEvent{ + aggregateId: aggregateId, + aggregateName: aggregateName, + eventId: eventId, + eventName: eventName, + eventBodyRaw: eventByte, + } + return +} + +func (e *jsonStoredEvent) AggregateId() int64 { + return e.aggregateId +} + +func (e *jsonStoredEvent) AggregateName() string { + return e.aggregateName +} + +func (e *jsonStoredEvent) EventId() int64 { + return e.eventId +} + +func (e *jsonStoredEvent) EventName() string { + return e.eventName +} + +func (e *jsonStoredEvent) EventBodyRaw() []byte { + return e.eventBodyRaw +} + +func (e *jsonStoredEvent) EventCreateTime() time.Time { + return NodeTime(e.eventId) +} diff --git a/event_store_archive_mysql.go b/event_store_archive_mysql.go new file mode 100644 index 0000000..16c3115 --- /dev/null +++ b/event_store_archive_mysql.go @@ -0,0 +1 @@ +package gddd diff --git a/event_store_mysql.go b/event_store_mysql.go new file mode 100644 index 0000000..a867495 --- /dev/null +++ b/event_store_mysql.go @@ -0,0 +1,600 @@ +package gddd + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "github.com/valyala/bytebufferpool" + "reflect" + "strings" +) + +type DBConfig struct { + SqlDataSourceName string `json:"sql_data_source_name"` + MaxIdleConnections int `json:"max_idle_connections"` + MaxOpenConnections int `json:"max_open_connections"` +} + +type MysqlEventStoreConfig struct { + SubDomainName string + DBConfig DBConfig +} + +// for example: users_location_domain_events +func getDomainTableName(subDomainName string, aggregateName string, tableType string) string { + return fmt.Sprintf("%s_%s_domain_%s", strings.ToLower(subDomainName), + strings.TrimSuffix(strings.ToLower(aggregateName), "aggregate"), tableType) +} + +const ( + // create table + sqlCreateDomainEventTable = `create table if not exists %s ( + id bigint unsigned auto_increment primary key, + aggregate_id bigint not null, + event_id bigint not null, + event_name varchar(50) not null, + event_data text not null, + unique index ix_aggregate_id (aggregate_id), + unique index ix_event_id (event_id) + )charset 'utf8mb4';` + sqlCreateDomainSnapshotTable = `create table if not exists %s ( + aggregate_id bigint not null primary key, + last_event_id bigint not null, + snapshot_data text not null + )charset 'utf8mb4';` + // events + sqlInsertEventPrefix = "INSERT INTO %s (aggregate_id, event_id, event_name, event_data) VALUES " + sqlSelectEventsByAggregateId = "SELECT event_id, event_name, event_data FROM %s WHERE aggregate_id = ? ORDER BY id ASC" + sqlSelectEventsByAggregateIdAndEventId = "SELECT event_id, event_name, event_data FROM %s WHERE aggregate_id = ? AND id > (SELECT id FROM %s where event_id = ?) ORDER BY id ASC" + sqlGetLatestEventRowIdByAggregateId = "SELECT id FROM %s WHERE aggregate_id = ? ORDER BY id DESC LIMIT 1 OFFSET 0" + sqlGetEventRowIdByEventId = "SELECT id FROM %s WHERE event_id = ?" + sqlCountEventsByAggregateIdAndEventsIdRange = "SELECT count(id) FROM %s WHERE aggregate_id = ? and id > ? and id <= ? ORDER BY id ASC" + sqlSelectEventsByAggregateIdAndEventsIdRange = "SELECT event_id, event_name, event_data FROM %s WHERE aggregate_id = ? and id > ? and id <= ? ORDER BY id ASC" + sqlGetLastEventIdByAggregateId = "SELECT last_event_id FROM %s WHERE aggregate_id = ?" + // snapshot + sqlInsertOrUpdateSnapshot = "INSERT INTO %s (aggregate_id, last_event_id, snapshot_data) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE snapshot_data = ?" + sqlGetSnapshotDataByAggregateId = "SELECT last_event_id, snapshot_data FROM %s WHERE aggregate_id = ?" +) + +func NewMysqlEventStore(ctx context.Context, config MysqlEventStoreConfig) (es EventStore, err error) { + subDomainName := strings.TrimSpace(config.SubDomainName) + if subDomainName == "" { + err = errors.New("new mysql event store failed, SubDomainName is empty") + return + } + sqlDataSourceName := strings.TrimSpace(config.DBConfig.SqlDataSourceName) + if sqlDataSourceName == "" { + err = errors.New("new mysql event store failed, SqlDataSourceName is empty") + return + } + db, openDBErr := sql.Open("mysql", config.DBConfig.SqlDataSourceName) + if openDBErr != nil { + err = fmt.Errorf("new mysql event store failed, open mysql db failed, %w", openDBErr) + return + } + es0 := &mysqlEventStore{ + //noCopy: noCopy{}, + subDomainName: subDomainName, + db: db, + snapshotsChan: make(chan Aggregate, 1024), + } + es0.listenSnapshotsChan(ctx) + es = es0 + return +} + +type mysqlEventStore struct { + //noCopy noCopy + subDomainName string + db *sql.DB + snapshotsChan chan Aggregate +} + +func (es *mysqlEventStore) listenSnapshotsChan(ctx context.Context) { + go func(ctx context.Context, es *mysqlEventStore) { + stopped := false + for { + select { + case <-ctx.Done(): + stopped = true + break + case snapshot, ok := <-es.snapshotsChan: + if !ok { + stopped = true + break + } + _ = es.doMakeSnapshot(ctx, snapshot) + } + if stopped { + break + } + } + }(ctx, es) + return +} + +func (es *mysqlEventStore) Name() (name string) { + name = es.subDomainName + return +} + +func (es *mysqlEventStore) InitDomainEventStoreTable(ctx context.Context, aggregateName string) { + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + createEventSql := fmt.Sprintf(sqlCreateDomainEventTable, domainEventTableName) + _, createEventTableErr := es.db.ExecContext(ctx, createEventSql) + if createEventTableErr != nil { + err := fmt.Errorf("mysql event store (%s) create event table failed, aggregateName is %s, %v", es.subDomainName, aggregateName, createEventTableErr) + panic(err) + } + domainSnapshotTableName := getDomainTableName(es.subDomainName, aggregateName, "snapshot") + createSnapshotSql := fmt.Sprintf(sqlCreateDomainSnapshotTable, domainSnapshotTableName) + _, createSnapshotTableErr := es.db.ExecContext(ctx, createSnapshotSql) + if createSnapshotTableErr != nil { + err := fmt.Errorf("mysql event store (%s) create snapshot table failed, aggregateName is %s, %v", es.subDomainName, aggregateName, createSnapshotTableErr) + panic(err) + } + return +} + +func (es *mysqlEventStore) ReadEvents(ctx context.Context, aggregateName string, aggregateId int64, lastEventId int64) (events []StoredEvent, err error) { + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) read events failed, aggregateId is empty", es.subDomainName) + return + } + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + var listSQL string + listArgs := make([]interface{}, 0, 1) + if lastEventId == 0 { + listSQL = fmt.Sprintf(sqlSelectEventsByAggregateId, domainEventTableName) + listArgs = append(listArgs, aggregateId) + } else { + listSQL = fmt.Sprintf(sqlSelectEventsByAggregateIdAndEventId, domainEventTableName, domainEventTableName) + listArgs = append(listArgs, aggregateId, lastEventId) + } + rows, queryErr := es.db.QueryContext(ctx, listSQL, listArgs...) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) read events failed, query failed, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, queryErr) + return + } + + events = make([]StoredEvent, 0, 1) + for rows.Next() { + var eventId int64 + var eventName string + var eventData = make([]byte, 0, 1) + scanErr := rows.Scan( + &eventId, + &eventName, + &eventData, + ) + if scanErr != nil { + _ = rows.Close() + err = fmt.Errorf("mysql event store (%s) read events failed, query failed at scan, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, scanErr) + return + } + var event StoredEvent + event, err = newJsonStoredEvent(aggregateId, aggregateName, eventId, eventName, eventData) + if err != nil { + return + } + events = append(events, event) + } + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) read events failed, close failed, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, closeErr) + return + } + return +} + +func (es *mysqlEventStore) StoreEvents(ctx context.Context, events []StoredEvent) (err error) { + if events == nil || len(events) == 0 { + err = fmt.Errorf("mysql event store (%s) store events failed, events is nil or empty", es.subDomainName) + return + } + sqlStmtMap := make(map[string][]interface{}) + for _, event := range events { + aggregateName := event.AggregateName() + if _, ok := sqlStmtMap[aggregateName]; !ok { + sqlStmtMap[aggregateName] = make([]interface{}, 0, 1) + } + sqlStmtMap[aggregateName] = append(sqlStmtMap[aggregateName], + event.AggregateId(), + event.EventId(), + event.EventName(), + event.EventBodyRaw(), + ) + } + tx, txErr := es.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: false}) + if txErr != nil { + err = fmt.Errorf("mysql event store (%s) store events failed, begin database tx failed, %v", es.subDomainName, txErr) + return + } + for aggregateName, sqlStmtArgs := range sqlStmtMap { + insertSQLBuffer := bytebufferpool.Get() + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + _, _ = insertSQLBuffer.WriteString(fmt.Sprintf(sqlInsertEventPrefix, domainEventTableName)) + _, _ = insertSQLBuffer.WriteString(strings.Repeat("(?, ?, ?, ?), ", len(sqlStmtArgs)/4)) + insertSQL := strings.TrimSuffix(insertSQLBuffer.String(), ", ") + rs, insertErr := tx.ExecContext(ctx, insertSQL, sqlStmtArgs...) + if insertErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) store events failed, insert failed, aggregateName is %s, %v", es.subDomainName, aggregateName, insertErr) + return + } + affected, affectedErr := rs.RowsAffected() + if affectedErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) store events failed, insert failed. aggregateName is %s, get rows affected failed, %v", es.subDomainName, aggregateName, affectedErr) + return + } + if affected == 0 { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) store events failed, insert failed. aggregateName is %s, no rows affected failed, affected = %v", es.subDomainName, aggregateName, affected) + return + } + } + cmtErr := tx.Commit() + if cmtErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) store events failed, insert failed. commit failed, %v", es.subDomainName, cmtErr) + return + } + return +} + +func (es *mysqlEventStore) CheckEvents(ctx context.Context, events []StoredEvent) (err error) { + if events == nil { + err = fmt.Errorf("mysql event store (%s) check events failed, events is nil", es.subDomainName) + return + } + for _, event := range events { + aggregateName := event.AggregateName() + var rowId int64 + rowId, err = es.getEventRowId(ctx, aggregateName, event.EventId()) + if err != nil { + err = fmt.Errorf("mysql event store (%s) check events failed, get rowId failed, aggregateName is %s", es.subDomainName, aggregateName) + return + } + if rowId == 0 { + err = fmt.Errorf("mysql event store (%s) check events failed, get rowId failed, rowId is 0, aggregateName is %s", es.subDomainName, aggregateName) + return + } + } + return +} + +func (es *mysqlEventStore) getEventRowId(ctx context.Context, aggregateName string, eventId int64) (rowId int64, err error) { + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + row, queryErr := es.db.QueryContext(ctx, fmt.Sprintf(sqlGetEventRowIdByEventId, domainEventTableName), &eventId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, load row id from %s failed, eventId is %d, %v", es.subDomainName, domainEventTableName, eventId, queryErr) + return + } + if row.Next() { + eventRowId := sql.NullInt64{} + scanErr := row.Scan(&eventRowId) + if scanErr != nil { + _ = row.Close() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, load row id from %s failed at scan, eventId is %d, %v", es.subDomainName, domainEventTableName, eventId, scanErr) + return + } + rowId = eventRowId.Int64 + } + closeErr := row.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, close failed, eventId is %d, %v", es.subDomainName, eventId, closeErr) + return + } + return +} + +func (es *mysqlEventStore) getLatestEventRowIdOfAggregate(ctx context.Context, aggregateName string, aggregateId int64) (rowId int64, err error) { + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + row, queryErr := es.db.QueryContext(ctx, fmt.Sprintf(sqlGetLatestEventRowIdByAggregateId, domainEventTableName), &aggregateId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, load latest event id of aggregate from %s failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, domainEventTableName, aggregateName, aggregateId, queryErr) + return + } + if row.Next() { + eventRowId := sql.NullInt64{} + scanErr := row.Scan(&eventRowId) + if scanErr != nil { + _ = row.Close() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, load latest event id of aggregate from %s failed at scan, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, domainEventTableName, aggregateName, aggregateId, scanErr) + return + } + rowId = eventRowId.Int64 + } + closeErr := row.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, close failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, closeErr) + return + } + return +} + +// list >leftRowId, <=rightRowId stored events +func (es *mysqlEventStore) countStoredEvents(ctx context.Context, aggregateName string, aggregateId int64, leftRowId int64, rightRowId int64) (count int64, err error) { + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) count stored events failed, aggregateName is %s, aggregateId is empty", es.subDomainName, aggregateName) + return + } + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + listSQL := fmt.Sprintf(sqlCountEventsByAggregateIdAndEventsIdRange, domainEventTableName) + rows, queryErr := es.db.QueryContext(ctx, listSQL, aggregateId, leftRowId, rightRowId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) count stored events failed, query failed, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, queryErr) + return + } + + for rows.Next() { + count0 := sql.NullInt64{} + scanErr := rows.Scan( + &count0, + ) + if scanErr != nil { + _ = rows.Close() + err = fmt.Errorf("mysql event store (%s) count stored events failed, query failed at scan, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, scanErr) + return + } + count = count0.Int64 + } + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) count stored events failed, close failed, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, closeErr) + return + } + return +} + +// list >leftRowId, <=rightRowId stored events +func (es *mysqlEventStore) rangeStoredEvents(ctx context.Context, aggregateName string, aggregateId int64, leftRowId int64, rightRowId int64) (events []StoredEvent, err error) { + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) range stored events failed, aggregateName is %s, aggregateId is empty", es.subDomainName, aggregateName) + return + } + domainEventTableName := getDomainTableName(es.subDomainName, aggregateName, "events") + listSQL := fmt.Sprintf(sqlSelectEventsByAggregateIdAndEventsIdRange, domainEventTableName) + rows, queryErr := es.db.QueryContext(ctx, listSQL, aggregateId, leftRowId, rightRowId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) range stored events failed, query failed, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, queryErr) + return + } + + events = make([]StoredEvent, 0, 1) + for rows.Next() { + eventId := sql.NullInt64{} + eventName := sql.NullString{} + eventData := make([]byte, 0, 1) + scanErr := rows.Scan( + &eventId, + &eventName, + &eventData, + ) + if scanErr != nil { + _ = rows.Close() + err = fmt.Errorf("mysql event store (%s) range stored events failed, query failed at scan, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, scanErr) + return + } + var event StoredEvent + event, err = newJsonStoredEvent(aggregateId, aggregateName, eventId.Int64, eventName.String, eventData) + if err != nil { + return + } + events = append(events, event) + } + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) range stored events failed, close failed, aggregateName is %s, aggregateId is %d, leftRowId is %d, rightRowId is %d, %v", es.subDomainName, aggregateName, aggregateId, leftRowId, rightRowId, closeErr) + return + } + return +} + +func (es *mysqlEventStore) MakeSnapshot(_ context.Context, aggregate Aggregate) (err error) { + if aggregate == nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregate is nil", es.subDomainName) + return + } + aggregateId := aggregate.Identifier() + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateId is nil", es.subDomainName) + return + } + es.snapshotsChan <- aggregate + return +} + +func (es *mysqlEventStore) doMakeSnapshot(ctx context.Context, aggregate Aggregate) (err error) { + if aggregate == nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregate is nil", es.subDomainName) + return + } + aggregateId := aggregate.Identifier() + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateId is nil", es.subDomainName) + return + } + + // todo + emptyAggregate0 := reflect.New(reflect.TypeOf(aggregate).Elem()) + emptyAggregateInterface := emptyAggregate0.Interface() + emptyAggregate, isAggregate := emptyAggregateInterface.(Aggregate) + if !isAggregate { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregate is not aggregate, %v", es.subDomainName, aggregate) + return + } + + aggregateName := getAggregateName(emptyAggregate) + prevEventId, getPrevEventIdErr := es.LoadSnapshot(ctx, aggregateId, emptyAggregate) + if getPrevEventIdErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, getPrevEventIdErr) + return + } + prevEventRowId := int64(0) + if prevEventId != 0 { + prevEventRowId0, getPrevEventRowIdErr := es.getEventRowId(ctx, aggregateName, prevEventId) + if getPrevEventRowIdErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, getPrevEventRowIdErr) + return + } + prevEventRowId = prevEventRowId0 + } else { + emptyAggregate = aggregate + } + // count of not be made snapshot events + latestEventRowId, getLatestEventIdErr := es.getLatestEventRowIdOfAggregate(ctx, aggregateName, aggregateId) + if getLatestEventIdErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, getLatestEventIdErr) + return + } + count, countErr := es.countStoredEvents(ctx, aggregateName, aggregateId, prevEventRowId, latestEventRowId) + if countErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, countErr) + return + } + + makeFlag := count > 10 + if !makeFlag { + return + } + + // make + // get not be made events (prevEventId:latestEventId] + storedEvents, rangeStoredEventsErr := es.rangeStoredEvents(ctx, aggregateName, aggregateId, prevEventRowId, latestEventRowId) + if rangeStoredEventsErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, rangeStoredEventsErr) + return + } + if storedEvents == nil || len(storedEvents) == 0 { + return + } + + lastEventId := storedEvents[len(storedEvents)-1].EventId() + handleErr := handleStoredEventRecursively(emptyAggregate, storedEvents) + if handleErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, handle stored event recursively falied, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, handleErr) + return + } + + aggregateBytes, encodeErr := json.Marshal(emptyAggregate) + if encodeErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, encode aggregate failed, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, encodeErr) + return + } + domainSnapshotTableName := getDomainTableName(es.subDomainName, aggregateName, "snapshot") + insertSQL := fmt.Sprintf(sqlInsertOrUpdateSnapshot, domainSnapshotTableName) + tx, txErr := es.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: false}) + if txErr != nil { + err = fmt.Errorf("mysql event store (%s) make snapshot failed, begin database tx failed, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, txErr) + return + } + rs, insertErr := tx.ExecContext(ctx, insertSQL, aggregateId, lastEventId, aggregateBytes, aggregateBytes) + if insertErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, insert failed, aggregateName is %s, aggregateId is %d, lastEventId is %d, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, insertErr) + return + } + affected, affectedErr := rs.RowsAffected() + if affectedErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, insert failed, aggregateName is %s, aggregateId is %d, lastEventId is %d. get rows affected failed, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, affectedErr) + return + } + if affected == 0 { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, insert failed, aggregateName is %s, aggregateId is %d, lastEventId is %d. no rows affected failed, affected = %v", es.subDomainName, aggregateName, aggregateId, lastEventId, affected) + return + } + cmtErr := tx.Commit() + if cmtErr != nil { + _ = tx.Rollback() + err = fmt.Errorf("mysql event store (%s) make snapshot failed, insert failed, aggregateName is %s, aggregateId is %d, lastEventId is %d. commit failed, %v", es.subDomainName, aggregateName, aggregateId, lastEventId, cmtErr) + return + } + return +} + +func (es *mysqlEventStore) getLastEventIdFromSnapshot(ctx context.Context, aggregateName string, aggregateId int64) (lastEventId int64, err error) { + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) get last event id from snapshot failed, aggregateId is empty", es.subDomainName) + return + } + domainSnapshotTableName := getDomainTableName(es.subDomainName, aggregateName, "snapshot") + getSQL := fmt.Sprintf(sqlGetLastEventIdByAggregateId, domainSnapshotTableName) + rows, queryErr := es.db.QueryContext(ctx, getSQL, &aggregateId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) get last event id from snapshot failed, query failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, queryErr) + return + } + if rows.Next() { + lastEventId0 := sql.NullInt64{} + scanErr := rows.Scan( + &lastEventId0, + ) + if scanErr != nil { + _ = rows.Close() + err = fmt.Errorf("mysql event store (%s) get last event id from snapshot failed, query failed at scan, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, scanErr) + return + } + lastEventId = lastEventId0.Int64 + } + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) get last event id from snapshot failed, close failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, closeErr) + return + } + return +} + +func (es *mysqlEventStore) LoadSnapshot(ctx context.Context, aggregateId int64, emptyAggregate Aggregate) (lastEventId int64, err error) { + if aggregateId == 0 { + err = fmt.Errorf("mysql event store (%s) load snapshot failed, aggregateId is empty", es.subDomainName) + return + } + if emptyAggregate == nil { + err = fmt.Errorf("mysql event store (%s) load snapshot failed, aggregate is nil", es.subDomainName) + return + } + aggregateName := getAggregateName(emptyAggregate) + domainSnapshotTableName := getDomainTableName(es.subDomainName, aggregateName, "snapshot") + getSQL := fmt.Sprintf(sqlGetSnapshotDataByAggregateId, domainSnapshotTableName) + rows, queryErr := es.db.QueryContext(ctx, getSQL, &aggregateId) + if queryErr != nil { + err = fmt.Errorf("mysql event store (%s) load snapshot failed, query failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, queryErr) + return + } + + snapshotData := make([]byte, 0, 1) + if rows.Next() { + lastEventId0 := sql.NullInt64{} + scanErr := rows.Scan( + &lastEventId0, + &snapshotData, + ) + if scanErr != nil { + _ = rows.Close() + err = fmt.Errorf("mysql event store (%s) load snapshot failed, query failed at scan, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, scanErr) + return + } + lastEventId = lastEventId0.Int64 + } + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("mysql event store (%s) load snapshot failed, close failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, closeErr) + return + } + + if lastEventId == 0 { + return + } + decodeErr := json.Unmarshal(snapshotData, emptyAggregate) + if decodeErr != nil { + err = fmt.Errorf("mysql event store (%s) load snapshot failed, decode failed, aggregateName is %s, aggregateId is %d, %v", es.subDomainName, aggregateName, aggregateId, decodeErr) + return + } + return +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..431f344 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/wuyazi/gddd + +go 1.16 + +require ( + github.com/apache/rocketmq-client-go/v2 v2.1.0 + github.com/bwmarrin/snowflake v0.3.0 + github.com/jinzhu/copier v0.3.5 + github.com/sirupsen/logrus v1.8.1 // indirect + github.com/stretchr/testify v1.7.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) + +replace github.com/apache/rocketmq-client-go/v2 => ../../../github.com/wuyazi/rocketmq-client-go diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a1e40ba --- /dev/null +++ b/go.sum @@ -0,0 +1,93 @@ +github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= +github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= +github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= +github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM= +go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/main.go b/main.go new file mode 100644 index 0000000..d3120fc --- /dev/null +++ b/main.go @@ -0,0 +1,11 @@ +package gddd + +import ( + "errors" + "fmt" +) + +func main() { + err := errors.New("aa") + fmt.Println(err.Error()) +} diff --git a/repository.go b/repository.go new file mode 100644 index 0000000..34d08a0 --- /dev/null +++ b/repository.go @@ -0,0 +1,200 @@ +package gddd + +import ( + "context" + "errors" + "fmt" + "reflect" + "strings" +) + +var NodeNumber int64 = 1 + +type RepositorySaveListener interface { + Handle(ctx context.Context, event DomainEventMessage) +} + +type RepositoryConfig struct { + DomainName string `json:"domain_name"` + SubDomainName string `json:"sub_domain_name"` + MysqlEventStoreDBConfig DBConfig `json:"mysql_event_store_db_config"` + RocketMqEventBusNameServers []string `json:"rocket_mq_event_bus_name_servers"` + SaveListener RepositorySaveListener +} + +func NewRepository(ctx context.Context, config *RepositoryConfig) (r *Repository, err error) { + domainName := strings.TrimSpace(config.DomainName) + if domainName == "" { + panic("aggregates repository create failed, DomainName is empty") + } + subDomainName := strings.TrimSpace(config.SubDomainName) + if subDomainName == "" { + panic("aggregates repository create failed, SubDomainName is empty") + } + mysqlEventStoreConfig := MysqlEventStoreConfig{ + SubDomainName: config.SubDomainName, + DBConfig: config.MysqlEventStoreDBConfig, + } + es, esErr := NewMysqlEventStore(ctx, mysqlEventStoreConfig) + if esErr != nil { + panic(fmt.Errorf("aggregates repository create failed, new event store failed, err: %w", esErr)) + } + rocketMqConfig := RocketMqEventBusConfig{ + DomainName: config.DomainName, + SubDomainName: config.SubDomainName, + NameServers: config.RocketMqEventBusNameServers, + EventStore: &es, + } + eb, ebErr := NewRocketMqEventBus(ctx, rocketMqConfig) + if ebErr != nil { + panic(fmt.Errorf("aggregates repository create failed, new rocketMq eventBus failed, err: %w", ebErr)) + } + ebStartErr := eb.Start(context.TODO()) + if ebStartErr != nil { + panic(fmt.Errorf("aggregates repository create failed, start rocketMq eventBus failed, err: %w", ebStartErr)) + } + r = &Repository{ + es: es, + eb: eb, + saveListener: config.SaveListener, + } + return +} + +type Repository struct { + //noCopy noCopy + es EventStore + eb EventBus + saveListener RepositorySaveListener +} + +func (r *Repository) RegisterAggregates(ctx context.Context, aggregates ...Aggregate) (err error) { + for _, aggregate := range aggregates { + aggregateName := getAggregateName(aggregate) + r.es.InitDomainEventStoreTable(ctx, aggregateName) + } + return +} + +func (r *Repository) Load(ctx context.Context, aggregate Aggregate) (has bool, err error) { + aggregateId := aggregate.Identifier() + if aggregateId == 0 { + err = errors.New("aggregates repository load failed, aggregateId is empty") + return + } + if aggregate == nil { + err = errors.New("aggregates repository load failed, aggregate is nil") + return + } + // load snapshot + aggregateName := getAggregateName(aggregate) + lastEventId, loadSnapshotErr := r.es.LoadSnapshot(ctx, aggregateId, aggregate) + if loadSnapshotErr != nil { + err = fmt.Errorf("aggregates repository load failed, load snapshot failed, aggregateName is %s, aggregateId is %d, %v", aggregateName, aggregateId, loadSnapshotErr) + return + } + storedEvents, readEventsErr := r.es.ReadEvents(ctx, aggregateName, aggregateId, lastEventId) + if readEventsErr != nil { + err = fmt.Errorf("aggregates repository load failed, load events failed, aggregateName is %s, aggregateId is %d, %v", aggregateName, aggregateId, readEventsErr) + return + } + if storedEvents == nil || len(storedEvents) == 0 { + if lastEventId != 0 { + has = true + return + } + return + } + + handleErr := handleStoredEventRecursively(aggregate, storedEvents) + if handleErr != nil { + err = fmt.Errorf("aggregates repository load failed, handle stored event recursively failed, aggregateName is %s, aggregateId is %d, %v", aggregateName, aggregateId, handleErr) + return + } + + has = true + return +} + +func (r *Repository) Save(ctx context.Context, aggregates ...Aggregate) (ok bool, err error) { + if aggregates == nil || len(aggregates) == 0 { + err = errors.New("aggregates repository save failed, aggregate is nil") + return + } + + eventsMessages := make([]DomainEventMessage, 0, 1) + + for _, aggregate := range aggregates { + if aggregate == nil { + err = errors.New("aggregates repository save failed, aggregate is nil") + return + } + aggregateId := aggregate.Identifier() + if aggregateId == 0 { + err = errors.New("aggregates repository save failed, Identifier is empty") + return + } + aggregateName := strings.TrimSpace(getAggregateName(aggregate)) + if aggregateName == "" { + err = errors.New("aggregates repository save failed, AggregateName is empty") + return + } + + abstractAggregateField := reflect.ValueOf(aggregate).Elem().FieldByName("AbstractAggregate") + if !abstractAggregateField.IsValid() { + err = fmt.Errorf("aggregates repository save failed, check AbstractAggregateField failed, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + abstractAggregate, isAbstractAggregate := abstractAggregateField.Interface().(AbstractAggregate) + if !isAbstractAggregate { + err = fmt.Errorf("aggregates repository save failed, check AbstractAggregate failed, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + domainEvents := abstractAggregate.Applied() + if domainEvents == nil || len(domainEvents) == 0 { + err = fmt.Errorf("aggregates repository save failed, changes is empty, aggregateName is %s, aggregateId is %d", aggregateName, aggregateId) + return + } + + handleErr := handleAppliedDomainEventRecursively(aggregate, domainEvents) + if handleErr != nil { + err = fmt.Errorf("aggregates repository save failed, handle event recursively failed, aggregateName is %s, aggregateId is %d, %v", aggregateName, aggregateId, handleErr) + return + } + for _, domainEvent := range domainEvents { + publishEventsMessage, newMessageErr := newDomainEventMessage(domainEvent) + if newMessageErr != nil { + err = fmt.Errorf("aggregates repository save failed, new domain event message failed, aggregateName is %s, aggregateId is %d, eventId is %d, %v", aggregateName, aggregateId, newMessageErr, domainEvent.EventId()) + return + } + eventsMessages = append(eventsMessages, publishEventsMessage) + } + abstractAggregate.lifecycle.cleanDomainEvents() + } + + // send event message + sendErr := r.eb.Send(ctx, eventsMessages...) + if sendErr != nil { + err = fmt.Errorf("aggregates repository save warn, send domain event failed, %v", sendErr) + return + } + + ok = true + + if r.saveListener != nil { + for _, eventMessage := range eventsMessages { + r.saveListener.Handle(ctx, eventMessage) + } + } + + // snapshot + for _, aggregate := range aggregates { + makeSnapshotErr := r.es.MakeSnapshot(ctx, aggregate) + if makeSnapshotErr != nil { + // todo log it + //err0 := fmt.Errorf("aggregates repository save warn, make snapshot failed, aggregateName is %s, aggregateId is %d, %v", getAggregateName(aggregate), aggregate.Identifier(), makeSnapshotErr) + } + } + + return +}