Skip to content

Commit

Permalink
Scheduling API update and fixed flaky tests (#7552)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko authored Feb 13, 2025
1 parent 9ef252c commit 6af265e
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps
import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment}

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
Expand Down Expand Up @@ -141,7 +141,9 @@ sealed trait SchedulingSupport

trait SchedulingSupported extends SchedulingSupport {

def createScheduledExecutionPerformer(deploymentConfig: Config): ScheduledExecutionPerformer
def createScheduledExecutionPerformer(
rawSchedulingConfig: Config,
): ScheduledExecutionPerformer

def customSchedulePropertyExtractorFactory: Option[SchedulePropertyExtractorFactory] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.ui.process.periodic

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.DeploymentManagerDependencies
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition.{MandatoryParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{
Expand All @@ -10,7 +11,6 @@ import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{
SchedulePropertyExtractorFactory
}
import pl.touk.nussknacker.engine.api.deployment.{DeploymentManager, SchedulingSupported}
import pl.touk.nussknacker.engine.{DeploymentManagerDependencies, ModelData}
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.periodic.cron.{CronParameterValidator, CronSchedulePropertyExtractor}
import pl.touk.nussknacker.ui.process.periodic.legacy.db.{LegacyDbInitializer, SlickLegacyPeriodicProcessesRepository}
Expand All @@ -30,7 +30,6 @@ object PeriodicDeploymentManagerDecorator extends LazyLogging {
def decorate(
underlying: DeploymentManager,
schedulingSupported: SchedulingSupported,
modelData: ModelData,
deploymentConfig: Config,
dependencies: DeploymentManagerDependencies,
dbRef: DbRef,
Expand Down Expand Up @@ -93,7 +92,7 @@ object PeriodicDeploymentManagerDecorator extends LazyLogging {
delegate = underlying,
dependencies = dependencies,
periodicProcessesRepository = periodicProcessesRepository,
scheduledExecutionPerformer = schedulingSupported.createScheduledExecutionPerformer(deploymentConfig),
scheduledExecutionPerformer = schedulingSupported.createScheduledExecutionPerformer(rawSchedulingConfig),
schedulePropertyExtractorFactory = schedulePropertyExtractorFactory,
processConfigEnricherFactory = processConfigEnricherFactory,
listenerFactory = periodicProcessListenerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ object ProcessingTypeData {
PeriodicDeploymentManagerDecorator.decorate(
underlying = deploymentManager,
schedulingSupported = supported,
modelData = modelData,
deploymentConfig = deploymentConfig,
dependencies = deploymentManagerDependencies,
dbRef = dbRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,29 @@ import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId
import pl.touk.nussknacker.engine.testing.StubbingCommands
import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentId

import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands {

var jobStatus: Map[ProcessName, List[StatusDetails]] = Map.empty
val jobStatus: TrieMap[ProcessName, List[StatusDetails]] = TrieMap.empty

def getJobStatus(processName: ProcessName): Option[List[StatusDetails]] = {
jobStatus.get(processName)
}

def setEmptyStateStatus(): Unit = {
jobStatus = Map.empty
jobStatus.clear()
}

def addStateStatus(
processName: ProcessName,
status: StateStatus,
deploymentIdOpt: Option[PeriodicProcessDeploymentId]
): Unit = {
jobStatus = jobStatus ++ Map(
processName -> List(
jobStatus.put(
processName,
List(
StatusDetails(
deploymentId = deploymentIdOpt.map(pdid => DeploymentId(pdid.toString)),
externalDeploymentId = Some(ExternalDeploymentId("1")),
Expand All @@ -42,8 +48,9 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands
status: StateStatus,
deploymentIdOpt: Option[PeriodicProcessDeploymentId]
): Unit = {
jobStatus = Map(
processName -> List(
jobStatus.put(
processName,
List(
StatusDetails(
deploymentId = deploymentIdOpt.map(pdid => DeploymentId(pdid.toString)),
externalDeploymentId = Some(ExternalDeploymentId("1")),
Expand Down Expand Up @@ -77,7 +84,7 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands
override def getProcessStates(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = {
Future.successful(WithDataFreshnessStatus.fresh(jobStatus.get(name).toList.flatten))
Future.successful(WithDataFreshnessStatus.fresh(getJobStatus(name).toList.flatten))
}

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport
Expand All @@ -90,7 +97,7 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands
override def getAllProcessesStates()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] =
Future.successful(WithDataFreshnessStatus.fresh(jobStatus))
Future.successful(WithDataFreshnessStatus.fresh(jobStatus.toMap))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class PeriodicDeploymentManagerTest
f.periodicProcessService.handleFinished.futureValue

// after some time Flink stops returning job status
f.delegateDeploymentManagerStub.jobStatus = Map.empty
f.delegateDeploymentManagerStub.jobStatus.clear()

f.getMergedStatusDetails.status shouldEqual ProblemStateStatus.Failed
f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class PeriodicProcessServiceTest
FinishedEvent(
finished.toDetails,
canonicalProcess,
f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption)
f.delegateDeploymentManagerStub.getJobStatus(processName).flatMap(_.headOption)
),
ScheduledEvent(scheduled.toDetails, firstSchedule = false)
)
Expand Down Expand Up @@ -295,7 +295,7 @@ class PeriodicProcessServiceTest
f.events.loneElement shouldBe FinishedEvent(
event.toDetails,
canonicalProcess,
f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption)
f.delegateDeploymentManagerStub.getJobStatus(processName).flatMap(_.headOption)
)
}

Expand Down Expand Up @@ -378,7 +378,7 @@ class PeriodicProcessServiceTest
f.events.toList shouldBe List(
FailedOnRunEvent(
expectedDetails.toDetails,
f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption)
f.delegateDeploymentManagerStub.getJobStatus(processName).flatMap(_.headOption)
)
)
}
Expand Down
5 changes: 3 additions & 2 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@
* [#7446](https://github.com/TouK/nussknacker/pull/7446) Small changes regarding node errors in fragments used in scenarios:
* Fragment error node tips in scenarios are now clickable and open problematic node edit window in a new tab.
* Fragment nodes are now highlighted when they contain nodes with errors.
* [#7364](https://github.com/TouK/nussknacker/pull/7364) PeriodicDeploymentManger is no longer a separate DM, but instead is an optional functionality and decorator for all DMs
* in order to use it, DM must implement interface `schedulingSupported`, that handles deployments on a specific engine
* [#7364](https://github.com/TouK/nussknacker/pull/7364) PeriodicDeploymentManager is no longer a separate DM, but instead is an optional functionality and decorator for all DMs
* Nussknacker API additionally modified in [#7552](https://github.com/TouK/nussknacker/pull/7552)
* in order to use it, DM must implement interface `SchedulingSupported`, that handles deployments on a specific engine
* implementation provided for Flink DM
* additional, necessary, db schema changes concerning the periodic/scheduling mechanism introduced in [#7519](https://github.com/TouK/nussknacker/pull/7519)
* [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b".
Expand Down
2 changes: 2 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).
they should be modified to use the predefined set of actions or otherwise replaced by custom links and handled outside Nussknacker.
* [#7364](https://github.com/TouK/nussknacker/pull/7364)
* additional, necessary, db schema changes concerning the periodic/scheduling mechanism introduced in [#7519](https://github.com/TouK/nussknacker/pull/7519)
* additionally modified in [#7552](https://github.com/TouK/nussknacker/pull/7552)
* the PeriodicDeploymentManager is no longer a separate DM type
* in `scenarioTypes` config section, the `deploymentConfig` of a periodic scenario type (only Flink was supported so far) may have looked like that:
```hocon
Expand Down Expand Up @@ -91,6 +92,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).
### Code API changes
* [#7368](https://github.com/TouK/nussknacker/pull/7368) [#7502](https://github.com/TouK/nussknacker/pull/7502) Renamed `PeriodicSourceFactory` to `EventGeneratorSourceFactory`
* [#7364](https://github.com/TouK/nussknacker/pull/7364) The DeploymentManager must implement `def schedulingSupport: SchedulingSupport`. If support not added, then `NoSchedulingSupport` should be used.
* interface modified in [#7552](https://github.com/TouK/nussknacker/pull/7552)
* [#7511](https://github.com/TouK/nussknacker/pull/7511) Changes around flink-based scenario testing. As an entry point to all migration steps, assume that `FlinkMiniClusterWithServices` is a new `FlinkMiniClusterHolder`
* From perspective of testkit (`TestScenarioRunner.flinkBased`) module usage
* `flink-tests` module doesn't depend on `flink-test-utils` module. To create `FlinkMiniClusterWithServices` follow steps below. Example migration process is also available in [PR with the related change](https://github.com/TouK/nussknacker/pull/7511/files#diff-2ccffe37f56882fa91afb457ba45c98f399c40f7667b2de9ea3453b6e8a76989).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ class FlinkDeploymentManager(

override def schedulingSupport: SchedulingSupport = new SchedulingSupported {

override def createScheduledExecutionPerformer(config: Config): ScheduledExecutionPerformer =
FlinkScheduledExecutionPerformer.create(modelData, client, config)
override def createScheduledExecutionPerformer(
rawSchedulingConfig: Config,
): ScheduledExecutionPerformer = FlinkScheduledExecutionPerformer.create(client, modelData, rawSchedulingConfig)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ object FlinkScheduledExecutionPerformer {
val jarFileNameRuntimeParam = "jarFileName"

def create(
flinkClient: FlinkClient,
modelData: BaseModelData,
client: FlinkClient,
config: Config,
rawSchedulingConfig: Config,
): ScheduledExecutionPerformer = {
new FlinkScheduledExecutionPerformer(
flinkClient = client,
jarsDir = Paths.get(config.getString("scheduling.jarsDir")),
flinkClient = flinkClient,
jarsDir = Paths.get(rawSchedulingConfig.getString("jarsDir")),
inputConfigDuringExecution = modelData.inputConfigDuringExecution,
modelJarProvider = new FlinkModelJarProvider(modelData.modelClassLoaderUrls)
)
Expand Down

0 comments on commit 6af265e

Please sign in to comment.