diff --git a/aeron-client/src/main/java/io/aeron/Aeron.java b/aeron-client/src/main/java/io/aeron/Aeron.java index ce98431a3de..8ad4ffd51e4 100644 --- a/aeron-client/src/main/java/io/aeron/Aeron.java +++ b/aeron-client/src/main/java/io/aeron/Aeron.java @@ -880,7 +880,7 @@ public static class Context extends CommonContext private UnavailableImageHandler unavailableImageHandler; private AvailableCounterHandler availableCounterHandler; private UnavailableCounterHandler unavailableCounterHandler; - private ErrorFrameHandler errorFrameHandler = ErrorFrameHandler.NO_OP; + private PublicationErrorFrameHandler publicationErrorFrameHandler = PublicationErrorFrameHandler.NO_OP; private Runnable closeHandler; private long keepAliveIntervalNs = Configuration.KEEPALIVE_INTERVAL_NS; private long interServiceTimeoutNs = 0; @@ -1663,27 +1663,28 @@ public ThreadFactory threadFactory() } /** - * Set a handler to receive error frames that have been received by the local driver for resources owned by + * Set the handler to receive error frames that have been received by the local driver for publications added by * this client. * - * @param errorFrameHandler to be called back when an error frame is received. - * @return this for a fluent API. + * @param publicationErrorFrameHandler to be called back when an error frame is received. + * @return this for a fluent API. */ - public Context errorFrameHandler(final ErrorFrameHandler errorFrameHandler) + public Context publicationErrorFrameHandler( + final PublicationErrorFrameHandler publicationErrorFrameHandler) { - this.errorFrameHandler = errorFrameHandler; + this.publicationErrorFrameHandler = publicationErrorFrameHandler; return this; } /** - * Get the handler to receive error frames that have been received by the local driver for resources owned by + * Get the handler to receive error frames that have been received by the local driver for publications added by * this client. * - * @return the {@link ErrorFrameHandler} to call back on to. + * @return the {@link PublicationErrorFrameHandler} to call back on to. */ - public ErrorFrameHandler errorFrameHandler() + public PublicationErrorFrameHandler publicationErrorFrameHandler() { - return this.errorFrameHandler; + return this.publicationErrorFrameHandler; } /** diff --git a/aeron-client/src/main/java/io/aeron/ClientConductor.java b/aeron-client/src/main/java/io/aeron/ClientConductor.java index ca0a72890e8..cea10af7b78 100644 --- a/aeron-client/src/main/java/io/aeron/ClientConductor.java +++ b/aeron-client/src/main/java/io/aeron/ClientConductor.java @@ -30,6 +30,7 @@ import org.agrona.concurrent.status.CountersReader; import org.agrona.concurrent.status.UnsafeBufferPosition; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -282,7 +283,7 @@ void onPublicationError(final PublicationErrorFrameFlyweight errorFrameFlyweight if (publication.originalRegistrationId() == errorFrameFlyweight.registrationId()) { publicationErrorFrame.set(errorFrameFlyweight); - ctx.errorFrameHandler().onPublicationError(publicationErrorFrame); + ctx.publicationErrorFrameHandler().onPublicationError(publicationErrorFrame); } } } diff --git a/aeron-client/src/main/java/io/aeron/DriverEventsAdapter.java b/aeron-client/src/main/java/io/aeron/DriverEventsAdapter.java index b4d88740771..a11693f55ce 100644 --- a/aeron-client/src/main/java/io/aeron/DriverEventsAdapter.java +++ b/aeron-client/src/main/java/io/aeron/DriverEventsAdapter.java @@ -255,6 +255,7 @@ else if (correlationId == activeCorrelationId) publicationErrorFrame.wrap(buffer, index); conductor.onPublicationError(publicationErrorFrame); + break; } } } diff --git a/aeron-client/src/main/java/io/aeron/ErrorFrameHandler.java b/aeron-client/src/main/java/io/aeron/ErrorFrameHandler.java deleted file mode 100644 index 5f5d49ac8c1..00000000000 --- a/aeron-client/src/main/java/io/aeron/ErrorFrameHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2014-2024 Real Logic Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.aeron; - -import io.aeron.status.PublicationErrorFrame; - -/** - * Interface for handling various error frame messages from different components in the client. - */ -public interface ErrorFrameHandler -{ - ErrorFrameHandler NO_OP = new ErrorFrameHandler() - { - }; - - /** - * Called when an error frame received by the local driver is propagated to the clients. E.g. when an image is - * invalidated. This callback will reuse the {@link PublicationErrorFrame} instance, so data is only valid for the - * lifetime of the callback. If the user needs to pass the data onto another thread or hold in another location for - * use later, then the user needs to make use of the {@link PublicationErrorFrame#clone()} method to create a copy - * for their own use. - *
- * This callback will be executed on the client conductor thread, similar to image availability notifications. - * - * @param errorFrame contain the data from the error frame received by the publication. - */ - default void onPublicationError(final PublicationErrorFrame errorFrame) - { - } -} diff --git a/aeron-client/src/main/java/io/aeron/PublicationErrorFrameHandler.java b/aeron-client/src/main/java/io/aeron/PublicationErrorFrameHandler.java new file mode 100644 index 00000000000..f095b0cd85c --- /dev/null +++ b/aeron-client/src/main/java/io/aeron/PublicationErrorFrameHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014-2024 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron; + +import io.aeron.status.PublicationErrorFrame; + +/** + * Interface for handling various error frame messages for publications. + */ +public interface PublicationErrorFrameHandler +{ + PublicationErrorFrameHandler NO_OP = new PublicationErrorFrameHandler() + { + public void onPublicationError(final PublicationErrorFrame errorFrame) + { + } + }; + + /** + * Called when an error frame for a publication is received by the local driver and needs to be propagated to the + * appropriate clients. E.g. when an image is invalidated. This callback will reuse the {@link + * PublicationErrorFrame} instance, so data is only valid for the lifetime of the callback. If the user needs to + * pass the data onto another thread or hold in another location for use later, then the user needs to make use of + * the {@link PublicationErrorFrame#clone()} method to create a copy for their own use. + *
+ * This callback will be executed on the client conductor thread, similar to image availability notifications. + *
+ * This notification will only be propagated to clients that have added an instance of the Publication that received
+ * the error frame (i.e. the originalRegistrationId matches the registrationId on the error frame).
+ *
+ * @param errorFrame containing the relevant information about the publication and the error message.
+ */
+ void onPublicationError(PublicationErrorFrame errorFrame);
+}
diff --git a/aeron-client/src/main/java/io/aeron/status/PublicationErrorFrame.java b/aeron-client/src/main/java/io/aeron/status/PublicationErrorFrame.java
index fa01f6b0bb6..426789a5a04 100644
--- a/aeron-client/src/main/java/io/aeron/status/PublicationErrorFrame.java
+++ b/aeron-client/src/main/java/io/aeron/status/PublicationErrorFrame.java
@@ -15,6 +15,7 @@
*/
package io.aeron.status;
+import io.aeron.ErrorCode;
import io.aeron.command.PublicationErrorFrameFlyweight;
import java.net.InetSocketAddress;
diff --git a/aeron-system-tests/src/test/java/io/aeron/RejectImageTest.java b/aeron-system-tests/src/test/java/io/aeron/RejectImageTest.java
index 1240da66461..c6b736182ac 100644
--- a/aeron-system-tests/src/test/java/io/aeron/RejectImageTest.java
+++ b/aeron-system-tests/src/test/java/io/aeron/RejectImageTest.java
@@ -16,6 +16,7 @@
package io.aeron;
import io.aeron.driver.MediaDriver;
+import io.aeron.driver.PublicationImage;
import io.aeron.driver.ThreadingMode;
import io.aeron.exceptions.AeronException;
import io.aeron.status.PublicationErrorFrame;
@@ -47,6 +48,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import static io.aeron.driver.status.SystemCounterDescriptor.ERRORS;
import static io.aeron.driver.status.SystemCounterDescriptor.ERROR_FRAMES_RECEIVED;
@@ -65,6 +68,7 @@
public class RejectImageTest
{
public static final long A_VALUE_THAT_SHOWS_WE_ARENT_SPAMMING_ERROR_MESSAGES = 1000L;
+
@RegisterExtension
final SystemTestWatcher systemTestWatcher = new SystemTestWatcher();
@@ -89,7 +93,7 @@ private TestMediaDriver launch()
return driver;
}
- private static final class QueuedErrorFrameHandler implements ErrorFrameHandler
+ private static final class QueuedErrorFrameHandler implements PublicationErrorFrameHandler
{
private final OneToOneConcurrentArrayQueue