diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 5a8eb69ad798b..24455397c0c5b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -61,7 +61,7 @@ public class HoodieReaderConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED = ConfigProperty
       .key("hoodie.file.group.reader.enabled")
-      .defaultValue(true)
+      .defaultValue(false)
       .markAdvanced()
       .sinceVersion("1.0.0")
       .withDocumentation("Use engine agnostic file group reader if enabled");
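With the default flipped to false, the engine-agnostic file group reader becomes opt-in. A minimal sketch of re-enabling it for a single read, assuming the usual pattern of forwarding hoodie.* keys as datasource read options; the table path and app name below are placeholders, not part of this patch:

    import org.apache.spark.sql.SparkSession

    // placeholders: a throwaway app name and an existing Hudi table path
    val spark = SparkSession.builder().appName("fgr-opt-in").master("local[*]").getOrCreate()
    val basePath = "/tmp/hudi_trips_cow"

    val df = spark.read
      .format("hudi")
      // opt back in to the engine-agnostic file group reader now that it defaults to false
      .option("hoodie.file.group.reader.enabled", "true")
      .load(basePath)

    df.printSchema()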
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f28da81417c4c..d10938129235a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -80,8 +80,8 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema)
     val baseFileReader = createBaseFileReader(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index bde6ab037470e..931739157d42b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -350,6 +350,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
+
+    // 1. Generate raw required schema, only including the columns requested by the caller
+    val (rawRequiredAvroSchema, rawRequiredStructSchema, rawRequiredInternalSchema) =
+      projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), requiredColumns)
+    val rawRequiredSchema = HoodieTableSchema(rawRequiredStructSchema, rawRequiredAvroSchema.toString, Some(rawRequiredInternalSchema))
+
+    // 2. Generate required schema, including mandatory fields
     // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
     //       *Appending* additional columns to the ones requested by the caller is not a problem, as those
     //       will be eliminated by the caller's projection;
@@ -360,24 +368,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //       schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
     //       could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
     //       w/ more than 2 types are involved)
-    val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))
+
+    // 3. Generate table full schema, including metadata fields
+    val tableAvroSchemaStr = tableAvroSchema.toString
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)

     val filterExpressions = convertToExpressions(filters)
     val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)

     val fileSplits = collectFileSplits(partitionFilters, dataFilters)

-    val tableAvroSchemaStr = tableAvroSchema.toString
-
-    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

     if (fileSplits.isEmpty) {
       sparkSession.sparkContext.emptyRDD
     } else {
-      val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
+      val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, rawRequiredSchema, filters)

       // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
       // Please check [[needConversion]] scala-doc for more details
@@ -390,15 +398,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * @param fileSplits file splits to be handled by the RDD
    * @param tableSchema target table's schema
-   * @param requiredSchema projected schema required by the reader
-   * @param requestedColumns columns requested by the query
+   * @param requiredSchema projected schema required by the reader, including mandatory fields, only used for merging on read
+   * @param rawRequiredSchema schema required by the caller
    * @param filters data filters to be applied
    * @return instance of RDD (holding [[InternalRow]]s)
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
                            tableSchema: HoodieTableSchema,
-                           requiredSchema: HoodieTableSchema,
-                           requestedColumns: Array[String],
+                           requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                           rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                            filters: Array[Filter]): RDD[InternalRow]

   /**
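The buildScan change above now derives two projections from the same source schema: rawRequiredSchema carries only the columns the caller asked for, while requiredSchema additionally keeps the table's mandatory fields (typically the record key and precombine field) that merging needs. A self-contained sketch of that distinction using plain Spark StructType pruning; projectTo, the example schema and the mandatory-field list are illustrative stand-ins, not the projectSchema/HoodieTableSchema helpers used in the patch:

    import org.apache.spark.sql.types._

    // illustrative table schema and mandatory fields (stand-ins for Hudi's metadata)
    val tableSchema = StructType(Seq(
      StructField("uuid", StringType),
      StructField("ts", LongType),
      StructField("rider", StringType),
      StructField("fare", DoubleType)))
    val mandatoryFields = Seq("uuid", "ts")

    // prune a schema down to the given columns, keeping the table's field order
    def projectTo(schema: StructType, columns: Seq[String]): StructType =
      StructType(schema.fields.filter(f => columns.contains(f.name)))

    val requiredColumns = Seq("fare")                                                  // what the query selected
    val rawRequiredSchema = projectTo(tableSchema, requiredColumns)                    // fare
    val requiredSchema    = projectTo(tableSchema, requiredColumns ++ mandatoryFields) // uuid, ts, fare

    // rawRequiredSchema is enough when records can be returned as-is;
    // requiredSchema is what the merge-on-read path needs to combine log and base records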
@@ -524,8 +532,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   def updatePrunedDataSchema(prunedSchema: StructType): Relation

   protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
-                                      requiredSchema: HoodieTableSchema,
-                                      requestedColumns: Array[String],
+                                      requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                      rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                       requiredFilters: Seq[Filter],
                                       optionalFilters: Seq[Filter] = Seq.empty,
                                       baseFileFormat: HoodieFileFormat = tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
@@ -571,23 +579,23 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     // For file groups without delta logs, we can make the following optimizations:
     //  a) If the requested columns are not included in mandatoryColumns, they can be removed from requiredDataSchema.
     //  b) Apply filters to reader for data skipping since no merging.
-    val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
-    val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
-    val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
-      requiredDataSchema
-    } else {
-      val prunedStructSchema =
-        StructType(requiredDataSchema.structTypeSchema.fields
-          .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
-
-      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
-    }
+//    val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
+//    val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
+//    val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
+//      requiredDataSchema
+//    } else {
+//      val prunedStructSchema =
+//        StructType(requiredDataSchema.structTypeSchema.fields
+//          .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+//
+//      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
+//    }

     val requiredSchemaReaderSkipMerging = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredDataSchema = prunedRequiredSchema,
+      requiredDataSchema = rawRequiredSchema,
       // This file-reader is only used in cases when no merging is performed, therefore it's safe to push
       // down these filters to the base file readers
       filters = requiredFilters ++ optionalFilters,
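Since requiredSchemaReaderSkipMerging is only used when no merging happens, reading straight with rawRequiredSchema makes the old pruning block (now commented out) redundant: removing the mandatory columns the caller never requested from (caller columns + mandatory columns) lands back on the caller-only projection. A small self-contained sketch of that equivalence, with the same illustrative schema as above:

    import org.apache.spark.sql.types._

    val tableSchema = StructType(Seq(
      StructField("uuid", StringType), StructField("ts", LongType),
      StructField("rider", StringType), StructField("fare", DoubleType)))
    val mandatoryFields = Seq("uuid", "ts")
    val requiredColumns = Seq("fare")
    def projectTo(schema: StructType, columns: Seq[String]): StructType =
      StructType(schema.fields.filter(f => columns.contains(f.name)))

    // old path: start from (caller columns + mandatory fields), then strip the
    // mandatory columns the caller never asked for
    val unusedMandatory = mandatoryFields.filterNot(requiredColumns.contains)
    val pruned = projectTo(tableSchema,
      (requiredColumns ++ mandatoryFields).filterNot(unusedMandatory.contains))

    // new path: project to the caller columns directly
    val rawRequired = projectTo(tableSchema, requiredColumns)

    assert(pruned == rawRequired)   // both keep only StructField("fare", DoubleType)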
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
index a06a216c661ca..3445ed6e4a2aa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
@@ -82,12 +82,12 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[FileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
-      requiredSchema, requestedColumns, filters)
+      requiredSchema, filters)
     new HoodieBootstrapMORRDD(
       sqlContext.sparkSession,
       config = jobConf,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index cda0bafa17f89..2cdfcee608228 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -133,11 +133,11 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
    * get all the file readers required for composeRDD
    */
   protected def getFileReaders(tableSchema: HoodieTableSchema,
-                               requiredSchema: HoodieTableSchema,
-                               requestedColumns: Array[String],
+                               requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
                                filters: Array[Filter]): (BaseFileReader, BaseFileReader, BaseFileReader) = {
+
     val requiredSkeletonFileSchema =
-      StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))
+      StructType(skeletonSchema.filter(f => requiredSchema.structTypeSchema.fields.exists(col => resolver(f.name, col.name))))

     val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
       createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)
@@ -148,12 +148,12 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[FileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
-      requiredSchema, requestedColumns, filters)
+      requiredSchema, filters)
     new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader,
       regularFileReader, requiredSchema, fileSplits)
   }
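With requestedColumns gone from getFileReaders, the skeleton-file schema is now filtered by the required schema's field names via Spark's resolver. A self-contained sketch of that filtering; the skeleton and required schemas are illustrative, and a case-insensitive resolver stands in for whatever resolver the relation actually uses:

    import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    import org.apache.spark.sql.types._

    // illustrative skeleton (metadata) schema and required schema
    val skeletonSchema = StructType(Seq(
      StructField("_hoodie_commit_time", StringType),
      StructField("_hoodie_record_key", StringType)))
    val requiredStructSchema = StructType(Seq(
      StructField("_hoodie_record_key", StringType),
      StructField("fare", DoubleType)))

    val resolver = caseInsensitiveResolution

    // keep only the skeleton fields that also appear in the required schema
    val requiredSkeletonFileSchema =
      StructType(skeletonSchema.filter(f =>
        requiredStructSchema.fields.exists(col => resolver(f.name, col.name))))
    // requiredSkeletonFileSchema now contains only _hoodie_record_key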
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
index 3d4c567239ce3..0dde9e5bb49c9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
@@ -55,7 +55,8 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
                              @transient config: Configuration,
                              fileReaders: HoodieMergeOnReadBaseFileReaders,
                              tableSchema: HoodieTableSchema,
-                             requiredSchema: HoodieTableSchema,
+                             requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                             rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                              tableState: HoodieTableState,
                              mergeType: String,
                              @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -72,17 +73,17 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
     val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = partition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
+        val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, rawRequiredSchema.structTypeSchema)
         projectedReader(dataFileOnlySplit.dataFile.get)

       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
+        new LogFileIterator(logFileOnlySplit, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

       case split => mergeType match {
         case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
           val reader = fileReaders.requiredSchemaReaderSkipMerging
-          new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
+          new SkipMergeIterator(split, reader, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

         case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
           val reader = pickBaseFileReader()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index ab9dc6b4aeaa3..948cf27d769c4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -73,7 +73,8 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
                              @transient config: Configuration,
                              fileReaders: HoodieMergeOnReadBaseFileReaders,
                              tableSchema: HoodieTableSchema,
-                             requiredSchema: HoodieTableSchema,
+                             requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                             rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                              tableState: HoodieTableState,
                              mergeType: String,
                              @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -88,17 +89,17 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
     val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = partition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
+        val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, rawRequiredSchema.structTypeSchema)
         projectedReader(dataFileOnlySplit.dataFile.get)

       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
+        new LogFileIterator(logFileOnlySplit, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

       case split => mergeType match {
         case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
           val reader = fileReaders.requiredSchemaReaderSkipMerging
-          new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
+          new SkipMergeIterator(split, reader, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

         case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
           val reader = pickBaseFileReader()
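Both RDD versions now choose the projection per split: base-file-only, log-file-only and skip-merge paths read with rawRequiredSchema, while payload-combine keeps requiredSchema because merging needs the mandatory fields. A stripped-down sketch of that dispatch; Split, MergeType and schemaFor are simplified placeholders rather than the actual Hudi types:

    import org.apache.spark.sql.types.StructType

    // simplified stand-ins for the real split / merge-type plumbing
    case class Split(hasDataFile: Boolean, hasLogFiles: Boolean)
    sealed trait MergeType
    case object SkipMerge extends MergeType
    case object PayloadCombine extends MergeType

    def schemaFor(split: Split, mergeType: MergeType,
                  requiredSchema: StructType, rawRequiredSchema: StructType): StructType =
      split match {
        case Split(true, false) => rawRequiredSchema   // base file only: nothing to merge
        case Split(false, true) => rawRequiredSchema   // log files only: raw projection suffices
        case _ => mergeType match {
          case SkipMerge      => rawRequiredSchema     // records are returned unmerged
          case PayloadCombine => requiredSchema        // merging needs key/ordering fields
        }
      }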
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 62030fed870bd..50cd18c478735 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -77,14 +77,14 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     // The only required filters are ones that make sure we're only fetching records that
     // fall into incremental span of the timeline being queried
     val requiredFilters = incrementalSpanRecordFilters
     val optionalFilters = filters
-    val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

     new HoodieMergeOnReadRDDV1(
       sqlContext.sparkContext,
@@ -92,6 +92,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
       fileReaders = readers,
       tableSchema = tableSchema,
       requiredSchema = requiredSchema,
+      rawRequiredSchema = rawRequiredSchema,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index c74fbbdc6724d..e785282bd554b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -72,14 +72,14 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     // The only required filters are ones that make sure we're only fetching records that
     // fall into incremental span of the timeline being queried
     val requiredFilters = incrementalSpanRecordFilters
     val optionalFilters = filters
-    val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

     new HoodieMergeOnReadRDDV2(
       sqlContext.sparkContext,
@@ -87,6 +87,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
       fileReaders = readers,
       tableSchema = tableSchema,
       requiredSchema = requiredSchema,
+      rawRequiredSchema = rawRequiredSchema,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 3ba986c412bac..ce642fe4d5aaa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -105,12 +105,12 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,

   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
+                                    requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
+                                    rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
                                     filters: Array[Filter]): RDD[InternalRow] = {
     val requiredFilters = Seq.empty
     val optionalFilters = filters
-    val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

     new HoodieMergeOnReadRDDV2(
       sqlContext.sparkContext,
@@ -118,6 +118,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
       fileReaders = readers,
       tableSchema = tableSchema,
       requiredSchema = requiredSchema,
+      rawRequiredSchema = rawRequiredSchema,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits)
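End to end, a query that selects only non-mandatory columns should now flow through the no-merge readers with just that projection. A hedged way to eyeball this against an existing MOR table; the path and column name are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    val basePath = "/tmp/hudi_trips_mor"   // placeholder for an existing MOR table

    val df = spark.read.format("hudi").load(basePath).select("fare")
    df.show(5)

    // inspect the physical plan: the scan's read schema should not drag the
    // record key / precombine fields through file groups that need no merging
    df.explain(true)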