diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 45575f14fa798..3093de75a8fef 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,15 +704,16 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() + def check_return_code(): pipe.wait() if pipe.returncode: raise Exception("Pipe function `%s' exited " - "with error code %d" % (command, pipe.returncode)) + "with error code %d" % (command, pipe.returncode)) else: return None return (x.rstrip(b'\n').decode('utf-8') for x in - chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) + chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) return self.mapPartitions(func) def foreach(self, f):