diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index aff14344ae371..b79600416b61a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -488,14 +488,31 @@ def __init__(self, sparkContext):
         >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]
         True
+        >>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+        ...
+        ValueError:...
+
+        >>> bad_rdd = sc.parallelize([1,2,3])
+        >>> sqlCtx.applySchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+        ...
+        ValueError:...
+
         # registerRDDAsTable
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
 
         # sql
-        >>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1")
+        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
         >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}]
         True
+        # table
+        #>>> sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+        #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (1, "one")')
+        #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (2, "two")')
+        #>>> srdd3 = sqlCtx.table("src")
+        #>>> srdd3.collect() == [{"key" : 1, "value" : "one"}, {"key" : 2, "value": "two"}]
         """
         self._sc = sparkContext
         self._jsc = self._sc._jsc
@@ -506,17 +523,12 @@ def applySchema(self, rdd):
         """
         Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
         determine the fields names and types, and then use that to extract all the dictionaries.
-
-        # >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
-        # Traceback (most recent call last):
-        # ...
-        # ValueError:...
         """
         if (rdd.__class__ is SchemaRDD):
             raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
         elif not isinstance(rdd.first(), dict):
             raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" %
-                             (SchemaRDD.__name__, rdd.first().__class__.__name))
+                             (SchemaRDD.__name__, rdd.first()))
 
         jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
         srdd = self._ssql_ctx.applySchema(jrdd.rdd())
@@ -524,7 +536,8 @@ def applySchema(self, rdd):
 
     def registerRDDAsTable(self, rdd, tableName):
         """
-
+        Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
+        during the lifetime of this instance of SQLContext.
         """
         if (rdd.__class__ is SchemaRDD):
             jschema_rdd = rdd._jschema_rdd
@@ -533,22 +546,34 @@ def registerRDDAsTable(self, rdd, tableName):
             raise ValueError("Can only register SchemaRDD as table")
 
     def parquetFile(path):
+        """
+        Loads a Parquet file, returning the result as a L{SchemaRDD}.
+        """
         jschema_rdd = self._ssql_ctx.parquetFile(path)
         return SchemaRDD(jschema_rdd, self)
 
     def sql(self, sqlQuery):
         """
-        Run a sql query over a registered table, and return a L{SchemaRDD} with the results.
+        Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
         """
         return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
 
-    def table(tableName):
+    def table(self, tableName):
+        """
+        Returns the specified table as a L{SchemaRDD}.
+        """
         return SchemaRDD(self._ssql_ctx.table(tableName), self)
 
     def cacheTable(tableName):
+        """
+        Caches the specified table in-memory.
+        """
         self._ssql_ctx.cacheTable(tableName)
 
     def uncacheTable(tableName):
+        """
+        Removes the specified table from the in-memory cache.
+        """
         self._ssql_ctx.uncacheTable(tableName)
 
 def _test():
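
For context, the snippet below is a minimal sketch of how the patched SQLContext API would be exercised end to end. It mirrors the doctest in the diff; the import path (SQLContext living in pyspark.context at this point in the PR), the app name, and the sample data are assumptions for illustration, not part of the patch.

```python
# Sketch only: assumes SQLContext is importable from pyspark.context,
# as in the version of the file touched by this diff.
from pyspark import SparkContext
from pyspark.context import SQLContext

sc = SparkContext("local", "sqlcontext-demo")  # hypothetical app name
sqlCtx = SQLContext(sc)

# applySchema peeks at the first dict in the RDD to infer field names/types.
rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                      {"field1": 2, "field2": "row2"}])
srdd = sqlCtx.applySchema(rdd)

# Register the SchemaRDD as a temporary table, then query it with SQL.
sqlCtx.registerRDDAsTable(srdd, "table1")
srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")
results = srdd2.collect()  # e.g. [{"f1": 1, "f2": "row1"}, {"f1": 2, "f2": "row2"}]

sc.stop()
```

Passing a SchemaRDD or an RDD of non-dict values to applySchema raises ValueError, as the added doctests in the diff demonstrate.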