Skip to content

Support workerCount parameter of NioServerSocketChannelFactory and NioClientSocketChannelFactory #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/main/java/org/msgpack/rpc/loop/EventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,21 @@ static public EventLoop start(ExecutorService workerExecutor, ExecutorService io
static public EventLoop start(
ExecutorService workerExecutor, ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack) {
return getFactory().make(workerExecutor, ioExecutor, scheduledExecutor, messagePack);
return start(workerExecutor, ioExecutor, scheduledExecutor, messagePack,
2 * Runtime.getRuntime().availableProcessors());
}

static public EventLoop start(
ExecutorService workerExecutor, ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) {
return getFactory().make(workerExecutor, ioExecutor, scheduledExecutor, messagePack, workerCount);
}

private ExecutorService workerExecutor;
private ExecutorService ioExecutor;
private ScheduledExecutorService scheduledExecutor;
private MessagePack messagePack;
private int workerCount;

public MessagePack getMessagePack() {
return messagePack;
Expand All @@ -99,11 +107,12 @@ public void setMessagePack(MessagePack messagePack) {
}

public EventLoop(ExecutorService workerExecutor, ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack) {
ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) {
this.workerExecutor = workerExecutor;
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
this.messagePack = messagePack;
this.workerCount = workerCount;
}

public ExecutorService getWorkerExecutor() {
Expand All @@ -118,6 +127,10 @@ public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

public int getWorkerCount() {
return workerCount;
}

public void shutdown() {
scheduledExecutor.shutdown();
ioExecutor.shutdown();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/msgpack/rpc/loop/EventLoopFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface EventLoopFactory {
public EventLoop make(ExecutorService workerExecutor, ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack);
ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount);

// TODO Map<String, String> EventLoopConfig
}
8 changes: 4 additions & 4 deletions src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
public class NettyEventLoop extends EventLoop {
public NettyEventLoop(ExecutorService workerExecutor,
ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack) {
super(workerExecutor, ioExecutor, scheduledExecutor, messagePack);
ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) {
super(workerExecutor, ioExecutor, scheduledExecutor, messagePack, workerCount);
}

private ClientSocketChannelFactory clientFactory = null;
Expand All @@ -45,15 +45,15 @@ public NettyEventLoop(ExecutorService workerExecutor,
public synchronized ClientSocketChannelFactory getClientFactory() {
if (clientFactory == null) {
clientFactory = new NioClientSocketChannelFactory(getIoExecutor(),
getWorkerExecutor()); // TODO: workerCount
getWorkerExecutor(), getWorkerCount());
}
return clientFactory;
}

public synchronized ServerSocketChannelFactory getServerFactory() {
if (serverFactory == null) {
serverFactory = new NioServerSocketChannelFactory(getIoExecutor(),
getWorkerExecutor()); // TODO: workerCount
getWorkerExecutor(), getWorkerCount());
// messages will be dispatched to worker thread on server.
// see useThread(true) in NettyTcpClientTransport().
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public NettyEventLoopFactory() {

public EventLoop make(ExecutorService workerExecutor,
ExecutorService ioExecutor,
ScheduledExecutorService scheduledExecutor, MessagePack messagePack) {
ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) {
return new NettyEventLoop(workerExecutor, ioExecutor,
scheduledExecutor, messagePack);
scheduledExecutor, messagePack, workerCount);
}
}