Skip to content

Commit

Permalink
[WIP] TransformWithState (TWS) Batch Support
Browse files (browse the repository at this point in the history)
  • Loading branch information
ericm-db committed Jan 31, 2024
1 parent 4c06c7d commit c8cec52
Showing 1 changed file with 60 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
*/
package org.apache.spark.sql.execution.streaming


import java.util.UUID
import java.util.concurrent.TimeUnit.NANOSECONDS

import org.apache.hadoop.conf.Configuration

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, TimeoutMode}
import org.apache.spark.sql.types._
import org.apache.spark.util.{CompletionIterator, Utils}
Expand Down Expand Up @@ -155,23 +159,62 @@ case class TransformWithStateExec(
/**
 * Builds the output RDD for this operator by running the stateful processor over each
 * partition of `child.execute()`.
 *
 * When `isStreaming` is true, partitions are processed through `mapPartitionsWithStateStore`.
 * Otherwise a plain `mapPartitions` is used and a state store provider is constructed inside
 * the partition function, rooted at a freshly created temp directory.
 *
 * NOTE(review): this listing was captured from a diff view whose +/- markers and indentation
 * were lost. The page header reports 17 deletions, and the 17 lines that follow the `metrics`
 * call repeat the body of the `if (isStreaming)` branch, so they appear to be the pre-change
 * version of this method rather than live code -- confirm against the repository.
 *
 * @return the rows produced by `processDataWithPartition` for every partition
 */
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver

// NOTE(review): the call below (through the first `result`) looks like the removed,
// pre-change body; its `{ case ... }` block has no closing brace of its own in this view.
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateInfo,
schemaForKeyRow,
schemaForValueRow,
numColsPrefixKey = 0,
session.sqlContext.sessionState,
Some(session.sqlContext.streams.stateStoreCoordinator),
useColumnFamilies = true
) {
case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
val processorHandle = new StatefulProcessorHandleImpl(
store, getStateInfo.queryRunId, isStreaming)
assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
statefulProcessor.init(processorHandle, outputMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
val result = processDataWithPartition(singleIterator, store, processorHandle)
result
// Streaming path: each partition function receives a StateStore together with its input rows.
if (isStreaming) {
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateInfo,
schemaForKeyRow,
schemaForValueRow,
numColsPrefixKey = 0,
session.sqlContext.sessionState,
Some(session.sqlContext.streams.stateStoreCoordinator),
useColumnFamilies = true
) {
case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
// The handle wraps the partition's store and is tagged with the query run id.
val processorHandle = new StatefulProcessorHandleImpl(
store, getStateInfo.queryRunId, isStreaming)
// A freshly built handle must be in CREATED; it is moved to INITIALIZED only after
// the user's processor has finished init().
assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
statefulProcessor.init(processorHandle, outputMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
val result = processDataWithPartition(singleIterator, store, processorHandle)
result
}
} else {
// Batch path: no state store is supplied by the RDD helper, so one is built by hand
// inside the partition function.
child.execute().mapPartitions[InternalRow](
iter => {
// A fresh SQLConf is populated with hard-coded state-store settings instead of reading
// the session's configuration; the provider class is pinned to RocksDB.
val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.NUM_STATE_STORE_MAINTENANCE_THREADS, 1)
sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, 1)
sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 1)
sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS,
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
sqlConf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED, false)
sqlConf.setConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS, false)
sqlConf.setConf(SQLConf.STATE_STORE_COMPRESSION_CODEC, "lz4")
sqlConf.setConf(SQLConf.STATE_SCHEMA_CHECK_ENABLED, false)
sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 1000L)

// The store is rooted at a new temp directory created where this function runs.
// NOTE(review): the two literal 0s are presumably the operator id and partition id;
// every partition passes the same values -- confirm this is intended.
val providerId = new StateStoreProviderId(
StateStoreId(Utils.createTempDir().getAbsolutePath, 0, 0), getStateInfo.queryRunId)

// NOTE(review): a default-constructed Hadoop Configuration is used here rather than one
// derived from the session -- confirm.
val stateStoreProvider = StateStoreProvider.createAndInit(
providerId,
schemaForKeyRow,
schemaForValueRow,
numColsPrefixKey = 0,
useColumnFamilies = true,
storeConf = StateStoreConf(sqlConf),
hadoopConf = new Configuration())

// NOTE(review): the argument 0 is presumably the store version to load -- confirm.
val store = stateStoreProvider.getStore(0)
// Unlike the streaming branch, the handle gets a random UUID instead of
// getStateInfo.queryRunId.
val processorHandle =
new StatefulProcessorHandleImpl(store, UUID.randomUUID(), isStreaming)
assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
statefulProcessor.init(processorHandle, outputMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
// NOTE(review): `null` is passed where the streaming branch passes `store`, although a
// store was created just above. Check how processDataWithPartition uses that argument.
// Also, nothing in this block closes `stateStoreProvider` or removes the temp directory.
val result = processDataWithPartition(iter, null, processorHandle)
result
}
)
}
}
}
Expand Down

0 comments on commit c8cec52

Please sign in to comment.