Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make join def explicit, allow optional scopes while joining #15

Merged
merged 1 commit into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/main/scala/io/findify/featury/api/MetricsApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.findify.featury.model._
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram, Summary}
import io.prometheus.client.exporter.common.TextFormat
import org.http4s.HttpRoutes
import org.http4s.Uri.Path.Segment
import org.http4s.dsl.io._

import java.io.{ByteArrayOutputStream, OutputStreamWriter}
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import Deps._

name := "featury"

version := "0.1"
version := "0.1.1"

lazy val shared = Seq(
scalaVersion := "2.12.14",
version := "0.1",
version := "0.1.1",
organization := "io.findify"
)

Expand Down
1 change: 1 addition & 0 deletions core/src/main/protobuf/featury.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ option (scalapb.options) = {
};

message Key {
option (scalapb.message).companion_extends = "io.findify.featury.model.KeyCompanionOps";
option (scalapb.message).companion_extends = "io.findify.featury.model.json.KeyJson";
message Namespace {
option (scalapb.message).extends = "AnyVal";
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/io/findify/featury/model/KeyCompanionOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.findify.featury.model

import io.findify.featury.model.Key.{Id, Tenant}

trait KeyCompanionOps {
def apply(conf: FeatureConfig, tenant: Tenant, id: Id): Key = new Key(
ns = conf.ns,
scope = conf.scope,
name = conf.name,
tenant = tenant,
id = id
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import io.findify.featury.model.Key.{Scope, Id, Namespace, Tenant}

trait ScopeKeyOps {
def apply(key: Key): ScopeKey = ScopeKey(key.ns, key.scope, key.tenant, key.id)
def make(ns: String, scope: String, tenant: String, id: String): ScopeKey =
ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id))
def make(ns: String, scope: String, tenant: String, id: String) =
Option(ScopeKey(Namespace(ns), Scope(scope), Tenant(tenant), Id(id)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class FeatureJoinFunction[T]()(implicit
class FeatureJoinFunction[T](by: Join[T])(implicit
ki: TypeInformation[String],
vi: TypeInformation[FeatureValue],
join: Join[T]
) extends KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]
vi: TypeInformation[FeatureValue]
) extends KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]
with CheckpointedFunction {

var lastValues: MapState[String, FeatureValue] = _
Expand All @@ -31,25 +30,27 @@ class FeatureJoinFunction[T]()(implicit

override def processElement1(
value: T,
ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context,
ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context,
out: Collector[T]
): Unit = {
val values = lastValues.values().asScala.toList
if (values.nonEmpty) {
out.collect(join.appendValues(value, values))
} else {
out.collect(value)
if (ctx.getCurrentKey.isDefined) {
val values = lastValues.values().asScala.toList
if (values.nonEmpty) {
out.collect(by.appendValues(value, values))
} else {
out.collect(value)
}
}
val br = 1
}

override def processElement2(
value: FeatureValue,
ctx: KeyedCoProcessFunction[ScopeKey, T, FeatureValue, T]#Context,
ctx: KeyedCoProcessFunction[Option[ScopeKey], T, FeatureValue, T]#Context,
out: Collector[T]
): Unit = {
lastValues.put(value.key.name.value, value)
val br = 1
if (ctx.getCurrentKey.isDefined) {
lastValues.put(value.key.name.value, value)
}
}

}
13 changes: 6 additions & 7 deletions flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import org.apache.flink.streaming.api.scala.extensions._
object FeaturyFlow {
import io.findify.featury.flink.util.StreamName._

def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope])(implicit
j: Join[T],
def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope], by: Join[T])(implicit
ti: TypeInformation[T],
ki: TypeInformation[ScopeKey],
ki: TypeInformation[Option[ScopeKey]],
si: TypeInformation[String],
fvi: TypeInformation[FeatureValue]
): DataStream[T] =
Expand All @@ -22,10 +21,10 @@ object FeaturyFlow {
case head :: tail =>
val result = events
.connect(values)
.keyBy[ScopeKey](t => j.scopedKey(t, head), t => ScopeKey(t.key))
.process(new FeatureJoinFunction[T]())
.id(s"join-$head")
join(values, result, tail)
.keyBy[Option[ScopeKey]](t => by.scopedKey(t, head), t => Some(ScopeKey(t.key)))
.process(new FeatureJoinFunction[T](by))
.id(s"join-${head.value}")
join(values, result, tail, by)

}

Expand Down
3 changes: 1 addition & 2 deletions flink/src/main/scala/io/findify/featury/flink/Join.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ import io.findify.featury.model.{FeatureValue, ScopeKey}

trait Join[T] extends Serializable {
def appendValues(self: T, values: List[FeatureValue]): T
def scopedKey(value: T, scope: Scope): ScopeKey

def scopedKey(value: T, scope: Scope): Option[ScopeKey]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.findify.featury.flink

import io.findify.featury.flink.FeatureJoinTest.{MerchantScope, ProductLine, ProductScope, SearchScope, UserScope}
import io.findify.featury.flink.FeatureJoinTest.{
MerchantScope,
ProductLine,
ProductScope,
SearchScope,
UserScope,
productJoin
}
import io.findify.featury.model.FeatureConfig.{CounterConfig, ScalarConfig}
import io.findify.featury.model.Key.{FeatureName, Namespace, Scope}
import io.findify.featury.model.{
Expand Down Expand Up @@ -58,7 +65,8 @@ class FeatureJoinTest extends AnyFlatSpec with Matchers with FlinkStreamTest {
val features = FeaturyFlow.process(writes, schema)

val joined =
FeaturyFlow.join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope))
FeaturyFlow
.join[ProductLine](features, sessions, List(MerchantScope, ProductScope, SearchScope, UserScope), productJoin)

val result = joined.executeAndCollect(100)
result.headOption shouldBe Some(
Expand Down Expand Up @@ -89,16 +97,16 @@ object FeatureJoinTest {
values: List[FeatureValue] = Nil
)

implicit val productJoin: Join[ProductLine] = new Join[ProductLine] {
val productJoin: Join[ProductLine] = new Join[ProductLine] {
override def appendValues(self: ProductLine, values: List[FeatureValue]): ProductLine =
self.copy(values = values ++ self.values)

override def scopedKey(value: ProductLine, scope: Scope): ScopeKey = scope match {
override def scopedKey(value: ProductLine, scope: Scope): Option[ScopeKey] = scope match {
case MerchantScope => ScopeKey.make("dev", "merchant", "1", value.merchant)
case ProductScope => ScopeKey.make("dev", "product", "1", value.product)
case SearchScope => ScopeKey.make("dev", "search", "1", value.search)
case UserScope => ScopeKey.make("dev", "user", "1", value.search)
case _ => ???
case _ => None
}
}

Expand Down