diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf7ab85a..161ae71d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,7 +14,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ### Changed
 
 - `pw.io.kafka.write` now does retries when sending to the output topic fails.
-- `pw.io.csv.read`, `pw.io.jsonlines.read`, `pw.io.fs.read`, `pw.io.plaintext.read` now handle `path` as a glob pattern and read all matched files and directories recursively.
 
 ## [0.8.0] - 2024-02-01
diff --git a/python/pathway/io/csv/__init__.py b/python/pathway/io/csv/__init__.py
index 28c90889..70e1e832 100644
--- a/python/pathway/io/csv/__init__.py
+++ b/python/pathway/io/csv/__init__.py
@@ -40,9 +40,7 @@ def read(
     the modification time.
 
     Args:
-        path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
-objects to be read. The connector will read the contents of all matching files as well \
-as recursively read the contents of all matching folders.
+        path: Path to the file or to the folder with files.
         value_columns: Names of the columns to be extracted from the files. [will be deprecated soon]
         schema: Schema of the resulting table.
         id_columns: In case the table should have a primary key generated according to
@@ -57,8 +55,7 @@ def read(
 the other hand, the "static" mode will only consider the available data and ingest all \
 of it in one commit. The default value is "streaming".
         object_pattern: Unix shell style pattern for filtering only certain files in the \
-directory. Ignored in case a path to a single file is specified. This value will be \
-deprecated soon, please use glob pattern in ``path`` instead.
+directory. Ignored in case a path to a single file is specified.
         with_metadata: When set to true, the connector will add an additional column \
 named ``_metadata`` to the table. This column will be a JSON field that will contain two \
 optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
diff --git a/python/pathway/io/fs/__init__.py b/python/pathway/io/fs/__init__.py
index 688d95c3..2e674dc4 100644
--- a/python/pathway/io/fs/__init__.py
+++ b/python/pathway/io/fs/__init__.py
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import warnings
 from os import PathLike, fspath
 from typing import Any
 
@@ -57,9 +56,7 @@ def read(
     ``data`` with each cell containing a single line from the file.
 
     Args:
-        path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
-objects to be read. The connector will read the contents of all matching files as well \
-as recursively read the contents of all matching folders.
+        path: Path to the file or to the folder with files.
         format: Format of data to be read. Currently "csv", "json", "plaintext", \
 "plaintext_by_file" and "binary" formats are supported. The difference between \
 "plaintext" and "plaintext_by_file" is how the input is tokenized: if the "plaintext" \
@@ -82,8 +79,7 @@ def read(
 where the path to be mapped needs to be a
 `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
         object_pattern: Unix shell style pattern for filtering only certain files in the \
-directory. Ignored in case a path to a single file is specified. This value will be \
-deprecated soon, please use glob pattern in ``path`` instead.
+directory. Ignored in case a path to a single file is specified.
         with_metadata: When set to true, the connector will add an additional column \
 named ``_metadata`` to the table. This column will be a JSON field that will contain two \
 optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
@@ -218,20 +214,10 @@ def read(
 
     >>> t = pw.io.fs.read("raw_dataset/lines.txt", format="plaintext")
     """
-    path = fspath(path)
-
-    if object_pattern != "*":
-        warnings.warn(
-            "'object_pattern' is deprecated and will be removed soon. "
-            "Please use a glob pattern in `path` instead",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-
     if format == "csv":
         data_storage = api.DataStorage(
             storage_type="csv",
-            path=path,
+            path=fspath(path),
             csv_parser_settings=csv_settings.api_settings if csv_settings else None,
             mode=internal_connector_mode(mode),
             object_pattern=object_pattern,
@@ -240,7 +226,7 @@ def read(
     else:
         data_storage = api.DataStorage(
             storage_type="fs",
-            path=path,
+            path=fspath(path),
             mode=internal_connector_mode(mode),
             read_method=internal_read_method(format),
             object_pattern=object_pattern,
diff --git a/python/pathway/io/jsonlines/__init__.py b/python/pathway/io/jsonlines/__init__.py
index 3b9a3846..10f62a5e 100644
--- a/python/pathway/io/jsonlines/__init__.py
+++ b/python/pathway/io/jsonlines/__init__.py
@@ -39,9 +39,7 @@ def read(
     the modification time.
 
     Args:
-        path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
-objects to be read. The connector will read the contents of all matching files as well \
-as recursively read the contents of all matching folders.
+        path: Path to the file or to the folder with files.
         schema: Schema of the resulting table.
         mode: Denotes how the engine polls the new data from the source. Currently \
 "streaming" and "static" are supported. If set to "streaming" the engine will wait for \
@@ -55,8 +53,7 @@ def read(
 ``<field_name>: <path>``, where the path to be mapped needs to be a
 `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
         object_pattern: Unix shell style pattern for filtering only certain files in the \
-directory. Ignored in case a path to a single file is specified. This value will be \
-deprecated soon, please use glob pattern in ``path`` instead.
+directory. Ignored in case a path to a single file is specified.
         with_metadata: When set to true, the connector will add an additional column \
 named ``_metadata`` to the table. This column will be a JSON field that will contain two \
 optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
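Reviewer note: taken together, the docstring changes above restore the pre-glob contract — `path` points at one file or one directory, and in-directory filtering goes through `object_pattern`, which is no longer slated for deprecation. A minimal usage sketch of that contract (file names and the schema are ours, for illustration only):

```python
import pathway as pw

class InputSchema(pw.Schema):
    key: int
    value: str

# `path` is a plain file or directory path again, not a glob pattern.
lines = pw.io.fs.read("raw_dataset/", format="plaintext", mode="static")

# Filtering within a directory is still done via `object_pattern`.
rows = pw.io.csv.read(
    "raw_dataset/",
    schema=InputSchema,
    mode="static",
    object_pattern="*.csv",
)
```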
diff --git a/python/pathway/io/plaintext/__init__.py b/python/pathway/io/plaintext/__init__.py
index 56f04f4b..40c61efc 100644
--- a/python/pathway/io/plaintext/__init__.py
+++ b/python/pathway/io/plaintext/__init__.py
@@ -31,9 +31,7 @@ def read(
 modification time is, the earlier the file will be passed to the engine.
 
     Args:
-        path: [glob](https://en.wikipedia.org/wiki/Glob_(programming)) pattern for the \
-objects to be read. The connector will read the contents of all matching files as well \
-as recursively read the contents of all matching folders.
+        path: Path to a file or to a folder.
         mode: Denotes how the engine polls the new data from the source. Currently \
 "streaming" and "static" are supported. If set to "streaming" the engine will wait for \
 the updates in the specified directory. It will track file additions, deletions, and \
@@ -42,8 +40,7 @@ def read(
 the other hand, the "static" mode will only consider the available data and ingest all \
 of it in one commit. The default value is "streaming".
         object_pattern: Unix shell style pattern for filtering only certain files in the \
-directory. Ignored in case a path to a single file is specified. This value will be \
-deprecated soon, please use glob pattern in ``path`` instead.
+directory. Ignored in case a path to a single file is specified.
         with_metadata: When set to true, the connector will add an additional column \
 named ``_metadata`` to the table. This column will be a JSON field that will contain two \
 optional fields - ``created_at`` and ``modified_at``. These fields will have integral \
diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py
index ebf7de72..d0bbdc3c 100644
--- a/python/pathway/tests/test_io.py
+++ b/python/pathway/tests/test_io.py
@@ -2824,62 +2824,3 @@ class InputSchema(pw.Schema):
         schema=InputSchema,
         delete_completed_queries=False,
     )
-
-
-def test_subdirectories(tmp_path: pathlib.Path):
-    nested_inputs_path = (
-        tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3"
-    )
-    os.makedirs(nested_inputs_path)
-    output_path = tmp_path / "output.json"
-    write_lines(nested_inputs_path / "a.txt", "a\nb\nc")
-
-    table = pw.io.plaintext.read(tmp_path / "nested_level_1", mode="static")
-    pw.io.jsonlines.write(table, output_path)
-    pw.run()
-
-    assert FileLinesNumberChecker(output_path, 3)()
-
-
-def test_glob_pattern(tmp_path: pathlib.Path):
-    nested_inputs_path = (
-        tmp_path / "nested_level_1" / "nested_level_2" / "nested_level_3"
-    )
-    os.makedirs(nested_inputs_path)
-    output_path = tmp_path / "output.json"
-    write_lines(nested_inputs_path / "a.txt", "a\nb\nc")
-    write_lines(nested_inputs_path / "b.txt", "d\ne\nf\ng")
-
-    table = pw.io.plaintext.read(tmp_path / "nested_level_1/**/b.txt", mode="static")
-    pw.io.jsonlines.write(table, output_path)
-    pw.run()
-
-    assert FileLinesNumberChecker(output_path, 4)()
-
-
-def test_glob_pattern_recurse_subdirs(tmp_path: pathlib.Path):
-    os.makedirs(tmp_path / "input" / "foo" / "level2")
-    write_lines(tmp_path / "input" / "foo" / "level2" / "a.txt", "a\nb\nc")
-    write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng")
-    write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl")
-    output_path = tmp_path / "output.json"
-
-    table = pw.io.plaintext.read(tmp_path / "input/f*", mode="static")
-    pw.io.jsonlines.write(table, output_path)
-    pw.run()
-
-    assert FileLinesNumberChecker(output_path, 7)()
-
-
-def test_glob_pattern_nothing_matched(tmp_path: pathlib.Path):
-    os.makedirs(tmp_path / "input" / "foo" / ".level2")
-    write_lines(tmp_path / "input" / "foo" / ".level2" / ".a.txt", "a\nb\nc")
-    write_lines(tmp_path / "input" / "f1.txt", "d\ne\nf\ng")
-    write_lines(tmp_path / "input" / "bar.txt", "h\ni\nj\nk\nl")
-    output_path = tmp_path / "output.json"
-
-    table = pw.io.plaintext.read(tmp_path / "input/f", mode="static")
-    pw.io.jsonlines.write(table, output_path)
-    pw.run()
-
-    assert FileLinesNumberChecker(output_path, 0)()
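Reviewer note: with the glob tests removed, the directory behavior that remains is non-recursive — the scanner only picks up regular files directly under `path`. A sketch of the kind of test that still holds, reusing `write_lines` and `FileLinesNumberChecker` from the surrounding test module (the test name is ours):

```python
def test_flat_directory(tmp_path: pathlib.Path):
    inputs_path = tmp_path / "input"
    os.makedirs(inputs_path)
    output_path = tmp_path / "output.json"
    write_lines(inputs_path / "a.txt", "a\nb\nc")

    # `path` is treated literally again: a directory is scanned one
    # level deep, and glob metacharacters are not interpreted.
    table = pw.io.plaintext.read(inputs_path, mode="static")
    pw.io.jsonlines.write(table, output_path)
    pw.run()

    assert FileLinesNumberChecker(output_path, 3)()
```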
diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs
index 91f0d096..7e69b1e3 100644
--- a/src/connectors/data_storage.rs
+++ b/src/connectors/data_storage.rs
@@ -11,6 +11,7 @@ use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::env;
 use std::fmt::Debug;
+use std::fs::DirEntry;
 use std::fs::File;
 use std::io;
 use std::io::BufRead;
@@ -25,7 +26,7 @@ use std::str::{from_utf8, Utf8Error};
 use std::sync::Arc;
 use std::thread;
 use std::thread::sleep;
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use chrono::{DateTime, FixedOffset};
 use log::{error, warn};
@@ -72,12 +73,9 @@ use serde::{Deserialize, Serialize};
 
 mod inotify_support {
     use inotify::WatchMask;
     use std::path::Path;
-    use std::thread::sleep;
-    use std::time::Duration;
 
     pub use inotify::Inotify;
 
-    #[allow(dead_code)]
     pub fn subscribe_inotify(path: impl AsRef<Path>) -> Option<Inotify> {
         let inotify = Inotify::init().ok()?;
@@ -98,17 +96,11 @@ mod inotify_support {
         Some(inotify)
     }
 
-    #[allow(clippy::unnecessary_wraps)]
-    pub fn wait(_inotify: &mut Inotify) -> Option<()> {
-        sleep(Duration::from_millis(500));
-        None
-
-        // Commented out due to using recursive subdirs
-        //
-        // inotify
-        //     .read_events_blocking(&mut [0; 1024])
-        //     .ok()
-        //     .map(|_events| ())
+    pub fn wait(inotify: &mut Inotify) -> Option<()> {
+        inotify
+            .read_events_blocking(&mut [0; 1024])
+            .ok()
+            .map(|_events| ())
     }
 }
@@ -483,7 +475,7 @@ pub struct FilesystemReader {
 
 impl FilesystemReader {
     pub fn new(
-        path: &str,
+        path: impl Into<PathBuf>,
         streaming_mode: ConnectorMode,
         persistent_id: Option<ExternalPersistentId>,
         read_method: ReadMethod,
@@ -749,10 +741,11 @@ enum PosixScannerAction {
 
 #[derive(Debug)]
 struct FilesystemScanner {
-    path: GlobPattern,
+    path: PathBuf,
     cache_directory_path: Option<PathBuf>,
+
+    is_directory: bool,
     streaming_mode: ConnectorMode,
-    object_pattern: String,
 
     // Mapping from the path of the loaded file to its modification timestamp
     known_files: HashMap<PathBuf, u64>,
@@ -760,6 +753,7 @@ struct FilesystemScanner {
     current_action: Option<PosixScannerAction>,
     cached_modify_times: HashMap<PathBuf, Option<SystemTime>>,
     inotify: Option<inotify_support::Inotify>,
+    object_pattern: GlobPattern,
     next_file_for_insertion: Option<PathBuf>,
     cached_metadata: HashMap<PathBuf, Option<SourceMetadata>>,
 
@@ -770,16 +764,22 @@ struct FilesystemScanner {
 impl FilesystemScanner {
     fn new(
-        path: &str,
+        path: impl Into<PathBuf>,
         persistent_id: Option<ExternalPersistentId>,
         streaming_mode: ConnectorMode,
         object_pattern: &str,
     ) -> Result<Self, ReadError> {
-        let path_glob = GlobPattern::new(path)?;
+        let mut path = path.into();
+        if path.exists() || matches!(streaming_mode, ConnectorMode::Static) {
+            path = std::fs::canonicalize(path)?;
+        }
 
-        // Alternative solution here is to do inotify_support::subscribe_inotify(path)
-        // if streaming mode allows polling.
-        let inotify = None;
+        let is_directory = path.is_dir();
+        let inotify = if streaming_mode.is_polling_enabled() {
+            inotify_support::subscribe_inotify(&path)
+        } else {
+            None
+        };
 
         let (cache_directory_path, connector_tmp_storage) = {
             if streaming_mode.are_deletions_enabled() {
@@ -805,15 +805,16 @@ impl FilesystemScanner {
         };
 
         Ok(Self {
-            path: path_glob,
+            path,
+            is_directory,
             streaming_mode,
             cache_directory_path,
-            object_pattern: object_pattern.to_string(),
 
             known_files: HashMap::new(),
             current_action: None,
             cached_modify_times: HashMap::new(),
             inotify,
+            object_pattern: GlobPattern::new(object_pattern)?,
             next_file_for_insertion: None,
             cached_metadata: HashMap::new(),
             _connector_tmp_storage: connector_tmp_storage,
@@ -858,17 +859,31 @@ impl FilesystemScanner {
         }
     }
 
-    fn seek_to_file(&mut self, seek_file_path: &Path) -> Result<(), ReadError> {
+    fn seek_to_file(&mut self, seek_file_path: &Path) -> io::Result<()> {
         if self.streaming_mode.are_deletions_enabled() {
             warn!("seek for snapshot mode may not work correctly in case deletions take place");
         }
 
         self.known_files.clear();
+        if !self.is_directory {
+            self.current_action = Some(PosixScannerAction::Read(Arc::new(
+                seek_file_path.to_path_buf(),
+            )));
+            let modify_system_time = std::fs::metadata(seek_file_path)?.modified().unwrap();
+            let modify_unix_timestamp = modify_system_time
+                .duration_since(UNIX_EPOCH)
+                .expect("File modified earlier than UNIX epoch")
+                .as_secs();
+            self.known_files
+                .insert(seek_file_path.to_path_buf(), modify_unix_timestamp);
+            return Ok(());
+        }
+
         let target_modify_time = match std::fs::metadata(seek_file_path) {
             Ok(metadata) => metadata.modified()?,
             Err(e) => {
                 if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
-                    return Err(ReadError::Io(e));
+                    return Err(e);
                 }
                 warn!(
                     "Unable to restore state: last persisted file {seek_file_path:?} not found in directory. Processing all files in directory."
@@ -876,20 +891,24 @@ impl FilesystemScanner {
                 return Ok(());
             }
         };
-        let matching_files: Vec<PathBuf> = self.get_matching_file_paths()?;
-        for entry in matching_files {
-            if !entry.is_file() {
-                continue;
-            }
-            let Some(modify_time) = self.modify_time(&entry) else {
-                continue;
-            };
-            if (modify_time, entry.as_path()) <= (target_modify_time, seek_file_path) {
-                let modify_timestamp = modify_time
-                    .duration_since(SystemTime::UNIX_EPOCH)
-                    .expect("System time should be after the Unix epoch")
-                    .as_secs();
-                self.known_files.insert(entry, modify_timestamp);
+        let files_in_directory = std::fs::read_dir(self.path.as_path())?;
+        for entry in files_in_directory.flatten() {
+            if let Ok(file_type) = entry.file_type() {
+                if !file_type.is_file() {
+                    continue;
+                }
+
+                let Some(modify_time) = self.modify_time(&entry) else {
+                    continue;
+                };
+                let current_path = entry.path();
+                if (modify_time, current_path.as_path()) <= (target_modify_time, seek_file_path) {
+                    let modify_timestamp = modify_time
+                        .duration_since(SystemTime::UNIX_EPOCH)
+                        .expect("System time should be after the Unix epoch")
+                        .as_secs();
+                    self.known_files.insert(current_path, modify_timestamp);
+                }
             }
         }
         self.current_action = Some(PosixScannerAction::Read(Arc::new(
@@ -899,7 +918,7 @@ impl FilesystemScanner {
         Ok(())
     }
 
-    fn modify_time(&mut self, entry: &Path) -> Option<SystemTime> {
+    fn modify_time(&mut self, entry: &DirEntry) -> Option<SystemTime> {
         if self.streaming_mode.are_deletions_enabled() {
             // If deletions are enabled, we also need to handle the case when the modification
             // time of an entry changes. Hence, we can't just memorize it once.
@@ -907,7 +926,7 @@ impl FilesystemScanner {
         } else {
             *self
                 .cached_modify_times
-                .entry(entry.to_path_buf())
+                .entry(entry.path())
                 .or_insert_with(|| entry.metadata().ok()?.modified().ok())
         }
     }
@@ -920,7 +939,7 @@ impl FilesystemScanner {
     /// a `FinishedSource` event when we've had a scheduled action but the
     /// corresponding file was deleted before we were able to execute this scheduled action.
     /// scheduled action.
-    fn next_action_determined(&mut self) -> Result<Option<ReadResult>, ReadError> {
+    fn next_action_determined(&mut self) -> io::Result<Option<ReadResult>> {
         // Finalize the current processing action
         if let Some(PosixScannerAction::Delete(path)) = take(&mut self.current_action) {
             let cached_path = self
@@ -1020,56 +1039,58 @@ impl FilesystemScanner {
         })
     }
 
-    fn get_matching_file_paths(&self) -> Result<Vec<PathBuf>, ReadError> {
-        let mut result = Vec::new();
-
-        let file_and_folder_paths = glob::glob(self.path.as_str())?.flatten();
-        for entry in file_and_folder_paths {
-            // If an entry is a file, it should just be added to result
-            if entry.is_file() {
-                result.push(entry);
-                continue;
-            }
-
-            // Otherwise scan all files in all subdirectories and add them
-            let Some(path) = entry.to_str() else {
-                error!("Non-unicode paths are not supported. Ignoring: {entry:?}");
-                continue;
-            };
+    fn next_insertion_entry(&mut self) -> io::Result<Option<ReadResult>> {
+        let mut selected_file: Option<(PathBuf, SystemTime)> = None;
+        if self.is_directory {
+            let files_in_directory = std::fs::read_dir(self.path.as_path())?;
 
-            let folder_scan_pattern = format!("{path}/**/{}", self.object_pattern);
-            let folder_contents = glob::glob(&folder_scan_pattern)?.flatten();
-            for nested_entry in folder_contents {
-                if nested_entry.is_file() {
-                    result.push(nested_entry);
-                }
-            }
-        }
+            for entry in files_in_directory.flatten() {
+                if let Ok(file_type) = entry.file_type() {
+                    if !file_type.is_file() {
+                        continue;
+                    }
 
-        Ok(result)
-    }
+                    let Some(modify_time) = self.modify_time(&entry) else {
+                        continue;
+                    };
 
-    fn next_insertion_entry(&mut self) -> Result<Option<ReadResult>, ReadError> {
-        let matching_files: Vec<PathBuf> = self.get_matching_file_paths()?;
-        let mut selected_file: Option<(PathBuf, SystemTime)> = None;
-        for entry in matching_files {
-            if !entry.is_file() || self.known_files.contains_key(&(*entry)) {
-                continue;
-            }
+                    let current_path = entry.path();
 
-            let Some(modify_time) = self.modify_time(&entry) else {
-                continue;
-            };
+                    if let Some(file_name) = entry.file_name().to_str() {
+                        if !self.object_pattern.matches(file_name) {
+                            continue;
+                        }
+                    } else {
+                        warn!(
+                            "Failed to parse file name (non-unicode): {:?}. It will be ignored",
+                            entry.file_name()
+                        );
+                        continue;
+                    }
 
-            match &selected_file {
-                Some((currently_selected_name, selected_file_created_at)) => {
-                    if (selected_file_created_at, currently_selected_name) > (&modify_time, &entry) {
-                        selected_file = Some((entry, modify_time));
-                    }
-                }
-                None => selected_file = Some((entry, modify_time)),
-            }
-        }
+                    if self.known_files.contains_key(&(*current_path)) {
+                        continue;
+                    }
+                    match &selected_file {
+                        Some((currently_selected_name, selected_file_created_at)) => {
+                            if (selected_file_created_at, currently_selected_name)
+                                > (&modify_time, &current_path)
+                            {
+                                selected_file = Some((current_path, modify_time));
+                            }
+                        }
+                        None => selected_file = Some((current_path, modify_time)),
+                    }
+                }
+            }
+        } else {
+            let is_existing_file = self.path.exists() && self.path.is_file();
+            if !self.known_files.is_empty() || !is_existing_file {
+                return Ok(None);
+            }
+            selected_file = Some((self.path.clone(), SystemTime::now()));
+        }
 
         match selected_file {
@@ -1148,14 +1169,14 @@ pub struct CsvFilesystemReader {
 
 impl CsvFilesystemReader {
     pub fn new(
-        path: &str,
+        path: impl Into<PathBuf>,
         parser_builder: csv::ReaderBuilder,
         streaming_mode: ConnectorMode,
         persistent_id: Option<ExternalPersistentId>,
         object_pattern: &str,
     ) -> Result<CsvFilesystemReader, ReadError> {
         let filesystem_scanner =
-            FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?;
+            FilesystemScanner::new(path.into(), persistent_id, streaming_mode, object_pattern)?;
         Ok(CsvFilesystemReader {
             parser_builder,
             persistent_id,
diff --git a/src/python_api.rs b/src/python_api.rs
index 489a7b92..b1dd0e15 100644
--- a/src/python_api.rs
+++ b/src/python_api.rs
@@ -3636,120 +3636,107 @@ impl DataStorage {
             .map(IntoPersistentId::into_persistent_id)
     }
 
-    fn construct_fs_reader(&self) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let storage = FilesystemReader::new(
-            self.path()?,
-            self.mode,
-            self.internal_persistent_id(),
-            self.read_method,
-            &self.object_pattern,
-        )
-        .map_err(|e| PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}")))?;
-        Ok((Box::new(storage), 1))
-    }
-
-    fn construct_s3_reader(&self, py: pyo3::Python) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?);
-        let storage = S3GenericReader::new(
-            self.s3_bucket(py)?,
-            deduced_path.unwrap_or(self.path()?.to_string()),
-            self.mode.is_polling_enabled(),
-            self.internal_persistent_id(),
-            self.read_method,
-        )
-        .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?;
-        Ok((Box::new(storage), 1))
-    }
-
-    fn construct_s3_csv_reader(
-        &self,
-        py: pyo3::Python,
-    ) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?);
-        let storage = S3CsvReader::new(
-            self.s3_bucket(py)?,
-            deduced_path.unwrap_or(self.path()?.to_string()),
-            self.build_csv_parser_settings(py),
-            self.mode.is_polling_enabled(),
-            self.internal_persistent_id(),
-        )
-        .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?;
-        Ok((Box::new(storage), 1))
-    }
-
-    fn construct_csv_reader(&self, py: pyo3::Python) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let reader = CsvFilesystemReader::new(
-            self.path()?,
-            self.build_csv_parser_settings(py),
-            self.mode,
-            self.internal_persistent_id(),
-            &self.object_pattern,
-        )
-        .map_err(|e| {
-            PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}"))
-        })?;
-        Ok((Box::new(reader), 1))
-    }
-
-    fn construct_kafka_reader(&self) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let client_config = self.kafka_client_config()?;
-
-        let consumer: BaseConsumer = client_config
-            .create()
-            .map_err(|e| PyValueError::new_err(format!("Creating Kafka consumer failed: {e}")))?;
-
-        let topic = self.kafka_topic()?;
-        consumer
-            .subscribe(&[topic])
-            .map_err(|e| PyIOError::new_err(format!("Subscription to Kafka topic failed: {e}")))?;
-
-        let reader = KafkaReader::new(consumer, topic.to_string(), self.internal_persistent_id());
-        Ok((Box::new(reader), self.parallel_readers.unwrap_or(256)))
-    }
+    fn construct_reader(&self, py: pyo3::Python) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
+        match self.storage_type.as_ref() {
+            "fs" => {
+                let storage = FilesystemReader::new(
+                    self.path()?,
+                    self.mode,
+                    self.internal_persistent_id(),
+                    self.read_method,
+                    &self.object_pattern,
+                )
+                .map_err(|e| {
+                    PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}"))
+                })?;
+                Ok((Box::new(storage), 1))
+            }
+            "s3" => {
+                let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?);
+                let storage = S3GenericReader::new(
+                    self.s3_bucket(py)?,
+                    deduced_path.unwrap_or(self.path()?.to_string()),
+                    self.mode.is_polling_enabled(),
+                    self.internal_persistent_id(),
+                    self.read_method,
+                )
+                .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?;
+                Ok((Box::new(storage), 1))
+            }
+            "s3_csv" => {
+                let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?);
+                let storage = S3CsvReader::new(
+                    self.s3_bucket(py)?,
+                    deduced_path.unwrap_or(self.path()?.to_string()),
+                    self.build_csv_parser_settings(py),
+                    self.mode.is_polling_enabled(),
+                    self.internal_persistent_id(),
+                )
+                .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?;
+                Ok((Box::new(storage), 1))
+            }
+            "csv" => {
+                let reader = CsvFilesystemReader::new(
+                    self.path()?,
+                    self.build_csv_parser_settings(py),
+                    self.mode,
+                    self.internal_persistent_id(),
+                    &self.object_pattern,
+                )
+                .map_err(|e| {
+                    PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}"))
+                })?;
+                Ok((Box::new(reader), 1))
+            }
+            "kafka" => {
+                let client_config = self.kafka_client_config()?;
 
-    fn construct_python_reader(
-        &self,
-        py: pyo3::Python,
-    ) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let subject = self.python_subject.clone().ok_or_else(|| {
-            PyValueError::new_err("For Python connector, python_subject should be specified")
-        })?;
+                let consumer: BaseConsumer = client_config.create().map_err(|e| {
+                    PyValueError::new_err(format!("Creating Kafka consumer failed: {e}"))
+                })?;
 
-        if subject.borrow(py).is_internal && self.persistent_id.is_some() {
-            return Err(PyValueError::new_err(
-                "Python connectors marked internal can't have persistent id",
-            ));
-        }
+                let topic = self.kafka_topic()?;
+                consumer.subscribe(&[topic]).map_err(|e| {
+                    PyIOError::new_err(format!("Subscription to Kafka topic failed: {e}"))
+                })?;
 
-        let reader = PythonReaderBuilder::new(subject, self.internal_persistent_id());
-        Ok((Box::new(reader), 1))
-    }
+                let reader =
+                    KafkaReader::new(consumer, topic.to_string(), self.internal_persistent_id());
+                Ok((Box::new(reader), self.parallel_readers.unwrap_or(256)))
+            }
+            "python" => {
+                let subject = self.python_subject.clone().ok_or_else(|| {
+                    PyValueError::new_err(
+                        "For Python connector, python_subject should be specified",
+                    )
+                })?;
 
-    fn construct_sqlite_reader(&self) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        let connection = SqliteConnection::open_with_flags(
-            self.path()?,
-            SqliteOpenFlags::SQLITE_OPEN_READ_ONLY | SqliteOpenFlags::SQLITE_OPEN_NO_MUTEX,
-        )
-        .map_err(|e| PyRuntimeError::new_err(format!("Failed to open Sqlite connection: {e}")))?;
-        let table_name = self.table_name.clone().ok_or_else(|| {
-            PyValueError::new_err("For Sqlite connector, table_name should be specified")
-        })?;
-        let column_names = self.column_names.clone().ok_or_else(|| {
-            PyValueError::new_err("For Sqlite connector, column_names should be specified")
-        })?;
-        let reader = SqliteReader::new(connection, table_name, column_names);
-        Ok((Box::new(reader), 1))
-    }
+                if subject.borrow(py).is_internal && self.persistent_id.is_some() {
+                    return Err(PyValueError::new_err(
+                        "Python connectors marked internal can't have persistent id",
+                    ));
+                }
 
-    fn construct_reader(&self, py: pyo3::Python) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        match self.storage_type.as_ref() {
-            "fs" => self.construct_fs_reader(),
-            "s3" => self.construct_s3_reader(py),
-            "s3_csv" => self.construct_s3_csv_reader(py),
-            "csv" => self.construct_csv_reader(py),
-            "kafka" => self.construct_kafka_reader(),
-            "python" => self.construct_python_reader(py),
-            "sqlite" => self.construct_sqlite_reader(),
+                let reader = PythonReaderBuilder::new(subject, self.internal_persistent_id());
+                Ok((Box::new(reader), 1))
+            }
+            "sqlite" => {
+                let connection = SqliteConnection::open_with_flags(
+                    self.path()?,
+                    SqliteOpenFlags::SQLITE_OPEN_READ_ONLY | SqliteOpenFlags::SQLITE_OPEN_NO_MUTEX,
+                )
+                .map_err(|e| {
+                    PyRuntimeError::new_err(format!("Failed to open Sqlite connection: {e}"))
+                })?;
+                let table_name = self.table_name.clone().ok_or_else(|| {
+                    PyValueError::new_err("For Sqlite connector, table_name should be specified")
+                })?;
+                let column_names = self.column_names.clone().ok_or_else(|| {
+                    PyValueError::new_err("For Sqlite connector, column_names should be specified")
+                })?;
+                let reader = SqliteReader::new(connection, table_name, column_names);
+                Ok((Box::new(reader), 1))
+            }
             other => Err(PyValueError::new_err(format!(
                 "Unknown data source {other:?}"
             ))),
diff --git a/tests/integration/test_bytes.rs b/tests/integration/test_bytes.rs
index 94c0b879..b1b9bf5d 100644
--- a/tests/integration/test_bytes.rs
+++ b/tests/integration/test_bytes.rs
@@ -1,5 +1,7 @@
 // Copyright © 2024 Pathway
 
+use std::path::PathBuf;
+
 use pathway_engine::connectors::data_format::{IdentityParser, ParseResult, ParsedEvent, Parser};
 use pathway_engine::connectors::data_storage::{
     ConnectorMode, FilesystemReader, ReadMethod, ReadResult, Reader,
@@ -8,8 +10,13 @@ use pathway_engine::connectors::SessionType;
 use pathway_engine::engine::Value;
 
 fn read_bytes_from_path(path: &str) -> eyre::Result<Vec<ParsedEvent>> {
-    let mut reader =
-        FilesystemReader::new(path, ConnectorMode::Static, None, ReadMethod::Full, "*")?;
+    let mut reader = FilesystemReader::new(
+        PathBuf::from(path),
+        ConnectorMode::Static,
+        None,
+        ReadMethod::Full,
+        "*",
+    )?;
     let mut parser = IdentityParser::new(vec!["data".to_string()], false, SessionType::Native);
     let mut events = Vec::new();
diff --git a/tests/integration/test_connector_field_defaults.rs b/tests/integration/test_connector_field_defaults.rs
index 19a3d9a3..3f7ebfce 100644
--- a/tests/integration/test_connector_field_defaults.rs
+++ b/tests/integration/test_connector_field_defaults.rs
@@ -3,6 +3,7 @@
 use super::helpers::{data_parsing_fails, read_data_from_reader};
 
 use std::collections::HashMap;
+use std::path::PathBuf;
 
 use pathway_engine::connectors::data_format::{
     DsvParser, DsvSettings, InnerSchemaField, JsonLinesParser, ParsedEvent,
@@ -26,7 +27,7 @@ fn test_dsv_with_default_end_of_line() -> eyre::Result<()> {
     );
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/dsv_with_skips.txt",
+        PathBuf::from("tests/data/dsv_with_skips.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -80,7 +81,7 @@ fn test_dsv_with_default_middle_of_line() -> eyre::Result<()> {
     );
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/dsv_with_skips2.txt",
+        PathBuf::from("tests/data/dsv_with_skips2.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -130,7 +131,7 @@ fn test_dsv_fails_without_default() -> eyre::Result<()> {
     schema.insert("number".to_string(), InnerSchemaField::new(Type::Int, None));
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/dsv_with_skips.txt",
+        PathBuf::from("tests/data/dsv_with_skips.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -163,7 +164,7 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> {
     );
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/dsv_with_skips.txt",
+        PathBuf::from("tests/data/dsv_with_skips.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -207,7 +208,7 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_fails_without_default() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -236,7 +237,7 @@ fn test_jsonlines_with_default() -> eyre::Result<()> {
     );
 
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines_with_skips.txt",
+        PathBuf::from("tests/data/jsonlines_with_skips.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -289,7 +290,7 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> {
     );
 
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines_with_skips.txt",
+        PathBuf::from("tests/data/jsonlines_with_skips.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -336,7 +337,7 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> {
     );
 
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines_with_skips_and_nulls.txt",
+        PathBuf::from("tests/data/jsonlines_with_skips_and_nulls.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
diff --git a/tests/integration/test_debezium.rs b/tests/integration/test_debezium.rs
index 04960551..31bcb3f1 100644
--- a/tests/integration/test_debezium.rs
+++ b/tests/integration/test_debezium.rs
@@ -4,6 +4,7 @@
 use super::helpers::{assert_error_shown_for_raw_data, read_data_from_reader};
 
 use std::fs::File;
 use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
 
 use assert_matches::assert_matches;
 
@@ -19,7 +20,7 @@ use pathway_engine::engine::Value;
 #[test]
 fn test_debezium_reads_ok() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/sample_debezium.txt",
+        PathBuf::from("tests/data/sample_debezium.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -164,7 +165,7 @@ fn test_debezium_tokens_amt_mismatch() -> eyre::Result<()> {
 #[test]
 fn test_debezium_mongodb_format() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/sample_debezium_mongodb.txt",
+        PathBuf::from("tests/data/sample_debezium_mongodb.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
diff --git a/tests/integration/test_dsv.rs b/tests/integration/test_dsv.rs
index 1975f57b..556d24f1 100644
--- a/tests/integration/test_dsv.rs
+++ b/tests/integration/test_dsv.rs
@@ -4,6 +4,7 @@
 use super::helpers::{assert_error_shown, assert_error_shown_for_reader_context};
 
 use std::collections::HashMap;
 use std::collections::HashSet;
+use std::path::PathBuf;
 
 use pathway_engine::connectors::data_format::{
     DsvParser, DsvSettings, InnerSchemaField, ParseResult, ParsedEvent, Parser,
@@ -16,7 +17,7 @@ use pathway_engine::engine::{Key, Type, Value};
 #[test]
 fn test_dsv_read_ok() -> eyre::Result<()> {
     let mut reader = FilesystemReader::new(
-        "tests/data/sample.txt",
+        PathBuf::from("tests/data/sample.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -59,7 +60,7 @@ fn test_dsv_read_ok() -> eyre::Result<()> {
 #[test]
 fn test_dsv_column_does_not_exist() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/sample.txt",
+        PathBuf::from("tests/data/sample.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -82,7 +83,7 @@ fn test_dsv_column_does_not_exist() -> eyre::Result<()> {
 #[test]
 fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> {
     let mut reader = FilesystemReader::new(
-        "tests/data/sample_str_int.txt",
+        PathBuf::from("tests/data/sample_str_int.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -118,7 +119,7 @@ fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> {
 #[test]
 fn test_dsv_not_enough_columns() -> eyre::Result<()> {
     let mut reader = FilesystemReader::new(
-        "tests/data/sample_bad_lines.txt",
+        PathBuf::from("tests/data/sample_bad_lines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -163,7 +164,7 @@ fn test_dsv_not_enough_columns() -> eyre::Result<()> {
 #[test]
 fn test_dsv_autogenerate_pkey() -> eyre::Result<()> {
     let mut reader = FilesystemReader::new(
-        "tests/data/sample.txt",
+        PathBuf::from("tests/data/sample.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -206,7 +207,7 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> {
 #[test]
 fn test_dsv_composite_pkey() -> eyre::Result<()> {
     let mut reader = FilesystemReader::new(
-        "tests/data/sample_composite_pkey.txt",
+        PathBuf::from("tests/data/sample_composite_pkey.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -269,7 +270,7 @@ fn test_dsv_read_schema_ok() -> eyre::Result<()> {
     );
 
     let mut reader = FilesystemReader::new(
-        "tests/data/schema.txt",
+        PathBuf::from("tests/data/schema.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -338,7 +339,7 @@ fn test_dsv_read_schema_nonparsable() -> eyre::Result<()> {
     );
 
     let mut reader = FilesystemReader::new(
-        "tests/data/incorrect_types.txt",
+        PathBuf::from("tests/data/incorrect_types.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
diff --git a/tests/integration/test_dsv_dir.rs b/tests/integration/test_dsv_dir.rs
index daf1d5ee..b0db85a4 100644
--- a/tests/integration/test_dsv_dir.rs
+++ b/tests/integration/test_dsv_dir.rs
@@ -3,6 +3,7 @@
 use super::helpers::read_data_from_reader;
 
 use std::collections::HashMap;
+use std::path::PathBuf;
 
 use pathway_engine::connectors::data_format::ParsedEvent;
 use pathway_engine::connectors::data_format::{DsvParser, DsvSettings};
@@ -15,7 +16,7 @@ fn test_dsv_dir_ok() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/csvdir",
+        PathBuf::from("tests/data/csvdir"),
         builder,
         ConnectorMode::Static,
         None,
@@ -48,7 +49,7 @@ fn test_single_file_ok() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/sample.txt",
+        PathBuf::from("tests/data/sample.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -73,7 +74,7 @@ fn test_custom_delimiter() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/sql_injection.txt",
+        PathBuf::from("tests/data/sql_injection.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -100,7 +101,7 @@ fn test_escape_fields() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/csv_fields_escaped.txt",
+        PathBuf::from("tests/data/csv_fields_escaped.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -146,7 +147,7 @@ fn test_escape_newlines() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/csv_escaped_newlines.txt",
+        PathBuf::from("tests/data/csv_escaped_newlines.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -182,17 +183,13 @@ fn test_nonexistent_file() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/nonexistent_file.txt",
+        PathBuf::from("tests/data/nonexistent_file.txt"),
         builder,
         ConnectorMode::Static,
         None,
         "*",
     );
-
-    // We treat this path as a glob pattern, so the situation is normal:
-    // the scanner will just scan the contents on an empty set. If a file
-    // will be added at this path, it will be scanned and read.
-    assert!(reader.is_ok());
+    assert!(reader.is_err());
 
     Ok(())
 }
@@ -203,7 +200,7 @@ fn test_special_fields() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/csv_special_fields.txt",
+        PathBuf::from("tests/data/csv_special_fields.txt"),
         builder,
         ConnectorMode::Static,
         None,
diff --git a/tests/integration/test_jsonlines.rs b/tests/integration/test_jsonlines.rs
index 3ef0b989..ce426db8 100644
--- a/tests/integration/test_jsonlines.rs
+++ b/tests/integration/test_jsonlines.rs
@@ -3,6 +3,7 @@
 use super::helpers::{assert_error_shown, read_data_from_reader};
 
 use std::collections::HashMap;
+use std::path::PathBuf;
 
 use std::sync::Arc;
 
@@ -14,7 +15,7 @@ use pathway_engine::engine::Value;
 #[test]
 fn test_jsonlines_ok() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -54,7 +55,7 @@ fn test_jsonlines_ok() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_incorrect_key() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -81,7 +82,7 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -105,7 +106,7 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_incorrect_values() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -132,7 +133,7 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_types_parsing() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines_types.txt",
+        PathBuf::from("tests/data/jsonlines_types.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -183,7 +184,7 @@ fn test_jsonlines_types_parsing() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_complex_paths() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/json_complex_paths.txt",
+        PathBuf::from("tests/data/json_complex_paths.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -240,7 +241,7 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_complex_paths_error() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/json_complex_paths.txt",
+        PathBuf::from("tests/data/json_complex_paths.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -282,7 +283,7 @@ fn test_jsonlines_complex_paths_error() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/json_complex_paths.txt",
+        PathBuf::from("tests/data/json_complex_paths.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -321,7 +322,7 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -351,7 +352,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> {
     routes.insert("d".to_string(), "/non/existent/path".to_string());
 
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -378,7 +379,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> {
 #[test]
 fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/json_complex_paths.txt",
+        PathBuf::from("tests/data/json_complex_paths.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
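Reviewer note: the metadata tests that follow exercise the `_metadata` column described in the connector docstrings. At the Python API level the feature looks roughly like this (a sketch; the JSON accessor syntax is our assumption, not taken from this diff):

```python
import pathway as pw

# `with_metadata=True` adds a JSON `_metadata` column to the table; per the
# docstrings above it carries optional integral `created_at`/`modified_at`
# fields holding UNIX timestamps.
table = pw.io.plaintext.read("raw_dataset/", mode="static", with_metadata=True)
stamped = table.select(
    data=pw.this.data,
    modified_at=pw.this._metadata["modified_at"],
)
```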
diff --git a/tests/integration/test_metadata.rs b/tests/integration/test_metadata.rs
index 4fbedf2a..33c96bf7 100644
--- a/tests/integration/test_metadata.rs
+++ b/tests/integration/test_metadata.rs
@@ -3,6 +3,7 @@
 use super::helpers::read_data_from_reader;
 
 use std::collections::HashMap;
+use std::path::PathBuf;
 
 use pathway_engine::connectors::data_format::{
     DsvParser, DsvSettings, IdentityParser, JsonLinesParser, ParsedEvent,
@@ -30,7 +31,7 @@ fn check_file_name_in_metadata(data_read: &ParsedEvent, name: &str) {
 #[test]
 fn test_metadata_fs_dir() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/csvdir/",
+        PathBuf::from("tests/data/csvdir/"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -60,7 +61,7 @@ fn test_metadata_fs_dir() -> eyre::Result<()> {
 #[test]
 fn test_metadata_fs_file() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/minimal.txt",
+        PathBuf::from("tests/data/minimal.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -91,7 +92,7 @@ fn test_metadata_csv_dir() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/csvdir/",
+        PathBuf::from("tests/data/csvdir/"),
         builder,
         ConnectorMode::Static,
         None,
@@ -124,7 +125,7 @@ fn test_metadata_csv_file() -> eyre::Result<()> {
     builder.has_headers(false);
 
     let reader = CsvFilesystemReader::new(
-        "tests/data/minimal.txt",
+        PathBuf::from("tests/data/minimal.txt"),
         builder,
         ConnectorMode::Static,
         None,
@@ -152,7 +153,7 @@ fn test_metadata_csv_file() -> eyre::Result<()> {
 #[test]
 fn test_metadata_json_file() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -176,7 +177,7 @@ fn test_metadata_json_file() -> eyre::Result<()> {
 #[test]
 fn test_metadata_json_dir() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines/",
+        PathBuf::from("tests/data/jsonlines/"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -201,7 +202,7 @@ fn test_metadata_json_dir() -> eyre::Result<()> {
 #[test]
 fn test_metadata_identity_file() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines.txt",
+        PathBuf::from("tests/data/jsonlines.txt"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
@@ -222,7 +223,7 @@ fn test_metadata_identity_file() -> eyre::Result<()> {
 #[test]
 fn test_metadata_identity_dir() -> eyre::Result<()> {
     let reader = FilesystemReader::new(
-        "tests/data/jsonlines/",
+        PathBuf::from("tests/data/jsonlines/"),
         ConnectorMode::Static,
         None,
         ReadMethod::ByLine,
diff --git a/tests/integration/test_seek.rs b/tests/integration/test_seek.rs
index 61f5fc57..6507f70a 100644
--- a/tests/integration/test_seek.rs
+++ b/tests/integration/test_seek.rs
@@ -25,11 +25,17 @@ enum TestedFormat {
     Json,
 }
 
-fn csv_reader_parser_pair(input_path: &str) -> (Box<dyn Reader>, Box<dyn Parser>) {
+fn csv_reader_parser_pair(input_path: &Path) -> (Box<dyn Reader>, Box<dyn Parser>) {
     let mut builder = csv::ReaderBuilder::new();
     builder.has_headers(false);
-    let reader =
-        CsvFilesystemReader::new(input_path, builder, ConnectorMode::Static, Some(1), "*").unwrap();
+    let reader = CsvFilesystemReader::new(
+        input_path.to_path_buf(),
+        builder,
+        ConnectorMode::Static,
+        Some(1),
+        "*",
+    )
+    .unwrap();
     let parser = DsvParser::new(
         DsvSettings::new(
             Some(vec!["key".to_string()]),
@@ -41,9 +47,9 @@ fn csv_reader_parser_pair(input_path: &str) -> (Box<dyn Reader>, Box<dyn Parser>) {
     (Box::new(reader), Box::new(parser))
 }
 
-fn json_reader_parser_pair(input_path: &str) -> (Box<dyn Reader>, Box<dyn Parser>) {
+fn json_reader_parser_pair(input_path: &Path) -> (Box<dyn Reader>, Box<dyn Parser>) {
     let reader = FilesystemReader::new(
-        input_path,
+        input_path.to_path_buf(),
         ConnectorMode::Static,
         Some(1),
         ReadMethod::ByLine,
@@ -68,8 +74,8 @@ fn full_cycle_read_kv(
     global_tracker: Option<&SharedWorkersPersistenceCoordinator>,
 ) -> FullReadResult {
     let (reader, mut parser) = match format {
-        TestedFormat::Csv => csv_reader_parser_pair(input_path.to_str().unwrap()),
-        TestedFormat::Json => json_reader_parser_pair(input_path.to_str().unwrap()),
+        TestedFormat::Csv => csv_reader_parser_pair(input_path),
+        TestedFormat::Json => json_reader_parser_pair(input_path),
     };
     full_cycle_read(reader, parser.as_mut(), persistent_storage, global_tracker)
 }