Netty-常用 ChannelHandler


  • 粘包/半包
  • 编解码器
    • SSL/TLS
    • HTTP
  • 序列化
  • ChannelHandler 单元测试
  • channelRead 和 channelReadComplete
    • 读到完整的业务请求报文后才调用一次业务ChannelHandler 的 channelReadComplete 方法
      • 无论这条报文底层经过了几次SocketChannel 的read 调用
    • channelRead 在每次从SocketChannel 成功读到消息后,由系统触发
      • 如果一个业务消息被TCP协议栈发送了N 次,则服务端的 channelRead 方法就会被调用 N 次

粘包/半包

  • 主流协议的解决方案
    • 在包尾增加分割符,比如回车换行符进行分割,例如FTP 协议
    • 消息定长,例如每个报文的大小为固定长度200 字节,如果不够,空位补空格
    • 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段
      • 通常设计思路为消息头的第一个字段使用int32 来表示消息的总长度,使用LengthFieldBasedFrameDecoder

编解码器

  • 解码器
    • 将字节解码为消息 ByteToMessageDecoder<I>
      • 由于不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理
      • decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List
        • 对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止
        • 如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler
    • 将一种消息类型解码为另一种 MessageToMessageDecoder<T>
    • TooLongFrameException:其将由解码器在帧超出指定的大小限制时抛出
      • 由于Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们
      • 因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存
    • 可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个TooLongFrameException
      • 如何处理该异常则完全取决于该解码器的用户
        • 可能返回一个特殊的响应;可能就是关闭对应的连接
      • 位于ChannelPipeline 中的其他ChannelHandler 可以选择在exceptionCaught()方法中处理该异常或者忽略它
  • 编码器
    • 将消息编码为字节 MessageToByteEncoder<I>
    • 将消息编码为消息 MessageToMessageEncoder<T>
  • 编解码器:同时实现了ChannelInboundHandler 和 ChannelOutboundHandler
    • ByteToMessageCodec MessageToMessageCodec

SSL/TLS

  • SSLContext 和 SSLEngine 类实现解密和加密
  • Netty 通过一个名为SslHandler 的ChannelHandler 实现利用了这个 API
    • 其中SslHandler 在内部使用SSLEngine 来完成实际的工作
  • 在大多数情况下,SslHandler 将是ChannelPipeline 中的第一个ChannelHandler

HTTP

  • 所有类型的 HTTP 消息都实现了HttpObject 接口
  • HttpRequestEncoder 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节
  • HttpResponseEncoder 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节
  • HttpRequestDecoder 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息
  • HttpResponseDecoder 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息
  • HttpClientCodec 和HttpServerCodec 则将请求和响应做了一个组合
  • 聚合 HTTP 消息 HttpObjectAggregator:可以将多个消息部分合并为FullHttpRequest 或者FullHttpResponse 消息
  • HTTP 压缩:Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码
  • HTTPS:启用HTTPS 只需要将SslHandler 添加到ChannelPipeline 的ChannelHandler 组合中

序列化

  • Java 序列化的缺点
    • 无法跨语言
    • 序列化后的码流太大
    • 序列化性能太低
  • Netty 内置了对 JBoss Marshalling 和 Protocol Buffers 的支持
  • protobuf
    • 发送
      • 添加消息长度 ProtobufVarint32LengthFieldPrepender
      • 序列化 ProtobufEncoder
    • 接收
      • 去除消息长度 ProtobufVarint32FrameDecoder
      • 反序列化 ProtobufDecoder
  • 集成第三方的序列化框架 MessagePack
    • 需要自定义消息头 LengthFieldBasedFrame
    • 然后在发送端添加消息头,接收端去除消息头
  • LengthFieldBasedFrame
    • maxFrameLength:表示的是包的最大长度
    • lengthFieldOffset:指的是长度域的偏移量,表示跳过指定个数字节之后的才是长度域
    • lengthFieldLength:记录该帧数据长度的字段,也就是长度域本身的长度
    • lengthAdjustment:长度的一个修正值,可正可负
      • Netty 在读取到数据包的长度值N 后,认为接下来的N 个字节都是需要读取的
      • 但是根据实际情况,有可能需要增加N 的值,也有可能需要减少N 的值
      • 具体增加多少,减少多少,写在这个参数里
    • initialBytesToStrip:从数据帧中跳过的字节数
      • 表示得到一个完整的数据包之后,扔掉这个数据包中多少字节数,才是后续业务实际需要的业务数据
    • failFast:建议不要修改,否则可能会造成内存溢出
      • 如果为true(默认),则表示读取到长度域
        • TA 的值的超过maxFrameLength,就抛出一个TooLongFrameException
      • 为false 表示只有当真正读取完长度域的值表示的字节之后,才会抛出TooLongFrameException

ChannelHandler 单元测试

  • EmbeddedChannel
    • 将入站数据或者出站数据写入到EmbeddedChannel 中,然后检查是否有任何东西到达了ChannelPipeline 的尾端
      • 以这种方式,你便可以确定消息是否已经被编码或者被解码过了,以及是否触发了任何的ChannelHandler 动作
    • EmbeddedChannel
    • writeInbound 将入站消息写到EmbeddedChannel 中
      • 如果可以通过readInbound()方法从EmbeddedChannel 中读取数据,则返回true
    • readInbound 从EmbeddedChannel 中读取一个入站消息
      • 任何返回的东西都穿越了整个ChannelPipeline。如果没有任何可供读取的,则返回null
    • writeOutbound 将出站消息写到EmbeddedChannel 中
      • 如果现在可以通过readOutbound()方法从EmbeddedChannel 中读取到什么东西,则返回true
    • readOutbound 从EmbeddedChannel 中读取一个出站消息
      • 任何返回的东西都穿越了整个ChannelPipeline。如果没有任何可供读取的,则返回null
    • finish 将EmbeddedChannel 标记为完成
      • 并且如果有可被读取的入站数据或者出站数据,则返回true
      • 这个方法还将会调用EmbeddedChannel 上的close()方法
    • 使用writeOutbound()方法将消息写到Channel 中,并通过ChannelPipeline 沿着出站的方向传递。随后,你可以使用readOutbound()方法来读取已被处理过的消息,以确定结果是否和预期一样。类似地,对于入站数据,你需要使用writeInbound()和readInbound()方法
    • 在每种情况下,消息都将会传递过ChannelPipeline,并且被相关的 ChannelInboundHandler 或者ChannelOutboundHandler 处理

文章作者: 钱不寒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录