--- title: 字节RPC框架kitex源码阅读(一) categories: [源码阅读,kitex] --- >[!note] > 基于kitex@v0.11.3 ## 开篇 随着分布式系统的发展,RPC(Remote Procedure Call,远程过程调用)已成为微服务架构中不可或缺的基础组件。RPC 通过让服务之间像调用本地方法一样发起远程调用,极大简化了跨进程、跨服务器的通信复杂度。对于开发者来说,选择一个性能稳定、易于使用的 RPC 框架至关重要。 Kitex,全称 “Kite Extensible”,是字节跳动开源的一款基于 Thrift 协议的高性能 Golang RPC 框架。它的设计目标是为大规模分布式服务提供高效、可扩展的 RPC 能力。除了性能上的优化,Kitex 还通过插件化的架构提供了灵活的扩展支持,开发者可以根据需求自定义编解码器、传输层协议等。 Kitex 的核心亮点包括: - **高性能**:Kitex 对性能的优化贯穿整个数据链路,能够在大规模并发请求场景下保持低延迟。 - **易用性**:Kitex 通过自动生成客户端和服务端代码,简化了开发者的使用流程。 - **丰富的功能支持**:包括负载均衡、链路追踪、熔断、限流等功能,帮助开发者构建健壮的微服务架构。 在本文中,我们将从源码层面逐步拆解从发起请求到响应结束,Kitex **服务端**是如何实现的,深入探讨它是如何实现这些核心功能的,对于其他组件的细节,比如服务发现、负载均衡、限流、链路追踪等,可能会稍有涉及,但大部分会另开文章讲解,因为“先理清主干,再探讨分支”是阅读源码的好习惯。 ## 代码生成、运行 > [!tip] > 为什么需要代码生成(codegen)?因为这些代码与用户定义的IDL有关,如果由框架本身实现的话,很可能会需要一些比如读取文件、反射等运行时机制来实现,拖慢运行效率。其次这些代码写法上却是通用的,而且量非常大,所以提前写好一个程序为你生成代码,替代手工编写。生成出来代码有的也叫脚手架 首先我们创建两个go project,并使用kitex提供的命令行工具分别去生成脚手架代码,让项目run起来,并发起第一个rpc调用试试。 为了简单起见: 1. 客户端和服务端都在本地运行,并且不采用服务发现,客户端采用直接IP+端口号的方式调用服务端 2. 服务端提供的是一个简单的echo服务,也就是原样响应客户端传过来的字符串 protobuf IDL如下,放到idl文件夹中: ```protobuf syntax = "proto3"; package example; option go_package = "example"; message ExampleRequest { string msg = 1; } message ExampleResponse { string msg = 1; } service ExampleService { rpc ExampleMethod(ExampleRequest) returns (ExampleResponse); } ``` 代码生成官网有教程,两行命令搞定 ```sh # 服务端 kitex -type protobuf -service example -module kitextest -I ../idl example.proto # 客户端 kitex -type protobuf example -module kitextest-client -I ../idl example.proto ``` 代码生成完成后,整理目录结构如下 ```sh ├── idl ├── kitextest │ ├── kitex_gen │ └── script └── kitextest-client └── kitex_gen ``` 生成好基本的代码后,就去服务端的`handler.go`文件里填充好echo的逻辑,命令行运行`go run .`启动服务端监听本地8888端口: ```go func (s *ExampleServiceImpl) ExampleMethod(ctx context.Context, req *example.ExampleRequest) (resp *example.ExampleResponse, err error) { resp = &example.ExampleResponse{Msg: req.Msg} return } ``` 客户端那边就调用`ExampleMethod`方法,然后打印一下响应结果没问题就行。 ## 服务初始化 由于本篇主要讲服务端,所以客户端那边的作用就是一个黑盒程序我们不用管它,客户端能调用`ExampleMethod`这个RPC方法就行。 首先从代码生成的`main.go`文件入手,生成的代码不出意外应该是这样的: ```go package main import ( example "kitextest/kitex_gen/example/exampleservice" "log" ) func main() { svr := example.NewServer(new(ExampleServiceImpl)) err := svr.Run() if err != nil { log.Println(err.Error()) } } ``` 1. 创建`server.Server`对象,传入处理业务逻辑的handler对象 2. 调用`svr.Run`运行程序并阻塞直到服务结束 这里handler相当于一个回调对象。当客户端有请求打进来,kitex会负责数据接收以及数据的解码,最后封装成定义好的请求体,调用handler相应的方法(比如`ExampleMethod`)给你去处理业务逻辑,你处理完业务逻辑后将定义好的响应体返回给kitex,kitex将响应体编码并传输回客户端。 整个过程听上去十分简单,开发人员只需要关注请求体、业务逻辑、响应体这三样东西,其他事情都由kitex搞定。 下面正式步入服务端源码,首先看下`example.NewServer`如何创建服务端对象: ```go func NewServer(handler example.ExampleService, opts ...server.Option) server.Server { var options []server.Option options = append(options, opts...) svr := server.NewServer(options...) if err := svr.RegisterService(serviceInfo(), handler); err != nil { panic(err) } return svr } ``` 1. 创建`server.Server`对象 2. 在对象上注册一个服务(Service) 这里的服务就是你的Example服务,本质上就是那个回调对象 `server.NewServer`里主要是创建了`server.Server`(这是个接口,其唯一实现是`server.server`),初始化配置Options保存到`server.opt`中,根据配置初始化一些运行时中间件保存到`server.mws`中。 `Server.RegisterService`将服务及其提供的RPC方法等信息注册到`server`对象上,其最终会调用`services.addService`来完成: ```go func (s *services) addService(svcInfo *serviceinfo.ServiceInfo, handler interface{}, registerOpts *RegisterOptions) error { // 创建service对象 svc := newService(svcInfo, handler) ... // svcMap根据服务名找到service s.svcMap[svcInfo.ServiceName] = svc // methodSvcsMap根据方法名找到services for methodName := range svcInfo.Methods { svcs := s.methodSvcsMap[methodName] if registerOpts.IsFallbackService { svcs = append([]*service{svc}, svcs...) } else { svcs = append(svcs, svc) } s.methodSvcsMap[methodName] = svcs } return nil } ``` 这就是服务初始化的全部内容,包括初始化一些配置、注册服务,其实就是将服务名方法名与服务做映射便于处理客户端请求时找到对应的回调方法来处理业务逻辑。 ## 启动服务端 回到`main.go`,下一步就是调用`Server.Run`开始运行服务端。 ```go func (s *server) Run() (err error) { s.Lock() s.isRun = true s.Unlock() // 建立中间件调用链 s.buildFullInvokeChain() if err = s.check(); err != nil { return err } ... s.fillMoreServiceInfo(s.opt.RemoteOpt.Address) // 继续初始化remote相关的一些配置 // 以及初始化一些inBound(请求)、outBound(响应)中间件,比如限流就是在inBound中间件里做的 s.richRemoteOption() // 创建ServerTransHandler,负责传输过程中的关键步骤处理 transHdlr, err := s.newSvrTransHandler() if err != nil { return err } s.Lock() // remotesvr.NewServer创建remote.Server,其封装了net.Listener,提供开始监听和停止的功能,借助ServerTransHandler来处理每个连接上数据的读写等具体操作 s.svr, err = remotesvr.NewServer(s.opt.RemoteOpt, s.eps, transHdlr) s.Unlock() if err != nil { return err } ... // Start开始异步监听 errCh := s.svr.Start() ... // waitExit将服务注册到注册中心,最后阻塞等待结束信号退出或报错退出 if err = s.waitExit(errCh); err != nil { klog.Errorf("KITEX: received error and exit: error=%s", err.Error()) } // 停止监听和服务,从注册中心注销服务等 if e := s.Stop(); e != nil && err == nil { err = e klog.Errorf("KITEX: stop server error: error=%s", e.Error()) } return } ``` 去除了一些不影响分析的代码后,可以看到整个启动的流程还是比较清晰: 1. 初始化一些调用链和配置 2. 监听端口,等客户端请求打进来后,remote/trans相关代码解析数据,随后调用链进行处理 3. 将服务注册到注册中心,阻塞等待程序结束 4. 清理资源 下面一次介绍这关键的4步都是怎么做的。 ### 初始化调用链 这里调用链有两种,分别对应,普通的RPC(unary)和流式RPC(streaming),这里只基于普通的一元RPC进行讨论。 ```go func (s *server) buildInvokeChain() { innerHandlerEp := s.invokeHandleEndpoint() ... // 构建调用链 s.eps = endpoint.Chain(s.mws...)(innerHandlerEp) } // Chain connect middlewares into one middleware. func Chain(mws ...Middleware) Middleware { return func(next Endpoint) Endpoint { for i := len(mws) - 1; i >= 0; i-- { next = mws[i](next) } return next } } ``` 所谓调用链其实就是将之前初始化的中间件`s.mws`串起来,按照与下标相反的顺序依次调用: ```plaintext mw[n] -> mw[n-1] -> ... -> mw[0] ``` 这里对`endpoint.Endpoint`和`endpoint.Middleware`的用法还是挺有意思的。 OK,所以`Chain`函数就是将`s.mws`串起来成一个大的中间件,最后还调用了一个`innerHandlerEp`作为整个调用链的最后一个`Endpoint`,这个`Endpoint`有什么特别的呢,看一下: ```go func (s *server) invokeHandleEndpoint() endpoint.Endpoint { return func(ctx context.Context, args, resp interface{}) (err error) { // 获取RPC请求的服务和方法 ri := rpcinfo.GetRPCInfo(ctx) methodName := ri.Invocation().MethodName() serviceName := ri.Invocation().ServiceName() svc := s.svcs.svcMap[serviceName] svcInfo := svc.svcInfo ... // 拿到用户定义的回调方法 implHandlerFunc := svcInfo.MethodInfo(methodName).Handler() ... // 调用回调方法 err = implHandlerFunc(ctx, svc.handler, args, resp) ... return err } } ``` 最后一个`Endpoint`的作用是调用用户处理业务逻辑的回调方法。所以调用链实际上就是按一定顺序一个接一个地调用`Endpoint`,其中这些`Endpoint`包括了: 1. kitex内部固定使用到的`Endpoint`,比如用来记录调试信息(也可能没有,我猜的) 2. kitex根据用户通过`WithXXX`的配置生成的某些`Endpoint` 3. 用户通过`WithMiddleware`完全自定义的`Endpoint` 4. 根据RPC请求调用相应用户回调方法的`Endpoint` 另外这里回调方法具体是什么呢?看一下之前代码生成的`exampleservice.go`文件就知道了: ```go var serviceMethods = map[string]kitex.MethodInfo{ "ExampleMethod": kitex.NewMethodInfo( exampleMethodHandler, newExampleMethodArgs, newExampleMethodResult, false, kitex.WithStreamingMode(kitex.StreamingUnary), ), } ``` ### 监听端口 因为写法上非常经典,对比go http标准库中`http.ListenAndServe`十分相像,伪代码如下: ```go func ListenAndServe(addr string) { // 创建Listener ln, err := Listen(addr) for { // 建立新连接 conn, err := ln.Accept() // 开启goroutine处理这个连接,然后去建立下一个连接 go serve(conn) } } ``` 再回到kitex中,由`s.svr.Start()`开启监听和连接事件派发,由实现类`remote.server`具体实现: ```go func (s *server) Start() chan error { errCh := make(chan error, 1) // 创建listener // 具体是可能是标准库go net或者字节自己实现的netpoll网络库,由transSvr决定 ln, err := s.buildListener() if err != nil { errCh <- err return errCh } s.Lock() s.listener = ln s.Unlock() // 调用transSvr.BootstrapServer开始监听和派发 gofunc.GoFunc(context.Background(), func() { errCh <- s.transSvr.BootstrapServer(ln) }) return errCh } // 以标准库go net为例子 func (ts *transServer) BootstrapServer(ln net.Listener) (err error) { ... for { // 建立连接 conn, err := ts.ln.Accept() ... // 开启goroutine专门处理连接 go func() { var ( ctx = context.Background() err error ) defer func() { transRecover(ctx, conn, "OnRead") }() bc := newBufioConn(conn) // OnActive通知新连接建立 ctx, err = ts.transHdlr.OnActive(ctx, bc) if err != nil { klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err) return } for { ts.refreshDeadline(rpcinfo.GetRPCInfo(ctx), bc) // 循环处理该连接的读写 err := ts.transHdlr.OnRead(ctx, bc) if err != nil { ts.onError(ctx, err, bc) _ = bc.Close() return } } }() } } ``` 与`ListenAndServe`的方式进行比对,其实就是一样的处理方式。同样为了简单起见,这里分析的是基于gonet的transServer,如果是字节自研的netpoll,相对要更加复杂些,不过本质上也是让一个goroutine去等待连接建立,并派发给其他gorutine处理。 ### 服务注册 & 等待退出 服务起起来之后,就去注册服务到注册中心,让客户端可以通过服务名发现这个服务并建立连接,这个过程就是服务发现的第一步。最后就是等待结束信号退出或报错退出。 ```go func (s *server) waitExit(errCh chan error) error { exitSignal := s.opt.ExitSignal() // service may not be available as soon as startup. // 由于上一步的BootstrapServer是异步执行的,需要一点时间绑定端口、开启netpoll等待之类的操作需要些时间,期间可能会报错。这里采用sleep的方式简单等待,确定服务起起来后再将注册服务到注册中心 delayRegister := time.After(1 * time.Second) for { select { case err := <-exitSignal: // 收到结束信号退出,结束信号默认是SIGINT和SIGTERM中断,用户也可以通过WithExitSignal自定义 return err case err := <-errCh: // 报错退出 return err case <-delayRegister: s.Lock() // 注册服务到注册中心 if err := s.opt.Registry.Register(s.opt.RegistryInfo); err != nil { s.Unlock() return err } s.Unlock() } } } ``` ### 清理资源 `waitExit`退出之后服务停止,调用`Stop`做收尾工作: ```go func (s *server) Stop() (err error) { s.stopped.Do(func() { s.Lock() defer s.Unlock() muShutdownHooks.Lock() // 用户自定义的回调 for i := range onShutdown { onShutdown[i]() } muShutdownHooks.Unlock() // 将服务从注册中心注销 if s.opt.RegistryInfo != nil { err = s.opt.Registry.Deregister(s.opt.RegistryInfo) s.opt.RegistryInfo = nil } // 关闭listener if s.svr != nil { if e := s.svr.Stop(); e != nil { err = e } s.svr = nil } }) return } ```