Skip to content

Commit af7f8a4

Browse files
authored
Merge pull request #122 from quickwit-oss/ddelemeny/refactor-multisearch
Refactor multisearch, fix numeric timestamp unmarshalling
2 parents 35f971f + b28e602 commit af7f8a4

13 files changed

+298
-341
lines changed

pkg/quickwit/client/client.go

Lines changed: 63 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"strings"
1313
"time"
1414

15-
"github.com/grafana/grafana-plugin-sdk-go/backend"
1615
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1716
)
1817

@@ -32,6 +31,7 @@ type DatasourceInfo struct {
3231
ShouldInit bool
3332
}
3433

34+
// TODO: Move ConfiguredFields closer to handlers, the client layer doesn't need this stuff
3535
type ConfiguredFields struct {
3636
TimeField string
3737
TimeOutputFormat string
@@ -41,85 +41,29 @@ type ConfiguredFields struct {
4141

4242
// Client represents a client which can interact with elasticsearch api
4343
type Client interface {
44-
GetConfiguredFields() ConfiguredFields
45-
ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
46-
MultiSearch() *MultiSearchRequestBuilder
44+
ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error)
4745
}
4846

47+
var logger = log.New()
48+
4949
// NewClient creates a new Quickwit client
50-
var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) {
51-
logger := log.New()
52-
logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "index", ds.Database)
50+
var NewClient = func(ctx context.Context, ds *DatasourceInfo) (Client, error) {
51+
logger.Debug("Creating new client", "index", ds.Database)
5352

5453
return &baseClientImpl{
55-
logger: logger,
56-
ctx: ctx,
57-
ds: ds,
58-
configuredFields: ds.ConfiguredFields,
59-
index: ds.Database,
60-
timeRange: timeRange,
54+
ctx: ctx,
55+
ds: ds,
56+
index: ds.Database,
6157
}, nil
6258
}
6359

6460
type baseClientImpl struct {
65-
ctx context.Context
66-
ds *DatasourceInfo
67-
configuredFields ConfiguredFields
68-
index string
69-
timeRange backend.TimeRange
70-
logger log.Logger
71-
}
72-
73-
func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields {
74-
return c.configuredFields
75-
}
76-
77-
type multiRequest struct {
78-
header map[string]interface{}
79-
body interface{}
80-
interval time.Duration
81-
}
82-
83-
func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) {
84-
bytes, err := c.encodeBatchRequests(requests)
85-
if err != nil {
86-
return nil, err
87-
}
88-
return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes)
89-
}
90-
91-
func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
92-
c.logger.Debug("Encoding batch requests to json", "batch requests", len(requests))
93-
start := time.Now()
94-
95-
payload := bytes.Buffer{}
96-
for _, r := range requests {
97-
reqHeader, err := json.Marshal(r.header)
98-
if err != nil {
99-
return nil, err
100-
}
101-
payload.WriteString(string(reqHeader) + "\n")
102-
103-
reqBody, err := json.Marshal(r.body)
104-
105-
if err != nil {
106-
return nil, err
107-
}
108-
109-
body := string(reqBody)
110-
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
111-
body = strings.ReplaceAll(body, "$__interval", r.interval.String())
112-
113-
payload.WriteString(body + "\n")
114-
}
115-
116-
elapsed := time.Since(start)
117-
c.logger.Debug("Encoded batch requests to json", "took", elapsed)
118-
119-
return payload.Bytes(), nil
61+
ctx context.Context
62+
ds *DatasourceInfo
63+
index string
12064
}
12165

122-
func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) {
66+
func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []byte) (*http.Request, error) {
12367
u, err := url.Parse(c.ds.URL)
12468
if err != nil {
12569
return nil, err
@@ -136,59 +80,49 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
13680
if err != nil {
13781
return nil, err
13882
}
139-
140-
c.logger.Debug("Executing request", "url", req.URL.String(), "method", method)
141-
14283
req.Header.Set("Content-Type", "application/x-ndjson")
84+
return req, nil
85+
}
14386

144-
start := time.Now()
145-
defer func() {
146-
elapsed := time.Since(start)
147-
c.logger.Debug("Executed request", "took", elapsed)
148-
}()
149-
//nolint:bodyclose
150-
resp, err := c.ds.HTTPClient.Do(req)
87+
// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers
88+
type MultiSearchResponse struct {
89+
Responses []*json.RawMessage `json:"responses"`
90+
}
91+
92+
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) {
93+
req, err := c.createMultiSearchRequest(requests, c.index)
15194
if err != nil {
15295
return nil, err
15396
}
15497

155-
return resp, nil
156-
}
157-
158-
func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
159-
c.logger.Debug("Executing multisearch", "search requests", r.Requests)
160-
161-
multiRequests := c.createMultiSearchRequests(r.Requests)
162-
queryParams := c.getMultiSearchQueryParameters()
163-
clientRes, err := c.executeBatchRequest("_elastic/_msearch", queryParams, multiRequests)
98+
res, err := c.ds.HTTPClient.Do(req)
16499
if err != nil {
165100
return nil, err
166101
}
167-
res := clientRes
168102
defer func() {
169103
if err := res.Body.Close(); err != nil {
170-
c.logger.Warn("Failed to close response body", "err", err)
104+
logger.Warn("Failed to close response body", "err", err)
171105
}
172106
}()
173107

174-
c.logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
108+
logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
175109

176110
if res.StatusCode >= 400 {
177111
qe := QuickwitQueryError{
178112
Status: res.StatusCode,
179113
Message: "Error on multisearch",
180114
ResponseBody: res.Body,
181-
QueryParam: queryParams,
182-
RequestBody: r.Requests,
115+
QueryParam: req.URL.RawQuery,
116+
RequestBody: requests,
183117
}
184118

185119
errorPayload, _ := json.Marshal(qe)
186-
c.logger.Error(string(errorPayload))
120+
logger.Error(string(errorPayload))
187121
return nil, fmt.Errorf(string(errorPayload))
188122
}
189123

190124
start := time.Now()
191-
c.logger.Debug("Decoding multisearch json response")
125+
logger.Debug("Decoding multisearch json response")
192126

193127
var msr MultiSearchResponse
194128
dec := json.NewDecoder(res.Body)
@@ -198,43 +132,53 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
198132
}
199133

200134
elapsed := time.Since(start)
201-
c.logger.Debug("Decoded multisearch json response", "took", elapsed)
135+
logger.Debug("Decoded multisearch json response", "took", elapsed)
202136

203-
msr.Status = res.StatusCode
204-
205-
return &msr, nil
137+
return msr.Responses, nil
206138
}
207139

208-
func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
209-
multiRequests := []*multiRequest{}
210-
211-
for _, searchReq := range searchRequests {
212-
mr := multiRequest{
213-
header: map[string]interface{}{
214-
"ignore_unavailable": true,
215-
"index": strings.Split(c.index, ","),
216-
},
217-
body: searchReq,
218-
interval: searchReq.Interval,
140+
func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {
141+
// Format, marshall and interpolate
142+
payload := bytes.Buffer{}
143+
for _, r := range searchRequests {
144+
header := map[string]interface{}{
145+
"ignore_unavailable": true,
146+
"index": strings.Split(index, ","),
219147
}
148+
reqHeader, err := json.Marshal(header)
149+
if err != nil {
150+
return nil, err
151+
}
152+
payload.WriteString(string(reqHeader) + "\n")
220153

221-
multiRequests = append(multiRequests, &mr)
222-
}
154+
reqBody, err := json.Marshal(r)
155+
156+
if err != nil {
157+
return nil, err
158+
}
159+
160+
body := string(reqBody)
161+
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.Interval.Milliseconds(), 10))
162+
body = strings.ReplaceAll(body, "$__interval", r.Interval.String())
223163

224-
return multiRequests
164+
payload.WriteString(body + "\n")
165+
}
166+
return payload.Bytes(), nil
225167
}
226168

227-
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
228-
var qs []string
169+
func (c *baseClientImpl) createMultiSearchRequest(requests []*SearchRequest, index string) (*http.Request, error) {
170+
body, err := c.makeMultiSearchPayload(requests, index)
171+
if err != nil {
172+
return nil, err
173+
}
229174

175+
var qs []string
230176
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
231177
if maxConcurrentShardRequests == 0 {
232178
maxConcurrentShardRequests = 5
233179
}
234180
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests))
235-
return strings.Join(qs, "&")
236-
}
181+
queryParams := strings.Join(qs, "&")
237182

238-
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
239-
return NewMultiSearchRequestBuilder()
183+
return c.makeRequest(http.MethodPost, "_elastic/_msearch", queryParams, body)
240184
}

pkg/quickwit/client/client_test.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"testing"
1010
"time"
1111

12-
"github.com/grafana/grafana-plugin-sdk-go/backend"
1312
"github.com/stretchr/testify/assert"
1413
"github.com/stretchr/testify/require"
1514

@@ -56,22 +55,15 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
5655
MaxConcurrentShardRequests: 6,
5756
}
5857

59-
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
60-
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
61-
timeRange := backend.TimeRange{
62-
From: from,
63-
To: to,
64-
}
65-
66-
c, err := NewClient(context.Background(), &ds, timeRange)
58+
c, err := NewClient(context.Background(), &ds)
6759
require.NoError(t, err)
6860
require.NotNil(t, c)
6961

7062
t.Cleanup(func() {
7163
ts.Close()
7264
})
7365

74-
ms, err := createMultisearchForTest(t, c)
66+
ms, err := createMultisearchForTest(t)
7567
require.NoError(t, err)
7668
res, err := c.ExecuteMultisearch(ms)
7769
require.NoError(t, err)
@@ -102,15 +94,14 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
10294

10395
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString())
10496

105-
assert.Equal(t, 200, res.Status)
106-
require.Len(t, res.Responses, 1)
97+
require.Len(t, res, 1)
10798
})
10899
}
109100

110-
func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, error) {
101+
func createMultisearchForTest(t *testing.T) ([]*SearchRequest, error) {
111102
t.Helper()
112103

113-
msb := c.MultiSearch()
104+
msb := NewMultiSearchRequestBuilder()
114105
s := msb.Search(15 * time.Second)
115106
s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
116107
a.FixedInterval = "$__interval"

pkg/quickwit/client/models.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,6 @@ type SearchResponse struct {
5959
Hits *SearchResponseHits `json:"hits"`
6060
}
6161

62-
// MultiSearchRequest represents a multi search request
63-
type MultiSearchRequest struct {
64-
Requests []*SearchRequest
65-
}
66-
67-
// MultiSearchResponse represents a multi search response
68-
type MultiSearchResponse struct {
69-
Status int `json:"status,omitempty"`
70-
Responses []*SearchResponse `json:"responses"`
71-
}
72-
7362
// Query represents a query
7463
type Query struct {
7564
Bool *BoolQuery `json:"bool"`

pkg/quickwit/client/search_request.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (m *MultiSearchRequestBuilder) Search(interval time.Duration) *SearchReques
143143
}
144144

145145
// Build builds and return a multi search request
146-
func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) {
146+
func (m *MultiSearchRequestBuilder) Build() ([]*SearchRequest, error) {
147147
requests := []*SearchRequest{}
148148
for _, sb := range m.requestBuilders {
149149
searchRequest, err := sb.Build()
@@ -153,9 +153,7 @@ func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) {
153153
requests = append(requests, searchRequest)
154154
}
155155

156-
return &MultiSearchRequest{
157-
Requests: requests,
158-
}, nil
156+
return requests, nil
159157
}
160158

161159
// QueryBuilder represents a query builder

pkg/quickwit/client/search_request_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func TestMultiSearchRequest(t *testing.T) {
404404
t.Run("When building search request should contain one search request", func(t *testing.T) {
405405
mr, err := b.Build()
406406
require.Nil(t, err)
407-
require.Equal(t, 1, len(mr.Requests))
407+
require.Equal(t, 1, len(mr))
408408
})
409409
})
410410

@@ -416,7 +416,7 @@ func TestMultiSearchRequest(t *testing.T) {
416416
t.Run("When building search request should contain two search requests", func(t *testing.T) {
417417
mr, err := b.Build()
418418
require.Nil(t, err)
419-
require.Equal(t, 2, len(mr.Requests))
419+
require.Equal(t, 2, len(mr))
420420
})
421421
})
422422
}

0 commit comments

Comments
 (0)