From cc14202e9c804068d1532ff143b06854266d3714 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 28 Jul 2014 17:25:51 -0700
Subject: [PATCH] keep silent in worker if JVM close the socket

---
 python/pyspark/worker.py | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 24d41b12d1b1a..2770f63059853 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -75,14 +75,19 @@ def main(infile, outfile):
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
-    except Exception as e:
-        # Write the error to stderr in addition to trying to pass it back to
-        # Java, in case it happened while serializing a record
-        print >> sys.stderr, "PySpark worker failed with exception:"
-        print >> sys.stderr, traceback.format_exc()
-        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
-        write_with_length(traceback.format_exc(), outfile)
-        sys.exit(-1)
+    except Exception:
+        try:
+            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
+            write_with_length(traceback.format_exc(), outfile)
+            outfile.flush()
+        except IOError:
+            # JVM close the socket
+            pass
+        except Exception:
+            # Write the error to stderr if it happened while serializing
+            print >> sys.stderr, "PySpark worker failed with exception:"
+            print >> sys.stderr, traceback.format_exc()
+            exit(-1)
     finish_time = time.time()
     report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
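
For context, the new error path first tries to report the failure back to the JVM over the worker's output socket, swallows an IOError there because it means the JVM has already closed the connection, and only falls back to stderr (and a non-zero exit) when reporting fails for some other reason. Below is a minimal standalone sketch of that pattern; report_failure is a hypothetical name, and write_int, write_with_length and PYTHON_EXCEPTION_THROWN are simplified stand-ins for the pyspark.serializers helpers referenced in worker.py.

    import struct
    import sys
    import traceback

    # Placeholder for SpecialLengths.PYTHON_EXCEPTION_THROWN; the real
    # constant lives in pyspark/serializers.py.
    PYTHON_EXCEPTION_THROWN = -2


    def write_int(value, stream):
        # Mirrors the framing worker.py uses: a big-endian 4-byte signed int.
        stream.write(struct.pack("!i", value))


    def write_with_length(data, stream):
        # Length-prefixed payload, as the JVM side expects.
        write_int(len(data), stream)
        stream.write(data)


    def report_failure(outfile):
        # Try to hand the traceback back to the JVM first.
        try:
            write_int(PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
            outfile.flush()
        except IOError:
            # The JVM already closed the socket: nothing useful can be sent,
            # so keep silent rather than logging from every dying worker.
            pass
        except Exception:
            # Reporting itself failed (e.g. while serializing the traceback);
            # fall back to stderr and exit with a non-zero status.
            sys.stderr.write("PySpark worker failed with exception:\n")
            sys.stderr.write(traceback.format_exc())
            sys.exit(-1)

In the patch itself this logic sits inline in main()'s outer except block, as shown in the hunk above.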