From 73806e3d065c2a7eba22248686c0f8a802d3cf5d Mon Sep 17 00:00:00 2001 From: zhilingc Date: Thu, 14 May 2020 10:53:24 +0800 Subject: [PATCH 1/3] Apply default project to incoming rows without project defined --- .../core/job/dataflow/DataflowJobManager.java | 1 + .../job/direct/DirectRunnerJobManager.java | 2 + .../main/java/feast/core/model/Project.java | 1 + .../main/java/feast/ingestion/ImportJob.java | 1 + .../ingestion/options/ImportOptions.java | 6 +++ .../ProcessAndValidateFeatureRows.java | 6 ++- .../transform/fn/ProcessFeatureRowDoFn.java | 18 ++++++- .../ProcessAndValidateFeatureRowsTest.java | 52 +++++++++++++++++++ 8 files changed, 85 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index a504c62317..efee662e3a 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -279,6 +279,7 @@ private ImportOptions getPipelineOptions( pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); + pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setUpdate(update); pipelineOptions.setRunner(DataflowRunner.class); pipelineOptions.setJobName(jobName); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 4bcff01686..2e2b43047e 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -26,6 +26,7 @@ import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; +import feast.core.model.Project; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import feast.ingestion.options.BZip2Compressor; @@ -105,6 +106,7 @@ private ImportOptions getPipelineOptions( pipelineOptions.setJobName(jobName); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); + pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setProject(""); // set to default value to satisfy validation if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); diff --git a/core/src/main/java/feast/core/model/Project.java b/core/src/main/java/feast/core/model/Project.java index d6e6149394..c55830c824 100644 --- a/core/src/main/java/feast/core/model/Project.java +++ b/core/src/main/java/feast/core/model/Project.java @@ -34,6 +34,7 @@ @Entity @Table(name = "projects") public class Project { + public static final String DEFAULT_NAME = "default"; // Name of the project @Id diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 0420ddcabb..16efe11f55 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -130,6 +130,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti .get(FEATURE_ROW_OUT) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject(options.getDefaultFeastProject()) .setFeatureSetSpecs(featureSetSpecsByKey) .setSuccessTag(FEATURE_ROW_OUT) .setFailureTag(DEADLETTER_OUT) diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 1fa127d662..f2bf31d1cd 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -27,6 +27,12 @@ /** Options passed to Beam to influence the job's execution environment */ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { + @Required + @Description("Default feast project to apply to incoming rows that do not specify project in its feature set reference.") + String getDefaultFeastProject(); + + void setDefaultFeastProject(String defaultProject); + @Required @Description( "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." diff --git a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java index 50c3e0ee4f..2ce8918ccf 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java @@ -39,6 +39,8 @@ public abstract class ProcessAndValidateFeatureRows public abstract Map getFeatureSetSpecs(); + public abstract String getDefaultProject(); + public abstract TupleTag getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -53,6 +55,8 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpecs( Map featureSets); + public abstract Builder setDefaultProject(String defaultProject); + public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -69,7 +73,7 @@ public PCollectionTuple expand(PCollection input) { .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); return input - .apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn())) + .apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn(getDefaultProject()))) .apply( "ValidateFeatureRows", ParDo.of( diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java index 2a115a8071..1ab416a66f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java @@ -21,11 +21,19 @@ public class ProcessFeatureRowDoFn extends DoFn { + private String defaultProject; + + public ProcessFeatureRowDoFn(String defaultProject) { + this.defaultProject = defaultProject; + } + @ProcessElement public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); + String featureSetId = stripVersion(featureRow.getFeatureSet()); + featureSetId = applyDefaultProject(featureSetId); featureRow = - featureRow.toBuilder().setFeatureSet(stripVersion(featureRow.getFeatureSet())).build(); + featureRow.toBuilder().setFeatureSet(featureSetId).build(); context.output(featureRow); } @@ -34,4 +42,12 @@ private String stripVersion(String featureSetId) { String[] split = featureSetId.split(":"); return split[0]; } + + private String applyDefaultProject(String featureSetId) { + String[] split = featureSetId.split("/"); + if (split.length == 1) { + return defaultProject + "/" + featureSetId; + } + return featureSetId; + } } diff --git a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java index 96105e3789..18c7d2ddad 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java @@ -109,6 +109,7 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSetSpecs) @@ -158,6 +159,7 @@ public void shouldStripVersions() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSetSpecs) @@ -168,6 +170,55 @@ public void shouldStripVersions() { p.run(); } + @Test + public void shouldApplyDefaultProject() { + FeatureSetSpec fs1 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setProject("myproject") + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_primary") + .setValueType(Enum.INT32) + .build()) + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_secondary") + .setValueType(Enum.STRING) + .build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build()) + .build(); + + Map featureSetSpecs = new HashMap<>(); + featureSetSpecs.put("myproject/feature_set", fs1); + + List input = new ArrayList<>(); + List expected = new ArrayList<>(); + + FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1); + expected.add(randomRow); + randomRow = randomRow.toBuilder().setFeatureSet("feature_set").build(); + input.add(randomRow); + + PCollectionTuple output = + p.apply(Create.of(input)) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply( + ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") + .setFailureTag(FAILURE_TAG) + .setSuccessTag(SUCCESS_TAG) + .setFeatureSetSpecs(featureSetSpecs) + .build()); + + PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); + + p.run(); + } + @Test public void shouldExcludeUnregisteredFields() { FeatureSetSpec fs1 = @@ -212,6 +263,7 @@ public void shouldExcludeUnregisteredFields() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSets) From aaaf9e631e4a2c10335439d87487cd28b38162ca Mon Sep 17 00:00:00 2001 From: zhilingc Date: Thu, 14 May 2020 10:57:53 +0800 Subject: [PATCH 2/3] Apply spotless --- .../ingestion/options/ImportOptions.java | 3 +- .../transform/fn/ProcessFeatureRowDoFn.java | 3 +- .../ProcessAndValidateFeatureRowsTest.java | 58 +++++++++---------- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index f2bf31d1cd..e3a1b841c4 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -28,7 +28,8 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { @Required - @Description("Default feast project to apply to incoming rows that do not specify project in its feature set reference.") + @Description( + "Default feast project to apply to incoming rows that do not specify project in its feature set reference.") String getDefaultFeastProject(); void setDefaultFeastProject(String defaultProject); diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java index 1ab416a66f..3680348cf0 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java @@ -32,8 +32,7 @@ public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); String featureSetId = stripVersion(featureRow.getFeatureSet()); featureSetId = applyDefaultProject(featureSetId); - featureRow = - featureRow.toBuilder().setFeatureSet(featureSetId).build(); + featureRow = featureRow.toBuilder().setFeatureSet(featureSetId).build(); context.output(featureRow); } diff --git a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java index 18c7d2ddad..8c5d7bd8ed 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java @@ -159,7 +159,7 @@ public void shouldStripVersions() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() - .setDefaultProject("myproject") + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSetSpecs) @@ -173,24 +173,24 @@ public void shouldStripVersions() { @Test public void shouldApplyDefaultProject() { FeatureSetSpec fs1 = - FeatureSetSpec.newBuilder() - .setName("feature_set") - .setProject("myproject") - .addEntities( - EntitySpec.newBuilder() - .setName("entity_id_primary") - .setValueType(Enum.INT32) - .build()) - .addEntities( - EntitySpec.newBuilder() - .setName("entity_id_secondary") - .setValueType(Enum.STRING) - .build()) - .addFeatures( - FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build()) - .addFeatures( - FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build()) - .build(); + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setProject("myproject") + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_primary") + .setValueType(Enum.INT32) + .build()) + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_secondary") + .setValueType(Enum.STRING) + .build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build()) + .build(); Map featureSetSpecs = new HashMap<>(); featureSetSpecs.put("myproject/feature_set", fs1); @@ -204,15 +204,15 @@ public void shouldApplyDefaultProject() { input.add(randomRow); PCollectionTuple output = - p.apply(Create.of(input)) - .setCoder(ProtoCoder.of(FeatureRow.class)) - .apply( - ProcessAndValidateFeatureRows.newBuilder() - .setDefaultProject("myproject") - .setFailureTag(FAILURE_TAG) - .setSuccessTag(SUCCESS_TAG) - .setFeatureSetSpecs(featureSetSpecs) - .build()); + p.apply(Create.of(input)) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply( + ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") + .setFailureTag(FAILURE_TAG) + .setSuccessTag(SUCCESS_TAG) + .setFeatureSetSpecs(featureSetSpecs) + .build()); PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); @@ -263,7 +263,7 @@ public void shouldExcludeUnregisteredFields() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() - .setDefaultProject("myproject") + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSets) From b2ff681d3924d853949d79707d03173be696e958 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Mon, 18 May 2020 18:16:51 +0800 Subject: [PATCH 3/3] Fix broken test --- ingestion/src/test/java/feast/ingestion/ImportJobTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 16d27303cd..39e4296378 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -176,6 +176,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() }); options.setFeatureSetJson(compressor.compress(spec)); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); + options.setDefaultFeastProject("myproject"); options.setProject(""); options.setBlockOnRun(false);