Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] LogBuffer metadata store channelUri #1697

Conversation

pveentjer
Copy link
Contributor

In this PR the channelUri is added to the metadata section of the LogBuffer.

@pveentjer pveentjer requested a review from vyazelenko December 12, 2024 08:42
/**
* The maximum byte of the channel-uri stores as an ascii string (excluding the length).
*/
public static final int LOG_CHANNEL_URI_MAX_LENGTH = 4096 ;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be ChannelUri.MAX_URI_LENGTH that is set to 4095, i.e. we don't need log buffer specific limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4095 or 4096? I guess the latter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess for /0 termination of C based strings.


LOG_META_DATA_LENGTH = align(LOG_CHANNEL_URI_OFFSET + SIZE_OF_INT + LOG_CHANNEL_URI_MAX_LENGTH, PAGE_MIN_SIZE);

System.out.println("offset:"+offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the system.outs will removed in the final version of this PR.

@@ -1764,15 +1819,66 @@ private NetworkPublication newNetworkPublication(
}
}


private static String enrichNetworkPublicationUri(ChannelUri channelUri, int streamId, int sessionId, int initialTermId, SendChannelEndpoint channelEndpoint, PublicationParams params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could create a base enrich method that sets common parameters such as mtu, term-length, initial-term-id (etc.) and will return a ChannelUriStringBuilder so that it can be further enhanced.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


import java.nio.ByteBuffer;

public class Main
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a debug code? If not, then it should be turned into a a test instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will also be removed.

Comment on lines 2175 to 2295
public int streamId(){
return streamId;
}

public void streamId(int streamId)
{
this.streamId = streamId;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should return and accept and Integer object. Also you need a version that reads stream-id from a ChannelUri (e.g. see ChannelUriStringBuilder#termOffset(io.aeron.ChannelUri)). And it needs to filled in the constructor of the class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed the Integer part. I forget to change the signatures of the methods after converting streamId from int to Integer.

But the streamId isn't part of the channelUri; you can't create a channel like this:

Publication publication = aeron.addPublication( "aeron:udp?endpoint=localhost:40123|streamId=10");

Exposing the property as PARAM_NAME and adding it to the ChannelURI, could lead to end users getting confused. WDYT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to make this parameter official. It should be possible to set stream-id in the channel uri and will be adding validation that the stream-id parameter is the same as streamId argument to the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

* @throws IllegalArgumentException if the URI exceeds the maximum length.
*/
public static void channelUri(final UnsafeBuffer metadataBuffer, final String channelUri) {
if (channelUri.length() > LOG_CHANNEL_URI_MAX_LENGTH) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use ChannelUri.MAX_URI_LENGTH instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced a new constant:

    LOG_CHANNEL_URI_MAX_LENGTH = LOG_META_DATA_LENGTH - (LOG_CHANNEL_URI_OFFSET + SIZE_OF_INT);

This is now used to validate that the stored channel-uri isn't too long. This value is now 7164. Perhaps I should remove 1 more to deal with /0 C strings.

The extra length is needed because we'll be adding settings to the user specified channel-uri, so we could exceed the current 4095 limit.

@pveentjer pveentjer force-pushed the feature/logbuffer-channel-uri-take2 branch 2 times, most recently from 88591aa to c8fc3df Compare December 16, 2024 05:05
Comment on lines 1890 to 1891
final String ccStr = udpChannel.channelUri().get(CONGESTION_CONTROL_PARAM_NAME);
uriStringBuilder.congestionControl(null == ccStr ? "static" : ccStr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cc part can be removed as it is configured on a subscription side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1915 to 1917
uriStringBuilder.initialTermId(params.initialTermId);
uriStringBuilder.termId(params.hasPosition ? params.termId : params.initialTermId);
uriStringBuilder.termOffset(params.hasPosition ? params.termOffset : 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is already implemented in PublicationParams, i.e. just assign variables directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


uriStringBuilder.eos(params.signalEos);
uriStringBuilder.spiesSimulateConnection(params.spiesSimulateConnection);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub-wnd is missing, i.e. use params.publicationWindowLength to assign it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1919 to 1921
if(!uriStringBuilder.isSessionIdTagged()){
uriStringBuilder.sessionId(params.sessionId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should write session-id unconditionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1981 to 1971
System.out.println("Set channelUri:" + channelUri(logMetaData));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

final PublicationParams params)
{
final ChannelUriStringBuilder uriStringBuilder = new ChannelUriStringBuilder(channel);
uriStringBuilder.streamId(streamId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamId is part of PublicationParams.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 239 to 242
System.out.println("LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH:" + LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH);
System.out.println("LOG_CHANNEL_URI_OFFSET:" + LOG_CHANNEL_URI_OFFSET);
System.out.println("LOG_META_DATA_LENGTH:" + LOG_META_DATA_LENGTH);
System.out.println("LOG_CHANNEL_URI_MAX_LENGTH:" + LOG_CHANNEL_URI_MAX_LENGTH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

configureUriStringBuilderWithPublicationParams(params, uriStringBuilder);


uriStringBuilder.streamId(streamId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamId is part of PublicationParams.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 83 to 84
private Integer streamId;
private Integer publicationWindowLength;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already implemented on master. Please merge those into your PR or rebase it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


channelUriStringBuilder.receiverWindowLength(params.receiverWindowLength);

// channelUriStringBuilder.group(params.group);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add so-rcvbuf and so-sndbuf from ReceiveChannelEndpoint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 449 to 463
private static String congestionControlName(final CongestionControl congestionControl)
{
if (congestionControl instanceof StaticWindowCongestionControl)
{
return "static";
}
else if (congestionControl instanceof CubicCongestionControl)
{
return "cubic";
}
else
{
return congestionControl.getClass().getName();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use congestionControl.getClass().getName() for simplicity. And it also aligns with the flow control handling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Will fix.

@pveentjer pveentjer force-pushed the feature/logbuffer-channel-uri-take2 branch from 0d3f190 to 94a4b3d Compare December 17, 2024 12:42
@pveentjer
Copy link
Contributor Author

Closing this PR since it breaks compatibility.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants