Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1351 Implementation : Added message_deleted callback when the processing queue exceeds maxBufferedMessages #1358

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,27 @@ int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
return rc;
}

int MQTTAsync_setMessageDeletedCallback(MQTTAsync handle, void* context,
MQTTAsync_messageDeleted* md)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;

FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);

if (m == NULL || m->c->connect_state != 0)
rc = MQTTASYNC_FAILURE;
else
{
m->mdContext = context;
m->md = md;
}

MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}

int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* disconnected)
{
Expand Down
33 changes: 33 additions & 0 deletions src/MQTTAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ typedef void MQTTAsync_connected(void* context, char* cause);
typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties,
enum MQTTReasonCodes reasonCode);


/**
* This is a callback function, which will be called when the oldest message is deleted before processing.
*
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setMessageDeletedCallback(), which contains any application-specific context.
* @param token A request identifier
*/
typedef void MQTTAsync_messageDeleted(void* context, MQTTAsync_token token);

/**
* Sets the MQTTAsync_disconnected() callback function for a client.
* @param handle A valid client handle from a successful call to
Expand Down Expand Up @@ -875,6 +885,29 @@ LIBMQTT_API int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void* cont
LIBMQTT_API int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
MQTTAsync_deliveryComplete* dc);


/**
* This function sets the callback function for a message deleted event
* for a specific client. If you do not set a messageDeleted callback
* function, you will not be notified of any message that is dropped before processing
* when the maxBufferedMessages limit is reached.
*
* <b>Note:</b> This callback is called when <i>deleteOldestMessages</i> flag is set during
* client creation
* @param handle A valid client handle from a successful call to
* MQTTAsync_create() or MQTTAsync_createWithOptions()
* @param context A pointer to any application-specific context. The
* <i>context</i> pointer is passed to the callback functions to provide
* access to the context information in the callback.
* @param md A pointer to an MQTTAsync_messageDeleted() callback
* function. You can set this to NULL if you do not want notification for the deleted message.
* @return ::MQTTASYNC_SUCCESS if the callbacks were correctly set,
* ::MQTTASYNC_FAILURE if an error occurred.
*/
LIBMQTT_API int MQTTAsync_setMessageDeletedCallback(MQTTAsync handle, void* context,
MQTTAsync_messageDeleted* md);


/**
* Sets the MQTTAsync_connected() callback function for a client.
* @param handle A valid client handle from a successful call to
Expand Down
6 changes: 6 additions & 0 deletions src/MQTTAsyncUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,12 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
#endif

MQTTAsync_freeCommand(first_publish);

/* notify callback */
if (command->client->md)
{
(*(command->client->md))(command->client->mdContext, first_publish->command.token);
}
}
}
else
Expand Down
2 changes: 2 additions & 0 deletions src/MQTTAsyncUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ typedef struct MQTTAsync_struct
MQTTAsync_connectionLost* cl;
MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc;
MQTTAsync_messageDeleted* md;
void* clContext; /* the context to be associated with the conn lost callback*/
void* maContext; /* the context to be associated with the msg arrived callback*/
void* dcContext; /* the context to be associated with the deliv complete callback*/
void* mdContext; /* the context to be associated with the message deleted callback*/

MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/
Expand Down
122 changes: 121 additions & 1 deletion test/test9.c
Original file line number Diff line number Diff line change
Expand Up @@ -2685,12 +2685,132 @@ int test10(struct Options options)
return failures;
}

/**************************************************************************

Test11: Invoke callback for oldest buffered messages first on buffer full

****************************************************************************/
int test11Finished = 0;
int test11cConnected = 0;
int test11OnFailureCalled = 0;
int test11MessagesToSend = 6;
int test11messagedeleted = 0;

void test11cOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
int rc;

MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
test11cConnected = 1;

/* send more messages than max buffer */
for (int i = 0; i < test11MessagesToSend; ++i)
{
char buf[50];

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = i % 3;
sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}
}

void test11_messageDeleted(void* context, MQTTAsync_token token)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_INFO, "Message %d token deleted", token);
test11messagedeleted++;
}

void test11OnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);

test11OnFailureCalled++;
test11Finished = 1;
}

int test11(struct Options options)
{
char* testname = "test11";
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[70];
int i = 0;

sprintf(willTopic, "paho-test9-11-%s", unique);
sprintf(clientidc, "paho-test9-11-c-%s", unique);
sprintf(test_topic, "paho-test9-11-test topic %s", unique);

test11Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 11 - delete oldest buffered messages first");
fprintf(xml, "<testcase classname=\"test9\" name=\"%s\"", testname);
global_start_time = start_clock();

createOptions.maxBufferedMessages = 3;
createOptions.deleteOldestMessages = 1;
rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL, &createOptions);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}

rc = MQTTAsync_setMessageDeletedCallback (c, c, test11_messageDeleted);
assert("Good rc from setMessageDeletedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);

opts.onSuccess = test11cOnConnect;
opts.onFailure = test11OnFailure;
opts.context = c;
opts.cleansession = 0;

MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}

/* wait for the first 3 messages to be deleted */
count = 0;
while (test11messagedeleted < 3 && ++count < 1000)
MySleep(100);

assert("Test number of messages deleted:",test11messagedeleted == 3, "test11messagedeleted was %d", test11messagedeleted);
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);

exit:
MySleep(200);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;

}

int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7, test8, test9, test10};
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7, test8, test9, test10, test11};
time_t randtime;

srand((unsigned) time(&randtime));
Expand Down