diff --git a/build.gradle b/build.gradle index 4a0eea7f58..6f99cdc581 100644 --- a/build.gradle +++ b/build.gradle @@ -104,6 +104,7 @@ allprojects { scalatestVersion = '2.2.6' jettyVersion = '9.2.16.v20160414' guavaVersion = '14.0.1' + kryoVersion = '4.0.0' derbyVersion = '10.12.1.1' pegdownVersion = '1.6.0' snappyStoreVersion = '1.5.1' @@ -279,6 +280,7 @@ subprojects { include '**/*.class' exclude '**/*DUnitTest.class' + exclude '**/*DUnitSingleTest.class' exclude '**/*TestBase.class' workingDir = "${testResultsBase}/junit" diff --git a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterManagerTestBase.scala b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterManagerTestBase.scala index d087210d93..f296291ed1 100644 --- a/cluster/src/dunit/scala/io/snappydata/cluster/ClusterManagerTestBase.scala +++ b/cluster/src/dunit/scala/io/snappydata/cluster/ClusterManagerTestBase.scala @@ -17,7 +17,7 @@ package io.snappydata.cluster import java.io.File -import java.sql.{DriverManager, Connection} +import java.sql.{Connection, DriverManager} import java.util.Properties import scala.collection.JavaConverters._ @@ -31,6 +31,7 @@ import io.snappydata.{Locator, Server, ServiceManager} import org.slf4j.LoggerFactory import org.apache.spark.sql.SnappyContext +import org.apache.spark.sql.collection.Utils import org.apache.spark.{SparkConf, SparkContext} /** @@ -173,7 +174,7 @@ class ClusterManagerTestBase(s: String) extends DistributedTestBase(s) { def getANetConnection(netPort: Int, useGemXDURL: Boolean = false): Connection = { val driver = "com.pivotal.gemfirexd.jdbc.ClientDriver" - Class.forName(driver).newInstance + Utils.classForName(driver).newInstance var url: String = null if (useGemXDURL) { url = "jdbc:gemfirexd://localhost:" + netPort + "/" @@ -222,6 +223,7 @@ object ClusterManagerTestBase { conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "3") // conf.set("spark.executor.memory", "2g") // conf.set("spark.shuffle.manager", "SORT") + Utils.setDefaultSerializerAndCodec(conf) props.asScala.foreach({ case (k, v) => if (k.indexOf(".") < 0) { @@ -273,6 +275,8 @@ object ClusterManagerTestBase { cleanupTestData(null, null) val sparkContext = SnappyContext.globalSparkContext if (sparkContext != null) sparkContext.stop() + // clear system properties set explicitly + Utils.clearDefaultSerializerAndCodec() } def stopNetworkServers(): Unit = { diff --git a/cluster/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala b/cluster/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala index 82c0eeda50..e85756f742 100644 --- a/cluster/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala +++ b/cluster/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala @@ -35,6 +35,7 @@ import io.snappydata.gemxd.ClusterCallbacksImpl import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.SnappyCoarseGrainedExecutorBackend import org.apache.spark.sql.SnappyContext +import org.apache.spark.sql.collection.Utils import org.apache.spark.{Logging, SparkCallbacks, SparkConf, SparkEnv} /** @@ -147,12 +148,14 @@ object ExecutorInitiator extends Logging { // Fetch the driver's Spark properties. 
val executorConf = new SparkConf + Utils.setDefaultSerializerAndCodec(executorConf) val port = executorConf.getInt("spark.executor.port", 0) val props = SparkCallbacks.fetchDriverProperty(executorHost, executorConf, port, url) - val driverConf = new SparkConf() + val driverConf = new SparkConf + Utils.setDefaultSerializerAndCodec(driverConf) // Specify a default directory for executor, if the local directory for executor // is set via the executor conf, // it will override this property later in the code @@ -173,8 +176,8 @@ object ExecutorInitiator extends Logging { // TODO: conf to this conf that was received from driver. // If memory manager is not set, use Snappy unified memory manager - driverConf.set("spark.memory.manager", - driverConf.get("spark.memory.manager", SNAPPY_MEMORY_MANAGER)) + driverConf.setIfMissing("spark.memory.manager", + SNAPPY_MEMORY_MANAGER) val cores = driverConf.getInt("spark.executor.cores", Runtime.getRuntime.availableProcessors() * 2) @@ -242,6 +245,7 @@ object ExecutorInitiator extends Logging { executorRunnable.stopTask = true } executorRunnable.setDriverDetails(None, null) + Utils.clearDefaultSerializerAndCodec() } def restartExecutor(): Unit = { @@ -256,7 +260,7 @@ object ExecutorInitiator extends Logging { // Avoid creation of executor inside the Gem accessor // that is a Spark driver but has joined the gem system // in the non embedded mode - if (SparkCallbacks.isDriver()) { + if (SparkCallbacks.isDriver) { logInfo("Executor cannot be instantiated in this " + "VM as a Spark driver is already running. ") return diff --git a/cluster/src/main/scala/io/snappydata/gemxd/SparkSQLExecuteImpl.scala b/cluster/src/main/scala/io/snappydata/gemxd/SparkSQLExecuteImpl.scala index 996803975c..40f13e93fd 100644 --- a/cluster/src/main/scala/io/snappydata/gemxd/SparkSQLExecuteImpl.scala +++ b/cluster/src/main/scala/io/snappydata/gemxd/SparkSQLExecuteImpl.scala @@ -20,6 +20,8 @@ import java.io.DataOutput import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.gemstone.gemfire.DataSerializer import com.gemstone.gemfire.internal.shared.Version import com.gemstone.gemfire.internal.{ByteArrayDataInput, InternalDataSerializer} @@ -46,7 +48,7 @@ import org.apache.spark.sql.{DataFrame, SnappyContext} import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.unsafe.Platform import org.apache.spark.util.SnappyUtils -import org.apache.spark.{Logging, SparkContext, SparkEnv} +import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskContext} /** * Encapsulates a Spark execution for use in query routing from JDBC. 
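The conf handling above (Utils.setDefaultSerializerAndCodec plus setIfMissing) injects the pooled Kryo serializer and the lz4 codec as defaults without overriding anything the user configured explicitly. Below is a minimal sketch of how such only-if-absent defaults compose on a plain SparkConf; the names SerializerDefaults, applyDefaults and setIfAbsent are illustrative and not from the patch (the patch's real helper is Utils.setDefaultSerializerAndCodec in core's Utils.scala), while the string values mirror Constant.DEFAULT_SERIALIZER and Constant.DEFAULT_CODEC.

    import org.apache.spark.SparkConf

    object SerializerDefaults {
      // values mirror Constant.DEFAULT_SERIALIZER / DEFAULT_CODEC added by this patch
      val defaultSerializer = "org.apache.spark.serializer.PooledKryoSerializer"
      val defaultCodec = "lz4"

      // set a property only when the user has not configured it, mirroring it into
      // system properties so SparkConf instances created later also pick it up
      private def setIfAbsent(conf: SparkConf, key: String, value: String): Unit = {
        if (conf.getOption(key).isEmpty) {
          conf.set(key, value)
          System.setProperty(key, value)
        }
      }

      def applyDefaults(conf: SparkConf): SparkConf = {
        setIfAbsent(conf, "spark.serializer", defaultSerializer)
        setIfAbsent(conf, "spark.closure.serializer", defaultSerializer)
        setIfAbsent(conf, "spark.io.compression.codec", defaultCodec)
        conf
      }

      def demo(): Unit = {
        // an explicit user setting always wins over the injected default
        val conf = applyDefaults(
          new SparkConf(false).set("spark.io.compression.codec", "lzf"))
        assert(conf.get("spark.serializer") == defaultSerializer)
        assert(conf.get("spark.io.compression.codec") == "lzf")
      }
    }

Mirroring the values into system properties covers SparkConf instances created later in the same JVM, which is also why the stop paths in this patch call clearDefaultSerializerAndCodec to undo it.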
@@ -91,9 +93,10 @@ class SparkSQLExecuteImpl(val sql: String, case None => (false, Array.empty[String]) } - private def handleLocalExecution(srh: SnappyResultHolder): Unit = { + private def handleLocalExecution(srh: SnappyResultHolder, + size: Int): Unit = { // prepare SnappyResultHolder with all data and create new one - if (hdos.size > 0) { + if (size > 0) { val rawData = hdos.toByteArrayCopy srh.fromSerializedData(rawData, rawData.length, null) } @@ -118,13 +121,13 @@ class SparkSQLExecuteImpl(val sql: String, val handler = new InternalRowHandler(sql, querySchema, serializeComplexType, colTypes) val rows = executedPlan.executeCollect() - handler.serializeRows(rows.iterator) + handler(null, rows.iterator) }) hdos.clearForReuse() writeMetaData() hdos.write(result) if (isLocalExecution) { - handleLocalExecution(srh) + handleLocalExecution(srh, hdos.size) } msg.lastResult(srh) return @@ -133,6 +136,7 @@ class SparkSQLExecuteImpl(val sql: String, val resultsRdd = executedPlan.execute() val bm = SparkEnv.get.blockManager val partitionBlockIds = new Array[RDDBlockId](resultsRdd.partitions.length) + val handler = new ExecutionHandler(sql, querySchema, resultsRdd.id, partitionBlockIds, serializeComplexType, colTypes) var blockReadSuccess = false @@ -162,10 +166,7 @@ class SparkSQLExecuteImpl(val sql: String, if (dosSize > GemFireXDUtils.DML_MAX_CHUNK_SIZE) { if (isLocalExecution) { // prepare SnappyResultHolder with all data and create new one - if (dosSize > 0) { - val rawData = hdos.toByteArrayCopy - srh.fromSerializedData(rawData, rawData.length, null) - } + handleLocalExecution(srh, dosSize) msg.sendResult(srh) srh = new SnappyResultHolder(this) } else { @@ -186,7 +187,7 @@ class SparkSQLExecuteImpl(val sql: String, writeMetaData() } if (isLocalExecution) { - handleLocalExecution(srh) + handleLocalExecution(srh, hdos.size) } msg.lastResult(srh) @@ -518,11 +519,15 @@ object SparkSQLExecuteImpl { } } -class InternalRowHandler(sql: String, schema: StructType, - serializeComplexType: Boolean, - rowStoreColTypes: Array[(Int, Int, Int)] = null) extends Serializable { +class InternalRowHandler(private var sql: String, + private var schema: StructType, + private var serializeComplexType: Boolean, + private var rowStoreColTypes: Array[(Int, Int, Int)] = null) + extends ((TaskContext, Iterator[InternalRow]) => Array[Byte]) + with Serializable with KryoSerializable { - final def serializeRows(itr: Iterator[InternalRow]): Array[Byte] = { + override def apply(context: TaskContext, + itr: Iterator[InternalRow]): Array[Byte] = { var numCols = -1 var numEightColGroups = -1 var numPartCols = -1 @@ -568,17 +573,62 @@ class InternalRowHandler(sql: String, schema: StructType, } dos.toByteArray } + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeString(sql) + kryo.writeObject(output, schema) + output.writeBoolean(serializeComplexType) + val colTypes = rowStoreColTypes + if (colTypes != null) { + val len = colTypes.length + output.writeVarInt(len, true) + var i = 0 + while (i < len) { + val colType = colTypes(i) + output.writeVarInt(colType._1, false) + output.writeVarInt(colType._2, false) + output.writeVarInt(colType._3, false) + i += 1 + } + } else { + output.writeVarInt(0, true) + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + sql = input.readString() + schema = kryo.readObject[StructType](input, classOf[StructType]) + serializeComplexType = input.readBoolean() + val len = input.readVarInt(true) + if (len > 0) { + val colTypes = new Array[(Int, Int, Int)](len) + var 
i = 0 + while (i < len) { + val colType1 = input.readVarInt(false) + val colType2 = input.readVarInt(false) + val colType3 = input.readVarInt(false) + colTypes(i) = (colType1, colType2, colType3) + i += 1 + } + rowStoreColTypes = colTypes + } else { + rowStoreColTypes = null + } + } } -final class ExecutionHandler(sql: String, schema: StructType, rddId: Int, - partitionBlockIds: Array[RDDBlockId], serializeComplexType: Boolean, - rowStoreColTypes: Array[(Int, Int, Int)] = null) - extends InternalRowHandler(sql, schema, serializeComplexType, rowStoreColTypes) { +final class ExecutionHandler(_sql: String, _schema: StructType, rddId: Int, + partitionBlockIds: Array[RDDBlockId], _serializeComplexType: Boolean, + _rowStoreColTypes: Array[(Int, Int, Int)]) + extends InternalRowHandler(_sql, _schema, _serializeComplexType, + _rowStoreColTypes) with Serializable with KryoSerializable { + + def this() = this(null, null, 0, null, false, null) def apply(resultsRdd: RDD[InternalRow], df: DataFrame): Unit = { Utils.withNewExecutionId(df, { val sc = SnappyContext.globalSparkContext - sc.runJob(resultsRdd, serializeRows _, resultHandler _) + sc.runJob(resultsRdd, this, resultHandler _) }) } @@ -592,6 +642,9 @@ final class ExecutionHandler(sql: String, schema: StructType, rddId: Int, partitionBlockIds(partitionId) = blockId } } + + override def toString(): String = + s"ExecutionHandler: Iterator[InternalRow] => Array[Byte]" } object SnappyContextPerConnection { diff --git a/cluster/src/main/scala/io/snappydata/impl/LeadImpl.scala b/cluster/src/main/scala/io/snappydata/impl/LeadImpl.scala index 8f73bf7007..f6971f2a01 100644 --- a/cluster/src/main/scala/io/snappydata/impl/LeadImpl.scala +++ b/cluster/src/main/scala/io/snappydata/impl/LeadImpl.scala @@ -111,6 +111,7 @@ class LeadImpl extends ServerImpl with Lead with Logging { setAppName("leaderLauncher"). set(Property.JobserverEnabled(), "true"). set("spark.scheduler.mode", "FAIR") + Utils.setDefaultSerializerAndCodec(conf) // inspect user input and add appropriate prefixes // if property doesn't contain '.' 
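Stepping back to the handler classes: InternalRowHandler and ExecutionHandler follow a pattern used throughout this patch for KryoSerializable classes, namely mutable private fields, a hand-written write/read pair built on Kryo's variable-length encodings with a length prefix where 0 means absent, and a zero-argument constructor so Kryo can instantiate the object before calling read. A self-contained sketch of that discipline on a toy class follows; ColumnTypesHolder and KryoRoundTrip are illustrative names, not from the patch, and the calls used are plain kryo-shaded 4.0 API.

    import com.esotericsoftware.kryo.io.{Input, Output}
    import com.esotericsoftware.kryo.{Kryo, KryoSerializable}

    // toy class mirroring the InternalRowHandler serialization discipline
    class ColumnTypesHolder(private var sql: String,
        private var colTypes: Array[(Int, Int, Int)]) extends KryoSerializable {

      // zero-arg constructor so Kryo can instantiate the object before read()
      def this() = this(null, null)

      override def write(kryo: Kryo, output: Output): Unit = {
        output.writeString(sql)
        if (colTypes != null) {
          // length prefix with optimized positive encoding; 0 marks "absent"
          output.writeVarInt(colTypes.length, true)
          colTypes.foreach { case (a, b, c) =>
            output.writeVarInt(a, false)
            output.writeVarInt(b, false)
            output.writeVarInt(c, false)
          }
        } else {
          output.writeVarInt(0, true)
        }
      }

      override def read(kryo: Kryo, input: Input): Unit = {
        sql = input.readString()
        val len = input.readVarInt(true)
        colTypes = if (len == 0) null else Array.fill(len)(
          (input.readVarInt(false), input.readVarInt(false), input.readVarInt(false)))
      }
    }

    object KryoRoundTrip {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        val output = new Output(4096, -1)
        kryo.writeClassAndObject(output,
          new ColumnTypesHolder("select 1", Array((4, 0, 0))))
        val copy = kryo.readClassAndObject(new Input(output.toBytes))
            .asInstanceOf[ColumnTypesHolder]
        assert(copy ne null)
      }
    }

The need for a no-argument constructor is the reason ExecutionHandler gains its def this() = this(null, null, 0, null, false, null) overload above.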
@@ -240,12 +241,13 @@ class LeadImpl extends ServerImpl with Lead with Logging { SnappyContext.flushSampleTables() } - assert(sparkContext != null, "Mix and match of LeadService api " + + assert(sparkContext != null, "Mix and match of LeadService api " + "and SparkContext is unsupported.") if (!sparkContext.isStopped) { sparkContext.stop() sparkContext = null } + Utils.clearDefaultSerializerAndCodec() if (null != remoteInterpreterServerObj) { val method: Method = remoteInterpreterServerClass.getMethod("isAlive") diff --git a/cluster/src/main/scala/org/apache/spark/SparkCallbacks.scala b/cluster/src/main/scala/org/apache/spark/SparkCallbacks.scala index c481fa161d..35ace66124 100644 --- a/cluster/src/main/scala/org/apache/spark/SparkCallbacks.scala +++ b/cluster/src/main/scala/org/apache/spark/SparkCallbacks.scala @@ -23,8 +23,9 @@ import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveSparkProps /** - * Calls that are needed to be sent to snappy-cluster classes because the variables are private[spark] - */ + * Calls that are needed to be sent to snappy-cluster classes because + * the variables are private[spark] + */ object SparkCallbacks { def createExecutorEnv( @@ -70,7 +71,7 @@ object SparkCallbacks { SparkConf.isExecutorStartupConf(key) } - def isDriver() : Boolean = { + def isDriver: Boolean = { SparkEnv.get != null && SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER } diff --git a/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala b/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala index 4dc7588c1d..62dd37c127 100644 --- a/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala +++ b/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala @@ -22,13 +22,13 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskSchedulerImpl} import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext} -class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, override val rpcEnv: RpcEnv) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) with Logging { +class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, + override val rpcEnv: RpcEnv) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { private val snappyAppId = "snappy-app-" + System.currentTimeMillis @@ -92,7 +92,7 @@ class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, override } class BlockManagerIdListener(sc: SparkContext) - extends SparkListener with Logging { + extends SparkListener { override def onExecutorAdded( msg: SparkListenerExecutorAdded): Unit = synchronized { diff --git a/cluster/src/test/resources/log4j.properties b/cluster/src/test/resources/log4j.properties new file mode 100644 index 0000000000..1b1c325bff --- /dev/null +++ b/cluster/src/test/resources/log4j.properties @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# 
contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootCategory=INFO, file + +# RollingFile appender +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.append=true +log4j.appender.file.file=snappydata.log +log4j.appender.file.MaxFileSize=100MB +log4j.appender.file.MaxBackupIndex=10000 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN +org.spark-project.jetty.LEVEL=WARN + +# Some packages are noisy for no good reason. +log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false +log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF + +log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF + +log4j.additivity.hive.log=false +log4j.logger.hive.log=OFF + +log4j.additivity.parquet.hadoop.ParquetRecordReader=false +log4j.logger.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false +log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF + +log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false +log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF + +log4j.additivity.hive.ql.metadata.Hive=false +log4j.logger.hive.ql.metadata.Hive=OFF + +log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false +log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR + +# Other Spark classes that generate unnecessary logs at INFO level +log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=WARN +log4j.logger.org.apache.spark.ContextCleaner=WARN +log4j.logger.org.apache.spark.MapOutputTracker=WARN +log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=WARN +log4j.logger.org.apache.spark.storage.CoarseGrainedSchedulerBackend=WARN +log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=WARN +log4j.logger.org.apache.spark.scheduler.DAGScheduler=WARN +log4j.logger.org.apache.spark.scheduler.TaskSetManager=WARN +log4j.logger.org.apache.spark.scheduler.FairSchedulableBuilder=WARN +log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint=WARN +log4j.logger.org.apache.spark.storage.BlockManagerInfo=WARN +log4j.logger.org.apache.spark.executor.SnappyExecutor=WARN + +# log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG diff --git 
a/cluster/src/test/scala/org/apache/spark/sql/streaming/ClusterSnappyStreamingSuite.scala b/cluster/src/test/scala/org/apache/spark/sql/streaming/ClusterSnappyStreamingSuite.scala index 2021518eec..913f3ab897 100644 --- a/cluster/src/test/scala/org/apache/spark/sql/streaming/ClusterSnappyStreamingSuite.scala +++ b/cluster/src/test/scala/org/apache/spark/sql/streaming/ClusterSnappyStreamingSuite.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.streaming -import java.util.{Map => JMap} - import scala.collection.mutable import scala.language.postfixOps @@ -126,4 +124,4 @@ class ClusterSnappyStreamingSuite assert(r.length > 0) ssnc.sql("drop table gemColumnTable") } -} \ No newline at end of file +} diff --git a/core/build.gradle b/core/build.gradle index 589ca4ce3f..f45f93d8ca 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -69,6 +69,8 @@ dependencies { compile 'com.zaxxer:HikariCP:2.4.7' //compile 'org.spark-project:dstream-twitter_2.11:0.1.0' compile 'org.twitter4j:twitter4j-stream:4.0.4' + compile 'org.objenesis:objenesis:2.4' + compile "com.esotericsoftware:kryo-shaded:${kryoVersion}" provided 'com.rabbitmq:amqp-client:3.5.7' testCompile project(':dunit') diff --git a/core/src/main/scala/io/snappydata/Literals.scala b/core/src/main/scala/io/snappydata/Literals.scala index 5ec80ceac7..af7e90d7d2 100644 --- a/core/src/main/scala/io/snappydata/Literals.scala +++ b/core/src/main/scala/io/snappydata/Literals.scala @@ -19,7 +19,6 @@ package io.snappydata import java.util.Properties import org.apache.spark.SparkConf -import org.apache.spark.sql.sources._ /** * Constant names suggested per naming convention @@ -88,6 +87,13 @@ object Constant { val MAX_VARCHAR_SIZE = 32672 + val DEFAULT_SERIALIZER = "org.apache.spark.serializer.PooledKryoSerializer" + + // LZ4 JNI version is the fastest one but LZF gives best balance between + // speed and compression ratio having higher compression ration than LZ4. + // But the JNI version means no warmup time which helps for short jobs. + val DEFAULT_CODEC = "lz4" + // System property to tell the system whether the String type columns // should be considered as clob or not val STRING_AS_CLOB_PROP = "spark-string-as-clob" @@ -178,7 +184,7 @@ object Property extends Enumeration { * mirrors closely the format used by Hive,Oracle query hints with a comment * followed immediately by a '+' and then "key(value)" for the hint. Example: *

- * SELECT * /{@literal *}+ hint(value) *{@literal /} FROM t1 + * SELECT * /`*`+ hint(value) *`/` FROM t1 */ object QueryHint extends Enumeration { @@ -205,7 +211,7 @@ object QueryHint extends Enumeration { * Possible values are valid indexes defined on the table. * * Example:
- * SELECT * FROM t1 /{@literal *}+ index(xxx) *{@literal /} , t2 --+ withIndex(yyy) + * SELECT * FROM t1 /`*`+ index(xxx) *`/` , t2 --+ withIndex(yyy) */ val Index = Value("index") @@ -216,7 +222,7 @@ object QueryHint extends Enumeration { * Possible comma separated values are [[io.snappydata.JOS]]. * * Example:
- * SELECT * FROM /{@literal *}+ joinOrder(fixed) *{@literal /} t1, t2 + * SELECT * FROM /`*`+ joinOrder(fixed) *`/` t1, t2 */ val JoinOrder = Value("joinOrder") @@ -238,7 +244,7 @@ object QueryHint extends Enumeration { /** * List of possible values for Join Order QueryHint. * - * [[Note:]] Ordering is applicable only when index choice is left to the optimizer. By default, + * `Note:` Ordering is applicable only when index choice is left to the optimizer. By default, * if user specifies explicit index hint like "select * from t1 --+ index()", optimizer will just * honor the hint and skip everything mentioned in joinOrder. In other words, a blank index() * hint for any table disables choice of index and its associated following rules. @@ -253,7 +259,7 @@ object JOS extends Enumeration { * Continue to attempt optimization choices of index for colocated joins even if user have * specified explicit index hints for some tables. * - * [[Note:]] user specified index hint will be honored and optimizer will only attempt for + * `Note:` user specified index hint will be honored and optimizer will only attempt for * other tables in the query. */ val ContinueOptimizations = Value("continueOpts") diff --git a/core/src/main/scala/io/snappydata/impl/SparkShellRDDHelper.scala b/core/src/main/scala/io/snappydata/impl/SparkShellRDDHelper.scala index b94062d7cd..939d1d39aa 100644 --- a/core/src/main/scala/io/snappydata/impl/SparkShellRDDHelper.scala +++ b/core/src/main/scala/io/snappydata/impl/SparkShellRDDHelper.scala @@ -17,7 +17,6 @@ package io.snappydata.impl import java.sql.{Connection, ResultSet, SQLException, Statement} -import java.util import scala.collection.JavaConverters._ import scala.collection.mutable @@ -59,21 +58,21 @@ final class SparkShellRDDHelper { val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema) val partition = split.asInstanceOf[ExecutorMultiBucketLocalShellPartition] - var bucketString = "" - partition.buckets.foreach( bucket => { - bucketString = bucketString + bucket + "," - }) - val buckets = bucketString.substring(0, bucketString.length-1) + val buckets = partition.buckets.mkString(",") val statement = conn.createStatement() - if (!useLocatorURL) - statement.execute(s"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION('$resolvedName', '$buckets')") + if (!useLocatorURL) { + statement.execute( + s"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION('$resolvedName', '$buckets')") + } val rs = statement.executeQuery(query) (statement, rs) } - def getConnection(connectionProperties: ConnectionProperties, split: Partition): Connection = { - val urlsOfNetServerHost = split.asInstanceOf[ExecutorMultiBucketLocalShellPartition].hostList + def getConnection(connectionProperties: ConnectionProperties, + split: Partition): Connection = { + val urlsOfNetServerHost = split.asInstanceOf[ + ExecutorMultiBucketLocalShellPartition].hostList useLocatorURL = SparkShellRDDHelper.useLocatorUrl(urlsOfNetServerHost) createConnection(connectionProperties, urlsOfNetServerHost) } @@ -99,13 +98,12 @@ final class SparkShellRDDHelper { ConnectionPool.getPoolConnection(jdbcUrl, GemFireXDClientDialect, props, connProperties.executorConnProps, connProperties.hikariCP) } catch { - case sqlException: SQLException => - if (hostList.size == 1 || useLocatorURL) - throw sqlException - else { - hostList.remove(index) - createConnection(connProperties, hostList) - } + case sqle: SQLException => if (hostList.size == 1 || useLocatorURL) { + throw sqle + } else { + hostList.remove(index) + createConnection(connProperties, 
hostList) + } } } } @@ -126,75 +124,6 @@ object SparkShellRDDHelper { partitions } - private def mapBucketsToPartitions(tableName: String, conn: Connection): Array[Partition] = { - val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema) - val bucketToServerList = getBucketToServerMapping(resolvedName) - val numBuckets = bucketToServerList.length - - Misc.getRegionForTable(resolvedName, true).asInstanceOf[Region[_, _]] match { - case pr: PartitionedRegion => - val serverToBuckets = new mutable.HashMap[InternalDistributedMember, - mutable.HashSet[Int]]() - var totalBuckets = new mutable.HashSet[Int]() - for (p <- 0 until numBuckets) { - val owner = pr.getBucketPrimary(p) - val bucketSet = { - if (serverToBuckets.contains(owner)) serverToBuckets.get(owner).get - else new mutable.HashSet[Int]() - } - bucketSet += p - totalBuckets += p - serverToBuckets put(owner, bucketSet) - } - val numCores = Runtime.getRuntime.availableProcessors() - val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size() - val numPartitions = numServers * numCores - val partitions = { - if (numBuckets < numPartitions) { - new Array[Partition](numBuckets) - } else { - new Array[Partition](numPartitions) - } - } - var partCnt = 0; - serverToBuckets foreach (e => { - var numCoresPending = numCores - var localBuckets = e._2 - assert(!localBuckets.isEmpty) - var maxBucketsPerPart = Math.ceil(e._2.size.toFloat / numCores) - assert(maxBucketsPerPart >= 1) - while (partCnt <= numPartitions && !localBuckets.isEmpty) { - var cntr = 0; - val bucketsPerPart = new mutable.HashSet[Int]() - maxBucketsPerPart = Math.ceil(localBuckets.size.toFloat / numCoresPending) - assert(maxBucketsPerPart >= 1) - while (cntr < maxBucketsPerPart && !localBuckets.isEmpty) { - val buck = localBuckets.head - bucketsPerPart += buck - localBuckets = localBuckets - buck - totalBuckets = totalBuckets - buck - cntr += 1 - } - partitions(partCnt) = new ExecutorMultiBucketLocalShellPartition( - partCnt, bucketsPerPart, bucketToServerList(bucketsPerPart.head)) - partCnt += 1 - numCoresPending -= 1 - } - }) - partitions - case pr: DistributedRegion => - val numPartitions = bucketToServerList.length - val partitions = new Array[Partition](numPartitions) - for (p <- 0 until numPartitions) { - partitions(p) = new ExecutorMultiBucketLocalShellPartition( - p, - mutable.HashSet.empty, - bucketToServerList(p)) - } - partitions - } - } - private def useLocatorUrl(hostList: ArrayBuffer[(String, String)]): Boolean = hostList.isEmpty @@ -208,11 +137,13 @@ object SparkShellRDDHelper { if (netServers.indexOf(',') > 0) { for (netServer <- netServers.split(",")) { netUrls += node.getIpAddress.getHostAddress -> - (urlPrefix + org.apache.spark.sql.collection.Utils.getClientHostPort(netServer) + urlSuffix) + (urlPrefix + org.apache.spark.sql.collection.Utils + .getClientHostPort(netServer) + urlSuffix) } } else { netUrls += node.getIpAddress.getHostAddress -> - (urlPrefix + org.apache.spark.sql.collection.Utils.getClientHostPort(netServers) + urlSuffix) + (urlPrefix + org.apache.spark.sql.collection.Utils + .getClientHostPort(netServers) + urlSuffix) } } } @@ -243,7 +174,8 @@ object SparkShellRDDHelper { membersToNetServers, urlPrefix, urlSuffix, netUrls)) if (netUrls.isEmpty) { - // Save the bucket which does not have a neturl, and later assign available ones to it. + // Save the bucket which does not have a neturl, + // and later assign available ones to it. 
orphanBuckets += bid } else { for (e <- netUrls) { diff --git a/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala new file mode 100644 index 0000000000..7b3e8c06d1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala @@ -0,0 +1,468 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package org.apache.spark.serializer + +import java.io.{EOFException, IOException, InputStream, OutputStream} +import java.lang.ref.SoftReference +import java.nio.ByteBuffer + +import scala.reflect.ClassTag + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.serializers.DefaultSerializers.KryoSerializableSerializer +import com.esotericsoftware.kryo.{Kryo, KryoException} + +import org.apache.spark.broadcast.TorrentBroadcast +import org.apache.spark.executor.{InputMetrics, OutputMetrics, ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.network.util.ByteUnit +import org.apache.spark.rdd.ZippedPartitionsPartition +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchTask, StatusUpdate} +import org.apache.spark.sql.BlockAndExecutorId +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.collection.{MultiBucketExecutorPartition, NarrowExecutorLocalSplitDep} +import org.apache.spark.sql.execution.columnar.impl.{ColumnarStorePartitionedRDD, SparkShellCachedBatchRDD, SparkShellRowRDD} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.row.RowFormatScanRDD +import org.apache.spark.sql.sources.ConnectionProperties +import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.BlockManagerMessages.{RemoveBlock, RemoveBroadcast, RemoveRdd, RemoveShuffle, UpdateBlockInfo} +import org.apache.spark.storage._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.{CollectionAccumulator, DoubleAccumulator, LongAccumulator, SerializableBuffer, Utils} +import org.apache.spark.{Logging, SparkConf, SparkEnv} + +/** + * A pooled, optimized version of Spark's KryoSerializer that also works for + * closure serialization. + * + * Note that this serializer is not guaranteed to be wire-compatible across + * different versions of Spark. It is intended to be used to + * serialize/de-serialize data within a single Spark application. + */ +final class PooledKryoSerializer(conf: SparkConf) + extends KryoSerializer(conf) with Serializable { + + /** + * Sets a class loader for the serializer to use in deserialization. 
+ * + * @return this Serializer object + */ + override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = { + // clear the pool and cache + KryoSerializerPool.clear() + super.setDefaultClassLoader(classLoader) + } + + override def newKryo(): Kryo = { + val kryo = super.newKryo() + + // specific serialization implementations in Spark and commonly used classes + kryo.register(classOf[UnsafeRow], new KryoSerializableSerializer) + kryo.register(classOf[UTF8String], new KryoSerializableSerializer) + kryo.register(classOf[UpdateBlockInfo], new ExternalizableOnlySerializer) + kryo.register(classOf[CompressedMapStatus], new ExternalizableOnlySerializer) + kryo.register(classOf[HighlyCompressedMapStatus], + new ExternalizableOnlySerializer) + kryo.register(classOf[IndirectTaskResult[_]]) + kryo.register(classOf[RDDBlockId]) + kryo.register(classOf[ShuffleBlockId]) + kryo.register(classOf[ShuffleDataBlockId]) + kryo.register(classOf[ShuffleIndexBlockId]) + kryo.register(classOf[BroadcastBlockId]) + kryo.register(classOf[TaskResultBlockId]) + kryo.register(classOf[StreamBlockId]) + kryo.register(classOf[TorrentBroadcast[_]]) + + // other classes having specific implementations in snappy-spark but for + // spark compatibility, the serializer is not mentioned explicitly + kryo.register(classOf[ResultTask[_, _]]) + kryo.register(classOf[ShuffleMapTask]) + kryo.register(classOf[DirectTaskResult[_]]) + kryo.register(classOf[SerializableBuffer]) + kryo.register(Utils.classForName( + "org.apache.spark.rpc.netty.NettyRpcEndpointRef")) + kryo.register(Utils.classForName( + "org.apache.spark.rpc.netty.RequestMessage")) + kryo.register(classOf[LaunchTask]) + kryo.register(classOf[TaskDescription]) + kryo.register(classOf[StatusUpdate]) + kryo.register(classOf[TaskMetrics]) + kryo.register(classOf[InputMetrics]) + kryo.register(classOf[OutputMetrics]) + kryo.register(classOf[ShuffleReadMetrics]) + kryo.register(classOf[ShuffleWriteMetrics]) + kryo.register(classOf[LongAccumulator]) + kryo.register(classOf[DoubleAccumulator]) + kryo.register(classOf[CollectionAccumulator[_]]) + kryo.register(classOf[SQLMetric]) + kryo.register(classOf[ZippedPartitionsPartition]) + kryo.register(classOf[RemoveBlock]) + kryo.register(classOf[RemoveBroadcast]) + kryo.register(classOf[RemoveRdd]) + kryo.register(classOf[RemoveShuffle]) + kryo.register(classOf[BitSet]) + + kryo.register(classOf[BlockManagerId], new ExternalizableResolverSerializer( + BlockManagerId.getCachedBlockManagerId)) + kryo.register(classOf[StorageLevel], new ExternalizableResolverSerializer( + StorageLevel.getCachedStorageLevel)) + kryo.register(classOf[BlockAndExecutorId], new ExternalizableOnlySerializer) + kryo.register(classOf[StructType], StructTypeSerializer) + kryo.register(classOf[NarrowExecutorLocalSplitDep], + new KryoSerializableSerializer) + kryo.register(classOf[ConnectionProperties], ConnectionPropertiesSerializer) + kryo.register(classOf[RowFormatScanRDD], new KryoSerializableSerializer) + kryo.register(classOf[SparkShellRowRDD], new KryoSerializableSerializer) + kryo.register(classOf[ColumnarStorePartitionedRDD], + new KryoSerializableSerializer) + kryo.register(classOf[SparkShellCachedBatchRDD], + new KryoSerializableSerializer) + kryo.register(classOf[MultiBucketExecutorPartition], + new KryoSerializableSerializer) + + kryo + } + + override def newInstance(): PooledKryoSerializerInstance = { + new PooledKryoSerializerInstance(this) + } + + private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = { + // 
If auto-reset is disabled, then Kryo may store references to duplicate + // occurrences of objects in the stream rather than writing those objects' + // serialized bytes, breaking relocation. See + // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details. + newInstance().getAutoReset + } +} + +final class PooledObject(serializer: PooledKryoSerializer, + bufferSize: Int) { + val kryo: Kryo = serializer.newKryo() + val output: Output = new Output(bufferSize, -1) + val input: Input = new KryoInputStringFix(0) + lazy val streamInput: Input = new KryoInputStringFix(bufferSize) +} + +// TODO: SW: pool must be per SparkContext +object KryoSerializerPool { + + private[serializer] val autoResetField = + classOf[Kryo].getDeclaredField("autoReset") + autoResetField.setAccessible(true) + + private[serializer] val zeroBytes = new Array[Byte](0) + + private val (serializer, bufferSize): (PooledKryoSerializer, Int) = { + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "4k") + val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt + (new PooledKryoSerializer(conf), bufferSize) + } + + private[this] val pool = new java.util.ArrayDeque[SoftReference[PooledObject]]() + + def borrow(): PooledObject = { + var ref: SoftReference[PooledObject] = null + pool.synchronized { + ref = pool.pollFirst() + } + while (ref != null) { + val poolObject = ref.get() + if (poolObject != null) { + return poolObject + } + pool.synchronized { + ref = pool.pollFirst() + } + } + new PooledObject(serializer, bufferSize) + } + + def release(poolObject: PooledObject, + clearInputBuffer: Boolean = false): Unit = { + // Call reset() to clear any Kryo state that might have been modified + // by the last operation to borrow this instance (SPARK-7766). 
+ poolObject.kryo.reset() + if (clearInputBuffer) { + poolObject.input.setBuffer(zeroBytes) + } else { + poolObject.output.clear() + } + val ref = new SoftReference[PooledObject](poolObject) + pool.synchronized { + pool.addFirst(ref) + } + } + + def clear(): Unit = { + pool.synchronized { + pool.clear() + } + } +} + +private[spark] final class PooledKryoSerializerInstance( + pooledSerializer: PooledKryoSerializer) + extends SerializerInstance with Logging { + + private def readByteBufferAsInput(bb: ByteBuffer, input: Input): Unit = { + if (bb.hasArray) { + input.setBuffer(bb.array(), + bb.arrayOffset() + bb.position(), bb.remaining()) + } else { + val numBytes = bb.remaining() + val bytes = new Array[Byte](numBytes) + bb.get(bytes, 0, numBytes) + input.setBuffer(bytes, 0, numBytes) + } + } + + override def serialize[T: ClassTag](t: T): ByteBuffer = { + val poolObject = KryoSerializerPool.borrow() + val output = poolObject.output + try { + poolObject.kryo.writeClassAndObject(output, t) + val result = ByteBuffer.wrap(output.toBytes) + result + } finally { + KryoSerializerPool.release(poolObject) + } + } + + override def deserialize[T: ClassTag](buffer: ByteBuffer): T = { + val poolObject = KryoSerializerPool.borrow() + val input = poolObject.input + try { + readByteBufferAsInput(buffer, input) + val result = poolObject.kryo.readClassAndObject(input).asInstanceOf[T] + result + } finally { + KryoSerializerPool.release(poolObject, clearInputBuffer = true) + } + } + + override def deserialize[T: ClassTag](buffer: ByteBuffer, + loader: ClassLoader): T = { + val poolObject = KryoSerializerPool.borrow() + val kryo = poolObject.kryo + val input = poolObject.input + val oldClassLoader = kryo.getClassLoader + try { + kryo.setClassLoader(loader) + readByteBufferAsInput(buffer, input) + val result = kryo.readClassAndObject(input).asInstanceOf[T] + result + } finally { + kryo.setClassLoader(oldClassLoader) + KryoSerializerPool.release(poolObject, clearInputBuffer = true) + } + } + + override def serializeStream(stream: OutputStream): SerializationStream = { + new KryoReuseSerializationStream(stream) + } + + override def deserializeStream(stream: InputStream): DeserializationStream = { + new KryoStringFixDeserializationStream(stream) + } + + /** + * Returns true if auto-reset is on. The only reason this would be false is + * if the user-supplied registrator explicitly turns auto-reset off. 
+ */ + def getAutoReset: Boolean = { + val poolObject = KryoSerializerPool.borrow() + try { + val result = KryoSerializerPool.autoResetField.get( + poolObject.kryo).asInstanceOf[Boolean] + result + } finally { + KryoSerializerPool.release(poolObject) + } + } +} + +private[serializer] class KryoReuseSerializationStream( + stream: OutputStream) extends SerializationStream { + + private[this] val poolObject = KryoSerializerPool.borrow() + + // use incoming stream itself if it is an output + private[this] var output = stream match { + case out: Output => out + case _ => poolObject.output.setOutputStream(stream); poolObject.output + } + + override def writeObject[T: ClassTag](t: T): SerializationStream = { + poolObject.kryo.writeClassAndObject(output, t) + this + } + + override def flush() { + if (output != null) { + output.flush() + } else { + throw new IOException("Stream is closed") + } + } + + override def close() { + if (output != null) { + try { + output.close() + } finally { + output = null + poolObject.output.setOutputStream(null) + KryoSerializerPool.release(poolObject) + } + } + } +} + +private[spark] class KryoStringFixDeserializationStream( + stream: InputStream) extends DeserializationStream { + + private[this] val poolObject = KryoSerializerPool.borrow() + + private[this] var input = poolObject.streamInput + input.setInputStream(stream) + + override def readObject[T: ClassTag](): T = { + try { + poolObject.kryo.readClassAndObject(input).asInstanceOf[T] + } catch { + // DeserializationStream uses the EOF exception to indicate + // stopping condition. + case e: KryoException + if e.getMessage.toLowerCase.contains("buffer underflow") => + throw new EOFException + } + } + + override def close() { + if (input != null) { + try { + // Kryo's Input automatically closes the input stream it is using. + input.close() + } finally { + input.setInputStream(null) + input = null + KryoSerializerPool.release(poolObject) + } + } + } +} + +/** + * Fix for https://github.com/EsotericSoftware/kryo/issues/128. + * Uses an additional 0x0 byte as end marker. + */ +private[spark] final class KryoInputStringFix(size: Int) + extends Input(size) { + + override def readString: String = { + require(1) + val b = buffer(position) + if ((b & 0x80) == 0) { + // ASCII. + position += 1 + readAscii + } else { + // fallback to super's version (position not incremented) + super.readString + } + } + + override def readStringBuilder: java.lang.StringBuilder = { + require(1) + val b = buffer(position) + if ((b & 0x80) == 0) { + // ASCII. + position += 1 + new java.lang.StringBuilder(readAscii) + } else { + // fallback to super's version (position not incremented) + super.readStringBuilder + } + } + + private def readAscii: String = { + val buffer = this.buffer + var end = position + val start = end - 1 + val limit = this.limit + var b = 0 + do { + if (end == limit) return readAscii_slow + b = buffer(end) + end += 1 + } while ((b & 0x80) == 0) + val nbytes = end - start + val bytes = new Array[Byte](nbytes) + System.arraycopy(buffer, start, bytes, 0, nbytes) + // Mask end of ascii bit. + bytes(nbytes - 1) = (bytes(nbytes - 1) & 0x7F).toByte + position = end + // noinspection ScalaDeprecation + new String(bytes, 0, 0, nbytes) + } + + private def readAscii_slow: String = { + position -= 1 // Re-read the first byte. + // Copy chars currently in buffer. 
+ var charCount = limit - position + if (charCount > this.chars.length) { + this.chars = new Array[Char](charCount * 2) + } + var chars = this.chars + val buffer = this.buffer + var i = position + var ii = 0 + val n = limit + while (i < n) { + chars(ii) = buffer(i).toChar + i += 1 + ii += 1 + } + position = limit + // Copy additional chars one by one. + var readNextByte = true + while (readNextByte) { + require(1) + val b = buffer(position) + position += 1 + if (charCount == chars.length) { + val newChars = new Array[Char](charCount * 2) + System.arraycopy(chars, 0, newChars, 0, charCount) + chars = newChars + this.chars = newChars + } + if ((b & 0x80) != 0x80) { + chars(charCount) = b.toChar + charCount += 1 + } else { + chars(charCount) = (b & 0x7F).toChar + charCount += 1 + readNextByte = false + } + } + new String(chars, 0, charCount) + } +} diff --git a/core/src/main/scala/org/apache/spark/serializer/serializers.scala b/core/src/main/scala/org/apache/spark/serializer/serializers.scala new file mode 100644 index 0000000000..af1262aaed --- /dev/null +++ b/core/src/main/scala/org/apache/spark/serializer/serializers.scala @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package org.apache.spark.serializer + +import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} +import java.sql.Types + +import com.esotericsoftware.kryo.io.{Input, KryoObjectInput, KryoObjectOutput, Output} +import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} + +import org.apache.spark.sql.jdbc.JdbcDialect +import org.apache.spark.sql.row.{GemFireXDClientDialect, GemFireXDDialect} +import org.apache.spark.sql.sources.ConnectionProperties +import org.apache.spark.sql.types._ + + +private[spark] class ExternalizableOnlySerializer[T <: Externalizable] + extends KryoClassSerializer[T] { + + private var objectInput: KryoObjectInput = _ + private var objectOutput: KryoObjectOutput = _ + + override def write(kryo: Kryo, output: Output, obj: T): Unit = { + try { + obj.writeExternal(getObjectOutput(kryo, output)) + } catch { + case e@(_: ClassCastException | _: IOException) => + throw new KryoException(e) + } + } + + override def read(kryo: Kryo, input: Input, c: Class[T]): T = { + try { + val obj = kryo.newInstance(c) + obj.readExternal(getObjectInput(kryo, input)) + obj + } catch { + case e@(_: ClassCastException | _: ClassNotFoundException | + _: IOException) => throw new KryoException(e) + } + } + + private def getObjectOutput(kryo: Kryo, output: Output): ObjectOutput = { + if (objectOutput == null) { + objectOutput = new KryoObjectOutput(kryo, output) + } else { + objectOutput.setOutput(output) + } + objectOutput + } + + private def getObjectInput(kryo: Kryo, input: Input): ObjectInput = { + if (objectInput == null) { + objectInput = new KryoObjectInput(kryo, input) + } else { + objectInput.setInput(input) + } + objectInput + } +} + +private[spark] final class ExternalizableResolverSerializer[T <: Externalizable]( + readResolve: T => T) extends ExternalizableOnlySerializer[T] { + + override def read(kryo: Kryo, input: Input, c: Class[T]): T = { + readResolve(super.read(kryo, input, c)) + } +} + +object StructTypeSerializer extends KryoClassSerializer[StructType] { + + def writeType(kryo: Kryo, output: Output, dataType: DataType): Unit = { + dataType match { + case IntegerType => output.writeVarInt(Types.INTEGER, false) + case LongType => output.writeVarInt(Types.BIGINT, false) + case StringType => output.writeVarInt(Types.CLOB, false) + case DoubleType => output.writeVarInt(Types.DOUBLE, false) + case FloatType => output.writeVarInt(Types.FLOAT, false) + case ShortType => output.writeVarInt(Types.SMALLINT, false) + case ByteType => output.writeVarInt(Types.TINYINT, false) + case BooleanType => output.writeVarInt(Types.BOOLEAN, false) + case BinaryType => output.writeVarInt(Types.BLOB, false) + case TimestampType => output.writeVarInt(Types.TIMESTAMP, false) + case DateType => output.writeVarInt(Types.DATE, false) + case t: DecimalType => + output.writeVarInt(Types.DECIMAL, false) + output.writeVarInt(t.precision, true) + output.writeVarInt(t.scale, true) + case a: ArrayType => + output.writeVarInt(Types.ARRAY, false) + writeType(kryo, output, a.elementType) + output.writeBoolean(a.containsNull) + case m: MapType => + // indicates MapType since there is no equivalent in JDBC + output.writeVarInt(Types.JAVA_OBJECT, false) + writeType(kryo, output, m.keyType) + writeType(kryo, output, m.valueType) + output.writeBoolean(m.valueContainsNull) + case s: StructType => + output.writeVarInt(Types.STRUCT, false) + write(kryo, output, s) + case _ => + output.writeVarInt(Types.OTHER, false) + kryo.writeClassAndObject(output, dataType) + } + } + + 
def readType(kryo: Kryo, input: Input): DataType = { + input.readVarInt(false) match { + case Types.INTEGER => IntegerType + case Types.BIGINT => LongType + case Types.CLOB => StringType + case Types.DOUBLE => DoubleType + case Types.FLOAT => FloatType + case Types.SMALLINT => ShortType + case Types.TINYINT => ByteType + case Types.BOOLEAN => BooleanType + case Types.BLOB => BinaryType + case Types.TIMESTAMP => TimestampType + case Types.DATE => DateType + case Types.DECIMAL => + val precision = input.readVarInt(true) + val scale = input.readVarInt(true) + DecimalType(precision, scale) + case Types.ARRAY => + val elementType = readType(kryo, input) + ArrayType(elementType, input.readBoolean()) + case Types.JAVA_OBJECT => // indicates MapType + val keyType = readType(kryo, input) + val valueType = readType(kryo, input) + MapType(keyType, valueType, input.readBoolean()) + case Types.STRUCT => read(kryo, input, classOf[StructType]) + case Types.OTHER => kryo.readClassAndObject(input).asInstanceOf[DataType] + case t => throw new KryoException( + s"Serialization error: unexpected DataType ID $t") + } + } + + override def write(kryo: Kryo, output: Output, struct: StructType): Unit = { + val fields = struct.fields + val numFields = fields.length + output.writeVarInt(numFields, true) + var i = 0 + while (i < numFields) { + val field = fields(i) + output.writeString(field.name) + writeType(kryo, output, field.dataType) + output.writeBoolean(field.nullable) + TypeUtilities.writeMetadata(field.metadata, kryo, output) + i += 1 + } + } + + override def read(kryo: Kryo, input: Input, + c: Class[StructType]): StructType = { + val numFields = input.readVarInt(true) + val fields = new Array[StructField](numFields) + var i = 0 + while (i < numFields) { + val name = input.readString() + val dataType = readType(kryo, input) + val nullable = input.readBoolean() + val metadata = TypeUtilities.readMetadata(kryo, input) + fields(i) = StructField(name, dataType, nullable, metadata) + i += 1 + } + StructType(fields) + } +} + +object ConnectionPropertiesSerializer + extends KryoClassSerializer[ConnectionProperties] { + + override def write(kryo: Kryo, output: Output, + connProps: ConnectionProperties): Unit = { + output.writeString(connProps.url) + output.writeString(connProps.driver) + connProps.dialect match { + case GemFireXDDialect => output.writeByte(0) + case GemFireXDClientDialect => output.writeByte(1) + case d => output.writeByte(2) + kryo.writeClassAndObject(output, d) + } + val poolProps = connProps.poolProps + if (poolProps ne null) { + val numProps = poolProps.size + output.writeVarInt(numProps, true) + if (numProps > 0) { + for ((key, value) <- poolProps) { + output.writeString(key) + output.writeString(value) + } + } + } else { + output.writeVarInt(0, true) + } + // write only executor properties if available since on target side + // that is the one which will be used + if (connProps.executorConnProps.isEmpty) { + TypeUtilities.writeProperties(connProps.connProps, output) + } else { + TypeUtilities.writeProperties(connProps.executorConnProps, output) + } + output.writeBoolean(connProps.hikariCP) + } + + override def read(kryo: Kryo, input: Input, + c: Class[ConnectionProperties]): ConnectionProperties = { + read(kryo, input) + } + + def read(kryo: Kryo, input: Input): ConnectionProperties = { + val url = input.readString() + val driver = input.readString() + val dialect = input.readByte() match { + case 0 => GemFireXDDialect + case 1 => GemFireXDClientDialect + case _ => 
kryo.readClassAndObject(input).asInstanceOf[JdbcDialect] + } + var numProps = input.readVarInt(true) + var poolProps: Map[String, String] = Map.empty + if (numProps > 0) { + val propsBuilder = Map.newBuilder[String, String] + while (numProps > 0) { + val key = input.readString() + propsBuilder += key -> input.readString() + numProps -= 1 + } + poolProps = propsBuilder.result + } + val connProps = TypeUtilities.readProperties(input) + val hikariCP = input.readBoolean() + ConnectionProperties(url, driver, dialect, poolProps, connProps, + connProps, hikariCP) + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala index 76c2f61e59..e67b9a8d13 100644 --- a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala @@ -27,10 +27,13 @@ import scala.language.existentials import scala.reflect.ClassTag import scala.util.Sorting -import io.snappydata.ToolsCallback +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import io.snappydata.{Constant, ToolsCallback} import org.apache.commons.math3.distribution.NormalDistribution import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.TaskLocation import org.apache.spark.scheduler.local.LocalSchedulerBackend @@ -40,14 +43,14 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.execution.command.ExecutedCommandExec -import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, TakeOrderedAndProjectExec} import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, DriverWrapper} +import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, TakeOrderedAndProjectExec} import org.apache.spark.sql.hive.SnappyStoreHiveCatalog import org.apache.spark.sql.sources.CastLongTime import org.apache.spark.sql.types._ import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.io.ChunkedByteBuffer -import org.apache.spark.{Partition, Partitioner, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.{Partition, Partitioner, SparkConf, SparkContext, SparkEnv, TaskContext} object Utils { @@ -624,6 +627,36 @@ object Utils { def newChunkedByteBuffer(chunks: Array[ByteBuffer]): ChunkedByteBuffer = new ChunkedByteBuffer(chunks) + + def setDefaultConfProperty(conf: SparkConf, name: String, + default: String): Unit = { + conf.getOption(name) match { + case None => + // set both in configuration and as System property for all + // confs created on the fly + conf.set(name, default) + System.setProperty(name, default) + case _ => + } + } + + def setDefaultSerializerAndCodec(conf: SparkConf): Unit = { + // enable optimized pooled Kryo serializer by default + setDefaultConfProperty(conf, "spark.serializer", + Constant.DEFAULT_SERIALIZER) + setDefaultConfProperty(conf, "spark.closure.serializer", + Constant.DEFAULT_SERIALIZER) + if (Constant.DEFAULT_CODEC != CompressionCodec.DEFAULT_COMPRESSION_CODEC) { + setDefaultConfProperty(conf, "spark.io.compression.codec", + Constant.DEFAULT_CODEC) + } + } + + def clearDefaultSerializerAndCodec(): Unit = { + System.clearProperty("spark.serializer") 
+ System.clearProperty("spark.closure.serializer") + System.clearProperty("spark.io.compression.codec") + } } class ExecutorLocalRDD[T: ClassTag](_sc: SparkContext, @@ -693,8 +726,48 @@ class ExecutorLocalPartition(override val index: Int, override def toString: String = s"ExecutorLocalPartition($index, $blockId)" } -class MultiBucketExecutorPartition(override val index: Int, - val buckets: Set[Int], val hostExecutorIds: Seq[String]) extends Partition { +class MultiBucketExecutorPartition(private[this] var _index: Int, + private[this] var _buckets: Set[Int], + private[this] var _hostExecutorIds: Seq[String]) + extends Partition with KryoSerializable { + + override def index: Int = _index + + def buckets: Set[Int] = _buckets + + def hostExecutorIds: Seq[String] = _hostExecutorIds + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeVarInt(_index, true) + output.writeVarInt(_buckets.size, true) + for (bucket <- _buckets) { + output.writeVarInt(bucket, true) + } + output.writeVarInt(_hostExecutorIds.length, true) + for (executor <- _hostExecutorIds) { + output.writeString(executor) + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + _index = input.readVarInt(true) + + var numBuckets = input.readVarInt(true) + val bucketsBuilder = Set.newBuilder[Int] + while (numBuckets > 0) { + bucketsBuilder += input.readVarInt(true) + numBuckets -= 1 + } + _buckets = bucketsBuilder.result() + + var numExecutors = input.readVarInt(true) + val executorBuilder = Seq.newBuilder[String] + while (numExecutors > 0) { + executorBuilder += input.readString() + numExecutors -= 1 + } + _hostExecutorIds = executorBuilder.result() + } override def toString: String = s"MultiBucketExecutorPartition($index, $buckets, $hostExecutorIds)" @@ -704,8 +777,9 @@ class MultiBucketExecutorPartition(override val index: Int, private[spark] case class NarrowExecutorLocalSplitDep( @transient rdd: RDD[_], @transient splitIndex: Int, - var split: Partition) extends Serializable { + private var split: Partition) extends Serializable with KryoSerializable { + // noinspection ScalaUnusedSymbol @throws[java.io.IOException] private def writeObject(oos: ObjectOutputStream): Unit = org.apache.spark.util.Utils.tryOrIOException { @@ -713,6 +787,17 @@ private[spark] case class NarrowExecutorLocalSplitDep( split = rdd.partitions(splitIndex) oos.defaultWriteObject() } + + override def write(kryo: Kryo, output: Output): Unit = + org.apache.spark.util.Utils.tryOrIOException { + // Update the reference to parent split at the time of task serialization + split = rdd.partitions(splitIndex) + kryo.writeClassAndObject(output, split) + } + + override def read(kryo: Kryo, input: Input): Unit = { + split = kryo.readClassAndObject(input).asInstanceOf[Partition] + } } /** diff --git a/core/src/main/scala/org/apache/spark/sql/execution/RDDKryo.scala b/core/src/main/scala/org/apache/spark/sql/execution/RDDKryo.scala new file mode 100644 index 0000000000..74e492a0b1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/execution/RDDKryo.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package org.apache.spark.sql.execution + +import scala.reflect.ClassTag + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} + +import org.apache.spark.rdd.{RDD, RDDCheckpointData} +import org.apache.spark.sql.types.TypeUtilities +import org.apache.spark.storage.StorageLevel +import org.apache.spark.{Dependency, SparkContext} + +/** base RDD KryoSerializable class that will serialize minimal RDD fields */ +abstract class RDDKryo[T: ClassTag](_sc: SparkContext, _deps: Seq[Dependency[_]]) + extends RDD[T](_sc, _deps) with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeInt(id) + val storageLevel = getStorageLevel + if (storageLevel eq StorageLevel.NONE) { + output.writeByte(0) + output.writeByte(0) + } else { + output.writeByte(storageLevel.toInt) + output.writeByte(storageLevel.replication) + } + checkpointData match { + case None => output.writeBoolean(false) + case Some(data) => output.writeBoolean(true) + kryo.writeClassAndObject(output, data) + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + TypeUtilities.rddIdField.set(this, input.readInt()) + val flags = input.readByte() + val replication = input.readByte() + if (flags == 0 && replication == 0) { + TypeUtilities.rddStorageLevelField.set(this, StorageLevel.NONE) + } else { + TypeUtilities.rddStorageLevelField.set(this, StorageLevel(flags, replication)) + } + if (input.readBoolean()) { + checkpointData = Some(kryo.readClassAndObject(input) + .asInstanceOf[RDDCheckpointData[T]]) + } else { + checkpointData = None + } + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala deleted file mode 100644 index bed9ee1ce2..0000000000 --- a/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution - -import java.nio.ByteBuffer -import java.util.{HashMap => JavaHashMap} - -import scala.reflect.ClassTag - -import com.esotericsoftware.kryo.io.{Input, Output} -import com.esotericsoftware.kryo.{Kryo, Serializer} -import com.twitter.chill.ResourcePool - -import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} -import org.apache.spark.sql.types.Decimal -import org.apache.spark.util.MutablePair -import org.apache.spark.{SparkConf, SparkEnv} - -/** - * See how to avoid usage of this class in SnappyData, as Spark has stopped using the class - */ -private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { - override def newKryo(): Kryo = { - val kryo = super.newKryo() - kryo.setRegistrationRequired(false) - kryo.register(classOf[MutablePair[_, _]]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) - kryo.register(classOf[java.math.BigDecimal], new JavaBigDecimalSerializer) - kryo.register(classOf[BigDecimal], new ScalaBigDecimalSerializer) - - kryo.register(classOf[Decimal]) - kryo.register(classOf[JavaHashMap[_, _]]) - - kryo.setReferences(false) - kryo - } -} - -private[execution] class KryoResourcePool(size: Int) - extends ResourcePool[SerializerInstance](size) { - - val ser: SparkSqlSerializer = { - val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) - new SparkSqlSerializer(sparkConf) - } - - def newInstance(): SerializerInstance = ser.newInstance() -} - -private[sql] object SparkSqlSerializer { - @transient lazy val resourcePool = new KryoResourcePool(30) - - private[this] def acquireRelease[O](fn: SerializerInstance => O): O = { - val kryo = resourcePool.borrow - try { - fn(kryo) - } finally { - resourcePool.release(kryo) - } - } - - def serialize[T: ClassTag](o: T): Array[Byte] = - acquireRelease { k => - k.serialize(o).array() - } - - def deserialize[T: ClassTag](bytes: Array[Byte]): T = - acquireRelease { k => - k.deserialize[T](ByteBuffer.wrap(bytes)) - } -} - -private[sql] class JavaBigDecimalSerializer extends Serializer[java.math.BigDecimal] { - def write(kryo: Kryo, output: Output, bd: java.math.BigDecimal) { - // TODO: There are probably more efficient representations than strings... - output.writeString(bd.toString) - } - - def read(kryo: Kryo, input: Input, tpe: Class[java.math.BigDecimal]): java.math.BigDecimal = { - new java.math.BigDecimal(input.readString()) - } -} - -private[sql] class ScalaBigDecimalSerializer extends Serializer[BigDecimal] { - def write(kryo: Kryo, output: Output, bd: BigDecimal) { - // TODO: There are probably more efficient representations than strings... 
- output.writeString(bd.toString) - } - - def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = { - new java.math.BigDecimal(input.readString()) - } -} diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnTableScan.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnTableScan.scala index a4666225a8..f1fe5a8b0e 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnTableScan.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnTableScan.scala @@ -282,7 +282,9 @@ private[sql] final case class ColumnTableScan( ctx.addMutableState("int", batchIndex, s"$batchIndex = 0;") // need DataType and nullable to get decoder in generated code - val fields = ctx.addReferenceObj("fields", output.toArray, "Attribute[]") + // shipping as StructType for efficient serialization + val planSchema = ctx.addReferenceObj("schema", schema, + classOf[StructType].getName) val columnBufferInitCode = new StringBuilder val bufferInitCode = new StringBuilder val cursorUpdateCode = new StringBuilder @@ -359,10 +361,11 @@ private[sql] final case class ColumnTableScan( } columnBufferInitCode.append( s""" - $decoder = $decoderClass.getColumnDecoder($buffer, - $fields[$index]); + $decoder = $decoderClass.getColumnDecoder( + $buffer, $planSchema.apply($index)); // intialize the decoder and store the starting cursor position - $cursor = $decoder.initializeDecoding($buffer, $fields[$index]); + $cursor = $decoder.initializeDecoding( + $buffer, $planSchema.apply($index)); """) bufferInitCode.append( s""" diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ExternalStoreUtils.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ExternalStoreUtils.scala index 447edb1140..03a25cecfa 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/ExternalStoreUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/ExternalStoreUtils.scala @@ -244,15 +244,18 @@ object ExternalStoreUtils { connProps, executorConnProps, hikariCP) } + def getConnection(id: String, connProperties: ConnectionProperties, + forExecutor: Boolean): Connection = { + registerDriver(connProperties.driver) + val connProps = if (forExecutor) connProperties.executorConnProps + else connProperties.connProps + ConnectionPool.getPoolConnection(id, connProperties.dialect, + connProperties.poolProps, connProps, connProperties.hikariCP) + } + def getConnector(id: String, connProperties: ConnectionProperties, - forExecutor: Boolean): () => Connection = { - () => { - registerDriver(connProperties.driver) - val connProps = if (forExecutor) connProperties.executorConnProps - else connProperties.connProps - ConnectionPool.getPoolConnection(id, connProperties.dialect, - connProperties.poolProps, connProps, connProperties.hikariCP) - } + forExecutor: Boolean): () => Connection = () => { + getConnection(id, connProperties, forExecutor) } def getConnectionType(dialect: JdbcDialect): ConnectionType.Value = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/JDBCAppendableRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/JDBCAppendableRelation.scala index d31a7cd4c0..cd34c7e9dd 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/JDBCAppendableRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/JDBCAppendableRelation.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql._ import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SortDirection import org.apache.spark.sql.collection.Utils -import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.hive.{QualifiedTableName, SnappyStoreHiveCatalog} @@ -50,14 +49,13 @@ case class JDBCAppendableRelation( override val schema: StructType, origOptions: Map[String, String], externalStore: ExternalStore, - @transient override val sqlContext: SQLContext) - extends BaseRelation - with PrunedUnsafeFilteredScan - with InsertableRelation - with DestroyRelation - with IndexableRelation - with Logging - with Serializable { + @transient override val sqlContext: SQLContext) extends BaseRelation + with PrunedUnsafeFilteredScan + with InsertableRelation + with DestroyRelation + with IndexableRelation + with Logging + with Serializable { self => @@ -85,9 +83,6 @@ case class JDBCAppendableRelation( val schemaFields = Utils.getSchemaFields(schema) - final lazy val executorConnector = ExternalStoreUtils.getConnector(table, - connProperties, forExecutor = true) - private val bufferLock = new ReentrantReadWriteLock() /** Acquires a read lock on the cache for the duration of `f`. */ @@ -108,8 +103,6 @@ case class JDBCAppendableRelation( } } - // TODO: Suranjan currently doesn't apply any filters. - // will see that later. override def buildUnsafeScan(requiredColumns: Array[String], filters: Array[Filter]): (RDD[Any], Seq[RDD[InternalRow]]) = { val (cachedColumnBuffers, requestedColumns) = scanTable(table, @@ -233,7 +226,7 @@ case class JDBCAppendableRelation( createTable(externalStore, s"create table $tableName (uuid varchar(36) " + "not null, partitionId integer not null, numRows integer not null, " + "stats blob, " + schema.fields.map(structField => - externalStore.columnPrefix + structField.name + " blob") + externalStore.columnPrefix + structField.name + " blob") .mkString(", ") + s", $primarykey) $partitionStrategy", tableName, dropIfExists = false) // for test make it false } @@ -310,14 +303,13 @@ object JDBCAppendableRelation extends Logging { } else { Constant.DEFAULT_SCHEMA + "__" + table } - Constant.INTERNAL_SCHEMA_NAME + "." + tableName + Constant.SHADOW_TABLE_SUFFIX + Constant.INTERNAL_SCHEMA_NAME + "." 
+ tableName + Constant.SHADOW_TABLE_SUFFIX } } final class DefaultSource extends ColumnarRelationProvider -class ColumnarRelationProvider - extends SchemaRelationProvider +class ColumnarRelationProvider extends SchemaRelationProvider with CreatableRelationProvider { def createRelation(sqlContext: SQLContext, mode: SaveMode, diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/BooleanBitSetEncoding.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/BooleanBitSetEncoding.scala index f2b232448c..ed05daf36d 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/BooleanBitSetEncoding.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/BooleanBitSetEncoding.scala @@ -16,8 +16,7 @@ */ package org.apache.spark.sql.execution.columnar.encoding -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.types.{BooleanType, DataType} +import org.apache.spark.sql.types.{BooleanType, DataType, StructField} final class BooleanBitSetEncoding extends BooleanBitSetEncodingBase with NotNullColumn @@ -36,7 +35,7 @@ abstract class BooleanBitSetEncodingBase extends ColumnEncoding { dataType == BooleanType override def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = { + field: StructField): Long = { val cursor = super.initializeDecoding(columnBytes, field) // read the count but its not used since CachedBatch has numRows ColumnEncoding.readInt(columnBytes, cursor) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala index 0b3f4b85d3..dce73cc547 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/ColumnEncoding.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.encoding import java.lang.reflect.Field import java.nio.ByteOrder -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeArrayData, UnsafeMapData, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow} import org.apache.spark.sql.collection.Utils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform @@ -33,9 +33,9 @@ abstract class ColumnEncoding { def supports(dataType: DataType): Boolean protected def initializeNulls(columnBytes: AnyRef, - field: Attribute): Long + field: StructField): Long - def initializeDecoding(columnBytes: AnyRef, field: Attribute): Long = { + def initializeDecoding(columnBytes: AnyRef, field: StructField): Long = { val cursor = initializeNulls(columnBytes, field) val dataType = Utils.getSQLDataType(field.dataType) // no typeId for complex types @@ -232,7 +232,7 @@ object ColumnEncoding { ) def getColumnDecoder(columnBytes: Array[Byte], - field: Attribute): ColumnEncoding = { + field: StructField): ColumnEncoding = { // read and skip null values array at the start, then read the typeId var cursor = Platform.BYTE_ARRAY_OFFSET val nullValuesSize = readInt(columnBytes, cursor) << 2 @@ -341,7 +341,7 @@ object ColumnEncoding { trait NotNullColumn extends ColumnEncoding { override protected final def initializeNulls( - columnBytes: AnyRef, field: Attribute): Long = { + columnBytes: AnyRef, field: StructField): Long = { val cursor = Platform.BYTE_ARRAY_OFFSET val numNullValues = ColumnEncoding.readInt(columnBytes, 
cursor) if (numNullValues != 0) { @@ -392,7 +392,7 @@ trait NullableColumn extends ColumnEncoding { } override protected final def initializeNulls( - columnBytes: AnyRef, field: Attribute): Long = { + columnBytes: AnyRef, field: StructField): Long = { val cursor = Platform.BYTE_ARRAY_OFFSET val nullValuesSize = ColumnEncoding.readInt(columnBytes, cursor) << 2 if (nullValuesSize > 0) { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/DictionaryEncoding.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/DictionaryEncoding.scala index c49ba3603d..524b84c9ed 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/DictionaryEncoding.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/DictionaryEncoding.scala @@ -16,9 +16,8 @@ */ package org.apache.spark.sql.execution.columnar.encoding -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.collection.Utils -import org.apache.spark.sql.types.{DataType, DateType, IntegerType, LongType, StringType, TimestampType} +import org.apache.spark.sql.types.{DataType, DateType, IntegerType, LongType, StringType, StructField, TimestampType} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.UTF8String @@ -42,7 +41,7 @@ abstract class DictionaryEncodingBase extends ColumnEncoding { private[this] final var longDictionary: Array[Long] = _ override def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = { + field: StructField): Long = { var cursor = super.initializeDecoding(columnBytes, field) val elementNum = ColumnEncoding.readInt(columnBytes, cursor) cursor += 4 diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/RunLengthEncoding.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/RunLengthEncoding.scala index 247e8cf56f..192aab35c5 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/RunLengthEncoding.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/RunLengthEncoding.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.execution.columnar.encoding -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.UTF8String @@ -42,7 +41,7 @@ abstract class RunLengthEncodingBase extends ColumnEncoding { } override def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = { + field: StructField): Long = { cursorPos = super.initializeDecoding(columnBytes, field) // use the current count + value for cursor since that will be read and // written most frequently while actual cursor will be less frequently used diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/Uncompressed.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/Uncompressed.scala index d5a92aa080..585c98b732 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/Uncompressed.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/encoding/Uncompressed.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.columnar.encoding import java.math.{BigDecimal, BigInteger} -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeArrayData, UnsafeMapData, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow} import 
org.apache.spark.sql.collection.Utils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform @@ -39,7 +39,7 @@ abstract class UncompressedBase extends ColumnEncoding { def supports(dataType: DataType): Boolean = true override final def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = { + field: StructField): Long = { val cursor = initializeNulls(columnBytes, field) // typeId takes 4 bytes for non-complex types else 0; // adjust cursor for the first next call to avoid extra checks in next diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala index c1b04a299b..e42084af77 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala @@ -24,7 +24,6 @@ import com.gemstone.gemfire.internal.cache.PartitionedRegion import com.pivotal.gemfirexd.internal.engine.Misc import io.snappydata.Constant -import org.apache.spark.{Logging, Partition} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -41,6 +40,7 @@ import org.apache.spark.sql.row.GemFireXDDialect import org.apache.spark.sql.sources._ import org.apache.spark.sql.store.{CodeGeneration, StoreUtils} import org.apache.spark.sql.types.StructType +import org.apache.spark.{Logging, Partition} /** * This class acts as a DataSource provider for column format tables provided Snappy. @@ -118,10 +118,10 @@ class BaseColumnFormatRelation( filters: Array[Filter]): (RDD[Any], Seq[RDD[InternalRow]]) = { val (rdd, _) = scanTable(table, requiredColumns, filters) - val zipped = buildRowBufferRDD(rdd.partitions, requiredColumns, filters, true). 
- zipPartitions(rdd) { (leftItr, rightItr) => - Iterator[Any](leftItr, rightItr) - } + val zipped = buildRowBufferRDD(rdd.partitions, requiredColumns, filters, + useResultSet = true).zipPartitions(rdd) { (leftItr, rightItr) => + Iterator[Any](leftItr, rightItr) + } (zipped, Nil) } @@ -143,8 +143,6 @@ class BaseColumnFormatRelation( case ConnectionType.Embedded => new RowFormatScanRDD( session, - executorConnector, - ExternalStoreUtils.pruneSchema(schemaFields, requiredColumns), resolvedName, isPartitioned, requiredColumns, @@ -158,8 +156,6 @@ class BaseColumnFormatRelation( case _ => new SparkShellRowRDD( session, - executorConnector, - ExternalStoreUtils.pruneSchema(schemaFields, requiredColumns), resolvedName, isPartitioned, requiredColumns, @@ -471,7 +467,7 @@ class ColumnFormatRelation( override def recoverDependentRelations(properties: Map[String, String]): Unit = { var dependentRelations: Array[String] = Array() - if (None != properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS)) { + if (properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS).isDefined) { dependentRelations = properties(ExternalStoreUtils.DEPENDENT_RELATIONS).split(",") } @@ -678,7 +674,7 @@ final class DefaultSource extends ColumnarRelationProvider { val ddlExtensionForShadowTable = StoreUtils.ddlExtensionString( parametersForShadowTable, isRowTable = false, isShadowTable = true) - val dependentRelations = parameters.remove(ExternalStoreUtils.DEPENDENT_RELATIONS) + // val dependentRelations = parameters.remove(ExternalStoreUtils.DEPENDENT_RELATIONS) val connProperties = ExternalStoreUtils.validateAndGetAllProps(sc, parameters) @@ -702,7 +698,7 @@ final class DefaultSource extends ColumnarRelationProvider { // create an index relation if it is an index table val baseTable = options.get(StoreUtils.GEM_INDEXED_TABLE) val relation = baseTable match { - case Some(baseTable) => new IndexColumnFormatRelation(SnappyStoreHiveCatalog. + case Some(btable) => new IndexColumnFormatRelation(SnappyStoreHiveCatalog. processTableIdentifier(table, sqlContext.conf), getClass.getCanonicalName, mode, @@ -713,7 +709,7 @@ final class DefaultSource extends ColumnarRelationProvider { externalStore, partitioningColumn, sqlContext, - baseTable) + btable) case None => new ColumnFormatRelation(SnappyStoreHiveCatalog. 
processTableIdentifier(table, sqlContext.conf), getClass.getCanonicalName, diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala index a72854f634..9b86067214 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala @@ -19,20 +19,21 @@ package org.apache.spark.sql.execution.columnar.impl import java.sql.{Connection, ResultSet, Statement} import java.util.UUID -import scala.reflect.ClassTag - +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.gemstone.gemfire.internal.cache.{AbstractRegion, PartitionedRegion} import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils import io.snappydata.impl.SparkShellRDDHelper import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.ConnectionPropertiesSerializer import org.apache.spark.sql.collection._ +import org.apache.spark.sql.execution.RDDKryo import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.row.RowFormatScanRDD import org.apache.spark.sql.sources.{ConnectionProperties, Filter} import org.apache.spark.sql.store.StoreUtils -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{SnappySession, SparkSession} import org.apache.spark.{Partition, TaskContext} @@ -42,15 +43,16 @@ import org.apache.spark.{Partition, TaskContext} class JDBCSourceAsColumnarStore(_connProperties: ConnectionProperties, _numPartitions: Int) extends JDBCSourceAsStore(_connProperties, _numPartitions) { + self => override def getConnectedExternalStore(tableName: String, - onExecutor: Boolean): ConnectedExternalStore = new JDBCSourceAsColumnarStore( - _connProperties, - _numPartitions) with ConnectedExternalStore { - protected[this] override val connectedInstance: Connection = - self.getConnection(tableName, onExecutor) - } + onExecutor: Boolean): ConnectedExternalStore = + new JDBCSourceAsColumnarStore(_connProperties, _numPartitions) + with ConnectedExternalStore { + protected[this] override val connectedInstance: Connection = + self.getConnection(tableName, onExecutor) + } override def getCachedBatchRDD(tableName: String, requiredColumns: Array[String], session: SparkSession): RDD[CachedBatch] = { @@ -64,7 +66,7 @@ class JDBCSourceAsColumnarStore(_connProperties: ConnectionProperties, // partition-specific val poolProps = _connProperties.poolProps - (if (_connProperties.hikariCP) "jdbcUrl" else "url") - new SparkShellCachedBatchRDD[CachedBatch](snappySession, + new SparkShellCachedBatchRDD(snappySession, tableName, requiredColumns, ConnectionProperties(_connProperties.url, _connProperties.driver, _connProperties.dialect, poolProps, _connProperties.connProps, _connProperties.executorConnProps, @@ -129,9 +131,9 @@ class JDBCSourceAsColumnarStore(_connProperties: ConnectionProperties, final class ColumnarStorePartitionedRDD( @transient private val session: SnappySession, - tableName: String, + private var tableName: String, @transient private val store: JDBCSourceAsColumnarStore) - extends RDD[Any](session.sparkContext, Nil) { + extends RDDKryo[Any](session.sparkContext, Nil) with KryoSerializable { override def compute(part: Partition, context: TaskContext): Iterator[Any] 
= { val container = GemFireXDUtils.getGemFireContainer(tableName, true) @@ -156,12 +158,26 @@ final class ColumnarStorePartitionedRDD( region.asInstanceOf[PartitionedRegion]) }) } + + override def write(kryo: Kryo, output: Output): Unit = { + super.write(kryo, output) + output.writeString(tableName) + } + + override def read(kryo: Kryo, input: Input): Unit = { + super.read(kryo, input) + tableName = input.readString() + } } -class SparkShellCachedBatchRDD[T: ClassTag](_session: SnappySession, - tableName: String, requiredColumns: Array[String], - connProperties: ConnectionProperties, store: ExternalStore) - extends RDD[CachedBatch](_session.sparkContext, Nil) { +final class SparkShellCachedBatchRDD( + @transient private val session: SnappySession, + private var tableName: String, + private var requiredColumns: Array[String], + private var connProperties: ConnectionProperties, + @transient private val store: ExternalStore) + extends RDDKryo[CachedBatch](session.sparkContext, Nil) + with KryoSerializable { override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { @@ -181,20 +197,38 @@ class SparkShellCachedBatchRDD[T: ClassTag](_session: SnappySession, override def getPartitions: Array[Partition] = { store.tryExecute(tableName, SparkShellRDDHelper.getPartitions(tableName, _)) } + + override def write(kryo: Kryo, output: Output): Unit = { + super.write(kryo, output) + + output.writeString(tableName) + output.writeVarInt(requiredColumns.length, true) + for (column <- requiredColumns) { + output.writeString(column) + } + ConnectionPropertiesSerializer.write(kryo, output, connProperties) + } + + override def read(kryo: Kryo, input: Input): Unit = { + super.read(kryo, input) + + tableName = input.readString() + val numColumns = input.readVarInt(true) + requiredColumns = Array.fill(numColumns)(input.readString()) + connProperties = ConnectionPropertiesSerializer.read(kryo, input) + } } -class SparkShellRowRDD[T: ClassTag](_session: SnappySession, - getConnection: () => Connection, - schema: StructType, - tableName: String, - isPartitioned: Boolean, - columns: Array[String], - connProperties: ConnectionProperties, - filters: Array[Filter] = Array.empty[Filter], - partitions: Array[Partition] = Array.empty[Partition]) - extends RowFormatScanRDD(_session, getConnection, schema, tableName, - isPartitioned, columns, pushProjections = true, useResultSet = true, - connProperties, filters, partitions) { +class SparkShellRowRDD(_session: SnappySession, + _tableName: String, + _isPartitioned: Boolean, + _columns: Array[String], + _connProperties: ConnectionProperties, + _filters: Array[Filter] = Array.empty[Filter], + _parts: Array[Partition] = Array.empty[Partition]) + extends RowFormatScanRDD(_session, _tableName, _isPartitioned, _columns, + pushProjections = true, useResultSet = true, _connProperties, + _filters, _parts) { override def computeResultSet( thePart: Partition): (Connection, Statement, ResultSet) = { @@ -203,8 +237,6 @@ class SparkShellRowRDD[T: ClassTag](_session: SnappySession, connProperties, thePart) val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema) - // TODO: this will fail if no network server is available unless SNAP-365 is - // fixed with the approach of having an iterator that can fetch from remote if (isPartitioned) { val ps = conn.prepareStatement( "call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION(?, ?)") @@ -238,10 +270,11 @@ class SparkShellRowRDD[T: ClassTag](_session: SnappySession, override def getPartitions: Array[Partition] = { // use 
incoming partitions if provided (e.g. for collocated tables) - if (partitions.length > 0) { - return partitions + if (parts != null && parts.length > 0) { + return parts } - val conn = getConnection() + val conn = ExternalStoreUtils.getConnection(tableName, connProperties, + forExecutor = true) try { SparkShellRDDHelper.getPartitions(tableName, conn) } finally { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/row/ResultSetEncodingAdapter.scala b/core/src/main/scala/org/apache/spark/sql/execution/row/ResultSetEncodingAdapter.scala index 0cd4d9c2a4..c058f0dadf 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/row/ResultSetEncodingAdapter.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/row/ResultSetEncodingAdapter.scala @@ -20,10 +20,10 @@ import java.sql.ResultSet import com.gemstone.gemfire.internal.shared.ClientSharedData -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeArrayData, UnsafeMapData, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.columnar.encoding.ColumnEncoding -import org.apache.spark.sql.types.{DataType, Decimal} +import org.apache.spark.sql.types.{DataType, Decimal, StructField} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -41,10 +41,10 @@ final class ResultSetEncodingAdapter(rs: ResultSet, columnPosition: Int) override def supports(dataType: DataType): Boolean = true override protected def initializeNulls(columnBytes: AnyRef, - field: Attribute): Long = 0L + field: StructField): Long = 0L override def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = 0L + field: StructField): Long = 0L override def nextBoolean(columnBytes: AnyRef, cursor: Long): Long = 0L diff --git a/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatRelation.scala index 5df0e2e322..38d5cc4175 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatRelation.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.columnar.impl.SparkShellRowRDD import org.apache.spark.sql.execution.columnar.{ConnectionType, ExternalStoreUtils} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCPartition -import org.apache.spark.sql.execution.{ConnectionPool, PartitionedDataSourceScan, SparkPlan} +import org.apache.spark.sql.execution.{ConnectionPool, PartitionedDataSourceScan} import org.apache.spark.sql.hive.SnappyStoreHiveCatalog import org.apache.spark.sql.row.{GemFireXDDialect, JDBCMutableRelation} import org.apache.spark.sql.sources._ @@ -87,7 +87,7 @@ class RowFormatRelation( while (itr.hasNext) { // first column of index has to be present in filter to be usable val indexCols = itr.next().getIndexDescriptor.baseColumnPositions() - cols += baseColumns(indexCols(0)) + cols += baseColumns(indexCols(0) - 1) } } cols @@ -106,8 +106,6 @@ class RowFormatRelation( case ConnectionType.Embedded => new RowFormatScanRDD( session, - executorConnector, - ExternalStoreUtils.pruneSchema(schemaFields, requiredColumns), resolvedName, isPartitioned, requiredColumns, @@ -120,8 +118,6 @@ class RowFormatRelation( case _ => new SparkShellRowRDD( session, - 
executorConnector, - ExternalStoreUtils.pruneSchema(schemaFields, requiredColumns), resolvedName, isPartitioned, requiredColumns, @@ -247,7 +243,7 @@ class RowFormatRelation( val sncCatalog = snappySession.sessionState.catalog var dependentRelations: Array[String] = Array() - if (None != properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS)) { + if (properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS).isDefined) { dependentRelations = properties(ExternalStoreUtils.DEPENDENT_RELATIONS).split(",") } dependentRelations.foreach(rel => { @@ -275,7 +271,7 @@ final class DefaultSource extends MutableRelationProvider { isRowTable = true, isShadowTable = false) val schemaExtension = s"$schema $ddlExtension" val preservePartitions = parameters.remove("preservepartitions") - val dependentRelations = parameters.remove(ExternalStoreUtils.DEPENDENT_RELATIONS) + // val dependentRelations = parameters.remove(ExternalStoreUtils.DEPENDENT_RELATIONS) val sc = sqlContext.sparkContext val connProperties = ExternalStoreUtils.validateAndGetAllProps(sc, parameters) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatScanRDD.scala b/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatScanRDD.scala index 8869cbfd9e..c3f6eef5c9 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatScanRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/row/RowFormatScanRDD.scala @@ -22,6 +22,8 @@ import java.util.GregorianCalendar import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.gemstone.gemfire.internal.cache.{CacheDistributionAdvisee, NonLocalRegionEntry, PartitionedRegion} import com.gemstone.gemfire.internal.shared.ClientSharedData import com.pivotal.gemfirexd.internal.engine.Misc @@ -30,13 +32,12 @@ import com.pivotal.gemfirexd.internal.engine.store.{AbstractCompactExecRow, GemF import com.pivotal.gemfirexd.internal.iapi.types.RowLocation import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedResultSet -import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.ConnectionPropertiesSerializer import org.apache.spark.sql.SnappySession import org.apache.spark.sql.collection.MultiBucketExecutorPartition -import org.apache.spark.sql.execution.ConnectionPool import org.apache.spark.sql.execution.columnar.{ExternalStoreUtils, ResultSetIterator} +import org.apache.spark.sql.execution.{ConnectionPool, RDDKryo} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ import org.apache.spark.{Partition, TaskContext} /** @@ -46,23 +47,21 @@ import org.apache.spark.{Partition, TaskContext} * as JDBCRDD has a lot of methods as private. 
*/ class RowFormatScanRDD(@transient val session: SnappySession, - getConnection: () => Connection, - schema: StructType, - tableName: String, - isPartitioned: Boolean, - columns: Array[String], - val pushProjections: Boolean, - val useResultSet: Boolean, - connProperties: ConnectionProperties, - filters: Array[Filter] = Array.empty[Filter], - partitions: Array[Partition] = Array.empty[Partition]) - extends RDD[Any](session.sparkContext, Nil) { + protected var tableName: String, + protected var isPartitioned: Boolean, + @transient private val columns: Array[String], + var pushProjections: Boolean, + var useResultSet: Boolean, + protected var connProperties: ConnectionProperties, + @transient private val filters: Array[Filter] = Array.empty[Filter], + @transient protected val parts: Array[Partition] = Array.empty[Partition]) + extends RDDKryo[Any](session.sparkContext, Nil) with KryoSerializable { protected var filterWhereArgs: ArrayBuffer[Any] = _ /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ - protected val filterWhereClause: String = { + protected var filterWhereClause: String = { val numFilters = filters.length if (numFilters > 0) { val sb = new StringBuilder().append(" WHERE ") @@ -149,7 +148,7 @@ class RowFormatScanRDD(@transient val session: SnappySession, /** * `columns`, but as a String suitable for injection into a SQL query. */ - protected val columnList: String = { + protected var columnList: String = { if (!pushProjections) "*" else if (columns.length > 0) { val sb = new StringBuilder() @@ -163,7 +162,8 @@ class RowFormatScanRDD(@transient val session: SnappySession, def computeResultSet( thePart: Partition): (Connection, Statement, ResultSet) = { - val conn = getConnection() + val conn = ExternalStoreUtils.getConnection(tableName, + connProperties, forExecutor = true) if (isPartitioned) { val ps = conn.prepareStatement( @@ -237,8 +237,8 @@ class RowFormatScanRDD(@transient val session: SnappySession, override def getPartitions: Array[Partition] = { // use incoming partitions if provided (e.g. 
for collocated tables) - if (partitions.length > 0) { - return partitions + if (parts != null && parts.length > 0) { + return parts } val conn = ConnectionPool.getPoolConnection(tableName, connProperties.dialect, connProperties.poolProps, @@ -256,6 +256,62 @@ class RowFormatScanRDD(@transient val session: SnappySession, conn.close() } } + + override def write(kryo: Kryo, output: Output): Unit = { + super.write(kryo, output) + + output.writeString(tableName) + output.writeBoolean(isPartitioned) + output.writeBoolean(pushProjections) + output.writeBoolean(useResultSet) + + output.writeString(columnList) + val filterArgs = filterWhereArgs + val len = if (filterArgs eq null) 0 else filterArgs.size + if (len == 0) { + output.writeVarInt(0, true) + } else { + var i = 0 + output.writeVarInt(len, true) + output.writeString(filterWhereClause) + while (i < len) { + kryo.writeClassAndObject(output, filterArgs(i)) + i += 1 + } + } + // need connection properties only if computing ResultSet + if (pushProjections || useResultSet || !isPartitioned || len > 0) { + ConnectionPropertiesSerializer.write(kryo, output, connProperties) + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + super.read(kryo, input) + + tableName = input.readString() + isPartitioned = input.readBoolean() + pushProjections = input.readBoolean() + useResultSet = input.readBoolean() + + columnList = input.readString() + val numFilters = input.readVarInt(true) + if (numFilters == 0) { + filterWhereClause = "" + filterWhereArgs = null + } else { + filterWhereClause = input.readString() + filterWhereArgs = new ArrayBuffer[Any](numFilters) + var i = 0 + while (i < numFilters) { + filterWhereArgs += kryo.readClassAndObject(input) + i += 1 + } + } + // read connection properties only if computing ResultSet + if (pushProjections || useResultSet || !isPartitioned || numFilters > 0) { + connProperties = ConnectionPropertiesSerializer.read(kryo, input) + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowEncodingAdapter.scala b/core/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowEncodingAdapter.scala index 930ecc9a03..b151b74716 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowEncodingAdapter.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/row/UnsafeRowEncodingAdapter.scala @@ -16,9 +16,9 @@ */ package org.apache.spark.sql.execution.row -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeArrayData, UnsafeMapData, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow} import org.apache.spark.sql.execution.columnar.encoding.ColumnEncoding -import org.apache.spark.sql.types.{DataType, Decimal} +import org.apache.spark.sql.types.{DataType, Decimal, StructField} import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} final class UnsafeRowEncodingAdapter(holder: UnsafeRowHolder, columnIndex: Int) @@ -29,10 +29,10 @@ final class UnsafeRowEncodingAdapter(holder: UnsafeRowHolder, columnIndex: Int) override def supports(dataType: DataType): Boolean = true override protected def initializeNulls(columnBytes: AnyRef, - field: Attribute): Long = 0L + field: StructField): Long = 0L override def initializeDecoding(columnBytes: AnyRef, - field: Attribute): Long = 0L + field: StructField): Long = 0L override def nextBoolean(columnBytes: AnyRef, cursor: Long): Long = 0L diff --git a/core/src/main/scala/org/apache/spark/sql/hive/HiveClientUtil.scala 
b/core/src/main/scala/org/apache/spark/sql/hive/HiveClientUtil.scala index d056d76202..8f0a0decfb 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/HiveClientUtil.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/HiveClientUtil.scala @@ -34,10 +34,10 @@ import org.apache.spark.sql.hive.client.{IsolatedClientLoader, HiveClient} import org.apache.spark.sql.internal.SQLConf /** - * A utility class to get hive connection to underlying metastore. A lot of code is similar to - * org.apache.spark.sql.hive.HiveUtils. Difference being we take a connection to underlying GemXD store. - * ///TDOD We need to investigate if we can phase out this class and use HiveUtils directly. - * @param sparkContext + * A utility class to get hive connection to underlying metastore. + * A lot of code is similar to org.apache.spark.sql.hive.HiveUtils. + * Difference being we take a connection to underlying GemXD store. + * TODO We need to investigate if we can phase out this class and use HiveUtils directly. */ class HiveClientUtil(val sparkContext: SparkContext) extends Logging { @@ -135,7 +135,6 @@ class HiveClientUtil(val sparkContext: SparkContext) extends Logging { private def newClient(): HiveClient = synchronized { - closeCurrent() // Just to ensure no other HiveDB is alive for this thread. val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) // We instantiate a HiveConf here to read in the hive-site.xml file and // then pass the options into the isolated client loader @@ -212,6 +211,16 @@ class HiveClientUtil(val sparkContext: SparkContext) extends Logging { DriverRegistry.register("com.pivotal.gemfirexd.jdbc.EmbeddedDriver") DriverRegistry.register("com.pivotal.gemfirexd.jdbc.ClientDriver") + // set as system properties for default HiveConf's + val props = metadataConf.getAllProperties + val propNames = props.stringPropertyNames().iterator() + while (propNames.hasNext) { + val name = propNames.next() + System.setProperty(name, props.getProperty(name)) + } + + closeCurrent() // Just to ensure no other HiveDB is alive for this thread. + logInfo("Initializing HiveMetastoreConnection version " + s"$hiveMetastoreVersion using Spark classes.") // new ClientWrapper(metaVersion, allConfig, classLoader) diff --git a/core/src/main/scala/org/apache/spark/sql/hive/SnappyExternalCatalog.scala b/core/src/main/scala/org/apache/spark/sql/hive/SnappyExternalCatalog.scala index b3f11ab399..82a656b379 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/SnappyExternalCatalog.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/SnappyExternalCatalog.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.hive.client.HiveClient -private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: Configuration) +private[spark] class SnappyExternalCatalog(var client: HiveClient, hadoopConf: Configuration) extends ExternalCatalog with Logging { import CatalogTypes.TablePartitionSpec @@ -43,8 +43,6 @@ private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: C classOf[TException].getCanonicalName) - - /** * Whether this is an exception thrown by the hive client that should be wrapped. 
* @@ -66,6 +64,7 @@ private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: C val tClass = t.getClass.getName tClass.contains("DisconnectedException") || tClass.contains("DisconnectException") || + (tClass.contains("MetaException") && t.getMessage.contains("retries")) || isDisconnectException(t.getCause) } else { false @@ -77,7 +76,7 @@ private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: C function } catch { case he: HiveException if isDisconnectException(he) => - // stale GemXD connection + // stale JDBC connection Hive.closeCurrent() client = client.newSession() function @@ -99,7 +98,7 @@ private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: C } private def requireDbMatches(db: String, table: CatalogTable): Unit = { - if (table.identifier.database != Some(db)) { + if (!table.identifier.database.contains(db)) { throw new AnalysisException( s"Provided database '$db' does not match the one specified in the " + s"table definition (${table.identifier.database.getOrElse("n/a")})") @@ -367,7 +366,8 @@ private[spark] class SnappyExternalCatalog(var client :HiveClient, hadoopConf: C // we are normalizing the function name. val functionName = funcDefinition.identifier.funcName.toLowerCase val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName) - withHiveExceptionHandling(client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))) + withHiveExceptionHandling(client.createFunction(db, + funcDefinition.copy(identifier = functionIdentifier))) } override def dropFunction(db: String, name: String): Unit = withClient { diff --git a/core/src/main/scala/org/apache/spark/sql/hive/SnappyStoreHiveCatalog.scala b/core/src/main/scala/org/apache/spark/sql/hive/SnappyStoreHiveCatalog.scala index 21d261f741..8b200ad34b 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/SnappyStoreHiveCatalog.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/SnappyStoreHiveCatalog.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.hive.SnappyStoreHiveCatalog._ import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.row.JDBCMutableRelation -import org.apache.spark.sql.sources.{DependencyCatalog, BaseRelation, DependentRelation, JdbcExtendedUtils, ParentRelation} +import org.apache.spark.sql.sources.{BaseRelation, DependencyCatalog, DependentRelation, JdbcExtendedUtils, ParentRelation} import org.apache.spark.sql.streaming.{StreamBaseRelation, StreamPlan} import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructType} @@ -166,10 +166,10 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, (JdbcExtendedUtils.ALLOW_EXISTING_PROPERTY -> "true")).resolveRelation() } relation match { - case sr: StreamBaseRelation => //Do Nothing as it is not supported for stream relation - case pr: ParentRelation => { + case sr: StreamBaseRelation => // Do Nothing as it is not supported for stream relation + case pr: ParentRelation => var dependentRelations: Array[String] = Array() - if (None != table.properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS)) { + if (table.properties.get(ExternalStoreUtils.DEPENDENT_RELATIONS).isDefined) { dependentRelations = table.properties(ExternalStoreUtils.DEPENDENT_RELATIONS) .split(",") } @@ -177,9 +177,7 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, dependentRelations.foreach(rel => { DependencyCatalog.addDependent(in.toString, rel) }) - - } - case _ => //Do nothing + 
case _ => // Do nothing } (LogicalRelation(relation), table) @@ -471,7 +469,8 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, lookupRelation(newQualifiedTableName(t)) match { case LogicalRelation(p: ParentRelation, _, _) => p.removeDependent(dep, this) - removeDependentRelation(newQualifiedTableName(t),newQualifiedTableName(dep.name)) + removeDependentRelation(newQualifiedTableName(t), + newQualifiedTableName(dep.name)) case _ => // ignore } } catch { @@ -535,7 +534,8 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, lookupRelation(newQualifiedTableName(t)) match { case LogicalRelation(p: ParentRelation, _, _) => p.addDependent(dep, this) - addDependentRelation(newQualifiedTableName(t),newQualifiedTableName(dep.name)) + addDependentRelation(newQualifiedTableName(t), + newQualifiedTableName(dep.name)) case _ => // ignore } tableProperties.put(JdbcExtendedUtils.BASETABLE_PROPERTY, t) @@ -582,7 +582,7 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, function } catch { case he: HiveException if isDisconnectException(he) => - // stale GemXD connection + // stale JDBC connection Hive.closeCurrent() client = externalCatalog.client.newSession() function @@ -624,6 +624,7 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, val tClass = t.getClass.getName tClass.contains("DisconnectedException") || tClass.contains("DisconnectException") || + (tClass.contains("MetaException") && t.getMessage.contains("retries")) || isDisconnectException(t.getCause) } else { false @@ -760,6 +761,7 @@ class SnappyStoreHiveCatalog(externalCatalog: SnappyExternalCatalog, (ExternalStoreUtils.DEPENDENT_RELATIONS -> (indexes + index.toString()))) ) } + def addDependentRelation(inTable: QualifiedTableName, dependentRelation: QualifiedTableName): Unit = { alterTableLock.synchronized { diff --git a/core/src/main/scala/org/apache/spark/sql/sources/MutableRelationProvider.scala b/core/src/main/scala/org/apache/spark/sql/sources/MutableRelationProvider.scala index 717bd2c336..3abfa60456 100644 --- a/core/src/main/scala/org/apache/spark/sql/sources/MutableRelationProvider.scala +++ b/core/src/main/scala/org/apache/spark/sql/sources/MutableRelationProvider.scala @@ -130,6 +130,8 @@ abstract class MutableRelationProvider } } +// IMPORTANT: if any changes are made to this class then update the +// serialization correspondingly in ConnectionPropertiesSerializer case class ConnectionProperties(url: String, driver: String, dialect: JdbcDialect, poolProps: Map[String, String], connProps: Properties, executorConnProps: Properties, hikariCP: Boolean) diff --git a/core/src/main/scala/org/apache/spark/sql/sources/SnappyOptimizations.scala b/core/src/main/scala/org/apache/spark/sql/sources/SnappyOptimizations.scala index 9342685fd7..d91f8e83e9 100644 --- a/core/src/main/scala/org/apache/spark/sql/sources/SnappyOptimizations.scala +++ b/core/src/main/scala/org/apache/spark/sql/sources/SnappyOptimizations.scala @@ -23,9 +23,7 @@ import scala.collection.mutable.ArrayBuffer import io.snappydata.QueryHint._ import org.apache.spark.sql.SnappySession -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, -PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.ReorderJoin +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule 
import org.apache.spark.sql.execution.PartitionedDataSourceScan @@ -182,7 +180,7 @@ case class ResolveIndex(implicit val snappySession: SnappySession) extends Rule[ * like colocation chain can occur, we can potentially use multiple indexes. That we can * determine by reducing these combinations of 2 and merge into longer chain. * - * The choice of colocated group to [[replace]] will be made firstly based on more + * The choice of colocated group to `replace` will be made firstly based on more * feasibility check on LogicalPlan.outputSet, and then cost based computed by considering * filter and size. Later selectivity can be introduced. * diff --git a/core/src/main/scala/org/apache/spark/sql/types/TypeUtilities.scala b/core/src/main/scala/org/apache/spark/sql/types/TypeUtilities.scala new file mode 100644 index 0000000000..adbff45607 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/types/TypeUtilities.scala @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package org.apache.spark.sql.types + +import java.util.Properties + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.CodegenSupport + +object TypeUtilities { + + private[spark] val (rddIdField, rddStorageLevelField) = { + val c = classOf[RDD[_]] + val fields = c.getDeclaredFields + val f1 = fields.find(f => f.getName == "_id" || f.getName == "id").get + val f2 = fields.find(_.getName.endsWith("storageLevel")).get + f1.setAccessible(true) + f2.setAccessible(true) + (f1, f2) + } + + private[spark] val (parentMethod, parentSetter) = { + val c = classOf[CodegenSupport] + val m = c.getDeclaredMethod("parent") + m.setAccessible(true) + val s = c.getDeclaredMethod("parent_$eq", classOf[CodegenSupport]) + s.setAccessible(true) + (m, s) + } + + def getMetadata[T](key: String, metadata: Metadata): Option[T] = { + metadata.map.get(key).asInstanceOf[Option[T]] + } + + def writeMetadata(metadata: Metadata, kryo: Kryo, output: Output): Unit = { + val map = metadata.map + if ((metadata eq Metadata.empty) || map.isEmpty) { + output.writeVarInt(0, true) + } else { + output.writeVarInt(map.size, true) + metadata.map.foreach { case (key, value) => + output.writeString(key) + kryo.writeClassAndObject(output, value) + } + } + } + + def readMetadata(kryo: Kryo, input: Input): Metadata = { + var mapLen = input.readVarInt(true) + if (mapLen <= 0) { + Metadata.empty + } else { + val map = Map.newBuilder[String, Any] + while (mapLen > 0) { + val key = input.readString() + val value = kryo.readClassAndObject(input) + map += key -> value + mapLen -= 1 + } + new Metadata(map.result) + } + } + + def writeProperties(props: Properties, output: Output): Unit = { + if (props != null) { + val keys = props.stringPropertyNames() + output.writeVarInt(keys.size(), true) + val keysIterator = keys.iterator() + while 
(keysIterator.hasNext) { + val key = keysIterator.next() + output.writeString(key) + output.writeString(props.getProperty(key)) + } + } else { + output.writeVarInt(0, true) + } + } + + def readProperties(input: Input): Properties = { + val props = new Properties() + var numProperties = input.readVarInt(true) + while (numProperties > 0) { + val key = input.readString() + props.setProperty(key, input.readString()) + numProperties -= 1 + } + props + } +} diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index ef8456bf62..1b1c325bff 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file # RollingFile appender log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.append=true -log4j.appender.file.file=gemfirexd.log +log4j.appender.file.file=snappydata.log log4j.appender.file.MaxFileSize=100MB log4j.appender.file.MaxBackupIndex=10000 log4j.appender.file.layout=org.apache.log4j.PatternLayout @@ -75,4 +75,4 @@ log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$Dr log4j.logger.org.apache.spark.storage.BlockManagerInfo=WARN log4j.logger.org.apache.spark.executor.SnappyExecutor=WARN -log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG +# log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG diff --git a/spark b/spark index c421eefe75..ffab1a508c 160000 --- a/spark +++ b/spark @@ -1 +1 @@ -Subproject commit c421eefe7581b4845e02cebbcb68aa07fa715a34 +Subproject commit ffab1a508c64c2e7383797a0a651f87a9b043d2b
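
Note on the serialization convention above (an editorial sketch, not part of the patch): the new KryoSerializable classes in this change set, such as MultiBucketExecutorPartition, RowFormatScanRDD and ConnectionPropertiesSerializer, all write their fields with variable-length integers and strings and read them back in exactly the same order. The standalone Scala sketch below round-trips that pattern with stock Kryo APIs; the names ExamplePartition and ExampleRoundTrip are illustrative only and do not exist in the codebase.

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}

// Illustrative class only: mirrors the write/read symmetry used by
// MultiBucketExecutorPartition (index and bucket ids written as varints).
class ExamplePartition(private var _index: Int,
    private var _buckets: Set[Int]) extends KryoSerializable {

  // no-arg constructor so that Kryo can instantiate the object before read()
  def this() = this(0, Set.empty)

  def index: Int = _index
  def buckets: Set[Int] = _buckets

  override def write(kryo: Kryo, output: Output): Unit = {
    output.writeVarInt(_index, true)
    output.writeVarInt(_buckets.size, true)
    for (bucket <- _buckets) {
      output.writeVarInt(bucket, true)
    }
  }

  override def read(kryo: Kryo, input: Input): Unit = {
    // fields must be read back in the exact order they were written
    _index = input.readVarInt(true)
    var numBuckets = input.readVarInt(true)
    val builder = Set.newBuilder[Int]
    while (numBuckets > 0) {
      builder += input.readVarInt(true)
      numBuckets -= 1
    }
    _buckets = builder.result()
  }
}

object ExampleRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    val output = new Output(256, -1)
    kryo.writeClassAndObject(output, new ExamplePartition(3, Set(1, 5, 9)))
    val input = new Input(output.toBytes)
    val copy = kryo.readClassAndObject(input).asInstanceOf[ExamplePartition]
    assert(copy.index == 3 && copy.buckets == Set(1, 5, 9))
  }
}

Fields that are not primitives or strings (for example the parent Partition held by NarrowExecutorLocalSplitDep, or RDD checkpoint data in RDDKryo) go through kryo.writeClassAndObject/readClassAndObject instead, as the patch itself does.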