-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove interior mutability of MemTable
#4514
Conversation
for batch in insert_batches.iter_mut() { | ||
origin_batches.push(RecordBatch::try_new( | ||
schema.clone(), | ||
batch.columns().to_vec(), | ||
)?); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ideal way should be concating columns of origin_batches
and insert_batches
, then create new RecordBatch
es with origin schema and concated columns.
But seems arrow-rs doesn't provide similar APIs, so I just use the current way.
@@ -74,12 +72,20 @@ pub async fn insert(ctx: &SessionContext, insert_stmt: &SQLStatement) -> Result< | |||
.collect::<std::result::Result<Vec<DFExpr>, DataFusionError>>()?; | |||
// Directly use `select` to get `RecordBatch` | |||
let dataframe = ctx.read_empty()?; | |||
insert_batches.push(dataframe.select(logical_exprs)?.collect().await?) | |||
insert_batches.extend(dataframe.select(logical_exprs)?.collect().await?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you could make this
insert_batches.extend(dataframe.select(logical_exprs)?.collect().await?) | |
origin_batches.extend(dataframe.select(logical_exprs)?.collect().await?) |
And avoid the need for insert_batches
entirely 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, nice suggestion!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look like a nice improvement to me -- thanks @xudong963
I had some small suggestions but they aren't needed or could be done as a follow on PR I think
table_batches.extend(insert_batches); | ||
// Replace new batches schema to old schema | ||
for batch in origin_batches.iter_mut() { | ||
*batch = RecordBatch::try_new(schema.clone(), batch.columns().to_vec())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is interesting -- I guess we can sort it out as a follow on
Thanks @xudong963 ! |
Benchmark runs are scheduled for baseline = 73527e3 and contender = 0fca6d5. 0fca6d5 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Follow up #4496
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?