[HUDI-8889] Trim unnecessary columns during MoR snapshot read #12677

Status: Open. Wants to merge 2 commits into base: master.
@@ -69,8 +69,6 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
internalSchemaOpt.isEmpty

override lazy val mandatoryFields: Seq[String] = Seq.empty

// Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
// Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
val BASE_PATH_PARAM = "basePath"
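The override deleted above becomes redundant once the base class supplies a concrete default; a minimal sketch of that inheritance change (class names here are hypothetical stand-ins, not the actual Hudi types):

// Before: the member was abstract, so every concrete relation had to override it.
abstract class RelationBefore { val mandatoryFields: Seq[String] }

// After: the base class defaults to Seq.empty, and only relations that truly need
// extra columns (e.g. for incremental filtering) override the member.
abstract class RelationAfter { lazy val mandatoryFields: Seq[String] = Seq.empty }

class BaseFileOnlyLike extends RelationAfter // inherits Seq.empty, no boilerplate override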
@@ -81,7 +79,7 @@
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {
val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema)
val baseFileReader = createBaseFileReader(
@@ -278,13 +278,21 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

/**
* Columns that the relation has to read from storage to execute its semantics properly: for ex,
* for Merge-on-Read tables the key fields as well as the precombine field comprise the mandatory set of columns;
* for performing an incremental read, {@link HoodieRecord.COMMIT_TIME_METADATA_FIELD} is required for filtering out the out-of-range records
*
* @VisibleInTests
*/
lazy val mandatoryFields: Seq[String] = Seq.empty

/**
* Columns that the relation may need to read from storage to execute its semantics properly: for ex,
* for Merge-on-Read tables the key fields as well as the pre-combine field comprise the mandatory set of columns,
* meaning that regardless of whether these columns are requested by the query they will be fetched
* regardless, so that the relation is able to combine records properly (if necessary)
* regardless, so that the relation is able to combine records properly (when performing a snapshot read on file groups with log files)
*
* @VisibleInTests
*/
val mandatoryFields: Seq[String]
lazy val optionalExtraFields: Seq[String] = Seq.empty

protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considered a "commit" operation
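To make the split concrete, here is a hedged sketch of how subclasses are expected to use the two members (subclass names and field literals are invented for illustration): an incremental-style relation keeps the commit-time field mandatory, while a MoR snapshot relation can demote its combine keys to optionalExtraFields because they matter only for file groups that carry log files.

abstract class RelationLike {
  lazy val mandatoryFields: Seq[String] = Seq.empty      // always fetched
  lazy val optionalExtraFields: Seq[String] = Seq.empty  // fetched only where merging may occur
}

class IncrementalLike extends RelationLike {
  // the commit time is needed to filter out-of-range records, so it stays mandatory
  override lazy val mandatoryFields: Seq[String] = Seq("_hoodie_commit_time")
}

class MorSnapshotLike(recordKey: String, preCombine: Option[String]) extends RelationLike {
  // key/precombine fields are needed only when combining base-file and log records
  override lazy val optionalExtraFields: Seq[String] = Seq(recordKey) ++ preCombine
}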
@@ -350,34 +358,51 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)

// 1. Generate the required schema, including:
// - the columns requested by the caller
// - the mandatory fields according to the relation
val requiredTargetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), requiredTargetColumns)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

// 2. Generate the optional schema, including:
// - the columns requested by the caller
// - the mandatory fields according to the relation
// - the optional extra fields

// NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
// *Appending* additional columns to the ones requested by the caller is not a problem, as those
// will be eliminated by the caller's projection;
// (!) Please note, however, that it's critical to avoid _reordering_ of the requested columns as this
// will break the upstream projection
val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val optionalTargetColumns: Array[String] = appendOptionalExtraColumns(requiredTargetColumns)
// NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
// schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
// could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
// w/ more than 2 types are involved)
val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
val (optionalAvroSchema, optionalStructSchema, optionalInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), optionalTargetColumns)
val optionalSchema = HoodieTableSchema(optionalStructSchema, optionalAvroSchema.toString, Some(optionalInternalSchema))

// 3. Generate the full table schema, including:
// - all columns from the table's schema
// - inner metadata fields
val tableAvroSchemaStr = tableAvroSchema.toString
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)

val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)

val fileSplits = collectFileSplits(partitionFilters, dataFilters)

val tableAvroSchemaStr = tableAvroSchema.toString

val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, optionalSchema, filters)

// Here we rely on type erasure to work around an inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
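A self-contained sketch of the column bookkeeping performed above (field names are invented); both helpers are append-only, which is what preserves the caller's column order and keeps the upstream projection intact:

def appendMissing(requested: Array[String], extra: Seq[String]): Array[String] =
  requested ++ extra.filterNot(requested.contains) // append, never reorder

val requiredColumns = Array("name")                   // what the query asked for
val mandatoryFields = Seq.empty[String]               // MoR snapshot: nothing unconditionally mandatory
val optionalExtra   = Seq("_hoodie_record_key", "ts") // needed only when log files must be merged

val requiredTarget = appendMissing(requiredColumns, mandatoryFields) // Array("name")
val optionalTarget = appendMissing(requiredTarget, optionalExtra)    // Array("name", "_hoodie_record_key", "ts")
// File groups without log files can be read with requiredTarget, while file groups
// with log files are read with optionalTarget so the records can be combined.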
@@ -390,15 +415,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* @param fileSplits file splits to be handled by the RDD
* @param tableSchema target table's schema
* @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query
* @param requiredSchema projected schema required by the reader, including mandatory fields
* @param optionalSchema projected schema required by the reader, including mandatory and optional extra fields
* @param filters data filters to be applied
* @return instance of RDD (holding [[InternalRow]]s)
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow]

/**
Expand Down Expand Up @@ -470,6 +495,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requestedColumns ++ missing
}

private final def appendOptionalExtraColumns(requestedColumns: Array[String]): Array[String] = {
// For a nested field among the optional extra columns, we first resolve its root-level field and
// only then check whether that column is missing, since requestedColumns is expected to contain
// root-level fields only; for the same reason we append nothing but root-level fields here
val missing = optionalExtraFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}
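For instance, assuming HoodieAvroUtils.getRootLevelFieldName simply takes everything before the first dot of a nested field path (re-implemented locally here for illustration), the method behaves as follows:

// Local stand-in for HoodieAvroUtils.getRootLevelFieldName, for illustration only.
def rootLevelFieldName(field: String): String = field.split('.').head

val requestedColumns = Array("name", "meta")
val optionalExtraFields = Seq("meta.sequence", "ts")

val missing = optionalExtraFields
  .map(rootLevelFieldName)                                    // Seq("meta", "ts")
  .filter(rootField => !requestedColumns.contains(rootField)) // Seq("ts"): "meta" is already requested
requestedColumns ++ missing                                   // Array("name", "meta", "ts")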

def imbueConfigs(sqlContext: SQLContext): Unit = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
@@ -525,7 +559,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
requiredFilters: Seq[Filter],
optionalFilters: Seq[Filter] = Seq.empty,
baseFileFormat: HoodieFileFormat = tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
@@ -555,53 +589,39 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = requiredDataSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly);
// As such, only the required filters can be pushed down to such a reader
filters = requiredFilters,
// This file-reader is only used in cases when no merging is performed, therefore it's safe to push
// down these filters to the base file readers
filters = requiredFilters ++ optionalFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema),
baseFileFormat = baseFileFormat
)

// For file groups without delta logs, we can make the following optimizations:
// a) Mandatory columns that are not among the requested columns can be removed from requiredDataSchema.
// b) Filters can be applied by the reader for data skipping, since no merging takes place.
val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
requiredDataSchema
} else {
val prunedStructSchema =
StructType(requiredDataSchema.structTypeSchema.fields
.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))

HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
}

val requiredSchemaReaderSkipMerging = createBaseFileReader(
// For file groups with delta logs, we should read using the optional schema so that payload combining can be performed.
val optionalSchemaReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = prunedRequiredSchema,
// This file-reader is only used in cases when no merging is performed, therefore it's safe to push
// down these filters to the base file readers
filters = requiredFilters ++ optionalFilters,
requiredDataSchema = optionalSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly);
// As such, only the required filters can be pushed down to such a reader
filters = requiredFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema),
hadoopConf = embedInternalSchema(new Configuration(conf), optionalSchema.internalSchema),
baseFileFormat = baseFileFormat
)

HoodieMergeOnReadBaseFileReaders(
fullSchemaReader = fullSchemaReader,
requiredSchemaReader = requiredSchemaReader,
requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging)
optionalSchemaReader = optionalSchemaReader)
}
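The resulting division of labor between the readers built here can be summarized in a small sketch (types are simplified stand-ins for the actual readers; cf. the HoodieMergeOnReadRDDV1 changes below):

case class Split(hasLogFiles: Boolean)
case class Reader(label: String) // stand-in for the actual BaseFileReader

def pickBaseFileReader(split: Split,
                       requiredSchemaReader: Reader, // narrow schema, all filters pushed down
                       optionalSchemaReader: Reader): Reader =
  if (!split.hasLogFiles) requiredSchemaReader // no merging: prune columns and skip data aggressively
  else optionalSchemaReader                    // merging: keep combine fields, push only required filters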

/**
@@ -65,7 +65,7 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
protected lazy val mandatoryFieldsForMerging: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())

override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
override lazy val optionalExtraFields: Seq[String] = mandatoryFieldsForMerging

protected override def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
if (globPaths.isEmpty) {
@@ -83,11 +83,11 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
protected override def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {

val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
requiredSchema, requestedColumns, filters)
optionalSchema, filters)
new HoodieBootstrapMORRDD(
sqlContext.sparkSession,
config = jobConf,
@@ -98,8 +98,6 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,

private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema

override lazy val mandatoryFields: Seq[String] = Seq.empty

protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
listLatestFileSlices(globPaths, partitionFilters, dataFilters)
}
@@ -133,27 +131,27 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
* Get all the file readers required for composeRDD
*/
protected def getFileReaders(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
filters: Array[Filter]): (BaseFileReader, BaseFileReader, BaseFileReader) = {

val requiredSkeletonFileSchema =
StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))
StructType(skeletonSchema.filter(f => optionalSchema.structTypeSchema.fields.exists(col => resolver(f.name, col.name))))

val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)
createBootstrapFileReaders(tableSchema, optionalSchema, requiredSkeletonFileSchema, filters)

val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters)
val regularFileReader = createRegularFileReader(tableSchema, optionalSchema, filters)
(bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader)
}

protected override def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
optionalSchema: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {

val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
requiredSchema, requestedColumns, filters)
requiredSchema, filters)
new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
requiredSchema, fileSplits)
}
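The skeleton-schema pruning above can be reproduced in isolation; a runnable sketch with Spark's default case-insensitive resolver inlined as a plain function (column contents invented):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

val skeletonSchema = StructType(Seq(
  StructField("_hoodie_commit_time", StringType),
  StructField("_hoodie_record_key", StringType)))

val optionalSchemaFields = Array(
  StructField("_hoodie_record_key", StringType),
  StructField("name", StringType))

// Keep only the skeleton (metadata) columns that the optional schema actually needs.
val requiredSkeletonFileSchema =
  StructType(skeletonSchema.filter(f => optionalSchemaFields.exists(col => resolver(f.name, col.name))))
// => StructType(Seq(StructField("_hoodie_record_key", StringType)))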
@@ -72,7 +72,7 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = partition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
val projectedReader = projectReader(fileReaders.requiredSchemaReader, requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
@@ -81,7 +81,7 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
case split =>
mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
val reader = fileReaders.requiredSchemaReader
new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)

case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
@@ -143,7 +143,7 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
// Record Payload classes then we can avoid reading and parsing the records w/ _full_ schema,
// and instead rely only on the projected one, while still being able to perform merging correctly
if (isProjectionCompatible(tableState)) {
fileReaders.requiredSchemaReader
fileReaders.optionalSchemaReader
} else {
fileReaders.fullSchemaReader
}
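In other words, the base-file reader used for payload-combine merging reduces to the following choice (a sketch with isProjectionCompatible taken as a precomputed boolean):

case class MergeReaders(fullSchemaReader: String, optionalSchemaReader: String) // labels stand in for readers

def baseFileReaderForMerging(readers: MergeReaders, projectionCompatible: Boolean): String =
  if (projectionCompatible) readers.optionalSchemaReader // the projected read suffices to combine records
  else readers.fullSchemaReader                          // the payload may touch any column: read everything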