Skip to content

Commit

Permalink
Support ndjson
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Stein <[email protected]>
  • Loading branch information
texodus committed Dec 9, 2024
1 parent da17600 commit dd1b000
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 6 deletions.
10 changes: 10 additions & 0 deletions cpp/perspective/src/cpp/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,12 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
Table::from_rows(index, r.data().from_rows(), limit);
break;
}
case proto::MakeTableData::kFromNdjson: {
table = Table::from_ndjson(
index, r.data().from_ndjson(), limit
);
break;
}
case proto::MakeTableData::kFromSchema: {
std::vector<std::string> columns;
std::vector<t_dtype> types;
Expand Down Expand Up @@ -1489,6 +1495,10 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
table->update_cols(r.data().from_cols(), r.port_id());
break;
}
case proto::MakeTableData::kFromNdjson: {
table->update_ndjson(r.data().from_ndjson(), r.port_id());
break;
}
case proto::MakeTableData::kFromSchema:
case proto::MakeTableData::DATA_NOT_SET:
default: {
Expand Down
260 changes: 254 additions & 6 deletions cpp/perspective/src/cpp/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1305,12 +1305,6 @@ Table::from_rows(
const auto& psp_pkey_col = data_table.get_column("psp_pkey");
const auto& psp_okey_col = data_table.get_column("psp_okey");

// rapidjson::StringBuffer buffer;
// buffer.Clear();
// rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
// document.Accept(writer);
// std::cout << buffer.GetString() << std::endl;

// 3.) Fill table
for (const auto& row : document.GetArray()) {
for (const auto& it : row.GetObject()) {
Expand Down Expand Up @@ -1352,6 +1346,260 @@ Table::from_rows(
return tbl;
}

void
Table::update_ndjson(const std::string_view& data, std::uint32_t port_id) {
    // Apply an update to this table from a newline-delimited JSON payload,
    // one JSON object per line, sending the resulting delta to `port_id`.
    //
    // NOTE(review): `rapidjson::StringStream` reads until a NUL terminator;
    // this assumes `data.data()` is NUL-terminated, which `std::string_view`
    // does not guarantee in general -- confirm all callers pass terminated
    // buffers (or switch to a length-bounded stream).
    rapidjson::Document document;
    rapidjson::StringStream s(data.data());
    document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);

    // An empty (or unparseable) payload is a no-op update.  The previous
    // check called `document.Size()`, which asserts `IsArray()` in rapidjson
    // and is undefined for the object/null documents ndjson produces.
    if (document.HasParseError()) {
        return;
    }

    if (!document.IsObject()) {
        // TODO Legacy error message
        PSP_COMPLAIN_AND_ABORT(
            "Cannot determine data types without column names!\n"
        )
    }

    // Tables created without an explicit index use an implicit integer pkey.
    bool is_implicit = m_index.empty();
    t_schema table_schema = get_schema();

    // 2.) Create table
    t_data_table data_table(table_schema);
    data_table.init();

    // 2a.) Estimate row count from newlines to reduce malloc pressure.
    auto newlines = 0;
    for (char c : data) {
        if (c == '\n') {
            newlines++;
        }
    }

    data_table.reserve(newlines + 1);
    if (is_implicit) {
        data_table.add_column("psp_pkey", DTYPE_INT32, true);
    } else {
        data_table.add_column(
            "psp_pkey", table_schema.get_dtype(m_index), true
        );
    }

    t_uindex ii = 0;
    const auto& psp_pkey_col = data_table.get_column("psp_pkey");
    auto schema = data_table.get_schema();
    bool is_first_row = true;
    std::vector<std::string> missing_columns = m_column_names;

    // 3.) Fill table, one parsed document (row) per iteration.
    bool is_finished = false;
    while (!is_finished) {
        if (is_implicit) {
            // Implicit pkeys wrap at `m_limit` so updates overwrite rows in
            // ring-buffer order.
            psp_pkey_col->set_nth<std::uint32_t>(ii, (ii + m_offset) % m_limit);
        }

        for (const auto& it : document.GetObject()) {
            std::shared_ptr<t_column> col;
            std::string_view col_name = it.name.GetString();
            if (std::string_view{it.name.GetString()} == "__INDEX__") {
                // "__INDEX__" is the legacy alias for the pkey column.
                col_name = "psp_pkey";
            }

            if (!schema.has_column(col_name)) {
                LOG_DEBUG("Ignoring column " << col_name);
                LOG_DEBUG("Schema:\n" << schema);
                continue;
            }

            if (is_first_row) {
                // Track which schema columns the payload never mentions, so
                // limit-wrapped rows can clear their stale values below.
                missing_columns.erase(
                    std::remove(
                        missing_columns.begin(), missing_columns.end(), col_name
                    ),
                    missing_columns.end()
                );
            }

            col = data_table.get_column(col_name);
            auto promote = fill_column_json(col, ii, it.value, true);
            if (promote) {
                // Unlike `from_ndjson`, an update may not widen a column's
                // type -- the table schema is already fixed.
                std::stringstream ss;
                ss << "Cannot append value of type " << dtype_to_str(*promote)
                   << " to column of type " << dtype_to_str(col->get_dtype())
                   << std::endl;
                PSP_COMPLAIN_AND_ABORT(ss.str());
            }

            if (!is_implicit && m_index == it.name.GetString()) {
                fill_column_json(psp_pkey_col, ii, it.value, true);
            }
        }

        is_first_row = false;
        if (ii + m_offset >= m_limit) {
            // Rows past the limit overwrite existing rows, so any column
            // absent from the payload must be explicitly unset.
            for (auto& col_name : missing_columns) {
                data_table.get_column(col_name)->unset(ii);
            }
        }

        ii++;

        // Advance to the next newline-delimited document.  A parse error, or
        // a non-object document (previously unchecked, and UB when iterated
        // with `GetObject()`), terminates input.
        document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);
        if (document.HasParseError() || !document.IsObject()) {
            is_finished = true;
        }
    }

    data_table.extend(ii);
    data_table.clone_column("psp_pkey", "psp_okey");
    process_op_column(data_table, t_op::OP_INSERT);
    calculate_offset(ii);
    m_pool->send(get_gnode()->get_id(), port_id, data_table);
}

std::shared_ptr<Table>
Table::from_ndjson(
    const std::string& index, const std::string_view& data, std::uint32_t limit
) {
    // Construct a new Table from a newline-delimited JSON payload, one JSON
    // object per line.  The schema is inferred from the first row; `index`
    // names the pkey column (implicit integer pkey when absent from the
    // data), and `limit` caps the row count ring-buffer style.
    auto pool = std::make_shared<t_pool>();
    pool->init();

    // 1.) Infer schema from the first document.
    // NOTE(review): `rapidjson::StringStream` assumes `data.data()` is
    // NUL-terminated, which `std::string_view` does not guarantee -- confirm
    // callers.
    rapidjson::Document document;
    rapidjson::StringStream s(data.data());
    document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);

    // The previous guard was `document.Size() > 0`, but rapidjson's `Size()`
    // asserts `IsArray()` and is undefined for the object/null documents
    // ndjson produces; likewise `document[0]` is array-only, so report the
    // document's own type instead.
    if (!document.HasParseError() && !document.IsObject()) {
        std::stringstream ss;
        ss << "Received non-object " << document.GetType();
        PSP_COMPLAIN_AND_ABORT(ss.str())
    }

    std::vector<std::string> column_names;
    std::vector<t_dtype> data_types;
    bool is_implicit = true;
    std::set<std::string> columns_known_type;
    std::set<std::string> columns_seen;

    // TODO I don't think it makes sense to do the same incremental-schema
    // enhancement we do for regular JSON. For now this only checks the first
    // row.  (Guarded: an empty payload parses to an error and must not be
    // iterated with `GetObject()`.)
    if (!document.HasParseError()) {
        [&]() {
            for (const auto& col : document.GetObject()) {
                columns_seen.insert(col.name.GetString());
            }

            // https://github.com/Tencent/rapidjson/issues/1994
            for (const auto& col : document.GetObject()) {
                if (col.name.GetString() == index) {
                    is_implicit = false;
                }

                if (columns_known_type.count(col.name.GetString()) > 0) {
                    continue;
                }

                auto dtype = rapidjson_type_to_dtype(col.value);
                if (dtype != DTYPE_NONE) {
                    columns_known_type.insert(col.name.GetString());
                    data_types.push_back(dtype);
                    column_names.emplace_back(col.name.GetString());
                }

                // Theoretically this can end too early if the first
                // few rows are missing columns that are present in later
                // rows.
                if (columns_known_type.size() == columns_seen.size()) {
                    return;
                }
            }
        }();
    }

    for (const auto& col : columns_seen) {
        if (columns_known_type.count(col) == 0) {
            // Default all null columns to string
            data_types.push_back(DTYPE_STR);
            column_names.emplace_back(col);
        }
    }

    t_schema schema(column_names, data_types);

    // 2.) Create table.  pkey/okey mirror the index column's dtype, or an
    // integer row counter when the index is implicit.
    t_data_table data_table(schema);
    data_table.init();

    if (is_implicit) {
        data_table.add_column("psp_pkey", DTYPE_INT32, true);
        data_table.add_column("psp_okey", DTYPE_INT32, true);
    } else {
        data_table.add_column("psp_pkey", schema.get_dtype(index), true);
        data_table.add_column("psp_okey", schema.get_dtype(index), true);
    }

    std::int32_t ii = 0;
    const auto& psp_pkey_col = data_table.get_column("psp_pkey");
    const auto& psp_okey_col = data_table.get_column("psp_okey");

    // 2a.) Estimate row count from newlines to reduce malloc pressure.
    auto newlines = 0;
    for (char c : data) {
        if (c == '\n') {
            newlines++;
        }
    }

    data_table.reserve(newlines + 1);

    // 3.) Fill table, one parsed document (row) per iteration.
    bool is_finished = document.HasParseError();
    while (!is_finished) {
        for (const auto& it : document.GetObject()) {
            // NOTE(review): assumes every row's keys appear in the first-row
            // schema; a column first appearing in a later row will fail
            // `get_column` -- TODO confirm intended.
            auto col = data_table.get_column(it.name.GetString());
            const auto* col_name = it.name.GetString();
            const auto& cell = it.value;
            auto promote = fill_column_json(col, ii, cell, false);
            if (promote) {
                // A later row held a wider type (e.g. float after int):
                // promote the whole column, then re-fill this cell.
                LOG_DEBUG(
                    "Promoting column " << col_name << " from "
                                        << dtype_to_str(col->get_dtype())
                                        << " to " << dtype_to_str(*promote)
                );

                data_table.promote_column(col_name, *promote, ii, true);
                col = data_table.get_column(col_name);
                fill_column_json(col, ii, cell, false);
            }

            if (!is_implicit && index == it.name.GetString()) {
                fill_column_json(psp_pkey_col, ii, it.value, false);
                fill_column_json(psp_okey_col, ii, it.value, false);
            }
        }

        if (is_implicit) {
            // Implicit pkeys wrap at `limit`, ring-buffer style.
            psp_pkey_col->set_nth<std::int32_t>(ii, ii % limit);
            psp_okey_col->set_nth<std::int32_t>(ii, ii % limit);
        }

        ii++;

        // Advance to the next document.  A parse error, or a non-object
        // document (previously unchecked, and UB when iterated with
        // `GetObject()`), terminates input.
        document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);
        if (document.HasParseError() || !document.IsObject()) {
            is_finished = true;
        }
    }

    data_table.extend(ii);
    auto tbl = std::make_shared<Table>(
        pool, schema.columns(), schema.types(), limit, index
    );

    tbl->init(data_table, ii, t_op::OP_INSERT, 0);
    pool->_process();
    return tbl;
}

std::shared_ptr<Table>
Table::from_schema(
const std::string& index, const t_schema& schema, std::uint32_t limit
Expand Down
7 changes: 7 additions & 0 deletions cpp/perspective/src/include/perspective/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class PERSPECTIVE_EXPORT Table {
void update_csv(const std::string_view& data, std::uint32_t port_id);
void update_rows(const std::string_view& data, std::uint32_t port_id);
void update_cols(const std::string_view& data, std::uint32_t port_id);
void update_ndjson(const std::string_view& data, std::uint32_t port_id);
// void update_cols(const std::string_view& data) const;

static std::shared_ptr<Table> from_csv(
Expand All @@ -218,6 +219,12 @@ class PERSPECTIVE_EXPORT Table {
std::uint32_t limit = std::numeric_limits<std::uint32_t>::max()
);

static std::shared_ptr<Table> from_ndjson(
const std::string& index,
const std::string_view& data,
std::uint32_t limit = std::numeric_limits<std::uint32_t>::max()
);

static std::shared_ptr<Table> from_schema(
const std::string& index,
const t_schema& schema,
Expand Down
1 change: 1 addition & 0 deletions cpp/protos/perspective.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message MakeTableData {
string from_rows = 4;
string from_cols = 5;
string from_view = 6;
string from_ndjson = 7;
};
}

Expand Down
4 changes: 4 additions & 0 deletions rust/perspective-client/src/rust/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum TableReadFormat {

#[serde(rename = "arrow")]
Arrow,

#[serde(rename = "ndjson")]
Ndjson,
}

impl TableReadFormat {
Expand All @@ -53,6 +56,7 @@ impl TableReadFormat {
Some("json") => Some(TableReadFormat::JsonString),
Some("columns") => Some(TableReadFormat::ColumnsString),
Some("arrow") => Some(TableReadFormat::Arrow),
Some("ndjson") => Some(TableReadFormat::Ndjson),
None => None,
Some(x) => return Err(format!("Unknown format \"{}\"", x)),
})
Expand Down
2 changes: 2 additions & 0 deletions rust/perspective-client/src/rust/table_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum UpdateData {
Arrow(Bytes),
JsonRows(String),
JsonColumns(String),
Ndjson(String),
}

impl From<UpdateData> for TableData {
Expand Down Expand Up @@ -70,6 +71,7 @@ impl From<UpdateData> for proto::MakeTableData {
UpdateData::Arrow(x) => make_table_data::Data::FromArrow(x.into()),
UpdateData::JsonRows(x) => make_table_data::Data::FromRows(x),
UpdateData::JsonColumns(x) => make_table_data::Data::FromCols(x),
UpdateData::Ndjson(x) => make_table_data::Data::FromNdjson(x),
};

MakeTableData { data: Some(data) }
Expand Down
9 changes: 9 additions & 0 deletions rust/perspective-js/src/rust/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub(crate) impl UpdateData {
Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(
value.as_string().into_apierror()?.into_bytes().into(),
))),
Some(TableReadFormat::Ndjson) => {
Ok(Some(UpdateData::Ndjson(value.as_string().into_apierror()?)))
},
}
} else if value.is_instance_of::<ArrayBuffer>() {
let uint8array = Uint8Array::new(value);
Expand All @@ -121,6 +124,9 @@ pub(crate) impl UpdateData {
Some(TableReadFormat::ColumnsString) => {
Ok(Some(UpdateData::JsonColumns(String::from_utf8(slice)?)))
},
Some(TableReadFormat::Ndjson) => {
Ok(Some(UpdateData::Ndjson(String::from_utf8(slice)?)))
},
None | Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(slice.into()))),
}
} else if let Some(uint8array) = value.dyn_ref::<Uint8Array>() {
Expand All @@ -133,6 +139,9 @@ pub(crate) impl UpdateData {
Some(TableReadFormat::ColumnsString) => {
Ok(Some(UpdateData::JsonColumns(String::from_utf8(slice)?)))
},
Some(TableReadFormat::Ndjson) => {
Ok(Some(UpdateData::Ndjson(String::from_utf8(slice)?)))
},
None | Some(TableReadFormat::Arrow) => Ok(Some(UpdateData::Arrow(slice.into()))),
}
} else if value.is_instance_of::<Array>() {
Expand Down
Loading

0 comments on commit dd1b000

Please sign in to comment.