Netty 简介

Netty 是个高性能NIO框架,它通过事件处理模型,隐藏了异步处理的细节,使我们在应用层仅需要关注消息对象的处理,而不用关心调度过程。同时 Netty 提供了很多协议栈的实现,大大简化了网络相关应用的开发难度。本文是Netty的入门指南,着重在讲清Netty和异步网络接口的基本概念,避免入坑。

Tip
Netty 当前稳定版是 4.x,而 5.x 是个废弃的版本。 作者说明

1. 核心概念

1.1. Channel

Channel代表一条传输链路,其底层可以是各种传输层协议,UDP、TCP、SCTP等等,可以在channel上进行读写操作。

1.2. Channel Pipeline

Netty认为数据流可以被多个 Handler 链式处理,每个 handler 接受前一个 handler 处理的结果,并输出给下个 handler,这个流程关系就是 pipeline。

例如我们处理HTTPS请求的过程大致如下:

  1. 将加密数据解码为字节流

  2. 对字节流解析出http消息

  3. 如果是chunked类型的http消息,我们可以做一次聚合

  4. 业务处理并返回response对象

  5. 将返回对象序列化为字节流

  6. SSL加密后返回给客户端

Figure 1. Channel pipeline执行顺序

在对应pipeline中,handler可能是以如下顺序被依次注册,netty会按我们注册的顺序先依次调用所有的inbound handler,直到处理完所有读取事件后,在写事件中再调用outbound handler。

ChannelPipeline p = ...;
...
// addLast() 方法将 handler 加入链尾
p.addLast("encoder", new HttpResponseEncoder()); (1)
p.addLast("decoder", new HttpRequestDecoder());  (2)
p.addLast("aggregator", new HttpObjectAggregator(1048576));
...
p.addLast("handler", new BusinessHandler());

需要注意的是,在handler中调用 ctx.write() 方法时,只会触发注册顺序在当前handler之前的handler。例如上例代码,如果encoder和decoder注册顺序反过来,那么在decoder中调用 ctx.write() 将不会触发 encoder。这时虽然可以通过 channel.write() 触发整个调用链,但相对低效,并不推荐。

1.3. Events & Handlers

Netty使用事件驱动,利用事件在handler之间传递消息,例如上面的http message传递就是由前一个 handler 调用 channelHandlerContext.fireChannelRead(msg) 方法来通知下一个 handler 处理 ChannelRead 事件,并且这个事件中携带了 msg 这个对象。

事件和 inbound/outbound 数据流相关。Inbound事件有以下几种:

  • Channel激活和失活

  • 读操作事件

  • 异常事件

  • 用户事件,例如心跳

Outbound事件则比较简单,通常是打开/关闭连接或写入/刷新数据。

Netty 应用程序则由 inbound 和 outbound 两类事件及其处理程序组成。事件处理的基本接口是 ChannelOutboundHandler、ChannelInboundHandler。Netty本身提供了庞大的ChannelHandler实现层次结构。很多适配器虽然只是一些空的实现,例如 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。但当只需要处理少数事件时,我们可以方便的继承这些适配器。此外,Netty 还有许多特定协议的实现,例如对HTTP协议提供了HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator 等 handler。

1.4. Encoders & Decoders

从网络层到应用层的数据转换通常涉及数据序列化和反序列化,因此Netty引入了 EncoderDecoder 概念,但它们本身也都是 Handler。其中 Decoder 负责反序列化,通常可以基于 ByteToMessageDecoder 开发自己的解码器。对应的,Encoder 通常基于 MessageToByteEncoder 开发。Netty自己也带了大量编解码器,可以处理常见协议。

1.5. Future

Netty中所有的IO操作都是异步的。类似Java标准库中的异步操作,Netty也提供了 Future 接口,但不同的是Netty的Future对完成状态有更精细的定义,并可以添加 FutureListener 作为回调,以便操作完成后被调用。

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->      isDone() = true      |
 | isCancelled() = false    |    |    |       cause() = non-null  |
 |       cause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+
Note
注意千万不要在ChannelHandler中调用 await()sync(),Handler 中的操作都应该是异步的。

2. 服务端应用

2.1. Server初始化

服务端的启动主要就是通过 ServerBootstrap 对象来设置服务器的线程池、socket参数和Channel Pipeline,我们来看看一个完整的服务端例子。

@Slf4j
public class RpcServer {
    EventLoopGroup bossGroup = new NioEventLoopGroup(); (1)
    EventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("server")); (2)

    private int port;
    public RpcServer(int port) throws Exception {
        this.port = port;
        this.run();
    }

    public void stop(){  (3)
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    private void run() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.INFO))  (4)
                        .addLast(new ResponseEncoder(),     (5)
                                 new RequestDecoder(),      (6)
                                 new ProcessingHandler());  (7)
            }
        });
        b.option(ChannelOption.SO_BACKLOG, 128);   (8)
        b.childOption(ChannelOption.SO_KEEPALIVE, true);  (9)

        ChannelFuture f = b.bind(port).sync();
        f.channel().closeFuture().addListener((ChannelFutureListener) future -> {
            log.info(future.channel().toString() + " 链路关闭");
            stop();
        });
    }
}
  1. 用于acceptor的线程池。

  2. 用于worker的线程池,这个池理论上应该仅用于IO操作,如果你的Handler里面有阻塞操作,考虑将任务提交到单独的线程池,而不是扩展worker池大小。我们这里配置了线程组的名字,方便thread dump的时候做区分。

  3. EventLoopGroup启动后不会主动退出,如果想关闭服务器,需要主动调用 shutdown 方法。

  4. 我们将 LoggingHandler 作为第一个 handler 加入channel pipeline,以便核对入站和出站的字节流。当然也可以将它放在 decoder 后面,用于记录解码后的对象。

  5. Response对象的encoder,用于将对象序列化为字节流。

  6. Request对象的decoder,用于从字节流提取并反序列化对象。注意我们将decoder放在encoder后面,这样如果decoder出现问题,可以直接回复Response对象,不需要经过后面的业务handler。

  7. 用于处理业务逻辑的handler。

  8. 设置boss线程组的socket参数。

  9. 设置worker线程组的socket参数。

2.2. 消息编码与解码

Netty通过实现 ChannelOutboundHandler 接口的 write 方法来处理消息编码,但为了简化 ByteBuf 的内存分配和释放操作,我们通常继承 MessageToByteEncoder 来处理消息的序列化。但除了序列化,我们还需要考虑TCP发送的时候,由于缓存区大小、MSS、MTU等因素导致的粘包拆包问题,最通用的解决办法就是使用定长消息头,并在其中包含消息体的长度信息。这样解码的时候就可以先读取一个定长字节,然后根据长度信息对消息体解码。

RequestEncoder
public class RequestEncoder extends MessageToByteEncoder<RpcRequest> {
    private final Charset charset = Charset.forName("UTF-8");
    private Gson gson = new Gson();

    @Override
    protected void encode(ChannelHandlerContext ctx, RpcRequest msg, ByteBuf out) throws Exception {
        String json = gson.toJson(msg);
        out.writeInt(json.length());  (1)
        out.writeCharSequence(json, charset);
    }
}
  1. 写入消息体长度。Netty提供了 LengthFieldPrepender 帮我们自动添加 length 域到header里面,但这里为简化起见,直接调用 writeInt 写入长度。

对于带有长度信息的消息解码,可以利用 LengthFieldBasedFrameDecoder 处理粘包拆包。但我们的封包格式非常简单,这里就直接读取了。我们使用了 ReplayingDecoder 来简化读取过程,ReplayingDecoder 会在读取指定长度的内容前调用 buf.readableBytes() 检查剩余字节,如果不满足则重置 readerIndex,等待下次读取。当然 ReplayingDecoder 这种做法在慢速网络里容易引发反复读取的性能问题,我们暂时不考虑这点。同时由于长度一定是正整数,我们还可以利用负数当 keep alive 的消息包,例如下面读到 -1 后直接回个 -1。

RequestDecoder
public class RequestDecoder extends ReplayingDecoder<RpcRequest> {
    private final Charset charset = Charset.forName("UTF-8");
    private Gson gson = new Gson();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int strLen = in.readInt();
        if (strLen == -1) {
            ctx.writeAndFlush(ctx.alloc().buffer(4).writeInt(-1));
            return;
        }
        RpcRequest request = gson.fromJson(in.readCharSequence(strLen, charset).toString(), RpcRequest.class);
        out.add(request);
    }
}

2.3. 异常处理

Netty中的异常处理也是基于事件的,框架捕捉到异常后会触发异常事件,用户只需要在自己的 Handler 里面重写 exceptionCaught 方法,进行异常处理即可。异常也可以通过 ctx.fireExceptionCaught(cause) 传递给下一个 handler,这样可以方便统一处理异常。

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
    log.info(cause.getLocalizedMessage(), cause);
    //do more exception handling
    ctx.close();
}

3. 客户端应用

3.1. Client初始化

客户端的初始化跟服务端类似,只是由 bind 变成了 connect,当然也不需要acceptor线程池。当连接建立之后我们会收到 channelActive 事件,对于可以立即初始化的操作,比如 Lightweight M2M bootstrap 消息我们可以在这里发送。但更常见的设计是将 Netty 的 handler 封装成协议层组件,由更上层的应用层逻辑控制消息收发。例如示例代码的 connect 和 send 操作。

public void connect(String host, int port) {
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    Bootstrap b = new Bootstrap();
    b.group(workerGroup);
    b.channel(NioSocketChannel.class);
    b.option(ChannelOption.SO_KEEPALIVE, true);
    b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast("idleStateHandler", new IdleStateHandler(10, 5, 0))
                .addLast(new RequestEncoder(), new ResponseDecoder(),
                         new IdleHandler(), new ClientHandler());
        }
    });

    ChannelFuture f = b.connect(host, port).sync();  (1)
    this.channel = f.channel();  (2)
}

public void send(Request request) {
    channel.writeAndFlush(request);
}
  1. 因为连接建立之前一般做不了什么,使用同步方式建立连接可以简化后续代码。

  2. 通常需要获取 channel 对象方便客户端主动发送消息。

3.2. 事件处理

除了用于收发消息的读写事件,用户事件也是非常常用的。例如上面客户端的代码我们使用了 IdleStateHandler 生成 idle 事件。IdleStateHandler 内部有个定时器计算读或写事件上分别有多长时间的 idle 状态,达到设定的时长后则调用 ctx.fireUserEventTriggered(evt) 发送 IdleStateEvent,后续 handler 通过重写 userEventTriggered 方法处理事件。例如下面代码我们收到 READER_IDLE 后主动发送一个 int 用于保持连接。

public class IdleHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                ctx.writeAndFlush(ctx.alloc().buffer(4).writeInt(-1));
            } else if (e.state() == IdleState.WRITER_IDLE) {
                //do nothing
            }
        }
    }
}

4. 内部机制

4.1. 线程模型

在 Netty 中每个 Channel 创建的时候都会被 EventLoopGroup 以 round robin 策略分配给一个 EventLoop,并保证在 Channel 的整个生命周期都由这个 EventLoop 处理上面的事件。而 EventLoop 背后则是一个线程,与线程一对一绑定。EventLoop 不断的监听网络事件,并将事件分发给 ChannelHandler。ChannelHandler 的执行也是在当前 EventLoop 线程中,一旦 handler 中的处理出现了阻塞,会导致一组 Channel 无法及时处理。

Figure 2. EventLoop分配模型,摘自 https://www.jianshu.com/p/95513325d439

从上图的分配模型我们也可以很容易的看出,Netty被设计为使用少数线程处理大量 Channel,如果业务的连接数较少,将无法充分发挥服务器性能。

4.2. ByteBuf

上面的例子我们看到最终从channel里面读写的都是ByteBuf对象,ByteBuf 是最值得注意的类型, 它利用引用计数来提高内存分配和释放的性能。相对JVM的GC算法,单纯的引用计数性能要好得多,但同时它也容易引起内存泄漏。这里我们看看Netty是如何管理引用计数,以及我们在编码中需要注意的事项。

ByteBuf 在初次分配的时候,引用计数为 1,当我们读完 ByteBuf 之后应调用 release() 方法,将引用计数减一,以便netty可以回收该ByteBuf使用的内存段。

ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
boolean destroyed = buf.release();
assert destroyed;
assert buf.refCnt() == 0;

但在前面Decoder的例子中,我们并没有手工release,这会导致内存泄漏吗?通过阅读 ByteToMessageDecoder.channelRead() 方法的源码,可以发现 ByteToMessageDecoder 已经帮我们做了 release,无需我们再手工管理。但如果是自行实现 channelRead() 接口,则必须考虑 ByteBuf 的 release。关于引用计数更详细的说明可以参考官方wiki Reference counted objects

4.3. Native epoll

Netty 的 epoll 传输层使用了 epoll 边界触发模式(edge-triggered), 这比 Java NIO 提供的水平触发模式(level-triggered) 可能有更好的性能。同时它支持NIO不支持一些选项,例如 TCP_CORK, SO_REUSEPORT 等等。

Netty 中我们只需要简单判断下当前系统是否支持 Epoll 即可将 EventLoopGroup 和 channelClass 切换到 Epoll 版本上。

EventLoopGroup bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
new ServerBootstrap().group(bossGroup, bossGroup)
    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class);