- Triple的底层原理分析
- Triple请求调用和响应处理
- Triple请求处理和响应结果发送
- 总结
- Dubbo协议相比较于Http1.1而言性能上是要更好的
- 但是Dubbo协议自己的缺点就是不通用
- Triple协议是基于Http2协议的
- HTTP2协议从设计层面就解决了HTTP1性能低的问题
- Google公司开发的gRPC,也基于的HTTP2
- k8s/etcd等都支持gRPC协议
Triple的底层原理分析
- 就是因为HTTP2中的数据帧机制,Triple协议才能支持UNARY、SERVER_STREAM、BI_STREAM三种模式
- UNARY:就是最普通的
- 服务端只有在接收到完请求包括的所有的HEADERS帧和DATA帧之后 (通过调用onCompleted()发送最后一个DATA帧),才会处理数据
- 客户端也只有接收完响应包括的所有的HEADERS帧和DATA帧之后,才会处理响应结果
- SERVER_STREAM:服务端流
- 服务端在接收完请求包括的所有的DATA帧之后,才会处理数据
- 不过在处理数据的过程中,可以多次发送响应DATA帧(第一个DATA帧发送之前会发送一个HEADERS帧)
- 客户端每接收到一个响应DATA帧就可以直接处理该响应DATA 帧
- 这个模式下,客户端只能发一次数据,但能多次处理响应DATA帧
- gRPC的效果是正确的,Dubbo3.0需要异步进行发送
- BI_STREAM:双端流,或者客户端流
- 客户端可以控制发送多个请求DATA帧(第一个DATA帧发送之前会发送一个HEADERS帧)
- 服务端会不断的接收到请求DATA帧并进行处理,并且及时的把处理结果作为响应DATA帧发送给客户端(第一个DATA帧发送之前会发送一个HEADERS帧)
- 而客户端每接收到一个响应结果DATA帧也会直接处理
- 客户端和服务端都在不断的接收和发送DATA帧并进行处理
- 请求HEADER帧和响应HEADERS帧都只发了一个
Triple请求调用和响应处理
- 在服务提供者进行服务导出时,会按照协议以及对应的端口启动Server
- 比如Triple协议就会启动Netty并绑定指定的端口,等待Socket连接
- 在进行服务消费者进行服务引入的过程中,会生成TripleInvoker对象
- 在构造TripleInvoker对象的构造方法中,会利用ConnectionManager创建一个Connection对象
- 而Connection对象中包含了一个Bootstrap对象(Netty中用来建立Socket连接的)
- 不过以上都只是创建对象,并不会真正和服务去建立Socket连接
- 所以在生成TripleInvoker对象过程中不会真正去创建Socket连接
- 当我们在服务消费端执行以下代码时
demoService.sayHello("demo")
- demoService是一个代理对象,在执行方法的过程中,最终会调用TripleInvoker的doInvoke()方法
- 在doInvoke()方法中,会利用Connection对象来判断Socket连接是否可用
- 如果不可用并且没有初始化,那就会创建Socket连接
- 一个Connection对象就表示一个Socket连接,在TripleInvoker对象中也只有一个Connection对象
- 也就是一个TripleInvoker对象只对应一个Socket连接
- 这个和DubboInvoker不太一样,一个DubboInvoker中可以有多个ExchangeClient
- 每个ExchangeClient都会与服务端创建一个Socket连接
- 所以一个DubboInvoker可以对应多个Socket连接
- 当然多个Socket连接的目的就是提高并发
- 不过在TripleInvoker对象中就不需要这么来设计了,因为可以Stream机制来提高并发
- 这个和DubboInvoker不太一样,一个DubboInvoker中可以有多个ExchangeClient
- 也就是一个TripleInvoker对象只对应一个Socket连接
- 当我们利用服务接口的代理对象执行方法时就会创建一个Socket连接
- 就算这个代理对象再次执行方法时也不会再次创建Socket连接了
- 有可能两个服务接口对应的是一个Socket连接
- 比如服务提供者应用A,提供了DemoService和HelloService两个服务
- 服务消费者应用B引入了这两个服务
- 那么在服务消费者这端,这个两个接口对应的代理对象对应的TripleInvoker是不同的两个
- 但是这两个TripleInvoker会公用一个Socket连接
- 因为ConnectionManager在创建Connection对象时会根据服务URL的address进行缓存
- 后续这两个代理对象在执行方法时使用的就是同一个Socket连接,但是是不同的Stream
- Socket连接创建好之后,就需要发送Invocation对象给服务提供者了
- 因为是基于的HTTP2,所以要先创建一个Stream,然后再通过Stream来发送数据
- TripleInvoker中用的是Netty,所以最终会利用Netty来创建Stream,对应的对象为Http2StreamChannel
- 消费端的TripleInvoker最终会利用Http2StreamChannel来发送和接收数据帧
- 数据帧对应的对象为Http2Frame,它又分为Http2DataFrame、Http2HeadersFrame等
- 正常情况下,会每生成一个数据帧就会通过Http2StreamChannel发送出去
- 但是在Triple中有一个小小的优化,会有一个批量发送的思想
- 当要发送一个数据帧时,会先把数据帧放入一个WriteQueue中
- 然后会从线程池中拿到一个线程调用WriteQueue的flush方法
- 连续从队列中取到了128个数据帧就flush一次
- 只要向WriteQueue中添加一个数据帧之后,那就会尝试开启一个线程,要不要开启线程要看CAS
- 比如现在有10个线程同时向WriteQueue中添加了一个数据帧
- 那么这10个线程中的某一个会CAS成功,其他会CAS失败
- 那么此时CAS成功的线程会负责从线程池中获取另外一个线程执行上面的flush方法
- 从而获取WriteQueue中的数据帧然后发送出去
- 有了底层这套设计之后,对于TripleInvoker而言,它只需要把要发送的数据封装为数据帧,然后添加到WriteQueue中就可以了
- 在TripleInvoker的doInvoke()源码中,在创建完成Socket连接后
- 基于Socket连接先构造一个ClientCall对象
- 根据当前调用的方法信息构造一个RequestMetadata对象,这个对象表示,当前调用的是哪个接口的哪个方法,并且记录了所配置的序列化方式,压缩方式,超时时间等
- 紧接着构造一个ClientCall.Listener,这个Listener是用来处理响应结果的,针对不同的流式调用类型,会构造出不同的ClientCall.Listener
- UNARY:会构造出一个UnaryClientCallListener,内部包含了一个DeadlineFuture,DeadlineFuture是用来控制timeout的
- SERVER_STREAM:会构造出一个ObserverToClientCallListenerAdapter,内部包含了调用方法时传入进来的StreamObserver对象,最终就是由这个StreamObserver对象来处理响应结果的
- BI_STREAM:和SERVER_STREAM一样,也会构造出来一个ObserverToClientCallListenerAdapter
- 紧着着,就会调用ClientCall对象的start方法创建一个Stream,并且返回一个StreamObserver对象
- 得到了StreamObserver对象后,会根据不同的流式调用类型来使用这个StreamObserver对象
- UNARY:直接调用StreamObserver对象的onNext()方法来发送方法参数,然后调用onCompleted方法,然后返回一个
new AsyncRpcResult(future, invocation)
,future就是DeadlineFuture,后续会通过DeadlineFuture同步等待响应结果的到来,并最终把获取到的响应结果返回给业务方法 - SERVER_STREAM:直接调用StreamObserver对象的onNext()方法来发送方法参数,然后调用onCompleted方法,然后返回一个
new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation)
后续不会同步了,并且返回null给业务方法 - BI_STREAM:直接返回
new AsyncRpcResult( CompletableFuture.completedFuture(new AppResponse(requestObserver)), invocation)
,也不同同步等待响应结果了,而是直接把requestObserver对象返回给了业务方法
- UNARY:直接调用StreamObserver对象的onNext()方法来发送方法参数,然后调用onCompleted方法,然后返回一个
- 不管是哪种流式调用类型,都会先创建一个Stream,得到对应的一个StreamObserver对象,然后调用StreamObserver对象的onNext方法来发送数据,比如发送服务接口方法的入参值,比如一个User对象
- 在发送User对象之前,会先发送请求头,请求头中包含了当前调用的是哪个接口、哪个方法、版本号、序列化方式、压缩方式等信息,注意请求头中会包含一些gRPC相关的key,主要就是为了兼容gRPC
- 然后就是发送请求体
- 然后再对User对象进行序列化,得到字节数组
- 然后再压缩字节数组
- 然后把压缩之后的字节数组以及是否压缩标记生成一个DataQueueCommand对象,并且把这个对象添加到writeQueue中去,然后执行scheduleFlush(),该方法就会开启一个线程从writeQueue中获取数据进行发送,发送时就会触发DataQueueCommand对象的doSend方法进行发送,该方法中会构造一个DefaultHttp2DataFrame对象,该对象中由两个属性endStream,表示是不是Stream中的最后一帧,另外一个属性为content,表示帧携带的核心数据,该数据格式为
- 第一个字节记录请求体是否被压缩
- 紧着的四个字节记录字节数组的长度
- 后面就真正的字节数据
- TripleInvoker接收响应数据的流程:ClientCall.Listener就是用来监听是否接收到的响应数据的,不同的流式调用方式会对应不同的ClientCall.Listener
- UNARY:UnaryClientCallListener,内部包含了一个DeadlineFuture,DeadlineFuture是用来控制timeout的
- SERVER_STREAM:ObserverToClientCallListenerAdapter,内部包含了调用方法时传入进来的StreamObserver对象,最终就是由这个StreamObserver对象来处理响应结果的
- BI_STREAM:和SERVER_STREAM一样,也会构造出来一个ObserverToClientCallListenerAdapter
- 在之前创建Stream时,会向Http2StreamChannel绑定一个TripleHttp2ClientResponseHandler,很明显这个Handler就是用来处理接收到的响应数据的
- 在TripleHttp2ClientResponseHandler的channelRead0方法中,每接收一个响应数据就会判断是Http2HeadersFrame还是Http2DataFrame,然后调用ClientTransportListener中对应的onHeader方法和onData方法
- onHeader方法通过处理响应头,会生成一个TriDecoder,它是用来解压并处理响应体的
- onData方法会利用TriDecoder的deframe()方法来处理响应体
- 如果服务提供者那边调用了onCompleted方法,会向客户端响应一个请求头,endStream为true,表示响应结束,也会触发执行onHeader方法,从而会调用TriDecoder的close()方法
- TriDecoder的deframe()方法在处理响应体数据时,会分为两个步骤
- 先解析前5个字节,先解析第1个字节确定该响应体是否压缩了,再解析后续4个字节确定响应体内容的字节长度
- 然后再取出该长度的字节作为响应体数据,如果压缩了,那就进行解压,然后把解压之后的字节数组传递给ClientStreamListenerImpl的onMessage()方法,该方法就会按对应的序列化方式进行反序列化,得到最终的对象,然后再调用到最终的UnaryClientCallListener或者ObserverToClientCallListenerAdapter的onMessage()方法
- TriDecoder的close()方法最终也会调用到UnaryClientCallListener或者ObserverToClientCallListenerAdapter的close()方法
- UnaryClientCallListener,构造它时传递了一个DeadlineFuture对象
- onMessage()接收到响应结果对象后,会把结果对象赋值给appResponse属性
- onClose()会取出appResponse属性记录的结果对象构造出来一个AppResponse对象,然后调用DeadlineFuture的received方法,从而将方法调用线程接阻塞,并得到响应结果对象
- ObserverToClientCallListenerAdapter,构造它时传递了一个StreamObserver对象
- onMessage()接收到响应结果对象后,会调用StreamObserver对象的onNext(),并把结果对象传给onNext()方法,从触发了程序员的onNext()方法逻辑
- onClose()就会调用StreamObserver对象的onCompleted(),或者调用onError()方法
Triple请求处理和响应结果发送
- 部分内容和发送请求和处理响应是非常类似的,无非就是把视角从消费端切换到服务端,前面分析的是消费端发送和接收数据,现在要分析的是服务端接收和发送数据
- 消费端在创建一个Stream后,会生成一个对应的StreamObserver对象用来发送数据和一个ClientCall.Listener用来接收响应数据,对于服务端其实也一样,在接收到消费端创建Stream的命令后,也需要生成一个对应的StreamObserver对象用来响应数据以及一个ServerCall.Listener用来接收请求数据
- 在服务导出时,TripleProtocol的export方法中会开启一个ServerBootstrap,并绑定指定的端口,并且最重要的是,Netty会负责接收创建Stream的信息,一旦就收到这个信号,就会生成一个ChannelPipeline,并给ChannelPipeline绑定一个TripleHttp2FrameServerHandler,而这个TripleHttp2FrameServerHandler就可以用来处理Http2HeadersFrame和Http2DataFrame
- 比如在接收到请求头后,会构造一个ServerStream对象,该对象有一个ServerTransportObserver对象,ServerTransportObserver对象就会真正来处理请求头和请求体
- onHeader()方法,用来处理请求头
- 比如从请求头中得到当前请求调用的是哪个服务接口,哪个方法
- 构造一个TriDecoder对象,TriDecoder对象用来处理请求体
- 构造一个ReflectionServerCall对象并调用它的doStartCall()方法,从而生成不同的ServerCall.Listener
- UNARY:UnaryServerCallListener
- SERVER_STREAM:ServerStreamServerCallListener
- BI_STREAM:BiStreamServerCallListener
- 并且在构造这些ServerCall.Listener时会把ReflectionServerCall对象传入进去,ReflectionServerCall对象可以用来向客户端发送数据
- onData()方法,用来处理请求体,调用TriDecoder对象的deframe方法来处理请求体,如果是endStream,那还会调用TriDecoder对象的close方法
- onHeader()方法,用来处理请求头
- TriDecoder
- deframe():这个方法的作用和客户端时一样的,都是先解析请求体的前5个字节,然后解压请全体,然后反序列化得到请求参数对象,然后调用不同的ServerCall.Listener中的onMessage()
- close():当客户端调用onCompleted方法时,就表示发送数据完毕,此时会发送一个DefaultHttp2DataFrame并且endStream为true,从而会触发服务端TriDecoder对象的close()方法,从而调用不同的ServerCall.Listener中的onComplete()
- UnaryServerCallListener
- 在接收到请求头时,会构造UnaryServerCallListener对象,没什么特殊的
- 然后接收到请求体时,请求体中的数据就是调用接口方法的入参值,比如User对象,那么就会调用UnaryServerCallListener的onMessage()方法,在这个方法中会把User对象设置到invocation对象中
- 当消费端调用onCompleted()方法,表示请求体数据发送完毕,从而触发UnaryServerCallListener的onComplete()方法,在该方法中会调用invoke()方法,从而执行服务方法,并得到结果,得到结果后,会调用UnaryServerCallListener的onReturn()方法,把结果通过responseObserver发送给消费端,并调用responseObserver的onCompleted()方法,表示响应数据发送完毕,responseObserver是ReflectionServerCall对象的一个StreamObserver适配对象(ServerCallToObserverAdapter)
- ServerStreamServerCallListener
- 在接收到请求头时,会构造ServerStreamServerCallListener对象,没什么特殊的
- 然后接收到请求体时,请求体中的数据就是调用接口方法的入参值,比如User对象,那么就会调用ServerStreamServerCallListener的onMessage()方法,在这个方法中会把User对象以及responseObserver对象设置到invocation对象中,这是和UnaryServerCallListener不同的地方,UnaryServerCallListener只会把User对象设置给invocation,而ServerStreamServerCallListener还会把responseObserver对象设置进去,因为服务端流需要这个responseObserver对象,服务方法拿到这个对象后就可以自己来控制如何发送响应体,并什么时候调用onCompleted()方法来表示响应体发送完毕
- 当消费端调用onCompleted()方法,表示请求体数据发送完毕,从而触发ServerStreamServerCallListener的onComplete()方法,在该方法中会调用invoke()方法,从而执行服务方法,从而会通过responseObserver对象来发送数据
- 方法执行完后,仍然会调用ServerStreamServerCallListener的onReturn()方法,但是个空方法
- BiStreamServerCallListener
- 在接收到请求头时,会构造BiStreamServerCallListener对象,这里比较特殊,会把responseObserver设置给invocation并执行invoke()方法,从而执行服务方法,并执行onReturn()方法,onReturn()方法中会把服务方法的执行结果,也是一个StreamObserver对象,赋值给BiStreamServerCallListener对象的requestObserver属性
- 这样,在接收到请求头时,服务方法就会执行了,并且得到了一个requestObserver,它是程序员定义的,是用来处理请求体的,另外的responseObserver是用来发送响应体的
- 紧接着就会收到请求体,从而触发onMessage()方法,该方法中会调用requestObserver的onNext()方法,这样就可以做到,服务端能实时的接收到消费端每次所发送过来的数据,并且进行处理,处理过程中,如果需要响应就可以利用responseObserver进行响应
- 一旦消费端那边调用了onCompleted()方法,那么就会触发BiStreamServerCallListener的onComplete方法,该方法中也就是调用requestObserver的onCompleted(),主要就触发程序员自己写的StreamObserver对象中的onCompleted(),并没有针对底层的Stream做什么事情
总结
- 不管是Unary,还是ServerStream,还是BiStream,底层客户端和服务端之前都只有一个Stream,它们三者的区别在于
- Unary:通过流,将方法入参值作为请求体发送出去,而且只发送一次,服务端这边接收到请求体之后,会执行服务方法,得到结果,把结果返回给客户端,也只响应一次
- ServerStream:通过流,将方法入参值作为请求体发送出去,而且只发送一次,服务端这边接收到请求体之后,会执行服务方法,并且会把当前流对应的StreamObserver对象也传给服务方法,由服务方法自己控制如何响应,响应几次,响应什么数据,什么时候响应结束,都由服务方法自己控制
- BiStream,通过流,客户端和服务端,都可以发送和响应多次