diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala index 6243463a475b6..9e70be74d7e6d 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala @@ -30,9 +30,8 @@ import java.util.concurrent._ import java.util import org.apache.flume.conf.{ConfigurationException, Configurable} import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.avro.ipc.{NettyTransceiver, NettyServer} +import org.apache.avro.ipc.NettyServer import org.apache.avro.ipc.specific.SpecificResponder -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import java.net.InetSocketAddress class SparkSink() extends AbstractSink with Configurable { @@ -75,12 +74,10 @@ class SparkSink() extends AbstractSink with Configurable { val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler()) - serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port), - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat( - "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build), - Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat( - "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build)))) + // Using the constructor that takes specific thread-pools requires bringing in netty + // dependencies which are being excluded in the build. In practice, + // Netty dependencies are already available on the JVM as Flume would have pulled them in. 
+ serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port))) serverOpt.map(server => server.start()) lock.lock() @@ -93,10 +90,14 @@ class SparkSink() extends AbstractSink with Configurable { } override def stop() { + transactionExecutorOpt.map(executor => executor.shutdownNow()) + serverOpt.map(server => { + server.close() + server.join() + }) lock.lock() try { running = false - transactionExecutorOpt.map(executor => executor.shutdownNow()) blockingCondition.signalAll() } finally { lock.unlock() @@ -131,23 +132,28 @@ class SparkSink() extends AbstractSink with Configurable { Status.BACKOFF } + + // Object representing an empty batch returned by the txn processor due to some error. + case object ErrorEventBatch extends EventBatch + private class AvroCallbackHandler() extends SparkFlumeProtocol { override def getEventBatch(n: Int): EventBatch = { val processor = processorFactory.get.checkOut(n) transactionExecutorOpt.map(executor => executor.submit(processor)) // Wait until a batch is available - can be null if some error was thrown - val eventBatch = Option(processor.eventQueue.take()) - if (eventBatch.isDefined) { - val eventsToBeSent = eventBatch.get - processorMap.put(eventsToBeSent.getSequenceNumber, processor) - if (LOG.isDebugEnabled) { - LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() + - " events with sequence number: " + eventsToBeSent.getSequenceNumber) + val eventBatch = processor.eventQueue.take() + eventBatch match { + case ErrorEventBatch => throw new FlumeException("Something went wrong. 
No events" + + " retrieved from channel.") + case events => { + processorMap.put(events.getSequenceNumber, processor) + if (LOG.isDebugEnabled) { + LOG.debug("Sent " + events.getEventBatch.size() + + " events with sequence number: " + events.getSequenceNumber) + } + events } - eventsToBeSent - } else { - throw new FlumeException("Error while trying to retrieve events from the channel.") } } @@ -211,17 +217,38 @@ class SparkSink() extends AbstractSink with Configurable { val events = eventBatch.getEventBatch events.clear() val loop = new Breaks + var gotEventsInThisTxn = false loop.breakable { - for (i <- 0 until maxBatchSize) { + var i = 0 + // Using a for loop here would make the maxBatchSize change ineffective, as the Range + // is generated up front + while (i < maxBatchSize) { + i += 1 val eventOpt = Option(getChannel.take()) - eventOpt.map(event => { events.add(new SparkSinkEvent(toCharSequenceMap(event .getHeaders), ByteBuffer.wrap(event.getBody))) + gotEventsInThisTxn = true }) if (eventOpt.isEmpty) { - loop.break() + if (!gotEventsInThisTxn) { + // To avoid sending empty batches, we wait until events are available, backing off + // between attempts to get events. Each failed attempt, though, costs one loop + // iteration. To ensure that we can still send back maxBatchSize events, we + // cheat and increase maxBatchSize by 1 to account for the lost iteration. + // Throwing an exception instead would be expensive, as Avro would serialize it + // and send it over the wire, which is useless. Before incrementing, though, + // ensure that we are nowhere near Int.MaxValue.
+ if (maxBatchSize >= Int.MaxValue / 2) { + // Sanity check: bail out long before maxBatchSize could overflow + throw new RuntimeException("Safety exception - polled too many times, no events!") + } + maxBatchSize += 1 + Thread.sleep(500) + } else { + loop.break() + } } } } @@ -283,7 +310,7 @@ class SparkSink() extends AbstractSink with Configurable { null // No point rethrowing the exception } finally { // Must *always* release the caller thread - eventQueue.put(null) + eventQueue.put(ErrorEventBatch) // In the case of success coming after the timeout, but before resetting the seq number // remove the event from the map and then clear the value resultQueue.clear() diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala index 579f0b1091df3..aa5db4d94ff17 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala @@ -22,7 +22,6 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon import org.apache.spark.storage.StorageLevel import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import org.apache.spark.streaming.util.ManualClock -import java.nio.charset.Charset import org.apache.flume.channel.MemoryChannel import org.apache.flume.Context import org.apache.flume.conf.Configurables @@ -39,7 +38,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: ReceiverInputDStream[SparkPollingEvent] = - FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 5, + FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]] with
SynchronizedBuffer[Seq[SparkPollingEvent]] @@ -63,15 +62,17 @@ class FlumePollingReceiverSuite extends TestSuiteBase { ssc.start() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) + var t = 0 for (i <- 0 until 5) { val tx = channel.getTransaction tx.begin() - for (j <- 0 until input.size) { + for (j <- 0 until 5) { channel.put(EventBuilder.withBody( - (String.valueOf(i) + input(j)).getBytes("utf-8"), - Map[String, String]("test-" + input(j).toString -> "header"))) + String.valueOf(t).getBytes("utf-8"), + Map[String, String]("test-" + t.toString -> "header"))) + t += 1 } + tx.commit() tx.close() Thread.sleep(500) // Allow some time for the events to reach @@ -86,19 +87,30 @@ class FlumePollingReceiverSuite extends TestSuiteBase { assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") logInfo("Stopping context") ssc.stop() + sink.stop() + channel.stop() - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === 5) + val flattenedBuffer = outputBuffer.flatten + assert(flattenedBuffer.size === 25) var counter = 0 - for (i <- 0 until outputBuffer.size; - j <- 0 until outputBuffer(i).size) { - counter += 1 - val eventToVerify = outputBuffer(i)(j).event - val str = decoder.decode(eventToVerify.getBody) - assert(str.toString === (String.valueOf(i) + input(j))) - assert(eventToVerify.getHeaders.get("test-" + input(j).toString) === "header") + for (i <- 0 until 25) { + val eventToVerify = EventBuilder.withBody( + String.valueOf(i).getBytes("utf-8"), + Map[String, String]("test-" + i.toString -> "header")) + var found = false + var j = 0 + while (j < flattenedBuffer.size && !found) { + val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8") + if (new String(eventToVerify.getBody, "utf-8") == strToCompare && + eventToVerify.getHeaders.get("test-" + i.toString) + .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) { + found = true + 
counter += 1 + } + j += 1 + } } + assert (counter === 25) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index df21813ff983a..223c37d729fa6 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -23,7 +23,6 @@ import sbt.Keys._ import sbt.Task import sbtassembly.Plugin._ import AssemblyKeys._ -import sbtavro.SbtAvro._ import scala.Some import scala.util.Properties import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}