Skip to content

Commit

Permalink
Fixes druid-io#161: tranquility-flink supports only one sink operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Graßegger committed Jun 21, 2016
1 parent 0cc9a61 commit 0ecf539
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,24 @@ class BeamSink[T](beamFactory: BeamFactory[T], reportDropsAsExceptions: Boolean
{
var sender: Option[Tranquilizer[T]] = None

private val exception = new AtomicReference[Throwable]()
private val receivedCounter = new LongCounter()
private val sentCounter = new LongCounter()
private val droppedCounter = new LongCounter()
private val exception = new AtomicReference[Throwable]()
private var receivedCounter: Option[LongCounter] = None
private var sentCounter : Option[LongCounter] = None
private var droppedCounter : Option[LongCounter] = None

override def open(parameters: Configuration) = {
sender = Some(beamFactory.tranquilizer)
getRuntimeContext.addAccumulator("Druid: Messages received", receivedCounter)
getRuntimeContext.addAccumulator("Druid: Messages sent", sentCounter)
getRuntimeContext.addAccumulator("Druid: Messages dropped", droppedCounter)
receivedCounter = Some(getRuntimeContext.getLongCounter("Druid: Messages received"))
sentCounter = Some(getRuntimeContext.getLongCounter("Druid: Messages sent"))
droppedCounter = Some(getRuntimeContext.getLongCounter("Druid: Messages dropped"))
}

override def invoke(value: T) = {
receivedCounter.add(1)
receivedCounter.get.add(1)
sender.get.send(value) respond {
case Return(()) => sentCounter.add(1)
case Return(()) => sentCounter.get.add(1)
case Throw(e: MessageDroppedException) if reportDropsAsExceptions => exception.compareAndSet(null, e)
case Throw(e: MessageDroppedException) => droppedCounter.add(1)
case Throw(e: MessageDroppedException) => droppedCounter.get.add(1)
case Throw(e) => exception.compareAndSet(null, e)
}

Expand Down

0 comments on commit 0ecf539

Please sign in to comment.