---
description: "Read and write interleaved image-text datasets between WebDataset tar shards and Parquet using InterleavedParquetReader, InterleavedWebdatasetReader, and matching writers"
categories: ["text-curation"]
tags: ["interleaved", "io", "parquet", "webdataset", "reader", "writer", "schema"]
personas: ["data-scientist-focused", "mle-focused"]
difficulty: "intermediate"
content_type: "how-to"
modality: "universal"
---

# Interleaved IO

Read and write interleaved image-text data between WebDataset tar shards and Parquet. All four reader/writer combinations work out of the box:

```text
WDS tar ⇄ InterleavedBatch ⇄ Parquet
```

## Understanding the Round-Trip

### Storage Formats

Interleaved samples can live in two on-disk formats; in memory both materialize as `InterleavedBatch`:

| Format | Layout | When to Use |
|--------|--------|-------------|
| **WebDataset tar shards** | One tar per shard, one file per item; `sample_id` encoded in member key | Streaming reads, S3-friendly, MINT-1T-compatible |
| **Parquet rows** | One row per item (text/image/metadata), grouped by `sample_id` | Random access, column projection, ~5× faster reads in benchmarks |

Choose based on the consumer: training loops that stream large datasets often prefer WDS tars; analytics or sample-level inspection prefers Parquet.

### When to Convert

A common workflow is "curate once in Parquet, train from tars":

1. **Curate in Parquet**: fast reads, easy filtering, column projection.
2. **Convert to WDS**: once curation is done, write the final dataset as MINT-1T-style tars for the training loop.

The IO stages on this page support that workflow without intermediate copies.

## Readers

### `InterleavedParquetReader`

Reads Parquet files into `InterleavedBatch`. Each row maps to one item (text, image, or metadata) with a `sample_id` that groups items into samples.
```python
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    fields=("source_url", "license"),  # passthrough columns kept alongside reserved ones
    max_batch_bytes=512 * 1024 * 1024,  # 512 MiB per output batch
)
```

Key behaviors:

- **`fields=` passthrough**: keep additional columns alongside the reserved interleaved schema. Missing columns are null-filled in a single pass.
- **Push-down column projection**: the reader uses `pq.read_schema()` per file and asks Parquet for only the columns it needs.
- **`max_batch_bytes` splitting**: large file groups are split into multiple batches by accumulated size; per-split lineage is preserved on the output `source_files` field.
- **Schema control**: pass `schema=` for strict alignment to a target Arrow schema, or `schema_overrides=` for partial type overrides on top of `INTERLEAVED_SCHEMA`. Specifying both warns and prefers `schema=`.

### `InterleavedWebdatasetReader`

Reads MINT-1T-style WebDataset tar shards. Same `fields=` and `max_batch_bytes` semantics as the Parquet reader.

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader

reader = InterleavedWebdatasetReader(
    file_paths="s3://bucket/mint1t/*.tar",
    fields=("source_url",),
    sample_id_field="sample_id",
    image_extensions=["png", "jpg", "jpeg"],
    json_extensions=["json"],
    texts_field="texts",
    images_field="images",
)
```

Configurable file extensions and field names for image, JSON, and text members (`image_extensions`, `json_extensions`, `texts_field`, `images_field`, `image_member_field`) let the reader handle non-standard tar layouts.

### Reader Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `file_paths` | str \| list[str] | Required | Glob, list, or single path. Supports local, S3, GCS via fsspec. |
| `files_per_partition` | int \| None | `None` | Group input files into partitions of this size. |
| `blocksize` | int \| str \| None | `None` | Optional bytes-per-partition target (e.g., `"512MiB"`). |
| `max_batch_bytes` | int \| None | `None` | Split large partitions into multiple `InterleavedBatch` outputs; `None` means no splitting. |
| `fields` | tuple[str, ...] \| None | `None` | Passthrough columns to keep alongside the reserved `INTERLEAVED_SCHEMA`. |
| `read_kwargs` | dict | `{}` | Forwarded to the underlying read (`pyarrow.parquet.read_table`, etc.). |
| `schema` | pa.Schema \| None | `None` | Strict alignment target. Reserved columns get canonical types; passthrough columns surface overflow. |
| `schema_overrides` | dict[str, pa.DataType] \| None | `None` | Partial type overrides on top of `INTERLEAVED_SCHEMA`. Warns when both `schema=` and `schema_overrides=` are set. |

## Writers

### `InterleavedParquetWriter`

Writes `InterleavedBatch` to Parquet. Inherits `schema=` / `schema_overrides=` and the materialization-error policy (see below).

### `InterleavedWebdatasetWriterStage`

Writes `InterleavedBatch` to MINT-1T-style WebDataset tar shards.

```python
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

writer = InterleavedWebdatasetWriterStage(
    output_dir="./out",
    file_extension="tar",
)
# `pipeline` is an existing Pipeline instance
pipeline.add_stage(writer)
```

Implementation notes:

- **`urllib.parse.quote(sample_id, safe="")`** is used as the tar member key; the encoding is injective and round-trip-safe with `sample_id_field="sample_id"` on the matching reader.
- **Single-pass `groupby`**: rows are grouped by `sample_id` once instead of filtered per sample (O(n) instead of O(n × m)).
- **Sparse positions**: gaps in the position field are preserved as `None` entries; the WDS reader skips them on the way back.
- **Supported modalities**: `metadata`, `text`, and `image`. Any other modality raises `ValueError` at write time.

## Schema Utilities

`utils/schema.py` ships shared helpers used by every Arrow-based reader and writer.
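
As a rough, stdlib-only illustration of the precedence rule stated for `schema=` and `schema_overrides=` in the reader section (both set: warn and prefer `schema=`; neither set: `None`), the sketch below models schemas as plain dictionaries. The `resolve` function and the `RESERVED` column/type mapping are hypothetical stand-ins for illustration; this is not the library's implementation, and the real `INTERLEAVED_SCHEMA` is an Arrow schema whose exact columns are not listed on this page.

```python
import warnings

# Hypothetical stand-in for the reserved schema: column name -> type name.
RESERVED = {"sample_id": "string", "position": "int32", "modality": "string"}


def resolve(schema=None, overrides=None, reserved=RESERVED):
    """Illustrative precedence: schema wins, overrides patch the reserved schema."""
    if schema is not None:
        if overrides is not None:
            # Both arguments given: warn and prefer the full schema.
            warnings.warn("both schema and schema_overrides given; using schema", stacklevel=2)
        return dict(schema)
    if overrides is not None:
        # Partial overrides are layered on top of the reserved schema.
        return {**reserved, **overrides}
    # Neither argument given.
    return None


print(resolve(overrides={"position": "int64"}))
```

Under these assumptions, an override replaces only the named column's type and leaves the other reserved columns untouched.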
### `reconcile_schema()`

Apply canonical types to reserved columns; preserve passthrough column types as-is; avoid unsafe large↔small downcasts; unwrap Parquet dictionary encoding from passthrough columns.

### `align_table()`

Pad, reorder, and cast an Arrow table to a target schema. Reserved columns use `safe=False` for predictable casts; passthrough columns use `safe=True` so overflow surfaces rather than silently corrupting data. Does **not** re-reconcile the user-provided target.

### `resolve_schema()`

Merge `schema=` and `schema_overrides=` arguments with the canonical `INTERLEAVED_SCHEMA`; warn when both are provided; return `None` when neither is set. Used internally by readers and writers.

## Materialization Error Policy

Writers expose `on_materialize_error=`, controlling what happens when a fetch step (or upstream stage) sets an error on a row:

| Value | Behavior | Use Case |
|-------|----------|----------|
| `"error"` | Raise immediately (default for strict pipelines). | Production pipelines where an error indicates a real problem. |
| `"warn"` | Emit a warning and keep the row. | Development; want visibility without halting. |
| `"drop_row"` | Drop the offending row but keep the rest of the sample. | Resilient pipelines where one bad item shouldn't kill a sample. |
| `"drop_sample"` | Drop the entire sample if any row in it errored. | Strict cleanliness; one bad row taints the whole sample. |

The policy is applied after fetch, so it covers errors raised by the materialization step itself and by upstream stages.

## Mixed-Backend Path Handling

The internal helper `_build_global_range_index()` groups paths by filesystem object so a single batch that mixes S3 and local paths no longer fails silently: each backend's range queries run against the right filesystem. This works automatically; you don't need to configure it.
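
To make the grouping idea concrete, here is a minimal stdlib-only sketch that buckets paths by URL scheme while remembering each path's original position. It is not the actual `_build_global_range_index()`; the function name `group_paths_by_backend` is hypothetical, and the scheme string stands in for the filesystem object the real helper groups by.

```python
from collections import defaultdict
from urllib.parse import urlsplit


def group_paths_by_backend(paths):
    """Group paths by URL scheme, keeping each path's original index."""
    groups = defaultdict(list)
    for index, path in enumerate(paths):
        scheme = urlsplit(path).scheme
        # No scheme, or a single letter (a Windows drive), is treated as local.
        backend = scheme if len(scheme) > 1 else "file"
        groups[backend].append((index, path))
    return dict(groups)


mixed = [
    "s3://bucket/a.parquet",
    "/data/b.parquet",
    "s3://bucket/c.parquet",
    "gs://other/d.parquet",
]
for backend, entries in group_paths_by_backend(mixed).items():
    print(backend, entries)
```

Keeping the original index alongside each path is one way to let per-backend results be stitched back into input order after each group is queried separately.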
## Performance

Benchmarks on 80 NVMe shards of MINT-1T PDF data (6,818 samples, 9.9 GB WDS / 6.0 GB Parquet) with an aspect-ratio filter applied:

| Path | Wall-clock | Samples/sec | Notes |
| --- | --- | --- | --- |
| WDS → Parquet | 76.8 s | 88.8 | Tar parsing dominates |
| WDS → WDS | 75.4 s | 90.4 | Tar parsing dominates |
| **Parquet → Parquet** | **15.7 s** | **435.0** | Parquet column projection wins |
| **Parquet → WDS** | **18.5 s** | **368.4** | Parquet read + tar write |

Parquet-sourced paths are roughly **5× faster** than WDS-sourced paths because tar parsing dominates the WDS read cost.

## Complete IO Pipeline Examples

### Curate-Once-in-Parquet, Train-from-Tars

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader
from nemo_curator.stages.interleaved.io.writers.webdataset import (
    InterleavedWebdatasetWriterStage,
)
from nemo_curator.stages.interleaved.filter.blur_filter import InterleavedBlurFilterStage

pipeline = Pipeline(name="parquet_to_wds")

# 1. Read curated Parquet (faster random access during filtering)
pipeline.add_stage(
    InterleavedParquetReader(file_paths="s3://bucket/curated/*.parquet")
)

# 2. Filter
pipeline.add_stage(InterleavedBlurFilterStage(score_threshold=100.0))

# 3. Write final WDS tars (training-loop friendly)
pipeline.add_stage(InterleavedWebdatasetWriterStage(output_dir="./final_tars"))

executor = XennaExecutor()
pipeline.run(executor)
```

### Format Conversion (No Filtering)

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader
# Also import InterleavedParquetWriter (its module path is not shown on this page).

# WDS → Parquet for analytics / ad-hoc inspection; `pipeline` is an existing Pipeline
pipeline.add_stage(
    InterleavedWebdatasetReader(file_paths="s3://bucket/mint1t/*.tar")
)
pipeline.add_stage(InterleavedParquetWriter(output_dir="./parquet_copy"))
```

## Best Practices

- **Curate in Parquet, ship in WDS**: Parquet-sourced pipelines ran ~5× faster in the benchmark above; convert to WDS only for the final training-loop format.
- **Pass through provenance fields**: list source-tracking columns (`source_url`, `license`, `crawl_date`) in `fields=` so they survive into the output. Columns omitted from the list are silently dropped.
- **Use `max_batch_bytes` for large shards**: without splitting, a single 5 GB Parquet file becomes one giant batch. Set `max_batch_bytes=512 * 1024 * 1024` for memory-friendly batches.
- **Pick the right `on_materialize_error`**: `"error"` for production, `"warn"` for development, `"drop_sample"` for strict cleanliness. The default raises, so set it explicitly when you want anything else.

## Related Topics

- **[Interleaved Filters](/curate-text/process-data/interleaved/filters)**: sample-level quality filters that operate on `InterleavedBatch`.
- **[Nemotron-Parse PDF Pipeline](/curate-text/load-data/nemotron-parse-pdf)**: produces interleaved Parquet output from PDF inputs.