Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record request/response pairs #72

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 55 additions & 34 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"encoding/json"
"fmt"
"os"
"reflect"
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/databricks/databricks-sql-go/driverctx"
Expand All @@ -18,7 +21,6 @@ import (

// this is used to generate test data. Developer should change this manually
var RecordResults bool
var resultIndex int

type ThriftServiceClient struct {
*cli_service.TCLIServiceClient
Expand All @@ -33,54 +35,49 @@ func (tsc *ThriftServiceClient) OpenSession(ctx context.Context, req *cli_servic
log := logger.WithContext(SprintGuid(resp.SessionHandle.SessionId.GUID), driverctx.CorrelationIdFromContext(ctx), "")
defer log.Duration(msg, start)
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("OpenSession%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}

func (tsc *ThriftServiceClient) CloseSession(ctx context.Context, req *cli_service.TCloseSessionReq) (*cli_service.TCloseSessionResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), "")
defer log.Duration(logger.Track("CloseSession"))
msg, start := logger.Track("CloseSession")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.CloseSession(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "close session request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("CloseSession%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}

func (tsc *ThriftServiceClient) FetchResults(ctx context.Context, req *cli_service.TFetchResultsReq) (*cli_service.TFetchResultsResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(req.OperationHandle.OperationId.GUID))
defer log.Duration(logger.Track("FetchResults"))
msg, start := logger.Track("FetchResults")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.FetchResults(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "fetch results request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("FetchResults%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}

func (tsc *ThriftServiceClient) GetResultSetMetadata(ctx context.Context, req *cli_service.TGetResultSetMetadataReq) (*cli_service.TGetResultSetMetadataResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(req.OperationHandle.OperationId.GUID))
defer log.Duration(logger.Track("GetResultSetMetadata"))
msg, start := logger.Track("GetResultSetMetadata")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.GetResultSetMetadata(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "get result set metadata request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("ExecuteStatement%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}
Expand All @@ -92,12 +89,7 @@ func (tsc *ThriftServiceClient) ExecuteStatement(ctx context.Context, req *cli_s
return resp, errors.Wrap(err, "execute statement request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("ExecuteStatement%d.json", resultIndex), j, 0600)
// f, _ := os.ReadFile(fmt.Sprintf("ExecuteStatement%d.json", resultIndex))
// var resp2 cli_service.TExecuteStatementResp
// json.Unmarshal(f, &resp2)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
if resp != nil && resp.OperationHandle != nil {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(resp.OperationHandle.OperationId.GUID))
Expand All @@ -108,45 +100,42 @@ func (tsc *ThriftServiceClient) ExecuteStatement(ctx context.Context, req *cli_s

func (tsc *ThriftServiceClient) GetOperationStatus(ctx context.Context, req *cli_service.TGetOperationStatusReq) (*cli_service.TGetOperationStatusResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(req.OperationHandle.OperationId.GUID))
defer log.Duration(logger.Track("GetOperationStatus"))
msg, start := logger.Track("GetOperationStatus")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.GetOperationStatus(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "get operation status request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("GetOperationStatus%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}

func (tsc *ThriftServiceClient) CloseOperation(ctx context.Context, req *cli_service.TCloseOperationReq) (*cli_service.TCloseOperationResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(req.OperationHandle.OperationId.GUID))
defer log.Duration(logger.Track("CloseOperation"))
msg, start := logger.Track("CloseOperation")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.CloseOperation(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "close operation request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("CloseOperation%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}

func (tsc *ThriftServiceClient) CancelOperation(ctx context.Context, req *cli_service.TCancelOperationReq) (*cli_service.TCancelOperationResp, error) {
log := logger.WithContext(driverctx.ConnIdFromContext(ctx), driverctx.CorrelationIdFromContext(ctx), SprintGuid(req.OperationHandle.OperationId.GUID))
defer log.Duration(logger.Track("CancelOperation"))
msg, start := logger.Track("CancelOperation")
defer log.Duration(msg, start)
resp, err := tsc.TCLIServiceClient.CancelOperation(ctx, req)
if err != nil {
return resp, errors.Wrap(err, "cancel operation request error")
}
if RecordResults {
j, _ := json.MarshalIndent(resp, "", " ")
_ = os.WriteFile(fmt.Sprintf("CancelOperation%d.json", resultIndex), j, 0600)
resultIndex++
recordRequestResponse(req, resp, time.Since(start))
}
return resp, CheckStatus(resp)
}
Expand Down Expand Up @@ -254,3 +243,35 @@ func SprintGuid(bts []byte) string {
logger.Warn().Msgf("GUID not valid: %x", bts)
return fmt.Sprintf("%x", bts)
}

type rr struct {
RequestType string
RequestTime time.Duration
Request any
Response any
}

func recordRequestResponse(req, resp any, dur time.Duration) {

rt := reflect.TypeOf(req).String()
rt = rt[strings.LastIndex(rt, ".")+1:]
pair := rr{
RequestType: rt,
RequestTime: dur,
Request: req,
Response: resp,
}

j, err := json.MarshalIndent(&pair, "", " ")
// fmt.Println(string(j))
if err == nil {
f, err := os.OpenFile("session.json", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err == nil {
defer f.Close()
_, err = f.WriteString(string(j) + ",\n")
if err != nil {
fmt.Println(err.Error())
}
}
}
}