This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Commit a7d8c67: fix GEARPUMP-152 upgrade Storm support to 1.0.x
darionyaphet committed Jun 28, 2016
1 parent 1b6a234
Showing 23 changed files with 182 additions and 76 deletions.
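Nearly every hunk below applies the same mechanical change: Storm 1.0.x moved its public API from the backtype.storm.* packages to org.apache.storm.*, so the integration layer swaps the import prefix while class and method names stay the same. As a hedged illustration (this snippet is not one of the 23 changed files, and the object name is made up), client code tracks the rename like this:

    // Storm 0.9.x (before this commit):
    //   import backtype.storm.Config
    //   import backtype.storm.utils.Utils

    // Storm 1.0.x (after this commit): same classes, new package prefix.
    import org.apache.storm.Config
    import org.apache.storm.utils.Utils

    object PackageRenameExample {
      // Reads defaults.yaml plus any storm.yaml on the classpath, then sets the
      // worker count; only the import prefix differs from the 0.9.x version.
      def topologyConf(workers: Int): java.util.Map[AnyRef, AnyRef] = {
        val conf = Utils.readStormConfig().asInstanceOf[java.util.Map[AnyRef, AnyRef]]
        conf.put(Config.TOPOLOGY_WORKERS, Integer.valueOf(workers))
        conf
      }
    }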
@@ -18,7 +18,8 @@
 package org.apache.gearpump.experiments.storm.util;

-import backtype.storm.utils.TimeCacheMap;

+import org.apache.storm.utils.TimeCacheMap;

 /**
  * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression.
@@ -21,17 +21,19 @@ package org.apache.gearpump.experiments.storm.main
 import java.io.{File, FileOutputStream, FileWriter}
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
-import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+import java.util.{List => JList, HashMap => JHashMap, Map => JMap, UUID}
+import org.apache.storm.Config
+import org.apache.storm.generated._
+import org.apache.storm.nimbus.NimbusInfo
+import org.apache.storm.security.auth.{ReqContext, ThriftConnectionType, ThriftServer}
+import org.apache.storm.utils.Utils

 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}

 import akka.actor.ActorSystem
 import com.typesafe.config.ConfigValueFactory
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.Utils
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
 import org.slf4j.Logger
@@ -231,7 +233,8 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
     val topologySummaryList = topologies.map { case (name, _) =>
       new TopologySummary(name, name, 0, 0, 0, 0, "")
     }.toSeq
-    new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+    new ClusterSummary(List[SupervisorSummary]().asJava,
+      topologySummaryList.asJava, List[NimbusSummary]().asJava)
   }

   override def beginFileDownload(file: String): String = {
@@ -284,6 +287,94 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
     new File(jar).delete()
     topologies -= name
   }

+  override def updateBlobReplication(key: String, replication: Int): Int = {
+    throw new UnsupportedOperationException
+  }
+
+  override def setLogConfig(name: String, config: LogConfig): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def downloadBlobChunk(session: String): ByteBuffer = {
+    throw new UnsupportedOperationException
+  }
+
+  override def beginBlobDownload(key: String): BeginDownloadResult = {
+    throw new UnsupportedOperationException
+  }
+
+  override def cancelBlobUpload(session: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyHistory(user: String): TopologyHistoryInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyPageInfo(id: String, window: String, is_include_sys: Boolean): TopologyPageInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getBlobMeta(key: String): ReadableBlobMeta = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createStateInZookeeper(key: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def setBlobMeta(key: String, meta: SettableBlobMeta): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getComponentPendingProfileActions(id: String, component_id: String, action: ProfileAction): JList[ProfileRequest] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def debug(name: String, component: String, enable: Boolean, samplingPercentage: Double): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getComponentPageInfo(topology_id: String, component_id: String, window: String, is_include_sys: Boolean): ComponentPageInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def setWorkerProfiler(id: String, profileRequest: ProfileRequest): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def finishBlobUpload(session: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def beginCreateBlob(key: String, meta: SettableBlobMeta): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getBlobReplication(key: String): Int = {
+    throw new UnsupportedOperationException
+  }
+
+  override def deleteBlob(key: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def listBlobs(session: String): ListBlobsResult = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getLogConfig(name: String): LogConfig = {
+    throw new UnsupportedOperationException
+  }
+
+  override def beginUpdateBlob(key: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def uploadBlobChunk(session: String, chunk: ByteBuffer): Unit = {
+    throw new UnsupportedOperationException
+  }
 }

 case class TopologyData(topology: StormTopology, config: JMap[AnyRef, AnyRef], jar: String)
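Two changes in this file go beyond the package rename. Storm 1.0's thrift-generated ClusterSummary constructor drops the numeric uptime argument and takes the topology list plus a list of NimbusSummary, and the Nimbus thrift interface has grown blob-store, log-config, debug and profiling RPCs, all of which GearpumpNimbus stubs with UnsupportedOperationException. A hedged, caller-side sketch (the helper below is illustrative and not part of the commit) of coping with those stubs:

    import scala.util.{Failure, Success, Try}
    import org.apache.storm.generated.Nimbus

    // Any of the stubbed Storm 1.0-only RPCs above surfaces as an
    // UnsupportedOperationException when called against GearpumpNimbus.
    def printBlobKeys(nimbus: Nimbus.Iface): Unit = {
      Try(nimbus.listBlobs("")) match {
        case Success(result) =>
          println(s"blob keys: ${result.get_keys()}")
        case Failure(_: UnsupportedOperationException) =>
          println("blob store RPCs are not supported by the Gearpump-backed nimbus")
        case Failure(other) => throw other
      }
    }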
@@ -18,11 +18,11 @@

 package org.apache.gearpump.experiments.storm.main

-import backtype.storm.Config
-import backtype.storm.utils.Utils
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
+import org.apache.storm.Config
+import org.apache.storm.utils.Utils

 object GearpumpStormClient extends AkkaApp with ArgumentsParser {
@@ -20,12 +20,12 @@ package org.apache.gearpump.experiments.storm.processor

 import java.util.{Collection => JCollection, List => JList}

-import backtype.storm.task.IOutputCollector
-import backtype.storm.tuple.Tuple
 import org.apache.gearpump.experiments.storm.topology.TimedTuple
 import org.apache.gearpump.experiments.storm.util.StormConstants._
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
 import org.apache.gearpump.streaming.task.UpdateCheckpointClock
+import org.apache.storm.task.IOutputCollector
+import org.apache.storm.tuple.Tuple

 /**
  * this is used by Storm bolt to emit messages
@@ -20,9 +20,9 @@ package org.apache.gearpump.experiments.storm.producer

 import java.util.{List => JList}

-import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
 import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.apache.storm.spout.{ISpoutOutputCollector, ISpout}

 case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
@@ -25,13 +25,6 @@ import java.util.{HashMap => JHashMap, List => JList, Map => JMap}

 import akka.actor.ActorRef
 import akka.pattern.ask
-import backtype.storm.Config
-import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
-import backtype.storm.metric.api.IMetric
-import backtype.storm.spout.{ISpout, SpoutOutputCollector}
-import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
-import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
-import backtype.storm.utils.Utils
 import clojure.lang.Atom
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector
@@ -43,6 +36,13 @@ import org.apache.gearpump.streaming.DAG
 import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
 import org.apache.gearpump.util.{Constants, LogUtil}
 import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.storm.Config
+import org.apache.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
+import org.apache.storm.metric.api.IMetric
+import org.apache.storm.spout.{SpoutOutputCollector, ISpout}
+import org.apache.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext}
+import org.apache.storm.tuple.{Fields, Tuple, TupleImpl}
+import org.apache.storm.utils.Utils
 import org.slf4j.Logger

 import scala.collection.JavaConverters._
@@ -55,13 +55,15 @@ import scala.concurrent.{Await, Future}
 trait GearpumpStormComponent {
   /**
    * invoked at Task.onStart
-   * @param startTime task start time
+   *
+   * @param startTime task start time
    */
   def start(startTime: StartTime): Unit

   /**
    * invoked at Task.onNext
-   * @param message incoming message
+   *
+   * @param message incoming message
    */
   def next(message: Message): Unit

@@ -228,7 +230,8 @@ object GearpumpStormComponent {

   /**
    * invoked at TICK message when "topology.tick.tuple.freq.secs" is configured
-   * @param freq tick frequency
+   *
+   * @param freq tick frequency
    */
   def tick(freq: Int): Unit = {
     if (null == tickTuple) {
@@ -241,7 +244,8 @@
   /**
    * normalize general config with per component configs
    * "topology.transactional.id" and "topology.tick.tuple.freq.secs"
-   * @param stormConfig general config for all components
+   *
+   * @param stormConfig general config for all components
    * @param componentCommon common component parts
    */
   private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
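The doc-comment change above touches the tick path: tick(freq) only fires when "topology.tick.tuple.freq.secs" is configured. A hedged sketch (illustrative helper, not part of the commit) of enabling that through Storm 1.0's Config constants:

    import org.apache.storm.Config

    object TickConfigExample {
      // Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is the "topology.tick.tuple.freq.secs"
      // key named in the doc comment; a positive value enables tick tuples.
      def withTicks(conf: java.util.Map[AnyRef, AnyRef], seconds: Int): java.util.Map[AnyRef, AnyRef] = {
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.valueOf(seconds))
        conf
      }
    }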
@@ -22,9 +22,6 @@ import java.lang.{Iterable => JIterable}
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

 import akka.actor.ActorSystem
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.utils.{ThriftTopologyUtils, Utils}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.processor.StormProcessor
 import org.apache.gearpump.experiments.storm.producer.StormProducer
@@ -34,6 +31,9 @@ import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.LogUtil
+import org.apache.storm.Config
+import org.apache.storm.generated.{ComponentCommon, GlobalStreamId, Grouping, Bolt, SpoutSpec, StormTopology}
+import org.apache.storm.utils.{ThriftTopologyUtils, Utils}
 import org.slf4j.Logger

 // TODO: Refactor this file, we should disable using of JavaConversions
@@ -64,7 +64,8 @@ object GearpumpStormTopology {
  * 3. provides interface for Gearpump applications to use Storm topology
  *
  * an implicit ActorSystem is required to create Gearpump processors
- * @param name topology name
+ *
+ * @param name topology name
  * @param topology Storm topology
 * @param sysConfig configs from "defaults.yaml" and custom config file
 * @param appConfig config submitted from user application
@@ -124,7 +125,8 @@ private[storm] class GearpumpStormTopology(

   /**
    * creates Gearpump processor from Storm spout
-   * @param spoutId spout id
+   *
+   * @param spoutId spout id
    * @param spoutSpec spout spec
    * @param stormConfig merged storm config
    * @param system actor system
@@ -141,7 +143,8 @@ private[storm] class GearpumpStormTopology(

   /**
    * creates Gearpump processor from Storm bolt
-   * @param boltId bolt id
+   *
+   * @param boltId bolt id
    * @param boltSpec bolt spec
    * @param stormConfig merged storm config
    * @param system actor system
@@ -192,7 +195,8 @@ private[storm] class GearpumpStormTopology(
    * 1. use "topology.tasks" if defined; otherwise use parallelism_hint
    * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined
    * 3. component config overrides system config
-   * @param stormConfig System configs without merging "topology.tasks" and
+   *
+   * @param stormConfig System configs without merging "topology.tasks" and
    *   "topology.max.task.parallelism" of component
    * @return number of task instances for a component
    */
@@ -223,7 +227,8 @@ private[storm] class GearpumpStormTopology(

   /**
    * merge component configs "topology.kryo.decorators" and "topology.kryo.register"
-   * @param componentConfigs list of component configs
+   *
+   * @param componentConfigs list of component configs
    * @param allConfig existing configs without merging component configs
    * @return the two configs merged from all the component configs and existing configs
    */
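The parallelism doc comment above spells out a three-step rule. A hedged sketch of that rule in isolation, assuming the component config has already been merged over the system config (the helper is illustrative, not the actual GearpumpStormTopology code):

    // "topology.tasks" wins over the parallelism hint, and
    // "topology.max.task.parallelism" caps the result when present.
    def resolveParallelism(stormConfig: Map[AnyRef, AnyRef], parallelismHint: Int): Int = {
      val requested = stormConfig.get("topology.tasks")
        .map(_.toString.toInt)
        .filter(_ > 0)
        .getOrElse(parallelismHint)
      stormConfig.get("topology.max.task.parallelism")
        .map(_.toString.toInt)
        .map(math.min(requested, _))
        .getOrElse(requested)
    }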
@@ -20,10 +20,10 @@ package org.apache.gearpump.experiments.storm.topology

 import java.util.{List => JList}

-import backtype.storm.task.GeneralTopologyContext
-import backtype.storm.tuple.{Tuple, TupleImpl}

 import org.apache.gearpump.TimeStamp
+import org.apache.storm.task.GeneralTopologyContext
+import org.apache.storm.tuple.{TupleImpl, Tuple}

 /**
  * this carries Storm tuple values in the Gearpump world
@@ -36,10 +36,11 @@ private[storm] class GearpumpTuple(
     val sourceStreamId: String,
     @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable {
   /**
-   * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component
+   * creates a Storm [[org.apache.storm.tuple.Tuple]] to be passed to a Storm component
    * this is needed for each incoming message
-   * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization
-   * @param topologyContext topology context used for all tasks
+   * because we cannot get [[org.apache.storm.task.GeneralTopologyContext]] at deserialization
+   *
+   * @param topologyContext topology context used for all tasks
    * @return a Tuple
    */
   def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
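The updated doc comment explains why toTuple takes a GeneralTopologyContext per message: the context cannot be recovered at deserialization, so it has to be supplied when the Storm tuple is built. A hedged sketch of constructing a Storm 1.0 tuple the same way (it assumes TupleImpl's four-argument constructor and is not the actual GearpumpTuple code):

    import java.util.{List => JList}
    import org.apache.storm.task.GeneralTopologyContext
    import org.apache.storm.tuple.{Tuple, TupleImpl}

    // TupleImpl now lives under org.apache.storm.tuple; the context supplies the
    // component and stream metadata needed to interpret the raw values.
    def buildTuple(context: GeneralTopologyContext, values: JList[AnyRef],
        sourceTaskId: Int, sourceStreamId: String): Tuple = {
      new TupleImpl(context, values, sourceTaskId, sourceStreamId)
    }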
@@ -19,12 +19,13 @@

 package org.apache.gearpump.experiments.storm.util

 import java.util.{List => JList}
+import org.apache.storm.generated.GlobalStreamId
+import org.apache.storm.grouping.CustomStreamGrouping
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.tuple.Fields

 import scala.util.Random

-import backtype.storm.generated.GlobalStreamId
-import backtype.storm.grouping.CustomStreamGrouping
-import backtype.storm.task.TopologyContext
-import backtype.storm.tuple.Fields

 /**
  * Grouper is identical to that in storm but return gearpump partitions for storm tuple values
@@ -114,7 +115,7 @@ class AllGrouper(numTasks: Int) extends Grouper {
 /**
  * CustomGrouper allows users to specify grouping strategy
  *
- * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
+ * @param grouping see [[org.apache.storm.grouping.CustomStreamGrouping]]
  */
 class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
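CustomGrouper wraps a user-supplied org.apache.storm.grouping.CustomStreamGrouping. For reference, a minimal implementation of that interface under the renamed package might look like the hedged sketch below (FirstTaskGrouping is made up for illustration):

    import java.util.{Collections, List => JList}
    import org.apache.storm.generated.GlobalStreamId
    import org.apache.storm.grouping.CustomStreamGrouping
    import org.apache.storm.task.WorkerTopologyContext

    // Routes every tuple to the first target task; a real grouping would pick
    // tasks from targetTasks based on the outgoing values.
    class FirstTaskGrouping extends CustomStreamGrouping {
      private var targets: JList[Integer] = _

      override def prepare(context: WorkerTopologyContext, stream: GlobalStreamId,
          targetTasks: JList[Integer]): Unit = {
        targets = targetTasks
      }

      override def chooseTasks(taskId: Int, values: JList[AnyRef]): JList[Integer] = {
        Collections.singletonList(targets.get(0))
      }
    }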
@@ -19,13 +19,14 @@

 package org.apache.gearpump.experiments.storm.util

 import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap}
+import org.apache.storm.generated.{JavaObject, GlobalStreamId, Grouping}
+import org.apache.storm.grouping.CustomStreamGrouping
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.tuple.Fields
+import org.apache.storm.utils.Utils

 import scala.collection.JavaConverters._

-import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
-import backtype.storm.grouping.CustomStreamGrouping
-import backtype.storm.task.TopologyContext
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
 import org.slf4j.Logger

 import org.apache.gearpump._