Skip to content

Commit d6c12a4

Browse files
committed
ClickHouse#74 ClickHouse#86 read/write timeouts
1 parent 86be830 commit d6c12a4

8 files changed

+45
-41
lines changed

.travis.yml

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
group: deprecated-2017Q4
21
sudo: required
32
language: go
43
go:

bootstrap.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"strconv"
1313
"strings"
14+
"sync/atomic"
1415
"time"
1516

1617
"github.com/kshvakov/clickhouse/lib/binary"
@@ -26,12 +27,25 @@ const (
2627
)
2728

2829
var (
30+
unixtime int64
2931
logOutput io.Writer = os.Stdout
3032
hostname, _ = os.Hostname()
3133
)
3234

3335
func init() {
3436
sql.Register("clickhouse", &bootstrap{})
37+
go func() {
38+
for tick := time.Tick(time.Second); ; {
39+
select {
40+
case <-tick:
41+
atomic.AddInt64(&unixtime, int64(time.Second))
42+
}
43+
}
44+
}()
45+
}
46+
47+
func now() time.Time {
48+
return time.Unix(atomic.LoadInt64(&unixtime), 0)
3549
}
3650

3751
type bootstrap struct{}
@@ -112,11 +126,9 @@ func open(dsn string) (*clickhouse, error) {
112126

113127
var (
114128
ch = clickhouse{
115-
logf: func(string, ...interface{}) {},
116-
compress: compress,
117-
blockSize: blockSize,
118-
readTimeout: readTimeout,
119-
writeTimeout: writeTimeout,
129+
logf: func(string, ...interface{}) {},
130+
compress: compress,
131+
blockSize: blockSize,
120132
ServerInfo: data.ServerInfo{
121133
Timezone: time.Local,
122134
},
@@ -131,7 +143,7 @@ func open(dsn string) (*clickhouse, error) {
131143
database,
132144
username,
133145
)
134-
if ch.conn, err = dial(secure, skipVerify, hosts, noDelay, connOpenStrategy, ch.logf); err != nil {
146+
if ch.conn, err = dial(secure, skipVerify, hosts, readTimeout, writeTimeout, noDelay, connOpenStrategy, ch.logf); err != nil {
135147
return nil, err
136148
}
137149
logger.SetPrefix(fmt.Sprintf("[clickhouse][connect=%d]", ch.conn.ident))

clickhouse.go

-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ type clickhouse struct {
4343
compress bool
4444
blockSize int
4545
inTransaction bool
46-
readTimeout time.Duration
47-
writeTimeout time.Duration
4846
}
4947

5048
func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {

clickhouse_read_block.go

-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,10 @@
11
package clickhouse
22

33
import (
4-
"time"
5-
64
"github.com/kshvakov/clickhouse/lib/data"
75
)
86

97
func (ch *clickhouse) readBlock() (*data.Block, error) {
10-
{
11-
ch.conn.SetReadDeadline(time.Now().Add(ch.readTimeout))
12-
ch.conn.SetWriteDeadline(time.Now().Add(ch.writeTimeout))
13-
}
14-
158
if _, err := ch.decoder.String(); err != nil { // temporary table
169
return nil, err
1710
}

clickhouse_send_query.go

-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,12 @@
11
package clickhouse
22

33
import (
4-
"time"
5-
64
"github.com/kshvakov/clickhouse/lib/data"
75
"github.com/kshvakov/clickhouse/lib/protocol"
86
)
97

108
func (ch *clickhouse) sendQuery(query string) error {
119
ch.logf("[send query] %s", query)
12-
{
13-
ch.conn.SetReadDeadline(time.Now().Add(ch.readTimeout))
14-
ch.conn.SetWriteDeadline(time.Now().Add(ch.writeTimeout))
15-
}
1610
if err := ch.encoder.Uvarint(protocol.ClientQuery); err != nil {
1711
return err
1812
}

clickhouse_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ func Test_Context_Timeout(t *testing.T) {
10021002
{
10031003
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
10041004
defer cancel()
1005-
if row := connect.QueryRowContext(ctx, "SELECT 1, sleep(10)"); assert.NotNil(t, row) {
1005+
if row := connect.QueryRowContext(ctx, "SELECT 1, sleep(2)"); assert.NotNil(t, row) {
10061006
var a, b int
10071007
assert.Equal(t, driver.ErrBadConn, row.Scan(&a, &b))
10081008
}
@@ -1023,7 +1023,7 @@ func Test_Context_Timeout(t *testing.T) {
10231023
func Test_Timeout(t *testing.T) {
10241024
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&read_timeout=0.2"); assert.NoError(t, err) && assert.NoError(t, connect.Ping()) {
10251025
{
1026-
if row := connect.QueryRow("SELECT 1, sleep(10)"); assert.NotNil(t, row) {
1026+
if row := connect.QueryRow("SELECT 1, sleep(2)"); assert.NotNil(t, row) {
10271027
var a, b int
10281028
assert.Equal(t, driver.ErrBadConn, row.Scan(&a, &b))
10291029
}

clickhouse_write_block.go

-6
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
package clickhouse
22

33
import (
4-
"time"
5-
64
"github.com/kshvakov/clickhouse/lib/data"
75
"github.com/kshvakov/clickhouse/lib/protocol"
86
)
97

108
func (ch *clickhouse) writeBlock(block *data.Block) error {
119
ch.Lock()
1210
defer ch.Unlock()
13-
{
14-
ch.conn.SetReadDeadline(time.Now().Add(ch.readTimeout))
15-
ch.conn.SetWriteDeadline(time.Now().Add(ch.writeTimeout))
16-
}
1711
if err := ch.encoder.Uvarint(protocol.ClientData); err != nil {
1812
return err
1913
}

connect.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const (
2626
connOpenInOrder
2727
)
2828

29-
func dial(secure, skipVerify bool, hosts []string, noDelay bool, openStrategy openStrategy, logf func(string, ...interface{})) (*connect, error) {
29+
func dial(secure, skipVerify bool, hosts []string, readTimeout, writeTimeout time.Duration, noDelay bool, openStrategy openStrategy, logf func(string, ...interface{})) (*connect, error) {
3030
var (
3131
err error
3232
abs = func(v int) int {
@@ -66,10 +66,12 @@ func dial(secure, skipVerify bool, hosts []string, noDelay bool, openStrategy op
6666
tcp.SetNoDelay(noDelay) // Disable or enable the Nagle Algorithm for this tcp socket
6767
}
6868
return &connect{
69-
Conn: conn,
70-
logf: logf,
71-
ident: ident,
72-
buffer: bufio.NewReaderSize(conn, 4*1024*1024),
69+
Conn: conn,
70+
logf: logf,
71+
ident: ident,
72+
buffer: bufio.NewReaderSize(conn, 4*1024*1024),
73+
readTimeout: readTimeout,
74+
writeTimeout: writeTimeout,
7375
}, nil
7476
}
7577
}
@@ -78,10 +80,14 @@ func dial(secure, skipVerify bool, hosts []string, noDelay bool, openStrategy op
7880

7981
type connect struct {
8082
net.Conn
81-
logf func(string, ...interface{})
82-
ident int
83-
buffer *bufio.Reader
84-
closed bool
83+
logf func(string, ...interface{})
84+
ident int
85+
buffer *bufio.Reader
86+
closed bool
87+
readTimeout time.Duration
88+
writeTimeout time.Duration
89+
lastReadDeadlineTime time.Time
90+
lastWriteDeadlineTime time.Time
8591
}
8692

8793
func (conn *connect) Read(b []byte) (int, error) {
@@ -91,10 +97,14 @@ func (conn *connect) Read(b []byte) (int, error) {
9197
total int
9298
dstLen = len(b)
9399
)
100+
if currentTime := now(); conn.readTimeout != 0 && currentTime.Sub(conn.lastReadDeadlineTime) > (conn.readTimeout>>2) {
101+
conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
102+
conn.lastReadDeadlineTime = currentTime
103+
}
94104
for total < dstLen {
95105
if n, err = conn.buffer.Read(b[total:]); err != nil {
96106
conn.logf("[connect] read error: %v", err)
97-
conn.closed = true
107+
conn.Close()
98108
return n, driver.ErrBadConn
99109
}
100110
total += n
@@ -109,10 +119,14 @@ func (conn *connect) Write(b []byte) (int, error) {
109119
total int
110120
srcLen = len(b)
111121
)
122+
if currentTime := now(); conn.writeTimeout != 0 && currentTime.Sub(conn.lastWriteDeadlineTime) > (conn.writeTimeout>>2) {
123+
conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
124+
conn.lastWriteDeadlineTime = currentTime
125+
}
112126
for total < srcLen {
113127
if n, err = conn.Conn.Write(b[total:]); err != nil {
114128
conn.logf("[connect] write error: %v", err)
115-
conn.closed = true
129+
conn.Close()
116130
return n, driver.ErrBadConn
117131
}
118132
total += n

0 commit comments

Comments
 (0)