-
Notifications
You must be signed in to change notification settings - Fork 615
Netty 线程模型
javahongxi edited this page Aug 20, 2019
·
33 revisions
RocketMQ NettyRemotingServer
ServerBootstrap childHandler =
// EventLoopGroup: bossGroup, workerGroup
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// DefaultEventExecutorGroup: optionalGroup (相当于netty提供的业务线程池)
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
// NettyServerHandler里有业务处理线程池
new NettyServerHandler()
);
}
});
bossGroup,workerGroup,optionalGroup实质都用的是 MultithreadEventExecutorGroup
, 其维护的nThreads个executor都是ThreadPerTaskExecutor
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 使用者没传threadFactory时,executor = null
// 传了时,executor = new ThreadPerTaskExecutor(threadFactory)
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// ...
}
}
}
ServerBootstrap
(main线程)
@Override
void init(Channel channel) throws Exception {
// ...
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// ChannelInitializer的特点是执行initChannel后会随即被删除
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 服务端channel的eventLoop去执行如下task,接下来我们关注eventLoop.execute(Runnable task)是怎么执行的
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor
SingleThreadEventExecutor
implements Executor
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
private final Executor executor; // ThreadPerTaskExecutor
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task); // taskQueue
if (!inEventLoop) {
startThread(); // 接下来看这个
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void doStartThread() {
assert thread == null;
// ThreadPerTaskExecutor
executor.execute(new Runnable() {
@Override
public void run() {
// 当前线程即ThreadPerTaskExecutor创建的线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // 接下来看这个
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ...
}
}
});
}
NioEventLoop
@Override
protected void run() {
for (;;) {
try {
// 我们以为是select(),其实可能是selectNow(),前者阻塞,后者非阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // -2
continue;
case SelectStrategy.BUSY_WAIT: // -3
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // -1
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
// 当hasTasks()为true时,以上case都不满足,直接走到这里
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else { // 默认
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 接下来看这里
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 接下来看这里
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// ...
}
}
DefaultSelectStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; // 显然有task时返回值 >= 0
}
NioEventLoop
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow(); // >= 0
}
};
wiki.hongxi.org
首页
Java核心技术
- JUC JMM与线程安全
- JUC 指令重排与内存屏障
- JUC Java内存模型FAQ
- JUC 同步和Java内存模型
- JUC volatile实现原理
- JUC AQS详解
- JUC AQS理解
- JUC synchronized优化
- JUC 线程和同步
- JUC 线程状态
- JUC 线程通信
- JUC ThreadLocal介绍及原理
- JUC 死锁及避免方案
- JUC 读写锁简单实现
- JUC 信号量
- JUC 阻塞队列
- NIO Overview
- NIO Channel
- NIO Buffer
- NIO Scatter与Gather
- NIO Channel to Channel Transfers
- NIO Selector
- NIO FileChannel
- NIO SocketChannel
- NIO ServerSocketChannel
- NIO Non-blocking Server
- NIO DatagramChannel
- NIO Pipe
- NIO NIO vs. IO
- NIO DirectBuffer
- NIO zero-copy
- NIO Source Code
- NIO HTTP Protocol
- NIO epoll bug
- Reflection 基础
- Reflection 动态代理
- JVM相关
- 设计模式典型案例
Netty
RocketMQ深入研究
kafka深入研究
Pulsar深入研究
Dubbo源码导读
- Dubbo SPI
- Dubbo 自适应拓展机制
- Dubbo 服务导出
- Dubbo 服务引用
- Dubbo 服务字典
- Dubbo 服务路由
- Dubbo 集群
- Dubbo 负载均衡
- Dubbo 服务调用过程
微服务架构
Redis
Elasticsearch
其他
- Dubbo 框架设计
- Dubbo 优雅停机
- dubbo-spring-boot-starter使用指南
- rocketmq-spring-boot-starter使用指南
- Mybatis multi-database in spring-boot 2
- RocketMQ 客户端简单封装
- Otter 入门
杂谈
关于我