diff --git a/ddl/table.go b/ddl/table.go
index 990568794cb34..6cd4965eb3f75 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -65,13 +65,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 		return ver, errors.Trace(err)
 	}
 
-	// Fill in TableInfo.StreamWinCol
-	for _, c := range tbInfo.Columns {
-		if c.Tp == mysql.TypeTimestamp {
-			tbInfo.StreamWinCol = c.Name.L
-		}
-	}
-
 	switch tbInfo.State {
 	case model.StateNone:
 		// none -> public
@@ -110,6 +103,16 @@ func onCreateStream(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 		return ver, errors.Trace(err)
 	}
 
+	// Fill in TableInfo.StreamWinCol.
+	// FIXME: Currently, we use the first `TIMESTAMP` column as streamWinCol.
+	// There may be a better way; we'll improve it later.
+	for _, c := range tbInfo.Columns {
+		if c.Tp == mysql.TypeTimestamp {
+			tbInfo.StreamWinCol = c.Name.L
+			break
+		}
+	}
+
 	switch tbInfo.State {
 	case model.StateNone:
 		// none -> public
diff --git a/executor/show.go b/executor/show.go
index 929e8d2dfc453..05fa5011e4ea1 100644
--- a/executor/show.go
+++ b/executor/show.go
@@ -310,13 +310,14 @@ func (e *ShowExec) fetchShowTables(showStream bool) error {
 		if checker != nil && !checker.RequestVerification(activeRoles, e.DBName.O, v.Meta().Name.O, "", mysql.AllPrivMask) {
 			continue
 		}
-		tableNames = append(tableNames, v.Meta().Name.O)
 		if showStream == true {
 			if v.Meta().IsStream == true {
 				tableNames = append(tableNames, v.Meta().Name.O)
 			}
 		} else {
+			if v.Meta().IsStream == false {
+				tableNames = append(tableNames, v.Meta().Name.O)
+			}
 			if v.Meta().IsView() {
 				tableTypes[v.Meta().Name.O] = "VIEW"
 			} else {
diff --git a/executor/stream_reader.go b/executor/stream_reader.go
index 93f5205ac4951..0ef35c154893a 100644
--- a/executor/stream_reader.go
+++ b/executor/stream_reader.go
@@ -15,12 +15,17 @@ package executor
 
 import (
 	gojson "encoding/json"
+	"strconv"
+	"strings"
+	"github.com/cznic/mathutil"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/types/json"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/mock"
 	"golang.org/x/net/context"
 )
 
@@ -63,96 +69,99 @@ type StreamReaderExecutor struct {
 }
 
 func (e *StreamReaderExecutor) setVariableName(tp string) {
-	// if tp == "kafka" {
-	// 	e.variableName = variable.TiDBKafkaStreamTablePos
-	// } else if tp == "pulsar" {
-	// 	e.variableName = variable.TiDBPulsarStreamTablePos
-	// } else if tp == "log" {
-	// 	e.variableName = variable.TiDBLogStreamTablePos
-	// } else if tp == "demo" {
-	// 	e.variableName = variable.TiDBStreamTableDemoPos
-	// }
+	// Map the stream source type to the global variable that tracks its read position.
+	switch tp {
+	case "kafka":
+		e.variableName = variable.TiDBKafkaStreamTablePos
+	case "pulsar":
+		e.variableName = variable.TiDBPulsarStreamTablePos
+	case "log":
+		e.variableName = variable.TiDBLogStreamTablePos
+	case "demo":
+		e.variableName = variable.TiDBStreamTableDemoPos
+	}
 }
 
 // Open initializes necessary variables for using this executor.
func (e *StreamReaderExecutor) Open(ctx context.Context) error {
-	// tp, ok := e.Table.StreamProperties["type"]
-	// if !ok {
-	// 	return errors.New("Cannot find stream table type")
-	// }
-
-	// e.tp = tp
-	// e.setVariableName(strings.ToLower(tp))
-
-	// e.topic, ok = e.Table.StreamProperties["topic"]
-	// if !ok {
-	// 	return errors.New("Cannot find stream table topic")
-	// }
-
-	// var err error
-	// value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
-	// if err != nil {
-	// 	return errors.Trace(err)
-	// }
-	// if value != "" {
-	// 	e.pos, err = strconv.Atoi(value)
-	// 	if err != nil {
-	// 		return errors.Trace(err)
-	// 	}
-	// } else {
-	// 	err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0")
-	// 	if err != nil {
-	// 		return errors.Trace(err)
-	// 	}
-	//}
+	tp, ok := e.Table.StreamProperties["type"]
+	if !ok {
+		return errors.New("Cannot find stream table type")
+	}
+
+	e.tp = tp
+	e.setVariableName(strings.ToLower(tp))
+
+	e.topic, ok = e.Table.StreamProperties["topic"]
+	if !ok {
+		return errors.New("Cannot find stream table topic")
+	}
+
+	var err error
+	value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if value != "" {
+		e.pos, err = strconv.Atoi(value)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	} else {
+		err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0")
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
 	return nil
 }
 
 // Next fills data into the chunk passed by its caller.
 func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
-	// value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
-	// if err != nil {
-	// 	return errors.Trace(err)
-	// }
-	// e.pos, err = strconv.Atoi(value)
-	// if err != nil {
-	// 	return errors.Trace(err)
-	// }
-
-	// pos := 0
-	// chk.GrowAndReset(e.maxChunkSize)
-	// if e.result == nil {
-	// 	e.result = e.newFirstChunk()
-	// 	pos, err = e.fetchAll(e.pos)
-	// 	if err != nil {
-	// 		return errors.Trace(err)
-	// 	}
-	// 	iter := chunk.NewIterator4Chunk(e.result)
-	// 	for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
-	// 		retType := e.Schema().Columns[colIdx].RetType
-	// 		if !types.IsTypeVarchar(retType.Tp) {
-	// 			continue
-	// 		}
-	// 		for row := iter.Begin(); row != iter.End(); row = iter.Next() {
-	// 			if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
-	// 				retType.Flen = valLen
-	// 			}
-	// 		}
-	// 	}
-	// }
-	// if e.cursor >= e.result.NumRows() {
-	// 	return nil
-	// }
-	// numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
-	// chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
-	// e.cursor += numCurBatch
-
-	// e.pos = pos
-	// err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
-	// if err != nil {
-	// 	return errors.Trace(err)
-	//}
+	value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	e.pos, err = strconv.Atoi(value)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Start from the persisted position so a call that fetches nothing new
+	// does not reset the stored offset to zero.
+	pos := e.pos
+	chk.GrowAndReset(e.maxChunkSize)
+	if e.result == nil {
+		e.result = newFirstChunk(e)
+		pos, err = e.fetchAll(e.pos)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		iter := chunk.NewIterator4Chunk(e.result)
+		for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
+			retType := e.Schema().Columns[colIdx].RetType
+			if !types.IsTypeVarchar(retType.Tp) {
+				continue
+			}
+			for row := iter.Begin(); row != iter.End(); row = iter.Next() {
+				if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
+					retType.Flen = valLen
+				}
+			}
+		}
+	}
+	if e.cursor >= e.result.NumRows() {
+		return nil
+	}
+	numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
+	chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
+	e.cursor += numCurBatch
+
+	e.pos = pos
+	err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
+	if err != nil {
+		return errors.Trace(err)
+	}
 	return nil
 }
 
@@ -192,22 +198,21 @@ func (e *StreamReaderExecutor) fetchAll(cursor int) (int, error) {
 }
 
 func (e *StreamReaderExecutor) fetchMockData(cursor int) (int, error) {
-	// var pos int
-	// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; {
-	// 	data, err := e.getData(mock.MockStreamJsonData[i])
-	// 	if err != nil {
-	// 		return 0, errors.Trace(err)
-	// 	}
-
-	// 	row := chunk.MutRowFromDatums(data).ToRow()
-	// 	e.result.AppendRow(row)
-
-	// 	i++
-	// 	pos = i
-	// }
-	//
-	//return pos, nil
-	return 0, nil
+	// pos tracks the offset of the next unread row.
+	var pos int
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		data, err := e.getData(mock.MockStreamJsonData[i])
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+
+		row := chunk.MutRowFromDatums(data).ToRow()
+		e.result.AppendRow(row)
+
+		pos = i + 1
+	}
+
+	return pos, nil
 }
 
 func (e *StreamReaderExecutor) fetchKafkaData(cursor int) (int, error) {
@@ -240,27 +246,27 @@ func (e *StreamReaderExecutor) fetchKafkaData(cursor int) (int, error) {
 }
 
 func (e *StreamReaderExecutor) fetchMockKafkaData(cursor int) (int, error) {
-	// var pos int
-	// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
-	// 	row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
-	// 	e.appendRow(e.result, row)
-	// 	pos = i
-	// }
-	//
-	// return pos, nil
-	return 0, nil
+	// pos is the next unread offset; use i+1 so the last row is not re-read.
+	var pos int
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
+		e.appendRow(e.result, row)
+		pos = i + 1
+	}
+
+	return pos, nil
 }
 
 func (e *StreamReaderExecutor) fetchMockPulsarData(cursor int) (int, error) {
-	// var pos int
-	// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
-	// 	row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
-	// 	e.appendRow(e.result, row)
-	// 	pos = i
-	// }
-	//
-	// return pos, nil
-	return 0, nil
+	// pos is the next unread offset; use i+1 so the last row is not re-read.
+	var pos int
+	for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
+		row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
+		e.appendRow(e.result, row)
+		pos = i + 1
+	}
+
+	return pos, nil
 }
 
 func (e *StreamReaderExecutor) getData(data string) ([]types.Datum, error) {
diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 89aa581c4990e..c5bcb393a665f 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -66,6 +66,7 @@ var optRuleList = []logicalOptRule{
 	&aggregationPushDownSolver{},
 	&pushDownTopNOptimizer{},
 	&joinReOrderSolver{},
+	&streamWindowCompleter{},
 }
 
 // logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
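Note on the rule ordering: streamWindowCompleter is appended as the last entry of optRuleList, so it runs after join reordering and every earlier rule sees the plan before the window column is filled in. Below is a minimal, self-contained sketch of how such a rule list is driven; the plan and interface types are stand-ins for illustration, not the real LogicalPlan and logicalOptRule from planner/core:

package main

import (
	"context"
	"fmt"
)

// plan is a toy stand-in for planner/core's LogicalPlan.
type plan struct{ desc string }

// logicalOptRule mirrors the two methods the patch gives streamWindowCompleter.
type logicalOptRule interface {
	optimize(ctx context.Context, p *plan) (*plan, error)
	name() string
}

type streamWindowCompleter struct{}

// optimize mimics the real rule: it rewrites the plan to fill in the stream
// window column for aggregations over stream tables.
func (streamWindowCompleter) optimize(ctx context.Context, p *plan) (*plan, error) {
	p.desc += " +streamWinCol"
	return p, nil
}

func (streamWindowCompleter) name() string { return "stream_window_col_completer" }

func main() {
	p := &plan{desc: "agg->tablescan(stream)"}
	rules := []logicalOptRule{streamWindowCompleter{}}
	// Rules apply in order, exactly as optRuleList is walked by the optimizer.
	for _, r := range rules {
		p, _ = r.optimize(context.Background(), p)
		fmt.Printf("after %s: %s\n", r.name(), p.desc)
	}
}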
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index 6d36f57a23c15..5121c6d81fda8 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -199,6 +199,7 @@ func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
 		if p.hasStreamTable && x.GroupBy != nil && x.StreamWindowSpec == nil {
 			p.err = errors.New("Can not execute aggregation on stream table without time window")
 		}
+
 		if x.StreamWindowSpec != nil {
 			x.StreamWindowSpec.WinCol = p.strWinColName
 		}
diff --git a/planner/core/rule_complete_window_info.go b/planner/core/rule_complete_window_info.go
index e743653dbd026..35712fae92afa 100644
--- a/planner/core/rule_complete_window_info.go
+++ b/planner/core/rule_complete_window_info.go
@@ -14,6 +14,8 @@
 package core
 
 import (
+	"context"
+
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/expression"
@@ -23,7 +25,7 @@ import (
 type streamWindowCompleter struct {
 }
 
-func (s *streamWindowCompleter) optimize(lp LogicalPlan) (LogicalPlan, error) {
+func (s *streamWindowCompleter) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
 	lp.CompleteStreamWindow()
 	return lp, nil
 }
@@ -57,3 +59,7 @@ func (la *LogicalAggregation) CompleteStreamWindow() []*expression.Column {
 	}
 	return nil
 }
+
+func (s *streamWindowCompleter) name() string {
+	return "stream_window_col_completer"
+}
diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go
index 42d71de850132..3dfa68dc494ab 100644
--- a/planner/core/rule_inject_extra_projection.go
+++ b/planner/core/rule_inject_extra_projection.go
@@ -45,9 +45,9 @@ func (pe *projInjector) inject(plan PhysicalPlan) PhysicalPlan {
 	switch p := plan.(type) {
 	case *PhysicalHashAgg:
-		plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems)
+		plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems, p.StreamWindow)
 	case *PhysicalStreamAgg:
-		plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems)
+		plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems, nil)
 	case *PhysicalSort:
 		plan = injectProjBelowSort(p, p.ByItems)
 	case *PhysicalTopN:
@@ -70,7 +70,7 @@ func wrapCastForAggFuncs(sctx sessionctx.Context, aggFuncs []*aggregation.AggFun
 // injectProjBelowAgg injects a ProjOperator below AggOperator. If all the args
 // of `aggFuncs`, and all the items of `groupByItems` are columns or constants,
 // we do not need to build the `proj`.
-func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression) PhysicalPlan {
+func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, sw *aggregation.AggWindowDesc) PhysicalPlan {
 	hasScalarFunc := false
 
 	wrapCastForAggFuncs(aggPlan.SCtx(), aggFuncs)
@@ -88,7 +88,12 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
 		return aggPlan
 	}
 
-	projSchemaCols := make([]*expression.Column, 0, len(aggFuncs)+len(groupByItems))
+	// Reserve one extra slot for the stream window column when present.
+	capacity := len(aggFuncs) + len(groupByItems)
+	if sw != nil {
+		capacity++
+	}
+	projSchemaCols := make([]*expression.Column, 0, capacity)
 	projExprs := make([]expression.Expression, 0, cap(projSchemaCols))
 	cursor := 0
 
@@ -126,6 +131,32 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
 		cursor++
 	}
 
+	// Pass the stream window column through the projection so the aggregate
+	// above can assign each row to its time window.
+	if sw != nil {
+		for i, col := range aggPlan.Children()[0].Schema().Columns {
+			if col.ColName.L == sw.WinColName {
+				expr := &expression.Column{
+					UniqueID: aggPlan.SCtx().GetSessionVars().AllocPlanColumnID(),
+					RetType:  col.GetType(),
+					ColName:  model.NewCIStr(sw.WinColName),
+					Index:    i,
+				}
+				projExprs = append(projExprs, expr)
+
+				winCol := &expression.Column{
+					UniqueID: aggPlan.SCtx().GetSessionVars().AllocPlanColumnID(),
+					RetType:  col.GetType(),
+					ColName:  model.NewCIStr(sw.WinColName),
+					Index:    cursor,
+				}
+				projSchemaCols = append(projSchemaCols, winCol)
+				// Column names are unique within a schema, so stop at the first match.
+				break
+			}
+		}
+	}
+
 	child := aggPlan.Children()[0]
 	prop := aggPlan.GetChildReqProps(0).Clone()
 	proj := PhysicalProjection{
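To make the effect of the new sw branch concrete: given a child schema (id, content, create_time) whose stream window column is create_time, the injected projection keeps its usual outputs and forwards one extra pass-through column for the window timestamp. Below is a self-contained toy of that pass-through step; the column type is a stand-in for the planner's expression.Column, and the names are hypothetical:

package main

import "fmt"

// column is a toy stand-in for a projection output: a name plus the offset of
// the child column it reads from.
type column struct {
	name  string
	index int
}

// injectWindowCol mimics the sw != nil branch above: find the window column in
// the child schema and append a pass-through output for it.
func injectWindowCol(child, outputs []column, winColName string) []column {
	for i, c := range child {
		if c.name == winColName {
			outputs = append(outputs, column{name: winColName, index: i})
			break // column names are unique within a schema
		}
	}
	return outputs
}

func main() {
	child := []column{{"id", 0}, {"content", 1}, {"create_time", 2}}
	outputs := []column{{"count(1)", 0}} // the projection's usual outputs
	outputs = injectWindowCol(child, outputs, "create_time")
	fmt.Println(outputs) // [{count(1) 0} {create_time 2}]
}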