diff --git a/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala b/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
index 5531f35f..f4cab04b 100644
--- a/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
+++ b/project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sources/streamfactories/StreamFactoryBase.scala
@@ -1,6 +1,6 @@
 package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
 
-import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry
+import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry.{get => Log}
 import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, InvalidConnectorConfigException, StreamFactory}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.DStream
@@ -11,12 +11,12 @@ import scala.util.Try
 abstract class StreamFactoryBase[A: ClassTag] extends StreamFactory[A]{
   override def createStream(ssc: StreamingContext): PartialFunction[ConnectorConfig, DStream[A]] = {
     case config if canHandle(config) =>
+      Log.logInfo(s"Creating stream ${config.name}")
+
       val stream = buildStream(ssc, config).transform(rdd => {
-        // Bake telemetry for incoming batch sizes into resulting stream
-        val batchSize = rdd.count()
-        val streamId = config.parameters("streamId").toString
-        val connectorName = config.name
-        FortisTelemetry.get.logEvent("batch.receive", Map("streamId" -> streamId, "connectorName" -> connectorName), Map("batchSize" -> batchSize.toDouble))
+        Log.logEvent("batch.receive",
+          Map("streamId" -> config.parameters("streamId").toString, "connectorName" -> config.name),
+          Map("batchSize" -> rdd.count().toDouble))
         rdd
       })
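
Note (not part of the diff): for readers without the repo handy, below is a minimal, hypothetical sketch of the telemetry facade the change imports via `FortisTelemetry.{get => Log}`. The `logInfo`/`logEvent` signatures are inferred from the call sites in the diff and are assumptions, not the project's actual definitions.

    // Hypothetical sketch only; signatures inferred from the diff's call sites.
    package com.microsoft.partnercatalyst.fortis.spark.logging

    trait Telemetry {
      def logInfo(message: String): Unit
      def logEvent(name: String,
                   properties: Map[String, String] = Map(),
                   metrics: Map[String, Double] = Map()): Unit
    }

    object FortisTelemetry {
      // A single shared instance: importing `FortisTelemetry.{get => Log}`
      // lets call sites write `Log.logInfo(...)` / `Log.logEvent(...)`.
      lazy val get: Telemetry = new Telemetry {
        override def logInfo(message: String): Unit =
          println(s"INFO: $message")

        override def logEvent(name: String,
                              properties: Map[String, String],
                              metrics: Map[String, Double]): Unit =
          println(s"EVENT: $name properties=$properties metrics=$metrics")
      }
    }

One behavioral note on the diff itself: both before and after the change, `rdd.count()` inside `transform` is a Spark action, so the batch-size telemetry forces each batch's RDD to be evaluated once per interval; the refactor only inlines the temporaries and renames the facade, it does not change that cost.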