diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index fde251097f..9b6b668a08 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -121,7 +121,7 @@ management: enabled: false EOF -nohup java -jar core/target/feast-core-0.3.0-SNAPSHOT.jar \ +nohup java -jar core/target/feast-core-0.3.1-SNAPSHOT.jar \ --spring.config.location=file:///tmp/core.application.yml \ &> /var/log/feast-core.log & sleep 20 @@ -172,7 +172,7 @@ spring: web-environment: false EOF -nohup java -jar serving/target/feast-serving-0.3.0-SNAPSHOT.jar \ +nohup java -jar serving/target/feast-serving-0.3.1-SNAPSHOT.jar \ --spring.config.location=file:///tmp/serving.online.application.yml \ &> /var/log/feast-serving-online.log & sleep 15 diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 233bc9d28d..a2bde4a5b4 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -202,7 +202,6 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec) .findByName(newFeatureSetSpec.getName()); if (existingFeatureSets.size() == 0) { newFeatureSetSpec = newFeatureSetSpec.toBuilder().setVersion(1).build(); - } else { existingFeatureSets = Ordering.natural().reverse().sortedCopy(existingFeatureSets); FeatureSet latest = existingFeatureSets.get(0); @@ -210,11 +209,8 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec) // If the featureSet remains unchanged, we do nothing. if (featureSet.equalTo(latest)) { - newFeatureSetSpec = newFeatureSetSpec.toBuilder() - .setVersion(latest.getVersion()) - .build(); return ApplyFeatureSetResponse.newBuilder() - .setFeatureSet(newFeatureSetSpec) + .setFeatureSet(latest.toProto()) .setStatus(Status.NO_CHANGE) .build(); } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index 85f6a4689c..cea80dbebe 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -99,6 +99,8 @@ public void setUp() { .thenReturn(featureSets); when(featureSetRepository.findByName("f1")) .thenReturn(featureSets.subList(0, 3)); + when(featureSetRepository.findFirstFeatureSetByNameOrderByVersionDesc("f1")) + .thenReturn(featureSet1v3); when(featureSetRepository.findByNameWithWildcard("f1")) .thenReturn(featureSets.subList(0, 3)); when(featureSetRepository.findByName("asd")) @@ -235,6 +237,8 @@ public void shouldGetLatestFeatureSetGivenMissingVersionFilter() @Test public void shouldGetSpecificFeatureSetGivenSpecificVersionFilter() throws InvalidProtocolBufferException { + when(featureSetRepository.findFeatureSetByNameAndVersion("f1", 2)) + .thenReturn(featureSets.get(1)); GetFeatureSetResponse actual = specService .getFeatureSet(GetFeatureSetRequest.newBuilder().setName("f1").setVersion(2).build()); FeatureSet expected = featureSets.get(1); diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index d919a18ac5..0154872de0 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -74,11 +74,10 @@ public static PipelineResult runPipeline(ImportOptions options) SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs); // Generate tags by key - Map> featureSetTagsByKey = subscribedFeatureSets.stream() + Map featureSetSpecsByKey = subscribedFeatureSets.stream() .map(fs -> { String id = String.format("%s:%s", fs.getName(), fs.getVersion()); - return Pair.of(id, new TupleTag(id) { - }); + return Pair.of(id, fs); }) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); @@ -91,60 +90,60 @@ public static PipelineResult runPipeline(ImportOptions options) "ReadFeatureRowFromSource", ReadFromSource.newBuilder() .setSource(source) - .setFeatureSetTagByKey(featureSetTagsByKey) + .setSuccessTag(FEATURE_ROW_OUT) .setFailureTag(DEADLETTER_OUT) .build()); for (FeatureSetSpec featureSet : subscribedFeatureSets) { // Ensure Store has valid configuration and Feast can access it. StoreUtil.setupStore(store, featureSet); - String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion()); - - // Step 2. Validate incoming FeatureRows - PCollectionTuple validatedRows = convertedFeatureRows - .get(featureSetTagsByKey.get(id)) - .apply(ValidateFeatureRows.newBuilder() - .setFeatureSetSpec(featureSet) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .build()); - - // Step 3. Write FeatureRow to the corresponding Store. - validatedRows - .get(FEATURE_ROW_OUT) + } + + // Step 2. Validate incoming FeatureRows + PCollectionTuple validatedRows = convertedFeatureRows + .get(FEATURE_ROW_OUT) + .apply(ValidateFeatureRows.newBuilder() + .setFeatureSetSpecs(featureSetSpecsByKey) + .setSuccessTag(FEATURE_ROW_OUT) + .setFailureTag(DEADLETTER_OUT) + .build()); + + // Step 3. Write FeatureRow to the corresponding Store. + validatedRows + .get(FEATURE_ROW_OUT) + .apply( + "WriteFeatureRowToStore", + WriteToStore.newBuilder().setFeatureSetSpecs(featureSetSpecsByKey) + .setStore(store) + .build()); + + // Step 4. Write FailedElements to a dead letter table in BigQuery. + if (options.getDeadLetterTableSpec() != null) { + convertedFeatureRows + .get(DEADLETTER_OUT) .apply( - "WriteFeatureRowToStore", - WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build()); - - // Step 4. Write FailedElements to a dead letter table in BigQuery. - if (options.getDeadLetterTableSpec() != null) { - convertedFeatureRows - .get(DEADLETTER_OUT) - .apply( - "WriteFailedElements_ReadFromSource", - WriteFailedElementToBigQuery.newBuilder() - .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) - .setTableSpec(options.getDeadLetterTableSpec()) - .build()); - - validatedRows - .get(DEADLETTER_OUT) - .apply("WriteFailedElements_ValidateRows", - WriteFailedElementToBigQuery.newBuilder() - .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) - .setTableSpec(options.getDeadLetterTableSpec()) - .build()); - } - - // Step 5. Write metrics to a metrics sink. + "WriteFailedElements_ReadFromSource", + WriteFailedElementToBigQuery.newBuilder() + .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) + .setTableSpec(options.getDeadLetterTableSpec()) + .build()); + validatedRows - .apply("WriteMetrics", WriteMetricsTransform.newBuilder() - .setFeatureSetSpec(featureSet) - .setStoreName(store.getName()) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .build()); + .get(DEADLETTER_OUT) + .apply("WriteFailedElements_ValidateRows", + WriteFailedElementToBigQuery.newBuilder() + .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) + .setTableSpec(options.getDeadLetterTableSpec()) + .build()); } + + // Step 5. Write metrics to a metrics sink. + validatedRows + .apply("WriteMetrics", WriteMetricsTransform.newBuilder() + .setStoreName(store.getName()) + .setSuccessTag(FEATURE_ROW_OUT) + .setFailureTag(DEADLETTER_OUT) + .build()); } return pipeline.run(); diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java index 012823f84f..469952708d 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java @@ -24,7 +24,7 @@ public abstract class ReadFromSource extends PTransform> getFeatureSetTagByKey(); + public abstract TupleTag getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -37,8 +37,7 @@ public abstract static class Builder { public abstract Builder setSource(Source source); - public abstract Builder setFeatureSetTagByKey( - Map> featureSetTagByKey); + public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -76,13 +75,10 @@ public PCollectionTuple expand(PBegin input) { .commitOffsetsInFinalize()) .apply( "KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder() - .setFeatureSetTagByKey(getFeatureSetTagByKey()) + .setSuccessTag(getSuccessTag()) .setFailureTag(getFailureTag()) .build()) - .withOutputTags(new TupleTag("placeholder") {}, - TupleTagList.of(Lists - .newArrayList(getFeatureSetTagByKey().values())) - .and(getFailureTag()))); + .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); } private String generateConsumerGroupId(String jobName) { diff --git a/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java b/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java index b026a4fd52..a2bd23077f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java @@ -1,25 +1,27 @@ package feast.ingestion.transform; import com.google.auto.value.AutoValue; -import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto; +import feast.ingestion.values.FeatureSetSpec; import feast.ingestion.transform.fn.ValidateFeatureRowDoFn; -import feast.ingestion.utils.SpecUtil; import feast.ingestion.values.FailedElement; -import feast.ingestion.values.Field; import feast.types.FeatureRowProto.FeatureRow; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.commons.lang3.tuple.Pair; @AutoValue public abstract class ValidateFeatureRows extends PTransform, PCollectionTuple> { - public abstract FeatureSetSpec getFeatureSetSpec(); + public abstract Map getFeatureSetSpecs(); public abstract TupleTag getSuccessTag(); @@ -32,7 +34,7 @@ public static Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); + public abstract Builder setFeatureSetSpecs(Map featureSetSpec); public abstract Builder setSuccessTag(TupleTag successTag); @@ -43,17 +45,19 @@ public abstract static class Builder { @Override public PCollectionTuple expand(PCollection input) { - Map fieldsByName = SpecUtil - .getFieldByName(getFeatureSetSpec()); + + Map featureSetSpecs = getFeatureSetSpecs().entrySet().stream() + .map(e -> Pair.of(e.getKey(), new FeatureSetSpec(e.getValue()))) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); return input.apply("ValidateFeatureRows", ParDo.of(ValidateFeatureRowDoFn.newBuilder() - .setFeatureSetName(getFeatureSetSpec().getName()) - .setFeatureSetVersion(getFeatureSetSpec().getVersion()) - .setFieldByName(fieldsByName) + .setFeatureSetSpecs(featureSetSpecs) .setSuccessTag(getSuccessTag()) .setFailureTag(getFailureTag()) .build()) .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); } + + } diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index 8152010797..b23d3d4046 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -12,23 +12,28 @@ import feast.ingestion.options.ImportOptions; import feast.ingestion.utils.ResourceUtil; import feast.ingestion.values.FailedElement; -import feast.store.serving.bigquery.FeatureRowToTableRowDoFn; +import feast.store.serving.bigquery.FeatureRowToTableRow; import feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; import java.io.IOException; +import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.codehaus.jackson.JsonParser.Feature; import org.slf4j.Logger; @AutoValue @@ -38,7 +43,7 @@ public abstract class WriteToStore extends PTransform, P public abstract Store getStore(); - public abstract FeatureSetSpec getFeatureSetSpec(); + public abstract Map getFeatureSetSpecs(); public static Builder newBuilder() { return new AutoValue_WriteToStore.Builder(); @@ -46,9 +51,10 @@ public static Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { + public abstract Builder setStore(Store store); - public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); + public abstract Builder setFeatureSetSpecs(Map featureSetSpecs); public abstract WriteToStore build(); } @@ -64,34 +70,34 @@ public PDone expand(PCollection input) { input .apply( "FeatureRowToRedisMutation", - ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSetSpec()))) + ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSetSpecs()))) .apply( "WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig.getHost(), redisConfig.getPort())); break; case BIGQUERY: + BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig(); - String tableSpec = - String.format( - "%s:%s.%s_v%s", - bigqueryConfig.getProjectId(), - bigqueryConfig.getDatasetId(), - getFeatureSetSpec().getName(), - getFeatureSetSpec().getVersion()); TimePartitioning timePartitioning = new TimePartitioning() .setType("DAY") - .setField(FeatureRowToTableRowDoFn.getEventTimestampColumn()); + .setField(FeatureRowToTableRow.getEventTimestampColumn()); WriteResult bigqueryWriteResult = input - .apply( - "FeatureRowToTableRow", - ParDo.of(new FeatureRowToTableRowDoFn(options.getJobName()))) .apply( "WriteTableRowToBigQuery", - BigQueryIO.writeTableRows() - .to(tableSpec) + BigQueryIO.write() + .to((SerializableFunction, TableDestination>) element -> { + String[] split = element.getValue().getFeatureSet().split(":"); + return new TableDestination(String.format( + "%s:%s.%s_v%s", + bigqueryConfig.getProjectId(), + bigqueryConfig.getDatasetId(), + split[0], + split[1]), null); + }) + .withFormatFunction(new FeatureRowToTableRow(options.getJobName())) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withExtendedErrorInfo() diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java index 9b43a5ade8..7d400f3810 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java @@ -19,7 +19,8 @@ @AutoValue public abstract class KafkaRecordToFeatureRowDoFn extends DoFn, FeatureRow> { - public abstract Map> getFeatureSetTagByKey(); + + public abstract TupleTag getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -30,7 +31,7 @@ public static KafkaRecordToFeatureRowDoFn.Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFeatureSetTagByKey(Map> featureSetTagByKey); + public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -56,19 +57,6 @@ public void processElement(ProcessContext context) { .build()); return; } - TupleTag tag = getFeatureSetTagByKey() - .getOrDefault(featureRow.getFeatureSet(), null); - if (tag == null) { - context.output( - getFailureTag(), - FailedElement.newBuilder() - .setTransformName("KafkaRecordToFeatureRow") - .setJobName(context.getPipelineOptions().getJobName()) - .setPayload(new String(Base64.getEncoder().encode(value))) - .setErrorMessage(String.format("Got row with unexpected feature set id %s. Expected one of %s.", featureRow.getFeatureSet(), getFeatureSetTagByKey().keySet())) - .build()); - return; - } - context.output(tag, featureRow); + context.output(featureRow); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java index 2906e220b2..3eff57004e 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java @@ -1,6 +1,8 @@ package feast.ingestion.transform.fn; import com.google.auto.value.AutoValue; +import feast.ingestion.values.FailedElement.Builder; +import feast.ingestion.values.FeatureSetSpec; import feast.ingestion.values.FailedElement; import feast.ingestion.values.Field; import feast.types.FeatureRowProto.FeatureRow; @@ -13,11 +15,7 @@ @AutoValue public abstract class ValidateFeatureRowDoFn extends DoFn { - public abstract String getFeatureSetName(); - - public abstract int getFeatureSetVersion(); - - public abstract Map getFieldByName(); + public abstract Map getFeatureSetSpecs(); public abstract TupleTag getSuccessTag(); @@ -29,11 +27,8 @@ public static Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFeatureSetName(String featureSetName); - - public abstract Builder setFeatureSetVersion(int featureSetVersion); - public abstract Builder setFieldByName(Map fieldByName); + public abstract Builder setFeatureSetSpecs(Map featureSetSpecs); public abstract Builder setSuccessTag(TupleTag successTag); @@ -42,27 +37,28 @@ public abstract static class Builder { public abstract ValidateFeatureRowDoFn build(); } - @ProcessElement public void processElement(ProcessContext context) { String error = null; - String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion()); FeatureRow featureRow = context.element(); - if (featureRow.getFeatureSet().equals(featureSetId)) { + FeatureSetSpec featureSetSpec = getFeatureSetSpecs() + .getOrDefault(featureRow.getFeatureSet(), null); + if (featureSetSpec != null) { for (FieldProto.Field field : featureRow.getFieldsList()) { - if (!getFieldByName().containsKey(field.getName())) { + Field fieldSpec = featureSetSpec.getField(field.getName()); + if (fieldSpec == null) { error = String.format( "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", - field.getName(), getFeatureSetName(), getFeatureSetVersion()); + field.getName(), featureSetSpec.getId()); break; } // If value is set in the FeatureRow, make sure the value type matches // that defined in FeatureSetSpec if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { int expectedTypeFieldNumber = - getFieldByName().get(field.getName()).getType().getNumber(); + fieldSpec.getType().getNumber(); int actualTypeFieldNumber = field.getValue().getValCase().getNumber(); if (expectedTypeFieldNumber != actualTypeFieldNumber) { error = @@ -70,7 +66,7 @@ public void processElement(ProcessContext context) { "FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.", field.getName(), field.getValue().getValCase(), - getFieldByName().get(field.getName()).getType()); + fieldSpec.getType()); break; } } @@ -78,20 +74,25 @@ public void processElement(ProcessContext context) { } else { error = String.format( "FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.", - featureSetId); + featureRow.getFeatureSet()); } if (error != null) { - context.output( - getFailureTag(), - FailedElement.newBuilder() - .setTransformName("ValidateFeatureRow") - .setJobName(context.getPipelineOptions().getJobName()) - .setPayload(featureRow.toString()) - .setErrorMessage(error) - .build()); - } else { - context.output(getSuccessTag(), featureRow); + FailedElement.Builder failedElement = FailedElement.newBuilder() + .setTransformName("ValidateFeatureRow") + .setJobName(context.getPipelineOptions().getJobName()) + .setPayload(featureRow.toString()) + .setErrorMessage(error); + if (featureSetSpec != null) { + String[] split = featureSetSpec.getId().split(":"); + failedElement = failedElement + .setFeatureSetName(split[0]) + .setFeatureSetVersion(split[1]); } + context.output( + getFailureTag(), failedElement.build()); + } else { + context.output(getSuccessTag(), featureRow); + } } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java index a3c814158f..7040463eec 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java @@ -25,8 +25,6 @@ public abstract class WriteDeadletterRowMetricsDoFn extends public abstract String getStoreName(); - public abstract FeatureSetSpec getFeatureSetSpec(); - public abstract String getStatsdHost(); public abstract int getStatsdPort(); @@ -42,8 +40,6 @@ public abstract static class Builder { public abstract Builder setStoreName(String storeName); - public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setStatsdHost(String statsdHost); public abstract Builder setStatsdPort(int statsdPort); @@ -63,21 +59,17 @@ public void setup() { @ProcessElement public void processElement(ProcessContext c) { - FeatureSetSpec featureSetSpec = getFeatureSetSpec(); - long rowCount = 0; for (FailedElement ignored : c.element().getValue()) { - rowCount++; - } - - try { - statsd.count("deadletter_row_count", rowCount, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - } catch (StatsDClientException e) { - log.warn("Unable to push metrics to server", e); + try { + statsd.count("deadletter_row_count", 1, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + } catch (StatsDClientException e) { + log.warn("Unable to push metrics to server", e); + } } } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index b37947d936..8fa7954e1c 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -22,8 +22,6 @@ public abstract class WriteMetricsTransform extends PTransform getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -36,9 +34,6 @@ public static Builder newBuilder() { public abstract static class Builder { public abstract Builder setStoreName(String storeName); - - public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -58,7 +53,6 @@ public PDone expand(PCollectionTuple input) { new WindowRecords<>(WINDOW_SIZE_SECONDS)) .apply("Write deadletter metrics", ParDo.of( WriteDeadletterRowMetricsDoFn.newBuilder() - .setFeatureSetSpec(getFeatureSetSpec()) .setStatsdHost(options.getStatsdHost()) .setStatsdPort(options.getStatsdPort()) .setStoreName(getStoreName()) @@ -69,7 +63,6 @@ public PDone expand(PCollectionTuple input) { new WindowRecords<>(WINDOW_SIZE_SECONDS)) .apply("Write row metrics", ParDo .of(WriteRowMetricsDoFn.newBuilder() - .setFeatureSetSpec(getFeatureSetSpec()) .setStatsdHost(options.getStatsdHost()) .setStatsdPort(options.getStatsdPort()) .setStoreName(getStoreName()) diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index d3401291b3..fc9fcd64cf 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -26,12 +26,19 @@ public abstract class WriteRowMetricsDoFn extends DoFn parseStoreJsonList(List jsonList) return stores; } - public static Map getFieldByName(FeatureSetSpec featureSetSpec) { + public static Map getFieldsByName(FeatureSetSpec featureSetSpec) { Map fieldByName = new HashMap<>(); for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) { fieldByName.put( diff --git a/ingestion/src/main/java/feast/ingestion/values/FailedElement.java b/ingestion/src/main/java/feast/ingestion/values/FailedElement.java index 037f7f6296..2d808ec1c5 100644 --- a/ingestion/src/main/java/feast/ingestion/values/FailedElement.java +++ b/ingestion/src/main/java/feast/ingestion/values/FailedElement.java @@ -2,6 +2,7 @@ import com.google.auto.value.AutoValue; import javax.annotation.Nullable; +import javax.validation.constraints.Null; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.joda.time.Instant; @@ -17,6 +18,12 @@ public abstract class FailedElement { @Nullable public abstract String getJobName(); + @Nullable + public abstract String getFeatureSetName(); + + @Nullable + public abstract String getFeatureSetVersion(); + @Nullable public abstract String getTransformName(); @@ -37,6 +44,10 @@ public static Builder newBuilder() { public abstract static class Builder { public abstract Builder setTimestamp(Instant timestamp); + public abstract Builder setFeatureSetName(String featureSetName); + + public abstract Builder setFeatureSetVersion(String featureSetVersion); + public abstract Builder setJobName(String jobName); public abstract Builder setTransformName(String transformName); diff --git a/ingestion/src/main/java/feast/ingestion/values/FeatureSetSpec.java b/ingestion/src/main/java/feast/ingestion/values/FeatureSetSpec.java new file mode 100644 index 0000000000..56becec0d0 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/values/FeatureSetSpec.java @@ -0,0 +1,32 @@ +package feast.ingestion.values; + +import static feast.ingestion.utils.SpecUtil.getFieldsByName; + +import feast.core.FeatureSetProto; +import java.io.Serializable; +import java.util.Map; + +/** + * This class represents {@link feast.core.FeatureSetProto.FeatureSetSpec} but + * contains fields directly accessible by name for feature validation purposes. + * + *

The use for this class is mainly for validating the Fields in FeatureRow. + */ +public class FeatureSetSpec implements Serializable { + private final String id; + + private final Map fields; + + public FeatureSetSpec(FeatureSetProto.FeatureSetSpec featureSetSpec) { + this.id = String.format("%s:%d", featureSetSpec.getName(), featureSetSpec.getVersion()); + this.fields = getFieldsByName(featureSetSpec); + } + + public String getId() { + return id; + } + + public Field getField(String fieldName) { + return fields.getOrDefault(fieldName, null); + } +} diff --git a/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java b/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRow.java similarity index 88% rename from ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java rename to ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRow.java index a2ac738e01..c685e4d95c 100644 --- a/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRow.java @@ -6,19 +6,19 @@ import feast.types.FieldProto.Field; import java.util.Base64; import java.util.stream.Collectors; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.joda.time.Instant; // TODO: Validate FeatureRow against FeatureSetSpec // i.e. that the value types in FeatureRow matches against those in FeatureSetSpec -public class FeatureRowToTableRowDoFn extends DoFn { +public class FeatureRowToTableRow implements SerializableFunction { private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp"; private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp"; private static final String JOB_ID_COLUMN = "job_id"; private final String jobId; - public FeatureRowToTableRowDoFn(String jobId) { + public FeatureRowToTableRow(String jobId) { this.jobId = jobId; } @@ -26,12 +26,7 @@ public static String getEventTimestampColumn() { return EVENT_TIMESTAMP_COLUMN; } - @ProcessElement - public void processElement(@Element FeatureRow featureRow, OutputReceiver out) { - out.output(createTableRow(featureRow, jobId)); - } - - private static TableRow createTableRow(FeatureRow featureRow, String jobId) { + public TableRow apply(FeatureRow featureRow) { TableRow tableRow = new TableRow(); tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp())); diff --git a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java index 5c51b5d934..d54b4dbcb3 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java @@ -25,6 +25,7 @@ import feast.store.serving.redis.RedisCustomIO.RedisMutation; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.DoFn; @@ -34,13 +35,14 @@ public class FeatureRowToRedisMutationDoFn extends DoFn featureSetSpecs; - public FeatureRowToRedisMutationDoFn(FeatureSetSpec featureSetSpec) { - this.featureSetSpec = featureSetSpec; + public FeatureRowToRedisMutationDoFn(Map featureSetSpecs) { + this.featureSetSpecs = featureSetSpecs; } private RedisKey getKey(FeatureRow featureRow) { + FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet()); Set entityNames = featureSetSpec.getEntitiesList().stream() .map(EntitySpec::getName).collect(Collectors.toSet()); @@ -60,8 +62,8 @@ private RedisKey getKey(FeatureRow featureRow) { @ProcessElement public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); - RedisKey key = getKey(featureRow); try { + RedisKey key = getKey(featureRow); RedisMutation redisMutation = new RedisMutation(Method.SET, key.toByteArray(), featureRow.toByteArray(), null, null); context.output(redisMutation); diff --git a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java new file mode 100644 index 0000000000..e4d1e76640 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java @@ -0,0 +1,91 @@ +package feast.ingestion.transform; + +import static org.junit.Assert.*; + +import feast.core.FeatureSetProto.EntitySpec; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto.FeatureSpec; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.Source; +import feast.core.SourceProto.SourceType; +import feast.ingestion.values.FailedElement; +import feast.test.TestUtil; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.ValueProto.ValueType.Enum; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; + +public class ValidateFeatureRowsTest { + @Rule + public transient TestPipeline p = TestPipeline.create(); + + private static final TupleTag SUCCESS_TAG = new TupleTag() { + }; + + private static final TupleTag FAILURE_TAG = new TupleTag() { + }; + + @Test + public void shouldWriteSuccessAndFailureTagsCorrectly() { + FeatureSetSpec fs1 = FeatureSetSpec.newBuilder().setName("feature_set").setVersion(1) + .addEntities(EntitySpec.newBuilder() + .setName("entity_id_primary").setValueType(Enum.INT32).build()) + .addEntities(EntitySpec.newBuilder() + .setName("entity_id_secondary").setValueType(Enum.STRING).build()) + .addFeatures(FeatureSpec.newBuilder() + .setName("feature_1").setValueType(Enum.STRING).build()) + .addFeatures(FeatureSpec.newBuilder() + .setName("feature_2").setValueType(Enum.INT64).build()) + .build(); + + FeatureSetSpec fs2 = FeatureSetSpec.newBuilder().setName("feature_set").setVersion(2) + .addEntities(EntitySpec.newBuilder() + .setName("entity_id_primary").setValueType(Enum.INT32).build()) + .addEntities(EntitySpec.newBuilder() + .setName("entity_id_secondary").setValueType(Enum.STRING).build()) + .addFeatures(FeatureSpec.newBuilder() + .setName("feature_1").setValueType(Enum.STRING).build()) + .addFeatures(FeatureSpec.newBuilder() + .setName("feature_2").setValueType(Enum.INT64).build()) + .build(); + + + Map featureSetSpecs = new HashMap<>(); + featureSetSpecs.put("feature_set:1", fs1); + featureSetSpecs.put("feature_set:2", fs2); + + List input = new ArrayList<>(); + List expected = new ArrayList<>(); + + for (FeatureSetSpec featureSetSpec : featureSetSpecs.values()) { + FeatureRow randomRow = TestUtil.createRandomFeatureRow(featureSetSpec); + input.add(randomRow); + expected.add(randomRow); + } + + input.add(FeatureRow.newBuilder().setFeatureSet("invalid").build()); + + PCollectionTuple output = p.apply(Create.of(input)).setCoder(ProtoCoder.of(FeatureRow.class)) + .apply(ValidateFeatureRows.newBuilder() + .setFailureTag(FAILURE_TAG) + .setSuccessTag(SUCCESS_TAG) + .setFeatureSetSpecs(featureSetSpecs) + .build()); + + PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); + PAssert.that(output.get(FAILURE_TAG).apply(Count.globally())).containsInAnyOrder(1L); + + p.run(); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7851428864..f46de07425 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ - 0.3.0-SNAPSHOT + 0.3.1-SNAPSHOT https://github.com/gojek/feast UTF-8 diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java index aec8a4bfb4..118b1367bd 100644 --- a/serving/src/main/java/feast/serving/service/RedisServingService.java +++ b/serving/src/main/java/feast/serving/service/RedisServingService.java @@ -105,7 +105,7 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ .collect(Collectors.toList()); Duration defaultMaxAge = featureSetSpec.getMaxAge(); - if (featureSetRequest.getMaxAge() == Duration.getDefaultInstance()) { + if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) { featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build(); } @@ -244,7 +244,7 @@ private void sendAndProcessMultiGet( private boolean isStale( FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) { - if (featureSetRequest.getMaxAge() == Duration.getDefaultInstance()) { + if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) { return false; } long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();