From 9e7ee4e587c060fd3ecbbc95ba132436aa88ef9f Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Thu, 13 Feb 2025 22:39:14 +0530 Subject: [PATCH 01/14] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 6 +++--- ballerina/CompilerPlugin.toml | 2 +- ballerina/Dependencies.toml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 2555f93f..851c97f5 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "websubhub" -version = "1.13.0" +version = "1.13.1" authors = ["Ballerina"] keywords = ["websub", "hub", "publisher", "service", "listener", "client"] repository = "https://github.com/ballerina-platform/module-ballerina-websubhub" @@ -15,5 +15,5 @@ graalvmCompatible = true [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "websubhub-native" -version = "1.13.0" -path = "../native/build/libs/websubhub-native-1.13.0.jar" +version = "1.13.1" +path = "../native/build/libs/websubhub-native-1.13.1-SNAPSHOT.jar" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index ced877a5..e3752ed3 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "websubhub-compiler-plugin" class = "io.ballerina.stdlib.websubhub.WebSubHubCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.0.jar" +path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index a3ce3b03..d5719c0e 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -73,7 +73,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.13.0" +version = "2.13.1" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, @@ -347,7 +347,7 @@ modules = [ [[package]] org = "ballerina" name = "websubhub" -version = "1.13.0" +version = "1.13.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "http"}, From a1cf5a84a3a3003c9735bcceb0be7dfdf008b42f Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Thu, 13 Feb 2025 23:04:04 +0530 Subject: [PATCH 02/14] Restructure HTTP service by removing unwanted indentation --- ballerina/http_service.bal | 110 ++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index f638f710..7df20eb6 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -50,39 +50,38 @@ isolated service class HttpService { response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(params.message()); return respondToRequest(caller, response); - } else { - string? mode = params[HUB_MODE]; - match mode { - MODE_REGISTER => { - http:Response|error result = processTopicRegistration(headers, params, self.adaptor); - return handleResult(caller, result); - } - MODE_DEREGISTER => { - http:Response|error result = processTopicDeregistration(headers, params, self.adaptor); - return handleResult(caller, result); - } - MODE_SUBSCRIBE => { - return self.handleSubscription(caller, headers, params); - } - MODE_UNSUBSCRIBE => { - return self.handleUnsubscription(caller, headers, params); - } - MODE_PUBLISH => { - http:Response|error result = processContentPublish(request, headers, params, self.adaptor); - if result is error { - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(result.message()); - return respondToRequest(caller, response); - } else { - return respondToRequest(caller, result); - } - } - _ => { + } + + string? mode = params[HUB_MODE]; + match mode { + MODE_REGISTER => { + http:Response|error result = processTopicRegistration(headers, params, self.adaptor); + return handleResult(caller, result); + } + MODE_DEREGISTER => { + http:Response|error result = processTopicDeregistration(headers, params, self.adaptor); + return handleResult(caller, result); + } + MODE_SUBSCRIBE => { + return self.handleSubscription(caller, headers, params); + } + MODE_UNSUBSCRIBE => { + return self.handleUnsubscription(caller, headers, params); + } + MODE_PUBLISH => { + http:Response|error result = processContentPublish(request, headers, params, self.adaptor); + if result is error { response.statusCode = http:STATUS_BAD_REQUEST; - string errorMessage = "The request does not include valid `hub.mode` form param."; - response.setTextPayload(errorMessage); + response.setTextPayload(result.message()); return respondToRequest(caller, response); } + return respondToRequest(caller, result); + } + _ => { + response.statusCode = http:STATUS_BAD_REQUEST; + string errorMessage = "The request does not include valid `hub.mode` form param."; + response.setTextPayload(errorMessage); + return respondToRequest(caller, response); } } } @@ -131,25 +130,26 @@ isolated service class HttpService { if redirectError is error { log:printError("Error occurred while redirecting the subscription", 'error = redirectError); } - } else { - int currentStatusCode = result.statusCode; - if currentStatusCode == http:STATUS_ACCEPTED { - check respondToRequest(caller, result); - error? verificationResult = processSubscriptionVerification(headers, self.adaptor, subscription, - self.isSubscriptionValidationAvailable, self.clientConfig); - if verificationResult is error { - log:printError("Error occurred while processing subscription", 'error = verificationResult); - } - return; + return; + } + + int currentStatusCode = result.statusCode; + if currentStatusCode == http:STATUS_ACCEPTED { + check respondToRequest(caller, result); + error? verificationResult = processSubscriptionVerification( + headers, self.adaptor, subscription, self.isSubscriptionValidationAvailable, self.clientConfig); + if verificationResult is error { + log:printError("Error occurred while processing subscription", 'error = verificationResult); } - return respondToRequest(caller, result); + return; } - } else { - http:Response response = new; - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(subscription.message()); - return respondToRequest(caller, response); + return respondToRequest(caller, result); } + + http:Response response = new; + response.statusCode = http:STATUS_BAD_REQUEST; + response.setTextPayload(subscription.message()); + return respondToRequest(caller, response); } isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { @@ -159,20 +159,20 @@ isolated service class HttpService { int currentStatusCode = result.statusCode; if currentStatusCode == http:STATUS_ACCEPTED { check respondToRequest(caller, result); - error? verificationResult = processUnSubscriptionVerification(headers, self.adaptor, unsubscription, - self.isUnsubscriptionValidationAvailable, self.clientConfig); + error? verificationResult = processUnSubscriptionVerification( + headers, self.adaptor, unsubscription, self.isUnsubscriptionValidationAvailable, self.clientConfig); if verificationResult is error { log:printError("Error occurred while processing unsubscription", 'error = verificationResult); } return; } return respondToRequest(caller, result); - } else { - http:Response response = new; - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(unsubscription.message()); - return respondToRequest(caller, response); } + + http:Response response = new; + response.statusCode = http:STATUS_BAD_REQUEST; + response.setTextPayload(unsubscription.message()); + return respondToRequest(caller, response); } } @@ -186,9 +186,9 @@ isolated function handleResult(http:Caller caller, http:Response|error result) r response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(result.message()); return respondToRequest(caller, response); - } else { - return respondToRequest(caller, result); } + + return respondToRequest(caller, result); } isolated function respondToRequest(http:Caller caller, http:Response response) returns Error? { From 1233372891df26a4740733edeee7d5c28cb655db Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Thu, 13 Feb 2025 23:12:16 +0530 Subject: [PATCH 03/14] Refactor native http-to-websubhub adaptor --- ballerina/natives.bal | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ballerina/natives.bal b/ballerina/natives.bal index c5b21739..54c90113 100644 --- a/ballerina/natives.bal +++ b/ballerina/natives.bal @@ -19,9 +19,13 @@ import ballerina/jballerina.java; isolated class HttpToWebsubhubAdaptor { isolated function init(Service 'service) { - externInit(self, 'service); + self.externInit('service); } + isolated function externInit(Service serviceObj) = @java:Method { + 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" + } external; + isolated function getServiceMethodNames() returns string[] = @java:Method { 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" } external; @@ -69,7 +73,3 @@ isolated class HttpToWebsubhubAdaptor { 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" } external; } - -isolated function externInit(HttpToWebsubhubAdaptor adaptor, Service serviceObj) = @java:Method { - 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" -} external; From 565e298eff6277a45f16db5693a3c748a86d7df1 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 00:27:49 +0530 Subject: [PATCH 04/14] Remove unwanted methods --- ballerina/http_service.bal | 55 +++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 7df20eb6..115e6f96 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -37,10 +37,10 @@ isolated service class HttpService { self.hub = hubUrl; self.defaultHubLeaseSeconds = leaseSeconds; string[] methodNames = adaptor.getServiceMethodNames(); - self.isSubscriptionAvailable = isMethodAvailable("onSubscription", methodNames); - self.isSubscriptionValidationAvailable = isMethodAvailable("onSubscriptionValidation", methodNames); - self.isUnsubscriptionAvailable = isMethodAvailable("onUnsubscription", methodNames); - self.isUnsubscriptionValidationAvailable = isMethodAvailable("onUnsubscriptionValidation", methodNames); + self.isSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; + self.isSubscriptionValidationAvailable = methodNames.indexOf("onSubscriptionValidation") is int; + self.isUnsubscriptionAvailable = methodNames.indexOf("onUnsubscription") is int; + self.isUnsubscriptionValidationAvailable = methodNames.indexOf("onUnsubscriptionValidation") is int; } isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? { @@ -49,18 +49,18 @@ isolated service class HttpService { if params is error { response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(params.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } string? mode = params[HUB_MODE]; match mode { MODE_REGISTER => { http:Response|error result = processTopicRegistration(headers, params, self.adaptor); - return handleResult(caller, result); + return respondWithResult(caller, result); } MODE_DEREGISTER => { http:Response|error result = processTopicDeregistration(headers, params, self.adaptor); - return handleResult(caller, result); + return respondWithResult(caller, result); } MODE_SUBSCRIBE => { return self.handleSubscription(caller, headers, params); @@ -73,15 +73,15 @@ isolated service class HttpService { if result is error { response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(result.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } - return respondToRequest(caller, result); + return respondWithResult(caller, result); } _ => { response.statusCode = http:STATUS_BAD_REQUEST; string errorMessage = "The request does not include valid `hub.mode` form param."; response.setTextPayload(errorMessage); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } } } @@ -135,7 +135,8 @@ isolated service class HttpService { int currentStatusCode = result.statusCode; if currentStatusCode == http:STATUS_ACCEPTED { - check respondToRequest(caller, result); + check respondWithResult(caller, result); + error? verificationResult = processSubscriptionVerification( headers, self.adaptor, subscription, self.isSubscriptionValidationAvailable, self.clientConfig); if verificationResult is error { @@ -143,13 +144,13 @@ isolated service class HttpService { } return; } - return respondToRequest(caller, result); + return respondWithResult(caller, result); } http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(subscription.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { @@ -158,7 +159,7 @@ isolated service class HttpService { http:Response result = processUnsubscription(unsubscription, headers, self.adaptor, self.isUnsubscriptionAvailable); int currentStatusCode = result.statusCode; if currentStatusCode == http:STATUS_ACCEPTED { - check respondToRequest(caller, result); + check respondWithResult(caller, result); error? verificationResult = processUnSubscriptionVerification( headers, self.adaptor, unsubscription, self.isUnsubscriptionValidationAvailable, self.clientConfig); if verificationResult is error { @@ -166,35 +167,29 @@ isolated service class HttpService { } return; } - return respondToRequest(caller, result); + return respondWithResult(caller, result); } http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(unsubscription.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } } -isolated function isMethodAvailable(string methodName, string[] methods) returns boolean { - return methods.indexOf(methodName) is int; -} - -isolated function handleResult(http:Caller caller, http:Response|error result) returns Error? { +isolated function respondWithResult(http:Caller caller, http:Response|error result) returns Error? { + http:ListenerError? respondError = (); if result is error { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(result.message()); - return respondToRequest(caller, response); + respondError = caller->respond(response); + } else { + respondError = caller->respond(result); } - - return respondToRequest(caller, result); -} -isolated function respondToRequest(http:Caller caller, http:Response response) returns Error? { - http:ListenerError? responseError = caller->respond(response); - if responseError is http:ListenerError { - return error Error( - "Error occurred while responding to the request ", responseError, statusCode = http:STATUS_INTERNAL_SERVER_ERROR); + if respondError is http:ListenerError { + return error Error("Error occurred while responding to the request ", respondError, + statusCode = http:STATUS_INTERNAL_SERVER_ERROR); } } From 6895120272c4203f6bbaac889942189af913ad2a Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:15:57 +0530 Subject: [PATCH 05/14] Refactor hub subcription flow --- ballerina/http_service.bal | 38 +++--- ballerina/subscription.bal | 236 ++++++++++++++++++++----------------- 2 files changed, 144 insertions(+), 130 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 115e6f96..c7f4360c 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -15,32 +15,23 @@ // under the License. import ballerina/http; -import ballerina/mime; import ballerina/log; +import ballerina/mime; isolated service class HttpService { *http:Service; - + private final HttpToWebsubhubAdaptor adaptor; - private final readonly & ClientConfiguration clientConfig; private final string hub; private final int defaultHubLeaseSeconds; - private final boolean isSubscriptionAvailable; - private final boolean isSubscriptionValidationAvailable; - private final boolean isUnsubscriptionAvailable; - private final boolean isUnsubscriptionValidationAvailable; + private final SubscriptionHandler subscriptionHandler; isolated function init(HttpToWebsubhubAdaptor adaptor, string hubUrl, int leaseSeconds, - *ClientConfiguration clientConfig) { + *ClientConfiguration clientConfig) { self.adaptor = adaptor; - self.clientConfig = clientConfig.cloneReadOnly(); self.hub = hubUrl; self.defaultHubLeaseSeconds = leaseSeconds; - string[] methodNames = adaptor.getServiceMethodNames(); - self.isSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; - self.isSubscriptionValidationAvailable = methodNames.indexOf("onSubscriptionValidation") is int; - self.isUnsubscriptionAvailable = methodNames.indexOf("onUnsubscription") is int; - self.isUnsubscriptionValidationAvailable = methodNames.indexOf("onUnsubscriptionValidation") is int; + self.subscriptionHandler = new (adaptor, leaseSeconds, clientConfig); } isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? { @@ -124,7 +115,7 @@ isolated service class HttpService { isolated function handleSubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params); if subscription is Subscription { - http:Response|Redirect result = processSubscription(subscription, headers, self.adaptor, self.isSubscriptionAvailable); + http:Response|Redirect result = self.subscriptionHandler.processSubscription(subscription, headers); if result is Redirect { error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); if redirectError is error { @@ -132,13 +123,12 @@ isolated service class HttpService { } return; } - + int currentStatusCode = result.statusCode; if currentStatusCode == http:STATUS_ACCEPTED { check respondWithResult(caller, result); - - error? verificationResult = processSubscriptionVerification( - headers, self.adaptor, subscription, self.isSubscriptionValidationAvailable, self.clientConfig); + + error? verificationResult = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); if verificationResult is error { log:printError("Error occurred while processing subscription", 'error = verificationResult); } @@ -146,7 +136,7 @@ isolated service class HttpService { } return respondWithResult(caller, result); } - + http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(subscription.message()); @@ -156,12 +146,12 @@ isolated service class HttpService { isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Unsubscription|error unsubscription = createUnsubscriptionMessage(params); if unsubscription is Unsubscription { - http:Response result = processUnsubscription(unsubscription, headers, self.adaptor, self.isUnsubscriptionAvailable); + http:Response result = self.subscriptionHandler.processUnsubscription(unsubscription, headers); int currentStatusCode = result.statusCode; if currentStatusCode == http:STATUS_ACCEPTED { check respondWithResult(caller, result); - error? verificationResult = processUnSubscriptionVerification( - headers, self.adaptor, unsubscription, self.isUnsubscriptionValidationAvailable, self.clientConfig); + error? verificationResult = self.subscriptionHandler.processUnSubscriptionVerification( + unsubscription, headers); if verificationResult is error { log:printError("Error occurred while processing unsubscription", 'error = verificationResult); } @@ -189,7 +179,7 @@ isolated function respondWithResult(http:Caller caller, http:Response|error resu } if respondError is http:ListenerError { - return error Error("Error occurred while responding to the request ", respondError, + return error Error("Error occurred while responding to the request ", respondError, statusCode = http:STATUS_INTERNAL_SERVER_ERROR); } } diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index b5a0027f..386a7342 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -17,6 +17,136 @@ import ballerina/http; import ballerina/uuid; +isolated class SubscriptionHandler { + private final HttpToWebsubhubAdaptor adaptor; + private final readonly & ClientConfiguration clientConfig; + private final int defaultHubLeaseSeconds; + + private final boolean isOnSubscriptionAvailable; + private final boolean isOnSubscriptionValidationAvailable; + private final boolean isOnUnsubscriptionAvailable; + private final boolean isOnUnsubscriptionValidationAvailable; + + isolated function init(HttpToWebsubhubAdaptor adaptor, int leaseSeconds, *ClientConfiguration clientConfig) { + self.adaptor = adaptor; + self.defaultHubLeaseSeconds = leaseSeconds; + self.clientConfig = clientConfig.cloneReadOnly(); + string[] methodNames = adaptor.getServiceMethodNames(); + self.isOnSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; + self.isOnSubscriptionValidationAvailable = methodNames.indexOf("onSubscriptionValidation") is int; + self.isOnUnsubscriptionAvailable = methodNames.indexOf("onUnsubscription") is int; + self.isOnUnsubscriptionValidationAvailable = methodNames.indexOf("onUnsubscriptionValidation") is int; + } + + isolated function processSubscription(Subscription message, http:Headers headers) returns http:Response|Redirect { + if !self.isOnSubscriptionAvailable { + http:Response response = new; + response.statusCode = http:STATUS_ACCEPTED; + return response; + } + + SubscriptionAccepted|Redirect|error result = self.adaptor.callOnSubscriptionMethod(message, headers); + if result is Redirect { + return result; + } + + return processOnSubscriptionResult(result); + } + + isolated function processSubscriptionVerification(Subscription message, http:Headers headers) returns error? { + error? validationResult = self.validateSubscription(message, headers); + if validationResult is error { + [string, string?][] params = [ + [HUB_MODE, MODE_DENIED], + [HUB_TOPIC, message.hubTopic], + [HUB_REASON, validationResult.message()] + ]; + _ = check sendNotification(message.hubCallback, params, self.clientConfig); + return; + } + + string challenge = uuid:createType4AsString(); + [string, string?][] params = [ + [HUB_MODE, MODE_SUBSCRIBE], + [HUB_TOPIC, message.hubTopic], + [HUB_CHALLENGE, challenge], + [HUB_LEASE_SECONDS, message.hubLeaseSeconds] + ]; + http:Response subscriberResponse = check sendNotification(message.hubCallback, params, self.clientConfig); + string responsePayload = check subscriberResponse.getTextPayload(); + if challenge != responsePayload { + return; + } + + VerifiedSubscription verifiedSubscription = { + ...message + }; + check self.adaptor.callOnSubscriptionIntentVerifiedMethod(verifiedSubscription, headers); + } + + isolated function validateSubscription(Subscription message, http:Headers headers) returns error? { + if self.isOnSubscriptionValidationAvailable { + return self.adaptor.callOnSubscriptionValidationMethod(message, headers); + } + + if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { + return error SubscriptionDeniedError( + "Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); + } + } + + isolated function processUnsubscription(Unsubscription message, http:Headers headers) returns http:Response { + if !self.isOnUnsubscriptionAvailable { + http:Response response = new; + response.statusCode = http:STATUS_ACCEPTED; + return response; + } + + UnsubscriptionAccepted|error result = self.adaptor.callOnUnsubscriptionMethod(message, headers); + return processOnUnsubscriptionResult(result); + } + + isolated function processUnSubscriptionVerification(Unsubscription message, http:Headers headers) returns error? { + error? validationResult = self.validateUnsubscription(message, headers); + if validationResult is error { + [string, string?][] params = [ + [HUB_MODE, MODE_DENIED], + [HUB_TOPIC, message.hubTopic], + [HUB_REASON, validationResult.message()] + ]; + _ = check sendNotification(message.hubCallback, params, self.clientConfig); + } + + string challenge = uuid:createType4AsString(); + [string, string?][] params = [ + [HUB_MODE, MODE_UNSUBSCRIBE], + [HUB_TOPIC, message.hubTopic], + [HUB_CHALLENGE, challenge] + ]; + http:Response subscriberResponse = check sendNotification(message.hubCallback, params, self.clientConfig); + string responsePayload = check subscriberResponse.getTextPayload(); + if challenge != responsePayload { + return; + } + + VerifiedUnsubscription verifiedUnsubscription = { + ...message + }; + check self.adaptor.callOnUnsubscriptionIntentVerifiedMethod(verifiedUnsubscription, headers); + } + + isolated function validateUnsubscription(Unsubscription message, http:Headers headers) returns error? { + if self.isOnUnsubscriptionValidationAvailable { + return self.adaptor.callOnUnsubscriptionValidationMethod(message, headers); + } + + if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { + return error SubscriptionDeniedError( + "Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); + } + } +} + isolated function createSubscriptionMessage(string hubUrl, int defaultHubLeaseSeconds, map params) returns Subscription|error { string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); @@ -46,21 +176,6 @@ isolated function retrieveLeaseSeconds(map params, int defaultHubLeaseSe return defaultHubLeaseSeconds; } -isolated function processSubscription(Subscription message, http:Headers headers, - HttpToWebsubhubAdaptor adaptor, boolean onSubscriptionMethodAvailable) returns http:Response|Redirect { - if !onSubscriptionMethodAvailable { - http:Response response = new; - response.statusCode = http:STATUS_ACCEPTED; - return response; - } else { - SubscriptionAccepted|Redirect|error result = adaptor.callOnSubscriptionMethod(message, headers); - if result is Redirect { - return result; - } - return processOnSubscriptionResult(result); - } -} - isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) returns http:Response|Redirect { http:Response response = new; if result is SubscriptionAccepted { @@ -77,46 +192,6 @@ isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) } } -isolated function processSubscriptionVerification(http:Headers headers, HttpToWebsubhubAdaptor adaptor, Subscription message, - boolean subscriptionValidationMethodAvailable, ClientConfiguration config) returns error? { - error? validationResult = validateSubscription(subscriptionValidationMethodAvailable, message, headers, adaptor); - if validationResult is error { - [string, string?][] params = [ - [HUB_MODE, MODE_DENIED], - [HUB_TOPIC, message.hubTopic], - [HUB_REASON, validationResult.message()] - ]; - _ = check sendNotification(message.hubCallback, params, config); - } else { - string challenge = uuid:createType4AsString(); - [string, string?][] params = [ - [HUB_MODE, MODE_SUBSCRIBE], - [HUB_TOPIC, message.hubTopic], - [HUB_CHALLENGE, challenge], - [HUB_LEASE_SECONDS, message.hubLeaseSeconds] - ]; - http:Response subscriberResponse = check sendNotification(message.hubCallback, params, config); - string respStringPayload = check subscriberResponse.getTextPayload(); - if (respStringPayload == challenge) { - VerifiedSubscription verifiedMessage = { - ...message - }; - check adaptor.callOnSubscriptionIntentVerifiedMethod(verifiedMessage, headers); - } - } -} - -isolated function validateSubscription(boolean isRemoteMethodAvailable, Subscription message, - http:Headers headers, HttpToWebsubhubAdaptor adaptor) returns error? { - if isRemoteMethodAvailable { - return adaptor.callOnSubscriptionValidationMethod(message, headers); - } else { - if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { - return error SubscriptionDeniedError("Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); - } - } -} - isolated function createUnsubscriptionMessage(map params) returns Unsubscription|error { string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); @@ -132,18 +207,6 @@ isolated function createUnsubscriptionMessage(map params) returns Unsubs return message; } -isolated function processUnsubscription(Unsubscription message, http:Headers headers, - HttpToWebsubhubAdaptor adaptor, boolean onUnsubscriptionMethodAvailable) returns http:Response { - if !onUnsubscriptionMethodAvailable { - http:Response response = new; - response.statusCode = http:STATUS_ACCEPTED; - return response; - } else { - UnsubscriptionAccepted|error result = adaptor.callOnUnsubscriptionMethod(message, headers); - return processOnUnsubscriptionResult(result); - } -} - isolated function processOnUnsubscriptionResult(UnsubscriptionAccepted|error result) returns http:Response { http:Response response = new; if result is UnsubscriptionAccepted { @@ -159,42 +222,3 @@ isolated function processOnUnsubscriptionResult(UnsubscriptionAccepted|error res return response; } } - -isolated function processUnSubscriptionVerification(http:Headers headers, HttpToWebsubhubAdaptor adaptor, Unsubscription message, - boolean unsubscriptionValidationMethodAvailable, ClientConfiguration config) returns error? { - error? validationResult = validateUnsubscription(unsubscriptionValidationMethodAvailable, message, headers, adaptor); - if validationResult is error { - [string, string?][] params = [ - [HUB_MODE, MODE_DENIED], - [HUB_TOPIC, message.hubTopic], - [HUB_REASON, validationResult.message()] - ]; - _ = check sendNotification(message.hubCallback, params, config); - } else { - string challenge = uuid:createType4AsString(); - [string, string?][] params = [ - [HUB_MODE, MODE_UNSUBSCRIBE], - [HUB_TOPIC, message.hubTopic], - [HUB_CHALLENGE, challenge] - ]; - http:Response subscriberResponse = check sendNotification(message.hubCallback, params, config); - string respStringPayload = check subscriberResponse.getTextPayload(); - if (respStringPayload == challenge) { - VerifiedUnsubscription verifiedMessage = { - ...message - }; - check adaptor.callOnUnsubscriptionIntentVerifiedMethod(verifiedMessage, headers); - } - } -} - -isolated function validateUnsubscription(boolean isRemoteMethodAvailable, Unsubscription message, - http:Headers headers, HttpToWebsubhubAdaptor adaptor) returns error? { - if isRemoteMethodAvailable { - return adaptor.callOnUnsubscriptionValidationMethod(message, headers); - } else { - if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { - return error SubscriptionDeniedError("Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); - } - } -} From 509f44c1f36cedd14a3f4ec57955fe75eb3f1b33 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:20:37 +0530 Subject: [PATCH 06/14] Rename variable name --- ballerina/subscription.bal | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index 386a7342..b21af3a1 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -20,7 +20,7 @@ import ballerina/uuid; isolated class SubscriptionHandler { private final HttpToWebsubhubAdaptor adaptor; private final readonly & ClientConfiguration clientConfig; - private final int defaultHubLeaseSeconds; + private final int defaultLeaseSeconds; private final boolean isOnSubscriptionAvailable; private final boolean isOnSubscriptionValidationAvailable; @@ -29,7 +29,7 @@ isolated class SubscriptionHandler { isolated function init(HttpToWebsubhubAdaptor adaptor, int leaseSeconds, *ClientConfiguration clientConfig) { self.adaptor = adaptor; - self.defaultHubLeaseSeconds = leaseSeconds; + self.defaultLeaseSeconds = leaseSeconds; self.clientConfig = clientConfig.cloneReadOnly(); string[] methodNames = adaptor.getServiceMethodNames(); self.isOnSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; @@ -147,10 +147,10 @@ isolated class SubscriptionHandler { } } -isolated function createSubscriptionMessage(string hubUrl, int defaultHubLeaseSeconds, map params) returns Subscription|error { +isolated function createSubscriptionMessage(string hubUrl, int defaultLeaseSeconds, map params) returns Subscription|error { string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); - int leaseSeconds = retrieveLeaseSeconds(params, defaultHubLeaseSeconds); + int leaseSeconds = retrieveLeaseSeconds(params, defaultLeaseSeconds); Subscription message = { hub: hubUrl, hubMode: MODE_SUBSCRIBE, @@ -165,7 +165,7 @@ isolated function createSubscriptionMessage(string hubUrl, int defaultHubLeaseSe return message; } -isolated function retrieveLeaseSeconds(map params, int defaultHubLeaseSeconds) returns int { +isolated function retrieveLeaseSeconds(map params, int defaultLeaseSeconds) returns int { var hubLeaseSeconds = params.removeIfHasKey(HUB_LEASE_SECONDS); if hubLeaseSeconds is string { var retrievedLeaseSeconds = 'int:fromString(hubLeaseSeconds); @@ -173,7 +173,7 @@ isolated function retrieveLeaseSeconds(map params, int defaultHubLeaseSe return retrievedLeaseSeconds; } } - return defaultHubLeaseSeconds; + return defaultLeaseSeconds; } isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) returns http:Response|Redirect { From 23fc1b8a3777689cf02708c2fef8c76aec7dd4bf Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:36:02 +0530 Subject: [PATCH 07/14] Refactor subscription and unsubscription flow --- ballerina/http_service.bal | 87 +++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index c7f4360c..1800bf70 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -54,10 +54,10 @@ isolated service class HttpService { return respondWithResult(caller, result); } MODE_SUBSCRIBE => { - return self.handleSubscription(caller, headers, params); + return self.onSubscriptionRequest(caller, headers, params); } MODE_UNSUBSCRIBE => { - return self.handleUnsubscription(caller, headers, params); + return self.onUnsubcriptionRequest(caller, headers, params); } MODE_PUBLISH => { http:Response|error result = processContentPublish(request, headers, params, self.adaptor); @@ -112,58 +112,57 @@ isolated service class HttpService { return params; } - isolated function handleSubscription(http:Caller caller, http:Headers headers, map params) returns Error? { + isolated function onSubscriptionRequest(http:Caller caller, http:Headers headers, map params) + returns Error? { Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params); - if subscription is Subscription { - http:Response|Redirect result = self.subscriptionHandler.processSubscription(subscription, headers); - if result is Redirect { - error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); - if redirectError is error { - log:printError("Error occurred while redirecting the subscription", 'error = redirectError); - } - return; - } - - int currentStatusCode = result.statusCode; - if currentStatusCode == http:STATUS_ACCEPTED { - check respondWithResult(caller, result); + if subscription is error { + http:Response response = new; + response.statusCode = http:STATUS_BAD_REQUEST; + response.setTextPayload(subscription.message()); + return respondWithResult(caller, response); + } - error? verificationResult = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); - if verificationResult is error { - log:printError("Error occurred while processing subscription", 'error = verificationResult); - } - return; + http:Response|Redirect result = self.subscriptionHandler.processSubscription(subscription, headers); + if result is Redirect { + error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); + if redirectError is error { + log:printError("Error occurred while redirecting the subscription", 'error = redirectError); } - return respondWithResult(caller, result); + return; } - http:Response response = new; - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(subscription.message()); - return respondWithResult(caller, response); + check respondWithResult(caller, result); + if result.statusCode != http:STATUS_ACCEPTED { + return; + } + + error? verificationResult = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); + if verificationResult is error { + log:printError("Error occurred while processing subscription", 'error = verificationResult); + } } - isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { + isolated function onUnsubcriptionRequest(http:Caller caller, http:Headers headers, map params) + returns Error? { Unsubscription|error unsubscription = createUnsubscriptionMessage(params); - if unsubscription is Unsubscription { - http:Response result = self.subscriptionHandler.processUnsubscription(unsubscription, headers); - int currentStatusCode = result.statusCode; - if currentStatusCode == http:STATUS_ACCEPTED { - check respondWithResult(caller, result); - error? verificationResult = self.subscriptionHandler.processUnSubscriptionVerification( - unsubscription, headers); - if verificationResult is error { - log:printError("Error occurred while processing unsubscription", 'error = verificationResult); - } - return; - } - return respondWithResult(caller, result); + if unsubscription is error { + http:Response response = new; + response.statusCode = http:STATUS_BAD_REQUEST; + response.setTextPayload(unsubscription.message()); + return respondWithResult(caller, response); } - http:Response response = new; - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(unsubscription.message()); - return respondWithResult(caller, response); + http:Response result = self.subscriptionHandler.processUnsubscription(unsubscription, headers); + check respondWithResult(caller, result); + if result.statusCode != http:STATUS_ACCEPTED { + return; + } + + error? verificationResult = self.subscriptionHandler.processUnSubscriptionVerification( + unsubscription, headers); + if verificationResult is error { + log:printError("Error occurred while processing unsubscription", 'error = verificationResult); + } } } From ea2a84de3278b64c2cd5cf59a1f720c082d3f8cf Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:39:11 +0530 Subject: [PATCH 08/14] Refactor code base --- ballerina/http_service.bal | 21 ++++++++++----------- ballerina/subscription.bal | 4 +--- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 1800bf70..44b8e3fd 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -23,15 +23,15 @@ isolated service class HttpService { private final HttpToWebsubhubAdaptor adaptor; private final string hub; - private final int defaultHubLeaseSeconds; + private final int defaultLeaseSeconds; private final SubscriptionHandler subscriptionHandler; isolated function init(HttpToWebsubhubAdaptor adaptor, string hubUrl, int leaseSeconds, *ClientConfiguration clientConfig) { self.adaptor = adaptor; self.hub = hubUrl; - self.defaultHubLeaseSeconds = leaseSeconds; - self.subscriptionHandler = new (adaptor, leaseSeconds, clientConfig); + self.defaultLeaseSeconds = leaseSeconds; + self.subscriptionHandler = new (adaptor, clientConfig); } isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? { @@ -114,7 +114,7 @@ isolated service class HttpService { isolated function onSubscriptionRequest(http:Caller caller, http:Headers headers, map params) returns Error? { - Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params); + Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params); if subscription is error { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; @@ -136,9 +136,9 @@ isolated service class HttpService { return; } - error? verificationResult = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); - if verificationResult is error { - log:printError("Error occurred while processing subscription", 'error = verificationResult); + error? verification = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); + if verification is error { + log:printError("Error occurred while processing subscription", 'error = verification); } } @@ -158,10 +158,9 @@ isolated service class HttpService { return; } - error? verificationResult = self.subscriptionHandler.processUnSubscriptionVerification( - unsubscription, headers); - if verificationResult is error { - log:printError("Error occurred while processing unsubscription", 'error = verificationResult); + error? verification = self.subscriptionHandler.processUnSubscriptionVerification(unsubscription, headers); + if verification is error { + log:printError("Error occurred while processing unsubscription", 'error = verification); } } } diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index b21af3a1..f0a8d5c8 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -20,16 +20,14 @@ import ballerina/uuid; isolated class SubscriptionHandler { private final HttpToWebsubhubAdaptor adaptor; private final readonly & ClientConfiguration clientConfig; - private final int defaultLeaseSeconds; private final boolean isOnSubscriptionAvailable; private final boolean isOnSubscriptionValidationAvailable; private final boolean isOnUnsubscriptionAvailable; private final boolean isOnUnsubscriptionValidationAvailable; - isolated function init(HttpToWebsubhubAdaptor adaptor, int leaseSeconds, *ClientConfiguration clientConfig) { + isolated function init(HttpToWebsubhubAdaptor adaptor, *ClientConfiguration clientConfig) { self.adaptor = adaptor; - self.defaultLeaseSeconds = leaseSeconds; self.clientConfig = clientConfig.cloneReadOnly(); string[] methodNames = adaptor.getServiceMethodNames(); self.isOnSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; From d845c45155f0863954fb77fd4aca273f79100f15 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:44:12 +0530 Subject: [PATCH 09/14] Refactor method names --- ballerina/http_service.bal | 4 ++-- ballerina/subscription.bal | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 44b8e3fd..bade0ecf 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -136,7 +136,7 @@ isolated service class HttpService { return; } - error? verification = self.subscriptionHandler.processSubscriptionVerification(subscription, headers); + error? verification = self.subscriptionHandler.verifySubscription(subscription, headers); if verification is error { log:printError("Error occurred while processing subscription", 'error = verification); } @@ -158,7 +158,7 @@ isolated service class HttpService { return; } - error? verification = self.subscriptionHandler.processUnSubscriptionVerification(unsubscription, headers); + error? verification = self.subscriptionHandler.verifyUnsubscription(unsubscription, headers); if verification is error { log:printError("Error occurred while processing unsubscription", 'error = verification); } diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index f0a8d5c8..a7430fbf 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -51,7 +51,7 @@ isolated class SubscriptionHandler { return processOnSubscriptionResult(result); } - isolated function processSubscriptionVerification(Subscription message, http:Headers headers) returns error? { + isolated function verifySubscription(Subscription message, http:Headers headers) returns error? { error? validationResult = self.validateSubscription(message, headers); if validationResult is error { [string, string?][] params = [ @@ -104,7 +104,7 @@ isolated class SubscriptionHandler { return processOnUnsubscriptionResult(result); } - isolated function processUnSubscriptionVerification(Unsubscription message, http:Headers headers) returns error? { + isolated function verifyUnsubscription(Unsubscription message, http:Headers headers) returns error? { error? validationResult = self.validateUnsubscription(message, headers); if validationResult is error { [string, string?][] params = [ From abe2ed78e38dfced3dac0bb628b76d5002045877 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Fri, 14 Feb 2025 01:46:06 +0530 Subject: [PATCH 10/14] Format the ballerina code --- ballerina/subscription.bal | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index a7430fbf..3e1b6729 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -145,7 +145,8 @@ isolated class SubscriptionHandler { } } -isolated function createSubscriptionMessage(string hubUrl, int defaultLeaseSeconds, map params) returns Subscription|error { +isolated function createSubscriptionMessage(string hubUrl, int defaultLeaseSeconds, map params) +returns Subscription|error { string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); int leaseSeconds = retrieveLeaseSeconds(params, defaultLeaseSeconds); @@ -184,7 +185,8 @@ isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) updateErrorResponse(response, errorDetails, result.message()); return response; } else { - CommonResponse errorDetails = result is InternalSubscriptionError ? result.detail() : INTERNAL_SUBSCRIPTION_ERROR.detail(); + CommonResponse errorDetails = result is InternalSubscriptionError ? + result.detail() : INTERNAL_SUBSCRIPTION_ERROR.detail(); updateErrorResponse(response, errorDetails, result.message()); return response; } @@ -215,7 +217,8 @@ isolated function processOnUnsubscriptionResult(UnsubscriptionAccepted|error res updateErrorResponse(response, errorDetails, result.message()); return response; } else { - CommonResponse errorDetails = result is InternalSubscriptionError ? result.detail() : INTERNAL_UNSUBSCRIPTION_ERROR.detail(); + CommonResponse errorDetails = result is InternalSubscriptionError ? + result.detail() : INTERNAL_UNSUBSCRIPTION_ERROR.detail(); updateErrorResponse(response, errorDetails, result.message()); return response; } From 059e3432823d2889af1f95223fd1f8a74a19b6b1 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Mon, 17 Feb 2025 09:39:22 +0530 Subject: [PATCH 11/14] Reformat code --- ballerina/http_service.bal | 2 ++ ballerina/subscription.bal | 1 + 2 files changed, 3 insertions(+) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index bade0ecf..42487b3e 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -114,6 +114,7 @@ isolated service class HttpService { isolated function onSubscriptionRequest(http:Caller caller, http:Headers headers, map params) returns Error? { + Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params); if subscription is error { http:Response response = new; @@ -144,6 +145,7 @@ isolated service class HttpService { isolated function onUnsubcriptionRequest(http:Caller caller, http:Headers headers, map params) returns Error? { + Unsubscription|error unsubscription = createUnsubscriptionMessage(params); if unsubscription is error { http:Response response = new; diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index 3e1b6729..7c2adc0f 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -147,6 +147,7 @@ isolated class SubscriptionHandler { isolated function createSubscriptionMessage(string hubUrl, int defaultLeaseSeconds, map params) returns Subscription|error { + string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); int leaseSeconds = retrieveLeaseSeconds(params, defaultLeaseSeconds); From 0089bcc4cbe5e334e28ede9af05cb1e1abd5a946 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Mon, 17 Feb 2025 09:46:50 +0530 Subject: [PATCH 12/14] Refactor naming in the codebase --- ballerina/http_service.bal | 12 ++++++------ ballerina/subscription.bal | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 42487b3e..bc108bd4 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -54,10 +54,10 @@ isolated service class HttpService { return respondWithResult(caller, result); } MODE_SUBSCRIBE => { - return self.onSubscriptionRequest(caller, headers, params); + return self.processSubscription(caller, headers, params); } MODE_UNSUBSCRIBE => { - return self.onUnsubcriptionRequest(caller, headers, params); + return self.processUnsubscription(caller, headers, params); } MODE_PUBLISH => { http:Response|error result = processContentPublish(request, headers, params, self.adaptor); @@ -112,7 +112,7 @@ isolated service class HttpService { return params; } - isolated function onSubscriptionRequest(http:Caller caller, http:Headers headers, map params) + isolated function processSubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params); @@ -123,7 +123,7 @@ isolated service class HttpService { return respondWithResult(caller, response); } - http:Response|Redirect result = self.subscriptionHandler.processSubscription(subscription, headers); + http:Response|Redirect result = self.subscriptionHandler.intiateSubscription(subscription, headers); if result is Redirect { error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); if redirectError is error { @@ -143,7 +143,7 @@ isolated service class HttpService { } } - isolated function onUnsubcriptionRequest(http:Caller caller, http:Headers headers, map params) + isolated function processUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Unsubscription|error unsubscription = createUnsubscriptionMessage(params); @@ -154,7 +154,7 @@ isolated service class HttpService { return respondWithResult(caller, response); } - http:Response result = self.subscriptionHandler.processUnsubscription(unsubscription, headers); + http:Response result = self.subscriptionHandler.initiateUnsubscription(unsubscription, headers); check respondWithResult(caller, result); if result.statusCode != http:STATUS_ACCEPTED { return; diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index 7c2adc0f..e7f7abbd 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -36,7 +36,7 @@ isolated class SubscriptionHandler { self.isOnUnsubscriptionValidationAvailable = methodNames.indexOf("onUnsubscriptionValidation") is int; } - isolated function processSubscription(Subscription message, http:Headers headers) returns http:Response|Redirect { + isolated function intiateSubscription(Subscription message, http:Headers headers) returns http:Response|Redirect { if !self.isOnSubscriptionAvailable { http:Response response = new; response.statusCode = http:STATUS_ACCEPTED; @@ -93,7 +93,7 @@ isolated class SubscriptionHandler { } } - isolated function processUnsubscription(Unsubscription message, http:Headers headers) returns http:Response { + isolated function initiateUnsubscription(Unsubscription message, http:Headers headers) returns http:Response { if !self.isOnUnsubscriptionAvailable { http:Response response = new; response.statusCode = http:STATUS_ACCEPTED; From 2430aa99af6ee3c87a501477c424db5fa00006ea Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Mon, 17 Feb 2025 09:54:04 +0530 Subject: [PATCH 13/14] Refactor code base --- ballerina/http_service.bal | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index bc108bd4..8d64c789 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -61,11 +61,6 @@ isolated service class HttpService { } MODE_PUBLISH => { http:Response|error result = processContentPublish(request, headers, params, self.adaptor); - if result is error { - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(result.message()); - return respondWithResult(caller, response); - } return respondWithResult(caller, result); } _ => { From 3e30d3d9715f0d7c378393b092272e6272ac3575 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Mon, 17 Feb 2025 09:59:26 +0530 Subject: [PATCH 14/14] Reformat the code --- ballerina/http_service.bal | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index 8d64c789..2103b6b5 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -107,7 +107,7 @@ isolated service class HttpService { return params; } - isolated function processSubscription(http:Caller caller, http:Headers headers, map params) + isolated function processSubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params); @@ -115,7 +115,7 @@ isolated service class HttpService { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(subscription.message()); - return respondWithResult(caller, response); + return respondWithResult(caller, response); } http:Response|Redirect result = self.subscriptionHandler.intiateSubscription(subscription, headers); @@ -135,10 +135,10 @@ isolated service class HttpService { error? verification = self.subscriptionHandler.verifySubscription(subscription, headers); if verification is error { log:printError("Error occurred while processing subscription", 'error = verification); - } + } } - isolated function processUnsubscription(http:Caller caller, http:Headers headers, map params) + isolated function processUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { Unsubscription|error unsubscription = createUnsubscriptionMessage(params); @@ -146,7 +146,7 @@ isolated service class HttpService { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(unsubscription.message()); - return respondWithResult(caller, response); + return respondWithResult(caller, response); } http:Response result = self.subscriptionHandler.initiateUnsubscription(unsubscription, headers);