Skip to content

Commit 97226b9

Browse files
ddelemenyfmassot
authored andcommitted
Add shallow unmarshal step to MultiSearch
1 parent 9499ff0 commit 97226b9

File tree

6 files changed

+23
-17
lines changed

6 files changed

+23
-17
lines changed

pkg/quickwit/client/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type ConfiguredFields struct {
4141

4242
// Client represents a client which can interact with elasticsearch api
4343
type Client interface {
44-
ExecuteMultisearch(r []*SearchRequest) (*MultiSearchResponse, error)
44+
ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error)
4545
}
4646

4747
var logger = log.New()
@@ -84,7 +84,12 @@ func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []by
8484
return req, nil
8585
}
8686

87-
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSearchResponse, error) {
87+
// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers
88+
type MultiSearchResponse struct {
89+
Responses []*json.RawMessage `json:"responses"`
90+
}
91+
92+
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) {
8893
req, err := c.createMultiSearchRequest(requests, c.index)
8994
if err != nil {
9095
return nil, err
@@ -129,7 +134,7 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
129134
elapsed := time.Since(start)
130135
logger.Debug("Decoded multisearch json response", "took", elapsed)
131136

132-
return &msr, nil
137+
return msr.Responses, nil
133138
}
134139

135140
func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {

pkg/quickwit/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
9494

9595
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString())
9696

97-
require.Len(t, res.Responses, 1)
97+
require.Len(t, res, 1)
9898
})
9999
}
100100

pkg/quickwit/client/models.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ type SearchResponse struct {
5959
Hits *SearchResponseHits `json:"hits"`
6060
}
6161

62-
// MultiSearchResponse represents a multi search response
63-
type MultiSearchResponse struct {
64-
Responses []*SearchResponse `json:"responses"`
65-
}
66-
6762
// Query represents a query
6863
type Query struct {
6964
Bool *BoolQuery `json:"bool"`

pkg/quickwit/data_query_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,9 +1667,9 @@ func newFakeClient() *fakeClient {
16671667
}
16681668
}
16691669

1670-
func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) (*es.MultiSearchResponse, error) {
1670+
func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) ([]*json.RawMessage, error) {
16711671
c.multisearchRequests = append(c.multisearchRequests, r)
1672-
return c.multiSearchResponse, c.multiSearchError
1672+
return c.multiSearchResponse.Responses, c.multiSearchError
16731673
}
16741674

16751675
func newDataQuery(body string) (backend.QueryDataRequest, error) {
@@ -1721,5 +1721,5 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
17211721
return &backend.QueryDataResponse{}, err
17221722
}
17231723

1724-
return parseResponse(res.Responses, queries, configuredFields)
1724+
return parseResponse(res, queries, configuredFields)
17251725
}

pkg/quickwit/elasticsearch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es.
4545
return &backend.QueryDataResponse{}, err
4646
}
4747

48-
return parseResponse(res.Responses, queries, dsInfo.ConfiguredFields)
48+
return parseResponse(res, queries, dsInfo.ConfiguredFields)
4949
}
5050

5151
func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) {

pkg/quickwit/response_parser.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,23 @@ const (
4040

4141
var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
4242

43-
func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
43+
func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
4444
result := backend.QueryDataResponse{
4545
Responses: backend.Responses{},
4646
}
47-
if responses == nil {
47+
if rawResponses == nil {
4848
return &result, nil
4949
}
5050

51-
for i, res := range responses {
52-
target := targets[i]
51+
for i, rawRes := range rawResponses {
52+
var res *es.SearchResponse
53+
err := json.Unmarshal([]byte(*rawRes), &res)
54+
if nil != err {
55+
qwlog.Debug("Failed to unmarshal response", "err", err.Error(), "byteRes", *rawRes)
56+
continue
57+
}
5358

59+
target := targets[i]
5460
if res.Error != nil {
5561
errResult := getErrorFromElasticResponse(res)
5662
result.Responses[target.RefID] = backend.DataResponse{

0 commit comments

Comments
 (0)