get rid of kryo (#6)
shuttie authored Jun 16, 2021
1 parent e78d710 commit 85633d1
Showing 18 changed files with 52 additions and 87 deletions.
build.sbt: 7 changes (2 additions, 5 deletions)
@@ -6,11 +6,8 @@ version := "0.0.1-SNAPSHOT"

 lazy val shared = Seq(
   organization := "io.findify",
-  scalaVersion := "2.12.13",
+  scalaVersion := "2.12.14",
   scalacOptions ++= Seq("-feature", "-deprecation"),
-  libraryDependencies ++= Seq(
-    "com.github.blemale" %% "scaffeine" % "4.0.2"
-  ),
   version := "0.0.1-SNAPSHOT",
   publishMavenStyle := true,
   publishTo := sonatypePublishToBundle.value,
@@ -27,7 +24,7 @@ lazy val shared = Seq(
   )
 )

-scalaVersion := "2.12.13"
+scalaVersion := "2.12.14"

 lazy val core = (project in file("core")).settings(shared: _*)
connector/redis/build.sbt: 2 changes (1 addition, 1 deletion)
@@ -3,5 +3,5 @@ import Deps._
 name := "featury-redis"

 libraryDependencies ++= Seq(
-  "redis.clients" % "jedis" % "3.6.0"
+  "redis.clients" % "jedis" % "3.6.1"
 )
core/build.sbt: 5 changes (3 additions, 2 deletions)
@@ -17,9 +17,10 @@ libraryDependencies ++= Seq(
   "io.circe" %% "circe-generic-extras" % circeVersion,
   "io.circe" %% "circe-parser" % circeVersion,
   "org.typelevel" %% "cats-effect" % "3.1.1",
-  "com.github.pureconfig" %% "pureconfig" % "0.15.0",
+  "com.github.pureconfig" %% "pureconfig" % "0.16.0",
   "org.typelevel" %% "log4cats-core" % log4catsVersion,
-  "org.typelevel" %% "log4cats-slf4j" % log4catsVersion
+  "org.typelevel" %% "log4cats-slf4j" % log4catsVersion,
+  "com.github.blemale" %% "scaffeine" % "5.0.0"
 )

 Compile / PB.targets := Seq(
flink/build.sbt: 6 changes (4 additions, 2 deletions)
@@ -1,13 +1,15 @@
 import Deps._
+resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

 name := "featury-flink"

-lazy val flinkVersion = "1.13.0"
+lazy val flinkVersion = "1.13.1"

 libraryDependencies ++= Seq(
   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
   "org.apache.flink" % "flink-connector-files" % flinkVersion % "provided",
   "org.apache.flink" %% "flink-test-utils" % flinkVersion % "provided, test",
   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
-  "com.github.luben" % "zstd-jni" % "1.4.9-5"
+  "com.github.luben" % "zstd-jni" % "1.5.0-2",
+  "io.findify" %% "flink-adt" % "0.4.2"
 )
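The flink-adt dependency added above is the replacement for Kryo in this commit: it derives Flink TypeInformation instances for Scala case classes and sealed traits at compile time. A minimal sketch of the usage pattern, assuming flink-adt's deriveTypeInformation helper; the UserEvent ADT below is hypothetical and only illustrates the derivation:

import io.findify.flinkadt.api._
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical event ADT, used only to show the derivation pattern.
sealed trait UserEvent
case class Click(item: String) extends UserEvent
case class Purchase(item: String, qty: Int) extends UserEvent

object UserEventTypeInfo {
  // Derived at compile time, so Flink never falls back to Kryo's
  // reflection-based generic serializer for UserEvent.
  implicit val eventTI: TypeInformation[UserEvent] = deriveTypeInformation[UserEvent]
}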
@@ -1,53 +1,28 @@
 package io.findify.featury.flink

 import io.findify.featury.flink.FeatureProcessFunction.stateTag
-import io.findify.featury.flink.feature.{
-  FlinkBoundedList,
-  FlinkCounter,
-  FlinkFreqEstimator,
-  FlinkPeriodicCounter,
-  FlinkScalarFeature,
-  FlinkStatsEstimator
-}
-import io.findify.featury.model.Feature.{
-  BoundedList,
-  Counter,
-  FreqEstimator,
-  PeriodicCounter,
-  ScalarFeature,
-  StatsEstimator
-}
-import io.findify.featury.model.FeatureConfig.{
-  BoundedListConfig,
-  CounterConfig,
-  FreqEstimatorConfig,
-  PeriodicCounterConfig,
-  ScalarConfig,
-  StatsEstimatorConfig
-}
-import io.findify.featury.model.Write.{Append, Increment, PeriodicIncrement, Put, PutFreqSample, PutStatSample}
-import io.findify.featury.model.{
-  Feature,
-  FeatureConfig,
-  FeatureKey,
-  FeatureValue,
-  FeatureValueMessage,
-  Key,
-  Schema,
-  State,
-  Timestamp,
-  Write
-}
+import io.findify.featury.flink.feature._
+import io.findify.featury.model.Feature._
+import io.findify.featury.model.FeatureConfig._
+import io.findify.featury.model.Write._
+import io.findify.featury.model._
 import org.apache.flink.api.common.state.{KeyedStateStore, ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.scala._

-class FeatureProcessFunction(schema: Schema)
-    extends KeyedProcessFunction[Key, Write, FeatureValue]
+class FeatureProcessFunction(schema: Schema)(implicit
+    longTI: TypeInformation[Long],
+    intTI: TypeInformation[Int],
+    doubleTI: TypeInformation[Double],
+    tvTI: TypeInformation[TimeValue],
+    stringTI: TypeInformation[String],
+    scalarTI: TypeInformation[Scalar],
+    stateTI: TypeInformation[State]
+) extends KeyedProcessFunction[Key, Write, FeatureValue]
     with CheckpointedFunction {

   @transient var features: Map[FeatureKey, Feature[_ <: Write, _ <: FeatureValue, _ <: FeatureConfig, _ <: State]] = _
@@ -105,5 +80,5 @@ class FeatureProcessFunction(schema: Schema)
 }

 object FeatureProcessFunction {
-  val stateTag = OutputTag[State]("side-output")
+  def stateTag(implicit stateTI: TypeInformation[State]) = OutputTag[State]("side-output")
 }
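The stateTag change follows the same pattern as the new constructor: the Scala OutputTag captures a TypeInformation for its element type when it is created, so once the Kryo-backed implicits from org.apache.flink.api.scala._ are gone, that instance has to come from the caller. A small sketch of the shape this takes; sideOutput is a hypothetical helper name, not part of this repository:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.OutputTag

// OutputTag.apply needs an implicit TypeInformation[T], which is why a val
// (which has no implicit parameters of its own) had to become a def.
def sideOutput[T](name: String)(implicit ti: TypeInformation[T]): OutputTag[T] =
  OutputTag[T](name)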

This file was deleted.

flink/src/main/scala/io/findify/featury/flink/FeaturyFlow.scala: 18 changes (15 additions, 3 deletions)
@@ -6,14 +6,16 @@ import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, Wat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.scala.extensions._
-import org.apache.flink.api.scala._

 object FeaturyFlow {
   import io.findify.featury.flink.util.StreamName._

   def join[T](values: DataStream[FeatureValue], events: DataStream[T], scopes: List[Scope])(implicit
       j: Join[T],
-      ti: TypeInformation[T]
+      ti: TypeInformation[T],
+      ki: TypeInformation[ScopeKey],
+      si: TypeInformation[String],
+      fvi: TypeInformation[FeatureValue]
   ): DataStream[T] =
     scopes match {
       case Nil => events
@@ -27,7 +29,17 @@ object FeaturyFlow {

   }

-  def process(stream: DataStream[Write], schema: Schema): DataStream[FeatureValue] = {
+  def process(stream: DataStream[Write], schema: Schema)(implicit
+      ki: TypeInformation[Key],
+      fvi: TypeInformation[FeatureValue],
+      longTI: TypeInformation[Long],
+      intTI: TypeInformation[Int],
+      doubleTI: TypeInformation[Double],
+      tvTI: TypeInformation[TimeValue],
+      stringTI: TypeInformation[String],
+      scalarTI: TypeInformation[Scalar],
+      stateTI: TypeInformation[State]
+  ): DataStream[FeatureValue] = {
     stream
       .assignTimestampsAndWatermarks(
         WatermarkStrategy
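Both join and process now thread every TypeInformation they need through implicit parameters instead of importing org.apache.flink.api.scala._, whose createTypeInformation macro can silently fall back to Kryo for unsupported types. This mirrors how the underlying Scala DataStream API already works; a short illustrative sketch, where mapTo is a hypothetical helper rather than code from this repository:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream

// DataStream.map requires a TypeInformation for its output type; forwarding it
// as an implicit parameter keeps serializer selection with the caller instead
// of letting Flink fall back to a Kryo-based generic serializer.
def mapTo[A, B](in: DataStream[A])(f: A => B)(implicit outTI: TypeInformation[B]): DataStream[B] =
  in.map(f)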
@@ -18,7 +18,7 @@ import io.findify.featury.model.Write.{Increment, Put}
 import io.findify.featury.utils.TestKey
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -20,7 +20,7 @@ trait FlinkStreamTest extends BeforeAndAfterAll { this: Suite =>
     env.setRuntimeMode(RuntimeExecutionMode.BATCH)
     env.enableCheckpointing(1000)
     env.setRestartStrategy(RestartStrategies.noRestart())
-    //env.getConfig.disableGenericTypes()
+    env.getConfig.disableGenericTypes()
     env
   }

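Uncommenting disableGenericTypes() is the enforcement half of the change: with generic types disabled, Flink throws as soon as an operator would need its Kryo-backed generic serializer, so a missing TypeInformation instance fails the test suite instead of silently degrading serialization. Roughly, for any environment:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Any type that would otherwise be handled by Kryo's generic serializer now
// raises an exception when the job is built, instead of being serialized reflectively.
env.getConfig.disableGenericTypes()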
@@ -2,7 +2,6 @@ package io.findify.featury.flink.feature

 import io.findify.featury.features.BoundedListSuite
 import io.findify.featury.flink.{FeaturyFlow, FlinkStreamTest}
-import io.findify.featury.model.FeatureConfig.{BoundedListConfig, ScalarConfig}
 import io.findify.featury.model.Key.{Id, Tenant}
 import io.findify.featury.model.Write.Append
 import io.findify.featury.model.{
@@ -15,7 +14,7 @@ import io.findify.featury.model.{
   Schema,
   Write
 }
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -6,7 +6,7 @@ import io.findify.featury.model.FeatureConfig.{CounterConfig, ScalarConfig}
 import io.findify.featury.model.{CounterValue, FeatureKey, FeatureValue, Key, SString, Schema, Timestamp, Write}
 import io.findify.featury.model.Key.{FeatureName, Scope, Id, Namespace, Tenant}
 import io.findify.featury.model.Write.{Append, Increment}
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -6,7 +6,7 @@ import io.findify.featury.model.FeatureConfig.{FreqEstimatorConfig, PeriodicCoun
 import io.findify.featury.model.Key.{Id, Tenant}
 import io.findify.featury.model.Write.{PeriodicIncrement, PutFreqSample}
 import io.findify.featury.model.{FeatureKey, FeatureValue, FrequencyValue, Key, PeriodicCounterValue, Schema, Write}
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -15,7 +15,7 @@ import io.findify.featury.model.{
 }
 import io.findify.featury.model.Key.{Id, Tenant}
 import io.findify.featury.model.Write.{Append, PeriodicIncrement}
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -17,7 +17,7 @@ import io.findify.featury.model.Key.{Id, Tenant}
 import io.findify.featury.model.Write.{Append, Put}

 import scala.concurrent.duration._
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 class FlinkScalarFeatureTest extends ScalarFeatureSuite with FlinkStreamTest {
   val k = Key(config.ns, config.group, config.name, Tenant("1"), Id("x1"))
@@ -6,7 +6,7 @@ import io.findify.featury.model.FeatureConfig.{FreqEstimatorConfig, StatsEstimat
 import io.findify.featury.model.{FeatureKey, FeatureValue, FrequencyValue, Key, NumStatsValue, Schema, Write}
 import io.findify.featury.model.Key.{Id, Tenant}
 import io.findify.featury.model.Write.{PutFreqSample, PutStatSample}
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.concurrent.duration._

@@ -10,7 +10,7 @@ import io.findify.featury.model.{FeatureValue, Key, SString, ScalarValue, Timest
 import io.findify.featury.values.StoreCodec.ProtobufCodec
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 class FeatureStoreSinkTest extends AnyFlatSpec with Matchers with FlinkStreamTest {
   val k = Key(Namespace("ns"), Scope("s"), FeatureName("f1"), Tenant("1"), Id("x1"))
@@ -10,7 +10,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 import org.apache.flink.core.fs.Path
-import org.apache.flink.api.scala._
+import io.findify.flinkadt.api._

 import scala.language.higherKinds
 import scala.concurrent.duration._
project/Deps.scala: 2 changes (1 addition, 1 deletion)
@@ -3,7 +3,7 @@ object Deps {
   val log4catsVersion = "2.1.1"
   val scalatestVersion = "3.2.9"
   val circeVersion = "0.14.1"
-  val circeYamlVersion = "0.13.1"
+  val circeYamlVersion = "0.14.0"
   val cassandraDriverVersion = "4.11.1"
   val scalapbVersion = "0.11.3"
 }
