From 5f4f851549fa0ac191595bbf99e297a116ddc0df Mon Sep 17 00:00:00 2001
From: Ashvin Agrawal
Date: Tue, 21 Jan 2025 21:00:09 -0800
Subject: [PATCH 1/3] Extract Delta Lake deletion vectors

This change extracts deletion vectors, represented as roaring bitmaps in
Delta Lake files, and converts them into the XTable intermediate
representation. Previously, XTable only detected table changes that added
or removed data files. The detected table change now also includes any
deletion vector files added in the commit.

Note that Delta Lake stores deletion vectors in a compressed binary
format. Once extracted by XTable, however, the deleted positions are
currently materialized as a list of long offsets. This representation is
not the most efficient for large datasets; optimizing it is deferred in
order to prioritize completing the end-to-end conversion.
---
 .../org/apache/xtable/model/TableChange.java  |  19 +-
 .../model/storage/InternalDeletionVector.java |  63 ++++++
 .../xtable/delta/DeltaActionsConverter.java   |  39 +++-
 .../xtable/delta/DeltaConversionSource.java   |  23 ++-
 .../apache/xtable/TestSparkDeltaTable.java    |   7 +-
 .../delta/ITDeltaDeleteVectorConvert.java     | 187 ++++++++++++++++--
 .../delta/TestDeltaActionsConverter.java      |  23 ++-
 7 files changed, 318 insertions(+), 43 deletions(-)
 create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java

diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index 51f0ee0bb..e750916f7 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -18,10 +18,15 @@
 
 package org.apache.xtable.model;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import lombok.Builder;
 import lombok.Value;
 
 import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 
 /**
  * Captures the changes in a single commit/instant from the source table.
@@ -29,11 +34,23 @@
  * @since 0.1
  */
 @Value
-@Builder(toBuilder = true)
+@Builder(toBuilder = true, builderClassName = "Builder")
 public class TableChange {
   // Change in files at the specified instant
   DataFilesDiff filesDiff;
 
+  // A commit can add deletion vectors when some records are deleted. New deletion vectors can be
+  // added even if no new data files are added. However, as deletion vectors are always associated
+  // with a data file, they are implicitly removed when a corresponding data file is removed.
+  List<InternalDeletionVector> deletionVectorsAdded;
+
   /** The {@link InternalTable} at the commit time to which this table change belongs. */
   InternalTable tableAsOfChange;
+
+  public static class Builder {
+    public Builder deletionVectorsAdded(Collection<InternalDeletionVector> deletionVectorsAdded) {
+      this.deletionVectorsAdded = new ArrayList<>(deletionVectorsAdded);
+      return this;
+    }
+  }
 }
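[Editor's illustration, not part of the patch] A minimal sketch of how a downstream
consumer might read the new field. Only getDeletionVectorsAdded() comes from this
change; the TargetWriter type and its markDeleted method are hypothetical stand-ins:

    // Hypothetical consumer of the new TableChange field (illustration only).
    import org.apache.xtable.model.TableChange;
    import org.apache.xtable.model.storage.InternalDeletionVector;

    class TargetWriter {
      void applyChange(TableChange change) {
        for (InternalDeletionVector dv : change.getDeletionVectorsAdded()) {
          String dataFile = dv.dataFilePath(); // data file the vector applies to
          // Positions are supplied lazily; the bitmap is only decoded when iterated.
          dv.deleteRecordIterator()
              .forEachRemaining(position -> markDeleted(dataFile, position));
        }
      }

      void markDeleted(String dataFile, long position) {
        // hypothetical target-side bookkeeping
      }
    }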
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
new file mode 100644
index 000000000..c7ae037e6
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.storage;
+
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Value;
+import lombok.experimental.Accessors;
+
+@Builder(toBuilder = true, builderClassName = "Builder")
+@Accessors(fluent = true)
+@Value
+public class InternalDeletionVector {
+  // path (absolute with scheme) of the data file to which this deletion vector belongs
+  @NonNull String dataFilePath;
+
+  // physical path of the deletion vector file (absolute with scheme)
+  String deletionVectorFilePath;
+
+  // offset at which the deletion vector starts in the deletion vector file
+  int offset;
+
+  // length of the deletion vector in the deletion vector file
+  int length;
+
+  // count of records deleted by this deletion vector
+  long countRecordsDeleted;
+
+  @Getter(AccessLevel.NONE)
+  Supplier<Iterator<Long>> deleteRecordSupplier;
+
+  public Iterator<Long> deleteRecordIterator() {
+    return deleteRecordSupplier.get();
+  }
+
+  public static class Builder {
+    public Builder deleteRecordSupplier(Supplier<Iterator<Long>> recordsSupplier) {
+      this.deleteRecordSupplier = recordsSupplier;
+      return this;
+    }
+  }
+}
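[Editor's illustration, not part of the patch] The new value type defers materializing
deleted positions until deleteRecordIterator() is called. A minimal sketch of building
and reading one; every literal path and position below is made up:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.xtable.model.storage.InternalDeletionVector;

    class InternalDeletionVectorExample {
      public static void main(String[] args) {
        InternalDeletionVector dv =
            InternalDeletionVector.builder()
                .dataFilePath("file:///tmp/table/part-00000.parquet") // hypothetical
                .deletionVectorFilePath("file:///tmp/table/dv_abc.bin") // hypothetical
                .offset(1)
                .length(40)
                .countRecordsDeleted(3L)
                // The supplier runs once per iterator request, keeping decoding lazy.
                .deleteRecordSupplier(() -> Arrays.asList(3L, 7L, 11L).iterator())
                .build();

        Iterator<Long> deletedPositions = dv.deleteRecordIterator();
        while (deletedPositions.hasNext()) {
          System.out.println("deleted position: " + deletedPositions.next());
        }
      }
    }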
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 40b822dfb..38eb65c66 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -18,7 +18,9 @@
 
 package org.apache.xtable.delta;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import lombok.AccessLevel;
@@ -30,6 +32,9 @@
 import org.apache.spark.sql.delta.actions.AddFile;
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 import org.apache.spark.sql.delta.actions.RemoveFile;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;
 
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.model.schema.InternalField;
@@ -37,6 +42,7 @@
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaActionsConverter {
@@ -115,16 +121,41 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
    *
    * @param snapshot the commit snapshot
    * @param addFile the add file action
-   * @return the deletion vector representation (path of data file), or null if no deletion vector
-   *     is present
+   * @return the deletion vector representation, or null if no deletion vector is present
    */
-  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+  public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) {
     DeletionVectorDescriptor deletionVector = addFile.deletionVector();
     if (deletionVector == null) {
       return null;
     }
 
     String dataFilePath = addFile.path();
-    return getFullPathToFile(snapshot, dataFilePath);
+    dataFilePath = getFullPathToFile(snapshot, dataFilePath);
+    Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath());
+
+    // TODO assumes an on-disk deletion vector file; inlined deletion vectors still need handling
+    InternalDeletionVector deleteVector =
+        InternalDeletionVector.builder()
+            .dataFilePath(dataFilePath)
+            .deletionVectorFilePath(deletionVectorFilePath.toString())
+            .countRecordsDeleted(deletionVector.cardinality())
+            .offset((Integer) deletionVector.offset().get())
+            .length(deletionVector.sizeInBytes())
+            .deleteRecordSupplier(() -> deletedRecordsIterator(snapshot, deletionVector))
+            .build();
+
+    return deleteVector;
+  }
+
+  private Iterator<Long> deletedRecordsIterator(
+      Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
+    DeletionVectorStore dvStore =
+        new HadoopFileSystemDVStore(snapshot.deltaLog().newDeltaHadoopConf());
+
+    Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
+    int size = deleteVector.sizeInBytes();
+    int offset = deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
+    RoaringBitmapArray rbm = dvStore.read(deletionVectorFilePath, offset, size);
+    return Arrays.stream(rbm.values()).iterator();
   }
 }
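[Editor's illustration, not part of the patch] extractDeletionVector wires a Supplier
into the builder instead of reading the vector eagerly, so the deletion vector file is
opened only if a consumer actually iterates the positions. The same pattern in
isolation, with a stub decoder standing in for the dvStore.read(...) call above:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.function.Supplier;

    class LazyDecodeExample {
      // Stand-in for dvStore.read(path, offset, size), which decodes a roaring bitmap.
      static long[] decodeBitmap() {
        System.out.println("bitmap decoded");
        return new long[] {3L, 7L, 11L};
      }

      public static void main(String[] args) {
        // Capturing the supplier performs no I/O and no decoding.
        Supplier<Iterator<Long>> positions = () -> Arrays.stream(decodeBitmap()).iterator();
        System.out.println("supplier created");
        // Decoding happens here, once per requested iterator.
        positions.get().forEachRemaining(p -> System.out.println("deleted position " + p));
      }
    }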
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 140eb8adc..9d44401f3 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -22,11 +22,9 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import lombok.Builder;
 import lombok.extern.log4j.Log4j2;
@@ -54,6 +52,7 @@
 import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.extractor.DataFileIterator;
@@ -112,8 +111,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
     // All 3 of the following data structures use data file's absolute path as the key
     Map<String, InternalDataFile> addedFiles = new HashMap<>();
     Map<String, InternalDataFile> removedFiles = new HashMap<>();
-    // Set of data file paths for which deletion vectors exists.
-    Set<String> deletionVectors = new HashSet<>();
+    // Map of data file paths for which deletion vectors exist.
+    Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();
 
     for (Action action : actionsForVersion) {
       if (action instanceof AddFile) {
@@ -128,10 +127,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
                 DeltaPartitionExtractor.getInstance(),
                 DeltaStatsExtractor.getInstance());
         addedFiles.put(dataFile.getPhysicalPath(), dataFile);
-        String deleteVectorPath =
-            actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
-        if (deleteVectorPath != null) {
-          deletionVectors.add(deleteVectorPath);
+        InternalDeletionVector deletionVector =
+            actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
+        if (deletionVector != null) {
+          deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
         }
       } else if (action instanceof RemoveFile) {
         InternalDataFile dataFile =
@@ -150,7 +149,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
     // entry which is replaced by a new entry, AddFile with delete vector information. Since the
     // same data file is removed and added, we need to remove it from the added and removed file
     // maps which are used to track actual added and removed data files.
-    for (String deletionVector : deletionVectors) {
+    for (String deletionVector : deletionVectors.keySet()) {
       // validate that a Remove action is also added for the data file
       if (removedFiles.containsKey(deletionVector)) {
         addedFiles.remove(deletionVector);
@@ -167,7 +166,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
             .filesAdded(addedFiles.values())
             .filesRemoved(removedFiles.values())
             .build();
-    return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
+    return TableChange.builder()
+        .tableAsOfChange(tableAtVersion)
+        .deletionVectorsAdded(deletionVectors.values())
+        .filesDiff(dataFilesDiff)
+        .build();
   }
 
   @Override
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index ee5b1ccdd..909b1b790 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -39,6 +39,7 @@
 import org.apache.spark.sql.functions;
 
 import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
 
 import com.google.common.base.Preconditions;
 
@@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException {
   }
 
   public List<String> getAllActiveFiles() {
-    return deltaLog.snapshot().allFiles().collectAsList().stream()
+    return getAllActiveFilesInfo().stream()
         .map(addFile -> addSlashToBasePath(basePath) + addFile.path())
         .collect(Collectors.toList());
   }
 
+  public List<AddFile> getAllActiveFilesInfo() {
+    return deltaLog.snapshot().allFiles().collectAsList();
+  }
+
   private String addSlashToBasePath(String basePath) {
     if (basePath.endsWith("/")) {
       return basePath;
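[Editor's illustration, not part of the patch] In getTableChangeForCommit above, a
delete that merely attaches a deletion vector surfaces in the Delta log as a RemoveFile
plus an AddFile for the same physical file. The reconciliation rule, sketched with
plain strings in place of InternalDataFile and InternalDeletionVector:

    import java.util.HashMap;
    import java.util.Map;

    class ReconcileSketch {
      public static void main(String[] args) {
        Map<String, String> addedFiles = new HashMap<>();
        Map<String, String> removedFiles = new HashMap<>();
        Map<String, String> deletionVectors = new HashMap<>();

        // Commit rewrites file-1's log entry to attach a deletion vector.
        addedFiles.put("file-1.parquet", "AddFile (with DV)");
        removedFiles.put("file-1.parquet", "RemoveFile");
        deletionVectors.put("file-1.parquet", "DV");
        // file-2 is genuinely new data.
        addedFiles.put("file-2.parquet", "AddFile");

        // Same rule as the patch: a data file that carries a new deletion vector and
        // was both removed and re-added is neither an actual add nor an actual remove.
        for (String path : deletionVectors.keySet()) {
          if (removedFiles.containsKey(path)) {
            addedFiles.remove(path);
            removedFiles.remove(path);
          }
        }

        System.out.println(addedFiles);   // {file-2.parquet=AddFile}
        System.out.println(removedFiles); // {}
      }
    }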
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index ed02893e3..c58088d57 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -19,11 +19,16 @@
 package org.apache.xtable.delta;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +43,7 @@
 
 import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 
 import scala.Option;
 
@@ -49,6 +55,7 @@
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.TableFormat;
 
 public class ITDeltaDeleteVectorConvert {
@@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert {
   private static SparkSession sparkSession;
 
   private DeltaConversionSourceProvider conversionSourceProvider;
+  private TestSparkDeltaTable testSparkDeltaTable;
 
   @BeforeAll
   public static void setupOnce() {
@@ -91,11 +99,24 @@ void setUp() {
     conversionSourceProvider.init(hadoopConf);
   }
 
+  private static class TableState {
+    Map<String, AddFile> activeFiles;
+    List<Row> rowsToDelete;
+
+    TableState(Map<String, AddFile> activeFiles) {
+      this(activeFiles, Collections.emptyList());
+    }
+
+    TableState(Map<String, AddFile> activeFiles, List<Row> rowsToDelete) {
+      this.activeFiles = activeFiles;
+      this.rowsToDelete = rowsToDelete;
+    }
+  }
+
   @Test
   public void testInsertsUpsertsAndDeletes() {
     String tableName = GenericTable.getTableName();
-    TestSparkDeltaTable testSparkDeltaTable =
-        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+    testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
 
     // enable deletion vectors for the test table
     testSparkDeltaTable
@@ -105,25 +126,30 @@ public void testInsertsUpsertsAndDeletes() {
             + tableName
             + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
 
-    List<List<String>> allActiveFiles = new ArrayList<>();
+    List<TableState> testTableStates = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
     Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    Map<String, AddFile> tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, Collections.emptyList()));
 
     List<Row> rows1 = testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(100L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
 
     // upsert does not create delete vectors
    testSparkDeltaTable.upsertRows(rows.subList(0, 20));
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(100L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
     assertEquals(150L, testSparkDeltaTable.getNumRows());
 
     // delete a few rows with gaps in ids
@@ -133,12 +159,15 @@
             .collect(Collectors.toList());
     rowsToDelete.addAll(rows.subList(35, 45));
     testSparkDeltaTable.deleteRows(rowsToDelete);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, rowsToDelete));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
     assertEquals(135L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
     assertEquals(185L, testSparkDeltaTable.getNumRows());
 
     // delete a few rows from a file which already has a deletion vector, this should generate a
@@ -146,18 +175,22 @@
     // merged deletion vector file. Some rows were already deleted in the previous delete step.
     // This deletion step intentionally deletes the same rows again to test the merge.
     rowsToDelete = rows1.subList(5, 15);
     testSparkDeltaTable.deleteRows(rowsToDelete);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles, rowsToDelete));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
     assertEquals(178L, testSparkDeltaTable.getNumRows());
-    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22);
 
     testSparkDeltaTable.insertRows(50);
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+    tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+    testTableStates.add(new TableState(tableFiles));
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
     assertEquals(228L, testSparkDeltaTable.getNumRows());
 
+    String tableBasePath = testSparkDeltaTable.getBasePath();
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .basePath(testSparkDeltaTable.getBasePath())
+            .basePath(tableBasePath)
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
@@ -165,8 +198,9 @@
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
 
     // validateDeltaPartitioning(internalSnapshot);
-    ValidationTestHelper.validateSnapshot(
-        internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+    List<String> activeDataFilePaths =
+        new ArrayList<>(testTableStates.get(testTableStates.size() - 1).activeFiles.keySet());
+    ValidationTestHelper.validateSnapshot(internalSnapshot, activeDataFilePaths);
 
     // Get changes in incremental format.
     InstantsForIncrementalSync instantsForIncrementalSync =
@@ -179,13 +213,124 @@
       TableChange tableChange = conversionSource.getTableChangeForCommit(version);
       allTableChanges.add(tableChange);
     }
-    ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
+
+    List<List<String>> allActiveDataFilePaths =
+        testTableStates.stream()
+            .map(s -> s.activeFiles)
+            .map(Map::keySet)
+            .map(ArrayList::new)
+            .collect(Collectors.toList());
+    ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, allTableChanges);
+
+    validateDeletionInfo(testTableStates, allTableChanges);
+  }
+
+  // collects the active files in the current snapshot as a map keyed by absolute data file path
+  private Map<String, AddFile> collectActiveFilesAfterCommit(
+      TestSparkDeltaTable testSparkDeltaTable) {
+    Map<String, AddFile> allFiles =
+        testSparkDeltaTable.getAllActiveFilesInfo().stream()
+            .collect(
+                Collectors.toMap(
+                    file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()),
+                    file -> file));
+    return allFiles;
+  }
+
+  private void validateDeletionInfo(
+      List<TableState> testTableStates, List<TableChange> allTableChanges) {
+    if (allTableChanges.isEmpty() && testTableStates.size() <= 1) {
+      return;
+    }
+
+    assertEquals(
+        allTableChanges.size(),
+        testTableStates.size() - 1,
+        "Number of table changes should be equal to number of commits - 1");
+
+    for (int i = 0; i < allTableChanges.size() - 1; i++) {
+      Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 1).activeFiles;
+      Map<String, AddFile> activeFileBeforeCommit = testTableStates.get(i).activeFiles;
+
+      Map<String, AddFile> activeFilesWithUpdatedDeleteInfo =
+          activeFileAfterCommit.entrySet().stream()
+              .filter(e -> e.getValue().deletionVector() != null)
+              .filter(
+                  entry -> {
+                    if (activeFileBeforeCommit.get(entry.getKey()) == null) {
+                      return true;
+                    }
+                    if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) {
+                      return true;
+                    }
+                    DeletionVectorDescriptor deletionVectorDescriptor =
+                        activeFileBeforeCommit.get(entry.getKey()).deletionVector();
+                    return !deletionVectorDescriptor.equals(entry.getValue().deletionVector());
+                  })
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      if (activeFilesWithUpdatedDeleteInfo.isEmpty()) {
+        continue;
+      }
+
+      // validate all new delete vectors are correctly detected
+      validateDeletionInfoForCommit(
+          testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i));
+    }
+  }
+
+  private void validateDeletionInfoForCommit(
+      TableState tableState,
+      Map<String, AddFile> activeFilesAfterCommit,
+      TableChange changeDetectedForCommit) {
+    Map<String, InternalDeletionVector> detectedDeleteInfos =
+        changeDetectedForCommit.getDeletionVectorsAdded().stream()
+            .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file));
+
+    Map<String, AddFile> filesWithDeleteVectors =
+        activeFilesAfterCommit.entrySet().stream()
+            .filter(file -> file.getValue().deletionVector() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size());
+
+    for (Map.Entry<String, AddFile> fileWithDeleteVector : filesWithDeleteVectors.entrySet()) {
+      InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey());
+      assertNotNull(deleteInfo);
+      DeletionVectorDescriptor deletionVectorDescriptor =
+          fileWithDeleteVector.getValue().deletionVector();
+      assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.countRecordsDeleted());
+      assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.length());
+      assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset());
+
+      String deletionFilePath =
+          deletionVectorDescriptor
+              .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath()))
+              .toString();
+      assertEquals(deletionFilePath, deleteInfo.deletionVectorFilePath());
+
+      Iterator<Long> iterator = deleteInfo.deleteRecordIterator();
+      List<Long> deletes = new ArrayList<>();
+      iterator.forEachRemaining(deletes::add);
+      assertEquals(deletes.size(), deleteInfo.countRecordsDeleted());
+    }
+  }
+
+  private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) {
+    String filePath = file.path();
+    if (filePath.startsWith(tableBasePath)) {
+      return filePath;
+    }
+    return Paths.get(tableBasePath, file.path()).toString();
+  }
 
   private void validateDeletedRecordCount(
-      DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) {
+      DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) {
     List<AddFile> allFiles =
-        deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList();
+        deltaLog
+            .getSnapshotAt(deltaLog.snapshot().version(), Option.empty())
+            .allFiles()
+            .collectAsList();
     List<AddFile> filesWithDeletionVectors =
         allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList());
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
index e62e93414..a2fa12de2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -18,10 +18,14 @@
 
 package org.apache.xtable.delta;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
 import java.net.URISyntaxException;
 
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
@@ -32,6 +36,8 @@
 
 import scala.Option;
 
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
 class TestDeltaActionsConverter {
 
   @Test
@@ -49,7 +55,9 @@ void extractDeletionVector() throws URISyntaxException {
     DeletionVectorDescriptor deletionVector = null;
     AddFile addFileAction =
         new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
-    Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+    InternalDeletionVector internalDeletionVector =
+        actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNull(internalDeletionVector);
 
     deletionVector =
         DeletionVectorDescriptor.onDiskWithAbsolutePath(
@@ -58,10 +66,13 @@
 
     addFileAction =
         new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
 
-    Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
-    Mockito.when(deltaLog.dataPath())
+    when(snapshot.deltaLog()).thenReturn(deltaLog);
+    when(deltaLog.dataPath())
         .thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
-    Assertions.assertEquals(
-        filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+    internalDeletionVector = actionsConverter.extractDeletionVector(snapshot, addFileAction);
+    assertNotNull(internalDeletionVector);
+    assertEquals(filePath, internalDeletionVector.dataFilePath());
+    assertEquals(42, internalDeletionVector.countRecordsDeleted());
+    assertEquals(size, internalDeletionVector.length());
   }
 }
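[Editor's illustration, not part of the patches] The follow-up commit below centralizes
unwrapping of DeletionVectorDescriptor#offset, a Scala Option accessed from Java. The
fallback of 1 mirrors the existing code; presumably it skips a leading version byte at
the start of the on-disk deletion vector file, though that rationale is an assumption
here, not something the patch states. A minimal interop sketch (requires scala-library
on the classpath):

    import scala.Option;

    class ScalaOptionSketch {
      // Mirrors the getOffset helper introduced in the commit below.
      static int offsetOrDefault(Option<Object> offset) {
        return offset.isDefined() ? (int) offset.get() : 1;
      }

      public static void main(String[] args) {
        System.out.println(offsetOrDefault(Option.<Object>apply(4))); // 4
        System.out.println(offsetOrDefault(Option.empty()));          // 1
      }
    }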
From 7786c5bd2cb970cc571b56d401a9bb194d992e30 Mon Sep 17 00:00:00 2001
From: Ashvin Agrawal
Date: Wed, 22 Jan 2025 17:42:39 -0800
Subject: [PATCH 2/3] Handle optional offset information
---
 .../org/apache/xtable/delta/DeltaActionsConverter.java   | 8 ++++++--
 .../apache/xtable/delta/TestDeltaActionsConverter.java   | 2 +-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 38eb65c66..7eef2004c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -139,7 +139,7 @@ public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile a
             .dataFilePath(dataFilePath)
             .deletionVectorFilePath(deletionVectorFilePath.toString())
             .countRecordsDeleted(deletionVector.cardinality())
-            .offset((Integer) deletionVector.offset().get())
+            .offset(getOffset(deletionVector))
             .length(deletionVector.sizeInBytes())
             .deleteRecordSupplier(() -> deletedRecordsIterator(snapshot, deletionVector))
             .build();
@@ -154,8 +154,12 @@ private Iterator<Long> deletedRecordsIterator(
 
     Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
     int size = deleteVector.sizeInBytes();
-    int offset = deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
+    int offset = getOffset(deleteVector);
     RoaringBitmapArray rbm = dvStore.read(deletionVectorFilePath, offset, size);
     return Arrays.stream(rbm.values()).iterator();
   }
+
+  private static int getOffset(DeletionVectorDescriptor deleteVector) {
+    return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
index a2fa12de2..d06afb531 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -41,7 +41,7 @@ class TestDeltaActionsConverter {
 
   @Test
-  void extractDeletionVector() throws URISyntaxException {
+  void extractDeletionVector() {
     DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();
 
     int size = 123;

From 92712a6bbf9091210302c178febaabd69c835bda Mon Sep 17 00:00:00 2001
From: Ashvin Agrawal
Date: Wed, 22 Jan 2025 20:46:34 -0800
Subject: [PATCH 3/3] Fix spotless error, unused import
---
 .../java/org/apache/xtable/delta/TestDeltaActionsConverter.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
index d06afb531..109ed1f49 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -23,8 +23,6 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.when;
 
-import java.net.URISyntaxException;
-
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
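[Editor's illustration, not part of the patches] As the first commit message notes,
deleted positions are currently handed to consumers as a stream of longs, which can be
large. countRecordsDeleted() is populated from the descriptor's cardinality, so a
consumer can read the count without decoding the bitmap, and can process positions
without buffering them into a list. A sketch using a stand-in iterator:

    import java.util.Arrays;
    import java.util.Iterator;

    class StreamingConsumerSketch {
      // Count by draining the iterator; nothing is collected into memory.
      static long countPositions(Iterator<Long> positions) {
        long count = 0;
        while (positions.hasNext()) {
          positions.next();
          count++;
        }
        return count;
      }

      public static void main(String[] args) {
        Iterator<Long> positions = Arrays.asList(3L, 7L, 11L).iterator(); // stand-in
        // Against a real InternalDeletionVector this should agree with
        // dv.countRecordsDeleted(), which comes from descriptor metadata.
        System.out.println(countPositions(positions)); // 3
      }
    }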