feat: Trim unnecessary columns during MoR snapshot read
1. Trim unnecessary columns during MoR snapshot read

Signed-off-by: TheR1sing3un <[email protected]>
TheR1sing3un committed Jan 20, 2025
1 parent 696683f commit 39f927a
Showing 10 changed files with 66 additions and 53 deletions.
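The core idea of the change: the read path now carries two projections of the table schema, requiredSchema (the caller's columns plus the mandatory fields merging needs, such as the record key and ordering field) and rawRequiredSchema (only the caller's columns). A file group with no delta log files never merges base and log records, so it can be read with the trimmed projection. A minimal standalone sketch of that rule, using hypothetical names (mandatoryFields, columnsToRead) rather than Hudi's API:

object ColumnTrimSketch {
  // Illustrative stand-ins for the fields merging needs; real tables derive these from table config.
  val mandatoryFields: Seq[String] = Seq("_hoodie_record_key", "ts")

  // "rawRequiredSchema" in the diff: exactly what the caller asked for.
  def rawRequiredColumns(requested: Seq[String]): Seq[String] = requested.distinct

  // "requiredSchema" in the diff: the caller's columns plus the mandatory fields.
  def requiredColumns(requested: Seq[String]): Seq[String] = (requested ++ mandatoryFields).distinct

  // A file group without delta logs is never merged, so the trimmed projection suffices.
  def columnsToRead(requested: Seq[String], hasLogFiles: Boolean): Seq[String] =
    if (hasLogFiles) requiredColumns(requested) else rawRequiredColumns(requested)
}

For example, ColumnTrimSketch.columnsToRead(Seq("rider", "fare"), hasLogFiles = false) reads only the two requested columns, while the same call with hasLogFiles = true also pulls in the mandatory fields.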
@@ -61,7 +61,7 @@ public class HoodieReaderConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED = ConfigProperty
.key("hoodie.file.group.reader.enabled")
.defaultValue(true)
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Use engine agnostic file group reader if enabled");
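For context, this flag gates the engine-agnostic file group reader; with the default flipped to false, snapshot reads go through the legacy Spark relations modified below, which is where the column trimming applies. A hedged sketch of overriding the flag for a single read (the table path is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-read").master("local[*]").getOrCreate()
val df = spark.read.format("hudi")
  .option("hoodie.file.group.reader.enabled", "true") // overrides the new false default
  .load("/tmp/hudi_table")                            // placeholder path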
@@ -80,8 +80,8 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {
val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema)
val baseFileReader = createBaseFileReader(
@@ -350,6 +350,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)

// 1. Generate raw required schema, only including the columns requested by the caller
val (rawRequiredAvroSchema, rawRequiredStructSchema, rawRequiredInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), requiredColumns)
val rawRequiredSchema = HoodieTableSchema(rawRequiredStructSchema, rawRequiredAvroSchema.toString, Some(rawRequiredInternalSchema))

// 2. Generate required schema, including mandatory fields
// NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
// *Appending* additional columns to the ones requested by the caller is not a problem, as those
// will be eliminated by the caller's projection;
@@ -360,24 +368,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
// could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
// w/ more than 2 types are involved)
val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

// 3. Generate table full schema, including metadata fields
val tableAvroSchemaStr = tableAvroSchema.toString
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)

val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)

val fileSplits = collectFileSplits(partitionFilters, dataFilters)

val tableAvroSchemaStr = tableAvroSchema.toString

val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, rawRequiredSchema, filters)

// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
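Steps 1 and 2 above project the same source schema twice: once over requiredColumns (what the caller asked for) and once over targetColumns (those columns plus the mandatory fields). A standalone sketch of such a projection over a Spark StructType, with illustrative helper and field names rather than Hudi's projectSchema:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Keep only the fields whose names appear in `columns`, preserving the table's field order.
def projectStruct(tableSchema: StructType, columns: Seq[String]): StructType =
  StructType(tableSchema.fields.filter(f => columns.contains(f.name)))

val table = StructType(Seq(
  StructField("_hoodie_record_key", StringType),
  StructField("rider", StringType),
  StructField("fare", DoubleType)))

val raw      = projectStruct(table, Seq("fare"))                        // caller's columns only
val required = projectStruct(table, Seq("fare", "_hoodie_record_key")) // plus mandatory fields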
@@ -390,15 +398,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* @param fileSplits file splits to be handled by the RDD
* @param tableSchema target table's schema
* @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query
* @param requiredSchema projected schema required by the reader, including mandatory fields, only used for merging on read
* @param rawRequiredSchema schema required by the caller
* @param filters data filters to be applied
* @return instance of RDD (holding [[InternalRow]]s)
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow]

/**
@@ -524,8 +532,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
def updatePrunedDataSchema(prunedSchema: StructType): Relation

protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
requiredFilters: Seq[Filter],
optionalFilters: Seq[Filter] = Seq.empty,
baseFileFormat: HoodieFileFormat = tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
@@ -571,23 +579,23 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// For file groups without delta logs, we can make the following optimizations:
// a) If the requested columns are not included in mandatoryColumns, they can be removed from requiredDataSchema.
// b) Apply filters to reader for data skipping since no merging.
val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
requiredDataSchema
} else {
val prunedStructSchema =
StructType(requiredDataSchema.structTypeSchema.fields
.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))

HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
}
// val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
// val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
// val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
// requiredDataSchema
// } else {
// val prunedStructSchema =
// StructType(requiredDataSchema.structTypeSchema.fields
// .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
//
// HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
// }

val requiredSchemaReaderSkipMerging = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = prunedRequiredSchema,
requiredDataSchema = rawRequiredSchema,
// This file-reader is only used in cases when no merging is performed, therefore it's safe to push
// down these filters to the base file readers
filters = requiredFilters ++ optionalFilters,
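The commented-out block above was the previous way of trimming unused mandatory columns back out of requiredDataSchema; the commit instead passes rawRequiredSchema straight into the skip-merging reader, so columns the caller did not request are never materialized. A hedged sketch of the two optimizations the comment describes for file groups without delta logs (names are illustrative, not Hudi's API):

import org.apache.spark.sql.sources.Filter

// a) no merging means no mandatory merge columns: read only what the caller asked for.
def schemaForSplit(hasLogFiles: Boolean, required: Seq[String], raw: Seq[String]): Seq[String] =
  if (hasLogFiles) required else raw

// b) no merging also means every filter can be pushed down to the base-file reader.
def filtersForSplit(hasLogFiles: Boolean,
                    requiredFilters: Seq[Filter],
                    optionalFilters: Seq[Filter]): Seq[Filter] =
  if (hasLogFiles) requiredFilters else requiredFilters ++ optionalFilters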
@@ -82,12 +82,12 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {

val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
requiredSchema, requestedColumns, filters)
requiredSchema, filters)
new HoodieBootstrapMORRDD(
sqlContext.sparkSession,
config = jobConf,
@@ -133,11 +133,11 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
* get all the file readers required for composeRDD
*/
protected def getFileReaders(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
filters: Array[Filter]): (BaseFileReader, BaseFileReader, BaseFileReader) = {

val requiredSkeletonFileSchema =
StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))
StructType(skeletonSchema.filter(f => requiredSchema.structTypeSchema.fields.exists(col => resolver(f.name, col.name))))

val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)
@@ -148,12 +148,12 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[FileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {

val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
requiredSchema, requestedColumns, filters)
requiredSchema, filters)
new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
requiredSchema, fileSplits)
}
@@ -55,7 +55,8 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
@transient config: Configuration,
fileReaders: HoodieMergeOnReadBaseFileReaders,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -72,17 +73,17 @@ class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = partition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, rawRequiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
new LogFileIterator(logFileOnlySplit, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

case split =>
mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
new SkipMergeIterator(split, reader, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
val reader = pickBaseFileReader()
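Inside compute above, the split type decides which projection each iterator gets: base-file-only splits, log-file-only splits, and skip-merge reads emit rawRequiredSchema, while only the payload-combine path keeps the mandatory fields from requiredSchema. A hedged restatement of that dispatch with simplified types (Split and the booleans stand in for HoodieMergeOnReadFileSplit and the merge-type check):

final case class Split(hasBaseFile: Boolean, hasLogFiles: Boolean)

def outputColumns(split: Split, skipMerge: Boolean,
                  required: Seq[String], raw: Seq[String]): Seq[String] =
  split match {
    case Split(_, false) => raw      // base file only: plain projected read, no merge
    case Split(false, _) => raw      // log files only: merge columns not exposed to the caller
    case _ if skipMerge  => raw      // skip-merge: records are concatenated, not combined by key
    case _               => required // payload combine: record key and ordering field still needed
  }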
@@ -73,7 +73,8 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
@transient config: Configuration,
fileReaders: HoodieMergeOnReadBaseFileReaders,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -88,17 +89,17 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = partition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, rawRequiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
new LogFileIterator(logFileOnlySplit, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

case split =>
mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
new SkipMergeIterator(split, reader, tableSchema, rawRequiredSchema, tableState, getHadoopConf)

case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
val reader = pickBaseFileReader()
@@ -77,21 +77,22 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {
// The only required filters are ones that make sure we're only fetching records that
// fall into incremental span of the timeline being queried
val requiredFilters = incrementalSpanRecordFilters
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

new HoodieMergeOnReadRDDV1(
sqlContext.sparkContext,
config = jobConf,
fileReaders = readers,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
rawRequiredSchema = rawRequiredSchema,
tableState = tableState,
mergeType = mergeType,
fileSplits = fileSplits,
@@ -72,21 +72,22 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {
// The only required filters are ones that make sure we're only fetching records that
// fall into incremental span of the timeline being queried
val requiredFilters = incrementalSpanRecordFilters
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

new HoodieMergeOnReadRDDV2(
sqlContext.sparkContext,
config = jobConf,
fileReaders = readers,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
rawRequiredSchema = rawRequiredSchema,
tableState = tableState,
mergeType = mergeType,
fileSplits = fileSplits,
@@ -105,19 +105,20 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,

protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredSchema/*schema required by caller + mandatory fields*/: HoodieTableSchema,
rawRequiredSchema/*schema required by caller*/: HoodieTableSchema,
filters: Array[Filter]): RDD[InternalRow] = {
val requiredFilters = Seq.empty
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
val readers = createBaseFileReaders(tableSchema, requiredSchema, rawRequiredSchema, requiredFilters, optionalFilters)

new HoodieMergeOnReadRDDV2(
sqlContext.sparkContext,
config = jobConf,
fileReaders = readers,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
rawRequiredSchema = rawRequiredSchema,
tableState = tableState,
mergeType = mergeType,
fileSplits = fileSplits)
