package main import ( "context" "fmt" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/sirupsen/logrus" "github.com/sourcegraph/conc/pool" ) type MigrationClient struct { dbClient *dynamodb.Client OriginTable string TargetTable string // MutationFunc func(map[string]types.AttributeValue) (map[string]types.AttributeValue, error) log logrus.FieldLogger } func (mc *MigrationClient) batchWriteItemsV3(ctx context.Context, items []map[string]types.AttributeValue) error { p := pool.New().WithContext(ctx).WithMaxGoroutines(5).WithFirstError() const batchSize = 25 batchID := 1 for i := 0; i < len(items); i += batchSize { end := i + batchSize if end > len(items) { end = len(items) } batch := items[i:end] writeRequests := make([]types.WriteRequest, len(batch)) for j, item := range batch { writeRequests[j] = types.WriteRequest{ PutRequest: &types.PutRequest{ Item: item, }, } } p.Go(func(ctx context.Context) error { return mc.writeSub(ctx, writeRequests, batchID) }) batchID++ } return p.Wait() } func (mc *MigrationClient) writeSub(ctx context.Context, writeRequests []types.WriteRequest, batchID int) error { mc.log.Infof("received write operation. batchID: %v batchlen: %v", batchID, len(writeRequests)) input := &dynamodb.BatchWriteItemInput{ RequestItems: map[string][]types.WriteRequest{ mc.TargetTable: writeRequests, }, } for { result, err := mc.dbClient.BatchWriteItem(ctx, input) if err != nil { return fmt.Errorf("failed to batch write items (batchID: %v ): %w", batchID, err) } // Check for unprocessed items if len(result.UnprocessedItems) == 0 { break } // Log unprocessed items mc.log.Errorf("Unprocessed items: %v", result.UnprocessedItems) // Retry unprocessed items mc.log.Infof("Retrying unprocessed items... (BatchID: %v)", batchID) input.RequestItems = result.UnprocessedItems // Optional: Add a delay before retrying // time.Sleep(1 * time.Second) } return nil }