Skip to content

Commit

Permalink
[NU-2048] Fix for invalid classloader used for resources in K8sDeploy…
Browse files Browse the repository at this point in the history
…mentManager (#7592)
  • Loading branch information
arkadius authored Feb 27, 2025
1 parent 42525ea commit ab62a15
Show file tree
Hide file tree
Showing 16 changed files with 114 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,12 @@ object LocalNussknackerWithSingleModel {
// This map is ignored but must exist
appConfig.withValue("scenarioTypes", ConfigValueFactory.fromMap(Map.empty[String, ConfigValue].asJava))
)
for {
deploymentManagersClassLoader <- DeploymentManagersClassLoader.create(List.empty)
designerConfigLoader = new SimpleConfigLoadingDesignerConfigLoader(designerConfig.rawConfig.resolved)
appFactory = new NussknackerAppFactory(
designerConfig,
designerConfigLoader,
_ => local,
deploymentManagersClassLoader
)
app <- appFactory.createApp()
} yield app
val designerConfigLoader = new SimpleConfigLoadingDesignerConfigLoader(designerConfig.rawConfig.resolved)
val appFactory = new NussknackerAppFactory(
designerConfigLoader,
_ => local,
)
appFactory.createApp()
}

// TODO: easier way of handling users file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ object NussknackerApp extends IOApp with LazyLogging {
.as(ExitCode.Success)
}

private def program = for {
appFactory <- NussknackerAppFactory.create(AlwaysLoadingFileBasedDesignerConfigLoader(getClass.getClassLoader))
_ <- appFactory.createApp()
} yield ()
private def program = {
val appFactory = NussknackerAppFactory(AlwaysLoadingFileBasedDesignerConfigLoader(getClass.getClassLoader))
appFactory.createApp()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,25 @@ import java.time.Clock

object NussknackerAppFactory {

def create(designerConfigLoader: DesignerConfigLoader): Resource[IO, NussknackerAppFactory] = {
for {
designerConfig <- Resource.eval(designerConfigLoader.loadDesignerConfig())
managersDirs <- Resource.eval(IO.delay(designerConfig.managersDirs()))
deploymentManagerClassLoader <- DeploymentManagersClassLoader.create(managersDirs)
} yield new NussknackerAppFactory(
designerConfig,
def apply(designerConfigLoader: DesignerConfigLoader): NussknackerAppFactory = {
new NussknackerAppFactory(
designerConfigLoader,
new ProcessingTypesConfigBasedProcessingTypeDataLoader(_, deploymentManagerClassLoader),
deploymentManagerClassLoader
new ProcessingTypesConfigBasedProcessingTypeDataLoader(_),
)
}

}

class NussknackerAppFactory(
alreadyLoadedConfig: DesignerConfig,
designerConfigLoader: DesignerConfigLoader,
createProcessingTypeDataLoader: ProcessingTypeConfigsLoader => ProcessingTypeDataLoader,
deploymentManagersClassLoader: DeploymentManagersClassLoader
) extends LazyLogging {

def createApp(clock: Clock = Clock.systemUTC()): Resource[IO, Unit] = {
for {
alreadyLoadedConfig <- Resource.eval(designerConfigLoader.loadDesignerConfig())
managersDirs <- Resource.eval(IO.delay(alreadyLoadedConfig.managersDirs()))
deploymentManagersClassLoader <- DeploymentManagersClassLoader.create(managersDirs)
system <- createActorSystem(alreadyLoadedConfig.rawConfig)
executionContextWithIORuntime <- ExecutionContextWithIORuntimeAdapter.createFrom(system.dispatcher)
ioSttpBackend <- AsyncHttpClientCatsBackend.resource[IO]()
Expand Down Expand Up @@ -87,6 +82,7 @@ class NussknackerAppFactory(
processingTypeDataLoader,
feStatisticsRepository,
clock,
deploymentManagersClassLoader,
modelClassLoaderProvider
)(
system,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefin
import pl.touk.nussknacker.engine.deployment.EngineSetupName
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioParameters
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.periodic.{PeriodicDeploymentManagerDecorator, SchedulingConfig}
import pl.touk.nussknacker.ui.process.periodic.PeriodicDeploymentManagerDecorator
import pl.touk.nussknacker.ui.process.processingtype.DesignerModelData.DynamicComponentsStaticDefinitions

import scala.util.control.NonFatal
Expand Down Expand Up @@ -100,17 +100,18 @@ object ProcessingTypeData {
engineSetupName: EngineSetupName,
modelData: ModelData,
deploymentConfig: Config,
metaDataInitializer: MetaDataInitializer,
metaDataInitializer: MetaDataInitializer
) = {
val scenarioStateCacheTTL = ScenarioStateCachingConfig.extractScenarioStateCacheTTL(deploymentConfig)

val validDeploymentManager = for {
deploymentManager <- deploymentManagerProvider.createDeploymentManager(
modelData,
deploymentManagerDependencies,
deploymentConfig,
scenarioStateCacheTTL
)
deploymentManager <-
deploymentManagerProvider.createDeploymentManager(
modelData,
deploymentManagerDependencies,
deploymentConfig,
scenarioStateCacheTTL
)
decoratedDeploymentManager = schedulingForProcessingType match {
case SchedulingForProcessingType.Available(dbRef) =>
deploymentManager.schedulingSupport match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pl.touk.nussknacker.ui.process.processingtype.loader

import cats.data.ValidatedNel
import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.{
BaseModelData,
CustomProcessValidator,
DeploymentManagerDependencies,
DeploymentManagerProvider,
MetaDataInitializer
}
import pl.touk.nussknacker.engine.api.deployment.DeploymentManager
import pl.touk.nussknacker.engine.deployment.EngineSetupName
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.util.loader.DeploymentManagersClassLoader

import scala.concurrent.duration.FiniteDuration

// We wrap DeploymentManagerProvider with this handler to make sure that every method execution will have the correct
// context classloader pinned. It is especially important when some logic around java resources is invoked.
// See for example Source.fromResource, ConfigFactory.parseResources etc.
// Warning! This solution won't work for lazy loaded resources because for them, context classloader will be different.
private[loader] class DeploymentManagerProviderCorrectClassloaderHandler(
delegate: DeploymentManagerProvider,
deploymentManagersClassLoader: DeploymentManagersClassLoader
) extends DeploymentManagerProvider {

override def name: String = ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) { delegate.name }

override def createDeploymentManager(
modelData: BaseModelData,
dependencies: DeploymentManagerDependencies,
deploymentConfig: Config,
scenarioStateCacheTTL: Option[FiniteDuration]
): ValidatedNel[String, DeploymentManager] = {
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) {
delegate.createDeploymentManager(modelData, dependencies, deploymentConfig, scenarioStateCacheTTL)
}
}

override def metaDataInitializer(config: Config): MetaDataInitializer =
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) { delegate.metaDataInitializer(config) }

override def scenarioPropertiesConfig(config: Config): Map[String, ScenarioPropertyConfig] =
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) {
delegate.scenarioPropertiesConfig(config)
}

override def additionalValidators(config: Config): List[CustomProcessValidator] =
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) { delegate.additionalValidators(config) }

override def defaultEngineSetupName: EngineSetupName =
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) { delegate.defaultEngineSetupName }

override def engineSetupIdentity(config: Config): Any =
ThreadUtils.withThisAsContextClassLoader(deploymentManagersClassLoader) { delegate.engineSetupIdentity(config) }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.typesafe.config.ConfigFactory
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.util.loader.DeploymentManagersClassLoader
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.processingtype.{
CombinedProcessingTypeData,
Expand All @@ -23,6 +24,7 @@ class LocalProcessingTypeDataLoader(
override def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
deploymentManagersClassLoader: DeploymentManagersClassLoader,
modelClassLoaderProvider: ModelClassLoaderProvider,
dbRef: Option[DbRef],
): IO[ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData]] = IO {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.ui.process.processingtype.loader
import cats.effect.IO
import pl.touk.nussknacker.engine.{DeploymentManagerDependencies, ModelDependencies}
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.util.loader.DeploymentManagersClassLoader
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.processingtype.{
CombinedProcessingTypeData,
Expand All @@ -17,6 +18,7 @@ trait ProcessingTypeDataLoader {
def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
deploymentManagersClassLoader: DeploymentManagersClassLoader,
modelClassLoaderProvider: ModelClassLoaderProvider,
// should be always available, used by scheduling mechanism,
// but in tests sometimes we do not want to bootstrap the full environment with db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLo
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataState

class ProcessingTypesConfigBasedProcessingTypeDataLoader(
processingTypeConfigsLoader: ProcessingTypeConfigsLoader,
deploymentManagersClassLoader: DeploymentManagersClassLoader
processingTypeConfigsLoader: ProcessingTypeConfigsLoader
) extends ProcessingTypeDataLoader
with LazyLogging {

override def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
deploymentManagersClassLoader: DeploymentManagersClassLoader,
modelClassLoaderProvider: ModelClassLoaderProvider,
dbRef: Option[DbRef],
): IO[ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData]] = {
Expand All @@ -32,6 +32,7 @@ class ProcessingTypesConfigBasedProcessingTypeDataLoader(
_,
getModelDependencies,
getDeploymentManagerDependencies,
deploymentManagersClassLoader,
modelClassLoaderProvider,
dbRef
)
Expand All @@ -42,14 +43,15 @@ class ProcessingTypesConfigBasedProcessingTypeDataLoader(
processingTypesConfig: ProcessingTypeConfigs,
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
deploymentManagersClassLoader: DeploymentManagersClassLoader,
modelClassLoaderProvider: ModelClassLoaderProvider,
dbRef: Option[DbRef],
): ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData] = {
// This step with splitting DeploymentManagerProvider loading for all processing types
// and after that creating ProcessingTypeData is done because of the deduplication of deployments
// See DeploymentManagerProvider.engineSetupIdentity
val providerWithNameInputData = processingTypesConfig.configByProcessingType.mapValuesNow { processingTypeConfig =>
val provider = createDeploymentManagerProvider(processingTypeConfig)
val provider = createDeploymentManagerProvider(deploymentManagersClassLoader, processingTypeConfig)
val nameInputData = EngineNameInputData(
provider.defaultEngineSetupName,
provider.engineSetupIdentity(processingTypeConfig.deploymentConfig),
Expand Down Expand Up @@ -109,11 +111,15 @@ class ProcessingTypesConfigBasedProcessingTypeDataLoader(
)
}

private def createDeploymentManagerProvider(typeConfig: ProcessingTypeConfig): DeploymentManagerProvider = {
ScalaServiceLoader.loadNamed[DeploymentManagerProvider](
private def createDeploymentManagerProvider(
deploymentManagersClassLoader: DeploymentManagersClassLoader,
typeConfig: ProcessingTypeConfig
): DeploymentManagerProvider = {
val loadedProvider = ScalaServiceLoader.loadNamed[DeploymentManagerProvider](
typeConfig.deploymentManagerType,
deploymentManagersClassLoader
)
new DeploymentManagerProviderCorrectClassloaderHandler(loadedProvider, deploymentManagersClassLoader)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefin
import pl.touk.nussknacker.engine.definition.test.ModelDataTestInfoProvider
import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ScalaServiceLoader}
import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One}
import pl.touk.nussknacker.processCounts.{CountsReporter, CountsReporterCreator}
import pl.touk.nussknacker.processCounts.influxdb.InfluxCountsReporterCreator
Expand Down Expand Up @@ -120,6 +120,7 @@ class AkkaHttpBasedRouteProvider(
processingTypeDataLoader: ProcessingTypeDataLoader,
feStatisticsRepository: FEStatisticsRepository[Future],
designerClock: Clock,
deploymentManagersClassLoader: DeploymentManagersClassLoader,
modelClassLoaderProvider: ModelClassLoaderProvider
)(
implicit system: ActorSystem,
Expand Down Expand Up @@ -782,6 +783,7 @@ class AkkaHttpBasedRouteProvider(
sttpBackend,
_
),
deploymentManagersClassLoader,
modelClassLoaderProvider,
Some(dbRef),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ trait NuItTest extends WithHsqlDbTesting with DefaultUniquePortProvider with Wit
override protected def beforeAll(): Unit = {
super.beforeAll()
val designerConfigLoader = new SimpleConfigLoadingDesignerConfigLoader(adjustNuTestConfig())
releaseAppResources = NussknackerAppFactory
.create(designerConfigLoader)
.flatMap(_.createApp(clock = clock))
releaseAppResources = NussknackerAppFactory(designerConfigLoader)
.createApp(clock = clock)
.allocated
.unsafeRunSync()
._2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.process.VersionId.initialVersionId
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider}
import pl.touk.nussknacker.restmodel.{CancelRequest, DeployRequest}
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.restmodel.{CancelRequest, DeployRequest}
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.test.EitherValuesDetailedMessage
import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting
Expand Down Expand Up @@ -186,13 +186,11 @@ trait NuResourcesTest
protected val typeToConfig: ProcessingTypeDataProvider[ProcessingTypeData, CombinedProcessingTypeData] = {
val designerConfig = DesignerConfig.from(testConfig)
ProcessingTypeDataProvider(
new ProcessingTypesConfigBasedProcessingTypeDataLoader(
() => IO.pure(designerConfig.processingTypeConfigs),
deploymentManagersClassLoader
)
new ProcessingTypesConfigBasedProcessingTypeDataLoader(() => IO.pure(designerConfig.processingTypeConfigs))
.loadProcessingTypeData(
_ => modelDependencies,
_ => deploymentManagerDependencies,
deploymentManagersClassLoader,
modelClassLoaderProvider,
Some(testDbRef),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import pl.touk.nussknacker.engine.api.component.ComponentType._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessingType, ProcessObjectDependencies}
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode.FinalDefinition
import pl.touk.nussknacker.engine.definition.component.defaultconfig.DefaultsComponentGroupName._
import pl.touk.nussknacker.engine.definition.component.defaultconfig.DefaultsComponentIcon
import pl.touk.nussknacker.engine.definition.component.defaultconfig.DefaultsComponentIcon._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ProcessingTypeDataProviderSpec extends AnyFunSuite with WithTestDeployment
.loadProcessingTypeData(
_ => modelDependencies,
_ => TestFactory.deploymentManagerDependencies,
deploymentManagersClassLoader,
ModelClassLoaderProvider(
allProcessingTypes.map(_ -> ModelClassLoaderDependencies(List.empty, None)).toMap,
deploymentManagersClassLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,12 @@ class ScenarioParametersServiceTest
ComponentDefinitionExtractionMode.FinalDefinition
)
val processingTypeData =
new ProcessingTypesConfigBasedProcessingTypeDataLoader(
() => IO.pure(designerConfig.processingTypeConfigs),
deploymentManagersClassLoader
new ProcessingTypesConfigBasedProcessingTypeDataLoader(() => IO.pure(designerConfig.processingTypeConfigs),
)
.loadProcessingTypeData(
_ => modelDependencies,
_ => TestFactory.deploymentManagerDependencies,
deploymentManagersClassLoader,
ModelClassLoaderProvider(
designerConfig.processingTypeConfigs.configByProcessingType
.mapValuesNow(config => ModelClassLoaderDependencies(config.classPath, Some(workPath))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class K8sDeploymentManager(
inputConfig.copy(config = withOverrides).serialized
}

private lazy val defaultLogbackConfig = Using.resource(Source.fromResource("runtime/default-logback.xml"))(_.mkString)
private lazy val defaultLogbackConfig =
Using.resource(Source.fromResource("runtime/default-logback.xml", getClass.getClassLoader))(_.mkString)

private def logbackConfig: String = config.logbackConfigPath
.map(path => Using.resource(Source.fromFile(path))(_.mkString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class IngressPreparer(config: IngressConfig, nuInstanceName: Option[String]) {
}

private def fromUserConfig: Ingress = {
val minimalConfig = ConfigFactory.parseResources("defaultMinimalIngress.conf")
val minimalConfig = ConfigFactory.parseResources(getClass.getClassLoader, "defaultMinimalIngress.conf")
val finalConfig = config.config.withFallback(minimalConfig)
Json.parse(finalConfig.root().render(ConfigRenderOptions.concise())).as[Ingress]
}
Expand Down

0 comments on commit ab62a15

Please sign in to comment.