Introduce configurable congestion control. #31
Conversation
The primary motivation for this (as of 2024/02/28) is to protect the PlayHT On-Prem appliance from being inundated with a burst of text-to-speech requests that it can't satisfy.

Prior to this change, the client would split a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. the gRPC client) simultaneously. This would routinely overload on-prem appliances that operate without much GPU capacity headroom[1]. The result would be that most streams clients started would immediately fail with gRPC error 8: RESOURCE_EXHAUSTED; and therefore, a bad customer experience.

This change allows customers to optionally turn on one of an enumerated set of congestion control algorithms. We've implemented just one for now, StaticMar2024, which delays sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text chunk has started streaming. This is a very simple congestion control algorithm with static constants; it leaves a lot to be desired. We should iterate on these algorithms in the future. The CongestionCtrl enum was added so that algorithms can be added without requiring customers to change their code much.

[1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances running near 100% utilization.
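For illustration, this is roughly how a customer would opt in. This is a sketch assuming the option is exposed on PlayHT.init under the name congestionCtrl; the option name and credential fields here are assumptions, not confirmed by this PR:

import * as PlayHT from 'playht';

PlayHT.init({
  apiKey: '<YOUR_API_KEY>', // assumed init fields, for illustration only
  userId: '<YOUR_USER_ID>',
  // Assumed option name: opt in to the StaticMar2024 congestion control algorithm.
  congestionCtrl: PlayHT.CongestionCtrl.StaticMar2024,
});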
Hi @rodms10 - Any chance you can look at this one early next week? Sorry to be a bother - we've got customers who are excited to use this change and are currently blocked on it. Thank you 🙏🙏
This will be a great addition! Let's make the parallelism of 2 the default.

Can you run yarn verify and yarn lint, please?
packages/playht/src/api/apiCommon.ts

    this.postChunkBackoff = 0;
    break;
  case CongestionCtrl.StaticMar2024:
    this.parallelism = 1;
I'd love for this to be configurable. We can have a default of 2; it will smooth out requests to our servers as well.
Short: Can we do this later? I don't want to cause an outage by changing default behavior.
Long:
So the idea is that customers configure this by setting CongestionCtrl with PlayHT.init. I'm wary of exposing the lower-level knobs like parallelism and backoff directly to customers because I'm not convinced that static numbers are the right way to go medium-term. Medium-term, I suspect we'll want a congestion control algorithm that adapts to feedback the client gets from turbo-prop (e.g. RESOURCE_EXHAUSTED, or actual latencies vs expected latencies). If we expose the low-level knobs directly to customers, then we'll have to accommodate those public low-level knobs in perpetuity (or at least for some long deprecation time period).
I agree that parallelism = 2 is probably a good default (which is why I'm asking on-prem customers to use it); but I'm wary of imposing this behavior change on all customers, because it's not obvious to me what the performance artifacts will be in those cases. I'm hand-holding every on-prem customer through their use-cases; both from the SDK side and the appliance side (e.g. adjusting the number of GPUs they have available). I suspect if we adjusted the parallelism here, we'd end up with a deluge of support requests (warranted or not) that we wouldn't be able to handle.
The way to do this would be to add a third CongestionCtrl option - say CongestionCtrl.SmoothApr2024 - that sets parallelism = 2 and postChunkBackoff = XY. Then we could test it slowly with select customers or a % of random streams and see the results (assuming we also build a way to gather SDK telemetry).
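To make that concrete, here is a sketch of how such a preset would slot into the switch from this diff. SmoothApr2024, the backoff values, and the Off-case parallelism are hypothetical, not part of this PR:

switch (congestionCtrl) {
  case CongestionCtrl.Off:
    this.parallelism = 2;        // hypothetical: the pre-existing "two sentences at once" behavior
    this.postChunkBackoff = 0;
    break;
  case CongestionCtrl.StaticMar2024:
    this.parallelism = 1;        // one in-flight text chunk at a time
    this.postChunkBackoff = 50;  // ms; hypothetical static constant
    break;
  case CongestionCtrl.SmoothApr2024: // hypothetical future preset
    this.parallelism = 2;
    this.postChunkBackoff = 0;   // placeholder for the "XY" value above
    break;
}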
packages/playht/src/api/apiCommon.ts

if (this.algo == CongestionCtrl.Off) return;

this.inflight = Math.max(this.inflight - 1, 0);
If you're reducing inflight at the first chunk, why do it here too?
Great catch. I was being lazy here and using the "end" event as a way to mask my lack of understanding of which output formats are guaranteed to return output headers and which ones are not. I've updated the logic. Can you take a look?
packages/playht/src/api/apiCommon.ts

constructor(fn: Function, name: string) {
  this.fn = fn;
  this.name = name;
Is name used? If not, might not be useful to have a class.
Used for debugging the task queue. I added a note.
(If you feel strongly about removing it - I can do so. FWIW: each time this needs to be debugged, a debugger like myself would need to add Task::name back in. Not a huge deal.)
constructor(
  private readonly request: apiProto.playht.v1.ITtsRequest,
  private readonly rpcClient: grpc.Client,
  private readonly fallbackClient?: grpc.Client,
  private readonly congestionCtrl?: CongestionCtrl,
) {}
Doing the congestion control in apiCommon should be enough, no?
Most of the customers I've been dealing with don't do retry very well - and the ones that do retry aren't wild about the fact that there are errors coming back. So, congestion control also manifests as retry+backoff inside the tts stream source.
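A sketch of the kind of retry+backoff meant here, assuming a start function that opens the TTS stream; the constants and names are illustrative, not the PR's actual values:

// Retry a TTS stream when the appliance signals overload (gRPC code 8: RESOURCE_EXHAUSTED).
async function startStreamWithRetry<T>(start: () => Promise<T>): Promise<T> {
  const maxRetries = 3; // illustrative constant
  let backoffMs = 100;  // illustrative constant
  for (let attempt = 0; ; attempt++) {
    try {
      return await start();
    } catch (err) {
      const resourceExhausted = (err as { code?: number })?.code === 8;
      if (!resourceExhausted || attempt >= maxRetries) throw err;
      await new Promise((resolve) => setTimeout(resolve, backoffMs));
      backoffMs *= 2; // exponential backoff between attempts
    }
  }
}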
I was running into dependency cycles due to the CongestionCtrl enum being defined in src/grpc-client.
/**
 * Enumerates the streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT.
 */
export enum CongestionCtrl {
I had to copy and paste this here to get around dependency cycles surfaced by yarn verify.
Thanks for the review @rodms10. I made changes to address your (really good) feedback. Can you take another look? (I'm not wild about copying and pasting CongestionCtrl. But I had to do it to avoid dependency cycle errors surfaced by yarn verify. If you have another solution - let me know!)
Hi @rodms10 - any chance you can take a look at this during the week?
/**
 * Enumerates the streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT.
 */
export type CongestionCtrl =
  /**
   * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible.
   */
  | 'Off'

  /**
   * The client will optimize for minimizing the number of physical resources required to handle a single stream.
   *
   * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm.
   */
  | 'StaticMar2024';
Wdyt about deleting this and having it just use the one from ../grpc-client/congestion-ctrl.ts? From what I could gather from a quick look, other files in src/api already import elements from src/grpc-client, so the dependency between the two modules (api and grpc-client) already exists.

If leaving them (CongestionCtrl and CongestionController) far apart is a bad thing, then maybe they should both be moved to a new module -- one that both api and grpc-client would depend on, but that itself wouldn't depend on either, which would not create the dependency loop.

Edit: I had a better look and changed my mind: I think congestion should be a module/package at the same level as api and grpc-client, since both use it, and having it within grpc-client is a mistake because congestion control is not tied to gRPC logic only.

Feel free to leave it like this for now, though. This is something we can refactor after this PR.
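Concretely, the proposed layout would be something like this (paths are illustrative):

packages/playht/src/
  congestion/    // CongestionCtrl + CongestionController; depends on neither module below
  api/           // imports from congestion/
  grpc-client/   // imports from congestion/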
const nextAudioChunk = (async () => {
  return await internalGenerateStreamFromString(sentence, options);
})();
Hmm, I feel like I may be missing something; isn't that equivalent to this:

Edit: I had a better look; the async IIFE was probably residual from old refactorings. Anyway, I think this simplification would be a good idea:
- const nextAudioChunk = (async () => {
-   return await internalGenerateStreamFromString(sentence, options);
- })();
+ const nextAudioChunk = internalGenerateStreamFromString(sentence, options);
// NOTE: The cast below is to avoid a cyclic dependency warning from "yarn verify"
switch ((<{ outputFormat: string }>options).outputFormat) {
  case 'wav':
    completion.headersRemaining = 1;
    break;
  case 'mp3':
    completion.headersRemaining = 1;
    break;
  default:
    break;
}
I didn't get the warning you mention, but this cast did introduce a bug for me. I made some tests using just PlayHT.stream('text'), and the lack of options made this .outputFormat access throw an error.

Something like this fixed it for me:
- // NOTE: The cast below is to avoid a cyclic dependency warning from "yarn verify"
- switch ((<{ outputFormat: string }>options).outputFormat) {
-   case 'wav':
-     completion.headersRemaining = 1;
-     break;
-   case 'mp3':
-     completion.headersRemaining = 1;
-     break;
-   default:
-     break;
- }
+ if (typeof options === 'object' && 'outputFormat' in options) {
+   switch (options.outputFormat) {
+     case 'wav':
+       completion.headersRemaining = 1;
+       break;
+     case 'mp3':
+       completion.headersRemaining = 1;
+       break;
+     default:
+       break;
+   }
+ }
setTimeout(
  () =>
    writeAudio.on('finish', () => {
So what is going on here:

- audioChunkStream is being .pipe()d into writeAudio, which means that when audioChunkStream ends, it will end writeAudio. Ok, good.
- Now notice that, OTOH, writeAudio is not being piped into writableStream. I mean, it is being "piped indirectly" by writing into writableStream from within the write() method of writeAudio, but this setup is not forwarding the EOF signal (aka it is not ending writableStream when writeAudio ends, like the pipe mentioned above does).
- So, that audioChunkStream.on('end', ...) bit above, which I'm suggesting the fix on, is there to take care of this ending.
- But notice the following caveat: the writeAudio.on('finish', ...) event subscription happens right after audioChunkStream 'end's, right? Yeah, not exactly. It happens "a setTimeout away" from that. It's a detail, but an important one.
  - This means that if the 'finish' event of writeAudio happens before that setTimeout callback runs (the time window for that is tiny, but it exists), then this audioChunkStream.on('end', ...) subscription will never have any practical effect, since the finish event -- the one it will wait for -- will never happen again. When this is the case, the writableStream.end() bit is never called and therefore the stream never ends.

But this whole scenario was there before the congestion control, so how could it affect it then?

The "new" bit is the postChunkBackoff delay the congestionCtrl adds. In a gist, if the last write() call on writeAudio takes less than postChunkBackoff to complete, then by the time the task that pushes the EOF to audioChunkStream runs, writeAudio will have nothing else to do, and as such it will trigger the finish event right away when audioChunkStream forwards the EOF (due to .pipe()) -- or, well, maybe not "right away", but certainly in less than "a setTimeout(,0) away".
Anyway, please let me know if any of that makes sense -- believe it or not, this is me trying to be as succinct as possible. I have tested these fixes before suggesting them here.
PS.: I know 50ms is a very, very short window, but the race condition is certainly there. On-prem responses could theoretically be that fast (e.g. empty sentences), but, more than that, this number could change in the future (with some kind of exponential backoff implementation or whatnot) and by that time this kind of issue would be a lot of fun to debug :P
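For what it's worth, the race can be reproduced in isolation with plain PassThrough streams. A minimal sketch follows; with bare PassThroughs the late registration always loses, since 'finish' is emitted via the nextTick queue, which drains before setTimeout callbacks:

import { PassThrough } from 'stream';

const audioChunkStream = new PassThrough();
const writeAudio = new PassThrough();
audioChunkStream.pipe(writeAudio); // 'end' on the source will end writeAudio

audioChunkStream.on('end', () => {
  setTimeout(() => {
    // Registered "a setTimeout away": by now 'finish' has already fired,
    // so this callback -- where writableStream.end() would go -- never runs.
    writeAudio.on('finish', () => console.log('never printed'));
  }, 0);
});

// Subscribing before the event can fire avoids the race:
writeAudio.on('finish', () => console.log('finish observed'));

audioChunkStream.end('last chunk');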
EXCELLENT find. I really appreciate the thoroughness! I kind of felt something was off here - but couldn't find it. Thank you! I'll circle back to this PR in a bit after I'm done with the training cluster work. Thanks again!