[MINOR] Typo fixes
## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski <[email protected]>

Closes apache#11802 from jaceklaskowski/typo-fixes.
jaceklaskowski authored and srowen committed Apr 2, 2016
1 parent 67d7535 commit 06694f1
Showing 16 changed files with 52 additions and 49 deletions.
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {

def main(args: Array[String]) {
if (args.length != 4) {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
|Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
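For reference, a minimal sketch (not part of this patch) of the checkpoint-recovery pattern this example program is built around: `StreamingContext.getOrCreate` either restores a context from the checkpoint directory or builds a fresh one. The directory path, app name, and batch interval below are illustrative assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Builds a fresh context; only called when no usable checkpoint exists.
  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/recoverable-checkpoint"  // assumed path for illustration
    // Restores from the checkpoint if one is present, otherwise calls createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
```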
2 changes: 1 addition & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
t
case _ =>
throw new IllegalArgumentException(
s"Do not support stage $stage of type ${stage.getClass}")
s"Does not support stage $stage of type ${stage.getClass}")
}
if (index < indexOfLastEstimator) {
curDataset = transformer.transform(curDataset)
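For context, a minimal sketch of assembling the Pipeline whose fit logic appears above: every stage is either a Transformer or an Estimator, so the `IllegalArgumentException` branch is never reached. The column names and the `training` DataFrame are assumptions, not part of this patch.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Transformers (Tokenizer, HashingTF) and an Estimator (LogisticRegression)
// are the only stage types Pipeline.fit accepts.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
// val model = pipeline.fit(training)  // `training` is an assumed DataFrame with "text" and "label"
```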
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
* The specific squared error loss function used is:
* L = 1/2n ||A coefficients - y||^2^
*
* This support multiple types of regularization:
* This supports multiple types of regularization:
* - none (a.k.a. ordinary least squares)
* - L2 (ridge regression)
* - L1 (Lasso)
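A short sketch of the regularization options listed above, expressed through the public estimator; the parameter values and the `dataset` name are illustrative assumptions.

```scala
import org.apache.spark.ml.regression.LinearRegression

// elasticNetParam picks the penalty mix: 0.0 = L2 (ridge), 1.0 = L1 (Lasso),
// anything in between = elastic net; regParam scales the overall strength.
val ols        = new LinearRegression().setRegParam(0.0)                          // ordinary least squares
val ridge      = new LinearRegression().setRegParam(0.1).setElasticNetParam(0.0)
val lasso      = new LinearRegression().setRegParam(0.1).setElasticNetParam(1.0)
val elasticNet = new LinearRegression().setRegParam(0.1).setElasticNetParam(0.5)
// val model = ridge.fit(dataset)  // `dataset` is an assumed DataFrame with "features" and "label"
```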
@@ -79,13 +79,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
* cardinality is the product of of all child plan's cardinality, i.e. applies in the case
* cardinality is the product of all child plan's cardinality, i.e. applies in the case
* of cartesian joins.
*
* [[LeafNode]]s must override this.
*/
def statistics: Statistics = {
if (children.size == 0) {
if (children.isEmpty) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
}
Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
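To illustrate the contract described above, a hypothetical sketch against Catalyst's internal API (the class name and size hint are assumptions): with no children, the default product over child cardinalities cannot apply, so a leaf reports its own estimate.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// Hypothetical leaf node: it overrides statistics with its own size estimate,
// which is exactly what the UnsupportedOperationException above enforces.
case class SizedRelation(output: Seq[Attribute], sizeHint: BigInt) extends LeafNode {
  override def statistics: Statistics = Statistics(sizeInBytes = sizeHint)
}
```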
@@ -37,7 +37,7 @@ class ExperimentalMethods private[sql]() {

/**
* Allows extra strategies to be injected into the query planner at runtime. Note this API
* should be consider experimental and is not intended to be stable across releases.
* should be considered experimental and is not intended to be stable across releases.
*
* @since 1.3.0
*/
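A sketch of the injection point described above; the strategy is a hypothetical no-op and `sqlContext` is an assumed, already constructed SQLContext.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A strategy that never claims a plan, so planning falls through to the
// built-in strategies; a real strategy would return physical plans here.
object PassThroughStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// sqlContext.experimental.extraStrategies = Seq(PassThroughStrategy)
```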
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.CompactBuffer
/**
* Performs an inner hash join of two child relations. When the output RDD of this operator is
* being constructed, a Spark job is asynchronously started to calculate the values for the
* broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed
* broadcast relation. This data is then placed in a Spark broadcast variable. The streamed
* relation is not shuffled.
*/
case class BroadcastHashJoin(
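A short sketch of how a broadcast hash join is commonly requested from the public API; the DataFrame names and the join column are assumptions.

```scala
import org.apache.spark.sql.functions.broadcast

// Hinting the smaller side with broadcast() encourages the planner to pick
// BroadcastHashJoin: the small relation is shipped to every executor while
// the large (streamed) relation is never shuffled.
// val joined = largeOrders.join(broadcast(smallCountries), "country_code")
```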
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2232,7 +2232,7 @@ object functions {

/**
* Splits str around pattern (pattern is a regular expression).
* NOTE: pattern is a string represent the regular expression.
* NOTE: pattern is a string representation of the regular expression.
*
* @group string_funcs
* @since 1.5.0
@@ -2267,9 +2267,9 @@

/**
* Translate any character in the src by a character in replaceString.
* The characters in replaceString is corresponding to the characters in matchingString.
* The translate will happen when any character in the string matching with the character
* in the matchingString.
* The characters in replaceString correspond to the characters in matchingString.
* The translate will happen when any character in the string matches the character
* in the `matchingString`.
*
* @group string_funcs
* @since 1.5.0
Expand Down Expand Up @@ -2692,7 +2692,7 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////

/**
* Returns true if the array contain the value
* Returns true if the array contains `value`
* @group collection_funcs
* @since 1.5.0
*/
@@ -2920,7 +2920,7 @@ object functions {

/**
* Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
* specifcy the output data type, and there is no automatic input type coercion.
* specify the output data type, and there is no automatic input type coercion.
*
* @param f A closure in Scala
* @param dataType The output data type of the UDF
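A self-contained sketch exercising the four functions whose docs are touched above (`split`, `translate`, `array_contains`, and the explicitly typed `udf` variant); the column names and sample data are assumptions, not from the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

val sc = new SparkContext(new SparkConf().setAppName("functions-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = Seq(("a,b,c", Seq("spark", "scala"))).toDF("csv", "tags")

df.select(split($"csv", ",")).show()               // the pattern argument is a regex string
df.select(translate($"csv", "abc", "123")).show()  // 'a'->'1', 'b'->'2', 'c'->'3', character by character
df.filter(array_contains($"tags", "spark")).show() // true when the array column contains the value

// udf variant with an explicit output type and no automatic input type coercion.
val toUpper = udf((s: String) => if (s == null) null else s.toUpperCase, StringType)
df.select(toUpper($"csv")).show()
```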
@@ -132,7 +132,7 @@ class StreamingContext private[streaming] (
"both SparkContext and checkpoint as null")
}

private[streaming] val isCheckpointPresent = (_cp != null)
private[streaming] val isCheckpointPresent: Boolean = _cp != null

private[streaming] val sc: SparkContext = {
if (_sc != null) {
@@ -213,8 +213,8 @@ class StreamingContext private[streaming] (
def sparkContext: SparkContext = sc

/**
* Set each DStreams in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and releases them for garbage
* Set each DStream in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
@@ -282,13 +282,14 @@ class StreamingContext private[streaming] (
}

/**
* Create a input stream from TCP source hostname:port. Data is received using
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
@@ -299,7 +300,7 @@ class StreamingContext private[streaming] (
}

/**
* Create a input stream from TCP source hostname:port. Data is received using
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -860,7 +861,7 @@ private class StreamingContextPythonHelper {
*/
def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
val checkpointOption = CheckpointReader.read(
checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
checkpointOption.map(new StreamingContext(null, _, null))
}
}
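A minimal sketch of `socketTextStream` together with `remember`, two of the members whose docs are adjusted above; the host, port, and durations are illustrative assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("socket-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// UTF-8, '\n'-delimited text received over a TCP socket.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

// Keep generated RDDs for a minute so they can still be queried after their batch.
ssc.remember(Minutes(1))

lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
```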
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}

/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
* An input stream that always returns the same RDD on each time step. Useful for testing.
*/
class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
extends InputDStream[T](_ssc) {
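A small sketch of the testing pattern mentioned above: the same RDD is served for every batch, so a DStream pipeline can be exercised without any external source. The app name and sample data are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(
  new SparkConf().setAppName("constant-sketch").setMaster("local[2]"), Seconds(1))
val fixedRDD = ssc.sparkContext.parallelize(Seq(1, 2, 3))

// Every batch sees the same three elements.
new ConstantInputDStream(ssc, fixedRDD).map(_ * 2).print()
ssc.start()
ssc.awaitTermination()
```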
@@ -83,7 +83,7 @@ abstract class DStream[T: ClassTag] (

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

// Time zero for the DStream
private[streaming] var zeroTime: Time = null
@@ -269,15 +269,15 @@ abstract class DStream[T: ClassTag] (
checkpointDuration == null || rememberDuration > checkpointDuration,
s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
s" $rememberDuration which is not more than the checkpoint interval" +
s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
)

dependencies.foreach(_.validateAtStart())

logInfo(s"Slide time = $slideDuration")
logInfo(s"Storage level = ${storageLevel.description}")
logInfo(s"Checkpoint interval = $checkpointDuration")
logInfo(s"Remember duration = $rememberDuration")
logInfo(s"Remember interval = $rememberDuration")
logInfo(s"Initialized and validated $this")
}

@@ -535,7 +535,7 @@ abstract class DStream[T: ClassTag] (
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(s"${this.getClass().getSimpleName}.readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[T]] ()
generatedRDDs = new HashMap[Time, RDD[T]]()
}

// =======================================================================
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils

private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()

@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Updates the checkpoint data of the DStream. This gets called every time
* the graph checkpoint is initiated. Default implementation records the
* checkpoint files to which the generate RDDs of the DStream has been saved.
* checkpoint files at which the generated RDDs of the DStream have been saved.
*/
def update(time: Time) {

@@ -103,7 +103,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])

/**
* Restore the checkpoint data. This gets called once when the DStream graph
* (along with its DStreams) are being restored from a graph checkpoint file.
* (along with its output DStreams) is being restored from a graph checkpoint file.
* Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param _ssc Streaming context that will execute this input stream
*/
abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {

private[streaming] var lastValidTime: Time = null
@@ -90,8 +90,8 @@ abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
} else {
// Time is valid, but check it it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
logWarning("isTimeValid called with " + time + " where as last valid time is " +
lastValidTime)
logWarning(s"isTimeValid called with $time whereas the last valid time " +
s"is $lastValidTime")
}
lastValidTime = time
true
@@ -87,7 +87,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](

logDebug("Window time = " + windowDuration)
logDebug("Slide time = " + slideDuration)
logDebug("ZeroTime = " + zeroTime)
logDebug("Zero time = " + zeroTime)
logDebug("Current window = " + currentWindow)
logDebug("Previous window = " + previousWindow)

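For context, a sketch of the windowed reduction that produces a ReducedWindowedDStream; the incremental form with an inverse reduce function is what makes the current/previous window bookkeeping above necessary. The host, port, durations, and checkpoint path are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("window-sketch").setMaster("local[2]"), Seconds(1))
ssc.checkpoint("/tmp/window-checkpoint")  // required for the incremental (inverse-reduce) form; assumed path

val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// Values sliding out of the 30-second window are subtracted via the inverse
// function instead of recomputing the whole window every 10 seconds.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(30),
  Seconds(10))
windowedCounts.print()
ssc.start()
ssc.awaitTermination()
```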
@@ -70,7 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
computeUsingPreviousRDD (parentRDD, prevStateRDD)
computeUsingPreviousRDD(parentRDD, prevStateRDD)
}
case None => { // If parent RDD does not exist

@@ -98,15 +98,15 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}

val groupedRDD = parentRDD.groupByKey (partitioner)
val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
val groupedRDD = parentRDD.groupByKey(partitioner)
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
Some (sessionRDD)
Some(sessionRDD)
}
case Some (initialStateRDD) => {
case Some(initialStateRDD) => {
computeUsingPreviousRDD(parentRDD, initialStateRDD)
}
}
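A sketch of the public API behind the code above: `updateStateByKey` hands each key's new values and previous state to an update function, which is the kind of work the `updateFuncLocal` call in the hunk performs. The host, port, and checkpoint path are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("state-sketch").setMaster("local[2]"), Seconds(1))
ssc.checkpoint("/tmp/state-checkpoint")  // stateful streams must be checkpointed; assumed path

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// (new values in this batch, previous state) => new state
val runningCounts = words.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}
runningCounts.print()
ssc.start()
ssc.awaitTermination()
```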
@@ -119,7 +119,7 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
@@ -129,7 +129,7 @@
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
}

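The WAL-recovery branches above only come into play when the receiver write-ahead log is turned on; a brief configuration sketch with assumed values follows.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("wal-sketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
// A checkpoint directory gives the write-ahead log somewhere durable to live:
// ssc.checkpoint("hdfs:///checkpoints/app")  // `ssc` built from `conf`; assumed path
```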
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

/**
* A component that estimates the rate at wich an InputDStream should ingest
* elements, based on updates at every batch completion.
* A component that estimates the rate at which an `InputDStream` should ingest
* records, based on updates at every batch completion.
*
* @see [[org.apache.spark.streaming.scheduler.RateController]]
*/
private[streaming] trait RateEstimator extends Serializable {

/**
* Computes the number of elements the stream attached to this `RateEstimator`
* Computes the number of records the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*
* @param time The timetamp of the current batch interval that just finished
* @param elements The number of elements that were processed in this batch
* @param time The timestamp of the current batch interval that just finished
* @param elements The number of records that were processed in this batch
* @param processingDelay The time in ms that took for the job to complete
* @param schedulingDelay The time in ms that the job spent in the scheduling queue
*/
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
object RateEstimator {

/**
* Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
* Return a new `RateEstimator` based on the value of
* `spark.streaming.backpressure.rateEstimator`.
*
* The only known estimator right now is `pid`.
* The only known and acceptable estimator right now is `pid`.
*
* @return An instance of RateEstimator
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
* known estimators.
* @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
*/
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)

case estimator =>
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}
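A brief sketch of the configuration read by `RateEstimator.create` above; `pid` is the default, so naming it explicitly is shown only for illustration.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("backpressure-sketch")
  .set("spark.streaming.backpressure.enabled", "true")       // turn the rate-control machinery on
  .set("spark.streaming.backpressure.rateEstimator", "pid")  // the only estimator create() accepts
```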
