-
Notifications
You must be signed in to change notification settings - Fork 899
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] LogBuffer metadata store channelUri #1697
[WIP] LogBuffer metadata store channelUri #1697
Conversation
aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Outdated
Show resolved
Hide resolved
/** | ||
* The maximum byte of the channel-uri stores as an ascii string (excluding the length). | ||
*/ | ||
public static final int LOG_CHANNEL_URI_MAX_LENGTH = 4096 ; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There will be ChannelUri.MAX_URI_LENGTH
that is set to 4095
, i.e. we don't need log buffer specific limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4095 or 4096? I guess the latter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess for /0 termination of C based strings.
|
||
LOG_META_DATA_LENGTH = align(LOG_CHANNEL_URI_OFFSET + SIZE_OF_INT + LOG_CHANNEL_URI_MAX_LENGTH, PAGE_MIN_SIZE); | ||
|
||
System.out.println("offset:"+offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the system.outs will removed in the final version of this PR.
@@ -1764,15 +1819,66 @@ private NetworkPublication newNetworkPublication( | |||
} | |||
} | |||
|
|||
|
|||
private static String enrichNetworkPublicationUri(ChannelUri channelUri, int streamId, int sessionId, int initialTermId, SendChannelEndpoint channelEndpoint, PublicationParams params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could create a base enrich method that sets common parameters such as mtu
, term-length
, initial-term-id
(etc.) and will return a ChannelUriStringBuilder
so that it can be further enhanced.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
import java.nio.ByteBuffer; | ||
|
||
public class Main |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a debug code? If not, then it should be turned into a a test instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will also be removed.
public int streamId(){ | ||
return streamId; | ||
} | ||
|
||
public void streamId(int streamId) | ||
{ | ||
this.streamId = streamId; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should return and accept and Integer
object. Also you need a version that reads stream-id
from a ChannelUri
(e.g. see ChannelUriStringBuilder#termOffset(io.aeron.ChannelUri)
). And it needs to filled in the constructor of the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have fixed the Integer part. I forget to change the signatures of the methods after converting streamId from int to Integer.
But the streamId isn't part of the channelUri; you can't create a channel like this:
Publication publication = aeron.addPublication( "aeron:udp?endpoint=localhost:40123|streamId=10");
Exposing the property as PARAM_NAME and adding it to the ChannelURI, could lead to end users getting confused. WDYT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to make this parameter official. It should be possible to set stream-id
in the channel uri and will be adding validation that the stream-id
parameter is the same as streamId
argument to the method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
* @throws IllegalArgumentException if the URI exceeds the maximum length. | ||
*/ | ||
public static void channelUri(final UnsafeBuffer metadataBuffer, final String channelUri) { | ||
if (channelUri.length() > LOG_CHANNEL_URI_MAX_LENGTH) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use ChannelUri.MAX_URI_LENGTH
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I introduced a new constant:
LOG_CHANNEL_URI_MAX_LENGTH = LOG_META_DATA_LENGTH - (LOG_CHANNEL_URI_OFFSET + SIZE_OF_INT);
This is now used to validate that the stored channel-uri isn't too long. This value is now 7164. Perhaps I should remove 1 more to deal with /0 C strings.
The extra length is needed because we'll be adding settings to the user specified channel-uri, so we could exceed the current 4095 limit.
88591aa
to
c8fc3df
Compare
final String ccStr = udpChannel.channelUri().get(CONGESTION_CONTROL_PARAM_NAME); | ||
uriStringBuilder.congestionControl(null == ccStr ? "static" : ccStr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cc
part can be removed as it is configured on a subscription side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
uriStringBuilder.initialTermId(params.initialTermId); | ||
uriStringBuilder.termId(params.hasPosition ? params.termId : params.initialTermId); | ||
uriStringBuilder.termOffset(params.hasPosition ? params.termOffset : 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is already implemented in PublicationParams
, i.e. just assign variables directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
uriStringBuilder.eos(params.signalEos); | ||
uriStringBuilder.spiesSimulateConnection(params.spiesSimulateConnection); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub-wnd
is missing, i.e. use params.publicationWindowLength
to assign it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if(!uriStringBuilder.isSessionIdTagged()){ | ||
uriStringBuilder.sessionId(params.sessionId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should write session-id
unconditionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
System.out.println("Set channelUri:" + channelUri(logMetaData)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
final PublicationParams params) | ||
{ | ||
final ChannelUriStringBuilder uriStringBuilder = new ChannelUriStringBuilder(channel); | ||
uriStringBuilder.streamId(streamId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamId
is part of PublicationParams
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
System.out.println("LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH:" + LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH); | ||
System.out.println("LOG_CHANNEL_URI_OFFSET:" + LOG_CHANNEL_URI_OFFSET); | ||
System.out.println("LOG_META_DATA_LENGTH:" + LOG_META_DATA_LENGTH); | ||
System.out.println("LOG_CHANNEL_URI_MAX_LENGTH:" + LOG_CHANNEL_URI_MAX_LENGTH); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debug log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
configureUriStringBuilderWithPublicationParams(params, uriStringBuilder); | ||
|
||
|
||
uriStringBuilder.streamId(streamId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamId
is part of PublicationParams
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Outdated
Show resolved
Hide resolved
private Integer streamId; | ||
private Integer publicationWindowLength; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already implemented on master. Please merge those into your PR or rebase it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
channelUriStringBuilder.receiverWindowLength(params.receiverWindowLength); | ||
|
||
// channelUriStringBuilder.group(params.group); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add so-rcvbuf
and so-sndbuf
from ReceiveChannelEndpoint
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private static String congestionControlName(final CongestionControl congestionControl) | ||
{ | ||
if (congestionControl instanceof StaticWindowCongestionControl) | ||
{ | ||
return "static"; | ||
} | ||
else if (congestionControl instanceof CubicCongestionControl) | ||
{ | ||
return "cubic"; | ||
} | ||
else | ||
{ | ||
return congestionControl.getClass().getName(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use congestionControl.getClass().getName()
for simplicity. And it also aligns with the flow control handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Will fix.
0d3f190
to
94a4b3d
Compare
Closing this PR since it breaks compatibility. |
In this PR the channelUri is added to the metadata section of the LogBuffer.