Commit

Merge pull request pingcap#10 from spongedu/tbssql2
Tbssql2
qiuyesuifeng authored Oct 15, 2019
2 parents 240d20e + 3c2b5f8 commit b31fbde
Showing 7 changed files with 170 additions and 126 deletions.
16 changes: 9 additions & 7 deletions ddl/table.go
@@ -65,13 +65,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

// Fill in TableInfo.StreamWinCol
for _, c := range tbInfo.Columns {
if c.Tp == mysql.TypeTimestamp {
tbInfo.StreamWinCol = c.Name.L
}
}

switch tbInfo.State {
case model.StateNone:
// none -> public
@@ -110,6 +103,15 @@ func onCreateStream(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

// Fill in TableInfo.StreamWinCol
// FIXME: Currently, we use the first `TIMESTAMP` column as streamWinCol.
// There may be a better way; we'll improve it later.
for _, c := range tbInfo.Columns {
if c.Tp == mysql.TypeTimestamp {
tbInfo.StreamWinCol = c.Name.L
break
}
}

switch tbInfo.State {
case model.StateNone:
// none -> public
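For readers skimming the hunk above: the window-column heuristic just scans the column list in order and records the first TIMESTAMP column it finds. A minimal standalone sketch of the same idea, with pared-down stand-in types (colInfo and pickStreamWinCol are illustrative, not part of the TiDB codebase):

package main

import "fmt"

// colInfo is a pared-down stand-in for model.ColumnInfo.
type colInfo struct {
	name        string
	isTimestamp bool
}

// pickStreamWinCol returns the name of the first TIMESTAMP column,
// mirroring the heuristic in onCreateStream; "" means no candidate.
func pickStreamWinCol(cols []colInfo) string {
	for _, c := range cols {
		if c.isTimestamp {
			return c.name // stop at the first match
		}
	}
	return ""
}

func main() {
	cols := []colInfo{{"id", false}, {"ts", true}, {"updated_at", true}}
	fmt.Println(pickStreamWinCol(cols)) // prints: ts
}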
5 changes: 3 additions & 2 deletions executor/show.go
@@ -310,13 +310,14 @@ func (e *ShowExec) fetchShowTables(showStream bool) error {
if checker != nil && !checker.RequestVerification(activeRoles, e.DBName.O, v.Meta().Name.O, "", mysql.AllPrivMask) {
continue
}
tableNames = append(tableNames, v.Meta().Name.O)
if showStream {
if v.Meta().IsStream {
tableNames = append(tableNames, v.Meta().Name.O)
}
} else {
if !v.Meta().IsStream {
tableNames = append(tableNames, v.Meta().Name.O)
}
if v.Meta().IsView() {
tableTypes[v.Meta().Name.O] = "VIEW"
} else {
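The intent of the new branch in fetchShowTables: SHOW STREAMS lists only stream tables, while a plain SHOW TABLES lists only ordinary ones. Reduced to a standalone predicate (shouldList is an illustrative helper, not TiDB API):

package main

import "fmt"

// shouldList reports whether a table belongs in the current listing:
// stream tables under SHOW STREAMS, ordinary tables otherwise.
func shouldList(showStream, isStream bool) bool {
	return showStream == isStream
}

func main() {
	fmt.Println(shouldList(true, true))  // SHOW STREAMS keeps a stream: true
	fmt.Println(shouldList(false, true)) // SHOW TABLES hides a stream: false
}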
230 changes: 118 additions & 112 deletions executor/stream_reader.go
@@ -15,12 +15,18 @@ package executor

import (
gojson "encoding/json"
"strconv"
"strings"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"

// log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
@@ -63,96 +69,96 @@ type StreamReaderExecutor struct {
}

func (e *StreamReaderExecutor) setVariableName(tp string) {
// if tp == "kafka" {
// e.variableName = variable.TiDBKafkaStreamTablePos
// } else if tp == "pulsar" {
// e.variableName = variable.TiDBPulsarStreamTablePos
// } else if tp == "log" {
// e.variableName = variable.TiDBLogStreamTablePos
// } else if tp == "demo" {
// e.variableName = variable.TiDBStreamTableDemoPos
// }
if tp == "kafka" {
e.variableName = variable.TiDBKafkaStreamTablePos
} else if tp == "pulsar" {
e.variableName = variable.TiDBPulsarStreamTablePos
} else if tp == "log" {
e.variableName = variable.TiDBLogStreamTablePos
} else if tp == "demo" {
e.variableName = variable.TiDBStreamTableDemoPos
}
}

// Open initializes necessary variables for using this executor.
func (e *StreamReaderExecutor) Open(ctx context.Context) error {
// tp, ok := e.Table.StreamProperties["type"]
// if !ok {
// return errors.New("Cannot find stream table type")
// }

// e.tp = tp
// e.setVariableName(strings.ToLower(tp))

// e.topic, ok = e.Table.StreamProperties["topic"]
// if !ok {
// return errors.New("Cannot find stream table topic")
// }

// var err error
// value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
// if err != nil {
// return errors.Trace(err)
// }
// if value != "" {
// e.pos, err = strconv.Atoi(value)
// if err != nil {
// return errors.Trace(err)
// }
// } else {
// err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0")
// if err != nil {
// return errors.Trace(err)
// }
//}
tp, ok := e.Table.StreamProperties["type"]
if !ok {
return errors.New("Cannot find stream table type")
}

e.tp = tp
e.setVariableName(strings.ToLower(tp))

e.topic, ok = e.Table.StreamProperties["topic"]
if !ok {
return errors.New("Cannot find stream table topic")
}

var err error
value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
if err != nil {
return errors.Trace(err)
}
if value != "" {
e.pos, err = strconv.Atoi(value)
if err != nil {
return errors.Trace(err)
}
} else {
err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, "0")
if err != nil {
return errors.Trace(err)
}
}

return nil
}
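The position bookkeeping in Open is a string-backed counter: read the global variable, parse it with strconv.Atoi, and seed it with "0" on first use. A minimal sketch of that round-trip against an in-memory accessor (memVars and the variable name below are hypothetical stand-ins; the real accessor is the session's GlobalVarsAccessor):

package main

import (
	"fmt"
	"strconv"
)

// memVars is an in-memory stand-in for the session's global-variable accessor.
type memVars map[string]string

// loadPos mirrors Open: parse the saved offset, or seed it with "0".
func loadPos(vars memVars, name string) (int, error) {
	if v := vars[name]; v != "" {
		return strconv.Atoi(v)
	}
	vars[name] = "0"
	return 0, nil
}

func main() {
	vars := memVars{}
	pos, _ := loadPos(vars, "tidb_stream_table_demo_pos") // first Open seeds "0"
	fmt.Println(pos)                                      // 0
	vars["tidb_stream_table_demo_pos"] = "42"             // Next persists progress
	pos, _ = loadPos(vars, "tidb_stream_table_demo_pos")  // later Opens resume
	fmt.Println(pos)                                      // 42
}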

// Next fills data into the chunk passed by its caller.
func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
// value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
// if err != nil {
// return errors.Trace(err)
// }
// e.pos, err = strconv.Atoi(value)
// if err != nil {
// return errors.Trace(err)
// }

// pos := 0
// chk.GrowAndReset(e.maxChunkSize)
// if e.result == nil {
// e.result = e.newFirstChunk()
// pos, err = e.fetchAll(e.pos)
// if err != nil {
// return errors.Trace(err)
// }
// iter := chunk.NewIterator4Chunk(e.result)
// for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
// retType := e.Schema().Columns[colIdx].RetType
// if !types.IsTypeVarchar(retType.Tp) {
// continue
// }
// for row := iter.Begin(); row != iter.End(); row = iter.Next() {
// if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
// retType.Flen = valLen
// }
// }
// }
// }
// if e.cursor >= e.result.NumRows() {
// return nil
// }
// numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
// chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
// e.cursor += numCurBatch

// e.pos = pos
// err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
// if err != nil {
// return errors.Trace(err)
//}
value, err := e.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(e.variableName)
if err != nil {
return errors.Trace(err)
}
e.pos, err = strconv.Atoi(value)
if err != nil {
return errors.Trace(err)
}

pos := 0
chk.GrowAndReset(e.maxChunkSize)
if e.result == nil {
e.result = newFirstChunk(e)
pos, err = e.fetchAll(e.pos)
if err != nil {
return errors.Trace(err)
}
iter := chunk.NewIterator4Chunk(e.result)
for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
retType := e.Schema().Columns[colIdx].RetType
if !types.IsTypeVarchar(retType.Tp) {
continue
}
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
retType.Flen = valLen
}
}
}
}
if e.cursor >= e.result.NumRows() {
return nil
}
numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
e.cursor += numCurBatch

e.pos = pos
err = e.ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(e.variableName, strconv.Itoa(e.pos))
if err != nil {
return errors.Trace(err)
}

return nil
}
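The tail of Next is a cursor-and-batch copy: append at most chk.Capacity() rows from the cached result starting at e.cursor, then advance the cursor. The same arithmetic as a standalone sketch (nextBatch is an illustrative helper, not TiDB API):

package main

import "fmt"

// nextBatch returns the half-open row range [lo, hi) to copy next,
// where hi-lo = min(capacity, total-cursor); done reports exhaustion.
func nextBatch(cursor, total, capacity int) (lo, hi int, done bool) {
	if cursor >= total {
		return 0, 0, true
	}
	n := capacity
	if rest := total - cursor; rest < n {
		n = rest
	}
	return cursor, cursor + n, false
}

func main() {
	// 5 cached rows, chunks of 2: batches are [0,2), [2,4), [4,5).
	for cur := 0; ; {
		lo, hi, done := nextBatch(cur, 5, 2)
		if done {
			break
		}
		fmt.Println(lo, hi)
		cur = hi
	}
}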
@@ -192,22 +198,22 @@ func (e *StreamReaderExecutor) fetchAll(cursor int) (int, error) {
}

func (e *StreamReaderExecutor) fetchMockData(cursor int) (int, error) {
// var pos int
// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; {
// data, err := e.getData(mock.MockStreamJsonData[i])
// if err != nil {
// return 0, errors.Trace(err)
// }
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; {
data, err := e.getData(mock.MockStreamJsonData[i])
if err != nil {
return 0, errors.Trace(err)
}

// row := chunk.MutRowFromDatums(data).ToRow()
// e.result.AppendRow(row)
row := chunk.MutRowFromDatums(data).ToRow()
e.result.AppendRow(row)

// i++
// pos = i
// }
//
//return pos, nil
return 0, nil
i++
pos = i
}

return pos, nil
//return 0, nil
}

func (e *StreamReaderExecutor) fetchKafkaData(cursor int) (int, error) {
@@ -240,27 +246,27 @@ func (e *StreamReaderExecutor) fetchKafkaData(cursor int) (int, error) {
}

func (e *StreamReaderExecutor) fetchMockKafkaData(cursor int) (int, error) {
// var pos int
// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
// row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
// e.appendRow(e.result, row)
// pos = i
// }
//
// return pos, nil
return 0, nil
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
row := []interface{}{mock.MockKafkaStreamData[i].ID, mock.MockKafkaStreamData[i].Content, mock.MockKafkaStreamData[i].CreateTime}
e.appendRow(e.result, row)
pos = i
}

return pos, nil
//return 0, nil
}

func (e *StreamReaderExecutor) fetchMockPulsarData(cursor int) (int, error) {
// var pos int
// for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
// row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
// e.appendRow(e.result, row)
// pos = i
// }
//
// return pos, nil
return 0, nil
var pos int
for i := cursor; i < maxFetchCnt && i < cursor+batchFetchCnt; i++ {
row := []interface{}{mock.MockPulsarStreamData[i].ID, mock.MockPulsarStreamData[i].Content, mock.MockPulsarStreamData[i].CreateTime}
e.appendRow(e.result, row)
pos = i
}

return pos, nil
//return 0, nil
}

func (e *StreamReaderExecutor) getData(data string) ([]types.Datum, error) {
1 change: 1 addition & 0 deletions planner/core/optimizer.go
@@ -66,6 +66,7 @@ var optRuleList = []logicalOptRule{
&aggregationPushDownSolver{},
&pushDownTopNOptimizer{},
&joinReOrderSolver{},
&streamWindowCompleter{},
}

// logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
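Judging from the methods streamWindowCompleter implements later in this commit, a rule plugs into optRuleList by providing optimize and name. The interface definition itself is outside this diff, so the following is an inferred sketch with stub types, not the actual planner code:

package main

import (
	"context"
	"fmt"
)

// LogicalPlan is a stub; the real interface lives in planner/core.
type LogicalPlan interface{}

// logicalOptRule as inferred from the two methods streamWindowCompleter gains.
type logicalOptRule interface {
	optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error)
	name() string
}

// noopRule shows the minimal contract: return the plan unchanged.
type noopRule struct{}

func (noopRule) optimize(_ context.Context, lp LogicalPlan) (LogicalPlan, error) {
	return lp, nil
}
func (noopRule) name() string { return "noop" }

func main() {
	var r logicalOptRule = noopRule{}
	p, _ := r.optimize(context.Background(), "plan")
	fmt.Println(r.name(), p) // noop plan
}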
1 change: 1 addition & 0 deletions planner/core/preprocess.go
@@ -199,6 +199,7 @@ func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
if p.hasStreamTable && x.GroupBy != nil && x.StreamWindowSpec == nil {
p.err = errors.New("Cannot execute aggregation on stream table without time window")
}

if x.StreamWindowSpec != nil {
x.StreamWindowSpec.WinCol = p.strWinColName
}
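The preprocess check above enforces one rule: a GROUP BY over a stream table is only legal when a stream window is present. As an isolated predicate (validateStreamAgg is an illustrative helper, not TiDB API):

package main

import (
	"errors"
	"fmt"
)

// validateStreamAgg mirrors the Leave check: aggregation over a stream
// table must carry a stream window spec.
func validateStreamAgg(hasStreamTable, hasGroupBy, hasWindowSpec bool) error {
	if hasStreamTable && hasGroupBy && !hasWindowSpec {
		return errors.New("cannot execute aggregation on stream table without time window")
	}
	return nil
}

func main() {
	fmt.Println(validateStreamAgg(true, true, false)) // rejected
	fmt.Println(validateStreamAgg(true, true, true))  // nil: windowed agg is fine
}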
8 changes: 7 additions & 1 deletion planner/core/rule_complete_window_info.go
@@ -14,6 +14,8 @@
package core

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
@@ -23,7 +25,7 @@ import (
type streamWindowCompleter struct {
}

func (s *streamWindowCompleter) optimize(lp LogicalPlan) (LogicalPlan, error) {
func (s *streamWindowCompleter) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
lp.CompleteStreamWindow()
return lp, nil
}
@@ -57,3 +59,7 @@ func (la *LogicalAggregation) CompleteStreamWindow() []*expression.Column {
}
return nil
}

func (s *streamWindowCompleter) name() string {
return "stream_window_col_completer"
}