The entry point into all functionality in Spark SQL is the
-[SQLContext](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
-descendants. To create a basic SQLContext, all you need is a SparkContext.
+[`SQLContext`](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
+descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight scala %}
val sc: SparkContext // An existing SparkContext.
@@ -43,8 +43,8 @@ import sqlContext.implicits._
The entry point into all functionality in Spark SQL is the
-[SQLContext](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its
-descendants. To create a basic SQLContext, all you need is a SparkContext.
+[`SQLContext`](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its
+descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight java %}
JavaSparkContext sc = ...; // An existing JavaSparkContext.
@@ -56,8 +56,8 @@ SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
The entry point into all relational functionality in Spark is the
-[SQLContext](api/python/pyspark.sql.SQLContext-class.html) class, or one
-of its decedents. To create a basic SQLContext, all you need is a SparkContext.
+[`SQLContext`](api/python/pyspark.sql.SQLContext-class.html) class, or one
+of its descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight python %}
from pyspark.sql import SQLContext
@@ -67,20 +67,20 @@ sqlContext = SQLContext(sc)
-In addition to the basic SQLContext, you can also create a HiveContext, which provides a
-superset of the functionality provided by the basic SQLContext. Additional features include
+In addition to the basic `SQLContext`, you can also create a `HiveContext`, which provides a
+superset of the functionality provided by the basic `SQLContext`. Additional features include
the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
-ability to read data from Hive tables. To use a HiveContext, you do not need to have an
-existing Hive setup, and all of the data sources available to a SQLContext are still available.
-HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default
-Spark build. If these dependencies are not a problem for your application then using HiveContext
-is recommended for the 1.3 release of Spark. Future releases will focus on bringing SQLContext up
-to feature parity with a HiveContext.
+ability to read data from Hive tables. To use a `HiveContext`, you do not need to have an
+existing Hive setup, and all of the data sources available to a `SQLContext` are still available.
+`HiveContext` is only packaged separately to avoid including all of Hive's dependencies in the default
+Spark build. If these dependencies are not a problem for your application then using `HiveContext`
+is recommended for the 1.3 release of Spark. Future releases will focus on bringing `SQLContext` up
+to feature parity with a `HiveContext`.
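+
+As a sketch, a `HiveContext` is constructed from an existing SparkContext in the same way as a
+`SQLContext` (this assumes the `spark-hive` module is on the classpath):
+
+{% highlight scala %}
+val sc: SparkContext // An existing SparkContext.
+// HiveContext provides a superset of SQLContext functionality.
+val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
+{% endhighlight %}
+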
The specific variant of SQL that is used to parse queries can also be selected using the
`spark.sql.dialect` option. This parameter can be changed using either the `setConf` method on
-a SQLContext or by using a `SET key=value` command in SQL. For a SQLContext, the only dialect
-available is "sql" which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the
+a `SQLContext` or by using a `SET key=value` command in SQL. For a `SQLContext`, the only dialect
+available is "sql" which uses a simple SQL parser provided by Spark SQL. In a `HiveContext`, the
default is "hiveql", though "sql" is also available. Since the HiveQL parser is much more complete,
this is recommended for most use cases.
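+
+As a minimal sketch, assuming `sqlContext` is a `HiveContext` (so that both dialects are
+available), either form below changes the dialect:
+
+{% highlight scala %}
+// Both forms set the same spark.sql.dialect property.
+sqlContext.setConf("spark.sql.dialect", "sql")   // via the setConf method
+sqlContext.sql("SET spark.sql.dialect=hiveql")   // via a SET command in SQL
+{% endhighlight %}
+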
@@ -100,7 +100,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
-df.show()
+df.show()
{% endhighlight %}
@@ -151,10 +151,10 @@ val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
-// age name
+// age name
// null Michael
-// 30 Andy
-// 19 Justin
+// 30 Andy
+// 19 Justin
// Print the schema in a tree format
df.printSchema()
@@ -164,17 +164,17 @@ df.printSchema()
// Select only the "name" column
df.select("name").show()
-// name
+// name
// Michael
-// Andy
-// Justin
+// Andy
+// Justin
// Select everybody, but increment the age by 1
df.select("name", df("age") + 1).show()
// name (age + 1)
-// Michael null
-// Andy 31
-// Justin 20
+// Michael null
+// Andy 31
+// Justin 20
// Select people older than 21
df.filter(df("name") > 21).show()
@@ -201,10 +201,10 @@ DataFrame df = sqlContext.jsonFile("examples/src/main/resources/people.json");
// Show the content of the DataFrame
df.show();
-// age name
+// age name
// null Michael
-// 30 Andy
-// 19 Justin
+// 30 Andy
+// 19 Justin
// Print the schema in a tree format
df.printSchema();
@@ -214,17 +214,17 @@ df.printSchema();
// Select only the "name" column
df.select("name").show();
-// name
+// name
// Michael
-// Andy
-// Justin
+// Andy
+// Justin
// Select everybody, but increment the age by 1
df.select("name", df.col("age").plus(1)).show();
// name (age + 1)
-// Michael null
-// Andy 31
-// Justin 20
+// Michael null
+// Andy 31
+// Justin 20
// Select people older than 21
df.filter(df("name") > 21).show();
@@ -251,10 +251,10 @@ df = sqlContext.jsonFile("examples/src/main/resources/people.json")
# Show the content of the DataFrame
df.show()
-## age name
+## age name
## null Michael
-## 30 Andy
-## 19 Justin
+## 30 Andy
+## 19 Justin
# Print the schema in a tree format
df.printSchema()
@@ -264,17 +264,17 @@ df.printSchema()
# Select only the "name" column
df.select("name").show()
-## name
+## name
## Michael
-## Andy
-## Justin
+## Andy
+## Justin
# Select everybody, but increment the age by 1
df.select("name", df.age + 1).show()
## name (age + 1)
-## Michael null
-## Andy 31
-## Justin 20
+## Michael null
+## Andy 31
+## Justin 20
# Select people older than 21
df.filter(df.age > 21).show()
@@ -358,7 +358,7 @@ import sqlContext.implicits._
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
-val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
+val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
@@ -662,8 +662,146 @@ for name in names.collect():
Spark SQL supports operating on a variety of data sources through the `DataFrame` interface.
A DataFrame can be operated on as a normal RDD and can also be registered as a temporary table.
Registering a DataFrame as a table allows you to run SQL queries over its data. This section
-describes the various methods for loading data into a DataFrame.
+describes the general methods for loading and saving data using the Spark Data Sources API and then
+goes into specific options that are available for the built-in data sources.
+
+## Generic Load/Save Functions
+
+In the simplest form, the default data source (`parquet` unless otherwise configured by
+`spark.sql.sources.default`) will be used for all operations.
+
+
+
+
+{% highlight scala %}
+val df = sqlContext.load("people.parquet")
+df.select("name", "age").save("namesAndAges.parquet")
+{% endhighlight %}
+
+
+
+
+
+{% highlight java %}
+
+DataFrame df = sqlContext.load("people.parquet");
+df.select("name", "age").save("namesAndAges.parquet");
+
+{% endhighlight %}
+
+
+
+
+
+{% highlight python %}
+
+df = sqlContext.load("people.parquet")
+df.select("name", "age").save("namesAndAges.parquet")
+
+{% endhighlight %}
+
+
+
+
+### Manually Specifying Options
+You can also manually specify the data source that will be used along with any extra options
+that you would like to pass to the data source. Data sources are specified by their fully qualified
+name (e.g., `org.apache.spark.sql.parquet`), but for built-in sources you can also use their short
+names (`json`, `parquet`, `jdbc`). DataFrames of any type can be converted into other types
+using this syntax.
+
+
+
+
+{% highlight scala %}
+val df = sqlContext.load("people.json", "json")
+df.select("name", "age").save("namesAndAges.parquet", "parquet")
+{% endhighlight %}
+
+
+
+
+
+{% highlight java %}
+
+DataFrame df = sqlContext.load("people.json", "json");
+df.select("name", "age").save("namesAndAges.parquet", "parquet");
+
+{% endhighlight %}
+
+
+
+
+
+{% highlight python %}
+
+df = sqlContext.load("people.json", "json")
+df.select("name", "age").save("namesAndAges.parquet", "parquet")
+
+{% endhighlight %}
+
+
+
+
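+Extra options can also be passed as a map of string key/value pairs. As a sketch in Scala, using
+the built-in `json` source and its `path` option:
+
+{% highlight scala %}
+// A sketch: pass source-specific options as a Map; the json source reads its
+// input location from the "path" option.
+val df = sqlContext.load("json", Map("path" -> "examples/src/main/resources/people.json"))
+{% endhighlight %}
+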
+### Save Modes
+
+Save operations can optionally take a `SaveMode`, which specifies how to handle existing data if
+present. It is important to realize that these save modes do not utilize any locking and are not
+atomic. Thus, it is not safe to have multiple writers attempting to write to the same location.
+Additionally, when performing an `Overwrite`, the data will be deleted before writing out the
+new data.
+
+| Scala/Java | Python | Meaning |
+| ---------- | ------ | ------- |
+| `SaveMode.ErrorIfExists` (default) | `"error"` (default) | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
+| `SaveMode.Append` | `"append"` | When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. |
+| `SaveMode.Overwrite` | `"overwrite"` | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
+| `SaveMode.Ignore` | `"ignore"` | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. |
+
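+As a minimal sketch in Scala, a mode is passed alongside the path and source name when saving:
+
+{% highlight scala %}
+import org.apache.spark.sql.SaveMode
+
+// A sketch: replace any existing data at the target path with the contents of df.
+df.save("namesAndAges.parquet", "parquet", SaveMode.Overwrite)
+{% endhighlight %}
+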
+### Saving to Persistent Tables
+
+When working with a `HiveContext`, `DataFrames` can also be saved as persistent tables using the
+`saveAsTable` command. Unlike the `registerTempTable` command, `saveAsTable` will materialize the
+contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables
+will still exist even after your Spark program has restarted, as long as you maintain your connection
+to the same metastore. A DataFrame for a persistent table can be created by calling the `table`
+method on a `SQLContext` with the name of the table.
+
+By default `saveAsTable` will create a "managed table", meaning that the location of the data will
+be controlled by the metastore. Managed tables will also have their data deleted automatically
+when a table is dropped.
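+
+A short sketch of the round trip in Scala, assuming `sqlContext` is a `HiveContext` and `df` is an
+existing DataFrame:
+
+{% highlight scala %}
+// Persist df as a managed table in the Hive metastore, then read it back.
+df.saveAsTable("people")
+val people = sqlContext.table("people") // returns a DataFrame for the persistent table
+{% endhighlight %}
+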
## Parquet Files
@@ -751,11 +889,150 @@ for teenName in teenNames.collect():
+
+
+{% highlight sql %}
+
+CREATE TEMPORARY TABLE parquetTable
+USING org.apache.spark.sql.parquet
+OPTIONS (
+ path "examples/src/main/resources/people.parquet"
+)
+
+SELECT * FROM parquetTable
+
+{% endhighlight %}
+
+
+
+