Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Delta Lake deletion vectors #627

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,39 @@

package org.apache.xtable.model;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import lombok.Builder;
import lombok.Value;

import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDeletionVector;

/**
* Captures the changes in a single commit/instant from the source table.
*
* @since 0.1
*/
@Value
@Builder(toBuilder = true)
@Builder(toBuilder = true, builderClassName = "Builder")
public class TableChange {
// Change in files at the specified instant
DataFilesDiff filesDiff;

// A commit can add deletion vectors when some records are deleted. New deletion vectors can be
// added even if no new data files are added. However, as deletion vectors are always associated
// with a data file, they are implicitly removed when a corresponding data file is removed.
List<InternalDeletionVector> deletionVectorsAdded;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we read a snapshot, will there be deletionVectors present there as well?


/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

public static class Builder {
public Builder deletionVectorsAdded(Collection<InternalDeletionVector> deletionVectorsAdded) {
this.deletionVectorsAdded = new ArrayList<>(deletionVectorsAdded);
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.storage;

import java.util.Iterator;
import java.util.function.Supplier;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;

@Builder(toBuilder = true, builderClassName = "Builder")
@Accessors(fluent = true)
@Value
public class InternalDeletionVector {
// path (absolute with scheme) of data file to which this deletion vector belongs
@NonNull String dataFilePath;

// physical path of the deletion vector file (absolute with scheme)
String deletionVectorFilePath;

// offset of deletion vector start in the deletion vector file
int offset;

// length of the deletion vector in the deletion vector file
int length;

// count of records deleted by this deletion vector
long countRecordsDeleted;

@Getter(AccessLevel.NONE)
Supplier<Iterator<Long>> deleteRecordSupplier;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a doc for this as well? What does the long represent?


public Iterator<Long> deleteRecordIterator() {
return deleteRecordSupplier.get();
}

public static class Builder {
public Builder deleteRecordSupplier(Supplier<Iterator<Long>> recordsSupplier) {
this.deleteRecordSupplier = recordsSupplier;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required? It seems very similar to what the Builder annotation will provide

return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.xtable.delta;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import lombok.AccessLevel;
Expand All @@ -30,13 +32,17 @@
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;
import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalDeletionVector;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaActionsConverter {
Expand Down Expand Up @@ -115,16 +121,45 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
*
* @param snapshot the commit snapshot
* @param addFile the add file action
* @return the deletion vector representation (path of data file), or null if no deletion vector
* is present
* @return the deletion vector representation, or null if no deletion vector is present
*/
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) {
DeletionVectorDescriptor deletionVector = addFile.deletionVector();
if (deletionVector == null) {
return null;
}

String dataFilePath = addFile.path();
return getFullPathToFile(snapshot, dataFilePath);
dataFilePath = getFullPathToFile(snapshot, dataFilePath);
Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath());

// TODO assumes deletion vector file. Need to handle inlined deletion vectors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to detect that it is not a file? Can we throw some unsupported operation exception for now if we hit that path?

InternalDeletionVector deleteVector =
InternalDeletionVector.builder()
.dataFilePath(dataFilePath)
.deletionVectorFilePath(deletionVectorFilePath.toString())
.countRecordsDeleted(deletionVector.cardinality())
.offset(getOffset(deletionVector))
.length(deletionVector.sizeInBytes())
.deleteRecordSupplier(() -> deletedRecordsIterator(snapshot, deletionVector))
.build();

return deleteVector;
}

private Iterator<Long> deletedRecordsIterator(
Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
DeletionVectorStore dvStore =
new HadoopFileSystemDVStore(snapshot.deltaLog().newDeltaHadoopConf());

Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
int size = deleteVector.sizeInBytes();
int offset = getOffset(deleteVector);
RoaringBitmapArray rbm = dvStore.read(deletionVectorFilePath, offset, size);
return Arrays.stream(rbm.values()).iterator();
}

private static int getOffset(DeletionVectorDescriptor deleteVector) {
return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import lombok.Builder;
import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -54,6 +52,7 @@
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalDeletionVector;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.spi.extractor.ConversionSource;
import org.apache.xtable.spi.extractor.DataFileIterator;
Expand Down Expand Up @@ -112,8 +111,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// All 3 of the following data structures use data file's absolute path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
// Set of data file paths for which deletion vectors exists.
Set<String> deletionVectors = new HashSet<>();
// Map of data file paths for which deletion vectors exists.
Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();

for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
Expand All @@ -128,10 +127,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
if (deleteVectorPath != null) {
deletionVectors.add(deleteVectorPath);
InternalDeletionVector deletionVector =
actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
if (deletionVector != null) {
deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
}
} else if (action instanceof RemoveFile) {
InternalDataFile dataFile =
Expand All @@ -150,7 +149,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
// same data file is removed and added, we need to remove it from the added and removed file
// maps which are used to track actual added and removed data files.
for (String deletionVector : deletionVectors) {
for (String deletionVector : deletionVectors.keySet()) {
// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
Expand All @@ -167,7 +166,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
return TableChange.builder()
.tableAsOfChange(tableAtVersion)
.deletionVectorsAdded(deletionVectors.values())
.filesDiff(dataFilesDiff)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.spark.sql.functions;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.actions.AddFile;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException {
}

public List<String> getAllActiveFiles() {
return deltaLog.snapshot().allFiles().collectAsList().stream()
return getAllActiveFilesInfo().stream()
.map(addFile -> addSlashToBasePath(basePath) + addFile.path())
.collect(Collectors.toList());
}

public List<AddFile> getAllActiveFilesInfo() {
return deltaLog.snapshot().allFiles().collectAsList();
}

private String addSlashToBasePath(String basePath) {
if (basePath.endsWith("/")) {
return basePath;
Expand Down
Loading
Loading