Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple queues with different priority #3

Open
IT-VBFK opened this issue Jul 10, 2023 · 5 comments
Open

Multiple queues with different priority #3

IT-VBFK opened this issue Jul 10, 2023 · 5 comments

Comments

@IT-VBFK
Copy link

IT-VBFK commented Jul 10, 2023

Hi

Just a quick question: Is it possible somehow to treat queues with a different priority?

My use case is: I have two queues, the first handles some background tasks while the second should be open for "immediate" response * (in my case a HTTP response, which should be handled with priority, so the users don't have to wait longer than needed).

If I could tell the node that it should per default start processing the default queue, but if something happens in the second queue it should stop and continue in that queue until this queue is empty and then proceed with the background tasks again.

Thank you :)

*) I have to handle this that way, because my queue items are serial commands and this requires some delay between

@drmibell
Copy link
Owner

Have you been able to construct a flow that does what you want, possibly with external logic or multiple queue-gate nodes? That flow would help me understand better what changes would have to be made in the multiple-queue node.

@IT-VBFK
Copy link
Author

IT-VBFK commented Jul 12, 2023

No, unfortunately not.

The closest is something like this, but it stops after dequeuing the prio1 queue.

[{"id":"fe3b0366e0f7fed9","type":"inject","z":"fd6e1b98d7a33d40","name":"default","props":[{"p":"payload"},{"p":"queue","v":"default","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"str","x":170,"y":160,"wires":[["917af25299a46ad7"]]},{"id":"917af25299a46ad7","type":"m-queue","z":"fd6e1b98d7a33d40","name":"","queueSelect":"queue","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":true,"persist":false,"newValue":"value","storeName":"memory","statusOutput":true,"outputs":2,"x":420,"y":200,"wires":[["765e45f41e346c32"],["399d048e86c4cd58"]]},{"id":"28ce3e2ce0b5a23e","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[{"p":"payload"},{"p":"queue","v":"all","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"2","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":160,"y":240,"wires":[[]]},{"id":"82d37241fa34a301","type":"function","z":"fd6e1b98d7a33d40","name":"function 7","func":"if(msg.payload.length == 1)\n    return {payload: 'default'};\n\nlet returnValue = \"default\";\nflow.set('currentStatus', msg.payload);\n\nfor (const e of msg.payload) {\n    if(e.payload.qName == 'default'){\n        if (e.payload.msgQ.length > 0)\n            returnValue = \"default\";\n    }\n    if (e.payload.qName == 'p1') {\n        if (e.payload.msgQ.length > 0){\n            returnValue = \"p1\";\n            break;\n        }\n    }\n}\n\nreturn {payload: returnValue};","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":740,"y":200,"wires":[["10a913625e1263db"]]},{"id":"399d048e86c4cd58","type":"join","z":"fd6e1b98d7a33d40","name":"","mode":"custom","build":"array","property":"","propertyType":"full","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":590,"y":200,"wires":[["82d37241fa34a301"]]},{"id":"566bdaf69e3bb10d","type":"inject","z":"fd6e1b98d7a33d40","name":"p1","props":[{"p":"payload"},{"p":"queue","v":"p1","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"str","x":170,"y":200,"wires":[["917af25299a46ad7"]]},{"id":"d0b3fe2e439acead","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[{"p":"payload"},{"p":"queue","v":"default","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"peek","payloadType":"str","x":150,"y":280,"wires":[[]]},{"id":"b5ed5f14d1f4fb10","type":"inject","z":"fd6e1b98d7a33d40","name":"p1","props":[{"p":"payload"},{"p":"queue","v":"p1","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"peek","payloadType":"str","x":150,"y":320,"wires":[[]]},{"id":"10a913625e1263db","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"currentQueue","pt":"flow","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":990,"y":200,"wires":[[]]},{"id":"4a9f5e1c2781c5cc","type":"link in","z":"fd6e1b98d7a33d40","name":"","links":["8dbb09866862a3ba","d5631dee82472a14","6c2510b2feb03582"],"x":295,"y":80,"wires":[["917af25299a46ad7"]]},{"id":"765e45f41e346c32","type":"delay","z":"fd6e1b98d7a33d40","name":"","pauseType":"delay","timeout":"6","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":590,"y":80,"wires":[["8536ad2453b9907b","73a18ceecce53a93"]]},{"id":"8536ad2453b9907b","type":"function","z":"fd6e1b98d7a33d40","name":"get next","func":"let queue = flow.get('currentQueue');\nnode.send({ queue: queue, control: true, payload: 'drop' });\n\nreturn [\n    { queue: queue, control: true, payload: 'peek' },\n    {payload: queue}\n];\n","outputs":2,"noerr":0,"initialize":"","finalize":"","libs":[],"x":770,"y":80,"wires":[["8dbb09866862a3ba"],[]]},{"id":"8dbb09866862a3ba","type":"link out","z":"fd6e1b98d7a33d40","name":"","links":["4a9f5e1c2781c5cc"],"x":925,"y":40,"wires":[]},{"id":"be8fa3449f228385","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":620,"wires":[["b8d75cbce3ae3205"]]},{"id":"a0aa1bdca5ba065b","type":"inject","z":"fd6e1b98d7a33d40","name":"Trigger dequeue","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":140,"y":520,"wires":[["f616f3e1e55bf354","2a992ec15e4b3e51"]]},{"id":"75ae9ba4933907cd","type":"change","z":"fd6e1b98d7a33d40","name":"Block dequeueing trigger","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":520,"wires":[[]]},{"id":"b8d75cbce3ae3205","type":"function","z":"fd6e1b98d7a33d40","name":"Unblock dequeuing trigger, in case queue is empty","func":"let currentStatus = flow.get('currentStatus');\nlet lastStatus = flow.get('lastStatus');\n\nlet dequeuingQueues = 0;\nfor (let i = 0; i < currentStatus.length; i++) {\n    if(currentStatus[i].payload.msgQ.length == lastStatus[i].payload.msgQ.length) {\n        flow.set('dequeuing', false);\n    }\n}\n/*\nfor (const e of currentStatus) {\n    if (e.payload.msgQ.length > 0) dequeuingQueues++;\n}\n\nflow.set('dequeuing', dequeuingQueues > 0);\n*/","outputs":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":490,"y":620,"wires":[]},{"id":"f616f3e1e55bf354","type":"switch","z":"fd6e1b98d7a33d40","name":"Currently dequeuing?","property":"dequeuing","propertyType":"flow","rules":[{"t":"false"}],"checkall":"true","repair":false,"outputs":1,"x":380,"y":520,"wires":[["75ae9ba4933907cd","8cf803693d345eb6"]],"outputLabels":["No"]},{"id":"c9c1e5deff46618b","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","x":120,"y":680,"wires":[["9326ef3eb85a1a07"]]},{"id":"9326ef3eb85a1a07","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"false","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":410,"y":680,"wires":[[]]},{"id":"e068acd87d62e37d","type":"comment","z":"fd6e1b98d7a33d40","name":"Dequeuing trigger","info":"","x":150,"y":480,"wires":[]},{"id":"d5631dee82472a14","type":"link out","z":"fd6e1b98d7a33d40","name":"link out 37","mode":"link","links":["4a9f5e1c2781c5cc"],"x":925,"y":460,"wires":[]},{"id":"2a992ec15e4b3e51","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"control","pt":"msg","to":"true","tot":"bool"},{"t":"set","p":"payload","pt":"msg","to":"status","tot":"str"},{"t":"set","p":"queue","pt":"msg","to":"all","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":430,"y":400,"wires":[["6c2510b2feb03582"]]},{"id":"8cf803693d345eb6","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"queue","pt":"msg","to":"currentQueue","tot":"flow"},{"t":"set","p":"payload","pt":"msg","to":"trigger","tot":"str"},{"t":"set","p":"control","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":710,"y":400,"wires":[["d5631dee82472a14"]]},{"id":"6c2510b2feb03582","type":"link out","z":"fd6e1b98d7a33d40","name":"link out 38","mode":"link","links":["4a9f5e1c2781c5cc"],"x":605,"y":320,"wires":[]},{"id":"73a18ceecce53a93","type":"debug","z":"fd6e1b98d7a33d40","name":"debug 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1020,"y":140,"wires":[]}]

@IT-VBFK
Copy link
Author

IT-VBFK commented Jul 13, 2023

Or as an alternative: extend q-gate (meaning a new node, p-queue or similar) to order messages in queue by e.g. msg.priority; if undefined leave the order as is.

@drmibell
Copy link
Owner

Or as an alternative: extend q-gate (meaning a new node, p-queue or similar) to order messages in queue by e.g. msg.priority; if undefined leave the order as is.

That's an interesting idea, and I'll come back to it shortly. Your original requirement seems to need just two queues, one priority and the other background, with the latter triggering only when the first is empty. Please let me know whether this flow does the job for you.

[{"id":"6639e0fb64d001eb","type":"q-gate","z":"00ba562ab9fec3da","name":"background","controlTopic":"control","defaultState":"queueing","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"100","keepNewest":false,"qToggle":false,"persist":false,"storeName":"memoryOnly","x":570,"y":780,"wires":[["38d931ed3607c915"]]},{"id":"aad58308027f4896","type":"q-gate","z":"00ba562ab9fec3da","name":"priority","controlTopic":"control","defaultState":"queueing","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"100","keepNewest":false,"qToggle":false,"persist":false,"storeName":"memoryOnly","x":550,"y":860,"wires":[["38d931ed3607c915"]]},{"id":"ae772c4e5e5ed126","type":"switch","z":"00ba562ab9fec3da","name":"","property":"priority","propertyType":"msg","rules":[{"t":"false"},{"t":"true"}],"checkall":"true","repair":false,"outputs":2,"x":350,"y":780,"wires":[["6639e0fb64d001eb"],["aad58308027f4896"]]},{"id":"d15ede1f58107b76","type":"inject","z":"00ba562ab9fec3da","name":"background","props":[{"p":"payload"},{"p":"priority","v":"false","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":190,"y":760,"wires":[["ae772c4e5e5ed126"]]},{"id":"8713adc00c377894","type":"inject","z":"00ba562ab9fec3da","name":"priority","props":[{"p":"payload"},{"p":"priority","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":210,"y":800,"wires":[["ae772c4e5e5ed126"]]},{"id":"88fa9c03969e773a","type":"inject","z":"00ba562ab9fec3da","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"control","payload":"trigger","payloadType":"str","x":210,"y":860,"wires":[["f03141b25f7450a1"]]},{"id":"f03141b25f7450a1","type":"switch","z":"00ba562ab9fec3da","name":"","property":"priority","propertyType":"flow","rules":[{"t":"eq","v":"0","vt":"num"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":350,"y":860,"wires":[["6639e0fb64d001eb"],["aad58308027f4896"]]},{"id":"f6fea11a9e685869","type":"status","z":"00ba562ab9fec3da","name":"priority status","scope":["aad58308027f4896"],"x":570,"y":920,"wires":[["ee90ca895eafbff9"]]},{"id":"ee90ca895eafbff9","type":"change","z":"00ba562ab9fec3da","name":"set flow.priority","rules":[{"t":"set","p":"priority","pt":"flow","to":"status.text","tot":"msg"},{"t":"set","p":"priority","pt":"flow","to":"$substringAfter($flowContext(\"priority\"),':')\t","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":740,"y":920,"wires":[[]]},{"id":"38d931ed3607c915","type":"debug","z":"00ba562ab9fec3da","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":730,"y":820,"wires":[]}]

Keeping a queue sorted by priority should be possible, but as is often the case with requests for new features or new nodes, I have to weigh the development effort involved against the benefit to users. The multiple-queue and q-gate nodes (as well as node-red-contrib-simple-gate) were always intended to provide basic functionality that can be used with external logic to provide more complex behavior. I am working on a flow that uses a few additional nodes to do the priority sorting, and this will give me an idea of how much effort would be involved in using external nodes as opposed to doing it internally.

@IT-VBFK
Copy link
Author

IT-VBFK commented Jul 14, 2023

From the functional perspective, yes (meaning if I trigger this manually). But I have to wrap an automation around this with following criteria:

  • On the end of the flow it should only pass a message per 5 seconds (this is really important, because this messages trigger some serial interactions, otherwise the serial device stops working because of message flood)
  • it should automatically start dequeuing the correct queue
  • taking all queues into consideration (especially for (1))

Currently, this is working great (apart from the priority thing)

[{"id":"82ee14f9acda0bc7","type":"link in","z":"fe1f1f31bc9cc679","name":"","links":["8b7176152be38b03","9c17a4ab45b9359b"],"x":535,"y":100,"wires":[["03e61716b5a653fe"]]},{"id":"03e61716b5a653fe","type":"q-gate","z":"fe1f1f31bc9cc679","name":"","controlTopic":"control","defaultState":"queueing","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"0","keepNewest":false,"qToggle":false,"persist":false,"storeName":"memory","x":600,"y":160,"wires":[["4497f2c3edb74888","fc79c8a00b7cd7e5"]]},{"id":"4497f2c3edb74888","type":"delay","z":"fe1f1f31bc9cc679","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":790,"y":100,"wires":[["087dc2c9e3fc3c45"]]},{"id":"087dc2c9e3fc3c45","type":"function","z":"fe1f1f31bc9cc679","name":"get next","func":"node.send({topic: \"control\", payload: \"drop\"})\nmsg.topic = \"control\"\nmsg.payload = \"peek\"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":970,"y":100,"wires":[["8b7176152be38b03"]]},{"id":"8b7176152be38b03","type":"link out","z":"fe1f1f31bc9cc679","name":"","links":["82ee14f9acda0bc7"],"x":1105,"y":100,"wires":[]},{"id":"ab1fad2d0c75ab5c","type":"comment","z":"fe1f1f31bc9cc679","name":"Dequeuing trigger","info":"","x":200,"y":420,"wires":[]},{"id":"6ae912569c053995","type":"inject","z":"fe1f1f31bc9cc679","name":"Trigger dequeue","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"5","crontab":"","once":false,"onceDelay":0.1,"topic":"control","payload":"peek","payloadType":"str","x":210,"y":460,"wires":[["5c137355103fa0ac"]]},{"id":"5c137355103fa0ac","type":"switch","z":"fe1f1f31bc9cc679","name":"Currently dequeuing?","property":"dequeuing","propertyType":"flow","rules":[{"t":"false"}],"checkall":"true","repair":false,"outputs":1,"x":420,"y":460,"wires":[["97ce8b0cee20cfbe"]],"outputLabels":["No"]},{"id":"97ce8b0cee20cfbe","type":"change","z":"fe1f1f31bc9cc679","name":"Block dequeueing trigger","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":460,"wires":[["9c17a4ab45b9359b"]]},{"id":"58ebb96d2ec2eadf","type":"status","z":"fe1f1f31bc9cc679","name":"","scope":["03e61716b5a653fe"],"x":160,"y":560,"wires":[["3a5df3ea476fdd25"]]},{"id":"3a5df3ea476fdd25","type":"function","z":"fe1f1f31bc9cc679","name":"Unblock dequeuing trigger, in case queue is empty","func":"let queueLength = parseInt(msg.status.text.split(' ')[1]);\n\nif(queueLength == 0)\n    flow.set('dequeuing', false);","outputs":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":550,"y":560,"wires":[]},{"id":"06c7bd45a1813997","type":"inject","z":"fe1f1f31bc9cc679","name":"","props":[],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","x":170,"y":620,"wires":[["09c38687cb60f0f0"]]},{"id":"09c38687cb60f0f0","type":"change","z":"fe1f1f31bc9cc679","name":"","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"false","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":460,"y":620,"wires":[[]]},{"id":"c87eb537985ca2f6","type":"inject","z":"fe1f1f31bc9cc679","name":"Generate msg","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"1","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":160,"wires":[["03e61716b5a653fe"]]},{"id":"c1f9c2c265c74f36","type":"comment","z":"fe1f1f31bc9cc679","name":"Get data to feed the queue","info":"","x":230,"y":120,"wires":[]},{"id":"fc79c8a00b7cd7e5","type":"debug","z":"fe1f1f31bc9cc679","name":"Serial Actions","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":940,"y":200,"wires":[]},{"id":"9c17a4ab45b9359b","type":"link out","z":"fe1f1f31bc9cc679","name":"link out 43","mode":"link","links":["82ee14f9acda0bc7"],"x":825,"y":460,"wires":[]},{"id":"b1ff27b5325eca85","type":"inject","z":"fe1f1f31bc9cc679","name":"Priority msg","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"prio1","payload":"","payloadType":"date","x":230,"y":220,"wires":[["03e61716b5a653fe"]]}]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants