Skip to content

Commit

Permalink
Scenario activities in PullProcessRepository (#7595)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuszkp96 authored Feb 26, 2025
1 parent 0ba764d commit 9b7255b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.ui.listener.services

import pl.touk.nussknacker.engine.api.deployment.ScenarioActivity
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.ui.listener.{ListenerScenarioWithDetails, User}

Expand All @@ -21,4 +22,9 @@ trait PullProcessRepository {
ec: ExecutionContext
): Future[Option[ListenerScenarioWithDetails]]

def fetchActivities(processName: ProcessName)(
implicit listenerUser: User,
ec: ExecutionContext
): Future[List[ScenarioActivity]]

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package pl.touk.nussknacker.ui.process.repository

import pl.touk.nussknacker.engine.api.deployment.ScenarioActivity
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.ui.api.ListenerApiUser
import pl.touk.nussknacker.ui.listener.{ListenerScenarioWithDetails, User}
import pl.touk.nussknacker.ui.listener.services.{PullProcessRepository => ListenerPullProcessRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService.ScenarioActivityFetchError
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.{ExecutionContext, Future}
import scala.language.implicitConversions

class PullProcessRepository(fetchingProcessRepository: FetchingProcessRepository[Future])
extends ListenerPullProcessRepository {
class PullProcessRepository(
fetchingProcessRepository: FetchingProcessRepository[Future],
activityService: FetchScenarioActivityService
) extends ListenerPullProcessRepository {

private implicit def toLoggedUser(implicit user: User): LoggedUser =
user.asInstanceOf[ListenerApiUser].loggedUser
Expand All @@ -35,9 +40,21 @@ class PullProcessRepository(fetchingProcessRepository: FetchingProcessRepository
)(implicit listenerUser: User, ec: ExecutionContext): Future[Option[ListenerScenarioWithDetails]] = for {
maybeProcessId <- fetchingProcessRepository.fetchProcessId(processName)
processId <- maybeProcessId.fold(
Future.failed[ProcessId](new IllegalArgumentException(s"ProcessId for $processName not found"))
Future.failed[ProcessId](new IllegalArgumentException(s"Scenario with name $processName not found"))
)(Future.successful)
processDetails <- fetchProcessDetailsForId(processId, versionId)
} yield processDetails

override def fetchActivities(
processName: ProcessName,
)(implicit listenerUser: User, ec: ExecutionContext): Future[List[ScenarioActivity]] =
activityService
.fetchActivities(processName, None)
.value
.flatMap {
case Right(value) => Future.successful(value)
case Left(ScenarioActivityFetchError.NoScenario(scenarioName)) =>
Future.failed(throw new IllegalArgumentException(s"Scenario with name $scenarioName not found"))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,22 @@ class AkkaHttpBasedRouteProvider(
DBFetchingProcessRepository.createFutureRepository(dbRef, actionRepository, scenarioLabelsRepository)
writeProcessRepository =
ProcessRepository.create(dbRef, designerClock, scenarioActivityRepository, scenarioLabelsRepository, migrations)
processChangeListener = ProcessChangeListenerLoader.loadListeners(
getClass.getClassLoader,
resolvedDesignerConfig,
NussknackerServices(new PullProcessRepository(futureProcessRepository))
)
dmDispatcher =
new DeploymentManagerDispatcher(
processingTypeDataProvider.mapValues(_.deploymentData.validDeploymentManagerOrStub),
futureProcessRepository
)
fetchScenarioActivityService = new FetchScenarioActivityService(
dmDispatcher,
scenarioActivityRepository,
futureProcessRepository,
dbioRunner,
)
processChangeListener = ProcessChangeListenerLoader.loadListeners(
getClass.getClassLoader,
resolvedDesignerConfig,
NussknackerServices(new PullProcessRepository(futureProcessRepository, fetchScenarioActivityService))
)
deploymentsStatusesProvider =
new EngineSideDeploymentStatusesProvider(dmDispatcher, featureTogglesConfig.scenarioStateTimeout)
scenarioStatusProvider = new ScenarioStatusProvider(
Expand Down Expand Up @@ -350,12 +356,6 @@ class AkkaHttpBasedRouteProvider(
processService,
fragmentRepository
)
val fetchScenarioActivityService = new FetchScenarioActivityService(
dmDispatcher,
scenarioActivityRepository,
futureProcessRepository,
dbioRunner,
)
val notificationService = new NotificationServiceImpl(
fetchScenarioActivityService,
actionRepository,
Expand Down

0 comments on commit 9b7255b

Please sign in to comment.