diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 2555f93f..851c97f5 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "websubhub" -version = "1.13.0" +version = "1.13.1" authors = ["Ballerina"] keywords = ["websub", "hub", "publisher", "service", "listener", "client"] repository = "https://github.com/ballerina-platform/module-ballerina-websubhub" @@ -15,5 +15,5 @@ graalvmCompatible = true [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "websubhub-native" -version = "1.13.0" -path = "../native/build/libs/websubhub-native-1.13.0.jar" +version = "1.13.1" +path = "../native/build/libs/websubhub-native-1.13.1-SNAPSHOT.jar" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index ced877a5..e3752ed3 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "websubhub-compiler-plugin" class = "io.ballerina.stdlib.websubhub.WebSubHubCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.0.jar" +path = "../compiler-plugin/build/libs/websubhub-compiler-plugin-1.13.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index a3ce3b03..d5719c0e 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -73,7 +73,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.13.0" +version = "2.13.1" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, @@ -347,7 +347,7 @@ modules = [ [[package]] org = "ballerina" name = "websubhub" -version = "1.13.0" +version = "1.13.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "http"}, diff --git a/ballerina/http_service.bal b/ballerina/http_service.bal index f638f710..2103b6b5 100644 --- a/ballerina/http_service.bal +++ b/ballerina/http_service.bal @@ -15,32 +15,23 @@ // under the License. import ballerina/http; -import ballerina/mime; import ballerina/log; +import ballerina/mime; isolated service class HttpService { *http:Service; - + private final HttpToWebsubhubAdaptor adaptor; - private final readonly & ClientConfiguration clientConfig; private final string hub; - private final int defaultHubLeaseSeconds; - private final boolean isSubscriptionAvailable; - private final boolean isSubscriptionValidationAvailable; - private final boolean isUnsubscriptionAvailable; - private final boolean isUnsubscriptionValidationAvailable; + private final int defaultLeaseSeconds; + private final SubscriptionHandler subscriptionHandler; isolated function init(HttpToWebsubhubAdaptor adaptor, string hubUrl, int leaseSeconds, - *ClientConfiguration clientConfig) { + *ClientConfiguration clientConfig) { self.adaptor = adaptor; - self.clientConfig = clientConfig.cloneReadOnly(); self.hub = hubUrl; - self.defaultHubLeaseSeconds = leaseSeconds; - string[] methodNames = adaptor.getServiceMethodNames(); - self.isSubscriptionAvailable = isMethodAvailable("onSubscription", methodNames); - self.isSubscriptionValidationAvailable = isMethodAvailable("onSubscriptionValidation", methodNames); - self.isUnsubscriptionAvailable = isMethodAvailable("onUnsubscription", methodNames); - self.isUnsubscriptionValidationAvailable = isMethodAvailable("onUnsubscriptionValidation", methodNames); + self.defaultLeaseSeconds = leaseSeconds; + self.subscriptionHandler = new (adaptor, clientConfig); } isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? { @@ -49,40 +40,34 @@ isolated service class HttpService { if params is error { response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(params.message()); - return respondToRequest(caller, response); - } else { - string? mode = params[HUB_MODE]; - match mode { - MODE_REGISTER => { - http:Response|error result = processTopicRegistration(headers, params, self.adaptor); - return handleResult(caller, result); - } - MODE_DEREGISTER => { - http:Response|error result = processTopicDeregistration(headers, params, self.adaptor); - return handleResult(caller, result); - } - MODE_SUBSCRIBE => { - return self.handleSubscription(caller, headers, params); - } - MODE_UNSUBSCRIBE => { - return self.handleUnsubscription(caller, headers, params); - } - MODE_PUBLISH => { - http:Response|error result = processContentPublish(request, headers, params, self.adaptor); - if result is error { - response.statusCode = http:STATUS_BAD_REQUEST; - response.setTextPayload(result.message()); - return respondToRequest(caller, response); - } else { - return respondToRequest(caller, result); - } - } - _ => { - response.statusCode = http:STATUS_BAD_REQUEST; - string errorMessage = "The request does not include valid `hub.mode` form param."; - response.setTextPayload(errorMessage); - return respondToRequest(caller, response); - } + return respondWithResult(caller, response); + } + + string? mode = params[HUB_MODE]; + match mode { + MODE_REGISTER => { + http:Response|error result = processTopicRegistration(headers, params, self.adaptor); + return respondWithResult(caller, result); + } + MODE_DEREGISTER => { + http:Response|error result = processTopicDeregistration(headers, params, self.adaptor); + return respondWithResult(caller, result); + } + MODE_SUBSCRIBE => { + return self.processSubscription(caller, headers, params); + } + MODE_UNSUBSCRIBE => { + return self.processUnsubscription(caller, headers, params); + } + MODE_PUBLISH => { + http:Response|error result = processContentPublish(request, headers, params, self.adaptor); + return respondWithResult(caller, result); + } + _ => { + response.statusCode = http:STATUS_BAD_REQUEST; + string errorMessage = "The request does not include valid `hub.mode` form param."; + response.setTextPayload(errorMessage); + return respondWithResult(caller, response); } } } @@ -122,79 +107,74 @@ isolated service class HttpService { return params; } - isolated function handleSubscription(http:Caller caller, http:Headers headers, map params) returns Error? { - Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params); - if subscription is Subscription { - http:Response|Redirect result = processSubscription(subscription, headers, self.adaptor, self.isSubscriptionAvailable); - if result is Redirect { - error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); - if redirectError is error { - log:printError("Error occurred while redirecting the subscription", 'error = redirectError); - } - } else { - int currentStatusCode = result.statusCode; - if currentStatusCode == http:STATUS_ACCEPTED { - check respondToRequest(caller, result); - error? verificationResult = processSubscriptionVerification(headers, self.adaptor, subscription, - self.isSubscriptionValidationAvailable, self.clientConfig); - if verificationResult is error { - log:printError("Error occurred while processing subscription", 'error = verificationResult); - } - return; - } - return respondToRequest(caller, result); - } - } else { + isolated function processSubscription(http:Caller caller, http:Headers headers, map params) + returns Error? { + + Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params); + if subscription is error { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(subscription.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); + } + + http:Response|Redirect result = self.subscriptionHandler.intiateSubscription(subscription, headers); + if result is Redirect { + error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls); + if redirectError is error { + log:printError("Error occurred while redirecting the subscription", 'error = redirectError); + } + return; + } + + check respondWithResult(caller, result); + if result.statusCode != http:STATUS_ACCEPTED { + return; + } + + error? verification = self.subscriptionHandler.verifySubscription(subscription, headers); + if verification is error { + log:printError("Error occurred while processing subscription", 'error = verification); } } - isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map params) returns Error? { + isolated function processUnsubscription(http:Caller caller, http:Headers headers, map params) + returns Error? { + Unsubscription|error unsubscription = createUnsubscriptionMessage(params); - if unsubscription is Unsubscription { - http:Response result = processUnsubscription(unsubscription, headers, self.adaptor, self.isUnsubscriptionAvailable); - int currentStatusCode = result.statusCode; - if currentStatusCode == http:STATUS_ACCEPTED { - check respondToRequest(caller, result); - error? verificationResult = processUnSubscriptionVerification(headers, self.adaptor, unsubscription, - self.isUnsubscriptionValidationAvailable, self.clientConfig); - if verificationResult is error { - log:printError("Error occurred while processing unsubscription", 'error = verificationResult); - } - return; - } - return respondToRequest(caller, result); - } else { + if unsubscription is error { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(unsubscription.message()); - return respondToRequest(caller, response); + return respondWithResult(caller, response); } - } -} -isolated function isMethodAvailable(string methodName, string[] methods) returns boolean { - return methods.indexOf(methodName) is int; + http:Response result = self.subscriptionHandler.initiateUnsubscription(unsubscription, headers); + check respondWithResult(caller, result); + if result.statusCode != http:STATUS_ACCEPTED { + return; + } + + error? verification = self.subscriptionHandler.verifyUnsubscription(unsubscription, headers); + if verification is error { + log:printError("Error occurred while processing unsubscription", 'error = verification); + } + } } -isolated function handleResult(http:Caller caller, http:Response|error result) returns Error? { +isolated function respondWithResult(http:Caller caller, http:Response|error result) returns Error? { + http:ListenerError? respondError = (); if result is error { http:Response response = new; response.statusCode = http:STATUS_BAD_REQUEST; response.setTextPayload(result.message()); - return respondToRequest(caller, response); + respondError = caller->respond(response); } else { - return respondToRequest(caller, result); + respondError = caller->respond(result); } -} -isolated function respondToRequest(http:Caller caller, http:Response response) returns Error? { - http:ListenerError? responseError = caller->respond(response); - if responseError is http:ListenerError { - return error Error( - "Error occurred while responding to the request ", responseError, statusCode = http:STATUS_INTERNAL_SERVER_ERROR); + if respondError is http:ListenerError { + return error Error("Error occurred while responding to the request ", respondError, + statusCode = http:STATUS_INTERNAL_SERVER_ERROR); } } diff --git a/ballerina/natives.bal b/ballerina/natives.bal index c5b21739..54c90113 100644 --- a/ballerina/natives.bal +++ b/ballerina/natives.bal @@ -19,9 +19,13 @@ import ballerina/jballerina.java; isolated class HttpToWebsubhubAdaptor { isolated function init(Service 'service) { - externInit(self, 'service); + self.externInit('service); } + isolated function externInit(Service serviceObj) = @java:Method { + 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" + } external; + isolated function getServiceMethodNames() returns string[] = @java:Method { 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" } external; @@ -69,7 +73,3 @@ isolated class HttpToWebsubhubAdaptor { 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" } external; } - -isolated function externInit(HttpToWebsubhubAdaptor adaptor, Service serviceObj) = @java:Method { - 'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor" -} external; diff --git a/ballerina/subscription.bal b/ballerina/subscription.bal index b5a0027f..e7f7abbd 100644 --- a/ballerina/subscription.bal +++ b/ballerina/subscription.bal @@ -17,10 +17,140 @@ import ballerina/http; import ballerina/uuid; -isolated function createSubscriptionMessage(string hubUrl, int defaultHubLeaseSeconds, map params) returns Subscription|error { +isolated class SubscriptionHandler { + private final HttpToWebsubhubAdaptor adaptor; + private final readonly & ClientConfiguration clientConfig; + + private final boolean isOnSubscriptionAvailable; + private final boolean isOnSubscriptionValidationAvailable; + private final boolean isOnUnsubscriptionAvailable; + private final boolean isOnUnsubscriptionValidationAvailable; + + isolated function init(HttpToWebsubhubAdaptor adaptor, *ClientConfiguration clientConfig) { + self.adaptor = adaptor; + self.clientConfig = clientConfig.cloneReadOnly(); + string[] methodNames = adaptor.getServiceMethodNames(); + self.isOnSubscriptionAvailable = methodNames.indexOf("onSubscription") is int; + self.isOnSubscriptionValidationAvailable = methodNames.indexOf("onSubscriptionValidation") is int; + self.isOnUnsubscriptionAvailable = methodNames.indexOf("onUnsubscription") is int; + self.isOnUnsubscriptionValidationAvailable = methodNames.indexOf("onUnsubscriptionValidation") is int; + } + + isolated function intiateSubscription(Subscription message, http:Headers headers) returns http:Response|Redirect { + if !self.isOnSubscriptionAvailable { + http:Response response = new; + response.statusCode = http:STATUS_ACCEPTED; + return response; + } + + SubscriptionAccepted|Redirect|error result = self.adaptor.callOnSubscriptionMethod(message, headers); + if result is Redirect { + return result; + } + + return processOnSubscriptionResult(result); + } + + isolated function verifySubscription(Subscription message, http:Headers headers) returns error? { + error? validationResult = self.validateSubscription(message, headers); + if validationResult is error { + [string, string?][] params = [ + [HUB_MODE, MODE_DENIED], + [HUB_TOPIC, message.hubTopic], + [HUB_REASON, validationResult.message()] + ]; + _ = check sendNotification(message.hubCallback, params, self.clientConfig); + return; + } + + string challenge = uuid:createType4AsString(); + [string, string?][] params = [ + [HUB_MODE, MODE_SUBSCRIBE], + [HUB_TOPIC, message.hubTopic], + [HUB_CHALLENGE, challenge], + [HUB_LEASE_SECONDS, message.hubLeaseSeconds] + ]; + http:Response subscriberResponse = check sendNotification(message.hubCallback, params, self.clientConfig); + string responsePayload = check subscriberResponse.getTextPayload(); + if challenge != responsePayload { + return; + } + + VerifiedSubscription verifiedSubscription = { + ...message + }; + check self.adaptor.callOnSubscriptionIntentVerifiedMethod(verifiedSubscription, headers); + } + + isolated function validateSubscription(Subscription message, http:Headers headers) returns error? { + if self.isOnSubscriptionValidationAvailable { + return self.adaptor.callOnSubscriptionValidationMethod(message, headers); + } + + if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { + return error SubscriptionDeniedError( + "Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); + } + } + + isolated function initiateUnsubscription(Unsubscription message, http:Headers headers) returns http:Response { + if !self.isOnUnsubscriptionAvailable { + http:Response response = new; + response.statusCode = http:STATUS_ACCEPTED; + return response; + } + + UnsubscriptionAccepted|error result = self.adaptor.callOnUnsubscriptionMethod(message, headers); + return processOnUnsubscriptionResult(result); + } + + isolated function verifyUnsubscription(Unsubscription message, http:Headers headers) returns error? { + error? validationResult = self.validateUnsubscription(message, headers); + if validationResult is error { + [string, string?][] params = [ + [HUB_MODE, MODE_DENIED], + [HUB_TOPIC, message.hubTopic], + [HUB_REASON, validationResult.message()] + ]; + _ = check sendNotification(message.hubCallback, params, self.clientConfig); + } + + string challenge = uuid:createType4AsString(); + [string, string?][] params = [ + [HUB_MODE, MODE_UNSUBSCRIBE], + [HUB_TOPIC, message.hubTopic], + [HUB_CHALLENGE, challenge] + ]; + http:Response subscriberResponse = check sendNotification(message.hubCallback, params, self.clientConfig); + string responsePayload = check subscriberResponse.getTextPayload(); + if challenge != responsePayload { + return; + } + + VerifiedUnsubscription verifiedUnsubscription = { + ...message + }; + check self.adaptor.callOnUnsubscriptionIntentVerifiedMethod(verifiedUnsubscription, headers); + } + + isolated function validateUnsubscription(Unsubscription message, http:Headers headers) returns error? { + if self.isOnUnsubscriptionValidationAvailable { + return self.adaptor.callOnUnsubscriptionValidationMethod(message, headers); + } + + if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { + return error SubscriptionDeniedError( + "Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); + } + } +} + +isolated function createSubscriptionMessage(string hubUrl, int defaultLeaseSeconds, map params) +returns Subscription|error { + string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); - int leaseSeconds = retrieveLeaseSeconds(params, defaultHubLeaseSeconds); + int leaseSeconds = retrieveLeaseSeconds(params, defaultLeaseSeconds); Subscription message = { hub: hubUrl, hubMode: MODE_SUBSCRIBE, @@ -35,7 +165,7 @@ isolated function createSubscriptionMessage(string hubUrl, int defaultHubLeaseSe return message; } -isolated function retrieveLeaseSeconds(map params, int defaultHubLeaseSeconds) returns int { +isolated function retrieveLeaseSeconds(map params, int defaultLeaseSeconds) returns int { var hubLeaseSeconds = params.removeIfHasKey(HUB_LEASE_SECONDS); if hubLeaseSeconds is string { var retrievedLeaseSeconds = 'int:fromString(hubLeaseSeconds); @@ -43,22 +173,7 @@ isolated function retrieveLeaseSeconds(map params, int defaultHubLeaseSe return retrievedLeaseSeconds; } } - return defaultHubLeaseSeconds; -} - -isolated function processSubscription(Subscription message, http:Headers headers, - HttpToWebsubhubAdaptor adaptor, boolean onSubscriptionMethodAvailable) returns http:Response|Redirect { - if !onSubscriptionMethodAvailable { - http:Response response = new; - response.statusCode = http:STATUS_ACCEPTED; - return response; - } else { - SubscriptionAccepted|Redirect|error result = adaptor.callOnSubscriptionMethod(message, headers); - if result is Redirect { - return result; - } - return processOnSubscriptionResult(result); - } + return defaultLeaseSeconds; } isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) returns http:Response|Redirect { @@ -71,52 +186,13 @@ isolated function processOnSubscriptionResult(SubscriptionAccepted|error result) updateErrorResponse(response, errorDetails, result.message()); return response; } else { - CommonResponse errorDetails = result is InternalSubscriptionError ? result.detail() : INTERNAL_SUBSCRIPTION_ERROR.detail(); + CommonResponse errorDetails = result is InternalSubscriptionError ? + result.detail() : INTERNAL_SUBSCRIPTION_ERROR.detail(); updateErrorResponse(response, errorDetails, result.message()); return response; } } -isolated function processSubscriptionVerification(http:Headers headers, HttpToWebsubhubAdaptor adaptor, Subscription message, - boolean subscriptionValidationMethodAvailable, ClientConfiguration config) returns error? { - error? validationResult = validateSubscription(subscriptionValidationMethodAvailable, message, headers, adaptor); - if validationResult is error { - [string, string?][] params = [ - [HUB_MODE, MODE_DENIED], - [HUB_TOPIC, message.hubTopic], - [HUB_REASON, validationResult.message()] - ]; - _ = check sendNotification(message.hubCallback, params, config); - } else { - string challenge = uuid:createType4AsString(); - [string, string?][] params = [ - [HUB_MODE, MODE_SUBSCRIBE], - [HUB_TOPIC, message.hubTopic], - [HUB_CHALLENGE, challenge], - [HUB_LEASE_SECONDS, message.hubLeaseSeconds] - ]; - http:Response subscriberResponse = check sendNotification(message.hubCallback, params, config); - string respStringPayload = check subscriberResponse.getTextPayload(); - if (respStringPayload == challenge) { - VerifiedSubscription verifiedMessage = { - ...message - }; - check adaptor.callOnSubscriptionIntentVerifiedMethod(verifiedMessage, headers); - } - } -} - -isolated function validateSubscription(boolean isRemoteMethodAvailable, Subscription message, - http:Headers headers, HttpToWebsubhubAdaptor adaptor) returns error? { - if isRemoteMethodAvailable { - return adaptor.callOnSubscriptionValidationMethod(message, headers); - } else { - if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { - return error SubscriptionDeniedError("Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); - } - } -} - isolated function createUnsubscriptionMessage(map params) returns Unsubscription|error { string topic = check retrieveQueryParameter(params, HUB_TOPIC); string hubCallback = check retrieveQueryParameter(params, HUB_CALLBACK); @@ -132,18 +208,6 @@ isolated function createUnsubscriptionMessage(map params) returns Unsubs return message; } -isolated function processUnsubscription(Unsubscription message, http:Headers headers, - HttpToWebsubhubAdaptor adaptor, boolean onUnsubscriptionMethodAvailable) returns http:Response { - if !onUnsubscriptionMethodAvailable { - http:Response response = new; - response.statusCode = http:STATUS_ACCEPTED; - return response; - } else { - UnsubscriptionAccepted|error result = adaptor.callOnUnsubscriptionMethod(message, headers); - return processOnUnsubscriptionResult(result); - } -} - isolated function processOnUnsubscriptionResult(UnsubscriptionAccepted|error result) returns http:Response { http:Response response = new; if result is UnsubscriptionAccepted { @@ -154,47 +218,9 @@ isolated function processOnUnsubscriptionResult(UnsubscriptionAccepted|error res updateErrorResponse(response, errorDetails, result.message()); return response; } else { - CommonResponse errorDetails = result is InternalSubscriptionError ? result.detail() : INTERNAL_UNSUBSCRIPTION_ERROR.detail(); + CommonResponse errorDetails = result is InternalSubscriptionError ? + result.detail() : INTERNAL_UNSUBSCRIPTION_ERROR.detail(); updateErrorResponse(response, errorDetails, result.message()); return response; } } - -isolated function processUnSubscriptionVerification(http:Headers headers, HttpToWebsubhubAdaptor adaptor, Unsubscription message, - boolean unsubscriptionValidationMethodAvailable, ClientConfiguration config) returns error? { - error? validationResult = validateUnsubscription(unsubscriptionValidationMethodAvailable, message, headers, adaptor); - if validationResult is error { - [string, string?][] params = [ - [HUB_MODE, MODE_DENIED], - [HUB_TOPIC, message.hubTopic], - [HUB_REASON, validationResult.message()] - ]; - _ = check sendNotification(message.hubCallback, params, config); - } else { - string challenge = uuid:createType4AsString(); - [string, string?][] params = [ - [HUB_MODE, MODE_UNSUBSCRIBE], - [HUB_TOPIC, message.hubTopic], - [HUB_CHALLENGE, challenge] - ]; - http:Response subscriberResponse = check sendNotification(message.hubCallback, params, config); - string respStringPayload = check subscriberResponse.getTextPayload(); - if (respStringPayload == challenge) { - VerifiedUnsubscription verifiedMessage = { - ...message - }; - check adaptor.callOnUnsubscriptionIntentVerifiedMethod(verifiedMessage, headers); - } - } -} - -isolated function validateUnsubscription(boolean isRemoteMethodAvailable, Unsubscription message, - http:Headers headers, HttpToWebsubhubAdaptor adaptor) returns error? { - if isRemoteMethodAvailable { - return adaptor.callOnUnsubscriptionValidationMethod(message, headers); - } else { - if !message.hubCallback.startsWith("http://") && !message.hubCallback.startsWith("https://") { - return error SubscriptionDeniedError("Invalid hub.callback param in the request.", statusCode = http:STATUS_NOT_ACCEPTABLE); - } - } -}