import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, lit, pmod
from pyspark.sql.functions import hash as spark_hash

# Runtime arguments: bucket, namespace, region
# e.g.: iceberg-lab ap-tokyo-1
bucket = sys.argv[1]
namespace = sys.argv[2]
region = sys.argv[3]

warehouse = f"oci://{bucket}@{namespace}/iceberg_warehouse"
plain_base = f"oci://{bucket}@{namespace}/plain_demo"

spark = (
    SparkSession.builder
    .appName("iceberg-feature-demo-on-oci")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    )
    .config("spark.sql.catalog.oci", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.oci.type", "hadoop")
    .config("spark.sql.catalog.oci.warehouse", warehouse)
    .config(
        "spark.hadoop.fs.AbstractFileSystem.oci.impl",
        "com.oracle.bmc.hdfs.Bmc"
    )
    .getOrCreate()
)


def show_sql(title, sql_text):
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)
    spark.sql(sql_text).show(truncate=False)


def table_path(table_name):
    # Default layout under a Hadoop catalog:
    # e.g. warehouse/demo/adb_sales
    db, tbl = table_name.split(".")
    return f"{warehouse}/{db}/{tbl}"


def list_metadata_json(table_name):
    path = table_path(table_name) + "/metadata"
    jvm = spark._jvm
    conf = spark._jsc.hadoopConfiguration()
    p = jvm.org.apache.hadoop.fs.Path(path)
    fs = p.getFileSystem(conf)
    statuses = fs.listStatus(p)
    files = []
    for s in statuses:
        name = s.getPath().getName()
        if name.endswith(".metadata.json"):
            files.append(s.getPath().toString())
    # Lexicographic sort is good enough for this demo's handful of versions.
    return sorted(files)


def latest_metadata_json(table_name):
    files = list_metadata_json(table_name)
    return files[-1]


def oci_to_https(oci_uri):
    # oci://bucket@namespace/path/to/file
    rest = oci_uri.replace("oci://", "", 1)
    bucket_and_ns, object_name = rest.split("/", 1)
    bucket_name, ns = bucket_and_ns.split("@", 1)
    return (
        f"https://objectstorage.{region}.oraclecloud.com"
        f"/n/{ns}/b/{bucket_name}/o/{object_name}"
    )


def print_adb_metadata_hint(label, table_name):
    meta_oci = latest_metadata_json(table_name)
    meta_https = oci_to_https(meta_oci)
    print(f"\n[ADB_METADATA_JSON] {label}")
    print(meta_https)


spark.sql("CREATE NAMESPACE IF NOT EXISTS oci.demo")
spark.sql("DROP TABLE IF EXISTS oci.demo.adb_sales")
spark.sql("DROP TABLE IF EXISTS oci.demo.web_events")

# -------------------------------------------------------------------
# 1. Non-partitioned Iceberg table, also inspected from ADB
# -------------------------------------------------------------------
spark.sql("""
CREATE TABLE oci.demo.adb_sales (
    order_id    BIGINT,
    customer_id BIGINT,
    amount      DOUBLE,
    order_ts    TIMESTAMP
)
USING iceberg
""")

spark.sql("""
INSERT INTO oci.demo.adb_sales VALUES
    (1, 101, 120.0, TIMESTAMP '2026-04-01 10:00:00'),
    (2, 102,  80.0, TIMESTAMP '2026-04-01 11:00:00'),
    (3, 103, 200.0, TIMESTAMP '2026-04-02 09:00:00')
""")

print_adb_metadata_hint("schema_v1", "demo.adb_sales")

show_sql(
    "Iceberg table before schema evolution",
    "SELECT * FROM oci.demo.adb_sales ORDER BY order_id"
)
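# A quick sanity check (an addition to the original demo, minimal sketch):
# DESCRIBE shows the v1 schema recorded in the metadata.json printed above,
# which makes the schema change in the next section easy to spot.
show_sql(
    "initial schema (DESCRIBE)",
    "DESCRIBE TABLE oci.demo.adb_sales"
)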
# -------------------------------------------------------------------
# 2. Safe schema evolution
# -------------------------------------------------------------------
spark.sql("""
ALTER TABLE oci.demo.adb_sales
ADD COLUMNS (
    coupon_code STRING COMMENT 'coupon used at purchase time'
)
""")

spark.sql("""
INSERT INTO oci.demo.adb_sales VALUES
    (4, 104, 150.0, TIMESTAMP '2026-04-03 13:00:00', 'WELCOME')
""")

print_adb_metadata_hint("schema_v2", "demo.adb_sales")

show_sql(
    "after schema evolution: coupon_code is NULL for the old rows",
    """
    SELECT order_id, amount, coupon_code
    FROM oci.demo.adb_sales
    ORDER BY order_id
    """
)

# What a similar change looks like with plain Parquet
plain_schema_path = f"{plain_base}/schema_evolution_parquet"

spark.createDataFrame(
    [(1, 120.0), (2, 80.0)],
    ["order_id", "amount"]
).write.mode("overwrite").parquet(plain_schema_path)

spark.createDataFrame(
    [(3, 200.0, "WELCOME")],
    ["order_id", "amount", "coupon_code"]
).write.mode("append").parquet(plain_schema_path)

print("\n" + "=" * 80)
print("plain Parquet: read without mergeSchema")
print("=" * 80)
spark.read.parquet(plain_schema_path).printSchema()
spark.read.parquet(plain_schema_path).show(truncate=False)

print("\n" + "=" * 80)
print("plain Parquet: read with mergeSchema")
print("=" * 80)
spark.read.option("mergeSchema", "true").parquet(plain_schema_path).printSchema()
spark.read.option("mergeSchema", "true").parquet(plain_schema_path).show(truncate=False)

# -------------------------------------------------------------------
# 3. Hidden partitioning
# -------------------------------------------------------------------
spark.sql("""
CREATE TABLE oci.demo.web_events (
    event_id   BIGINT,
    user_id    BIGINT,
    event_ts   TIMESTAMP,
    event_type STRING
)
USING iceberg
PARTITIONED BY (days(event_ts))
""")

spark.sql("""
INSERT INTO oci.demo.web_events VALUES
    (1, 201, TIMESTAMP '2026-04-01 10:00:00', 'view'),
    (2, 202, TIMESTAMP '2026-04-01 10:05:00', 'click'),
    (3, 201, TIMESTAMP '2026-04-02 09:00:00', 'view'),
    (4, 203, TIMESTAMP '2026-04-02 09:10:00', 'buy')
""")

show_sql(
    "hidden partitioning: users filter on event_ts, not on an event_day column",
    """
    SELECT event_id, user_id, event_ts, event_type
    FROM oci.demo.web_events
    WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
      AND event_ts <  TIMESTAMP '2026-04-02 00:00:00'
    ORDER BY event_id
    """
)

show_sql(
    "Iceberg files metadata table: partition values are managed behind the scenes",
    """
    SELECT spec_id, partition, record_count, file_path
    FROM oci.demo.web_events.files
    ORDER BY spec_id, file_path
    """
)

# Plain Parquet example, where the partition column is exposed to users
plain_events_path = f"{plain_base}/events_by_day"
events_df = spark.sql("SELECT * FROM oci.demo.web_events")

(
    events_df
    .withColumn("event_day", to_date(col("event_ts")))
    .write
    .mode("overwrite")
    .partitionBy("event_day")
    .parquet(plain_events_path)
)

plain_events = spark.read.parquet(plain_events_path)
plain_events.createOrReplaceTempView("plain_events")

show_sql(
    "plain Parquet: the physical partition column event_day leaks into queries",
    """
    SELECT event_id, user_id, event_ts, event_day
    FROM plain_events
    ORDER BY event_id
    """
)
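# Optional check (not part of the original demo, minimal sketch): EXPLAIN on
# the same time-range predicate shows the filter being pushed down to the
# Iceberg scan, so files outside the matching day(event_ts) partition are
# pruned even though no event_day column exists in the schema.
spark.sql("""
EXPLAIN
SELECT event_id
FROM oci.demo.web_events
WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00'
""").show(truncate=False)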
# -------------------------------------------------------------------
# 4. Partition evolution
# -------------------------------------------------------------------
spark.sql("""
ALTER TABLE oci.demo.web_events
ADD PARTITION FIELD bucket(4, user_id) AS user_bucket
""")

spark.sql("""
INSERT INTO oci.demo.web_events VALUES
    (5, 201, TIMESTAMP '2026-04-03 10:00:00', 'view'),
    (6, 202, TIMESTAMP '2026-04-03 10:10:00', 'click')
""")

show_sql(
    "after partition evolution, users still query by event_ts / user_id as before",
    """
    SELECT event_id, user_id, event_ts, event_type
    FROM oci.demo.web_events
    WHERE event_ts >= TIMESTAMP '2026-04-03 00:00:00'
      AND event_ts <  TIMESTAMP '2026-04-04 00:00:00'
      AND user_id = 201
    ORDER BY event_id
    """
)

show_sql(
    "partition evolution: old files stay at spec_id 0, new files get spec_id 1",
    """
    SELECT spec_id, partition, record_count, file_path
    FROM oci.demo.web_events.files
    ORDER BY spec_id, file_path
    """
)

# Plain Parquet example that changes the partition layout midway
plain_mixed_path = f"{plain_base}/events_mixed_layout"

before_df = spark.sql("""
    SELECT * FROM oci.demo.web_events
    WHERE event_ts < TIMESTAMP '2026-04-03 00:00:00'
""")
after_df = spark.sql("""
    SELECT * FROM oci.demo.web_events
    WHERE event_ts >= TIMESTAMP '2026-04-03 00:00:00'
""")

(
    before_df
    .withColumn("event_day", to_date(col("event_ts")))
    .write
    .mode("overwrite")
    .partitionBy("event_day")
    .parquet(plain_mixed_path)
)

(
    after_df
    .withColumn("event_day", to_date(col("event_ts")))
    .withColumn("user_bucket", pmod(spark_hash(col("user_id")), lit(4)))
    .write
    .mode("append")
    .partitionBy("event_day", "user_bucket")
    .parquet(plain_mixed_path)
)

print("\n" + "=" * 80)
print("plain Parquet: partition layout changed midway")
print("old and new layouts are now mixed, which makes the dataset hard to manage")
print("=" * 80)
try:
    spark.read.parquet(plain_mixed_path).printSchema()
    spark.read.parquet(plain_mixed_path).show(truncate=False)
except Exception as e:
    print("plain Parquet mixed layout read error:")
    print(str(e)[:1000])

# -------------------------------------------------------------------
# 5. Snapshot-based time travel / rollback
# -------------------------------------------------------------------
show_sql(
    "current snapshots",
    """
    SELECT committed_at, snapshot_id, operation
    FROM oci.demo.adb_sales.snapshots
    ORDER BY committed_at
    """
)

snapshot_before_bad = spark.sql("""
    SELECT snapshot_id
    FROM oci.demo.adb_sales.snapshots
    ORDER BY committed_at DESC
    LIMIT 1
""").collect()[0][0]

print(f"\n[SNAPSHOT_BEFORE_BAD] {snapshot_before_bad}")
print_adb_metadata_hint("before_bad_insert", "demo.adb_sales")

spark.sql("""
INSERT INTO oci.demo.adb_sales VALUES
    (999, 999, -9999.0, TIMESTAMP '2026-04-05 00:00:00', 'BAD')
""")

print_adb_metadata_hint("after_bad_insert", "demo.adb_sales")

show_sql(
    "current state after the bad insert",
    """
    SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount
    FROM oci.demo.adb_sales
    """
)

show_sql(
    "time travel: read the pre-mistake snapshot with VERSION AS OF",
    f"""
    SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount
    FROM oci.demo.adb_sales VERSION AS OF {snapshot_before_bad}
    """
)

spark.sql(f"""
    CALL oci.system.rollback_to_snapshot('demo.adb_sales', {snapshot_before_bad})
""").show(truncate=False)

print_adb_metadata_hint("after_rollback", "demo.adb_sales")

show_sql(
    "current state after rollback",
    """
    SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount
    FROM oci.demo.adb_sales
    """
)

spark.stop()
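# Possible follow-up, sketched only (not executed in this demo; the cutoff
# timestamp is hypothetical): after a rollback, the superseded snapshots still
# exist and remain time-travel targets. Iceberg's expire_snapshots procedure
# removes old snapshots and deletes data files that are no longer referenced:
#
#   spark.sql("""
#       CALL oci.system.expire_snapshots(
#           table => 'demo.adb_sales',
#           older_than => TIMESTAMP '2026-04-10 00:00:00'
#       )
#   """).show(truncate=False)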