Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Update 'advanced' flag for concurrency multithreading and data input in FileComponent #5901

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/backend/base/langflow/base/data/base_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def __init__(self, *args, **kwargs):
required=False,
input_types=["Data", "Message"],
is_list=True,
advanced=True,
),
BoolInput(
name="silent_errors",
Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/components/data/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FileComponent(BaseFileComponent):
IntInput(
name="concurrency_multithreading",
display_name="Processing Concurrency",
advanced=False,
advanced=True,
info="When multiple files are being processed, the number of files to process concurrently.",
value=1,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,11 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n"
"value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n"
},
"concurrency_multithreading": {
"_input_type": "IntInput",
"advanced": false,
"advanced": true,
"display_name": "Processing Concurrency",
"dynamic": false,
"info": "When multiple files are being processed, the number of files to process concurrently.",
Expand Down Expand Up @@ -1013,7 +1013,7 @@
},
"file_path": {
"_input_type": "HandleInput",
"advanced": false,
"advanced": true,
"display_name": "Server File Path",
"dynamic": false,
"info": "Data object with a 'file_path' property pointing to server file or a Message object with a path to the file. Supercedes 'Path' but supports same file types.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2493,11 +2493,11 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n"
"value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n"
},
"concurrency_multithreading": {
"_input_type": "IntInput",
"advanced": false,
"advanced": true,
"display_name": "Processing Concurrency",
"dynamic": false,
"info": "When multiple files are being processed, the number of files to process concurrently.",
Expand Down Expand Up @@ -2529,7 +2529,7 @@
},
"file_path": {
"_input_type": "HandleInput",
"advanced": false,
"advanced": true,
"display_name": "Server File Path",
"dynamic": false,
"info": "Data object with a 'file_path' property pointing to server file or a Message object with a path to the file. Supercedes 'Path' but supports same file types.",
Expand Down
Loading