From 4b2ad30d908f8d52111998a8c16f42d46ba9f67d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 28 May 2024 11:59:12 +0200 Subject: [PATCH] Return 409 for send requests with idempotency key/workflow id that have been already submitted. --- .../src/handler/service_handler.rs | 8 ++- crates/ingress-http/src/handler/tests.rs | 60 ++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index b00bf0075..94cb2f4e1 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -280,10 +280,16 @@ where warn!("Response channel was closed"); return Err(HandlerError::Unavailable); }; + let submit_reattached_to_existing_invocation = + submit_notification.invocation_id != invocation_id; trace!("Complete external HTTP send request successfully"); Ok(Response::builder() - .status(StatusCode::ACCEPTED) + .status(if submit_reattached_to_existing_invocation { + StatusCode::CONFLICT + } else { + StatusCode::ACCEPTED + }) .header(header::CONTENT_TYPE, APPLICATION_JSON) .header(X_RESTATE_ID, submit_notification.invocation_id.to_string()) .body(Full::new( diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index c17ede9f9..d28fcb784 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -404,6 +404,64 @@ async fn idempotency_key_and_send() { person: "Francesco".to_string(), }; + let req = hyper::Request::builder() + .uri("http://localhost/greeter.Greeter/greet/send") + .method(Method::POST) + .header("content-type", "application/json") + .header(IDEMPOTENCY_KEY, "123456") + .body(Full::new(Bytes::from( + serde_json::to_vec(&greeting_req).unwrap(), + ))) + .unwrap(); + + let response = handle(req, move |ingress_req| { + // Get the function invocation and assert on it + let (service_invocation, notification_tx) = + ingress_req.expect_one_way_invocation_with_submit_notification(); + assert_eq!( + service_invocation.invocation_target.service_name(), + "greeter.Greeter" + ); + assert_eq!(service_invocation.invocation_target.handler_name(), "greet"); + + let greeting_req: GreetingRequest = + serde_json::from_slice(&service_invocation.argument).unwrap(); + assert_eq!(&greeting_req.person, "Francesco"); + + assert_eq!( + service_invocation.idempotency_key, + Some(ByteString::from_static("123456")) + ); + assert_eq!( + service_invocation.completion_retention_time, + Some(Duration::from_secs(60 * 60 * 24)) + ); + assert_that!( + service_invocation.submit_notification_sink, + some(anything()) + ); + + notification_tx + .send(SubmittedInvocationNotification { + invocation_id: service_invocation.invocation_id, + }) + .unwrap(); + }) + .await; + + assert_eq!(response.status(), StatusCode::ACCEPTED); + let (_, response_body) = response.into_parts(); + let response_bytes = response_body.collect().await.unwrap().to_bytes(); + let _: SendResponse = serde_json::from_slice(&response_bytes).unwrap(); +} + +#[tokio::test] +#[traced_test] +async fn idempotency_key_and_send_with_different_invocation_id() { + let greeting_req = GreetingRequest { + person: "Francesco".to_string(), + }; + let req = hyper::Request::builder() .uri("http://localhost/greeter.Greeter/greet/send") .method(Method::POST) @@ -451,7 +509,7 @@ async fn idempotency_key_and_send() { }) .await; - assert_eq!(response.status(), StatusCode::ACCEPTED); + assert_eq!(response.status(), StatusCode::CONFLICT); let (_, response_body) = response.into_parts(); let response_bytes = response_body.collect().await.unwrap().to_bytes(); let send_response: SendResponse = serde_json::from_slice(&response_bytes).unwrap();