New approach for no value concept taken after NumPy
HyukjinKwon committed Feb 8, 2018
1 parent 30295bf commit a349d07
Showing 4 changed files with 98 additions and 10 deletions.
1 change: 1 addition & 0 deletions python/pyspark/__init__.py
@@ -54,6 +54,7 @@
from pyspark.taskcontext import TaskContext
from pyspark.profiler import Profiler, BasicProfiler
from pyspark.version import __version__
from pyspark._globals import _NoValue


def since(version):
70 changes: 70 additions & 0 deletions python/pyspark/_globals.py
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Module defining global singleton classes.
This module raises a RuntimeError if an attempt to reload it is made. In that
way the identities of the classes defined here are fixed and will remain so
even if pyspark itself is reloaded. In particular, a function like the following
will still work correctly after pyspark is reloaded:
    def foo(arg=pyspark._NoValue):
        if arg is pyspark._NoValue:
            ...
See gh-7844 for a discussion of the reload problem that motivated this module.
Note that this approach is taken from NumPy.
"""

__all__ = ['_NoValue']


# Disallow reloading this module so as to preserve the identities of the
# classes defined here.
if '_is_loaded' in globals():
raise RuntimeError('Reloading pyspark._globals is not allowed')
_is_loaded = True


class _NoValueType(object):
"""Special keyword value.
The instance of this class may be used as the default value assigned to a
deprecated keyword in order to check if it has been given a user defined
value.
This class was copied from NumPy.
"""
__instance = None

def __new__(cls):
# ensure that only one instance exists
if not cls.__instance:
cls.__instance = super(_NoValueType, cls).__new__(cls)
return cls.__instance

# needed for python 2 to preserve identity through a pickle
def __reduce__(self):
return (self.__class__, ())

def __repr__(self):
return "<no value>"


_NoValue = _NoValueType()
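
The module exposes a single sentinel so that callers can tell "argument not passed" apart from an explicit None. A minimal usage sketch, outside the diff (describe_arg is a made-up helper, not part of this commit, and it assumes a pyspark build that includes the __init__.py change above):

import pickle

from pyspark import _NoValue  # re-exported by the __init__.py change above


def describe_arg(value=_NoValue):
    # Identity comparison is safe because _NoValueType() always returns the
    # same instance and reloading pyspark._globals is disallowed.
    if value is _NoValue:
        return "value was omitted"
    return "value was explicitly set to %r" % (value,)


print(describe_arg())      # value was omitted
print(describe_arg(None))  # value was explicitly set to None

# __reduce__ keeps the identity stable across a pickle round trip.
assert pickle.loads(pickle.dumps(_NoValue)) is _NoValue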
26 changes: 21 additions & 5 deletions python/pyspark/sql/dataframe.py
@@ -27,7 +27,7 @@

import warnings

from pyspark import copy_func, since
from pyspark import copy_func, since, _NoValue
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.serializers import ArrowSerializer, BatchedSerializer, PickleSerializer, \
UTF8Deserializer
@@ -1532,7 +1532,7 @@ def fillna(self, value, subset=None):
return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx)

@since(1.4)
def replace(self, to_replace, value=None, subset=None):
def replace(self, to_replace, value=_NoValue, subset=None):
"""Returns a new :class:`DataFrame` replacing a value with another value.
:func:`DataFrame.replace` and :func:`DataFrameNaFunctions.replace` are
aliases of each other.
@@ -1545,8 +1545,8 @@ def replace(self, to_replace, value=None, subset=None):
:param to_replace: bool, int, long, float, string, list or dict.
Value to be replaced.
If the value is a dict, then `value` is ignored and `to_replace` must be a
mapping between a value and a replacement.
If the value is a dict, then `value` is ignored or can be omitted, and `to_replace`
must be a mapping between a value and a replacement.
:param value: bool, int, long, float, string, list or None.
The replacement value must be a bool, int, long, float, string or None. If `value` is a
list, `value` should be of the same length and type as `to_replace`.
@@ -1577,6 +1577,16 @@ def replace(self, to_replace, value=None, subset=None):
|null| null|null|
+----+------+----+
>>> df4.na.replace({'Alice': None}).show()
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80|null|
| 5| null| Bob|
|null| null| Tom|
|null| null|null|
+----+------+----+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
@@ -1587,6 +1597,12 @@ def replace(self, to_replace, value=None, subset=None):
|null| null|null|
+----+------+----+
"""
if value is _NoValue:
if isinstance(to_replace, dict):
value = None
else:
raise TypeError("value argument is required when to_replace is not a dictionary.")

# Helper functions
def all_of(types):
"""Given a type or tuple of types and a sequence of xs
@@ -2047,7 +2063,7 @@ def fill(self, value, subset=None):

fill.__doc__ = DataFrame.fillna.__doc__

def replace(self, to_replace, value, subset=None):
def replace(self, to_replace, value=_NoValue, subset=None):
return self.df.replace(to_replace, value, subset)

replace.__doc__ = DataFrame.replace.__doc__
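
Taken together, the change means the dict form of replace works without a value while any other form must pass one explicitly. A short sketch against a local session (the session setup and sample data are illustrative, not part of this commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("replace-sentinel-demo").getOrCreate()
df = spark.createDataFrame([(u'Alice', 10), (u'Bob', 5)], ['name', 'age'])

# Dict form: the mapping supplies the replacements, so `value` may be omitted.
df.na.replace({'Alice': None}).show()

# Non-dict form without `value` now fails fast instead of silently replacing with None.
try:
    df.na.replace(['Alice', 'Bob']).show()
except TypeError as e:
    print(e)  # value argument is required when to_replace is not a dictionary.

# Non-dict form with an explicit replacement list behaves as before.
df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()

spark.stop()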
11 changes: 6 additions & 5 deletions python/pyspark/sql/tests.py
@@ -2243,11 +2243,6 @@ def test_replace(self):
.replace(False, True).first())
self.assertTupleEqual(row, (True, True))

# replace list while value is not given (default to None)
row = self.spark.createDataFrame(
[(u'Alice', 10, 80.0)], schema).replace(["Alice", "Bob"]).first()
self.assertTupleEqual(row, (None, 10, 80.0))

# replace string with None and then drop None rows
row = self.spark.createDataFrame(
[(u'Alice', 10, 80.0)], schema).replace(u'Alice', None).dropna()
@@ -2283,6 +2278,12 @@ def test_replace(self):
self.spark.createDataFrame(
[(u'Alice', 10, 80.1)], schema).replace({u"Alice": u"Bob", 10: 20}).first()

with self.assertRaisesRegexp(
TypeError,
'value argument is required when to_replace is not a dictionary.'):
self.spark.createDataFrame(
[(u'Alice', 10, 80.0)], schema).replace(["Alice", "Bob"]).first()

def test_capture_analysis_exception(self):
self.assertRaises(AnalysisException, lambda: self.spark.sql("select abc"))
self.assertRaises(AnalysisException, lambda: self.df.selectExpr("a + b"))
