Skip to content

Commit

Permalink
✅ [CSB-959] Add azure-servicebus System-X
Browse files Browse the repository at this point in the history
  • Loading branch information
dolearci authored and mcarlett committed Mar 31, 2023
1 parent 19662a1 commit bfad610
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 1 deletion.
8 changes: 7 additions & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- This file is automatically updated by update_bom.sh script, do not edit this file manually -->
<parent>
<artifactId>tnb-parent</artifactId>
Expand Down Expand Up @@ -104,6 +105,11 @@
<artifactId>system-x-azure-event-hubs</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>software.tnb</groupId>
<artifactId>system-x-azure-service-bus</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>software.tnb</groupId>
<artifactId>system-x-azure-storage-blob</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package software.tnb.azure.common.account;

import software.tnb.common.account.Account;
import software.tnb.common.account.WithId;

public class AzureServiceBusAccount implements Account, WithId {
private String connection_string;

@Override
public String credentialsId() {
return "azure-servicebus";
}

public void setConnection_string(String connection_string) {
this.connection_string = connection_string;
}

public String connectionString() {
return connection_string;
}
}
1 change: 1 addition & 0 deletions system-x/services/azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
<module>event-hubs</module>
<module>storage-blob</module>
<module>storage-queue</module>
<module>service-bus</module>
</modules>
</project>
33 changes: 33 additions & 0 deletions system-x/services/azure/service-bus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>system-x-azure</artifactId>
<groupId>software.tnb</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>system-x-azure-service-bus</artifactId>
<version>1.0-SNAPSHOT</version>
<name>TNB :: System-X :: Services :: Azure :: Service Bus</name>

<properties>
<azure.messaging.servicebus.version>7.13.3</azure.messaging.servicebus.version>
</properties>

<dependencies>
<dependency>
<groupId>software.tnb</groupId>
<artifactId>system-x-azure-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>${azure.messaging.servicebus.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package software.tnb.azure.service.bus.service;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ErrorProcessor implements Consumer<ServiceBusErrorContext> {

private final List<String> errors;

public ErrorProcessor() {
this.errors = new ArrayList<>();
}

@Override
public void accept(ServiceBusErrorContext context) {
CountDownLatch countdownLatch = new CountDownLatch(1);
errors.add(String.format("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath()));

if (!(context.getException() instanceof ServiceBusException)) {
errors.add(String.format("Non-ServiceBusException occurred: %s%n", context.getException()));
return;
}

ServiceBusException exception = (ServiceBusException) context.getException();
ServiceBusFailureReason reason = exception.getReason();

if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
|| reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
|| reason == ServiceBusFailureReason.UNAUTHORIZED) {
errors.add(String.format("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
reason, exception.getMessage()));
countdownLatch.countDown();
} else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
errors.add(String.format("Message lock lost for message: %s%n", context.getException()));
} else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to sleep for period of time", e);
}
} else {
errors.add(String.format("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
reason, context.getException()));
}
}

public List<String> getErrors() {
return errors;
}

public void reset() {
errors.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package software.tnb.azure.service.bus.service;

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class MessageProcessor implements Consumer<ServiceBusReceivedMessageContext> {

private final ConcurrentLinkedQueue<ServiceBusReceivedMessage> messages;

public MessageProcessor() {
this.messages = new ConcurrentLinkedQueue<>();
}

public void reset() {
messages.clear();
}

@Override
public void accept(ServiceBusReceivedMessageContext context) {
messages.add(context.getMessage());
}

public ConcurrentLinkedQueue<ServiceBusReceivedMessage> getMessages() {
return messages;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package software.tnb.azure.service.bus.service;

import software.tnb.azure.common.account.AzureServiceBusAccount;
import software.tnb.azure.service.bus.validation.ServiceBusValidation;
import software.tnb.common.account.AccountFactory;
import software.tnb.common.service.Service;

import org.junit.jupiter.api.extension.ExtensionContext;

import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.google.auto.service.AutoService;

@AutoService(ServiceBus.class)
public class ServiceBus implements Service {
private ServiceBusValidation validation;

private static AzureServiceBusAccount azureServiceBusAccount() {
return AccountFactory.create(AzureServiceBusAccount.class);
}

private static ServiceBusAdministrationClient getAdminClient() {
return new ServiceBusAdministrationClientBuilder().connectionString(azureServiceBusAccount().connectionString()).buildClient();
}

public ServiceBusValidation validation() {
return validation;
}

@Override
public void afterAll(ExtensionContext extensionContext) {
// no-op
}

@Override
public void beforeAll(ExtensionContext extensionContext) {
validation = new ServiceBusValidation(azureServiceBusAccount(), getAdminClient());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package software.tnb.azure.service.bus.validation;

import software.tnb.azure.common.account.AzureServiceBusAccount;
import software.tnb.azure.service.bus.service.ErrorProcessor;
import software.tnb.azure.service.bus.service.MessageProcessor;
import software.tnb.common.utils.WaitUtils;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ServiceBusValidation {
private final AzureServiceBusAccount azureServiceBusAccount;
private final ServiceBusAdministrationClient adminClient;

public ServiceBusValidation(AzureServiceBusAccount azureServiceBusAccount, ServiceBusAdministrationClient adminClient) {
this.azureServiceBusAccount = azureServiceBusAccount;
this.adminClient = adminClient;
}

public AzureServiceBusAccount getAzureServiceBusAccount() {
return azureServiceBusAccount;
}

public void createQueue(String queue) {
adminClient.createQueue(queue);
WaitUtils.waitFor(() -> adminClient.getQueueExists(queue), 10
, 1000L, "Waiting until the queue " + queue + " is created");
}

public void deleteQueue(String queue) {
adminClient.deleteQueue(queue);
WaitUtils.waitFor(() -> !adminClient.getQueueExists(queue), 10
, 1000L, "Waiting until the queue " + queue + " is deleted");
}

public void sendMessage(String queue, String message) {
ServiceBusSenderClient client = new ServiceBusClientBuilder()
.connectionString(azureServiceBusAccount.connectionString())
.sender()
.queueName(queue)
.buildClient();

client.sendMessage(new ServiceBusMessage(message));
}

public List<String> receiveMessages(String queue) {
final MessageProcessor messageProcessor = new MessageProcessor();
final ErrorProcessor errorProcessor = new ErrorProcessor();

try (ServiceBusProcessorClient client = new ServiceBusClientBuilder()
.connectionString(azureServiceBusAccount.connectionString())
.processor()
.queueName(queue)
.processMessage(messageProcessor)
.processError(errorProcessor)
.buildProcessorClient()) {

client.start();
WaitUtils.waitFor(() -> !(errorProcessor.getErrors().isEmpty()
&& messageProcessor.getMessages().isEmpty())
, 10, 1000L, "Waiting for messages");
}

return Stream.concat(errorProcessor.getErrors().stream()
, messageProcessor.getMessages().stream()
.map(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString()))
.collect(Collectors.toList());
}
}

0 comments on commit bfad610

Please sign in to comment.