--- tags: [wechat, article, claude, openai] title: "基于 Amazon Kinesis Data Streams 实现 DynamoDB 历史数据清理与增量同步" url: https://aws.amazon.com/cn/blogs/china/based-on-amazon-kinesis-data-streams-implement-dynamodb/ source: rss feed_name: AWS China Blog sha256: e7244f2bc03e124283777ac175b2d10d242c461b61a4824cf537bb2d83ab036e ---

摘要:本文介绍了一种基于 Amazon Kinesis Data Streams、AWS Lambda、AWS Glue 和 Amazon S3 的完整方案,帮助企业客户在不停机的前提下,对 Amazon DynamoDB 表进行历史数据清理、TTL 自动过期配置,并通过 Kinesis 实现增量数据的无缝同步,最终将过期数据归档至 Amazon S3 智能分层存储以降低长期成本。

目录

01 一、引言
02 二、概览
03 三、方案架构
04 四、前置条件
05 五、实施详解
06 六、成本分析
07 七、常见问题
08 八、总结
09 九、参考资源

一、引言

在使用 Amazon DynamoDB 构建业务系统的过程中,随着数据持续写入,表中会积累大量历史数据。对于视频字幕翻译、日志记录、IoT 事件等场景,数据往往具有明显的时效性——超过一定时间后不再被业务查询,但仍占用存储空间并产生费用。

企业客户在处理这类问题时,通常面临以下挑战:

  1. 存储成本持续增长:DynamoDB 按存储量计费,TB 级历史数据每月产生可观的存储费用,而这些数据可能已经不再被访问。
  2. 数据清理与业务连续性的矛盾:直接删除历史数据需要消耗大量写入容量(WCU),可能影响在线业务的正常读写;而创建新表并迁移数据,又面临迁移窗口期内增量数据丢失的风险。
  3. 增量同步的时间窗口限制:DynamoDB Streams 的数据保留期仅为 24 小时且不可修改。对于 10TB 以上的大表,历史数据的导出、清洗和导入流程可能需要数天时间,远超 24 小时的窗口期。
  4. 缺乏自动化的数据生命周期管理:清理后的数据如果没有归档机制,可能导致合规审计所需的历史数据永久丢失。

本文将介绍一种基于 Amazon Kinesis Data Streams 的完整解决方案,通过将增量同步的时间窗口从 24 小时扩展到最长 365 天,从根本上解决大数据量场景下的迁移时间约束问题。该方案与 AWS Well-Architected Framework 的卓越运营和成本优化支柱保持一致。

二、概览

2.1 业务场景

某字幕翻译业务中,每次翻译任务会在 DynamoDB 中生成大量字幕记录。随着业务运行,表中积累了数 TB 的历史数据,其中大部分超过 30 天的记录已不再被业务查询。客户希望:

2.2 方案收益

维度 收益
零数据丢失 Kinesis 保留期最长 365 天,彻底消除迁移窗口期的数据丢失风险
零停机迁移 源表持续提供服务,增量数据通过 Kinesis + Lambda 实时同步到新表
自动生命周期管理 TTL 自动过期删除 + DynamoDB Streams 归档到 S3 智能分层,形成完整的数据生命周期
成本显著降低 清理历史数据减少 DynamoDB 存储费用,归档数据利用 S3 智能分层最低至 $0.00099/GB/月
架构简洁 全部使用 AWS 托管服务,无需维护额外基础设施

2.3 方案选型:为什么选择 Kinesis Data Streams

在增量同步环节,DynamoDB 原生支持两种变更捕获机制。以下是关键对比:

DynamoDB Streams Kinesis Data Streams
数据保留期 24 小时(不可修改) 最长 365 天(可配置)
额外成本 免费 Kinesis 按 shard/小时计费
Lambda 代码 直接读取 DynamoDB 格式 需要 base64 解码
适用场景 小数据量,24h 内完成 大数据量,可能需要数天

对于 10TB 以上的大表,历史数据处理流程(导出 → Glue 清洗 → 导入新表)通常需要 2-5 天。DynamoDB Streams 的 24 小时保留期无法覆盖这一时间窗口,而 Kinesis Data Streams 可将保留期设为 7 天甚至更长,从根本上消除了时间约束。

三、方案架构

该方案分为两条并行的数据处理路径:历史数据批量处理和增量数据实时同步,最终汇聚到同一张新表,并通过 TTL + DynamoDB Streams 实现持续的数据生命周期管理。

[图1]

3.1 核心组件

组件 功能 说明
Amazon DynamoDB 源表与目标表 源表持续提供服务,新表仅包含清洗后的数据并启用 TTL
Amazon Kinesis Data Streams 增量变更捕获 记录源表所有写入操作,保留期可配置至 7 天以上
AWS Lambda 增量同步 + 归档 消费 Kinesis 记录写入新表(加 TTL),消费 DynamoDB Streams 归档到 S3
AWS Glue 历史数据清洗 筛选有效数据、添加 TTL 字段、转换为 DynamoDB JSON 格式
Amazon S3 数据中转与归档 存储导出数据、Glue 处理结果,以及过期数据的长期归档(智能分层)

3.2 工作流程与执行顺序

整体流程的执行顺序至关重要,必须确保增量捕获先于历史数据导出启动,以避免数据丢失:

步骤 1: 创建 Kinesis Data Stream(保留期 7 天)
步骤 2: 源表关联 Kinesis Stream             ← 最先做,开始记录所有变更
步骤 3: 创建 Lambda 函数(先不添加触发器)
步骤 4: 导出源表数据到 S3
步骤 5: Glue 处理历史数据(筛选+加TTL)
步骤 6: Import from S3 创建新表
步骤 7: 启用 TTL
步骤 8: 添加 Lambda 触发器(Kinesis,TRIM_HORIZON) ← 自动补上窗口期数据
步骤 9: 验证数据完整性
⚠ 步骤 4-7 需在 Kinesis 保留期(7 天)内完成,保留期可按需延长

ℹ 关键设计说明

为什么步骤 8 使用 TRIM_HORIZON

因为 Lambda 触发器设置为从 Kinesis Stream 的最早可用记录开始消费,这意味着从步骤 2 开始记录的所有增量变更都会被自动处理,无需手动对齐时间窗口。窗口期内与历史数据重叠的记录,通过 put_item 的覆盖特性自动去重。

四、前置条件

在开始实施之前,请确保满足以下条件:

AWS 环境要求

权限要求

五、实施详解

5.1 步骤一:创建 Kinesis Data Stream

首先创建用于捕获 DynamoDB 源表增量变更的 Kinesis Data Stream,并将保留期设置为 7 天,为后续的历史数据处理预留充足的时间窗口。

控制台操作

1. Kinesis 控制台 → Data streams → Create data stream

2. 配置:

配置项
Data stream name VideoTranslationSubtitle-stream
Capacity mode On-demand

3. 点击 Create data stream

4. 修改保留期:点击刚创建的 Stream → Configuration 标签 → Data retention period → Edit → 修改为 168 小时(7 天)→ Save changes

CLI 操作

aws kinesis create-stream \
  --stream-name VideoTranslationSubtitle-stream \
  --stream-mode-details StreamMode=ON_DEMAND \
  --region us-east-1
aws kinesis increase-stream-retention-period \
  --stream-name VideoTranslationSubtitle-stream \
  --retention-period-hours 168 \
  --region us-east-1

5.2 步骤二:DynamoDB 源表关联 Kinesis Stream

将源表与 Kinesis Data Stream 关联后,源表的所有写入、更新和删除操作都会被实时记录到 Stream 中。这一步必须在导出历史数据之前完成,以确保窗口期内的增量数据不会丢失。

控制台操作:

1. DynamoDB 控制台 → Tables → 选择 `VideoTranslationSubtitle`

2. Exports and streams 标签

3. Amazon Kinesis data stream details 区域 → Turn on

4. 选择 VideoTranslationSubtitle-stream

5. 点击 Turn on stream

CLI 操作:

aws dynamodb enable-kinesis-streaming-destination \
  --table-name VideoTranslationSubtitle \
  --stream-arn arn:aws:kinesis:us-east-1:<account-id>:stream/VideoTranslationSubtitle-stream \
  --region us-east-1

验证:

aws dynamodb describe-kinesis-streaming-destination \
  --table-name VideoTranslationSubtitle \
  --region us-east-1

确认 StreamStatusACTIVE 后再进行下一步。

5.3 步骤三:创建增量同步 Lambda 函数

该函数负责消费 Kinesis Data Stream 中的增量记录,为每条数据添加 TTL 字段后写入目标表。此步骤仅创建函数,暂不添加触发器——触发器将在历史数据导入完成后再添加。

创建函数:

1. Lambda 控制台 → Create function

2. 配置:

配置项
Function name ddb-kinesis-sync-ttl
Runtime Python 3.12
Architecture arm64
Execution role Create a new role with basic Lambda permissions

3. 粘贴以下代码:

import boto3
import json
import base64
import time
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
TARGET_TABLE = 'VideoTranslationSubtitle-ttl'
TTL_DAYS = 30
def lambda_handler(event, context):
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        data = json.loads(payload)
        event_name = data.get('eventName', '')
        if event_name in ('INSERT', 'MODIFY'):
            new_image = data['dynamodb']['NewImage']
            ttl_value = int(time.time()) + TTL_DAYS * 86400
            new_image['test001'] = {'N': str(ttl_value)}
            dynamodb.put_item(TableName=TARGET_TABLE, Item=new_image)
    return {'statusCode': 200}

> ⚠ 注意:Kinesis 版代码需要对 `record[‘kinesis’][‘data’]` 进行 base64 解码,这与直接使用 DynamoDB Streams 触发器的代码不同。

4. 点击 Deploy

配置 IAM 权限:

Lambda 函数需要读取 Kinesis Stream 和写入目标 DynamoDB 表的权限。进入 Lambda 函数页面 → Configuration → Permissions → 点击 Role name → Add permissions → Create inline policy → JSON:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "dynamodb:PutItem",
      "Resource": "arn:aws:dynamodb:us-east-1:<account-id>:table/VideoTranslationSubtitle-ttl"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:SubscribeToShard"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:<account-id>:stream/VideoTranslationSubtitle-stream"
    }
  ]
}
<account-id> 替换为实际 AWS 账户 ID

Policy name:`kinesis-sync-policy` → Create policy

调整 Lambda 配置:

Configuration → General configuration → Edit

配置项
Memory 256 MB
Timeout 1 min 0 sec

5.4 步骤四:导出历史数据到 S3

利用 DynamoDB 原生的 Export to S3 功能,将源表的全量数据导出。该操作不会影响源表的正常读写性能。

1. DynamoDB 控制台 → 左侧 Exports to S3 → Export to S3

2. 配置:

配置项
Source table VideoTranslationSubtitle
S3 bucket glue-test023
S3 prefix ddb-export/
Export format DynamoDB JSON
Export type Full export

3. 点击 Export,等待状态变为 Completed

验证:

aws s3 ls s3://glue-test023/ddb-export/ --recursive --summarize --human-readable

5.5 步骤五:Glue 处理历史数据(筛选 + 添加 TTL)

使用 AWS Glue ETL Job 对导出的历史数据进行清洗:筛选出最近 30 天的有效数据,为每条记录计算并添加 TTL 字段,最后转换为 DynamoDB JSON 格式输出到 S3。

上传 Glue 脚本:

aws s3 cp glue_add_ttl.py s3://aws-glue-assets-<account-id>-us-east-1/scripts/glue_add_ttl.py

Glue 脚本内容:

import sys
import time
from datetime import datetime, timedelta, timezone
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType, StringType
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
S3_INPUT = "s3://glue-test023/ddb-export/"
S3_OUTPUT = "s3://glue-test023/ddb-with-ttl/"
ONE_MONTH_AGO = (datetime.now(timezone.utc) - timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%SZ")
@udf(returnType=LongType())
def calc_ttl(created_at):
    dt = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int((dt + timedelta(days=30)).timestamp())
df = spark.read.json(S3_INPUT)
df_flat = df.select(
    col("Item.JobId.S").alias("JobId"),
    col("Item.Index.N").cast("string").alias("Index"),
    col("Item.Content.S").alias("Content"),
    col("Item.BeginTime.S").alias("BeginTime"),
    col("Item.EndTime.S").alias("EndTime"),
    col("Item.CreatedAt.S").alias("CreatedAt"),
)
df_filtered = df_flat.filter(col("CreatedAt") >= ONE_MONTH_AGO)
df_with_ttl = df_filtered.withColumn("test001", calc_ttl(col("CreatedAt")))
@udf(returnType=StringType())
def to_ddb_json(job_id, index, content, begin_time, end_time, created_at, ttl):
    import json
    item = {"Item": {"JobId": {"S": job_id}, "Index": {"N": index}, "Content": {"S": content},
            "BeginTime": {"S": begin_time}, "EndTime": {"S": end_time},
            "CreatedAt": {"S": created_at}, "test001": {"N": str(ttl)}}}
    return json.dumps(item, ensure_ascii=False)
df_output = df_with_ttl.select(
    to_ddb_json("JobId", "Index", "Content", "BeginTime", "EndTime", "CreatedAt", "test001").alias("value"))
df_output.write.mode("overwrite").text(S3_OUTPUT)
job.commit()

创建 Glue Job(控制台)

1. Glue 控制台 → ETL jobs → Script editor → Engine: Spark → Create

2. 粘贴脚本 → Job details

配置项
Name VideoTranslationSubtitle-ttl
IAM Role 有 S3 读写权限的 Glue Role
Glue version 4.0 或 5.0
Language Python 3
Worker type G.1X
Number of workers 10(大数据量可增加到 50-100)
Job timeout 480 分钟
Job bookmark Disable

3. Save → Run

验证输出:

aws s3 ls s3://glue-test023/ddb-with-ttl/ --recursive --summarize --human-readable
aws s3 cp s3://glue-test023/ddb-with-ttl/<任意文件> - | head -1 | python3 -m json.tool

5.6 步骤六:从 S3 导入创建新表

使用 DynamoDB 的 Import from S3 功能,将 Glue 处理后的数据直接导入并创建新表。

> ⚠ 注意:Import from S3 会创建一张全新的表,不能导入到已有表。

1. DynamoDB 控制台 → Imports from S3 → Import from S3

第一页 – Source:

配置项
S3 source URL s3://glue-test023/ddb-with-ttl/
Import file format DynamoDB JSON
Import file compression