Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.3 backport: Add feature and feature set labels #737

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -79,6 +80,10 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@JoinColumn(name = "source")
private Source source;

// User defined metadata
@Column(name = "labels", columnDefinition = "text")
private String labels;

public FeatureSet() {
super();
}
Expand All @@ -89,22 +94,25 @@ public FeatureSet(
long maxAgeSeconds,
List<Field> entities,
List<Field> features,
Source source) {
Source source,
Map<String, String> labels) {
this.id = String.format("%s:%s", name, version);
this.name = name;
this.version = version;
this.maxAgeSeconds = maxAgeSeconds;
this.entities = entities;
this.features = features;
this.source = source;
this.labels = TypeConversion.convertMapToJsonString(labels);
}

public static FeatureSet fromProto(FeatureSetSpec featureSetSpec) {
Source source = Source.fromProto(featureSetSpec.getSource());
String id = String.format("%s:%d", featureSetSpec.getName(), featureSetSpec.getVersion());
List<Field> features = new ArrayList<>();
for (FeatureSpec feature : featureSetSpec.getFeaturesList()) {
features.add(new Field(id, feature.getName(), feature.getValueType()));
features.add(
new Field(id, feature.getName(), feature.getValueType(), feature.getLabelsMap()));
}
List<Field> entities = new ArrayList<>();
for (EntitySpec entity : featureSetSpec.getEntitiesList()) {
Expand All @@ -117,7 +125,8 @@ public static FeatureSet fromProto(FeatureSetSpec featureSetSpec) {
featureSetSpec.getMaxAge().getSeconds(),
entities,
features,
source);
source,
featureSetSpec.getLabelsMap());
}

public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
Expand All @@ -136,6 +145,7 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
FeatureSpec.newBuilder()
.setName(feature.getName())
.setValueType(ValueType.Enum.valueOf(feature.getType()))
.putAllLabels(feature.getLabels())
.build());
}
return FeatureSetSpec.newBuilder()
Expand All @@ -145,6 +155,7 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
.addAllEntities(entitySpecs)
.addAllFeatures(featureSpecs)
.setSource(source.toProto())
.putAllLabels(TypeConversion.convertJsonStringToMap(labels))
.build();
}

Expand Down Expand Up @@ -209,4 +220,17 @@ public boolean equalTo(FeatureSet other) {
public int compareTo(FeatureSet o) {
return Integer.compare(version, o.version);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FeatureSet)) {
return false;
}
FeatureSet other = (FeatureSet) obj;

return this.equalTo(other) && labels.equals(other.labels);
}
}
23 changes: 20 additions & 3 deletions core/src/main/java/feast/core/model/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package feast.core.model;

import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.Map;
import java.util.Objects;
import javax.persistence.Column;
import javax.persistence.Entity;
Expand Down Expand Up @@ -52,18 +54,31 @@ public class Field {
@Column(name = "type", nullable = false)
private String type;

// Labels that this field belongs to
@Column(name = "labels", columnDefinition = "text")
private String labels;

public Field() {
super();
}

public Field(String featureSetId, String name, ValueType.Enum type) {
public Field(String featureSetId, String name, ValueType.Enum type, Map<String, String> labels) {
// TODO: Remove all mention of feature sets inside of this class!
FeatureSet featureSet = new FeatureSet();
featureSet.setId(featureSetId);
this.featureSet = featureSet;
this.id = String.format("%s:%s", featureSetId, name);
this.name = name;
this.type = type.toString();
this.labels = TypeConversion.convertMapToJsonString(labels);
}

public Field(String featureSetId, String name, ValueType.Enum type) {
this(featureSetId, name, type, null);
}

public Map<String, String> getLabels() {
return TypeConversion.convertJsonStringToMap(this.labels);
}

@Override
Expand All @@ -75,11 +90,13 @@ public boolean equals(Object o) {
return false;
}
Field field = (Field) o;
return name.equals(field.getName()) && type.equals(field.getType());
return name.equals(field.getName())
&& type.equals(field.getType())
&& labels.equals(field.labels);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), id, featureSet, name, type);
return Objects.hash(super.hashCode(), id, featureSet, name, type, labels);
}
}
3 changes: 0 additions & 3 deletions core/src/main/java/feast/core/util/TypeConversion.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public static Map<String, String> convertJsonStringToMap(String jsonString) {
* @return json string corresponding to given map
*/
public static String convertMapToJsonString(Map<String, String> map) {
if (map.isEmpty()) {
return "{}";
}
return gson.toJson(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@

public class FeatureSetValidator {
public static void validateSpec(FeatureSetSpec featureSetSpec) {
if (featureSetSpec.getLabelsMap().containsKey("")) {
throw new IllegalArgumentException("Feature set label keys must not be empty");
}
checkValidCharacters(featureSetSpec.getName(), "name");
checkUniqueColumns(featureSetSpec.getEntitiesList(), featureSetSpec.getFeaturesList());
for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
checkValidCharacters(entitySpec.getName(), "entities::name");
}
for (FeatureSpec featureSpec : featureSetSpec.getFeaturesList()) {
checkValidCharacters(featureSpec.getName(), "features::name");
if (featureSpec.getLabelsMap().containsKey("")) {
throw new IllegalArgumentException("Feature label keys must not be empty");
}
}
}

Expand Down
141 changes: 113 additions & 28 deletions core/src/test/java/feast/core/service/SpecServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.SourceProto.KafkaSourceConfig;
import feast.core.SourceProto.SourceType;
import feast.core.StoreProto;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
Expand All @@ -53,10 +52,7 @@
import feast.core.model.Store;
import feast.types.ValueProto.ValueType.Enum;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -82,26 +78,18 @@ public class SpecServiceTest {
@Before
public void setUp() {
initMocks(this);
defaultSource =
new Source(
SourceType.KAFKA,
KafkaSourceConfig.newBuilder()
.setBootstrapServers("kafka:9092")
.setTopic("my-topic")
.build(),
true);
defaultSource = TestObjectFactory.defaultSource;

FeatureSet featureSet1v1 = newDummyFeatureSet("f1", 1);
FeatureSet featureSet1v2 = newDummyFeatureSet("f1", 2);
FeatureSet featureSet1v3 = newDummyFeatureSet("f1", 3);
FeatureSet featureSet2v1 = newDummyFeatureSet("f2", 1);

Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
Field f3f1 = TestObjectFactory.CreateFeatureField("f3", "f3f1", Enum.INT64);
Field f3f2 = TestObjectFactory.CreateFeatureField("f3", "f3f2", Enum.INT64);
Field f3e1 = TestObjectFactory.CreateEntityField("f3", "f3e1", Enum.STRING);
FeatureSet featureSet3v1 =
new FeatureSet(
"f3", 1, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource);
TestObjectFactory.CreateFeatureSet("f3", 1, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1));

featureSets =
Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1);
Expand Down Expand Up @@ -349,12 +337,12 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists()
public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
throws InvalidProtocolBufferException {

Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
Field f3f1 = TestObjectFactory.CreateFeatureField("f3", "f3f1", Enum.INT64);
Field f3f2 = TestObjectFactory.CreateFeatureField("f3", "f3f2", Enum.INT64);
Field f3e1 = TestObjectFactory.CreateEntityField("f3", "f3e1", Enum.STRING);
FeatureSetProto.FeatureSetSpec incomingFeatureSet =
(new FeatureSet(
"f3", 5, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource))
(TestObjectFactory.CreateFeatureSet(
"f3", 5, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)))
.toProto();

FeatureSetSpec expected = incomingFeatureSet;
Expand All @@ -367,6 +355,100 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
assertThat(applyFeatureSetResponse.getFeatureSet().getName(), equalTo(expected.getName()));
}

@Test
public void applyFeatureSetShouldAcceptFeatureLabels() throws InvalidProtocolBufferException {
List<FeatureSetProto.EntitySpec> entitySpecs = new ArrayList<>();
entitySpecs.add(
FeatureSetProto.EntitySpec.newBuilder()
.setName("entity1")
.setValueType(Enum.INT64)
.build());

Map<String, String> featureLabels0 =
new HashMap<String, String>() {
{
put("label1", "feast1");
}
};

Map<String, String> featureLabels1 =
new HashMap<String, String>() {
{
put("label1", "feast1");
put("label2", "feast2");
}
};

List<Map<String, String>> featureLabels = new ArrayList<>();
featureLabels.add(featureLabels0);
featureLabels.add(featureLabels1);

List<FeatureSpec> featureSpecs = new ArrayList<>();
featureSpecs.add(
FeatureSpec.newBuilder()
.setName("feature1")
.setValueType(Enum.INT64)
.putAllLabels(featureLabels.get(0))
.build());
featureSpecs.add(
FeatureSpec.newBuilder()
.setName("feature2")
.setValueType(Enum.INT64)
.putAllLabels(featureLabels.get(1))
.build());

FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("featureSetWithConstraints")
.addAllEntities(entitySpecs)
.addAllFeatures(featureSpecs)
.build();

ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSetSpec);
FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet();

// appliedEntitySpecs needs to be sorted because the list returned by specService may not
// follow the order in the request
List<FeatureSetProto.EntitySpec> appliedEntitySpecs =
new ArrayList<>(appliedFeatureSetSpec.getEntitiesList());
appliedEntitySpecs.sort(Comparator.comparing(EntitySpec::getName));

// appliedFeatureSpecs needs to be sorted because the list returned by specService may not
// follow the order in the request
List<FeatureSpec> appliedFeatureSpecs =
new ArrayList<>(appliedFeatureSetSpec.getFeaturesList());
appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName));

List<Map<String, String>> featureSpecsLabels =
appliedFeatureSpecs.stream().map(e -> e.getLabelsMap()).collect(Collectors.toList());
assertThat(appliedEntitySpecs, equalTo(entitySpecs));
assertThat(appliedFeatureSpecs, equalTo(featureSpecs));
assertThat(featureSpecsLabels, equalTo(featureLabels));
}

@Test
public void applyFeatureSetShouldAcceptFeatureSetLabels() throws InvalidProtocolBufferException {
Map<String, String> featureSetLabels =
new HashMap<String, String>() {
{
put("description", "My precious feature set");
}
};

FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("preciousFeatureSet")
.putAllLabels(featureSetLabels)
.build();

ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSetSpec);
FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet();

Map<String, String> appliedLabels = appliedFeatureSetSpec.getLabelsMap();

assertThat(appliedLabels, equalTo(featureSetLabels));
}

@Test
public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException {
when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
Expand Down Expand Up @@ -406,10 +488,13 @@ public void shouldDoNothingIfNoChange() throws InvalidProtocolBufferException {
}

private FeatureSet newDummyFeatureSet(String name, int version) {
Field feature = new Field(name, "feature", Enum.INT64);
Field entity = new Field(name, "entity", Enum.STRING);
return new FeatureSet(
name, version, 100L, Arrays.asList(entity), Arrays.asList(feature), defaultSource);
Field feature = TestObjectFactory.CreateFeatureField(name, "feature", Enum.INT64);
Field entity = TestObjectFactory.CreateEntityField(name, "entity", Enum.STRING);

FeatureSet fs =
TestObjectFactory.CreateFeatureSet(
name, version, Arrays.asList(entity), Arrays.asList(feature));
return fs;
}

private Store newDummyStore(String name) {
Expand Down
Loading