Property Name | Default | Meaning |
---|---|---|
spark.driver.memory | 512m | Amount of memory to use for the driver process, in the same format as JVM memory strings (e.g. `512m`, `2g`). |
spark.executor.memory | 512m | Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`). |
spark.serializer | org.apache.spark.serializer.JavaSerializer | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. |
spark.executor.extraJavaOptions | (none) | A string of extra JVM options to pass to executors, for instance GC settings or other logging. |
spark.python.worker.memory | 512m | Amount of memory to use per Python worker process during aggregation, in the same format as JVM memory strings (e.g. `512m`, `2g`). If the memory used during aggregation goes above this amount, it will spill the data into disks. |
spark.python.profile | false | Enable profiling in Python workers. The profile result will show up via `sc.show_profiles()`, or it will be displayed before the driver exits. It can also be dumped to disk by `sc.dump_profiles(path)`. If some of the profile results have been displayed manually, they will not be displayed automatically before the driver exits. |
spark.python.profile.dump | (none) | The directory used to dump the profile result before the driver exits. The results are dumped as a separate file for each RDD and can be loaded with `pstats.Stats()`. If this is specified, the profile result will not be displayed automatically. |
spark.python.worker.reuse | true | Whether to reuse Python workers. If yes, a fixed number of Python workers is used, so a Python process does not need to be forked for every task. This is very useful when there is a large broadcast, since the broadcast does not need to be transferred from the JVM to a Python worker for every task. |
spark.executorEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the executor process. The user can specify multiple of these to set multiple environment variables. |
spark.mesos.executor.home | driver side SPARK_HOME | Set the directory in which Spark is installed on the executors in Mesos. By default, the executors will simply use the driver's Spark home directory, which may not be visible to them. Note that this is only relevant if a Spark binary package is not specified through `spark.executor.uri`. |
spark.mesos.executor.memoryOverhead | executor memory * 0.07, with minimum of 384 | The amount, in MiB, added to `spark.executor.memory` when calculating the total Mesos task memory. A value of 384 implies a 384MiB overhead. Additionally, there is a hard-coded 7% minimum overhead: the final overhead will be the larger of either `spark.mesos.executor.memoryOverhead` or 7% of `spark.executor.memory`. |
spark.shuffle.manager | sort | Implementation to use for shuffling data. There are two implementations available: `sort` and `hash`. Sort-based shuffle is more memory-efficient and is the default option starting in 1.2. |
spark.ui.port
spark.io.compression.codec | snappy | The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: `lz4`, `lzf`, and `snappy`. You can also use fully qualified class names to specify the codec, e.g. `org.apache.spark.io.LZ4CompressionCodec`, `org.apache.spark.io.LZFCompressionCodec`, and `org.apache.spark.io.SnappyCompressionCodec`. |
spark.files.fetchTimeout
spark.hadoop.cloneConf | false | If set to true, clones a new Hadoop `Configuration` object for each task. This option should be enabled to work around `Configuration` thread-safety issues (see SPARK-2546 for more details). It is disabled by default in order to avoid unexpected performance regressions for jobs that are not affected by these issues. |
spark.executor.heartbeatInterval
spark.port.maxRetries
spark.akka.heartbeat.pauses
spark.scheduler.revive.interval
spark.scheduler.maxRegisteredResourcesWaitingTime
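Properties like the ones above are typically set programmatically through `SparkConf` when constructing the `SparkContext`. A minimal sketch (the application name and the values shown are illustrative only):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Values set programmatically take precedence over spark-defaults.conf.
val conf = new SparkConf()
  .setAppName("ConfigExample")              // placeholder application name
  .set("spark.executor.memory", "2g")
  .set("spark.shuffle.manager", "sort")
  .set("spark.python.worker.reuse", "true")

val sc = new SparkContext(conf)
{% endhighlight %}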
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using `reduceByKey` or `combineByKey` will yield much better performance. Outer joins are supported through `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`.
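As a small illustration (a sketch using made-up pair RDDs), the aggregation and outer-join operators mentioned above can be used as follows:

{% highlight scala %}
import org.apache.spark.SparkContext._   // pair RDD operations when compiling an application

val sales = sc.parallelize(Seq(("apples", 3), ("oranges", 2), ("apples", 4)))
val stock = sc.parallelize(Seq(("apples", 10), ("pears", 7)))

// Aggregate per key without materializing all values for a key at once.
val totals = sales.reduceByKey(_ + _)      // ("apples", 7), ("oranges", 2)

// Keep keys that appear in either RDD; missing sides become None.
val joined = totals.fullOuterJoin(stock)
// ("apples", (Some(7), Some(10))), ("oranges", (Some(2), None)), ("pears", (None, Some(7)))
{% endhighlight %}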
spark.yarn.executor.memoryOverhead
spark.yarn.driver.memoryOverhead
spark.yarn.containerLauncherMaxThreads
Property Name | Default | Meaning |
---|---|---|
spark.sql.parquet.binaryAsString | false | Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. |
spark.sql.parquet.cacheMetadata | false | Turns on caching of Parquet schema metadata. Can speed up querying of static data. |
spark.sql.parquet.compression.codec | snappy | Sets the compression codec used when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. |
Property Name | Default | Meaning |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | false | When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data. |
spark.sql.inMemoryColumnarStorage.batchSize | 1000 | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. |
Property Name | Default | Meaning |
---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10000 | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. |
spark.sql.codegen | false | When true, code will be dynamically generated at runtime for expression evaluation in a specific query. For some queries with complicated expressions this option can lead to significant speed-ups. However, for simple queries this can actually slow down query execution. |
spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
Data type | Value type in Scala | API to access or create a data type |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | scala.math.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]) Note: The default value of containsNull is true. |
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]) Note: The default value of valueContainsNull is true. |
StructType | org.apache.spark.sql.Row | StructType(fields) Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. |
StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, nullable) |
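Putting a few of these together, here is a brief sketch of programmatically building a schema and applying it to an RDD of rows, assuming existing `sc` and `sqlContext`; the column names and data are illustrative:

{% highlight scala %}
import org.apache.spark.sql._

// A schema with a non-nullable string column and a nullable integer column.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age",  IntegerType, nullable = true)))

// Rows whose value types match the table above (String for StringType, Int for IntegerType).
val rowRDD = sc.parallelize(Seq(Row("Alice", 30), Row("Bob", 25)))

// Apply the schema; the result can be registered and queried with SQL.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
peopleSchemaRDD.registerTempTable("people")
{% endhighlight %}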
Data type | Value type in Java | API to access or create a data type |
---|---|---|
ByteType | byte or Byte | DataType.ByteType |
ShortType | short or Short | DataType.ShortType |
IntegerType | int or Integer | DataType.IntegerType |
LongType | long or Long | DataType.LongType |
FloatType | float or Float | DataType.FloatType |
DoubleType | double or Double | DataType.DoubleType |
DecimalType | java.math.BigDecimal | DataType.DecimalType |
StringType | String | DataType.StringType |
BinaryType | byte[] | DataType.BinaryType |
BooleanType | boolean or Boolean | DataType.BooleanType |
TimestampType | java.sql.Timestamp | DataType.TimestampType |
ArrayType | java.util.List | DataType.createArrayType(elementType) Note: The value of containsNull will be true. DataType.createArrayType(elementType, containsNull) |
MapType | java.util.Map | DataType.createMapType(keyType, valueType) Note: The value of valueContainsNull will be true. DataType.createMapType(keyType, valueType, valueContainsNull) |
StructType | org.apache.spark.sql.api.java.Row | DataType.createStructType(fields) Note: fields is a List or an array of StructFields. Also, two fields with the same name are not allowed. |
StructField | The value type in Java of the data type of this field (for example, int for a StructField with the data type IntegerType) | DataType.createStructField(name, dataType, nullable) |
Data type | Value type in Python | API to access or create a data type |
---|---|---|
ByteType | int or long Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | ByteType() |
ShortType | int or long Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | ShortType() |
IntegerType | int or long | IntegerType() |
LongType | long Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | LongType() |
FloatType | float Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | FloatType() |
DoubleType | float | DoubleType() |
DecimalType | decimal.Decimal | DecimalType() |
StringType | string | StringType() |
BinaryType | bytearray | BinaryType() |
BooleanType | bool | BooleanType() |
TimestampType | datetime.datetime | TimestampType() |
ArrayType | list, tuple, or array | ArrayType(elementType, [containsNull]) Note: The default value of containsNull is True. |
MapType | dict | MapType(keyType, valueType, [valueContainsNull]) Note: The default value of valueContainsNull is True. |
StructType | list or tuple | StructType(fields) Note: fields is a list of StructFields. Also, two fields with the same name are not allowed. |
StructField | The value type in Python of the data type of this field (for example, int for a StructField with the data type IntegerType) | StructField(name, dataType, nullable) |
You can specify a path in Swift as input through a URI of the form
swift://container.PROVIDER/path. You will also need to set your
Swift security credentials, through core-site.xml or via
SparkContext.hadoopConfiguration.
The current Swift driver requires Swift to use the Keystone authentication method.
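Once configured (see below), reading from Swift looks like any other Hadoop-compatible path. A sketch, with the container name, PROVIDER name, and file name as placeholders:

{% highlight scala %}
// "mycontainer" and "SparkTest" are placeholders for your container and PROVIDER name.
val data = sc.textFile("swift://mycontainer.SparkTest/data.txt")
println(data.count())
{% endhighlight %}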
# Configuring Swift for Better Data Locality

Although not mandatory, it is recommended to configure the proxy server of Swift with
list_endpoints to have better data locality. More information is
[available here](https://github.com/openstack/swift/blob/master/swift/common/middleware/list_endpoints.py).
# Dependencies

The Spark application should include the hadoop-openstack dependency.
For example, for Maven support, add the following to the pom.xml file
(the version shown is a placeholder; use the Hadoop version matching your cluster):

{% highlight xml %}
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-openstack</artifactId>
  <version>2.3.0</version>
</dependency>
{% endhighlight %}
# Configuration Parameters

Create core-site.xml and place it inside Spark's conf directory.
There are two main categories of parameters that should be configured: declaration of the
Swift driver and the parameters that are required by Keystone.

Configuration of Hadoop to use the Swift file system is achieved via:
+Property Name | Value |
---|---|
fs.swift.impl | org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem |
Additionally, the parameters required by Keystone must be provided to the Swift driver; they are listed in the table below, where PROVIDER can be any name.
+Property Name | Meaning | Required |
---|---|---|
fs.swift.service.PROVIDER.auth.url | Keystone Authentication URL | Mandatory |
fs.swift.service.PROVIDER.auth.endpoint.prefix | Keystone endpoints prefix | Optional |
fs.swift.service.PROVIDER.tenant | Tenant | Mandatory |
fs.swift.service.PROVIDER.username | Username | Mandatory |
fs.swift.service.PROVIDER.password | Password | Mandatory |
fs.swift.service.PROVIDER.http.port | HTTP port | Mandatory |
fs.swift.service.PROVIDER.region | Keystone region | Mandatory |
fs.swift.service.PROVIDER.public | Indicates if all URLs are public | Mandatory |
For example, assume PROVIDER=SparkTest and Keystone contains user tester with password testing,
defined for tenant test. Then core-site.xml should include entries such as the following
(the remaining mandatory Keystone properties from the table above are set in the same way):

{% highlight xml %}
<configuration>
  <property>
    <name>fs.swift.service.SparkTest.tenant</name>
    <value>test</value>
  </property>
  <property>
    <name>fs.swift.service.SparkTest.username</name>
    <value>tester</value>
  </property>
  <property>
    <name>fs.swift.service.SparkTest.password</name>
    <value>testing</value>
  </property>
  <!-- auth.url, http.port, region and public are set the same way for your deployment -->
</configuration>
{% endhighlight %}
Notice that fs.swift.service.PROVIDER.tenant,
fs.swift.service.PROVIDER.username, and
fs.swift.service.PROVIDER.password contain sensitive information, so keeping them in
core-site.xml is not always a good approach.
We suggest keeping those parameters in core-site.xml only for testing purposes when running Spark
via spark-shell.
For job submissions they should be provided via sparkContext.hadoopConfiguration.
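For example, a brief sketch of providing the sensitive Keystone parameters at job submission time instead of in core-site.xml (PROVIDER is SparkTest and the values are the test credentials from the example above):

{% highlight scala %}
// Set Swift credentials on the Hadoop configuration used by this SparkContext.
sc.hadoopConfiguration.set("fs.swift.service.SparkTest.tenant", "test")
sc.hadoopConfiguration.set("fs.swift.service.SparkTest.username", "tester")
sc.hadoopConfiguration.set("fs.swift.service.SparkTest.password", "testing")
{% endhighlight %}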
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} |
Flume | spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} |
Kinesis | spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] |
Twitter | spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} |
ZeroMQ | spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} |
MQTT | spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} |
Kinesis (built separately) | kinesis-asl_{{site.SCALA_BINARY_VERSION}} |
Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the [earlier example](#a-quick-example) of converting a stream of lines to words, the `flatMap` operation is applied on each RDD in the `lines` DStream to generate the RDDs of the `words` DStream. This is shown in the following figure.
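For reference, the corresponding code from the quick example looks like this (a sketch assuming a StreamingContext `ssc` and a socket source on a placeholder host and port):

{% highlight scala %}
// Each line in the "lines" DStream becomes multiple words in the "words" DStream;
// flatMap is applied to every RDD generated in each batch interval.
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
{% endhighlight %}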
Transformation | Meaning |
---|---|
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver. This is useful for development and debugging. (Called pprint() in the Python API.) |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
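A minimal sketch of foreachRDD pushing each batch to an external system; `wordCounts`, `createConnection()` and `send()` are hypothetical placeholders for your DStream and external store client:

{% highlight scala %}
// foreachRDD runs on the driver; the foreachPartition action inside it
// triggers computation of each batch's RDD on the executors.
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = createConnection()              // hypothetical helper
    partition.foreach(record => connection.send(record))
    connection.close()
  }
}
{% endhighlight %}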