Skip to content

Commit

Permalink
Merge pull request #65 from ashwanthkumar/aggregator-support
Browse files Browse the repository at this point in the history
Aggregator support
  • Loading branch information
ashwanthkumar authored Jun 2, 2017
2 parents f6ed7c9 + a27fe07 commit 386efeb
Show file tree
Hide file tree
Showing 16 changed files with 422 additions and 17 deletions.
14 changes: 7 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
language: java
sudo: false
jdk:
- oraclejdk8
env:
global:
secure: MmyMAHnhCNXJIclWXIZIIb2ZpJZknvaqZ2HUpxf+LIsr0dSwg3OhULc6hOEsktusio4rf7/mK5FihOEz4Lb4FFAzTuTn23YyPjjNY4BHIoqF+v5sgy0z8EGUl+DymoJExQ02CRwy6gyIk0MUYbcuZMbUEg5S8v/JvLL1hgEhv+8eAN3dGrMoog8NsDuYlhzZlFXXm8DN83PZ/9boagWq+WGHajNCUC7Ra7y4TfhRUKEDZfoln39xeazQwdUFq9iZ0urGM+QhZxL6wf2G68DAHfTmor/ahVZlOMcbmhakVqxVYz17f0ovKlvD2ybBDzaqae5j/AiTkEmHTEvXzXeSJT0VHE0/JuSHH+r+1tJxnrT+zg6s9YSOUDaZRkIWmLFe/jWV3X8AZ9MwETZDQptyTgxHqIYM8HBQA8j3CkjzgLMFkZfqwVgJRc2QjUwKKttmLEM7mJlxuyNoqD5fNHhFJCJBnJtJQAV3J6CEse4a1qOQRJoFBGIp8u5l/ej9Bwdf+WVp+mXt3uQOfr6L6PgYE+Ll9gy/gAPwn8IgllYN9q/ypGc9BO2fe77uWfDgQsTjwNfNnA0wKf3YYYunNnMN58oFBjukpZZKiyUOjgEdrrcCX/I6zMRDaA2YrW4s+JG1Idr2przEouPTbNjanw0hqQzz96Gslsv6FNhRYJaJicw=
- secure: MmyMAHnhCNXJIclWXIZIIb2ZpJZknvaqZ2HUpxf+LIsr0dSwg3OhULc6hOEsktusio4rf7/mK5FihOEz4Lb4FFAzTuTn23YyPjjNY4BHIoqF+v5sgy0z8EGUl+DymoJExQ02CRwy6gyIk0MUYbcuZMbUEg5S8v/JvLL1hgEhv+8eAN3dGrMoog8NsDuYlhzZlFXXm8DN83PZ/9boagWq+WGHajNCUC7Ra7y4TfhRUKEDZfoln39xeazQwdUFq9iZ0urGM+QhZxL6wf2G68DAHfTmor/ahVZlOMcbmhakVqxVYz17f0ovKlvD2ybBDzaqae5j/AiTkEmHTEvXzXeSJT0VHE0/JuSHH+r+1tJxnrT+zg6s9YSOUDaZRkIWmLFe/jWV3X8AZ9MwETZDQptyTgxHqIYM8HBQA8j3CkjzgLMFkZfqwVgJRc2QjUwKKttmLEM7mJlxuyNoqD5fNHhFJCJBnJtJQAV3J6CEse4a1qOQRJoFBGIp8u5l/ej9Bwdf+WVp+mXt3uQOfr6L6PgYE+Ll9gy/gAPwn8IgllYN9q/ypGc9BO2fe77uWfDgQsTjwNfNnA0wKf3YYYunNnMN58oFBjukpZZKiyUOjgEdrrcCX/I6zMRDaA2YrW4s+JG1Idr2przEouPTbNjanw0hqQzz96Gslsv6FNhRYJaJicw=
- secure: rUkD/vXLmdUB/hDBkregZz4YmJoG7Ro91xJbADumf4Lzfm8wO4bK1mpbsGqBDOXvp/the76MPOdu6halDw8njktfXcrwUCPQaXtn8A7+bQW1Kc1L/SpKs5G+ycGVKF8AAYUMMBRs6RX1QHQTwxSOVkOTYD6me9rYZTX6PdcW9fCw7EgIzBc8X2I8tQckCj5DIoLECxUo2bwrkL7KKkNmFPLK7Cm6jddNASqzqZo3pybeGTFuGbAsXN11b2CfINl2srjJ/gH3DDXKFlcAKpPuH7YW+3YIA/15S6xoimUFnLaUILFpNTTwxGmQwZ/h235K6kh0kWHdPdeUlGzl0yN8KgSwS1DFKLfe+VrKSD0441XyW/snwtrlC9jl/jX2A5uxaalVDD27MZj3Z8kpdSHtuaYIZ86d7yNRWqOJMcwkI7K9W4JiocBGZ373EttxVM7dywQuPPu/bwyEI2fq44/JUQD+3AJ2Exgoh1NFrHT+H+NCDXY+x7W8S+lD9adlrmvHZfNZ1I401V8u9mEApw0XWaQCIvKfqXogk4UDtzxAvJKv0J7A1k70xH59OtnU5kHWF5j1805hkVhp/1+AC6UNVaiNLAoRNSH5PQFRtfJz4WyfMmonR+ULtBsFTdw6ON1gxbHAHiAY0WQmcBwUsF3gOn8GZrXKGxizKb6QutH64wI=
- SONATYPE_USERNAME: ashwanthkumar
cache:
directories:
- $HOME/.m2
- $HOME/DENV
after_success:
- git config user.name "Ashwanth Kumar"
- git config user.email "[email protected]"
- git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git";
- git fetch gh-token && git fetch gh-token gh-pages:gh-pages;
- sudo pip install mkdocs==0.15.3
- mkdocs gh-deploy -v --clean --remote-name gh-token;
- bash bin/hooks_publish_docs.sh
- bash <(curl -s https://codecov.io/bash)
- mvn deploy --settings travis-settings.xml -DskipTests=true -B
15 changes: 15 additions & 0 deletions bin/hooks_publish_docs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

git config user.name "Ashwanth Kumar"
git config user.email "[email protected]"
git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git";
git fetch gh-token && git fetch gh-token gh-pages:gh-pages;

# Deploy Docs only for builds out of master and not PRs or tags.
if ([ "$TRAVIS_BRANCH" == "master" ] || [ ! -z "$TRAVIS_TAG" ]) &&
[ "$TRAVIS_PULL_REQUEST" == "false" ]; then
virtualenv --system-site-packages ${HOME}/DENV
source ${HOME}/DENV/bin/activate
pip install mkdocs==0.15.3
mkdocs gh-deploy -v --clean --remote-name gh-token;
fi
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@
<configuration>
<excludes>
<exclude>in/ashwanthkumar/suuchi/rpc/generated/*</exclude>
<exclude>in/ashwanthkumar/suuchi/core/tests/*</exclude>
</excludes>
</configuration>
</execution>
Expand Down
17 changes: 16 additions & 1 deletion suuchi-core/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>suuchi-core</artifactId>
<version>0.2.18-SNAPSHOT</version>
Expand All @@ -25,6 +26,18 @@
<version>2.8.2</version>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_${scala.lib.version}</artifactId>
<version>0.13.0</version>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-core_${scala.lib.version}</artifactId>
<version>0.9.5</version>
</dependency>

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
Expand Down Expand Up @@ -129,6 +142,8 @@
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
Expand Down
13 changes: 12 additions & 1 deletion suuchi-core/src/main/proto/suuchi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,15 @@ message ScanResponse {
message KV {
bytes key = 1;
bytes value = 2;
}
}

message ReduceRequest {
}

message ReduceResponse {
int64 output = 1;
}

service Aggregator {
rpc Reduce (ReduceRequest) returns (ReduceResponse);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package in.ashwanthkumar.suuchi.router

import java.util
import java.util.concurrent.TimeUnit

import com.google.common.util.concurrent.Futures
import com.twitter.algebird.Aggregator
import in.ashwanthkumar.suuchi.cluster.MemberAddress
import in.ashwanthkumar.suuchi.rpc.CachedChannelPool
import io.grpc._
import io.grpc.stub.{ClientCalls, MetadataUtils, StreamObserver, StreamObservers}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

trait Aggregation {
def aggregator[ReqT, RespT]: PartialFunction[MethodDescriptor[ReqT, RespT], Aggregator[RespT, Any, RespT]]
}

class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends ServerInterceptor {
val channelPool = CachedChannelPool()
val log = LoggerFactory.getLogger(classOf[AggregationRouter])

override def interceptCall[ReqT, RespT](incomingRequest: ServerCall[ReqT, RespT], headers: Metadata, next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
val isBroadcastRequest = headers.containsKey(Headers.BROADCAST_REQUEST_KEY)
if (isBroadcastRequest || !agg.aggregator.isDefinedAt(incomingRequest.getMethodDescriptor)) {
next.startCall(incomingRequest, headers)
} else {
// ServerCall.Listener for ServerStreaming methods
headers.put(Headers.BROADCAST_REQUEST_KEY, true)
incomingRequest.request(2)
new ServerCall.Listener[ReqT] {
val aggregator = agg.aggregator.apply(incomingRequest.getMethodDescriptor)
var request: ReqT = _

override def onCancel() = {
log.debug("AggregationRouter#onCancel")
incomingRequest.close(Status.CANCELLED, headers)
}
override def onHalfClose() = {
log.debug("AggregationRouter#onHalfClose")
try {
val gathered = scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request)
val reduced = aggregator.apply(gathered.asScala)
incomingRequest.sendHeaders(headers)
incomingRequest.sendMessage(reduced)
incomingRequest.close(Status.OK, headers)
} catch {
case e: Throwable =>
log.error(e.getMessage, e)
incomingRequest.close(Status.INTERNAL.withCause(e), headers)
}
}
override def onReady() = {
log.debug("AggregationRouter#onReady")
}
override def onMessage(message: ReqT) = {
// We don't do the aggregation here but on onHalfClose()
request = message
}
override def onComplete() = {
log.debug("AggregationRouter#onComplete")
}
}
}
}

protected def scatter[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], headers: Metadata, input: ReqT): util.List[RespT] = {
AggregationRouter.scatter(nodes, channelPool, methodDescriptor, headers, input)
}
}

object AggregationRouter {
def scatter[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], headers:Metadata, input: ReqT): util.List[RespT] = {
val scatterRequests = nodes.map(destination => {
val channel = channelPool.get(destination, insecure = true)
val clientCall = ClientInterceptors.interceptForward(channel, MetadataUtils.newAttachHeadersInterceptor(headers))
.newCall(methodDescriptor, CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.MINUTES)) // TODO (ashwanthkumar): Make this deadline configurable
ClientCalls.futureUnaryCall(clientCall, input)
})

Futures.allAsList(scatterRequests: _*).get()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import io.grpc.Metadata
object Headers {
val ELIGIBLE_NODES = "eligible_nodes"
val REPLICATION_REQUEST = "replication_request"
val BROADCAST_REQUEST = "broadcast_request"

val REPLICATION_REQUEST_KEY = Metadata.Key.of(Headers.REPLICATION_REQUEST, StringMarshaller)
val ELIGIBLE_NODES_KEY = Metadata.Key.of(Headers.ELIGIBLE_NODES, MemberAddressMarshaller)
val BROADCAST_REQUEST_KEY = Metadata.Key.of(Headers.BROADCAST_REQUEST, BooleanMarshaller)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ case object StringMarshaller extends AsciiMarshaller[String] {
override def parseAsciiString(serialized: String): String = serialized
}

/**
* Send a boolean value using AsciiMarshaller
*/
case object BooleanMarshaller extends AsciiMarshaller[Boolean] {
override def toAsciiString(value: Boolean): String = value.toString
override def parseAsciiString(serialized: String): Boolean = serialized.toBoolean
}

/**
* Converts a collection of [[MemberAddress]] to it's external form separated by `|`
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package in.ashwanthkumar.suuchi.rpc

import java.net.InetAddress

import com.google.protobuf.Message
import com.twitter.algebird.Aggregator
import in.ashwanthkumar.suuchi.cluster.MemberAddress
import in.ashwanthkumar.suuchi.router._
import io.grpc.{Server => GServer, _}
Expand Down Expand Up @@ -135,6 +137,16 @@ class Server[T <: ServerBuilder[T]](serverBuilder: ServerBuilder[T], whoami: Mem
this
}

def aggregate(allNodes: List[MemberAddress], service: BindableService, agg: Aggregation): Server[T] = {
aggregate(allNodes, service.bindService(), agg)
}

def aggregate(allNodes: List[MemberAddress], service: ServerServiceDefinition, agg: Aggregation): Server[T] = {
val aggregator = new AggregationRouter(allNodes, agg)
serverBuilder.addService(ServerInterceptors.interceptForward(service, aggregator))
this
}

/**
* Add a service without any routing logic
* @param service Service which is handled locally by the node and not forwarded
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package in.ashwanthkumar.suuchi.rpc

import com.twitter.algebird.{Aggregator, LongRing, Semigroup}
import in.ashwanthkumar.suuchi.router.Aggregation
import in.ashwanthkumar.suuchi.rpc.generated.SuuchiRPC.ReduceResponse
import in.ashwanthkumar.suuchi.rpc.generated.{AggregatorGrpc, SuuchiRPC}
import io.grpc.stub.StreamObserver

class SuuchiAggregatorService extends AggregatorGrpc.AggregatorImplBase {
override def reduce(request: SuuchiRPC.ReduceRequest, responseObserver: StreamObserver[SuuchiRPC.ReduceResponse]) = {
responseObserver.onNext(ReduceResponse.newBuilder().setOutput(1).build())
responseObserver.onCompleted()
}
}

class SumOfNumbers extends Aggregation {
override def aggregator[ReqT, RespT] = {
case AggregatorGrpc.METHOD_REDUCE => new Aggregator[ReduceResponse, Long, ReduceResponse] {
override def prepare(input: ReduceResponse) = input.getOutput
override def semigroup: Semigroup[Long] = LongRing
override def present(reduced: Long) = ReduceResponse.newBuilder().setOutput(reduced).build()
}.asInstanceOf[Aggregator[RespT, Any, RespT]]
}
}
24 changes: 24 additions & 0 deletions suuchi-core/src/test/proto/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
syntax = "proto3";

option java_package = "in.ashwanthkumar.suuchi.core.tests";
option java_outer_classname = "SuuchiTestRPC";

message ReduceRequest {
}

message ReduceResponse {
int64 output = 1;
}

service Aggregator {
rpc Reduce (ReduceRequest) returns (ReduceResponse);
}

message FooRequest {
}
message FooResponse {
}

service Random {
rpc Foo (FooRequest) returns (FooResponse);
}
Loading

0 comments on commit 386efeb

Please sign in to comment.