Skip to content

Commit d61a2d6

Browse files
committed
prototype
0 parents  commit d61a2d6

12 files changed

+664
-0
lines changed

.gitignore

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Compiled Object files, Static and Dynamic libs (Shared Objects)
2+
*.o
3+
*.a
4+
*.so
5+
6+
# Folders
7+
_obj
8+
_test
9+
10+
# Architecture specific extensions/prefixes
11+
*.[568vq]
12+
[568vq].out
13+
14+
*.cgo1.go
15+
*.cgo2.c
16+
_cgo_defun.c
17+
_cgo_gotypes.go
18+
_cgo_export.*
19+
20+
_testmain.go
21+
22+
*.exe
23+
*.test
24+
*.prof

LICENSE

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2017 Kirill Shvakov
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# clickhouse
2+
3+
```go
4+
import(
5+
"database/sql"
6+
_ "github.com/kshvakov/clickhouse"
7+
)
8+
func main(){
9+
connect, _ := sql.Open("clickhouse", "http://127.0.0.1:8123?debug=true")
10+
rows, _ := connect.Query("SELECT app_id, language, country, date, datetime FROM stats WHERE app_id IN (?, ?, ?) LIMIT 20", 1, 2, 3)
11+
fmt.Println(rows.Columns())
12+
for rows.Next() {
13+
var (
14+
appID int
15+
language, country string
16+
date, datetime time.Time
17+
)
18+
if err := rows.Scan(&appID, &language, &country, &date, &datetime); err == nil {
19+
fmt.Printf("AppID: %d, language: %s, country: %s, date: %s, datetime: %s\n", appID, language, country, date, datetime)
20+
}
21+
}
22+
23+
tx, _ := connect.Begin()
24+
if stmt, err := tx.Prepare("INSERT INTO imps (a, b, c, d) VALUES (?, ?, ?, ?)"); err == nil {
25+
for i := 0; i < 10; i++ {
26+
stmt.Exec(1, 2, 3, 4)
27+
}
28+
}
29+
tx.Commit()
30+
}
31+
```

connect.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package clickhouse
2+
3+
import (
4+
"bytes"
5+
"crypto/tls"
6+
"database/sql"
7+
"database/sql/driver"
8+
"errors"
9+
"fmt"
10+
"io"
11+
"io/ioutil"
12+
"log"
13+
"net"
14+
"net/http"
15+
"net/url"
16+
"os"
17+
"strconv"
18+
"strings"
19+
"time"
20+
)
21+
22+
const (
23+
DefaultTimeout = time.Minute
24+
)
25+
26+
func init() {
27+
sql.Register("clickhouse", &connect{})
28+
}
29+
30+
type logger func(format string, v ...interface{})
31+
32+
var (
33+
ErrTransactionInProgress = errors.New("there is already a transaction in progress")
34+
ErrNoTransactionInProgress = errors.New("there is no transaction in progress")
35+
)
36+
37+
var (
38+
nolog = func(string, ...interface{}) {}
39+
debuglog = log.New(os.Stdout, "[clickhouse]", 0).Printf
40+
)
41+
42+
type connect struct {
43+
http http.Client
44+
log logger
45+
queries []string
46+
buffers []bytes.Buffer
47+
inTransaction bool
48+
}
49+
50+
func (conn *connect) Open(dsn string) (driver.Conn, error) {
51+
url, err := url.Parse(dsn)
52+
if err != nil {
53+
return nil, err
54+
}
55+
var (
56+
hosts = []string{url.Host}
57+
log = nolog
58+
timeout = DefaultTimeout
59+
compress bool
60+
)
61+
if altHosts := strings.Split(url.Query().Get("alt_hosts"), ","); len(altHosts) != 0 && len(altHosts[0]) != 0 {
62+
hosts = append(hosts, altHosts...)
63+
}
64+
if t, err := strconv.ParseInt(url.Query().Get("timeout"), 10, 64); err == nil && t != 0 {
65+
timeout = time.Duration(t) * time.Second
66+
}
67+
if v, err := strconv.ParseBool(url.Query().Get("compress")); err == nil {
68+
compress = v
69+
}
70+
if debug, err := strconv.ParseBool(url.Query().Get("debug")); err == nil && debug {
71+
log = debuglog
72+
log("hosts: %v, timeout: %s, compress: %t", hosts, timeout, compress)
73+
if username := url.Query().Get("username"); len(username) != 0 {
74+
log("[basic auth], username: %s, password: %s", username, url.Query().Get("password"))
75+
}
76+
}
77+
return &connect{
78+
log: log,
79+
http: http.Client{
80+
Timeout: timeout,
81+
Transport: &transport{
82+
hosts: hosts,
83+
scheme: url.Scheme,
84+
compress: compress,
85+
username: url.Query().Get("username"),
86+
password: url.Query().Get("password"),
87+
origin: &http.Transport{
88+
DialContext: (&net.Dialer{
89+
Timeout: 30 * time.Second,
90+
KeepAlive: 30 * time.Second,
91+
}).DialContext,
92+
MaxIdleConns: 100,
93+
MaxIdleConnsPerHost: 20,
94+
IdleConnTimeout: 90 * time.Second,
95+
TLSHandshakeTimeout: 10 * time.Second,
96+
ExpectContinueTimeout: 1 * time.Second,
97+
TLSClientConfig: &tls.Config{
98+
InsecureSkipVerify: true,
99+
},
100+
},
101+
},
102+
},
103+
}, nil
104+
}
105+
106+
func (conn *connect) Prepare(query string) (driver.Stmt, error) {
107+
conn.log("[connect] prepare: %s", query)
108+
var (
109+
index int
110+
numInput = len(strings.Split(query, "?")) - 1
111+
)
112+
if table, ok := isInsert(query); ok {
113+
query = "INSERT INTO " + table
114+
if conn.inTransaction {
115+
conn.queries = append(conn.queries, query)
116+
conn.buffers = append(conn.buffers, bytes.Buffer{})
117+
index = len(conn.buffers) - 1
118+
conn.log("[connect] [prepare] tx len: %d", len(conn.queries))
119+
}
120+
}
121+
return &stmt{
122+
conn: conn,
123+
query: query,
124+
index: index,
125+
numInput: numInput,
126+
}, nil
127+
}
128+
129+
func (conn *connect) Begin() (driver.Tx, error) {
130+
conn.log("[connect] begin")
131+
if conn.inTransaction {
132+
return nil, ErrTransactionInProgress
133+
}
134+
conn.inTransaction = true
135+
return conn, nil
136+
}
137+
138+
func (conn *connect) Commit() error {
139+
conn.log("[connect] commit")
140+
defer conn.reset()
141+
if !conn.inTransaction {
142+
return ErrNoTransactionInProgress
143+
}
144+
for index, query := range conn.queries {
145+
if _, err := conn.do(query, &conn.buffers[index]); err != nil {
146+
return err
147+
}
148+
}
149+
return nil
150+
}
151+
152+
func (conn *connect) do(query string, data io.Reader) (io.Reader, error) {
153+
conn.log("[connect] [do] query: %s", query)
154+
if _, ok := isInsert(query); ok {
155+
query += " FORMAT TabSeparated"
156+
} else {
157+
query += " FORMAT TabSeparatedWithNamesAndTypes"
158+
}
159+
response, err := conn.http.Post("?"+(&url.Values{"query": []string{query}}).Encode(), "application/x-www-form-urlencoded", data)
160+
if err != nil {
161+
return nil, err
162+
}
163+
defer response.Body.Close()
164+
if response.StatusCode != http.StatusOK {
165+
message, err := ioutil.ReadAll(response.Body)
166+
if err != nil {
167+
return nil, err
168+
}
169+
return nil, fmt.Errorf(string(message))
170+
}
171+
var body bytes.Buffer
172+
if _, err := io.Copy(&body, response.Body); err != nil {
173+
return nil, err
174+
}
175+
return &body, nil
176+
}
177+
178+
func (conn *connect) Rollback() error {
179+
conn.log("[connect] rollback")
180+
defer conn.reset()
181+
if !conn.inTransaction {
182+
return ErrNoTransactionInProgress
183+
}
184+
return nil
185+
}
186+
187+
func (conn *connect) Close() error {
188+
conn.log("[connect] close")
189+
conn.reset()
190+
return nil
191+
}
192+
193+
func (conn *connect) reset() {
194+
conn.log("[connect] reset")
195+
if conn.inTransaction {
196+
conn.inTransaction = false
197+
for _, buffer := range conn.buffers {
198+
buffer.Reset()
199+
}
200+
conn.queries = conn.queries[0:0]
201+
conn.buffers = conn.buffers[0:0]
202+
}
203+
}

connect_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package clickhouse
2+
3+
import (
4+
"database/sql"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func Test_Open(t *testing.T) {
12+
connect, err := sql.Open("clickhouse", "http://127.0.0.1:8123?username=username&password=password&timeout=90&debug=true&alt_hosts=127.0.0.1:8123,127.0.0.1:8124")
13+
if assert.NoError(t, err) {
14+
assert.NoError(t, connect.Ping())
15+
}
16+
}
17+
18+
func Test_Batch(t *testing.T) {
19+
connect, err := sql.Open("clickhouse", "http://127.0.0.1:8123?debug=true")
20+
if assert.NoError(t, err) {
21+
assert.NoError(t, connect.Ping())
22+
}
23+
if tx, err := connect.Begin(); assert.NoError(t, err) {
24+
tx.Query("SELECT * FROM imps")
25+
if stmt, err := tx.Prepare("INSERT INTO imps (a, b, c, d) VALUES (?, ?, ?, ?)"); assert.NoError(t, err) {
26+
for i := 0; i < 10; i++ {
27+
if _, err := stmt.Exec(1, 2, 3, 4); assert.NoError(t, err) {
28+
}
29+
}
30+
}
31+
tx.Query("SELECT * FROM imps2")
32+
if stmt, err := tx.Prepare("INSERT INTO imps2 VALUES (?, ?, ?)"); assert.NoError(t, err) {
33+
for i := 0; i < 10; i++ {
34+
if _, err := stmt.Exec(1, 2, "tab tab tab"); assert.NoError(t, err) {
35+
}
36+
}
37+
}
38+
if err := tx.Commit(); assert.NoError(t, err) {
39+
assert.Equal(t, sql.ErrTxDone, tx.Rollback())
40+
}
41+
}
42+
}
43+
44+
func Test_Exec(t *testing.T) {
45+
connect, err := sql.Open("clickhouse", "http://127.0.0.1:8123?debug=true")
46+
if assert.NoError(t, err) {
47+
assert.NoError(t, connect.Ping())
48+
}
49+
if _, err := connect.Exec("INSERT INTO simple_exec VALUES (?, ?, ?)", "a", "b", "c"); assert.NoError(t, err) {
50+
if _, err := connect.Exec("INSERT INTO simple_exec2 VALUES (?, ?, ?)", "a2", "b2", "c2"); assert.NoError(t, err) {
51+
}
52+
}
53+
}
54+
55+
/*
56+
CREATE TABLE stats
57+
(
58+
app_id UInt32,
59+
language FixedString(2),
60+
country FixedString(2),
61+
date Date,
62+
datetime DateTime
63+
) ENGINE = Memory
64+
*/
65+
func Test_Query(t *testing.T) {
66+
connect, err := sql.Open("clickhouse", "http://127.0.0.1:8123?debug=true")
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
rows, err := connect.Query("SELECT app_id, language, country, date, datetime FROM stats WHERE app_id IN (?, ?, ?) LIMIT 20", 1, 2, 3)
71+
if assert.NoError(t, err) {
72+
t.Log(rows.Columns())
73+
for rows.Next() {
74+
var (
75+
appID int
76+
language, country string
77+
date, datetime time.Time
78+
)
79+
if err := rows.Scan(&appID, &language, &country, &date, &datetime); assert.NoError(t, err) {
80+
t.Logf("AppID: %d, language: %s, country: %s, date: %s, datetime: %s", appID, language, country, date, datetime)
81+
}
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)