@@ -8,7 +8,10 @@ import (
8
8
"io/ioutil"
9
9
"os"
10
10
"path/filepath"
11
+ "runtime"
11
12
"strings"
13
+ "sync"
14
+ "sync/atomic"
12
15
"time"
13
16
14
17
opentracing "github.com/opentracing/opentracing-go"
@@ -217,13 +220,13 @@ func (d *Driver) savePartition(
217
220
kviter sql.IndexKeyValueIter ,
218
221
idx * pilosaIndex ,
219
222
pilosaIndex * concurrentPilosaIndex ,
220
- offset uint64 ,
221
223
b * batch ,
222
224
) (uint64 , error ) {
223
225
var (
224
226
colID uint64
225
227
err error
226
228
)
229
+
227
230
for i , e := range idx .Expressions () {
228
231
name := fieldName (idx .ID (), e , p )
229
232
pilosaIndex .DeleteField (name )
@@ -254,7 +257,7 @@ func (d *Driver) savePartition(
254
257
kviter .Close ()
255
258
}()
256
259
257
- for colID = offset ; err == nil ; colID ++ {
260
+ for colID = 0 ; err == nil ; colID ++ {
258
261
// commit each batch of objects (pilosa and boltdb)
259
262
if colID % sql .IndexBatchSize == 0 && colID != 0 {
260
263
if err = d .saveBatch (ctx , idx .mapping , colID , b ); err != nil {
@@ -265,28 +268,30 @@ func (d *Driver) savePartition(
265
268
select {
266
269
case <- ctx .Context .Done ():
267
270
return 0 , ctx .Context .Err ()
268
-
269
271
default :
270
- var (
271
- values []interface {}
272
- location []byte
273
- )
274
- if values , location , err = kviter .Next (); err != nil {
275
- break
276
- }
272
+ }
277
273
278
- for i , field := range b . fields {
279
- if values [ i ] = = nil {
280
- continue
281
- }
274
+ values , location , err := kviter . Next ()
275
+ if err ! = nil {
276
+ break
277
+ }
282
278
283
- rowID , err := idx .mapping .getRowID (field .Name (), values [i ])
284
- if err != nil {
285
- return 0 , err
286
- }
287
- b .bitBatches [i ].Add (rowID , colID )
279
+ for i , field := range b .fields {
280
+ if values [i ] == nil {
281
+ continue
288
282
}
289
- err = idx .mapping .putLocation (pilosaIndex .Name (), colID , location )
283
+
284
+ rowID , err := idx .mapping .getRowID (field .Name (), values [i ])
285
+ if err != nil {
286
+ return 0 , err
287
+ }
288
+
289
+ b .bitBatches [i ].Add (rowID , colID )
290
+ }
291
+
292
+ err = idx .mapping .putLocation (pilosaIndex .Name (), p , colID , location )
293
+ if err != nil {
294
+ return 0 , err
290
295
}
291
296
}
292
297
@@ -307,7 +312,7 @@ func (d *Driver) savePartition(
307
312
}
308
313
}
309
314
310
- return colID - offset , err
315
+ return colID , err
311
316
}
312
317
313
318
// Save the given index (mapping and bitmap)
@@ -331,44 +336,86 @@ func (d *Driver) Save(
331
336
idx .wg .Add (1 )
332
337
defer idx .wg .Done ()
333
338
334
- var b = batch {
335
- fields : make ([]* pilosa.Field , len (idx .Expressions ())),
336
- bitBatches : make ([]* bitBatch , len (idx .Expressions ())),
337
- }
338
-
339
339
ctx .Context , idx .cancel = context .WithCancel (ctx .Context )
340
340
processingFile := d .processingFilePath (i .Database (), i .Table (), i .ID ())
341
- if err : = index .WriteProcessingFile (
341
+ err = index .WriteProcessingFile (
342
342
processingFile ,
343
343
[]byte {processingFileOnSave },
344
- ); err != nil {
344
+ )
345
+ if err != nil {
345
346
return err
346
347
}
347
348
348
349
defer iter .Close ()
349
350
pilosaIndex := idx .index
350
- var rows uint64
351
+
352
+ var (
353
+ rows , timePilosa , timeMapping uint64
354
+
355
+ wg sync.WaitGroup
356
+ tokens = make (chan struct {}, runtime .NumCPU ())
357
+
358
+ errors []error
359
+ errmut sync.Mutex
360
+ )
361
+
351
362
for {
363
+ select {
364
+ case <- ctx .Done ():
365
+ return
366
+ default :
367
+ }
368
+
352
369
p , kviter , err := iter .Next ()
353
370
if err != nil {
354
371
if err == io .EOF {
355
372
break
356
373
}
357
- return err
358
- }
359
374
360
- numRows , err := d . savePartition ( ctx , p , kviter , idx , pilosaIndex , rows , & b )
361
- if err != nil {
375
+ idx . cancel ( )
376
+ wg . Wait ()
362
377
return err
363
378
}
364
379
365
- rows += numRows
380
+ wg .Add (1 )
381
+
382
+ go func () {
383
+ defer func () {
384
+ wg .Done ()
385
+ <- tokens
386
+ }()
387
+
388
+ tokens <- struct {}{}
389
+
390
+ var b = & batch {
391
+ fields : make ([]* pilosa.Field , len (idx .Expressions ())),
392
+ bitBatches : make ([]* bitBatch , len (idx .Expressions ())),
393
+ }
394
+
395
+ numRows , err := d .savePartition (ctx , p , kviter , idx , pilosaIndex , b )
396
+ if err != nil {
397
+ errmut .Lock ()
398
+ errors = append (errors , err )
399
+ idx .cancel ()
400
+ errmut .Unlock ()
401
+ return
402
+ }
403
+
404
+ atomic .AddUint64 (& timeMapping , uint64 (b .timeMapping ))
405
+ atomic .AddUint64 (& timePilosa , uint64 (b .timePilosa ))
406
+ atomic .AddUint64 (& rows , numRows )
407
+ }()
408
+ }
409
+
410
+ wg .Wait ()
411
+ if len (errors ) > 0 {
412
+ return errors [0 ]
366
413
}
367
414
368
415
logrus .WithFields (logrus.Fields {
369
416
"duration" : time .Since (start ),
370
- "pilosa" : b . timePilosa ,
371
- "mapping" : b . timeMapping ,
417
+ "pilosa" : timePilosa ,
418
+ "mapping" : timeMapping ,
372
419
"rows" : rows ,
373
420
"id" : i .ID (),
374
421
}).Debugf ("finished pilosa indexing" )
@@ -421,18 +468,18 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
421
468
return partitions .Close ()
422
469
}
423
470
424
- func (d * Driver ) saveBatch (ctx * sql.Context , m * mapping , colID uint64 , b * batch ) error {
425
- err := d .savePilosa (ctx , colID , b )
471
+ func (d * Driver ) saveBatch (ctx * sql.Context , m * mapping , cols uint64 , b * batch ) error {
472
+ err := d .savePilosa (ctx , cols , b )
426
473
if err != nil {
427
474
return err
428
475
}
429
476
430
- return d .saveMapping (ctx , m , colID , true , b )
477
+ return d .saveMapping (ctx , m , cols , true , b )
431
478
}
432
479
433
- func (d * Driver ) savePilosa (ctx * sql.Context , colID uint64 , b * batch ) error {
480
+ func (d * Driver ) savePilosa (ctx * sql.Context , cols uint64 , b * batch ) error {
434
481
span , _ := ctx .Span ("pilosa.Save.bitBatch" ,
435
- opentracing.Tag {Key : "cols" , Value : colID },
482
+ opentracing.Tag {Key : "cols" , Value : cols },
436
483
opentracing.Tag {Key : "fields" , Value : len (b .fields )},
437
484
)
438
485
defer span .Finish ()
@@ -457,12 +504,12 @@ func (d *Driver) savePilosa(ctx *sql.Context, colID uint64, b *batch) error {
457
504
func (d * Driver ) saveMapping (
458
505
ctx * sql.Context ,
459
506
m * mapping ,
460
- colID uint64 ,
507
+ cols uint64 ,
461
508
cont bool ,
462
509
b * batch ,
463
510
) error {
464
511
span , _ := ctx .Span ("pilosa.Save.mapping" ,
465
- opentracing.Tag {Key : "cols" , Value : colID },
512
+ opentracing.Tag {Key : "cols" , Value : cols },
466
513
opentracing.Tag {Key : "continues" , Value : cont },
467
514
)
468
515
defer span .Finish ()
0 commit comments