Skip to content

Commit

Permalink
aggregateChange = protoreflect.ProtoMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
xiandong-italki committed Apr 21, 2023
1 parent b340d3c commit 7909a9b
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 21 deletions.
3 changes: 2 additions & 1 deletion aggregate_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gddd

import (
"errors"
"google.golang.org/protobuf/reflect/protoreflect"
"strings"
"sync"
)
Expand All @@ -11,7 +12,7 @@ type aggregateLifecycle struct {
domainEvents []DomainEvent
}

type aggregateChange interface{}
type aggregateChange = protoreflect.ProtoMessage

func (c *aggregateLifecycle) apply(agg Aggregate, aggChange aggregateChange) {
c.mutex.Lock()
Expand Down
7 changes: 4 additions & 3 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gddd
import (
"encoding/json"
"fmt"
"google.golang.org/protobuf/reflect/protoreflect"
"time"
)

Expand All @@ -16,7 +17,7 @@ type DomainEvent interface {
AggregateName() string
EventId() int64
EventName() string
EventBody() interface{}
EventBody() protoreflect.ProtoMessage
EventBodyRaw() ([]byte, error)
EventCreateTime() time.Time
initEventId()
Expand All @@ -30,7 +31,7 @@ type SampleDomainEvent struct {
aggregateName string
eventId int64
eventName string
eventBody interface{}
eventBody protoreflect.ProtoMessage
}

func (s *SampleDomainEvent) initEventId() {
Expand Down Expand Up @@ -60,7 +61,7 @@ func (s *SampleDomainEvent) EventName() (name string) {
return
}

func (s *SampleDomainEvent) EventBody() (body interface{}) {
func (s *SampleDomainEvent) EventBody() (body protoreflect.ProtoMessage) {
body = s.eventBody
return
}
Expand Down
39 changes: 34 additions & 5 deletions event_bus_dtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import (
"github.com/dtm-labs/client/dtmcli"
"github.com/dtm-labs/client/dtmgrpc"
"github.com/lithammer/shortuuid/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
"runtime"
"strings"
"time"
)

type DtmEventProducerConfig struct {
Expand Down Expand Up @@ -52,10 +57,14 @@ func NewDtmEventProducer(ctx context.Context, config DtmEventProducerConfig) (ev
return
}

func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEventMessage) error {
func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEvent) error {
storedEvents := make([]StoredEvent, 0, 1)
for _, eventMessage := range eventsMessages {
storedEvent, err := newJsonStoredEvent(eventMessage.AggregateId, eventMessage.AggregateName, eventMessage.EventId, eventMessage.EventName, eventMessage.EventBody)
EventBodyRaw, err := eventMessage.EventBodyRaw()
if err != nil {

}
storedEvent, err := newJsonStoredEvent(eventMessage.AggregateId(), eventMessage.AggregateName(), eventMessage.EventId(), eventMessage.EventName(), EventBodyRaw)
if err != nil {
return fmt.Errorf("newJsonStoredEvent error")
}
Expand All @@ -68,7 +77,7 @@ func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages
return nil
}

func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) {
func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEvent) (err error) {
if eventMessages == nil || len(eventMessages) == 0 {
err = fmt.Errorf("dtm event producer send event failed, eventMessages is nil or empty")
return
Expand All @@ -83,8 +92,7 @@ func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEven
}
fmt.Errorf("%+v", messageBody)
//dtmMsg = dtmMsg.Add("http://localhost:8081/api/busi/TransIn", &messageBody)
msg := newDomainEventMessageProto(eventMessage)
dtmMsg = dtmMsg.Add("localhost:8080/proto.userQuery/insertUser", &msg)
dtmMsg = dtmMsg.Add("localhost:8080/proto.userQuery/insertUser", eventMessage.EventBody())
}
err = dtmMsg.DoAndSubmitDB("localhost:8081/busi.Busi/QueryPreparedB", p.EventStore.GetDB(ctx), func(tx *sql.Tx) error {
// TODO use tx
Expand All @@ -96,6 +104,27 @@ func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEven
return
}

func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 预处理(pre-processing)
start := time.Now()
// 获取正在运行程序的操作系统
cos := runtime.GOOS
// 将操作系统信息附加到传出请求
ctx = metadata.AppendToOutgoingContext(ctx, "client-os", cos)

// 可以看做是当前 RPC 方法,一般在拦截器中调用 invoker 能达到调用 RPC 方法的效果,当然底层也是 gRPC 在处理。
// 调用RPC方法(invoking RPC method)
err := invoker(ctx, method, req, reply, cc, opts...)

// 后处理(post-processing)
end := time.Now()
log.Printf("RPC: %s,,client-OS: '%v' req:%v start time: %s, end time: %s, err: %v", method, cos, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
return err
}
}

type DtmEventConsumerConfig struct {
DomainName string
GroupName string
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/jinzhu/copier v0.3.5
github.com/lithammer/shortuuid/v3 v3.0.7
github.com/valyala/bytebufferpool v1.0.0
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0
)

Expand Down Expand Up @@ -56,7 +57,6 @@ require (
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.48.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)

Expand Down
10 changes: 6 additions & 4 deletions message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ syntax = "proto3";
package proto;
option go_package = "github.com/wuyazi/gddd/proto";

import "google/protobuf/timestamp.proto";

message DomainEventMessageProto {
int64 aggregateId = 1;
string aggregateName = 2;
Expand All @@ -20,8 +22,8 @@ message AbstractEvent {
}

message Extra {
int64 aggregateId = 1;
string aggregateName = 2;
int64 eventId = 3;
string eventName = 4;
string user_ip = 5;
string user_agent = 6;
string accept_language = 7;
google.protobuf.Timestamp createTime = 3;
}
13 changes: 6 additions & 7 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,12 @@ func (r *Repository) Save(ctx context.Context, aggregates ...Aggregate) (ok bool
eventsMessages = append(eventsMessages, publishEventsMessage)
}
abstractAggregate.lifecycle.cleanDomainEvents()
}

// send event message
sendErr := r.eb.Send(ctx, eventsMessages...)
if sendErr != nil {
err = fmt.Errorf("aggregates repository save warn, send domain event failed, %v", sendErr)
return
// send message
sendErr := r.eb.Send(ctx, domainEvents...)
if sendErr != nil {
err = fmt.Errorf("aggregates repository save warn, send domain event failed, %v", sendErr)
return
}
}

ok = true
Expand Down

0 comments on commit 7909a9b

Please sign in to comment.