--- name: crud-transport-grpc description: PowerX gRPC 传输层规则(全局 server、拦截器链、DI)。 --- # PowerX CRUD Transport gRPC ## 步骤 1) 打开 `本文件内嵌规则`。 2) 按规则执行实现/校对。 3) 完成后按核对清单验收。 ## 核对点 - 与 PowerX 当前代码结构、路径与命名一致。 - 仅在传输层/契约层做职责内改动,不跨层越界。 ## 规则(内嵌) ### transport_grpc.yaml ````yaml kind: ruleset name: transport_grpc version: 1.0.0 owner: powerx status: stable meta: intent: > 规范 gRPC 服务器层的实现:单实例 Server、模块级 Handler;统一拦截器(鉴权/租户/日志/恢复/限流/校验); 错误映射到 gRPC status;尊重 deadline;透传/回写 metadata;按需支持流式事件与健康检查/反射。 references: - crud_grpc.yaml - crud_service.yaml - crud_di.yaml - proto_gen.yaml scope: applies_to: - "internal/transport/grpc/**/**.go" - "internal/server/grpc/**/**.go" principles: - 传输解耦:gRPC 层仅依赖 *shared.Deps 和 Service,不直连 Repo/DB/外部客户端。 - 可观测:记录 request_id/trace_id/tenant/actor 与方法名、耗时、错误码等关键指标。 - 一致性:错误语义与 REST/Service 对齐;分页与 DTO 吻合。 - 健壮性:统一恢复拦截;尊重客户端 deadline;合理默认超时;支持优雅停机。 - 扩展性:拦截器链条可插拔;流式事件名与 REST SSE 保持一致。 - 单实例:仅允许一个全局 gRPC Server(internal/server/grpc/server.go)。 checks: # ===== 全局 Server(唯一):负责 New + 拦截器 + 集中注册 ===== - id: grpc.global.server.shape level: error when: { file: "internal/server/grpc/server.go" } assert: - must_contain_regex: "grpc\\.NewServer\\(" - must_contain_regex: "Register[A-Za-z0-9]+ServiceServer\\(.*\\)" - must_not_import: ["gorm.io/gorm","database/sql","github.com/gin-gonic/gin"] # ===== 模块实现文件:允许 *_handler.go 或 service.go,禁止 new/register ===== - id: grpc.module.impl.shape level: error when: { glob: "internal/transport/grpc/**/{*_handler.go,service.go}" } assert: - must_contain_regex: "type\\s+Server\\s+struct\\s*{\\s*\\*shared\\.Deps\\s*}" - must_contain_regex: "func\\s+New\\(deps\\s*\\*shared\\.Deps\\)\\s*\\*Server" - must_not_contain_regex: "grpc\\.NewServer\\(" - must_not_contain_regex: "Register[A-Za-z0-9]+ServiceServer\\(" - must_not_import: ["database/sql","github.com/gin-gonic/gin"] # ===== 代码生成产物位置校验 ===== - id: grpc.codegen.layout level: error when: { file: "api/grpc/contract/buf.gen.yaml" } assert: - contains: "api/grpc/gen" # ========= 依赖注入:必须通过 Deps 注入 Service ========= di_usage: - id: grpc.depends_on_service_only level: error when: { glob: "internal/transport/grpc/**/**.go" } assert: - should_contain: "deps" - must_not_import: ["gorm.io/gorm","database/sql"] - must_not_call: ["redis.NewClient(","kafka.NewReader(","sql.Open("] # ========= 拦截器:Unary & Stream 必配链(检查全局 server)========= interceptors: - id: grpc.chain.interceptors level: error when: { file: "internal/server/grpc/server.go" } assert: - must_contain_any: - "grpc.ChainUnaryInterceptor(" - "grpc_middleware.ChainUnaryServer(" - must_contain_any: - "grpc.ChainStreamInterceptor(" - "grpc_middleware.ChainStreamServer(" - id: grpc.interceptors.kinds level: warn when: { file: "internal/server/grpc/server.go" } assert: - should_contain_any: - "AuthUnaryInterceptor" - "TenantUnaryInterceptor" - "LoggingUnaryInterceptor" - "RecoveryUnaryInterceptor" - "RateLimitUnaryInterceptor" - "ValidateUnaryInterceptor" # ========= Metadata 与 Deadline ========= metadata_deadline: - id: grpc.metadata.read level: error when: { glob: "internal/transport/grpc/**/**.go" } assert: - must_contain_any: ["metadata.FromIncomingContext","grpc.SetHeader","grpc.SetTrailer"] - id: grpc.deadline.respect level: warn when: { glob: "internal/transport/grpc/**/**.go" } assert: - should_contain_any: - "ctx, cancel := context.WithTimeout(" - "if _, ok := ctx.Deadline(); ok {" - "deadline, ok := ctx.Deadline()" # ========= 鉴权与多租户(通常由拦截器注入)========= auth_tenant: - id: grpc.auth.from_metadata level: warn when: { glob: "internal/transport/grpc/**/interceptors*.go" } assert: - should_contain_any: ["authorization","bearer","jwt","tenant","actor"] - id: grpc.tenant.required_in_ctx level: warn when: { glob: "internal/transport/grpc/**/**.go" } assert: - should_contain: "TenantIDFromCtx(" # ========= 错误映射 ========= error_mapping: - id: grpc.error.to_status level: error when: { glob: "internal/transport/grpc/**/**.go" } assert: - must_contain_any: ["status.Error(","status.Errorf("] - must_import: ["google.golang.org/grpc/status","google.golang.org/grpc/codes"] # ========= 服务实现仅调用 Service ========= handler_calls_service: - id: grpc.handler.calls.service level: error when: { glob: "internal/transport/grpc/**/**.go" } assert: - should_contain_regex: "s\\.Deps\\.[A-Za-z0-9]+Service\\." - must_not_import: ["internal/repository"] # ========= 流式(存在时)事件与取消 ========= streaming: - id: grpc.streaming.events level: warn when: { contains: "stream " } assert: - should_contain_any: ["start","token","data","final","end","error","heartbeat"] - should_contain_any: ["ctx.Err()","stream.Context()"] # ========= 可选:健康检查与反射(检查全局 server)========= health_reflection: - id: grpc.health.reflection level: info when: { file: "internal/server/grpc/server.go" } assert: - should_contain_any: - "health.NewServer(" - "reflection.Register(" acceptance: checklist: - "[ ] 仅存在一个全局 gRPC Server:internal/server/grpc/server.go" - "[ ] 模块实现通过 New(*shared.Deps) 构造,禁止 new/register server" - "[ ] 已配置 Unary/Stream 拦截器链(鉴权/租户/日志/恢复/限流/校验 可按需)" - "[ ] 通过 metadata 读写 request_id/trace_id,尊重 client deadline" - "[ ] 错误映射使用 status/codes,语义与 REST/Service 对齐" - "[ ] 如有流式:事件名与 REST SSE 对齐、处理取消与心跳" - "[ ] (可选)健康检查与反射启用" templates: server_go: | // internal/transport/grpc/server.go package grpcserver import ( "context" "net" "time" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" "github.com/ArtisanCloud/PowerX/internal/app/shared" gen "{{module_path}}/api/grpc/gen/media" ) type Server struct { deps *shared.Deps gs *grpc.Server } func New(deps *shared.Deps) *Server { unary := grpc.ChainUnaryInterceptor( AuthUnaryInterceptor(deps), TenantUnaryInterceptor(deps), LoggingUnaryInterceptor(deps), RecoveryUnaryInterceptor(), // RateLimitUnaryInterceptor(...), // ValidateUnaryInterceptor(...), ) stream := grpc.ChainStreamInterceptor( AuthStreamInterceptor(deps), TenantStreamInterceptor(deps), LoggingStreamInterceptor(deps), RecoveryStreamInterceptor(), ) gs := grpc.NewServer(unary, stream) s := &Server{deps: deps, gs: gs} gen.RegisterMediaAssetServiceServer(gs, s) // 示例注册 return s } func (s *Server) Serve(l net.Listener) error { return s.gs.Serve(l) } func (s *Server) Stop() { s.gs.GracefulStop() } // ---- 示例:实现方法仅调用 Service,并处理 metadata / deadline ---- func (s *Server) GetMediaAsset(ctx context.Context, in *gen.GetMediaAssetRequest) (*gen.MediaAssetResponse, error) { // deadline:若上游未设置,则给一个合理默认 if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, 10*time.Second) defer cancel() } // 读取 request metadata(如 request-id) if md, ok := metadata.FromIncomingContext(ctx); ok { _ = md.Get("x-request-id") } tenantID := TenantIDFromCtx(ctx) // 由拦截器注入 out, err := s.deps.MediaAssetService.Get(ctx, tenantID, in.GetId()) if err != nil { return nil, toStatus(err) } return &gen.MediaAssetResponse{Data: MapToProto(out)}, nil } interceptors_go: | // internal/transport/grpc/interceptors.go package grpcserver import ( "context" "strings" "time" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" "github.com/ArtisanCloud/PowerX/internal/app/shared" ) type ctxKey string const ( tenantKey ctxKey = "tenant_id" actorKey ctxKey = "actor_id" ) func TenantIDFromCtx(ctx context.Context) uint64 { v := ctx.Value(tenantKey) if v == nil { return 0 } if n, ok := v.(uint64); ok { return n } return 0 } func AuthUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { md, _ := metadata.FromIncomingContext(ctx) tok := getBearer(md.Get("authorization")) if tok == "" { return nil, status.Error(codes.Unauthenticated, "missing token") } // TODO: 验证 JWT/STS → 解析 tenant/actor ctx = context.WithValue(ctx, tenantKey, /* tenantID */ uint64(1)) ctx = context.WithValue(ctx, actorKey, /* actorID */ uint64(1)) return handler(ctx, req) } } func TenantUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if TenantIDFromCtx(ctx) == 0 { return nil, status.Error(codes.InvalidArgument, "tenant missing") } return handler(ctx, req) } } func LoggingUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { start := time.Now() resp, err := handler(ctx, req) // TODO: 记录 method、耗时、tenant、actor、status 等 _ = start return resp, err } } func RecoveryUnaryInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { defer func() { if r := recover(); r != nil { err = status.Error(codes.Internal, "panic recovered") } }() return handler(ctx, req) } } func getBearer(vals []string) string { for _, v := range vals { v = strings.TrimSpace(v) if strings.HasPrefix(strings.ToLower(v), "bearer ") { return strings.TrimSpace(v[7:]) } } return "" } errors_map_go: | // internal/transport/grpc/errors_map.go package grpcserver import ( "errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func toStatus(err error) error { switch { case IsNotFound(err): return status.Error(codes.NotFound, "not found") case IsConflict(err): return status.Error(codes.AlreadyExists, "conflict") case IsUnauthorized(err): return status.Error(codes.Unauthenticated, "unauthenticated") case IsForbidden(err): return status.Error(codes.PermissionDenied, "forbidden") case IsRateLimited(err): return status.Error(codes.ResourceExhausted, "rate limited") case IsPrecondition(err): return status.Error(codes.FailedPrecondition, "precondition failed") default: return status.Error(codes.Internal, err.Error()) } } mapping_helper_go: | // internal/transport/grpc/mapping_helper.go package grpcserver import ( gen "{{module_path}}/api/grpc/gen/media" m "{{module_path}}/pkg/corex/db/persistence/model/media" ) func MapToProto(in *m.MediaAsset) *gen.MediaAsset { if in == nil { return nil } return &gen.MediaAsset{ Id: in.ID, TenantId: in.TenantID, Name: in.Name, Code: in.Code, MetaJson: string(in.Meta), Status: int32(in.Status), // TODO: 时间字段转换 } } func MapListToProto(items []*m.MediaAsset) []*gen.MediaAsset { out := make([]*gen.MediaAsset, 0, len(items)) for _, it := range items { out = append(out, MapToProto(it)) } return out } ````