Skip to content

Commit

Permalink
add: non null column check & disable schema evolve option
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 16, 2024
1 parent 75f786f commit a44b8e6
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/main/scala/factory/PhakerDataFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class PhakerDataFactory extends DataSourceFactory {
TableId.parse(conf.get(TABLE_ID)),
conf.get(REJECTED_TYPES).split(',').toSet,
conf.get(SCHEMA_EVOLVE),
conf.get(NON_NULL_COLUMNS),
conf.get(MAX_COLUMN_COUNT),
conf.get(RECORDS_PER_SECOND)
)
Expand All @@ -40,6 +41,7 @@ class PhakerDataFactory extends DataSourceFactory {
Set[ConfigOption[_]](
REJECTED_TYPES,
SCHEMA_EVOLVE,
NON_NULL_COLUMNS,
MAX_COLUMN_COUNT,
RECORDS_PER_SECOND
).asJava
Expand Down
22 changes: 19 additions & 3 deletions src/main/scala/source/PhakeDataGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import java.time.{Instant, ZonedDateTime}
import scala.util.Random

object PhakeDataGenerator {
def randomType(rejectedTypes: Set[String]): DataType = {
val choices = List(

def possibleChoices(rejectedTypes: Set[String]): List[DataType] = {
List(
DataTypes.BINARY(17 + Random.nextInt(100)),
DataTypes.VARBINARY(17 + Random.nextInt(100)),
DataTypes.BOOLEAN,
Expand All @@ -30,10 +31,25 @@ object PhakeDataGenerator {
DataTypes.TIMESTAMP_TZ(Random.nextInt(10)),
DataTypes.TIMESTAMP_LTZ(Random.nextInt(10))
).filterNot(t => rejectedTypes.contains(t.getClass.getSimpleName))
choices(Random.nextInt(choices.length))
}

/** Picks a random column type among the candidates not excluded by `rejectedTypes`.
  *
  * @param rejectedTypes
  *   simple class names of the data types that must never be produced
  * @param generateNonNullColumns
  *   when `true`, the chosen type is marked NOT NULL on a coin flip;
  *   when `false`, the chosen type is always returned as nullable
  * @return
  *   the randomly selected type with its nullability applied
  * @throws IllegalArgumentException
  *   if `rejectedTypes` excludes every candidate type
  */
def randomType(
    rejectedTypes: Set[String],
    generateNonNullColumns: Boolean
): DataType = {
  val choices = possibleChoices(rejectedTypes)
  // Fail with an actionable message instead of the opaque
  // "bound must be positive" raised by Random.nextInt(0).
  require(
    choices.nonEmpty,
    s"No data type left to choose from after rejecting: ${rejectedTypes.mkString(", ")}"
  )
  val resultType = choices(Random.nextInt(choices.length))
  // The coin is only flipped when non-null generation is enabled, so the
  // sequence of random draws is unchanged when the option is disabled.
  if (generateNonNullColumns && Random.nextBoolean()) resultType.notNull
  else resultType.nullable
}

def randomData(name: String, dataType: DataType): AnyRef = {
if (dataType.isNullable && Random.nextBoolean()) {
return null
}
if (name == PhakerDatabase.primaryKey) {
return idCount
.synchronized {
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/source/PhakerDataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class PhakerDataSource(
tableId: TableId,
rejectedTypes: Set[String],
schemaEvolve: Boolean,
generateNonNullColumns: Boolean,
maxColumnCount: Int,
recordsPerSecond: Int
) extends DataSource {
Expand All @@ -18,6 +19,7 @@ class PhakerDataSource(
tableId,
rejectedTypes,
schemaEvolve,
generateNonNullColumns,
maxColumnCount
),
recordsPerSecond
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/source/PhakerDataSourceOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ object PhakerDataSourceOptions {
"Whether generate schema evolution events occasionally. Defaults to true."
)

/** Boolean option registered under the key `non.null.columns`; defaults to `false`.
  *
  * The description text states that it controls whether non-nullable columns
  * are generated for the downstream.
  */
val NON_NULL_COLUMNS: ConfigOption[lang.Boolean] = ConfigOptions
.key("non.null.columns")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether generating non-nullable columns into downstream."
)

val MAX_COLUMN_COUNT: ConfigOption[lang.Integer] = ConfigOptions
.key("max.column.count")
.intType()
Expand Down
15 changes: 12 additions & 3 deletions src/main/scala/source/PhakerSourceGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@ class PhakerSourceGenerator(
tableId: TableId,
rejectedTypes: Set[String],
schemaEvolve: Boolean,
generateNonNullColumns: Boolean,
maxColumnCount: Int
) extends RandomGenerator[Event] {

private val cachedEvents: util.List[Event] = {
if (!schemaEvolve) {
PhakerDatabase.columnList ++=
PhakeDataGenerator
.possibleChoices(rejectedTypes)
.zipWithIndex
.map(t => (s"column${t._2}_${t._1.getClass.getSimpleName}", t._1))
}
val cache = new util.ArrayList[Event]
cache.add(
new CreateTableEvent(
Expand Down Expand Up @@ -114,12 +122,13 @@ class PhakerSourceGenerator(

println("Emitting schema change events...")

val addedColumnType =
PhakeDataGenerator.randomType(rejectedTypes, generateNonNullColumns)

val addedColumnName = colCount.synchronized {
colCount += 1
s"column$colCount"
s"column${colCount}_${addedColumnType.getClass.getSimpleName}"
}
val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)

PhakerDatabase.columnList.synchronized {
PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
println(s"Done, new schema: ${PhakerDatabase.genSchema}")
Expand Down
5 changes: 4 additions & 1 deletion src/test/scala/PhakerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class PhakerTest extends AnyFunSuite {
TableId.tableId("default_namespace", "default_schema", "default_table"),
Set("IntType", "FloatType", "DoubleType"),
true,
true,
17
),
1000
1
)

val env = StreamExecutionEnvironment.getExecutionEnvironment
Expand All @@ -50,6 +51,8 @@ class PhakerTest extends AnyFunSuite {
)
.set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType")
.set[java.lang.Integer](PhakerDataSourceOptions.RECORDS_PER_SECOND, 1)
.set[java.lang.Boolean](PhakerDataSourceOptions.NON_NULL_COLUMNS, true)
.set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, true)
.set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)

val sourceDef =
Expand Down

0 comments on commit a44b8e6

Please sign in to comment.