Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add timeout for deployments #420

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public class NeonBeeConfigConverter {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBeeConfig obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "deploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "eventBusCodecs":
if (member.getValue() instanceof JsonObject) {
java.util.Map<String, java.lang.String> map = new java.util.LinkedHashMap<>();
Expand Down Expand Up @@ -62,6 +67,16 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setMicrometerRegistries(list);
}
break;
case "modelsDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setModelsDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "moduleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setModuleDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "platformClasses":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
Expand All @@ -82,6 +97,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setTrackingDataHandlingStrategy((String) member.getValue());
}
break;
case "verticleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
}
}
}
Expand All @@ -91,6 +111,7 @@ static void toJson(NeonBeeConfig obj, JsonObject json) {
}

static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
json.put("deploymentTimeout", obj.getDeploymentTimeout());
if (obj.getEventBusCodecs() != null) {
JsonObject map = new JsonObject();
obj.getEventBusCodecs().forEach((key, value) -> map.put(key, value));
Expand All @@ -109,6 +130,12 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
obj.getMicrometerRegistries().forEach(item -> array.add(item.toJson()));
json.put("micrometerRegistries", array);
}
if (obj.getModelsDeploymentTimeout() != null) {
json.put("modelsDeploymentTimeout", obj.getModelsDeploymentTimeout());
}
if (obj.getModuleDeploymentTimeout() != null) {
json.put("moduleDeploymentTimeout", obj.getModuleDeploymentTimeout());
}
if (obj.getPlatformClasses() != null) {
JsonArray array = new JsonArray();
obj.getPlatformClasses().forEach(item -> array.add(item));
Expand All @@ -120,5 +147,8 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
if (obj.getTrackingDataHandlingStrategy() != null) {
json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy());
}
if (obj.getVerticleDeploymentTimeout() != null) {
json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout());
}
}
}
129 changes: 129 additions & 0 deletions src/main/java/io/neonbee/config/NeonBeeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import static io.neonbee.internal.helper.ConfigHelper.notFound;
import static io.neonbee.internal.helper.ConfigHelper.readConfig;
import static io.neonbee.internal.helper.ConfigHelper.rephraseConfigNames;
import static io.neonbee.internal.helper.StringHelper.EMPTY;
import static io.vertx.core.Future.future;
import static io.vertx.core.Future.succeededFuture;

import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.StreamReadConstraints;
Expand Down Expand Up @@ -42,6 +45,11 @@ public class NeonBeeConfig {
*/
public static final int DEFAULT_EVENT_BUS_TIMEOUT = 30;

/**
* The default timeout for a deployment to finish.
*/
public static final int DEFAULT_DEPLOYMENT_TIMEOUT = 30;

/**
* The default tracking data handling strategy.
*/
Expand All @@ -57,6 +65,14 @@ public class NeonBeeConfig {

private int eventBusTimeout = DEFAULT_EVENT_BUS_TIMEOUT;

private int deploymentTimeout = DEFAULT_DEPLOYMENT_TIMEOUT;

private Integer modelsDeploymentTimeout;

private Integer moduleDeploymentTimeout;

private Integer verticleDeploymentTimeout;
carlspring marked this conversation as resolved.
Show resolved Hide resolved

private Map<String, String> eventBusCodecs = Map.of();

private String trackingDataHandlingStrategy = DEFAULT_TRACKING_DATA_HANDLING_STRATEGY;
Expand Down Expand Up @@ -231,6 +247,119 @@ public NeonBeeConfig setEventBusTimeout(int eventBusTimeout) {
return this;
}

/**
* Returns the general deployment timeout for an individual deployment of any type in seconds. If unset / equal or
* smaller than 0, no timeout applies to the deployment.
*
* @return the deployment timeout in seconds
*/
public int getDeploymentTimeout() {
return deploymentTimeout;
}

/**
* Returns the deployment timeout for a given deployment type, or the general {@link #getDeploymentTimeout()} if the
* deployment timeout for a given type is unset / equal or smaller than zero or an unrecognized type / {@code null}
* was passed.
*
* @param deploymentType the type of the deployment, e.g. modules, module, verticle or {@code null}
* @return the individual or general deployment timeout in seconds
*/
public int getDeploymentTimeout(String deploymentType) {
switch (Optional.ofNullable(deploymentType).map(type -> type.toLowerCase(Locale.ROOT)).orElse(EMPTY)) {
case "models":
return getModelsDeploymentTimeout();
case "module":
return getModuleDeploymentTimeout();
case "verticle":
return getVerticleDeploymentTimeout();
default:
return getDeploymentTimeout();
}
}

/**
* Set the general deployment timeout for an individual deployment of any type in seconds. If equal or smaller than
* 0, no timeout applies to the deployment.
*
* @param deploymentTimeout the deployment timeout in seconds
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setDeploymentTimeout(int deploymentTimeout) {
this.deploymentTimeout = deploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual models deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getModelsDeploymentTimeout() {
return modelsDeploymentTimeout != null ? modelsDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual models deployment in seconds. If equal or smaller than 0, no timeout
* applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param modelsDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setModelsDeploymentTimeout(Integer modelsDeploymentTimeout) {
this.modelsDeploymentTimeout = modelsDeploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual module deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getModuleDeploymentTimeout() {
return moduleDeploymentTimeout != null ? moduleDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual module deployment in seconds. If equal or smaller than 0, no timeout
* applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param moduleDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setModuleDeploymentTimeout(Integer moduleDeploymentTimeout) {
this.moduleDeploymentTimeout = moduleDeploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual verticle deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getVerticleDeploymentTimeout() {
return verticleDeploymentTimeout != null ? verticleDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual verticle deployment in seconds. If equal or smaller than 0, no
* timeout applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param verticleDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setVerticleDeploymentTimeout(Integer verticleDeploymentTimeout) {
this.verticleDeploymentTimeout = verticleDeploymentTimeout;
return this;
}

/**
* Gets a list of default codecs to register on the event bus.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import static io.vertx.core.Future.failedFuture;
import static io.vertx.core.Future.succeededFuture;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import io.neonbee.NeonBee;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.FutureInternal;
import io.vertx.core.impl.future.Listener;
Expand All @@ -24,7 +27,23 @@ public abstract class PendingDeployment extends Deployment implements FutureInte

LOGGER.info("Started deployment of {} ...", deployable);

this.deployFuture = deployFuture.map(deploymentId -> {
Future<String> timeoutFuture = deployFuture;
int timeout = neonBee.getConfig().getDeploymentTimeout(deployable.getType());
if (timeout > 0) {
Vertx vertx = neonBee.getVertx();
Promise<String> timeoutPromise = Promise.promise();
// fail the promise after the timeout is expired
long timerId = vertx.setTimer(TimeUnit.SECONDS.toMillis(timeout), nothing -> {
timeoutPromise.fail("Deployment timed-out after " + timeout + " seconds");
});
// in case the deployment finished, it completes the promise and we can cancel the timer
deployFuture.onComplete(timeoutPromise).onComplete(deploymentId -> {
vertx.cancelTimer(timerId);
});
timeoutFuture = timeoutPromise.future();
}

this.deployFuture = timeoutFuture.map(deploymentId -> {
// in case a deployment doesn't want to specify a own deployment ID, generate one based on the hash code of
// the pending deployment (thus all deployables, might just return an empty future)
return deploymentId != null ? deploymentId : super.getDeploymentId();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.neonbee.internal.deploy;

import static com.google.common.truth.Truth.assertThat;
import static io.neonbee.NeonBeeMockHelper.defaultVertxMock;
import static io.neonbee.NeonBeeMockHelper.registerNeonBeeMock;
import static io.neonbee.internal.deploy.DeploymentTest.newNeonBeeMockForDeployment;
import static io.vertx.core.Future.failedFuture;
import static io.vertx.core.Future.succeededFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -58,13 +57,12 @@ void testDeployUndeploy() throws NoSuchFieldException, IllegalAccessException {
Map.of("okay", new JsonObject().put("namespace", "test").toBuffer().getBytes()), Map.of());
DeployableModels deployable = new DeployableModels(definition);

Vertx vertxMock = defaultVertxMock();
NeonBee neonBee = registerNeonBeeMock(vertxMock, new NeonBeeOptions.Mutable().setIgnoreClassPath(true));
NeonBee neonBeeMock = newNeonBeeMockForDeployment(new NeonBeeOptions.Mutable().setIgnoreClassPath(true));

PendingDeployment deployment = deployable.deploy(neonBee);
PendingDeployment deployment = deployable.deploy(neonBeeMock);
assertThat(deployment.succeeded()).isTrue();
Set<EntityModelDefinition> definitions =
ReflectionHelper.getValueOfPrivateField(neonBee.getModelManager(), "externalModelDefinitions");
ReflectionHelper.getValueOfPrivateField(neonBeeMock.getModelManager(), "externalModelDefinitions");
assertThat(definitions).contains(definition);

assertThat(deployment.undeploy().succeeded()).isTrue();
Expand All @@ -77,11 +75,11 @@ void testDeployFailed() {
EntityModelDefinition definition = new EntityModelDefinition(Map.of(), Map.of());
DeployableModels deployable = new DeployableModels(definition);

Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment(new NeonBeeOptions.Mutable().setIgnoreClassPath(true));
Vertx vertxMock = neonBeeMock.getVertx();
when(vertxMock.fileSystem().readDir(any())).thenReturn(failedFuture("any failure"));
NeonBee neonBee = registerNeonBeeMock(vertxMock, new NeonBeeOptions.Mutable().setIgnoreClassPath(true));

PendingDeployment deployment = deployable.deploy(neonBee);
PendingDeployment deployment = deployable.deploy(neonBeeMock);
assertThat(deployment.failed()).isTrue();
assertThat(deployment.cause()).hasMessageThat().isEqualTo("any failure");
assertThat(deployment.undeploy().succeeded()).isTrue();
Expand All @@ -90,7 +88,8 @@ void testDeployFailed() {
@Test
@DisplayName("test read model payloads")
void testReadModelPayloads() {
Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

ClassLoader classLoaderMock = mock(ClassLoader.class);
when(classLoaderMock.getResourceAsStream(any())).thenAnswer(invocation -> {
Expand All @@ -106,7 +105,8 @@ void testReadModelPayloads() {
@Test
@DisplayName("test scan class path")
void testScanClassPath() {
Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

ClassPathScanner classPathScannerMock = mock(ClassPathScanner.class);
when(classPathScannerMock.scanManifestFiles(any(), any())).thenReturn(succeededFuture(List.of("entry")));
Expand All @@ -129,8 +129,11 @@ void testScanClassPath() {
@Test
@DisplayName("test from JAR")
void testFromJar() throws IOException {
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

NeonBeeModuleJar moduleJar = NeonBeeModuleJar.create("testmodule").withModels().build();
Future<DeployableModels> deployable = DeployableModels.fromJar(defaultVertxMock(), moduleJar.writeToTempPath());
Future<DeployableModels> deployable = DeployableModels.fromJar(vertxMock, moduleJar.writeToTempPath());
assertThat(deployable.succeeded()).isTrue();
assertThat(deployable.result().modelDefinition.getCSNModelDefinitions())
.comparingValuesUsing(Correspondence.<byte[], byte[]>from(Arrays::equals, "is not equal to"))
Expand Down
Loading
Loading