Skip to content

Commit f7f593f

Browse files
authored
fix(conf): disable auto_flush when set to off in config (#51)
1 parent 1532472 commit f7f593f

File tree

5 files changed

+120
-56
lines changed

5 files changed

+120
-56
lines changed

conf_parse.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type configData struct {
3737
}
3838

3939
func confFromStr(conf string) (*lineSenderConfig, error) {
40-
senderConf := &lineSenderConfig{}
40+
var senderConf *lineSenderConfig
4141

4242
data, err := parseConfigStr(conf)
4343
if err != nil {
@@ -46,14 +46,14 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
4646

4747
switch data.Schema {
4848
case "http":
49-
senderConf.senderType = httpSenderType
49+
senderConf = newLineSenderConfig(httpSenderType)
5050
case "https":
51-
senderConf.senderType = httpSenderType
51+
senderConf = newLineSenderConfig(httpSenderType)
5252
senderConf.tlsMode = tlsEnabled
5353
case "tcp":
54-
senderConf.senderType = tcpSenderType
54+
senderConf = newLineSenderConfig(tcpSenderType)
5555
case "tcps":
56-
senderConf.senderType = tcpSenderType
56+
senderConf = newLineSenderConfig(tcpSenderType)
5757
senderConf.tlsMode = tlsEnabled
5858
default:
5959
return nil, fmt.Errorf("invalid schema: %s", data.Schema)
@@ -90,6 +90,7 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
9090
case "token_y":
9191
// Some clients require public key.
9292
// But since Go sender doesn't need it, we ignore the values.
93+
continue
9394
case "auto_flush":
9495
if v == "off" {
9596
senderConf.autoFlushRows = 0

conf_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,15 @@ func TestHappyCasesFromConf(t *testing.T) {
432432
actual, err := qdb.ConfFromStr(tc.config)
433433
assert.NoError(t, err)
434434

435-
expected := &qdb.LineSenderConfig{}
435+
var expected *qdb.LineSenderConfig
436+
switch tc.config[0] {
437+
case 'h':
438+
expected = qdb.NewLineSenderConfig(qdb.HttpSenderType)
439+
case 't':
440+
expected = qdb.NewLineSenderConfig(qdb.TcpSenderType)
441+
default:
442+
assert.FailNow(t, "happy case configs must start with either 'http' or 'tcp'")
443+
}
436444
for _, opt := range tc.expectedOpts {
437445
opt(expected)
438446
}

export_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,16 @@ type (
2929
ConfigData = configData
3030
TcpLineSender = tcpLineSender
3131
LineSenderConfig = lineSenderConfig
32+
SenderType = senderType
3233
)
3334

3435
var (
35-
GlobalTransport = globalTransport
36+
GlobalTransport = globalTransport
37+
NoSenderType SenderType = noSenderType
38+
HttpSenderType SenderType = httpSenderType
39+
TcpSenderType SenderType = tcpSenderType
40+
DefaultAutoFlushInterval = defaultAutoFlushInterval
41+
DefaultAutoFlushRows = defaultAutoFlushRows
3642
)
3743

3844
func NewBuffer(initBufSize int, maxBufSize int, fileNameLimit int) Buffer {
@@ -76,3 +82,7 @@ func BufLen(s LineSender) int {
7682
}
7783
panic("unexpected struct")
7884
}
85+
86+
func NewLineSenderConfig(t SenderType) *LineSenderConfig {
87+
return newLineSenderConfig(t)
88+
}

http_sender_test.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,12 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
502502

503503
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
504504

505+
// Sleep past the default interval
506+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
507+
508+
// Check that the number of messages hasn't changed
509+
assert.Equal(t, autoFlushRows-1, qdb.MsgCount(sender))
510+
505511
// Send one additional message and ensure that all are flushed
506512
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
507513
assert.NoError(t, err)
@@ -511,8 +517,6 @@ func TestRowBasedAutoFlushWithTimeBasedFlushDisabled(t *testing.T) {
511517

512518
func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
513519
ctx := context.Background()
514-
autoFlushRows := 10
515-
autoFlushInterval := time.Duration(autoFlushRows-1) * time.Millisecond
516520

517521
srv, err := newTestHttpServer(readAndDiscard)
518522
assert.NoError(t, err)
@@ -524,21 +528,54 @@ func TestNoFlushWhenAutoFlushDisabled(t *testing.T) {
524528
ctx,
525529
qdb.WithHttp(),
526530
qdb.WithAddress(srv.Addr()),
527-
qdb.WithAutoFlushRows(autoFlushRows),
528-
qdb.WithAutoFlushInterval(autoFlushInterval),
531+
qdb.WithAutoFlushRows(qdb.DefaultAutoFlushRows),
532+
qdb.WithAutoFlushInterval(qdb.DefaultAutoFlushInterval),
529533
qdb.WithAutoFlushDisabled(),
530534
)
531535
assert.NoError(t, err)
532536
defer sender.Close(ctx)
533537

534538
// Send autoFlushRows + 1 messages and ensure all are buffered
535-
for i := 0; i < autoFlushRows+1; i++ {
539+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
536540
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
537541
assert.NoError(t, err)
538-
time.Sleep(time.Millisecond)
539542
}
540543

541-
assert.Equal(t, autoFlushRows+1, qdb.MsgCount(sender))
544+
// Sleep past the default interval
545+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
546+
547+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
548+
}
549+
550+
func TestNoFlushWhenAutoFlushRowsAndIntervalAre0(t *testing.T) {
551+
ctx := context.Background()
552+
553+
srv, err := newTestHttpServer(readAndDiscard)
554+
assert.NoError(t, err)
555+
defer srv.Close()
556+
557+
// opts are processed sequentially, so AutoFlushDisabled will
558+
// override AutoFlushRows
559+
sender, err := qdb.NewLineSender(
560+
ctx,
561+
qdb.WithHttp(),
562+
qdb.WithAddress(srv.Addr()),
563+
qdb.WithAutoFlushRows(0),
564+
qdb.WithAutoFlushInterval(0),
565+
)
566+
assert.NoError(t, err)
567+
defer sender.Close(ctx)
568+
569+
// Send autoFlushRows + 1 messages and ensure all are buffered
570+
for i := 0; i < qdb.DefaultAutoFlushRows+1; i++ {
571+
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
572+
assert.NoError(t, err)
573+
}
574+
575+
// Sleep past the default interval
576+
time.Sleep(qdb.DefaultAutoFlushInterval + time.Millisecond)
577+
578+
assert.Equal(t, qdb.DefaultAutoFlushRows+1, qdb.MsgCount(sender))
542579
}
543580

544581
func TestSenderDoubleClose(t *testing.T) {
@@ -625,7 +662,7 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
625662

626663
err = sender.Close(ctx)
627664
assert.NoError(t, err)
628-
assert.Empty(t, qdb.Messages(sender))
665+
assert.NotEmpty(t, qdb.Messages(sender))
629666
}
630667

631668
func TestSuccessAfterRetries(t *testing.T) {

sender.go

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -481,13 +481,61 @@ func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error) {
481481
// sender corresponds to a single client connection. LineSender should
482482
// not be called concurrently by multiple goroutines.
483483
func NewLineSender(ctx context.Context, opts ...LineSenderOption) (LineSender, error) {
484-
conf := &lineSenderConfig{}
484+
var conf *lineSenderConfig
485+
486+
// Iterate over all options to determine the sender type
487+
// This is used to set defaults based on the type of sender (http vs tcp)
488+
// Worst case performance is 2N for the number of LineSenderOptions
489+
tmp := newLineSenderConfig(noSenderType)
490+
for _, opt := range opts {
491+
opt(tmp)
492+
switch tmp.senderType {
493+
case httpSenderType:
494+
conf = newLineSenderConfig(httpSenderType)
495+
case tcpSenderType:
496+
conf = newLineSenderConfig(tcpSenderType)
497+
}
498+
499+
if conf != nil {
500+
break
501+
}
502+
}
503+
504+
if tmp.senderType == noSenderType {
505+
return nil, errors.New("sender type is not specified: use WithHttp or WithTcp")
506+
}
507+
485508
for _, opt := range opts {
486509
opt(conf)
487510
}
488511
return newLineSender(ctx, conf)
489512
}
490513

514+
func newLineSenderConfig(t senderType) *lineSenderConfig {
515+
switch t {
516+
case tcpSenderType:
517+
return &lineSenderConfig{
518+
senderType: t,
519+
address: defaultTcpAddress,
520+
initBufSize: defaultInitBufferSize,
521+
fileNameLimit: defaultFileNameLimit,
522+
}
523+
default:
524+
return &lineSenderConfig{
525+
senderType: t,
526+
address: defaultHttpAddress,
527+
requestTimeout: defaultRequestTimeout,
528+
retryTimeout: defaultRetryTimeout,
529+
minThroughput: defaultMinThroughput,
530+
autoFlushRows: defaultAutoFlushRows,
531+
autoFlushInterval: defaultAutoFlushInterval,
532+
initBufSize: defaultInitBufferSize,
533+
maxBufSize: defaultMaxBufferSize,
534+
fileNameLimit: defaultFileNameLimit,
535+
}
536+
}
537+
}
538+
491539
func newLineSender(ctx context.Context, conf *lineSenderConfig) (LineSender, error) {
492540
switch conf.senderType {
493541
case tcpSenderType:
@@ -538,17 +586,6 @@ func sanitizeTcpConf(conf *lineSenderConfig) error {
538586
return errors.New("tcpKeyId is empty and tcpKey is not. both (or none) must be provided")
539587
}
540588

541-
// Set defaults
542-
if conf.address == "" {
543-
conf.address = defaultTcpAddress
544-
}
545-
if conf.initBufSize == 0 {
546-
conf.initBufSize = defaultInitBufferSize
547-
}
548-
if conf.fileNameLimit == 0 {
549-
conf.fileNameLimit = defaultFileNameLimit
550-
}
551-
552589
return nil
553590
}
554591

@@ -563,35 +600,6 @@ func sanitizeHttpConf(conf *lineSenderConfig) error {
563600
return errors.New("both basic and token authentication cannot be used")
564601
}
565602

566-
// Set defaults
567-
if conf.address == "" {
568-
conf.address = defaultHttpAddress
569-
}
570-
if conf.requestTimeout == 0 {
571-
conf.requestTimeout = defaultRequestTimeout
572-
}
573-
if conf.retryTimeout == 0 {
574-
conf.retryTimeout = defaultRetryTimeout
575-
}
576-
if conf.minThroughput == 0 {
577-
conf.minThroughput = defaultMinThroughput
578-
}
579-
if conf.autoFlushRows == 0 {
580-
conf.autoFlushRows = defaultAutoFlushRows
581-
}
582-
if conf.autoFlushInterval == 0 {
583-
conf.autoFlushInterval = defaultAutoFlushInterval
584-
}
585-
if conf.initBufSize == 0 {
586-
conf.initBufSize = defaultInitBufferSize
587-
}
588-
if conf.maxBufSize == 0 {
589-
conf.maxBufSize = defaultMaxBufferSize
590-
}
591-
if conf.fileNameLimit == 0 {
592-
conf.fileNameLimit = defaultFileNameLimit
593-
}
594-
595603
return nil
596604
}
597605

0 commit comments

Comments
 (0)