--- tags: [wechat, article, claude, openai] title: "基于 Amazon Kinesis Data Streams 实现 DynamoDB 历史数据清理与增量同步" url: https://aws.amazon.com/cn/blogs/china/based-on-amazon-kinesis-data-streams-implement-dynamodb/ source: rss feed_name: AWS China Blog sha256: e7244f2bc03e124283777ac175b2d10d242c461b61a4824cf537bb2d83ab036e ---
摘要:本文介绍了一种基于 Amazon Kinesis Data Streams、AWS Lambda、AWS Glue 和 Amazon S3 的完整方案,帮助企业客户在不停机的前提下,对 Amazon DynamoDB 表进行历史数据清理、TTL 自动过期配置,并通过 Kinesis 实现增量数据的无缝同步,最终将过期数据归档至 Amazon S3 智能分层存储以降低长期成本。
在使用 Amazon DynamoDB 构建业务系统的过程中,随着数据持续写入,表中会积累大量历史数据。对于视频字幕翻译、日志记录、IoT 事件等场景,数据往往具有明显的时效性——超过一定时间后不再被业务查询,但仍占用存储空间并产生费用。
企业客户在处理这类问题时,通常面临以下挑战:
本文将介绍一种基于 Amazon Kinesis Data Streams 的完整解决方案,通过将增量同步的时间窗口从 24 小时扩展到最长 365 天,从根本上解决大数据量场景下的迁移时间约束问题。该方案与 AWS Well-Architected Framework 的卓越运营和成本优化支柱保持一致。
某字幕翻译业务中,每次翻译任务会在 DynamoDB 中生成大量字幕记录。随着业务运行,表中积累了数 TB 的历史数据,其中大部分超过 30 天的记录已不再被业务查询。客户希望:
| 维度 | 收益 |
| 零数据丢失 | Kinesis 保留期最长 365 天,彻底消除迁移窗口期的数据丢失风险 |
| 零停机迁移 | 源表持续提供服务,增量数据通过 Kinesis + Lambda 实时同步到新表 |
| 自动生命周期管理 | TTL 自动过期删除 + DynamoDB Streams 归档到 S3 智能分层,形成完整的数据生命周期 |
| 成本显著降低 | 清理历史数据减少 DynamoDB 存储费用,归档数据利用 S3 智能分层最低至 $0.00099/GB/月 |
| 架构简洁 | 全部使用 AWS 托管服务,无需维护额外基础设施 |
在增量同步环节,DynamoDB 原生支持两种变更捕获机制。以下是关键对比:
| DynamoDB Streams | Kinesis Data Streams | |
| 数据保留期 | 24 小时(不可修改) | 最长 365 天(可配置) |
| 额外成本 | 免费 | Kinesis 按 shard/小时计费 |
| Lambda 代码 | 直接读取 DynamoDB 格式 | 需要 base64 解码 |
| 适用场景 | 小数据量,24h 内完成 | 大数据量,可能需要数天 |
对于 10TB 以上的大表,历史数据处理流程(导出 → Glue 清洗 → 导入新表)通常需要 2-5 天。DynamoDB Streams 的 24 小时保留期无法覆盖这一时间窗口,而 Kinesis Data Streams 可将保留期设为 7 天甚至更长,从根本上消除了时间约束。
该方案分为两条并行的数据处理路径:历史数据批量处理和增量数据实时同步,最终汇聚到同一张新表,并通过 TTL + DynamoDB Streams 实现持续的数据生命周期管理。
[图1] |
| 组件 | 功能 | 说明 |
| Amazon DynamoDB | 源表与目标表 | 源表持续提供服务,新表仅包含清洗后的数据并启用 TTL |
| Amazon Kinesis Data Streams | 增量变更捕获 | 记录源表所有写入操作,保留期可配置至 7 天以上 |
| AWS Lambda | 增量同步 + 归档 | 消费 Kinesis 记录写入新表(加 TTL),消费 DynamoDB Streams 归档到 S3 |
| AWS Glue | 历史数据清洗 | 筛选有效数据、添加 TTL 字段、转换为 DynamoDB JSON 格式 |
| Amazon S3 | 数据中转与归档 | 存储导出数据、Glue 处理结果,以及过期数据的长期归档(智能分层) |
整体流程的执行顺序至关重要,必须确保增量捕获先于历史数据导出启动,以避免数据丢失:
关键设计说明
为什么步骤 8 使用 TRIM_HORIZON?
因为 Lambda 触发器设置为从 Kinesis Stream 的最早可用记录开始消费,这意味着从步骤 2 开始记录的所有增量变更都会被自动处理,无需手动对齐时间窗口。窗口期内与历史数据重叠的记录,通过 put_item 的覆盖特性自动去重。
在开始实施之前,请确保满足以下条件:
AWS 环境要求
VideoTranslationSubtitle 为例)glue-test023 为例)us-east-1 为例)权限要求
首先创建用于捕获 DynamoDB 源表增量变更的 Kinesis Data Stream,并将保留期设置为 7 天,为后续的历史数据处理预留充足的时间窗口。
控制台操作
1. Kinesis 控制台 → Data streams → Create data stream
2. 配置:
| 配置项 | 值 |
| Data stream name | VideoTranslationSubtitle-stream |
| Capacity mode | On-demand |
3. 点击 Create data stream
4. 修改保留期:点击刚创建的 Stream → Configuration 标签 → Data retention period → Edit → 修改为 168 小时(7 天)→ Save changes
CLI 操作
aws kinesis create-stream \
--stream-name VideoTranslationSubtitle-stream \
--stream-mode-details StreamMode=ON_DEMAND \
--region us-east-1
aws kinesis increase-stream-retention-period \
--stream-name VideoTranslationSubtitle-stream \
--retention-period-hours 168 \
--region us-east-1
将源表与 Kinesis Data Stream 关联后,源表的所有写入、更新和删除操作都会被实时记录到 Stream 中。这一步必须在导出历史数据之前完成,以确保窗口期内的增量数据不会丢失。
控制台操作:
1. DynamoDB 控制台 → Tables → 选择 `VideoTranslationSubtitle`
2. Exports and streams 标签
3. Amazon Kinesis data stream details 区域 → Turn on
4. 选择 VideoTranslationSubtitle-stream
5. 点击 Turn on stream
CLI 操作:
aws dynamodb enable-kinesis-streaming-destination \
--table-name VideoTranslationSubtitle \
--stream-arn arn:aws:kinesis:us-east-1:<account-id>:stream/VideoTranslationSubtitle-stream \
--region us-east-1
验证:
aws dynamodb describe-kinesis-streaming-destination \
--table-name VideoTranslationSubtitle \
--region us-east-1
确认 StreamStatus 为 ACTIVE 后再进行下一步。
该函数负责消费 Kinesis Data Stream 中的增量记录,为每条数据添加 TTL 字段后写入目标表。此步骤仅创建函数,暂不添加触发器——触发器将在历史数据导入完成后再添加。
创建函数:
1. Lambda 控制台 → Create function
2. 配置:
| 配置项 | 值 |
| Function name | ddb-kinesis-sync-ttl |
| Runtime | Python 3.12 |
| Architecture | arm64 |
| Execution role | Create a new role with basic Lambda permissions |
3. 粘贴以下代码:
import boto3
import json
import base64
import time
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
TARGET_TABLE = 'VideoTranslationSubtitle-ttl'
TTL_DAYS = 30
def lambda_handler(event, context):
for record in event['Records']:
payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
data = json.loads(payload)
event_name = data.get('eventName', '')
if event_name in ('INSERT', 'MODIFY'):
new_image = data['dynamodb']['NewImage']
ttl_value = int(time.time()) + TTL_DAYS * 86400
new_image['test001'] = {'N': str(ttl_value)}
dynamodb.put_item(TableName=TARGET_TABLE, Item=new_image)
return {'statusCode': 200}
>
注意:Kinesis 版代码需要对 `record[‘kinesis’][‘data’]` 进行 base64 解码,这与直接使用 DynamoDB Streams 触发器的代码不同。
4. 点击 Deploy
配置 IAM 权限:
Lambda 函数需要读取 Kinesis Stream 和写入目标 DynamoDB 表的权限。进入 Lambda 函数页面 → Configuration → Permissions → 点击 Role name → Add permissions → Create inline policy → JSON:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "dynamodb:PutItem",
"Resource": "arn:aws:dynamodb:us-east-1:<account-id>:table/VideoTranslationSubtitle-ttl"
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:SubscribeToShard"
],
"Resource": "arn:aws:kinesis:us-east-1:<account-id>:stream/VideoTranslationSubtitle-stream"
}
]
}
<account-id> 替换为实际 AWS 账户 ID
Policy name:`kinesis-sync-policy` → Create policy
调整 Lambda 配置:
Configuration → General configuration → Edit
| 配置项 | 值 |
| Memory | 256 MB |
| Timeout | 1 min 0 sec |
利用 DynamoDB 原生的 Export to S3 功能,将源表的全量数据导出。该操作不会影响源表的正常读写性能。
1. DynamoDB 控制台 → 左侧 Exports to S3 → Export to S3
2. 配置:
| 配置项 | 值 |
| Source table | VideoTranslationSubtitle |
| S3 bucket | glue-test023 |
| S3 prefix | ddb-export/ |
| Export format | DynamoDB JSON |
| Export type | Full export |
3. 点击 Export,等待状态变为 Completed
验证:
aws s3 ls s3://glue-test023/ddb-export/ --recursive --summarize --human-readable
使用 AWS Glue ETL Job 对导出的历史数据进行清洗:筛选出最近 30 天的有效数据,为每条记录计算并添加 TTL 字段,最后转换为 DynamoDB JSON 格式输出到 S3。
上传 Glue 脚本:
aws s3 cp glue_add_ttl.py s3://aws-glue-assets-<account-id>-us-east-1/scripts/glue_add_ttl.py
Glue 脚本内容:
import sys
import time
from datetime import datetime, timedelta, timezone
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType, StringType
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
S3_INPUT = "s3://glue-test023/ddb-export/"
S3_OUTPUT = "s3://glue-test023/ddb-with-ttl/"
ONE_MONTH_AGO = (datetime.now(timezone.utc) - timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%SZ")
@udf(returnType=LongType())
def calc_ttl(created_at):
dt = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
return int((dt + timedelta(days=30)).timestamp())
df = spark.read.json(S3_INPUT)
df_flat = df.select(
col("Item.JobId.S").alias("JobId"),
col("Item.Index.N").cast("string").alias("Index"),
col("Item.Content.S").alias("Content"),
col("Item.BeginTime.S").alias("BeginTime"),
col("Item.EndTime.S").alias("EndTime"),
col("Item.CreatedAt.S").alias("CreatedAt"),
)
df_filtered = df_flat.filter(col("CreatedAt") >= ONE_MONTH_AGO)
df_with_ttl = df_filtered.withColumn("test001", calc_ttl(col("CreatedAt")))
@udf(returnType=StringType())
def to_ddb_json(job_id, index, content, begin_time, end_time, created_at, ttl):
import json
item = {"Item": {"JobId": {"S": job_id}, "Index": {"N": index}, "Content": {"S": content},
"BeginTime": {"S": begin_time}, "EndTime": {"S": end_time},
"CreatedAt": {"S": created_at}, "test001": {"N": str(ttl)}}}
return json.dumps(item, ensure_ascii=False)
df_output = df_with_ttl.select(
to_ddb_json("JobId", "Index", "Content", "BeginTime", "EndTime", "CreatedAt", "test001").alias("value"))
df_output.write.mode("overwrite").text(S3_OUTPUT)
job.commit()
创建 Glue Job(控制台)
1. Glue 控制台 → ETL jobs → Script editor → Engine: Spark → Create
2. 粘贴脚本 → Job details
| 配置项 | 值 |
| Name | VideoTranslationSubtitle-ttl |
| IAM Role | 有 S3 读写权限的 Glue Role |
| Glue version | 4.0 或 5.0 |
| Language | Python 3 |
| Worker type | G.1X |
| Number of workers | 10(大数据量可增加到 50-100) |
| Job timeout | 480 分钟 |
| Job bookmark | Disable |
3. Save → Run
验证输出:
aws s3 ls s3://glue-test023/ddb-with-ttl/ --recursive --summarize --human-readable
aws s3 cp s3://glue-test023/ddb-with-ttl/<任意文件> - | head -1 | python3 -m json.tool
使用 DynamoDB 的 Import from S3 功能,将 Glue 处理后的数据直接导入并创建新表。
>
注意:Import from S3 会创建一张全新的表,不能导入到已有表。
1. DynamoDB 控制台 → Imports from S3 → Import from S3
第一页 – Source:
| 配置项 | 值 |
| S3 source URL | s3://glue-test023/ddb-with-ttl/ |
| Import file format | DynamoDB JSON |
| Import file compression |