diff --git a/bom/pom.xml b/bom/pom.xml index 177bf7a6c..57a1b5e3d 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -1,5 +1,6 @@ - + tnb-parent @@ -104,6 +105,11 @@ system-x-azure-event-hubs 1.0-SNAPSHOT + + software.tnb + system-x-azure-service-bus + 1.0-SNAPSHOT + software.tnb system-x-azure-storage-blob diff --git a/system-x/services/azure/common/src/main/java/software/tnb/azure/common/account/AzureServiceBusAccount.java b/system-x/services/azure/common/src/main/java/software/tnb/azure/common/account/AzureServiceBusAccount.java new file mode 100644 index 000000000..766d74f90 --- /dev/null +++ b/system-x/services/azure/common/src/main/java/software/tnb/azure/common/account/AzureServiceBusAccount.java @@ -0,0 +1,21 @@ +package software.tnb.azure.common.account; + +import software.tnb.common.account.Account; +import software.tnb.common.account.WithId; + +public class AzureServiceBusAccount implements Account, WithId { + private String connection_string; + + @Override + public String credentialsId() { + return "azure-servicebus"; + } + + public void setConnection_string(String connection_string) { + this.connection_string = connection_string; + } + + public String connectionString() { + return connection_string; + } +} diff --git a/system-x/services/azure/pom.xml b/system-x/services/azure/pom.xml index d8efe67d3..54e0a733b 100644 --- a/system-x/services/azure/pom.xml +++ b/system-x/services/azure/pom.xml @@ -25,5 +25,6 @@ event-hubs storage-blob storage-queue + service-bus diff --git a/system-x/services/azure/service-bus/pom.xml b/system-x/services/azure/service-bus/pom.xml new file mode 100644 index 000000000..b855aa24d --- /dev/null +++ b/system-x/services/azure/service-bus/pom.xml @@ -0,0 +1,33 @@ + + + + system-x-azure + software.tnb + 1.0-SNAPSHOT + + 4.0.0 + + system-x-azure-service-bus + 1.0-SNAPSHOT + TNB :: System-X :: Services :: Azure :: Service Bus + + + 7.13.3 + + + + + software.tnb + system-x-azure-common + 1.0-SNAPSHOT + + + com.azure + azure-messaging-servicebus + ${azure.messaging.servicebus.version} + + + + diff --git a/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ErrorProcessor.java b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ErrorProcessor.java new file mode 100644 index 000000000..d47e1fd6c --- /dev/null +++ b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ErrorProcessor.java @@ -0,0 +1,62 @@ +package software.tnb.azure.service.bus.service; + +import com.azure.messaging.servicebus.ServiceBusErrorContext; +import com.azure.messaging.servicebus.ServiceBusException; +import com.azure.messaging.servicebus.ServiceBusFailureReason; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class ErrorProcessor implements Consumer { + + private final List errors; + + public ErrorProcessor() { + this.errors = new ArrayList<>(); + } + + @Override + public void accept(ServiceBusErrorContext context) { + CountDownLatch countdownLatch = new CountDownLatch(1); + errors.add(String.format("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", + context.getFullyQualifiedNamespace(), context.getEntityPath())); + + if (!(context.getException() instanceof ServiceBusException)) { + errors.add(String.format("Non-ServiceBusException occurred: %s%n", context.getException())); + return; + } + + ServiceBusException exception = (ServiceBusException) context.getException(); + ServiceBusFailureReason reason = exception.getReason(); + + if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED + || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND + || reason == ServiceBusFailureReason.UNAUTHORIZED) { + errors.add(String.format("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", + reason, exception.getMessage())); + countdownLatch.countDown(); + } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { + errors.add(String.format("Message lock lost for message: %s%n", context.getException())); + } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to sleep for period of time", e); + } + } else { + errors.add(String.format("Error source %s, reason %s, message: %s%n", context.getErrorSource(), + reason, context.getException())); + } + } + + public List getErrors() { + return errors; + } + + public void reset() { + errors.clear(); + } +} diff --git a/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/MessageProcessor.java b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/MessageProcessor.java new file mode 100644 index 000000000..4a9a3c094 --- /dev/null +++ b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/MessageProcessor.java @@ -0,0 +1,29 @@ +package software.tnb.azure.service.bus.service; + +import com.azure.messaging.servicebus.ServiceBusReceivedMessage; +import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +public class MessageProcessor implements Consumer { + + private final ConcurrentLinkedQueue messages; + + public MessageProcessor() { + this.messages = new ConcurrentLinkedQueue<>(); + } + + public void reset() { + messages.clear(); + } + + @Override + public void accept(ServiceBusReceivedMessageContext context) { + messages.add(context.getMessage()); + } + + public ConcurrentLinkedQueue getMessages() { + return messages; + } +} diff --git a/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ServiceBus.java b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ServiceBus.java new file mode 100644 index 000000000..99e853343 --- /dev/null +++ b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ServiceBus.java @@ -0,0 +1,39 @@ +package software.tnb.azure.service.bus.service; + +import software.tnb.azure.common.account.AzureServiceBusAccount; +import software.tnb.azure.service.bus.validation.ServiceBusValidation; +import software.tnb.common.account.AccountFactory; +import software.tnb.common.service.Service; + +import org.junit.jupiter.api.extension.ExtensionContext; + +import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient; +import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder; +import com.google.auto.service.AutoService; + +@AutoService(ServiceBus.class) +public class ServiceBus implements Service { + private ServiceBusValidation validation; + + private static AzureServiceBusAccount azureServiceBusAccount() { + return AccountFactory.create(AzureServiceBusAccount.class); + } + + private static ServiceBusAdministrationClient getAdminClient() { + return new ServiceBusAdministrationClientBuilder().connectionString(azureServiceBusAccount().connectionString()).buildClient(); + } + + public ServiceBusValidation validation() { + return validation; + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + // no-op + } + + @Override + public void beforeAll(ExtensionContext extensionContext) { + validation = new ServiceBusValidation(azureServiceBusAccount(), getAdminClient()); + } +} diff --git a/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/validation/ServiceBusValidation.java b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/validation/ServiceBusValidation.java new file mode 100644 index 000000000..66e3a0cc0 --- /dev/null +++ b/system-x/services/azure/service-bus/src/main/java/software/tnb/azure/service/bus/validation/ServiceBusValidation.java @@ -0,0 +1,76 @@ +package software.tnb.azure.service.bus.validation; + +import software.tnb.azure.common.account.AzureServiceBusAccount; +import software.tnb.azure.service.bus.service.ErrorProcessor; +import software.tnb.azure.service.bus.service.MessageProcessor; +import software.tnb.common.utils.WaitUtils; + +import com.azure.messaging.servicebus.ServiceBusClientBuilder; +import com.azure.messaging.servicebus.ServiceBusMessage; +import com.azure.messaging.servicebus.ServiceBusProcessorClient; +import com.azure.messaging.servicebus.ServiceBusSenderClient; +import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ServiceBusValidation { + private final AzureServiceBusAccount azureServiceBusAccount; + private final ServiceBusAdministrationClient adminClient; + + public ServiceBusValidation(AzureServiceBusAccount azureServiceBusAccount, ServiceBusAdministrationClient adminClient) { + this.azureServiceBusAccount = azureServiceBusAccount; + this.adminClient = adminClient; + } + + public AzureServiceBusAccount getAzureServiceBusAccount() { + return azureServiceBusAccount; + } + + public void createQueue(String queue) { + adminClient.createQueue(queue); + WaitUtils.waitFor(() -> adminClient.getQueueExists(queue), 10 + , 1000L, "Waiting until the queue " + queue + " is created"); + } + + public void deleteQueue(String queue) { + adminClient.deleteQueue(queue); + WaitUtils.waitFor(() -> !adminClient.getQueueExists(queue), 10 + , 1000L, "Waiting until the queue " + queue + " is deleted"); + } + + public void sendMessage(String queue, String message) { + ServiceBusSenderClient client = new ServiceBusClientBuilder() + .connectionString(azureServiceBusAccount.connectionString()) + .sender() + .queueName(queue) + .buildClient(); + + client.sendMessage(new ServiceBusMessage(message)); + } + + public List receiveMessages(String queue) { + final MessageProcessor messageProcessor = new MessageProcessor(); + final ErrorProcessor errorProcessor = new ErrorProcessor(); + + try (ServiceBusProcessorClient client = new ServiceBusClientBuilder() + .connectionString(azureServiceBusAccount.connectionString()) + .processor() + .queueName(queue) + .processMessage(messageProcessor) + .processError(errorProcessor) + .buildProcessorClient()) { + + client.start(); + WaitUtils.waitFor(() -> !(errorProcessor.getErrors().isEmpty() + && messageProcessor.getMessages().isEmpty()) + , 10, 1000L, "Waiting for messages"); + } + + return Stream.concat(errorProcessor.getErrors().stream() + , messageProcessor.getMessages().stream() + .map(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString())) + .collect(Collectors.toList()); + } +}