Skip to content

Commit

Permalink
Support quorum queues in QueueBuilder
Browse files Browse the repository at this point in the history
* Fix race in unrelated test
  • Loading branch information
garyrussell authored and artembilan committed Nov 6, 2019
1 parent 23d35b6 commit 6edcac7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,26 @@ public QueueBuilder masterLocator(MasterLocator locator) {
return withArgument("x-queue-master-locator", locator.getValue());
}

/**
* Set the queue argument to declare a queue of type 'quorum' instead of 'classic'.
* @return the builder.
* @since 2.2.2
*/
public QueueBuilder quorum() {
return withArgument("x-queue-type", "quorum");
}

/**
* Set the delivery limit; only applies to quorum queues.
* @param limit the limit.
* @return the builder.
* @since 2.2.2
* @see #quorum()
*/
public QueueBuilder deliveryLimit(int limit) {
return withArgument("x-delivery-limit", limit);
}

/**
* Builds a final queue.
* @return the Queue instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void setUp() {

@AfterAll
static void tearDown() {
brokerRunning.deleteQueues("all.args.1", "all.args.2", "all.args.3");
brokerRunning.deleteQueues("all.args.1", "all.args.2", "all.args.3", "test.quorum");
}

/**
Expand All @@ -90,13 +90,13 @@ static void tearDown() {
* @throws Exception the exception.
*/
@Test
public void test() throws Exception {
void test() throws Exception {
assertThat(this.rabbitTemplate.convertSendAndReceive("foo")).isNull();
assertThat(this.deadListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testQueueArgs1() throws MalformedURLException, URISyntaxException, InterruptedException {
void testQueueArgs1() throws MalformedURLException, URISyntaxException, InterruptedException {
Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(),
brokerRunning.getAdminPassword());
QueueInfo queue = client.getQueue("/", "all.args.1");
Expand All @@ -105,6 +105,7 @@ public void testQueueArgs1() throws MalformedURLException, URISyntaxException, I
Thread.sleep(100);
queue = client.getQueue("/", "all.args.1");
}
assertThat(n).isLessThan(100);
Map<String, Object> arguments = queue.getArguments();
assertThat(arguments.get("x-message-ttl")).isEqualTo(1000);
assertThat(arguments.get("x-expires")).isEqualTo(200_000);
Expand All @@ -119,7 +120,7 @@ public void testQueueArgs1() throws MalformedURLException, URISyntaxException, I
}

@Test
public void testQueueArgs2() throws MalformedURLException, URISyntaxException, InterruptedException {
void testQueueArgs2() throws MalformedURLException, URISyntaxException, InterruptedException {
Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(),
brokerRunning.getAdminPassword());
QueueInfo queue = client.getQueue("/", "all.args.2");
Expand All @@ -128,6 +129,7 @@ public void testQueueArgs2() throws MalformedURLException, URISyntaxException, I
Thread.sleep(100);
queue = client.getQueue("/", "all.args.1");
}
assertThat(n).isLessThan(100);
Map<String, Object> arguments = queue.getArguments();
assertThat(arguments.get("x-message-ttl")).isEqualTo(1000);
assertThat(arguments.get("x-expires")).isEqualTo(200_000);
Expand All @@ -142,7 +144,7 @@ public void testQueueArgs2() throws MalformedURLException, URISyntaxException, I
}

@Test
public void testQueueArgs3() throws MalformedURLException, URISyntaxException, InterruptedException {
void testQueueArgs3() throws MalformedURLException, URISyntaxException, InterruptedException {
Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(),
brokerRunning.getAdminPassword());
QueueInfo queue = client.getQueue("/", "all.args.3");
Expand All @@ -151,6 +153,7 @@ public void testQueueArgs3() throws MalformedURLException, URISyntaxException, I
Thread.sleep(100);
queue = client.getQueue("/", "all.args.1");
}
assertThat(n).isLessThan(100);
Map<String, Object> arguments = queue.getArguments();
assertThat(arguments.get("x-message-ttl")).isEqualTo(1000);
assertThat(arguments.get("x-expires")).isEqualTo(200_000);
Expand All @@ -165,7 +168,26 @@ public void testQueueArgs3() throws MalformedURLException, URISyntaxException, I

ExchangeInfo exchange = client.getExchange("/", "dlx.test.requestEx");
assertThat(exchange.getArguments().get("alternate-exchange")).isEqualTo("alternate");
}
}

/*
* Does not require a 3.8 broker - they are just arbitrary arguments.
*/
@Test
void testQuorumArgs() throws MalformedURLException, URISyntaxException, InterruptedException {
Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(),
brokerRunning.getAdminPassword());
QueueInfo queue = client.getQueue("/", "test.quorum");
int n = 0;
while (n++ < 100 && queue == null) {
Thread.sleep(100);
queue = client.getQueue("/", "test.quorum");
}
assertThat(n).isLessThan(100);
Map<String, Object> arguments = queue.getArguments();
assertThat(arguments.get("x-queue-type")).isEqualTo("quorum");
assertThat(arguments.get("x-delivery-limit")).isEqualTo(10);
}

@Configuration
public static class FixedReplyQueueDeadLetterConfig {
Expand Down Expand Up @@ -330,6 +352,14 @@ public Queue allArgs3() {
.build();
}

@Bean
public Queue quorum() {
return QueueBuilder.durable("test.quorum")
.quorum()
.deliveryLimit(10)
.build();
}

@Bean
public DeadListener deadListener() {
return new DeadListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public void testDeferredChannelCacheNack() throws Exception {
template.convertAndSend("", QUEUE2 + "junk", "foo", new MyCD("foo"));
assertThat(returnLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
int n = 0;
while (n++ < 100 && cacheCount.get() != 1) {
Thread.sleep(100);
}
assertThat(cacheCount.get()).isEqualTo(1);
assertThat(returnCalledFirst.get()).isTrue();
cf.destroy();
Expand Down

0 comments on commit 6edcac7

Please sign in to comment.