Skip to content

Netty 线程模型

javahongxi edited this page Aug 4, 2019 · 33 revisions

线程模型是Netty的核心设计,设计地很巧妙,之前项目中有一块处理并发的设计和Netty的Eventloop单线程设计类似,效果得到了实证。

Netty4的类层次结构和之前的版本变化很大,网上也有很多文章写Netty的线程模型,Reactor模式,但是有几个关键的概念大都没讲清楚。

这篇文章只讲Netty4线程模型最重要的几个关键点

第一个概念是如何理解NioEventLoop和NioEventLoopGroup:NioEventLoop实际上就是工作线程,可以直接理解为一个线程。NioEventLoopGroup是一个线程池,线程池中的线程就是NioEventLoop。Netty设计这几个类的时候,层次结构挺复杂,反而让人迷惑。

还有一个让人迷惑的地方是,创建ServerBootstrap时,要传递两个NioEventLoopGroup线程池,一个叫bossGroup,一个叫workGroup。《Netty权威指南》里只说了bossGroup是用来处理TCP连接请求的,workGroup是来处理IO事件的。

这么说是没错,但是没说清楚bossGroup具体如何处理TCP请求的。实际上bossGroup中有多个NioEventLoop线程,每个NioEventLoop绑定一个端口,也就是说,如果程序只需要监听1个端口的话,bossGroup里面只需要有一个NioEventLoop线程就行了。

在上一篇文章介绍服务器端绑定的过程中,我们看到最后是NioServerSocketChannel封装的Java的ServerSocketChannel执行了绑定,并且执行accept()方法来创建客户端SocketChannel的连接。一个端口只需要一个NioServerSocketChannel即可。

RocketMQ NettyRemotingServer

    ServerBootstrap childHandler =
            // EventLoopGroup: bossGroup, workerGroup
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            // worker2
                            // defaultEventExecutorGroup用于执行pipeline里的channelHandlers
                            // 如果没有设置defaultEventExecutorGroup,则用workerGroup执行
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                // NettyServerHandler里有业务处理线程池
                                new NettyServerHandler()
                            );
                    }
                });

boss,worker,worker2实质都用的是 MultithreadEventExecutorGroup, 其维护的nThreads个executor都是ThreadPerTaskExecutor

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
 		
    // 使用者没传threadFactory时,executor = null
    // 传了时,executor = new ThreadPerTaskExecutor(threadFactory)
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
 
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {

第二个概念是每个NioEventLoop都绑定了一个Selector,所以在Netty4的线程模型中,是由多个Selecotr在监听IO就绪事件。而Channel注册到Selector。

举个例子,比如有100万个连接连到服务器端。平时的写法可能是1个Selector线程监听所有的IO就绪事件,1个Selector面对100万个连接(Channel)。

而如果使用了1000个NioEventLoop的线程池来说,1000个Selector面对100万个连接,每个Selector只需要关注1000个连接(Channel)

public final class NioEventLoop extends SingleThreadEventLoop {
    /**
     * The NIO {@link Selector}.
     */
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
 
    private final SelectorProvider provider;

第三个概念是一个Channel绑定一个NioEventLoop,相当于一个连接绑定一个线程,这个连接所有的ChannelHandler都是在一个线程中执行的,避免的多线程干扰。更重要的是ChannelPipline链表必须严格按照顺序执行的。单线程的设计能够保证ChannelHandler的顺序执行。

public interface Channel extends AttributeMap, Comparable<Channel> {
/**
 * Return the {@link EventLoop} this {@link Channel} was registered too.
 */
    EventLoop eventLoop();

第四个概念是一个NioEventLoop的selector可以被多个Channel注册,也就是说多个Channel共享一个EventLoop。EventLoop的Selecctor对这些Channel进行检查。

这段代码展示了线程池如何给Channel分配EventLoop,是根据Channel个数取模

public EventExecutor next() {
    return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        // 逐个处理注册的Channel
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        
        final Object a = k.attachment();
 
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
 
        if (needsToSelectAgain) {
            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

理解了这4个概念之后就对Netty4的线程模型有了清楚的认识:

在监听一个端口的情况下,一个NioEventLoop通过一个NioServerSocketChannel监听端口,处理TCP连接。后端多个工作线程NioEventLoop处理IO事件。每个Channel绑定一个NioEventLoop线程,1个NioEventLoop线程关联一个selector来为多个注册到它的Channel监听IO就绪事件。NioEventLoop是单线程执行,保证Channel的pipline在单线程中执行,保证了ChannelHandler的执行顺序。

下面这张图,基本能说清楚。

img

首页

Java核心技术

Netty

RocketMQ深入研究

kafka深入研究

Pulsar深入研究

Dubbo源码导读

微服务架构

Redis

Elasticsearch

其他

杂谈

关于我

Clone this wiki locally