From 2e0f3579f1fa7139c2e79bde656cbac049abbc33 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 23:43:24 -0700 Subject: [PATCH] [SPARK-7242] added python api for freqItems in DataFrames The python api for DataFrame's plus addressed your comments from previous PR. rxin Author: Burak Yavuz Closes #5859 from brkyvz/df-freq-py2 and squashes the following commits: f9aa9ce [Burak Yavuz] addressed comments v0.1 4b25056 [Burak Yavuz] added python api for freqItems --- python/pyspark/sql/dataframe.py | 25 +++++++++++++++++++ python/pyspark/sql/tests.py | 7 ++++++ .../spark/sql/DataFrameStatFunctions.scala | 9 ++++--- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5ff49cac5522b..e9fd17ed4ce94 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -889,6 +889,26 @@ def cov(self, col1, col2): raise ValueError("col2 should be a string.") return self._jdf.stat().cov(col1, col2) + def freqItems(self, cols, support=None): + """ + Finding frequent items for columns, possibly with false positives. Using the + frequent element count algorithm described in + "http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou". + :func:`DataFrame.freqItems` and :func:`DataFrameStatFunctions.freqItems` are aliases. + + :param cols: Names of the columns to calculate frequent items for as a list or tuple of + strings. + :param support: The frequency with which to consider an item 'frequent'. Default is 1%. + The support must be greater than 1e-4. + """ + if isinstance(cols, tuple): + cols = list(cols) + if not isinstance(cols, list): + raise ValueError("cols must be a list or tuple of column names as strings.") + if not support: + support = 0.01 + return DataFrame(self._jdf.stat().freqItems(_to_seq(self._sc, cols), support), self.sql_ctx) + @ignore_unicode_prefix def withColumn(self, colName, col): """Returns a new :class:`DataFrame` by adding a column. @@ -1344,6 +1364,11 @@ def cov(self, col1, col2): cov.__doc__ = DataFrame.cov.__doc__ + def freqItems(self, cols, support=None): + return self.df.freqItems(cols, support) + + freqItems.__doc__ = DataFrame.freqItems.__doc__ + def _test(): import doctest diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 44c8b6a1aac13..613efc0ac029d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -375,6 +375,13 @@ def test_column_select(self): self.assertEqual(self.testData, df.select(df.key, df.value).collect()) self.assertEqual([Row(value='1')], df.where(df.key == 1).select(df.value).collect()) + def test_freqItems(self): + vals = [Row(a=1, b=-2.0) if i % 2 == 0 else Row(a=i, b=i * 1.0) for i in range(100)] + df = self.sc.parallelize(vals).toDF() + items = df.stat.freqItems(("a", "b"), 0.4).collect()[0] + self.assertTrue(1 in items[0]) + self.assertTrue(-2.0 in items[1]) + def test_aggregator(self): df = self.df g = df.groupBy() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 23652aeb7c7bc..e8fa82947759b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -43,7 +43,10 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { } /** - * Runs `freqItems` with a default `support` of 1%. + * Finding frequent items for columns, possibly with false positives. Using the + * frequent element count algorithm described in + * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]]. + * Uses a `default` support of 1%. * * @param cols the names of the columns to search frequent items in. * @return A Local DataFrame with the Array of frequent items for each column. @@ -55,14 +58,14 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Python friendly implementation for `freqItems` */ - def freqItems(cols: List[String], support: Double): DataFrame = { + def freqItems(cols: Seq[String], support: Double): DataFrame = { FrequentItems.singlePassFreqItems(df, cols, support) } /** * Python friendly implementation for `freqItems` with a default `support` of 1%. */ - def freqItems(cols: List[String]): DataFrame = { + def freqItems(cols: Seq[String]): DataFrame = { FrequentItems.singlePassFreqItems(df, cols, 0.01) }