From cde2e71771649169cfd438257b041cef2c35144f Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Mon, 27 Mar 2017 16:13:55 +0300 Subject: [PATCH] PNG / JPG / GeoTiff hadoop write methods Signed-off-by: Grigory Pomadchin --- .../spark/raster/HadoopWriteMethods.scala | 66 +++++++++++++++++++ .../geotrellis/spark/raster/Implicits.scala | 28 ++++++++ 2 files changed, 94 insertions(+) create mode 100644 spark/src/main/scala/geotrellis/spark/raster/HadoopWriteMethods.scala create mode 100644 spark/src/main/scala/geotrellis/spark/raster/Implicits.scala diff --git a/spark/src/main/scala/geotrellis/spark/raster/HadoopWriteMethods.scala b/spark/src/main/scala/geotrellis/spark/raster/HadoopWriteMethods.scala new file mode 100644 index 0000000000..c02b9760cc --- /dev/null +++ b/spark/src/main/scala/geotrellis/spark/raster/HadoopWriteMethods.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2016 Azavea + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package geotrellis.spark.raster
+
+import geotrellis.util.MethodExtensions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.spark.SparkContext
+
+import java.io.DataOutputStream
+
+/**
+ * Extension methods for writing a wrapped value of type `T` to a Hadoop path.
+ *
+ * Implementors supply only the three-argument `write`. The other overloads
+ * default `gzip` to `false` and/or take the Hadoop configuration from an
+ * implicit `SparkContext`.
+ *
+ * @tparam T type of the wrapped value
+ */
+trait HadoopWriteMethods[T] extends MethodExtensions[T] {
+  /** Writes to `path` with `gzip = false`, using `sc.hadoopConfiguration`. */
+  def write(path: Path)(implicit sc: SparkContext): Unit = write(path, gzip = false)
+
+  /** Writes to `path`, forwarding `gzip`, using `sc.hadoopConfiguration`. */
+  def write(path: Path, gzip: Boolean)(implicit sc: SparkContext): Unit = write(path, gzip, sc.hadoopConfiguration)
+
+  /** Writes to `path` with `gzip = false`, using the given Hadoop configuration. */
+  def write(path: Path, conf: Configuration): Unit = write(path, gzip = false, conf)
+
+  /**
+   * Writes the wrapped value to `path`.
+   *
+   * @param path destination path
+   * @param gzip whether gzip compression is requested
+   * @param conf Hadoop configuration used for the write
+   */
+  def write(path: Path, gzip: Boolean, conf: Configuration): Unit
+}
+
+object HadoopWriteMethods {
+  /**
+   * Opens an output stream for `path`, passes it to `dosWrite` wrapped in a
+   * `DataOutputStream`, and closes it afterwards whether or not `dosWrite` throws.
+   *
+   * When `gzip` is true the target becomes `path` with ".gz" appended and a codec
+   * is looked up for that path through `CompressionCodecFactory`. If no codec is
+   * found, a notice is printed and the data is written uncompressed to `path`.
+   *
+   * @param path     destination path; its own scheme/authority selects the file system
+   * @param gzip     whether to write a compressed ".gz" file
+   * @param conf     Hadoop configuration used to resolve the file system and codec
+   * @param dosWrite callback that writes the payload to the supplied stream
+   */
+  def write(path: Path, gzip: Boolean, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
+    // Resolve the file system from the path itself. FileSystem.get(conf) returns the
+    // configured default file system, which rejects paths that belong to another
+    // scheme or authority.
+    val fs: FileSystem = path.getFileSystem(conf)
+
+    val os: java.io.OutputStream =
+      if (!gzip) {
+        fs.create(path)
+      } else {
+        // Path.suffix appends to the final name component. Building the name from
+        // path.toUri.toString would feed a percent-encoded string back into Path.
+        val outputUri = path.suffix(".gz")
+
+        Option(new CompressionCodecFactory(conf).getCodec(outputUri)) match {
+          case None =>
+            println(s"No codec found for $outputUri, writing without compression.")
+            fs.create(path)
+          case Some(codec) =>
+            val raw = fs.create(outputUri)
+            try {
+              codec.createOutputStream(raw)
+            } catch {
+              // Do not leak the raw stream when the codec cannot wrap it.
+              case scala.util.control.NonFatal(e) =>
+                raw.close()
+                throw e
+            }
+        }
+      }
+
+    try {
+      val dos = new DataOutputStream(os)
+      try {
+        dosWrite(dos)
+      } finally {
+        dos.close()
+      }
+    } finally {
+      // Safety net: make sure the underlying stream is closed as well.
+      os.close()
+    }
+  }
+}
diff --git a/spark/src/main/scala/geotrellis/spark/raster/Implicits.scala b/spark/src/main/scala/geotrellis/spark/raster/Implicits.scala
new file mode 100644
index 0000000000..bc20fc83b8
--- /dev/null
+++ b/spark/src/main/scala/geotrellis/spark/raster/Implicits.scala
@@ -0,0 +1,28 @@
+package geotrellis.spark.raster
+
+import geotrellis.raster.CellGrid
+import geotrellis.raster.io.geotiff.GeoTiff
+import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
+import geotrellis.raster.render.{Jpg, Png}
+
+import org.apache.hadoop.conf.Configuration
+import
org.apache.hadoop.fs.Path
+
+/** Importable instance of the [[Implicits]] trait. */
+object Implicits extends Implicits
+
+/**
+ * Implicit wrappers that add the `HadoopWriteMethods` `write` overloads to
+ * `Jpg`, `Png` and `GeoTiff` values.
+ */
+trait Implicits {
+  /** Writes the bytes of a `Jpg` through `HadoopWriteMethods.write`. */
+  implicit class withJpgHadoopWriteMethods(val self: Jpg) extends HadoopWriteMethods[Jpg] {
+    def write(path: Path, gzip: Boolean, conf: Configuration): Unit = {
+      HadoopWriteMethods.write(path, gzip, conf) { out => out.write(self.bytes) }
+    }
+  }
+
+  /** Writes the bytes of a `Png` through `HadoopWriteMethods.write`. */
+  implicit class withPngHadoopWriteMethods(val self: Png) extends HadoopWriteMethods[Png] {
+    def write(path: Path, gzip: Boolean, conf: Configuration): Unit = {
+      HadoopWriteMethods.write(path, gzip, conf) { out => out.write(self.bytes) }
+    }
+  }
+
+  /** Writes a `GeoTiff` by running a `GeoTiffWriter` over the opened stream. */
+  implicit class withGeoTiffHadoopWriteMethods[T <: CellGrid](val self: GeoTiff[T]) extends HadoopWriteMethods[GeoTiff[T]] {
+    def write(path: Path, gzip: Boolean, conf: Configuration): Unit = {
+      HadoopWriteMethods.write(path, gzip, conf) { out =>
+        new GeoTiffWriter(self, out).write()
+      }
+    }
+  }
+}