Skip to content

Commit

Permalink
fix(#3388): Remove asset links from adapters when deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe committed Dec 30, 2024
1 parent 179ed94 commit 8385f8d
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@
<module>streampipes-wrapper-kafka-streams</module>
<module>streampipes-wrapper-siddhi</module>
<module>streampipes-wrapper-standalone</module>
<module>asset-model-management</module>
<module>streampipes-asset-model-management</module>
</modules>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<version>0.97.0-SNAPSHOT</version>
</parent>

<artifactId>asset-model-management</artifactId>
<artifactId>streampipes-asset-model-management</artifactId>

<properties>
<maven.compiler.source>17</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.assetmodel.management;

import org.apache.streampipes.model.assets.SpAsset;
import org.apache.streampipes.model.assets.SpAssetModel;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class AssetModelHelper {
private static final Logger LOG = LoggerFactory.getLogger(AssetModelHelper.class);

private final AssetModelManagement assetModelManagement;

public AssetModelHelper(AssetModelManagement assetModelManagement) {
this.assetModelManagement = assetModelManagement;
}


public AssetModelHelper() {
var genericStorage = StorageDispatcher.INSTANCE.getNoSqlStore()
.getGenericStorage();
assetModelManagement = new AssetModelManagement(genericStorage);
}

/**
* Removes the asset link with the given resource ID from all asset models.
*
* @param resourceId The ID of the resource to be removed from the asset links.
* @throws IOException If an I/O error occurs while fetching or updating asset models.
*/
public void removeAssetLinkFromAllAssets(String resourceId) throws IOException {
var allAssetModels = getAllAssetModelsFromStorage();

removeAssetLinksFromAssetModels(resourceId, allAssetModels);
}

private void removeAssetLinksFromAssetModels(String resourceId, List<SpAssetModel> allAssetModels)
throws IOException {
for (SpAssetModel assetModel : allAssetModels) {
removeAssetLinksFromAssetModelRecursively(assetModel, resourceId);
updateAssetModel(assetModel);
}
}

private void updateAssetModel(SpAssetModel assetModel) throws IOException {
try {
assetModelManagement.update(assetModel.getId(), assetModel);
} catch (IOException e) {
LOG.error("Could not fetch all asset models from storage", e);
throw new IOException("Could not fetch all asset models from storage", e);
}
}

private List<SpAssetModel> getAllAssetModelsFromStorage() throws IOException {
try {
return assetModelManagement.findAll();
} catch (IOException e) {
LOG.error("Could not fetch all asset models from storage", e);
throw new IOException("Could not fetch all asset models from storage", e);
}
}

/**
* This method removes the asset link from the asset model and recursively from all sub-assets.
*/
private void removeAssetLinksFromAssetModelRecursively(SpAssetModel assetModel, String resourceId) {
removeAssetLinks(assetModel, resourceId);

assetModel.getAssets()
.forEach(asset -> removeAssetLinksFromAsset(asset, resourceId));
}

/**
* Removes the resourceId from the asset links and recursively from all sub-assets.
*/
private void removeAssetLinksFromAsset(SpAsset asset, String resourceId) {
removeAssetLinks(asset, resourceId);

if (asset.getAssets() != null) {
asset.getAssets()
.forEach(subAsset -> removeAssetLinks(subAsset, resourceId));
}
}

/**
* Takes the asset as an input and removes the asset link with the given resource ID.
*/
private void removeAssetLinks(SpAsset asset, String resourceId) {
var assetLinks = asset.getAssetLinks();
if (assetLinks != null) {
assetLinks.removeIf(link -> resourceId.equals(link.getResourceId()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.assetmodel.management;

import org.apache.streampipes.model.assets.AssetLink;
import org.apache.streampipes.model.assets.SpAsset;
import org.apache.streampipes.model.assets.SpAssetModel;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class AssetModelHelperTest {

private static final String RESOURCE_ID_TO_BE_REMOVED = "resourceId";

private AssetModelManagement assetModelManagement;
private AssetModelHelper assetModelHelper;

@BeforeEach
void setUp() {
assetModelManagement = Mockito.mock(AssetModelManagement.class);
assetModelHelper = new AssetModelHelper(assetModelManagement);
}

@Test
void removeAssetLinkFromAllAssets_FromSpAssetModel() throws IOException {
// Provide a sample asset model that contains the asset link to be removed and mock the asset model management
// findAll
var sampleAssetModel = getSampleAssetModel();
addAssetLink(sampleAssetModel, RESOURCE_ID_TO_BE_REMOVED);
when(assetModelManagement.findAll()).thenReturn(List.of(sampleAssetModel));

assetModelHelper.removeAssetLinkFromAllAssets(RESOURCE_ID_TO_BE_REMOVED);

// Verify that asset model was updated and does not contain the asset link anymore
verify(assetModelManagement, times(1)).update(sampleAssetModel.getId(), sampleAssetModel);
assertTrue(sampleAssetModel.getAssetLinks().isEmpty());
}

@Test
void removeAssetLinkFromAllAssets_FromSpAsset() throws IOException {
// Provide a sample asset model with one asset that contains the asset link to be removed and mock the asset model
// management findAll
var sampleAssetModel = getSampleAssetModel();
var asset = new SpAsset();
addAssetLink(asset, RESOURCE_ID_TO_BE_REMOVED);
sampleAssetModel.setAssets(List.of(asset));
when(assetModelManagement.findAll()).thenReturn(List.of(sampleAssetModel));

assetModelHelper.removeAssetLinkFromAllAssets(RESOURCE_ID_TO_BE_REMOVED);

// Verify that asset was updated and does not contain the asset link anymore
verify(assetModelManagement, times(1)).update(sampleAssetModel.getId(), sampleAssetModel);
assertTrue(sampleAssetModel.getAssets().get(0).getAssetLinks().isEmpty());
}

@Test
void removeAssetLinkFromAllAssets_IOExceptionOnReadingAssetModels() throws IOException {
when(assetModelManagement.findAll()).thenThrow(new IOException());

assertThrows(IOException.class, () -> assetModelHelper.removeAssetLinkFromAllAssets(RESOURCE_ID_TO_BE_REMOVED));
}

@Test
void removeAssetLinkFromAllAssets_IOExceptionWhenUpdatingModel() throws IOException {
var sampleAssetModel = getSampleAssetModel();
addAssetLink(sampleAssetModel, RESOURCE_ID_TO_BE_REMOVED);
when(assetModelManagement.findAll()).thenReturn(List.of(sampleAssetModel));

when(assetModelManagement.update(sampleAssetModel.getId(), sampleAssetModel)).thenThrow(new IOException());

assertThrows(IOException.class, () -> assetModelHelper.removeAssetLinkFromAllAssets(RESOURCE_ID_TO_BE_REMOVED));
}


private SpAssetModel getSampleAssetModel() {
var sampleAssetModel = new SpAssetModel();
sampleAssetModel.setId("1");

return sampleAssetModel;
}

private void addAssetLink(SpAsset asset, String assetLinkResourceId) {
var assetLink = new AssetLink();
assetLink.setResourceId(assetLinkResourceId);
asset.setAssetLinks(new ArrayList<>(List.of(assetLink)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,5 @@ void delete_ThrowsIOException() throws IOException {
assertThrows(IOException.class, () -> assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV));
}


}
5 changes: 5 additions & 0 deletions streampipes-connect-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-asset-model-management</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connect-shared</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.assetmodel.management.AssetModelHelper;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
Expand All @@ -37,6 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;

Expand All @@ -50,6 +52,7 @@ public class AdapterMasterManagement {
private final IAdapterStorage adapterInstanceStorage;
private final AdapterMetrics adapterMetrics;
private final AdapterResourceManager adapterResourceManager;
private final AssetModelHelper assetModelHelper;

private final DataStreamResourceManager dataStreamResourceManager;

Expand All @@ -63,6 +66,7 @@ public AdapterMasterManagement(
this.adapterMetrics = adapterMetrics;
this.adapterResourceManager = adapterResourceManager;
this.dataStreamResourceManager = dataStreamResourceManager;
this.assetModelHelper = new AssetModelHelper();
}

public void addAdapter(
Expand Down Expand Up @@ -127,11 +131,13 @@ public void deleteAdapter(String elementId) {

stopAdapterWithLogging(elementId);

deleteAdaterFromCouchDbAndFromLoggingService(elementId);
deleteAdaterFromCouchDbLoggingServiceAndAssetLinks(elementId);

deleteCorrespondingDataStream(adapterDescription);

}


private void stopAdapterWithLogging(String elementId) {
LOG.info("Attempting to stop adapter: {}", elementId);
try {
Expand All @@ -142,15 +148,25 @@ private void stopAdapterWithLogging(String elementId) {
}
}

private void deleteAdaterFromCouchDbAndFromLoggingService(String elementId) {
private void deleteAdaterFromCouchDbLoggingServiceAndAssetLinks(String elementId) {
adapterResourceManager.delete(elementId);
ExtensionsLogProvider.INSTANCE.remove(elementId);
removeAdapterFromAllAssetLinks(elementId);
LOG.info("Successfully deleted adapter in couchdb: {}", elementId);
}

private void removeAdapterFromAllAssetLinks(String elementId) {
try {
assetModelHelper.removeAssetLinkFromAllAssets(elementId);
} catch (IOException e) {
LOG.error("Failed to remove adapter from asset models: {}", elementId, e);
}
}

private void deleteCorrespondingDataStream(AdapterDescription adapterDescription) {
var correspondingDataStreamElementId = adapterDescription.getCorrespondingDataStreamElementId();
dataStreamResourceManager.delete(correspondingDataStreamElementId);
removeAdapterFromAllAssetLinks(correspondingDataStreamElementId);
LOG.info("Successfully deleted data stream in couchdb: {}", correspondingDataStreamElementId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class AssetLink {
private String linkLabel;
private String queryHint;
private boolean editingDisabled;
private boolean navigationActive;

public AssetLink() {
}
Expand Down Expand Up @@ -71,6 +72,14 @@ public void setQueryHint(String queryHint) {
this.queryHint = queryHint;
}

public boolean isNavigationActive() {
return navigationActive;
}

public void setNavigationActive(boolean navigationActive) {
this.navigationActive = navigationActive;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;


public class AssetLinkType {

public final String appDocType = GenericDocTypes.DOC_ASSET_LINK_TYPE;
Expand Down
Loading

0 comments on commit 8385f8d

Please sign in to comment.