Dubbo-Triple


  • Triple的底层原理分析
  • Triple请求调用和响应处理
  • Triple请求处理和响应结果发送
  • 总结

  • Dubbo协议相比较于Http1.1而言性能上是要更好的
    • 但是Dubbo协议自己的缺点就是不通用
  • Triple协议是基于Http2协议的
    • HTTP2协议从设计层面就解决了HTTP1性能低的问题
    • Google公司开发的gRPC,也基于的HTTP2
    • k8s/etcd等都支持gRPC协议

Triple的底层原理分析

  • 就是因为HTTP2中的数据帧机制,Triple协议才能支持UNARY、SERVER_STREAM、BI_STREAM三种模式
  • UNARY:就是最普通的
    • 服务端只有在接收到完请求包括的所有的HEADERS帧和DATA帧之后 (通过调用onCompleted()发送最后一个DATA帧),才会处理数据
    • 客户端也只有接收完响应包括的所有的HEADERS帧和DATA帧之后,才会处理响应结果
  • SERVER_STREAM:服务端流
    • 服务端在接收完请求包括的所有的DATA帧之后,才会处理数据
    • 不过在处理数据的过程中,可以多次发送响应DATA帧(第一个DATA帧发送之前会发送一个HEADERS帧)
    • 客户端每接收到一个响应DATA帧就可以直接处理该响应DATA 帧
    • 这个模式下,客户端只能发一次数据,但能多次处理响应DATA帧
    • gRPC的效果是正确的,Dubbo3.0需要异步进行发送
  • BI_STREAM:双端流,或者客户端流
    • 客户端可以控制发送多个请求DATA帧(第一个DATA帧发送之前会发送一个HEADERS帧)
    • 服务端会不断的接收到请求DATA帧并进行处理,并且及时的把处理结果作为响应DATA帧发送给客户端(第一个DATA帧发送之前会发送一个HEADERS帧)
    • 而客户端每接收到一个响应结果DATA帧也会直接处理
    • 客户端和服务端都在不断的接收和发送DATA帧并进行处理
      • 请求HEADER帧和响应HEADERS帧都只发了一个

Triple请求调用和响应处理

  • 在服务提供者进行服务导出时,会按照协议以及对应的端口启动Server
    • 比如Triple协议就会启动Netty并绑定指定的端口,等待Socket连接
  • 在进行服务消费者进行服务引入的过程中,会生成TripleInvoker对象
    • 在构造TripleInvoker对象的构造方法中,会利用ConnectionManager创建一个Connection对象
    • 而Connection对象中包含了一个Bootstrap对象(Netty中用来建立Socket连接的)
  • 不过以上都只是创建对象,并不会真正和服务去建立Socket连接
    • 所以在生成TripleInvoker对象过程中不会真正去创建Socket连接
  • 当我们在服务消费端执行以下代码时 demoService.sayHello("demo")
    • demoService是一个代理对象,在执行方法的过程中,最终会调用TripleInvoker的doInvoke()方法
    • 在doInvoke()方法中,会利用Connection对象来判断Socket连接是否可用
    • 如果不可用并且没有初始化,那就会创建Socket连接
  • 一个Connection对象就表示一个Socket连接,在TripleInvoker对象中也只有一个Connection对象
    • 也就是一个TripleInvoker对象只对应一个Socket连接
      • 这个和DubboInvoker不太一样,一个DubboInvoker中可以有多个ExchangeClient
        • 每个ExchangeClient都会与服务端创建一个Socket连接
        • 所以一个DubboInvoker可以对应多个Socket连接
        • 当然多个Socket连接的目的就是提高并发
      • 不过在TripleInvoker对象中就不需要这么来设计了,因为可以Stream机制来提高并发
  • 当我们利用服务接口的代理对象执行方法时就会创建一个Socket连接
    • 就算这个代理对象再次执行方法时也不会再次创建Socket连接了
  • 有可能两个服务接口对应的是一个Socket连接
    • 比如服务提供者应用A,提供了DemoService和HelloService两个服务
    • 服务消费者应用B引入了这两个服务
    • 那么在服务消费者这端,这个两个接口对应的代理对象对应的TripleInvoker是不同的两个
    • 但是这两个TripleInvoker会公用一个Socket连接
    • 因为ConnectionManager在创建Connection对象时会根据服务URL的address进行缓存
    • 后续这两个代理对象在执行方法时使用的就是同一个Socket连接,但是是不同的Stream
  • Socket连接创建好之后,就需要发送Invocation对象给服务提供者了
    • 因为是基于的HTTP2,所以要先创建一个Stream,然后再通过Stream来发送数据
  • TripleInvoker中用的是Netty,所以最终会利用Netty来创建Stream,对应的对象为Http2StreamChannel
  • 消费端的TripleInvoker最终会利用Http2StreamChannel来发送和接收数据帧
    • 数据帧对应的对象为Http2Frame,它又分为Http2DataFrame、Http2HeadersFrame等
  • 正常情况下,会每生成一个数据帧就会通过Http2StreamChannel发送出去
    • 但是在Triple中有一个小小的优化,会有一个批量发送的思想
    • 当要发送一个数据帧时,会先把数据帧放入一个WriteQueue中
    • 然后会从线程池中拿到一个线程调用WriteQueue的flush方法
      • 连续从队列中取到了128个数据帧就flush一次
  • 只要向WriteQueue中添加一个数据帧之后,那就会尝试开启一个线程,要不要开启线程要看CAS
    • 比如现在有10个线程同时向WriteQueue中添加了一个数据帧
    • 那么这10个线程中的某一个会CAS成功,其他会CAS失败
    • 那么此时CAS成功的线程会负责从线程池中获取另外一个线程执行上面的flush方法
    • 从而获取WriteQueue中的数据帧然后发送出去
  • 有了底层这套设计之后,对于TripleInvoker而言,它只需要把要发送的数据封装为数据帧,然后添加到WriteQueue中就可以了
  • 在TripleInvoker的doInvoke()源码中,在创建完成Socket连接后
    • 基于Socket连接先构造一个ClientCall对象
    • 根据当前调用的方法信息构造一个RequestMetadata对象,这个对象表示,当前调用的是哪个接口的哪个方法,并且记录了所配置的序列化方式,压缩方式,超时时间等
    • 紧接着构造一个ClientCall.Listener,这个Listener是用来处理响应结果的,针对不同的流式调用类型,会构造出不同的ClientCall.Listener
      • UNARY:会构造出一个UnaryClientCallListener,内部包含了一个DeadlineFuture,DeadlineFuture是用来控制timeout的
      • SERVER_STREAM:会构造出一个ObserverToClientCallListenerAdapter,内部包含了调用方法时传入进来的StreamObserver对象,最终就是由这个StreamObserver对象来处理响应结果的
      • BI_STREAM:和SERVER_STREAM一样,也会构造出来一个ObserverToClientCallListenerAdapter
    • 紧着着,就会调用ClientCall对象的start方法创建一个Stream,并且返回一个StreamObserver对象
    • 得到了StreamObserver对象后,会根据不同的流式调用类型来使用这个StreamObserver对象
      • UNARY:直接调用StreamObserver对象的onNext()方法来发送方法参数,然后调用onCompleted方法,然后返回一个new AsyncRpcResult(future, invocation),future就是DeadlineFuture,后续会通过DeadlineFuture同步等待响应结果的到来,并最终把获取到的响应结果返回给业务方法
      • SERVER_STREAM:直接调用StreamObserver对象的onNext()方法来发送方法参数,然后调用onCompleted方法,然后返回一个new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation) 后续不会同步了,并且返回null给业务方法
      • BI_STREAM:直接返回new AsyncRpcResult( CompletableFuture.completedFuture(new AppResponse(requestObserver)), invocation),也不同同步等待响应结果了,而是直接把requestObserver对象返回给了业务方法
  • 不管是哪种流式调用类型,都会先创建一个Stream,得到对应的一个StreamObserver对象,然后调用StreamObserver对象的onNext方法来发送数据,比如发送服务接口方法的入参值,比如一个User对象
    • 在发送User对象之前,会先发送请求头,请求头中包含了当前调用的是哪个接口、哪个方法、版本号、序列化方式、压缩方式等信息,注意请求头中会包含一些gRPC相关的key,主要就是为了兼容gRPC
    • 然后就是发送请求体
    • 然后再对User对象进行序列化,得到字节数组
    • 然后再压缩字节数组
    • 然后把压缩之后的字节数组以及是否压缩标记生成一个DataQueueCommand对象,并且把这个对象添加到writeQueue中去,然后执行scheduleFlush(),该方法就会开启一个线程从writeQueue中获取数据进行发送,发送时就会触发DataQueueCommand对象的doSend方法进行发送,该方法中会构造一个DefaultHttp2DataFrame对象,该对象中由两个属性endStream,表示是不是Stream中的最后一帧,另外一个属性为content,表示帧携带的核心数据,该数据格式为
      • 第一个字节记录请求体是否被压缩
      • 紧着的四个字节记录字节数组的长度
      • 后面就真正的字节数据
  • TripleInvoker接收响应数据的流程:ClientCall.Listener就是用来监听是否接收到的响应数据的,不同的流式调用方式会对应不同的ClientCall.Listener
    • UNARY:UnaryClientCallListener,内部包含了一个DeadlineFuture,DeadlineFuture是用来控制timeout的
    • SERVER_STREAM:ObserverToClientCallListenerAdapter,内部包含了调用方法时传入进来的StreamObserver对象,最终就是由这个StreamObserver对象来处理响应结果的
    • BI_STREAM:和SERVER_STREAM一样,也会构造出来一个ObserverToClientCallListenerAdapter
  • 在之前创建Stream时,会向Http2StreamChannel绑定一个TripleHttp2ClientResponseHandler,很明显这个Handler就是用来处理接收到的响应数据的
  • 在TripleHttp2ClientResponseHandler的channelRead0方法中,每接收一个响应数据就会判断是Http2HeadersFrame还是Http2DataFrame,然后调用ClientTransportListener中对应的onHeader方法和onData方法
    • onHeader方法通过处理响应头,会生成一个TriDecoder,它是用来解压并处理响应体的
    • onData方法会利用TriDecoder的deframe()方法来处理响应体
  • 如果服务提供者那边调用了onCompleted方法,会向客户端响应一个请求头,endStream为true,表示响应结束,也会触发执行onHeader方法,从而会调用TriDecoder的close()方法
  • TriDecoder的deframe()方法在处理响应体数据时,会分为两个步骤
    • 先解析前5个字节,先解析第1个字节确定该响应体是否压缩了,再解析后续4个字节确定响应体内容的字节长度
    • 然后再取出该长度的字节作为响应体数据,如果压缩了,那就进行解压,然后把解压之后的字节数组传递给ClientStreamListenerImpl的onMessage()方法,该方法就会按对应的序列化方式进行反序列化,得到最终的对象,然后再调用到最终的UnaryClientCallListener或者ObserverToClientCallListenerAdapter的onMessage()方法
  • TriDecoder的close()方法最终也会调用到UnaryClientCallListener或者ObserverToClientCallListenerAdapter的close()方法
  • UnaryClientCallListener,构造它时传递了一个DeadlineFuture对象
    • onMessage()接收到响应结果对象后,会把结果对象赋值给appResponse属性
    • onClose()会取出appResponse属性记录的结果对象构造出来一个AppResponse对象,然后调用DeadlineFuture的received方法,从而将方法调用线程接阻塞,并得到响应结果对象
  • ObserverToClientCallListenerAdapter,构造它时传递了一个StreamObserver对象
    • onMessage()接收到响应结果对象后,会调用StreamObserver对象的onNext(),并把结果对象传给onNext()方法,从触发了程序员的onNext()方法逻辑
    • onClose()就会调用StreamObserver对象的onCompleted(),或者调用onError()方法

Triple请求处理和响应结果发送

  • 部分内容和发送请求和处理响应是非常类似的,无非就是把视角从消费端切换到服务端,前面分析的是消费端发送和接收数据,现在要分析的是服务端接收和发送数据
  • 消费端在创建一个Stream后,会生成一个对应的StreamObserver对象用来发送数据和一个ClientCall.Listener用来接收响应数据,对于服务端其实也一样,在接收到消费端创建Stream的命令后,也需要生成一个对应的StreamObserver对象用来响应数据以及一个ServerCall.Listener用来接收请求数据
  • 在服务导出时,TripleProtocol的export方法中会开启一个ServerBootstrap,并绑定指定的端口,并且最重要的是,Netty会负责接收创建Stream的信息,一旦就收到这个信号,就会生成一个ChannelPipeline,并给ChannelPipeline绑定一个TripleHttp2FrameServerHandler,而这个TripleHttp2FrameServerHandler就可以用来处理Http2HeadersFrame和Http2DataFrame
  • 比如在接收到请求头后,会构造一个ServerStream对象,该对象有一个ServerTransportObserver对象,ServerTransportObserver对象就会真正来处理请求头和请求体
    • onHeader()方法,用来处理请求头
      • 比如从请求头中得到当前请求调用的是哪个服务接口,哪个方法
      • 构造一个TriDecoder对象,TriDecoder对象用来处理请求体
      • 构造一个ReflectionServerCall对象并调用它的doStartCall()方法,从而生成不同的ServerCall.Listener
        • UNARY:UnaryServerCallListener
        • SERVER_STREAM:ServerStreamServerCallListener
        • BI_STREAM:BiStreamServerCallListener
        • 并且在构造这些ServerCall.Listener时会把ReflectionServerCall对象传入进去,ReflectionServerCall对象可以用来向客户端发送数据
    • onData()方法,用来处理请求体,调用TriDecoder对象的deframe方法来处理请求体,如果是endStream,那还会调用TriDecoder对象的close方法
  • TriDecoder
    • deframe():这个方法的作用和客户端时一样的,都是先解析请求体的前5个字节,然后解压请全体,然后反序列化得到请求参数对象,然后调用不同的ServerCall.Listener中的onMessage()
    • close():当客户端调用onCompleted方法时,就表示发送数据完毕,此时会发送一个DefaultHttp2DataFrame并且endStream为true,从而会触发服务端TriDecoder对象的close()方法,从而调用不同的ServerCall.Listener中的onComplete()
  • UnaryServerCallListener
    • 在接收到请求头时,会构造UnaryServerCallListener对象,没什么特殊的
    • 然后接收到请求体时,请求体中的数据就是调用接口方法的入参值,比如User对象,那么就会调用UnaryServerCallListener的onMessage()方法,在这个方法中会把User对象设置到invocation对象中
    • 当消费端调用onCompleted()方法,表示请求体数据发送完毕,从而触发UnaryServerCallListener的onComplete()方法,在该方法中会调用invoke()方法,从而执行服务方法,并得到结果,得到结果后,会调用UnaryServerCallListener的onReturn()方法,把结果通过responseObserver发送给消费端,并调用responseObserver的onCompleted()方法,表示响应数据发送完毕,responseObserver是ReflectionServerCall对象的一个StreamObserver适配对象(ServerCallToObserverAdapter)
  • ServerStreamServerCallListener
    • 在接收到请求头时,会构造ServerStreamServerCallListener对象,没什么特殊的
    • 然后接收到请求体时,请求体中的数据就是调用接口方法的入参值,比如User对象,那么就会调用ServerStreamServerCallListener的onMessage()方法,在这个方法中会把User对象以及responseObserver对象设置到invocation对象中,这是和UnaryServerCallListener不同的地方,UnaryServerCallListener只会把User对象设置给invocation,而ServerStreamServerCallListener还会把responseObserver对象设置进去,因为服务端流需要这个responseObserver对象,服务方法拿到这个对象后就可以自己来控制如何发送响应体,并什么时候调用onCompleted()方法来表示响应体发送完毕
    • 当消费端调用onCompleted()方法,表示请求体数据发送完毕,从而触发ServerStreamServerCallListener的onComplete()方法,在该方法中会调用invoke()方法,从而执行服务方法,从而会通过responseObserver对象来发送数据
    • 方法执行完后,仍然会调用ServerStreamServerCallListener的onReturn()方法,但是个空方法
  • BiStreamServerCallListener
    • 在接收到请求头时,会构造BiStreamServerCallListener对象,这里比较特殊,会把responseObserver设置给invocation并执行invoke()方法,从而执行服务方法,并执行onReturn()方法,onReturn()方法中会把服务方法的执行结果,也是一个StreamObserver对象,赋值给BiStreamServerCallListener对象的requestObserver属性
    • 这样,在接收到请求头时,服务方法就会执行了,并且得到了一个requestObserver,它是程序员定义的,是用来处理请求体的,另外的responseObserver是用来发送响应体的
    • 紧接着就会收到请求体,从而触发onMessage()方法,该方法中会调用requestObserver的onNext()方法,这样就可以做到,服务端能实时的接收到消费端每次所发送过来的数据,并且进行处理,处理过程中,如果需要响应就可以利用responseObserver进行响应
    • 一旦消费端那边调用了onCompleted()方法,那么就会触发BiStreamServerCallListener的onComplete方法,该方法中也就是调用requestObserver的onCompleted(),主要就触发程序员自己写的StreamObserver对象中的onCompleted(),并没有针对底层的Stream做什么事情

总结

  • 不管是Unary,还是ServerStream,还是BiStream,底层客户端和服务端之前都只有一个Stream,它们三者的区别在于
    • Unary:通过流,将方法入参值作为请求体发送出去,而且只发送一次,服务端这边接收到请求体之后,会执行服务方法,得到结果,把结果返回给客户端,也只响应一次
    • ServerStream:通过流,将方法入参值作为请求体发送出去,而且只发送一次,服务端这边接收到请求体之后,会执行服务方法,并且会把当前流对应的StreamObserver对象也传给服务方法,由服务方法自己控制如何响应,响应几次,响应什么数据,什么时候响应结束,都由服务方法自己控制
    • BiStream,通过流,客户端和服务端,都可以发送和响应多次

文章作者: 钱不寒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录