-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Apache Arrow stream writers #147
Conversation
JesseMckinzie
commented
Sep 20, 2023
- Update Apache Arrow writers for Arrow IPC and Parquet formats to write in streams to make Arrow writing fully scalable
- Update Python API and CLI to use new writers
- Add unit tests for updated get_arrow_table method in the Python API
src/nyx/output_2_buffer.cpp
Outdated
@@ -27,6 +27,7 @@ | |||
|
|||
namespace Nyxus | |||
{ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a white-space change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was left behind after removing a function I implemented
src/nyx/environment.h
Outdated
std::string arrow_output_type = ""; | ||
ArrowOutputStream arrow_stream; | ||
std::shared_ptr<ApacheArrowWriter> arrow_writer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialize to nullptr
?
src/nyx/output_writers.h
Outdated
@@ -16,9 +20,20 @@ | |||
#include <memory> | |||
|
|||
#include "helpers/helpers.h" | |||
#include "globals.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not clear why we need this here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer needed
src/nyx/scan_fastloader_way.cpp
Outdated
// Sanity | ||
#ifdef _WIN32 | ||
#include<windows.h> | ||
#endif | ||
|
||
#include <chrono> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?
if (ok == false) | ||
{ | ||
std::cout << "save_features_2_csv() returned an error code" << std::endl; | ||
return 2; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This need to be updated to refer to proper function call. May be something like
if (!ok){
if(save2csv){...}
else {...}
}
src/nyx/scan_fastloader_way.cpp
Outdated
if (!status.ok()) { | ||
// Handle read error | ||
auto err = status.ToString(); | ||
throw std::runtime_error("Error writing Arrow file: " + err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we throw
? This can be performance degrading if we are in a situation where we keep throwing. Are we catching this in the caller code? If not, may be just ignore/log and move on?
Since this function already return error codes if anything goes wrong, we should just use the same error reporting mechanism instead of throw
.
src/nyx/main_nyxus.cpp
Outdated
use_arrow, // 'true' to save to csv | ||
theEnvironment.useCsv, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the arg comment :-)
src/nyx/output_writers_old.h
Outdated
@@ -0,0 +1,166 @@ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a required file?
src/nyx/scan_fastloader_way.cpp
Outdated
#ifdef USE_ARROW | ||
if (arrow_output) { | ||
|
||
auto features = Nyxus::get_feature_values(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is an extra call that we don't need?
throw std::invalid_argument("The arrow file path must end in \".arrow\""); | ||
} | ||
|
||
if (!(arrow_file_type_upper == "ARROW" || arrow_file_type_upper == "ARROWIPC" || arrow_file_type_upper == "PARQUET")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How a user can specify via arrow_file_type_upper that the desired format is ".feather" ?
@@ -32,7 +32,7 @@ namespace Nyxus | |||
|
|||
bool scanFilePairParallel(const std::string& intens_fpath, const std::string& label_fpath, int num_fastloader_threads, int num_sensemaker_threads, int filepair_index, int tot_num_filepairs); | |||
std::string getPureFname(const std::string& fpath); | |||
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, const std::string& csvOutputDir); | |||
int processDataset(const std::vector<std::string>& intensFiles, const std::vector<std::string>& labelFiles, int numFastloaderThreads, int numSensemakerThreads, int numReduceThreads, int min_online_roi_size, bool save2csv, bool arrow_output, const std::string& csvOutputDir); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't having overloaded function processDataset() for the separate cases of CSV and Apache output types be less confusing than having a single overparametered processDataset() ?
#ifdef USE_ARROW | ||
use_arrow = theEnvironment.arrow_output_type == "ARROW" || theEnvironment.arrow_output_type == "PARQUET"; | ||
#endif | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of logic is OK but is just somewhat not elegant to put such a low-level stuff in main(). Again, if we overload processDataset() as mentioned earlier, we would be able to call it separately with Apache-related and CSV parameters.
return 2; | ||
} | ||
} | ||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File scan_fastloader_way.cpp is getting cluttered with #ifdef USE_ARROW code. Which is OK for this PR but I would immediately fix in a subsequent PR to keep the image scan logic free of low-level file format stuff. (Principle of separation of concerns.)
std::shared_ptr<ApacheArrowWriter> create_arrow_file(const std::string& arrow_file_type, | ||
const std::string& arrow_file_path, | ||
const std::vector<std::string>& header); | ||
std::shared_ptr<arrow::Table> get_arrow_table(const std::string& file_path, arrow::Status& table_status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function call should change in a subsequent PR not to pass a ref to retrieve an error/status code. Instead the caller code will check for nullptr
on the return value.