Spark-Enabled Cost-Distance #1999
Conversation
Force-pushed from 5f51e4b to afd4dd0
🎉 🎉 🎉 🎉 🎉 🎉 🎉
 *
 * 2. https://github.com/ngageoint/mrgeo/blob/0c6ed4a7e66bb0923ec5c570b102862aee9e885e/mrgeo-mapalgebra/mrgeo-mapalgebra-costdistance/src/main/scala/org/mrgeo/mapalgebra/CostDistanceMapOp.scala
 */
object MrGeoCostDistance {
I think the reference in the doc string pointing back to MrGeo is sufficient; the type names should probably just be CostDistance.
      (col, row, friction, 0.0)
    })
  accumulator.add((key, costs))
})
I see the loop version does a count to force the execution and cache the RDD. Is it possible to avoid this foreach with a similar method, where this accumulator is moved to the map, that RDD persisted, and the RDD counted to force execution? Perhaps not the best optimization, but your opinion on this would probably help me understand the logic better.
The foreach is strictly for the side-effects (to set the accumulator).
Gotcha, but it causes a double iteration of the RDD. The accumulator could be set in the map, and then the map executed with caching, so that the RDD transformation is cached and iterated over once, and the accumulator value is set.
Oh right
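For reference, here is a minimal sketch of the pattern discussed above, not the PR's actual code: the accumulator update is folded into the map, the result is persisted, and count forces a single evaluation. The element shape and the use of Spark's collectionAccumulator are assumptions for illustration; the PR uses its own accumulator and key/cost types.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Sketch only: `sc` is an existing SparkContext; (key, per-tile cost entries)
// loosely mirrors the shape in the diff above.
def accumulateOnce(sc: SparkContext): Unit = {
  val accumulator = sc.collectionAccumulator[(Int, Seq[Double])]("costs")
  val costs = sc.parallelize(Seq((1, Seq(0.5, 1.0)), (2, Seq(2.0))))

  // Side effect moved into the map; persist + count forces exactly one pass,
  // after which the accumulator holds every contribution.
  val updated = costs
    .map { case (key, entries) =>
      accumulator.add((key, entries))
      (key, entries)
    }
    .persist(StorageLevel.MEMORY_AND_DISK)

  updated.count()
}
```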
All comments addressed.
Looking good; the last thing is the performance question about the single-threaded cost-distance changes.
val cost2 = calcCost(c1, r1, dir(c, r), cost)
if (cost2.isDefined) {
  curMinCost = math.min(curMinCost, source + cost1.get + cost2.get)
def compute(
Have you benchmarked the single-threaded case for this change? It would be good to get numbers to prove this method works faster/just as fast.
I have not run any benchmarks, but (informally) I did not detect any noticeable speed difference.
That having been said, it is possible (even likely) that there is a difference, because the changes that I made were very self-consciously deoptimizations.
That was necessary because the previous algorithm seemed to avoid putting elements into the priority queue whenever it could. That was laudable in the single-threaded case, but in order to maintain coherence in the case where points need to be transferred from some adjacent tile to the present one, I think that it makes sense to have one and only one place for points to enter (to ensure that points coming in from adjacent tiles and points that were already in the tile are treated exactly the same).
After getting the basics working, I then took another pass and re-added some mild optimizations. Generally speaking, I am pretty comfortable with where the single-threaded version is.
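To make the "one and only one place for points to enter" idea concrete, here is an illustrative sketch (not the PR's code) of a single enqueue path shared by local source points and points handed over from adjacent tiles; the Cost tuple shape follows the quoted diffs.

```scala
import java.util.{Comparator, PriorityQueue}

object EnqueueSketch {
  // (col, row, friction, accumulatedCost), as in the quoted diffs.
  type Cost = (Int, Int, Double, Double)

  val q: PriorityQueue[Cost] = new PriorityQueue(64, new Comparator[Cost] {
    def compare(a: Cost, b: Cost): Int = java.lang.Double.compare(a._4, b._4)
  })

  // The single entry point: locally-seeded points and points arriving from
  // neighboring tiles both pass through here, so they are treated identically.
  def enqueue(cost: Cost): Unit = q.add(cost)
}
```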
 */
def apply[K: (? => SpatialKey), V: (? => Tile)](
  friction: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
  points: Seq[Point],
Discussion point, not a change request: What would it take to create an option to pass in an RDD of points? I'm curious if there are use cases for this, and if it would be possible with the current algorithm.
I think that there would be some mechanical changes needed in order to support an RDD of points; probably the friction layer and the points would need to be joined (either logically or literally) as the first step.
As a practical matter, I do not think that supporting RDDs of points would add any real capability. If someone really did have so many points that they do not fit into memory on one machine, and some appreciable percentage of those points have unique projections into the raster, then it is likely that the raster layer is so large (in terms of pixels) that this algorithm would run out of driver memory before even reaching the second iteration (driver memory requirements are a function of the total layer size -- this is unavoidable because propagation across tile boundaries must be coordinated by the driver).
On the other hand, if someone has a source RDD whose points can be clustered (or otherwise reduced) into a fairly small list, then that would be a viable option (but that would be unrelated to this work).
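Purely for illustration (none of this is in the PR), a rough sketch of what that "join as the first step" might look like; keyOf is a hypothetical helper standing in for the layout's map transform.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import geotrellis.vector.Point

// Speculative sketch: key each point by the tile it falls in, then join
// against the friction layer so each tile sees only its own points.
def joinPoints[K: ClassTag, V: ClassTag](
  friction: RDD[(K, V)],
  points: RDD[Point],
  keyOf: Point => K // hypothetical: e.g. derived from the layout's map transform
): RDD[(K, (V, Iterable[Point]))] =
  friction.join(points.map(p => (keyOf(p), p)).groupByKey)
```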
Force-pushed from 713f6ec to 065398c
Force-pushed from 065398c to 7116d89
I'm wondering about spacing in atomic expressions like 1+1; should it be fixed to 1 + 1? (code style question)
 */
def generateEmptyQueue(cols: Int, rows: Int): Q = {
  new PriorityQueue(
    (cols*16 + rows*16), new java.util.Comparator[Cost] {
Unnecessary parentheses.
* "Propagating radial waves of travel cost in a grid." | ||
* International Journal of Geographical Information Science 24.9 (2010): 1391-1413. | ||
* | ||
* @param friction Friction tile; pixels are interpreted as "second per meter" |
@param frictionTile
val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)

points.foreach({ case (col, row) =>
cfor here would be a bit faster (according to benchmarks on aspect-tif.tif):

cfor(0)(_ < points.length, _ + 1) { i =>
  val (col, row) = points(i)
  q.add((col, row, frictionTile.getDouble(col, row), 0.0))
}
require(frictionTile.dimensions == costTile.dimensions)

def inTile(col: Int, row: Int): Boolean =
  ((0 <= col && col < cols) && (0 <= row && row < rows))
Unnecessary parentheses here too, and in methods below.
 * @param col The column of the given location
 * @param row The row of the given location
 * @param friction1 The instantaneous cost (friction) at the neighboring location
 * @param cost The length of the best-known path from a source to the neighboring location
@param neighborCost
  costs.count
  previous.unpersist()
} while (accumulator.value.size > 0)
accumulator.value.nonEmpty
val resolution = computeResolution(friction)
logger.debug(s"Computed resolution: $resolution meters/pixel")

val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]
Eh, dirty cast here; you can pattern match on Bounds[K] and throw an Exception (for example) on EmptyBounds.
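A small sketch of the suggested pattern match, assuming the GeoTrellis 1.x locations of Bounds, KeyBounds, and EmptyBounds:

```scala
import geotrellis.spark.{Bounds, EmptyBounds, KeyBounds}

// Replaces the asInstanceOf cast with a match that fails loudly on EmptyBounds.
def requireKeyBounds[K](bounds: Bounds[K]): KeyBounds[K] =
  bounds match {
    case kb @ KeyBounds(_, _) => kb
    case EmptyBounds =>
      throw new IllegalArgumentException("Friction layer has empty bounds")
  }
```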
logger.debug(s"Computed resolution: $resolution meters/pixel")

val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]
val minKey = implicitly[SpatialKey](bounds.minKey)
Instead of the direct implicitly call, it is possible to write something like:

val KeyBounds(minKey: SpatialKey, maxKey: SpatialKey) = bounds
val (minKeyCol, minKeyRow) = minKey

Everywhere below can be changed similarly, as there are lots of direct implicitly calls and tuple._{1/2} usages. I'll omit all further comments below about the same thing.
// Construct return value and return it
val metadata = TileLayerMetadata(DoubleCellType, md.layout, md.extent, md.crs, md.bounds)
val rdd = costs.map({ case (k, _, cost) => (k, cost.asInstanceOf[Tile]) })
It's possible to just upcast DoubleArrayTile to Tile:

(k, cost: Tile)
val mt = md.mapTransform
val kv = friction.first
val key = implicitly[SpatialKey](kv._1)
val tile = implicitly[Tile](kv._2)
To avoid the direct implicitly call, it's possible to provide an explicit type:

val (key: SpatialKey, tile: Tile) = kv
Benchmarks: let me know if that's not enough and I need to test more, but I think it is already obvious that it's significantly slower.
No, I'll just pull the original one out of version control and use that for the single-tile case.
Force-pushed from 2e2307c to 91777a6
Signed-off-by: James McClain <[email protected]>
Increase size by one binary order of magnitude. Still proportional to the square root of the area of the typical expected tile.
Force cost values up as quickly as possible to allow paths to be pruned.
This code implements the first iteration of the n-iteration distributed cost-distance algorithm.
05 Feb 19:44:24 INFO [cdistance.CostDistance$] - MILLIS: 155272
05 Feb 19:48:29 INFO [cdistance.CostDistance$] - MILLIS: 153500
Naming this accumulator adds a lot of clutter to the Spark UI.
Previously, only points were accepted.
Force-pushed from 91777a6 to b7b3580
Force-pushed from b7b3580 to 30fa62f
val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)

var i = 0; while (i < points.length) {
Why not just use cfor? It's a common dependency for the whole raster package. A bit less ugly ^^'.
val keys = mutable.ArrayBuffer.empty[SpatialKey]
val bounds = md.layout.mapTransform(g.envelope)

var row = bounds.rowMin; while (row <= bounds.rowMax) {
Here cfor can be used too.
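For illustration, a sketch of that nested loop written with cfor, assuming the body collects SpatialKeys over the GridBounds (the inner column loop is not visible in the quoted diff) and using GeoTrellis 1.x-era types:

```scala
import scala.collection.mutable
import geotrellis.raster.GridBounds
import geotrellis.spark.SpatialKey
import spire.syntax.cfor._

// cfor compiles down to a plain while loop but is easier to read.
def collectKeys(bounds: GridBounds): Seq[SpatialKey] = {
  val keys = mutable.ArrayBuffer.empty[SpatialKey]
  cfor(bounds.rowMin)(_ <= bounds.rowMax, _ + 1) { row =>
    cfor(bounds.colMin)(_ <= bounds.colMax, _ + 1) { col =>
      keys += SpatialKey(col, row)
    }
  }
  keys
}
```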
    other
}
def add(pair: KeyCostPair): Unit = {
  this.synchronized { list.append(pair) }
I totally forgot about it, but @echeipesh noticed these locks and reminded me about it. @jamesmcclain, have you considered using any Java concurrent collection here, instead of synchronized everywhere?
The most appropriate structure that I found was CopyOnWriteArrayList. The documentation for that class says the following:
This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException.
(I am pointing in particular to the first sentence.)
I do not think that a synchronized concurrent data structure is warranted here since there is only one place for new data to go (the end of the list).
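For comparison, a sketch of the lock-free alternative under discussion, using a java.util.concurrent queue instead of synchronized appends; the pair type here is just a stand-in for the PR's KeyCostPair.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

object ConcurrentAddSketch {
  // Stand-in for the PR's KeyCostPair.
  type KeyCostPair = (Int, (Int, Int, Double, Double))

  // ConcurrentLinkedQueue allows thread-safe appends at the tail without
  // explicit locking, matching the "only one place for new data to go" usage.
  private val pairs = new ConcurrentLinkedQueue[KeyCostPair]()

  def add(pair: KeyCostPair): Unit = pairs.add(pair)

  def snapshot: List[KeyCostPair] = pairs.iterator.asScala.toList
}
```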
Fixes #1980