Skip to content

Netty 线程模型

javahongxi edited this page Aug 20, 2019 · 33 revisions

RocketMQ NettyRemotingServer

    // Configures the server bootstrap in one fluent chain: event loop groups, channel class,
    // socket options, listen address and the pipeline installed on every accepted channel.
    ServerBootstrap childHandler =
            // EventLoopGroup: bossGroup, workerGroup
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                // Pick the epoll server channel when useEpoll() is true, otherwise the NIO one.
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                // option(...) entries are set via option(); childOption(...) entries via childOption().
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            // DefaultEventExecutorGroup: optionalGroup (roughly a business thread
                            // pool provided by Netty); every handler below is registered with it.
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                // NettyServerHandler contains its own business-processing thread pool
                                new NettyServerHandler()
                            );
                    }
                });

bossGroup、workerGroup、optionalGroup 实质上都是 MultithreadEventExecutorGroup:它维护 nThreads 个 EventExecutor(即 children 数组),这些 EventExecutor 共用同一个 executor,即 ThreadPerTaskExecutor

/**
 * Creates the group and its {@code nThreads} child event executors.
 *
 * @param nThreads number of children to create; must be greater than 0
 * @param executor executor handed to every child; when {@code null} a
 *                 {@code ThreadPerTaskExecutor} built on the default thread factory is used
 * @param args     extra arguments forwarded unchanged to {@code newChild}
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 * @throws IllegalStateException    if {@code newChild} throws an {@code Exception}
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // When the caller did not pass a threadFactory, executor == null at this point.
    // When one was passed, executor = new ThreadPerTaskExecutor(threadFactory).
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // All children receive the same executor instance.
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        // NOTE(review): success is presumably read by the elided finally block below - confirm upstream.
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // ... (elided in this excerpt)
        }
    }
}

img

ServerBootstrap (main线程)

    /**
     * Initializes the given channel (the article runs this on the main thread): snapshots the
     * child group, handler, options and attributes, then adds a {@code ChannelInitializer} that
     * installs the configured handler and schedules registration of a
     * {@code ServerBootstrapAcceptor} on the channel's event loop.
     *
     * @param channel the channel whose pipeline is being set up
     */
    @Override
    void init(Channel channel) throws Exception {
        // ... (elided in this excerpt)

        ChannelPipeline p = channel.pipeline();

        // Capture the current child configuration in finals so the anonymous classes below can use it.
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        // Copy the entries to arrays while holding each map's monitor.
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
        // A ChannelInitializer is removed from the pipeline right after its initChannel has run.
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // The server channel's eventLoop executes the task below; next we look at how
                // eventLoop.execute(Runnable task) actually runs it.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor
SingleThreadEventExecutor implements Executor

    // Queue of submitted tasks; execute(Runnable) adds to it through addTask(task).
    private final Queue<Runnable> taskQueue;

    // The thread running this executor; assigned inside doStartThread() from Thread.currentThread().
    private volatile Thread thread;
    
    // Executor used by doStartThread() to launch the run loop.
    private final Executor executor; // ThreadPerTaskExecutor

    /**
     * Queues {@code task} and, when called from outside the event loop thread, makes sure the
     * event loop thread has been started.
     *
     * @param task the task to run; must not be {@code null}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        // this.thread == Thread.currentThread()
        // false on the first call, because thread is still null at that point
        boolean inEventLoop = inEventLoop();
        addTask(task); // taskQueue
        if (!inEventLoop) {
            startThread(); // look at this next
            // If the executor is already shut down and the task can still be taken back, reject it.
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        // Wake the loop up explicitly unless adding a task already does so.
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    /**
     * Hands a runnable to {@code executor} that records the executing thread in {@code thread}
     * and then enters {@code SingleThreadEventExecutor.this.run()}.
     * NOTE(review): presumably reached from startThread(); that call is not shown in this excerpt.
     */
    private void doStartThread() {
        assert thread == null;
        // ThreadPerTaskExecutor
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // The current thread is the one created by ThreadPerTaskExecutor
                thread = Thread.currentThread();
                // Propagate an interrupt that was requested before the thread existed.
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run(); // look at this next
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // ... (elided in this excerpt)
                }
            }
        });
    }

NioEventLoop

    /**
     * The event loop body: on every iteration decides how to select, then processes the selected
     * keys and runs queued tasks. Any {@code Throwable} is passed to {@code handleLoopException}.
     */
    @Override
    protected void run() {
        for (;;) {
            try {
                // One might assume this is select(), but it may actually be selectNow();
                // the former blocks, the latter does not.
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE: // -2
                        continue;

                    case SelectStrategy.BUSY_WAIT: // -3
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT: // -1
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // falls through to default (no break in the original)
                    default:
                }
                // When hasTasks() is true none of the cases above match and execution comes straight here.
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else { // default
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys(); // look here next
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // Task budget is the measured I/O time scaled by (100 - ioRatio) / ioRatio.
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // look here next
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // ... (elided in this excerpt)
        }
    }

DefaultSelectStrategy

    /**
     * Chooses what the event loop should do on this iteration.
     *
     * @param selectSupplier supplier consulted only when tasks are pending
     * @param hasTasks       whether the event loop currently has queued tasks
     * @return {@code SelectStrategy.SELECT} when there are no tasks, otherwise whatever
     *         {@code selectSupplier.get()} returns
     * @throws Exception if {@code selectSupplier.get()} throws
     */
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        if (!hasTasks) {
            return SelectStrategy.SELECT;
        }
        // With tasks pending the result comes from the supplier; the article expects it to be >= 0.
        return selectSupplier.get();
    }

NioEventLoop

    // Supplier passed to selectStrategy.calculateStrategy(...) by run(); it delegates to selectNow().
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow(); // >= 0
        }
    };

首页

Java核心技术

Netty

RocketMQ深入研究

kafka深入研究

Pulsar深入研究

Dubbo源码导读

微服务架构

Redis

Elasticsearch

其他

杂谈

关于我

Clone this wiki locally