Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
xiandong-italki committed Jul 5, 2022
0 parents commit cd6e1ab
Show file tree
Hide file tree
Showing 24 changed files with 1,988 additions and 0 deletions.
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# goland
.idea
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

## 执行顺序

```
| Command |
| Repository | Aggregate |
| | EventBus |
| EventStore |
```

## Command(命令)

命令

## Aggregate(聚合)

用于描述业务,并产生事件
定义实体,和事件,所有实体更改只能通过事件

## Repository(仓库)

用于存储数据的接口

## EventBus(事件总线)

用于传输事件

## EventStore(事件存储)

用于存储事件,并可回溯(EventSourcing)

## TODO

- [ ] 事件消费
- [ ] 事件溯源
- [ ] go-zero 适配
- [ ] 归档:将每日的事件归档
- [ ] 唯一索引:创建用户时,邮箱唯一
- [ ] 文档
46 changes: 46 additions & 0 deletions aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package gddd

import "context"

func NewAggregateId() (id int64) {
id = node.Generate().Int64()
return
}

func ApplyAggregateChange(ctx context.Context, aggregate Aggregate, change aggregateChange) {
aggregate.Apply(aggregate, change)
}

type Aggregate interface {
InitId()
Identifier() (id int64)
Apply(agg Aggregate, event aggregateChange)
Applied() (events []DomainEvent)
}

type AbstractAggregate struct {
Id int64 `json:"id"`
lifecycle aggregateLifecycle
}

func (a *AbstractAggregate) InitId() {
if a.Id == 0 {
a.Id = NewAggregateId()
}
return
}

func (a *AbstractAggregate) Identifier() (id int64) {
id = a.Id
return
}

func (a *AbstractAggregate) Apply(agg Aggregate, aggChange aggregateChange) {
a.lifecycle.apply(agg, aggChange)
return
}

func (a *AbstractAggregate) Applied() (events []DomainEvent) {
events = a.lifecycle.getDomainEvents()
return
}
55 changes: 55 additions & 0 deletions aggregate_lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package gddd

import (
"errors"
"strings"
"sync"
)

type aggregateLifecycle struct {
mutex sync.Mutex
domainEvents []DomainEvent
}

type aggregateChange interface{}

func (c *aggregateLifecycle) apply(agg Aggregate, aggChange aggregateChange) {
c.mutex.Lock()
if aggChange == nil {
c.mutex.Unlock()
panic(errors.New("aggregate apply failed, aggregateChange is nil"))
return
}
eventName := strings.TrimSpace(getAggregateChangeName(aggChange))
if eventName == "" {
c.mutex.Unlock()
panic(errors.New("aggregate apply failed, eventName is empty"))
return
}
domainEvent := SampleDomainEvent{
aggregateId: agg.Identifier(),
aggregateName: getAggregateName(agg),
eventName: eventName,
eventBody: aggChange,
}
domainEvent.initEventId()
err := handleAppliedDomainEvent(agg, &domainEvent)
if err != nil {
c.mutex.Unlock()
panic(errors.New("aggregate apply failed, apply domain event failed"))
return
}
c.domainEvents = append(c.domainEvents, &domainEvent)
c.mutex.Unlock()
return
}

func (c *aggregateLifecycle) getDomainEvents() []DomainEvent {
return c.domainEvents
}

func (c *aggregateLifecycle) cleanDomainEvents() {
c.mutex.Lock()
c.domainEvents = make([]DomainEvent, 0, 1)
c.mutex.Unlock()
}
12 changes: 12 additions & 0 deletions command_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gddd

import "context"

type Command interface {
}

type AbstractCommand struct {
AggregateId string
}

type CommandHandle func(ctx context.Context, command Command) (result interface{}, err error)
58 changes: 58 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package gddd

import (
"fmt"
"github.com/bwmarrin/snowflake"
"reflect"
"time"
)

var node *snowflake.Node

func init() {
// init snowflake
var nodeErr error
node, nodeErr = snowflake.NewNode(NodeNumber)
if nodeErr != nil {
fmt.Println(nodeErr)
return
}
}

func getAggregateName(a Aggregate) string {
rt := reflect.TypeOf(a)
if rt.Kind() == reflect.Ptr {
rt = rt.Elem()
}
if rt.Kind() != reflect.Struct {
panic("bad aggregate type")
}
return rt.Name()
}

func getAggregateChangeName(e aggregateChange) string {
rt := reflect.TypeOf(e)
if rt.Kind() == reflect.Ptr {
rt = rt.Elem()
}
if rt.Kind() != reflect.Struct {
panic("bad aggregateChange type")
}
return rt.Name()
}

func getEventName(e DomainEvent) string {
rt := reflect.TypeOf(e)
if rt.Kind() == reflect.Ptr {
rt = rt.Elem()
}
if rt.Kind() != reflect.Struct {
panic("bad event type")
}
return rt.Name()
}

func NodeTime(snowflakeId int64) time.Time {
milliTimeStamp := (snowflakeId >> (snowflake.NodeBits + snowflake.StepBits)) + snowflake.Epoch
return time.Unix(milliTimeStamp/1000, milliTimeStamp%1000*1000000)
}
13 changes: 13 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package gddd

import (
"fmt"
"github.com/bwmarrin/snowflake"
"testing"
)

func TestNodeTime(t *testing.T) {
node, _ = snowflake.NewNode(1)
time0 := NodeTime(node.Generate().Int64())
fmt.Println("time is: ", time0)
}
16 changes: 16 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package gddd

import (
"context"
"database/sql"
)

type Context struct {
ctx context.Context
tx *sql.Tx
}

func (c *Context) Apply(aggregate Aggregate, change aggregateChange) error {
ApplyAggregateChange(c.ctx, aggregate, change)
return nil
}
77 changes: 77 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package gddd

import (
"encoding/json"
"fmt"
"time"
)

func NewDomainEventId() (id int64) {
id = node.Generate().Int64()
return
}

type DomainEvent interface {
AggregateId() int64
AggregateName() string
EventId() int64
EventName() string
EventBody() interface{}
EventBodyRaw() ([]byte, error)
EventCreateTime() time.Time
initEventId()
}

type SampleDomainEvent struct {
aggregateId int64
aggregateName string
eventId int64
eventName string
eventBody interface{}
}

func (s *SampleDomainEvent) initEventId() {
if s.eventId == 0 {
s.eventId = NewDomainEventId()
}
return
}

func (s *SampleDomainEvent) AggregateId() (id int64) {
id = s.aggregateId
return
}

func (s *SampleDomainEvent) AggregateName() (name string) {
name = s.aggregateName
return
}

func (s *SampleDomainEvent) EventId() (id int64) {
id = s.eventId
return
}

func (s *SampleDomainEvent) EventName() (name string) {
name = s.eventName
return
}

func (s *SampleDomainEvent) EventBody() (body interface{}) {
body = s.eventBody
return
}

func (s *SampleDomainEvent) EventCreateTime() (createTime time.Time) {
createTime = NodeTime(s.eventId)
return
}

func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error) {
bodyRaw, err = json.Marshal(s.eventBody)
if err != nil {
err = fmt.Errorf("marshal domain event failed, %v", err)
return
}
return
}
55 changes: 55 additions & 0 deletions event_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package gddd

import (
"context"
"encoding/json"
"fmt"
"strings"
)

// EventHandle handle event, if return err is not nil, it will reject event,
// if want to requeue, then make the requeue be true
type EventHandle func(ctx context.Context, eventDecoder EventDecoder) (err error, requeue bool)

type EventBus interface {
Name() string
Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)
Recv(ctx context.Context, topic string, handle EventHandle) (err error)
Start(ctx context.Context) (err error)
Shutdown()
Await()
Close(ctx context.Context) (err error)
}

type DomainEventMessage struct {
AggregateId int64 `json:"aggregate_id"`
AggregateName string `json:"aggregate_name"`
EventName string `json:"event_name"`
EventId int64 `json:"event_id"`
EventBody []byte `json:"event_body"`
}

func (msg *DomainEventMessage) TopicName(eventBusName string) string {
topic := fmt.Sprintf("%s_%s", eventBusName, strings.TrimSuffix(strings.ToLower(msg.AggregateName), "aggregate"))
return topic
}

func (msg *DomainEventMessage) Decode(byteData []byte) (err error) {
err = json.Unmarshal(byteData, msg)
return
}

func newDomainEventMessage(event DomainEvent) (msg DomainEventMessage, err error) {
eventBodyRaw, err := event.EventBodyRaw()
if err != nil {
return
}
msg = DomainEventMessage{
AggregateId: event.AggregateId(),
AggregateName: event.AggregateName(),
EventName: event.EventName(),
EventId: event.EventId(),
EventBody: eventBodyRaw,
}
return
}
Loading

0 comments on commit cd6e1ab

Please sign in to comment.