Skip to content

Commit

Permalink
feat: add job verticle to deploy missing entity verticles
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Nov 8, 2023
1 parent 559342a commit 2bb7d41
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 7 deletions.
12 changes: 11 additions & 1 deletion src/main/java/io/neonbee/entity/EntityVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,17 @@ public final String getName() {
// Entity verticle are generally not exposed via any web interface, but only via the event bus. Also, they are
// generally never accessed directly, but only via the shared entity name map, so return a generated name here.
// The name must be unique in the Vert.x instance / cluster and the same for every entity verticle of this type.
return String.format("_%s-%d", getClass().getSimpleName(), getClass().getName().hashCode());
return getName(getClass());
}

/**
* Returns a unique name for a given EntityVerticle class.
*
* @param clazz the EntityVerticle class
* @return a unique name for a given EntityVerticle class
*/
public static String getName(Class<? extends EntityVerticle> clazz) {
return String.format("_%s-%d", clazz.getSimpleName(), clazz.getName().hashCode());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@
*/
public class ClusterEntityRegistry implements Registry<String> {

private static final String QUALIFIED_NAME = "qualifiedName";
/**
* The key for the qualified name.
*/
public static final String QUALIFIED_NAME_KEY = "qualifiedName";

private static final String ENTITY_NAME = "entityName";
/**
* The key for the entity name.
*/
public static final String ENTITY_NAME_KEY = "entityName";

@VisibleForTesting
final WriteSafeRegistry<JsonObject> clusteringInformation;
Expand All @@ -56,7 +62,7 @@ public ClusterEntityRegistry(Vertx vertx, String registryName) {

@VisibleForTesting
static JsonObject clusterRegistrationInformation(String sharedMapKey, String value) {
return JsonObject.of(QUALIFIED_NAME, value, ENTITY_NAME, sharedMapKey);
return JsonObject.of(QUALIFIED_NAME_KEY, value, ENTITY_NAME_KEY, sharedMapKey);
}

@Override
Expand Down Expand Up @@ -96,7 +102,7 @@ public Future<JsonArray> get(String sharedMapKey) {
* @param clusterNodeId the ID of the cluster node
* @return the future
*/
Future<JsonArray> getClusteringInformation(String clusterNodeId) {
public Future<JsonArray> getClusteringInformation(String clusterNodeId) {
return clusteringInformation.get(clusterNodeId);
}

Expand All @@ -123,8 +129,8 @@ public Future<Void> unregisterNode(String clusterNodeId) {
for (Object o : registeredEntities) {
if (remove(map, o)) {
JsonObject jo = (JsonObject) o;
String entityName = jo.getString(ENTITY_NAME);
String qualifiedName = jo.getString(QUALIFIED_NAME);
String entityName = jo.getString(ENTITY_NAME_KEY);
String qualifiedName = jo.getString(QUALIFIED_NAME_KEY);
futureList.add(unregister(entityName, qualifiedName));
}
}
Expand Down
195 changes: 195 additions & 0 deletions src/main/java/io/neonbee/job/RedeployEntitiesJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package io.neonbee.job;

import static io.neonbee.internal.deploy.DeployableVerticle.fromClass;
import static io.neonbee.internal.deploy.Deployables.fromDeployables;
import static io.neonbee.internal.scanner.DeployableScanner.scanForDeployableClasses;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;

import io.neonbee.NeonBee;
import io.neonbee.NeonBeeDeployable;
import io.neonbee.NeonBeeProfile;
import io.neonbee.data.DataContext;
import io.neonbee.entity.EntityVerticle;
import io.neonbee.internal.Registry;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.cluster.entity.ClusterEntityRegistry;
import io.neonbee.internal.deploy.Deployable;
import io.neonbee.internal.deploy.Deployables;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

/**
* A job that redeploys all entity verticles that are not deployed in the cluster.
*/
@NeonBeeDeployable(namespace = NeonBeeDeployable.NEONBEE_NAMESPACE, autoDeploy = false)
public class RedeployEntitiesJob extends JobVerticle {

private static final LoggingFacade LOGGER = LoggingFacade.create(RedeployEntitiesJob.class);

private static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(5L);

/**
* Create a new ReregisterEntitiesJob job verticle with the default configuration.
*/
public RedeployEntitiesJob() {
this(new JobSchedule(DEFAULT_INTERVAL));
}

/**
* Create a new ReregisterEntitiesJob job verticle.
*
* @param schedule the schedule to use when starting this job verticle
*/
public RedeployEntitiesJob(JobSchedule schedule) {
this(schedule, false);
}

/**
* Create a new ReregisterEntitiesJob job verticle. Optionally undeploy the verticle when the job execution ended
* (hit the end instant or one time execution)
*
* @param schedule the schedule to use when starting this job verticle
* @param undeployWhenDone if true, undeploy the verticle when done
*/
public RedeployEntitiesJob(JobSchedule schedule, boolean undeployWhenDone) {
super(schedule, undeployWhenDone);
}

@Override
public Future<?> execute(DataContext context) {
LOGGER.correlateWith(context).info("Start re registering entities");
long startTime = System.currentTimeMillis();

// get the currently deployed entity verticles
NeonBee neonBee = NeonBee.get(getVertx());
Registry<String> entityRegistry = neonBee.getEntityRegistry();
if (entityRegistry instanceof ClusterEntityRegistry) {
LOGGER.correlateWith(context).info("Getting registered entities from cluster");

ClusterEntityRegistry clusterEntityRegistry = ((ClusterEntityRegistry) entityRegistry);
Future<JsonArray> clusteringInformation = clusterEntityRegistry
.getClusteringInformation(ClusterHelper.getClusterNodeId(vertx))
.onSuccess(event -> LOGGER.correlateWith(context).info("Got registered entities from cluster"))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed getting registered entities from cluster", error));

var entitiesFromClassPath = classPathEntityVertilces(vertx)
.onSuccess(event -> LOGGER.correlateWith(context)
.info("Finished reregistering entities. took {} ms",
System.currentTimeMillis() - startTime))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed reregistering entities", error));

return Future.all(clusteringInformation, entitiesFromClassPath)
.map(compositeFuture -> findNotDeployedEntityVerticles(
context,
entitiesFromClassPath.result(),
clusteringInformation.result()))
.compose(difference -> deployNotDeployedEntityVerticles(context, neonBee, difference))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed getting registered entities from cluster", error));
} else {
// if it is not a clustered deployment we have nothing to do
return Future.succeededFuture();
}
}

/**
* Find all entity verticles that are not deployed in the cluster.
*
* @param context the data context
* @param classPathEntitiesMap the entity verticles from the class path
* @param clusteringInformation the clustering information
* @return a map of entity verticles that are not deployed in the cluster
*/
private Map<String, Class<? extends EntityVerticle>> findNotDeployedEntityVerticles(
DataContext context,
Map<String, Class<? extends EntityVerticle>> classPathEntitiesMap,
JsonArray clusteringInformation) {
Set<String> deployedEntitiesSet = qualifiedNamesSet(context, clusteringInformation);
Map<String, Class<? extends EntityVerticle>> difference = new HashMap<>(classPathEntitiesMap);
difference.keySet().removeAll(deployedEntitiesSet);
return difference;
}

private Set<String> qualifiedNamesSet(DataContext context, JsonArray clusteringInformation) {
Set<String> deployedEntitiesSet;
if (clusteringInformation == null) {
LOGGER.correlateWith(context).info("No entities registered in cluster");
deployedEntitiesSet = Set.of();
} else {
deployedEntitiesSet = clusteringInformation
.stream()
.map(jo -> (JsonObject) jo)
.map(jo -> jo.getString(ClusterEntityRegistry.QUALIFIED_NAME_KEY))
.collect(Collectors.toSet());
}
return deployedEntitiesSet;
}

private Future<Object> deployNotDeployedEntityVerticles(DataContext context, NeonBee neonBee,
Map<String, Class<? extends EntityVerticle>> difference) {
if (difference.isEmpty()) {
LOGGER.correlateWith(context).info("Everything OK, all EntityVerticles are deployed.");
return Future.succeededFuture();
} else {
List<Future<? extends Deployable>> toDeplyoe = difference.entrySet()
.stream()
.peek(stringClassEntry -> LOGGER.correlateWith(context).warn(
"Entity verticles with qualified name \"{}\" will be deployed.",
stringClassEntry.getKey()))
.map(stringClassEntry -> fromClass(vertx, stringClassEntry.getValue()))
.collect(Collectors.toList());

return fromDeployables(toDeplyoe)
.compose(Deployables.allTo(neonBee))
.onSuccess(deployment -> LOGGER.correlateWith(context)
.info("Entity verticles deployed. \"{}\"", deployment.getDeployable().getIdentifier()))
.mapEmpty();
}
}

private Future<Map<String, Class<? extends EntityVerticle>>> classPathEntityVertilces(Vertx vertx) {
return scanForDeployableClasses(vertx).map(verticles -> verticles.stream()
.filter(EntityVerticle.class::isAssignableFrom)
.filter(verticleClass -> filterByAutoDeployAndProfiles(verticleClass, activeProfiles()))
.map(verticleClass -> (Class<? extends EntityVerticle>) verticleClass)
.map(verticleClass -> {
NeonBeeDeployable annotation = verticleClass.getAnnotation(NeonBeeDeployable.class);
String namespace = annotation != null ? annotation.namespace() + "/" : "";

return new AbstractMap.SimpleEntry<String, Class<? extends EntityVerticle>>(
namespace + EntityVerticle.getName(verticleClass),
verticleClass);
})
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(existingValue, newValue) -> existingValue)));
}

private Set<NeonBeeProfile> activeProfiles() {
return NeonBee.get(getVertx()).getOptions().getActiveProfiles();
}

@VisibleForTesting
boolean filterByAutoDeployAndProfiles(Class<? extends Verticle> verticleClass,
Collection<NeonBeeProfile> activeProfiles) {
NeonBeeDeployable annotation = verticleClass.getAnnotation(NeonBeeDeployable.class);
return annotation.autoDeploy() && annotation.profile().isActive(activeProfiles);
}
}
Loading

0 comments on commit 2bb7d41

Please sign in to comment.