Skip to content

Commit

Permalink
Avoid error thrown when no spec storage for warehouse/serving is regi…
Browse files Browse the repository at this point in the history
…stered (#83)
  • Loading branch information
pradithya authored and feast-ci-bot committed Jan 18, 2019
1 parent c525845 commit 7032b50
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {
public PFeatureRows expand(PFeatureRows input) {
Map<String, Write> transforms = getFeatureStoreTransforms();
Set<String> keys = transforms.keySet();
Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");

log.info(String.format("Splitting on keys = [%s]", String.join(",", keys)));
MultiOutputSplit<String> splitter = new MultiOutputSplit<>(selector, keys, specs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import feast.storage.MockTransforms;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -240,6 +241,126 @@ public void testSplitWhereFeature2HasNoStoreId() {
}


@Test
public void testSplitWhereNoStorageSpec() {
// There is no storage spec registered and feature 1 has no storage set
// feature1 should get thrown harmlessly

SerializableFunction<FeatureSpec, String> selector = (fs) -> fs.getDataStores().getServing()
.getId();
MockSpecService specService = new MockSpecService();
specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
specService.featureSpecs.put(
"f1", FeatureSpec.newBuilder().setEntity("e1")
.build());

Specs specs =
Specs.of(
"jobname",
ImportSpec.newBuilder()
.addEntities("e1")
.setSchema(
Schema.newBuilder()
.addAllFields(
Collections.singletonList(
Field.newBuilder().setFeatureId("f1").build())))
.build(),
specService);
assertNull(specs.getError());
List<FeatureStore> stores = Collections.emptyList();
SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);

PCollection<FeatureRowExtended> input =
pipeline
.apply(
Create.of(
FeatureRow.newBuilder()
.addFeatures(Features.of("f1", Values.ofInt32(1)))
.build()))
.apply(new ToFeatureRowExtended());
PFeatureRows pfrows = PFeatureRows.of(input);
pfrows = pfrows.apply("do split", split);

PAssert.that(
pfrows
.getErrors()).empty();
PAssert.that(
pfrows
.getMain()
.apply(
MapElements.into(TypeDescriptor.of(FeatureRow.class))
.via(FeatureRowExtended::getRow)))
.containsInAnyOrder(
FeatureRow.newBuilder()
.addFeatures(Features.of("f1", Values.ofInt32(1)))
.setEventTimestamp(Timestamp.getDefaultInstance())
.build());

pipeline.run();
}


@Test
public void testSplitWhereNoStorageSpecForAFeature() {
// There is no storage spec registered but feature 1 has storage set
// feature1 should get thrown harmlessly

SerializableFunction<FeatureSpec, String> selector = (fs) -> fs.getDataStores().getServing()
.getId();
MockSpecService specService = new MockSpecService();
specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
specService.featureSpecs.put(
"f1", FeatureSpec.newBuilder().setEntity("e1")
.setDataStores(
DataStores.newBuilder().setServing(DataStore.newBuilder().setId("store1")
.build()))
.build());

Specs specs =
Specs.of(
"jobname",
ImportSpec.newBuilder()
.addEntities("e1")
.setSchema(
Schema.newBuilder()
.addAllFields(
Collections.singletonList(
Field.newBuilder().setFeatureId("f1").build())))
.build(),
specService);
assertNull(specs.getError());
List<FeatureStore> stores = Collections.emptyList();
SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);

PCollection<FeatureRowExtended> input =
pipeline
.apply(
Create.of(
FeatureRow.newBuilder()
.addFeatures(Features.of("f1", Values.ofInt32(1)))
.build()))
.apply(new ToFeatureRowExtended());
PFeatureRows pfrows = PFeatureRows.of(input);
pfrows = pfrows.apply("do split", split);

PAssert.that(
pfrows
.getErrors()).empty();
PAssert.that(
pfrows
.getMain()
.apply(
MapElements.into(TypeDescriptor.of(FeatureRow.class))
.via(FeatureRowExtended::getRow)))
.containsInAnyOrder(
FeatureRow.newBuilder()
.addFeatures(Features.of("f1", Values.ofInt32(1)))
.setEventTimestamp(Timestamp.getDefaultInstance())
.build());

pipeline.run();
}

@Test
public void testWriteTags() {
TupleTag<FeatureRowExtended> tag1 = new TupleTag<>("TAG1");
Expand Down

0 comments on commit 7032b50

Please sign in to comment.