Skip to content

Commit

Permalink
sql: add ResultTypes to processor spec
Browse files Browse the repository at this point in the history
This commit introduces a new field to `execinfrapb.ProcessorSpec` in
order to propagate the result types of the processors and the
corresponding field has been removed from `PhysicalPlan` (and was
replaced by a getter method). I believe it is a cleaner abstraction than
what we had previously (implicit type schema expectation by the output
stage that is passed into InputSyncSpec), and this change is
a prerequisite to refactor of the vectorized flow setup - with this
change it'll be now very easy to make sure that the desired type schema
is produced after any vectorized operator (without this plumbing we
would need to attempt to deduce the desired schema, and in some cases it
is not possible - e.g. when we have a remote table reader that is
feeding into an outbox).

Release note: None
  • Loading branch information
yuzefovich committed Oct 27, 2020
1 parent 13d435e commit a76ee31
Show file tree
Hide file tree
Showing 28 changed files with 452 additions and 303 deletions.
13 changes: 8 additions & 5 deletions pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -159,7 +160,8 @@ func distRestore(
RangeRouterSpec: rangeRouterSpec,
},
},
StageID: splitAndScatterStageID,
StageID: splitAndScatterStageID,
ResultTypes: splitAndScatterOutputTypes,
},
}
pIdx := p.AddProcessor(proc)
Expand All @@ -176,10 +178,11 @@ func distRestore(
Input: []execinfrapb.InputSyncSpec{
{ColumnTypes: splitAndScatterOutputTypes},
},
Core: execinfrapb.ProcessorCoreUnion{RestoreData: &restoreDataSpec},
Post: execinfrapb.PostProcessSpec{},
Output: []execinfrapb.OutputRouterSpec{{Type: execinfrapb.OutputRouterSpec_PASS_THROUGH}},
StageID: restoreDataStageID,
Core: execinfrapb.ProcessorCoreUnion{RestoreData: &restoreDataSpec},
Post: execinfrapb.PostProcessSpec{},
Output: []execinfrapb.OutputRouterSpec{{Type: execinfrapb.OutputRouterSpec_PASS_THROUGH}},
StageID: restoreDataStageID,
ResultTypes: []*types.T{},
},
}
pIdx := p.AddProcessor(proc)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,8 @@ func (r opResult) wrapPostProcessSpec(
Core: execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
},
Post: *post,
Post: *post,
ResultTypes: args.Spec.ResultTypes,
}
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecbase.Operator{r.Op}, [][]*types.T{r.ColumnTypes},
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func createDiskBackedSorter(
Post: execinfrapb.PostProcessSpec{
Limit: uint64(k),
},
ResultTypes: typs,
}
args := &NewColOperatorArgs{
Spec: spec,
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/colexec/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,14 @@ func createSpecForHashJoiner(tc *joinTestCase) *execinfrapb.ProcessorSpec {
for _, outCol := range tc.rightOutCols {
projection = append(projection, rColOffset+outCol)
}
resultTypes := make([]*types.T, 0, len(projection))
for _, i := range projection {
if int(i) < len(tc.leftTypes) {
resultTypes = append(resultTypes, tc.leftTypes[i])
} else {
resultTypes = append(resultTypes, tc.rightTypes[i-rColOffset])
}
}
return &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{
{ColumnTypes: tc.leftTypes},
Expand All @@ -954,6 +962,7 @@ func createSpecForHashJoiner(tc *joinTestCase) *execinfrapb.ProcessorSpec {
Projection: true,
OutputColumns: projection,
},
ResultTypes: resultTypes,
}
}

Expand Down Expand Up @@ -1173,6 +1182,7 @@ func TestHashJoinerProjection(t *testing.T) {
// from the left and from the right are intertwined.
OutputColumns: []uint32{3, 1, 0, 5, 4, 2},
},
ResultTypes: []*types.T{types.Int, types.Int, types.Bool, types.Decimal, types.Float, types.Bytes},
}

leftSource := newOpTestInput(1, leftTuples, leftTypes)
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/colexec/is_null_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@ func TestIsNullSelOp(t *testing.T) {
for _, c := range testCases {
log.Infof(ctx, "%s", c.desc)
opConstructor := func(input []colexecbase.Operator) (colexecbase.Operator, error) {
typs := []*types.T{types.Int}
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: []*types.T{types.Int}}},
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}},
Core: execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
},
Post: execinfrapb.PostProcessSpec{
Filter: execinfrapb.Expression{Expr: fmt.Sprintf("@1 %s", c.selExpr)},
},
ResultTypes: typs,
}
args := &NewColOperatorArgs{
Spec: spec,
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/colexec/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ func createSpecForMergeJoiner(tc *joinTestCase) *execinfrapb.ProcessorSpec {
for _, outCol := range tc.rightOutCols {
projection = append(projection, rColOffset+outCol)
}
var resultTypes []*types.T
for _, i := range projection {
if int(i) < len(tc.leftTypes) {
resultTypes = append(resultTypes, tc.leftTypes[i])
} else {
resultTypes = append(resultTypes, tc.rightTypes[i-rColOffset])
}
}
return &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{
{ColumnTypes: tc.leftTypes},
Expand All @@ -80,6 +88,7 @@ func createSpecForMergeJoiner(tc *joinTestCase) *execinfrapb.ProcessorSpec {
Projection: true,
OutputColumns: projection,
},
ResultTypes: resultTypes,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/ordinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func createTestOrdinalityOperator(
Core: execinfrapb.ProcessorCoreUnion{
Ordinality: &execinfrapb.OrdinalitySpec{},
},
ResultTypes: append(inputTypes, types.Int),
}
args := &NewColOperatorArgs{
Spec: spec,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ func createTestProjectingOperator(
Post: execinfrapb.PostProcessSpec{
RenderExprs: renderExprs,
},
ResultTypes: append(inputTypes, typedExpr.ResolvedType()),
}
args := &NewColOperatorArgs{
Spec: spec,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,17 @@ func TestWindowFunctions(t *testing.T) {
for i := range ct {
ct[i] = types.Int
}
resultType := types.Int
wf := tc.windowerSpec.WindowFns[0].Func.WindowFunc
if wf == &percentRankFn || wf == &cumeDistFn {
resultType = types.Float
}
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: ct}},
Core: execinfrapb.ProcessorCoreUnion{
Windower: &tc.windowerSpec,
},
ResultTypes: append(ct, resultType),
}
sem := colexecbase.NewTestingSemaphore(maxNumberFDs)
args := &NewColOperatorArgs{
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colflow/colbatch_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colfetcher"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -80,6 +81,7 @@ func TestColBatchScanMeta(t *testing.T) {
Projection: true,
OutputColumns: []uint32{0},
},
ResultTypes: rowenc.OneIntCol,
}

args := &colexec.NewColOperatorArgs{
Expand Down Expand Up @@ -138,6 +140,7 @@ func BenchmarkColBatchScan(b *testing.B) {
Projection: true,
OutputColumns: []uint32{0, 1},
},
ResultTypes: rowenc.TwoIntCols,
}

evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colflow/vectorized_flow_space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -67,6 +68,7 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) {
Post: execinfrapb.PostProcessSpec{
RenderExprs: []execinfrapb.Expression{{Expr: "CASE WHEN @1 = 1 THEN 1 ELSE 2 END"}},
},
ResultTypes: rowenc.OneIntCol,
},
},
{
Expand All @@ -76,6 +78,7 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) {
Core: execinfrapb.ProcessorCoreUnion{
MergeJoiner: &execinfrapb.MergeJoinerSpec{},
},
ResultTypes: append(twoInputs[0].ColumnTypes, twoInputs[1].ColumnTypes...),
},
},
}
Expand Down Expand Up @@ -160,6 +163,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
},
},
},
ResultTypes: oneInput[0].ColumnTypes,
},
spillingSupported: true,
},
Expand All @@ -178,6 +182,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
},
},
},
ResultTypes: oneInput[0].ColumnTypes,
},
},
{
Expand All @@ -190,6 +195,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
RightEqColumns: []uint32{0},
},
},
ResultTypes: append(twoInputs[0].ColumnTypes, twoInputs[1].ColumnTypes...),
},
spillingSupported: true,
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
},
},
},
ResultTypes: intCols(numInputTypesToMaterializer),
},
// This is the root of the flow. The noop operator that will read from i1
// and the materializer.
Expand All @@ -156,6 +157,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
Streams: []execinfrapb.StreamEndpointSpec{{Type: execinfrapb.StreamEndpointSpec_SYNC_RESPONSE}},
},
},
ResultTypes: intCols(numInputTypesToMaterializer),
},
{
// Because creating a table reader is too complex (you need to create a
Expand All @@ -179,6 +181,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
Streams: []execinfrapb.StreamEndpointSpec{{Type: execinfrapb.StreamEndpointSpec_REMOTE}},
},
},
ResultTypes: intCols(numInputTypesToOutbox),
},
}

Expand Down
Loading

0 comments on commit a76ee31

Please sign in to comment.