--- title: 字节RPC框架kitex源码阅读(二) categories: [源码阅读,kitex] --- > [!note] > 基于kitex@v0.11.3 ## 开篇 在上篇[字节RPC框架kitex源码阅读(一)](https://nosae.top/posts/kitex%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E4%B8%80/)中,简单过了一遍从创建服务、监听端口、建立连接&派发、退出清理的流程,对于代码生成的回调如何在kitex内部得到调用也有了初步的认知。 这篇是(一)的续篇,深入分析`remote.Server`如何基于与客户端建立的连接做交互,包括传输、解码、编码等。 ## remote.ServerTransHandler 我们知道`server.Server`主要构建调用链、调用用户定义的回调。与远程传输有关的`remote.Server`提供了简单的几个接口方法给`server.Server`使用,相当于`server.Server`只需要关心调用链要怎么消费封装好的数据,不用管传输如何建立、数据如何封装: ```go // remote.Server type Server interface { Start() chan error Stop() error Address() net.Addr } ``` 而连接建立、数据收发、连接关闭等工作,`remote.Server`又依托`remote.TransServer`接口提供的这几个方法实现: ```go type TransServer interface { CreateListener(net.Addr) (net.Listener, error) BootstrapServer(net.Listener) (err error) Shutdown() error ConnCount() utils.AtomicInt } ``` `remote.TransServer`又依赖于`remote.TransServerHandler`接口提供的这些方法,贯穿连接的整个生命周期所做的一些工作: ```go type ServerTransHandler interface { Write(ctx context.Context, conn net.Conn, send Message) (nctx context.Context, err error) Read(ctx context.Context, conn net.Conn, msg Message) (nctx context.Context, err error) // 连接关闭时调用 OnInactive(ctx context.Context, conn net.Conn) // 处理RPC请求出错时调用 OnError(ctx context.Context, err error, conn net.Conn) // 处理RPC请求完成后调用,调用调用链交给上层处理,包括业务endpoint OnMessage(ctx context.Context, args, result Message) (context.Context, error) SetPipeline(pipeline *TransPipeline) // 连接新建时调用 OnActive(ctx context.Context, conn net.Conn) (context.Context, error) // 有RPC请求时调用 OnRead(ctx context.Context, conn net.Conn) error } ``` 梳理完这几个接口之后,我们从`server.Server.Run`开始往下分析,`remote.Server`、`remote.TransServer`、`remote.ServerTransHandler`都在此时被创建出来: ```go func (s *server) Run() (err error) { ... // 创建ServerTransHandler,负责数据传输过程中的关键步骤处理 transHdlr, err := s.newSvrTransHandler() if err != nil { return err } s.Lock() // TransServer在NewServer中被创建,并与ServerTransHandler绑定 s.svr, err = remotesvr.NewServer(s.opt.RemoteOpt, s.eps, transHdlr) s.Unlock() if err != nil { return err } ... // Start开始异步监听 errCh := s.svr.Start() ... } ``` 其中我们要重点关注的是`remote.ServerTransHandler`,因为它几乎贯穿整个连接的生命周期,包揽了与客户端的数据交互,看一下他是怎么被创建出来的,具体使用的是哪个实现类: ```go func (s *server) newSvrTransHandler() (handler remote.ServerTransHandler, err error) { // 使用工厂模式创建 transHdlrFactory := s.opt.RemoteOpt.SvrHandlerFactory transHdlr, err := transHdlrFactory.NewTransHandler(s.opt.RemoteOpt) ... // 创建TransPipeline transPl := remote.NewTransPipeline(transHdlr) // 添加inBound和outBound中间件 for _, ib := range s.opt.RemoteOpt.Inbounds { transPl.AddInboundHandler(ib) } for _, ob := range s.opt.RemoteOpt.Outbounds { transPl.AddOutboundHandler(ob) } return transPl, nil } ``` 返回得到一个名为`TransPipeline`的结构体,`TransPipeline`是代理类,代理了真正从工厂创建出来的`ServerTransHandler`对象,主要是为了调用inBound、outBound中间件。这里说的中间件不同于调用链上的中间件,是在流量入口和出口处做一些简单的工作,基本上不接触业务对象(req、reply)。 ![image-20241023183320421](https://cdn.jsdelivr.net/gh/NOS-AE/assets@main/img/image-20241023183320421.png) 比如限流功能就是通过serverLimiterHandler中间件实现的(详见pkg/remote/bound/limiter_inbound.go)。 `TransPipeline`包装的`ServerTransHandler`是通过工厂生成的,工厂默认是: ```go func newServerRemoteOption() *remote.ServerOption { return &remote.ServerOption{ SvrHandlerFactory: detection.NewSvrTransHandlerFactory(netpoll.NewSvrTransHandlerFactory(), nphttp2.NewSvrTransHandlerFactory()), ... } } ``` 这个`detection.svrTransHandlerFactory`工厂生产的依然不是具体实现,而是`detection.svrTransHandler`。在说他的作用前我先提一嘴,我们用kitex创建服务端时并没有指定使用什么传输协议(ttheader、http2等),像官网介绍的那样,kitex一个服务端同时支持所有传输协议: 回到代码,`detection.svrTransHandler`的作用就是检测连接上发送过来的数据匹配的是什么协议,从而选择使用对应的实现。比如默认的两个具体工厂分别生产的`trans.svrTransHandler`和`nphttp2.svrTransHandler`就分别对应着TTHeader协议和http2协议,后者可以通过在客户端创建时使用`client.WithTransportProtocol(transport.GRPC)`指定。 上述整个处理流程可以表示成下图,左边大箭头代表不同的连接,并且选择不同的传输协议与服务端交互,经过协议检测,最后选择对应的`ServerTransHandler`实现来处理数据: ![image-20241023191425835](https://cdn.jsdelivr.net/gh/NOS-AE/assets@main/img/image-20241023191425835.png) 下面就以TTHeader协议为例,同样按照我们熟悉的gonet实现的`ServerTransHandler`进行分析,与图中绿色的`netpoll`可视为功能平替。自己在本地调试运行gonet实现的时候,需要加上这两个服务端选项: ```go server.WithTransServerFactory(gonet.NewTransServerFactory()) server.WithTransHandlerFactory(gonet.NewSvrTransHandlerFactory()) ``` 这样一来,相当于不需要`detection.svrTransHandler`根据协议选择工厂,工厂被直接指定为`gonet.NewSvrTransHandlerFactory()`了。 另外,分别打开gonet和netpoll下的`server_handler.go`,可以发现并不像nphttp2和detection那样有定义自己`svrTransHandler`实现类,而是复用了`trans/default_server_handler.go`中定义的`svrTransHandler`: ```go // netpoll func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) { return trans.NewDefaultSvrTransHandler(opt, NewNetpollConnExtension()) } ``` ```go // gonet func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) { return trans.NewDefaultSvrTransHandler(opt, NewGonetExtension()) } ``` 因为`gonet`和`netpoll`的`ServerTransHandler`大部分处理逻辑相同,处理的都是TTHeader协议。有差异的部分以插件的方式实现。`default_server_handler.go`中定义的`svrTransHandler`如下: ```go type svrTransHandler struct { opt *remote.ServerOption svcSearcher remote.ServiceSearcher targetSvcInfo *serviceinfo.ServiceInfo inkHdlFunc endpoint.Endpoint codec remote.Codec transPipe *remote.TransPipeline ext Extension } ``` 因为`ServerTransHandler`是被`TransServer`直接来调用以处理连接生命周期的工作的,所以我们回到`BootstrapServer`这个函数,从连接建立到派发的地方开始分析: ```go func (ts *transServer) BootstrapServer(ln net.Listener) (err error) { if ln == nil { return errors.New("listener is nil in gonet transport server") } ts.ln = ln for { // 建立连接 conn, err := ts.ln.Accept() if err != nil { klog.Errorf("KITEX: BootstrapServer accept failed, err=%s", err.Error()) os.Exit(1) } go func() { var ( ctx = context.Background() err error ) defer func() { transRecover(ctx, conn, "OnRead") }() // 封装该连接,借助零拷贝reader的能力提升读性能 bc := newBufioConn(conn) // OnActive ctx, err = ts.transHdlr.OnActive(ctx, bc) if err != nil { klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err) return } for { ts.refreshDeadline(rpcinfo.GetRPCInfo(ctx), bc) // OnRead err := ts.transHdlr.OnRead(ctx, bc) if err != nil { // OnError ts.onError(ctx, err, bc) _ = bc.Close() return } } }() } } ``` 我们顺着代码,先去`OnActive`里都做了些什么(忽略`TransPipeline`和`detection`,直接分析实现类的`OnActive`方法): ```go func (t *svrTransHandler) OnActive(ctx context.Context, conn net.Conn) (context.Context, error) { // 从缓存池拿一个rpcinfo对象,用来保存RPC调用信息 ri := t.opt.InitOrResetRPCInfoFunc(nil, conn.RemoteAddr()) // 将对象保存到context当中,作为该连接的根context return rpcinfo.NewCtxWithRPCInfo(ctx, ri), nil } func NewCtxWithRPCInfo(ctx context.Context, ri RPCInfo) context.Context { if ri != nil { return context.WithValue(ctx, ctxRPCInfoKey, ri) } return ctx } ``` `OnActive`比较简单,初始化一下该连接的rpcinfo对象和context。 接着是`OnRead`,看着比较长,但基本逻辑就是「读请求 -> 解析请求 -> 处理请求 -> 写响应」: ```go // The connection should be closed after returning error. func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) { ctx, ri := t.newCtxWithRPCInfo(ctx, conn) t.ext.SetReadTimeout(ctx, conn, ri.Config(), remote.Server) var recvMsg remote.Message var sendMsg remote.Message closeConnOutsideIfErr := true defer func() { panicErr := recover() var wrapErr error // 处理panic,打印日志 ... // 结束该RPC调用相关的tracer、profiler t.finishTracer(ctx, ri, err, panicErr) t.finishProfiler(ctx) // 回收一些缓存池的资源 remote.RecycleMessage(recvMsg) remote.RecycleMessage(sendMsg) if rpcinfo.PoolEnabled() { t.opt.InitOrResetRPCInfoFunc(ri, conn.RemoteAddr()) } if wrapErr != nil { err = wrapErr } if err != nil && !closeConnOutsideIfErr { err = nil } }() // 开启RPC调用相关的tracer、profiler ctx = t.startTracer(ctx, ri) ctx = t.startProfiler(ctx) // 创建请求数据的载体 recvMsg = remote.NewMessageWithNewer(t.targetSvcInfo, t.svcSearcher, ri, remote.Call, remote.Server) recvMsg.SetPayloadCodec(t.opt.PayloadCodec) // Read读请求数据 ctx, err = t.transPipe.Read(ctx, conn, recvMsg) if err != nil { t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true) return err } svcInfo := recvMsg.ServiceInfo() ... // 创建响应数据的载体 sendMsg = remote.NewMessage(methodInfo.NewResult(), svcInfo, ri, remote.Reply, remote.Server) // OnMessage通知上层处理 ctx, err = t.transPipe.OnMessage(ctx, recvMsg, sendMsg) if err != nil { // OnError t.OnError(ctx, err, conn) err = remote.NewTransError(remote.InternalError, err) // 有必要的话,将错误信息响应给客户端 if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn { return err } // connection don't need to be closed when the error is return by the server handler closeConnOutsideIfErr = false return } remote.FillSendMsgFromRecvMsg(recvMsg, sendMsg) // Write写响应数据 if ctx, err = t.transPipe.Write(ctx, conn, sendMsg); err != nil { return err } return } ``` 分析具体代码之前,我们注意由`OnMessage`返回的错误(比如调用链返回了错误),在trans中视为业务逻辑错误,而不是传输错误,因此不会将这个错误返回,这个函数只会返回数据传输相关错误,让`TransServer`感知到传输错误后断开这个连接。 下面我们挑`OnRead`中几个关键点函数进行分析。首先是负责读数据和解码数据的的`Read`方法 ```go func (t *svrTransHandler) Read(ctx context.Context, conn net.Conn, recvMsg remote.Message) (nctx context.Context, err error) { var bufReader remote.ByteBuffer defer func() { // 释放buffer reader t.ext.ReleaseBuffer(bufReader, err) rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadFinish, err) }() rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadStart, nil) // 创建buffer reader bufReader = t.ext.NewReadByteBuffer(ctx, conn, recvMsg) // 数据读入并解码 if codec, ok := t.codec.(remote.MetaDecoder); ok { if err = codec.DecodeMeta(ctx, recvMsg, bufReader); err == nil { ... err = codec.DecodePayload(ctx, recvMsg, bufReader) } } else { err = t.codec.Decode(ctx, recvMsg, bufReader) } if err != nil { recvMsg.Tags()[remote.ReadFailed] = true return ctx, err } return ctx, nil } ``` 继续看负责通知消息到达并进行进一步处理的`OnMessage`方法,这个方法就更简单了,就是把请求数据过一遍调用链,并将调用链返回的业务结果保存到响应数据中并返回。 ```go func (t *svrTransHandler) OnMessage(ctx context.Context, args, result remote.Message) (context.Context, error) { err := t.inkHdlFunc(ctx, args.Data(), result.Data()) return ctx, err } ``` 最后的写响应方法`Write`: ```go func (t *svrTransHandler) Write(ctx context.Context, conn net.Conn, sendMsg remote.Message) (nctx context.Context, err error) { var bufWriter remote.ByteBuffer ri := sendMsg.RPCInfo() rpcinfo.Record(ctx, ri, stats.WriteStart, nil) defer func() { // 释放buffer writer t.ext.ReleaseBuffer(bufWriter, err) rpcinfo.Record(ctx, ri, stats.WriteFinish, err) }() ... // 创建buffer writer bufWriter = t.ext.NewWriteByteBuffer(ctx, conn, sendMsg) // 数据编码并写入连接中 err = t.codec.Encode(ctx, sendMsg, bufWriter) if err != nil { return ctx, err } // 将剩余缓存全部写入连接中(类比刷盘) return ctx, bufWriter.Flush() } ``` 另外还有没提到的`OnError`方法,这个方法无论是业务错误还是传输错误都会被调用,主要是打印一些日志、当因为错误而发生连接关闭时设置下rpcinfo的信息(这个状态可能还以metadata的形式发回给客户端,或者用于stat等,没具体考究过) ```go func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn) { ri := rpcinfo.GetRPCInfo(ctx) rService, rAddr := getRemoteInfo(ri, conn) if t.ext.IsRemoteClosedErr(err) { // it should not regard error which cause by remote connection closed as server error if ri == nil { return } remote := rpcinfo.AsMutableEndpointInfo(ri.From()) remote.SetTag(rpcinfo.RemoteClosedTag, "1") } else { var de *kerrors.DetailedError if ok := errors.As(err, &de); ok && de.Stack() != "" { klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), de.Stack()) } else { klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error()) } } } ``` ## 结尾 本篇与前篇从源码的视角简要分析了整个kitex服务端从创建、运行、建立连接、接收请求、处理请求、响应、结束的大致运行流程。通过以小见大的方式,文章是基于经典的go网络标准库来进行分析的,期间顺便弄清了一些接口的功能和他们之间的依赖关系,为更加深入地对kitex中其他各个组件的探讨提供了主干、基础。