Skip to content

Commit 1a4434f

Browse files
authored
chore(client): add pending message validation to Flush call (#16)
1 parent 464ba3e commit 1a4434f

File tree

3 files changed

+48
-12
lines changed

3 files changed

+48
-12
lines changed

.github/workflows/build.yml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ jobs:
66
runs-on: ubuntu-latest
77
strategy:
88
matrix:
9-
go-version: [1.17.x]
9+
go-version: [1.20.x]
1010
name: Build with Go ${{ matrix.go-version }}
1111
steps:
1212
- name: Checkout repository
@@ -21,13 +21,15 @@ jobs:
2121
stable: false
2222
go-version: ${{ matrix.go-version }}
2323

24-
- name: Install staticcheck
25-
run: go get honnef.co/go/tools/cmd/[email protected]
24+
- name: Run vet
25+
run: go vet ./...
2626

27-
- name: Run linters
28-
run: |
29-
go vet ./...
30-
staticcheck ./...
27+
- name: Run staticcheck
28+
uses: dominikh/[email protected]
29+
with:
30+
version: "2023.1.2"
31+
install-go: false
32+
cache-key: ${{ matrix.go-version }}
3133

3234
- name: Run tests
3335
run: go test -v ./...

sender.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -735,14 +735,15 @@ func (s *LineSender) At(ctx context.Context, ts int64) error {
735735
err := s.lastErr
736736
s.lastErr = nil
737737
if err != nil {
738-
// Discard the partially written message.
739-
s.buf.Truncate(s.lastMsgPos)
738+
s.discardPendingMsg()
740739
return err
741740
}
742741
if !s.hasTable {
742+
s.discardPendingMsg()
743743
return fmt.Errorf("table name was not provided: %w", ErrInvalidMsg)
744744
}
745745
if !s.hasTags && !s.hasFields {
746+
s.discardPendingMsg()
746747
return fmt.Errorf("no symbols or columns were provided: %w", ErrInvalidMsg)
747748
}
748749

@@ -753,9 +754,7 @@ func (s *LineSender) At(ctx context.Context, ts int64) error {
753754
s.buf.WriteByte('\n')
754755

755756
s.lastMsgPos = s.buf.Len()
756-
s.hasTable = false
757-
s.hasTags = false
758-
s.hasFields = false
757+
s.resetMsgFlags()
759758

760759
if s.buf.Len() > s.bufCap {
761760
return s.Flush(ctx)
@@ -776,8 +775,13 @@ func (s *LineSender) Flush(ctx context.Context) error {
776775
err := s.lastErr
777776
s.lastErr = nil
778777
if err != nil {
778+
s.discardPendingMsg()
779779
return err
780780
}
781+
if s.hasTable {
782+
s.discardPendingMsg()
783+
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
784+
}
781785

782786
if err = ctx.Err(); err != nil {
783787
return err
@@ -804,6 +808,17 @@ func (s *LineSender) Flush(ctx context.Context) error {
804808
return nil
805809
}
806810

811+
func (s *LineSender) discardPendingMsg() {
812+
s.buf.Truncate(s.lastMsgPos)
813+
s.resetMsgFlags()
814+
}
815+
816+
func (s *LineSender) resetMsgFlags() {
817+
s.hasTable = false
818+
s.hasTags = false
819+
s.hasFields = false
820+
}
821+
807822
// Messages returns a copy of accumulated ILP messages that are not
808823
// flushed to the TCP connection yet. Useful for debugging purposes.
809824
func (s *LineSender) Messages() string {

sender_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ func TestErrorOnLengthyNames(t *testing.T) {
309309

310310
err = tc.writerFn(sender)
311311
assert.ErrorContains(t, err, tc.expectedErrMsg)
312+
assert.Empty(t, sender.Messages())
312313

313314
sender.Close()
314315
srv.close()
@@ -519,6 +520,24 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
519520
}
520521
}
521522

523+
func TestErrorOnFlushWhenMessageIsPending(t *testing.T) {
524+
ctx := context.Background()
525+
526+
srv, err := newTestServer(readAndDiscard)
527+
assert.NoError(t, err)
528+
defer srv.close()
529+
530+
sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
531+
assert.NoError(t, err)
532+
defer sender.Close()
533+
534+
sender.Table(testTable)
535+
err = sender.Flush(ctx)
536+
537+
assert.ErrorContains(t, err, "pending ILP message must be finalized with At or AtNow before calling Flush")
538+
assert.Empty(t, sender.Messages())
539+
}
540+
522541
func TestInvalidMessageGetsDiscarded(t *testing.T) {
523542
ctx := context.Background()
524543

0 commit comments

Comments
 (0)