--- description: "Read existing JSONL and Parquet datasets using Curator's reader stages." categories: ["how-to-guides"] tags: ["jsonl", "parquet", "data-loading", "reader", "pipelines"] personas: ["data-scientist-focused", "mle-focused"] difficulty: "beginner" content_type: "how-to" modality: "text-only" --- # Read Existing Data Use Curator's `JsonlReader` and `ParquetReader` to read existing datasets into a pipeline, then optionally add processing stages. :sync: jsonl ## Example: Read JSONL and Filter ```python from nemo_curator.core.client import RayClient from nemo_curator.pipeline import Pipeline from nemo_curator.stages.text.io.reader import JsonlReader from nemo_curator.stages.text.filters import ScoreFilter from nemo_curator.stages.text.filters.heuristic import WordCountFilter # Initialize Ray client ray_client = RayClient() ray_client.start() # Create pipeline for processing existing JSONL files pipeline = Pipeline(name="jsonl_data_processing") # Read JSONL files reader = JsonlReader( file_paths="/path/to/data", files_per_partition=4, fields=["text", "url"] # Only read specific columns ) pipeline.add_stage(reader) # Add filtering stage word_filter = ScoreFilter( filter_obj=WordCountFilter(min_words=50, max_words=1000), text_field="text" ) pipeline.add_stage(word_filter) # Add more stages to pipeline... # Execute pipeline results = pipeline.run() # Stop Ray client ray_client.stop() ``` :sync: parquet ## Example: Read Parquet and Filter ```python from nemo_curator.core.client import RayClient from nemo_curator.pipeline import Pipeline from nemo_curator.stages.text.io.reader import ParquetReader from nemo_curator.stages.text.filters import ScoreFilter from nemo_curator.stages.text.filters.heuristic import WordCountFilter # Initialize Ray client ray_client = RayClient() ray_client.start() # Create pipeline for processing existing Parquet files pipeline = Pipeline(name="parquet_data_processing") # Read Parquet files with PyArrow engine reader = ParquetReader( file_paths="/path/to/data", files_per_partition=4, fields=["text", "metadata"] # Only read specific columns ) pipeline.add_stage(reader) # Add filtering stage word_filter = ScoreFilter( filter_obj=WordCountFilter(min_words=50, max_words=1000), text_field="text" ) pipeline.add_stage(word_filter) # Add more stages to pipeline... # Execute pipeline results = pipeline.run() # Stop Ray client ray_client.stop() ``` ## Reader Configuration ### Common Parameters Both `JsonlReader` and `ParquetReader` support these configuration options: | Parameter | Type | Description | Default | | --- | --- | --- | --- | | `file_paths` | str \| list[str] | File paths or glob patterns to read | Required | | `files_per_partition` | int \| None | Number of files per partition. Overrides `blocksize` if both are provided. | None | | `blocksize` | int \| str \| None | Target partition size (e.g., "128MB"). Ignored if `files_per_partition` is provided. | None | | `fields` | list[str] \| None | Column names to read (column selection) | None (all columns) | | `read_kwargs` | dict[str, Any] \| None | Extra arguments for the underlying reader | None | ### Parquet-Specific Features `ParquetReader` provides these optimizations: - **PyArrow Engine**: Uses `pyarrow` engine by default for better performance - **Storage Options**: Supports cloud storage via `storage_options` in `read_kwargs` - **Schema Handling**: Automatic schema inference and validation - **Columnar Efficiency**: Optimized for reading specific columns ### Performance Tips - Use `fields` parameter to read required columns for better performance - Set `files_per_partition` based on your cluster size and memory constraints - Use `blocksize` for fine-grained control over partition sizes ## Output Integration Both readers produce `DocumentBatch` tasks that integrate seamlessly with: - **Processing Stages**: Apply filters, transformations, and quality checks - **Writer Stages**: Export to JSONL, Parquet, or other formats - **Analysis Tools**: Convert to Pandas/PyArrow for inspection and debugging