Skip to content

Commit 857a531

Browse files
authored
feat(client): add support for "off" value in auto_flush_rows and auto_flush_interval (#37)
1 parent 4776744 commit 857a531

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed

conf_parse.go

+8
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,20 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
9898
return nil, NewInvalidConfigStrError("invalid %s value, %q is not 'on' or 'off'", k, v)
9999
}
100100
case "auto_flush_rows":
101+
if v == "off" {
102+
senderConf.autoFlushRows = 0
103+
continue
104+
}
101105
parsedVal, err := strconv.Atoi(v)
102106
if err != nil {
103107
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
104108
}
105109
senderConf.autoFlushRows = parsedVal
106110
case "auto_flush_interval":
111+
if v == "off" {
112+
senderConf.autoFlushInterval = 0
113+
continue
114+
}
107115
parsedVal, err := strconv.Atoi(v)
108116
if err != nil {
109117
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)

conf_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,28 @@ func TestHappyCasesFromConf(t *testing.T) {
403403
qdb.WithAutoFlushInterval(1000),
404404
},
405405
},
406+
{
407+
name: "auto flush interval off",
408+
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=off;",
409+
addr),
410+
expectedOpts: []qdb.LineSenderOption{
411+
qdb.WithHttp(),
412+
qdb.WithAddress(addr),
413+
qdb.WithAutoFlushRows(100),
414+
qdb.WithAutoFlushInterval(0),
415+
},
416+
},
417+
{
418+
name: "auto flush rows off",
419+
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=off;auto_flush_interval=1000;",
420+
addr),
421+
expectedOpts: []qdb.LineSenderOption{
422+
qdb.WithHttp(),
423+
qdb.WithAddress(addr),
424+
qdb.WithAutoFlushRows(0),
425+
qdb.WithAutoFlushInterval(1000),
426+
},
427+
},
406428
}
407429

408430
for _, tc := range testCases {

http_sender_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,71 @@ func TestTimeBasedAutoFlush(t *testing.T) {
454454
assert.Equal(t, 0, qdb.MsgCount(sender))
455455
}
456456

457+
func TestTimeBasedAutoFlushWithRowBasedFlushDisabled(t *testing.T) {
458+
ctx := context.Background()
459+
autoFlushInterval := 10 * time.Millisecond
460+
461+
srv, err := newTestHttpServer(readAndDiscard)
462+
assert.NoError(t, err)
463+
defer srv.Close()
464+
465+
sender, err := qdb.NewLineSender(
466+
ctx,
467+
qdb.WithHttp(),
468+
qdb.WithAddress(srv.Addr()),
469+
qdb.WithAutoFlushRows(0),
470+
qdb.WithAutoFlushInterval(autoFlushInterval),
471+
)
472+
assert.NoError(t, err)
473+
defer sender.Close(ctx)
474+
475+
// Send a message and ensure it's buffered
476+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
477+
assert.NoError(t, err)
478+
assert.Equal(t, 1, qdb.MsgCount(sender))
479+
480+
time.Sleep(2 * autoFlushInterval)
481+
482+
// Send one additional message and ensure that both messages are flushed
483+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
484+
assert.NoError(t, err)
485+
486+
assert.Equal(t, 0, qdb.MsgCount(sender))
487+
}
488+
489+
func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
490+
ctx := context.Background()
491+
autoFlushRows := 1000
492+
493+
srv, err := newTestHttpServer(readAndDiscard)
494+
assert.NoError(t, err)
495+
defer srv.Close()
496+
497+
sender, err := qdb.NewLineSender(
498+
ctx,
499+
qdb.WithHttp(),
500+
qdb.WithAddress(srv.Addr()),
501+
qdb.WithAutoFlushRows(autoFlushRows),
502+
qdb.WithAutoFlushInterval(0),
503+
)
504+
assert.NoError(t, err)
505+
defer sender.Close(ctx)
506+
507+
// Send autoFlushRows - 1 messages and ensure all are buffered
508+
for i := 0; i < autoFlushRows-1; i++ {
509+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
510+
assert.NoError(t, err)
511+
}
512+
513+
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
514+
515+
// Send one additional message and ensure that all are flushed
516+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
517+
assert.NoError(t, err)
518+
519+
assert.Equal(t, 0, qdb.MsgCount(sender))
520+
}
521+
457522
func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
458523
ctx := context.Background()
459524
autoFlushRows := 10

sender.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,8 @@ func LineSenderFromEnv(ctx context.Context) (LineSender, error) {
440440
// password: for basic authentication
441441
// token: bearer token auth (used instead of basic authentication)
442442
// auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")
443-
// auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on
443+
// auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
444+
//auto_flush_interval auto-flushing is triggered above this time (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
444445
// request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)
445446
// request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)
446447
// retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)

0 commit comments

Comments
 (0)