---
name: kratos-events
description: Implements event-driven architecture with Watermill pub/sub (RabbitMQ/AMQP/message queue) for go-kratos microservices. Creates publisher/subscriber wrappers, event handlers, worker services with context propagation (request_id, correlation_id), and RabbitMQ/AMQP integration. Use when implementing pub/sub patterns, creating message queues, adding event handlers, building worker services for async processing, or setting up background jobs.
---
## How Event-Driven Architecture Works in Kratos
### Three-Layer Architecture
**1. Platform Layer** (`platform/events/`)
- Broker-agnostic interfaces (Publisher, Subscriber)
- Lets you swap brokers (AMQP, Redis, Kafka) without changing business or handler code
**2. Data Layer** (`services/{service}/internal/data/mq/`)
- Publisher/Subscriber wrappers
- Context propagation and metadata enrichment
- Structured logging with request_id and correlation_id
**3. Application Layer** (`services/{service}/internal/handlers/`)
- Event handlers with business logic
- Delegate to use cases (biz layer)
- Extract context and metadata from messages
### Worker Services
Each service can have a companion **worker** binary:
```
services/{service}/
├── cmd/
│ ├── {service}/ # Main HTTP/gRPC service
│ └── {service}-worker/ # Event processing worker
├── internal/
│ ├── handlers/ # Event handlers
│ ├── worker/ # Watermill router setup
│ └── ...
```
### Context Propagation Flow
```
HTTP Request → Middleware (request_id) → Use Case → Publisher
↓ (message with context + metadata)
Subscriber → Handler → Use Case (same request_id in logs)
```
**Best Practice**: Always use `logger.WithContext(ctx)` for distributed tracing.
What would you like to do?
1. Create publisher/subscriber wrappers for a new service
2. Add event handler for existing service
3. Set up worker service for async processing
4. Publish events from business layer
5. View examples and patterns
**Wait for response before proceeding.**
| Response | Workflow |
|----------|----------|
| 1, "create wrappers", "new service" | `workflows/create-pubsub-wrappers.md` |
| 2, "add handler", "handler" | `workflows/add-event-handler.md` |
| 3, "worker", "async", "background" | `workflows/setup-worker.md` |
| 4, "publish", "emit event" | `workflows/publish-events.md` |
| 5, "examples", "patterns", "help" | `workflows/view-examples.md` |
**After reading the workflow, follow it exactly.**
## 1. Create Publisher/Subscriber Wrappers
**File**: `services/{service}/internal/data/mq/publisher.go`
```go
package mq

import (
	"context"
	"fmt"

	"platform/events"
	middleware2 "platform/middleware"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/go-kratos/kratos/v2/log"
)

// RoutingKey is the metadata key under which the routing key is stored.
const RoutingKey = "routing_key"

func NewEventPublisher(pub message.Publisher, logger log.Logger) events.Publisher {
	return &eventPublisher{
		pub:    pub,
		logger: log.NewHelper(logger),
	}
}

type eventPublisher struct {
	pub    message.Publisher
	logger *log.Helper
}

// Unwrap exposes the underlying Watermill publisher.
func (ep *eventPublisher) Unwrap() message.Publisher {
	return ep.pub
}

func (ep *eventPublisher) Publish(ctx context.Context, topic string, payload []byte) error {
	msg := message.NewMessage(watermill.NewUUID(), payload)

	// Attach ctx for in-process use. Brokers such as RabbitMQ do not serialize
	// context values: only Payload and Metadata cross the wire, which is why
	// correlation_id and request_id are copied into Metadata below.
	msg.SetContext(ctx)

	// A fresh message has no correlation ID yet, so each event published here
	// starts a new correlation chain. Messages produced inside router handlers
	// inherit theirs via middleware.CorrelationID.
	correlationID := middleware.MessageCorrelationID(msg)
	if correlationID == "" {
		correlationID = watermill.NewUUID()
		middleware.SetCorrelationID(correlationID, msg)
	}

	// Use the topic as routing key unless one is already set
	SetMessageRoutingKey(topic, msg)

	// Copy the request ID from the incoming request context into metadata
	if requestID := extractRequestID(ctx); requestID != "" {
		msg.Metadata.Set(middleware2.RequestIdKey, requestID)
	}

	ep.logger.WithContext(ctx).Infof("Publishing message %s to topic %s, correlation_id: %s", msg.UUID, topic, correlationID)
	if err := ep.pub.Publish(topic, msg); err != nil {
		ep.logger.WithContext(ctx).Errorf("Failed to publish message %s to topic %s: %v", msg.UUID, topic, err)
		return fmt.Errorf("failed to publish message to topic %s: %w", topic, err)
	}
	return nil
}

// SetMessageRoutingKey sets the routing key unless one is already present.
func SetMessageRoutingKey(key string, msg *message.Message) {
	if MessageRoutingKey(msg) != "" {
		return
	}
	msg.Metadata.Set(RoutingKey, key)
}

// MessageRoutingKey returns the routing key stored in metadata, if any.
func MessageRoutingKey(msg *message.Message) string {
	return msg.Metadata.Get(RoutingKey)
}

// extractRequestID reads the request ID through the platform's RequestID valuer.
func extractRequestID(ctx context.Context) string {
	val := middleware2.RequestID()(ctx)
	if requestID, ok := val.(string); ok {
		return requestID
	}
	return ""
}
```
**File**: `services/{service}/internal/data/mq/subscriber.go`
```go
package mq

import (
	"context"

	"platform/events"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/go-kratos/kratos/v2/log"
)

func NewEventSubscriber(sub message.Subscriber, logger log.Logger) events.Subscriber {
	return &eventSubscriber{
		sub:    sub,
		logger: log.NewHelper(logger),
	}
}

type eventSubscriber struct {
	sub    message.Subscriber
	logger *log.Helper
}

func (es *eventSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
	es.logger.WithContext(ctx).Infof("Subscribing to topic: %s", topic)
	messages, err := es.sub.Subscribe(ctx, topic)
	if err != nil {
		es.logger.WithContext(ctx).Errorf("Failed to subscribe to topic %s: %v", topic, err)
		return nil, err
	}
	es.logger.WithContext(ctx).Infof("Successfully subscribed to topic: %s", topic)
	return messages, nil
}

func (es *eventSubscriber) Close() error {
	es.logger.Info("Closing event subscriber")
	if err := es.sub.Close(); err != nil {
		es.logger.Errorf("Failed to close subscriber: %v", err)
		return err
	}
	es.logger.Info("Event subscriber closed successfully")
	return nil
}

func (es *eventSubscriber) Unwrap() message.Subscriber {
	return es.sub
}
```
## 2. Create Event Handler
**File**: `services/{service}/internal/handlers/{event}_handler.go`
```go
package handlers

import (
	"encoding/json"

	"{service}/internal/biz"
	middleware2 "platform/middleware"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/go-kratos/kratos/v2/log"
)

func NewLifecycleEventHandler(symbolUC biz.SymbolUseCase, logger log.Logger) *LifecycleEventHandler {
	return &LifecycleEventHandler{
		logger:   log.NewHelper(logger),
		symbolUC: symbolUC,
	}
}

type LifecycleEventHandler struct {
	logger   *log.Helper
	symbolUC biz.SymbolUseCase
}

func (h *LifecycleEventHandler) Handle(msg *message.Message) error {
	// CRITICAL: Extract context from message
	ctx := msg.Context()

	// Tracing IDs arrive in metadata, set by the publisher wrapper
	correlationID := middleware.MessageCorrelationID(msg)
	requestID := msg.Metadata.Get(middleware2.RequestIdKey)
	h.logger.WithContext(ctx).Infof(
		"Processing lifecycle event - msgID: %s, correlationID: %s, requestID: %s",
		msg.UUID,
		correlationID,
		requestID,
	)

	// Unmarshal payload. The payload type (placeholder name) belongs in biz:
	// biz cannot import handlers without creating an import cycle.
	var payload biz.EventPayload
	if err := json.Unmarshal(msg.Payload, &payload); err != nil {
		h.logger.WithContext(ctx).Errorf("Failed to unmarshal payload: %v", err)
		return err
	}

	// Delegate to business layer
	if err := h.symbolUC.ProcessLifecycleEvent(ctx, &payload); err != nil {
		h.logger.WithContext(ctx).Errorf("Failed to process event: %v", err)
		return err
	}
	return nil
}
```
**File**: `services/{service}/internal/handlers/provider.go`
```go
package handlers

import "github.com/google/wire"

// ProviderSet is handlers providers.
var ProviderSet = wire.NewSet(
	NewLifecycleEventHandler,
)
```
## 3. Create Worker Service
**File**: `services/{service}/internal/worker/worker.go`
```go
package worker

import (
	"context"
	"fmt"
	"sync"
	"time"

	"platform/events"
	platform_logger "platform/logger"
	conf "{service}/internal/conf/gen"
	"{service}/internal/handlers"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/go-kratos/kratos/v2/log"
)

// HookFunc matches the Kratos lifecycle hook signature (BeforeStart, AfterStop).
type HookFunc func(ctx context.Context) error

type Worker interface {
	Start() HookFunc
	Stop() HookFunc
}

func NewWorker(router *message.Router, logger log.Logger) Worker {
	return &worker{
		logger:       log.NewHelper(logger),
		name:         "watermill-router",
		router:       router,
		closeTimeout: 15 * time.Second,
		done:         make(chan struct{}),
	}
}

type worker struct {
	logger       *log.Helper
	name         string
	router       *message.Router
	closeTimeout time.Duration
	startOnce    sync.Once
	stopOnce     sync.Once
	done         chan struct{} // closed when router.Run returns
	err          error         // written by the Run goroutine before done is closed
}

// Start runs the router in the background; the hook itself returns immediately.
func (w *worker) Start() HookFunc {
	return func(ctx context.Context) error {
		w.startOnce.Do(func() {
			go func() {
				defer close(w.done)
				w.logger.WithContext(ctx).Infof("Starting router %s", w.name)
				// Run blocks until the router is closed or ctx is cancelled
				if err := w.router.Run(ctx); err != nil {
					w.err = fmt.Errorf("%s: router run: %w", w.name, err)
					w.logger.WithContext(ctx).Errorf("%v", w.err)
				}
			}()
		})
		return nil
	}
}

// Stop closes the router and waits for Run to return, bounded by closeTimeout.
func (w *worker) Stop() HookFunc {
	return func(ctx context.Context) error {
		var stopErr error
		w.stopOnce.Do(func() {
			// Kratos passes AfterStop hooks the app context, already cancelled by
			// then; detach from its cancellation (Go 1.21+) so the timeout applies.
			stopCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), w.closeTimeout)
			defer cancel()
			w.logger.WithContext(ctx).Infof("Closing router %s", w.name)

			errCh := make(chan error, 1)
			go func() { errCh <- w.router.Close() }()
			select {
			case <-stopCtx.Done():
				stopErr = fmt.Errorf("%s: router close timeout after %v: %w", w.name, w.closeTimeout, stopCtx.Err())
				w.logger.WithContext(ctx).Errorf("%v", stopErr)
				return
			case err := <-errCh:
				if err != nil {
					w.logger.WithContext(ctx).Errorf("%s: router failed to gracefully close: %v", w.name, err)
					stopErr = fmt.Errorf("%s: router close: %w", w.name, err)
				}
			}

			// Wait for Run to return, but never past the timeout
			select {
			case <-w.done:
				if stopErr == nil {
					stopErr = w.err // safe: done is closed after w.err is written
				}
			case <-stopCtx.Done():
				if stopErr == nil {
					stopErr = fmt.Errorf("%s: router run did not return: %w", w.name, stopCtx.Err())
				}
			}
		})
		return stopErr
	}
}

// NewRouter builds the Watermill router. eventPub is unused by this
// consumer-only handler; it is needed only if handlers publish follow-up events.
func NewRouter(cfg *conf.Data, lifecycleHandler *handlers.LifecycleEventHandler, eventPub events.Publisher, eventSub events.Subscriber, logger *platform_logger.WatermillLogger) *message.Router {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SignalsHandler closes the router when SIGINT or SIGTERM is received
	router.AddPlugin(plugin.SignalsHandler)

	// Router-level middleware runs for every message; the first one added is outermost
	router.AddMiddleware(
		// CorrelationID copies the correlation ID from the incoming message's metadata to produced messages
		middleware.CorrelationID,
		// Retry re-invokes the handler in-process when it returns an error
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: 100 * time.Millisecond,
			Logger:          logger,
		}.Middleware,
		// Recoverer turns handler panics into errors, which Retry then sees
		middleware.Recoverer,
	)

	// Consume from the configured topic (the exchange name)
	router.AddConsumerHandler("events", cfg.Mq.Exchange.Name, eventSub.Unwrap(), lifecycleHandler.Handle)
	return router
}
```
**File**: `services/{service}/cmd/{service}-worker/main.go`
```go
package main

import (
	"flag"
	"os"

	p "platform/logger"
	conf "{service}/internal/conf/gen"
	"{service}/internal/worker"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/config"
	"github.com/go-kratos/kratos/v2/config/env"
	"github.com/go-kratos/kratos/v2/config/file"
	"github.com/go-kratos/kratos/v2/log"
	_ "go.uber.org/automaxprocs"
)

var (
	Name       string = "{service}-worker"
	Version    string = "1.0"
	configFile string
	id, _      = os.Hostname()
)

func init() {
	flag.StringVar(&configFile, "conf", "configs/config.yaml", "config path, eg: --conf config.yaml")
}

func newApp(worker worker.Worker, logger log.Logger) *kratos.App {
	return kratos.New(
		kratos.ID(id),
		kratos.Name(Name),
		kratos.Version(Version),
		kratos.Metadata(map[string]string{}),
		kratos.Logger(logger),
		kratos.BeforeStart(worker.Start()),
		kratos.AfterStop(worker.Stop()),
	)
}

func main() {
	flag.Parse()
	c := config.New(
		config.WithSource(
			env.NewSource(),
			file.NewSource(configFile),
		),
	)
	defer c.Close()

	if err := c.Load(); err != nil {
		panic(err)
	}
	var bc conf.Bootstrap
	if err := c.Scan(&bc); err != nil {
		panic(err)
	}
	if err := bc.Validate(); err != nil {
		panic(err)
	}

	logger := p.NewLogger(bc.Log.Level, id, Name, Version)
	app, cleanup, err := wireApp(bc.Server, bc.Data, bc.Log, logger)
	if err != nil {
		panic(err)
	}
	defer cleanup()

	if err := app.Run(); err != nil {
		panic(err)
	}
}
```
## 4. Publish Events from Business Layer
**File**: `services/{service}/internal/biz/{entity}.go`
```go
func (uc *symbolUseCase) CreateSymbol(ctx context.Context, symbol *Symbol) (*Symbol, error) {
	// Validate
	if err := uc.validator.Struct(symbol); err != nil {
		return nil, err
	}

	// Create in database
	result, err := uc.repo.Create(ctx, symbol)
	if err != nil {
		return nil, err
	}

	// Publish event (if publisher configured)
	if uc.pub != nil {
		payload, err := json.Marshal(map[string]interface{}{
			"symbol_id": result.Id,
			"action":    "created",
		})
		if err != nil {
			uc.log.WithContext(ctx).Errorf("Failed to marshal event: %v", err)
		} else if err := uc.pub.Publish(ctx, "symbols.created", payload); err != nil {
			// The publisher wrapper copies request_id from ctx into message metadata
			uc.log.WithContext(ctx).Errorf("Failed to publish event: %v", err)
			// Don't fail the operation if event publishing fails
		}
	}
	return result, nil
}
```
## Context Propagation
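**ALWAYS extract context from message**. On the consumer side it is created by the subscriber and carries values added by the router and its middleware. Context values from the publisher do not cross the broker; only `Metadata` does.
```go
func (h *Handler) Handle(msg *message.Message) error {
	ctx := msg.Context() // CRITICAL: use this ctx for all downstream calls and logging
	h.logger.WithContext(ctx).Infof("Processing...")
	return nil
}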
```
**ALWAYS use WithContext for logging**:
```go
h.logger.WithContext(ctx).Infof("...") // ✅ Includes request_id
h.logger.Infof("...") // ❌ No request_id
```
## Metadata Extraction
```go
correlationID := middleware.MessageCorrelationID(msg) // Watermill key: "correlation_id"
requestID := msg.Metadata.Get(middleware2.RequestIdKey) // same key the publisher wrapper sets
```
## Error Handling in Handlers
```go
func (h *Handler) Handle(msg *message.Message) error {
	ctx := msg.Context()
	// Return an error to trigger the Retry middleware configured on the router
	if err := h.process(msg); err != nil {
		h.logger.WithContext(ctx).Errorf("Processing failed: %v", err)
		return err // retried in-process; once retries are exhausted the message is nacked
	}
	// Return nil to ACK message
	return nil
}
```
## Unwrap Pattern
Use `Unwrap()` to pass the underlying Watermill publisher/subscriber to the router:
```go
router.AddConsumerHandler("name", topic, eventSub.Unwrap(), handler.Handle)
```
## File Layout
```
services/{service}/
├── cmd/
│ ├── {service}/ # Main service
│ └── {service}-worker/ # Worker service
│ ├── main.go
│ ├── wire.go
│ └── wire_gen.go
├── internal/
│ ├── data/
│ │ └── mq/
│ │ ├── publisher.go
│ │ └── subscriber.go
│ ├── handlers/ # Event handlers
│ │ ├── provider.go
│ │ └── {event}_handler.go
│ └── worker/ # Worker setup
│ ├── provider.go
│ └── worker.go
```
## Adding to Wire
**File**: `services/{service}/internal/data/data.go`
```go
var ProviderSet = wire.NewSet(
	NewData,
	repo.NewSymbolRepo,
	mq.NewEventPublisher,  // Add publisher
	mq.NewEventSubscriber, // Add subscriber
)
```
**File**: `services/{service}/internal/handlers/provider.go`
```go
var ProviderSet = wire.NewSet(
	NewLifecycleEventHandler,
)
```
**File**: `services/{service}/internal/worker/provider.go`
```go
var ProviderSet = wire.NewSet(
	NewWorker,
	NewRouter,
)
```
**File**: `services/{service}/cmd/{service}-worker/wire.go`
```go
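//go:build wireinject

package main

import (
	"{service}/internal/biz"
	conf "{service}/internal/conf/gen"
	"{service}/internal/data"
	"{service}/internal/handlers"
	"{service}/internal/worker"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/wire"
)

// wireApp is the injector; `make generate` writes its real body to wire_gen.go.
func wireApp(
	*conf.Server,
	*conf.Data,
	*conf.Log,
	log.Logger,
) (*kratos.App, func(), error) {
	wire.Build(
		worker.ProviderSet,
		handlers.ProviderSet,
		biz.ProviderSet,
		data.ProviderSet,
		newApp,
	)
	return &kratos.App{}, nil, nil
}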
```
After adding providers, run:
```bash
cd services/{service}
make generate
```
## Checklist
- [ ] Publisher wrapper enriches messages with context, correlation_id, request_id
- [ ] Subscriber wrapper provides lifecycle management
- [ ] Event handlers extract `ctx := msg.Context()`
- [ ] Event handlers use `logger.WithContext(ctx)`
- [ ] Event handlers delegate to biz layer (not data layer)
- [ ] Worker service uses graceful shutdown with timeout
- [ ] Router configured with middleware (CorrelationID, Retry, Recoverer)
- [ ] Unwrap() used for router integration
- [ ] Wire ProviderSets updated and regenerated
- [ ] Worker binary created in cmd/{service}-worker/
❌ **DON'T:**
- Log without context: `h.logger.Infof("...")`
- Ignore message context: process payload without extracting ctx
- Put business logic in handlers (delegate to use cases)
- Fail operations if event publishing fails
- Skip correlation ID propagation
- Use blocking operations in publisher without timeout
✅ **DO:**
- Always extract context: `ctx := msg.Context()`
- Always log with context: `h.logger.WithContext(ctx)`
- Delegate to use cases from handlers
- Treat event publishing as best-effort: log publish failures instead of failing the operation
- Use middleware.CorrelationID in router
- Set reasonable timeouts for worker shutdown
## Success Criteria
Event-driven architecture is correctly implemented when:
1. Publisher/subscriber wrappers created in `internal/data/mq/`
2. Event handlers created in `internal/handlers/`
3. Worker service created in `cmd/{service}-worker/`
4. Context propagation works end-to-end (request_id visible in all logs)
5. Correlation IDs tracked across service boundaries
6. Handlers delegate to business layer (biz)
7. Router configured with retry and recovery middleware
8. Wire dependencies configured and regenerated
9. Worker gracefully shuts down on SIGTERM
10. All logging uses `WithContext(ctx)` for distributed tracing
**Verification**:
- Publish event from HTTP request, verify request_id appears in worker logs
- Check correlation_id is consistent across publisher and subscriber
- Test graceful shutdown with SIGTERM
- Verify retry middleware works on handler errors
For detailed architecture documentation, see:
- `docs/pubsub-architecture.md` - Complete pub/sub implementation guide
- `platform/events/events.go` - Platform interfaces
- CLAUDE.md - Event-driven architecture overview