Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor subscription/unsubscription flows #1080

Merged
merged 14 commits into from
Feb 17, 2025
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerina"
name = "websubhub"
version = "1.13.0"
version = "1.13.1"
authors = ["Ballerina"]
keywords = ["websub", "hub", "publisher", "service", "listener", "client"]
repository = "https://github.com/ballerina-platform/module-ballerina-websubhub"
Expand All @@ -15,5 +15,5 @@ graalvmCompatible = true
[[platform.java21.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "websubhub-native"
version = "1.13.0"
path = "../native/build/libs/websubhub-native-1.13.0.jar"
version = "1.13.1"
path = "../native/build/libs/websubhub-native-1.13.1-SNAPSHOT.jar"
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "websubhub-compiler-plugin"
class = "io.ballerina.stdlib.websubhub.WebSubHubCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.0.jar"
path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.1-SNAPSHOT.jar"
4 changes: 2 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.13.0"
version = "2.13.1"
dependencies = [
{org = "ballerina", name = "auth"},
{org = "ballerina", name = "cache"},
Expand Down Expand Up @@ -347,7 +347,7 @@ modules = [
[[package]]
org = "ballerina"
name = "websubhub"
version = "1.13.0"
version = "1.13.1"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "http"},
Expand Down
186 changes: 83 additions & 103 deletions ballerina/http_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,23 @@
// under the License.

import ballerina/http;
import ballerina/mime;
import ballerina/log;
import ballerina/mime;

isolated service class HttpService {
*http:Service;

private final HttpToWebsubhubAdaptor adaptor;
private final readonly & ClientConfiguration clientConfig;
private final string hub;
private final int defaultHubLeaseSeconds;
private final boolean isSubscriptionAvailable;
private final boolean isSubscriptionValidationAvailable;
private final boolean isUnsubscriptionAvailable;
private final boolean isUnsubscriptionValidationAvailable;
private final int defaultLeaseSeconds;
private final SubscriptionHandler subscriptionHandler;

isolated function init(HttpToWebsubhubAdaptor adaptor, string hubUrl, int leaseSeconds,
*ClientConfiguration clientConfig) {
*ClientConfiguration clientConfig) {
self.adaptor = adaptor;
self.clientConfig = clientConfig.cloneReadOnly();
self.hub = hubUrl;
self.defaultHubLeaseSeconds = leaseSeconds;
string[] methodNames = adaptor.getServiceMethodNames();
self.isSubscriptionAvailable = isMethodAvailable("onSubscription", methodNames);
self.isSubscriptionValidationAvailable = isMethodAvailable("onSubscriptionValidation", methodNames);
self.isUnsubscriptionAvailable = isMethodAvailable("onUnsubscription", methodNames);
self.isUnsubscriptionValidationAvailable = isMethodAvailable("onUnsubscriptionValidation", methodNames);
self.defaultLeaseSeconds = leaseSeconds;
self.subscriptionHandler = new (adaptor, clientConfig);
}

isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? {
Expand All @@ -49,40 +40,34 @@
if params is error {
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(params.message());
return respondToRequest(caller, response);
} else {
string? mode = params[HUB_MODE];
match mode {
MODE_REGISTER => {
http:Response|error result = processTopicRegistration(headers, params, self.adaptor);
return handleResult(caller, result);
}
MODE_DEREGISTER => {
http:Response|error result = processTopicDeregistration(headers, params, self.adaptor);
return handleResult(caller, result);
}
MODE_SUBSCRIBE => {
return self.handleSubscription(caller, headers, params);
}
MODE_UNSUBSCRIBE => {
return self.handleUnsubscription(caller, headers, params);
}
MODE_PUBLISH => {
http:Response|error result = processContentPublish(request, headers, params, self.adaptor);
if result is error {
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(result.message());
return respondToRequest(caller, response);
} else {
return respondToRequest(caller, result);
}
}
_ => {
response.statusCode = http:STATUS_BAD_REQUEST;
string errorMessage = "The request does not include valid `hub.mode` form param.";
response.setTextPayload(errorMessage);
return respondToRequest(caller, response);
}
return respondWithResult(caller, response);
}

string? mode = params[HUB_MODE];
match mode {
MODE_REGISTER => {
http:Response|error result = processTopicRegistration(headers, params, self.adaptor);
return respondWithResult(caller, result);
}
MODE_DEREGISTER => {
http:Response|error result = processTopicDeregistration(headers, params, self.adaptor);
return respondWithResult(caller, result);
}
MODE_SUBSCRIBE => {
return self.processSubscription(caller, headers, params);
}
MODE_UNSUBSCRIBE => {
return self.processUnsubscription(caller, headers, params);
}
MODE_PUBLISH => {
http:Response|error result = processContentPublish(request, headers, params, self.adaptor);
return respondWithResult(caller, result);
}
_ => {
response.statusCode = http:STATUS_BAD_REQUEST;
string errorMessage = "The request does not include valid `hub.mode` form param.";
response.setTextPayload(errorMessage);
return respondWithResult(caller, response);
}
}
}
Expand Down Expand Up @@ -122,79 +107,74 @@
return params;
}

isolated function handleSubscription(http:Caller caller, http:Headers headers, map<string> params) returns Error? {
Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params);
if subscription is Subscription {
http:Response|Redirect result = processSubscription(subscription, headers, self.adaptor, self.isSubscriptionAvailable);
if result is Redirect {
error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls);
if redirectError is error {
log:printError("Error occurred while redirecting the subscription", 'error = redirectError);
}
} else {
int currentStatusCode = result.statusCode;
if currentStatusCode == http:STATUS_ACCEPTED {
check respondToRequest(caller, result);
error? verificationResult = processSubscriptionVerification(headers, self.adaptor, subscription,
self.isSubscriptionValidationAvailable, self.clientConfig);
if verificationResult is error {
log:printError("Error occurred while processing subscription", 'error = verificationResult);
}
return;
}
return respondToRequest(caller, result);
}
} else {
isolated function processSubscription(http:Caller caller, http:Headers headers, map<string> params)
returns Error? {

Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params);
if subscription is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(subscription.message());
return respondToRequest(caller, response);
return respondWithResult(caller, response);

Check warning on line 118 in ballerina/http_service.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/http_service.bal#L118

Added line #L118 was not covered by tests
}

http:Response|Redirect result = self.subscriptionHandler.intiateSubscription(subscription, headers);
if result is Redirect {
error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls);
if redirectError is error {
log:printError("Error occurred while redirecting the subscription", 'error = redirectError);

Check warning on line 125 in ballerina/http_service.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/http_service.bal#L125

Added line #L125 was not covered by tests
}
return;

Check warning on line 127 in ballerina/http_service.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/http_service.bal#L127

Added line #L127 was not covered by tests
}

check respondWithResult(caller, result);
if result.statusCode != http:STATUS_ACCEPTED {
return;
}

error? verification = self.subscriptionHandler.verifySubscription(subscription, headers);
if verification is error {
log:printError("Error occurred while processing subscription", 'error = verification);
}
}

isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map<string> params) returns Error? {
isolated function processUnsubscription(http:Caller caller, http:Headers headers, map<string> params)
returns Error? {

Unsubscription|error unsubscription = createUnsubscriptionMessage(params);
if unsubscription is Unsubscription {
http:Response result = processUnsubscription(unsubscription, headers, self.adaptor, self.isUnsubscriptionAvailable);
int currentStatusCode = result.statusCode;
if currentStatusCode == http:STATUS_ACCEPTED {
check respondToRequest(caller, result);
error? verificationResult = processUnSubscriptionVerification(headers, self.adaptor, unsubscription,
self.isUnsubscriptionValidationAvailable, self.clientConfig);
if verificationResult is error {
log:printError("Error occurred while processing unsubscription", 'error = verificationResult);
}
return;
}
return respondToRequest(caller, result);
} else {
if unsubscription is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(unsubscription.message());
return respondToRequest(caller, response);
return respondWithResult(caller, response);

Check warning on line 149 in ballerina/http_service.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/http_service.bal#L149

Added line #L149 was not covered by tests
}
}
}

isolated function isMethodAvailable(string methodName, string[] methods) returns boolean {
return methods.indexOf(methodName) is int;
http:Response result = self.subscriptionHandler.initiateUnsubscription(unsubscription, headers);
check respondWithResult(caller, result);
if result.statusCode != http:STATUS_ACCEPTED {
return;
}

error? verification = self.subscriptionHandler.verifyUnsubscription(unsubscription, headers);
if verification is error {
log:printError("Error occurred while processing unsubscription", 'error = verification);
}
}
}

isolated function handleResult(http:Caller caller, http:Response|error result) returns Error? {
isolated function respondWithResult(http:Caller caller, http:Response|error result) returns Error? {
http:ListenerError? respondError = ();
if result is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(result.message());
return respondToRequest(caller, response);
respondError = caller->respond(response);
} else {
return respondToRequest(caller, result);
respondError = caller->respond(result);
}
}

isolated function respondToRequest(http:Caller caller, http:Response response) returns Error? {
http:ListenerError? responseError = caller->respond(response);
if responseError is http:ListenerError {
return error Error(
"Error occurred while responding to the request ", responseError, statusCode = http:STATUS_INTERNAL_SERVER_ERROR);
if respondError is http:ListenerError {
return error Error("Error occurred while responding to the request ", respondError,
statusCode = http:STATUS_INTERNAL_SERVER_ERROR);

Check warning on line 178 in ballerina/http_service.bal

View check run for this annotation

Codecov / codecov/patch

ballerina/http_service.bal#L177-L178

Added lines #L177 - L178 were not covered by tests
}
}
10 changes: 5 additions & 5 deletions ballerina/natives.bal
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ import ballerina/jballerina.java;

isolated class HttpToWebsubhubAdaptor {
isolated function init(Service 'service) {
externInit(self, 'service);
self.externInit('service);
}

isolated function externInit(Service serviceObj) = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;

isolated function getServiceMethodNames() returns string[] = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
Expand Down Expand Up @@ -69,7 +73,3 @@ isolated class HttpToWebsubhubAdaptor {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
}

isolated function externInit(HttpToWebsubhubAdaptor adaptor, Service serviceObj) = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
Loading
Loading