Skip to content

Commit

Permalink
add streaming 2.0 version
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhiqiang-He committed Dec 5, 2016
1 parent 4401a63 commit 1006bb3
Show file tree
Hide file tree
Showing 353 changed files with 20,568 additions and 3,150 deletions.
83 changes: 64 additions & 19 deletions streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,22 @@
<version>1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>streaming-base</artifactId>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
Expand All @@ -40,15 +51,23 @@
<addMavenDescriptor>false</addMavenDescriptor>
</archive>
<excludes>
<exclude>logback.xml</exclude>
<exclude>log4j2.xml</exclude>
<exclude>streaming-site.xml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

<properties>
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
<log4j.version>2.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down Expand Up @@ -80,19 +99,55 @@
</exclusions>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.6</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>13.0</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.3.Final</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<!--
说明:slf4j-log4j12和log4j-over-slf4j不能同时引用,会有冲突
但是由于CQL客户端已经解决了该冲突,
所以在CQL客户端可以使用slf4j-log4j12包用来屏蔽异常的log4j日志打印
但是在dependcy打包的时候,应该排除掉slf4j-log4j12
-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.5</version>
<artifactId>slf4j-log4j12</artifactId>
<version>${log4j-over-slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j-over-slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
<version>0.10.1.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
Expand All @@ -104,15 +159,5 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>13.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.3.Final</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ public abstract class Application
/**
* <默认构造函数>
*
* @param appName 应用名称、
* @param config 配置属性
* @throws StreamingException 流处理异常
*/
public Application(String appName, StreamingConfig config)
throws StreamingException
Expand All @@ -93,8 +90,6 @@ public Application(String appName, StreamingConfig config)
/**
* 增加数据流描述Schema
*
* @param schema 事件类型(schema)
* @throws StreamingException 流处理异常
*/
public void addEventSchema(TupleEventType schema)
throws StreamingException
Expand All @@ -112,7 +107,6 @@ public void addEventSchema(TupleEventType schema)
/**
* 添加功能
*
* @param operator 功能算子
*/
public void addFunctionStream(IRichOperator operator)
{
Expand All @@ -122,7 +116,6 @@ public void addFunctionStream(IRichOperator operator)
/**
* 添加输出算子
*
* @param output 输出算子
*/
public void addOutputStream(IRichOperator output)
{
Expand All @@ -132,7 +125,6 @@ public void addOutputStream(IRichOperator output)
/**
* 添加输入算子
*
* @param input 输入算子
*/
public void addInputStream(IRichOperator input)
{
Expand All @@ -142,7 +134,6 @@ public void addInputStream(IRichOperator input)
/**
* 获取流算子的所有schema
*
* @return 算子schema管理
*/
public EventTypeMng getStreamSchema()
{
Expand All @@ -152,9 +143,6 @@ public EventTypeMng getStreamSchema()
/**
* 通过输出流名称寻找对应算子
*
* @param streamName 流名称
* @return 算子
* @throws StreamingException 流处理异常
*/
public IRichOperator getOperatorByOutputStreamName(String streamName)
throws StreamingException
Expand Down Expand Up @@ -195,7 +183,6 @@ private IRichOperator getOperatorFromInputStreams(String streamName)
/**
* 获取所有的输入流
*
* @return 输入流列表
*/
public List<IRichOperator> getInputStreams()
{
Expand Down Expand Up @@ -229,7 +216,6 @@ private IRichOperator getOperatorFromFunctionStreams(String streamName)
/**
* 获取所有的功能流算子
*
* @return 功能流算子列表
*/
public List<IRichOperator> getFunctionstreams()
{
Expand All @@ -251,7 +237,6 @@ private IRichOperator getOperatorFromOutputStreams(String streamName)
/**
* 获取所有的输出流
*
* @return 输出流列表
*/
public List<IRichOperator> getOutputStreams()
{
Expand All @@ -261,8 +246,6 @@ public List<IRichOperator> getOutputStreams()
/**
* 根据名称获取对应的schema
*
* @param name 名称
* @return schema
*/
public IEventType getEventType(String name)
{
Expand All @@ -272,8 +255,6 @@ public IEventType getEventType(String name)
/**
* 获取已经排好序的功能算子,这个功能算子包含output算子
*
* @return 算子列表
* @throws StreamingException 流处理异常
*/
public List<IRichOperator> genFunctionOpsOrder()
throws StreamingException
Expand All @@ -284,7 +265,6 @@ public List<IRichOperator> genFunctionOpsOrder()
/**
* 获取应用程序名称
*
* @return 应用程序名称
*/
public String getAppName()
{
Expand All @@ -294,7 +274,6 @@ public String getAppName()
/**
* 获取配置属性
*
* @return 配置属性
*/
public StreamingConfig getConf()
{
Expand All @@ -304,41 +283,55 @@ public StreamingConfig getConf()
/**
* 应用远程提交
*
* @throws StreamingException 流处理异常
*/
public abstract void launch()
throws StreamingException;

/**
* 查询应用程序
*
* @return 如果存在,返回true
* @throws StreamingException 任务查询异常
*/
public abstract ApplicationResults getApplications()
throws StreamingException;

/**
* 检查任务是否存在
*
* @return 如果存在,返回true
* @throws StreamingException 任务删除异常
*/
public abstract boolean isApplicationExists()
throws StreamingException;

/**
* 删除远程应用
*
* @throws StreamingException 任务删除异常
*/
public abstract void killApplication()
throws StreamingException;

/**
* 设置用户已经打包好的Jar包
*
* @param userJar 用户jar包
*/
public abstract void setUserPackagedJar(String userJar);

/**
* 去活应用程序
* 将应用程序从运行状态变为deactive状态,从而暂停应用程序的运行。
*/
public abstract void deactiveApplication()
throws StreamingException;

/**
* 激活应用程序
* 将deactive状态的应用程序激活,变成active状态
*/
public abstract void activeApplication()
throws StreamingException;

/**
* 重分配应用程序
* 重新分配application的worker数量
*/
public abstract void rebalanceApplication(int workerNum)
throws StreamingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public class ApplicationFactory
/**
* 创建应用程序
*
* @param conf 配置属性, 待创建的应用程序的类名称就保存在这里
* @param name 应用程序名称
* @return 创建好的应用程序
* @throws StreamingException 创建异常
*/
public static Application createApplication(StreamingConfig conf, String name)
throws StreamingException
Expand All @@ -64,7 +60,7 @@ public static Application createApplication(StreamingConfig conf, String name)
}

StreamingException exception = new StreamingException(e, ErrorCode.UNKNOWN_SERVER_COMMON_ERROR);
LOG.error(exception.getMessage(), e);
LOG.error(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR.getFullMessage(), e);
throw exception;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,17 @@ public interface ApplicationResults
{
/**
* 获取列格式化字符串
* @return 列格式化字符串
*/
String getFormatter();

/**
* 获取应用程序查询结果的标题头
* @return 标题头
*/
String[] getResultHeader();

/**
* 获取查询结果
* 查询结果的列数量必须和标题头的数组数量一致
* @param container 应用程序名称中必须包含的字符串,不区分大小写
* @return 查询结果
*/
List<String[]> getResults(String container);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public enum DistributeType

/**
* <默认构造函数>
* @param desc 描述
*/
private DistributeType(String desc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ public class GroupInfo implements Serializable
private DistributeType ditributeType;

private List<String> fields;

// private CustomStreamGrouping grouping;


public String getStreamName()
{
return streamName;
Expand Down Expand Up @@ -71,20 +69,5 @@ public void setFields(List<String> fields)
{
this.fields = fields;
}

/**
* 尚不考虑用户自定义分发方式
*/
/*
public CustomStreamGrouping getGrouping()
{
return grouping;
}

public void setGrouping(CustomStreamGrouping grouping)
{
this.grouping = grouping;
}
*/

}
Loading

0 comments on commit 1006bb3

Please sign in to comment.