Skip to content

Commit

Permalink
make join def explicit, allow optional scopes while joining (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuttie authored Jun 25, 2021
1 parent 2dc0215 commit 74f51c2
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 32 deletions.
1 change: 1 addition & 0 deletions api/src/main/scala/io/findify/featury/api/MetricsApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.findify.featury.model._
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram, Summary}
import io.prometheus.client.exporter.common.TextFormat
import org.http4s.HttpRoutes
import org.http4s.Uri.Path.Segment
import org.http4s.dsl.io._

import java.io.{ByteArrayOutputStream, OutputStreamWriter}
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import Deps._

name := "featury"

version := "0.1"
version := "0.1.1"

lazy val shared = Seq(
scalaVersion := "2.12.14",
version := "0.1",
version := "0.1.1",
organization := "io.findify"
)

Expand Down
1 change: 1 addition & 0 deletions core/src/main/protobuf/featury.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ option (scalapb.options) = {
};

message Key {
option (scalapb.message).companion_extends = "io.findify.featury.model.KeyCompanionOps";
option (scalapb.message).companion_extends = "io.findify.featury.model.json.KeyJson";
message Namespace {
option (scalapb.message).extends = "AnyVal";
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/io/findify/featury/model/KeyCompanionOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.findify.featury.model

import io.findify.featury.model.Key.{Id, Tenant}

trait KeyCompanionOps {
def apply(conf: FeatureConfig, tenant: Tenant, id: Id): Key = new Key(
ns = conf.ns,
scope = conf.scope,
name = conf.name,
tenant = tenant,
id = id
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import io.findify.featury.model.Key.{Scope, Id, Namespace, Tenant}

trait ScopeKeyOps {
def apply(key: Key): ScopeKey = ScopeKey(key.ns, key.scope, key.tenant, key.id)
def make(ns: String, scope: String, tenant: String, id: String): ScopeKey =
ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id))
def make(ns: String, scope: String, tenant: String, id: String) =
Option(ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class FeatureJoinFunction[T]()(implicit
class FeatureJoinFunction[T](by: Join[T])(implicit
ki: TypeInformation[String],
vi: TypeInformation[FeatureValue],
join: Join[T]
) extends KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]
vi: TypeInformation[FeatureValue]
) extends KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]
with CheckpointedFunction {

var lastValues: MapState[String, FeatureValue] = _
Expand All @@ -31,25 +30,27 @@ class FeatureJoinFunction[T]()(implicit

override def processElement1(
value: T,
ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context,
ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context,
out: Collector[T]
): Unit = {
val values = lastValues.values().asScala.toList
if (values.nonEmpty) {
out.collect(join.appendValues(value, values))
} else {
out.collect(value)
if (ctx.getCurrentKey.isDefined) {
val values = lastValues.values().asScala.toList
if (values.nonEmpty) {
out.collect(by.appendValues(value, values))
} else {
out.collect(value)
}
}
val br = 1
}

override def processElement2(
value: FeatureValue,
ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context,
ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context,
out: Collector[T]
): Unit = {
lastValues.put(value.key.name.value, value)
val br = 1
if (ctx.getCurrentKey.isDefined) {
lastValues.put(value.key.name.value, value)
}
}

}
13 changes: 6 additions & 7 deletions flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import org.apache.flink.streaming.api.scala.extensions._
object FeaturyFlow {
import io.findify.featury.flink.util.StreamName._

def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope])(implicit
j: Join[T],
def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope], by: Join[T])(implicit
ti: TypeInformation[T],
ki: TypeInformation[ScopeKey],
ki: TypeInformation[Option[ScopeKey]],
si: TypeInformation[String],
fvi: TypeInformation[FeatureValue]
): DataStream[T] =
Expand All @@ -22,10 +21,10 @@ object FeaturyFlow {
case head :: tail =>
val result = events
.connect(values)
.keyBy[ScopeKey](t => j.scopedKey(t, head), t => ScopeKey(t.key))
.process(new FeatureJoinFunction[T]())
.id(s"join-$head")
join(values, result, tail)
.keyBy[Option[ScopeKey]](t => by.scopedKey(t, head), t => Some(ScopeKey(t.key)))
.process(new FeatureJoinFunction[T](by))
.id(s"join-${head.value}")
join(values, result, tail, by)

}

Expand Down
3 changes: 1 addition & 2 deletions flink/src/main/scala/io/findify/featury/flink/Join.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ import io.findify.featury.model.{FeatureValue, ScopeKey}

trait Join[T] extends Serializable {
def appendValues(self: T, values: List[FeatureValue]): T
def scopedKey(value: T, scope: Scope): ScopeKey

def scopedKey(value: T, scope: Scope): Option[ScopeKey]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.findify.featury.flink

import io.findify.featury.flink.FeatureJoinTest.{MerchantScope, ProductLine, ProductScope, SearchScope, UserScope}
import io.findify.featury.flink.FeatureJoinTest.{
MerchantScope,
ProductLine,
ProductScope,
SearchScope,
UserScope,
productJoin
}
import io.findify.featury.model.FeatureConfig.{CounterConfig, ScalarConfig}
import io.findify.featury.model.Key.{FeatureName, Namespace, Scope}
import io.findify.featury.model.{
Expand Down Expand Up @@ -58,7 +65,8 @@ class FeatureJoinTest extends AnyFlatSpec with Matchers with FlinkStreamTest {
val features = FeaturyFlow.process(writes, schema)

val joined =
FeaturyFlow.join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope))
FeaturyFlow
.join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope), productJoin)

val result = joined.executeAndCollect(100)
result.headOption shouldBe Some(
Expand Down Expand Up @@ -89,16 +97,16 @@ object FeatureJoinTest {
values: List[FeatureValue] = Nil
)

implicit val productJoin: Join[ProductLine] = new Join[ProductLine] {
val productJoin: Join[ProductLine] = new Join[ProductLine] {
override def appendValues(self: ProductLine, values: List[FeatureValue]): ProductLine =
self.copy(values = values ++ self.values)

override def scopedKey(value: ProductLine, scope: Scope): ScopeKey = scope match {
override def scopedKey(value: ProductLine, scope: Scope): Option[ScopeKey] = scope match {
case MerchantScope => ScopeKey.make("dev", "merchant", "1", value.merchant)
case ProductScope => ScopeKey.make("dev", "product", "1", value.product)
case SearchScope => ScopeKey.make("dev", "search", "1", value.search)
case UserScope => ScopeKey.make("dev", "user", "1", value.search)
case _ => ???
case _ => None
}
}

Expand Down

0 comments on commit 74f51c2

Please sign in to comment.