Skip to content

Commit

Permalink
[Java] Derive control response stream id from clusterId/serviceId.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Oct 21, 2024
1 parent 11f4dcc commit 3f7c91b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,8 @@ public void conclude()
.controlRequestChannel(AeronArchive.Configuration.localControlChannel())
.controlResponseChannel(AeronArchive.Configuration.localControlChannel())
.controlRequestStreamId(AeronArchive.Configuration.localControlStreamId())
.controlResponseStreamId(BitUtil.generateRandomisedId());
.controlResponseStreamId(
clusterId * 100 + 100 + AeronArchive.Configuration.controlResponseStreamId());
}

if (!archiveContext.controlRequestChannel().startsWith(CommonContext.IPC_CHANNEL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,8 @@ public void conclude()
.controlRequestChannel(AeronArchive.Configuration.localControlChannel())
.controlResponseChannel(AeronArchive.Configuration.localControlChannel())
.controlRequestStreamId(AeronArchive.Configuration.localControlStreamId())
.controlResponseStreamId(BitUtil.generateRandomisedId());
.controlResponseStreamId(
clusterId * 100 + 100 + AeronArchive.Configuration.controlResponseStreamId() + (serviceId + 1));
}

if (!archiveContext.controlRequestChannel().startsWith(CommonContext.IPC_CHANNEL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,15 +837,18 @@ void shouldCreateArchiveContextUsingLocalChannelConfiguration()
}
}

@Test
void shouldCreateAliasForControlStreams()
@ParameterizedTest
@CsvSource({ "19,20", "0,222" })
void shouldCreateAliasForControlStreams(final int clusterId, final int controlResponseStreamId)
{
final String controlChannel = "aeron:ipc?term-length=64k";
final int localControlStreamId = 10;
System.setProperty(AeronArchive.Configuration.LOCAL_CONTROL_CHANNEL_PROP_NAME, controlChannel);
System.setProperty(
AeronArchive.Configuration.LOCAL_CONTROL_STREAM_ID_PROP_NAME, Integer.toString(localControlStreamId));
context.archiveContext(null).clusterId(19);
System.setProperty(
AeronArchive.Configuration.CONTROL_RESPONSE_STREAM_ID_PROP_NAME, Integer.toString(controlResponseStreamId));
context.archiveContext(null).clusterId(clusterId);
assertNull(context.archiveContext());

try
Expand All @@ -856,18 +859,19 @@ void shouldCreateAliasForControlStreams()
assertNotNull(archiveContext);
assertThat(
archiveContext.controlRequestChannel(),
Matchers.containsString("alias=cm-archive-ctrl-req-cluster-19"));
Matchers.containsString("alias=cm-archive-ctrl-req-cluster-" + clusterId));
assertThat(
archiveContext.controlResponseChannel(),
Matchers.containsString("alias=cm-archive-ctrl-resp-cluster-19"));
Matchers.containsString("alias=cm-archive-ctrl-resp-cluster-" + clusterId));
assertEquals(localControlStreamId, archiveContext.controlRequestStreamId());
assertNotEquals(localControlStreamId, archiveContext.controlResponseStreamId());
assertEquals(clusterId * 100 + 100 + controlResponseStreamId, archiveContext.controlResponseStreamId());
}
finally
{
CloseHelper.quietClose(context::close);
System.clearProperty(AeronArchive.Configuration.LOCAL_CONTROL_CHANNEL_PROP_NAME);
System.clearProperty(AeronArchive.Configuration.LOCAL_CONTROL_STREAM_ID_PROP_NAME);
System.clearProperty(AeronArchive.Configuration.CONTROL_RESPONSE_STREAM_ID_PROP_NAME);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,15 +356,18 @@ void shouldCreateArchiveContextUsingLocalChannelConfiguration()
}
}

@Test
void shouldCreateAliasForControlStreams()
@ParameterizedTest
@CsvSource({ "42,88", "0,19" })
void shouldCreateAliasForControlStreams(final int clusterId, final int controlResponseStreamId)
{
final String controlChannel = "aeron:ipc?term-length=64k";
final int localControlStreamId = 0;
System.setProperty(AeronArchive.Configuration.LOCAL_CONTROL_CHANNEL_PROP_NAME, controlChannel);
System.setProperty(
AeronArchive.Configuration.LOCAL_CONTROL_STREAM_ID_PROP_NAME, Integer.toString(localControlStreamId));
context.archiveContext(null).serviceId(5).clusterId(42);
System.setProperty(
AeronArchive.Configuration.CONTROL_RESPONSE_STREAM_ID_PROP_NAME, Integer.toString(controlResponseStreamId));
context.archiveContext(null).serviceId(5).clusterId(clusterId);
assertNull(context.archiveContext());

try
Expand All @@ -375,22 +378,24 @@ void shouldCreateAliasForControlStreams()
assertNotNull(archiveContext);
assertThat(
archiveContext.controlRequestChannel(),
Matchers.containsString("alias=sc-5-archive-ctrl-req-cluster-42"));
Matchers.containsString("alias=sc-5-archive-ctrl-req-cluster-" + clusterId));
assertThat(
archiveContext.controlResponseChannel(),
Matchers.containsString("alias=sc-5-archive-ctrl-resp-cluster-42"));
Matchers.containsString("alias=sc-5-archive-ctrl-resp-cluster-" + clusterId));
assertEquals(localControlStreamId, archiveContext.controlRequestStreamId());
assertNotEquals(localControlStreamId, archiveContext.controlResponseStreamId());
assertEquals(
clusterId * 100 + 100 + controlResponseStreamId + (context.serviceId() + 1),
archiveContext.controlResponseStreamId());
}
finally
{
CloseHelper.quietClose(context::close);
System.clearProperty(AeronArchive.Configuration.LOCAL_CONTROL_CHANNEL_PROP_NAME);
System.clearProperty(AeronArchive.Configuration.LOCAL_CONTROL_STREAM_ID_PROP_NAME);
System.clearProperty(AeronArchive.Configuration.CONTROL_RESPONSE_STREAM_ID_PROP_NAME);
}
}


@ParameterizedTest
@CsvSource({
"aeron:ipc,aeron:ipc?term-length=64k|mtu=8k," +
Expand Down

0 comments on commit 3f7c91b

Please sign in to comment.