Skip to content

Commit d282cfc

Browse files
jerrinotpuzpuzpuz
andauthored
feat(client): add LineSenderFromEnv method (#35)
Also improves the docs around HTTP transport --------- Co-authored-by: Andrey Pechkurov <[email protected]>
1 parent 28bf280 commit d282cfc

File tree

4 files changed

+160
-1
lines changed

4 files changed

+160
-1
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ func main() {
112112
// Alternatively, you can use the LineSenderFromConf function:
113113
// sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;")
114114
// ...
115+
// or you can export the "http::addr=localhost:9000;" config string to
116+
// the QDB_CLIENT_CONF environment variable and use the LineSenderFromEnv function:
117+
// sender, err := qdb.LineSenderFromEnv(ctx)
118+
// ...
115119
defer sender.Close(context.TODO())
116120
// ...
117121
}

http_sender_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"errors"
3030
"fmt"
3131
"net/http"
32+
"os"
3233
"testing"
3334
"time"
3435

@@ -91,6 +92,35 @@ func TestHttpHappyCasesFromConf(t *testing.T) {
9192
}
9293
}
9394

95+
func TestHttpHappyCasesFromEnv(t *testing.T) {
96+
var (
97+
addr = "localhost:1111"
98+
)
99+
100+
testCases := []httpConfigTestCase{
101+
{
102+
name: "addr only",
103+
config: fmt.Sprintf("http::addr=%s", addr),
104+
},
105+
{
106+
name: "auto flush",
107+
config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=1000;",
108+
addr),
109+
},
110+
}
111+
112+
for _, tc := range testCases {
113+
t.Run(tc.name, func(t *testing.T) {
114+
os.Setenv("QDB_CLIENT_CONF", tc.config)
115+
sender, err := qdb.LineSenderFromEnv(context.Background())
116+
assert.NoError(t, err)
117+
118+
sender.Close(context.Background())
119+
os.Unsetenv("QDB_CLIENT_CONF")
120+
})
121+
}
122+
}
123+
94124
func TestHttpPathologicalCasesFromConf(t *testing.T) {
95125
testCases := []httpConfigTestCase{
96126
{
@@ -148,6 +178,42 @@ func TestHttpPathologicalCasesFromConf(t *testing.T) {
148178
}
149179
}
150180

181+
func TestHttpPathologicalCasesFromEnv(t *testing.T) {
182+
// Test a few cases just to make sure that the config is read
183+
// from the env variable.
184+
testCases := []httpConfigTestCase{
185+
{
186+
name: "basic_and_token_auth",
187+
config: "http::username=test_user;token=test_token;",
188+
expectedErr: "both basic and token",
189+
},
190+
{
191+
name: "negative max_buf_size",
192+
config: "http::max_buf_size=-1;",
193+
expectedErr: "max buffer size is negative",
194+
},
195+
{
196+
name: "schema is case-sensitive",
197+
config: "hTtp::addr=localhost:1234;",
198+
expectedErr: "invalid schema",
199+
},
200+
}
201+
202+
for _, tc := range testCases {
203+
t.Run(tc.name, func(t *testing.T) {
204+
os.Setenv("QDB_CLIENT_CONF", tc.config)
205+
_, err := qdb.LineSenderFromEnv(context.Background())
206+
assert.ErrorContains(t, err, tc.expectedErr)
207+
os.Unsetenv("QDB_CLIENT_CONF")
208+
})
209+
}
210+
}
211+
212+
func TestHttpEmptyEnvVariableCaseFromEnv(t *testing.T) {
213+
_, err := qdb.LineSenderFromEnv(context.Background())
214+
assert.ErrorContains(t, err, "QDB_CLIENT_CONF environment variable is not set")
215+
}
216+
151217
func TestErrorWhenSenderTypeIsNotSpecified(t *testing.T) {
152218
ctx := context.Background()
153219

sender.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"fmt"
3131
"math/big"
3232
"net/http"
33+
"os"
34+
"strings"
3335
"time"
3436
)
3537

@@ -39,7 +41,9 @@ import (
3941
// Each sender corresponds to a single client-server connection.
4042
// A sender should not be called concurrently by multiple goroutines.
4143
//
42-
// HTTP senders also reuse connections from a global pool by default.
44+
// HTTP senders reuse connections from a global pool by default. You can
45+
// customize the HTTP transport by passing a custom http.Transport to the
46+
// WithHttpTransport option.
4347
type LineSender interface {
4448
// Table sets the table name (metric) for a new ILP message. Should be
4549
// called before any Symbol or Column method.
@@ -348,6 +352,7 @@ func WithTlsInsecureSkipVerify() LineSenderOption {
348352
// WithHttpTransport sets the client's http transport to the
349353
// passed pointer instead of the global transport. This can be
350354
// used for customizing the http transport used by the LineSender.
355+
// For example to set custom timeouts, TLS settings, etc.
351356
// WithTlsInsecureSkipVerify is ignored when this option is in use.
352357
//
353358
// Only available for the HTTP sender.
@@ -389,6 +394,22 @@ func WithAutoFlushInterval(interval time.Duration) LineSenderOption {
389394
}
390395
}
391396

397+
// LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF
398+
// environment variable. See LineSenderFromConf for the config string format.
399+
//
400+
// This is a convenience method suitable for Cloud environments.
401+
func LineSenderFromEnv(ctx context.Context) (LineSender, error) {
402+
conf := strings.TrimSpace(os.Getenv("QDB_CLIENT_CONF"))
403+
if conf == "" {
404+
return nil, errors.New("QDB_CLIENT_CONF environment variable is not set")
405+
}
406+
c, err := confFromStr(conf)
407+
if err != nil {
408+
return nil, err
409+
}
410+
return newLineSender(ctx, c)
411+
}
412+
392413
// LineSenderFromConf creates a LineSender using the QuestDB config string format.
393414
//
394415
// Example config string: "http::addr=localhost;username=joe;password=123;auto_flush_rows=1000;"

tcp_sender_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package questdb_test
2727
import (
2828
"context"
2929
"fmt"
30+
"os"
3031
"testing"
3132
"time"
3233

@@ -78,6 +79,72 @@ func TestTcpHappyCasesFromConf(t *testing.T) {
7879
}
7980
}
8081

82+
func TestTcpHappyCasesFromEnv(t *testing.T) {
83+
var (
84+
initBufSize = 4200
85+
)
86+
87+
testServer, err := newTestTcpServer(readAndDiscard)
88+
assert.NoError(t, err)
89+
defer testServer.Close()
90+
91+
addr := testServer.Addr()
92+
93+
testCases := []tcpConfigTestCase{
94+
{
95+
name: "addr only",
96+
config: fmt.Sprintf("tcp::addr=%s;", addr),
97+
},
98+
{
99+
name: "init_buf_size",
100+
config: fmt.Sprintf("tcp::addr=%s;init_buf_size=%d;",
101+
addr, initBufSize),
102+
},
103+
}
104+
105+
for _, tc := range testCases {
106+
t.Run(tc.name, func(t *testing.T) {
107+
os.Setenv("QDB_CLIENT_CONF", tc.config)
108+
sender, err := qdb.LineSenderFromEnv(context.Background())
109+
assert.NoError(t, err)
110+
111+
sender.Close(context.Background())
112+
os.Unsetenv("QDB_CLIENT_CONF")
113+
})
114+
}
115+
}
116+
117+
func TestTcpPathologicalCasesFromEnv(t *testing.T) {
118+
// Test a few cases just to make sure that the config is read
119+
// from the env variable.
120+
testCases := []tcpConfigTestCase{
121+
{
122+
name: "request_timeout",
123+
config: "tcp::request_timeout=5;",
124+
expectedErr: "requestTimeout setting is not available",
125+
},
126+
{
127+
name: "min_throughput",
128+
config: "tcp::min_throughput=5;",
129+
expectedErr: "minThroughput setting is not available",
130+
},
131+
{
132+
name: "auto_flush_rows",
133+
config: "tcp::auto_flush_rows=5;",
134+
expectedErr: "autoFlushRows setting is not available",
135+
},
136+
}
137+
138+
for _, tc := range testCases {
139+
t.Run(tc.name, func(t *testing.T) {
140+
os.Setenv("QDB_CLIENT_CONF", tc.config)
141+
_, err := qdb.LineSenderFromEnv(context.Background())
142+
assert.ErrorContains(t, err, tc.expectedErr)
143+
os.Unsetenv("QDB_CLIENT_CONF")
144+
})
145+
}
146+
}
147+
81148
func TestTcpPathologicalCasesFromConf(t *testing.T) {
82149
testCases := []tcpConfigTestCase{
83150
{
@@ -139,6 +206,7 @@ func TestTcpPathologicalCasesFromConf(t *testing.T) {
139206
})
140207
}
141208
}
209+
142210
func TestErrorOnFlushWhenMessageIsPending(t *testing.T) {
143211
ctx := context.Background()
144212

0 commit comments

Comments
 (0)