Skip to content

Commit

Permalink
Merge pull request #1807 from echeipesh/refactor/eac/geotiff-rdd
Browse files Browse the repository at this point in the history
Refactor Windowed Reading GeoTiffs from S3 and Hdfs
  • Loading branch information
lossyrob authored Nov 15, 2016
2 parents 849d321 + 68f2ffc commit 288b45b
Show file tree
Hide file tree
Showing 81 changed files with 1,946 additions and 1,250 deletions.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ import org.scalatest._

class BigTiffSpec extends FunSpec with RasterMatchers with GeoTiffTestUtils {
describe("Reading BigTiffs") {
val smallPath = "raster-test/data/geotiff-test-files/ls8_int32.tif"
val bigPath = "raster-test/data/geotiff-test-files/bigtiffs/ls8_int32-big.tif"
val smallPath = geoTiffPath("ls8_int32.tif")
val bigPath = geoTiffPath("bigtiffs/ls8_int32-big.tif")

val smallPathMulti = "raster-test/data/geotiff-test-files/multi.tif"
val bigPathMulti = "raster-test/data/geotiff-test-files/bigtiffs/multi-big.tif"
val smallPathMulti = geoTiffPath("multi.tif")
val bigPathMulti = geoTiffPath("bigtiffs/multi-big.tif")

val chunkSize = 500

it("should read in the entire SinglebandGeoTiff") {
val local = LocalBytesStreamer(bigPath, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPath)
val reader = StreamingByteReader(local, chunkSize)
val actual = SinglebandGeoTiff(reader)
val expected = SinglebandGeoTiff(smallPath)

assertEqual(actual.tile, expected.tile)
}

it("should read in a cropped SinlebandGeoTiff from the edge") {
val local = LocalBytesStreamer(bigPath, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPath)
val reader = StreamingByteReader(local, chunkSize)
val tiffTags = TiffTagsReader.read(smallPath)
val extent = tiffTags.extent
val e = Extent(extent.xmin, extent.ymin, extent.xmin + 100, extent.ymin + 100)
Expand All @@ -41,8 +41,8 @@ class BigTiffSpec extends FunSpec with RasterMatchers with GeoTiffTestUtils {
}

it("should read in a cropped SinglebandGeoTiff in the middle") {
val local = LocalBytesStreamer(bigPath, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPath)
val reader = StreamingByteReader(local, chunkSize)
val tiffTags = TiffTagsReader.read(smallPath)
val extent = tiffTags.extent
val e = Extent(extent.xmin + 100 , extent.ymin + 100, extent.xmax - 250, extent.ymax - 250)
Expand All @@ -54,17 +54,17 @@ class BigTiffSpec extends FunSpec with RasterMatchers with GeoTiffTestUtils {
}

it("should read in the entire MultibandGeoTiff") {
val local = LocalBytesStreamer(bigPathMulti, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPathMulti)
val reader = StreamingByteReader(local, chunkSize)
val actual = MultibandGeoTiff(reader)
val expected = MultibandGeoTiff(smallPathMulti)

assertEqual(actual.tile, expected.tile)
}

it("should read in a cropped MultibandGeoTiff from the edge") {
val local = LocalBytesStreamer(bigPathMulti, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPathMulti)
val reader = StreamingByteReader(local, chunkSize)
val tiffTags = TiffTagsReader.read(smallPathMulti)
val extent = tiffTags.extent
val e = Extent(extent.xmin, extent.ymin, extent.xmin + 100, extent.ymin + 100)
Expand All @@ -76,8 +76,8 @@ class BigTiffSpec extends FunSpec with RasterMatchers with GeoTiffTestUtils {
}

it("should read in a cropped MultibandGeoTiff in the middle") {
val local = LocalBytesStreamer(bigPathMulti, chunkSize)
val reader = StreamByteReader(local)
val local = FileRangeReader(bigPathMulti)
val reader = StreamingByteReader(local, chunkSize)
val tiffTags = TiffTagsReader.read(smallPathMulti)
val extent = tiffTags.extent
val e = Extent(extent.xmin + 100 , extent.ymin + 100, extent.xmax - 250, extent.ymax - 250)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ trait Tester {
val byteBuffer= Filesystem.toMappedByteBuffer(path)
val arraySegmentBytes: ArraySegmentBytes =
ArraySegmentBytes(byteBuffer, tiffTags)
val bufferSegmentBytes: BufferSegmentBytes =
BufferSegmentBytes(byteBuffer, tiffTags)

val bufferSegmentBytes: LazySegmentBytes =
LazySegmentBytes(byteBuffer, tiffTags)

val geoTiff =
if (tiffTags.bandCount == 1)
Expand Down Expand Up @@ -65,7 +65,7 @@ class SegmentBytesSpec extends FunSpec
}
}

describe("Reading into BufferSegmentBytes") {
describe("Reading into LazySegmentBytes") {
it("striped, singleband GeoTiff") {
val tester = new Tester(paths(0))
assert(tester.bufferSegmentBytes.size == tester.actual.size)
Expand All @@ -85,7 +85,7 @@ class SegmentBytesSpec extends FunSpec
it("should read in a large file") {
val tiffTags = TiffTagsReader.read(largeFile)
val byteBuffer = Filesystem.toMappedByteBuffer(largeFile)
BufferSegmentBytes(byteBuffer, tiffTags)
LazySegmentBytes(byteBuffer, tiffTags)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SinglebandGeoTiffReaderSpec extends FunSpec
expected
}

def writeExpectedTile(t: Tile, n: String): Unit =
def writeExpectedTile(t: Tile, n: String): Unit =
geotrellis.raster.io.geotiff.writer.GeoTiffWriter.write(
GeoTiff(
t,
Expand Down Expand Up @@ -319,8 +319,6 @@ class SinglebandGeoTiffReaderSpec extends FunSpec
withClue(s"Failed for Storage $s, type $t") {
val gtiff = geoTiff(s, t)
val tile = gtiff.tile
println("tiles", gtiff, gtiff.cellType, tile, tile.cellType, tile.toArrayTile, tile.toArrayTile.cellType)
println(tile.get(0, 0), tile.toArrayTile.get(0, 0))
assertEqual(tile, tile.toArrayTile)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ object ArraySegmentBytes {
* Creates a new instance of ArraySegmentBytes.
*
* @param byteReader A ByteReader that contains the bytes of the GeoTiff
* @param byteBuffer A ByteBuffer that contains the bytes of the GeoTiff
* @param tiffTags The [[geotrellis.raster.io.geotiff.tags.TiffTags]] of the GeoTiff
* @return A new instance of ArraySegmentBytes
*/
Expand All @@ -46,17 +45,12 @@ object ArraySegmentBytes {
val compressedBytes: Array[Array[Byte]] = {
def readSections(offsets: Array[Long],
byteCounts: Array[Long]): Array[Array[Byte]] = {
val oldOffset = byteReader.position

val result = Array.ofDim[Array[Byte]](offsets.size)

cfor(0)(_ < offsets.size, _ + 1) { i =>
byteReader.position(offsets(i).toInt)
result(i) = byteReader.getSignedByteArray(byteCounts(i).toInt)
result(i) = byteReader.getSignedByteArray(byteCounts(i), offsets(i))
}

byteReader.position(oldOffset)

result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import spire.syntax.cfor._
* This class implements [[SegmentBytes]] via a ByteReader.
*
* @param byteReader: A ByteReader that contains bytes of the GeoTiff
* @param storageMethod: The [[StorageMethod]] of the GeoTiff
* @param tiffTags: The [[TiffTags]] of the GeoTiff
* @return A new instance of BufferSegmentBytes
* @return A new instance of LazySegmentBytes
*/
case class BufferSegmentBytes(byteReader: ByteReader, tiffTags: TiffTags) extends SegmentBytes {
case class LazySegmentBytes(byteReader: ByteReader, tiffTags: TiffTags) extends SegmentBytes {

val (offsets, byteCounts) =
if (tiffTags.hasStripStorage) {
Expand All @@ -32,7 +31,7 @@ case class BufferSegmentBytes(byteReader: ByteReader, tiffTags: TiffTags) extend
BasicTags._stripByteCounts get)

(stripOffsets.get, stripByteCounts.get)

} else {
val tileOffsets = (tiffTags &|->
TiffTags._tileTags ^|->
Expand All @@ -54,11 +53,6 @@ case class BufferSegmentBytes(byteReader: ByteReader, tiffTags: TiffTags) extend
* @param i: The index number of the segment.
* @return An Array[Byte] that contains the bytes of the segment
*/
def getSegment(i: Int) = {
val oldOffset = byteReader.position
byteReader.position(offsets(i).toInt)
val result = byteReader.getSignedByteArray(byteCounts(i).toInt)
byteReader.position(oldOffset)
result
}
def getSegment(i: Int) =
byteReader.getSignedByteArray(byteCounts(i), offsets(i))
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object GeoTiffReader {
case Some(x) => readSingleband(path, false, true).crop(x)
case None => readSingleband(path)
}

/* Read a single band GeoTIFF file.
* If there is more than one band in the GeoTiff, read the first band only.
*/
Expand All @@ -71,7 +71,7 @@ object GeoTiffReader {
*/
def readSingleband(bytes: Array[Byte]): SinglebandGeoTiff =
readSingleband(ByteBuffer.wrap(bytes), true, false)

/* Read a single band GeoTIFF file.
* If there is more than one band in the GeoTiff, read the first band only.
*/
Expand All @@ -84,7 +84,7 @@ object GeoTiffReader {

def readSingleband(byteReader: ByteReader, e: Extent): SinglebandGeoTiff =
readSingleband(byteReader, Some(e))

def readSingleband(byteReader: ByteReader, e: Option[Extent]): SinglebandGeoTiff =
e match {
case Some(x) => readSingleband(byteReader, false, true).crop(x)
Expand Down Expand Up @@ -127,7 +127,7 @@ object GeoTiffReader {
*/
def readMultiband(path: String): MultibandGeoTiff =
readMultiband(path, true, false)

/* Read in only the extent for each band in a multi band GeoTIFF file.
*/
def readMultiband(path: String, e: Extent): MultibandGeoTiff =
Expand All @@ -140,21 +140,21 @@ object GeoTiffReader {
case Some(x) => readMultiband(path, false, true).crop(x)
case None => readMultiband(path)
}

/* Read a multi band GeoTIFF file.
*/
def readMultiband(path: String, decompress: Boolean, streaming: Boolean): MultibandGeoTiff =
if (streaming)
readMultiband(Filesystem.toMappedByteBuffer(path), decompress, streaming)
else
readMultiband(ByteBuffer.wrap(Filesystem.slurp(path)), decompress, streaming)

def readMultiband(byteReader: ByteReader): MultibandGeoTiff =
readMultiband(byteReader, true, false)

def readMultiband(byteReader: ByteReader, e: Extent): MultibandGeoTiff =
readMultiband(byteReader, Some(e))

def readMultiband(byteReader: ByteReader, e: Option[Extent]): MultibandGeoTiff =
e match {
case Some(x) => readMultiband(byteReader, false, true).crop(x)
Expand All @@ -165,7 +165,7 @@ object GeoTiffReader {
*/
def readMultiband(bytes: Array[Byte]): MultibandGeoTiff =
readMultiband(ByteBuffer.wrap(bytes), true, false)

/* Read a multi band GeoTIFF file.
*/
def readMultiband(bytes: Array[Byte], decompress: Boolean,
Expand Down Expand Up @@ -277,9 +277,9 @@ object GeoTiffReader {
// Validate Tiff identification number
val tiffIdNumber = byteReader.getChar
if (tiffIdNumber != 42 && tiffIdNumber != 43)
throw new MalformedGeoTiffException(s"bad identification number (must be 42 or 43, was $tiffIdNumber)")
throw new MalformedGeoTiffException(s"bad identification number (must be 42 or 43, was $tiffIdNumber (${tiffIdNumber.toInt}))")

val tiffTags =
val tiffTags =
if (tiffIdNumber == 42) {
val smallStart = byteReader.getInt
TiffTagsReader.read(byteReader, smallStart)
Expand Down Expand Up @@ -317,7 +317,7 @@ object GeoTiffReader {

val segmentBytes: SegmentBytes =
if (streaming)
BufferSegmentBytes(byteReader, tiffTags)
LazySegmentBytes(byteReader, tiffTags)
else
ArraySegmentBytes(byteReader, tiffTags)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ trait ByteReaderExtensions {

arr
}

final def getLongArray(length: Long, valueOffset: Long): Array[Long] = {
val arr = Array.ofDim[Long](length.toInt)

Expand Down Expand Up @@ -171,35 +171,24 @@ trait ByteReaderExtensions {

arr
}


/** NOTE: We don't support lengths greater than Int.MaxValue yet (or ever). */
final def getSignedByteArray(length: Long, valueOffset: Long): Array[Byte] = {
val arr = Array.ofDim[Byte](length.toInt)

val len = length.toInt
if (length <= 4) {
val arr = Array.ofDim[Byte](len)
val bb = ByteBuffer.allocate(4).order(byteReader.order).putInt(0, valueOffset.toInt)
cfor(0)(_ < length, _ + 1) { i =>
cfor(0)(_ < len, _ + 1) { i =>
arr(i) = bb.get
}
arr
} else {
val oldPos = byteReader.position
byteReader.position(valueOffset)

cfor(0)(_ < length, _ + 1) { i =>
arr(i) = byteReader.get
}

val arr = byteReader.getBytes(len)
byteReader.position(oldPos)
arr
}

arr
}

final def getSignedByteArray(length: Long): Array[Byte] = {
val arr = Array.ofDim[Byte](length.toInt)
cfor(0)(_ < length, _ + 1) { i =>
arr(i) = byteReader.get
}
arr
}

final def getSignedShortArray(length: Long, valueOffset: Long): Array[Short] = {
Expand Down
Loading

0 comments on commit 288b45b

Please sign in to comment.