Skip to content

Commit

Permalink
Tethering join position (#1672)
Browse files Browse the repository at this point in the history
* [Java] Fix java tethering join position

* [C] Align c with java publication image tethering join position

* [Java] Fix join position of tethered subscriptions

* [C] Align C tethering with java

* [Java] Add position check to untethered system test

* [Java] checkstyle
  • Loading branch information
marc-adaptive authored Oct 23, 2024
1 parent 2b4e27e commit 297f69c
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 5 deletions.
3 changes: 2 additions & 1 deletion aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ void aeron_ipc_publication_check_untethered_subscriptions(
case AERON_SUBSCRIPTION_TETHER_RESTING:
if (now_ns > (tetherable_position->time_of_last_update_ns + resting_timeout_ns))
{
aeron_counter_set_ordered(tetherable_position->value_addr, consumer_position);
int64_t join_position = aeron_ipc_publication_join_position(publication);
aeron_counter_set_ordered(tetherable_position->value_addr, join_position);
aeron_driver_conductor_on_available_image(
conductor,
publication->conductor_fields.managed_resource.registration_id,
Expand Down
3 changes: 2 additions & 1 deletion aeron-driver/src/main/c/aeron_publication_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,8 @@ void aeron_publication_image_check_untethered_subscriptions(
case AERON_SUBSCRIPTION_TETHER_RESTING:
if (now_ns > (tetherable_position->time_of_last_update_ns + resting_timeout_ns))
{
aeron_counter_set_ordered(tetherable_position->value_addr, *image->rcv_pos_position.value_addr);
int64_t join_position = aeron_publication_image_join_position(image);
aeron_counter_set_ordered(tetherable_position->value_addr, join_position);
aeron_driver_conductor_on_available_image(
conductor,
image->conductor_fields.managed_resource.registration_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,14 @@ else if (UntetheredSubscription.State.RESTING == untethered.state)
{
if ((untethered.timeOfLastUpdateNs + untetheredRestingTimeoutNs) - nowNs <= 0)
{
final long joinPosition = joinPosition();
subscriberPositions = ArrayUtil.add(subscriberPositions, untethered.position);
conductor.notifyAvailableImageLink(
registrationId,
sessionId,
untethered.subscriptionLink,
untethered.position.id(),
joinPosition(),
joinPosition,
rawLog.fileName(),
CommonContext.IPC_CHANNEL);
untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, streamId, sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,13 +1101,14 @@ else if (UntetheredSubscription.State.RESTING == untethered.state)
{
if ((untethered.timeOfLastUpdateNs + untetheredRestingTimeoutNs) - nowNs <= 0)
{
final long joinPosition = joinPosition();
subscriberPositions = ArrayUtil.add(subscriberPositions, untethered.position);
conductor.notifyAvailableImageLink(
correlationId,
sessionId,
untethered.subscriptionLink,
untethered.position.id(),
joinPosition(),
joinPosition,
rawLog.fileName(),
sourceIdentity);
untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, streamId, sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void shouldLifeCycleTimeoutsAndRelink()
eq(SESSION_ID),
eq(untetheredLink),
anyInt(),
eq(ipcPublication.joinPosition()),
eq(tetheredPosition.get()),
eq(rawLog.fileName()),
eq(CommonContext.IPC_CHANNEL));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -218,6 +219,12 @@ void shouldRejoinAfterResting(final String channel)
aeron.conductorAgentInvoker().invoke();
}

if (!channel.startsWith("aeron-spy"))
{
final long tetheredPosition = tetheredSub.imageAtIndex(0).position();
final long untetheredJoinPosition = untetheredSub.imageAtIndex(0).joinPosition();
assertEquals(tetheredPosition, untetheredJoinPosition);
}
return;
}
}
Expand Down

0 comments on commit 297f69c

Please sign in to comment.