Skip to content

Commit

Permalink
[query] Fix early close on lazy functions
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola committed Oct 6, 2019
1 parent 17531d8 commit 42ad45d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/query/executor/transform/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,18 @@ func ProcessSimpleBlock(
return err
}

// NB: The flow here is a little weird; this kicks off the next block's
// processing step after retrieving it, then attempts to close it. There is a
// trick here where some blocks (specifically lazy wrappers) that should not
// be closed, as they would free underlying data. The general story in block
// lifecycle should be revisited to remove quirks arising from these edge
// cases (something where blocks are responsible for calling their own
// downstreams would seem more intuative and allow finer grained lifecycle
// control).
err = controller.Process(queryCtx, nextBlock)
nextBlock.Close()
if nextBlock.Info().Type() != block.BlockLazy {
nextBlock.Close()
}

return err
}
28 changes: 26 additions & 2 deletions src/query/executor/transform/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,22 @@ func TestProcessSimpleBlock(t *testing.T) {
return ProcessSimpleBlock(tctx.Node, tctx.Controller, tctx.QueryCtx, tctx.Controller.ID, tctx.SourceBlock)
}

configureSuccessfulNode := func(tctx *testContext) {
configureNode := func(
tctx *testContext,
blockType block.BlockType,
closeExpected bool,
) {
tctx.Node.EXPECT().Params().Return(utils.StaticParams("foo"))
tctx.Node.EXPECT().ProcessBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(tctx.ResultBlock, nil)
tctx.ChildNode.EXPECT().Process(gomock.Any(), gomock.Any(), gomock.Any())
tctx.ResultBlock.EXPECT().Close()
tctx.ResultBlock.EXPECT().Info().Return(block.NewBlockInfo(blockType))
if closeExpected {
tctx.ResultBlock.EXPECT().Close()
}
}

configureSuccessfulNode := func(tctx *testContext) {
configureNode(tctx, block.BlockM3TSZCompressed, true)
}

t.Run("closes next block", func(t *testing.T) {
Expand All @@ -96,6 +107,19 @@ func TestProcessSimpleBlock(t *testing.T) {
require.NoError(t, doCall(tctx))
})

configureLazyNode := func(tctx *testContext) {
configureNode(tctx, block.BlockLazy, false)
}

t.Run("does not close lazy block", func(t *testing.T) {
tctx, closer := setup(t)
defer closer()

configureLazyNode(tctx)

require.NoError(t, doCall(tctx))
})

t.Run("errors on process error", func(t *testing.T) {
tctx, closer := setup(t)
defer closer()
Expand Down

0 comments on commit 42ad45d

Please sign in to comment.