diff --git a/aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java b/aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java index 99c48c0a5d7..5b6efe7c64e 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java +++ b/aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java @@ -51,7 +51,7 @@ class CmdInterceptor CMD_IN_REMOVE_RCV_DESTINATION, CMD_OUT_ON_CLIENT_TIMEOUT, CMD_IN_TERMINATE_DRIVER, - CMD_IN_INVALIDATE_IMAGE); + CMD_IN_REJECT_IMAGE); @SuppressWarnings("checkstyle:methodlength") @Advice.OnMethodEnter @@ -155,8 +155,8 @@ static void logCmd(final int msgTypeId, final DirectBuffer buffer, final int ind LOGGER.log(CMD_IN_TERMINATE_DRIVER, buffer, index, length); break; - case INVALIDATE_IMAGE: - LOGGER.log(CMD_IN_INVALIDATE_IMAGE, buffer, index, length); + case REJECT_IMAGE: + LOGGER.log(CMD_IN_REJECT_IMAGE, buffer, index, length); break; } } diff --git a/aeron-agent/src/main/java/io/aeron/agent/DriverEventCode.java b/aeron-agent/src/main/java/io/aeron/agent/DriverEventCode.java index 0238a838b2c..aa6dd89cae5 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/DriverEventCode.java +++ b/aeron-agent/src/main/java/io/aeron/agent/DriverEventCode.java @@ -99,7 +99,7 @@ public enum DriverEventCode implements EventCode RESEND(55, (code, buffer, offset, builder) -> DriverEventDissector.dissectResend(buffer, offset, builder)), - CMD_IN_INVALIDATE_IMAGE(56, DriverEventDissector::dissectCommand); + CMD_IN_REJECT_IMAGE(56, DriverEventDissector::dissectCommand); static final int EVENT_CODE_TYPE = EventCodeType.DRIVER.getTypeCode(); diff --git a/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java b/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java index 0e0d7502389..f7950d4d0a0 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java +++ b/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java @@ -55,7 +55,7 @@ final class DriverEventDissector private static final SubscriptionReadyFlyweight SUBSCRIPTION_READY = new SubscriptionReadyFlyweight(); private static final ClientTimeoutFlyweight CLIENT_TIMEOUT = new ClientTimeoutFlyweight(); private static final TerminateDriverFlyweight TERMINATE_DRIVER = new TerminateDriverFlyweight(); - private static final InvalidateImageFlyweight INVALIDATE_IMAGE = new InvalidateImageFlyweight(); + private static final RejectImageFlyweight REJECT_IMAGE = new RejectImageFlyweight(); static final String CONTEXT = "DRIVER"; @@ -215,9 +215,9 @@ static void dissectCommand( dissectTerminateDriver(builder); break; - case CMD_IN_INVALIDATE_IMAGE: - INVALIDATE_IMAGE.wrap(buffer, offset + encodedLength); - dissectInvalidateImage(builder); + case CMD_IN_REJECT_IMAGE: + REJECT_IMAGE.wrap(buffer, offset + encodedLength); + dissectRejectImage(builder); break; default: @@ -773,13 +773,13 @@ private static void dissectTerminateDriver(final StringBuilder builder) .append(" tokenBufferLength=").append(TERMINATE_DRIVER.tokenBufferLength()); } - private static void dissectInvalidateImage(final StringBuilder builder) + private static void dissectRejectImage(final StringBuilder builder) { builder - .append("clientId=").append(INVALIDATE_IMAGE.clientId()) - .append(" correlationId=").append(INVALIDATE_IMAGE.correlationId()) - .append(" imageCorrelationId=").append(INVALIDATE_IMAGE.imageCorrelationId()) - .append(" position=").append(INVALIDATE_IMAGE.position()) - .append(" reason=").append(INVALIDATE_IMAGE.reason()); + .append("clientId=").append(REJECT_IMAGE.clientId()) + .append(" correlationId=").append(REJECT_IMAGE.correlationId()) + .append(" imageCorrelationId=").append(REJECT_IMAGE.imageCorrelationId()) + .append(" position=").append(REJECT_IMAGE.position()) + .append(" reason=").append(REJECT_IMAGE.reason()); } } diff --git a/aeron-client/src/main/java/io/aeron/ClientConductor.java b/aeron-client/src/main/java/io/aeron/ClientConductor.java index 8aa597b3dd7..ca0a72890e8 100644 --- a/aeron-client/src/main/java/io/aeron/ClientConductor.java +++ b/aeron-client/src/main/java/io/aeron/ClientConductor.java @@ -1317,7 +1317,7 @@ void closeImages(final Image[] images, final UnavailableImageHandler unavailable } } - void invalidateImage(final long correlationId, final long position, final String reason) + void rejectImage(final long correlationId, final long position, final String reason) { clientLock.lock(); try @@ -1327,7 +1327,7 @@ void invalidateImage(final long correlationId, final long position, final String // TODO, check reason length?? - final long registrationId = driverProxy.invalidateImage(correlationId, position, reason); + final long registrationId = driverProxy.rejectImage(correlationId, position, reason); awaitResponse(registrationId); } finally diff --git a/aeron-client/src/main/java/io/aeron/DriverProxy.java b/aeron-client/src/main/java/io/aeron/DriverProxy.java index 52f5249ec6c..2efa751be36 100644 --- a/aeron-client/src/main/java/io/aeron/DriverProxy.java +++ b/aeron-client/src/main/java/io/aeron/DriverProxy.java @@ -37,7 +37,7 @@ public final class DriverProxy private final RemoveMessageFlyweight removeMessage = new RemoveMessageFlyweight(); private final DestinationMessageFlyweight destinationMessage = new DestinationMessageFlyweight(); private final CounterMessageFlyweight counterMessage = new CounterMessageFlyweight(); - private final InvalidateImageFlyweight invalidateImage = new InvalidateImageFlyweight(); + private final RejectImageFlyweight rejectImage = new RejectImageFlyweight(); private final RingBuffer toDriverCommandBuffer; /** @@ -462,29 +462,29 @@ public boolean terminateDriver(final DirectBuffer tokenBuffer, final int tokenOf } /** - * Invalidate a specific image. + * Reject a specific image. * * @param imageCorrelationId of the image to be invalidated * @param position of the image when invalidation occurred * @param reason user supplied reason for invalidation, reported back to publication * @return the correlationId of the request for invalidation. */ - public long invalidateImage( + public long rejectImage( final long imageCorrelationId, final long position, final String reason) { - final int length = InvalidateImageFlyweight.computeLength(reason); - final int index = toDriverCommandBuffer.tryClaim(INVALIDATE_IMAGE, length); + final int length = RejectImageFlyweight.computeLength(reason); + final int index = toDriverCommandBuffer.tryClaim(REJECT_IMAGE, length); if (index < 0) { - throw new AeronException("could not write invalidate image command"); + throw new AeronException("could not write reject image command"); } final long correlationId = toDriverCommandBuffer.nextCorrelationId(); - invalidateImage + rejectImage .wrap(toDriverCommandBuffer.buffer(), index) .clientId(clientId) .correlationId(correlationId) diff --git a/aeron-client/src/main/java/io/aeron/Image.java b/aeron-client/src/main/java/io/aeron/Image.java index 2909b637c3e..14366f3fe04 100644 --- a/aeron-client/src/main/java/io/aeron/Image.java +++ b/aeron-client/src/main/java/io/aeron/Image.java @@ -793,9 +793,9 @@ public int rawPoll(final RawBlockHandler handler, final int blockLengthLimit) * * @param reason an error message to be forwarded back to the publication. */ - public void invalidate(final String reason) + public void reject(final String reason) { - subscription.invalidate(correlationId, position(), reason); + subscription.rejectImage(correlationId, position(), reason); } private UnsafeBuffer activeTermBuffer(final long position) diff --git a/aeron-client/src/main/java/io/aeron/Subscription.java b/aeron-client/src/main/java/io/aeron/Subscription.java index e6c6f733853..80f8849b3b9 100644 --- a/aeron-client/src/main/java/io/aeron/Subscription.java +++ b/aeron-client/src/main/java/io/aeron/Subscription.java @@ -620,9 +620,9 @@ Image removeImage(final long correlationId) return removedImage; } - void invalidate(final long correlationId, final long position, final String reason) + void rejectImage(final long correlationId, final long position, final String reason) { - conductor.invalidateImage(correlationId, position, reason); + conductor.rejectImage(correlationId, position, reason); } /** diff --git a/aeron-client/src/main/java/io/aeron/command/ControlProtocolEvents.java b/aeron-client/src/main/java/io/aeron/command/ControlProtocolEvents.java index 5af99456b08..8314d62a61e 100644 --- a/aeron-client/src/main/java/io/aeron/command/ControlProtocolEvents.java +++ b/aeron-client/src/main/java/io/aeron/command/ControlProtocolEvents.java @@ -95,7 +95,7 @@ public class ControlProtocolEvents /** * Invalidate an image. */ - public static final int INVALIDATE_IMAGE = 0x0F; + public static final int REJECT_IMAGE = 0x0F; // Media Driver to Clients diff --git a/aeron-client/src/main/java/io/aeron/command/InvalidateImageFlyweight.java b/aeron-client/src/main/java/io/aeron/command/RejectImageFlyweight.java similarity index 92% rename from aeron-client/src/main/java/io/aeron/command/InvalidateImageFlyweight.java rename to aeron-client/src/main/java/io/aeron/command/RejectImageFlyweight.java index 356e8077b26..8af72f14a37 100644 --- a/aeron-client/src/main/java/io/aeron/command/InvalidateImageFlyweight.java +++ b/aeron-client/src/main/java/io/aeron/command/RejectImageFlyweight.java @@ -48,7 +48,7 @@ * +---------------------------------------------------------------+ * */ -public class InvalidateImageFlyweight extends CorrelatedMessageFlyweight +public class RejectImageFlyweight extends CorrelatedMessageFlyweight { private static final int IMAGE_CORRELATION_ID_FIELD_OFFSET = CORRELATION_ID_FIELD_OFFSET + SIZE_OF_LONG; private static final int POSITION_FIELD_OFFSET = IMAGE_CORRELATION_ID_FIELD_OFFSET + SIZE_OF_LONG; @@ -62,7 +62,7 @@ public class InvalidateImageFlyweight extends CorrelatedMessageFlyweight * @param offset at which the message begins. * @return this for a fluent API. */ - public InvalidateImageFlyweight wrap(final MutableDirectBuffer buffer, final int offset) + public RejectImageFlyweight wrap(final MutableDirectBuffer buffer, final int offset) { super.wrap(buffer, offset); return this; @@ -84,7 +84,7 @@ public long imageCorrelationId() * @param position new image correlation id value. * @return this for a fluent API. */ - public InvalidateImageFlyweight imageCorrelationId(final long position) + public RejectImageFlyweight imageCorrelationId(final long position) { buffer.putLong(offset + IMAGE_CORRELATION_ID_FIELD_OFFSET, position); return this; @@ -106,7 +106,7 @@ public long position() * @param position new position value. * @return this for a fluent API. */ - public InvalidateImageFlyweight position(final long position) + public RejectImageFlyweight position(final long position) { buffer.putLong(offset + POSITION_FIELD_OFFSET, position); return this; @@ -118,7 +118,7 @@ public InvalidateImageFlyweight position(final long position) * @param reason for invalidating the image. * @return this for a fluent API. */ - public InvalidateImageFlyweight reason(final String reason) + public RejectImageFlyweight reason(final String reason) { buffer.putStringAscii(offset + REASON_FIELD_OFFSET, reason); return this; @@ -149,7 +149,7 @@ public int reasonBufferLength() /** * {@inheritDoc} */ - public InvalidateImageFlyweight clientId(final long clientId) + public RejectImageFlyweight clientId(final long clientId) { super.clientId(clientId); return this; @@ -158,7 +158,7 @@ public InvalidateImageFlyweight clientId(final long clientId) /** * {@inheritDoc} */ - public InvalidateImageFlyweight correlationId(final long correlationId) + public RejectImageFlyweight correlationId(final long correlationId) { super.correlationId(correlationId); return this; diff --git a/aeron-client/src/test/java/io/aeron/ImageTest.java b/aeron-client/src/test/java/io/aeron/ImageTest.java index 96b7e7764a7..295ff735834 100644 --- a/aeron-client/src/test/java/io/aeron/ImageTest.java +++ b/aeron-client/src/test/java/io/aeron/ImageTest.java @@ -611,7 +611,7 @@ void shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMaxValue( } @Test - void shouldInvalidateFragment() + void shouldRejectFragment() { final int initialOffset = TERM_BUFFER_LENGTH - (ALIGNED_FRAME_LENGTH * 2); final long initialPosition = computePosition( @@ -625,9 +625,9 @@ void shouldInvalidateFragment() assertEquals(initialPosition, image.position()); final String reason = "this is garbage"; - image.invalidate(reason); + image.reject(reason); - verify(subscription).invalidate(image.correlationId(), image.position(), reason); + verify(subscription).rejectImage(image.correlationId(), image.position(), reason); // final int fragmentsRead = image.boundedPoll( // mockFragmentHandler, maxPosition, Integer.MAX_VALUE); diff --git a/aeron-driver/src/main/java/io/aeron/driver/ClientCommandAdapter.java b/aeron-driver/src/main/java/io/aeron/driver/ClientCommandAdapter.java index 9a21e9f607f..ca094afd91f 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/ClientCommandAdapter.java +++ b/aeron-driver/src/main/java/io/aeron/driver/ClientCommandAdapter.java @@ -43,7 +43,7 @@ final class ClientCommandAdapter implements ControlledMessageHandler private final DestinationMessageFlyweight destinationMsgFlyweight = new DestinationMessageFlyweight(); private final CounterMessageFlyweight counterMsgFlyweight = new CounterMessageFlyweight(); private final TerminateDriverFlyweight terminateDriverFlyweight = new TerminateDriverFlyweight(); - private final InvalidateImageFlyweight invalidateImageFlyweight = new InvalidateImageFlyweight(); + private final RejectImageFlyweight rejectImageFlyweight = new RejectImageFlyweight(); private final DriverConductor conductor; private final RingBuffer toDriverCommands; private final ClientProxy clientProxy; @@ -264,17 +264,17 @@ else if (channel.startsWith(SPY_QUALIFIER)) break; } - case INVALIDATE_IMAGE: + case REJECT_IMAGE: { - invalidateImageFlyweight.wrap(buffer, index); - invalidateImageFlyweight.validateLength(msgTypeId, length); - correlationId = invalidateImageFlyweight.correlationId(); - - conductor.onInvalidateImage( - invalidateImageFlyweight.correlationId(), - invalidateImageFlyweight.imageCorrelationId(), - invalidateImageFlyweight.position(), - invalidateImageFlyweight.reason()); + rejectImageFlyweight.wrap(buffer, index); + rejectImageFlyweight.validateLength(msgTypeId, length); + correlationId = rejectImageFlyweight.correlationId(); + + conductor.onRejectImage( + rejectImageFlyweight.correlationId(), + rejectImageFlyweight.imageCorrelationId(), + rejectImageFlyweight.position(), + rejectImageFlyweight.reason()); break; } diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 41e4ecd5977..d64ac7c55da 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -1418,7 +1418,7 @@ void onTerminateDriver(final DirectBuffer tokenBuffer, final int tokenOffset, fi } } - void onInvalidateImage( + void onRejectImage( final long correlationId, final long imageCorrelationId, final long position, @@ -1437,7 +1437,7 @@ void onInvalidateImage( GENERIC_ERROR, "Unable to resolve image for correlationId=" + imageCorrelationId); } - receiverProxy.invalidateImage(imageCorrelationId, position, reason); + receiverProxy.rejectImage(imageCorrelationId, position, reason); clientProxy.operationSucceeded(correlationId); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index a4511cbe19b..6a71b39f95c 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -88,7 +88,7 @@ class PublicationImageReceiverFields extends PublicationImagePadding2 boolean isSendingEosSm = false; long timeOfLastPacketNs; ImageConnection[] imageConnections = new ImageConnection[1]; - String invalidationReason = null; + String rejectionReason = null; } class PublicationImagePadding3 extends PublicationImageReceiverFields @@ -590,7 +590,7 @@ int insertPacket( { final boolean isEndOfStream = DataHeaderFlyweight.isEndOfStream(buffer); - if (null != invalidationReason) + if (null != rejectionReason) { if (isEndOfStream) { @@ -707,12 +707,12 @@ int sendPendingStatusMessage(final long nowNs) final long changeNumber = endSmChange; final boolean hasSmTimedOut = (timeOfLastSmNs + smTimeoutNs) - nowNs < 0; - if (null != invalidationReason) + if (null != rejectionReason) { if (hasSmTimedOut) { channelEndpoint.sendErrorFrame( - imageConnections, sessionId, streamId, GENERIC_ERROR.value(), invalidationReason); + imageConnections, sessionId, streamId, GENERIC_ERROR.value(), rejectionReason); timeOfLastSmNs = nowNs; workCount++; @@ -918,9 +918,9 @@ public boolean hasReachedEndOfLife() return hasReceiverReleased && State.DONE == state; } - void invalidate(final String reason) + void reject(final String reason) { - invalidationReason = reason; + rejectionReason = reason; } private void state(final State state) diff --git a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java index 831641623b4..d4901b90e86 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java +++ b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java @@ -334,13 +334,13 @@ void onResolutionChange( channelEndpoint.updateControlAddress(transportIndex, newAddress); } - void onInvalidateImage(final long imageCorrelationId, final long position, final String reason) + void onRejectImage(final long imageCorrelationId, final long position, final String reason) { for (final PublicationImage image : publicationImages) { if (imageCorrelationId == image.correlationId()) { - image.invalidate(reason); + image.reject(reason); break; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java b/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java index 58aed30ecf7..486ada901ca 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java +++ b/aeron-driver/src/main/java/io/aeron/driver/ReceiverProxy.java @@ -196,15 +196,15 @@ void requestSetup( } } - void invalidateImage(final long imageCorrelationId, final long position, final String reason) + void rejectImage(final long imageCorrelationId, final long position, final String reason) { if (notConcurrent()) { - receiver.onInvalidateImage(imageCorrelationId, position, reason); + receiver.onRejectImage(imageCorrelationId, position, reason); } else { - offer(() -> receiver.onInvalidateImage(imageCorrelationId, position, reason)); + offer(() -> receiver.onRejectImage(imageCorrelationId, position, reason)); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/ImageInvalidationTest.java b/aeron-system-tests/src/test/java/io/aeron/ImageInvalidationTest.java index f8ab1ec5438..6a38d660ddf 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ImageInvalidationTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ImageInvalidationTest.java @@ -107,7 +107,7 @@ PublicationErrorFrame poll() @Test @InterruptAfter(10) @SlowTest - void shouldInvalidateSubscriptionsImage() throws IOException + void shouldRejectSubscriptionsImage() throws IOException { context.imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(3)); @@ -147,7 +147,7 @@ void shouldInvalidateSubscriptionsImage() throws IOException assertEquals(pub.position(), image.position()); final String reason = "Needs to be closed"; - image.invalidate(reason); + image.reject(reason); final long t0 = System.nanoTime(); while (pub.isConnected()) @@ -234,7 +234,7 @@ void shouldOnlyReceivePublicationErrorFrameOnRelevantClient() throws IOException final Image image = sub.imageAtIndex(0); final String reason = "Needs to be closed"; - image.invalidate(reason); + image.reject(reason); while (null == errorFrameHandler1.poll()) { @@ -293,7 +293,7 @@ void shouldReceivePublicationErrorFramesAllRelevantClients() throws IOException final Image image = sub.imageAtIndex(0); final String reason = "Needs to be closed"; - image.invalidate(reason); + image.reject(reason); while (null == errorFrameHandler1.poll()) { @@ -310,7 +310,7 @@ void shouldReceivePublicationErrorFramesAllRelevantClients() throws IOException @Test @InterruptAfter(10) @SlowTest - void shouldInvalidateSubscriptionsImageManualMdc() + void shouldRejectSubscriptionsImageManualMdc() { context.imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(3)); @@ -355,7 +355,7 @@ void shouldInvalidateSubscriptionsImageManualMdc() final int initialAvailable = imageAvailable.get(); final String reason = "Needs to be closed"; - image.invalidate(reason); + image.reject(reason); final long t0 = System.nanoTime(); while (pub.isConnected()) @@ -409,7 +409,7 @@ void shouldRejectInvalidationReasonThatIsTooLong() Tests.awaitConnected(pub); Tests.awaitConnected(sub); - assertThrows(AeronException.class, () -> sub.imageAtIndex(0).invalidate(tooLongReason)); + assertThrows(AeronException.class, () -> sub.imageAtIndex(0).reject(tooLongReason)); } } @@ -458,7 +458,7 @@ void shouldReturnAllParametersToApi(final String addressStr) throws UnknownHostE assertEquals(pub.position(), image.position()); final String reason = "Needs to be closed"; - image.invalidate(reason); + image.reject(reason); PublicationErrorFrame errorFrame; while (null == (errorFrame = errorFrameHandler.poll()))