
Spark-Enabled Cost-Distance #1999

Merged Mar 31, 2017 (28 commits)

Conversation

jamesmcclain
Member

@jamesmcclain jamesmcclain commented Feb 2, 2017

  • Simplified cost-distance implementation
  • Implementation of distributed cost-distance algorithm
  • Unit Tests
  • Comments

Fixes #1980

(Four screenshots attached: two from Feb 6, 2017 and two from Feb 14, 2017.)

@jamesmcclain jamesmcclain force-pushed the feature/cost-distance branch from 5f51e4b to afd4dd0 Compare February 2, 2017 14:53
@lossyrob
Member

lossyrob commented Feb 2, 2017

party like it's PR 1999

@jamesmcclain
Member Author

🎉 🎉 🎉 🎉 🎉 🎉 🎉

*
* 2. https://github.com/ngageoint/mrgeo/blob/0c6ed4a7e66bb0923ec5c570b102862aee9e885e/mrgeo-mapalgebra/mrgeo-mapalgebra-costdistance/src/main/scala/org/mrgeo/mapalgebra/CostDistanceMapOp.scala
*/
object MrGeoCostDistance {
Member

I think the reference in the doc string pointing back to MrGeo is sufficient; the type names should probably just be CostDistance.

(col, row, friction, 0.0)
})
accumulator.add((key, costs))
})
Member

I see the loop version does a count to force the execution and cache the RDD. Is it possible to avoid this foreach with a similar method, where this accumulator is moved to the map, that RDD persisted, and the RDD counted to force execution? Perhaps not the best optimization, but your opinion on this would probably help me understand the logic better.

Member Author

The foreach is strictly for the side-effects (to set the accumulator).

Member

Gotcha, but it causes a double iteration of the RDD. The accumulator could be set in the map, and then the map executed with caching, so that the RDD transformation is cached and iterated over once, and the accumulator value is set.

Member Author

Oh right

@jamesmcclain jamesmcclain changed the title [WiP] Spark-Enabled Cost-Distance Spark-Enabled Cost-Distance Feb 9, 2017
@jamesmcclain
Member Author

All comments addressed.

@lossyrob lossyrob left a comment

Looking good; the last thing is the performance question about the single-threaded cost-distance changes.

val cost2 = calcCost(c1, r1, dir(c, r), cost)
if (cost2.isDefined) {
curMinCost = math.min(curMinCost, source + cost1.get + cost2.get)
def compute(
Member

Have you benchmarked the single-threaded case for this change? It would be good to get numbers to prove this method works faster/just as fast.

Member Author
@jamesmcclain jamesmcclain Feb 21, 2017

I have not run any benchmarks, but (informally) I did not detect any noticeable speed difference.

That having been said, it is possible (even likely) that there is a difference, because the changes that I made were very self-consciously deoptimizations.

That was necessary because the previous algorithm avoided putting elements into the priority queue whenever it could. That is laudable in the single-threaded case, but in order to maintain coherence in the case where points need to be transferred from some adjacent tile to the present one, I think that it makes sense to have one and only one place for points to enter (to ensure that points coming in from adjacent tiles and points that were already in the tile are treated exactly the same).

After getting the basics working, I then took another pass and re-added some mild optimizations. Generally speaking, I am pretty comfortable with where the single-threaded version is.
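The "one and only one entry point" design described above can be sketched as a Dijkstra-style loop over a small friction grid: every candidate cost, whether seeded locally or arriving from a neighbor, is pushed through the same priority queue, and entries that are stale by the time they are popped are discarded. This is a minimal illustration, not the GeoTrellis implementation; the 4-neighbor set and the averaged-friction cost model are assumptions for the sketch.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;

public class GridCostSketch {
    // Computes per-cell travel cost on a friction grid. Every update enters
    // through the queue (the single entry point); entries superseded by a
    // cheaper path before being popped are skipped.
    static double[][] costDistance(double[][] friction, int[][] sources) {
        int rows = friction.length, cols = friction[0].length;
        double[][] cost = new double[rows][cols];
        for (double[] r : cost) Arrays.fill(r, Double.POSITIVE_INFINITY);

        // Entry layout: {row, col, accumulatedCost}, ordered by cost.
        PriorityQueue<double[]> q =
            new PriorityQueue<>(Comparator.comparingDouble(e -> e[2]));
        for (int[] s : sources) q.add(new double[]{s[0], s[1], 0.0});

        int[][] dirs = {{-1, 0}, {1, 0}, {0, -1}, {0, 1}};
        while (!q.isEmpty()) {
            double[] e = q.poll();
            int r = (int) e[0], c = (int) e[1];
            if (e[2] >= cost[r][c]) continue; // stale entry: a cheaper path won
            cost[r][c] = e[2];
            for (int[] d : dirs) {
                int nr = r + d[0], nc = c + d[1];
                if (0 <= nr && nr < rows && 0 <= nc && nc < cols) {
                    // Simple cost model: average friction of the two cells.
                    double step = (friction[r][c] + friction[nr][nc]) / 2.0;
                    if (e[2] + step < cost[nr][nc])
                        q.add(new double[]{nr, nc, e[2] + step});
                }
            }
        }
        return cost;
    }

    public static void main(String[] args) {
        double[][] friction = {{1, 1, 1}, {1, 9, 1}, {1, 1, 1}};
        double[][] cost = costDistance(friction, new int[][]{{0, 0}});
        System.out.println(cost[0][2]); // 2.0
    }
}
```

Points arriving from adjacent tiles would simply be additional `q.add` calls with their carried costs, which is why the single entry point keeps them coherent with local points.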

*/
def apply[K: (? => SpatialKey), V: (? => Tile)](
friction: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
points: Seq[Point],
Member

Discussion point, not a change request: What would it take to create an option to pass in an RDD of points? I'm curious if there are use cases for this, and if it would be possible with the current algorithm.

Member Author
@jamesmcclain jamesmcclain Feb 21, 2017

I think that there would be some mechanical changes needed in order to support an RDD of points; probably the friction layer and the points would need to be joined (either logically or literally) as the first step.

As a practical matter, I do not think that supporting RDDs of points would add any real capability. If someone really did have so many points that they do not fit into memory on one machine, and some appreciable percentage of those points have unique projections into the raster, then it is likely that the raster layer is so large (in terms of pixels) that this algorithm would run out of driver memory before even reaching the second iteration. (Driver memory requirements are a function of the total layer size; this is unavoidable because propagation across tile boundaries must be coordinated by the driver.)

On the other hand, if someone has a source RDD whose points can be clustered (or otherwise reduced) into a fairly small list, then that would be a viable option (but that would be unrelated to this work).

@pomadchin pomadchin left a comment

I'm wondering about spacing in atomic expressions like 1+1; should it be fixed to 1 + 1? (code style question)

*/
def generateEmptyQueue(cols: Int, rows: Int): Q = {
new PriorityQueue(
(cols*16 + rows*16), new java.util.Comparator[Cost] {
Member

Unnecessary parentheses.
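For context, the construct under review is a `java.util.PriorityQueue` built with an initial capacity and a comparator. A minimal sketch of the same idea, assuming a cost entry encoded as `{col, row, friction, accumulatedCost}` (the encoding is an assumption for illustration, not the PR's `Cost` type):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class CostQueueSketch {
    // Initial capacity proportional to the tile perimeter, as in the PR;
    // the comparator orders entries by accumulated cost (index 3), so the
    // head of the queue is always the cheapest pending entry.
    static PriorityQueue<double[]> generateEmptyQueue(int cols, int rows) {
        return new PriorityQueue<>(cols * 16 + rows * 16,
                Comparator.comparingDouble(entry -> entry[3]));
    }

    public static void main(String[] args) {
        PriorityQueue<double[]> q = generateEmptyQueue(256, 256);
        q.add(new double[]{10, 12, 1.5, 7.0});
        q.add(new double[]{3, 4, 0.5, 2.0});
        q.add(new double[]{8, 9, 2.0, 5.0});
        System.out.println(q.poll()[3]); // 2.0 (smallest accumulated cost)
    }
}
```

The capacity argument is only a sizing hint; `PriorityQueue` grows as needed, so the exact constant mainly affects initial allocation.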

* "Propagating radial waves of travel cost in a grid."
* International Journal of Geographical Information Science 24.9 (2010): 1391-1413.
*
* @param friction Friction tile; pixels are interpreted as "second per meter"
Member

@param frictionTile

val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)

points.foreach({ case (col, row) =>
Member

cfor here would be a bit faster (according to benchmarks on aspect-tif.tif):

cfor(0)(_ < points.length, _ + 1) { i =>
  val (col, row) = points(i)
  q.add((col, row, frictionTile.getDouble(col, row), 0.0))
}

require(frictionTile.dimensions == costTile.dimensions)

def inTile(col: Int, row: Int): Boolean =
((0 <= col && col < cols) && (0 <= row && row < rows))
Member

Unnecessary parentheses here too, and in methods below.

* @param col The column of the given location
* @param row The row of the given location
* @param friction1 The instantaneous cost (friction) at the neighboring location
* @param cost The length of the best-known path from a source to the neighboring location
Member

@param neighborCost


costs.count
previous.unpersist()
} while (accumulator.value.size > 0)
Member

accumulator.value.nonEmpty

val resolution = computeResolution(friction)
logger.debug(s"Computed resolution: $resolution meters/pixel")

val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]
Member
@pomadchin pomadchin Mar 22, 2017

Eh, dirty cast here; you can pattern match on Bounds[K] and throw an exception (for example) on EmptyBounds.

logger.debug(s"Computed resolution: $resolution meters/pixel")

val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]
val minKey = implicitly[SpatialKey](bounds.minKey)
Member
@pomadchin pomadchin Mar 22, 2017

Instead of the direct implicitly call, it is possible to write something like:

val KeyBounds(minKey: SpatialKey, maxKey: SpatialKey) = bounds
val (minKeyCol, minKeyRow) = minKey

Everywhere below can be changed similarly, as there are lots of direct implicitly calls and tuple._1/._2 usages. I'll omit further comments about the same thing.


// Construct return value and return it
val metadata = TileLayerMetadata(DoubleCellType, md.layout, md.extent, md.crs, md.bounds)
val rdd = costs.map({ case (k, _, cost) => (k, cost.asInstanceOf[Tile]) })
Member

It's possible to just upcast DoubleArrayTile to Tile:

(k, cost: Tile)

val mt = md.mapTransform
val kv = friction.first
val key = implicitly[SpatialKey](kv._1)
val tile = implicitly[Tile](kv._2)
Member

To avoid the direct implicitly call, it's possible to provide an explicit type:

val (key: SpatialKey, tile: Tile) = kv

@pomadchin pomadchin assigned jamesmcclain and unassigned pomadchin Mar 22, 2017
@pomadchin
Member

Benchmarks. Let me know if that's not enough and I need to test more, but I think it is already obvious that it's significantly slower.

@jamesmcclain
Member Author

No, I'll just pull the original one out of version control and use that for the single-tile case.

@lossyrob lossyrob dismissed their stale review March 24, 2017 20:10

Changes addressed in grisha's review

@jamesmcclain jamesmcclain force-pushed the feature/cost-distance branch 4 times, most recently from 2e2307c to 91777a6 Compare March 27, 2017 13:19
James McClain added 5 commits March 27, 2017 09:23
Signed-off-by: James McClain <[email protected]>
Increase size by one binary order of magnitude.  Still proportional to
the square root of the area of the typical expected tile.
James McClain added 18 commits March 27, 2017 09:23
Force cost values up as quickly as possible to allow paths to be pruned.
This code implements the first iteration of the n-iteration distributed
cost-distance algorithm.
05 Feb 19:44:24 INFO [cdistance.CostDistance$] - MILLIS: 155272
05 Feb 19:48:29 INFO [cdistance.CostDistance$] - MILLIS: 153500
Naming this accumulator adds a lot of clutter to the Spark UI.
Previously, only points were accepted.
@jamesmcclain jamesmcclain force-pushed the feature/cost-distance branch from 91777a6 to b7b3580 Compare March 27, 2017 13:23
@jamesmcclain jamesmcclain force-pushed the feature/cost-distance branch from b7b3580 to 30fa62f Compare March 27, 2017 13:24
val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)

var i = 0; while (i < points.length) {
Member
@pomadchin pomadchin Mar 27, 2017

Why not just use cfor? It's a common dependency for the whole raster package. A bit less ugly ^^'.

val keys = mutable.ArrayBuffer.empty[SpatialKey]
val bounds = md.layout.mapTransform(g.envelope)

var row = bounds.rowMin; while (row <= bounds.rowMax) {
Member
@pomadchin pomadchin Mar 27, 2017

Here cfor can be used too

other
}
def add(pair: KeyCostPair): Unit = {
this.synchronized { list.append(pair) }
Member
@pomadchin pomadchin Mar 30, 2017

I totally forgot about it, but @echeipesh noticed these locks and reminded me about it. @jamesmcclain, have you considered using a java.util.concurrent collection here, instead of synchronized everywhere?

Member Author
@jamesmcclain jamesmcclain Mar 31, 2017

The most appropriate structure that I found was CopyOnWriteArrayList. The documentation for that class says the following:

This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException.

(I am pointing in particular to the first sentence.)

I do not think that a synchronized concurrent data structure is warranted here since there is only one place for new data to go (the end of the list).
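The tradeoff discussed above can be sketched in plain Java: a lock-guarded append-only list versus a CopyOnWriteArrayList under the same two-writer workload. This is an illustrative sketch, not code from the PR; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class AppendSketch {
    // Append-only buffer guarded by a lock, mirroring the synchronized add
    // in the PR: each append is amortized O(1).
    private final List<String> list = new ArrayList<>();
    synchronized void add(String pair) { list.add(pair); }
    synchronized int size() { return list.size(); }

    public static void main(String[] args) throws InterruptedException {
        AppendSketch acc = new AppendSketch();
        // CopyOnWriteArrayList copies its backing array on every add, making
        // N appends O(N^2) overall: good when reads vastly outnumber writes,
        // a poor fit for an append-heavy accumulator like this one.
        List<String> cow = new CopyOnWriteArrayList<>();

        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) { acc.add("pair"); cow.add("pair"); }
        };
        Thread a = new Thread(task), b = new Thread(task);
        a.start(); b.start();
        a.join(); b.join();

        // Both are thread-safe; they differ in cost profile, not correctness.
        System.out.println(acc.size() + " " + cow.size()); // 2000 2000
    }
}
```

This supports the point above: with a single, append-only mutation site, a simple synchronized list is appropriate, and CopyOnWriteArrayList only pays off when traversals vastly outnumber mutations.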

@echeipesh echeipesh modified the milestones: 1.1, 1.2 Mar 31, 2017
@echeipesh echeipesh merged commit d3943c8 into locationtech:master Mar 31, 2017
@jamesmcclain jamesmcclain deleted the feature/cost-distance branch March 31, 2017 19:33