Skip to content

Commit

Permalink
redis: add persistence class
Browse files Browse the repository at this point in the history
  • Loading branch information
shuttie committed May 17, 2021
1 parent 51a36d9 commit ef21263
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 21 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "4.0.1",
"com.github.fppt" % "jedis-mock" % "0.1.19" % Test,
"redis.clients" % "jedis" % "3.6.0",
"com.google.guava" % "guava" % "30.1.1-jre"
"com.google.guava" % "guava" % "30.1.1-jre",
"com.datastax.oss" % "java-driver-core" % "4.11.1"
)
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,12 @@ services:
image: 'redis:6.2.3'
ports:
- '6379:6379'
cassandra:
image: 'cassandra:3.11.10'
ports:
- '9042:9042'
environment:
- JVM_OPTS=-Dcassandra.skip_wait_for_gossip_to_settle=0
- HEAP_NEWSIZE=32m
- MAX_HEAP_SIZE=256m
- CASSANDRA_START_RPC=true
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.findify.featury.model.Schema.FeatureConfig

import scala.concurrent.duration._

trait BoundedList[T <: Scalar] extends Feature[BoundedListState[T], BoundedListValue[T]] {
trait BoundedList[T <: Scalar] extends Feature[BoundedListState[T], BoundedListValue[T], BoundedListConfig] {
def config: BoundedListConfig
def fromItems(list: List[ListItem[T]]): BoundedListValue[T]
def put(key: Key, value: T, ts: Timestamp): IO[Unit]
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/findify/featury/feature/Counter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.findify.featury.model.FeatureValue.{Num, NumScalarValue}
import io.findify.featury.model.Key.FeatureName
import io.findify.featury.model.Schema.FeatureConfig

trait Counter extends Feature[CounterState, NumScalarValue] {
trait Counter extends Feature[CounterState, NumScalarValue, CounterConfig] {
def config: CounterConfig
def increment(key: Key, value: Double): IO[Unit]
override def empty(): CounterState = CounterState(0)
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/io/findify/featury/feature/Feature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package io.findify.featury.feature

import cats.effect.IO
import io.findify.featury.feature.Feature.State
import io.findify.featury.model.Schema.FeatureConfig
import io.findify.featury.model.{FeatureValue, Key}

trait Feature[S <: State, T <: FeatureValue] {
trait Feature[S <: State, T <: FeatureValue, C <: FeatureConfig] {
def init(): IO[Unit] = IO.unit
def config: C
def empty(): S
def readState(key: Key): IO[S]
def computeValue(state: S): Option[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.findify.featury.model.Schema.FeatureConfig

import scala.util.Random

trait FreqEstimator extends Feature[FreqEstimatorState, StringFrequencyValue] {
trait FreqEstimator extends Feature[FreqEstimatorState, StringFrequencyValue, FreqEstimatorConfig] {
def config: FreqEstimatorConfig
def put(key: Key, value: String): IO[Unit] = if (Random.nextInt(config.sampleRate) == 0) {
putReal(key, value)
Expand Down
11 changes: 9 additions & 2 deletions src/main/scala/io/findify/featury/feature/PeriodicCounter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import io.findify.featury.feature.Counter.{CounterConfig, CounterState}
import io.findify.featury.feature.Feature.State
import io.findify.featury.feature.PeriodicCounter.{PeriodicCounterConfig, PeriodicCounterState}
import io.findify.featury.model.FeatureValue.{Num, NumScalarValue, PeriodicNumValue, PeriodicValue}
import io.findify.featury.model.Key.FeatureName
import io.findify.featury.model.Schema.FeatureConfig
import io.findify.featury.model.{Key, Timestamp}

import java.util
import scala.concurrent.duration.FiniteDuration

trait PeriodicCounter extends Feature[PeriodicCounterState, PeriodicNumValue] {
trait PeriodicCounter extends Feature[PeriodicCounterState, PeriodicNumValue, PeriodicCounterConfig] {
def config: PeriodicCounterConfig
def increment(key: Key, ts: Timestamp, value: Double): IO[Unit]
override def empty(): PeriodicCounterState = PeriodicCounterState(Timestamp(0L))
Expand All @@ -35,7 +37,12 @@ trait PeriodicCounter extends Feature[PeriodicCounterState, PeriodicNumValue] {
object PeriodicCounter {
case class RangeCount(start: Timestamp, count: Double)
case class PeriodicCounterState(now: Timestamp, periods: Map[Timestamp, Double] = Map.empty) extends State
case class PeriodicCounterConfig(period: FiniteDuration, count: Int, sumPeriodRanges: List[PeriodRange])
case class PeriodicCounterConfig(
name: FeatureName,
period: FiniteDuration,
count: Int,
sumPeriodRanges: List[PeriodRange]
) extends FeatureConfig
case class PeriodRange(startOffset: Int, endOffset: Int)

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.findify.featury.model.Schema.FeatureConfig
import scala.collection.JavaConverters._
import scala.util.Random

trait StatsEstimator extends Feature[StatsEstimatorState, NumStatsValue] {
trait StatsEstimator extends Feature[StatsEstimatorState, NumStatsValue, StatsEstimatorConfig] {
def config: StatsEstimatorConfig
final def put(key: Key, value: Double): IO[Unit] = {
if (Random.nextInt(config.sampleRate) == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._

trait RedisBoundedList[T <: Scalar] extends BoundedList[T] {
val SUFFIX = "bl"
import KeyCodec._

def redis: Jedis
def codec: Codec[ListItem[T]]

override def put(key: Key, value: T, ts: Timestamp): IO[Unit] = IO {
redis.rpush(key.toRedisKey("state"), codec.encode(ListItem(value, ts)))
redis.rpush(key.toRedisKey(SUFFIX), codec.encode(ListItem(value, ts)))
}

override def readState(key: Key): IO[BoundedList.BoundedListState[T]] = for {
list <- IO { redis.lrange(key.toRedisKey("state"), 0, -1).asScala.toList }
list <- IO { redis.lrange(key.toRedisKey(SUFFIX), 0, -1).asScala.toList }
decoded <- decodeList(list)
} yield {
decoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import redis.clients.jedis.Jedis
import scala.util.{Failure, Success, Try}

class RedisCounter(val config: CounterConfig, val redis: Jedis) extends Counter {
val SUFFIX = "c"
import KeyCodec._

override def increment(key: Key, value: Double): IO[Unit] = IO { redis.incrByFloat(key.toRedisKey("state"), value) }
override def increment(key: Key, value: Double): IO[Unit] = IO { redis.incrByFloat(key.toRedisKey(SUFFIX), value) }

override def readState(key: Key): IO[Counter.CounterState] = for {
bytes <- IO { Option(redis.get(key.toRedisKey("state"))) }
bytes <- IO { Option(redis.get(key.toRedisKey(SUFFIX))) }
value <- IO.fromEither(parseDouble(bytes))
} yield {
CounterState(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

class RedisFreqEstimator(val config: FreqEstimatorConfig, redis: Jedis) extends FreqEstimator {
val SUFFIX = "f"
import KeyCodec._
override def putReal(key: Key, value: String): IO[Unit] = {
val multi = redis.multi()
multi.lpush(key.toRedisKey("freq"), value)
multi.ltrim(key.toRedisKey("freq"), 0, config.poolSize)
multi.lpush(key.toRedisKey(SUFFIX), value)
multi.ltrim(key.toRedisKey(SUFFIX), 0, config.poolSize)
IO { multi.exec() }
}

override def readState(key: Key): IO[FreqEstimator.FreqEstimatorState] = for {
response <- IO { redis.lrange(key.toRedisKey("freq"), 0, -1) }
response <- IO { redis.lrange(key.toRedisKey(SUFFIX), 0, -1) }
} yield {
FreqEstimatorState(response.asScala.toVector)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.findify.featury.persistence.redis

import cats.effect.IO
import io.findify.featury.feature.PeriodicCounter
import io.findify.featury.feature.PeriodicCounter.{PeriodicCounterConfig, PeriodicCounterState}
import io.findify.featury.model.{BackendError, Key, Timestamp}
import redis.clients.jedis.Jedis

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

class RedisPeriodicCounter(val config: PeriodicCounterConfig, redis: Jedis) extends PeriodicCounter {
val SUFFIX = "pc"
val SUFFIX_TS = "pct"
import KeyCodec._
override def increment(key: Key, ts: Timestamp, value: Double): IO[Unit] = for {
_ <- IO { redis.hincrByFloat(key.toRedisKey(SUFFIX), ts.toStartOfPeriod(config.period).ts.toString, value) }
_ <- IO { redis.set(key.toRedisKey(SUFFIX_TS), ts.ts.toString) }
} yield {}

override def readState(key: Key): IO[PeriodicCounter.PeriodicCounterState] = for {
responseOption <- IO { Option(redis.hgetAll(key.toRedisKey(SUFFIX))).map(_.asScala) }
nowOption <- IO { Option(redis.get(key.toRedisKey(SUFFIX_TS))) }
now <- nowOption match {
case Some(value) => IO.fromTry(Try(java.lang.Long.parseLong(value)))
case None => IO.raiseError(BackendError(s"cannot parse ts: $nowOption"))
}
decoded <- responseOption match {
case None => IO.pure(empty())
case Some(response) => parseResponse(response.toList).map(x => PeriodicCounterState(Timestamp(now), x))
}
} yield {
decoded
}

@tailrec private def parseResponse(
values: List[(String, String)],
acc: List[(Timestamp, Double)] = Nil
): IO[Map[Timestamp, Double]] =
values match {
case Nil => IO(acc.toMap)
case (key, value) :: tail =>
(Try(java.lang.Long.parseLong(key)), Try(java.lang.Double.parseDouble(value))) match {
case (Success(ts), Success(inc)) => parseResponse(tail, (Timestamp(ts) -> inc) :: acc)
case (Failure(ex), _) => IO.raiseError(ex)
case (_, Failure(ex)) => IO.raiseError(ex)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.findify.featury.persistence.redis

import cats.effect.IO
import cats.effect.kernel.Resource
import io.findify.featury.feature.{BoundedList, Counter, FreqEstimator, PeriodicCounter, StatsEstimator}
import io.findify.featury.model.FeatureValue
import io.findify.featury.persistence.{Persistence, ValueStore}
import redis.clients.jedis.Jedis

class RedisPersistence(val redis: Jedis) extends Persistence {
override def periodicCounter(config: PeriodicCounter.PeriodicCounterConfig): PeriodicCounter =
???

override def numBoundedList(config: BoundedList.BoundedListConfig): BoundedList[FeatureValue.Num] =
new RedisBoundedList.RedisNumBoundedList(redis, config)

override def textBoundedList(config: BoundedList.BoundedListConfig): BoundedList[FeatureValue.Text] =
new RedisBoundedList.RedisTextBoundedList(redis, config)

override def statsEstimator(config: StatsEstimator.StatsEstimatorConfig): StatsEstimator =
new RedisStatsEstimator(config, redis)

override def counter(config: Counter.CounterConfig): Counter =
new RedisCounter(config, redis)

override def freqEstimator(config: FreqEstimator.FreqEstimatorConfig): FreqEstimator =
new RedisFreqEstimator(config, redis)

override def values(): ValueStore =
new RedisValues(redis)
}

object RedisPersistence {
def resource(host: String, port: Int = 6379) =
Resource.make(IO { new RedisPersistence(new Jedis(host, port)) })(redis => IO(redis.redis.close()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

class RedisStatsEstimator(val config: StatsEstimatorConfig, redis: Jedis) extends StatsEstimator {
val SUFFIX = "s"
import KeyCodec._
override def putReal(key: Key, value: Double): IO[Unit] = {
val multi = redis.multi()
multi.lpush(key.toRedisKey("stats"), value.toString)
multi.ltrim(key.toRedisKey("stats"), 0, config.poolSize)
multi.lpush(key.toRedisKey(SUFFIX), value.toString)
multi.ltrim(key.toRedisKey(SUFFIX), 0, config.poolSize)
IO { multi.exec() }
}

override def readState(key: Key): IO[StatsEstimator.StatsEstimatorState] = for {
response <- IO { redis.lrange(key.toRedisKey("stats"), 0, -1) }
response <- IO { redis.lrange(key.toRedisKey(SUFFIX), 0, -1) }
decoded <- parseRecursive(response.asScala.toList)
} yield {
StatsEstimatorState(decoded.toVector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import io.findify.featury.persistence.Persistence
import io.findify.featury.service.FeatureService.FeatureKey
import shapeless.syntax.typeable._

trait FeatureService[W <: WriteAction, F <: Feature[_, _]] {
trait FeatureService[W <: WriteAction, F <: Feature[_, _, _]] {
def mapping: Map[FeatureKey, F]
def write(action: W, feature: F): IO[Unit]
def select(action: WriteAction): Option[W]
Expand Down Expand Up @@ -63,7 +63,7 @@ object FeatureService {
override def select(action: WriteAction): Option[WriteNumList] = action.cast[WriteNumList]
}

def load[F <: Feature[_, _]](schema: Schema)(pf: PartialFunction[FeatureConfig, F]): Map[FeatureKey, F] = {
def load[F <: Feature[_, _, _]](schema: Schema)(pf: PartialFunction[FeatureConfig, F]): Map[FeatureKey, F] = {
val result = for {
ns <- schema.namespaces
group <- ns.groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Resource}
import io.findify.featury.feature.PeriodicCounter.{PeriodRange, PeriodicCounterConfig}
import io.findify.featury.model.FeatureValue.{PeriodicNumValue, PeriodicValue}
import io.findify.featury.model.Key.FeatureName
import io.findify.featury.model.Timestamp
import io.findify.featury.util.TestKey
import org.scalatest.Outcome
Expand All @@ -15,7 +16,8 @@ import scala.concurrent.duration._
trait PeriodicCounterSuite extends FixtureAnyFlatSpec with Matchers {
type FixtureParam = PeriodicCounter

def config: PeriodicCounterConfig = PeriodicCounterConfig(1.day, 10, List(PeriodRange(0, 0), PeriodRange(7, 0)))
def config: PeriodicCounterConfig =
PeriodicCounterConfig(FeatureName("f1"), 1.day, 10, List(PeriodRange(0, 0), PeriodRange(7, 0)))
def makeCounter(): Resource[IO, FixtureParam]

override def withFixture(test: OneArgTest): Outcome = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.findify.featury.persistence.redis

import cats.effect.{IO, Resource}
import io.findify.featury.feature.{PeriodicCounter, PeriodicCounterSuite}

class RedisPeriodicCounterTest extends PeriodicCounterSuite with RedisMock {
override def makeCounter(): Resource[IO, PeriodicCounter] =
Resource.make(IO(new RedisPeriodicCounter(config, redisClient)))(_ => IO.unit)
}

0 comments on commit ef21263

Please sign in to comment.