Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement websubhub:ServiceConfig annotation for websubhub:Service #96

Merged
merged 18 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Multipart content delivery using websubhub-client.
- Subscription Lease seconds expired event.

## [0.2.0-alpha8] - 2021-04-16
### Added
- [Add compiler plugin to validate wesub:SubscriberService.](https://github.com/ballerina-platform/ballerina-standard-library/issues/1099)
- [Implement websubhub:ServiceConfig annotation for websubhub:Service](https://github.com/ballerina-platform/ballerina-standard-library/issues/1253)

## [0.2.0-alpha7] - 2021-04-02
### Fixed
- [Fix issue in form-url-encoded content-delivery in websubhub-client.](https://github.com/ballerina-platform/ballerina-standard-library/issues/1107)
27 changes: 27 additions & 0 deletions websubhub-ballerina/annotation.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Configuration for a WebSub Hub service.
#
# + leaseSeconds - The period for which the subscription is expected to be active in the Hub
# + webHookConfig - HTTP client configurations for subscription / unsubscription intent verification
public type ServiceConfiguration record {|
int leaseSeconds?;
ClientConfiguration webHookConfig?;
|};

# WebSub Hub Configuration for the service.
public annotation ServiceConfiguration ServiceConfig on service;
64 changes: 62 additions & 2 deletions websubhub-ballerina/commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ const string BALLERINA_PUBLISH_HEADER = "x-ballerina-publisher";
# `SHA256 HMAC` algorithm name, this is prepended to the generated signature value.
const string SHA256_HMAC = "sha256";

# Represents HTTP/1.1 protocol
const string HTTP_1_1 = "1.1";

# Represents HTTP/2.0 protocol
const string HTTP_2_0 = "2.0";

# Options to compress using gzip or deflate.
#
# `AUTO`: When service behaves as a HTTP gateway inbound request/response accept-encoding option is set as the
# outbound request/response accept-encoding/content-encoding option
# `ALWAYS`: Always set accept-encoding/content-encoding in outbound request/response
# `NEVER`: Never set accept-encoding/content-encoding header in outbound request/response
public type Compression COMPRESSION_AUTO|COMPRESSION_ALWAYS|COMPRESSION_NEVER;

# When service behaves as a HTTP gateway inbound request/response accept-encoding option is set as the
# outbound request/response accept-encoding/content-encoding option.
public const COMPRESSION_AUTO = "AUTO";

# Always set accept-encoding/content-encoding in outbound request/response.
public const COMPRESSION_ALWAYS = "ALWAYS";

# Never set accept-encoding/content-encoding header in outbound request/response.
public const COMPRESSION_NEVER = "NEVER";

# Response Status object, used to communicate status of the executed actions.
#
# + code - status code value
Expand Down Expand Up @@ -281,8 +305,26 @@ public final InternalUnsubscriptionError INTERNAL_UNSUBSCRIPTION_ERROR = error I
public final UnsubscriptionDeniedError UNSUBSCRIPTION_DENIED_ERROR = error UnsubscriptionDeniedError("Unsubscription denied");

# Record to represent client configuration for HubClient / PublisherClient
#
# + httpVersion - The HTTP version understood by the client
# + http1Settings - Configurations related to HTTP/1.x protocol
# + http2Settings - Configurations related to HTTP/2 protocol
# + timeout - The maximum time to wait (in seconds) for a response before closing the connection
# + poolConfig - Configurations associated with request pooling
# + retryConfig - Configurations associated with retrying
# + responseLimits - Configurations associated with inbound response size limits
# + secureSocket - SSL/TLS related options
# + circuitBreaker - Configurations associated with the behaviour of the Circuit Breaker
public type ClientConfiguration record {|
*http:ClientConfiguration;
string httpVersion = HTTP_1_1;
http:ClientHttp1Settings http1Settings = {};
http:ClientHttp2Settings http2Settings = {};
decimal timeout = 60;
http:PoolConfiguration poolConfig?;
http:RetryConfig retryConfig?;
http:ResponseLimitConfigs responseLimits = {};
http:ClientSecureSocket secureSocket?;
http:CircuitBreakerConfig circuitBreaker?;
|};

# Provides a set of configurations for configure the underlying HTTP listener of the WebSubHub listener.
Expand Down Expand Up @@ -310,5 +352,23 @@ isolated function isSuccessStatusCode(int statusCode) returns boolean {
# + topic - Name of the `topic`
# + return - a `string` containing the value for `HTTP Link Header`
isolated function generateLinkUrl(string hubUrl, string topic) returns string {
return hubUrl + "; rel=\"hub\", " + topic + "; rel=\"self\"";
return string`${hubUrl}; rel=\"hub\", ${topic}; rel=\"self\"`;
}

# Converts {@code websubhub:ClientConfiguration} to {@code http:ClientConfiguration}
#
# + config - provided {@code websubhub:ClientConfiguration}
# + return - a {@code http:ClientConfiguration} from the provided {@code websubhub:ClientConfiguration}
isolated function retrieveHttpClientConfig(ClientConfiguration config) returns http:ClientConfiguration {
return {
httpVersion: config.httpVersion,
http1Settings: config.http1Settings,
http2Settings: config.http2Settings,
timeout: config.timeout,
poolConfig: config?.poolConfig,
retryConfig: config?.retryConfig,
responseLimits: config.responseLimits,
secureSocket: config?.secureSocket,
circuitBreaker: config?.circuitBreaker
};
}
10 changes: 7 additions & 3 deletions websubhub-ballerina/http_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import ballerina/jballerina.java;

service class HttpService {
private Service hubService;
private ClientConfiguration clientConfig;
private string hub;
private int defaultHubLeaseSeconds;
private boolean isSubscriptionAvailable = false;
Expand All @@ -34,8 +35,9 @@ service class HttpService {
# + hubService - {@code websubhub:Service} provided service
# + hubUrl - {@code string} current Hub URL
# + leaseSeconds - {@code int} default value for subscription lease-seconds
public isolated function init(Service hubService, string hubUrl, int leaseSeconds) {
public isolated function init(Service hubService, string hubUrl, int leaseSeconds, *ClientConfiguration clientConfig) {
self.hubService = hubService;
self.clientConfig = clientConfig;
self.hub = hubUrl;
self.defaultHubLeaseSeconds = leaseSeconds;
string[] methodNames = getServiceMethodNames(hubService);
Expand Down Expand Up @@ -131,13 +133,15 @@ service class HttpService {
<@untainted> self.isSubscriptionAvailable,
<@untainted> self.isSubscriptionValidationAvailable,
<@untainted> self.hub,
<@untainted> self.defaultHubLeaseSeconds);
<@untainted> self.defaultHubLeaseSeconds,
self.clientConfig);
}
MODE_UNSUBSCRIBE => {
processUnsubscriptionRequestAndRespond(<@untainted> request, caller, response,
headers, <@untainted> params, self.hubService,
self.isUnsubscriptionAvailable,
<@untainted> self.isUnsubscriptionValidationAvailable);
<@untainted> self.isUnsubscriptionValidationAvailable,
self.clientConfig);
}
MODE_PUBLISH => {
string? topic = getEncodedValueOrUpdatedErrorResponse(params, HUB_TOPIC, response);
Expand Down
2 changes: 1 addition & 1 deletion websubhub-ballerina/hub_client.bal
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public client class HubClient {
self.topic = subscription.hubTopic;
self.linkHeaderValue = generateLinkUrl(self.hub, self.topic);
self.secret = subscription?.hubSecret is string ? <string>subscription?.hubSecret : "";
self.httpClient = check new(subscription.hubCallback, <http:ClientConfiguration?>config);
self.httpClient = check new(subscription.hubCallback, retrieveHttpClientConfig(config));
}

# Distributes the published content to subscribers.
Expand Down
26 changes: 22 additions & 4 deletions websubhub-ballerina/hub_listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,26 @@ public class Listener {

# Attaches the provided Service to the Listener.
#
# + s - The `websubhub:Service` object to attach
# + service - The `websubhub:Service` object to attach
# + name - The path of the Service to be hosted
# + return - An `error`, if an error occurred during the service attaching process
public isolated function attach(Service s, string[]|string? name = ()) returns error? {
public isolated function attach(Service 'service, string[]|string? name = ()) returns error? {
if (self.listenerConfig.secureSocket is ()) {
log:printWarn("HTTPS is recommended but using HTTP");
}

string hubUrl = self.retrieveHubUrl(name);
// todo implement to retrieve hub-lease-seconds via annotation configuration
self.httpService = new(s, hubUrl, self.defaultHubLeaseSeconds);
ServiceConfiguration? configuration = retrieveServiceAnnotations('service);
if (configuration is ServiceConfiguration) {
int leaseSeconds = configuration?.leaseSeconds is int ? <int>(configuration?.leaseSeconds) : self.defaultHubLeaseSeconds;
if (configuration?.webHookConfig is ClientConfiguration) {
self.httpService = new('service, hubUrl, leaseSeconds, <ClientConfiguration>(configuration?.webHookConfig));
} else {
self.httpService = new('service, hubUrl, leaseSeconds);
}
} else {
self.httpService = new('service, hubUrl, self.defaultHubLeaseSeconds);
}
checkpanic self.httpListener.attach(<HttpService> self.httpService, name);
}

Expand Down Expand Up @@ -104,3 +113,12 @@ public class Listener {
return self.httpListener.immediateStop();
}
}

# Retrieves the `websubhub:ServiceConfiguration` annotation values
#
# + serviceType - current service type
# + return - {@code websubhub:ServiceConfiguration} if present or `nil` if absent
isolated function retrieveServiceAnnotations(Service serviceType) returns ServiceConfiguration? {
typedesc<any> serviceTypedesc = typeof serviceType;
return serviceTypedesc.@ServiceConfig;
}
2 changes: 1 addition & 1 deletion websubhub-ballerina/publisher_client.bal
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public client class PublisherClient {
# + config - The `websubhub:ClientConfiguration` for the underlying client or else `()`
public isolated function init(string url, *ClientConfiguration config) returns error? {
self.url = url;
self.httpClient = check new (self.url, <http:ClientConfiguration?>config);
self.httpClient = check new (self.url, retrieveHttpClientConfig(config));
}

# Registers a topic in a Ballerina WebSub Hub against which subscribers can subscribe and the publisher will
Expand Down
137 changes: 137 additions & 0 deletions websubhub-ballerina/tests/hub_annotation_test.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/lang.runtime;
import ballerina/http;
import ballerina/io;
import ballerina/test;

boolean isSubscriptionVerifiedWithSsl = false;
boolean isContentDeliveredWithSsl = false;

http:ListenerConfiguration subConfigs = {
secureSocket: {
key: {
path: "tests/resources/ballerinaKeystore.pkcs12",
password: "ballerina"
}
}
};

service /subscriber on new http:Listener(9098, subConfigs) {
resource function get .(http:Request req, http:Caller caller) returns @tainted error? {
map<string[]> payload = req.getQueryParams();
string[] hubMode = <string[]> payload["hub.mode"];
if (hubMode[0] == "denied") {
io:println("[ANNOTATION_CONFIG] Subscriber Validation failed ", payload);
check caller->respond("");
} else {
isSubscriptionVerifiedWithSsl = true;
string[] challengeArray = <string[]> payload["hub.challenge"];
io:println("[ANNOTATION_CONFIG] Subscriber Verified ", challengeArray);
check caller->respond(challengeArray[0]);
}
}

resource function post .(http:Request req, http:Caller caller) returns @tainted error? {
isContentDeliveredWithSsl = true;
check caller->respond();
}
}

@ServiceConfig {
webHookConfig: {
secureSocket: {
cert: {
path: "tests/resources/ballerinaTruststore.pkcs12",
password: "ballerina"
}
}
}
}
service /websubhub on new Listener(9099) {

isolated remote function onRegisterTopic(TopicRegistration message) returns TopicRegistrationSuccess {
return TOPIC_REGISTRATION_SUCCESS;
}

isolated remote function onDeregisterTopic(TopicDeregistration message) returns TopicDeregistrationSuccess {
return TOPIC_DEREGISTRATION_SUCCESS;
}

remote function onUpdateMessage(UpdateMessage msg) returns Acknowledgement|UpdateMessageError {
Subscription subscriptionMsg = retrieveSubscriptionMsg("https://localhost:9098/subscriber");
HubClient|error? clientEp = new (subscriptionMsg, httpsConfig);
if (clientEp is HubClient) {
ContentDistributionMessage updateMsg = {content: <string>msg.content};
ContentDistributionSuccess|SubscriptionDeletedError|error? publishResponse = clientEp->notifyContentDistribution(updateMsg);
return ACKNOWLEDGEMENT;
} else {
return UPDATE_MESSAGE_ERROR;
}
}

isolated remote function onSubscription(Subscription msg) returns SubscriptionAccepted {
io:println("[ANNOTATION_CONFIG] Received subscription ", msg);
return SUBSCRIPTION_ACCEPTED;
}

isolated remote function onSubscriptionValidation(Subscription msg) {
}

isolated remote function onSubscriptionIntentVerified(VerifiedSubscription msg) {
}

isolated remote function onUnsubscription(Unsubscription msg) returns UnsubscriptionAccepted {
return UNSUBSCRIPTION_ACCEPTED;
}

isolated remote function onUnsubscriptionValidation(Unsubscription msg) {
}

isolated remote function onUnsubscriptionIntentVerified(VerifiedUnsubscription msg) {
}
}

PublisherClient annotationTestPublisher = checkpanic new ("http://localhost:9099/websubhub");

http:Client annotationTestClient = checkpanic new("http://localhost:9099/websubhub");

@test:Config {}
function testSubscriptionWithAnnotationConfig() returns @tainted error? {
http:Request request = new;
request.setTextPayload("hub.mode=subscribe&hub.topic=test&hub.callback=https://localhost:9098/subscriber",
"application/x-www-form-urlencoded");
http:Response response = check annotationTestClient->post("/", request);
test:assertEquals(response.statusCode, 202);
waitForActionCompletion(isSubscriptionVerifiedWithSsl);
test:assertTrue(isSubscriptionVerifiedWithSsl);
}

@test:Config {}
function testContentUpdateWithAnnotationConfig() returns @tainted error? {
Acknowledgement response = check annotationTestPublisher->publishUpdate("test", "This is a test message");
waitForActionCompletion(isContentDeliveredWithSsl);
test:assertTrue(isContentDeliveredWithSsl);
}

isolated function waitForActionCompletion(boolean flag, int count = 10) {
int counter = 10;
while (!flag && counter >= 0) {
runtime:sleep(1);
counter -= 1;
}
}
2 changes: 1 addition & 1 deletion websubhub-ballerina/tests/ssl_enabled_hub_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ ClientConfiguration httpsConfig = {

PublisherClient sslEnabledPublisher = checkpanic new ("https://localhost:9096/websubhub", httpsConfig);

http:Client sslEnabledClient = checkpanic new("https://localhost:9096/websubhub", <http:ClientConfiguration>httpsConfig);
http:Client sslEnabledClient = checkpanic new("https://localhost:9096/websubhub", retrieveHttpClientConfig(httpsConfig));

@test:Config{}
public function testPublisherRegisterSuccessWithSsl() {
Expand Down
Loading