Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/

# IDE
.idea/
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/dtm-labs/client
go 1.16

require (
github.com/cloudwego/kitex v0.4.2
github.com/dtm-labs/dtmdriver v0.0.6
github.com/go-redis/redis/v8 v8.11.5
github.com/go-resty/resty/v2 v2.7.0
Expand Down
73 changes: 73 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions workflow/imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (w *workflowFactory) newWorkflow(ctx context.Context, name string, gid stri
wf.Context = context.WithValue(wf.Context, wfMeta{}, wf)
wf.Options.HTTPResp2DtmError = HTTPResp2DtmError
wf.Options.GRPCError2DtmError = GrpcError2DtmError
wf.Options.KITEXError2DtmError = KitexError2DtmError
wf.initRestyClient()
return wf
}
Expand Down
31 changes: 31 additions & 0 deletions workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http"
"strconv"

kitexcodes "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
kitexstatus "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
"github.com/dtm-labs/client/dtmcli"
"github.com/dtm-labs/client/dtmcli/dtmimp"
"github.com/dtm-labs/client/dtmgrpc/dtmgimp"
Expand Down Expand Up @@ -94,6 +96,17 @@ func GrpcError2DtmError(err error) error {
return err
}

// KitexError2DtmError translate kitex error to dtm error
func KitexError2DtmError(err error) error {
st, _ := kitexstatus.FromError(err)
if st != nil && (st.Code() == kitexcodes.Aborted || st.Code() == kitexcodes.Internal) {
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
} else if st != nil && st.Code() == kitexcodes.FailedPrecondition {
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
}
return err
}

func (wf *Workflow) stepResultFromLocal(data []byte, err error) *stepResult {
return &stepResult{
Error: err,
Expand Down Expand Up @@ -124,6 +137,24 @@ func (wf *Workflow) stepResultToGrpc(s *stepResult, reply interface{}) error {
return s.Error
}

func (wf *Workflow) stepResultFromKitex(reply interface{}, err error) *stepResult {
sr := &stepResult{Error: wf.Options.KITEXError2DtmError(err)}
sr.Status = wfErrorToStatus(sr.Error)
if sr.Error == nil {
sr.Data = dtmimp.MustMarshal(reply)
} else if sr.Status == dtmcli.StatusFailed {
sr.Data = []byte(err.Error())
}
return sr
}

func (wf *Workflow) stepResultToKitex(s *stepResult, reply interface{}) error {
if s.Error == nil && s.Status == dtmcli.StatusSucceed {
dtmimp.MustUnmarshal(s.Data, &reply)
}
return s.Error
}

func (wf *Workflow) stepResultFromHTTP(resp *http.Response, err error) *stepResult {
sr := &stepResult{Error: err}
if err == nil {
Expand Down
40 changes: 40 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"net/http"
"net/url"

Expand Down Expand Up @@ -91,6 +93,9 @@ type Options struct {
// Default == GrpcError2DtmError: Code Aborted => ErrFailure; Code FailedPrecondition => ErrOngoing
GRPCError2DtmError func(error) error

// Default == KitexError2DtmError: Code Aborted | Code Internal => ErrFailure; Code FailedPrecondition => ErrOngoing
KITEXError2DtmError func(error) error

// This Option specify whether a branch returning ErrFailure should be compensated on rollback.
// for most idempotent branches, no compensation is needed.
// But for a timeout request, the caller cannot know where the request is successful, so the compensation should be called
Expand Down Expand Up @@ -261,3 +266,38 @@ func Interceptor(ctx context.Context, method string, req, reply interface{}, cc
})
return wf.stepResultToGrpc(sr, reply)
}

// KitexInterceptor is the middleware for workflow to capture kitex grpc call result
func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
ri := rpcinfo.GetRPCInfo(ctx)

logger.Debugf("Kitex client calling: %s%s %v", ri.To().ServiceName(), ri.To().Method(), dtmimp.MustMarshalString(req))
wfVal := ctx.Value(wfMeta{})
if wfVal == nil {
return next(ctx, req, resp)
}
wf, _ := wfVal.(*Workflow)
origin := func() error {
ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm)
err := next(ctx1, req, resp)
res := fmt.Sprintf("Kitex client called: %s%s %s result: %s err: %v",
ri.To().ServiceName(), ri.To().Method(), dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(resp), err)
if err != nil {
logger.Errorf("%s", res)
} else {
logger.Debugf("%s", res)
}
return err
}
if wf.currentOp != dtmimp.OpAction {
return origin()
}
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
err := origin()
return wf.stepResultFromKitex(resp, err)
})
return wf.stepResultToKitex(sr, resp)
}

}