Commit 20fb74e
[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings

## What changes were proposed in this pull request?

- Removed two `createStream` overloads that have been deprecated since 1.4.0 (see the migration sketch below)
- Fixed two other compilation warnings
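
The replacement API is visible in the updated tests below: the supported `createStream` overloads take an explicit KCL application name and region instead of inferring them from the endpoint URL and `SparkConf`. A minimal migration sketch, grounded in the removed signature and the updated test suites; the helper name, stream name, endpoint, and batch interval are illustrative placeholders:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

def createKinesisStream(ssc: StreamingContext) = {
  // Before (deprecated since 1.4.0): the region and the KCL application name
  // were inferred from the endpoint URL and the SparkConf app name.
  //   KinesisUtils.createStream(ssc, "mySparkStream",
  //     "https://kinesis.us-east-1.amazonaws.com", Seconds(2),
  //     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)

  // After: pass the KCL application name and the region explicitly.
  KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
}
```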

## How was this patch tested?

Existing test suites.

Author: proflin <[email protected]>

Closes apache#11850 from lw-lin/streaming-kinesis-deprecates-warnings.
lw-lin authored and roygao94 committed Mar 22, 2016
1 parent f27eb94 commit 20fb74e
Showing 4 changed files with 13 additions and 98 deletions.
KinesisUtils.scala:

@@ -221,51 +221,6 @@ object KinesisUtils {
     }
   }
 
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *
-   * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *   on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *   gets AWS credentials.
-   * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
-   *   in [[org.apache.spark.SparkConf]].
-   *
-   * @param ssc StreamingContext object
-   * @param streamName Kinesis stream name
-   * @param endpointUrl Endpoint url of Kinesis service
-   *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
-   *                           See the Kinesis Spark Streaming documentation for more
-   *                           details on the different types of checkpoints.
-   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
-   *                                worker's initial starting position in the stream.
-   *                                The values are either the beginning of the stream
-   *                                per Kinesis' limit of 24 hours
-   *                                (InitialPositionInStream.TRIM_HORIZON) or
-   *                                the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      ssc: StreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.withNamedScope("kinesis stream") {
-      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
-        getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
-        checkpointInterval, storageLevel, defaultMessageHandler, None)
-    }
-  }
-
   /**
    * Create an input stream that pulls messages from a Kinesis stream.
    * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
@@ -453,47 +408,6 @@ object KinesisUtils {
       defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
   }
 
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *   on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *   gets AWS credentials.
-   * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
-   *   [[org.apache.spark.SparkConf]].
-   *
-   * @param jssc Java StreamingContext object
-   * @param streamName Kinesis stream name
-   * @param endpointUrl Endpoint url of Kinesis service
-   *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
-   *                           See the Kinesis Spark Streaming documentation for more
-   *                           details on the different types of checkpoints.
-   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
-   *                                worker's initial starting position in the stream.
-   *                                The values are either the beginning of the stream
-   *                                per Kinesis' limit of 24 hours
-   *                                (InitialPositionInStream.TRIM_HORIZON) or
-   *                                the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      jssc: JavaStreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Array[Byte]] = {
-    createStream(
-      jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
-  }
-
   private def getRegionByEndpoint(endpointUrl: String): String = {
     RegionUtils.getRegionByEndpoint(endpointUrl).getName()
   }
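
The private `getRegionByEndpoint` helper survives the cleanup, and the updated Java test below resolves the region the same way, through the AWS SDK. A small sketch of that lookup (the endpoint value is a placeholder):

```scala
import com.amazonaws.regions.RegionUtils

// The SDK maps a Kinesis endpoint URL back to its region name.
val regionName = RegionUtils.getRegionByEndpoint("https://kinesis.us-west-2.amazonaws.com").getName()
// regionName == "us-west-2"
```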
JavaKinesisStreamSuite.java:

@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.kinesis;
 
+import com.amazonaws.regions.RegionUtils;
 import com.amazonaws.services.kinesis.model.Record;
 import org.junit.Test;
 
@@ -34,11 +35,13 @@
 public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKinesisStream() {
-    // Tests the API, does not actually test data receiving
-    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
-        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+    String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
+    String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();
+
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+        dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
+        StorageLevel.MEMORY_AND_DISK_2());
     ssc.stop();
   }
 
KinesisFunSuite.scala:

@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
     if (shouldRunTests) {
       body
     } else {
-      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
     }
   }
 }
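
This change replaces an empty second argument list with an explicit unit value. With `ignore(...)()`, the compiler has to adapt the empty argument list by inserting `()`, which Scala 2.11 flags as deprecated; `ignore(...)(())` passes the unit value explicitly. A minimal sketch of the same warning outside ScalaTest, using a hypothetical `run` helper:

```scala
// Hypothetical helper with a by-name Unit parameter, standing in for
// ScalaTest's ignore(message)(body).
def run(body: => Unit): Unit = body

run()    // warns: adaptation of argument list by inserting () is deprecated
run(())  // passes the unit value explicitly; compiles without the warning
```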
KinesisStreamSuite.scala:

@@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 
   test("KinesisUtils API") {
-    // Tests the API, does not actually test data receiving
-    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      dummyEndpointUrl, Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
       dummyAWSAccessKey, dummyAWSSecretKey)
@@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
     val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
-    emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+    // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
+    // the type parameter will be erased at runtime
+    emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
     emptyRDD.partitions shouldBe empty
 
     // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
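
The wildcard in the rewritten assertion is deliberate: on the JVM, type arguments are erased at runtime, so a class-based matcher such as ScalaTest's `a [...]` can only check the erased class, never the element type. A minimal illustration of the erasure behavior (not from the commit):

```scala
val xs: Any = List(1, 2, 3)
// Compiles with an "unchecked" warning and evaluates to true: only the
// erased class List is checked; the String type argument is gone at runtime.
xs.isInstanceOf[List[String]]
```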
