Add TileFeature Type #1429

Merged: 8 commits, May 19, 2016
@@ -15,6 +15,8 @@ class AccumuloTestRegistrator extends TestRegistrator {
kryo.register(classOf[org.apache.accumulo.core.client.Durability])
kryo.register(classOf[org.apache.accumulo.core.data.Key])
kryo.register(classOf[org.apache.accumulo.core.data.Value])

kryo.register(classOf[geotrellis.spark.io.avro.codecs.TileFeatureCodec$$anon$1])
}
}
}
@@ -15,8 +15,8 @@ class AccumuloSpaceTimeSpec
with TestEnvironment
with AccumuloTestEnvironment
with TestFiles
- with CoordinateSpaceTimeTests
- with LayerUpdateSpaceTimeTileTests {
+ with CoordinateSpaceTimeSpec
+ with LayerUpdateSpaceTimeTileSpec {
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
@@ -12,7 +12,7 @@ class AccumuloSpatialSpec
with TestEnvironment
with AccumuloTestEnvironment
with TestFiles
- with AllOnesTestTileTests {
+ with AllOnesTestTileSpec {

implicit lazy val instance = MockAccumuloInstance()

@@ -0,0 +1,32 @@
package geotrellis.spark.io.accumulo

import geotrellis.raster.{Tile, TileFeature}
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.testfiles.TestTileFeatureFiles

import com.github.nscala_time.time.Imports._
import org.joda.time.DateTime


class AccumuloTileFeatureSpaceTimeSpec
extends PersistenceSpec[SpaceTimeKey, TileFeature[Tile, Tile], TileLayerMetadata[SpaceTimeKey]]
with SpaceTimeKeyIndexMethods
with TestEnvironment
with AccumuloTestEnvironment
with TestTileFeatureFiles
with CoordinateSpaceTimeTileFeatureSpec
with LayerUpdateSpaceTimeTileFeatureSpec {
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -0,0 +1,30 @@
package geotrellis.spark.io.accumulo

import geotrellis.raster.{Tile, TileFeature}
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.testfiles.TestTileFeatureFiles


class AccumuloTileFeatureSpatialSpec
extends PersistenceSpec[SpatialKey, TileFeature[Tile, Tile], TileLayerMetadata[SpatialKey]]
with SpatialKeyIndexMethods
with TestEnvironment
with AccumuloTestEnvironment
with TestTileFeatureFiles
with AllOnesTestTileFeatureSpec {

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile

lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
raster/src/main/scala/geotrellis/raster/TileFeature.scala (30 additions, 0 deletions)
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2016 Azavea.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.raster

/**
* The TileFeature Type. This is used for packaging a CellGrid (a
* Tile or MultibandTile) together with some metadata.
*
* @param tile The CellGrid-derived tile
* @param data The additional metadata
*/
case class TileFeature[+T <: CellGrid,D](tile: T, data: D) {
def cellType: CellType = tile.cellType
def cols: Int = tile.cols
def rows: Int = tile.rows
}
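
A minimal usage sketch (not part of this diff) showing how a TileFeature pairs a tile with arbitrary per-tile data; the IntArrayTile and the cloud-cover string below are illustrative assumptions, not values from the PR:

import geotrellis.raster._

object TileFeatureExample {
  def main(args: Array[String]): Unit = {
    val tile: Tile = IntArrayTile.fill(1, 8, 8)         // an 8x8 tile of ones
    val feature = TileFeature(tile, "cloudCover=0.12")  // attach arbitrary metadata

    // The grid-shaped accessors delegate to the wrapped tile.
    println(feature.cols)     // 8
    println(feature.rows)     // 8
    println(feature.cellType) // the wrapped tile's cell type
  }
}
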
@@ -16,8 +16,8 @@ class S3SpaceTimeSpec
with SpaceTimeKeyIndexMethods
with TestEnvironment
with TestFiles
- with CoordinateSpaceTimeTests
- with LayerUpdateSpaceTimeTileTests
+ with CoordinateSpaceTimeSpec
+ with LayerUpdateSpaceTimeTileSpec
with BeforeAndAfterAll {

registerAfterAll { () =>
@@ -12,7 +12,7 @@ class S3SpatialSpec
extends PersistenceSpec[SpatialKey, Tile, TileLayerMetadata[SpatialKey]]
with SpatialKeyIndexMethods
with TestEnvironment with TestFiles
- with AllOnesTestTileTests {
+ with AllOnesTestTileSpec {

lazy val bucket = "mock-bucket"
lazy val prefix = "catalog"
@@ -0,0 +1,55 @@
package geotrellis.spark.io.s3

import geotrellis.raster.{Tile, TileFeature}
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.testfiles.TestTileFeatureFiles
import geotrellis.spark._

import com.github.nscala_time.time.Imports._
import org.joda.time.DateTime
import org.scalatest._


class S3TileFeatureSpaceTimeSpec
extends PersistenceSpec[SpaceTimeKey, TileFeature[Tile, Tile], TileLayerMetadata[SpaceTimeKey]]
with SpaceTimeKeyIndexMethods
with TestEnvironment
with TestTileFeatureFiles
with CoordinateSpaceTimeTileFeatureSpec
with LayerUpdateSpaceTimeTileFeatureSpec
with BeforeAndAfterAll {

registerAfterAll { () =>
MockS3Client.reset()
}

lazy val bucket = "mock-bucket"
lazy val prefix = "catalog"

lazy val attributeStore = new S3AttributeStore(bucket, prefix) {
override val s3Client = new MockS3Client
}

lazy val rddReader =
new S3RDDReader {
def getS3Client = () => new MockS3Client()
}

lazy val rddWriter =
new S3RDDWriter {
def getS3Client = () => new MockS3Client
}

lazy val reader = new MockS3LayerReader(attributeStore)
lazy val writer = new MockS3LayerWriter(attributeStore, bucket, prefix)
lazy val updater = new S3LayerUpdater(attributeStore, reader) { override def rddWriter = S3TileFeatureSpaceTimeSpec.this.rddWriter }
lazy val deleter = new S3LayerDeleter(attributeStore) { override val getS3Client = () => new MockS3Client }
lazy val copier = new S3LayerCopier(attributeStore, bucket, prefix) { override val getS3Client = () => new MockS3Client }
lazy val reindexer = GenericLayerReindexer[S3LayerHeader](attributeStore, reader, writer, deleter, copier)
lazy val mover = GenericLayerMover(copier, deleter)
lazy val tiles = new S3ValueReader(attributeStore) {
override val s3Client = new MockS3Client
}
lazy val sample = CoordinateSpaceTime
}
@@ -0,0 +1,49 @@
package geotrellis.spark.io.s3

import geotrellis.raster.{Tile, TileFeature}
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.testfiles.TestTileFeatureFiles

import org.scalatest._


class S3TileFeatureSpatialSpec
extends PersistenceSpec[SpatialKey, TileFeature[Tile, Tile], TileLayerMetadata[SpatialKey]]
with SpatialKeyIndexMethods
with TestEnvironment
with TestTileFeatureFiles
with AllOnesTestTileFeatureSpec {

lazy val bucket = "mock-bucket"
lazy val prefix = "catalog"

registerAfterAll { () =>
MockS3Client.reset()
}

lazy val attributeStore = new S3AttributeStore(bucket, prefix) {
override val s3Client = new MockS3Client()
}

lazy val rddReader =
new S3RDDReader {
def getS3Client = () => new MockS3Client()
}

lazy val rddWriter =
new S3RDDWriter {
def getS3Client = () => new MockS3Client()
}

lazy val reader = new MockS3LayerReader(attributeStore)
lazy val writer = new MockS3LayerWriter(attributeStore, bucket, prefix)
lazy val updater = new S3LayerUpdater(attributeStore, reader) { override def rddWriter = S3TileFeatureSpatialSpec.this.rddWriter }
lazy val deleter = new S3LayerDeleter(attributeStore) { override val getS3Client = () => new MockS3Client() }
lazy val copier = new S3LayerCopier(attributeStore, bucket, prefix) { override val getS3Client = () => new MockS3Client }
lazy val reindexer = GenericLayerReindexer[S3LayerHeader](attributeStore, reader, writer, deleter, copier)
lazy val mover = GenericLayerMover(copier, deleter)
lazy val tiles = new S3ValueReader(attributeStore) { override val s3Client = new MockS3Client() }
lazy val sample = AllOnesTestFile
}
@@ -5,7 +5,10 @@ import geotrellis.spark.io.avro.{AvroRecordCodec, AvroUnionCodec}

object Implicits extends Implicits

- trait Implicits extends TileCodecs with KeyCodecs {
+ trait Implicits
+   extends TileCodecs
+   with TileFeatureCodec
+   with KeyCodecs {
implicit def tileUnionCodec = new AvroUnionCodec[Tile](
byteArrayTileCodec,
floatArrayTileCodec,
@@ -0,0 +1,42 @@
package geotrellis.spark.io.avro.codecs

import java.nio.ByteBuffer

import geotrellis.raster._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs.Implicits._

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic._

import scala.collection.JavaConverters._
import scala.util.Try


trait TileFeatureCodec {

implicit def tileFeatureCodec[
T <: Tile: AvroRecordCodec,
D: AvroRecordCodec
]: AvroRecordCodec[TileFeature[T,D]] = new AvroRecordCodec[TileFeature[T,D]] {
def schema = SchemaBuilder
.record("TileFeature").namespace("geotrellis.raster")
.fields()
.name("tile").`type`(implicitly[AvroRecordCodec[T]].schema).noDefault
.name("data").`type`(implicitly[AvroRecordCodec[D]].schema).noDefault
.endRecord()

def encode(tileFeature: TileFeature[T, D], rec: GenericRecord): Unit = {
rec.put("tile", implicitly[AvroRecordCodec[T]].encode(tileFeature.tile))
rec.put("data", implicitly[AvroRecordCodec[D]].encode(tileFeature.data))
}

def decode(rec: GenericRecord): TileFeature[T,D] = {
val tile: T = implicitly[AvroRecordCodec[T]].decode(rec.get("tile").asInstanceOf[GenericRecord])
val data: D = implicitly[AvroRecordCodec[D]].decode(rec.get("data").asInstanceOf[GenericRecord])
TileFeature(tile, data)
}

}

}
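
A hedged roundtrip sketch (not part of the diff): with the implicit codecs from Implicits in scope, tileFeatureCodec derives an AvroRecordCodec for any TileFeature whose tile and data types themselves have record codecs. The IntArrayTile payloads here are illustrative assumptions:

import geotrellis.raster._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs.Implicits._

object TileFeatureCodecExample {
  def main(args: Array[String]): Unit = {
    val tf = TileFeature(IntArrayTile.fill(1, 4, 4), IntArrayTile.fill(2, 4, 4))

    // Resolve the derived codec from the implicit scope.
    val codec = implicitly[AvroRecordCodec[TileFeature[IntArrayTile, IntArrayTile]]]

    // encode builds a GenericRecord with "tile" and "data" fields;
    // decode rebuilds the TileFeature from that record.
    val rec  = codec.encode(tf)
    val back = codec.decode(rec)

    assert(back.tile.toArray.sameElements(tf.tile.toArray))
    assert(back.data.toArray.sameElements(tf.data.toArray))
  }
}
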
@@ -108,10 +108,13 @@ class KryoRegistrator extends SparkKryoRegistrator {
kryo.register(classOf[geotrellis.raster.DoubleRawArrayTile])
kryo.register(classOf[geotrellis.raster.DoubleConstantNoDataArrayTile])
kryo.register(classOf[geotrellis.raster.DoubleUserDefinedNoDataArrayTile])

kryo.register(classOf[Array[geotrellis.raster.Tile]])
kryo.register(classOf[Array[geotrellis.raster.TileFeature[_,_]]])
kryo.register(classOf[geotrellis.raster.Tile])
kryo.register(classOf[geotrellis.raster.TileFeature[_,_]])

kryo.register(classOf[geotrellis.raster.ArrayMultibandTile])
kryo.register(classOf[Array[geotrellis.raster.Tile]])
kryo.register(classOf[geotrellis.raster.CompositeTile])
kryo.register(classOf[geotrellis.raster.ConstantTile])
kryo.register(classOf[geotrellis.raster.CroppedTile])
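
Not part of the diff: a typical SparkConf wiring sketch for picking up these registrations; the registrator path is an assumption based on the GeoTrellis 0.10 package layout:

import org.apache.spark.SparkConf

object KryoSetupExample {
  // Standard Spark Kryo settings; the registrator class name below is assumed.
  val conf: SparkConf = new SparkConf()
    .setAppName("tilefeature-example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator")
}
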
@@ -14,6 +14,9 @@ class TileRDDMergeMethods[K: ClassTag, V: ClassTag: ? => TileMergeMethods[V]](va
def merge(other: RDD[(K, V)]): RDD[(K, V)] =
TileRDDMerge(self, other)

- def merge(partitioner: Option[Partitioner] = None): RDD[(K, V)] =
+ def merge(): RDD[(K, V)] =
+   TileRDDMerge(self, None)
+
+ def merge(partitioner: Option[Partitioner]): RDD[(K, V)] =
TileRDDMerge(self, partitioner)
}
@@ -0,0 +1,62 @@
package geotrellis.spark.io

import geotrellis.raster.{GridBounds, Tile, TileFeature}
import geotrellis.spark._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.io.json._
import geotrellis.vector.Extent


trait AllOnesTestTileFeatureSpec { self: PersistenceSpec[SpatialKey, TileFeature[Tile, Tile], TileLayerMetadata[SpatialKey]] =>

val bounds1 = GridBounds(1,1,3,3)
val bounds2 = GridBounds(4,5,6,6)

for(PersistenceSpecDefinition(keyIndexMethodName, _, layerIds) <- specLayerIds) {
val layerId = layerIds.layerId
val query = reader.query[SpatialKey, TileFeature[Tile, Tile], TileLayerMetadata[SpatialKey]](layerId)

describe(s"AllOnes query tests for $keyIndexMethodName") {
it("filters past layout bounds") {
query.where(Intersects(GridBounds(6, 2, 7, 3))).result.keys.collect() should
contain theSameElementsAs Array(SpatialKey(6, 3), SpatialKey(6, 2))
}

it("query inside layer bounds") {
val actual = query.where(Intersects(bounds1)).result.keys.collect()
val expected = for ((x, y) <- bounds1.coords) yield SpatialKey(x, y)

if (expected.diff(actual).nonEmpty)
info(s"missing: ${(expected diff actual).toList}")
if (actual.diff(expected).nonEmpty)
info(s"unwanted: ${(actual diff expected).toList}")

actual should contain theSameElementsAs expected
}

it("query outside of layer bounds") {
query.where(Intersects(GridBounds(10, 10, 15, 15))).result.collect() should be(empty)
}

it("disjoint query on space") {
val actual = query.where(Intersects(bounds1) or Intersects(bounds2)).result.keys.collect()
val expected = for ((x, y) <- bounds1.coords ++ bounds2.coords) yield SpatialKey(x, y)

if (expected.diff(actual).nonEmpty)
info(s"missing: ${(expected diff actual).toList}")
if (actual.diff(expected).nonEmpty)
info(s"unwanted: ${(actual diff expected).toList}")

actual should contain theSameElementsAs expected
}

it("should filter by extent") {
val extent = Extent(-10, -10, 10, 10) // this should intersect the four central tiles in 8x8 layout
query.where(Intersects(extent)).result.keys.collect() should
contain theSameElementsAs {
for ((col, row) <- GridBounds(3, 3, 4, 4).coords) yield SpatialKey(col, row)
}
}
}
}
}