Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Feb 17, 2025
2 parents 2396936 + cbed44d commit f2b8459
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 222 deletions.
188 changes: 83 additions & 105 deletions ballerina/http_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,23 @@
// under the License.

import ballerina/http;
import ballerina/mime;
import ballerina/log;
import ballerina/mime;

isolated service class HttpService {
*http:Service;

private final HttpToWebsubhubAdaptor adaptor;
private final readonly & ClientConfiguration clientConfig;
private final string hub;
private final int defaultHubLeaseSeconds;
private final boolean isSubscriptionAvailable;
private final boolean isSubscriptionValidationAvailable;
private final boolean isUnsubscriptionAvailable;
private final boolean isUnsubscriptionValidationAvailable;
private final Controller hubController;
private final int defaultLeaseSeconds;
private final SubscriptionHandler subscriptionHandler;

isolated function init(HttpToWebsubhubAdaptor adaptor, string hubUrl, int leaseSeconds,
*ClientConfiguration clientConfig) {
*ClientConfiguration clientConfig) {
self.adaptor = adaptor;
self.clientConfig = clientConfig.cloneReadOnly();
self.hub = hubUrl;
self.defaultHubLeaseSeconds = leaseSeconds;
string[] methodNames = adaptor.getServiceMethodNames();
self.isSubscriptionAvailable = isMethodAvailable("onSubscription", methodNames);
self.isSubscriptionValidationAvailable = isMethodAvailable("onSubscriptionValidation", methodNames);
self.isUnsubscriptionAvailable = isMethodAvailable("onUnsubscription", methodNames);
self.isUnsubscriptionValidationAvailable = isMethodAvailable("onUnsubscriptionValidation", methodNames);
self.hubController = new;
self.defaultLeaseSeconds = leaseSeconds;
self.subscriptionHandler = new (adaptor, clientConfig);
}

isolated resource function post .(http:Caller caller, http:Request request, http:Headers headers) returns Error? {
Expand All @@ -51,40 +40,34 @@ isolated service class HttpService {
if params is error {
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(params.message());
return respondToRequest(caller, response);
} else {
string? mode = params[HUB_MODE];
match mode {
MODE_REGISTER => {
http:Response|error result = processTopicRegistration(headers, params, self.adaptor);
return handleResult(caller, result);
}
MODE_DEREGISTER => {
http:Response|error result = processTopicDeregistration(headers, params, self.adaptor);
return handleResult(caller, result);
}
MODE_SUBSCRIBE => {
return self.handleSubscription(caller, headers, params);
}
MODE_UNSUBSCRIBE => {
return self.handleUnsubscription(caller, headers, params);
}
MODE_PUBLISH => {
http:Response|error result = processContentPublish(request, headers, params, self.adaptor);
if result is error {
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(result.message());
return respondToRequest(caller, response);
} else {
return respondToRequest(caller, result);
}
}
_ => {
response.statusCode = http:STATUS_BAD_REQUEST;
string errorMessage = "The request does not include valid `hub.mode` form param.";
response.setTextPayload(errorMessage);
return respondToRequest(caller, response);
}
return respondWithResult(caller, response);
}

string? mode = params[HUB_MODE];
match mode {
MODE_REGISTER => {
http:Response|error result = processTopicRegistration(headers, params, self.adaptor);
return respondWithResult(caller, result);
}
MODE_DEREGISTER => {
http:Response|error result = processTopicDeregistration(headers, params, self.adaptor);
return respondWithResult(caller, result);
}
MODE_SUBSCRIBE => {
return self.processSubscription(caller, headers, params);
}
MODE_UNSUBSCRIBE => {
return self.processUnsubscription(caller, headers, params);
}
MODE_PUBLISH => {
http:Response|error result = processContentPublish(request, headers, params, self.adaptor);
return respondWithResult(caller, result);
}
_ => {
response.statusCode = http:STATUS_BAD_REQUEST;
string errorMessage = "The request does not include valid `hub.mode` form param.";
response.setTextPayload(errorMessage);
return respondWithResult(caller, response);
}
}
}
Expand Down Expand Up @@ -124,79 +107,74 @@ isolated service class HttpService {
return params;
}

isolated function handleSubscription(http:Caller caller, http:Headers headers, map<string> params) returns Error? {
Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultHubLeaseSeconds, params);
if subscription is Subscription {
http:Response|Redirect result = processSubscription(subscription, headers, self.adaptor, self.isSubscriptionAvailable);
if result is Redirect {
error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls);
if redirectError is error {
log:printError("Error occurred while redirecting the subscription", 'error = redirectError);
}
} else {
int currentStatusCode = result.statusCode;
if currentStatusCode == http:STATUS_ACCEPTED {
check respondToRequest(caller, result);
error? verificationResult = processSubscriptionVerification(headers, self.adaptor, subscription,
self.isSubscriptionValidationAvailable, self.clientConfig);
if verificationResult is error {
log:printError("Error occurred while processing subscription", 'error = verificationResult);
}
return;
}
return respondToRequest(caller, result);
}
} else {
isolated function processSubscription(http:Caller caller, http:Headers headers, map<string> params)
returns Error? {

Subscription|error subscription = createSubscriptionMessage(self.hub, self.defaultLeaseSeconds, params);
if subscription is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(subscription.message());
return respondToRequest(caller, response);
return respondWithResult(caller, response);
}

http:Response|Redirect result = self.subscriptionHandler.intiateSubscription(subscription, headers);
if result is Redirect {
error? redirectError = caller->redirect(new http:Response(), result.code, result.redirectUrls);
if redirectError is error {
log:printError("Error occurred while redirecting the subscription", 'error = redirectError);
}
return;
}

check respondWithResult(caller, result);
if result.statusCode != http:STATUS_ACCEPTED {
return;
}

error? verification = self.subscriptionHandler.verifySubscription(subscription, headers);
if verification is error {
log:printError("Error occurred while processing subscription", 'error = verification);
}
}

isolated function handleUnsubscription(http:Caller caller, http:Headers headers, map<string> params) returns Error? {
isolated function processUnsubscription(http:Caller caller, http:Headers headers, map<string> params)
returns Error? {

Unsubscription|error unsubscription = createUnsubscriptionMessage(params);
if unsubscription is Unsubscription {
http:Response result = processUnsubscription(unsubscription, headers, self.adaptor, self.isUnsubscriptionAvailable);
int currentStatusCode = result.statusCode;
if currentStatusCode == http:STATUS_ACCEPTED {
check respondToRequest(caller, result);
error? verificationResult = processUnSubscriptionVerification(headers, self.adaptor, unsubscription,
self.isUnsubscriptionValidationAvailable, self.clientConfig);
if verificationResult is error {
log:printError("Error occurred while processing unsubscription", 'error = verificationResult);
}
return;
}
return respondToRequest(caller, result);
} else {
if unsubscription is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(unsubscription.message());
return respondToRequest(caller, response);
return respondWithResult(caller, response);
}
}
}

isolated function isMethodAvailable(string methodName, string[] methods) returns boolean {
return methods.indexOf(methodName) is int;
http:Response result = self.subscriptionHandler.initiateUnsubscription(unsubscription, headers);
check respondWithResult(caller, result);
if result.statusCode != http:STATUS_ACCEPTED {
return;
}

error? verification = self.subscriptionHandler.verifyUnsubscription(unsubscription, headers);
if verification is error {
log:printError("Error occurred while processing unsubscription", 'error = verification);
}
}
}

isolated function handleResult(http:Caller caller, http:Response|error result) returns Error? {
isolated function respondWithResult(http:Caller caller, http:Response|error result) returns Error? {
http:ListenerError? respondError = ();
if result is error {
http:Response response = new;
response.statusCode = http:STATUS_BAD_REQUEST;
response.setTextPayload(result.message());
return respondToRequest(caller, response);
respondError = caller->respond(response);
} else {
return respondToRequest(caller, result);
respondError = caller->respond(result);
}
}

isolated function respondToRequest(http:Caller caller, http:Response response) returns Error? {
http:ListenerError? responseError = caller->respond(response);
if responseError is http:ListenerError {
return error Error(
"Error occurred while responding to the request ", responseError, statusCode = http:STATUS_INTERNAL_SERVER_ERROR);
if respondError is http:ListenerError {
return error Error("Error occurred while responding to the request ", respondError,
statusCode = http:STATUS_INTERNAL_SERVER_ERROR);
}
}
10 changes: 5 additions & 5 deletions ballerina/natives.bal
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ import ballerina/jballerina.java;

isolated class HttpToWebsubhubAdaptor {
isolated function init(Service 'service) {
externInit(self, 'service);
self.externInit('service);
}

isolated function externInit(Service serviceObj) = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;

isolated function getServiceMethodNames() returns string[] = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
Expand Down Expand Up @@ -69,7 +73,3 @@ isolated class HttpToWebsubhubAdaptor {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
}

isolated function externInit(HttpToWebsubhubAdaptor adaptor, Service serviceObj) = @java:Method {
'class: "io.ballerina.stdlib.websubhub.NativeHttpToWebsubhubAdaptor"
} external;
Loading

0 comments on commit f2b8459

Please sign in to comment.