Merge branch 'df5' of github.com:davies/spark into SPARK-5752
Conflicts:
	python/pyspark/sql/dataframe.py
	python/pyspark/sql/functions.py
rxin committed Feb 14, 2015
2 parents ff5832c + 97dd47c commit 61d3fca
Showing 4 changed files with 17 additions and 30 deletions.
36 changes: 12 additions & 24 deletions python/pyspark/sql/dataframe.py
@@ -392,21 +392,6 @@ def sample(self, withReplacement, fraction, seed=None):
rdd = self._jdf.sample(withReplacement, fraction, long(seed))
return DataFrame(rdd, self.sql_ctx)

-# def takeSample(self, withReplacement, num, seed=None):
-#     """Return a fixed-size sampled subset of this DataFrame.
-#
-#     >>> df = sqlCtx.inferSchema(rdd)
-#     >>> df.takeSample(False, 2, 97)
-#     [Row(field1=3, field2=u'row3'), Row(field1=1, field2=u'row1')]
-#     """
-#     seed = seed if seed is not None else random.randint(0, sys.maxint)
-#     with SCCallSiteSync(self.context) as css:
-#         bytesInJava = self._jdf \
-#             .takeSampleToPython(withReplacement, num, long(seed)) \
-#             .iterator()
-#     cls = _create_cls(self.schema)
-#     return map(cls, self._collect_iterator_through_file(bytesInJava))
-
@property
def dtypes(self):
"""Return all column names and their data types as a list.
@@ -649,11 +634,11 @@ def withColumnRenamed(self, existing, new):
for c in self.columns]
return self.select(*cols)

-def to_pandas(self):
+def toPandas(self):
"""
Collect all the rows and return a `pandas.DataFrame`.
->>> df.to_pandas() # doctest: +SKIP
+>>> df.toPandas() # doctest: +SKIP
age name
0 2 Alice
1 5 Bob
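For reference, a minimal sketch of the renamed DataFrame method in use (a hypothetical session; assumes a df with name and age columns and pandas installed on the driver):

    pdf = df.toPandas()   # collect all rows to the driver as a pandas.DataFrame
    pdf['age'].max()      # from here on, plain pandas operations apply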
@@ -701,8 +686,13 @@ def agg(self, *exprs):
name to aggregate methods.
>>> gdf = df.groupBy(df.name)
+<<<<<<< HEAD
>>> gdf.agg({"age": "max"}).collect()
[Row(name=u'Bob', MAX(age#0)=5), Row(name=u'Alice', MAX(age#0)=2)]
+=======
+>>> gdf.agg({"*": "count"}).collect()
+[Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)]
+>>>>>>> 97dd47cfa9af95ac2f6445d966ae43dc140542f1
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
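Note that the hunk counts (-701,8 +686,13) show this merge committed literal git conflict markers into the docstring. A sketch of how the doctest presumably reads once the conflict is resolved (an assumption; both sides kept):

    >>> gdf.agg({"age": "max"}).collect()
    [Row(name=u'Bob', MAX(age#0)=5), Row(name=u'Alice', MAX(age#0)=2)]
    >>> gdf.agg({"*": "count"}).collect()
    [Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)]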
@@ -935,11 +925,11 @@ def __repr__(self):
else:
return 'Column<%s>' % self._jdf.toString()

-def to_pandas(self):
+def toPandas(self):
"""
Return a pandas.Series from the column
->>> df.age.to_pandas() # doctest: +SKIP
+>>> df.age.toPandas() # doctest: +SKIP
0 2
1 5
dtype: int64
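Likewise, a one-line sketch of the renamed Column variant (same assumed df):

    ages = df.age.toPandas()   # collect a single column as a pandas.Series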
@@ -957,11 +947,9 @@ def _test():
globs = pyspark.sql.dataframe.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
-globs['sqlCtx'] = sqlCtx = SQLContext(sc)
-rdd2 = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)])
-rdd3 = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)])
-globs['df'] = sqlCtx.inferSchema(rdd2)
-globs['df2'] = sqlCtx.inferSchema(rdd3)
+globs['sqlCtx'] = SQLContext(sc)
+globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
+globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
(failure_count, test_count) = doctest.testmod(
pyspark.sql.dataframe, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
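The recurring edit across these test harnesses replaces sqlCtx.inferSchema(rdd) with rdd.toDF(). A minimal sketch of the new idiom (assuming SPARK-5752's behavior of attaching toDF to RDDs of Rows once a SQLContext has been created):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext('local[4]', 'toDFExample')
    sqlCtx = SQLContext(sc)  # instantiating the context is what enables RDD.toDF

    rdd = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)])
    df = rdd.toDF()          # schema inferred from the Row fields,
                             # equivalent to the older sqlCtx.inferSchema(rdd)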
6 changes: 3 additions & 3 deletions python/pyspark/sql/functions.py
@@ -155,9 +155,9 @@ def _test():
globs = pyspark.sql.dataframe.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
-globs['sqlCtx'] = sqlCtx = SQLContext(sc)
-rdd2 = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)])
-globs['df'] = sqlCtx.inferSchema(rdd2)
+globs['sqlCtx'] = SQLContext(sc)
+globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
+globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
(failure_count, test_count) = doctest.testmod(
pyspark.sql.dataframe, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
3 changes: 1 addition & 2 deletions python/pyspark/sql/tests.py
@@ -336,8 +336,7 @@ def setUpClass(cls):
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
-rdd = cls.sc.parallelize(cls.testData)
-cls.df = cls.sqlCtx.inferSchema(rdd)
+cls.df = cls.sc.parallelize(cls.testData).toDF()

@classmethod
def tearDownClass(cls):
2 changes: 1 addition & 1 deletion python/run-tests
@@ -35,7 +35,7 @@ rm -rf metastore warehouse
function run_test() {
echo "Running test: $1" | tee -a $LOG_FILE

-SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 >> $LOG_FILE 2>&1
+SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1

FAILED=$((PIPESTATUS[0]||$FAILED))

