-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✅ [CSB-959] Add azure-servicebus System-X
- Loading branch information
Showing
8 changed files
with
268 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
.../azure/common/src/main/java/software/tnb/azure/common/account/AzureServiceBusAccount.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package software.tnb.azure.common.account; | ||
|
||
import software.tnb.common.account.Account; | ||
import software.tnb.common.account.WithId; | ||
|
||
public class AzureServiceBusAccount implements Account, WithId { | ||
private String connection_string; | ||
|
||
@Override | ||
public String credentialsId() { | ||
return "azure-servicebus"; | ||
} | ||
|
||
public void setConnection_string(String connection_string) { | ||
this.connection_string = connection_string; | ||
} | ||
|
||
public String connectionString() { | ||
return connection_string; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>system-x-azure</artifactId> | ||
<groupId>software.tnb</groupId> | ||
<version>1.0-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>system-x-azure-service-bus</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
<name>TNB :: System-X :: Services :: Azure :: Service Bus</name> | ||
|
||
<properties> | ||
<azure.messaging.servicebus.version>7.13.3</azure.messaging.servicebus.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>software.tnb</groupId> | ||
<artifactId>system-x-azure-common</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.azure</groupId> | ||
<artifactId>azure-messaging-servicebus</artifactId> | ||
<version>${azure.messaging.servicebus.version}</version> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
62 changes: 62 additions & 0 deletions
62
...zure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ErrorProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package software.tnb.azure.service.bus.service; | ||
|
||
import com.azure.messaging.servicebus.ServiceBusErrorContext; | ||
import com.azure.messaging.servicebus.ServiceBusException; | ||
import com.azure.messaging.servicebus.ServiceBusFailureReason; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Consumer; | ||
|
||
public class ErrorProcessor implements Consumer<ServiceBusErrorContext> { | ||
|
||
private final List<String> errors; | ||
|
||
public ErrorProcessor() { | ||
this.errors = new ArrayList<>(); | ||
} | ||
|
||
@Override | ||
public void accept(ServiceBusErrorContext context) { | ||
CountDownLatch countdownLatch = new CountDownLatch(1); | ||
errors.add(String.format("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", | ||
context.getFullyQualifiedNamespace(), context.getEntityPath())); | ||
|
||
if (!(context.getException() instanceof ServiceBusException)) { | ||
errors.add(String.format("Non-ServiceBusException occurred: %s%n", context.getException())); | ||
return; | ||
} | ||
|
||
ServiceBusException exception = (ServiceBusException) context.getException(); | ||
ServiceBusFailureReason reason = exception.getReason(); | ||
|
||
if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED | ||
|| reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND | ||
|| reason == ServiceBusFailureReason.UNAUTHORIZED) { | ||
errors.add(String.format("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", | ||
reason, exception.getMessage())); | ||
countdownLatch.countDown(); | ||
} else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { | ||
errors.add(String.format("Message lock lost for message: %s%n", context.getException())); | ||
} else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { | ||
try { | ||
TimeUnit.SECONDS.sleep(1L); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException("Unable to sleep for period of time", e); | ||
} | ||
} else { | ||
errors.add(String.format("Error source %s, reason %s, message: %s%n", context.getErrorSource(), | ||
reason, context.getException())); | ||
} | ||
} | ||
|
||
public List<String> getErrors() { | ||
return errors; | ||
} | ||
|
||
public void reset() { | ||
errors.clear(); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
...re/service-bus/src/main/java/software/tnb/azure/service/bus/service/MessageProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package software.tnb.azure.service.bus.service; | ||
|
||
import com.azure.messaging.servicebus.ServiceBusReceivedMessage; | ||
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; | ||
|
||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.function.Consumer; | ||
|
||
public class MessageProcessor implements Consumer<ServiceBusReceivedMessageContext> { | ||
|
||
private final ConcurrentLinkedQueue<ServiceBusReceivedMessage> messages; | ||
|
||
public MessageProcessor() { | ||
this.messages = new ConcurrentLinkedQueue<>(); | ||
} | ||
|
||
public void reset() { | ||
messages.clear(); | ||
} | ||
|
||
@Override | ||
public void accept(ServiceBusReceivedMessageContext context) { | ||
messages.add(context.getMessage()); | ||
} | ||
|
||
public ConcurrentLinkedQueue<ServiceBusReceivedMessage> getMessages() { | ||
return messages; | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
...es/azure/service-bus/src/main/java/software/tnb/azure/service/bus/service/ServiceBus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package software.tnb.azure.service.bus.service; | ||
|
||
import software.tnb.azure.common.account.AzureServiceBusAccount; | ||
import software.tnb.azure.service.bus.validation.ServiceBusValidation; | ||
import software.tnb.common.account.AccountFactory; | ||
import software.tnb.common.service.Service; | ||
|
||
import org.junit.jupiter.api.extension.ExtensionContext; | ||
|
||
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient; | ||
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder; | ||
import com.google.auto.service.AutoService; | ||
|
||
@AutoService(ServiceBus.class) | ||
public class ServiceBus implements Service { | ||
private ServiceBusValidation validation; | ||
|
||
private static AzureServiceBusAccount azureServiceBusAccount() { | ||
return AccountFactory.create(AzureServiceBusAccount.class); | ||
} | ||
|
||
private static ServiceBusAdministrationClient getAdminClient() { | ||
return new ServiceBusAdministrationClientBuilder().connectionString(azureServiceBusAccount().connectionString()).buildClient(); | ||
} | ||
|
||
public ServiceBusValidation validation() { | ||
return validation; | ||
} | ||
|
||
@Override | ||
public void afterAll(ExtensionContext extensionContext) { | ||
// no-op | ||
} | ||
|
||
@Override | ||
public void beforeAll(ExtensionContext extensionContext) { | ||
validation = new ServiceBusValidation(azureServiceBusAccount(), getAdminClient()); | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
...ice-bus/src/main/java/software/tnb/azure/service/bus/validation/ServiceBusValidation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package software.tnb.azure.service.bus.validation; | ||
|
||
import software.tnb.azure.common.account.AzureServiceBusAccount; | ||
import software.tnb.azure.service.bus.service.ErrorProcessor; | ||
import software.tnb.azure.service.bus.service.MessageProcessor; | ||
import software.tnb.common.utils.WaitUtils; | ||
|
||
import com.azure.messaging.servicebus.ServiceBusClientBuilder; | ||
import com.azure.messaging.servicebus.ServiceBusMessage; | ||
import com.azure.messaging.servicebus.ServiceBusProcessorClient; | ||
import com.azure.messaging.servicebus.ServiceBusSenderClient; | ||
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient; | ||
|
||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
public class ServiceBusValidation { | ||
private final AzureServiceBusAccount azureServiceBusAccount; | ||
private final ServiceBusAdministrationClient adminClient; | ||
|
||
public ServiceBusValidation(AzureServiceBusAccount azureServiceBusAccount, ServiceBusAdministrationClient adminClient) { | ||
this.azureServiceBusAccount = azureServiceBusAccount; | ||
this.adminClient = adminClient; | ||
} | ||
|
||
public AzureServiceBusAccount getAzureServiceBusAccount() { | ||
return azureServiceBusAccount; | ||
} | ||
|
||
public void createQueue(String queue) { | ||
adminClient.createQueue(queue); | ||
WaitUtils.waitFor(() -> adminClient.getQueueExists(queue), 10 | ||
, 1000L, "Waiting until the queue " + queue + " is created"); | ||
} | ||
|
||
public void deleteQueue(String queue) { | ||
adminClient.deleteQueue(queue); | ||
WaitUtils.waitFor(() -> !adminClient.getQueueExists(queue), 10 | ||
, 1000L, "Waiting until the queue " + queue + " is deleted"); | ||
} | ||
|
||
public void sendMessage(String queue, String message) { | ||
ServiceBusSenderClient client = new ServiceBusClientBuilder() | ||
.connectionString(azureServiceBusAccount.connectionString()) | ||
.sender() | ||
.queueName(queue) | ||
.buildClient(); | ||
|
||
client.sendMessage(new ServiceBusMessage(message)); | ||
} | ||
|
||
public List<String> receiveMessages(String queue) { | ||
final MessageProcessor messageProcessor = new MessageProcessor(); | ||
final ErrorProcessor errorProcessor = new ErrorProcessor(); | ||
|
||
try (ServiceBusProcessorClient client = new ServiceBusClientBuilder() | ||
.connectionString(azureServiceBusAccount.connectionString()) | ||
.processor() | ||
.queueName(queue) | ||
.processMessage(messageProcessor) | ||
.processError(errorProcessor) | ||
.buildProcessorClient()) { | ||
|
||
client.start(); | ||
WaitUtils.waitFor(() -> !(errorProcessor.getErrors().isEmpty() | ||
&& messageProcessor.getMessages().isEmpty()) | ||
, 10, 1000L, "Waiting for messages"); | ||
} | ||
|
||
return Stream.concat(errorProcessor.getErrors().stream() | ||
, messageProcessor.getMessages().stream() | ||
.map(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString())) | ||
.collect(Collectors.toList()); | ||
} | ||
} |