Skip to content

Commit

Permalink
Add logging for stream creation
Browse files — view the repository at this point in the history
  • Loading branch information
c-w committed Feb 21, 2018
1 parent e4f659e commit 7467f12
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry
import com.microsoft.partnercatalyst.fortis.spark.logging.FortisTelemetry.{get => Log}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, InvalidConnectorConfigException, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
Expand All
@@ -11,12 +11,12 @@ import scala.util.Try
abstract class StreamFactoryBase[A: ClassTag] extends StreamFactory[A]{
override def createStream(ssc: StreamingContext): PartialFunction[ConnectorConfig, DStream[A]] = {
case config if canHandle(config) =>
Log.logInfo(s"Creating stream ${config.name}")

val stream = buildStream(ssc, config).transform(rdd => {
// Bake telemetry for incoming batch sizes into resulting stream
val batchSize = rdd.count()
val streamId = config.parameters("streamId").toString
val connectorName = config.name
FortisTelemetry.get.logEvent("batch.receive", Map("streamId" -> streamId, "connectorName" -> connectorName), Map("batchSize" -> batchSize.toDouble))
Log.logEvent("batch.receive",
Map("streamId" -> config.parameters("streamId").toString, "connectorName" -> config.name),
Map("batchSize" -> rdd.count().toDouble))

rdd
})
Expand Down

0 comments on commit 7467f12

Please sign in to comment.