Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Support padding when allocating buffer for incoming messages #624

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2600,10 +2600,10 @@ _createMsg(natsMsg **newMsg, natsConnection *nc, char *buf, int bufLen, int hdrL
replyLen = natsBuf_Len(nc->ps->ma.reply);
}

s = natsMsg_create(newMsg,
s = natsMsg_createWithPadding(newMsg,
(const char*) natsBuf_Data(nc->ps->ma.subject), subjLen,
(const char*) reply, replyLen,
(const char*) buf, bufLen, hdrLen);
(const char*) buf, bufLen, nc->opts->payloadPaddingSize, hdrLen);
return s;
}

Expand Down
20 changes: 16 additions & 4 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -744,16 +744,18 @@ natsMsg_GetTime(natsMsg *msg)
}

natsStatus
natsMsg_create(natsMsg **newMsg,
natsMsg_createWithPadding(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen)
const char *buf, int bufLen, int bufPaddingSize, int hdrLen)
{
natsMsg *msg = NULL;
char *ptr = NULL;
int bufSize = 0;
int dataLen = bufLen;
bool hasHdrs = (hdrLen > 0 ? true : false);
// Make payload a null-terminated string and add at least one zero byte to the end
int padLen = (bufPaddingSize > 0 ? bufPaddingSize : 1);
kozlovic marked this conversation as resolved.
Show resolved Hide resolved

bufSize = subjLen;
bufSize += 1;
Expand All @@ -763,7 +765,7 @@ natsMsg_create(natsMsg **newMsg,
bufSize += 1;
}
bufSize += bufLen;
bufSize += 1;
bufSize += padLen;
if (hasHdrs)
bufSize++;

Expand Down Expand Up @@ -828,7 +830,7 @@ natsMsg_create(natsMsg **newMsg,
if (buf != NULL)
memcpy(ptr, buf, dataLen);
ptr += dataLen;
*(ptr) = '\0';
memset(ptr, 0, padLen);
// This is essentially to match server's view of a message size
// when sending messages to pull consumers and keeping track
// of size in regards to a max_bytes setting.
Expand All @@ -843,6 +845,16 @@ natsMsg_create(natsMsg **newMsg,
return NATS_OK;
}

natsStatus
natsMsg_create(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen)
{
return natsMsg_createWithPadding(newMsg, subject, subjLen, reply, replyLen,
buf, bufLen, 0, hdrLen);
}

// Used internally to initialize a message structure, generally defined on the stack,
// that will then be passed as a reference to publish functions.
void
Expand Down
7 changes: 7 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ natsMsg_create(natsMsg **newMsg,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen);

natsStatus
natsMsg_createWithPadding(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int bufPaddingSize,
int hdrLen);

void
natsMsg_freeHeaders(natsMsg *msg);

Expand Down
16 changes: 16 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3116,6 +3116,22 @@ natsOptions_DisableNoResponders(natsOptions *opts, bool disabled);
NATS_EXTERN natsStatus
natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix);

/** \brief Sets a custom padding when allocating buffer for incoming messages
*
* By default library allocates natsMsg with payload buffer size
* equal to payload size. Sometimes it can be useful to add some
* padding to the end of the buffer which can be tweaked using
* this option.
*
* To clear the custom message buffer padding, call this function with 0.
* Changing this option has no effect on existing NATS connections.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to add a note here that says that changing the option has no effect on existing NATS connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param opts the pointer to the #natsOptions object.
* @param paddingSize the desired inbox prefix.
*/
NATS_EXTERN natsStatus
natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize);

/** \brief Destroys a #natsOptions object.
*
* Destroys the natsOptions object, freeing used memory. See the note in
Expand Down
3 changes: 3 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ struct __natsOptions

// Custom inbox prefix
char *inboxPfx;

// Custom message payload padding size
int payloadPaddingSize;
};

typedef struct __nats_MsgList
Expand Down
12 changes: 12 additions & 0 deletions src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,18 @@ natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize)
{
LOCK_AND_CHECK_OPTIONS(opts, (paddingSize < 0));

opts->payloadPaddingSize = paddingSize;

UNLOCK_OPTS(opts);

return NATS_OK;
}

static void
_freeOptions(natsOptions *opts)
{
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ OldRequest
SimultaneousRequests
RequestClose
CustomInbox
MessagePadding
FlushInCb
ReleaseFlush
FlushErrOnDisconnect
Expand Down
58 changes: 58 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -12559,6 +12559,63 @@ test_CustomInbox(void)
_stopServer(serverPid);
}

static void
test_MessageBufferPadding(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid serverPid = NATS_INVALID_PID;
const char *string = "Hello World";
const char *servers[] = { "nats://127.0.0.1:4222" };
int serversCount = 1;
int paddingSize = 32;
bool paddingIsZeros = true;

serverPid = _startServer(servers[0], NULL, true);
CHECK_SERVER_STARTED(serverPid);

test("Create options: ");
s = natsOptions_Create(&opts);
testCond(s == NATS_OK);

test("Setting message buffer padding: ");
s = natsOptions_SetMessageBufferPadding(opts, paddingSize);
testCond(s == NATS_OK);

test("Setting servers: ");
s = natsOptions_SetServers(opts, servers, serversCount);
testCond(s == NATS_OK);

test("Test generating message for subscriber: ")
s = natsConnection_Connect(&nc, opts);
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
IFOK(s, natsConnection_PublishString(nc, "foo", string));
IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000));
testCond((s == NATS_OK)
&& (msg != NULL)
&& (strncmp(string, natsMsg_GetData(msg), natsMsg_GetDataLength(msg)) == 0));

test("Test access to memory in message buffer beyond data length: ");
// This test can pass even if padding doesn't work as excepted.
// But valgrind will show access to unallocated memory
for (int i=natsMsg_GetDataLength(msg); i<natsMsg_GetDataLength(msg)+paddingSize; i++) {
if (natsMsg_GetData(msg)[i])
paddingIsZeros = false;
}

testCond(paddingIsZeros);

natsMsg_Destroy(msg);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);

_stopServer(serverPid);
}

static void
test_FlushInCb(void)
{
Expand Down Expand Up @@ -34224,6 +34281,7 @@ static testInfo allTests[] =
{"SimultaneousRequests", test_SimultaneousRequest},
{"RequestClose", test_RequestClose},
{"CustomInbox", test_CustomInbox},
{"MessagePadding", test_MessageBufferPadding},
{"FlushInCb", test_FlushInCb},
{"ReleaseFlush", test_ReleaseFlush},
{"FlushErrOnDisconnect", test_FlushErrOnDisconnect},
Expand Down