Skip to content

Commit

Permalink
Merge branch 'smallrye:main' into feature/failure_strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
dankristensen authored May 6, 2024
2 parents 55e2322 + f894d90 commit dd48f6a
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-main-branches.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,4 @@ jobs:
--fail-at-end
- name: Codecov
uses: codecov/[email protected]
if: false
1 change: 1 addition & 0 deletions .github/workflows/build-pull.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ jobs:
-B clean install -Pcoverage -Dtest-containers=true ${{ matrix.java.build_opts }}
- name: Codecov
uses: codecov/[email protected]
if: false
2 changes: 1 addition & 1 deletion documentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-quickstart-kotlin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-quickstart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion examples/mqtt-quickstart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion examples/rabbitmq-quickstart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
<artemis.version>2.33.0</artemis.version>
<commons-io.version>2.16.1</commons-io.version>

<jboss-log-manager.version>3.0.5.Final</jboss-log-manager.version>
<jboss-log-manager.version>3.0.6.Final</jboss-log-manager.version>

<kafka.version>3.7.0</kafka.version>

Expand Down Expand Up @@ -124,7 +124,7 @@
<revapi-reporter-text.version>0.15.0</revapi-reporter-text.version>
<revapi.skip>true</revapi.skip>

<jreleaser-maven-plugin.version>1.11.0</jreleaser-maven-plugin.version>
<jreleaser-maven-plugin.version>1.12.0</jreleaser-maven-plugin.version>
</properties>

<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
map.put(key, new OffsetAndMetadata(record.getOffset() + 1, null));
consumer.commitAsync(map)
.subscribe().with(ignored -> {
}, throwable -> log.failedToCommitAsync(key, record.getOffset() + 1));
}, throwable -> log.failedToCommitAsync(key, record.getOffset() + 1, throwable));
}
});
return Uni.createFrom().voidItem().runSubscriptionOn(record::runOnMessageContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void acknowledgementFromRevokedTopicPartition(long offset, TopicPartition topicP

@LogMessage(level = Logger.Level.WARN)
@Message(id = 18259, value = "Kafka latest commit strategy failed to commit record from topic-partition '%s' at offset %d")
void failedToCommitAsync(TopicPartition topicPartition, long offset);
void failedToCommitAsync(TopicPartition topicPartition, long offset, @Cause Throwable cause);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18260, value = "Unable to recover from the serialization failure (topic: %s), configure a SerializationFailureHandler to recover from errors.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RabbitMQBrokerTestBase {
.withExposedPorts(5672, 15672)
.withNetworkAliases("rabbitmq")
.withNetwork(Network.SHARED)
.withLogConsumer(of -> LOGGER.info(of.getUtf8String()))
.withLogConsumer(of -> LOGGER.debug(of.getUtf8String()))
.waitingFor(Wait.forLogMessage(".*Server startup complete.*\\n", 1))
.withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"),
"/etc/rabbitmq/enabled_plugins");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ public class RabbitMQUsage {
public RabbitMQUsage(final Vertx vertx, final String host, final int port, final int managementPort, final String user,
final String pwd) {
this.managementPort = managementPort;
this.options = new RabbitMQOptions().setHost(host).setPort(port).setUser(user).setPassword(pwd);
this.options = new RabbitMQOptions()
.setHost(host)
.setPort(port)
.setUser(user)
.setPassword(pwd)
.setAutomaticRecoveryOnInitialConnection(false)
.setReconnectAttempts(1);
this.client = RabbitMQClient.create(new Vertx(vertx.getDelegate()), options);
}

Expand Down Expand Up @@ -97,23 +103,23 @@ public void produce(String exchange, String queue, String routingKey, int messag
client.startAndAwait();

Thread t = new Thread(() -> {
LOGGER.infof("Starting RabbitMQ sender to write %s messages with routing key %s", messageCount, routingKey);
LOGGER.debugf("Starting RabbitMQ sender to write %s messages with routing key %s", messageCount, routingKey);
try {
for (int i = 0; i != messageCount; ++i) {
Object payload = messageSupplier.get();
Buffer body = Buffer.buffer(payload.toString());
client.basicPublish(exchange, routingKey, properties, body)
.subscribe().with(
v -> {
LOGGER.infof("Producer sent message %s", payload);
LOGGER.debugf("Producer sent message %s", payload);
done.countDown();
},
Throwable::printStackTrace);
}
} catch (Exception e) {
LOGGER.error("Unable to send message", e);
}
LOGGER.infof("Finished sending %s messages with routing key %s", messageCount, routingKey);
LOGGER.debugf("Finished sending %s messages with routing key %s", messageCount, routingKey);
});

t.setName(exchange + "-thread");
Expand Down Expand Up @@ -146,7 +152,7 @@ public void consume(String exchange, String routingKey,
// Now set up a consumer
client.basicConsumerAndAwait(queue, new QueueOptions()).handler(
msg -> {
LOGGER.infof("Consumer %s: consuming message", exchange);
LOGGER.debugf("Consumer %s: consuming message", exchange);
consumerFunction.accept(msg);
});
}
Expand All @@ -155,23 +161,23 @@ public void consumeIntegers(String exchange, String routingKey, Consumer<Integer
final String queue = "tempConsumeIntegers";
// Start by the machinery to receive the messages
client.startAndAwait();
LOGGER.infof("RabbitMQ client now started");
LOGGER.debugf("RabbitMQ client now started");
client.exchangeDeclareAndAwait(exchange, "topic", false, true);
LOGGER.infof("RabbitMQ exchange declared %s", exchange);
LOGGER.debugf("RabbitMQ exchange declared %s", exchange);
client.queueDeclareAndAwait(queue, false, false, true);
LOGGER.infof("RabbitMQ queue declared %s", queue);
LOGGER.infof("About to bind RabbitMQ queue % to exchange %s via routing key %s", queue, exchange, routingKey);
LOGGER.debugf("RabbitMQ queue declared %s", queue);
LOGGER.debugf("About to bind RabbitMQ queue % to exchange %s via routing key %s", queue, exchange, routingKey);
client.queueBindAndAwait(queue, exchange, routingKey);
LOGGER.infof("RabbitMQ queue % bound to exchange %s via routing key %s", queue, exchange, routingKey);
LOGGER.debugf("RabbitMQ queue % bound to exchange %s via routing key %s", queue, exchange, routingKey);

// Now set up a consumer
client.basicConsumerAndAwait(queue, new QueueOptions()).handler(
msg -> {
final String payload = msg.body().toString();
LOGGER.infof("Consumer %s: consuming message %s", exchange, payload);
LOGGER.debugf("Consumer %s: consuming message %s", exchange, payload);
consumer.accept(Integer.parseInt(payload));
});
LOGGER.infof("Created consumer");
LOGGER.debugf("Created consumer");
}

public void close() {
Expand Down

0 comments on commit dd48f6a

Please sign in to comment.