--- title: gRPC阅读(1)—— 服务端 categories: [源码阅读,grpc] --- ## gRPC介绍 gRPC 是一种由 Google 开发的高性能远程过程调用(RPC)框架,适用于分布式系统间的通信。它基于 HTTP/2 进行传输,使用 Protocol Buffers 进行序列化,提供跨平台的兼容性。gRPC 的核心理念是让客户端像调用本地函数一样调用远程服务,简化服务间的调用流程。 通过编写与具体编程语言无关的 IDL (默认是 protobuf) 来定义 RPC 方法,gRPC 框架就会生成语言相关的客户端/服务端代码。 ## HTTP/2介绍 ![image-20221129021828289](https://cdn.jsdelivr.net/gh/NOS-AE/assets@main/img/650581-20221130203317095-178108264.png) 相比http1,具有更高的传输效率(多路复用:在同一个链连接上同时处理多个请求),更低的延迟(服务端推送,减少请求数量、简化header大小)、带宽利用率更高(头部压缩、数据流优先)、更安全(基于tls)。 http2具体特性有: 1. **帧、消息、流**:帧是小通信数据单元;消息由一个或多个帧组成。例如请求的消息和响应的消息;一个连接中包含多个流,每个流包含多个帧。帧通过流id进行标识属于哪个流 2. **二进制分帧**:每个消息由若干个帧组成,帧是最小传输单位,并且原来基于文本编码变成基于二进制,进一步减小帧大小 3. **压缩header** 4. **多路复用**:即在同一连接中的多个stream的传输互不影响 5. **服务端推送** 6. **流量控制和资源优先级**:流量控制以有效利用多路复用机制,确保只有接收者使用的数据会被传输。优先级机制可以确保重要的资源被优先传输。 ## 启动服务端 通过官方的 helloworld 例子可以看到,服务端的启动分为三步: 1. 创建gRPC的Server 2. 将业务handler注册到Server 3. 调用Server.Serve在端口上进行监听 第一步没什么好说的,注意下第二步注册进去的东西: ```go // 注册进去的ServiceDesc var Greeter_ServiceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "helloworld/helloworld.proto", } // Method对应的handler func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Greeter_SayHello_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler) } ``` 前两步比较简单,再来看第三步的如何建立连接并进行处理。类似标准库http的`ListenAndServe`,本质上就是创建一个死循环等待有新的连接到来,然后开新的goroutine去处理这个连接上的读写事件: ```go func (s *Server) Serve(lis net.Listener) error { ... // 优雅停止相关 s.serveWG.Add(1) defer func() { s.serveWG.Done() if s.quit.HasFired() { // Stop or GracefulStop called; block until done and return nil. <-s.done.Done() } }() // 初始化listenerSocket ls := &listenSocket{ Listener: lis, channelz: channelz.RegisterSocket(&channelz.Socket{ SocketType: channelz.SocketTypeListen, Parent: s.channelz, RefName: lis.Addr().String(), LocalAddr: lis.Addr(), SocketOptions: channelz.GetSocketOption(lis)}, ), } s.lis[ls] = true ... channelz.Info(logger, ls.channelz, "ListenSocket created") var tempDelay time.Duration // how long to sleep on accept failure for { // 建立新连接 rawConn, err := lis.Accept() // 处理连接错误 if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { // 偶发性错误,continue } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock() if s.quit.HasFired() { return nil } // 返回错误 return err } tempDelay = 0 s.serveWG.Add(1) // 开新的goroutine去处理这个连接 go func() { s.handleRawConn(lis.Addr().String(), rawConn) s.serveWG.Done() }() } } ``` 其中处理连接的代码在handleRawConn中: ```go func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { ... // 创建http2 transport,http2也是gRPC唯一的ServerTransport st := s.newHTTP2Transport(rawConn) rawConn.SetDeadline(time.Time{}) if st == nil { return } // ? if cc, ok := rawConn.(interface { PassServerTransport(transport.ServerTransport) }); ok { cc.PassServerTransport(st) } // 将该transport保存到s中 if !s.addConn(lisAddr, st) { return } // 开启新协程去处理连接 go func() { s.serveStreams(context.Background(), st, rawConn) s.removeConn(lisAddr, st) }() } ``` 至此,各个步骤都十分明了以及常规,下面继续深入分析Server如何处理每个连接。(最后两行,暂时不理解为什么要多开一个协程而不是在当前协程进行处理,不过问题不大) ## 处理连接 ```go func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) { ctx = transport.SetConnection(ctx, rawConn) ctx = peer.NewContext(ctx, st.Peer()) // stats for _, sh := range s.opts.statsHandlers { ctx = sh.TagConn(ctx, &stats.ConnTagInfo{ RemoteAddr: st.Peer().Addr, LocalAddr: st.Peer().LocalAddr, }) sh.HandleConn(ctx, &stats.ConnBegin{}) } defer func() { st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers { sh.HandleConn(ctx, &stats.ConnEnd{}) } }() // 信号量限制处理stream的最大并发度 streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) // transport将连接封装为一个个stream进行处理 st.HandleStreams(ctx, func(stream *transport.ServerStream) { s.handlersWG.Add(1) streamQuota.acquire() f := func() { defer streamQuota.release() defer s.handlersWG.Done() // 处理stream s.handleStream(st, stream) } // 优先让worker来处理这个stream if s.opts.numServerWorkers > 0 { select { case s.serverWorkerChannel <- f: return default: } } // 如果所有的worker都在忙,那么就新开一个协程来处理这个stream go f() }) } ``` 我们先忽略transport如何连接封装为stream,我们把stream看成是连接就行了。另外暂时不管worker是什么。下面直接分析handleStream如何处理stream ## 处理流 以下直接省略了不重要的代码分支,以便不用费时间观看这些垃圾细节(比如pos=-1,找不到斜杠则关闭流之类的处理) ```go func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) { // 获取service名和method名 sm := stream.Method() pos := strings.LastIndex(sm, "/") service := sm[:pos] method := sm[pos+1:] // 查询service srv, knownService := s.services[service] if knownService { // 查询unery-method if md, ok := srv.methods[method]; ok { // 处理unary-method s.processUnaryRPC(ctx, t, stream, srv, md, ti) return } // 查询streaming-method if sd, ok := srv.streams[method]; ok { // 处理streaming-method s.processStreamingRPC(ctx, t, stream, srv, sd, ti) return } } // 调用用户自定义的处理器 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti) return } // 即找不到服务/方法,也没有用户自定义处理器,给客户端返回错误 ... } ``` 其实很简单,所谓查询的service/method其实就是在创建Server时由用户注册进来的那些信息。 我们继续分析最简单的unary-rpc的处理函数processUnaryRPC,这个函数挺长的,不过大多数代码都与tracing, logging, stats相关,不影响主流程分析。但也看得出来一个框架不能仅仅是完成功能,可观测性和后续的可维护性也同样重要: ```go func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { // tracing, logging ... // 设置解码器和编码器 ... // 从stream读取数据并解码 // 这里的解码指的是将payload从数据帧从取出,用BufferSlice保存 d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true) // decode function // 这里的解码指的是将payload反序列化成具体的对象 df := func(v any) error { if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } // tracing, stats, logging ... return nil } ctx = NewContextWithServerTransportStream(ctx, stream) // 调用用户注册进来的Method对应的Handler,即处理请求,返回响应 // 可以看到,返回的错误命名为appErr,属于业务层抛出的错误 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) if appErr != nil { appStatus, ok := status.FromError(appErr) ... if e := t.WriteStatus(stream, appStatus); e != nil { channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e) } ... return appErr } if trInfo != nil { trInfo.tr.LazyLog(stringer("OK"), false) } opts := &transport.Options{Last: true} // 发送响应 if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil { ... } ... return t.WriteStatus(stream, statusOK) } ``` ## 多路复用 gRPC的多路复用实际上就是HTTP2的多路复用,即在同一连接上可以同时处理多个不同的stream。之前分析到handleStream这个函数的时候,有没有想过这里的`stream`是怎么来的呢,答案就藏在`st.HandleStreams`中:通过传入处理stream的回调函数,由transport负责解析出这些stream之后,封装成ServerStream,调用回调函数交给Server进行处理。 在开始进入HandleStream方法之前,还需要先了解下controlBuffer和loopyWriter(简称为cb和loopy),cb是transport中的其中一个成员变量,负责给loopy传递数据帧和控制帧: ```go type http2Server struct { // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. controlBuf *controlBuffer } ``` 这里说的帧,要跟http2的帧区别开来,即使某些cb帧可能对应http2的帧类型,但cb帧最终目的还是控制loopy的状态和行为,而不是直接发送到网络上。如图有14种cb帧,他们都实现了cbItem接口: image-20241114221109098 loopy负责不断从cb中读取数据帧和控制帧,维护待写入数据的stream的状态、负责数据写入连接中(发送数据)。loopy在transport初始化的时候就开始运行: ```go // 创建loopy func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { go func() { t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool) err := t.loopy.run() ... }() } // 运行loopy func (l *loopyWriter) run() (err error) { defer func() { if l.logger.V(logLevel) { l.logger.Infof("loopyWriter exiting with error: %v", err) } // 如果不是IO错误,冲刷缓冲区数据 if !isIOError(err) { l.framer.writer.Flush() } // 关闭cb l.cbuf.finish() }() // 循环处理帧 for { // 获取一个cb帧并处理(阻塞) it, err := l.cbuf.get(true) if err != nil { return err } if err = l.handle(it); err != nil { return err } // 处理发送数据 if _, err = l.processData(); err != nil { return err } gosched := true hasdata: // 循环处理数据 for { // 获取下一个cb帧并处理(非阻塞) it, err := l.cbuf.get(false) if err != nil { return err } if it != nil { if err = l.handle(it); err != nil { return err } if _, err = l.processData(); err != nil { return err } continue hasdata } isEmpty, err := l.processData() if err != nil { return err } if !isEmpty { continue hasdata } // 为了减少IO次数,当第一次读到的数据不满一批时,尝试yield等待更多数据进来 // 下一次还是不满的话,就不管了,直接冲刷缓冲区 if gosched { gosched = false if l.framer.writer.offset < minBatchSize { runtime.Gosched() continue hasdata } } l.framer.writer.Flush() break hasdata } } } ``` 总而言之cb+loopy类似一个小型的mq+consumer,不仅规范了流级别的传输控制,还将传输控制与传输内容本身进行了解耦。关于cb和loopy的内部实现我们暂时不关心。 很快我们会在HandleStreams中遇到第一个cb帧。下面回到HandleStreams中接着分析: ```go func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) { defer func() { close(t.readerDone) <-t.loopyWriterDone }() // 循环读取帧 for { // 限制正在排队的cb响应帧个数,比如incomingSettings、cleanupStreams t.controlBuf.throttle() // 读取HTTP2帧 frame, err := t.framer.fr.ReadFrame() // 记录最后一次读操作,与keepalive相关 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) if err != nil { if se, ok := err.(http2.StreamError); ok { // 如果是该stream错误,则直接关闭这个stream,不影响其它stream if t.logger.V(logLevel) { t.logger.Warningf("Encountered http2.StreamError: %v", se) } t.mu.Lock() s := t.activeStreams[se.StreamID] t.mu.Unlock() if s != nil { // 如果stream不为空,清空transport中维护的这个stream // 然后cleanupStream t.closeStream(s, true, se.Code, false) } else { // transport没有维护这个stream,直接cleanupStream t.controlBuf.put(&cleanupStream{ streamID: se.StreamID, rst: true, rstCode: se.Code, onWrite: func() {}, }) } continue } // 对于其它错误,直接强行关闭连接 t.Close(err) return } // 处理不同类型的帧 switch frame := frame.(type) { case *http2.MetaHeadersFrame: if err := t.operateHeaders(ctx, frame, handle); err != nil { // 处理header失败直接发送GoAway优雅关闭连接 t.controlBuf.put(&goAway{ code: http2.ErrCodeProtocol, debugData: []byte(err.Error()), closeConn: err, }) continue } case *http2.DataFrame: t.handleData(frame) case *http2.RSTStreamFrame: t.handleRSTStream(frame) case *http2.SettingsFrame: t.handleSettings(frame) case *http2.PingFrame: t.handlePing(frame) case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) case *http2.GoAwayFrame: // TODO: Handle GoAway from the client appropriately. default: if t.logger.V(logLevel) { t.logger.Infof("Received unsupported frame type %T", frame) } } } } ``` (暂时不清楚为什么要做throttle限流,而且只针对响应帧进行限流,那么不做这个限流会有什么影响吗?) 上面出现的两个cb帧: - cleanupStream:清空cb中维护的这个stream,如果rst=true,还会RST帧告知对方流被重置 - goAway:当解析协议失败,发送GoAway告知对方优雅关闭连接 HandleStream传入的handle,是在处理Header帧的时候使用的: ```go func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error { // Header被截断,无法处理,直接RST if frame.Truncated { t.controlBuf.put(&cleanupStream{ streamID: streamID, rst: true, rstCode: http2.ErrCodeFrameSize, onWrite: func() {}, }) return nil } // 客户端streamID是奇数,服务端的是偶数,并且双方的streamID都是递增的 if streamID%2 != 1 || streamID <= t.maxStreamID { ... } t.maxStreamID = streamID // 创建ServerStream,维护服务端的stream buf := newRecvBuffer() s := &ServerStream{ ... } // 根据协议,处理所有相关headers // 如果遇到协议错误、header缺失、header解析失败等错误,则向客户端发送错误帧 for _, hf := range frame.Fields { switch hf.Name { case "content-type": ... } } // 保存metadata到context里 ... // 确认是否能接受这个stream(比如transport已关闭、超过stream的数量限制等都会被拒绝) if t.state != reachable { ... } if uint32(len(t.activeStreams)) >= t.maxStreams { ... } // grpc规定使用HTTP2的POST方法进行数据传输 if httpMethod != http.MethodPost { ... } // InTapHandle,是每个新stream建立时的回调,比如可以用来做限流 // 官方将其定为experimental,因此不过多介绍 if t.inTapHandle != nil { var err error if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil { ... } } // 保存该stream t.activeStreams[streamID] = s // 流量控制的相关回调,当从stream读取数据时触发 s.requestRead = func(n int) { t.adjustWindow(s, uint32(n)) } s.ctxDone = s.ctx.Done() s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) s.trReader = &transportReader{ reader: &recvBufferReader{ ctx: s.ctx, ctxDone: s.ctxDone, recv: s.buf, }, windowHandler: func(n int) { t.updateWindow(s, uint32(n)) }, } // 将stream注册到loopy中 t.controlBuf.put(®isterStream{ streamID: s.id, wq: s.wq, }) // 调用我们的回调函数,对应“流处理”那个小节所做的事情 handle(s) return nil } ``` ## 总结 至此我们大概梳理了一下整个服务端数据流转的闭环: 1. 建立连接 2. 处理连接 3. 处理流(多路复用,一条连接上有多个流) 4. 调用业务逻辑进行Req/Reply处理 静态代码都过了一遍,然后再打个断点,看一下在一次unary-rpc的调用过程中,服务端都收到了什么帧。我们在HandleStreams方法中,给不同类型的帧都打上断点,运行发现按照顺序收到的帧是: 1. SettingsFrame(作为通信序言,协调后续发送相关的设置) 2. MetaHeadersFrame(请求头) 3. DataFrame(数据) 4. **PingFrame(这个是ping ack,服务端处理完这个帧并响应后,客户端那边就显示调用完成了)** 5. WindowUpdateFrame(用于更新服务端的发送窗口大小) 6. PingFrame(客户端最后还会再ping一下) 另外,第一个PingFrame不仅仅代表一个ping ack,还与流量控制有关,具体参考[这篇官方博客](https://grpc.io/blog/grpc-go-perf-improvements/#bdp-estimation-and-dynamic-flow-control-window)。大概意思就是这个ping会用来检测网络情况并动态更新发送窗口大小:服务端会首先在开始接收数据帧的时候发一个BDP ping(带宽时延积ping),当数据帧接收完毕后收到对方的ping ack,计算出这段时间内的BDP,以此来调整窗口大小。 最后再简单看一下请求头帧MetaHeadersFrame,包含了哪些header image-20241115163855163 可以看到有我们熟悉的:method,说明grpc基于http的POST方法、:path包含了service的名字和方法名并使用"/"分割、content-type是grpc、timeout是客户端调用rpc时传入context的超时时间,大概是1s。 ## 参考