
Commit

added more tests :)
ahirreddy committed Apr 15, 2014
1 parent d26ec5e commit 7515ba0
Showing 1 changed file with 35 additions and 10 deletions.
python/pyspark/context.py: 35 additions & 10 deletions
@@ -488,14 +488,31 @@ def __init__(self, sparkContext):
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]
True
>>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValueError:...
>>> bad_rdd = sc.parallelize([1,2,3])
>>> sqlCtx.applySchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValueError:...
# registerRDDAsTable
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
# sql
>>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}]
True
# table
#>>> sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
#>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (1, "one")')
#>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (2, "two")')
#>>> srdd3 = sqlCtx.table("src")
#>>> srdd3.collect() == [{"key" : 1, "value" : "one"}, {"key" : 2, "value": "two"}]
"""
self._sc = sparkContext
self._jsc = self._sc._jsc
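The doctests above exercise the full round trip: build an RDD of dicts, infer a schema, register the result as a temporary table, and query it. A minimal standalone sketch of that flow, assuming a local SparkContext sc and a SQLContext sqlCtx as in the doctests:

    rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                          {"field1": 2, "field2": "row2"}])
    srdd = sqlCtx.applySchema(rdd)             # schema inferred from the first dict
    sqlCtx.registerRDDAsTable(srdd, "table1")  # queryable for this SQLContext's lifetime
    print(sqlCtx.sql("SELECT field1 AS f1 FROM table1").collect())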
@@ -506,25 +523,21 @@ def applySchema(self, rdd):
"""
Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
determine the field names and types, and then use that to extract all the dictionaries.
- # >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
- # Traceback (most recent call last):
- # ...
- # ValueError:...
"""
if (rdd.__class__ is SchemaRDD):
raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
elif not isinstance(rdd.first(), dict):
raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" %
- (SchemaRDD.__name__, rdd.first().__class__.__name))
+ (SchemaRDD.__name__, rdd.first()))

jrdd = self._sc._pythonToJavaMap(rdd._jrdd)
srdd = self._ssql_ctx.applySchema(jrdd.rdd())
return SchemaRDD(srdd, self)
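Note that only the first row is inspected, so an RDD whose later rows have different keys still passes this check; any mismatch surfaces later, during extraction or querying, if at all. A hedged illustration (the data is hypothetical):

    mixed = sc.parallelize([{"a": 1}, {"b": "two"}])
    srdd = sqlCtx.applySchema(mixed)  # accepted here: the first element is a dict;
                                      # the schema comes from {"a": 1} alone, so rows
                                      # lacking that field may fail downstream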

def registerRDDAsTable(self, rdd, tableName):
"""
Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
during the lifetime of this instance of SQLContext.
"""
if (rdd.__class__ is SchemaRDD):
jschema_rdd = rdd._jschema_rdd
@@ -533,22 +546,34 @@ def registerRDDAsTable(self, rdd, tableName):
raise ValueError("Can only register SchemaRDD as table")

def parquetFile(self, path):
"""
Loads a Parquet file, returning the result as a L{SchemaRDD}.
"""
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
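A brief usage sketch for parquetFile; the path below is hypothetical and must already hold Parquet data with a schema Spark can read:

    people = sqlCtx.parquetFile("/tmp/people.parquet")  # hypothetical input path
    sqlCtx.registerRDDAsTable(people, "people")
    sqlCtx.sql("SELECT * FROM people").collect()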

def sql(self, sqlQuery):
"""
- Run a sql query over a registered table, and return a L{SchemaRDD} with the results.
+ Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
"""
return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)

- def table(tableName):
+ def table(self, tableName):
"""
Returns the specified table as a L{SchemaRDD}.
"""
return SchemaRDD(self._ssql_ctx.table(tableName), self)

def cacheTable(self, tableName):
"""
Caches the specified table in-memory.
"""
self._ssql_ctx.cacheTable(tableName)

def uncacheTable(self, tableName):
"""
Removes the specified table from the in-memory cache.
"""
self._ssql_ctx.uncacheTable(tableName)
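cacheTable and uncacheTable are meant to bracket repeated queries against the same registered table; a small sketch, reusing the hypothetical "table1" registration from the earlier example:

    sqlCtx.cacheTable("table1")    # later scans of table1 read from the in-memory cache
    sqlCtx.sql("SELECT field1 FROM table1").collect()
    sqlCtx.sql("SELECT field2 FROM table1").collect()
    sqlCtx.uncacheTable("table1")  # release the cached copy when done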

def _test():
