diff --git a/policy/data/gql_async.go b/policy/data/gql_async.go index 6808116..fc8a15c 100644 --- a/policy/data/gql_async.go +++ b/policy/data/gql_async.go @@ -3,6 +3,7 @@ package data import ( "bytes" "context" + b64 "encoding/base64" "fmt" "io" "net/http" @@ -12,6 +13,8 @@ import ( "olympos.io/encoding/edn" ) +const AsyncQueryName = "async-query" + type ( AsyncQueryBody struct { Query string `edn:"query"` @@ -25,52 +28,77 @@ type ( } AsyncQueryResponse struct { - Data map[edn.Keyword]edn.RawMessage `edn:"data"` + Data edn.RawMessage `edn:"data"` Errors []struct { Message string `edn:"message"` } } + AsyncResultMetadata struct { + SubscriptionResults [][]edn.RawMessage `edn:"subscription"` + AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"` + InFlightQueryName string `edn:"query-name"` + } + AsyncDataSource struct { - log skill.Logger - url string - token string - metadata string + log skill.Logger + url string + token string + subscriptionResults [][]edn.RawMessage + asyncResults map[string]AsyncQueryResponse } ) -func NewAsyncDataSource(req skill.RequestContext, metadata string) AsyncDataSource { +func NewAsyncDataSource(req skill.RequestContext, subscriptionResults [][]edn.RawMessage, asyncResults map[string]AsyncQueryResponse) AsyncDataSource { return AsyncDataSource{ - log: req.Log, - url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql), - token: req.Event.Token, - metadata: metadata, + log: req.Log, + url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql), + token: req.Event.Token, + subscriptionResults: subscriptionResults, + asyncResults: asyncResults, } } func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) { + if existingResult, ok := ds.asyncResults[queryName]; ok { + if len(existingResult.Errors) != 0 { + return nil, fmt.Errorf("async query returned error: %s", existingResult.Errors[0].Message) + } + return &QueryResponse{}, edn.Unmarshal(existingResult.Data, output) + } + ednVariables := map[edn.Keyword]interface{}{} for k, v := range variables { ednVariables[edn.Keyword(k)] = v } + metadata := AsyncResultMetadata{ + SubscriptionResults: ds.subscriptionResults, + AsyncQueryResults: ds.asyncResults, + InFlightQueryName: queryName, + } + metadataEdn, err := edn.Marshal(metadata) + if err != nil { + return nil, fmt.Errorf("failed to marshal metadata: %w", err) + } + request := AsyncQueryRequest{ - Name: queryName, + Name: AsyncQueryName, Body: AsyncQueryBody{ Query: query, Variables: ednVariables, }, - Metadata: ds.metadata, + Metadata: b64.StdEncoding.EncodeToString(metadataEdn), } - edn, err := edn.Marshal(request) + reqEdn, err := edn.Marshal(request) if err != nil { return nil, err } - ds.log.Infof("Async request: %s", string(edn)) + ds.log.Infof("Async request: %s", string(reqEdn)) - req, err := http.NewRequest(http.MethodPost, ds.url, bytes.NewBuffer(edn)) + req, err := http.NewRequest(http.MethodPost, ds.url, bytes.NewBuffer(reqEdn)) if err != nil { return nil, err } @@ -94,27 +122,3 @@ func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query str return &QueryResponse{AsyncRequestMade: true}, nil } - -func UnwrapAsyncResponse(result map[edn.Keyword]edn.RawMessage) (DataSource, error) { - ednBody, err := edn.Marshal(result) - if err != nil { - return nil, err - } - - var response AsyncQueryResponse - err = edn.Unmarshal(ednBody, &response) - if err != nil { - return nil, err - } - - if len(response.Errors) > 0 { - return nil, fmt.Errorf(response.Errors[0].Message) - } - - queryResponses := map[string][]byte{} - for k, v := range response.Data { - queryResponses[string(k)] = v - } - - return NewFixedDataSource(edn.Unmarshal, queryResponses), nil -} diff --git a/policy/policy_handler/async.go b/policy/policy_handler/async.go index 5726405..8220d1f 100644 --- a/policy/policy_handler/async.go +++ b/policy/policy_handler/async.go @@ -4,13 +4,12 @@ import ( "context" b64 "encoding/base64" "fmt" - "github.com/atomist-skills/go-skill" "github.com/atomist-skills/go-skill/policy/data" "olympos.io/encoding/edn" ) -const eventNameAsyncQuery = "async_query" +const EventNameAsyncQuery = data.AsyncQueryName // these must match for the event handler to be registered // WithAsync will enable async graphql queries for the EventHandler. // When used, data.QueryResponse#AsyncRequestMade will be true when performed asynchronously. @@ -18,30 +17,29 @@ const eventNameAsyncQuery = "async_query" // It will be automatically retried once the async query results are returned. func WithAsync() Opt { return func(h *EventHandler) { - h.subscriptionNames = append(h.subscriptionNames, eventNameAsyncQuery) + h.subscriptionNames = append(h.subscriptionNames, EventNameAsyncQuery) h.subscriptionDataProviders = append(h.subscriptionDataProviders, getAsyncSubscriptionData) h.dataSourceProviders = append(h.dataSourceProviders, buildAsyncDataSources) } } func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) { - if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery { + if req.Event.Context.AsyncQueryResult.Name != EventNameAsyncQuery { return nil, skill.Configuration{}, nil } - metadata := req.Event.Context.AsyncQueryResult.Metadata - encoded, err := b64.StdEncoding.DecodeString(metadata) + metaEdn, err := b64.StdEncoding.DecodeString(req.Event.Context.AsyncQueryResult.Metadata) if err != nil { return nil, skill.Configuration{}, fmt.Errorf("failed to decode async metadata: %w", err) } - var subscriptionResult [][]edn.RawMessage - err = edn.Unmarshal(encoded, &subscriptionResult) + var metadata data.AsyncResultMetadata + err = edn.Unmarshal(metaEdn, &metadata) if err != nil { return nil, skill.Configuration{}, fmt.Errorf("failed to unmarshal async metadata: %w", err) } - return subscriptionResult, req.Event.Context.AsyncQueryResult.Configuration, nil + return metadata.SubscriptionResults, req.Event.Context.AsyncQueryResult.Configuration, nil } // buildAsyncDataSources always returns at least a data.AsyncDataSource, @@ -52,23 +50,28 @@ func buildAsyncDataSources(ctx context.Context, req skill.RequestContext) ([]dat return []data.DataSource{}, nil } - if req.Event.Context.AsyncQueryResult.Name == eventNameAsyncQuery { - responseSource, err := data.UnwrapAsyncResponse(req.Event.Context.AsyncQueryResult.Result) - if err != nil { - return nil, err - } + if req.Event.Context.AsyncQueryResult.Name != EventNameAsyncQuery { return []data.DataSource{ - responseSource, - data.NewAsyncDataSource(req, req.Event.Context.AsyncQueryResult.Metadata), + data.NewAsyncDataSource(req, req.Event.Context.Subscription.Result, map[string]data.AsyncQueryResponse{}), }, nil } - ednBody, err := edn.Marshal(req.Event.Context.Subscription.Result) + metaEdn, err := b64.StdEncoding.DecodeString(req.Event.Context.AsyncQueryResult.Metadata) + + var metadata data.AsyncResultMetadata + err = edn.Unmarshal(metaEdn, &metadata) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + + var queryResponse data.AsyncQueryResponse + err = edn.Unmarshal(req.Event.Context.AsyncQueryResult.Result, &queryResponse) if err != nil { - return nil, fmt.Errorf("failed to marshal metadata [%w]", err) + return nil, fmt.Errorf("failed to unmarshal async query result: %w", err) } + metadata.AsyncQueryResults[metadata.InFlightQueryName] = queryResponse return []data.DataSource{ - data.NewAsyncDataSource(req, b64.StdEncoding.EncodeToString(ednBody)), + data.NewAsyncDataSource(req, metadata.SubscriptionResults, metadata.AsyncQueryResults), }, nil } diff --git a/types.go b/types.go index f4c39f9..29297c2 100644 --- a/types.go +++ b/types.go @@ -73,10 +73,10 @@ type EventIncoming struct { Metadata map[edn.Keyword]edn.RawMessage `edn:"metadata"` } `edn:"sync-request"` AsyncQueryResult struct { - Name string `edn:"name"` - Configuration Configuration `edn:"configuration"` - Metadata string `edn:"metadata"` - Result map[edn.Keyword]edn.RawMessage `edn:"result"` + Name string `edn:"name"` + Configuration Configuration `edn:"configuration"` + Metadata string `edn:"metadata"` + Result edn.RawMessage `edn:"result"` } `edn:"query-result"` Event struct { Name string `edn:"name"`