Skip to content

Commit

Permalink
[NU-1806] Add cancel request dto (instead of plain text comment) (#7562)
Browse files Browse the repository at this point in the history
  • Loading branch information
gskrobisz authored Feb 18, 2025
1 parent b67a283 commit f4b37ac
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pl.touk.nussknacker.restmodel

import io.circe.generic.JsonCodec

@JsonCodec final case class CancelRequest(
comment: Option[String]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.touk.nussknacker.restmodel

import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData

@JsonCodec final case class DeployRequest(
comment: Option[String],
nodesDeploymentData: Option[NodesDeploymentData]
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.http.scaladsl.server._
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import com.typesafe.scalalogging.LazyLogging
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport
import io.circe.generic.JsonCodec
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
import io.circe.{Decoder, Encoder, Json, parser}
import io.dropwizard.metrics5.MetricRegistry
Expand All @@ -17,7 +16,7 @@ import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateR
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.restmodel.{RunOffScheduleRequest, RunOffScheduleResponse}
import pl.touk.nussknacker.restmodel.{CancelRequest, DeployRequest, RunOffScheduleRequest, RunOffScheduleResponse}
import pl.touk.nussknacker.ui.api.ProcessesResources.ProcessUnmarshallingError
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.AdhocTestParametersRequest
import pl.touk.nussknacker.ui.metrics.TimeMeasuring.measureTime
Expand Down Expand Up @@ -62,11 +61,6 @@ object ManagementResources {

}

@JsonCodec final case class RunDeploymentRequest(
nodesDeploymentData: Option[NodesDeploymentData],
comment: Option[String]
)

}

class ManagementResources(
Expand All @@ -91,25 +85,50 @@ class ManagementResources(
private implicit final val plainBytes: FromEntityUnmarshaller[Array[Byte]] = Unmarshaller.byteArrayUnmarshaller
private implicit final val plainString: FromEntityUnmarshaller[String] = Unmarshaller.stringUnmarshaller

// TODO: This is workaround for touk/nussknacker-example-scenarios-library that deploys tests with plain text comment.
// TODO: This (deployRequestEntity and cancelRequestEntity) is used as a transition from comment-as-plain-text-body to json.
// e.g. touk/nussknacker-example-scenarios-library, that is used in e2e tests, uses plain text comment.
// https://github.com/TouK/nussknacker-scenario-examples-library/pull/7
private def deployRequestEntity: Directive1[RunDeploymentRequest] = {
// To be replaced by `entity(as[DeployRequest]))` and `entity(as[CancelRequest]))`.
private def deployRequestEntity: Directive1[DeployRequest] = {
entity(as[Option[String]]).flatMap { optStr =>
{
optStr match {
case None => provide(RunDeploymentRequest(None, None))
case None => provide(DeployRequest(None, None))
case Some(body) =>
io.circe.parser.parse(body) match {
case Right(json) =>
json.as[RunDeploymentRequest] match {
json.as[DeployRequest] match {
case Right(request) =>
provide(request)
case Left(notValidDeployRequest) =>
reject(MalformedRequestContentRejection("Invalid deployment request", notValidDeployRequest))
reject(MalformedRequestContentRejection("Invalid deploy request", notValidDeployRequest))
}
case Left(notJson) =>
// assume deployment request contains plaintext comment only
provide(RunDeploymentRequest(None, Some(body)))
provide(DeployRequest(Some(body), None))
}
}
}
}
}

private def cancelRequestEntity: Directive1[CancelRequest] = {
entity(as[Option[String]]).flatMap { optStr =>
{
optStr match {
case None => provide(CancelRequest(None))
case Some(body) =>
io.circe.parser.parse(body) match {
case Right(json) =>
json.as[CancelRequest] match {
case Right(request) =>
provide(request)
case Left(notValidRequest) =>
reject(MalformedRequestContentRejection("Invalid cancel request", notValidRequest))
}
case Left(notJson) =>
// assume cancel request contains plaintext comment only
provide(CancelRequest(Some(body)))
}
}
}
Expand Down Expand Up @@ -187,13 +206,13 @@ class ManagementResources(
}
} ~
path("cancel" / ProcessNameSegment) { processName =>
(post & processId(processName) & entity(as[Option[String]])) { (processIdWithName, comment) =>
(post & processId(processName) & cancelRequestEntity) { (processIdWithName, request) =>
canDeploy(processIdWithName) {
complete {
measureTime("cancel", metricRegistry) {
deploymentService.processCommand(
CancelScenarioCommand(commonData =
CommonCommandData(processIdWithName, comment.flatMap(Comment.from), user)
CommonCommandData(processIdWithName, request.comment.flatMap(Comment.from), user)
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import pl.touk.nussknacker.engine.api.process.VersionId.initialVersionId
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider}
import pl.touk.nussknacker.restmodel.{CancelRequest, DeployRequest}
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.test.EitherValuesDetailedMessage
Expand All @@ -42,7 +43,6 @@ import pl.touk.nussknacker.test.mock.{
import pl.touk.nussknacker.test.utils.domain.TestFactory._
import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.test.utils.scalas.AkkaHttpExtensions.toRequestEntity
import pl.touk.nussknacker.ui.api.ManagementResources.RunDeploymentRequest
import pl.touk.nussknacker.ui.api._
import pl.touk.nussknacker.ui.config.scenariotoolbar.CategoriesScenarioToolbarsConfigParser
import pl.touk.nussknacker.ui.config.FeatureTogglesConfig
Expand Down Expand Up @@ -378,13 +378,13 @@ trait NuResourcesTest
s"/processManagement/deploy/$processName",
HttpEntity(
ContentTypes.`application/json`,
RunDeploymentRequest(None, comment).asJson.noSpaces
DeployRequest(comment, None).asJson.noSpaces
)
) ~>
withPermissions(deployRoute(), Permission.Deploy, Permission.Read)

// TODO: See comment in ManagementResources.deployRequestEntity
protected def deployProcessCommentOnly(
protected def deployProcessCommentDeprecated(
processName: ProcessName,
comment: Option[String] = None
): RouteTestResult =
Expand All @@ -397,6 +397,20 @@ trait NuResourcesTest
protected def cancelProcess(
processName: ProcessName,
comment: Option[String] = None
): RouteTestResult =
Post(
s"/processManagement/cancel/$processName",
HttpEntity(
ContentTypes.`application/json`,
CancelRequest(comment).asJson.noSpaces
)
) ~>
withPermissions(deployRoute(), Permission.Deploy, Permission.Read)

// TODO: See comment in ManagementResources.deployRequestEntity
protected def cancelProcessCommentDeprecated(
processName: ProcessName,
comment: Option[String] = None
): RouteTestResult =
Post(
s"/processManagement/cancel/$processName",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.kafka.KafkaFactory
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.restmodel.DeployRequest
import pl.touk.nussknacker.restmodel.scenariodetails._
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.test.PatientScalaFutures
import pl.touk.nussknacker.test.base.it.NuResourcesTest
import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops
import pl.touk.nussknacker.test.utils.domain.TestFactory.{withAllPermissions, withPermissions}
import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory}
import pl.touk.nussknacker.ui.api.ManagementResources.RunDeploymentRequest
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos
import pl.touk.nussknacker.ui.process.ScenarioQuery
import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction
Expand Down Expand Up @@ -130,28 +130,49 @@ class ManagementResourcesSpec
// TODO: To be removed. See comment in ManagementResources.deployRequestEntity
test("deploys and cancels with plain text comment") {
saveCanonicalProcessAndAssertSuccess(ProcessTestData.sampleScenario)
deployProcessCommentOnly(
deployProcessCommentDeprecated(
ProcessTestData.sampleScenario.name,
comment = Some("deployComment")
) ~> checkThatEventually {
getProcess(processName) ~> check {
val processDetails = responseAs[ScenarioWithDetails]
processDetails.lastStateAction.exists(_.actionName == ScenarioActionName.Deploy) shouldBe true
}
cancelProcessCommentDeprecated(
ProcessTestData.sampleScenario.name,
comment = Some("cancelComment")
) ~> checkThatEventually {
status shouldBe StatusCodes.OK
getProcess(processName) ~> check {
val processDetails = responseAs[ScenarioWithDetails]
processDetails.lastStateAction.exists(_.actionName == ScenarioActionName.Cancel) shouldBe true
}
}
}
}

// TODO: To be removed. See comment in ManagementResources.deployRequestEntity
test("deploys and cancels with plain text no comment") {
saveCanonicalProcessAndAssertSuccess(ProcessTestData.sampleScenario)
deployProcessCommentOnly(
deployProcessCommentDeprecated(
ProcessTestData.sampleScenario.name,
comment = None
) ~> checkThatEventually {
status shouldBe StatusCodes.OK
getProcess(processName) ~> check {
val processDetails = responseAs[ScenarioWithDetails]
processDetails.lastStateAction.exists(_.actionName == ScenarioActionName.Deploy) shouldBe true
}
cancelProcessCommentDeprecated(
ProcessTestData.sampleScenario.name,
comment = None
) ~> checkThatEventually {
status shouldBe StatusCodes.OK
getProcess(processName) ~> check {
val processDetails = responseAs[ScenarioWithDetails]
processDetails.lastStateAction.exists(_.actionName == ScenarioActionName.Cancel) shouldBe true
}
}
}
}

Expand Down Expand Up @@ -271,7 +292,7 @@ class ManagementResourcesSpec
s"/processManagement/deploy/${ProcessTestData.sampleScenario.name}",
HttpEntity(
ContentTypes.`application/json`,
RunDeploymentRequest(None, None).asJson.noSpaces
DeployRequest(None, None).asJson.noSpaces
)
) ~> withPermissions(
deployRoute(),
Expand Down
5 changes: 3 additions & 2 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@
* [#7537](https://github.com/TouK/nussknacker/pull/7537) Collection helper improvements:
* preserved elements order in #COLLECTION.merge and #COLLECTION.distinct functions
* additional check for #COLLECTION.min and #COLLECTION.max if elements have a Comparable type
* [#6860](https://github.com/TouK/nussknacker/pull/6860) Ability to configure deploy action parameters and apply those parameters in deploy http request.
* [#6860](https://github.com/TouK/nussknacker/pull/6860) [#7562](https://github.com/TouK/nussknacker/pull/7562) Added optional configuration of action parameters and applied those parameters in deploy http request.
* Kafka source has "offset reset strategy" parameter that controls starting point for reading events.
* Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy`
* The http request for `/deploy` and `/cancel` require valid json instead of plain text message.
* Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy` with possible values: "ToLatest", "ToEarliest", "None".
* [#7545](https://github.com/TouK/nussknacker/pull/7545) Added `useMiniClusterForDeployment` option allowing to run Flink scenarios on Flink MiniCluster
* [#7568](https://github.com/TouK/nussknacker/pull/7568) The "JSON" button was renamed to "Export" to mark that it generates data usable in "Import"

Expand Down
22 changes: 10 additions & 12 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* enabled - if set to false stickyNotes feature is disabled, stickyNotes cant be created, they are also not loaded to graph
* [#7534](https://github.com/TouK/nussknacker/pull/7534) `shouldVerifyBeforeDeploy` configuration entry available for Flink deployment
was renamed to `scenarioStateVerification.enabled`
* [#6860](https://github.com/TouK/nussknacker/pull/6860) Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy`:
* forceLatestRead is missing -> keep defaultOffsetResetStrategy missing or set to "None"
* forceLatestRead: false -> defaultOffsetResetStrategy: "None"
* forceLatestRead: true -> defaultOffsetResetStrategy: "ToLatest"
* [#7568](https://github.com/TouK/nussknacker/pull/7568) The `process-json` button in `processToolbarConfig` was renamed to `process-export`

### REST API changes

* [#7563](https://github.com/TouK/nussknacker/pull/7563) `ProcessAction.buildInfo` field was renamed to `ProcessAction.modelInfo` and is optional now.
`ProcessAction` type is used in `ScenarioWithDetails.lastDeployedAction`, `ScenarioWithDetails.lastStateAction` and `ScenarioWithDetails.lastAction`
which are returned by `/processes`, `/processesDetails` endpoints. It is also used by `/components/$id/usages` endpoint
* [#6860](https://github.com/TouK/nussknacker/pull/6860) [#7562](https://github.com/TouK/nussknacker/pull/7562)
* Deploy and cancel http request requires valid json in request body (see `DeployRequest` and `CancelRequest`) instead of plain text, e.g. `{"comment": "example text"}`.
* For KafkaFlinkSource it is possible to provide optional deployment parameter, e.g. `{"comment": "example text", "nodesDeploymentData": {"my_source_node_id": {"offsetResetStrategy": "ToLatest"}}}`.

### Code API changes

Expand Down Expand Up @@ -123,12 +130,6 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#7468](https://github.com/TouK/nussknacker/pull/7468) When a namespace is configured, Kafka consumer groups are also namespaced.
This change should have been introduced as of starting from Nussknacker 1.15 when a feature flag `useNamingStrategyForConsumerGroupId`
was removed to temporarily disable consumer group namespacing.
* [#6860](https://github.com/TouK/nussknacker/pull/6860) Deploy http request requires valid json in request body (see `RunDeploymentRequest`) instead of plain text, e.g. `{"comment": "example text"}`.
For KafkaFlinkSource it is possible to provide optional deployment parameter, e.g. `{"comment": "example text", "nodesDeploymentData": {"my_source_node_id": {"offsetResetStrategy": "Reset"}}}`.
Configuration entry `kafkaEspProperties.forceLatestRead` is replaced with `kafkaEspProperties.defaultOffsetResetStrategy`:
* forceLatestRead is missing -> keep defaultOffsetResetStrategy missing or set to "None"
* forceLatestRead: false -> defaultOffsetResetStrategy: "None"
* forceLatestRead: true -> defaultOffsetResetStrategy: "ToLatest"
## In version 1.18.0
Expand All @@ -155,6 +156,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
```
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }`
* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`.
Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`), you need to increase it if your
error topic allows for larger messages. Remember to add some margin for Kafka protocol overhead (100 bytes should be enough).
### Code API changes
Expand Down Expand Up @@ -200,12 +204,6 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#7246](https://github.com/TouK/nussknacker/pull/7246) Changes in DictApiEndpoints:
* `DictListRequestDto` `expectedType`: TypingResultInJson -> Json
### Configuration changes
* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`.
Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`), you need to increase it if your
error topic allows for larger messages. Remember to add some margin for Kafka protocol overhead (100 bytes should be enough).
### Other changes
* [#6692](https://github.com/TouK/nussknacker/pull/6692) Kryo serializers for `UnmodifiableCollection`, `scala.Product` etc.
Expand Down

0 comments on commit f4b37ac

Please sign in to comment.