-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multiple queues with different priority #3
Comments
Have you been able to construct a flow that does what you want, possibly with external logic or multiple |
No, unfortunately not. The closest is something like this, but it stops after dequeuing the prio1 queue. [{"id":"fe3b0366e0f7fed9","type":"inject","z":"fd6e1b98d7a33d40","name":"default","props":[{"p":"payload"},{"p":"queue","v":"default","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"str","x":170,"y":160,"wires":[["917af25299a46ad7"]]},{"id":"917af25299a46ad7","type":"m-queue","z":"fd6e1b98d7a33d40","name":"","queueSelect":"queue","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":true,"persist":false,"newValue":"value","storeName":"memory","statusOutput":true,"outputs":2,"x":420,"y":200,"wires":[["765e45f41e346c32"],["399d048e86c4cd58"]]},{"id":"28ce3e2ce0b5a23e","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[{"p":"payload"},{"p":"queue","v":"all","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"2","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":160,"y":240,"wires":[[]]},{"id":"82d37241fa34a301","type":"function","z":"fd6e1b98d7a33d40","name":"function 7","func":"if(msg.payload.length == 1)\n return {payload: 'default'};\n\nlet returnValue = \"default\";\nflow.set('currentStatus', msg.payload);\n\nfor (const e of msg.payload) {\n if(e.payload.qName == 'default'){\n if (e.payload.msgQ.length > 0)\n returnValue = \"default\";\n }\n if (e.payload.qName == 'p1') {\n if (e.payload.msgQ.length > 0){\n returnValue = \"p1\";\n break;\n }\n }\n}\n\nreturn {payload: returnValue};","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":740,"y":200,"wires":[["10a913625e1263db"]]},{"id":"399d048e86c4cd58","type":"join","z":"fd6e1b98d7a33d40","name":"","mode":"custom","build":"array","property":"","propertyType":"full","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":590,"y":200,"wires":[["82d37241fa34a301"]]},{"id":"566bdaf69e3bb10d","type":"inject","z":"fd6e1b98d7a33d40","name":"p1","props":[{"p":"payload"},{"p":"queue","v":"p1","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"str","x":170,"y":200,"wires":[["917af25299a46ad7"]]},{"id":"d0b3fe2e439acead","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[{"p":"payload"},{"p":"queue","v":"default","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"peek","payloadType":"str","x":150,"y":280,"wires":[[]]},{"id":"b5ed5f14d1f4fb10","type":"inject","z":"fd6e1b98d7a33d40","name":"p1","props":[{"p":"payload"},{"p":"queue","v":"p1","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"peek","payloadType":"str","x":150,"y":320,"wires":[[]]},{"id":"10a913625e1263db","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"currentQueue","pt":"flow","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":990,"y":200,"wires":[[]]},{"id":"4a9f5e1c2781c5cc","type":"link in","z":"fd6e1b98d7a33d40","name":"","links":["8dbb09866862a3ba","d5631dee82472a14","6c2510b2feb03582"],"x":295,"y":80,"wires":[["917af25299a46ad7"]]},{"id":"765e45f41e346c32","type":"delay","z":"fd6e1b98d7a33d40","name":"","pauseType":"delay","timeout":"6","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":590,"y":80,"wires":[["8536ad2453b9907b","73a18ceecce53a93"]]},{"id":"8536ad2453b9907b","type":"function","z":"fd6e1b98d7a33d40","name":"get next","func":"let queue = flow.get('currentQueue');\nnode.send({ queue: queue, control: true, payload: 'drop' });\n\nreturn [\n { queue: queue, control: true, payload: 'peek' },\n {payload: queue}\n];\n","outputs":2,"noerr":0,"initialize":"","finalize":"","libs":[],"x":770,"y":80,"wires":[["8dbb09866862a3ba"],[]]},{"id":"8dbb09866862a3ba","type":"link out","z":"fd6e1b98d7a33d40","name":"","links":["4a9f5e1c2781c5cc"],"x":925,"y":40,"wires":[]},{"id":"be8fa3449f228385","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":620,"wires":[["b8d75cbce3ae3205"]]},{"id":"a0aa1bdca5ba065b","type":"inject","z":"fd6e1b98d7a33d40","name":"Trigger dequeue","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":140,"y":520,"wires":[["f616f3e1e55bf354","2a992ec15e4b3e51"]]},{"id":"75ae9ba4933907cd","type":"change","z":"fd6e1b98d7a33d40","name":"Block dequeueing trigger","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":520,"wires":[[]]},{"id":"b8d75cbce3ae3205","type":"function","z":"fd6e1b98d7a33d40","name":"Unblock dequeuing trigger, in case queue is empty","func":"let currentStatus = flow.get('currentStatus');\nlet lastStatus = flow.get('lastStatus');\n\nlet dequeuingQueues = 0;\nfor (let i = 0; i < currentStatus.length; i++) {\n if(currentStatus[i].payload.msgQ.length == lastStatus[i].payload.msgQ.length) {\n flow.set('dequeuing', false);\n }\n}\n/*\nfor (const e of currentStatus) {\n if (e.payload.msgQ.length > 0) dequeuingQueues++;\n}\n\nflow.set('dequeuing', dequeuingQueues > 0);\n*/","outputs":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":490,"y":620,"wires":[]},{"id":"f616f3e1e55bf354","type":"switch","z":"fd6e1b98d7a33d40","name":"Currently dequeuing?","property":"dequeuing","propertyType":"flow","rules":[{"t":"false"}],"checkall":"true","repair":false,"outputs":1,"x":380,"y":520,"wires":[["75ae9ba4933907cd","8cf803693d345eb6"]],"outputLabels":["No"]},{"id":"c9c1e5deff46618b","type":"inject","z":"fd6e1b98d7a33d40","name":"","props":[],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","x":120,"y":680,"wires":[["9326ef3eb85a1a07"]]},{"id":"9326ef3eb85a1a07","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"false","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":410,"y":680,"wires":[[]]},{"id":"e068acd87d62e37d","type":"comment","z":"fd6e1b98d7a33d40","name":"Dequeuing trigger","info":"","x":150,"y":480,"wires":[]},{"id":"d5631dee82472a14","type":"link out","z":"fd6e1b98d7a33d40","name":"link out 37","mode":"link","links":["4a9f5e1c2781c5cc"],"x":925,"y":460,"wires":[]},{"id":"2a992ec15e4b3e51","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"control","pt":"msg","to":"true","tot":"bool"},{"t":"set","p":"payload","pt":"msg","to":"status","tot":"str"},{"t":"set","p":"queue","pt":"msg","to":"all","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":430,"y":400,"wires":[["6c2510b2feb03582"]]},{"id":"8cf803693d345eb6","type":"change","z":"fd6e1b98d7a33d40","name":"","rules":[{"t":"set","p":"queue","pt":"msg","to":"currentQueue","tot":"flow"},{"t":"set","p":"payload","pt":"msg","to":"trigger","tot":"str"},{"t":"set","p":"control","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":710,"y":400,"wires":[["d5631dee82472a14"]]},{"id":"6c2510b2feb03582","type":"link out","z":"fd6e1b98d7a33d40","name":"link out 38","mode":"link","links":["4a9f5e1c2781c5cc"],"x":605,"y":320,"wires":[]},{"id":"73a18ceecce53a93","type":"debug","z":"fd6e1b98d7a33d40","name":"debug 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1020,"y":140,"wires":[]}] |
Or as an alternative: extend q-gate (meaning a new node, p-queue or similar) to order messages in queue by e.g. msg.priority; if undefined leave the order as is. |
That's an interesting idea, and I'll come back to it shortly. Your original requirement seems to need just two queues, one priority and the other background, with the latter triggering only when the first is empty. Please let me know whether this flow does the job for you.
Keeping a queue sorted by priority should be possible, but as is often the case with requests for new features or new nodes, I have to weigh the development effort involved against the benefit to users. The |
From the functional perspective, yes (meaning if I trigger this manually). But I have to wrap an automation around this with following criteria:
Currently, this is working great (apart from the priority thing) [{"id":"82ee14f9acda0bc7","type":"link in","z":"fe1f1f31bc9cc679","name":"","links":["8b7176152be38b03","9c17a4ab45b9359b"],"x":535,"y":100,"wires":[["03e61716b5a653fe"]]},{"id":"03e61716b5a653fe","type":"q-gate","z":"fe1f1f31bc9cc679","name":"","controlTopic":"control","defaultState":"queueing","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"0","keepNewest":false,"qToggle":false,"persist":false,"storeName":"memory","x":600,"y":160,"wires":[["4497f2c3edb74888","fc79c8a00b7cd7e5"]]},{"id":"4497f2c3edb74888","type":"delay","z":"fe1f1f31bc9cc679","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":790,"y":100,"wires":[["087dc2c9e3fc3c45"]]},{"id":"087dc2c9e3fc3c45","type":"function","z":"fe1f1f31bc9cc679","name":"get next","func":"node.send({topic: \"control\", payload: \"drop\"})\nmsg.topic = \"control\"\nmsg.payload = \"peek\"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":970,"y":100,"wires":[["8b7176152be38b03"]]},{"id":"8b7176152be38b03","type":"link out","z":"fe1f1f31bc9cc679","name":"","links":["82ee14f9acda0bc7"],"x":1105,"y":100,"wires":[]},{"id":"ab1fad2d0c75ab5c","type":"comment","z":"fe1f1f31bc9cc679","name":"Dequeuing trigger","info":"","x":200,"y":420,"wires":[]},{"id":"6ae912569c053995","type":"inject","z":"fe1f1f31bc9cc679","name":"Trigger dequeue","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"5","crontab":"","once":false,"onceDelay":0.1,"topic":"control","payload":"peek","payloadType":"str","x":210,"y":460,"wires":[["5c137355103fa0ac"]]},{"id":"5c137355103fa0ac","type":"switch","z":"fe1f1f31bc9cc679","name":"Currently dequeuing?","property":"dequeuing","propertyType":"flow","rules":[{"t":"false"}],"checkall":"true","repair":false,"outputs":1,"x":420,"y":460,"wires":[["97ce8b0cee20cfbe"]],"outputLabels":["No"]},{"id":"97ce8b0cee20cfbe","type":"change","z":"fe1f1f31bc9cc679","name":"Block dequeueing trigger","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":460,"wires":[["9c17a4ab45b9359b"]]},{"id":"58ebb96d2ec2eadf","type":"status","z":"fe1f1f31bc9cc679","name":"","scope":["03e61716b5a653fe"],"x":160,"y":560,"wires":[["3a5df3ea476fdd25"]]},{"id":"3a5df3ea476fdd25","type":"function","z":"fe1f1f31bc9cc679","name":"Unblock dequeuing trigger, in case queue is empty","func":"let queueLength = parseInt(msg.status.text.split(' ')[1]);\n\nif(queueLength == 0)\n flow.set('dequeuing', false);","outputs":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":550,"y":560,"wires":[]},{"id":"06c7bd45a1813997","type":"inject","z":"fe1f1f31bc9cc679","name":"","props":[],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","x":170,"y":620,"wires":[["09c38687cb60f0f0"]]},{"id":"09c38687cb60f0f0","type":"change","z":"fe1f1f31bc9cc679","name":"","rules":[{"t":"set","p":"dequeuing","pt":"flow","to":"false","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":460,"y":620,"wires":[[]]},{"id":"c87eb537985ca2f6","type":"inject","z":"fe1f1f31bc9cc679","name":"Generate msg","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"1","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":160,"wires":[["03e61716b5a653fe"]]},{"id":"c1f9c2c265c74f36","type":"comment","z":"fe1f1f31bc9cc679","name":"Get data to feed the queue","info":"","x":230,"y":120,"wires":[]},{"id":"fc79c8a00b7cd7e5","type":"debug","z":"fe1f1f31bc9cc679","name":"Serial Actions","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":940,"y":200,"wires":[]},{"id":"9c17a4ab45b9359b","type":"link out","z":"fe1f1f31bc9cc679","name":"link out 43","mode":"link","links":["82ee14f9acda0bc7"],"x":825,"y":460,"wires":[]},{"id":"b1ff27b5325eca85","type":"inject","z":"fe1f1f31bc9cc679","name":"Priority msg","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"prio1","payload":"","payloadType":"date","x":230,"y":220,"wires":[["03e61716b5a653fe"]]}] |
Hi
Just a quick question: Is it possible somehow to treat queues with a different priority?
My use case is: I have two queues, the first handles some background tasks while the second should be open for "immediate" response * (in my case a HTTP response, which should be handled with priority, so the users don't have to wait longer than needed).
If I could tell the node that it should per default start processing the default queue, but if something happens in the second queue it should stop and continue in that queue until this queue is empty and then proceed with the background tasks again.
Thank you :)
*) I have to handle this that way, because my queue items are serial commands and this requires some delay between
The text was updated successfully, but these errors were encountered: