Skip to content

Commit

Permalink
Merge pull request #1614 from pomadchin/feature/collections-psummary
Browse files Browse the repository at this point in the history
Collections polygonal summary functions
  • Loading branch information
lossyrob authored Sep 23, 2016
2 parents 06d1306 + 0c58833 commit 3df8d9e
Show file tree
Hide file tree
Showing 26 changed files with 977 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ object HBaseRDDWriter {

val codec = KeyValueRecordCodec[K, V]

//create the attribute table if it does not exist
if (!instance.getAdmin.tableExists(table)) {
val tableDesc = new HTableDescriptor(table: TableName)
val idsColumnFamilyDesc = new HColumnDescriptor(tilesCF)
tableDesc.addFamily(idsColumnFamilyDesc)
instance.getAdmin.createTable(tableDesc)
// create tile table if it does not exist
instance.withAdminDo { admin =>
if (!admin.tableExists(table)) {
val tableDesc = new HTableDescriptor(table: TableName)
val idsColumnFamilyDesc = new HColumnDescriptor(tilesCF)
tableDesc.addFamily(idsColumnFamilyDesc)
admin.createTable(tableDesc)
}
}

// Call groupBy with numPartitions; if called without that argument or a partitioner,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package geotrellis.spark

import geotrellis.raster._
import geotrellis.spark.tiling.LayoutDefinition
import geotrellis.util._

import scala.reflect.ClassTag

abstract class CellGridLayoutCollectionMethods[K: SpatialComponent, V <: CellGrid, M: GetComponent[?, LayoutDefinition]]
extends MethodExtensions[Seq[(K, V)] with Metadata[M]] {
def asRasters(): Seq[(K, Raster[V])] = {
val mapTransform = self.metadata.getComponent[LayoutDefinition].mapTransform
self.map { case (key, tile) =>
(key, Raster(tile, mapTransform(key)))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package geotrellis.spark.mapalgebra

import geotrellis.raster.Tile
import geotrellis.util.MethodExtensions
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag


trait TileCollectionMethods[K] extends MethodExtensions[Seq[(K, Tile)]]
30 changes: 24 additions & 6 deletions spark/src/main/scala/geotrellis/spark/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ import geotrellis.raster._
import geotrellis.vector._
import geotrellis.proj4._
import geotrellis.util._

import geotrellis.spark.tiling._
import geotrellis.spark.ingest._
import geotrellis.spark.crop._
import geotrellis.spark.filter._

import org.apache.spark.Partitioner
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd._

import spire.syntax.cfor._

import monocle._
import monocle.syntax._

Expand Down Expand Up @@ -61,18 +57,29 @@ package object spark
with summary.Implicits
with tiling.Implicits {
type TileLayerRDD[K] = RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]]

object TileLayerRDD {
def apply[K](rdd: RDD[(K, Tile)], metadata: TileLayerMetadata[K]): TileLayerRDD[K] =
new ContextRDD(rdd, metadata)
}

type TileLayerCollection[K] = Seq[(K, Tile)] with Metadata[TileLayerMetadata[K]]
object TileLayerCollection {
def apply[K](seq: Seq[(K, Tile)], metadata: TileLayerMetadata[K]): TileLayerCollection[K] =
new ContextCollection(seq, metadata)
}

type MultibandTileLayerRDD[K] = RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]
object MultibandTileLayerRDD {
def apply[K](rdd: RDD[(K, MultibandTile)], metadata: TileLayerMetadata[K]): MultibandTileLayerRDD[K] =
new ContextRDD(rdd, metadata)
}

type MultibandTileLayerCollection[K] = Seq[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]
object MultibandTileLayerCollection {
def apply[K](seq: Seq[(K, MultibandTile)], metadata: TileLayerMetadata[K]): MultibandTileLayerCollection[K] =
new ContextCollection(seq, metadata)
}

type TileBounds = GridBounds

type SpatialComponent[K] = Component[K, SpatialKey]
Expand Down Expand Up @@ -107,6 +114,9 @@ package object spark
implicit class withCellGridLayoutRDDMethods[K: SpatialComponent: ClassTag, V <: CellGrid, M: GetComponent[?, LayoutDefinition]](val self: RDD[(K, V)] with Metadata[M])
extends CellGridLayoutRDDMethods[K, V, M]

implicit class withCellGridLayoutCollectionMethods[K: SpatialComponent, V <: CellGrid, M: GetComponent[?, LayoutDefinition]](val self: Seq[(K, V)] with Metadata[M])
extends CellGridLayoutCollectionMethods[K, V, M]

implicit class withProjectedExtentRDDMethods[K: Component[?, ProjectedExtent], V <: CellGrid](val rdd: RDD[(K, V)]) {
def toRasters: RDD[(K, Raster[V])] =
rdd.mapPartitions({ partition =>
Expand All @@ -116,6 +126,14 @@ package object spark
}, preservesPartitioning = true)
}

implicit class withCollectionConversionMethods[K, V, M](val rdd: RDD[(K, V)] with Metadata[M]) {
def toCollection: Seq[(K, V)] with Metadata[M] = ContextCollection(rdd.collect(), rdd.metadata)
}

implicit class withRddConversionMethods[K, V, M](val seq: Seq[(K, V)] with Metadata[M]) {
def toRDD(implicit sc: SparkContext): RDD[(K, V)] with Metadata[M] = ContextRDD(sc.parallelize(seq), seq.metadata)
}

implicit class withProjectedExtentTemporalTilerKeyMethods[K: Component[?, ProjectedExtent]: Component[?, TemporalKey]](val self: K) extends TilerKeyMethods[K, SpaceTimeKey] {
def extent = self.getComponent[ProjectedExtent].extent
def translate(spatialKey: SpatialKey): SpaceTimeKey = SpaceTimeKey(spatialKey, self.getComponent[TemporalKey])
Expand Down
8 changes: 8 additions & 0 deletions spark/src/main/scala/geotrellis/spark/stitch/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ trait Implicits {
implicit class withSpatialTileRDDMethods[V <: CellGrid: Stitcher](
val self: RDD[(SpatialKey, V)]
) extends SpatialTileRDDStitchMethods[V]

implicit class withSpatialTileLayoutCollectionMethods[V <: CellGrid: Stitcher, M: GetComponent[?, LayoutDefinition]](
val self: Seq[(SpatialKey, V)] with Metadata[M]
) extends SpatialTileLayoutCollectionStitchMethods[V, M]

implicit class withSpatialTileCollectionMethods[V <: CellGrid: Stitcher](
val self: Seq[(SpatialKey, V)]
) extends SpatialTileCollectionStitchMethods[V]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package geotrellis.spark.stitch

import geotrellis.raster._
import geotrellis.raster.stitch.Stitcher
import geotrellis.vector.Extent
import geotrellis.spark._
import geotrellis.spark.tiling._
import geotrellis.util._

import org.apache.spark.rdd.RDD

abstract class SpatialTileLayoutCollectionStitchMethods[V <: CellGrid: Stitcher, M: GetComponent[?, LayoutDefinition]]
extends MethodExtensions[Seq[(SpatialKey, V)] with Metadata[M]] {

def stitch(): Raster[V] = {
val (tile, bounds) = TileLayoutStitcher.stitch(self)
val mapTransform = self.metadata.getComponent[LayoutDefinition].mapTransform
Raster(tile, mapTransform(bounds))
}
}

abstract class SpatialTileCollectionStitchMethods[V <: CellGrid: Stitcher]
extends MethodExtensions[Seq[(SpatialKey, V)]] {

def stitch(): V = TileLayoutStitcher.stitch(self)._1
}
2 changes: 2 additions & 0 deletions spark/src/main/scala/geotrellis/spark/summary/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ object Implicits extends Implicits
trait Implicits {
implicit class withStatsTileRDDMethods[K](val self: RDD[(K, Tile)])
(implicit val keyClassTag: ClassTag[K]) extends StatsTileRDDMethods[K]

implicit class withStatsTileCollectionMethods[K](val self: Seq[(K, Tile)]) extends StatsTileCollectionMethods[K]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package geotrellis.spark.summary

import geotrellis.raster._
import geotrellis.raster.histogram._
import geotrellis.spark.mapalgebra._

trait StatsTileCollectionMethods[K] extends TileCollectionMethods[K] {

def averageByKey(): Seq[(K, Tile)] =
self.groupBy(_._1).mapValues { seq => seq.map(_._2).reduce(_ + _) / seq.size } toSeq

def histogram(): Histogram[Double] =
histogram(StreamingHistogram.DEFAULT_NUM_BUCKETS)

def histogram(numBuckets: Int): Histogram[Double] =
self
.map { case (key, tile) => tile.histogramDouble(numBuckets) }
.reduce { _ merge _ }

/** Gives a histogram that uses exact counts of integer values.
*
* @note This cannot handle counts that are larger than Int.MaxValue, and
* should not be used with very large datasets whose counts will overflow.
* These histograms can get very large with a wide range of values.
*/
def histogramExactInt: Histogram[Int] = {
self
.map { case (key, tile) => tile.histogram }
.reduce { _ merge _ }
}

def classBreaks(numBreaks: Int): Array[Int] =
classBreaksDouble(numBreaks).map(_.toInt)

def classBreaksDouble(numBreaks: Int): Array[Double] =
histogram(numBreaks).quantileBreaks(numBreaks)

/** Gives class breaks using a histogram that uses exact counts of integer values.
*
* @note This cannot handle counts that are larger than Int.MaxValue, and
* should not be used with very large datasets whose counts will overflow.
* These histograms can get very large with a wide range of values.
*/
def classBreaksExactInt(numBreaks: Int): Array[Int] =
histogramExactInt.quantileBreaks(numBreaks)

def minMax: (Int, Int) =
self.map(_._2.findMinMax)
.reduce { (t1, t2) =>
val (min1, max1) = t1
val (min2, max2) = t2
val min =
if(isNoData(min1)) min2
else {
if(isNoData(min2)) min1
else math.min(min1, min2)
}
val max =
if(isNoData(max1)) max2
else {
if(isNoData(max2)) max1
else math.max(max1, max2)
}
(min, max)
}

def minMaxDouble: (Double, Double) =
self
.map(_._2.findMinMaxDouble)
.reduce { (t1, t2) =>
val (min1, max1) = t1
val (min2, max2) = t2
val min =
if(isNoData(min1)) min2
else {
if(isNoData(min2)) min1
else math.min(min1, min2)
}
val max =
if(isNoData(max1)) max2
else {
if(isNoData(max2)) max1
else math.max(max1, max2)
}
(min, max)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ trait Implicits {
(implicit val keyClassTag: ClassTag[K])
extends PolygonalSummaryKeyedFeatureRDDMethods[K, G, D]

implicit class withZonalSummaryTileLayerCollectionMethods[
K,
M: GetComponent[?, LayoutDefinition]
](val self: Seq[(K, Tile)] with Metadata[M])
(implicit val keyClassTag: ClassTag[K], implicit val _sc: SpatialComponent[K])
extends PolygonalSummaryTileLayerCollectionMethods[K, M] with Serializable

implicit class withZonalSummaryFeatureCollectionMethods[G <: Geometry, D](val featureCollection: Seq[Feature[G, D]])
extends PolygonalSummaryFeatureCollectionMethods[G, D]

implicit class withZonalSummaryKeyedFeatureCollectionMethods[K, G <: Geometry, D](val featureCollection: Seq[(K, Feature[G, D])])
(implicit val keyClassTag: ClassTag[K])
extends PolygonalSummaryKeyedFeatureCollectionMethods[K, G, D]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package geotrellis.spark.summary.polygonal

import geotrellis.vector._
import geotrellis.vector.summary.polygonal._

trait PolygonalSummaryFeatureCollectionMethods[G <: Geometry, D] {
val featureCollection: Seq[Feature[G, D]]

def polygonalSummary[T](polygon: Polygon, zeroValue: T)(handler: PolygonalSummaryHandler[G, D, T]): T =
featureCollection.aggregate(zeroValue)(handler.mergeOp(polygon, zeroValue), handler.combineOp)

def polygonalSummary[T](multiPolygon: MultiPolygon, zeroValue: T)(handler: PolygonalSummaryHandler[G, D, T]): T =
featureCollection.aggregate(zeroValue)(handler.mergeOp(multiPolygon, zeroValue), handler.combineOp)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package geotrellis.spark.summary.polygonal

import geotrellis.vector._
import geotrellis.vector.summary.polygonal._

trait PolygonalSummaryKeyedFeatureCollectionMethods[K, G <: Geometry, D] {
val featureCollection: Seq[(K, Feature[G, D])]

private def aggregateByKey[T](self: Seq[(K, Feature[G, D])])(zeroValue: T)(seqOp: (T, Feature[G, D]) => T, combOp: (T, T) => T): Seq[(K, T)] =
self.groupBy(_._1).mapValues { _.map(_._2).aggregate(zeroValue)(seqOp, combOp) } toSeq

def polygonalSummaryByKey[T](polygon: Polygon, zeroValue: T)(handler: PolygonalSummaryHandler[G, D, T]): Seq[(K, T)] =
aggregateByKey(featureCollection)(zeroValue)(handler.mergeOp(polygon, zeroValue), handler.combineOp)

def polygonalSummaryByKey[T](multiPolygon: MultiPolygon, zeroValue: T)(handler: PolygonalSummaryHandler[G, D, T]): Seq[(K, T)] =
aggregateByKey(featureCollection)(zeroValue)(handler.mergeOp(multiPolygon, zeroValue), handler.combineOp)
}
Loading

0 comments on commit 3df8d9e

Please sign in to comment.