diff --git a/docs/configuration.md b/docs/configuration.md index 945b306901dd1..acee267883ed5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -8,7 +8,7 @@ title: Spark Configuration Spark provides three locations to configure the system: * [Spark properties](#spark-properties) control most application parameters and can be set by using - a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object, or through Java + a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object, or through Java system properties. * [Environment variables](#environment-variables) can be used to set per-machine settings, such as the IP address, through the `conf/spark-env.sh` script on each node. diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 27cd085782f66..912bf37c81543 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -7,25 +7,29 @@ Spark Streaming can receive streaming data from any arbitrary data source beyond the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.). This requires the developer to implement a *receiver* that is customized for receiving data from the concerned data source. This guide walks through the process of implementing a custom receiver -and using it in a Spark Streaming application. +and using it in a Spark Streaming application. Note that custom receivers can be implemented +in Scala or Java. -### Implementing a Custom Receiver +## Implementing a Custom Receiver -This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver). +This starts with implementing a **Receiver** +([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver), +[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)). 
A custom receiver must extend this abstract class by implementing two methods - `onStart()`: Things to do to start receiving data. - `onStop()`: Things to do to stop receiving data. -Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads +Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads that responsible for receiving the data and `onStop()` would ensure that the receiving by those threads are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they should stop receiving data. Once the data is received, that data can be stored inside Spark -by calling `store(data)`, which is a method provided by the -[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class. +by calling `store(data)`, which is a method provided by the Receiver class. There are number of flavours of `store()` which allow you store the received data -record-at-a-time or as whole collection of objects / serialized bytes. +record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavour of +`store()` used to implement a receiver affects its reliability and fault-tolerance semantics. +This is discussed [later](#receiver-reliability) in more detail. Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. `restart()` will restart the receiver by @@ -158,7 +162,7 @@ public class JavaCustomReceiver extends Receiver { -### Using the custom receiver in a Spark Streaming application +## Using the custom receiver in a Spark Streaming application The custom receiver can be used in a Spark Streaming application by using `streamingContext.receiverStream()`. This will create @@ -191,9 +195,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
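As a concrete illustration of the `onStart()`/`onStop()` contract described above, the following Scala sketch receives text lines from a hypothetical socket source (the host, port, and class name are assumptions for the example, not part of the guide) on a background thread and stores them record-at-a-time:

```scala
import java.net.Socket
import scala.io.Source

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch only: a line-oriented socket receiver, assuming host/port of a text source.
class SketchSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start a separate thread so that onStart() itself returns immediately
    // and does not block.
    new Thread("Sketch Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // The receiving thread checks isStopped(), so there is nothing to do here.
  }

  private def receive() {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()
      while (!isStopped && lines.hasNext) {
        store(lines.next())  // record-at-a-time flavour of store()
      }
      // Connection closed; ask Spark to restart the receiver and reconnect.
      restart("Trying to connect again")
    } catch {
      // Catch exceptions so the receiver does not fail silently.
      case e: java.io.IOException =>
        restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

It could then be wired into an application with `val lines = ssc.receiverStream(new SketchSocketReceiver("localhost", 9999))`, assuming an existing `StreamingContext` named `ssc`.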
- - -### Implementing and Using a Custom Actor-based Receiver +## Receiver Reliability +As discussed briefly in the +[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability), +there are two kinds of receivers based on their reliability and fault-tolerance semantics. + +1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a + *reliable receiver* correctly acknowledges to the source that the data has been received + and stored in Spark reliably (that is, replicated successfully). Usually, + implementing this receiver involves careful consideration of the semantics of source + acknowledgements. +1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support + acknowledging. Even for reliable sources, one may implement an unreliable receiver that + does not go into the complexity of acknowledging correctly. + +To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data. +This flavour of `store` is a blocking call which returns only after all the given records have +been stored inside Spark. If replication is enabled in the receiver's configured storage level +(enabled by default), then this call returns after replication has completed. +Thus it ensures that the data is reliably stored, and the receiver can then acknowledge the +source appropriately. This ensures that no data is lost when the receiver fails in the middle +of replicating data -- the buffered data will not be acknowledged and hence will be later resent +by the source. + +An *unreliable receiver* does not have to implement any of this logic. It can simply receive +records from the source and insert them one-at-a-time using `store(single-record)`. While it does +not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking the data into appropriately sized blocks (look for block
+interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if the rate limits have been specified.
+- Because of these two, *unreliable receivers* are simpler to implement than reliable receivers.
+
+The following table summarizes the characteristics of both types of receivers.
+
+<table class="table">
+<tr>
+  <th>Receiver Type</th>
+  <th>Characteristics</th>
+</tr>
+<tr>
+  <td><b>Unreliable Receivers</b></td>
+  <td>
+    Simple to implement.<br/>
+    System takes care of block generation and rate control.<br/>
+    No fault-tolerance guarantees, can lose data on receiver failure.
+  </td>
+</tr>
+<tr>
+  <td><b>Reliable Receivers</b></td>
+  <td>
+    Strong fault-tolerance guarantees, can ensure zero data loss.<br/>
+    Block generation and rate control to be handled by the receiver implementation.<br/>
+    Implementation complexity depends on the acknowledgement mechanisms of the source.
+  </td>
+</tr>
+</table>
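To make the reliable pattern concrete, here is a hedged Scala sketch. The `BatchClient` trait and its `fetchBatch`/`ackBatch` methods are invented stand-ins for a source client that supports acknowledgements, not a real API; the point is the ordering: buffer records, call the blocking `store(ArrayBuffer)` flavour, and acknowledge only after it returns.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical client interface for a source with acknowledgements
// (for illustration only).
trait BatchClient {
  def fetchBatch(): (Seq[String], Long)  // returns (records, batchId)
  def ackBatch(batchId: Long): Unit
}

// Sketch of a reliable receiver built on the assumed BatchClient.
class SketchReliableReceiver(client: BatchClient)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    new Thread("Sketch Reliable Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() { }

  private def receive() {
    try {
      while (!isStopped) {
        val (records, batchId) = client.fetchBatch()
        val buffer = new ArrayBuffer[String]()
        buffer ++= records
        // Blocking flavour of store(): returns only after the whole buffer
        // has been stored (and replicated, per the storage level) in Spark.
        store(buffer)
        // Safe to acknowledge only now. If the receiver fails before this
        // point, the unacknowledged batch will be resent by the source.
        client.ackBatch(batchId)
      }
    } catch {
      case t: Throwable => restart("Error receiving batch", t)
    }
  }
}
```

An unreliable variant of the same receiver would simply loop over `records` calling `store(record)` for each one and skip the acknowledgement step entirely.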
+ +## Implementing and Using a Custom Actor-based Receiver Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) @@ -217,5 +280,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver") See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. - -
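For completeness, a minimal actor-based receiver can be sketched as follows. The `ActorHelper` trait provides `store()`, so the actor only has to forward the messages it receives into Spark (the `case` handling here assumes a source that sends plain `String` messages):

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Minimal sketch: forward incoming String messages into Spark via
// the store() method mixed in by ActorHelper.
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}
```

This is the `CustomActor` that the `ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")` call above instantiates.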