Skip to content

Commit

Permalink
Read avro files without dependencies (part 5)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 683922183
  • Loading branch information
achoum authored and copybara-github committed Oct 9, 2024
1 parent b050978 commit a7be9c1
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 2 deletions.
2 changes: 2 additions & 0 deletions yggdrasil_decision_forests/dataset/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ cc_library_ydf(
":data_spec",
":data_spec_cc_proto",
":data_spec_inference",
":example_reader_interface",
"//yggdrasil_decision_forests/utils:sharded_io",
"//yggdrasil_decision_forests/utils:status_macros",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
Expand Down
2 changes: 1 addition & 1 deletion yggdrasil_decision_forests/dataset/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ absl::StatusOr<const std::string> GetJsonStringField(

} // namespace

absl::StatusOr<AvroType> ParseType(std::string_view key) {
absl::StatusOr<AvroType> ParseType(absl::string_view key) {
if (key == "null") {
return AvroType::kNull;
} else if (key == "boolean") {
Expand Down
2 changes: 1 addition & 1 deletion yggdrasil_decision_forests/dataset/avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ enum class AvroType {
};

std::string TypeToString(AvroType type);
absl::StatusOr<AvroType> ParseType(std::string_view key);
absl::StatusOr<AvroType> ParseType(absl::string_view key);

enum class AvroCodec {
kNull = 0,
Expand Down
194 changes: 194 additions & 0 deletions yggdrasil_decision_forests/dataset/avro_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,200 @@ absl::Status InitializeUnstackedColumn(

} // namespace

// Opens the Avro file at "path" and makes it the current shard.
//
// For the first opened file, the field -> column reading maps are computed
// from the file schema and the dataspec. For every following file, the schema
// must be equal to the schema of the previously opened file.
//
// The new reader only replaces the current one after all the checks have
// passed. A failed call therefore leaves "reader_" unchanged (null if no file
// was opened yet, the previous reader otherwise) instead of resetting it.
//
// Returns the error of AvroReader::Create or ComputeReadingMaps, or an
// InvalidArgument error if the schema differs from the previous file.
absl::Status AvroExampleReader::Implementation::OpenShard(
    absl::string_view path) {
  // Build the reader in a local so that a failure cannot clobber "reader_".
  ASSIGN_OR_RETURN(auto next_reader, AvroReader::Create(path));

  if (reader_ == nullptr) {
    // First file: derive the reading maps from its schema.
    RETURN_IF_ERROR(ComputeReadingMaps(next_reader->fields(), dataspec_,
                                       &univariate_field_idx_to_column_idx_,
                                       &multivariate_field_idx_to_unroll_idx_));
  } else if (reader_->fields() != next_reader->fields()) {
    // The reading maps were computed for the first schema; any other schema
    // would be decoded incorrectly.
    return absl::InvalidArgumentError(
        "All the files in the same shard should have the same schema.");
  }

  reader_ = std::move(next_reader);
  return absl::OkStatus();
}

// Reads the next record of the currently opened Avro file into "example".
//
// "example" is first reset to one empty attribute per dataspec column. Each
// Avro field of the record is then read in schema order (every field must be
// consumed, even when it is ignored) and, if the field is mapped to a column
// and has a value, written to the corresponding attribute:
//   - boolean               -> boolean
//   - int / long            -> numerical
//   - float / double        -> numerical (NaN is treated as missing)
//   - string / bytes        -> categorical
//   - array<float|double>   -> one numerical attribute per dimension
//   - array<string|bytes>   -> categorical_set (univariate mapping) or one
//                              categorical attribute per dimension
//
// Returns false if there are no more records in the file, true otherwise.
// Returns an error if a field cannot be read, if a categorical value cannot be
// converted, if an array has an unsupported item type, or if an unstacked array
// holds more values than there are attributes to receive them.
absl::StatusOr<bool> AvroExampleReader::Implementation::NextInShard(
    proto::Example* example) {
  example->clear_attributes();
  while (example->attributes_size() < dataspec_.columns_size()) {
    example->add_attributes();
  }

  ASSIGN_OR_RETURN(const bool has_record, reader_->ReadNextRecord());
  if (!has_record) {
    return false;
  }

  // Checks that "num_values" consecutive attributes starting at
  // "begin_col_idx" exist in "example". The number of values comes from the
  // file, so it cannot be trusted to match the dataspec.
  // NOTE(review): this guards against out-of-range attribute accesses; a
  // stricter comparison against the dimension recorded in the dataspec for the
  // unstacked feature may be preferable -- confirm.
  const auto check_unstacked_range =
      [example](const int begin_col_idx,
                const size_t num_values) -> absl::Status {
    const int num_attributes = example->attributes_size();
    if (begin_col_idx < 0 || begin_col_idx > num_attributes ||
        num_values > static_cast<size_t>(num_attributes - begin_col_idx)) {
      return absl::InvalidArgumentError(
          "An Avro array field contains more values than can be stored in "
          "the columns defined by the dataspec.");
    }
    return absl::OkStatus();
  };

  int field_idx = 0;
  for (const auto& field : reader_->fields()) {
    switch (field.type) {
      case AvroType::kUnknown:
      case AvroType::kNull:
        break;

      case AvroType::kBoolean: {
        ASSIGN_OR_RETURN(const auto value,
                         reader_->ReadNextFieldBoolean(field));
        const int col_idx = univariate_field_idx_to_column_idx_[field_idx];
        if (col_idx == -1) {
          // Ignore field.
          break;
        }
        if (!value.has_value()) {
          break;
        }
        example->mutable_attributes(col_idx)->set_boolean(*value);
      } break;

      case AvroType::kLong:
      case AvroType::kInt: {
        ASSIGN_OR_RETURN(const auto value,
                         reader_->ReadNextFieldInteger(field));
        const int col_idx = univariate_field_idx_to_column_idx_[field_idx];
        if (col_idx == -1) {
          // Ignore field.
          break;
        }
        if (!value.has_value()) {
          break;
        }
        example->mutable_attributes(col_idx)->set_numerical(*value);
      } break;

      case AvroType::kDouble:
      case AvroType::kFloat: {
        absl::optional<double> value;
        if (field.type == AvroType::kFloat) {
          ASSIGN_OR_RETURN(value, reader_->ReadNextFieldFloat(field));
        } else {
          ASSIGN_OR_RETURN(value, reader_->ReadNextFieldDouble(field));
        }
        // NaN is handled like a missing value.
        if (value.has_value() && std::isnan(*value)) {
          value = absl::nullopt;
        }
        const int col_idx = univariate_field_idx_to_column_idx_[field_idx];
        if (col_idx == -1) {
          // Ignore field.
          break;
        }
        if (!value.has_value()) {
          break;
        }
        example->mutable_attributes(col_idx)->set_numerical(*value);
      } break;

      case AvroType::kBytes:
      case AvroType::kString: {
        std::string value;
        ASSIGN_OR_RETURN(const auto has_value,
                         reader_->ReadNextFieldString(field, &value));
        const int col_idx = univariate_field_idx_to_column_idx_[field_idx];
        if (col_idx == -1) {
          // Ignore field.
          break;
        }
        if (!has_value) {
          break;
        }
        ASSIGN_OR_RETURN(auto int_value,
                         CategoricalStringToValueWithStatus(
                             value, dataspec_.columns(col_idx)));
        example->mutable_attributes(col_idx)->set_categorical(int_value);
      } break;

      case AvroType::kArray:
        switch (field.sub_type) {
          case AvroType::kDouble:
          case AvroType::kFloat: {
            bool has_value;
            std::vector<float> values;
            if (field.sub_type == AvroType::kFloat) {
              ASSIGN_OR_RETURN(
                  has_value, reader_->ReadNextFieldArrayFloat(field, &values));
            } else {
              ASSIGN_OR_RETURN(
                  has_value,
                  reader_->ReadNextFieldArrayDoubleIntoFloat(field, &values));
            }
            if (!has_value) {
              break;
            }

            // Check if field used.
            const auto unstacked_idx =
                multivariate_field_idx_to_unroll_idx_[field_idx];
            if (unstacked_idx == -1) {
              break;
            }

            const auto& unstacked = dataspec_.unstackeds(unstacked_idx);
            RETURN_IF_ERROR(check_unstacked_range(unstacked.begin_column_idx(),
                                                  values.size()));
            for (size_t dim_idx = 0; dim_idx < values.size(); dim_idx++) {
              const int col_idx =
                  unstacked.begin_column_idx() + static_cast<int>(dim_idx);
              const float value = values[dim_idx];
              // NaN dimensions are left unset i.e. missing.
              if (!std::isnan(value)) {
                example->mutable_attributes(col_idx)->set_numerical(value);
              }
            }
          } break;

          case AvroType::kString:
          case AvroType::kBytes: {
            std::vector<std::string> values;
            ASSIGN_OR_RETURN(const auto has_value,
                             reader_->ReadNextFieldArrayString(field, &values));
            if (!has_value) {
              break;
            }

            // An array of strings mapped to a single column is a categorical
            // set.
            const auto univariate_col_idx =
                univariate_field_idx_to_column_idx_[field_idx];
            if (univariate_col_idx != -1) {
              const auto& col_spec = dataspec_.columns(univariate_col_idx);
              for (const auto& value : values) {
                ASSIGN_OR_RETURN(
                    auto int_value,
                    CategoricalStringToValueWithStatus(value, col_spec));
                example->mutable_attributes(univariate_col_idx)
                    ->mutable_categorical_set()
                    ->add_values(int_value);
              }
              break;
            }

            // Check if field used.
            const auto unstacked_idx =
                multivariate_field_idx_to_unroll_idx_[field_idx];
            if (unstacked_idx == -1) {
              break;
            }

            const auto& unstacked = dataspec_.unstackeds(unstacked_idx);
            RETURN_IF_ERROR(check_unstacked_range(unstacked.begin_column_idx(),
                                                  values.size()));
            for (size_t dim_idx = 0; dim_idx < values.size(); dim_idx++) {
              const int col_idx =
                  unstacked.begin_column_idx() + static_cast<int>(dim_idx);
              const auto& col_spec = dataspec_.columns(col_idx);
              ASSIGN_OR_RETURN(auto int_value,
                               CategoricalStringToValueWithStatus(
                                   values[dim_idx], col_spec));
              example->mutable_attributes(col_idx)->set_categorical(int_value);
            }
          } break;
          default:
            return absl::UnimplementedError("Unsupported type");
        }
        break;
    }
    field_idx++;
  }
  DCHECK_EQ(field_idx, reader_->fields().size());
  return true;
}

absl::StatusOr<dataset::proto::DataSpecification> CreateDataspec(
absl::string_view path, dataset::proto::DataSpecificationGuide& guide) {
// TODO: Reading of multiple paths.
Expand Down
55 changes: 55 additions & 0 deletions yggdrasil_decision_forests/dataset/avro_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,74 @@
#define YGGDRASIL_DECISION_FORESTS_DATASET_AVRO_EXAMPLE_H_

#include <cstddef>
#include <memory>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "yggdrasil_decision_forests/dataset/avro.h"
#include "yggdrasil_decision_forests/dataset/data_spec.pb.h"
#include "yggdrasil_decision_forests/dataset/example_reader_interface.h"
#include "yggdrasil_decision_forests/utils/sharded_io.h"

namespace yggdrasil_decision_forests::dataset::avro {

// Creates a dataspec from the Avro file.
absl::StatusOr<dataset::proto::DataSpecification> CreateDataspec(
absl::string_view path, dataset::proto::DataSpecificationGuide& guide);

// Example reader for Avro files.
//
// All the work is delegated to "Implementation", a utils::ShardedReader that
// opens each shard with OpenShard() and reads its records with NextInShard().
class AvroExampleReader final : public ExampleReaderInterface {
public:
// "data_spec" describes the columns of the examples to produce.
// "required_columns" is forwarded to, and copied by, the implementation.
explicit AvroExampleReader(const proto::DataSpecification& data_spec,
absl::optional<std::vector<int>> required_columns)
: sharded_reader_(data_spec, required_columns) {}

// Reads the next example into "example". Forwards the result of the sharded
// reader's Next().
absl::StatusOr<bool> Next(proto::Example* example) override {
return sharded_reader_.Next(example);
}

// Opens "sharded_path" through the sharded reader.
absl::Status Open(absl::string_view sharded_path) override {
return sharded_reader_.Open(sharded_path);
}

private:
// Sharded reader that decodes Avro records into proto::Example.
class Implementation final : public utils::ShardedReader<proto::Example> {
public:
// Copies "data_spec" and "required_columns"; no file is opened here.
explicit Implementation(
const proto::DataSpecification& data_spec,
const absl::optional<std::vector<int>>& required_columns)
: dataspec_(data_spec), required_columns_(required_columns) {}

protected:
// Opens the Avro file at "path", and checks that the header is as expected.
absl::Status OpenShard(absl::string_view path) override;

// Scans a new row in the Avro file, and parses it as a proto::Example.
absl::StatusOr<bool> NextInShard(proto::Example* example) override;

private:
// The data spec.
const proto::DataSpecification dataspec_;

// Currently open file. Null until a file is opened.
std::unique_ptr<AvroReader> reader_;

// Mapping between the Avro field index and the column index for the
// univariate features. -1's are used for ignored fields.
std::vector<int> univariate_field_idx_to_column_idx_;

// Mapping between the Avro field index and the unstacked index for the
// multivariate features. -1's are used for ignored fields.
std::vector<int> multivariate_field_idx_to_unroll_idx_;

// Copy of the "required_columns" constructor argument.
// NOTE(review): no use of this member is visible in this header -- confirm
// whether it is consumed by the implementation file.
const absl::optional<std::vector<int>> required_columns_;
};

// The reader all the public methods forward to.
Implementation sharded_reader_;
};

namespace internal {

// Infers the dataspec from the Avro file i.e. find the columns, but do not
Expand Down
85 changes: 85 additions & 0 deletions yggdrasil_decision_forests/dataset/avro_example_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,90 @@ TEST(AvroExample, CreateDataspec) {
EXPECT_THAT(dataspec, EqualsProto(expected));
}

// Reads "toy_codex-null.avro" with AvroExampleReader and checks the content of
// the examples: two examples are expected, followed by the end of the file.
TEST(AvroExample, ReadExample) {
dataset::proto::DataSpecificationGuide guide;
{
// Force "f_another_array_of_string" to be a CATEGORICAL_SET column.
auto* col = guide.add_column_guides();
col->set_column_name_pattern("^f_another_array_of_string$");
col->set_type(proto::ColumnType::CATEGORICAL_SET);
}

{
// Exclude "f_another_float" from the dataspec.
auto* col = guide.add_column_guides();
col->set_column_name_pattern("^f_another_float$");
col->set_ignore_column(true);
}

// Set the minimum vocabulary frequency of categorical columns to 1.
guide.mutable_default_column_guide()
->mutable_categorial()
->set_min_vocab_frequency(1);

// The dataspec is inferred from the same file that is read below.
ASSERT_OK_AND_ASSIGN(
const auto dataspec,
CreateDataspec(file::JoinPath(DatasetDir(), "toy_codex-null.avro"),
guide));

AvroExampleReader reader(dataspec, {});
ASSERT_OK(reader.Open(file::JoinPath(DatasetDir(), "toy_codex-null.avro")));
proto::Example example;
ASSERT_OK_AND_ASSIGN(bool has_next, reader.Next(&example));
ASSERT_TRUE(has_next);

// First example. Empty "attributes {}" entries are attributes the reader left
// unset.
const proto::Example expected_1 = PARSE_TEST_PROTO(R"pb(
attributes { boolean: true }
attributes { numerical: 5 }
attributes { numerical: 1234567 }
attributes { numerical: 3.1415 }
attributes {}
attributes { categorical: 1 }
attributes { categorical: 1 }
attributes { numerical: 6.1 }
attributes { categorical_set { values: 4 values: 3 values: 1 } }
attributes { numerical: 1 }
attributes { numerical: 2 }
attributes { numerical: 3 }
attributes { numerical: 10 }
attributes { numerical: 20 }
attributes { numerical: 30 }
attributes { categorical: 2 }
attributes { categorical: 1 }
attributes { categorical: 1 }
attributes { numerical: 1 }
attributes { numerical: 2 }
attributes { numerical: 3 }
)pb");
EXPECT_THAT(example, EqualsProto(expected_1));

// Second example. The same "example" object is reused across calls.
ASSERT_OK_AND_ASSIGN(has_next, reader.Next(&example));
ASSERT_TRUE(has_next);
const proto::Example expected_2 = PARSE_TEST_PROTO(R"pb(
attributes { boolean: false }
attributes { numerical: 6 }
attributes { numerical: -123 }
attributes { numerical: -1.234 }
attributes { numerical: 6.789 }
attributes { categorical: 2 }
attributes { categorical: 2 }
attributes {}
attributes { categorical_set { values: 1 values: 2 } }
attributes { numerical: 4 }
attributes { numerical: 5 }
attributes { numerical: 6 }
attributes { numerical: 40 }
attributes { numerical: 50 }
attributes { numerical: 60 }
attributes { categorical: 1 }
attributes { categorical: 2 }
attributes { categorical: 2 }
attributes {}
attributes {}
attributes {}
)pb");
EXPECT_THAT(example, EqualsProto(expected_2));

// No third example: Next() reports the end of the data.
ASSERT_OK_AND_ASSIGN(has_next, reader.Next(&example));
ASSERT_FALSE(has_next);
}

} // namespace
} // namespace yggdrasil_decision_forests::dataset::avro

0 comments on commit a7be9c1

Please sign in to comment.