Skip to content

Commit

Permalink
Merge branch 'branch-1.2.1' of https://github.com/Tencent/angel into …
Browse files Browse the repository at this point in the history
…branch-1.2.1
  • Loading branch information
paynie committed Oct 26, 2017
2 parents d4992ba + d0abdbc commit a28460f
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 152 deletions.
6 changes: 3 additions & 3 deletions docs/algo/spark_on_angel_optimizer.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ Spark mllib中Logistic Regression算法做了很多数据预处理的逻辑,

| item | Spark | Spark on Angel | 加速比例 |
|---|---|---|---|
|SGD LR (step_size=0.05,maxIter=100) | 2.9 hour | 2.1 hour | 27.6% |
|L-BFGS LR (m=10, maxIter=50) | 2 hour | 1.3 hour | 35.0% |
|OWL-QN LR (m=10, maxIter=50) | 3.3 hour | 1.9 hour | 42.4% |
|SGD LR (step_size=0.05,maxIter=100) | 2.9 hour | 1.5 hour | 48.3% |
|L-BFGS LR (m=10, maxIter=50) | 2 hour | 1.0 hour | 50.0% |
|OWL-QN LR (m=10, maxIter=50) | 3.3 hour | 1.4 hour | 57.6% |

如上数据所示,Spark on Angel相较于Spark在训练LR模型时有不同程度的加速;对于越复杂的模型,其加速的比例越大。
同时值得强调的是,Spark on Angel的算法逻辑实现与纯Spark的实现没有太多的差别,大大方便了广大Spark用户。
39 changes: 21 additions & 18 deletions docs/overview/spark_on_angel.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,34 @@ Angel从v1.0.0版本开始,就加入了**PS-Service**的特性,不仅仅可
* 利用Spark的Context,和Angel的配置,创建AngelContext,在Driver端负责全局的初始化和启动工作

* **PSClient**
* 负责PSVector与local value直接的运算(包括pull、push、increment), 以及PSVector与PSVector之间的运算(包括大部分的代数运算);同时还支持PSF(用户自定义的PS函数)
* PSClient所有运算会被封装到RemotePSVector和BreezePSVector。
* PSClient集成了PSVector和PSMatrix的所有初始化、运算、Pull/Push等操作
* 包括三部分Initializer,VectorOps,MatrixOps;分别对应PS的初始化操作,PSVector运算操作和PSMatrix运算操作

* **PSModelPool**
* PSModelPool对应了Angel PS上的一个矩阵,PSModelPool负责PSVector的申请、回收、销毁等工作。
* **PSModel**
* PSModel是PS server上PSVector/PSMatrix的总称,包含着PSClient对象
* PSModel是PSVector和PSMatrix的父类

* **PSVetorProxy/PSVector**
* PSVectorProxy是PSVector(包括RemotePSVector和BreezePSVector)的代理,指向Angel PS上的某个PSVector。
* PSVector的RemotePSVector和BreezePSVector封装了在不同场景下的PSVector的运算。RemotePSVector提供了PSVector与local value直接的运算(包括pull、push、increment),而BreezePSVector提供了PSVector与PSVector之间的运算(包括大部分的代数运算),以及PSF(用户自定义的PS函数)
* **PSVector**
* 包括DensePSVecotr和SparsePSVector
* PSVector的申请:通过`PSVector.dense(dim: Int, capacity: Int = 50)`申请PSVector,会创建一个维度为`dim`,容量为`capacity`的VectorPool,同一个VectorPool内的两个PSVector可以做运算。
通过`PSVector.duplicate(psVector)`,申请一个与`psVector`在同一个VectorPool的PSVector。
* PSVector有两个装饰类:`BreezePSVector`和`CachedPSVector`,`BreezePSVector`使PSVector可以支持Breeze算法库里的Vector运算。而`CachedPSVector`支持PSVector在Pull/Push过程中的缓存功能。

* **PSMatrix**
* 包括DensePSMatrix和SparsePSMatrix
* PSMatrix的创建和销毁:通过`PSMatrix.dense(rows: Int, cols: Int)`创建,当PSMatrix不再使用后,需要手动调用`destory`销毁该Matrix

使用Spark on Angel的简单代码如下:

```Scala

val psContext PSContext.getOrCreate(spark.sparkContext)
val pool = psContext.createModelPool(dim, capacity)
val psVector = pool.createModel(0.0)
PSContext.getOrCreate(spark.sparkContext)
val psVector = PSVector.dense(dim, capacity)
rdd.map { case (label , feature) =>
psVector.increment(feature)
...
}
println("feature sum size:" + psVector.mkRemote.size())
println("feature sum:" + psVector.pull.mkString(" "))
```

## 3. 启动流程
Expand All @@ -56,8 +62,7 @@ Spark on Angel本质上是一个Spark任务。Spark启动后,driver通过Angel
Spark driver的执行流程
- 启动SparkSession
- 启动PSContext
- 创建PSModelPool
- 申请PSVector
- 申请PSVector/PSMatrix
- 执行算法逻辑
- 终止PSContext和SparkSession

Expand Down Expand Up @@ -101,13 +106,11 @@ def runOWLQN(trainData: RDD[(Vector, Double)], dim: Int, m: Int, maxIter: Int):

def runOWLQN(trainData: RDD[(Vector, Double)], dim: Int, m: Int, maxIter: Int): Unit = {

val pool = PSContext.createModelPool(dim, 20)

val initWeightPS = pool.createZero().mkBreeze()
val l1regPS = pool.createZero().mkBreeze()
val initWeightPS = PSVector.dense(dim, 20).toBreeze()
val l1regPS = PSVector.duplicate(initWeightPS.component).zero().toBreeze

val owlqn = new OWLQN(maxIter, m, l1regPS, tol)
val states = owlqn.iterations(CostFunc(trainData), initWeightPS)
val states = owlqn.iterations(PSCostFunc(trainData), initWeightPS)
………

Expand Down
193 changes: 94 additions & 99 deletions docs/programmers_guide/spark_on_angel_programing_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,93 +54,93 @@ val context = PSContext.getOrCreate(spark.sparkContext)
// 第一次启动时,需要传入SparkContext
val context = PSContext.getOrCreate(spark.sparkContext)

// 此后,就无需传入SparkContext,直接的PSContext
val context = PSContext.getOrCreate()
// 此后,直接通过PSContext.instance()获取context
val context = PSContext.instance()

// 终止PSContext
PSContext.stop()
```

- 申请/销毁PSModelPoool
PSModelPool在Angel PS上其实是一个矩阵,矩阵列数是dim,行数是capacity。
可以申请多个不同大小的PSModelPool。

```scala
val pool = context.createModelPool(dim, capacity)
context.destroyModelPool(pool)
```

### 4. PSModelPool
PSModelPool在Angel PS上其实是一个矩阵,矩阵列数是`dim`,行数是`capacity`
同一个Application中,可以申请多个不同大小的PSModelPool。
可以从PSModelPool申请PSVector,存放在PSModle上PSVector的维度都是`dim`
PSModelPool只能存放、管理维度为`dim`的PSVector。

注意:同一个Pool内的PSVector才能做运算。

```scala
// 用Array数据初始化一个PSVector,array的维度必须与pool维度保持一致
val arrayProxy = pool.createModel(array)
// PSVector的每个维度都是value
val valueProxy = pool.createModel(value)

// 全0的PSVector
val zeroProxy = pool.createZero()
// 随机的PSVector, 随机数服从均匀分布
val uniformProxy = pool.createRandomUniform(0.0, 1.0)
// 随机的PSVector, 随机数服从正态分布
val normalProxy = pool.createRandomNormal(0.0, 1.0)
```

使用之后的PSVector,可以手动delete、也可以放之不管系统会自动回收;delete后的PSVector就不能再使用。
```scala
pool.delete(vectorPorxy)
```

### 5. PSVectorProxy/PSVector
PSVectorProxy是PSVector(包括BreezePSVector和RemotePSVector)的代理,指向Angel PS上的某个PSVector。
而PSVector的BreezePSVector和RemotePSVector封装了在不同场景下的PSVector的运算。

- PSVectorProxy和PSVector(BreezePSVector和RemotePSVector)之间的转换

```scala
// PSVectorProxy to BreezePSVector、RemotePSVector
val brzVector = vectorProxy.mkBreeze()
val remoteVector = vectorProxy.mkRemote()
### 4. PSVector
PSVector是PSModel的子类,同时PSVector有DensePSVector/SparsePSVector和BreezePSVector/CachedPSVector四种不同的实现。DensePSVector/SparsePSVector是两种不同数据格式的PSVector,而BreezePSVector/CachedPSVector是两种不同功能的PSVector。

在介绍PSVector之前,需要先了解一下PSVectorPool的概念;PSVectorPool在Spark on Angel的编程接口中不会显式地接触到,但需要了解其概念。

- PSVectorPool
PSVectorPool本质上是Angel PS上的一个矩阵,矩阵列数是`dim`,行数是`capacity`
PSVectorPool负责PSVector的申请、自动回收。自动回收类似于Java的GC功能,PSVector对象使用后不用手动delete。
同一个PSVectorPool里的PSVector的维度都是`dim`,同一个Pool里的PSVector才能做运算。

- PSVector的申请和初始化
PSVector第一次申请的时候,必须通过PSVector的伴生对象中dense/sparse方法申请。
dense/sparse方法会创建PSVectorPool,因此需要传入dimension和capacity参数。

通过duplicate方法可以申请一个与已有psVector对象同Pool的PSVector。

```scala
// 第一次申请DensePSVector和SparsePSVector
// capacity提供了默认参数
val dVector = PSVector.dense(dim, capacity)
val sVector = PSVector.sparse(dim, capacity)

// 从现有的psVector duplicate出新的PSVector
val samePoolVector = PSVector.duplicate(dVector)

// 初始化
// fill with 1.0
dVector.fill(1.0)
// 初始化dVector,使dVector的元素服从[-1.0, 1.0]的均匀分布
dVector.randomUniform(-1.0, 1.0)
// 初始化dVector,使dVector的元素服从N(0.0, 1.0)的正态分布
dVector.randomNormal(0.0, 1.0)
```
- DensePSVector VS. SparsePSVector
顾名思义,DensePSVector和SparsePSVector是针对稠密和稀疏两种不同的数据形式设计的PSVector

- BreezePSVector VS. CachedPSVector
BreezePSVector和CachedPSVector是封装了不同运算功能的PSVector装饰类。

BreezePSVector面向于Breeze算法库,封装了同一个PSVectorPool里PSVector之间的运算。包括常用的math运算和blas运算,BreezePSVector实现了Breeze内部的NumbericOps操作,因此BreezePSVector支持+,-,* 这样的操作

```scala
val brzVector1 = brzVector2 :* 2.0 + brzVector3
```
也可以显式地调用Breeze.math和Breeze.blas里的操作。

CachedPSVector为Pull、increment/mergeMax/mergeMin提供了Cache的功能,减少这些操作和PS交互的次数。
如,pullWithCache会加Pull下来的Vector缓存到本地,下次Pull同一个Vector时,直接读取缓存的Vector;
incrementWithCache会将多次的increment操作在本地聚合,最后通过flush操作,将本地聚合的结果increment到PSVector。

```scala
val cacheVector = PSVector.dense(dim).toCache
rdd.map { case (label , feature) =>
// 并没有立即更新psVector
cacheVector.incrementWithCache(feature)
}
// flushIncrement会将所有executor上的缓存的cacheVector的increment结果,累加到cacheVector
cacheVector.flushIncrement
```

// BreezePSVector、RemotePSVector to PSVectorProxy
val vectorProxy = brzVector.proxy
val vectorProxy = remoteVector.proxy
### 5. PSMatrix
PSMatrix是Angel PS上的矩阵,其有DensePSMatrix和SparsePSMatrix两种实现。

// BreezePSVector, RemotePSVector之间的转换
val remoteVector = brzVector.toRemote()
val brzVector = remoteVector.toBreeze()
```
- PSMatrix的创建和销毁
PSMatrix通过伴生对象中的dense/sparse方法申请对应的matrix。
PSVector会有PSVectorPool自动回收、销毁无用的PSVector,而PSMatrix需要手动调用destroy方法销毁PS上的matrix

- RemotePSVector
RemotePSVector封装了PSVector和本地Array之间的操作
如果需要对指定PSMatrix的分区参数,通过rowsInBlock/colsInBlock指定每个分区block的大小。

```scala
// pull PSVector到本地
val localArray = remoteVector.pull()
// push 本地的Array到Angel PS
remoteVector.push(localArray)
// 将本地的Array累加到Angel PS上的PSVector
remoteVector.increment(localArray)

// 本地的Array和PSVector取最大值、最小值
remoteVector.mergeMax(localArray)
remoteVector.mergeMin(localArray)
```
// 创建、初始化
val dMatrix = DensePSMatrix.dense(rows, cols, rowsInBlock, colsInBlock)
val sMatrix = SparsePSMatrix.sparse(rows, cols)

- BreezePSVector
BreezePSVector封装了同一个PSModelPool里PSVector之间的运算。包括常用的math运算和blas运算
BreezePSVector实现了Breeze内部的NumbericOps操作,因此BreezePSVector支持+,-,* 这样的操作
dMatrix.destroy()

```scala
val brzVector1 = 2.0 * brzVector2 + brzVector3
// Pull/Push操作
val array = dMatrix.pull(rowId)
dMatrix.push(rowId, array)
```
也可以显式地调用Breeze.math和Breeze.blas里的操作。

### 6. 支持自定义的PS function

Expand Down Expand Up @@ -192,51 +192,46 @@ public class MulScalar implements MapFunc {

下面是将RDD[(label, feature)]中的所有feature都累加到PSVector中。

```java
```scala
val dim = 10
val poolCapacity = 40
val capacity = 40

val context = PSContext.getOrCreate()
val pool = context.createModelPool(dim, poolCapacity)
val psProxy = pool.zero()
val psVector = PSVector.dense(dim, capacity).toCache

rdd.foreach { case (label , feature) =>
psProxy.mkRemote.increment(feature)
psProxy.incrementWithCache(feature)
}
psVector.flushIncrement

println("feature sum:" + psProxy.pull())
println("feature sum:" + psVector.pull().mkString(" "))
```

- Example 2: Gradient Descent实现

下面是一个简单版本的Gradient Descent的PS实现
```java
val context = PSContext.getOrCreate()
val pool = context.createModelPool(dim, poolCapacity)
val w = pool.createModel(initWeights)
val gradient = pool.zeros()
下面是一个简单版本的Gradient Descent的PS实现
注:这个例子里的instance的label是-1和1。

```scala

val w = PSVector.dense(dim).fill(initWeights)

for (i <- 1 to ITERATIONS) {
val totalG = gradient.mkRemote()
val gradient = PSVector.duplicate(w)

val nothing = points.mapPartitions { iter =>
val brzW = new DenseVector(w.mkRemote.pull())
val nothing = instance.mapPartitions { iter =>
val brzW = new DenseVector(w.pull())

val subG = iter.map { p =>
p.x * (1 / (1 + math.exp(-p.y * brzW.dot(p.x))) - 1) * p.y
val subG = iter.map { case (label, feature) =>
feature * (1 / (1 + math.exp(-label * brzW.dot(feature))) - 1) * label
}.reduce(_ + _)

totalG.incrementAndFlush(subG.toArray)
gradient.increment(subG.toArray)
Iterator.empty
}
nothing.count()

w.mkBreeze += -1.0 * gradent.mkBreeze
gradient.mkRemote.fill(0.0)
w.toBreeze :+= gradent.toBreeze :* -1.0
}

println("feature sum:" + w.mkRemote.pull())

gradient.delete()
w.delete()
println("w:" + w.pull().mkString(" "))
```
58 changes: 26 additions & 32 deletions docs/tutorials/spark_on_angel_quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,30 @@ Spark on Angel的任务本质上是一个Spark的Application,完成Spark on An
## Example Code: Gradient Descent的Angel PS实现

下面是一个简单版本的Gradient Descent的PS实现

```Scala

val points:RDD[Point] = _

val wPS = PSVector.dense(DIM).fill(0.0)
val gradientPS = PSVector.dense(DIM).fill(0.0)

for (i <- 1 to ITERATIONS) {
val totalG = gradientPS.toCache

val trigger = points.mapPartitions { iter =>
val brzW = new DenseVector(wPS.pull())

val subG = iter.map { p =>
p.x * (1 / (1 + math.exp(-p.y * brzW.dot(p.x))) - 1) * p.y
}.reduce(_ + _)

totalG.push(subG.toArray())
Iterator.empty
}
trigger.count()

wPS.toBreeze += -1.0 * gradientPS.toBreeze
gradientPS.fill(0.0)
}

println("feature sum:" + wPS.pull())

gradientPS.delete()
wPS.delete()
}
```java
val w = PSVector.dense(dim)
val sc = SparkSession.builder().getOrCreate().sparkContext

for (i <- 1 to ITERATIONS) {
val bcW = sc.broadcast(w.pull())
val totalG = PSVector.duplicate(w)

val tempRDD = trainData.mapPartitions { iter =>
val breezeW = new DenseVector(bcW.value)

val subG = iter.map { case (feat, label) =>
val brzData = new DenseVector[Double](feat.toArray)
val margin: Double = -1.0 * breezeW.dot(brzData)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
val gradient = brzData * gradientMultiplier
gradient
}.reduce(_ + _)
totalG.increment(subG.toArray)
Iterator.empty
}
tempRDD.count()
w.toBreeze -= (totalG.toBreeze :* (1.0 / sampleNum))
}

println(s"w: ${w.pull().mkString(" ")}")
```

0 comments on commit a28460f

Please sign in to comment.