--- title: gRPC阅读(2)—— 客户端 categories: [源码阅读,grpc] --- ## 启动客户端 客户端的启动也是三部曲: 1. 初始化grpc.ClientConn 2. 创建service对应的Client(比如codegen生成的GreeterClient) 3. 发起rpc调用 第二步比较简单,只是把ClientConn作为GreeterClient的成员变量,重点分析建立连接和RPC调用 ## 初始化ClientConn 初始化ClientConn做了很多准备工作,包括但不限于: - 应用选项(DialOption) - 构建拦截器调用链(Interceptor) - 决定使用什么resolver(resolver.Builder) - 检查传输层凭证,比如TLS(TransportCredentials) - 解析自定义服务端配置(ServerConfig) - ... 但还有一些配置是在真正发起RPC调用的时候才会被设置和触发,比如重试限流器、RPC配置选择器、RPC负载均衡器等。 ```go func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), } // 重试限流器 cc.retryThrottler.Store((*retryThrottler)(nil)) // 配置选择器,动态选择每个RPC的调用配置 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) // options ... // 确定使用哪个resolver(默认为dns) if err := cc.initParsedTargetAndResolverBuilder(); err != nil { return nil, err } // 内部使用的全局perTarget options for _, opt := range globalPerTargetDialOptions { opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts) } // 初始化拦截器调用链 chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) // 验证安全传输,如TLS if err := cc.validateTransportCredentials(); err != nil { return nil, err } // 解析以json格式指定的配置 // 如负载均衡配置、per-RPC方法超时等 if cc.dopts.defaultServiceConfigRawJSON != nil { scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts) if scpr.Err != nil { return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err) } cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig) } // keepalive对服务端探活 cc.mkp = cc.dopts.copts.KeepaliveParams // 获取authority,作为请求头中的:authority字段 if err = cc.initAuthority(); err != nil { return nil, err } // 注册channelz,用于监测grpc的运行 // 可通过http协议访问/grpc/channelz/v1查看grpc的状态 cc.channelzRegistration(target) channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget) channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority) // 连接状态管理器 cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz) // 负载均衡器,动态选择每个RPC的子通道 cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) // stats cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers) cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. // idle状态管理 cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) return cc, nil } ``` 这么一套下来可以看到,初始化ClientConn的时候并没有建立连接,所以猜测是在第一次发起RPC调用的时候才去尝试建立连接。还有一种验证方法是,把服务端关闭,尝试NewClient,是不会返回错误的。 ## 发起RPC调用 从官方例子中的SayHello方法进入分析: ```go func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { // 调用RPC时指定的选项 cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(HelloReply) // 调用ClientConn.Invoke,传入相应的方法名 err := c.cc.Invoke(ctx, Greeter_SayHello_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } return out, nil } func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error { // 调用RPC时指定的选项 opts = combine(cc.dopts.callOptions, opts) // 如果设置了拦截器,调用拦截器 if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } // 直接调用invoke return invoke(ctx, method, args, reply, cc, opts...) } ``` 默认情况下没有任何拦截器,我们直接分析invoke即可: ```go func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error { // 创建stream cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } // 发送请求 if err := cs.SendMsg(req); err != nil { return err } // 等待响应 return cs.RecvMsg(reply) } ``` ok,从现在开始,下面的事情就开始变得复杂了,首先是创建stream,创建连接就是在这一步完成的,忽略其它细节,最终会来到newClientStreamWithParams这个函数中,在里面会创建连接(注意实际上不一定是“创建”,而是复用,但是第一次调用的时候肯定是建立),然后基于这个连接创建stream: ```go func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) { ... // Pick the transport to use and create a new stream on the transport. // Assign cs.attempt upon success. op := func(a *csAttempt) error { // 创建连接 if err := a.getTransport(); err != nil { return err } // 创建stream if err := a.newStream(); err != nil { return err } cs.attempt = a return nil } // 带有重试的创建stream if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil { return nil, err } ... return cs, nil } ``` 创建好stream之后,下面就基于这个stream发送请求和接收响应,首先看看发送请求的数据流转: ```go func (cs *clientStream) SendMsg(m any) (err error) { ... // load hdr, payload, data hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool) if err != nil { return err } ... // 带有重试的发送数据 op := func(a *csAttempt) error { return a.sendMsg(m, hdr, payload, dataLen, payloadLen) } onSuccessCalled := false err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free) onSuccessCalled = true }) ... return err } ``` 其中调用了csAttempt.sendMsg来发送数据: ```go func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error { ... // 写数据 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { ... } ... } ``` 其中使用的是ClientTransport来发送数据: ```go func (t *http2Client) Write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *Options) error { ... // 创建dataFrame df := &dataFrame{ streamID: s.id, endStream: opts.Last, h: hdr, reader: reader, } ... // 通知loopy发送数据 if err := t.controlBuf.put(df); err != nil { _ = reader.Close() return err } return nil } ``` 最终是通过loopy来将请求包含的数据发送出去。 再看下接收响应的数据流转: ```go func (cs *clientStream) RecvMsg(m any) error { ... err := cs.withRetry(func(a *csAttempt) error { return a.recvMsg(m, recvInfo) }, cs.commitAttemptLocked) } ``` 同样是通过csAttempt上的recvMsg方法接收数据: ```go func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil { ... } } func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error { data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer) ... } func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool, ) (out mem.BufferSlice, err error) { pf, compressed, err := p.recvMsg(maxReceiveMessageSize) ... } ... ``` 一路点进去各个接收数据的关键方法/函数,可以发现数据是由其它reader线程读出并解码成帧,并封装为recvMsg送入channel中,当从stream试图接收数据的时候,实际上是从channel中读取这些帧,然后进行处理。 ## 总结 这篇写得总感觉头重脚轻,但是没办法,越写越发现客户端还真的挺复杂的,负载均衡、动态配置、连接懒建立、重试机制...这一套下来,限于篇幅不打算再在此赘述。 所以这篇的核心目的是,简单梳理发起一次RPC调用的数据流转,并且大概了解负载均衡...这套东西很多是在RPC调用的时候才会触发的即可。