diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index 51f0ee0bb..e750916f7 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -18,10 +18,15 @@ package org.apache.xtable.model; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import lombok.Builder; import lombok.Value; import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalDeletionVector; /** * Captures the changes in a single commit/instant from the source table. @@ -29,11 +34,23 @@ * @since 0.1 */ @Value -@Builder(toBuilder = true) +@Builder(toBuilder = true, builderClassName = "Builder") public class TableChange { // Change in files at the specified instant DataFilesDiff filesDiff; + // A commit can add deletion vectors when some records are deleted. New deletion vectors can be + // added even if no new data files are added. However, as deletion vectors are always associated + // with a data file, they are implicitly removed when a corresponding data file is removed. + List<InternalDeletionVector> deletionVectorsAdded; + /** The {@link InternalTable} at the commit time to which this table change belongs. 
*/ InternalTable tableAsOfChange; + + public static class Builder { + public Builder deletionVectorsAdded(Collection<InternalDeletionVector> deletionVectorsAdded) { + this.deletionVectorsAdded = new ArrayList<>(deletionVectorsAdded); + return this; + } + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java new file mode 100644 index 000000000..c7ae037e6 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.model.storage; + +import java.util.Iterator; +import java.util.function.Supplier; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.Accessors; + +@Builder(toBuilder = true, builderClassName = "Builder") +@Accessors(fluent = true) +@Value +public class InternalDeletionVector { + // path (absolute with scheme) of data file to which this deletion vector belongs + @NonNull String dataFilePath; + + // physical path of the deletion vector file (absolute with scheme) + String deletionVectorFilePath; + + // offset of deletion vector start in the deletion vector file + int offset; + + // length of the deletion vector in the deletion vector file + int length; + + // count of records deleted by this deletion vector + long countRecordsDeleted; + + @Getter(AccessLevel.NONE) + Supplier<Iterator<Long>> deleteRecordSupplier; + + public Iterator<Long> deleteRecordIterator() { + return deleteRecordSupplier.get(); + } + + public static class Builder { + public Builder deleteRecordSupplier(Supplier<Iterator<Long>> recordsSupplier) { + this.deleteRecordSupplier = recordsSupplier; + return this; + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java index 40b822dfb..7eef2004c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java @@ -18,7 +18,9 @@ package org.apache.xtable.delta; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import lombok.AccessLevel; @@ -30,6 +32,9 @@ import org.apache.spark.sql.delta.actions.AddFile; import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import org.apache.spark.sql.delta.actions.RemoveFile; +import 
org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray; +import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore; +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.schema.InternalField; @@ -37,6 +42,7 @@ import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaActionsConverter { @@ -115,16 +121,45 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) { * * @param snapshot the commit snapshot * @param addFile the add file action - * @return the deletion vector representation (path of data file), or null if no deletion vector - * is present + * @return the deletion vector representation, or null if no deletion vector is present */ - public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) { + public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) { DeletionVectorDescriptor deletionVector = addFile.deletionVector(); if (deletionVector == null) { return null; } String dataFilePath = addFile.path(); - return getFullPathToFile(snapshot, dataFilePath); + dataFilePath = getFullPathToFile(snapshot, dataFilePath); + Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath()); + + // TODO assumes deletion vector file. 
Need to handle inlined deletion vectors + InternalDeletionVector deleteVector = + InternalDeletionVector.builder() + .dataFilePath(dataFilePath) + .deletionVectorFilePath(deletionVectorFilePath.toString()) + .countRecordsDeleted(deletionVector.cardinality()) + .offset(getOffset(deletionVector)) + .length(deletionVector.sizeInBytes()) + .deleteRecordSupplier(() -> deletedRecordsIterator(snapshot, deletionVector)) + .build(); + + return deleteVector; + } + + private Iterator<Long> deletedRecordsIterator( + Snapshot snapshot, DeletionVectorDescriptor deleteVector) { + DeletionVectorStore dvStore = + new HadoopFileSystemDVStore(snapshot.deltaLog().newDeltaHadoopConf()); + + Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath()); + int size = deleteVector.sizeInBytes(); + int offset = getOffset(deleteVector); + RoaringBitmapArray rbm = dvStore.read(deletionVectorFilePath, offset, size); + return Arrays.stream(rbm.values()).iterator(); + } + + private static int getOffset(DeletionVectorDescriptor deleteVector) { + return deleteVector.offset().isDefined() ? 
(int) deleteVector.offset().get() : 1; } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 140eb8adc..9d44401f3 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -22,11 +22,9 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import lombok.Builder; import lombok.extern.log4j.Log4j2; @@ -54,6 +52,7 @@ import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.DataFileIterator; @@ -112,8 +111,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) { // All 3 of the following data structures use data file's absolute path as the key Map<String, InternalDataFile> addedFiles = new HashMap<>(); Map<String, InternalDataFile> removedFiles = new HashMap<>(); - // Set of data file paths for which deletion vectors exists. - Set<String> deletionVectors = new HashSet<>(); + // Map of data file paths for which deletion vectors exists. 
+ Map<String, InternalDeletionVector> deletionVectors = new HashMap<>(); for (Action action : actionsForVersion) { if (action instanceof AddFile) { @@ -128,10 +127,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) { DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); addedFiles.put(dataFile.getPhysicalPath(), dataFile); - String deleteVectorPath = - actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action); - if (deleteVectorPath != null) { - deletionVectors.add(deleteVectorPath); + InternalDeletionVector deletionVector = + actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action); + if (deletionVector != null) { + deletionVectors.put(deletionVector.dataFilePath(), deletionVector); } } else if (action instanceof RemoveFile) { InternalDataFile dataFile = @@ -150,7 +149,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { // entry which is replaced by a new entry, AddFile with delete vector information. Since the // same data file is removed and added, we need to remove it from the added and removed file // maps which are used to track actual added and removed data files. 
- for (String deletionVector : deletionVectors) { + for (String deletionVector : deletionVectors.keySet()) { // validate that a Remove action is also added for the data file if (removedFiles.containsKey(deletionVector)) { addedFiles.remove(deletionVector); @@ -167,7 +166,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) { .filesAdded(addedFiles.values()) .filesRemoved(removedFiles.values()) .build(); - return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build(); + return TableChange.builder() + .tableAsOfChange(tableAtVersion) + .deletionVectorsAdded(deletionVectors.values()) + .filesDiff(dataFilesDiff) + .build(); } @Override diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index ee5b1ccdd..909b1b790 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -39,6 +39,7 @@ import org.apache.spark.sql.functions; import org.apache.spark.sql.delta.DeltaLog; +import org.apache.spark.sql.delta.actions.AddFile; import com.google.common.base.Preconditions; @@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException { } public List<String> getAllActiveFiles() { - return deltaLog.snapshot().allFiles().collectAsList().stream() + return getAllActiveFilesInfo().stream() .map(addFile -> addSlashToBasePath(basePath) + addFile.path()) .collect(Collectors.toList()); } + public List<AddFile> getAllActiveFilesInfo() { + return deltaLog.snapshot().allFiles().collectAsList(); + } + private String addSlashToBasePath(String basePath) { if (basePath.endsWith("/")) { return basePath; diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java index ed02893e3..c58088d57 100644 --- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java @@ -19,11 +19,16 @@ package org.apache.xtable.delta; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -38,6 +43,7 @@ import org.apache.spark.sql.delta.DeltaLog; import org.apache.spark.sql.delta.actions.AddFile; +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import scala.Option; @@ -49,6 +55,7 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.storage.InternalDeletionVector; import org.apache.xtable.model.storage.TableFormat; public class ITDeltaDeleteVectorConvert { @@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert { private static SparkSession sparkSession; private DeltaConversionSourceProvider conversionSourceProvider; + private TestSparkDeltaTable testSparkDeltaTable; @BeforeAll public static void setupOnce() { @@ -91,11 +99,24 @@ void setUp() { conversionSourceProvider.init(hadoopConf); } + private static class TableState { + Map activeFiles; + List rowsToDelete; + + TableState(Map activeFiles) { + this(activeFiles, Collections.emptyList()); + } + + TableState(Map activeFiles, List rowsToDelete) { + this.activeFiles = activeFiles; + this.rowsToDelete = rowsToDelete; + } + } + @Test public void testInsertsUpsertsAndDeletes() { String tableName = GenericTable.getTableName(); - TestSparkDeltaTable testSparkDeltaTable = - new TestSparkDeltaTable(tableName, tempDir, 
sparkSession, null, false); + testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); // enable deletion vectors for the test table testSparkDeltaTable @@ -105,25 +126,30 @@ public void testInsertsUpsertsAndDeletes() { + tableName + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); - List> allActiveFiles = new ArrayList<>(); + List testTableStates = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + Map tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, Collections.emptyList())); List rows1 = testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); // upsert does not create delete vectors testSparkDeltaTable.upsertRows(rows.subList(0, 20)); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + 
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(150L, testSparkDeltaTable.getNumRows()); // delete a few rows with gaps in ids @@ -133,12 +159,15 @@ public void testInsertsUpsertsAndDeletes() { .collect(Collectors.toList()); rowsToDelete.addAll(rows.subList(35, 45)); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(135L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(185L, testSparkDeltaTable.getNumRows()); // delete a few rows from a file which already has a deletion vector, this should generate a @@ -146,18 +175,22 @@ public void testInsertsUpsertsAndDeletes() { // This deletion step intentionally deletes the same rows again to test the merge. 
rowsToDelete = rows1.subList(5, 15); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(178L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(228L, testSparkDeltaTable.getNumRows()); + String tableBasePath = testSparkDeltaTable.getBasePath(); SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .basePath(testSparkDeltaTable.getBasePath()) + .basePath(tableBasePath) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = @@ -165,8 +198,9 @@ public void testInsertsUpsertsAndDeletes() { InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); // validateDeltaPartitioning(internalSnapshot); - ValidationTestHelper.validateSnapshot( - internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1)); + List activeDataFilePaths = + new ArrayList<>(testTableStates.get(testTableStates.size() - 1).activeFiles.keySet()); + ValidationTestHelper.validateSnapshot(internalSnapshot, activeDataFilePaths); // Get changes in incremental format. 
InstantsForIncrementalSync instantsForIncrementalSync = @@ -179,13 +213,124 @@ public void testInsertsUpsertsAndDeletes() { TableChange tableChange = conversionSource.getTableChangeForCommit(version); allTableChanges.add(tableChange); } - ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges); + + List> allActiveDataFilePaths = + testTableStates.stream() + .map(s -> s.activeFiles) + .map(Map::keySet) + .map(ArrayList::new) + .collect(Collectors.toList()); + ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, allTableChanges); + + validateDeletionInfo(testTableStates, allTableChanges); + } + + // collects active files in the current snapshot as a map and adds it to the list + private Map collectActiveFilesAfterCommit( + TestSparkDeltaTable testSparkDeltaTable) { + Map allFiles = + testSparkDeltaTable.getAllActiveFilesInfo().stream() + .collect( + Collectors.toMap( + file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()), + file -> file)); + return allFiles; + } + + private void validateDeletionInfo( + List testTableStates, List allTableChanges) { + if (allTableChanges.isEmpty() && testTableStates.size() <= 1) { + return; + } + + assertEquals( + allTableChanges.size(), + testTableStates.size() - 1, + "Number of table changes should be equal to number of commits - 1"); + + for (int i = 0; i < allTableChanges.size() - 1; i++) { + Map activeFileAfterCommit = testTableStates.get(i + 1).activeFiles; + Map activeFileBeforeCommit = testTableStates.get(i).activeFiles; + + Map activeFilesWithUpdatedDeleteInfo = + activeFileAfterCommit.entrySet().stream() + .filter(e -> e.getValue().deletionVector() != null) + .filter( + entry -> { + if (activeFileBeforeCommit.get(entry.getKey()) == null) { + return true; + } + if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) { + return true; + } + DeletionVectorDescriptor deletionVectorDescriptor = + activeFileBeforeCommit.get(entry.getKey()).deletionVector(); + return 
!deletionVectorDescriptor.equals(entry.getValue().deletionVector()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (activeFilesWithUpdatedDeleteInfo.isEmpty()) { + continue; + } + + // validate all new delete vectors are correctly detected + validateDeletionInfoForCommit( + testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i)); + } + } + + private void validateDeletionInfoForCommit( + TableState tableState, + Map activeFilesAfterCommit, + TableChange changeDetectedForCommit) { + Map detectedDeleteInfos = + changeDetectedForCommit.getDeletionVectorsAdded().stream() + .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file)); + + Map filesWithDeleteVectors = + activeFilesAfterCommit.entrySet().stream() + .filter(file -> file.getValue().deletionVector() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size()); + + for (Map.Entry fileWithDeleteVector : filesWithDeleteVectors.entrySet()) { + InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey()); + assertNotNull(deleteInfo); + DeletionVectorDescriptor deletionVectorDescriptor = + fileWithDeleteVector.getValue().deletionVector(); + assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.countRecordsDeleted()); + assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.length()); + assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset()); + + String deletionFilePath = + deletionVectorDescriptor + .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath())) + .toString(); + assertEquals(deletionFilePath, deleteInfo.deletionVectorFilePath()); + + Iterator iterator = deleteInfo.deleteRecordIterator(); + List deletes = new ArrayList<>(); + iterator.forEachRemaining(deletes::add); + assertEquals(deletes.size(), deleteInfo.countRecordsDeleted()); + } + } + + 
private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) { + String filePath = file.path(); + if (filePath.startsWith(tableBasePath)) { + return filePath; + } + return Paths.get(tableBasePath, file.path()).toString(); } private void validateDeletedRecordCount( - DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) { + DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) { List<AddFile> allFiles = - deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList(); + deltaLog + .getSnapshotAt(deltaLog.snapshot().version(), Option.empty()) + .allFiles() + .collectAsList(); List<AddFile> filesWithDeletionVectors = allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java index e62e93414..109ed1f49 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java @@ -18,10 +18,12 @@ package org.apache.xtable.delta; -import java.net.URISyntaxException; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -32,10 +34,12 @@ import scala.Option; +import org.apache.xtable.model.storage.InternalDeletionVector; + class TestDeltaActionsConverter { @Test - void extractDeletionVector() throws URISyntaxException { + void extractDeletionVector() { DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance(); int size = 123; @@ -49,7 +53,9 @@ void extractDeletionVector() throws 
URISyntaxException { DeletionVectorDescriptor deletionVector = null; AddFile addFileAction = new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); - Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + InternalDeletionVector internalDeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNull(internalDeletionVector); deletionVector = DeletionVectorDescriptor.onDiskWithAbsolutePath( @@ -58,10 +64,13 @@ void extractDeletionVector() throws URISyntaxException { addFileAction = new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); - Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog); - Mockito.when(deltaLog.dataPath()) + when(snapshot.deltaLog()).thenReturn(deltaLog); + when(deltaLog.dataPath()) .thenReturn(new Path("https://container.blob.core.windows.net/tablepath")); - Assertions.assertEquals( - filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + internalDeletionVector = actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNotNull(internalDeletionVector); + assertEquals(filePath, internalDeletionVector.dataFilePath()); + assertEquals(42, internalDeletionVector.countRecordsDeleted()); + assertEquals(size, internalDeletionVector.length()); } }