Skip to content

Commit

Permalink
[Java] Rename to explicitly reference that the handler is dealing spe…
Browse files Browse the repository at this point in the history
…cifically with error frames for publications. Add test to verify that it works for exclusive publications.
  • Loading branch information
mikeb01 committed Aug 1, 2024
1 parent a964ffd commit e5595ba
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 62 deletions.
21 changes: 11 additions & 10 deletions aeron-client/src/main/java/io/aeron/Aeron.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public static class Context extends CommonContext
private UnavailableImageHandler unavailableImageHandler;
private AvailableCounterHandler availableCounterHandler;
private UnavailableCounterHandler unavailableCounterHandler;
private ErrorFrameHandler errorFrameHandler = ErrorFrameHandler.NO_OP;
private PublicationErrorFrameHandler publicationErrorFrameHandler = PublicationErrorFrameHandler.NO_OP;
private Runnable closeHandler;
private long keepAliveIntervalNs = Configuration.KEEPALIVE_INTERVAL_NS;
private long interServiceTimeoutNs = 0;
Expand Down Expand Up @@ -1663,27 +1663,28 @@ public ThreadFactory threadFactory()
}

/**
* Set a handler to receive error frames that have been received by the local driver for resources owned by
* Set the handler to receive error frames that have been received by the local driver for publications added by
* this client.
*
* @param errorFrameHandler to be called back when an error frame is received.
* @return this for a fluent API.
* @param publicationErrorFrameHandler to be called back when an error frame is received.
* @return this for a fluent API.
*/
public Context errorFrameHandler(final ErrorFrameHandler errorFrameHandler)
public Context publicationErrorFrameHandler(
final PublicationErrorFrameHandler publicationErrorFrameHandler)
{
this.errorFrameHandler = errorFrameHandler;
this.publicationErrorFrameHandler = publicationErrorFrameHandler;
return this;
}

/**
* Get the handler to receive error frames that have been received by the local driver for resources owned by
* Get the handler to receive error frames that have been received by the local driver for publications added by
* this client.
*
* @return the {@link ErrorFrameHandler} to call back on to.
* @return the {@link PublicationErrorFrameHandler} to call back on to.
*/
public ErrorFrameHandler errorFrameHandler()
public PublicationErrorFrameHandler publicationErrorFrameHandler()
{
return this.errorFrameHandler;
return this.publicationErrorFrameHandler;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion aeron-client/src/main/java/io/aeron/ClientConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.agrona.concurrent.status.CountersReader;
import org.agrona.concurrent.status.UnsafeBufferPosition;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -282,7 +283,7 @@ void onPublicationError(final PublicationErrorFrameFlyweight errorFrameFlyweight
if (publication.originalRegistrationId() == errorFrameFlyweight.registrationId())
{
publicationErrorFrame.set(errorFrameFlyweight);
ctx.errorFrameHandler().onPublicationError(publicationErrorFrame);
ctx.publicationErrorFrameHandler().onPublicationError(publicationErrorFrame);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ else if (correlationId == activeCorrelationId)
publicationErrorFrame.wrap(buffer, index);

conductor.onPublicationError(publicationErrorFrame);
break;
}
}
}
Expand Down
43 changes: 0 additions & 43 deletions aeron-client/src/main/java/io/aeron/ErrorFrameHandler.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2014-2024 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron;

import io.aeron.status.PublicationErrorFrame;

/**
* Interface for handling various error frame messages for publications.
*/
public interface PublicationErrorFrameHandler
{
PublicationErrorFrameHandler NO_OP = new PublicationErrorFrameHandler()
{
public void onPublicationError(final PublicationErrorFrame errorFrame)
{
}
};

/**
* Called when an error frame for a publication is received by the local driver and needs to be propagated to the
* appropriate clients. E.g. when an image is invalidated. This callback will reuse the {@link
* PublicationErrorFrame} instance, so data is only valid for the lifetime of the callback. If the user needs to
* pass the data onto another thread or hold in another location for use later, then the user needs to make use of
* the {@link PublicationErrorFrame#clone()} method to create a copy for their own use.
* <p>
* This callback will be executed on the client conductor thread, similar to image availability notifications.
* <p>
* This notification will only be propagated to clients that have added an instance of the Publication that received
* the error frame (i.e. the originalRegistrationId matches the registrationId on the error frame).
*
* @param errorFrame containing the relevant information about the publication and the error message.
*/
void onPublicationError(PublicationErrorFrame errorFrame);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aeron.status;

import io.aeron.ErrorCode;
import io.aeron.command.PublicationErrorFrameFlyweight;

import java.net.InetSocketAddress;
Expand Down
64 changes: 56 additions & 8 deletions aeron-system-tests/src/test/java/io/aeron/RejectImageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.aeron;

import io.aeron.driver.MediaDriver;
import io.aeron.driver.PublicationImage;
import io.aeron.driver.ThreadingMode;
import io.aeron.exceptions.AeronException;
import io.aeron.status.PublicationErrorFrame;
Expand Down Expand Up @@ -47,6 +48,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;

import static io.aeron.driver.status.SystemCounterDescriptor.ERRORS;
import static io.aeron.driver.status.SystemCounterDescriptor.ERROR_FRAMES_RECEIVED;
Expand All @@ -65,6 +68,7 @@
public class RejectImageTest
{
public static final long A_VALUE_THAT_SHOWS_WE_ARENT_SPAMMING_ERROR_MESSAGES = 1000L;

@RegisterExtension
final SystemTestWatcher systemTestWatcher = new SystemTestWatcher();

Expand All @@ -89,7 +93,7 @@ private TestMediaDriver launch()
return driver;
}

private static final class QueuedErrorFrameHandler implements ErrorFrameHandler
private static final class QueuedErrorFrameHandler implements PublicationErrorFrameHandler
{
private final OneToOneConcurrentArrayQueue<PublicationErrorFrame> errorFrameQueue =
new OneToOneConcurrentArrayQueue<>(512);
Expand Down Expand Up @@ -117,7 +121,7 @@ void shouldRejectSubscriptionsImage() throws IOException

final Aeron.Context ctx = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler);
.publicationErrorFrameHandler(errorFrameHandler);

final AtomicBoolean imageUnavailable = new AtomicBoolean(false);

Expand Down Expand Up @@ -210,11 +214,11 @@ void shouldOnlyReceivePublicationErrorFrameOnRelevantClient() throws IOException

final Aeron.Context ctx1 = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler1);
.publicationErrorFrameHandler(errorFrameHandler1);

final Aeron.Context ctx2 = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler2);
.publicationErrorFrameHandler(errorFrameHandler2);

try (Aeron aeron1 = Aeron.connect(ctx1);
Aeron aeron2 = Aeron.connect(ctx2);
Expand Down Expand Up @@ -253,7 +257,6 @@ void shouldOnlyReceivePublicationErrorFrameOnRelevantClient() throws IOException
}
}


@Test
@InterruptAfter(10)
@SlowTest
Expand All @@ -267,11 +270,11 @@ void shouldReceivePublicationErrorFramesAllRelevantClients() throws IOException

final Aeron.Context ctx1 = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler1);
.publicationErrorFrameHandler(errorFrameHandler1);

final Aeron.Context ctx2 = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler2);
.publicationErrorFrameHandler(errorFrameHandler2);

try (Aeron aeron1 = Aeron.connect(ctx1);
Aeron aeron2 = Aeron.connect(ctx2);
Expand Down Expand Up @@ -431,7 +434,7 @@ void shouldReturnAllParametersToApi(final String addressStr) throws UnknownHostE

final Aeron.Context ctx = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.errorFrameHandler(errorFrameHandler);
.publicationErrorFrameHandler(errorFrameHandler);

final long groupTag = 1001;
final int port = 10001;
Expand Down Expand Up @@ -478,4 +481,49 @@ void shouldReturnAllParametersToApi(final String addressStr) throws UnknownHostE
assertEquals(new InetSocketAddress(addressStr, port), errorFrame.sourceAddress());
}
}

@ParameterizedTest
@InterruptAfter(5)
@ValueSource(booleans = { true, false })
void shouldOnlyReceivePublicationErrorFrames(final boolean isExclusive) throws IOException
{
context.imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(3));

final TestMediaDriver driver = launch();
final QueuedErrorFrameHandler errorFrameHandler = new QueuedErrorFrameHandler();

final Aeron.Context ctx = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.publicationErrorFrameHandler(errorFrameHandler);

final Function<Aeron, ? extends Publication> addPub = (aeron) -> isExclusive ?
aeron.addExclusivePublication(channel, streamId) : aeron.addPublication(channel, streamId);

try (Aeron aeron = Aeron.connect(ctx);
Publication pub = addPub.apply(aeron);
Subscription sub = aeron.addSubscription(channel, streamId))
{
Tests.awaitConnected(pub);
Tests.awaitConnected(sub);

while (pub.offer(message) < 0)
{
Tests.yield();
}

while (0 == sub.poll((buffer, offset, length, header) -> {}, 1))
{
Tests.yield();
}

final Image image = sub.imageAtIndex(0);
final String reason = "Needs to be closed";
image.reject(reason);

while (null == errorFrameHandler.poll())
{
Tests.yield();
}
}
}
}

0 comments on commit e5595ba

Please sign in to comment.