cassandra connector (#4)
* cassandra connector

* add cassandra container in CI
shuttie authored Jun 15, 2021
1 parent c2c1812 commit 05ba091
Showing 8 changed files with 172 additions and 10 deletions.
20 changes: 15 additions & 5 deletions .github/workflows/ci.yml
@@ -8,6 +8,21 @@ on:

jobs:
  build:
    services:
      redis:
        image: "redis:6.2.3"
        ports:
          - 6379:6379
      cassandra:
        image: "cassandra:3.11.10"
        env:
          JVM_OPTS: "-Dcassandra.skip_wait_for_gossip_to_settle=0"
          HEAP_NEWSIZE: "32m"
          MAX_HEAP_SIZE: "256m"
          CASSANDRA_START_RPC: "true"
        ports:
          - 9042:9042

    runs-on: ${{ matrix.platform }}
    strategy:
      matrix:
@@ -22,11 +37,6 @@ jobs:
        with:
          java-version: ${{ matrix.java }}

      - name: Start Redis
        uses: supercharge/[email protected]
        with:
          redis-version: 6.2.3

      - name: Cache maven packages
        uses: actions/cache@v2
        env:
5 changes: 5 additions & 0 deletions build.sbt
@@ -39,9 +39,14 @@ lazy val api = (project in file("api"))
  .settings(shared: _*)
  .dependsOn(core % "test->test;compile->compile")
  .dependsOn(redis % "test->test;compile->compile")
  .dependsOn(cassandra % "test->test;compile->compile")

lazy val redis = (project in file("connector/redis"))
  .settings(shared: _*)
  .dependsOn(core % "test->test;compile->compile")

lazy val cassandra = (project in file("connector/cassandra"))
  .settings(shared: _*)
  .dependsOn(core % "test->test;compile->compile")

sonatypeProfileName := "io.findify"
8 changes: 8 additions & 0 deletions connector/cassandra/build.sbt
@@ -0,0 +1,8 @@
import Deps._

name := "featury-cassandra"

libraryDependencies ++= Seq(
"com.datastax.oss" % "java-driver-core" % "4.12.0",
"org.cognitor.cassandra" % "cassandra-migration" % "2.4.0_v4"
)
@@ -0,0 +1,9 @@
create table if not exists features(
    ns text,
    scope text,
    tenant text,
    name text,
    id text,
    values blob,
    primary key ((ns, scope, tenant, id), name)
);
@@ -0,0 +1,112 @@
package io.findify.featury.connector.cassandra

import cats.effect.{IO, Resource}
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, BatchStatement, BatchStatementBuilder, BatchType, Row}
import com.datastax.oss.driver.api.core.{CqlSession, CqlSessionBuilder}
import io.findify.featury.model.FeatureValue
import io.findify.featury.model.api.{ReadRequest, ReadResponse}
import io.findify.featury.values.{FeatureStore, StoreCodec}
import io.findify.featury.values.ValueStoreConfig.CassandraConfig
import org.cognitor.cassandra.migration.{Database, MigrationRepository, MigrationTask}
import org.cognitor.cassandra.migration.keyspace.Keyspace
import cats.implicits._

import scala.collection.JavaConverters._
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util

case class CassandraStore(session: CqlSession, codec: StoreCodec) extends FeatureStore {
  lazy val read = session.prepare(
    "select values from features where ns=? and scope=? and tenant=? and id in ? and name in ?"
  )
  lazy val write = session.prepare(
    "insert into features (ns, scope, tenant, name, id, values) values (?,?,?,?,?,?)"
  )
  override def read(request: ReadRequest): IO[ReadResponse] = {
    for {
      bound <- IO {
        read.bind(
          request.ns.value,
          request.scope.value,
          request.tenant.value,
          util.Arrays.asList(request.ids.map(_.value): _*),
          util.Arrays.asList(request.features.map(_.value): _*)
        )
      }
      rows   <- IO.fromCompletableFuture(IO(session.executeAsync(bound).toCompletableFuture))
      parsed <- parsePage(rows)
    } yield {
      ReadResponse(parsed)
    }
  }

  def parsePage(rs: AsyncResultSet, acc: List[FeatureValue] = Nil): IO[List[FeatureValue]] = for {
    values <- rs.currentPage().asScala.toList.traverse(parseRow).map(next => acc ++ next)
    result <-
      if (rs.hasMorePages)
        IO.fromCompletableFuture(IO(rs.fetchNextPage().toCompletableFuture))
          .flatMap(next => parsePage(next, values))
      else
        IO.pure(values)
  } yield {
    result
  }

  def parseRow(row: Row): IO[FeatureValue] = {
    IO.fromEither(codec.decode(row.getByteBuffer("values").array()))
  }

  override def write(batch: List[FeatureValue]): Unit = {
    val stmt = batch
      .foldLeft(BatchStatement.builder(BatchType.UNLOGGED))((builder, value) =>
        builder.addStatement(
          write.bind(
            value.key.ns.value,
            value.key.scope.value,
            value.key.tenant.value,
            value.key.name.value,
            value.key.id.value,
            ByteBuffer.wrap(codec.encode(value))
          )
        )
      )
      .build()
    session.execute(stmt)
  }

}

object CassandraStore {
  def makeResource(conf: CassandraConfig) =
    Resource.make(for {
      _              <- createKeyspace(conf)
      migrateSession <- makeSession(conf)
      _              <- migrate(conf, migrateSession)
      session        <- makeSession(conf)
    } yield {
      new CassandraStore(session, conf.codec)
    })(c => IO(c.session.close()))

  def createKeyspace(conf: CassandraConfig) = IO {
    val builder = CqlSession.builder()
    conf.hosts.foreach(host => builder.addContactPoint(new InetSocketAddress(host, conf.port)))
    val session = builder.withLocalDatacenter(conf.dc).build()
    session.execute(
      s"create keyspace if not exists ${conf.keyspace} with replication = {'class' : 'SimpleStrategy', 'replication_factor' : ${conf.replication}}"
    )
    session.close()
  }

  def makeSession(conf: CassandraConfig) = IO {
    val builder = CqlSession.builder()
    conf.hosts.foreach(host => builder.addContactPoint(new InetSocketAddress(host, conf.port)))
    builder.withLocalDatacenter(conf.dc).withKeyspace(conf.keyspace).build()
  }

  def migrate(conf: CassandraConfig, session: CqlSession): IO[Unit] = IO {
    val db = new Database(session, new Keyspace(conf.keyspace))
    val task = new MigrationTask(db, new MigrationRepository())
    task.migrate()
  }
}
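
For context, a minimal usage sketch of the new connector (not part of this commit): it wires CassandraStore.makeResource into a cats-effect program, reusing the same settings the test below runs against the CI container. The object name, the roundTrip helper, and the request/batch parameters are hypothetical; they are assumed to be built by the caller.

import cats.effect.IO
import io.findify.featury.connector.cassandra.CassandraStore
import io.findify.featury.model.FeatureValue
import io.findify.featury.model.api.ReadRequest
import io.findify.featury.values.StoreCodec.ProtobufCodec
import io.findify.featury.values.ValueStoreConfig.CassandraConfig

object CassandraStoreExample {
  // same settings the test suite uses against the CI service container
  val conf = CassandraConfig(
    hosts = List("localhost"),
    port = 9042,
    dc = "datacenter1",
    keyspace = "dev",
    ttl = None,
    codec = ProtobufCodec,
    replication = 1
  )

  // request and batch are assumed to be constructed by the caller
  def roundTrip(request: ReadRequest, batch: List[FeatureValue]): IO[Unit] =
    CassandraStore.makeResource(conf).use { store =>
      for {
        _        <- IO(store.write(batch)) // write is a plain blocking side effect in this revision
        response <- store.read(request)
        _        <- IO(println(response))
      } yield ()
    }
}

The resource acquires the session, creates the keyspace, and runs migrations before the store is handed to the caller, so no setup is needed beyond a reachable Cassandra node.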
@@ -0,0 +1,12 @@
package io.findify.featury.connector.cassandra

import cats.effect.{IO, Resource}
import io.findify.featury.StoreTestSuite
import io.findify.featury.values.FeatureStore
import io.findify.featury.values.StoreCodec.ProtobufCodec
import io.findify.featury.values.ValueStoreConfig.CassandraConfig

class CassandraStoreTest extends StoreTestSuite {
  val conf = CassandraConfig(List("localhost"), 9042, "datacenter1", "dev", None, ProtobufCodec, 1)
  override def storeResource: Resource[IO, FeatureStore] = CassandraStore.makeResource(conf)
}
6 changes: 1 addition & 5 deletions connector/redis/build.sbt
@@ -3,9 +3,5 @@ import Deps._
name := "featury-redis"

libraryDependencies ++= Seq(
  "org.typelevel" %% "log4cats-core" % log4catsVersion,
  "org.typelevel" %% "log4cats-slf4j" % log4catsVersion,
  "redis.clients" % "jedis" % "3.6.0"
"redis.clients" % "jedis" % "3.6.0"
)

scalacOptions += "-Ypartial-unification"
@@ -11,6 +11,15 @@ sealed trait ValueStoreConfig {}
object ValueStoreConfig {
  case class RedisConfig(host: String, port: Int, codec: StoreCodec) extends ValueStoreConfig
  case class MemoryConfig() extends ValueStoreConfig
  case class CassandraConfig(
      hosts: List[String],
      port: Int,
      dc: String,
      keyspace: String,
      ttl: Option[Int],
      codec: StoreCodec,
      replication: Int
  ) extends ValueStoreConfig

  implicit val animalConfHint = new FieldCoproductHint[ValueStoreConfig]("type") {
    override def fieldValue(name: String) = name.dropRight("Config".length).toLowerCase
@@ -22,6 +31,7 @@ object ValueStoreConfig {
  }
  implicit val redisConfigReader = deriveReader[RedisConfig]
  implicit val memConfigReader = deriveReader[MemoryConfig]
  implicit val cassandraReader = deriveReader[CassandraConfig]
  implicit val valueStoreReader = deriveReader[ValueStoreConfig]

}
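
As a rough illustration of how the coproduct hint resolves the new case class (hypothetical, not part of the commit): since fieldValue drops the "Config" suffix and lowercases the rest, a block tagged type: cassandra should decode into CassandraConfig. The "codec: protobuf" value is an assumption about how the StoreCodec reader (defined elsewhere in the project) spells that codec; ttl is omitted because Option fields are optional under pureconfig.

import pureconfig.ConfigSource
import io.findify.featury.values.ValueStoreConfig

object ConfigExample {
  // hypothetical HOCON; "codec: protobuf" assumes the StoreCodec reader accepts this string
  val raw: String =
    """type: cassandra
      |hosts: ["localhost"]
      |port: 9042
      |dc: datacenter1
      |keyspace: dev
      |codec: protobuf
      |replication: 1
      |""".stripMargin

  // Left(failures) on a decoding error, Right(CassandraConfig(...)) on success
  val parsed = ConfigSource.string(raw).load[ValueStoreConfig]
}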
