Skip to content

Commit

Permalink
Merge pull request #26 from sergkh/uri-format
Browse files Browse the repository at this point in the history
URI support
  • Loading branch information
sergkh authored Jan 12, 2017
2 parents d1c0af0 + 4ebe75f commit ca87565
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 72 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: scala
scala:
- 2.11.5
- 2.11.8
- 2.12.0
env:
global:
- REDIS_TAG=3.2.0
Expand All @@ -25,7 +26,6 @@ before_script:
- export REDIS_VERSION="$(redis-cli INFO SERVER | sed -n 2p)"
- echo $REDIS_VERSION
script:
- sbt clean coverage test package
- sbt clean test package
after_success:
- sbt coveralls
- test "${TRAVIS_PULL_REQUEST}" = 'false' && sh "$TRAVIS_BUILD_DIR/.bintray.sh" && sbt publish bintrayRelease
- test "${TRAVIS_PULL_REQUEST}" = 'false' && test "${TRAVIS_TAG}" != '' && sh "$TRAVIS_BUILD_DIR/.bintray.sh" && sbt publish bintrayRelease
51 changes: 30 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
* protocol pipelining
* connection multiplexing
* in-memory redis implementation which can come in handy for testing (in active development)
* scala 2.12 support

Changes from [original version](https://github.com/andreyk0/redis-client-scala-netty):
* added support of [scripting](http://redis.io/commands#scripting) commands
* moved to scala [futures](http://docs.scala-lang.org/overviews/core/futures.html)
* binary safe requests encoding
* moved to maven
* moved to sbt
* in-memory client
* Distributed lock implementation ([Redlock](http://redis.io/topics/distlock))

Expand All @@ -27,7 +28,7 @@ Unit tests assume redis is running on localhost on port 6379, THEY WILL FLUSH AL
```scala
import com.fotolog.redis.RedisClient

val r = RedisClient("localhost", 6379)
val r = RedisClient("redis://:db_password@localhost:6379")

// basic

Expand All @@ -50,53 +51,57 @@ Unit tests assume redis is running on localhost on port 6379, THEY WILL FLUSH AL
For convenience any API method has both synchronous and asynchronous versions:

```scala

import com.fotolog.redis.RedisClient
val r = RedisClient("redis://localhost")
val fooFuture = r.getAsync[String]("foo")

val future = r.lpopAsync[String]("ll") // prefetch data
future.map(_.toLong) // map to another value

```

Synchronous methods by default wait up to 1 minute for the future to complete, but one can specify duration on client initialization:

```scala
import com.fotolog.redis.RedisClient
import scala.concurrent.duration._

val r = RedisClient("localhost", timeout = 20 seconds)
val r = RedisClient("redis://localhost", timeout = 20 seconds)

```

Dealing with Lua scripts:

```scala

val resultsList = c.eval[Int]("return ARGV[1];", ("anyKey", "2"))
val scriptHash = c.scriptLoad("return ARGV[1];")
val evalResultsList = c.evalsha[Int](scriptHash, ("key", "4"))
import com.fotolog.redis.RedisClient
val r = RedisClient("redis://localhost")

val resultsList = r.eval[Int]("return ARGV[1];", ("anyKey", "2"))
val scriptHash = r.scriptLoad("return ARGV[1];")
val evalResultsList = r.evalsha[Int](scriptHash, ("key", "4"))

```

Hyper Log Log:

```scala
import com.fotolog.redis.RedisClient
val r = RedisClient("redis://localhost")
val anyAddedH1 = r.pfadd("h1", "a", "b", "c", "d") // true
val anyAddedH2 = r.pfadd("h2", "a", "b", "f", "g") // true
val count = r.pfcount("h1") // 4

val anyAddedH1 = c.pfadd("h1", "a", "b", "c", "d") // true
val anyAddedH2 = c.pfadd("h2", "a", "b", "f", "g") // true
val count = c.pfcount("h1") // 4

c.pfmerge("merge-result", "h1", "h2")
val mergeCount = c.pfcount("merge-result") // 6
r.pfmerge("merge-result", "h1", "h2")
val mergeCount = r.pfcount("merge-result") // 6

```

Sharding example:

```scala
import com.fotolog.redis.{RedisCluster, RedisClient, RedisHost}
import com.fotolog.redis.{RedisCluster, RedisClient}

val shardingHashFunc = (s:String) => s.hashCode // shard on string values
val cluster = new RedisCluster[String](shardingHashFunc, RedisHost("localhost", 6379) /*, more redis hosts */)
val cluster = new RedisCluster[String](shardingHashFunc, "redis://localhost:6379" /*, more redis hosts */)

val r = cluster("egusername") // get redis client
```
Expand Down Expand Up @@ -124,7 +129,7 @@ Sharding example:

Conversion objects can be passed explicitly:
```scala
c.set("key-name", 15)(intProtobufConverter)
r.set("key-name", 15)(intProtobufConverter)
```

## In-memory client usage
Expand All @@ -134,7 +139,7 @@ client to perform all operations in memory. Also it is possible to emulate sever
similar to in-memory databases in popular embeddable RDMS like H2 or HyperSQL. Following code creates an in-memory database `test`:

```scala
val c = RedisClient("mem:test")
val r = RedisClient("redis-mem://test")
```

Note: feature is in active development so not all operations are supported now.
Expand All @@ -144,7 +149,9 @@ Redlock is a distributed lock implementation on multiple (or one) redis instance
Usage example:

```scala
val redlock = Redlock("192.168.2.11" -> 6379, "192.168.2.12" -> 6379, "192.168.2.13" -> 6379)
import com.fotolog.redis.primitives.Redlock

val redlock = Redlock("redis://192.168.2.11", "redis://192.168.2.12", "redis://192.168.2.13")

val lock = redlock.lock("resource-name", ttl = 60)

Expand All @@ -155,14 +162,16 @@ Usage example:

One have to specify some name common for all application instances. In case of success `lock` method returns object that contains resource name and some random value that should be used to unlock resource:

```scala
redlock.unlock(lock)
```

Manual unlocking is not always necessary as lock will be unlocked automatically on Redis server after `ttl` seconds will pass.

Also there is convenient `withLock` method:

```scala
val redlock = Redlock("192.168.2.15")
val redlock = Redlock("redis://192.168.2.15")
redlock.withLock("resource-name") {
/* Some code that has to be executed only on one instance/thread. */
}
Expand Down
10 changes: 4 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ name := "redis-scala"

organization := "com.impactua"

val revision = sys.env.getOrElse("TRAVIS_BUILD_NUMBER", "0-SNAPSHOT")

version := s"""1.3.$revision"""
version := sys.env.getOrElse("TRAVIS_TAG", "1.4.0-SNAPSHOT")

scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.4", "2.11.8")
crossScalaVersions := Seq("2.10.4", "2.11.8", "2.12.0")

licenses += ("MIT", url("http://opensource.org/licenses/MIT"))

Expand All @@ -26,6 +24,6 @@ concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)

libraryDependencies ++= Seq(
"io.netty" % "netty" % "3.10.6.Final",
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"com.storm-enroute" %% "scalameter" % "0.7" % "test"
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"com.storm-enroute" %% "scalameter" % "0.8.2" % "test"
)
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.8
sbt.version=0.13.13
5 changes: 1 addition & 4 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.1.0")

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
56 changes: 32 additions & 24 deletions src/main/scala/com/fotolog/redis/RedisClient.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.fotolog.redis

import java.net.URI
import java.util.concurrent.TimeUnit

import com.fotolog.redis
import com.fotolog.redis.commands._
import com.fotolog.redis.connections.{InMemoryRedisConnection, Netty3RedisConnection, RedisConnection}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Try

object RedisClient {

Expand All @@ -18,31 +19,38 @@ object RedisClient {
}

@throws(classOf[AuthenticationException])
def apply(host: String = "localhost",
port: Int = 6379,
password: Option[String] = None,
timeout: Duration = DEFAULT_TIMEOUT) = {
val client = if (host.startsWith("mem:")) {
new RedisClient(new InMemoryRedisConnection(host.substring("mem:".length)), timeout)
} else {
// in case host specified as "host:port" then override port
val (aHost, aPort) = if(host.contains(":")) {
val Array(h, p) = host.split(":")
(h, p.toInt)
} else {
(host, port)
}

new RedisClient(new Netty3RedisConnection(aHost, aPort), timeout)
}
// redis://[:password@]host[:port][/db-number]
def apply(uri: String = "redis://localhost:6379", timeout: Duration = DEFAULT_TIMEOUT) = {

for (pass <- password) {
if (!client.auth(pass)) {
throw new AuthenticationException("Authentication failed")
}
}
val redisUri = new URI(uri)

Option(redisUri.getScheme) match {
case Some("redis") | None =>
val port = if (redisUri.getPort > 0) redisUri.getPort else 6379
val client = new RedisClient(new Netty3RedisConnection(redisUri.getHost, port), timeout)

for (userInfo <- Option(redisUri.getUserInfo)) {
val password = userInfo.stripPrefix(":")

client
if (!client.auth(password)) {
throw AuthenticationException("Authentication failed")
}
}

for (db <- Option(redisUri.getPath) if db.nonEmpty) {
val dbIndex = Try(db.toInt).filter(_ >= 0).getOrElse {
throw new IllegalArgumentException(s"Invalid path value: '$db' in URI: '$uri'. Has to be a valid database index")
}

client.select(dbIndex)
}

client
case Some("redis-mem") =>
new RedisClient(new InMemoryRedisConnection(redisUri.getHost), timeout)
case Some(unknownSchema) =>
throw new IllegalArgumentException(s"Unsupported schema: '$unknownSchema' in URI: '$uri'. Valid schemas are 'redis' and 'redis-mem://'")
}
}

}
Expand Down
17 changes: 9 additions & 8 deletions src/main/scala/com/fotolog/redis/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,28 @@ package com.fotolog.redis

import java.util.concurrent.atomic.AtomicReference

case class RedisHost(host: String = "localhost", port: Int = 6379)

class RedisCluster[Shard](hash: (Shard)=>Int, hosts: RedisHost*) {
val h2cRef = new AtomicReference[Map[RedisHost, RedisClient]](Map.empty)
class RedisCluster[Shard](hash: (Shard) => Int, uris: String*) {
val h2cRef = new AtomicReference[Map[String, RedisClient]](Map.empty)

def apply(s: Shard): RedisClient = {
val h = hosts(hash(s).abs % hosts.length)
val h = uris(hash(s).abs % uris.length)
h2cRef.get.get(h) match {
case Some(c) => if (c.isConnected) c else newClient(h)
case None => newClient(h)
}
}

private[RedisCluster] def newClient(h: RedisHost): RedisClient = h.synchronized {
private[RedisCluster] def newClient(uri: String): RedisClient = uri.synchronized {
var h2c = h2cRef.get

def newClientThreadSafe(): RedisClient = {
val c = RedisClient(h.host, h.port)
while(!h2cRef.compareAndSet(h2c, h2c + (h->c))) h2c = h2cRef.get
val c = RedisClient(uri)
while(!h2cRef.compareAndSet(h2c, h2c + (uri -> c))) h2c = h2cRef.get
c
}
h2c.get(h) match {

h2c.get(uri) match {
case None => newClientThreadSafe()
case Some(c) => if (c.isConnected){ c } else { c.shutdown(); newClientThreadSafe() }
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/com/fotolog/redis/primitives/Redlock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.fotolog.redis.{RedisCluster, RedisClient}
* Usage example:
*
* {{{
* val redlock = Redlock("192.168.2.11" -> 6379, "192.168.2.12" -> 6379, "192.168.2.13" -> 6379)
* val redlock = Redlock("192.168.2.11:6379", "192.168.2.12:6379", "192.168.2.13:6379")
* val lock = redlock.lock("resource-name")
*
* if(lock.successful) {
Expand Down Expand Up @@ -130,9 +130,8 @@ object Redlock {
private[redis] final val UNLOCK_SCRIPT = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n return redis.call(\"del\", KEYS[1])\n else\n return 0\n end"

def apply(client: RedisClient): Redlock = new Redlock(Seq(client))
def apply(hosts: (String, Int)*): Redlock = new Redlock(hosts.map( h => RedisClient(h._1, h._2)))
def apply(uris: String*): Redlock = new Redlock(uris.map(uri => RedisClient(uri)))
def apply(clients: Array[RedisClient]): Redlock = new Redlock(clients)
def apply[T](host: String, port: Int = 6379): Redlock = new Redlock(Seq(RedisClient(host, port)))
}

trait Lock {
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/com/fotolog/redis/TestClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait TestClient extends BeforeAndAfterEach with BeforeAndAfterAll { this: Suite
client.shutdown()
}

def createClient = RedisClient(sys.env.getOrElse("TEST_DB_HOST", "localhost"), password = sys.env.get("TEST_DB_PASS"))
def createClient = RedisClient(sys.env.getOrElse("TEST_DB_URI", "redis://localhost"))

def createInMemoryClient = RedisClient("mem:test")
def createInMemoryClient = RedisClient("redis-mem://test")
}

0 comments on commit ca87565

Please sign in to comment.