Consolidate jobs into single steps instead of branching out (#326)
* Consolidate jobs into single steps instead of branching out

* Add basic unit test for validation step

* Change version back to 0.3.1

* Use 0.3.1-SNAPSHOT jar in tests
Chen Zhiling authored and feast-ci-bot committed Nov 27, 2019
1 parent 31b9ca4 commit e54780b
Showing 20 changed files with 316 additions and 202 deletions.
4 changes: 2 additions & 2 deletions .prow/scripts/test-end-to-end.sh
@@ -121,7 +121,7 @@ management:
   enabled: false
 EOF
 
-nohup java -jar core/target/feast-core-0.3.0-SNAPSHOT.jar \
+nohup java -jar core/target/feast-core-0.3.1-SNAPSHOT.jar \
   --spring.config.location=file:///tmp/core.application.yml \
   &> /var/log/feast-core.log &
 sleep 20
@@ -172,7 +172,7 @@ spring:
   web-environment: false
 EOF
 
-nohup java -jar serving/target/feast-serving-0.3.0-SNAPSHOT.jar \
+nohup java -jar serving/target/feast-serving-0.3.1-SNAPSHOT.jar \
   --spring.config.location=file:///tmp/serving.online.application.yml \
   &> /var/log/feast-serving-online.log &
 sleep 15
6 changes: 1 addition & 5 deletions core/src/main/java/feast/core/service/SpecService.java
@@ -202,19 +202,15 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec)
         .findByName(newFeatureSetSpec.getName());
     if (existingFeatureSets.size() == 0) {
       newFeatureSetSpec = newFeatureSetSpec.toBuilder().setVersion(1).build();
-
     } else {
       existingFeatureSets = Ordering.natural().reverse().sortedCopy(existingFeatureSets);
       FeatureSet latest = existingFeatureSets.get(0);
       FeatureSet featureSet = FeatureSet.fromProto(newFeatureSetSpec);
 
       // If the featureSet remains unchanged, we do nothing.
       if (featureSet.equalTo(latest)) {
-        newFeatureSetSpec = newFeatureSetSpec.toBuilder()
-            .setVersion(latest.getVersion())
-            .build();
         return ApplyFeatureSetResponse.newBuilder()
-            .setFeatureSet(newFeatureSetSpec)
+            .setFeatureSet(latest.toProto())
             .setStatus(Status.NO_CHANGE)
             .build();
       }
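The one-line fix in this hunk changes what applyFeatureSet returns on a no-op apply: the stored feature set (latest.toProto()) rather than the client's incoming spec with the version patched in, so a NO_CHANGE response now reflects server-side state exactly. A condensed sketch of the resolution logic, assuming FeatureSet is Comparable by version (as the Ordering.natural() call implies); the version-bump branch is outside this hunk and is assumed:

// Condensed sketch, not the actual SpecService method.
FeatureSetSpec resolveVersion(List<FeatureSet> existing, FeatureSetSpec incoming) {
  if (existing.isEmpty()) {
    return incoming.toBuilder().setVersion(1).build(); // first registration
  }
  FeatureSet latest = Ordering.natural().reverse().sortedCopy(existing).get(0);
  if (FeatureSet.fromProto(incoming).equalTo(latest)) {
    // NO_CHANGE: echo the stored state verbatim instead of rebuilding the
    // incoming spec -- the discrepancy this commit removes.
    return latest.toProto();
  }
  // Assumed behavior outside this hunk: a real change bumps the version.
  return incoming.toBuilder().setVersion(latest.getVersion() + 1).build();
}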
4 changes: 4 additions & 0 deletions core/src/test/java/feast/core/service/SpecServiceTest.java
@@ -99,6 +99,8 @@ public void setUp() {
         .thenReturn(featureSets);
     when(featureSetRepository.findByName("f1"))
         .thenReturn(featureSets.subList(0, 3));
+    when(featureSetRepository.findFirstFeatureSetByNameOrderByVersionDesc("f1"))
+        .thenReturn(featureSet1v3);
     when(featureSetRepository.findByNameWithWildcard("f1"))
         .thenReturn(featureSets.subList(0, 3));
     when(featureSetRepository.findByName("asd"))
@@ -235,6 +237,8 @@ public void shouldGetLatestFeatureSetGivenMissingVersionFilter()
   @Test
   public void shouldGetSpecificFeatureSetGivenSpecificVersionFilter()
       throws InvalidProtocolBufferException {
+    when(featureSetRepository.findFeatureSetByNameAndVersion("f1", 2))
+        .thenReturn(featureSets.get(1));
     GetFeatureSetResponse actual = specService
         .getFeatureSet(GetFeatureSetRequest.newBuilder().setName("f1").setVersion(2).build());
     FeatureSet expected = featureSets.get(1);
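Both new stubs follow the when/thenReturn pattern the test class already uses. The method names (findFirstFeatureSetByNameOrderByVersionDesc, findFeatureSetByNameAndVersion) read as Spring Data derived queries, which suggests SpecService now resolves feature sets through dedicated repository lookups instead of filtering a findByName list; that is an inference from the names, not something this diff states. A minimal sketch of the stubbing pattern:

// Stubbing sketch; repository and fixture names are taken from the diff,
// the surrounding test wiring is assumed.
FeatureSetRepository featureSetRepository = Mockito.mock(FeatureSetRepository.class);

// Latest-version lookup, used when no version filter is supplied:
Mockito.when(featureSetRepository.findFirstFeatureSetByNameOrderByVersionDesc("f1"))
    .thenReturn(featureSet1v3);

// Exact (name, version) lookup, used by the specific-version filter:
Mockito.when(featureSetRepository.findFeatureSetByNameAndVersion("f1", 2))
    .thenReturn(featureSets.get(1));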
95 changes: 47 additions & 48 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -74,11 +74,10 @@ public static PipelineResult runPipeline(ImportOptions options)
         SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs);
 
     // Generate tags by key
-    Map<String, TupleTag<FeatureRow>> featureSetTagsByKey = subscribedFeatureSets.stream()
+    Map<String, FeatureSetSpec> featureSetSpecsByKey = subscribedFeatureSets.stream()
         .map(fs -> {
           String id = String.format("%s:%s", fs.getName(), fs.getVersion());
-          return Pair.of(id, new TupleTag<FeatureRow>(id) {
-          });
+          return Pair.of(id, fs);
         })
         .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
 
@@ -91,60 +90,60 @@ public static PipelineResult runPipeline(ImportOptions options)
             "ReadFeatureRowFromSource",
             ReadFromSource.newBuilder()
                 .setSource(source)
-                .setFeatureSetTagByKey(featureSetTagsByKey)
+                .setSuccessTag(FEATURE_ROW_OUT)
                 .setFailureTag(DEADLETTER_OUT)
                 .build());
 
     for (FeatureSetSpec featureSet : subscribedFeatureSets) {
       // Ensure Store has valid configuration and Feast can access it.
       StoreUtil.setupStore(store, featureSet);
-      String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion());
-
-      // Step 2. Validate incoming FeatureRows
-      PCollectionTuple validatedRows = convertedFeatureRows
-          .get(featureSetTagsByKey.get(id))
-          .apply(ValidateFeatureRows.newBuilder()
-              .setFeatureSetSpec(featureSet)
-              .setSuccessTag(FEATURE_ROW_OUT)
-              .setFailureTag(DEADLETTER_OUT)
-              .build());
-
-      // Step 3. Write FeatureRow to the corresponding Store.
-      validatedRows
-          .get(FEATURE_ROW_OUT)
-          .apply(
-              "WriteFeatureRowToStore",
-              WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build());
-
-      // Step 4. Write FailedElements to a dead letter table in BigQuery.
-      if (options.getDeadLetterTableSpec() != null) {
-        convertedFeatureRows
-            .get(DEADLETTER_OUT)
-            .apply(
-                "WriteFailedElements_ReadFromSource",
-                WriteFailedElementToBigQuery.newBuilder()
-                    .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
-                    .setTableSpec(options.getDeadLetterTableSpec())
-                    .build());
-
-        validatedRows
-            .get(DEADLETTER_OUT)
-            .apply("WriteFailedElements_ValidateRows",
-                WriteFailedElementToBigQuery.newBuilder()
-                    .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
-                    .setTableSpec(options.getDeadLetterTableSpec())
-                    .build());
-      }
-
-      // Step 5. Write metrics to a metrics sink.
-      validatedRows
-          .apply("WriteMetrics", WriteMetricsTransform.newBuilder()
-              .setFeatureSetSpec(featureSet)
-              .setStoreName(store.getName())
-              .setSuccessTag(FEATURE_ROW_OUT)
-              .setFailureTag(DEADLETTER_OUT)
-              .build());
     }
+
+    // Step 2. Validate incoming FeatureRows
+    PCollectionTuple validatedRows = convertedFeatureRows
+        .get(FEATURE_ROW_OUT)
+        .apply(ValidateFeatureRows.newBuilder()
+            .setFeatureSetSpecs(featureSetSpecsByKey)
+            .setSuccessTag(FEATURE_ROW_OUT)
+            .setFailureTag(DEADLETTER_OUT)
+            .build());
+
+    // Step 3. Write FeatureRow to the corresponding Store.
+    validatedRows
+        .get(FEATURE_ROW_OUT)
+        .apply(
+            "WriteFeatureRowToStore",
+            WriteToStore.newBuilder().setFeatureSetSpecs(featureSetSpecsByKey)
+                .setStore(store)
+                .build());
+
+    // Step 4. Write FailedElements to a dead letter table in BigQuery.
+    if (options.getDeadLetterTableSpec() != null) {
+      convertedFeatureRows
+          .get(DEADLETTER_OUT)
+          .apply(
+              "WriteFailedElements_ReadFromSource",
+              WriteFailedElementToBigQuery.newBuilder()
+                  .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
+                  .setTableSpec(options.getDeadLetterTableSpec())
+                  .build());
+
+      validatedRows
+          .get(DEADLETTER_OUT)
+          .apply("WriteFailedElements_ValidateRows",
+              WriteFailedElementToBigQuery.newBuilder()
+                  .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
+                  .setTableSpec(options.getDeadLetterTableSpec())
+                  .build());
+    }
+
+    // Step 5. Write metrics to a metrics sink.
+    validatedRows
+        .apply("WriteMetrics", WriteMetricsTransform.newBuilder()
+            .setStoreName(store.getName())
+            .setSuccessTag(FEATURE_ROW_OUT)
+            .setFailureTag(DEADLETTER_OUT)
+            .build());
 
     return pipeline.run();
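The net effect in this file: the pipeline no longer fans out one branch per feature set. ReadFromSource emits every row on a single success tag, validation and writing each appear once in the Beam graph, and both receive a Map<String, FeatureSetSpec> keyed by "name:version" so rows are matched to specs per element at runtime. A condensed sketch of the resulting topology, using the transform and tag names from the diff (wiring abbreviated, dead-letter and metrics steps omitted):

// Illustrative topology sketch, not the verbatim ImportJob source.
Map<String, FeatureSetSpec> specsByKey = subscribedFeatureSets.stream()
    .collect(Collectors.toMap(
        fs -> String.format("%s:%s", fs.getName(), fs.getVersion()),
        fs -> fs));

PCollectionTuple read = pipeline.apply("ReadFeatureRowFromSource",
    ReadFromSource.newBuilder()
        .setSource(source)
        .setSuccessTag(FEATURE_ROW_OUT)   // one tag for all feature sets
        .setFailureTag(DEADLETTER_OUT)
        .build());

PCollectionTuple validated = read.get(FEATURE_ROW_OUT)
    .apply(ValidateFeatureRows.newBuilder()
        .setFeatureSetSpecs(specsByKey)   // specs resolved per row, not per branch
        .setSuccessTag(FEATURE_ROW_OUT)
        .setFailureTag(DEADLETTER_OUT)
        .build());

validated.get(FEATURE_ROW_OUT)
    .apply("WriteFeatureRowToStore",
        WriteToStore.newBuilder()
            .setFeatureSetSpecs(specsByKey)
            .setStore(store)
            .build());

Note that StoreUtil.setupStore(store, featureSet) still runs once per subscribed feature set before the pipeline starts, since store and table setup is inherently per destination.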
ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -24,7 +24,7 @@ public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple
 
   public abstract Source getSource();
 
-  public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();
+  public abstract TupleTag<FeatureRow> getSuccessTag();
 
   public abstract TupleTag<FailedElement> getFailureTag();
 
@@ -37,8 +37,7 @@ public abstract static class Builder {
 
     public abstract Builder setSource(Source source);
 
-    public abstract Builder setFeatureSetTagByKey(
-        Map<String, TupleTag<FeatureRow>> featureSetTagByKey);
+    public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
 
     public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);
 
@@ -76,13 +75,10 @@ public PCollectionTuple expand(PBegin input) {
                 .commitOffsetsInFinalize())
         .apply(
             "KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder()
-                .setFeatureSetTagByKey(getFeatureSetTagByKey())
+                .setSuccessTag(getSuccessTag())
                 .setFailureTag(getFailureTag())
                 .build())
-            .withOutputTags(new TupleTag<FeatureRow>("placeholder") {},
-                TupleTagList.of(Lists
-                    .newArrayList(getFeatureSetTagByKey().values()))
-                    .and(getFailureTag())));
+            .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
   }
 
   private String generateConsumerGroupId(String jobName) {
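The replaced withOutputTags call is Beam's standard multi-output pattern: the main (success) tag is passed first and every side-output tag rides in the TupleTagList, rather than a placeholder main tag plus a dynamically built list of per-feature-set tags. A self-contained sketch of the pattern with generic String payloads (not Feast code):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Anonymous subclasses keep the element type reified for Beam's coder inference.
final TupleTag<String> successTag = new TupleTag<String>() {};
final TupleTag<String> failureTag = new TupleTag<String>() {};

PCollectionTuple outputs = input.apply("RouteElements",
    ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            if (context.element().isEmpty()) {
              context.output(failureTag, context.element()); // side output
            } else {
              context.output(context.element());             // main output
            }
          }
        })
        .withOutputTags(successTag, TupleTagList.of(failureTag)));

PCollection<String> succeeded = outputs.get(successTag);
PCollection<String> failed = outputs.get(failureTag);

Compared to the removed code, there is no placeholder main tag and no per-feature-set tag list to keep in sync with the DoFn.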
ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java
@@ -1,25 +1,27 @@
 package feast.ingestion.transform;
 
 import com.google.auto.value.AutoValue;
-import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.core.FeatureSetProto;
+import feast.ingestion.values.FeatureSetSpec;
 import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
-import feast.ingestion.utils.SpecUtil;
 import feast.ingestion.values.FailedElement;
-import feast.ingestion.values.Field;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.commons.lang3.tuple.Pair;
 
 @AutoValue
 public abstract class ValidateFeatureRows extends
     PTransform<PCollection<FeatureRow>, PCollectionTuple> {
 
-  public abstract FeatureSetSpec getFeatureSetSpec();
+  public abstract Map<String, FeatureSetProto.FeatureSetSpec> getFeatureSetSpecs();
 
   public abstract TupleTag<FeatureRow> getSuccessTag();
 
@@ -32,7 +34,7 @@ public static Builder newBuilder() {
   @AutoValue.Builder
   public abstract static class Builder {
 
-    public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);
+    public abstract Builder setFeatureSetSpecs(Map<String, FeatureSetProto.FeatureSetSpec> featureSetSpec);
 
     public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
 
@@ -43,17 +45,19 @@ public abstract static class Builder {
 
   @Override
   public PCollectionTuple expand(PCollection<FeatureRow> input) {
-    Map<String, Field> fieldsByName = SpecUtil
-        .getFieldByName(getFeatureSetSpec());
+    Map<String, FeatureSetSpec> featureSetSpecs = getFeatureSetSpecs().entrySet().stream()
+        .map(e -> Pair.of(e.getKey(), new FeatureSetSpec(e.getValue())))
+        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
 
     return input.apply("ValidateFeatureRows",
         ParDo.of(ValidateFeatureRowDoFn.newBuilder()
-            .setFeatureSetName(getFeatureSetSpec().getName())
-            .setFeatureSetVersion(getFeatureSetSpec().getVersion())
-            .setFieldByName(fieldsByName)
+            .setFeatureSetSpecs(featureSetSpecs)
            .setSuccessTag(getSuccessTag())
            .setFailureTag(getFailureTag())
            .build())
        .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
  }
 
 }
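Worth noting alongside this hunk is the import swap at the top of the file: the transform now holds proto specs (feast.core.FeatureSetProto.FeatureSetSpec) but hands the DoFn feast.ingestion.values.FeatureSetSpec wrappers, presumably a per-element-friendly view of the spec's fields (the wrapper's contents are not shown in this diff). The Pair round-trip in expand() could also be written directly against Map.Entry; a minimal equivalent sketch, assuming only that the wrapper constructor takes the proto as shown:

// Equivalent of the Pair-based wrapping in expand(); wrapSpecs is a
// hypothetical helper name.
Map<String, feast.ingestion.values.FeatureSetSpec> wrapSpecs(
    Map<String, FeatureSetProto.FeatureSetSpec> protosByKey) {
  return protosByKey.entrySet().stream()
      .collect(Collectors.toMap(
          Map.Entry::getKey,
          entry -> new feast.ingestion.values.FeatureSetSpec(entry.getValue())));
}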
40 changes: 23 additions & 17 deletions ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
@@ -12,23 +12,28 @@
 import feast.ingestion.options.ImportOptions;
 import feast.ingestion.utils.ResourceUtil;
 import feast.ingestion.values.FailedElement;
-import feast.store.serving.bigquery.FeatureRowToTableRowDoFn;
+import feast.store.serving.bigquery.FeatureRowToTableRow;
 import feast.store.serving.redis.FeatureRowToRedisMutationDoFn;
 import feast.store.serving.redis.RedisCustomIO;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.io.IOException;
+import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.codehaus.jackson.JsonParser.Feature;
 import org.slf4j.Logger;
 
 @AutoValue
@@ -38,17 +43,18 @@ public abstract class WriteToStore extends PTransform<PCollection<FeatureRow>, P
 
   public abstract Store getStore();
 
-  public abstract FeatureSetSpec getFeatureSetSpec();
+  public abstract Map<String, FeatureSetSpec> getFeatureSetSpecs();
 
   public static Builder newBuilder() {
     return new AutoValue_WriteToStore.Builder();
   }
 
   @AutoValue.Builder
   public abstract static class Builder {
 
     public abstract Builder setStore(Store store);
 
-    public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);
+    public abstract Builder setFeatureSetSpecs(Map<String, FeatureSetSpec> featureSetSpecs);
 
     public abstract WriteToStore build();
   }
@@ -64,34 +70,34 @@ public PDone expand(PCollection<FeatureRow> input) {
         input
             .apply(
                 "FeatureRowToRedisMutation",
-                ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSetSpec())))
+                ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSetSpecs())))
             .apply(
                 "WriteRedisMutationToRedis",
                 RedisCustomIO.write(redisConfig.getHost(), redisConfig.getPort()));
         break;
       case BIGQUERY:
 
         BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig();
-        String tableSpec =
-            String.format(
-                "%s:%s.%s_v%s",
-                bigqueryConfig.getProjectId(),
-                bigqueryConfig.getDatasetId(),
-                getFeatureSetSpec().getName(),
-                getFeatureSetSpec().getVersion());
         TimePartitioning timePartitioning =
             new TimePartitioning()
                 .setType("DAY")
-                .setField(FeatureRowToTableRowDoFn.getEventTimestampColumn());
+                .setField(FeatureRowToTableRow.getEventTimestampColumn());
 
         WriteResult bigqueryWriteResult =
             input
-                .apply(
-                    "FeatureRowToTableRow",
-                    ParDo.of(new FeatureRowToTableRowDoFn(options.getJobName())))
                .apply(
                    "WriteTableRowToBigQuery",
-                    BigQueryIO.writeTableRows()
-                        .to(tableSpec)
+                    BigQueryIO.<FeatureRow>write()
+                        .to((SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination>) element -> {
+                          String[] split = element.getValue().getFeatureSet().split(":");
+                          return new TableDestination(String.format(
+                              "%s:%s.%s_v%s",
+                              bigqueryConfig.getProjectId(),
+                              bigqueryConfig.getDatasetId(),
+                              split[0],
+                              split[1]), null);
+                        })
+                        .withFormatFunction(new FeatureRowToTableRow(options.getJobName()))
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                        .withExtendedErrorInfo()
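This is the key consolidation on the BigQuery path: a fixed .to(tableSpec) plus a separate FeatureRowToTableRow ParDo are replaced by one dynamic-destination write, where the target table is computed per element from the row's feature-set id and the row-to-TableRow conversion moves into withFormatFunction. A sketch of the same pattern with placeholder project and dataset literals (only the BigQueryIO calls mirror the diff; input and jobName are assumed):

import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// One sink, many tables: the destination function runs per element, so a row
// whose featureSet field is "driver:2" lands in my-project:feast.driver_v2.
// The cast picks the SerializableFunction overload of to(...).
input.apply("WriteTableRowToBigQuery",
    BigQueryIO.<FeatureRow>write()
        .to((SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination>)
            element -> {
              String[] split = element.getValue().getFeatureSet().split(":");
              return new TableDestination(
                  String.format("my-project:feast.%s_v%s", split[0], split[1]),
                  null /* no table description */);
            })
        .withFormatFunction(new FeatureRowToTableRow(jobName)) // FeatureRow -> TableRow
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

CREATE_NEVER means each destination table must already exist, which is why ImportJob keeps calling StoreUtil.setupStore(store, featureSet) for every subscribed feature set before the pipeline runs.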
(diff truncated: remaining changed files not shown)
