From 0c068aba488cf0944a6b8cda5968ae46bdc2b168 Mon Sep 17 00:00:00 2001 From: Sergey Kulik <104143901+zxqfd555-pw@users.noreply.github.com> Date: Mon, 12 Feb 2024 18:26:19 +0100 Subject: [PATCH] use glob patterns instead of paths in fs connectors (#5370) GitOrigin-RevId: 041f9a71cb3ca3468aadcdc9990297f520526f27 --- CHANGELOG.md | 1 + python/pathway/io/csv/__init__.py | 7 +- python/pathway/io/fs/__init__.py | 22 +- python/pathway/io/jsonlines/__init__.py | 7 +- python/pathway/io/plaintext/__init__.py | 7 +- python/pathway/tests/test_io.py | 59 +++++ src/connectors/data_storage.rs | 195 ++++++++--------- src/python_api.rs | 205 ++++++++++-------- tests/integration/test_bytes.rs | 11 +- .../test_connector_field_defaults.rs | 17 +- tests/integration/test_debezium.rs | 5 +- tests/integration/test_dsv.rs | 17 +- tests/integration/test_dsv_dir.rs | 21 +- tests/integration/test_jsonlines.rs | 23 +- tests/integration/test_metadata.rs | 17 +- tests/integration/test_seek.rs | 20 +- 16 files changed, 347 insertions(+), 287 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 161ae71d..bf7ab85a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `pw.io.kafka.write` now does retries when sending to the output topic fails. +- `pw.io.csv.read`, `pw.io.jsonlines.read`, `pw.io.fs.read`, `pw.io.plaintext.read` now handle `path` as a glob pattern and read all matched files and directories recursively. ## [0.8.0] - 2024-02-01 diff --git a/python/pathway/io/csv/__init__.py b/python/pathway/io/csv/__init__.py index 70e1e832..28c90889 100644 --- a/python/pathway/io/csv/__init__.py +++ b/python/pathway/io/csv/__init__.py @@ -40,7 +40,9 @@ def read( the modification time. Args: - path: Path to the file or to the folder with files. + path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \ +objects to be read. The connector will read the contents of all matching files as well \ +as recursively read the contents of all matching folders. value_columns: Names of the columns to be extracted from the files. [will be deprecated soon] schema: Schema of the resulting table. id_columns: In case the table should have a primary key generated according to @@ -55,7 +57,8 @@ def read( the other hand, the "static" mode will only consider the available data and ingest all \ of it in one commit. The default value is "streaming". object_pattern: Unix shell style pattern for filtering only certain files in the \ -directory. Ignored in case a path to a single file is specified. +directory. Ignored in case a path to a single file is specified. This value will be \ +deprecated soon; please use a glob pattern in ``path`` instead. with_metadata: When set to true, the connector will add an additional column \ named ``_metadata`` to the table. This column will be a JSON field that will contain two \ optional fields - ``created_at`` and ``modified_at``. These fields will have integral \ diff --git a/python/pathway/io/fs/__init__.py b/python/pathway/io/fs/__init__.py index 2e674dc4..688d95c3 100644 --- a/python/pathway/io/fs/__init__.py +++ b/python/pathway/io/fs/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from os import PathLike, fspath from typing import Any @@ -56,7 +57,9 @@ def read( ``data`` with each cell containing a single line from the file. Args: - path: Path to the file or to the folder with files.
+ path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \ +objects to be read. The connector will read the contents of all matching files as well \ +as recursively read the contents of all matching folders. format: Format of data to be read. Currently "csv", "json", "plaintext", \ "plaintext_by_file" and "binary" formats are supported. The difference between \ "plaintext" and "plaintext_by_file" is how the input is tokenized: if the "plaintext" \ @@ -79,7 +82,8 @@ def read( where the path to be mapped needs to be a `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_. object_pattern: Unix shell style pattern for filtering only certain files in the \ -directory. Ignored in case a path to a single file is specified. +directory. Ignored in case a path to a single file is specified. This value will be \ +deprecated soon; please use a glob pattern in ``path`` instead. with_metadata: When set to true, the connector will add an additional column \ named ``_metadata`` to the table. This column will be a JSON field that will contain two \ optional fields - ``created_at`` and ``modified_at``. These fields will have integral \ @@ -214,10 +218,20 @@ def read( >>> t = pw.io.fs.read("raw_dataset/lines.txt", format="plaintext") """ + path = fspath(path) + + if object_pattern != "*": + warnings.warn( + "'object_pattern' is deprecated and will be removed soon. " + "Please use a glob pattern in `path` instead", + DeprecationWarning, + stacklevel=2, + ) + if format == "csv": data_storage = api.DataStorage( storage_type="csv", - path=fspath(path), + path=path, csv_parser_settings=csv_settings.api_settings if csv_settings else None, mode=internal_connector_mode(mode), object_pattern=object_pattern, @@ -226,7 +240,7 @@ def read( else: data_storage = api.DataStorage( storage_type="fs", - path=fspath(path), + path=path, mode=internal_connector_mode(mode), read_method=internal_read_method(format), object_pattern=object_pattern, diff --git a/python/pathway/io/jsonlines/__init__.py b/python/pathway/io/jsonlines/__init__.py index 10f62a5e..3b9a3846 100644 --- a/python/pathway/io/jsonlines/__init__.py +++ b/python/pathway/io/jsonlines/__init__.py @@ -39,7 +39,9 @@ def read( the modification time. Args: - path: Path to the file or to the folder with files. + path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \ +objects to be read. The connector will read the contents of all matching files as well \ +as recursively read the contents of all matching folders. schema: Schema of the resulting table. mode: Denotes how the engine polls the new data from the source. Currently \ "streaming" and "static" are supported. If set to "streaming" the engine will wait for \ @@ -53,7 +55,8 @@ def read( ``<field_name>: <path to be mapped>``, where the path to be mapped needs to be a `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_. object_pattern: Unix shell style pattern for filtering only certain files in the \ -directory. Ignored in case a path to a single file is specified. +directory. Ignored in case a path to a single file is specified. This value will be \ +deprecated soon; please use a glob pattern in ``path`` instead. with_metadata: When set to true, the connector will add an additional column \ named ``_metadata`` to the table. This column will be a JSON field that will contain two \ optional fields - ``created_at`` and ``modified_at``.
These fields will have integral \ diff --git a/python/pathway/io/plaintext/__init__.py b/python/pathway/io/plaintext/__init__.py index 40c61efc..56f04f4b 100644 --- a/python/pathway/io/plaintext/__init__.py +++ b/python/pathway/io/plaintext/__init__.py @@ -31,7 +31,9 @@ def read( modification time is, the earlier the file will be passed to the engine. Args: - path: Path to a file or to a folder. + path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \ +objects to be read. The connector will read the contents of all matching files as well \ +as recursively read the contents of all matching folders. mode: Denotes how the engine polls the new data from the source. Currently \ "streaming" and "static" are supported. If set to "streaming" the engine will wait for \ the updates in the specified directory. It will track file additions, deletions, and \ @@ -40,7 +42,8 @@ def read( the other hand, the "static" mode will only consider the available data and ingest all \ of it in one commit. The default value is "streaming". object_pattern: Unix shell style pattern for filtering only certain files in the \ -directory. Ignored in case a path to a single file is specified. +directory. Ignored in case a path to a single file is specified. This value will be \ +deprecated soon; please use a glob pattern in ``path`` instead. with_metadata: When set to true, the connector will add an additional column \ named ``_metadata`` to the table. This column will be a JSON field that will contain two \ optional fields - ``created_at`` and ``modified_at``. These fields will have integral \ diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py index d0bbdc3c..ebf7de72 100644 --- a/python/pathway/tests/test_io.py +++ b/python/pathway/tests/test_io.py @@ -2824,3 +2824,62 @@ class InputSchema(pw.Schema): schema=InputSchema, delete_completed_queries=False, ) + + +def test_subdirectories(tmp_path: pathlib.Path): + nested_inputs_path = ( + tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3" + ) + os.makedirs(nested_inputs_path) + output_path = tmp_path / "output.json" + write_lines(nested_inputs_path / "a.txt", "a\nb\nc") + + table = pw.io.plaintext.read(tmp_path / "nested_level_1", mode="static") + pw.io.jsonlines.write(table, output_path) + pw.run() + + assert FileLinesNumberChecker(output_path, 3)() + + +def test_glob_pattern(tmp_path: pathlib.Path): + nested_inputs_path = ( + tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3" + ) + os.makedirs(nested_inputs_path) + output_path = tmp_path / "output.json" + write_lines(nested_inputs_path / "a.txt", "a\nb\nc") + write_lines(nested_inputs_path / "b.txt", "d\ne\nf\ng") + + table = pw.io.plaintext.read(tmp_path / "nested_level_1/**/b.txt", mode="static") + pw.io.jsonlines.write(table, output_path) + pw.run() + + assert FileLinesNumberChecker(output_path, 4)() + + +def test_glob_pattern_recurse_subdirs(tmp_path: pathlib.Path): + os.makedirs(tmp_path / "input" / "foo" / "level2") + write_lines(tmp_path / "input" / "foo" / "level2" / "a.txt", "a\nb\nc") + write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng") + write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl") + output_path = tmp_path / "output.json" + + table = pw.io.plaintext.read(tmp_path / "input/f*", mode="static") + pw.io.jsonlines.write(table, output_path) + pw.run() + + assert FileLinesNumberChecker(output_path, 7)() + + +def test_glob_pattern_nothing_matched(tmp_path: pathlib.Path): + os.makedirs(tmp_path / "input" / "foo" /
".level2") + write_lines(tmp_path / "input" / "foo" / ".level2" / ".a.txt", "a\nb\nc") + write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng") + write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl") + output_path = tmp_path / "output.json" + + table = pw.io.plaintext.read(tmp_path / "input/f", mode="static") + pw.io.jsonlines.write(table, output_path) + pw.run() + + assert FileLinesNumberChecker(output_path, 0)() diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index 7e69b1e3..91f0d096 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -11,7 +11,6 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::env; use std::fmt::Debug; -use std::fs::DirEntry; use std::fs::File; use std::io; use std::io::BufRead; @@ -26,7 +25,7 @@ use std::str::{from_utf8, Utf8Error}; use std::sync::Arc; use std::thread; use std::thread::sleep; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime}; use chrono::{DateTime, FixedOffset}; use log::{error, warn}; @@ -73,9 +72,12 @@ use serde::{Deserialize, Serialize}; mod inotify_support { use inotify::WatchMask; use std::path::Path; + use std::thread::sleep; + use std::time::Duration; pub use inotify::Inotify; + #[allow(dead_code)] pub fn subscribe_inotify(path: impl AsRef) -> Option { let inotify = Inotify::init().ok()?; @@ -96,11 +98,17 @@ mod inotify_support { Some(inotify) } - pub fn wait(inotify: &mut Inotify) -> Option<()> { - inotify - .read_events_blocking(&mut [0; 1024]) - .ok() - .map(|_events| ()) + #[allow(clippy::unnecessary_wraps)] + pub fn wait(_inotify: &mut Inotify) -> Option<()> { + sleep(Duration::from_millis(500)); + None + + // Commented out due to using recursive subdirs + // + // inotify + // .read_events_blocking(&mut [0; 1024]) + // .ok() + // .map(|_events| ()) } } @@ -475,7 +483,7 @@ pub struct FilesystemReader { impl FilesystemReader { pub fn new( - path: impl Into, + path: &str, streaming_mode: ConnectorMode, persistent_id: Option, read_method: ReadMethod, @@ -741,11 +749,10 @@ enum PosixScannerAction { #[derive(Debug)] struct FilesystemScanner { - path: PathBuf, + path: GlobPattern, cache_directory_path: Option, - - is_directory: bool, streaming_mode: ConnectorMode, + object_pattern: String, // Mapping from the path of the loaded file to its modification timestamp known_files: HashMap, @@ -753,7 +760,6 @@ struct FilesystemScanner { current_action: Option, cached_modify_times: HashMap>, inotify: Option, - object_pattern: GlobPattern, next_file_for_insertion: Option, cached_metadata: HashMap>, @@ -764,22 +770,16 @@ struct FilesystemScanner { impl FilesystemScanner { fn new( - path: impl Into, + path: &str, persistent_id: Option, streaming_mode: ConnectorMode, object_pattern: &str, ) -> Result { - let mut path = path.into(); - if path.exists() || matches!(streaming_mode, ConnectorMode::Static) { - path = std::fs::canonicalize(path)?; - } + let path_glob = GlobPattern::new(path)?; - let is_directory = path.is_dir(); - let inotify = if streaming_mode.is_polling_enabled() { - inotify_support::subscribe_inotify(&path) - } else { - None - }; + // Alternative solution here is to do inotify_support::subscribe_inotify(path) + // if streaming mode allows polling. 
+ let inotify = None; let (cache_directory_path, connector_tmp_storage) = { if streaming_mode.are_deletions_enabled() { @@ -805,16 +805,15 @@ impl FilesystemScanner { }; Ok(Self { - path, - is_directory, + path: path_glob, streaming_mode, cache_directory_path, + object_pattern: object_pattern.to_string(), known_files: HashMap::new(), current_action: None, cached_modify_times: HashMap::new(), inotify, - object_pattern: GlobPattern::new(object_pattern)?, next_file_for_insertion: None, cached_metadata: HashMap::new(), _connector_tmp_storage: connector_tmp_storage, @@ -859,31 +858,17 @@ impl FilesystemScanner { } } - fn seek_to_file(&mut self, seek_file_path: &Path) -> io::Result<()> { + fn seek_to_file(&mut self, seek_file_path: &Path) -> Result<(), ReadError> { if self.streaming_mode.are_deletions_enabled() { warn!("seek for snapshot mode may not work correctly in case deletions take place"); } self.known_files.clear(); - if !self.is_directory { - self.current_action = Some(PosixScannerAction::Read(Arc::new( - seek_file_path.to_path_buf(), - ))); - let modify_system_time = std::fs::metadata(seek_file_path)?.modified().unwrap(); - let modify_unix_timestamp = modify_system_time - .duration_since(UNIX_EPOCH) - .expect("File modified earlier than UNIX epoch") - .as_secs(); - self.known_files - .insert(seek_file_path.to_path_buf(), modify_unix_timestamp); - return Ok(()); - } - let target_modify_time = match std::fs::metadata(seek_file_path) { Ok(metadata) => metadata.modified()?, Err(e) => { if !matches!(e.kind(), std::io::ErrorKind::NotFound) { - return Err(e); + return Err(ReadError::Io(e)); } warn!( "Unable to restore state: last persisted file {seek_file_path:?} not found in directory. Processing all files in directory." ); return Ok(()); } }; - let files_in_directory = std::fs::read_dir(self.path.as_path())?; - for entry in files_in_directory.flatten() { - if let Ok(file_type) = entry.file_type() { - if !file_type.is_file() { - continue; - } - - let Some(modify_time) = self.modify_time(&entry) else { - continue; - }; - let current_path = entry.path(); - if (modify_time, current_path.as_path()) <= (target_modify_time, seek_file_path) { - let modify_timestamp = modify_time - .duration_since(SystemTime::UNIX_EPOCH) - .expect("System time should be after the Unix epoch") - .as_secs(); - self.known_files.insert(current_path, modify_timestamp); - } + let matching_files: Vec<PathBuf> = self.get_matching_file_paths()?; + for entry in matching_files { + if !entry.is_file() { + continue; + } + let Some(modify_time) = self.modify_time(&entry) else { + continue; + }; + if (modify_time, entry.as_path()) <= (target_modify_time, seek_file_path) { + let modify_timestamp = modify_time + .duration_since(SystemTime::UNIX_EPOCH) + .expect("System time should be after the Unix epoch") + .as_secs(); + self.known_files.insert(entry, modify_timestamp); } } self.current_action = Some(PosixScannerAction::Read(Arc::new( @@ -918,7 +899,7 @@ Ok(()) } - fn modify_time(&mut self, entry: &DirEntry) -> Option<SystemTime> { + fn modify_time(&mut self, entry: &Path) -> Option<SystemTime> { if self.streaming_mode.are_deletions_enabled() { // If deletions are enabled, we also need to handle the case when the modification // time of an entry changes. Hence, we can't just memorize it once.
@@ -926,7 +907,7 @@ } else { *self .cached_modify_times - .entry(entry.path()) + .entry(entry.to_path_buf()) .or_insert_with(|| entry.metadata().ok()?.modified().ok()) } } @@ -939,7 +920,7 @@ /// a `FinishedSource` event when we've had a scheduled action but the /// corresponding file was deleted before we were able to execute this /// scheduled action. - fn next_action_determined(&mut self) -> io::Result<Option<ReadResult>> { + fn next_action_determined(&mut self) -> Result<Option<ReadResult>, ReadError> { // Finalize the current processing action if let Some(PosixScannerAction::Delete(path)) = take(&mut self.current_action) { let cached_path = self @@ -1039,58 +1020,56 @@ }) } - fn next_insertion_entry(&mut self) -> io::Result<Option<ReadResult>> { - let mut selected_file: Option<(PathBuf, SystemTime)> = None; - if self.is_directory { - let files_in_directory = std::fs::read_dir(self.path.as_path())?; + fn get_matching_file_paths(&self) -> Result<Vec<PathBuf>, ReadError> { + let mut result = Vec::new(); - for entry in files_in_directory.flatten() { - if let Ok(file_type) = entry.file_type() { - if !file_type.is_file() { - continue; - } + let file_and_folder_paths = glob::glob(self.path.as_str())?.flatten(); + for entry in file_and_folder_paths { + // If an entry is a file, it should just be added to result + if entry.is_file() { + result.push(entry); + continue; + } - let Some(modify_time) = self.modify_time(&entry) else { - continue; - }; + // Otherwise scan all files in all subdirectories and add them + let Some(path) = entry.to_str() else { + error!("Non-unicode paths are not supported. Ignoring: {entry:?}"); + continue; + }; - let current_path = entry.path(); + let folder_scan_pattern = format!("{path}/**/{}", self.object_pattern); + let folder_contents = glob::glob(&folder_scan_pattern)?.flatten(); + for nested_entry in folder_contents { + if nested_entry.is_file() { + result.push(nested_entry); + } + } + } - if let Some(file_name) = entry.file_name().to_str() { - if !self.object_pattern.matches(file_name) { - continue; - } - } else { - warn!( - "Failed to parse file name (non-unicode): {:?}.
It will be ignored", - entry.file_name() - ); - continue; - } + Ok(result) + } + fn next_insertion_entry(&mut self) -> Result, ReadError> { + let matching_files: Vec = self.get_matching_file_paths()?; + let mut selected_file: Option<(PathBuf, SystemTime)> = None; + for entry in matching_files { + if !entry.is_file() || self.known_files.contains_key(&(*entry)) { + continue; + } + + let Some(modify_time) = self.modify_time(&entry) else { + continue; + }; + + match &selected_file { + Some((currently_selected_name, selected_file_created_at)) => { + if (selected_file_created_at, currently_selected_name) > (&modify_time, &entry) { - if self.known_files.contains_key(&(*current_path)) { - continue; - } - match &selected_file { - Some((currently_selected_name, selected_file_created_at)) => { - if (selected_file_created_at, currently_selected_name) - > (&modify_time, ¤t_path) - { - selected_file = Some((current_path, modify_time)); - } - } - None => selected_file = Some((current_path, modify_time)), - } + selected_file = Some((entry, modify_time)); } } + None => selected_file = Some((entry, modify_time)), } - } else { - let is_existing_file = self.path.exists() && self.path.is_file(); - if !self.known_files.is_empty() || !is_existing_file { - return Ok(None); - } - selected_file = Some((self.path.clone(), SystemTime::now())); } match selected_file { @@ -1169,14 +1148,14 @@ pub struct CsvFilesystemReader { impl CsvFilesystemReader { pub fn new( - path: impl Into, + path: &str, parser_builder: csv::ReaderBuilder, streaming_mode: ConnectorMode, persistent_id: Option, object_pattern: &str, ) -> Result { let filesystem_scanner = - FilesystemScanner::new(path.into(), persistent_id, streaming_mode, object_pattern)?; + FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?; Ok(CsvFilesystemReader { parser_builder, persistent_id, diff --git a/src/python_api.rs b/src/python_api.rs index b1dd0e15..489a7b92 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -3636,107 +3636,120 @@ impl DataStorage { .map(IntoPersistentId::into_persistent_id) } - fn construct_reader(&self, py: pyo3::Python) -> PyResult<(Box, usize)> { - match self.storage_type.as_ref() { - "fs" => { - let storage = FilesystemReader::new( - self.path()?, - self.mode, - self.internal_persistent_id(), - self.read_method, - &self.object_pattern, - ) - .map_err(|e| { - PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}")) - })?; - Ok((Box::new(storage), 1)) - } - "s3" => { - let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); - let storage = S3GenericReader::new( - self.s3_bucket(py)?, - deduced_path.unwrap_or(self.path()?.to_string()), - self.mode.is_polling_enabled(), - self.internal_persistent_id(), - self.read_method, - ) - .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; - Ok((Box::new(storage), 1)) - } - "s3_csv" => { - let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); - let storage = S3CsvReader::new( - self.s3_bucket(py)?, - deduced_path.unwrap_or(self.path()?.to_string()), - self.build_csv_parser_settings(py), - self.mode.is_polling_enabled(), - self.internal_persistent_id(), - ) - .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; - Ok((Box::new(storage), 1)) - } - "csv" => { - let reader = CsvFilesystemReader::new( - self.path()?, - self.build_csv_parser_settings(py), - self.mode, - self.internal_persistent_id(), - &self.object_pattern, - ) - .map_err(|e| { - 
PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}")) - })?; - Ok((Box::new(reader), 1)) - } - "kafka" => { - let client_config = self.kafka_client_config()?; + fn construct_fs_reader(&self) -> PyResult<(Box, usize)> { + let storage = FilesystemReader::new( + self.path()?, + self.mode, + self.internal_persistent_id(), + self.read_method, + &self.object_pattern, + ) + .map_err(|e| PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}")))?; + Ok((Box::new(storage), 1)) + } + + fn construct_s3_reader(&self, py: pyo3::Python) -> PyResult<(Box, usize)> { + let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); + let storage = S3GenericReader::new( + self.s3_bucket(py)?, + deduced_path.unwrap_or(self.path()?.to_string()), + self.mode.is_polling_enabled(), + self.internal_persistent_id(), + self.read_method, + ) + .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; + Ok((Box::new(storage), 1)) + } - let consumer: BaseConsumer = client_config.create().map_err(|e| { - PyValueError::new_err(format!("Creating Kafka consumer failed: {e}")) - })?; + fn construct_s3_csv_reader( + &self, + py: pyo3::Python, + ) -> PyResult<(Box, usize)> { + let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); + let storage = S3CsvReader::new( + self.s3_bucket(py)?, + deduced_path.unwrap_or(self.path()?.to_string()), + self.build_csv_parser_settings(py), + self.mode.is_polling_enabled(), + self.internal_persistent_id(), + ) + .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; + Ok((Box::new(storage), 1)) + } - let topic = self.kafka_topic()?; - consumer.subscribe(&[topic]).map_err(|e| { - PyIOError::new_err(format!("Subscription to Kafka topic failed: {e}")) - })?; + fn construct_csv_reader(&self, py: pyo3::Python) -> PyResult<(Box, usize)> { + let reader = CsvFilesystemReader::new( + self.path()?, + self.build_csv_parser_settings(py), + self.mode, + self.internal_persistent_id(), + &self.object_pattern, + ) + .map_err(|e| { + PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}")) + })?; + Ok((Box::new(reader), 1)) + } - let reader = - KafkaReader::new(consumer, topic.to_string(), self.internal_persistent_id()); - Ok((Box::new(reader), self.parallel_readers.unwrap_or(256))) - } - "python" => { - let subject = self.python_subject.clone().ok_or_else(|| { - PyValueError::new_err( - "For Python connector, python_subject should be specified", - ) - })?; + fn construct_kafka_reader(&self) -> PyResult<(Box, usize)> { + let client_config = self.kafka_client_config()?; - if subject.borrow(py).is_internal && self.persistent_id.is_some() { - return Err(PyValueError::new_err( - "Python connectors marked internal can't have persistent id", - )); - } + let consumer: BaseConsumer = client_config + .create() + .map_err(|e| PyValueError::new_err(format!("Creating Kafka consumer failed: {e}")))?; - let reader = PythonReaderBuilder::new(subject, self.internal_persistent_id()); - Ok((Box::new(reader), 1)) - } - "sqlite" => { - let connection = SqliteConnection::open_with_flags( - self.path()?, - SqliteOpenFlags::SQLITE_OPEN_READ_ONLY | SqliteOpenFlags::SQLITE_OPEN_NO_MUTEX, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!("Failed to open Sqlite connection: {e}")) - })?; - let table_name = self.table_name.clone().ok_or_else(|| { - PyValueError::new_err("For Sqlite connector, table_name should be specified") - })?; - let column_names = 
self.column_names.clone().ok_or_else(|| { - PyValueError::new_err("For Sqlite connector, column_names should be specified") - })?; - let reader = SqliteReader::new(connection, table_name, column_names); - Ok((Box::new(reader), 1)) - } + let topic = self.kafka_topic()?; + consumer + .subscribe(&[topic]) + .map_err(|e| PyIOError::new_err(format!("Subscription to Kafka topic failed: {e}")))?; + + let reader = KafkaReader::new(consumer, topic.to_string(), self.internal_persistent_id()); + Ok((Box::new(reader), self.parallel_readers.unwrap_or(256))) + } + + fn construct_python_reader( + &self, + py: pyo3::Python, + ) -> PyResult<(Box<dyn ReaderBuilder>, usize)> { + let subject = self.python_subject.clone().ok_or_else(|| { + PyValueError::new_err("For Python connector, python_subject should be specified") + })?; + + if subject.borrow(py).is_internal && self.persistent_id.is_some() { + return Err(PyValueError::new_err( + "Python connectors marked internal can't have persistent id", + )); + } + + let reader = PythonReaderBuilder::new(subject, self.internal_persistent_id()); + Ok((Box::new(reader), 1)) + } + + fn construct_sqlite_reader(&self) -> PyResult<(Box<dyn ReaderBuilder>, usize)> { + let connection = SqliteConnection::open_with_flags( + self.path()?, + SqliteOpenFlags::SQLITE_OPEN_READ_ONLY | SqliteOpenFlags::SQLITE_OPEN_NO_MUTEX, + ) + .map_err(|e| PyRuntimeError::new_err(format!("Failed to open Sqlite connection: {e}")))?; + let table_name = self.table_name.clone().ok_or_else(|| { + PyValueError::new_err("For Sqlite connector, table_name should be specified") + })?; + let column_names = self.column_names.clone().ok_or_else(|| { + PyValueError::new_err("For Sqlite connector, column_names should be specified") + })?; + let reader = SqliteReader::new(connection, table_name, column_names); + Ok((Box::new(reader), 1)) + } + + fn construct_reader(&self, py: pyo3::Python) -> PyResult<(Box<dyn ReaderBuilder>, usize)> { + match self.storage_type.as_ref() { + "fs" => self.construct_fs_reader(), + "s3" => self.construct_s3_reader(py), + "s3_csv" => self.construct_s3_csv_reader(py), + "csv" => self.construct_csv_reader(py), + "kafka" => self.construct_kafka_reader(), + "python" => self.construct_python_reader(py), + "sqlite" => self.construct_sqlite_reader(), other => Err(PyValueError::new_err(format!( "Unknown data source {other:?}" ))), diff --git a/tests/integration/test_bytes.rs b/tests/integration/test_bytes.rs index b1b9bf5d..94c0b879 100644 --- a/tests/integration/test_bytes.rs +++ b/tests/integration/test_bytes.rs @@ -1,7 +1,5 @@ // Copyright © 2024 Pathway -use std::path::PathBuf; - use pathway_engine::connectors::data_format::{IdentityParser, ParseResult, ParsedEvent, Parser}; use pathway_engine::connectors::data_storage::{ ConnectorMode, FilesystemReader, ReadMethod, ReadResult, Reader, }; use pathway_engine::connectors::SessionType; use pathway_engine::engine::Value; fn read_bytes_from_path(path: &str) -> eyre::Result<Vec<ParsedEvent>> { - let mut reader = FilesystemReader::new( - PathBuf::from(path), - ConnectorMode::Static, - None, - ReadMethod::Full, - "*", - )?; + let mut reader = + FilesystemReader::new(path, ConnectorMode::Static, None, ReadMethod::Full, "*")?; let mut parser = IdentityParser::new(vec!["data".to_string()], false, SessionType::Native); let mut events = Vec::new(); diff --git a/tests/integration/test_connector_field_defaults.rs b/tests/integration/test_connector_field_defaults.rs index 3f7ebfce..19a3d9a3 100644 --- a/tests/integration/test_connector_field_defaults.rs +++ b/tests/integration/test_connector_field_defaults.rs @@
-3,7 +3,6 @@ use super::helpers::{data_parsing_fails, read_data_from_reader}; use std::collections::HashMap; -use std::path::PathBuf; use pathway_engine::connectors::data_format::{ DsvParser, DsvSettings, InnerSchemaField, JsonLinesParser, ParsedEvent, @@ -27,7 +26,7 @@ fn test_dsv_with_default_end_of_line() -> eyre::Result<()> { ); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/dsv_with_skips.txt"), + "tests/data/dsv_with_skips.txt", builder, ConnectorMode::Static, None, @@ -81,7 +80,7 @@ fn test_dsv_with_default_middle_of_line() -> eyre::Result<()> { ); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/dsv_with_skips2.txt"), + "tests/data/dsv_with_skips2.txt", builder, ConnectorMode::Static, None, @@ -131,7 +130,7 @@ fn test_dsv_fails_without_default() -> eyre::Result<()> { schema.insert("number".to_string(), InnerSchemaField::new(Type::Int, None)); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/dsv_with_skips.txt"), + "tests/data/dsv_with_skips.txt", builder, ConnectorMode::Static, None, @@ -164,7 +163,7 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> { ); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/dsv_with_skips.txt"), + "tests/data/dsv_with_skips.txt", builder, ConnectorMode::Static, None, @@ -208,7 +207,7 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> { #[test] fn test_jsonlines_fails_without_default() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -237,7 +236,7 @@ fn test_jsonlines_with_default() -> eyre::Result<()> { ); let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines_with_skips.txt"), + "tests/data/jsonlines_with_skips.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -290,7 +289,7 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> { ); let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines_with_skips.txt"), + "tests/data/jsonlines_with_skips.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -337,7 +336,7 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> { ); let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines_with_skips_and_nulls.txt"), + "tests/data/jsonlines_with_skips_and_nulls.txt", ConnectorMode::Static, None, ReadMethod::ByLine, diff --git a/tests/integration/test_debezium.rs b/tests/integration/test_debezium.rs index 31bcb3f1..04960551 100644 --- a/tests/integration/test_debezium.rs +++ b/tests/integration/test_debezium.rs @@ -4,7 +4,6 @@ use super::helpers::{assert_error_shown_for_raw_data, read_data_from_reader}; use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; use assert_matches::assert_matches; @@ -20,7 +19,7 @@ use pathway_engine::engine::Value; #[test] fn test_debezium_reads_ok() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/sample_debezium.txt"), + "tests/data/sample_debezium.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -165,7 +164,7 @@ fn test_debezium_tokens_amt_mismatch() -> eyre::Result<()> { #[test] fn test_debezium_mongodb_format() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/sample_debezium_mongodb.txt"), + "tests/data/sample_debezium_mongodb.txt", ConnectorMode::Static, None, ReadMethod::ByLine, diff --git a/tests/integration/test_dsv.rs b/tests/integration/test_dsv.rs index 
556d24f1..1975f57b 100644 --- a/tests/integration/test_dsv.rs +++ b/tests/integration/test_dsv.rs @@ -4,7 +4,6 @@ use super::helpers::{assert_error_shown, assert_error_shown_for_reader_context}; use std::collections::HashMap; use std::collections::HashSet; -use std::path::PathBuf; use pathway_engine::connectors::data_format::{ DsvParser, DsvSettings, InnerSchemaField, ParseResult, ParsedEvent, Parser, @@ -17,7 +16,7 @@ use pathway_engine::engine::{Key, Type, Value}; #[test] fn test_dsv_read_ok() -> eyre::Result<()> { let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/sample.txt"), + "tests/data/sample.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -60,7 +59,7 @@ fn test_dsv_read_ok() -> eyre::Result<()> { #[test] fn test_dsv_column_does_not_exist() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/sample.txt"), + "tests/data/sample.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -83,7 +82,7 @@ fn test_dsv_column_does_not_exist() -> eyre::Result<()> { #[test] fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> { let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/sample_str_int.txt"), + "tests/data/sample_str_int.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -119,7 +118,7 @@ fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> { #[test] fn test_dsv_not_enough_columns() -> eyre::Result<()> { let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/sample_bad_lines.txt"), + "tests/data/sample_bad_lines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -164,7 +163,7 @@ fn test_dsv_not_enough_columns() -> eyre::Result<()> { #[test] fn test_dsv_autogenerate_pkey() -> eyre::Result<()> { let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/sample.txt"), + "tests/data/sample.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -207,7 +206,7 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> { #[test] fn test_dsv_composite_pkey() -> eyre::Result<()> { let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/sample_composite_pkey.txt"), + "tests/data/sample_composite_pkey.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -270,7 +269,7 @@ fn test_dsv_read_schema_ok() -> eyre::Result<()> { ); let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/schema.txt"), + "tests/data/schema.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -339,7 +338,7 @@ fn test_dsv_read_schema_nonparsable() -> eyre::Result<()> { ); let mut reader = FilesystemReader::new( - PathBuf::from("tests/data/incorrect_types.txt"), + "tests/data/incorrect_types.txt", ConnectorMode::Static, None, ReadMethod::ByLine, diff --git a/tests/integration/test_dsv_dir.rs b/tests/integration/test_dsv_dir.rs index b0db85a4..daf1d5ee 100644 --- a/tests/integration/test_dsv_dir.rs +++ b/tests/integration/test_dsv_dir.rs @@ -3,7 +3,6 @@ use super::helpers::read_data_from_reader; use std::collections::HashMap; -use std::path::PathBuf; use pathway_engine::connectors::data_format::ParsedEvent; use pathway_engine::connectors::data_format::{DsvParser, DsvSettings}; @@ -16,7 +15,7 @@ fn test_dsv_dir_ok() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/csvdir"), + "tests/data/csvdir", builder, ConnectorMode::Static, None, @@ -49,7 +48,7 @@ fn test_single_file_ok() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/sample.txt"), + 
"tests/data/sample.txt", builder, ConnectorMode::Static, None, @@ -74,7 +73,7 @@ fn test_custom_delimiter() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/sql_injection.txt"), + "tests/data/sql_injection.txt", builder, ConnectorMode::Static, None, @@ -101,7 +100,7 @@ fn test_escape_fields() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/csv_fields_escaped.txt"), + "tests/data/csv_fields_escaped.txt", builder, ConnectorMode::Static, None, @@ -147,7 +146,7 @@ fn test_escape_newlines() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/csv_escaped_newlines.txt"), + "tests/data/csv_escaped_newlines.txt", builder, ConnectorMode::Static, None, @@ -183,13 +182,17 @@ fn test_nonexistent_file() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/nonexistent_file.txt"), + "tests/data/nonexistent_file.txt", builder, ConnectorMode::Static, None, "*", ); - assert!(reader.is_err()); + + // We treat this path as a glob pattern, so the situation is normal: + // the scanner will just scan the contents on an empty set. If a file + // will be added at this path, it will be scanned and read. + assert!(reader.is_ok()); Ok(()) } @@ -200,7 +203,7 @@ fn test_special_fields() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/csv_special_fields.txt"), + "tests/data/csv_special_fields.txt", builder, ConnectorMode::Static, None, diff --git a/tests/integration/test_jsonlines.rs b/tests/integration/test_jsonlines.rs index ce426db8..3ef0b989 100644 --- a/tests/integration/test_jsonlines.rs +++ b/tests/integration/test_jsonlines.rs @@ -3,7 +3,6 @@ use super::helpers::{assert_error_shown, read_data_from_reader}; use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; @@ -15,7 +14,7 @@ use pathway_engine::engine::Value; #[test] fn test_jsonlines_ok() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -55,7 +54,7 @@ fn test_jsonlines_ok() -> eyre::Result<()> { #[test] fn test_jsonlines_incorrect_key() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -82,7 +81,7 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> { #[test] fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -106,7 +105,7 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> { #[test] fn test_jsonlines_incorrect_values() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -133,7 +132,7 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> { #[test] fn test_jsonlines_types_parsing() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines_types.txt"), + "tests/data/jsonlines_types.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -184,7 +183,7 @@ fn 
test_jsonlines_types_parsing() -> eyre::Result<()> { #[test] fn test_jsonlines_complex_paths() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/json_complex_paths.txt"), + "tests/data/json_complex_paths.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -241,7 +240,7 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> { #[test] fn test_jsonlines_complex_paths_error() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/json_complex_paths.txt"), + "tests/data/json_complex_paths.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -283,7 +282,7 @@ fn test_jsonlines_complex_paths_error() -> eyre::Result<()> { #[test] fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/json_complex_paths.txt"), + "tests/data/json_complex_paths.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -322,7 +321,7 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> { #[test] fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -352,7 +351,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> { routes.insert("d".to_string(), "/non/existent/path".to_string()); let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -379,7 +378,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> { #[test] fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/json_complex_paths.txt"), + "tests/data/json_complex_paths.txt", ConnectorMode::Static, None, ReadMethod::ByLine, diff --git a/tests/integration/test_metadata.rs b/tests/integration/test_metadata.rs index 33c96bf7..4fbedf2a 100644 --- a/tests/integration/test_metadata.rs +++ b/tests/integration/test_metadata.rs @@ -3,7 +3,6 @@ use super::helpers::read_data_from_reader; use std::collections::HashMap; -use std::path::PathBuf; use pathway_engine::connectors::data_format::{ DsvParser, DsvSettings, IdentityParser, JsonLinesParser, ParsedEvent, @@ -31,7 +30,7 @@ fn check_file_name_in_metadata(data_read: &ParsedEvent, name: &str) { #[test] fn test_metadata_fs_dir() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/csvdir/"), + "tests/data/csvdir/", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -61,7 +60,7 @@ fn test_metadata_fs_dir() -> eyre::Result<()> { #[test] fn test_metadata_fs_file() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/minimal.txt"), + "tests/data/minimal.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -92,7 +91,7 @@ fn test_metadata_csv_dir() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/csvdir/"), + "tests/data/csvdir/", builder, ConnectorMode::Static, None, @@ -125,7 +124,7 @@ fn test_metadata_csv_file() -> eyre::Result<()> { builder.has_headers(false); let reader = CsvFilesystemReader::new( - PathBuf::from("tests/data/minimal.txt"), + "tests/data/minimal.txt", builder, ConnectorMode::Static, None, @@ -153,7 +152,7 @@ fn test_metadata_csv_file() -> eyre::Result<()> { #[test] fn test_metadata_json_file() -> 
eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -177,7 +176,7 @@ fn test_metadata_json_file() -> eyre::Result<()> { #[test] fn test_metadata_json_dir() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines/"), + "tests/data/jsonlines/", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -202,7 +201,7 @@ fn test_metadata_json_dir() -> eyre::Result<()> { #[test] fn test_metadata_identity_file() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines.txt"), + "tests/data/jsonlines.txt", ConnectorMode::Static, None, ReadMethod::ByLine, @@ -223,7 +222,7 @@ #[test] fn test_metadata_identity_dir() -> eyre::Result<()> { let reader = FilesystemReader::new( - PathBuf::from("tests/data/jsonlines/"), + "tests/data/jsonlines/", ConnectorMode::Static, None, ReadMethod::ByLine, diff --git a/tests/integration/test_seek.rs b/tests/integration/test_seek.rs index 6507f70a..61f5fc57 100644 --- a/tests/integration/test_seek.rs +++ b/tests/integration/test_seek.rs @@ -25,17 +25,11 @@ enum TestedFormat { Json, } -fn csv_reader_parser_pair(input_path: &Path) -> (Box<dyn Reader>, Box<dyn Parser>) { +fn csv_reader_parser_pair(input_path: &str) -> (Box<dyn Reader>, Box<dyn Parser>) { let mut builder = csv::ReaderBuilder::new(); builder.has_headers(false); - let reader = CsvFilesystemReader::new( - input_path.to_path_buf(), - builder, - ConnectorMode::Static, - Some(1), - "*", - ) - .unwrap(); + let reader = + CsvFilesystemReader::new(input_path, builder, ConnectorMode::Static, Some(1), "*").unwrap(); let parser = DsvParser::new( DsvSettings::new( Some(vec!["key".to_string()]), @@ -47,9 +41,9 @@ fn csv_reader_parser_pair(input_path: &Path) -> (Box<dyn Reader>, Box -fn json_reader_parser_pair(input_path: &Path) -> (Box<dyn Reader>, Box<dyn Parser>) { +fn json_reader_parser_pair(input_path: &str) -> (Box<dyn Reader>, Box<dyn Parser>) { let reader = FilesystemReader::new( - input_path.to_path_buf(), + input_path, ConnectorMode::Static, Some(1), ReadMethod::ByLine, @@ -74,8 +68,8 @@ fn full_cycle_read_kv( global_tracker: Option<&SharedWorkersPersistenceCoordinator>, ) -> FullReadResult { let (reader, mut parser) = match format { - TestedFormat::Csv => csv_reader_parser_pair(input_path), - TestedFormat::Json => json_reader_parser_pair(input_path), + TestedFormat::Csv => csv_reader_parser_pair(input_path.to_str().unwrap()), + TestedFormat::Json => json_reader_parser_pair(input_path.to_str().unwrap()), }; full_cycle_read(reader, parser.as_mut(), persistent_storage, global_tracker) }
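
Usage sketch of the connector API after this patch. The directory layout, file names, and glob pattern below are illustrative assumptions, not part of the patch; the functions themselves (`pw.io.plaintext.read`, `pw.io.jsonlines.write`, `pw.run`) are the ones exercised in the patch's own tests:

import pathway as pw

# Hypothetical layout: dataset/region_a/2024/a.txt, dataset/region_b/b.txt, ...
# Every *.txt file under any directory matching dataset/region_* is read;
# matched directories are scanned recursively.
words = pw.io.plaintext.read("dataset/region_*/**/*.txt", mode="static")

# Rough pre-patch equivalent; object_pattern now emits a DeprecationWarning:
#   words = pw.io.plaintext.read("dataset", object_pattern="*.txt")

pw.io.jsonlines.write(words, "words.jsonl")
pw.run()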
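
For clarity, the matching logic that `get_matching_file_paths` implements in Rust can be paraphrased in Python. This is a behavioral sketch using the standard `glob` module as a stand-in for the Rust `glob` crate, not code from the repository:

import glob
import os

def get_matching_file_paths(path_pattern: str, object_pattern: str = "*") -> list[str]:
    """Files matching the pattern are taken as-is; every matching directory
    is scanned recursively, with the legacy object_pattern applied to the
    nested files (mirroring the Rust `{path}/**/{object_pattern}` scan)."""
    result = []
    for entry in glob.glob(path_pattern):
        if os.path.isfile(entry):
            result.append(entry)
            continue
        # A matched directory: collect nested files recursively.
        nested_pattern = os.path.join(entry, "**", object_pattern)
        result.extend(
            nested
            for nested in glob.glob(nested_pattern, recursive=True)
            if os.path.isfile(nested)
        )
    return result

On this reading, a nonexistent `path` is just a pattern with zero matches, which is why `test_nonexistent_file` now expects reader construction to succeed: the scanner starts from an empty match set and picks up files that appear at matching paths later.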