diff --git a/api/src/main/scala/io/findify/featury/api/MetricsApi.scala b/api/src/main/scala/io/findify/featury/api/MetricsApi.scala index 88bef38..30b5738 100644 --- a/api/src/main/scala/io/findify/featury/api/MetricsApi.scala +++ b/api/src/main/scala/io/findify/featury/api/MetricsApi.scala @@ -6,6 +6,7 @@ import io.findify.featury.model._ import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram, Summary} import io.prometheus.client.exporter.common.TextFormat import org.http4s.HttpRoutes +import org.http4s.Uri.Path.Segment import org.http4s.dsl.io._ import java.io.{ByteArrayOutputStream, OutputStreamWriter} diff --git a/build.sbt b/build.sbt index c209f2b..367f287 100644 --- a/build.sbt +++ b/build.sbt @@ -2,11 +2,11 @@ import Deps._ name := "featury" -version := "0.1" +version := "0.1.1" lazy val shared = Seq( scalaVersion := "2.12.14", - version := "0.1", + version := "0.1.1", organization := "io.findify" ) diff --git a/core/src/main/protobuf/featury.proto b/core/src/main/protobuf/featury.proto index 72f447f..5d78f34 100644 --- a/core/src/main/protobuf/featury.proto +++ b/core/src/main/protobuf/featury.proto @@ -12,6 +12,7 @@ option (scalapb.options) = { }; message Key { + option (scalapb.message).companion_extends = "io.findify.featury.model.KeyCompanionOps"; option (scalapb.message).companion_extends = "io.findify.featury.model.json.KeyJson"; message Namespace { option (scalapb.message).extends = "AnyVal"; diff --git a/core/src/main/scala/io/findify/featury/model/KeyCompanionOps.scala b/core/src/main/scala/io/findify/featury/model/KeyCompanionOps.scala new file mode 100644 index 0000000..d249497 --- /dev/null +++ b/core/src/main/scala/io/findify/featury/model/KeyCompanionOps.scala @@ -0,0 +1,13 @@ +package io.findify.featury.model + +import io.findify.featury.model.Key.{Id, Tenant} + +trait KeyCompanionOps { + def apply(conf: FeatureConfig, tenant: Tenant, id: Id): Key = new Key( + ns = conf.ns, + scope = conf.scope, + name = conf.name, + tenant = tenant, + id = id + ) +} diff --git a/core/src/main/scala/io/findify/featury/model/ScopeKeyOps.scala b/core/src/main/scala/io/findify/featury/model/ScopeKeyOps.scala index 11201c0..195c2a3 100644 --- a/core/src/main/scala/io/findify/featury/model/ScopeKeyOps.scala +++ b/core/src/main/scala/io/findify/featury/model/ScopeKeyOps.scala @@ -4,6 +4,6 @@ import io.findify.featury.model.Key.{Scope, Id, Namespace, Tenant} trait ScopeKeyOps { def apply(key: Key): ScopeKey = ScopeKey(key.ns, key.scope, key.tenant, key.id) - def make(ns: String, scope: String, tenant: String, id: String): ScopeKey = - ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id)) + def make(ns: String, scope: String, tenant: String, id: String) = + Option(ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id))) } diff --git a/flink/src/main/scala/io/findify/featury/flink/FeatureJoinFunction.scala b/flink/src/main/scala/io/findify/featury/flink/FeatureJoinFunction.scala index c927612..06f4b69 100644 --- a/flink/src/main/scala/io/findify/featury/flink/FeatureJoinFunction.scala +++ b/flink/src/main/scala/io/findify/featury/flink/FeatureJoinFunction.scala @@ -13,11 +13,10 @@ import org.apache.flink.util.Collector import scala.collection.JavaConverters._ -class FeatureJoinFunction[T]()(implicit +class FeatureJoinFunction[T](by: Join[T])(implicit ki: TypeInformation[String], - vi: TypeInformation[FeatureValue], - join: Join[T] -) extends KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T] + vi: TypeInformation[FeatureValue] +) extends KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T] with CheckpointedFunction { var lastValues: MapState[String, FeatureValue] = _ @@ -31,25 +30,27 @@ class FeatureJoinFunction[T]()(implicit override def processElement1( value: T, - ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context, + ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context, out: Collector[T] ): Unit = { - val values = lastValues.values().asScala.toList - if (values.nonEmpty) { - out.collect(join.appendValues(value, values)) - } else { - out.collect(value) + if (ctx.getCurrentKey.isDefined) { + val values = lastValues.values().asScala.toList + if (values.nonEmpty) { + out.collect(by.appendValues(value, values)) + } else { + out.collect(value) + } } - val br = 1 } override def processElement2( value: FeatureValue, - ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context, + ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context, out: Collector[T] ): Unit = { - lastValues.put(value.key.name.value, value) - val br = 1 + if (ctx.getCurrentKey.isDefined) { + lastValues.put(value.key.name.value, value) + } } } diff --git a/flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala b/flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala index ffdbf4d..2aaa250 100644 --- a/flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala +++ b/flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala @@ -10,10 +10,9 @@ import org.apache.flink.streaming.api.scala.extensions._ object FeaturyFlow { import io.findify.featury.flink.util.StreamName._ - def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope])(implicit - j: Join[T], + def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope], by: Join[T])(implicit ti: TypeInformation[T], - ki: TypeInformation[ScopeKey], + ki: TypeInformation[Option[ScopeKey]], si: TypeInformation[String], fvi: TypeInformation[FeatureValue] ): DataStream[T] = @@ -22,10 +21,10 @@ object FeaturyFlow { case head :: tail => val result = events .connect(values) - .keyBy[ScopeKey](t => j.scopedKey(t, head), t => ScopeKey(t.key)) - .process(new FeatureJoinFunction[T]()) - .id(s"join-$head") - join(values, result, tail) + .keyBy[Option[ScopeKey]](t => by.scopedKey(t, head), t => Some(ScopeKey(t.key))) + .process(new FeatureJoinFunction[T](by)) + .id(s"join-${head.value}") + join(values, result, tail, by) } diff --git a/flink/src/main/scala/io/findify/featury/flink/Join.scala b/flink/src/main/scala/io/findify/featury/flink/Join.scala index fdf8d4f..e50d639 100644 --- a/flink/src/main/scala/io/findify/featury/flink/Join.scala +++ b/flink/src/main/scala/io/findify/featury/flink/Join.scala @@ -5,6 +5,5 @@ import io.findify.featury.model.{FeatureValue, ScopeKey} trait Join[T] extends Serializable { def appendValues(self: T, values: List[FeatureValue]): T - def scopedKey(value: T, scope: Scope): ScopeKey - + def scopedKey(value: T, scope: Scope): Option[ScopeKey] } diff --git a/flink/src/test/scala/io/findify/featury/flink/FeatureJoinTest.scala b/flink/src/test/scala/io/findify/featury/flink/FeatureJoinTest.scala index d45e04a..4a4ced5 100644 --- a/flink/src/test/scala/io/findify/featury/flink/FeatureJoinTest.scala +++ b/flink/src/test/scala/io/findify/featury/flink/FeatureJoinTest.scala @@ -1,6 +1,13 @@ package io.findify.featury.flink -import io.findify.featury.flink.FeatureJoinTest.{MerchantScope, ProductLine, ProductScope, SearchScope, UserScope} +import io.findify.featury.flink.FeatureJoinTest.{ + MerchantScope, + ProductLine, + ProductScope, + SearchScope, + UserScope, + productJoin +} import io.findify.featury.model.FeatureConfig.{CounterConfig, ScalarConfig} import io.findify.featury.model.Key.{FeatureName, Namespace, Scope} import io.findify.featury.model.{ @@ -58,7 +65,8 @@ class FeatureJoinTest extends AnyFlatSpec with Matchers with FlinkStreamTest { val features = FeaturyFlow.process(writes, schema) val joined = - FeaturyFlow.join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope)) + FeaturyFlow + .join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope), productJoin) val result = joined.executeAndCollect(100) result.headOption shouldBe Some( @@ -89,16 +97,16 @@ object FeatureJoinTest { values: List[FeatureValue] = Nil ) - implicit val productJoin: Join[ProductLine] = new Join[ProductLine] { + val productJoin: Join[ProductLine] = new Join[ProductLine] { override def appendValues(self: ProductLine, values: List[FeatureValue]): ProductLine = self.copy(values = values ++ self.values) - override def scopedKey(value: ProductLine, scope: Scope): ScopeKey = scope match { + override def scopedKey(value: ProductLine, scope: Scope): Option[ScopeKey] = scope match { case MerchantScope => ScopeKey.make("dev", "merchant", "1", value.merchant) case ProductScope => ScopeKey.make("dev", "product", "1", value.product) case SearchScope => ScopeKey.make("dev", "search", "1", value.search) case UserScope => ScopeKey.make("dev", "user", "1", value.search) - case _ => ??? + case _ => None } }