Skip to content

Commit 89d21ba

Browse files
committed
write blocks concurrently with columnar writer
This changes the interface to give direct access to the ClickHouse connection and block interfaces, so the caller can create and write additional blocks concurrently.
1 parent 8d8c921 commit 89d21ba

7 files changed

+229
-42
lines changed

bootstrap.go

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func (d *bootstrap) Open(dsn string) (driver.Conn, error) {
3737
}
3838

3939
func Open(dsn string) (driver.Conn, error) {
40+
return open(dsn)
41+
}
42+
43+
func open(dsn string) (*clickhouse, error) {
4044
url, err := url.Parse(dsn)
4145
if err != nil {
4246
return nil, err

clickhouse.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"regexp"
11+
"sync"
1112
"time"
1213

1314
"github.com/kshvakov/clickhouse/lib/binary"
@@ -27,6 +28,7 @@ var (
2728
type logger func(format string, v ...interface{})
2829

2930
type clickhouse struct {
31+
sync.Mutex
3032
data.ServerInfo
3133
data.ClientInfo
3234
logf logger
@@ -99,7 +101,7 @@ type txOptions struct {
99101
ReadOnly bool
100102
}
101103

102-
func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (driver.Tx, error) {
104+
func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse, error) {
103105
ch.logf("[begin] tx=%t, data=%t", ch.inTransaction, ch.block != nil)
104106
switch {
105107
case ch.inTransaction:

clickhouse_columnar_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package clickhouse_test
2+
3+
import (
4+
"database/sql/driver"
5+
"testing"
6+
"time"
7+
8+
"github.com/kshvakov/clickhouse"
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func Test_ColumnarInsert(t *testing.T) {
13+
const (
14+
ddl = `
15+
CREATE TABLE clickhouse_test_columnar_insert (
16+
uint8 UInt8,
17+
uint16 UInt16,
18+
uint32 UInt32,
19+
uint64 UInt64,
20+
float32 Float32,
21+
float64 Float64,
22+
string String,
23+
fString FixedString(2),
24+
date Date,
25+
datetime DateTime,
26+
enum8 Enum8 ('a' = 1, 'b' = 2),
27+
enum16 Enum16('c' = 1, 'd' = 2)
28+
) Engine=Memory
29+
`
30+
dml = `
31+
INSERT INTO clickhouse_test_columnar_insert (
32+
uint8,
33+
uint16,
34+
uint32,
35+
uint64,
36+
float32,
37+
float64,
38+
string,
39+
fString,
40+
date,
41+
datetime,
42+
enum8,
43+
enum16
44+
) VALUES (
45+
?,
46+
?,
47+
?,
48+
?,
49+
?,
50+
?,
51+
?,
52+
?,
53+
?,
54+
?,
55+
?,
56+
?
57+
)
58+
`
59+
)
60+
if connect, err := clickhouse.OpenDirect("tcp://127.0.0.1:9000?debug=true"); assert.NoError(t, err) {
61+
{
62+
connect.Begin()
63+
stmt, _ := connect.Prepare("DROP TABLE clickhouse_test_columnar_insert")
64+
stmt.Exec([]driver.Value{})
65+
connect.Commit()
66+
}
67+
{
68+
if _, err := connect.Begin(); assert.NoError(t, err) {
69+
if stmt, err := connect.Prepare(ddl); assert.NoError(t, err) {
70+
if _, err := stmt.Exec([]driver.Value{}); assert.NoError(t, err) {
71+
assert.NoError(t, connect.Commit())
72+
}
73+
}
74+
}
75+
}
76+
{
77+
if _, err := connect.Begin(); assert.NoError(t, err) {
78+
if _, err := connect.Prepare(dml); assert.NoError(t, err) {
79+
block, err := connect.Block()
80+
assert.NoError(t, err)
81+
block.Reserve()
82+
block.NumRows = 100
83+
84+
for i := 0; i < 100; i++ {
85+
block.WriteUInt8(0, uint8(i))
86+
block.WriteUInt16(1, uint16(i))
87+
block.WriteUInt32(2, uint32(i))
88+
block.WriteUInt64(3, uint64(i))
89+
90+
block.WriteFloat32(4, float32(i))
91+
block.WriteFloat64(5, float64(i))
92+
93+
block.WriteString(6, "string")
94+
block.WriteFixedString(7, []byte("CH"))
95+
block.WriteDate(8, time.Now())
96+
block.WriteDateTime(9, time.Now())
97+
98+
block.WriteUInt8(10, 1)
99+
block.WriteUInt16(11, 2)
100+
101+
if !assert.NoError(t, err) {
102+
return
103+
}
104+
}
105+
106+
assert.NoError(t, connect.Commit())
107+
}
108+
}
109+
}
110+
}
111+
}

clickhouse_write_block.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
)
77

88
func (ch *clickhouse) writeBlock(block *data.Block) error {
9+
ch.Lock()
10+
defer ch.Unlock()
911
if err := ch.encoder.Uvarint(protocol.ClientData); err != nil {
1012
return err
1113
}

examples/columnar.go

+70-24
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@ package main
33
import (
44
"database/sql/driver"
55
"log"
6+
"sync"
67
"time"
78

89
"github.com/kshvakov/clickhouse"
10+
data "github.com/kshvakov/clickhouse/lib/data"
911
)
1012

1113
func main() {
12-
connect, err := clickhouse.Open("tcp://127.0.0.1:9000?username=&debug=true")
14+
connect, err := clickhouse.OpenDirect("tcp://127.0.0.1:9000?username=&debug=true")
1315
if err != nil {
1416
log.Fatal(err)
1517
}
1618
{
17-
tx, _ := connect.Begin()
19+
connect.Begin()
1820
stmt, _ := connect.Prepare(`
1921
CREATE TABLE IF NOT EXISTS example (
2022
os_id UInt8,
@@ -27,48 +29,92 @@ func main() {
2729
if _, err := stmt.Exec([]driver.Value{}); err != nil {
2830
log.Fatal(err)
2931
}
30-
tx.Commit()
32+
33+
if err := connect.Commit(); err != nil {
34+
log.Fatal(err)
35+
}
3136
}
3237
{
33-
tx, _ := connect.Begin()
34-
stmt, _ := connect.Prepare("INSERT INTO example (os_id, action_day, tags, categories) VALUES (?, ?, ?, ?)")
35-
cstmt, ok := stmt.(clickhouse.ColumnarStatement)
36-
if !ok {
37-
log.Fatal("Column writer is not supported")
38-
}
38+
connect.Begin()
39+
connect.Prepare("INSERT INTO example (os_id, action_day, tags, categories) VALUES (?, ?, ?, ?)")
3940

40-
w := cstmt.ColumnWriter()
41-
for i := 0; i < 1000; i++ {
42-
w.WriteUInt8(0, uint8(10+i))
41+
block, err := connect.Block()
42+
if err != nil {
43+
log.Fatal(err)
4344
}
4445

45-
for i := 0; i < 1000; i++ {
46-
w.WriteDate(1, time.Now())
47-
}
46+
blocks := []*data.Block{block, block.Copy()}
47+
48+
var wg sync.WaitGroup
49+
wg.Add(len(blocks))
4850

49-
for i := 0; i < 1000; i++ {
50-
w.WriteArray(2, clickhouse.Array([]string{"A", "B", "C"}))
51+
for i := range blocks {
52+
b := blocks[i]
53+
go func() {
54+
defer wg.Done()
55+
writeBatch(b, 1000)
56+
if err := connect.WriteBlock(b); err != nil {
57+
log.Fatal(err)
58+
}
59+
}()
5160
}
5261

53-
for i := 0; i < 1000; i++ {
54-
w.WriteArray(3, clickhouse.Array([]uint8{1, 2, 3, 4, 5}))
62+
wg.Wait()
63+
64+
if err := connect.Commit(); err != nil {
65+
log.Fatal(err)
5566
}
67+
}
68+
{
69+
connect.Begin()
70+
stmt, _ := connect.Prepare(`SELECT count() FROM example`)
5671

57-
if err := cstmt.ColumnWriterEnd(1000); err != nil {
72+
rows, err := stmt.Query([]driver.Value{})
73+
if err != nil {
5874
log.Fatal(err)
5975
}
6076

61-
if err := tx.Commit(); err != nil {
77+
columns := rows.Columns()
78+
row := make([]driver.Value, 1)
79+
for rows.Next(row) == nil {
80+
for i, c := range columns {
81+
log.Print(c, " : ", row[i])
82+
}
83+
}
84+
85+
if err := connect.Commit(); err != nil {
6286
log.Fatal(err)
6387
}
6488
}
6589
{
66-
tx, _ := connect.Begin()
90+
connect.Begin()
6791
stmt, _ := connect.Prepare(`DROP TABLE example`)
68-
6992
if _, err := stmt.Exec([]driver.Value{}); err != nil {
7093
log.Fatal(err)
7194
}
72-
tx.Commit()
95+
if err := connect.Commit(); err != nil {
96+
log.Fatal(err)
97+
}
98+
}
99+
}
100+
101+
func writeBatch(block *data.Block, n int) {
102+
block.Reserve()
103+
block.NumRows += uint64(n)
104+
105+
for i := 0; i < n; i++ {
106+
block.WriteUInt8(0, uint8(10+i))
107+
}
108+
109+
for i := 0; i < n; i++ {
110+
block.WriteDate(1, time.Now())
111+
}
112+
113+
for i := 0; i < n; i++ {
114+
block.WriteArray(2, clickhouse.Array([]string{"A", "B", "C"}))
115+
}
116+
117+
for i := 0; i < n; i++ {
118+
block.WriteArray(3, clickhouse.Array([]uint8{1, 2, 3, 4, 5}))
73119
}
74120
}

lib/data/block.go

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ type Block struct {
2222
info blockInfo
2323
}
2424

25+
func (block *Block) Copy() *Block {
26+
return &Block{
27+
Columns: block.Columns,
28+
NumColumns: block.NumColumns,
29+
info: block.info,
30+
}
31+
}
32+
2533
func (block *Block) ColumnNames() []string {
2634
names := make([]string, 0, len(block.Columns))
2735
for _, column := range block.Columns {

write_column.go

+31-17
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,26 @@
11
package clickhouse
22

3-
// @todo: restore block_size functionality
4-
53
import (
4+
"database/sql"
5+
"database/sql/driver"
66
"time"
77

8+
"github.com/kshvakov/clickhouse/lib/data"
89
"github.com/kshvakov/clickhouse/lib/types"
910
)
1011

11-
// Statement supporting columnar writer
12-
type ColumnarStatement interface {
13-
ColumnWriter() ColumnWriter
14-
ColumnWriterEnd(rows uint64) error
15-
}
16-
17-
func (stmt *stmt) ColumnWriter() ColumnWriter {
18-
stmt.ch.block.Reserve()
19-
return stmt.ch.block
12+
// Interface for Clickhouse driver
13+
type Clickhouse interface {
14+
Block() (*data.Block, error)
15+
Prepare(query string) (driver.Stmt, error)
16+
Begin() (driver.Tx, error)
17+
Commit() error
18+
Rollback() error
19+
Close() error
20+
WriteBlock(block *data.Block) error
2021
}
2122

22-
func (stmt *stmt) ColumnWriterEnd(rows uint64) error {
23-
stmt.ch.block.NumRows += rows
24-
return nil
25-
}
26-
27-
// Interface for block writer allowing writes to individual columns
23+
// Interface for Block allowing writes to individual columns
2824
type ColumnWriter interface {
2925
WriteDate(c int, v time.Time) error
3026
WriteDateTime(c int, v time.Time) error
@@ -39,3 +35,21 @@ type ColumnWriter interface {
3935
WriteString(c int, v string) error
4036
WriteFixedString(c int, v []byte) error
4137
}
38+
39+
func OpenDirect(dsn string) (Clickhouse, error) {
40+
return open(dsn)
41+
}
42+
43+
func (ch *clickhouse) Block() (*data.Block, error) {
44+
if ch.block == nil {
45+
return nil, sql.ErrTxDone
46+
}
47+
return ch.block, nil
48+
}
49+
50+
func (ch *clickhouse) WriteBlock(block *data.Block) error {
51+
if block == nil {
52+
return sql.ErrTxDone
53+
}
54+
return ch.writeBlock(block)
55+
}

0 commit comments

Comments
 (0)