[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188

Closed

Changes from 6 commits

Commits (22)
1ade442  Squashed commit of the following:  (eason-yuchen-liu, Jul 2, 2024)
98bf8ec  revert unnecessary changes  (eason-yuchen-liu, Jul 2, 2024)
fb890ae  Merge branch 'master' into readStateChange  (eason-yuchen-liu, Jul 2, 2024)
db45c6f  Add comments  (eason-yuchen-liu, Jul 2, 2024)
1926e5e  Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…  (eason-yuchen-liu, Jul 2, 2024)
24c0351  minor  (eason-yuchen-liu, Jul 2, 2024)
d4a4b80  group options & make options in changeFeed mode isolate from some opt…  (eason-yuchen-liu, Jul 8, 2024)
42552ac  reorder the columns in the result  (eason-yuchen-liu, Jul 8, 2024)
24db837  address comments from Jungtaek  (eason-yuchen-liu, Jul 8, 2024)
adde991  minor  (eason-yuchen-liu, Jul 8, 2024)
d3ca86c  refactor StatePartitionReader for both modes  (eason-yuchen-liu, Jul 8, 2024)
5199c56  minor  (eason-yuchen-liu, Jul 8, 2024)
ce75133  Use NextIterator as the interface rather than StateStoreChangeDataRea…  (eason-yuchen-liu, Jul 9, 2024)
84dcf15  more doc  (eason-yuchen-liu, Jul 9, 2024)
22a086b  Merge branch 'master' into readStateChange  (eason-yuchen-liu, Jul 9, 2024)
c797d0b  use Jungtaek's advice in checking schema validity  (eason-yuchen-liu, Jul 9, 2024)
5921479  Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…  (eason-yuchen-liu, Jul 9, 2024)
e5674cf  solve column family merge conflict  (eason-yuchen-liu, Jul 9, 2024)
c012e1a  pass tests  (eason-yuchen-liu, Jul 9, 2024)
ff0cd43  continue  (eason-yuchen-liu, Jul 9, 2024)
2ad7590  make the doc consistent  (eason-yuchen-liu, Jul 10, 2024)
43420f6  continue  (eason-yuchen-liu, Jul 10, 2024)

@@ -3812,7 +3812,7 @@
"STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : {
"message" : [
"The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.",
"Therefore, it does not support option snapshotStartBatchId in state data source."
"Therefore, it does not support option snapshotStartBatchId or readChangeFeed in state data source."
],
"sqlState" : "42K06"
},

@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DI
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

@@ -94,10 +94,21 @@ class StateDataSource extends TableProvider with DataSourceRegister {
manager.readSchemaFile()
}

new StructType()
.add("key", keySchema)
.add("value", valueSchema)
.add("partition_id", IntegerType)
if (sourceOptions.readChangeFeed) {
new StructType()

Contributor:

I'd expect change_type and batch_id to come first, with batch_id placed before change_type (batch_id, change_type).

Given the nature of a change feed, the output is expected to be ordered by batch ID (across partition IDs, which may be hard to guarantee). Even if the data source does not do this itself, users should be able to do so easily, because they very likely will.

Contributor Author:

Makes sense.

.add("key", keySchema)
.add("value", valueSchema)
.add("change_type", StringType)
.add("batch_id", LongType)
.add("partition_id", IntegerType)
} else {
new StructType()
.add("key", keySchema)
.add("value", valueSchema)
.add("partition_id", IntegerType)
}


} catch {
case NonFatal(e) =>
throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)

@@ -132,7 +143,10 @@ case class StateSourceOptions(
storeName: String,
joinSide: JoinSideValues,
snapshotStartBatchId: Option[Long],
snapshotPartitionId: Option[Int]) {
snapshotPartitionId: Option[Int],

Contributor:

While we are here, it would be nice to structure sub-options: there are now 10 parameters, and 5 of them are not common ones. The options for 1) starting from a snapshot and 2) readChangeFeed can each be grouped into an Option[<option model class>]. (A rough sketch of this grouping follows the hunk below.)

Contributor Author:

Good idea.

readChangeFeed: Boolean,
changeStartBatchId: Option[Long],
changeEndBatchId: Option[Long]) {
def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)

override def toString: String = {
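
A rough sketch of the grouping suggested above; the wrapper class names are illustrative assumptions, not necessarily what the PR ends up merging:

// Illustrative only: gather the five uncommon parameters into two optional sub-option models.
case class FromSnapshotOptions(
    snapshotStartBatchId: Long,
    snapshotPartitionId: Int)

case class ReadChangeFeedOptions(
    changeStartBatchId: Long,
    changeEndBatchId: Long)

// StateSourceOptions would then carry
//   fromSnapshotOptions: Option[FromSnapshotOptions]
//   readChangeFeedOptions: Option[ReadChangeFeedOptions]
// instead of five flat Boolean/Option[_] fields.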

@@ -151,6 +165,9 @@ object StateSourceOptions extends DataSourceOptions {
val JOIN_SIDE = newOption("joinSide")
val SNAPSHOT_START_BATCH_ID = newOption("snapshotStartBatchId")
val SNAPSHOT_PARTITION_ID = newOption("snapshotPartitionId")
val READ_CHANGE_FEED = newOption("readChangeFeed")
val CHANGE_START_BATCH_ID = newOption("changeStartBatchId")
val CHANGE_END_BATCH_ID = newOption("changeEndBatchId")

object JoinSideValues extends Enumeration {
type JoinSideValues = Value

@@ -231,9 +248,45 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
}

val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)

val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong)
var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong)

if (readChangeFeed) {
if (joinSide != JoinSideValues.none) {
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, READ_CHANGE_FEED))
}
if (changeStartBatchId.isEmpty) {
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
}
changeEndBatchId = Option(changeEndBatchId.getOrElse(batchId))

Contributor:

We'll probably need to clarify that the current batchId option denotes the "ending" batch ID - that will help the option be used across multiple modes.

We could design a new option and promote it later. Until then, let's simply not fall back: require users to specify the symmetric option explicitly. We can also reconsider consolidating the "starting batch ID" option later.

// changeStartBatchId and changeEndBatchId must all be defined at this point
if (changeStartBatchId.get < 0) {
throw StateDataSourceErrors.invalidOptionValueIsNegative(CHANGE_START_BATCH_ID)
}
if (changeEndBatchId.get < changeStartBatchId.get) {
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
s"$CHANGE_END_BATCH_ID cannot be smaller than $CHANGE_START_BATCH_ID. " +
s"Please check the input to $CHANGE_END_BATCH_ID, or if you are using its default " +
s"value, make sure that $CHANGE_START_BATCH_ID is less than ${changeEndBatchId.get}.")
}
} else {
if (changeStartBatchId.isDefined) {
throw StateDataSourceErrors.invalidOptionValue(CHANGE_START_BATCH_ID,
s"Only specify this option when $READ_CHANGE_FEED is set to true.")
}
if (changeEndBatchId.isDefined) {
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
s"Only specify this option when $READ_CHANGE_FEED is set to true.")
}
}

StateSourceOptions(
resolvedCpLocation, batchId, operatorId, storeName,
joinSide, snapshotStartBatchId, snapshotPartitionId)
joinSide, snapshotStartBatchId, snapshotPartitionId,
readChangeFeed, changeStartBatchId, changeEndBatchId)
}

private def resolvedCheckpointLocation(
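
For reference, a minimal usage sketch of the new mode built from the options above (not part of this diff; the checkpoint path is hypothetical, and "statestore" is assumed to be the short name registered by StateDataSource):

// Read all state changes committed between batches 0 and 5 (inclusive).
val changes = spark.read
  .format("statestore")
  .option("readChangeFeed", "true")
  .option("changeStartBatchId", "0")  // required in this mode
  .option("changeEndBatchId", "5")    // optional here; falls back to the latest committed batch
  .load("/tmp/streaming-query/checkpoint")

// The change feed is naturally consumed in batch order.
changes.orderBy("batch_id", "partition_id").show()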

@@ -23,7 +23,9 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Par
import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.streaming.state.RecordType.{getRecordTypeAsString, RecordType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration

/**
@@ -37,8 +39,14 @@ class StatePartitionReaderFactory(
stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new StatePartitionReader(storeConf, hadoopConf,
partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
} else {
new StatePartitionReader(storeConf, hadoopConf,
partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
}
}
}

@@ -57,7 +65,7 @@ class StatePartitionReader(
private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]

private lazy val provider: StateStoreProvider = {
protected lazy val provider: StateStoreProvider = {
val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
@@ -104,11 +112,11 @@ class StatePartitionReader(
}
}

private lazy val iter: Iterator[InternalRow] = {
protected lazy val iter: Iterator[InternalRow] = {
store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
}

private var current: InternalRow = _
protected var current: InternalRow = _

override def next(): Boolean = {
if (iter.hasNext) {
@@ -136,3 +144,48 @@ class StatePartitionReader(
row
}
}

/**
* An implementation of [[PartitionReader]] for the readChangeFeed mode of State Data Source.
* It reads the change of state over batches of a particular partition.
*/
class StateStoreChangeDataPartitionReader(
storeConf: StateStoreConf,
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType,
stateStoreMetadata: Array[StateMetadataTableEntry])
extends StatePartitionReader(storeConf, hadoopConf, partition, schema, stateStoreMetadata) {

private lazy val changeDataReader: StateStoreChangeDataReader = {
if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
provider.getClass.toString)
}
provider.asInstanceOf[SupportsFineGrainedReplay]
.getStateStoreChangeDataReader(
partition.sourceOptions.changeStartBatchId.get + 1,
partition.sourceOptions.changeEndBatchId.get + 1)
}

override protected lazy val iter: Iterator[InternalRow] = {

Contributor:

I'd say the iterator logic is simple enough that reusing it only partially makes things more complicated. Initializing the schema, the state store provider, and the store instance can be shared between the two classes (the store instance isn't even reused here) - it may be better to introduce an abstract class named StatePartitionReaderBase and move the common parts into it. (A rough sketch of that shape follows this file's diff.)

changeDataReader.iterator.map(unifyStateChangeDataRow)
}

override def close(): Unit = {
current = null
changeDataReader.close()
provider.close()
}

private def unifyStateChangeDataRow(row: (RecordType, UnsafeRow, UnsafeRow, Long)):
InternalRow = {
val result = new GenericInternalRow(5)
result.update(0, row._2)
result.update(1, row._3)
result.update(2, UTF8String.fromString(getRecordTypeAsString(row._1)))
result.update(3, row._4)
result.update(4, partition.partition)
result
}
}
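
A rough shape of the StatePartitionReaderBase refactor suggested in the review comment above - a sketch of the idea only, not the code that eventually landed:

abstract class StatePartitionReaderBase extends PartitionReader[InternalRow] {
  // The shared pieces (schema extraction, provider construction) would live here;
  // each mode only has to supply its own row iterator.
  protected def provider: StateStoreProvider
  protected def iter: Iterator[InternalRow]

  private var current: InternalRow = _

  override def next(): Boolean = {
    if (iter.hasNext) {
      current = iter.next()
      true
    } else {
      current = null
      false
    }
  }

  override def get(): InternalRow = current

  override def close(): Unit = {
    current = null
    provider.close()
  }
}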

@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.Jo
import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.state.StateStoreConf
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

@@ -76,6 +76,9 @@ class StateTable(
override def properties(): util.Map[String, String] = Map.empty[String, String].asJava

private def isValidSchema(schema: StructType): Boolean = {

Contributor (@HeartSaVioR, Jul 9, 2024):

My proposal could handle both non-CDF and CDF in a single flow - this version still diverges, and every column gets its own if / else if. Have you tried my proposal?

Contributor Author (@eason-yuchen-liu, Jul 9, 2024):

Sorry, I overlooked that code. It is indeed more elegant. Thanks for the suggestion.

if (sourceOptions.readChangeFeed) {
return isValidChangeDataSchema(schema)
}

Contributor:

nit: add one empty line to clearly mark the early return.

Contributor:

Btw, we verify the same column names with the same logic regardless of the mode, so we should be able to refine the logic and reduce the redundant code:

val expectedFieldNames = if (sourceOptions.readChangeFeed) {
  Seq("key", "value", "change_type", "batch_id", "partition_id")
} else {
  Seq("key", "value", "partition_id")
}
val expectedTypes = Map("key" -> classOf[StructType], ..., "batch_id" -> classOf[LongType]) // <= should contain all 5 columns

if (schema.fieldNames.toImmutableArraySeq != expectedFieldNames) {
  false
} else {
  schema.fieldNames.forall { fieldName =>
    SchemaUtil.getSchemaAsDataType(schema, fieldName).getClass == expectedTypes(fieldName)
  }
}

The above code wasn't written with an IDE, so please treat it as a snippet and construct your own from it.

Contributor Author:

Good catch!

if (schema.fieldNames.toImmutableArraySeq != Seq("key", "value", "partition_id")) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "key").isInstanceOf[StructType]) {
@@ -89,6 +92,25 @@ class StateTable(
}
}

private def isValidChangeDataSchema(schema: StructType): Boolean = {
if (schema.fieldNames.toImmutableArraySeq !=
Seq("key", "value", "change_type", "batch_id", "partition_id")) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "key").isInstanceOf[StructType]) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "value").isInstanceOf[StructType]) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "change_type").isInstanceOf[StringType]) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "batch_id").isInstanceOf[LongType]) {
false
} else if (!SchemaUtil.getSchemaAsDataType(schema, "partition_id").isInstanceOf[IntegerType]) {
false
} else {
true
}
}

override def metadataColumns(): Array[MetadataColumn] = Array.empty
}
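
Following the reviewer's snippet above, one way the two validation methods could be folded into a single flow (a sketch under the same assumptions, using the isInstanceOf checks already present in this file rather than class equality):

private def isValidSchema(schema: StructType): Boolean = {
  val expectedFieldNames = if (sourceOptions.readChangeFeed) {
    Seq("key", "value", "change_type", "batch_id", "partition_id")
  } else {
    Seq("key", "value", "partition_id")
  }

  if (schema.fieldNames.toImmutableArraySeq != expectedFieldNames) {
    false
  } else {
    expectedFieldNames.forall { fieldName =>
      val dataType = SchemaUtil.getSchemaAsDataType(schema, fieldName)
      fieldName match {
        case "key" | "value" => dataType.isInstanceOf[StructType]
        case "change_type" => dataType.isInstanceOf[StringType]
        case "batch_id" => dataType.isInstanceOf[LongType]
        case "partition_id" => dataType.isInstanceOf[IntegerType]
      }
    }
  }
}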


@@ -978,4 +978,47 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

result
}

override def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
StateStoreChangeDataReader = {
new HDFSBackedStateStoreChangeDataReader(fm, baseDir, startVersion, endVersion,
CompressionCodec.createCodec(sparkConf, storeConf.compressionCodec),
keySchema, valueSchema)
}
}

/** [[StateStoreChangeDataReader]] implementation for [[HDFSBackedStateStoreProvider]] */
class HDFSBackedStateStoreChangeDataReader(
fm: CheckpointFileManager,
stateLocation: Path,
startVersion: Long,
endVersion: Long,
compressionCodec: CompressionCodec,
keySchema: StructType,
valueSchema: StructType)
extends StateStoreChangeDataReader(
fm, stateLocation, startVersion, endVersion, compressionCodec) {

override protected var changelogSuffix: String = "delta"

override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
val reader = currentChangelogReader()
if (reader == null) {
return null
}
val (recordType, keyArray, valueArray, _) = reader.next()
val keyRow = new UnsafeRow(keySchema.fields.length)
keyRow.pointTo(keyArray, keyArray.length)
if (valueArray == null) {
(recordType, keyRow, null, currentChangelogVersion - 1)
} else {
val valueRow = new UnsafeRow(valueSchema.fields.length)
// If valueSize in the existing file is not a multiple of 8, floor it to a multiple of 8.
// This is a workaround for the following: prior to Spark 2.3, `RowBasedKeyValueBatch`
// mistakenly appended 4 bytes to the value row, which got persisted into the checkpoint data.
valueRow.pointTo(valueArray, (valueArray.length / 8) * 8)
(recordType, keyRow, valueRow, currentChangelogVersion - 1)
}
}
}
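
A note on the off-by-one visible here and in StateStoreChangeDataPartitionReader above: a state store version is written when a batch commits, so (assuming the usual convention that batch N produces store version N + 1) a changelog for version V describes the changes made by batch V - 1.

// The mapping assumed by the change feed reader (a note, not PR code):
def changelogVersionForBatch(batchId: Long): Long = batchId + 1    // why the reader requests start/end + 1
def batchIdForChangelogVersion(version: Long): Long = version - 1  // why getNext() reports version - 1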

@@ -18,16 +18,20 @@
package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.util.concurrent.ConcurrentHashMap

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

@@ -392,6 +396,19 @@ private[sql] class RocksDBStateStoreProvider
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

override def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
StateStoreChangeDataReader = {
val statePath = stateStoreId.storeCheckpointLocation()
val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
new RocksDBStateStoreChangeDataReader(
CheckpointFileManager.create(statePath, hadoopConf),
statePath,
startVersion,
endVersion,
CompressionCodec.createCodec(sparkConf, storeConf.compressionCodec),
keyValueEncoderMap)
}
}

object RocksDBStateStoreProvider {
@@ -487,3 +504,34 @@ object RocksDBStateStoreProvider {
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES,
CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
}

/** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
class RocksDBStateStoreChangeDataReader(
fm: CheckpointFileManager,
stateLocation: Path,
startVersion: Long,
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)])
extends StateStoreChangeDataReader(
fm, stateLocation, startVersion, endVersion, compressionCodec) {

override protected var changelogSuffix: String = "changelog"

override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
val reader = currentChangelogReader()
if (reader == null) {
return null
}
val (recordType, keyArray, valueArray, columnFamily) = reader.next()
val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) = keyValueEncoderMap.get(columnFamily)
val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
if (valueArray == null) {
(recordType, keyRow, null, currentChangelogVersion - 1)
} else {
val valueRow = rocksDBValueStateEncoder.decodeValue(valueArray)
(recordType, keyRow, valueRow, currentChangelogVersion - 1)
}
}
}