Skip to content

Commit b8a2e1c

Browse files
committed
Separate thread pool for I/O, add more tests for streaming, along with other minor changes for code clean up
1 parent 54cd54c commit b8a2e1c

21 files changed

+528
-155
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,8 @@ public void init(ClickHouseConfig config) {
170170
this.config = config;
171171
if (this.executor == null) { // only initialize once
172172
int threads = config.getMaxThreadsPerClient();
173-
this.executor = threads <= 0 ? ClickHouseClient.getExecutorService()
174-
: ClickHouseUtils.newThreadPool(getClass().getSimpleName(), threads,
175-
config.getMaxQueuedRequests());
173+
this.executor = threads < 1 ? ClickHouseClient.getExecutorService()
174+
: ClickHouseUtils.newThreadPool(this, threads, config.getMaxQueuedRequests());
176175
}
177176

178177
initialized = true;
@@ -196,27 +195,16 @@ public final void close() {
196195
try {
197196
server = null;
198197

199-
if (executor != null) {
200-
executor.shutdown();
201-
}
202-
203198
if (connection != null) {
204199
closeConnection(connection, false);
200+
connection = null;
205201
}
206202

207-
// shutdown* won't shutdown commonPool, so awaitTermination will always time out
208-
// on the other hand, for a client-specific thread pool, we'd better shut it
209-
// down for real
210-
if (executor != null && config.getMaxThreadsPerClient() > 0
211-
&& !executor.awaitTermination(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) {
212-
executor.shutdownNow();
203+
// avoid shutting down shared thread pool
204+
if (executor != null && config.getMaxThreadsPerClient() > 0 && !executor.isTerminated()) {
205+
executor.shutdown();
213206
}
214-
215207
executor = null;
216-
connection = null;
217-
} catch (InterruptedException e) {
218-
log.warn("Got interrupted when closing client", e);
219-
Thread.currentThread().interrupt();
220208
} catch (Exception e) {
221209
log.warn("Exception occurred when closing client", e);
222210
} finally {
@@ -226,7 +214,7 @@ public final void close() {
226214
closeConnection(connection, true);
227215
}
228216

229-
if (executor != null) {
217+
if (executor != null && config.getMaxThreadsPerClient() > 0) {
230218
executor.shutdownNow();
231219
}
232220
} finally {

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.Objects;
77
import java.util.ServiceLoader;
88
import java.util.concurrent.ExecutorService;
9-
import java.util.concurrent.ForkJoinPool;
109

1110
import com.clickhouse.client.config.ClickHouseOption;
1211
import com.clickhouse.client.config.ClickHouseDefaults;
@@ -23,23 +22,17 @@ public class ClickHouseClientBuilder {
2322
static {
2423
int maxThreads = (int) ClickHouseDefaults.MAX_THREADS.getEffectiveDefaultValue();
2524
int maxRequests = (int) ClickHouseDefaults.MAX_REQUESTS.getEffectiveDefaultValue();
25+
long keepAliveTimeoutMs = (long) ClickHouseDefaults.THREAD_KEEPALIVE_TIMEOUT.getEffectiveDefaultValue();
2626

27-
if (maxThreads <= 0 && maxRequests <= 0) {
28-
// java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary
29-
// -XX:+PrintNMTStatistics -version
30-
defaultExecutor = ForkJoinPool.commonPool();
31-
} else {
32-
if (maxThreads <= 0) {
33-
maxThreads = Runtime.getRuntime().availableProcessors();
34-
}
35-
36-
if (maxRequests <= 0) {
37-
maxRequests = 0;
38-
}
39-
40-
defaultExecutor = ClickHouseUtils.newThreadPool(ClickHouseClient.class.getSimpleName(), maxThreads,
41-
maxRequests);
27+
if (maxThreads <= 0) {
28+
maxThreads = Runtime.getRuntime().availableProcessors();
29+
}
30+
if (maxRequests <= 0) {
31+
maxRequests = 0;
4232
}
33+
34+
defaultExecutor = ClickHouseUtils.newThreadPool(ClickHouseClient.class.getSimpleName(), maxThreads,
35+
maxThreads * 2, maxRequests, keepAliveTimeoutMs);
4336
}
4437

4538
protected ClickHouseConfig config;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private static String buildErrorMessage(int code, String message, ClickHouseNode
3737
}
3838

3939
if (server != null) {
40-
builder.append(" on server ").append(server);
40+
builder.append(", server ").append(server);
4141
}
4242

4343
return builder.toString();

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@
1111
import java.util.concurrent.TimeUnit;
1212

1313
/**
14-
* Extended input stream.
14+
* Extended input stream for read optimization.
1515
*/
1616
public abstract class ClickHouseInputStream extends InputStream {
17+
/**
18+
* Empty byte array.
19+
*/
20+
public static final byte[] EMPTY_BYTES = new byte[0];
1721
/**
1822
* Empty and read-only byte buffer.
1923
*/
20-
public static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]).asReadOnlyBuffer();
24+
public static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(EMPTY_BYTES).asReadOnlyBuffer();
2125

2226
static final class BlockingInputStream extends ClickHouseInputStream {
2327
private final BlockingQueue<ByteBuffer> queue;
@@ -28,7 +32,7 @@ static final class BlockingInputStream extends ClickHouseInputStream {
2832
private boolean closed;
2933

3034
BlockingInputStream(BlockingQueue<ByteBuffer> queue, int timeout) {
31-
this.queue = queue;
35+
this.queue = ClickHouseChecker.nonNull(queue, "Queue");
3236
this.timeout = timeout;
3337

3438
this.buffer = null;
@@ -37,10 +41,12 @@ static final class BlockingInputStream extends ClickHouseInputStream {
3741

3842
private void ensureOpen() throws IOException {
3943
if (closed) {
40-
throw new IOException("Stream has been closed");
44+
throw new IOException(
45+
ClickHouseUtils.format("Blocking stream(queue: %d, buffer: %d) has been closed",
46+
queue.size(), buffer != null ? buffer.remaining() : 0));
4147
}
4248

43-
if (buffer == null || (buffer != EMPTY && !buffer.hasRemaining())) {
49+
if (buffer == null || (buffer != EMPTY_BUFFER && !buffer.hasRemaining())) {
4450
updateBuffer();
4551
}
4652
}
@@ -65,9 +71,11 @@ private int updateBuffer() throws IOException {
6571

6672
@Override
6773
public int available() throws IOException {
68-
ensureOpen();
74+
if (closed || buffer == EMPTY_BUFFER) {
75+
return 0;
76+
}
6977

70-
return buffer.remaining();
78+
return (buffer == null || !buffer.hasRemaining()) ? updateBuffer() : buffer.remaining();
7179
}
7280

7381
@Override
@@ -87,7 +95,7 @@ public void close() throws IOException {
8795
public byte readByte() throws IOException {
8896
ensureOpen();
8997

90-
if (buffer == EMPTY) {
98+
if (buffer == EMPTY_BUFFER) {
9199
close();
92100
throw new EOFException();
93101
}
@@ -99,7 +107,7 @@ public byte readByte() throws IOException {
99107
public int read() throws IOException {
100108
ensureOpen();
101109

102-
if (buffer == EMPTY) {
110+
if (buffer == EMPTY_BUFFER) {
103111
return -1;
104112
}
105113

@@ -112,7 +120,7 @@ public int read(byte[] b, int off, int len) throws IOException {
112120

113121
int counter = 0;
114122
while (len > 0) {
115-
if (buffer == EMPTY) {
123+
if (buffer == EMPTY_BUFFER) {
116124
return counter > 0 ? counter : -1;
117125
}
118126

@@ -162,7 +170,7 @@ public long skip(long n) throws IOException {
162170
// peforms better but this is a bit tricky
163171
if (n == Long.MAX_VALUE) {
164172
long counter = buffer.remaining();
165-
while (buffer != EMPTY && buffer.limit() > 0) {
173+
while (buffer != EMPTY_BUFFER && buffer.limit() > 0) {
166174
counter += buffer.limit();
167175
updateBuffer();
168176
}
@@ -184,20 +192,32 @@ static final class WrappedInputStream extends ClickHouseInputStream {
184192
closed = false;
185193
}
186194

195+
private void ensureOpen() throws IOException {
196+
if (closed) {
197+
throw new IOException(ClickHouseUtils.format("Wrapped stream(%s) has been closed", in));
198+
}
199+
}
200+
187201
@Override
188202
public int available() throws IOException {
189203
return !closed ? in.available() : 0;
190204
}
191205

192206
@Override
193207
public byte readByte() throws IOException {
208+
ensureOpen();
209+
194210
int v = in.read();
195-
if (v == -1) {
196-
close();
197-
throw new EOFException();
211+
if (v != -1) {
212+
return (byte) v;
198213
}
199214

200-
return (byte) v;
215+
try {
216+
close();
217+
} catch (IOException e) {
218+
// ignore
219+
}
220+
throw new EOFException();
201221
}
202222

203223
@Override
@@ -207,25 +227,30 @@ public boolean isClosed() {
207227

208228
@Override
209229
public void close() throws IOException {
210-
try {
211-
in.close();
212-
} finally {
213-
closed = true;
230+
if (!closed) {
231+
try {
232+
in.close();
233+
} finally {
234+
closed = true;
235+
}
214236
}
215237
}
216238

217239
@Override
218240
public int read() throws IOException {
241+
ensureOpen();
219242
return in.read();
220243
}
221244

222245
@Override
223246
public int read(byte[] b, int off, int len) throws IOException {
247+
ensureOpen();
224248
return in.read(b, off, len);
225249
}
226250

227251
@Override
228252
public long skip(long n) throws IOException {
253+
ensureOpen();
229254
return in.skip(n);
230255
}
231256
}
@@ -238,21 +263,23 @@ public long skip(long n) throws IOException {
238263
* @return wrapped input
239264
*/
240265
public static ClickHouseInputStream of(BlockingQueue<ByteBuffer> queue, int timeout) {
241-
return new BlockingInputStream(ClickHouseChecker.nonNull(queue, "queue"), timeout);
266+
return new BlockingInputStream(queue, timeout);
242267
}
243268

244269
/**
245270
* Wraps the given input stream.
246271
*
247272
* @param input non-null input stream
248-
* @return wrapped input
273+
* @return wrapped input, or the same input if it's instance of
274+
* {@link ClickHouseInputStream}
249275
*/
250276
public static ClickHouseInputStream of(InputStream input) {
251277
return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input : new WrappedInputStream(input);
252278
}
253279

254280
/**
255-
* Reads an unsigned byte from the input stream.
281+
* Reads an unsigned byte from the input stream. Unlike {@link #read()}, it will
282+
* throw {@link IOException} if the input stream has been closed.
256283
*
257284
* @return unsigned byte
258285
* @throws IOException when failed to read value from input stream or reached
@@ -263,8 +290,10 @@ public int readUnsignedByte() throws IOException {
263290
}
264291

265292
/**
266-
* Reads one single byte from the input stream. It's supposed to be faster than
267-
* {@link #read()}.
293+
* Reads one single byte from the input stream. Unlike {@link #read()}, it will
294+
* throw {@link IOException} if the input stream has been closed. In general,
295+
* this method should be faster than {@link #read()}, especially when it's an
296+
* input stream backed by byte[] or {@link java.nio.ByteBuffer}.
268297
*
269298
* @return byte value if present
270299
* @throws IOException when failed to read value from input stream or reached
@@ -274,14 +303,19 @@ public int readUnsignedByte() throws IOException {
274303

275304
/**
276305
* Reads {@code length} bytes from the input stream. It behaves in the same
277-
* way as {@link java.io.DataInput#readFully(byte[])}.
306+
* way as {@link java.io.DataInput#readFully(byte[])}, and it will throw
307+
* {@link IOException} when the input stream has been closed.
278308
*
279309
* @param length number of bytes to read
280310
* @return byte array and its length should be {@code length}
281311
* @throws IOException when failed to read value from input stream, not able to
282312
* retrieve all bytes, or reached end of the stream
283313
*/
284314
public byte[] readBytes(int length) throws IOException {
315+
if (length <= 0) {
316+
return EMPTY_BYTES;
317+
}
318+
285319
byte[] bytes = new byte[length];
286320

287321
for (int l = length, c = 0, n = 0; l > 0; l -= n) {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.clickhouse.client;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class ClickHouseThreadFactory implements ThreadFactory {
7+
private final boolean daemon;
8+
private final int priority;
9+
10+
private final ThreadGroup group;
11+
private final String namePrefix;
12+
private final AtomicInteger threadNumber;
13+
14+
public ClickHouseThreadFactory(Object owner) {
15+
this(owner, false, Thread.NORM_PRIORITY);
16+
}
17+
18+
public ClickHouseThreadFactory(Object owner, boolean daemon, int priority) {
19+
String prefix = null;
20+
if (owner instanceof String) {
21+
prefix = ((String) owner).trim();
22+
} else if (owner != null) {
23+
prefix = new StringBuilder().append(owner.getClass().getSimpleName()).append('@').append(owner.hashCode())
24+
.toString();
25+
}
26+
this.daemon = daemon;
27+
this.priority = ClickHouseChecker.between(priority, "Priority", Thread.MIN_PRIORITY, Thread.MAX_PRIORITY);
28+
29+
SecurityManager s = System.getSecurityManager();
30+
group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
31+
namePrefix = !ClickHouseChecker.isNullOrBlank(prefix) ? prefix
32+
: new StringBuilder().append(getClass().getSimpleName()).append('@').append(hashCode())
33+
.append('-').toString();
34+
threadNumber = new AtomicInteger(1);
35+
}
36+
37+
@Override
38+
public Thread newThread(Runnable r) {
39+
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
40+
if (daemon != t.isDaemon()) {
41+
t.setDaemon(daemon);
42+
}
43+
if (priority != t.getPriority()) {
44+
t.setPriority(priority);
45+
}
46+
// t.setUncaughtExceptionHandler(null);
47+
return t;
48+
}
49+
}

0 commit comments

Comments
 (0)