From 7467f12a5e96388da69d3aabcf04b529c59e08fc Mon Sep 17 00:00:00 2001
From: Clemens Wolff
Date: Wed, 21 Feb 2018 16:30:55 -0500
Subject: [PATCH] Add logging for stream creation

---
 .../sources/streamfactories/StreamFactoryBase.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala b/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
index 5531f35f..f4cab04b 100644
--- a/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
+++ b/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
@@ -1,6 +1,6 @@
 package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
 
-import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry
+import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry.{get => Log}
 import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, InvalidConnectorConfigException, StreamFactory}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.DStream
@@ -11,12 +11,12 @@ import scala.util.Try
 abstract class StreamFactoryBase[A: ClassTag] extends StreamFactory[A]{
   override def createStream(ssc: StreamingContext): PartialFunction[ConnectorConfig, DStream[A]] = {
     case config if canHandle(config) =>
+      Log.logInfo(s"Creating stream ${config.name}")
+
       val stream = buildStream(ssc, config).transform(rdd => {
-        // Bake telemetry for incoming batch sizes into resulting stream
-        val batchSize = rdd.count()
-        val streamId = config.parameters("streamId").toString
-        val connectorName = config.name
-        FortisTelemetry.get.logEvent("batch.receive", Map("streamId" -> streamId, "connectorName" -> connectorName), Map("batchSize" -> batchSize.toDouble))
+        Log.logEvent("batch.receive",
+          Map("streamId" -> config.parameters("streamId").toString, "connectorName" -> config.name),
+          Map("batchSize" -> rdd.count().toDouble))
         rdd
       })
 
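Note: for context, below is a minimal sketch of the telemetry interface this patch assumes. The real implementation lives in com.microsoft.partnercatalyst.fortis.spark.logging and is not shown here; the trait name and method bodies are illustrative assumptions, with only the logInfo/logEvent signatures inferred from the call sites in the diff above.

// Sketch only: hypothetical stand-in for the FortisTelemetry singleton.
// Signatures are inferred from the call sites in the patch; everything
// else is an assumption for illustration.
trait Telemetry {
  def logInfo(message: String): Unit
  def logEvent(name: String,
               properties: Map[String, String],
               metrics: Map[String, Double]): Unit
}

object FortisTelemetry {
  // The patch imports this as `FortisTelemetry.{get => Log}`, so call
  // sites read as `Log.logInfo(...)` / `Log.logEvent(...)`.
  lazy val get: Telemetry = new Telemetry {
    override def logInfo(message: String): Unit =
      println(s"INFO: $message")

    override def logEvent(name: String,
                          properties: Map[String, String],
                          metrics: Map[String, Double]): Unit =
      println(s"EVENT: $name properties=$properties metrics=$metrics")
  }
}

One cost-related observation: rdd.count() inside transform triggers a Spark job per micro-batch to compute the batch size. That was equally true of the deleted code, so the patch only tightens how the telemetry call is written; it does not change its runtime cost.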