/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.parquet.hadoop; import static java.lang.Boolean.TRUE; import static org.apache.parquet.Preconditions.checkArgument; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.parquet.Preconditions; import org.apache.parquet.conf.HadoopParquetConfiguration; import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.hadoop.api.ReadSupport.ReadContext; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.hadoop.util.HiddenFileFilter; import org.apache.parquet.hadoop.util.SerializationUtil; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The input format to read a Parquet file. *

* It requires an implementation of {@link ReadSupport} to materialize the records. *

* The requestedSchema will control how the original records get projected by the loader. * It must be a subset of the original schema. Only the columns needed to reconstruct the records with the requestedSchema will be scanned. * * @param the type of the materialized records * @see #READ_SUPPORT_CLASS * @see #UNBOUND_RECORD_FILTER * @see #STRICT_TYPE_CHECKING * @see #FILTER_PREDICATE * @see #TASK_SIDE_METADATA */ public class ParquetInputFormat extends FileInputFormat { private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); /** * key to configure the ReadSupport implementation */ public static final String READ_SUPPORT_CLASS = "parquet.read.support.class"; /** * key to configure the filter */ public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter"; /** * key to configure type checking for conflicting schemas (default: true) */ public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing"; /** * key to configure the filter predicate */ public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate"; /** * key to configure whether record-level filtering is enabled */ public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled"; /** * key to configure whether row group stats filtering is enabled */ public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; /** * key to configure whether row group dictionary filtering is enabled */ public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled"; /** * key to configure whether column index filtering of pages is enabled */ public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled"; /** * key to configure whether page level checksum verification is enabled */ public static final String PAGE_VERIFY_CHECKSUM_ENABLED = "parquet.page.verify-checksum.enabled"; /** * key to configure whether row group bloom filtering is enabled */ public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled"; /** * Key to configure if off-heap buffer should be used for decryption */ public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled"; /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately. * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do. */ public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata"; /** * key to turn off file splitting. See PARQUET-246. */ public static final String SPLIT_FILES = "parquet.split.files"; private static final int MIN_FOOTER_CACHE_SIZE = 100; /** * Key to enable/disable vectored io while reading parquet files: * {@value}. */ public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled"; /** * Default value of parquet.hadoop.vectored.io.enabled is {@value}. */ public static final boolean HADOOP_VECTORED_IO_DEFAULT = false; public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) { ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata); } public static boolean isTaskSideMetaData(Configuration configuration) { return configuration.getBoolean(TASK_SIDE_METADATA, TRUE); } public static void setReadSupportClass(Job job, Class readSupportClass) { ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName()); } public static void setUnboundRecordFilter(Job job, Class filterClass) { Configuration conf = ContextUtil.getConfiguration(job); checkArgument( getFilterPredicate(conf) == null, "You cannot provide an UnboundRecordFilter after providing a FilterPredicate"); conf.set(UNBOUND_RECORD_FILTER, filterClass.getName()); } /** * @param configuration a configuration * @return an unbound record filter class * @deprecated use {@link #getFilter(Configuration)} */ @Deprecated public static Class getUnboundRecordFilter(Configuration configuration) { return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); } private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) { Class clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); if (clazz == null) { return null; } try { return (UnboundRecordFilter) clazz.newInstance(); } catch (InstantiationException | IllegalAccessException e) { throw new BadConfigurationException("could not instantiate unbound record filter class", e); } } public static void setReadSupportClass(JobConf conf, Class readSupportClass) { conf.set(READ_SUPPORT_CLASS, readSupportClass.getName()); } public static Class getReadSupportClass(Configuration configuration) { return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class); } public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) { checkArgument( getUnboundRecordFilter(configuration) == null, "You cannot provide a FilterPredicate after providing an UnboundRecordFilter"); configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString()); try { SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration); } catch (IOException e) { throw new RuntimeException(e); } } private static FilterPredicate getFilterPredicate(Configuration configuration) { return getFilterPredicate(new HadoopParquetConfiguration(configuration)); } private static FilterPredicate getFilterPredicate(ParquetConfiguration configuration) { try { return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration); } catch (IOException e) { throw new RuntimeException(e); } } /** * Returns a non-null Filter, which is a wrapper around either a * FilterPredicate, an UnboundRecordFilter, or a no-op filter. * * @param conf a configuration * @return a filter for the unbound record filter specified in conf */ public static Filter getFilter(Configuration conf) { return getFilter(new HadoopParquetConfiguration(conf)); } public static Filter getFilter(ParquetConfiguration conf) { return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf)); } private LruCache footersCache; private final Class> readSupportClass; /** * Hadoop will instantiate using this constructor */ public ParquetInputFormat() { this.readSupportClass = null; } /** * Constructor for subclasses, such as AvroParquetInputFormat, or wrappers. *

* Subclasses and wrappers may use this constructor to set the ReadSupport * class that will be used when reading instead of requiring the user to set * the read support property in their configuration. * * @param readSupportClass a ReadSupport subclass * @param the Java read support type */ public > ParquetInputFormat(Class readSupportClass) { this.readSupportClass = readSupportClass; } /** * {@inheritDoc} */ @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); ReadSupport readSupport = getReadSupport(conf); return new ParquetRecordReader(readSupport, getFilter(conf)); } /** * @param configuration to find the configuration for the read support * @return the configured read support * @deprecated use getReadSupportInstance static methods instead */ @Deprecated @SuppressWarnings("unchecked") ReadSupport getReadSupport(Configuration configuration) { return getReadSupportInstance( readSupportClass == null ? (Class>) getReadSupportClass(configuration) : readSupportClass); } /** * @param configuration to find the configuration for the read support * @param the Java type of objects created by the ReadSupport * @return the configured read support */ @SuppressWarnings("unchecked") public static ReadSupport getReadSupportInstance(Configuration configuration) { return getReadSupportInstance((Class>) getReadSupportClass(configuration)); } /** * @param readSupportClass to instantiate * @param the Java type of objects created by the ReadSupport * @return the configured read support */ @SuppressWarnings("unchecked") static ReadSupport getReadSupportInstance(Class> readSupportClass) { try { return readSupportClass.newInstance(); } catch (InstantiationException | IllegalAccessException e) { throw new BadConfigurationException("could not instantiate read support class", e); } } @Override protected boolean isSplitable(JobContext context, Path filename) { return ContextUtil.getConfiguration(context).getBoolean(SPLIT_FILES, true); } /** * {@inheritDoc} */ @Override public List getSplits(JobContext jobContext) throws IOException { Configuration configuration = ContextUtil.getConfiguration(jobContext); List splits = new ArrayList(); if (isTaskSideMetaData(configuration)) { // Although not required by the API, some clients may depend on always // receiving ParquetInputSplit. Translation is required at some point. for (InputSplit split : super.getSplits(jobContext)) { Preconditions.checkArgument(split instanceof FileSplit, "Cannot wrap non-FileSplit: %s", split); splits.add(ParquetInputSplit.from((FileSplit) split)); } return splits; } else { splits.addAll(getSplits(configuration, getFooters(jobContext))); } return splits; } /** * @param configuration the configuration to connect to the file system * @param footers the footers of the files to read * @return the splits for the footers * @throws IOException if there is an error while reading * @deprecated split planning using file footers will be removed */ @Deprecated public List getSplits(Configuration configuration, List