Skip to content

Commit

Permalink
revert globbing to investigate possible problems (#5679)
Browse files Browse the repository at this point in the history
This reverts commit 041f9a71cb3ca3468aadcdc9990297f520526f27.

Co-authored-by: Michał Bartoszkiewicz <[email protected]>
GitOrigin-RevId: 972e19321c18c76d266fbec74ecc778bd5ea9965
  • Loading branch information
2 people authored and Manul from Pathway committed Feb 13, 2024
1 parent 0ffa2a4 commit db56bb0
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 347 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed
- `pw.io.kafka.write` now does retries when sending to the output topic fails.
- `pw.io.csv.read`, `pw.io.jsonlines.read`, `pw.io.fs.read`, `pw.io.plaintext.read` now handle `path` as a glob pattern and read all matched files and directories recursively.

## [0.8.0] - 2024-02-01

Expand Down
7 changes: 2 additions & 5 deletions python/pathway/io/csv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def read(
the modification time.
Args:
path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
objects to be read. The connector will read the contents of all matching files as well \
as recursively read the contents of all matching folders.
path: Path to the file or to the folder with files.
value_columns: Names of the columns to be extracted from the files. [will be deprecated soon]
schema: Schema of the resulting table.
id_columns: In case the table should have a primary key generated according to
Expand All @@ -57,8 +55,7 @@ def read(
the other hand, the "static" mode will only consider the available data and ingest all \
of it in one commit. The default value is "streaming".
object_pattern: Unix shell style pattern for filtering only certain files in the \
directory. Ignored in case a path to a single file is specified. This value will be \
deprecated soon, please use glob pattern in ``path`` instead.
directory. Ignored in case a path to a single file is specified.
with_metadata: When set to true, the connector will add an additional column \
named ``_metadata`` to the table. This column will be a JSON field that will contain two \
optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
Expand Down
22 changes: 4 additions & 18 deletions python/pathway/io/fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import warnings
from os import PathLike, fspath
from typing import Any

Expand Down Expand Up @@ -57,9 +56,7 @@ def read(
``data`` with each cell containing a single line from the file.
Args:
path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
objects to be read. The connector will read the contents of all matching files as well \
as recursively read the contents of all matching folders.
path: Path to the file or to the folder with files.
format: Format of data to be read. Currently "csv", "json", "plaintext", \
"plaintext_by_file" and "binary" formats are supported. The difference between \
"plaintext" and "plaintext_by_file" is how the input is tokenized: if the "plaintext" \
Expand All @@ -82,8 +79,7 @@ def read(
where the path to be mapped needs to be a
`JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
object_pattern: Unix shell style pattern for filtering only certain files in the \
directory. Ignored in case a path to a single file is specified. This value will be \
deprecated soon, please use glob pattern in ``path`` instead.
directory. Ignored in case a path to a single file is specified.
with_metadata: When set to true, the connector will add an additional column \
named ``_metadata`` to the table. This column will be a JSON field that will contain two \
optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
Expand Down Expand Up @@ -218,20 +214,10 @@ def read(
>>> t = pw.io.fs.read("raw_dataset/lines.txt", format="plaintext")
"""

path = fspath(path)

if object_pattern != "*":
warnings.warn(
"'object_pattern' is deprecated and will be removed soon. "
"Please use a glob pattern in `path` instead",
DeprecationWarning,
stacklevel=2,
)

if format == "csv":
data_storage = api.DataStorage(
storage_type="csv",
path=path,
path=fspath(path),
csv_parser_settings=csv_settings.api_settings if csv_settings else None,
mode=internal_connector_mode(mode),
object_pattern=object_pattern,
Expand All @@ -240,7 +226,7 @@ def read(
else:
data_storage = api.DataStorage(
storage_type="fs",
path=path,
path=fspath(path),
mode=internal_connector_mode(mode),
read_method=internal_read_method(format),
object_pattern=object_pattern,
Expand Down
7 changes: 2 additions & 5 deletions python/pathway/io/jsonlines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ def read(
the modification time.
Args:
path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
objects to be read. The connector will read the contents of all matching files as well \
as recursively read the contents of all matching folders.
path: Path to the file or to the folder with files.
schema: Schema of the resulting table.
mode: Denotes how the engine polls the new data from the source. Currently \
"streaming" and "static" are supported. If set to "streaming" the engine will wait for \
Expand All @@ -55,8 +53,7 @@ def read(
``<field_name>: <path to be mapped>``, where the path to be mapped needs to be a
`JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
object_pattern: Unix shell style pattern for filtering only certain files in the \
directory. Ignored in case a path to a single file is specified. This value will be \
deprecated soon, please use glob pattern in ``path`` instead.
directory. Ignored in case a path to a single file is specified.
with_metadata: When set to true, the connector will add an additional column \
named ``_metadata`` to the table. This column will be a JSON field that will contain two \
optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
Expand Down
7 changes: 2 additions & 5 deletions python/pathway/io/plaintext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def read(
modification time is, the earlier the file will be passed to the engine.
Args:
path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
objects to be read. The connector will read the contents of all matching files as well \
as recursively read the contents of all matching folders.
path: Path to a file or to a folder.
mode: Denotes how the engine polls the new data from the source. Currently \
"streaming" and "static" are supported. If set to "streaming" the engine will wait for \
the updates in the specified directory. It will track file additions, deletions, and \
Expand All @@ -42,8 +40,7 @@ def read(
the other hand, the "static" mode will only consider the available data and ingest all \
of it in one commit. The default value is "streaming".
object_pattern: Unix shell style pattern for filtering only certain files in the \
directory. Ignored in case a path to a single file is specified. This value will be \
deprecated soon, please use glob pattern in ``path`` instead.
directory. Ignored in case a path to a single file is specified.
with_metadata: When set to true, the connector will add an additional column \
named ``_metadata`` to the table. This column will be a JSON field that will contain two \
optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
Expand Down
59 changes: 0 additions & 59 deletions python/pathway/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2824,62 +2824,3 @@ class InputSchema(pw.Schema):
schema=InputSchema,
delete_completed_queries=False,
)


def test_subdirectories(tmp_path: pathlib.Path):
nested_inputs_path = (
tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3"
)
os.makedirs(nested_inputs_path)
output_path = tmp_path / "output.json"
write_lines(nested_inputs_path / "a.txt", "a\nb\nc")

table = pw.io.plaintext.read(tmp_path / "nested_level_1", mode="static")
pw.io.jsonlines.write(table, output_path)
pw.run()

assert FileLinesNumberChecker(output_path, 3)()


def test_glob_pattern(tmp_path: pathlib.Path):
nested_inputs_path = (
tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3"
)
os.makedirs(nested_inputs_path)
output_path = tmp_path / "output.json"
write_lines(nested_inputs_path / "a.txt", "a\nb\nc")
write_lines(nested_inputs_path / "b.txt", "d\ne\nf\ng")

table = pw.io.plaintext.read(tmp_path / "nested_level_1/**/b.txt", mode="static")
pw.io.jsonlines.write(table, output_path)
pw.run()

assert FileLinesNumberChecker(output_path, 4)()


def test_glob_pattern_recurse_subdirs(tmp_path: pathlib.Path):
os.makedirs(tmp_path / "input" / "foo" / "level2")
write_lines(tmp_path / "input" / "foo" / "level2" / "a.txt", "a\nb\nc")
write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng")
write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl")
output_path = tmp_path / "output.json"

table = pw.io.plaintext.read(tmp_path / "input/f*", mode="static")
pw.io.jsonlines.write(table, output_path)
pw.run()

assert FileLinesNumberChecker(output_path, 7)()


def test_glob_pattern_nothing_matched(tmp_path: pathlib.Path):
os.makedirs(tmp_path / "input" / "foo" / ".level2")
write_lines(tmp_path / "input" / "foo" / ".level2" / ".a.txt", "a\nb\nc")
write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng")
write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl")
output_path = tmp_path / "output.json"

table = pw.io.plaintext.read(tmp_path / "input/f", mode="static")
pw.io.jsonlines.write(table, output_path)
pw.run()

assert FileLinesNumberChecker(output_path, 0)()
Loading

0 comments on commit db56bb0

Please sign in to comment.