Skip to content

Commit

Permalink
[SPARK-30091][SQL][PYTHON] Document mergeSchema option directly in th…
Browse files Browse the repository at this point in the history
…e PySpark Parquet APIs

### What changes were proposed in this pull request?

This change properly documents the `mergeSchema` option directly in the Python APIs for reading Parquet data.

### Why are the changes needed?

The docstring for `DataFrameReader.parquet()` mentions `mergeSchema` but doesn't show it in the API. It seems like a simple oversight.

Before this PR, you'd have to do this to use `mergeSchema`:

```python
spark.read.option('mergeSchema', True).parquet('test-parquet').show()
```

After this PR, you can use the option as (I believe) it was intended to be used:

```python
spark.read.parquet('test-parquet', mergeSchema=True).show()
```

### Does this PR introduce any user-facing change?

Yes, this PR changes the signatures of `DataFrameReader.parquet()` and `DataStreamReader.parquet()` to match their docstrings.

### How was this patch tested?

Testing the `mergeSchema` option directly seems to be left to the Scala side of the codebase. I tested my change manually to confirm the API works.

I also confirmed that setting `spark.sql.parquet.mergeSchema` at the session does not get overridden by leaving `mergeSchema` at its default when calling `parquet()`:

```
>>> spark.conf.set('spark.sql.parquet.mergeSchema', True)
>>> spark.range(3).write.parquet('test-parquet/id')
>>> spark.range(3).withColumnRenamed('id', 'name').write.parquet('test-parquet/name')
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet').show()
+----+----+
|  id|name|
+----+----+
|null|   1|
|null|   2|
|null|   0|
|   1|null|
|   2|null|
|   0|null|
+----+----+
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet', mergeSchema=False).show()
+----+
|  id|
+----+
|null|
|null|
|null|
|   1|
|   2|
|   0|
+----+
```

Closes apache#26730 from nchammas/parquet-merge-schema.

Authored-by: Nicholas Chammas <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
nchammas authored and attilapiros committed Dec 6, 2019
1 parent e9f3eee commit 30e6c88
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 17 deletions.
14 changes: 7 additions & 7 deletions python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,22 @@ def table(self, tableName):

@since(1.4)
def parquet(self, *paths, **options):
"""Loads Parquet files, returning the result as a :class:`DataFrame`.
"""
Loads Parquet files, returning the result as a :class:`DataFrame`.
:param mergeSchema: sets whether we should merge schemas collected from all
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
The default value is specified in ``spark.sql.parquet.mergeSchema``.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.
You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
The default value is specified in ``spark.sql.parquet.mergeSchema``.
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
mergeSchema = options.get('mergeSchema', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
Expand Down
19 changes: 9 additions & 10 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,26 +535,25 @@ def orc(self, path, recursiveFileLookup=None):
raise TypeError("path can be only a single string")

@since(2.0)
def parquet(self, path, recursiveFileLookup=None):
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
"""
Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
.. note:: Evolving.
:param mergeSchema: sets whether we should merge schemas collected from all
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
The default value is specified in ``spark.sql.parquet.mergeSchema``.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.
You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
The default value is specified in ``spark.sql.parquet.mergeSchema``.
.. note:: Evolving.
>>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
>>> parquet_sdf.isStreaming
True
>>> parquet_sdf.schema == sdf_schema
True
"""
self._set_opts(recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.parquet(path))
else:
Expand Down

0 comments on commit 30e6c88

Please sign in to comment.