Extract ReceiverStreamBuilder #7817
@@ -38,6 +38,113 @@ use tokio::task::JoinSet;
use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};

/// Creates a stream from a collection of producing tasks, routing panics to the stream
pub(crate) struct ReceiverStreamBuilder<O> {
I made this pub(crate) but it could easily be made public should we wish to do so
/// underlying tasks correctly.
///
/// Use [`Self::builder`] to construct one.
pub struct RecordBatchReceiverStream {
This type doesn't really make a lot of sense, given it isn't actually what the builder returns
schema,
join_set: JoinSet::new(),
inner: ReceiverStreamBuilder::new(capacity),
It seemed unnecessary / undesirable to burden ReceiverStreamBuilder with a notion of Schema
I agree
// don't need tx
drop(tx);

// future that checks the result of the join set, and propagates panic if seen
This is the interesting logic that needs DRYing
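As a rough illustration of the logic being extracted, here is a minimal std-only sketch that uses threads and `std::sync::mpsc` in place of tokio tasks and a `JoinSet`. The type `ThreadStreamBuilder` and its methods are hypothetical and are not the PR's code. Reporting a panic as a trailing `Err` item is an assumed simplification of "routing panics to the stream".

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical std-only analogue of the builder: producer threads all send
/// into one bounded channel, and their join handles are kept for checking.
pub struct ThreadStreamBuilder<O> {
    tx: SyncSender<Result<O, String>>,
    rx: Receiver<Result<O, String>>,
    handles: Vec<JoinHandle<()>>,
}

impl<O: Send + 'static> ThreadStreamBuilder<O> {
    /// Create a builder whose channel buffers up to `capacity` items.
    pub fn new(capacity: usize) -> Self {
        let (tx, rx) = sync_channel(capacity);
        Self { tx, rx, handles: Vec::new() }
    }

    /// Hand out a sender for a producer to use.
    pub fn tx(&self) -> SyncSender<Result<O, String>> {
        self.tx.clone()
    }

    /// Run a producer on its own thread and remember its handle.
    pub fn spawn<F: FnOnce() + Send + 'static>(&mut self, f: F) {
        self.handles.push(thread::spawn(f));
    }

    /// Drain every item, then check each producer and report a panic as an
    /// error item (an assumption of this sketch).
    pub fn build(self) -> Vec<Result<O, String>> {
        let Self { tx, rx, handles } = self;
        // don't need tx: dropping it lets the channel close once producers finish
        drop(tx);
        let mut out: Vec<Result<O, String>> = rx.iter().collect();
        for handle in handles {
            if handle.join().is_err() {
                out.push(Err("producer panicked".to_string()));
            }
        }
        out
    }
}
```

The point of the sketch is that none of this depends on the item type `O`, which is why the logic can live in a generic builder and be shared.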
@@ -110,7 +211,7 @@ impl RecordBatchReceiverStreamBuilder {
) {
let output = self.tx();

self.spawn(async move {
self.inner.spawn(async move {
This logic by comparison is rather ExecutionPlan specific, and I don't think it is valuable to DRY
inner: BoxStream<'static, Result<RecordBatch>>,
}
#[doc(hidden)]
pub struct RecordBatchReceiverStream {}

impl RecordBatchReceiverStream {
/// Create a builder with an internal buffer of capacity batches.
Yeah, this method documentation also seems incorrect at this time.
This implementation looks good to me.
Compared to `ReceiverStream` from tokio-stream, the only difference is that this `ReceiverStreamBuilder` provides methods for the user to bound and "detach" their tasks (`spawn()` and `spawn_blocking()`).
But considering this is also not tough to achieve without `ReceiverStreamBuilder`, is it possible to use the one tokio provides instead?
AFAICT the tokio version doesn't provide panic propagation nor cancellation?
The Tokio version doesn't propagate panics: a panicked task only panics the environment it runs in. It does provide cancellation through the mpsc channel, in that dropping the consumer (or the stream itself) results in errors in the senders.
Given panic propagation and task cancellation are the entire purpose of this construction, I think we're therefore not duplicating something upstream?
Thanks for explaining, LGTM 👍
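The channel-level cancellation described above (senders see errors once the consumer is dropped) can be sketched with the standard library's bounded channel. This is only an analogue under assumptions: it uses `std::sync::mpsc` and a thread rather than tokio's mpsc and tasks, and the function name `produce_until_cancelled` is hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Consume `take` items from an endless producer, then drop the receiver.
/// Returns the items received and how many sends succeeded before the
/// producer noticed that the receiver was gone.
pub fn produce_until_cancelled(take: usize) -> (Vec<usize>, usize) {
    // Bounded channel with room for a single buffered item.
    let (tx, rx) = sync_channel::<usize>(1);

    let producer = thread::spawn(move || {
        let mut sent = 0usize;
        // `send` returns Err once the receiver has been dropped, which is
        // the only signal the producer gets that it should stop.
        while tx.send(sent).is_ok() {
            sent += 1;
        }
        sent
    });

    let received: Vec<usize> = rx.iter().take(take).collect();
    // Dropping the consumer is what "cancels" the producer here.
    drop(rx);

    let sent = producer.join().expect("producer should not panic");
    (received, sent)
}
```

Note that the producer only stops cooperatively, at its next `send`. Nothing aborts it, and a panic inside it would not reach the consumer, which is the gap the thread above attributes to the builder.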
I tried to encode this conversation into comments
Looks great. TY.
I plan to merge this PR when the CI passes
Which issue does this PR close?
Closes #7800
Rationale for this change
An alternative to #7800 that, instead of trying to make RecordBatchReceiverStreamBuilder handle non-RecordBatch inputs, extracts the lower-level stream management logic that I believe is what we actually want to DRY.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?