Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Avoid triggering notification if metadata changes but not the attribute value itself #3727 #4090

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
- Fix: avoid triggering notification if metadata changes but not the attribute value itself (#3727)
- Add: conditions.alterationTypes subscription fuctionality (#1494)
- Reference distribution changed RHEL/CentOS 8 to Debian 11
1 change: 1 addition & 0 deletions src/app/contextBroker/contextBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ std::vector<int> serviceQueueSizeV;
std::vector<int> serviceNumThreadV;

bool noCache;
bool onlyMetadata;
unsigned int connectionMemory;
unsigned int maxConnections;
unsigned int reqPoolSize;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/apiTypesV2/Subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ std::string Notification::toJson(const std::string& attrsFormat)
jh.addNumber("failsCounter", this->failsCounter);
}

if (this->notifyOnMetadataChange)
{
jh.addBool("notifyOnMetadataChange", this->notifyOnMetadataChange);
}

return jh.str();
}

Expand Down
3 changes: 3 additions & 0 deletions src/lib/apiTypesV2/Subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct Notification
std::vector<std::string> metadata;
bool blacklist;
bool onlyChanged;
bool notifyOnMetadataChange;
long long timesSent;
long long failsCounter;
long long maxFailsLimit;
Expand All @@ -94,6 +95,7 @@ struct Notification
attributes(),
blacklist(false),
onlyChanged(false),
notifyOnMetadataChange(true),
timesSent(0),
failsCounter(0),
maxFailsLimit(-1),
Expand Down Expand Up @@ -146,6 +148,7 @@ struct Subscription
std::string id;
std::string description;
bool descriptionProvided;
bool notifyOnMetadataChange;
Subject subject;
long long expires;
std::string status;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/apiTypesV2/SubscriptionUpdate.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class SubscriptionUpdate : public Subscription
bool throttlingProvided;
bool blacklistProvided;
bool onlyChangedProvided;
bool notifyOnMetadataChangeProvided;
bool fromNgsiv1; // to support a special case when the SubscriptionUpdate comes from NGSIv1

SubscriptionUpdate():
Expand All @@ -57,6 +58,7 @@ class SubscriptionUpdate : public Subscription
throttlingProvided(false),
blacklistProvided(false),
onlyChangedProvided(false),
notifyOnMetadataChangeProvided(true),
fromNgsiv1(false)
{
descriptionProvided = false;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/cache/subCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ void subCacheItemInsert
const std::string& coords,
const std::string& georel,
bool blacklist,
bool notifyOnMetadataChange,
bool onlyChanged
)
{
Expand Down Expand Up @@ -851,6 +852,7 @@ void subCacheItemInsert
cSubP->expression.georel = georel;
cSubP->blacklist = blacklist;
cSubP->onlyChanged = onlyChanged;
cSubP->notifyOnMetadataChange = notifyOnMetadataChange;
cSubP->httpInfo = httpInfo;
cSubP->mqttInfo = mqttInfo;
cSubP->notifyConditionV = conditionAttrs;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/cache/subCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct CachedSubscription
SubscriptionExpression expression;
bool blacklist;
bool onlyChanged;
bool notifyOnMetadataChange;
ngsiv2::HttpInfo httpInfo;
ngsiv2::MqttInfo mqttInfo;
int64_t lastFailure; // timestamp of last notification failure
Expand Down Expand Up @@ -217,6 +218,7 @@ extern void subCacheItemInsert
const std::string& coords,
const std::string& georel,
bool blacklist,
bool notifyOnMetadataChange,
bool onlyChanged
);

Expand Down
1 change: 1 addition & 0 deletions src/lib/common/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ extern unsigned cprForwardLimit;
extern char notificationMode[];
extern char notifFlowControl[];
extern bool noCache;
extern bool onlyMetadata;
extern bool simulatedNotification;

extern bool semWaitStatistics;
Expand Down
16 changes: 16 additions & 0 deletions src/lib/jsonParseV2/parseSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,22 @@ static std::string parseNotification(ConnectionInfo* ciP, SubscriptionUpdate* su
}
}

// notifyOnMetadataChange field
if (notification.HasMember("notifyOnMetadataChange"))
{
Opt<bool> notifyOnMetadataChangeOpt = getBoolOpt(notification, "notifyOnMetadataChange");
if (!notifyOnMetadataChangeOpt.ok())
{
return badInput(ciP, notifyOnMetadataChangeOpt.error);
}
else if (notifyOnMetadataChangeOpt.given)
{
bool notifyOnMetadataChangeBool = notifyOnMetadataChangeOpt.value;
subsP->notifyOnMetadataChangeProvided = true;
subsP->notification.notifyOnMetadataChange = notifyOnMetadataChangeBool;
}
}

// attrsFormat field
Opt<std::string> attrsFormatOpt = getStringOpt(notification, "attrsFormat");

Expand Down
14 changes: 14 additions & 0 deletions src/lib/mongoBackend/MongoCommonSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,20 @@ void setOnlyChanged(const Subscription& sub, orion::BSONObjBuilder* b)



/* ****************************************************************************
*
* setNotifyOnMetadataChange -
*/
void setNotifyOnMetadataChange(const Subscription& sub, orion::BSONObjBuilder* b)
{
bool bl = sub.notification.notifyOnMetadataChange;

b->append(CSUB_NOTIFYONMETADATACHANGE, bl);
LM_T(LmtMongo, ("Subscription notifyOnMetadataChange: %s", bl ? "true" : "false"));
}



/* ****************************************************************************
*
* setOperations -
Expand Down
8 changes: 8 additions & 0 deletions src/lib/mongoBackend/MongoCommonSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ extern void setOnlyChanged(const ngsiv2::Subscription& sub, orion::BSONObjBuilde



/* ****************************************************************************
*
* setNotifyOnMetadataChange -
*/
extern void setNotifyOnMetadataChange(const ngsiv2::Subscription& sub, orion::BSONObjBuilder* b);



/* ****************************************************************************
*
* setOperations -
Expand Down
54 changes: 48 additions & 6 deletions src/lib/mongoBackend/MongoCommonUpdate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,10 +678,39 @@ static bool mergeAttrInfo
* 3) the metadata changed (this is done checking if the size of the original and final metadata vectors is
* different and, if they are of the same size, checking if the vectors are not equal)
*/
actualUpdate = (attrValueChanges(attr, caP, forcedUpdate, apiVersion) ||
((!caP->type.empty()) &&
(!attr.hasField(ENT_ATTRS_TYPE) || getStringFieldF(attr, ENT_ATTRS_TYPE) != caP->type) ) ||
mdNew.nFields() != mdSize || !equalMetadata(md, mdNew));

extern bool onlyMetadata;

if (attrValueChanges(attr, caP, forcedUpdate, apiVersion))
{
onlyMetadata = false;
}

else
{
if ((!caP->type.empty()) && (!attr.hasField(ENT_ATTRS_TYPE) || getStringFieldF(attr, ENT_ATTRS_TYPE) != caP->type))
{
onlyMetadata = false;
}
else
{
if (mdNew.nFields() != mdSize)
{
onlyMetadata = true;
}
else
{
if (!equalMetadata(md, mdNew))
{
onlyMetadata = true;
}
else
{
onlyMetadata = false;
}
}
}
}
}
else
{
Expand Down Expand Up @@ -1120,6 +1149,7 @@ static bool addTriggeredSubscriptions_withCache
TriggeredSubscription* subP = new TriggeredSubscription((long long) cSubP->throttling,
cSubP->maxFailsLimit,
cSubP->failsCounter,
cSubP->notifyOnMetadataChange,
(long long) cSubP->lastNotificationTime,
cSubP->renderFormat,
cSubP->httpInfo,
Expand Down Expand Up @@ -1607,6 +1637,7 @@ static bool addTriggeredSubscriptions_noCache
std::string renderFormatString = sub.hasField(CSUB_FORMAT)? getStringFieldF(sub, CSUB_FORMAT) : "legacy";
bool onlyChanged = sub.hasField(CSUB_ONLYCHANGED)? getBoolFieldF(sub, CSUB_ONLYCHANGED) : false;
bool blacklist = sub.hasField(CSUB_BLACKLIST)? getBoolFieldF(sub, CSUB_BLACKLIST) : false;
bool notifyOnMetadataChange = sub.hasField(CSUB_NOTIFYONMETADATACHANGE)? getBoolFieldF(sub, CSUB_NOTIFYONMETADATACHANGE) : true;
RenderFormat renderFormat = stringToRenderFormat(renderFormatString);
ngsiv2::HttpInfo httpInfo;
ngsiv2::MqttInfo mqttInfo;
Expand All @@ -1626,6 +1657,7 @@ static bool addTriggeredSubscriptions_noCache
throttling,
maxFailsLimit,
failsCounter,
notifyOnMetadataChange,
lastNotification,
renderFormat,
httpInfo,
Expand Down Expand Up @@ -1789,7 +1821,8 @@ static bool processOnChangeConditionForUpdateContext
const std::string& fiwareCorrelator,
unsigned int correlatorCounter,
const ngsiv2::Notification& notification,
bool blacklist = false
bool blacklist = false,
bool notifyOnMetadataChange = true
)
{
NotifyContextRequest ncr;
Expand Down Expand Up @@ -1830,6 +1863,13 @@ static bool processOnChangeConditionForUpdateContext
}
}

/* Avoid sending notification if notifyOnMetadataChange == false and metadata changes */
if (notifyOnMetadataChange == false && onlyMetadata == true)
{
ncr.contextElementResponseVector.release();
return false;
}

/* Early exit without sending notification if attribute list is empty */
if (cer.entity.attributeVector.size() == 0)
{
Expand All @@ -1853,6 +1893,7 @@ static bool processOnChangeConditionForUpdateContext
tenant,
maxFailsLimit,
failsCounter,
notifyOnMetadataChange,
xauthToken,
fiwareCorrelator,
correlatorCounter,
Expand Down Expand Up @@ -2004,7 +2045,8 @@ static unsigned int processSubscriptions
fiwareCorrelator,
notifStartCounter + notifSent + 1,
notification,
tSubP->blacklist);
tSubP->blacklist,
tSubP->notifyOnMetadataChange);

if (notificationSent)
{
Expand Down
3 changes: 3 additions & 0 deletions src/lib/mongoBackend/TriggeredSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TriggeredSubscription::TriggeredSubscription
long long _throttling,
long long _maxFailsLimit,
long long _failsCounter,
bool _notifyOnMetadataChange,
long long _lastNotification,
RenderFormat _renderFormat,
const ngsiv2::HttpInfo& _httpInfo,
Expand All @@ -54,6 +55,7 @@ TriggeredSubscription::TriggeredSubscription
throttling(_throttling),
maxFailsLimit(_maxFailsLimit),
failsCounter(_failsCounter),
notifyOnMetadataChange(_notifyOnMetadataChange),
lastNotification(_lastNotification),
renderFormat(_renderFormat),
httpInfo(_httpInfo),
Expand Down Expand Up @@ -86,6 +88,7 @@ TriggeredSubscription::TriggeredSubscription
throttling(-1),
maxFailsLimit(-1),
failsCounter(-1),
notifyOnMetadataChange(true),
lastNotification(-1),
renderFormat(_renderFormat),
httpInfo(_httpInfo),
Expand Down
2 changes: 2 additions & 0 deletions src/lib/mongoBackend/TriggeredSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class TriggeredSubscription
long long throttling;
long long maxFailsLimit;
long long failsCounter;
bool notifyOnMetadataChange;
long long lastNotification;
RenderFormat renderFormat;
ngsiv2::HttpInfo httpInfo;
Expand All @@ -78,6 +79,7 @@ class TriggeredSubscription
TriggeredSubscription(long long _throttling,
long long _maxFailsLimit,
long long _failsCounter,
bool _notifyOnMetadataChange,
long long _lastNotification,
RenderFormat _renderFormat,
const ngsiv2::HttpInfo& _httpInfo,
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/dbConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
#define CSUB_LASTFAILUREASON "lastFailureReason"
#define CSUB_LASTSUCCESS "lastSuccess"
#define CSUB_LASTSUCCESSCODE "lastSuccessCode"
#define CSUB_NOTIFYONMETADATACHANGE "notifyOnMetadataChange"

#define CSUB_MQTTTOPIC "topic"
#define CSUB_MQTTQOS "qos"
Expand Down
2 changes: 2 additions & 0 deletions src/lib/mongoBackend/mongoCreateSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ static void insertInCache
sub.subject.condition.expression.coords,
sub.subject.condition.expression.georel,
sub.notification.blacklist,
sub.notification.notifyOnMetadataChange,
sub.notification.onlyChanged);

cacheSemGive(__FUNCTION__, "Inserting subscription in cache");
Expand Down Expand Up @@ -162,6 +163,7 @@ std::string mongoCreateSubscription
setAttrs(sub, &b);
setMetadata(sub, &b);
setBlacklist(sub, &b);
setNotifyOnMetadataChange(sub, &b);
setOnlyChanged(sub, &b);

if (!sub.description.empty())
Expand Down
6 changes: 6 additions & 0 deletions src/lib/mongoBackend/mongoGetSubscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ static void setNotification(Subscription* subP, const orion::BSONObj& r, const s
nP->lastSuccess = r.hasField(CSUB_LASTSUCCESS)? getIntOrLongFieldAsLongF(r, CSUB_LASTSUCCESS) : -1;
nP->lastFailureReason = r.hasField(CSUB_LASTFAILUREASON)? getStringFieldF(r, CSUB_LASTFAILUREASON) : "";
nP->lastSuccessCode = r.hasField(CSUB_LASTSUCCESSCODE)? getIntOrLongFieldAsLongF(r, CSUB_LASTSUCCESSCODE) : -1;
nP->notifyOnMetadataChange = r.hasField(CSUB_NOTIFYONMETADATACHANGE)? getBoolFieldF(r, CSUB_NOTIFYONMETADATACHANGE) : true;

// Attributes format
subP->attrsFormat = r.hasField(CSUB_FORMAT)? stringToRenderFormat(getStringFieldF(r, CSUB_FORMAT)) : NGSI_V1_LEGACY;
Expand Down Expand Up @@ -232,6 +233,11 @@ static void setNotification(Subscription* subP, const orion::BSONObj& r, const s
subP->notification.lastSuccess = cSubP->lastSuccess;
subP->notification.lastSuccessCode = cSubP->lastSuccessCode;
}

if (cSubP->notifyOnMetadataChange > subP->notifyOnMetadataChange)
{
subP->notification.notifyOnMetadataChange = cSubP->notifyOnMetadataChange;
}
}
cacheSemGive(__FUNCTION__, "get notification info");
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib/mongoBackend/mongoSubCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ int mongoSubCacheItemInsert(const char* tenant, const orion::BSONObj& sub)
cSubP->count = 0;
cSubP->failsCounter = 0;
cSubP->onlyChanged = sub.hasField(CSUB_ONLYCHANGED)? getBoolFieldF(sub, CSUB_ONLYCHANGED) : false;
cSubP->notifyOnMetadataChange = sub.hasField(CSUB_NOTIFYONMETADATACHANGE)? getBoolFieldF(sub, CSUB_NOTIFYONMETADATACHANGE) : true;
cSubP->next = NULL;


Expand Down Expand Up @@ -392,6 +393,7 @@ int mongoSubCacheItemInsert
cSubP->expression.georel = georel;
cSubP->next = NULL;
cSubP->blacklist = sub.hasField(CSUB_BLACKLIST)? getBoolFieldF(sub, CSUB_BLACKLIST) : false;
cSubP->notifyOnMetadataChange = sub.hasField(CSUB_NOTIFYONMETADATACHANGE) ? getBoolFieldF(sub, CSUB_NOTIFYONMETADATACHANGE) : true;

cSubP->lastNotificationTime = lastNotificationTime;
cSubP->lastFailure = lastFailure;
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/mongoUpdateSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ std::string mongoUpdateSubscription
if (subUp.statusProvided) setStatus(subUp.status, &setB, now);
if (subUp.blacklistProvided) setBlacklist(subUp, &setB);
if (subUp.onlyChangedProvided) setOnlyChanged(subUp, &setB);
if (subUp.notifyOnMetadataChangeProvided) setNotifyOnMetadataChange(subUp, &setB);
if (subUp.attrsFormatProvided) setFormat(subUp, &setB);


Expand Down
Loading