@@ -4,15 +4,52 @@ import (
4
4
"io"
5
5
"os"
6
6
"reflect"
7
+ "runtime"
8
+ "strconv"
9
+ "strings"
7
10
8
11
opentracing "github.com/opentracing/opentracing-go"
12
+ "github.com/pbnjay/memory"
13
+ "github.com/sirupsen/logrus"
9
14
"gopkg.in/src-d/go-mysql-server.v0/sql"
10
15
)
11
16
12
- const experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
13
- const inMemoryJoinSessionVar = "inmemory_joins"
17
+ const (
18
+ experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
19
+ maxMemoryJoinKey = "MAX_MEMORY_INNER_JOIN"
20
+ inMemoryJoinSessionVar = "inmemory_joins"
21
+ memoryThresholdSessionVar = "max_memory_joins"
22
+ )
23
+
24
+ var (
25
+ useInMemoryJoins = shouldUseMemoryJoinsByEnv ()
26
+ // One fifth of the total physical memory available on the OS (ignoring the
27
+ // memory used by other processes).
28
+ defaultMemoryThreshold = memory .TotalMemory () / 5
29
+ // Maximum amount of memory the gitbase server can have in use before
30
+ // considering all inner joins should be done using multipass mode.
31
+ maxMemoryJoin = loadMemoryThreshold ()
32
+ )
33
+
34
+ func shouldUseMemoryJoinsByEnv () bool {
35
+ v := strings .TrimSpace (strings .ToLower (os .Getenv (experimentalInMemoryJoinKey )))
36
+ return v == "on" || v == "1"
37
+ }
38
+
39
+ func loadMemoryThreshold () uint64 {
40
+ v , ok := os .LookupEnv (maxMemoryJoinKey )
41
+ if ! ok {
42
+ return defaultMemoryThreshold
43
+ }
14
44
15
- var useInMemoryJoins = os .Getenv (experimentalInMemoryJoinKey ) != ""
45
+ n , err := strconv .ParseUint (v , 10 , 64 )
46
+ if err != nil {
47
+ logrus .Warnf ("invalid value %q given to %s environment variable" , v , maxMemoryJoinKey )
48
+ return defaultMemoryThreshold
49
+ }
50
+
51
+ return n
52
+ }
16
53
17
54
// InnerJoin is an inner join between two tables.
18
55
type InnerJoin struct {
@@ -73,27 +110,17 @@ func (j *InnerJoin) RowIter(ctx *sql.Context) (sql.RowIter, error) {
73
110
inMemorySession = true
74
111
}
75
112
76
- var iter sql. RowIter
113
+ var mode = unknownMode
77
114
if useInMemoryJoins || inMemorySession {
78
- r , err := j .Right .RowIter (ctx )
79
- if err != nil {
80
- span .Finish ()
81
- return nil , err
82
- }
115
+ mode = memoryMode
116
+ }
83
117
84
- iter = & innerJoinMemoryIter {
85
- l : l ,
86
- r : r ,
87
- ctx : ctx ,
88
- cond : j .Cond ,
89
- }
90
- } else {
91
- iter = & innerJoinIter {
92
- l : l ,
93
- rp : j .Right ,
94
- ctx : ctx ,
95
- cond : j .Cond ,
96
- }
118
+ iter := & innerJoinIter {
119
+ l : l ,
120
+ rp : j .Right ,
121
+ ctx : ctx ,
122
+ cond : j .Cond ,
123
+ mode : mode ,
97
124
}
98
125
99
126
return sql .NewSpanIter (span , iter ), nil
@@ -156,6 +183,25 @@ func (j *InnerJoin) TransformExpressions(f sql.TransformExprFunc) (sql.Node, err
156
183
return NewInnerJoin (j .Left , j .Right , cond ), nil
157
184
}
158
185
186
+ // innerJoinMode defines the mode in which an inner join will be performed.
187
+ type innerJoinMode byte
188
+
189
+ const (
190
+ // unknownMode is the default mode. It will start iterating without really
191
+ // knowing in which mode it will end up computing the inner join. If it
192
+ // iterates the right side fully one time and so far it fits in memory,
193
+ // then it will switch to memory mode. Otherwise, if at some point during
194
+ // this first iteration it finds that it does not fit in memory, will
195
+ // switch to multipass mode.
196
+ unknownMode innerJoinMode = iota
197
+ // memoryMode computes all the inner join directly in memory iterating each
198
+ // side of the join exactly once.
199
+ memoryMode
200
+ // multipassMode computes the inner join by iterating the left side once,
201
+ // and the right side one time for each row in the left side.
202
+ multipassMode
203
+ )
204
+
159
205
type innerJoinIter struct {
160
206
l sql.RowIter
161
207
rp rowIterProvider
@@ -164,118 +210,140 @@ type innerJoinIter struct {
164
210
cond sql.Expression
165
211
166
212
leftRow sql.Row
167
- }
168
213
169
- func (i * innerJoinIter ) Next () (sql.Row , error ) {
170
- for {
171
- if i .leftRow == nil {
172
- r , err := i .l .Next ()
173
- if err != nil {
174
- return nil , err
175
- }
214
+ // used to compute in-memory
215
+ mode innerJoinMode
216
+ right []sql.Row
217
+ pos int
218
+ }
176
219
177
- i .leftRow = r
220
+ func (i * innerJoinIter ) loadLeft () error {
221
+ if i .leftRow == nil {
222
+ r , err := i .l .Next ()
223
+ if err != nil {
224
+ return err
178
225
}
179
226
180
- if i .r == nil {
181
- iter , err := i .rp .RowIter (i .ctx )
182
- if err != nil {
183
- return nil , err
184
- }
227
+ i .leftRow = r
228
+ }
185
229
186
- i . r = iter
187
- }
230
+ return nil
231
+ }
188
232
189
- rightRow , err := i .r .Next ()
190
- if err == io .EOF {
191
- i .r = nil
192
- i .leftRow = nil
193
- continue
233
+ func (i * innerJoinIter ) loadRightInMemory () error {
234
+ iter , err := i .rp .RowIter (i .ctx )
235
+ if err != nil {
236
+ return err
237
+ }
238
+
239
+ i .right , err = sql .RowIterToRows (iter )
240
+ if err != nil {
241
+ return err
242
+ }
243
+
244
+ if len (i .right ) == 0 {
245
+ return io .EOF
246
+ }
247
+
248
+ return nil
249
+ }
250
+
251
+ func (i * innerJoinIter ) fitsInMemory () bool {
252
+ var maxMemory uint64
253
+ _ , v := i .ctx .Session .Get (memoryThresholdSessionVar )
254
+ if n , ok := v .(int64 ); ok {
255
+ maxMemory = uint64 (n )
256
+ } else {
257
+ maxMemory = maxMemoryJoin
258
+ }
259
+
260
+ if maxMemory <= 0 {
261
+ return true
262
+ }
263
+
264
+ var ms runtime.MemStats
265
+ runtime .ReadMemStats (& ms )
266
+
267
+ return (ms .HeapInuse + ms .StackInuse ) < maxMemory
268
+ }
269
+
270
+ func (i * innerJoinIter ) loadRight () (row sql.Row , skip bool , err error ) {
271
+ if i .mode == memoryMode {
272
+ if len (i .right ) == 0 {
273
+ if err := i .loadRightInMemory (); err != nil {
274
+ return nil , false , err
275
+ }
194
276
}
195
277
196
- if err != nil {
197
- return nil , err
278
+ if i .pos >= len (i .right ) {
279
+ i .leftRow = nil
280
+ i .pos = 0
281
+ return nil , true , nil
198
282
}
199
283
200
- var row = make (sql.Row , len (i .leftRow )+ len (rightRow ))
201
- copy (row , i .leftRow )
202
- copy (row [len (i .leftRow ):], rightRow )
284
+ row := i .right [i .pos ]
285
+ i .pos ++
286
+ return row , false , nil
287
+ }
203
288
204
- v , err := i .cond .Eval (i .ctx , row )
289
+ if i .r == nil {
290
+ iter , err := i .rp .RowIter (i .ctx )
205
291
if err != nil {
206
- return nil , err
292
+ return nil , false , err
207
293
}
208
294
209
- if v == true {
210
- return row , nil
211
- }
295
+ i .r = iter
212
296
}
213
- }
214
297
215
- func (i * innerJoinIter ) Close () error {
216
- if err := i .l .Close (); err != nil {
217
- if i .r != nil {
218
- _ = i .r .Close ()
298
+ rightRow , err := i .r .Next ()
299
+ if err != nil {
300
+ if err == io .EOF {
301
+ i .r = nil
302
+ i .leftRow = nil
303
+
304
+ // If we got to this point and the mode is still unknown it means
305
+ // the right side fits in memory, so the mode changes to memory
306
+ // inner join.
307
+ if i .mode == unknownMode {
308
+ i .mode = memoryMode
309
+ }
310
+
311
+ return nil , true , nil
219
312
}
220
- return err
313
+ return nil , false , err
221
314
}
222
315
223
- if i .r != nil {
224
- return i .r .Close ()
316
+ if i .mode == unknownMode {
317
+ if ! i .fitsInMemory () {
318
+ i .right = nil
319
+ i .mode = multipassMode
320
+ } else {
321
+ i .right = append (i .right , rightRow )
322
+ }
225
323
}
226
324
227
- return nil
228
- }
229
-
230
- type innerJoinMemoryIter struct {
231
- l sql.RowIter
232
- r sql.RowIter
233
- ctx * sql.Context
234
- cond sql.Expression
235
- pos int
236
- leftRow sql.Row
237
- right []sql.Row
325
+ return rightRow , false , err
238
326
}
239
327
240
- func (i * innerJoinMemoryIter ) Next () (sql.Row , error ) {
328
+ func (i * innerJoinIter ) Next () (sql.Row , error ) {
241
329
for {
242
- if i .leftRow == nil {
243
- r , err := i .l .Next ()
244
- if err != nil {
245
- return nil , err
246
- }
247
-
248
- i .leftRow = r
330
+ if err := i .loadLeft (); err != nil {
331
+ return nil , err
249
332
}
250
333
251
- if i .r != nil {
252
- for {
253
- row , err := i .r .Next ()
254
- if err != nil {
255
- if err == io .EOF {
256
- break
257
- }
258
- return nil , err
259
- }
260
-
261
- i .right = append (i .right , row )
262
- }
263
- i .r = nil
334
+ rightRow , skip , err := i .loadRight ()
335
+ if err != nil {
336
+ return nil , err
264
337
}
265
338
266
- if i .pos >= len (i .right ) {
267
- i .pos = 0
268
- i .leftRow = nil
339
+ if skip {
269
340
continue
270
341
}
271
342
272
- rightRow := i .right [i .pos ]
273
343
var row = make (sql.Row , len (i .leftRow )+ len (rightRow ))
274
344
copy (row , i .leftRow )
275
345
copy (row [len (i .leftRow ):], rightRow )
276
346
277
- i .pos ++
278
-
279
347
v , err := i .cond .Eval (i .ctx , row )
280
348
if err != nil {
281
349
return nil , err
@@ -287,7 +355,7 @@ func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
287
355
}
288
356
}
289
357
290
- func (i * innerJoinMemoryIter ) Close () error {
358
+ func (i * innerJoinIter ) Close () error {
291
359
if err := i .l .Close (); err != nil {
292
360
if i .r != nil {
293
361
_ = i .r .Close ()
@@ -299,5 +367,7 @@ func (i *innerJoinMemoryIter) Close() error {
299
367
return i .r .Close ()
300
368
}
301
369
370
+ i .right = nil
371
+
302
372
return nil
303
373
}
0 commit comments