Skip to content

Commit

Permalink
Propagate readiness status to all datacenters (#1826)
Browse files Browse the repository at this point in the history
This is part of the feature that lets publishing fall back to remote datacenters when the local one is unhealthy. Because hermes-frontend has access only to the ZooKeeper in its local datacenter, that ZooKeeper needs to hold the readiness status of all datacenters so that the frontend can select only those that are ready. Before this change, only the readiness status of the local datacenter was stored in the local ZooKeeper.
  • Loading branch information
piotrrzysko authored Mar 4, 2024
1 parent f29ae97 commit f2cef4c
Show file tree
Hide file tree
Showing 32 changed files with 294 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public int hashCode() {

public enum ReadinessStatus {
READY,
NOT_READY,
UNDEFINED
NOT_READY
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.benchmark.environment;

import pl.allegro.tech.hermes.frontend.server.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;

class DisabledReadinessChecker implements ReadinessChecker {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pl.allegro.tech.hermes.frontend.publishing.message.MessageContentTypeEnforcer;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageFactory;
import pl.allegro.tech.hermes.frontend.publishing.metadata.DefaultHeadersPropagator;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
Expand Down Expand Up @@ -70,6 +71,7 @@ static HermesServer provideHermesServer() throws IOException {
hermesServerProperties,
metricsFacade,
httpHandler,
new HealthCheckService(),
new DisabledReadinessChecker(false),
new NoOpMessagePreviewPersister(),
throughputLimiter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public interface KafkaParameters {

String getDatacenter();

boolean isAuthenticationEnabled();

String getAuthenticationMechanism();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pl.allegro.tech.hermes.domain.readiness;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import pl.allegro.tech.hermes.api.DatacenterReadiness;

import java.util.List;

/**
 * Wrapper around a list of {@link DatacenterReadiness} entries, exposed under the
 * JSON property {@code datacenters}.
 *
 * <p>The list reference is stored as given: no defensive copy and no null check is
 * performed here, so callers receive whatever the creator passed in.
 *
 * @param datacenters readiness entries, one per element
 */
public record DatacenterReadinessList(List<DatacenterReadiness> datacenters) {
/**
 * Canonical constructor, marked as the Jackson creator so the {@code datacenters}
 * JSON property is bound to the record component.
 *
 * @param datacenters value of the {@code datacenters} JSON property
 */
@JsonCreator
public DatacenterReadinessList(@JsonProperty("datacenters") List<DatacenterReadiness> datacenters) {
this.datacenters = datacenters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public class ZookeeperPaths {
public static final String MAX_RATE_PATH = "max-rate";
public static final String MAX_RATE_HISTORY_PATH = "history";
public static final String STORAGE_HEALTH_PATH = "storage-health";
public static final String FRONTEND_PATH = "frontend";
public static final String READINESS_PATH = "readiness";
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";

Expand Down Expand Up @@ -151,8 +150,8 @@ public String nodeHealthPathForManagementHost(String host, String port) {
return Joiner.on(URL_SEPARATOR).join(basePath, STORAGE_HEALTH_PATH, String.format("%s_%s", host, port));
}

public String frontendReadinessPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, FRONTEND_PATH, READINESS_PATH);
public String datacenterReadinessPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, DATACENTER_READINESS_PATH);
}

public String offlineRetransmissionPath() {
Expand Down
2 changes: 1 addition & 1 deletion hermes-console/json-server/db.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
},
{
"datacenter": "DC3",
"status": "UNDEFINED"
"status": "READY"
}
],
"constraints": {
Expand Down
2 changes: 1 addition & 1 deletion hermes-console/src/api/datacenter-readiness.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export interface DatacenterReadiness {
datacenter: string;
status: 'READY' | 'NOT_READY' | 'UNDEFINED';
status: 'READY' | 'NOT_READY';
}

export interface Readiness {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.services.HealthCheckService;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
Expand Down Expand Up @@ -75,11 +74,6 @@ public BlacklistZookeeperNotifyingCache blacklistZookeeperNotifyingCache(Curator
return new BlacklistZookeeperNotifyingCache(curator, zookeeperPaths);
}

@Bean(initMethod = "startup")
public HealthCheckService healthCheckService() {
return new HealthCheckService();
}

@Bean
public BrokerListeners defaultBrokerListeners() {
return new BrokerListeners();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package pl.allegro.tech.hermes.frontend.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.undertow.server.HttpHandler;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -13,14 +11,14 @@
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.preview.DefaultMessagePreviewPersister;
import pl.allegro.tech.hermes.frontend.server.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.readiness.ReadinessChecker;
import pl.allegro.tech.hermes.frontend.server.HermesServer;
import pl.allegro.tech.hermes.frontend.server.SslContextFactoryProvider;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingJob;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook;
import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.schema.SchemaRepository;

import java.util.Optional;
Expand All @@ -39,7 +37,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
SslProperties sslProperties,
MetricsFacade metricsFacade,
HttpHandler publishingHandler,
DefaultReadinessChecker defaultReadinessChecker,
HealthCheckService healthCheckService,
ReadinessChecker readinessChecker,
DefaultMessagePreviewPersister defaultMessagePreviewPersister,
ThroughputLimiter throughputLimiter,
TopicMetadataLoadingJob topicMetadataLoadingJob,
Expand All @@ -51,7 +50,8 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
hermesServerProperties,
metricsFacade,
publishingHandler,
defaultReadinessChecker,
healthCheckService,
readinessChecker,
defaultMessagePreviewPersister,
throughputLimiter,
topicMetadataLoadingJob,
Expand All @@ -60,23 +60,6 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties,
prometheusMeterRegistry);
}

@Bean
public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
TopicMetadataLoadingRunner topicMetadataLoadingRunner,
CuratorFramework zookeeper,
ZookeeperPaths paths,
ObjectMapper mapper) {
return new DefaultReadinessChecker(
topicMetadataLoadingRunner,
zookeeper,
paths,
mapper,
readinessCheckProperties.isEnabled(),
readinessCheckProperties.isKafkaCheckEnabled(),
readinessCheckProperties.getInterval()
);
}

@Bean
public SslContextFactoryProvider sslContextFactoryProvider(Optional<SslContextFactory> sslContextFactory,
SslProperties sslProperties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pl.allegro.tech.hermes.frontend.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.frontend.readiness.AdminReadinessService;
import pl.allegro.tech.hermes.frontend.readiness.DefaultReadinessChecker;
import pl.allegro.tech.hermes.frontend.readiness.HealthCheckService;
import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

/**
 * Spring configuration that wires the frontend readiness components:
 * the readiness checker, the admin readiness service and the health check service.
 */
@Configuration
@EnableConfigurationProperties(ReadinessCheckProperties.class)
public class ReadinessConfiguration {

    /**
     * Builds the readiness checker from the readiness properties, the topic metadata
     * loading runner and the admin readiness service.
     */
    @Bean
    public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties,
                                                    TopicMetadataLoadingRunner topicMetadataLoadingRunner,
                                                    AdminReadinessService adminReadinessService) {
        DefaultReadinessChecker checker = new DefaultReadinessChecker(
                topicMetadataLoadingRunner,
                adminReadinessService,
                readinessCheckProperties.isEnabled(),
                readinessCheckProperties.isKafkaCheckEnabled(),
                readinessCheckProperties.getInterval());
        return checker;
    }

    /**
     * Creates the admin readiness service for the datacenter named by the given provider.
     * The container invokes {@code start} after construction and {@code stop} on shutdown.
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public AdminReadinessService adminReadinessService(ObjectMapper mapper,
                                                       CuratorFramework zookeeper,
                                                       ZookeeperPaths paths,
                                                       DatacenterNameProvider datacenterNameProvider) {
        return new AdminReadinessService(
                mapper,
                zookeeper,
                paths,
                datacenterNameProvider.getDatacenterName());
    }

    /**
     * Creates the health check service; the container invokes {@code startup} after construction.
     */
    @Bean(initMethod = "startup")
    public HealthCheckService healthCheckService() {
        HealthCheckService service = new HealthCheckService();
        return service;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package pl.allegro.tech.hermes.frontend.readiness;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.DatacenterReadiness;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.readiness.DatacenterReadinessList;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.READY;

/**
 * Keeps an in-memory view of the per-datacenter readiness flags stored in the
 * ZooKeeper node at {@link ZookeeperPaths#datacenterReadinessPath()}.
 *
 * <p>The node content is deserialized as a {@link DatacenterReadinessList} and turned
 * into a map from datacenter name to "is READY". The map is replaced as a whole on
 * every successful refresh and published through a {@code volatile} field.
 *
 * <p>A datacenter that has no entry in the map is reported as ready.
 */
public class AdminReadinessService implements NodeCacheListener {

    private static final Logger logger = LoggerFactory.getLogger(AdminReadinessService.class);

    private final NodeCache cache;
    private final ObjectMapper mapper;
    private final String localDatacenterName;

    // Starts out empty rather than null: if the first refresh fails (the failure is only
    // logged) or a query arrives before start(), isDatacenterReady must not throw a
    // NullPointerException. An empty map means "no information", i.e. ready by default.
    private volatile Map<String, Boolean> readinessPerDatacenter = Collections.emptyMap();

    /**
     * Creates the service, registers it as a listener on the readiness node and starts
     * the node cache with its initial content built.
     *
     * @param mapper              JSON mapper used to read the node content
     * @param curator             ZooKeeper client the node cache is bound to
     * @param paths               provider of the readiness node path
     * @param localDatacenterName name used by {@link #isLocalDatacenterReady()}
     * @throws InternalProcessingException if the node cache cannot be started
     */
    public AdminReadinessService(ObjectMapper mapper,
                                 CuratorFramework curator,
                                 ZookeeperPaths paths,
                                 String localDatacenterName) {
        this.mapper = mapper;
        this.localDatacenterName = localDatacenterName;
        this.cache = new NodeCache(curator, paths.datacenterReadinessPath());
        cache.getListenable().addListener(this);
        try {
            cache.start(true);
        } catch (Exception e) {
            throw new InternalProcessingException("Readiness cache cannot start.", e);
        }
    }

    /**
     * Loads the readiness map from the current content of the node cache.
     */
    public void start() {
        refreshAdminReady();
    }

    /**
     * Closes the node cache; a failure to close is logged and not propagated.
     */
    public void stop() {
        try {
            cache.close();
        } catch (Exception e) {
            logger.warn("Failed to stop readiness cache", e);
        }
    }

    /**
     * Node cache callback: reloads the readiness map.
     */
    @Override
    public void nodeChanged() {
        refreshAdminReady();
    }

    /**
     * Rebuilds the readiness map from the cached node data. When the node has no data
     * the map becomes empty. Any failure (unreadable JSON, missing list, duplicated
     * datacenter name) is logged and the previously published map is kept.
     */
    private void refreshAdminReady() {
        try {
            ChildData nodeData = cache.getCurrentData();
            if (nodeData != null) {
                byte[] data = nodeData.getData();
                DatacenterReadinessList readiness = mapper.readValue(data, DatacenterReadinessList.class);
                readinessPerDatacenter = readiness.datacenters().stream()
                        .collect(Collectors.toMap(DatacenterReadiness::getDatacenter, e -> e.getStatus() == READY));
            } else {
                readinessPerDatacenter = Collections.emptyMap();
            }
        } catch (Exception e) {
            logger.error("Failed reloading readiness cache.", e);
        }
    }

    /**
     * @return readiness of the datacenter whose name was given to the constructor
     */
    public boolean isLocalDatacenterReady() {
        return isDatacenterReady(localDatacenterName);
    }

    /**
     * @param datacenter datacenter name to look up
     * @return the stored readiness flag, or {@code true} when the datacenter has no entry
     */
    public boolean isDatacenterReady(String datacenter) {
        return readinessPerDatacenter.getOrDefault(datacenter, true);
    }
}
Loading

0 comments on commit f2cef4c

Please sign in to comment.