Skip to content

Commit db6565d

Browse files
committed
kitex error handling
1 parent 4836146 commit db6565d

File tree

3 files changed

+39
-4
lines changed

3 files changed

+39
-4
lines changed

workflow/imp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (w *workflowFactory) newWorkflow(ctx context.Context, name string, gid stri
7474
wf.Context = context.WithValue(wf.Context, wfMeta{}, wf)
7575
wf.Options.HTTPResp2DtmError = HTTPResp2DtmError
7676
wf.Options.GRPCError2DtmError = GrpcError2DtmError
77+
wf.Options.KITEXError2DtmError = KitexError2DtmError
7778
wf.initRestyClient()
7879
return wf
7980
}

workflow/utils.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"net/http"
88
"strconv"
99

10+
kitexcodes "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
11+
kitexstatus "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
1012
"github.com/dtm-labs/client/dtmcli"
1113
"github.com/dtm-labs/client/dtmcli/dtmimp"
1214
"github.com/dtm-labs/client/dtmgrpc/dtmgimp"
@@ -94,6 +96,17 @@ func GrpcError2DtmError(err error) error {
9496
return err
9597
}
9698

99+
// KitexError2DtmError translate kitex error to dtm error
100+
func KitexError2DtmError(err error) error {
101+
st, _ := kitexstatus.FromError(err)
102+
if st != nil && (st.Code() == kitexcodes.Aborted || st.Code() == kitexcodes.Internal) {
103+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
104+
} else if st != nil && st.Code() == kitexcodes.FailedPrecondition {
105+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
106+
}
107+
return err
108+
}
109+
97110
func (wf *Workflow) stepResultFromLocal(data []byte, err error) *stepResult {
98111
return &stepResult{
99112
Error: err,
@@ -124,6 +137,24 @@ func (wf *Workflow) stepResultToGrpc(s *stepResult, reply interface{}) error {
124137
return s.Error
125138
}
126139

140+
func (wf *Workflow) stepResultFromKitex(reply interface{}, err error) *stepResult {
141+
sr := &stepResult{Error: wf.Options.KITEXError2DtmError(err)}
142+
sr.Status = wfErrorToStatus(sr.Error)
143+
if sr.Error == nil {
144+
sr.Data = dtmimp.MustMarshal(reply)
145+
} else if sr.Status == dtmcli.StatusFailed {
146+
sr.Data = []byte(err.Error())
147+
}
148+
return sr
149+
}
150+
151+
func (wf *Workflow) stepResultToKitex(s *stepResult, reply interface{}) error {
152+
if s.Error == nil && s.Status == dtmcli.StatusSucceed {
153+
dtmimp.MustUnmarshal(s.Data, &reply)
154+
}
155+
return s.Error
156+
}
157+
127158
func (wf *Workflow) stepResultFromHTTP(resp *http.Response, err error) *stepResult {
128159
sr := &stepResult{Error: err}
129160
if err == nil {

workflow/workflow.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ type Options struct {
9393
// Default == GrpcError2DtmError: Code Aborted => ErrFailure; Code FailedPrecondition => ErrOngoing
9494
GRPCError2DtmError func(error) error
9595

96+
// Default == KitexError2DtmError: Code Aborted | Code Internal => ErrFailure; Code FailedPrecondition => ErrOngoing
97+
KITEXError2DtmError func(error) error
98+
9699
// This Option specify whether a branch returning ErrFailure should be compensated on rollback.
97100
// for most idempotent branches, no compensation is needed.
98101
// But for a timeout request, the caller cannot know where the request is successful, so the compensation should be called
@@ -264,7 +267,7 @@ func Interceptor(ctx context.Context, method string, req, reply interface{}, cc
264267
return wf.stepResultToGrpc(sr, reply)
265268
}
266269

267-
// KitexInterceptor is the middleware for workflow to capture grpc call result
270+
// KitexInterceptor is the middleware for workflow to capture kitex grpc call result
268271
func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
269272
return func(ctx context.Context, req, resp interface{}) (err error) {
270273
ri := rpcinfo.GetRPCInfo(ctx)
@@ -278,7 +281,7 @@ func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
278281
origin := func() error {
279282
ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm)
280283
err := next(ctx1, req, resp)
281-
res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v",
284+
res := fmt.Sprintf("Kitex client called: %s%s %s result: %s err: %v",
282285
ri.To().ServiceName(), ri.To().Method(), dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(resp), err)
283286
if err != nil {
284287
logger.Errorf("%s", res)
@@ -292,9 +295,9 @@ func KitexInterceptor(next endpoint.Endpoint) endpoint.Endpoint {
292295
}
293296
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
294297
err := origin()
295-
return wf.stepResultFromGrpc(resp, err)
298+
return wf.stepResultFromKitex(resp, err)
296299
})
297-
return wf.stepResultToGrpc(sr, resp)
300+
return wf.stepResultToKitex(sr, resp)
298301
}
299302

300303
}

0 commit comments

Comments
 (0)