Skip to content

Commit

Permalink
Add tests for cancellation
Browse files Browse the repository at this point in the history
This commit adds new e2e tests which ensure that cancellation works.
The following call chains are canceled:

* CancelTestService/StartTest --> BlockingService/Block --> BlockingSerivce/Block (inboxed call since BlockingService is singleton)
* CancelTestService/StartTest --> BlockingService/Block --> uncompleted awakeable
* CancelTestService/StartTest --> BlockingService/Block --> very long sleep

This fixes restatedev#228.
  • Loading branch information
tillrohrmann committed Dec 18, 2023
1 parent f91ed8b commit abdabfe
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 2 deletions.
46 changes: 46 additions & 0 deletions contracts/src/main/proto/cancel_test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

syntax = "proto3";

option java_package = "dev.restate.e2e.services.canceltest";
option java_outer_classname = "CancelTestProto";

import "google/protobuf/empty.proto";
import "dev/restate/ext.proto";

package cancel_test;

service CancelTestService {
option (dev.restate.ext.service_type) = SINGLETON;

rpc StartTest(BlockingRequest) returns (google.protobuf.Empty);
rpc VerifyTest(google.protobuf.Empty) returns (Response);
}

enum BlockingOperation {
CALL = 0;
SLEEP = 1;
AWAKEABLE = 2;
}

message BlockingRequest {
BlockingOperation operation = 1;
}

message Response {
bool is_canceled = 1;
}

service BlockingService {
option (dev.restate.ext.service_type) = SINGLETON;

rpc Block(BlockingRequest) returns (google.protobuf.Empty);
rpc IsUnlocked(google.protobuf.Empty) returns (google.protobuf.Empty);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public static void main(String[] args) {
case EventHandlerGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(new EventHandlerService());
break;
case dev.restate.e2e.services.canceltest.CancelTestServiceGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(
new dev.restate.e2e.services.canceltest.CancelTestService());
break;
case dev.restate.e2e.services.canceltest.BlockingServiceGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(
new dev.restate.e2e.services.canceltest.BlockingService());
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

package dev.restate.e2e.services.canceltest;

import dev.restate.e2e.services.awakeableholder.AwakeableHolderProto;
import dev.restate.e2e.services.awakeableholder.AwakeableHolderServiceRestate;
import dev.restate.sdk.Awakeable;
import dev.restate.sdk.RestateContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.TerminalException;
import java.time.Duration;

public class BlockingService extends BlockingServiceRestate.BlockingServiceRestateImplBase {
@Override
public void block(RestateContext context, CancelTestProto.BlockingRequest request)
throws TerminalException {
final BlockingServiceRestate.BlockingServiceRestateClient self =
BlockingServiceRestate.newClient(context);
final AwakeableHolderServiceRestate.AwakeableHolderServiceRestateClient client =
AwakeableHolderServiceRestate.newClient(context);

Awakeable<String> awakeable = context.awakeable(CoreSerdes.STRING_UTF8);
client
.hold(
AwakeableHolderProto.HoldRequest.newBuilder()
.setName("cancel")
.setId(awakeable.id())
.build())
.await();
awakeable.await();

switch (request.getOperation()) {
case CALL:
self.block(request).await();
break;
case SLEEP:
context.sleep(Duration.ofDays(1024));
break;
case AWAKEABLE:
Awakeable<String> uncompletable = context.awakeable(CoreSerdes.STRING_UTF8);
uncompletable.await();
break;
default:
throw new IllegalArgumentException("Unknown operation: " + request.getOperation());
}
}

@Override
public void isUnlocked(RestateContext context) throws TerminalException {
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

package dev.restate.e2e.services.canceltest;

import dev.restate.sdk.RestateContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;

public class CancelTestService extends CancelTestServiceRestate.CancelTestServiceRestateImplBase {
private static StateKey<Boolean> CANCELED_STATE = StateKey.of("canceled", CoreSerdes.BOOLEAN);

@Override
public void startTest(RestateContext ctx, CancelTestProto.BlockingRequest request)
throws TerminalException {
BlockingServiceRestate.BlockingServiceRestateClient client =
BlockingServiceRestate.newClient(ctx);

try {
client.block(request).await();
} catch (TerminalException e) {
if (e.getCode() == TerminalException.Code.CANCELLED) {
ctx.set(CANCELED_STATE, true);
} else {
throw e;
}
}
}

@Override
public CancelTestProto.Response verifyTest(RestateContext context) throws TerminalException {
return CancelTestProto.Response.newBuilder()
.setIsCanceled(context.get(CANCELED_STATE).orElse(false))
.build();
}
}
20 changes: 19 additions & 1 deletion services/node-services/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { protoMetadata as rngProtoMetadata } from "./generated/rng";
import { protoMetadata as awakeableHolderProtoMetadata } from "./generated/awakeable_holder";
import { protoMetadata as eventHandlerProtoMetadata } from "./generated/event_handler";
import { protoMetadata as killTestProtoMetadata } from "./generated/kill_test";
import { protoMetadata as cancelTestProtoMetadata } from "./generated/cancel_test";
import { CounterService, CounterServiceFQN } from "./counter";
import { ListService, ListServiceFQN } from "./collections";
import { FailingService, FailingServiceFQN } from "./errors";
Expand Down Expand Up @@ -57,6 +58,7 @@ import {
import { EventHandlerFQN, EventHandlerService } from "./event_handler";
import { startEmbeddedHandlerServer } from "./embedded_handler_api";
import { KillSingletonServiceFQN, KillTestService, KillTestServiceFQN, KillSingletonService } from "./kill_test";
import { BlockingServiceFQN, CancelTestService, CancelTestServiceFQN, BlockingService } from "./cancel_test";

let serverBuilder;
export let handler: (event: any) => Promise<any>;
Expand Down Expand Up @@ -209,7 +211,23 @@ const services = new Map<
service: "KillSingletonService",
instance: new KillSingletonService(),
}
]
],
[
CancelTestServiceFQN,
{
descriptor: cancelTestProtoMetadata,
service: "CancelTestService",
instance: new CancelTestService(),
}
],
[
BlockingServiceFQN,
{
descriptor: cancelTestProtoMetadata,
service: "BlockingService",
instance: new BlockingService(),
}
],
]);
console.log("Known services: " + services.keys());

Expand Down
2 changes: 1 addition & 1 deletion services/node-services/src/awakeable_holder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class AwakeableHolderService implements IAwakeableHolderService {
const ctx = restate.useContext(this);

return {
hasAwakeable: (await ctx.get<string>(ID_KEY)) !== undefined,
hasAwakeable: (await ctx.get<string>(ID_KEY)) !== null,
};
}

Expand Down
86 changes: 86 additions & 0 deletions services/node-services/src/cancel_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

import * as restate from "@restatedev/restate-sdk";

import {
CancelTestService as ICancelTestService,
BlockingService as IBlockingService,
protobufPackage,
BlockingServiceClientImpl,
Response,
BlockingOperation,
BlockingRequest,
} from "./generated/cancel_test";
import { Empty } from "./generated/google/protobuf/empty";
import { AwakeableHolderServiceClientImpl } from "./generated/awakeable_holder";

export const CancelTestServiceFQN = protobufPackage + ".CancelTestService";
export const BlockingServiceFQN = protobufPackage + ".BlockingService";

export class CancelTestService implements ICancelTestService {
async verifyTest(request: Empty): Promise<Response> {
const ctx = restate.useContext(this);
const isCanceled = await ctx.get<boolean>("canceled") ?? false;

return Response.create({ isCanceled: isCanceled });
}

async startTest(request: BlockingRequest): Promise<Empty> {
const ctx = restate.useContext(this);
const client = new BlockingServiceClientImpl(ctx);

try {
await client.block(request);
} catch (e) {
if (e instanceof restate.TerminalError && (e as restate.TerminalError).code === restate.ErrorCodes.CANCELLED) {
ctx.set("canceled", true);
} else {
throw e;
}
}

return Empty.create({});
}

}

export class BlockingService implements IBlockingService {
async block(request: BlockingRequest): Promise<Empty> {
const ctx = restate.useContext(this);
const client = new AwakeableHolderServiceClientImpl(ctx);
const self = new BlockingServiceClientImpl(ctx);

const { id, promise } = ctx.awakeable();
client.hold({ name: "cancel", id })
await promise;

switch (request.operation) {
case BlockingOperation.CALL: {
await self.block(request);
break;
}
case BlockingOperation.SLEEP: {
await ctx.sleep(1_000_000_000);
break;
}
case BlockingOperation.AWAKEABLE: {
const { id: _, promise: uncompletable_promise } = ctx.awakeable();
await uncompletable_promise;
break;
}
}

return Empty.create({});
}

async isUnlocked(request: Empty): Promise<Empty> {
return Empty.create({});
}
}
Loading

0 comments on commit abdabfe

Please sign in to comment.