Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix throttling #25801

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open

Fix throttling #25801

wants to merge 3 commits into from

Conversation

gracianodias3
Copy link

@gracianodias3 gracianodias3 commented Jan 13, 2025

Olá!

I am having same problem as peoples in #25030. After looking at the good work in #24122, I investigate in code and discover that throttleit package is not throwing away extra calls, but putting them in queue (I think this is not what we want it to do). Also, I think it's not so good to add new dependency (because of security risks with supply chain attacks) just for one small function, so I coped important parts into throttle.ts.

@gracianodias3
Copy link
Author

gracianodias3 commented Jan 13, 2025

I've also renamed the parameter from throttle to min_time_between_payloads, as it's more descriptive of what setting is actually doing. I've also tried to expose this in the UI. But changing the name will be a breaking change. I don't know if this is ok...

@gracianodias3
Copy link
Author

Ah, I read code again and notice it has clearTimeout in function that returns, so maybe it not making queue like I think before. I need study more to understand why throttle not working for me. I keep this PR open while I try to figure out problem.

@Koenkk
Copy link
Owner

Koenkk commented Jan 14, 2025

But changing the name will be a breaking change. I don't know if this is ok...

Let's keep it throttle to avoid a breaking changes.

@ivanfmartinez could you also take a look?

@gracianodias3
Copy link
Author

I thought this might be race condition, and tried to reproduce:

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)

		go func() {
			fmt.Println("Sending req")
			_, err := http.Get("http://localhost:3000/")
			if err != nil {
				log.Fatal(err)
			}
			time.Sleep(250 * time.Millisecond)
			wg.Done()
		}()
	}

	wg.Wait()
}

and

const rootHandler = (req, res) => {
    totalCallCount++
    console.log("hit root, callCount = ", totalCallCount);
    res.send('ok');
}

const throttledRootHandler = throttle(rootHandler, 1000);

app.get('/', throttledRootHandler);

but it work as it should:

hit root, callCount =  1
hit root, callCount =  2

This is not what I see in my home assistant though.


Ahhh, I think I understand problem! I use Home Assistant on HAOS. When we restart Home Assistant (on HAOS), the add-on containers not stop - they continue running. When we change throttle value, it only start working when we restart Zigbee2MQTT. I was restarting Home Assistant only, thinking this would restart Zigbee2MQTT too, so looks like throttle not working. But when I restart Zigbee2MQTT by itself, not with Home Assistant, then the changes work right!

image

I will put UI changes in this PR, and also bring throttleit package inside this project for safety reason. I try something like:

const freshThrottle = settings.getDevice(device.ieeeAddr).options.throttle;

to see if maybe we can get new throttle value without needing restart container, but this also get old value still.

@ivanfmartinez
Copy link
Contributor

If I understand correctly the last message my implementation is working @gracianodias3 just have not restarted the zigbee2mqtt process when tried first.

I dont know much about the z2m internals, but as other parameters can be changed from UI probably throttle can be changed also, I just dont know how to do this changes on UI.

@gracianodias3
Copy link
Author

If I understand correctly the last message my implementation is working @gracianodias3 just have not restarted the zigbee2mqtt process when tried first.

I dont know much about the z2m internals, but as other parameters can be changed from UI probably throttle can be changed also, I just dont know how to do this changes on UI.

The code you make works good, but me (and I think other people too) thought that when we restart Home Assistant, it would restart Zigbee2MQTT also, but não é assim when using HAOS (I not know about other ways to install). I put UI elements in PR (I think I did right), but when trying to make device settings read new value immediately, I could not make work.

@gracianodias3
Copy link
Author

gracianodias3 commented Jan 14, 2025

I just test changing values with new UI element, and is working right away without need restart!

image

Edit:

Actually I think is not correct what I say. I make test again and not working like I think. When I think more about this, probably is because this.throttlers[device.ieeeAddr] is like saving old value in cache. And I think same thing probably happening with debouncer too.

@gracianodias3
Copy link
Author

Now is working, I test changing values in UI. I make fix that is ok solution but working:

    async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
        const throttleKey = device.ieeeAddr + device.options.throttle;
        if (!this.throttlers[throttleKey]) {
            this.throttlers[throttleKey] = {
                publish: throttle(this.publishEntityState, time * 1000),
            };
        }

        // Update state cache right away. This makes sure that during throttling cached state is always up to date.
        // By updating cache we make sure that state cache is always up-to-date.
        this.state.set(device, payload);

        await this.throttlers[throttleKey].publish(device, payload, 'publishThrottle');
    }

The throttleKey only work for specific throttle value, so when value change the cache is not valid anymore.

image

@Koenkk
Copy link
Owner

Koenkk commented Jan 15, 2025

I got a bit lost in what problem you are trying to solve here, with the example code from throttleit:

const throttle = require('throttleit');

// Throttling a function that processes data.
function processData(data) {
	console.log('Processing:', data);

	// Add data processing logic here.
}

// Throttle the `processData` function to be called at most once every 3 seconds.
const throttledProcessData = throttle(processData, 3000);

// Simulate calling the function multiple times with different data.
throttledProcessData('Data 1');
throttledProcessData('Data 2');
throttledProcessData('Data 3');

I get the following output:

$ node test.js
Processing: Data 1
Processing: Data 3

That looks correct to me?

@gracianodias3
Copy link
Author

Sorry, my English is not so good. I've run this through ChatGPT to improve it, I don't know if it says what I mean though:

Sorry, it's become a bit of a mess. I, along with others, found that when we updated the throttle value it wasn't respected, even with a restart of Home Assistant. I initially thought it was due to the throttleit library, but that's behaving as expected. Poking around, I discovered a few things:

  • If using HAOS, rebooting Home Assistant doesn't reboot Zigbee2MQTT
  • That shouldn't be a problem, but the throttler functionality (and debounce, from the looks of it) use the device address as the key when setting up a throttler. What that means is that unless Zigbee2MQTT is restarted, those values will be cached because this.throttlers[device.ieeeAddr] will always refer to the throttler with the first value that it saw. I've updated the throttler to use what is effectively (device.ieeeAddr, throttleValue) as the key, so that if the throttle value changes from 1 -> 2, it stops looking for (device.ieeeAddr, 1), and instead looks for (device.ieeeAddr, 2).

So a description of the changes

  • I've vendored the throttleit library. I'm generally not a big fan of pulling in dependencies for effectively a single function, but I know the JS world often feels differently
  • I've moved to using throttleKey for the .throttlers lookup so that when the throttle value changes, the 'cache' is invalidated
  • I've added a test to capture that this behaviour is respected
  • Added the UI element to the settings page to set the throttle value

There are people who've complained about the throttling functionality not working. I suspect they're running into the same problem that I was, which was changing the throttle value via the filesystem didn't result in messages being throttled, even after a restart. This PR fixes that.

@@ -71,8 +71,9 @@ export default class Receive extends Extension {
}

async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
if (!this.throttlers[device.ieeeAddr]) {
this.throttlers[device.ieeeAddr] = {
const throttleKey = device.ieeeAddr + device.options.throttle;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will keep stale values in memory indefinitely (until Z2M shutdown). Better to invalidate the throttler when the option changes for a device.
Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review! I am again running my message in ChatGPT, please tell me if unclear.

This will keep stale values in memory indefinitely (until Z2M shutdown). Better to invalidate the throttler when the option changes for a device.

I considered the memory issue, but here's why I think it's okay:

  • Memory Impact: These chatty devices won't be used in large numbers since they can slow down networks. Most users won't have hundreds of them or frequently change their throttle settings.
  • Cache Invalidation: I looked into clearing the throttle cache when settings change, but wasn't sure how to connect the settings UI to the ExtensionReceive component. Do you know an easy way?
  • Current Approach: Given the small real-world memory impact, keeping values in memory seems better than adding complex state management.
  • Alternative Solution: We could auto-clear individual cached values after 2*throttleValue using setTimeout, but this adds complexity for minimal benefit.

Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

That's not user-friendly. Plus, as we've seen, restarting Home Assistant on HAOS doesn't restart Zigbee2MQTT, which confuses users.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

I'm also in favour of this. The restart button will automatically show up in the frontend. Settings requiresRestart true will enable this:

"requiresRestart": true

@@ -0,0 +1,23 @@
export default function throttle<Args extends unknown[]>(fn: (...args: Args) => Promise<void>, wait: number): (...args: Args) => Promise<void> {
Copy link
Collaborator

@Nerivec Nerivec Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't test it, but this looks to be messing with context, and also the return type is no longer generic as before.
Can we just bring the same code over, update it to typescript (the type is already in there as d.ts), and avoid potential issues down the road?

And also include a link to the source.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that. While my tests pass and it's working on my system, there are some TypeScript issues to address.

I tried implementing your suggestion but ran into problems with the any type. We'd need to enable noImplicitThis to use ThisType. I'm not very experienced with some TypeScript features, so there might be a simple solution I'm missing. I have a partial solution, but it's not perfect:

export default function throttle<T extends (...arguments_: unknown[]) => unknown>(function_: T, wait: number): T {
	if (typeof function_ !== 'function') {
		throw new TypeError(`Expected the first argument to be a \`function\`, got \`${typeof function_}\`.`);
	}

	// TODO: Add `wait` validation too in the next major version.

	let timeoutId: NodeJS.Timeout;
	let lastCallTime = 0;

	return function throttled(...arguments_) {  
		clearTimeout(timeoutId);

		const now = Date.now();
		const timeSinceLastCall = now - lastCallTime;
		const delayForNextCall = wait - timeSinceLastCall;

		if (delayForNextCall <= 0) {
			lastCallTime = now;
			function_.apply(this, arguments_);
		} else {
			timeoutId = setTimeout(() => {
				lastCallTime = Date.now();
				function_.apply(this, arguments_);
			}, delayForNextCall);
		}
	} as T;
}

This still relies on runtime type checking. While we could use runtime type checking, I prefer not to since:

  • The throttle function will likely only be used for this specific throttling feature
  • It's better to let the TypeScript compiler catch issues at compile time
  • We should make the types as strict as possible

You're right about adding attribution. If you know a better way to make the type checker happy, I'm interested. While my current TypeScript implementation works, I'm not sure if changing the function's context could cause problems.

@Koenkk
Copy link
Owner

Koenkk commented Jan 17, 2025

@gracianodias3

I've vendored the throttleit library. I'm generally not a big fan of pulling in dependencies for effectively a single function, but I know the JS world often feels differently

If the library works fine, lets keep on using it and no reinvent the wheel here. The package is also widely used (6M downloads per week)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants