Skip to content

Commit

Permalink
Return 409 for send requests with idempotency key/workflow id that ha…
Browse files Browse the repository at this point in the history
…ve been already submitted.
  • Loading branch information
slinkydeveloper committed May 28, 2024
1 parent bf1e637 commit 4b2ad30
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
8 changes: 7 additions & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,16 @@ where
warn!("Response channel was closed");
return Err(HandlerError::Unavailable);
};
let submit_reattached_to_existing_invocation =
submit_notification.invocation_id != invocation_id;

trace!("Complete external HTTP send request successfully");
Ok(Response::builder()
.status(StatusCode::ACCEPTED)
.status(if submit_reattached_to_existing_invocation {
StatusCode::CONFLICT
} else {
StatusCode::ACCEPTED
})
.header(header::CONTENT_TYPE, APPLICATION_JSON)
.header(X_RESTATE_ID, submit_notification.invocation_id.to_string())
.body(Full::new(
Expand Down
60 changes: 59 additions & 1 deletion crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,64 @@ async fn idempotency_key_and_send() {
person: "Francesco".to_string(),
};

let req = hyper::Request::builder()
.uri("http://localhost/greeter.Greeter/greet/send")
.method(Method::POST)
.header("content-type", "application/json")
.header(IDEMPOTENCY_KEY, "123456")
.body(Full::new(Bytes::from(
serde_json::to_vec(&greeting_req).unwrap(),
)))
.unwrap();

let response = handle(req, move |ingress_req| {
// Get the function invocation and assert on it
let (service_invocation, notification_tx) =
ingress_req.expect_one_way_invocation_with_submit_notification();
assert_eq!(
service_invocation.invocation_target.service_name(),
"greeter.Greeter"
);
assert_eq!(service_invocation.invocation_target.handler_name(), "greet");

let greeting_req: GreetingRequest =
serde_json::from_slice(&service_invocation.argument).unwrap();
assert_eq!(&greeting_req.person, "Francesco");

assert_eq!(
service_invocation.idempotency_key,
Some(ByteString::from_static("123456"))
);
assert_eq!(
service_invocation.completion_retention_time,
Some(Duration::from_secs(60 * 60 * 24))
);
assert_that!(
service_invocation.submit_notification_sink,
some(anything())
);

notification_tx
.send(SubmittedInvocationNotification {
invocation_id: service_invocation.invocation_id,
})
.unwrap();
})
.await;

assert_eq!(response.status(), StatusCode::ACCEPTED);
let (_, response_body) = response.into_parts();
let response_bytes = response_body.collect().await.unwrap().to_bytes();
let _: SendResponse = serde_json::from_slice(&response_bytes).unwrap();
}

#[tokio::test]
#[traced_test]
async fn idempotency_key_and_send_with_different_invocation_id() {
let greeting_req = GreetingRequest {
person: "Francesco".to_string(),
};

let req = hyper::Request::builder()
.uri("http://localhost/greeter.Greeter/greet/send")
.method(Method::POST)
Expand Down Expand Up @@ -451,7 +509,7 @@ async fn idempotency_key_and_send() {
})
.await;

assert_eq!(response.status(), StatusCode::ACCEPTED);
assert_eq!(response.status(), StatusCode::CONFLICT);
let (_, response_body) = response.into_parts();
let response_bytes = response_body.collect().await.unwrap().to_bytes();
let send_response: SendResponse = serde_json::from_slice(&response_bytes).unwrap();
Expand Down

0 comments on commit 4b2ad30

Please sign in to comment.