From 6ac25b6b6af22f29a11ea427bd55d2b2f75d18d9 Mon Sep 17 00:00:00 2001 From: James Roper Date: Mon, 13 May 2019 15:48:23 +1000 Subject: [PATCH] Cluster bootstrap remoting probe method --- .../src/main/resources/reference.conf | 20 ++- .../cluster/bootstrap/ClusterBootstrap.scala | 30 ++-- .../bootstrap/ClusterBootstrapSettings.scala | 8 +- .../HttpClusterBootstrapRoutes.scala | 28 +--- .../AbstractContactPointBootstrap.scala | 122 +++++++++++++++ .../internal/BootstrapCoordinator.scala | 71 +++++---- .../BootstrapProtocolSerializer.scala | 37 +++++ .../bootstrap/internal/ContactPoint.scala | 38 +++++ .../internal/HttpContactPointBootstrap.scala | 121 ++++----------- .../internal/RemotingContactPoint.scala | 30 ++++ .../RemotingContactPointBootstrap.scala | 49 ++++++ ...HttpClusterBootstrapIntegrationSpec.scala} | 4 +- ...otingClusterBootstrapIntegrationSpec.scala | 142 ++++++++++++++++++ 13 files changed, 540 insertions(+), 160 deletions(-) create mode 100644 cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/AbstractContactPointBootstrap.scala create mode 100644 cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapProtocolSerializer.scala create mode 100644 cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/ContactPoint.scala create mode 100644 cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPoint.scala create mode 100644 cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPointBootstrap.scala rename cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/{ClusterBootstrapIntegrationSpec.scala => HttpClusterBootstrapIntegrationSpec.scala} (97%) create mode 100644 cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/RemotingClusterBootstrapIntegrationSpec.scala diff --git a/cluster-bootstrap/src/main/resources/reference.conf b/cluster-bootstrap/src/main/resources/reference.conf index 1372e67ce..1036194f8 100644 --- a/cluster-bootstrap/src/main/resources/reference.conf +++ b/cluster-bootstrap/src/main/resources/reference.conf @@ -99,9 +99,14 @@ akka.management { # Configured how we communicate with the contact point once it is discovered contact-point { + # The probe method. Valid values are akka-management and remoting. If akka-management, will use akka-management + # HTTP interface to discover seed nodes. If remoting, will use Akka remoting to discover seed nodes. + probe-method = "akka-management" + # If no port is discovered along with the host/ip of a contact point this port will be used as fallback # Also, when no port-name is used and multiple results are returned for a given service, this port is - # used to disambiguate. When set to , defaults to the value of akka.management.http.port + # used to disambiguate. When set to , defaults to the value of akka.management.http.port. + # Only used by http probe method, for remoting, the cluster remoting port is used. 
fallback-port = "" # port pun, it "complements" 2552 which is often used for Akka remoting # If some discovered seed node will keep failing to connect for specified period of time, @@ -126,3 +131,16 @@ akka.management { } } + +akka.actor { + serializers { + akka-management-cluster-bootstrap = "akka.management.cluster.bootstrap.internal.BootstrapProtocolSerializer" + } + serialization-bindings { + "akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol$SeedNodes" = akka-management-cluster-bootstrap + "akka.management.cluster.bootstrap.internal.RemotingContactPoint$GetSeedNodes$" = akka-management-cluster-bootstrap + } + serialization-identifiers { + "akka.management.cluster.bootstrap.internal.BootstrapProtocolSerializer" = 8788 + } +} \ No newline at end of file diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala index abf78c60e..bb229c4f2 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala @@ -7,9 +7,9 @@ package akka.management.cluster.bootstrap import java.util.concurrent.atomic.AtomicReference import akka.AkkaVersion + import scala.concurrent.Future import scala.concurrent.Promise - import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.Extension @@ -17,19 +17,18 @@ import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.annotation.InternalApi import akka.cluster.Cluster -import akka.discovery.{ Discovery, ServiceDiscovery } +import akka.discovery.{Discovery, ServiceDiscovery} import akka.event.Logging import akka.http.scaladsl.model.Uri import akka.http.scaladsl.server.Route import akka.management.cluster.bootstrap.contactpoint.HttpClusterBootstrapRoutes -import akka.management.cluster.bootstrap.internal.BootstrapCoordinator +import akka.management.cluster.bootstrap.internal.{BootstrapCoordinator, RemotingContactPoint} import akka.management.scaladsl.ManagementRouteProviderSettings import akka.management.scaladsl.ManagementRouteProvider final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Extension with ManagementRouteProvider { import ClusterBootstrap.Internal._ - import system.dispatcher private val log = Logging(system, classOf[ClusterBootstrap]) @@ -59,11 +58,18 @@ final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Exten .get } - private[this] val _selfContactPointUri: Promise[Uri] = Promise() + private[this] val _selfContactPointUri: Promise[(String, Int)] = settings.contactPoint.probeMethod match { + case BootstrapCoordinator.ProbeMethodRemoting => + val self = Cluster(system).selfAddress + Promise.successful((self.host.getOrElse(sys.error("No host")), self.port.getOrElse(sys.error("No port")))) + case _ => Promise() + } override def routes(routeProviderSettings: ManagementRouteProviderSettings): Route = { - log.info(s"Using self contact point address: ${routeProviderSettings.selfBaseUri}") - this.setSelfContactPoint(routeProviderSettings.selfBaseUri) + if (settings.contactPoint.probeMethod == BootstrapCoordinator.ProbeMethodAkkaManagement) { + log.info(s"Using self contact point address: ${routeProviderSettings.selfBaseUri}") + this.setSelfContactPoint(routeProviderSettings.selfBaseUri) + } new HttpClusterBootstrapRoutes(settings).routes } @@ -80,6 +86,10 @@ final class 
ClusterBootstrap(implicit system: ExtendedActorSystem) extends Exten val bootstrapProps = BootstrapCoordinator.props(discovery, joinDecider, settings) val bootstrap = system.systemActorOf(bootstrapProps, "bootstrapCoordinator") + if (settings.contactPoint.probeMethod == BootstrapCoordinator.ProbeMethodRemoting) { + system.systemActorOf(RemotingContactPoint.props(settings), RemotingContactPoint.RemotingContactPointActorName) + } + // Bootstrap already logs in several other execution points when it can't form a cluster, and why. bootstrap ! BootstrapCoordinator.Protocol.InitiateBootstrapping } else log.warning("Bootstrap already initiated, yet start() method was called again. Ignoring.") @@ -96,13 +106,11 @@ final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Exten */ @InternalApi private[akka] def setSelfContactPoint(baseUri: Uri): Unit = - _selfContactPointUri.success(baseUri) + _selfContactPointUri.success((baseUri.authority.host.toString, baseUri.authority.port)) /** INTERNAL API */ @InternalApi private[akka] def selfContactPoint: Future[(String, Int)] = - _selfContactPointUri.future.map { uri => - (uri.authority.host.toString, uri.authority.port) - } + _selfContactPointUri.future } object ClusterBootstrap extends ExtensionId[ClusterBootstrap] with ExtensionIdProvider { diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala index a2a34da4b..61734f899 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -10,8 +10,10 @@ import java.util.concurrent.TimeUnit import akka.actor.ActorSystem import akka.event.LoggingAdapter +import akka.management.cluster.bootstrap.internal.BootstrapCoordinator import com.typesafe.config.Config -import scala.concurrent.duration.{ FiniteDuration, _ } + +import scala.concurrent.duration.{FiniteDuration, _} import scala.compat.java8.OptionConverters._ import akka.util.JavaDurationConverters._ @@ -122,6 +124,10 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) { object contactPoint { private val contactPointConfig = bootConfig.getConfig("contact-point") + val probeMethod: String = contactPointConfig.getString("probe-method") + + require(BootstrapCoordinator.ValidProbeMethods.contains(probeMethod), "Probe method must be one of: " + BootstrapCoordinator.ValidProbeMethods.mkString(", ")) + val fallbackPort: Int = contactPointConfig .optDefinedValue("fallback-port") diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala index c8b35df02..fe2d337db 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala @@ -5,10 +5,7 @@ package akka.management.cluster.bootstrap.contactpoint import scala.concurrent.duration._ - import akka.actor.ActorSystem -import akka.cluster.Cluster -import akka.cluster.Member import akka.event.Logging import akka.event.LoggingAdapter import akka.http.javadsl.server.directives.RouteAdapter @@ -16,8 +13,7 @@ import 
akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.Uri import akka.http.scaladsl.server.Route import akka.management.cluster.bootstrap.ClusterBootstrapSettings -import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.ClusterMember -import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.SeedNodes +import akka.management.cluster.bootstrap.internal.ContactPoint final class HttpClusterBootstrapRoutes(settings: ClusterBootstrapSettings) extends HttpBootstrapJsonProtocol { @@ -25,26 +21,8 @@ final class HttpClusterBootstrapRoutes(settings: ClusterBootstrapSettings) exten private def routeGetSeedNodes: Route = extractClientIP { clientIp ⇒ extractActorSystem { implicit system ⇒ - import akka.cluster.MemberStatus - val cluster = Cluster(system) - - def memberToClusterMember(m: Member): ClusterMember = - ClusterMember(m.uniqueAddress.address, m.uniqueAddress.longUid, m.status.toString, m.roles) - - val state = cluster.state - - // TODO shuffle the members so in a big deployment nodes start joining different ones and not all the same? - val members = state.members - .diff(state.unreachable) - .filter( - m => m.status == MemberStatus.up || m.status == MemberStatus.weaklyUp || m.status == MemberStatus.joining) - .take(settings.contactPoint.httpMaxSeedNodesToExpose) - .map(memberToClusterMember) - - val info = SeedNodes(cluster.selfMember.uniqueAddress.address, members) - log.info("Bootstrap request from {}: Contact Point returning {} seed-nodes ([{}])", clientIp, members.size, - members) - complete(info) + val contactPoint = new ContactPoint(system, settings, log) + complete(contactPoint.seedNodes(clientIp.toString)) } } diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/AbstractContactPointBootstrap.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/AbstractContactPointBootstrap.scala new file mode 100644 index 000000000..a35f08b49 --- /dev/null +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/AbstractContactPointBootstrap.scala @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.management.cluster.bootstrap.internal + +import java.time.LocalDateTime +import java.util.concurrent.ThreadLocalRandom + +import akka.actor.{Actor, ActorLogging, DeadLetterSuppression, Status, Timers} +import akka.annotation.InternalApi +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.management.cluster.bootstrap.ClusterBootstrapSettings +import akka.util.Timeout +import akka.pattern.pipe + +import scala.concurrent.Future +import scala.concurrent.duration._ + +@InternalApi +private[bootstrap] object AbstractContactPointBootstrap { + + private case object ProbeTick extends DeadLetterSuppression + private val ProbingTimerKey = "probing-key" +} + + +/** + * Intended to be spawned as child actor by a higher-level Bootstrap coordinator that manages obtaining of the URIs. + * + * This additional step may at-first seem superficial -- after all, we already have some addresses of the nodes + * that we'll want to join -- however it is not optional. By communicating with the actual nodes before joining their + * cluster we're able to inquire about their status, double-check if perhaps they are part of an existing cluster already + * that we should join, or even coordinate rolling upgrades or more advanced patterns. 
+ */ +@InternalApi +private[bootstrap] abstract class AbstractContactPointBootstrap( + settings: ClusterBootstrapSettings, + contactPoint: ResolvedTarget +) extends Actor + with ActorLogging + with Timers { + + import AbstractContactPointBootstrap.ProbeTick + import AbstractContactPointBootstrap.ProbingTimerKey + import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol._ + import context.dispatcher + + private val probeInterval = settings.contactPoint.probeInterval + private implicit val probingFailureTimeout: Timeout = Timeout(settings.contactPoint.probingFailureTimeout) + + /** + * If probing keeps failing until the deadline triggers, we notify the parent, + * such that it rediscover again. + */ + private var probingKeepFailingDeadline: Deadline = settings.contactPoint.probingFailureTimeout.fromNow + + private def resetProbingKeepFailingWithinDeadline(): Unit = + probingKeepFailingDeadline = settings.contactPoint.probingFailureTimeout.fromNow + + override final def preStart(): Unit = + self ! ProbeTick + + override final def receive: Receive = { + case ProbeTick ⇒ + log.debug("Probing [{}] for seed nodes...", uri) + probe() pipeTo self + + case Status.Failure(cause) => + log.warning("Probing [{}] failed due to: {}", uri, cause.getMessage) + if (probingKeepFailingDeadline.isOverdue()) { + log.error("Overdue of probing-failure-timeout, stop probing, signaling that it's failed") + context.parent ! BootstrapCoordinator.Protocol.ProbingFailed(contactPoint, cause) + context.stop(self) + } else { + // keep probing, hoping the request will eventually succeed + scheduleNextContactPointProbing() + } + + case response: SeedNodes ⇒ + notifyParentAboutSeedNodes(response) + resetProbingKeepFailingWithinDeadline() + // we keep probing and looking if maybe a cluster does form after all + // (technically could be long polling or web-sockets, but that would need reconnect logic, so this is simpler) + scheduleNextContactPointProbing() + } + + /** + * Probe the contact point. + * + * @param probingFailureTimeout A timeout, if not replied within this timeout, the returned Future should fail. + * @return A future of the seed nodes. + */ + protected def probe()(implicit probingFailureTimeout: Timeout): Future[SeedNodes] + + /** + * Render the URI of the contact point as a string. + * + * This is used for logging purposes. + */ + protected def uri: String + + private def notifyParentAboutSeedNodes(members: SeedNodes): Unit = { + val seedAddresses = members.seedNodes.map(_.node) + context.parent ! 
BootstrapCoordinator.Protocol.ObtainedHttpSeedNodesObservation(timeNow(), contactPoint, + members.selfNode, seedAddresses) + } + + private def scheduleNextContactPointProbing(): Unit = + timers.startSingleTimer(ProbingTimerKey, ProbeTick, effectiveProbeInterval()) + + /** Duration with configured jitter applied */ + private def effectiveProbeInterval(): FiniteDuration = + probeInterval + jitter(probeInterval) + + def jitter(d: FiniteDuration): FiniteDuration = + (d.toMillis * settings.contactPoint.probeIntervalJitter * ThreadLocalRandom.current().nextDouble()).millis + + protected def timeNow(): LocalDateTime = + LocalDateTime.now() + +} diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapCoordinator.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapCoordinator.scala index 44a773df8..9bdf0d431 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapCoordinator.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapCoordinator.scala @@ -20,7 +20,6 @@ import akka.annotation.InternalApi import akka.cluster.Cluster import akka.discovery.{ Lookup, ServiceDiscovery } import akka.discovery.ServiceDiscovery.ResolvedTarget -import akka.http.scaladsl.model.Uri import akka.management.cluster.bootstrap.ClusterBootstrapSettings import akka.management.cluster.bootstrap.JoinDecider import akka.management.cluster.bootstrap.JoinDecision @@ -70,6 +69,9 @@ private[akka] object BootstrapCoordinator { else this } + val ProbeMethodAkkaManagement = "akka-management" + val ProbeMethodRemoting = "remoting" + val ValidProbeMethods = Set(ProbeMethodAkkaManagement, ProbeMethodRemoting) } /** @@ -283,33 +285,46 @@ private[akka] class BootstrapCoordinator(discovery: ServiceDiscovery, } private[internal] def ensureProbing(contactPoint: ResolvedTarget): Option[ActorRef] = { - val targetPort = contactPoint.port.getOrElse(settings.contactPoint.fallbackPort) - val rawBaseUri = Uri("http", Uri.Authority(Uri.Host(contactPoint.host), targetPort)) - val baseUri = settings.managementBasePath.fold(rawBaseUri)(prefix => rawBaseUri.withPath(Uri.Path(s"/$prefix"))) - - val childActorName = s"contactPointProbe-${baseUri.authority.host}-${baseUri.authority.port}" - log.debug("Ensuring probing actor: " + childActorName) - - // This should never really happen in well configured env, but it may happen that someone is confused with ports - // and we end up trying to probe (using http for example) a port that actually is our own remoting port. - // We actively bail out of this case and log a warning instead. - val wasAboutToProbeSelfAddress = - baseUri.authority.host.address() == cluster.selfAddress.host.getOrElse("---") && - baseUri.authority.port == cluster.selfAddress.port.getOrElse(-1) - - if (wasAboutToProbeSelfAddress) { - log.warning("Misconfiguration detected! Attempted to start probing a contact-point which address [{}] " + - "matches our local remoting address [{}]. Avoiding probing this address. 
Consider double checking your service " + - "discovery and port configurations.", baseUri, cluster.selfAddress) - None - } else - context.child(childActorName) match { - case Some(contactPointProbingChild) ⇒ - Some(contactPointProbingChild) - case None ⇒ - val props = HttpContactPointBootstrap.props(settings, contactPoint, baseUri) - Some(context.actorOf(props, childActorName)) - } + val childActorName = contactPoint.port match { + case Some(port) => s"contactPointProbe-${contactPoint.host}-$port" + case None => s"contactPointProbe-${contactPoint.host}" + } + + log.debug("Ensuring probing actor: {}", childActorName) + + context.child(childActorName) match { + case some: Some[_] ⇒ + some + case None ⇒ + settings.contactPoint.probeMethod match { + case BootstrapCoordinator.ProbeMethodAkkaManagement => + val targetPort = contactPoint.port.getOrElse(settings.contactPoint.fallbackPort) + + // This should never really happen in well configured env, but it may happen that someone is confused with ports + // and we end up trying to probe (using http for example) a port that actually is our own remoting port. + // We actively bail out of this case and log a warning instead. + val wasAboutToProbeSelfAddress = + contactPoint.host == cluster.selfAddress.host.getOrElse("---") && + targetPort == cluster.selfAddress.port.getOrElse(-1) + + if (wasAboutToProbeSelfAddress) { + log.warning("Misconfiguration detected! Attempted to start probing a contact-point which address [{}] " + + "matches our local remoting address [{}]. Avoiding probing this address. Consider double checking your service " + + "discovery and port configurations.", contactPoint, cluster.selfAddress) + None + } else { + val props = HttpContactPointBootstrap.props(settings, contactPoint) + Some(context.actorOf(props, childActorName)) + } + case BootstrapCoordinator.ProbeMethodRemoting => + val props = RemotingContactPointBootstrap.props(settings, contactPoint) + Some(context.actorOf(props, childActorName)) + + case unknown => + throw new IllegalArgumentException("Unknown probe method: " + unknown) + } + } + } private def decide(): Unit = { diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapProtocolSerializer.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapProtocolSerializer.scala new file mode 100644 index 000000000..e93363b74 --- /dev/null +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapProtocolSerializer.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. 
+ */ + +package akka.management.cluster.bootstrap.internal + +import java.io.NotSerializableException + +import akka.actor.ExtendedActorSystem +import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.SeedNodes +import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol +import akka.management.cluster.bootstrap.internal.RemotingContactPoint.GetSeedNodes +import akka.serialization.{BaseSerializer, SerializerWithStringManifest} +import spray.json._ + +class BootstrapProtocolSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest + with BaseSerializer with HttpBootstrapJsonProtocol { + + override def toBinary(obj: AnyRef): Array[Byte] = obj match { + case seedNodes: SeedNodes => seedNodes.toJson.compactPrint.getBytes("utf-8") + case GetSeedNodes => Array.emptyByteArray + } + + override def manifest(obj: AnyRef): String = obj match { + case _: SeedNodes => "A" + case GetSeedNodes => "B" + case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case "A" => new String(bytes, "utf-8").parseJson.convertTo[SeedNodes] + case "B" => GetSeedNodes + case _ => throw new NotSerializableException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + + } +} diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/ContactPoint.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/ContactPoint.scala new file mode 100644 index 000000000..a1269cd20 --- /dev/null +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/ContactPoint.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.management.cluster.bootstrap.internal + +import akka.actor.ActorSystem +import akka.annotation.InternalApi +import akka.cluster.{Cluster, Member, MemberStatus} +import akka.event.LoggingAdapter +import akka.management.cluster.bootstrap.ClusterBootstrapSettings +import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.{ClusterMember, SeedNodes} + +@InternalApi +private[bootstrap] class ContactPoint(system: ActorSystem, settings: ClusterBootstrapSettings, log: LoggingAdapter) { + + private val cluster = Cluster(system) + + def seedNodes(clientAddress: String): SeedNodes = { + def memberToClusterMember(m: Member): ClusterMember = + ClusterMember(m.uniqueAddress.address, m.uniqueAddress.longUid, m.status.toString, m.roles) + + val state = cluster.state + + // TODO shuffle the members so in a big deployment nodes start joining different ones and not all the same? 
+ val members = state.members + .diff(state.unreachable) + .filter( + m => m.status == MemberStatus.up || m.status == MemberStatus.weaklyUp || m.status == MemberStatus.joining) + .take(settings.contactPoint.httpMaxSeedNodesToExpose) + .map(memberToClusterMember) + + val info = SeedNodes(cluster.selfMember.uniqueAddress.address, members) + log.info("Bootstrap request from {}: Contact Point returning {} seed-nodes ([{}])", clientAddress, members.size, + members) + info + } +} diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala index 2ff7a7c33..70363ac5b 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala @@ -4,18 +4,11 @@ package akka.management.cluster.bootstrap.internal -import java.time.LocalDateTime -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException import scala.concurrent.Future import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.DeadLetterSuppression import akka.actor.Props -import akka.actor.Status -import akka.actor.Timers import akka.annotation.InternalApi import akka.cluster.Cluster import akka.discovery.ServiceDiscovery.ResolvedTarget @@ -26,21 +19,17 @@ import akka.http.scaladsl.model.Uri import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.unmarshalling.Unmarshal import akka.management.cluster.bootstrap.ClusterBootstrapSettings -import akka.management.cluster.bootstrap.contactpoint.ClusterBootstrapRequests -import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol -import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.SeedNodes +import akka.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol } import akka.pattern.after import akka.pattern.pipe import akka.stream.ActorMaterializer +import akka.util.Timeout @InternalApi private[bootstrap] object HttpContactPointBootstrap { - def props(settings: ClusterBootstrapSettings, contactPoint: ResolvedTarget, baseUri: Uri): Props = - Props(new HttpContactPointBootstrap(settings, contactPoint, baseUri)) - - private case object ProbeTick extends DeadLetterSuppression - private val ProbingTimerKey = "probing-key" + def props(settings: ClusterBootstrapSettings, contactPoint: ResolvedTarget): Props = + Props(new HttpContactPointBootstrap(settings, contactPoint)) } /** @@ -54,73 +43,41 @@ private[bootstrap] object HttpContactPointBootstrap { @InternalApi private[bootstrap] class HttpContactPointBootstrap( settings: ClusterBootstrapSettings, - contactPoint: ResolvedTarget, - baseUri: Uri -) extends Actor - with ActorLogging - with Timers - with HttpBootstrapJsonProtocol { + contactPoint: ResolvedTarget +) extends AbstractContactPointBootstrap(settings, contactPoint) with HttpBootstrapJsonProtocol { - import HttpContactPointBootstrap.ProbeTick - import HttpContactPointBootstrap.ProbingTimerKey + import HttpBootstrapJsonProtocol._ private val cluster = Cluster(context.system) - if (baseUri.authority.host.address() == cluster.selfAddress.host.getOrElse("---") && - baseUri.authority.port == cluster.selfAddress.port.getOrElse(-1)) { - throw new IllegalArgumentException( - 
"Requested base Uri to be probed matches local remoting address, bailing out! " + - s"Uri: $baseUri, this node's remoting address: ${cluster.selfAddress}") - } - private implicit val mat = ActorMaterializer()(context.system) private val http = Http()(context.system) private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0) import context.dispatcher - private val probeInterval = settings.contactPoint.probeInterval - private val probeRequest = ClusterBootstrapRequests.bootstrapSeedNodes(baseUri) - private val replyTimeout = Future.failed(new TimeoutException(s"Probing timeout of [$baseUri]")) - - /** - * If probing keeps failing until the deadline triggers, we notify the parent, - * such that it rediscover again. - */ - private var probingKeepFailingDeadline: Deadline = settings.contactPoint.probingFailureTimeout.fromNow - - private def resetProbingKeepFailingWithinDeadline(): Unit = - probingKeepFailingDeadline = settings.contactPoint.probingFailureTimeout.fromNow - - override def preStart(): Unit = - self ! ProbeTick - - override def receive = { - case ProbeTick ⇒ - val req = ClusterBootstrapRequests.bootstrapSeedNodes(baseUri) - log.debug("Probing [{}] for seed nodes...", req.uri) - - val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse) - - val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout) - Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self) - - case Status.Failure(cause) => - log.warning("Probing [{}] failed due to: {}", probeRequest.uri, cause.getMessage) - if (probingKeepFailingDeadline.isOverdue()) { - log.error("Overdue of probing-failure-timeout, stop probing, signaling that it's failed") - context.parent ! BootstrapCoordinator.Protocol.ProbingFailed(contactPoint, cause) - context.stop(self) - } else { - // keep probing, hoping the request will eventually succeed - scheduleNextContactPointProbing() - } + private val probeRequest = { + val targetPort = contactPoint.port.getOrElse(settings.contactPoint.fallbackPort) + val rawBaseUri = Uri("http", Uri.Authority(Uri.Host(contactPoint.host), targetPort)) + val baseUri = settings.managementBasePath.fold(rawBaseUri)(prefix => rawBaseUri.withPath(Uri.Path(s"/$prefix"))) + + if (baseUri.authority.host.address() == cluster.selfAddress.host.getOrElse("---") && + baseUri.authority.port == cluster.selfAddress.port.getOrElse(-1)) { + throw new IllegalArgumentException( + "Requested base Uri to be probed matches local remoting address, bailing out! 
" + + s"Uri: $baseUri, this node's remoting address: ${cluster.selfAddress}") + } + + ClusterBootstrapRequests.bootstrapSeedNodes(baseUri) + } + private val replyTimeout = Future.failed(new TimeoutException(s"Probing timeout of [${probeRequest.uri}]")) + + override val uri: String = probeRequest.uri.toString + + override protected def probe()(implicit probingFailureTimeout: Timeout): Future[SeedNodes] = { + val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse) - case response: SeedNodes ⇒ - notifyParentAboutSeedNodes(response) - resetProbingKeepFailingWithinDeadline() - // we keep probing and looking if maybe a cluster does form after all - // (technically could be long polling or web-sockets, but that would need reconnect logic, so this is simpler) - scheduleNextContactPointProbing() + val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout) + Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self) } private def handleResponse(response: HttpResponse): Future[SeedNodes] = { @@ -135,24 +92,4 @@ private[bootstrap] class HttpContactPointBootstrap( new IllegalStateException(s"Expected response '200 OK' but found ${response.status}. Body: '$body'")) } } - - private def notifyParentAboutSeedNodes(members: SeedNodes): Unit = { - val seedAddresses = members.seedNodes.map(_.node) - context.parent ! BootstrapCoordinator.Protocol.ObtainedHttpSeedNodesObservation(timeNow(), contactPoint, - members.selfNode, seedAddresses) - } - - private def scheduleNextContactPointProbing(): Unit = - timers.startSingleTimer(ProbingTimerKey, ProbeTick, effectiveProbeInterval()) - - /** Duration with configured jitter applied */ - private def effectiveProbeInterval(): FiniteDuration = - probeInterval + jitter(probeInterval) - - def jitter(d: FiniteDuration): FiniteDuration = - (d.toMillis * settings.contactPoint.probeIntervalJitter * ThreadLocalRandom.current().nextDouble()).millis - - protected def timeNow(): LocalDateTime = - LocalDateTime.now() - } diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPoint.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPoint.scala new file mode 100644 index 000000000..4f04090a6 --- /dev/null +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPoint.scala @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.management.cluster.bootstrap.internal + +import akka.actor.{Actor, ActorLogging, Props} +import akka.annotation.InternalApi +import akka.management.cluster.bootstrap.ClusterBootstrapSettings + +@InternalApi +private[bootstrap] object RemotingContactPoint { + case object GetSeedNodes + + val RemotingContactPointActorName = "remotingContactPoint" + def props(settings: ClusterBootstrapSettings): Props = Props(new RemotingContactPoint(settings)) +} + +@InternalApi +private[bootstrap] final class RemotingContactPoint(settings: ClusterBootstrapSettings) extends Actor with ActorLogging { + + private val contactPoint = new ContactPoint(context.system, settings, log) + + import RemotingContactPoint.GetSeedNodes + + override def receive: Receive = { + case GetSeedNodes => + sender() ! 
contactPoint.seedNodes(sender().path.address.toString) + } +} diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPointBootstrap.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPointBootstrap.scala new file mode 100644 index 000000000..af6ef42f0 --- /dev/null +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/RemotingContactPointBootstrap.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.management.cluster.bootstrap.internal +import akka.actor.Props +import akka.annotation.InternalApi +import akka.cluster.Cluster +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.management.cluster.bootstrap.ClusterBootstrapSettings +import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol +import akka.pattern.ask +import akka.util.Timeout + +import scala.concurrent.Future + +@InternalApi +private[bootstrap] object RemotingContactPointBootstrap { + def props(settings: ClusterBootstrapSettings, contactPoint: ResolvedTarget): Props = + Props(new RemotingContactPointBootstrap(settings, contactPoint)) +} + +@InternalApi +private[bootstrap] final class RemotingContactPointBootstrap( + settings: ClusterBootstrapSettings, + contactPoint: ResolvedTarget +) extends AbstractContactPointBootstrap(settings, contactPoint) { + + override protected val uri: String = { + val cluster = Cluster(context.system) + val targetPort = contactPoint.port + .orElse(cluster.selfAddress.port) + .getOrElse(throw new IllegalArgumentException("Cannot infer port for contact point")) + val address = cluster.selfAddress.copy(host = Some(contactPoint.host), port = Some(targetPort)) + (self.path.parent.parent / RemotingContactPoint.RemotingContactPointActorName).toStringWithAddress(address) + } + + private val remoteContactPoint = context.system.actorSelection(uri) + + /** + * Probe the contact point. + * + * @param probingFailureTimeout A timeout, if not replied within this timeout, the returned Future should fail. + * @return A future of the seed nodes. + */ + override protected def probe()(implicit probingFailureTimeout: Timeout): Future[HttpBootstrapJsonProtocol.SeedNodes] = { + (remoteContactPoint ? 
RemotingContactPoint.GetSeedNodes).mapTo[HttpBootstrapJsonProtocol.SeedNodes] + } +} diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapIntegrationSpec.scala similarity index 97% rename from cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala rename to cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapIntegrationSpec.scala index 1f13145ff..946716055 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/HttpClusterBootstrapIntegrationSpec.scala @@ -21,9 +21,9 @@ import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.Future import scala.concurrent.duration._ -class ClusterBootstrapIntegrationSpec extends WordSpecLike with Matchers { +class HttpClusterBootstrapIntegrationSpec extends WordSpecLike with Matchers { - "Cluster Bootstrap" should { + "Cluster Bootstrap with HTTP probing" should { var remotingPorts = Map.empty[String, Int] var contactPointPorts = Map.empty[String, Int] diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/RemotingClusterBootstrapIntegrationSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/RemotingClusterBootstrapIntegrationSpec.scala new file mode 100644 index 000000000..91aa36b2e --- /dev/null +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/RemotingClusterBootstrapIntegrationSpec.scala @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. 
+ */ + +package akka.management.cluster.bootstrap.contactpoint + +import java.net.InetAddress + +import akka.actor.ActorSystem +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp} +import akka.discovery.ServiceDiscovery.{Resolved, ResolvedTarget} +import akka.discovery.{Lookup, MockDiscovery} +import akka.management.cluster.bootstrap.ClusterBootstrap +import akka.stream.ActorMaterializer +import akka.testkit.{SocketUtil, TestKit, TestProbe} +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.{Matchers, WordSpecLike} + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class RemotingClusterBootstrapIntegrationSpec extends WordSpecLike with Matchers { + + "Cluster Bootstrap with remoting probing" should { + + var remotingPorts = Map.empty[String, Int] + + def config(id: String): Config = { + val remotingPort = SocketUtil.temporaryServerAddress("127.0.0.1").getPort + + info(s"System [$id]: remoting port: $remotingPort") + + remotingPorts = remotingPorts.updated(id, remotingPort) + + ConfigFactory.parseString(s""" + akka { + loglevel = INFO + + cluster.jmx.multi-mbeans-in-same-jvm = on + + # this can be referred to in tests to use the mock discovery implementation + discovery.mock-dns.class = "akka.discovery.MockDiscovery" + + remote.netty.tcp.port = $remotingPort + + management { + + cluster.bootstrap { + contact-point-discovery { + discovery-method = mock-dns + + service-name = "remotingservice" + port-name = "remoting2" + protocol = "tcp2" + + service-namespace = "svc.cluster.local" + + stable-margin = 4 seconds + } + contact-point { + probe-method = remoting + } + } + } + } + """.stripMargin).withFallback(ConfigFactory.load()) + } + + val systemA = ActorSystem("System", config("A")) + val systemB = ActorSystem("System", config("B")) + val systemC = ActorSystem("System", config("C")) + + val clusterA = Cluster(systemA) + val clusterB = Cluster(systemB) + val clusterC = Cluster(systemC) + + val bootstrapA = ClusterBootstrap(systemA) + val bootstrapB = ClusterBootstrap(systemB) + val bootstrapC = ClusterBootstrap(systemC) + + // prepare the "mock DNS" + val name = "remotingservice.svc.cluster.local" + MockDiscovery.set(Lookup(name, Some("remoting2"), Some("tcp2")), + () => + Future.successful( + Resolved(name, + List( + ResolvedTarget( + host = clusterA.selfAddress.host.get, + port = remotingPorts.get("A"), + address = Option(InetAddress.getByName(clusterA.selfAddress.host.get)) + ), + ResolvedTarget( + host = clusterB.selfAddress.host.get, + port = remotingPorts.get("B"), + address = Option(InetAddress.getByName(clusterB.selfAddress.host.get)) + ), + ResolvedTarget( + host = clusterC.selfAddress.host.get, + port = remotingPorts.get("C"), + address = Option(InetAddress.getByName(clusterC.selfAddress.host.get)) + ) + )) + )) + + "start listening with the remote contact-points on 3 systems" in { + def start(system: ActorSystem) = { + implicit val sys = system + implicit val mat = ActorMaterializer()(system) + ClusterBootstrap(system) + } + + start(systemA) + start(systemB) + start(systemC) + } + + "join three DNS discovered nodes by forming new cluster (happy path)" in { + bootstrapA.discovery.getClass should ===(classOf[MockDiscovery]) + + bootstrapA.start() + bootstrapB.start() + bootstrapC.start() + + val pA = TestProbe()(systemA) + clusterA.subscribe(pA.ref, classOf[MemberUp]) + + pA.expectMsgType[CurrentClusterState] + val up1 = pA.expectMsgType[MemberUp](30.seconds) + info("" + up1) + } + + "terminate all 
systems" in { + try TestKit.shutdownActorSystem(systemA, 3.seconds) + finally try TestKit.shutdownActorSystem(systemB, 3.seconds) + finally TestKit.shutdownActorSystem(systemC, 3.seconds) + } + + } + +}