From 4bf8e329353f8c0a09bfd6f2902f2beb79e7d805 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Thu, 12 Oct 2017 15:37:55 +0800 Subject: [PATCH 01/13] allow differentiating system sessions and user sessions --- privilege/privileges/cache.go | 19 ++----------------- session.go | 21 +++++++++++++++++++-- tidb.go | 4 ++-- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index dc3f6aa32eff9..62cf6725deefa 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -172,26 +172,11 @@ func (p *MySQLPrivilege) LoadColumnsPrivTable(ctx context.Context) error { func (p *MySQLPrivilege) loadTable(ctx context.Context, sql string, decodeTableRow func(*ast.Row, []*ast.ResultField) error) error { - tmp, err := ctx.(sqlexec.SQLExecutor).Execute(sql) + rows, fs, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) if err != nil { return errors.Trace(err) } - rs := tmp[0] - defer terror.Call(rs.Close) - - fs, err := rs.Fields() - if err != nil { - return errors.Trace(err) - } - for { - row, err := rs.Next() - if err != nil { - return errors.Trace(err) - } - if row == nil { - break - } - + for _, row := range rows { err = decodeTableRow(row, fs) if err != nil { return errors.Trace(err) diff --git a/session.go b/session.go index 0e06eaee713b1..49ba16b043d00 100644 --- a/session.go +++ b/session.go @@ -59,6 +59,7 @@ import ( // Session context type Session interface { context.Context + IsSystemSession() bool Status() uint16 // Flag of current status, such as autocommit. LastInsertID() uint64 // LastInsertID is the last inserted auto_increment ID. AffectedRows() uint64 // Affected rows by latest executed stmt. @@ -122,6 +123,8 @@ type session struct { goCtx goctx.Context cancelFunc goctx.CancelFunc + isSystem bool + mu struct { sync.RWMutex values map[fmt.Stringer]interface{} @@ -213,6 +216,14 @@ func (s *session) GetSessionManager() util.SessionManager { return s.sessionManager } +func (s *session) SetSystemSession() { + s.isSystem = true +} + +func (s *session) IsSystemSession() bool { + return s.isSystem +} + type schemaLeaseChecker struct { domain.SchemaValidator schemaVer int64 @@ -527,7 +538,7 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) ([]*ast.Row return rows, fields, nil } -func createSessionFunc(store kv.Storage) pools.Factory { +func createSessionFunc(store kv.Storage, isSystem bool) pools.Factory { return func() (pools.Resource, error) { se, err := createSession(store) if err != nil { @@ -539,11 +550,14 @@ func createSessionFunc(store kv.Storage) pools.Factory { } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true + if isSystem { + se.SetSystemSession() + } return se, nil } } -func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.Resource, error) { +func createSessionWithDomainFunc(store kv.Storage, isSystem bool) func(*domain.Domain) (pools.Resource, error) { return func(dom *domain.Domain) (pools.Resource, error) { se, err := createSessionWithDomain(store, dom) if err != nil { @@ -555,6 +569,9 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true + if isSystem { + se.SetSystemSession() + } return se, nil } } diff --git a/tidb.go b/tidb.go index c855ad7971304..c7224c7fc539f 100644 --- a/tidb.go +++ b/tidb.go @@ -68,8 +68,8 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { } err = util.RunWithRetry(defaultMaxRetries, retryInterval, func() (retry bool, err1 error) { log.Infof("store %v new domain, ddl lease %v, stats lease %d", store.UUID(), ddlLease, statisticLease) - factory := createSessionFunc(store) - sysFactory := createSessionWithDomainFunc(store) + factory := createSessionFunc(store, true) + sysFactory := createSessionWithDomainFunc(store, true) d, err1 = domain.NewDomain(store, ddlLease, statisticLease, factory, sysFactory) return true, errors.Trace(err1) }) From 4543070267646014de7f726bb144f3cac26c37ef Mon Sep 17 00:00:00 2001 From: Breezewish Date: Thu, 12 Oct 2017 16:33:24 +0800 Subject: [PATCH 02/13] load schema definitions from file --- config/config.go | 13 ++++++ config/config.toml.example | 10 +++++ dashbase/schema.go | 84 ++++++++++++++++++++++++++++++++++++++ tidb-server/main.go | 11 +++++ 4 files changed, 118 insertions(+) create mode 100644 dashbase/schema.go diff --git a/config/config.go b/config/config.go index 1993ee1924afd..ac6d0bb032b97 100644 --- a/config/config.go +++ b/config/config.go @@ -36,6 +36,7 @@ type Config struct { Status Status `toml:"status" json:"status"` Performance Performance `toml:"performance" json:"performance"` XProtocol XProtocol `toml:"xprotocol" json:"xprotocol"` + Dashbase Dashbase `tomk:"dashbase" json:"dashbase"` } // Log is the log section of config. @@ -88,6 +89,13 @@ type XProtocol struct { XSocket string `toml:"xsocket" json:"xsocket"` } +// Dashbase is the dashbase section of the config. +type Dashbase struct { + Enabled bool `toml:"enabled" json:"enabled"` + KafkaHosts []string `toml:"kafka-hosts" json:"kafka-hosts"` + SchemaFile string `toml:"schema-file" json:"schema-file"` +} + var defaultConf = Config{ Host: "0.0.0.0", Port: 4000, @@ -121,6 +129,11 @@ var defaultConf = Config{ XHost: "0.0.0.0", XPort: 14000, }, + Dashbase: Dashbase{ + Enabled: true, + KafkaHosts: []string{"localhost:9092"}, + SchemaFile: "dashbase-schema.toml", + }, } var globalConf = defaultConf diff --git a/config/config.toml.example b/config/config.toml.example index 88a47eeaf0e54..c624d18c5e392 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -114,3 +114,13 @@ xport = 14000 # The socket file to use for x protocol connection. xsocket = "" + +[dashbase] +# Whether TiDB is acted as a Dashbase proxy. +enabled = false + +# The Kafka hosts for inserting data. +kafka-hosts = ["localhost:9092"] + +# The path to the file describes table schemas in TOML. +schema-file = "" diff --git a/dashbase/schema.go b/dashbase/schema.go new file mode 100644 index 0000000000000..766300f5f9c52 --- /dev/null +++ b/dashbase/schema.go @@ -0,0 +1,84 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "fmt" + "strings" + + "github.com/BurntSushi/toml" + "github.com/juju/errors" +) + +type ColumnDefinition struct { + name string + dataType string +} + +type TableDefinition struct { + name string + columns []*ColumnDefinition +} + +var tableDefinitions map[string]*TableDefinition + +func init() { + clearTableDefinitions() +} + +func clearTableDefinitions() { + tableDefinitions = make(map[string]*TableDefinition) +} + +// LoadSchemaFromFile loads Dashbase schema definitions from a file. +func LoadSchemaFromFile(path string) error { + schemas := make(map[string]interface{}) + _, err := toml.DecodeFile(path, &schemas) + if err != nil { + return errors.Trace(err) + } + + tableDefinitions = make(map[string]*TableDefinition) + + for tableName, _tableColumns := range schemas { + var table TableDefinition + table.name = tableName + table.columns = make([]*ColumnDefinition, 0) + tableColumns := _tableColumns.(map[string]interface{}) + for columnName, _columnType := range tableColumns { + columnType := _columnType.(string) + var column ColumnDefinition + column.name = columnName + column.dataType = columnType + table.columns = append(table.columns, &column) + } + lowerTableName := strings.ToLower(tableName) + _, existTable := tableDefinitions[lowerTableName] + if existTable { + return fmt.Errorf("Duplicate table schema definition %s", lowerTableName) + } + tableDefinitions[lowerTableName] = &table + } + + return nil +} + +// GetTableSchema gets defined schema for a given table. +func GetTableSchema(tableName string) *TableDefinition { + table, ok := tableDefinitions[strings.ToLower(tableName)] + if !ok { + return nil + } + return table +} diff --git a/tidb-server/main.go b/tidb-server/main.go index f55ab19b1135b..8ee1d13dc7f2b 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/tidb" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/dashbase" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -124,6 +125,7 @@ func main() { validateConfig() setGlobalVars() setupLog() + setupDashbase() printInfo() createStoreAndDomain() setupBinlogClient() @@ -318,6 +320,15 @@ func setGlobalVars() { privileges.SkipWithGrant = cfg.Security.SkipGrantTable } +func setupDashbase() { + if !cfg.Dashbase.Enabled { + return + } + if err := dashbase.LoadSchemaFromFile(cfg.Dashbase.SchemaFile); err != nil { + log.Fatalf("Unable to load Dashbase schema definition: %s", err.Error()) + } +} + func setupLog() { err := logutil.InitLogger(cfg.Log.ToLogConfig()) terror.MustNil(err) From 684bab186b3189cc0e6a48f3521d7b4bc9614dc1 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Fri, 13 Oct 2017 16:53:05 +0800 Subject: [PATCH 03/13] dashbase functions --- dashbase/avro.go | 103 ++++++++++++++++++++++++++++ dashbase/columntype.go | 95 ++++++++++++++++++++++++++ dashbase/execute.go | 149 +++++++++++++++++++++++++++++++++++++++++ dashbase/kafka.go | 39 +++++++++++ dashbase/schema.go | 32 ++++++++- 5 files changed, 416 insertions(+), 2 deletions(-) create mode 100644 dashbase/avro.go create mode 100644 dashbase/columntype.go create mode 100644 dashbase/execute.go create mode 100644 dashbase/kafka.go diff --git a/dashbase/avro.go b/dashbase/avro.go new file mode 100644 index 0000000000000..a4874659e7c36 --- /dev/null +++ b/dashbase/avro.go @@ -0,0 +1,103 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "bytes" + "encoding/binary" + + "github.com/juju/errors" + "github.com/linkedin/goavro" +) + +// See http://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints +const emptyCRC64 uint64 = 0xc15d213aa4d7a795 + +const avroSchema string = `{"name":"io.dashbase.avro.DashbaseEvent","type":"record","fields":[{"name":"timeInMillis","type":"long"},{"name":"metaColumns","type":{"type":"map","values":"string"}},{"name":"numberColumns","type":{"type":"map","values":"double"}},{"name":"textColumns","type":{"type":"map","values":"string"}},{"name":"idColumns","type":{"type":"map","values":"string"}},{"name":"omitPayload","type":"boolean"}]}` + +var table []uint64 +var dashbaseCodec *goavro.Codec +var dashbaseSchemaChecksum uint64 + +func makeCRC64Table() { + table := make([]uint64, 256) + for i := 0; i < 256; i++ { + fp := uint64(i) + for j := 0; j < 8; j++ { + fp = (fp >> 1) ^ (emptyCRC64 & -(fp & 1)) + } + table[i] = fp + } +} + +func makeAvroCodec() { + codec, err := goavro.NewCodec(avroSchema) + if err != nil { + panic(err) + } + dashbaseCodec = codec +} + +func avroCRC64(buf []byte) uint64 { + fp := emptyCRC64 + for _, val := range buf { + fp = (fp >> 8) ^ table[int(fp^uint64(val))&0xff] + } + return fp +} + +func AvroEncode(columns []*ColumnDefinition, values []interface{}) ([]byte, error) { + event := make(map[string]interface{}) + metaColumns := make(map[string]string) + textColumns := make(map[string]string) + numberColumns := make(map[string]float64) + idColumns := make(map[string]string) + + for idx, column := range columns { + switch column.dataType { + case TypeTime: + event["timeInMillis"] = values[idx].(uint64) + case TypeMeta: + metaColumns[column.name] = values[idx].(string) + case TypeText: + textColumns[column.name] = values[idx].(string) + case TypeNumeric: + numberColumns[column.name] = values[idx].(float64) + } + } + + event["metaColumns"] = metaColumns + event["textColumns"] = textColumns + event["numberColumns"] = numberColumns + event["idColumns"] = idColumns + event["omitPayload"] = false + + body, err := dashbaseCodec.BinaryFromNative(nil, event) + if err != nil { + return nil, errors.Trace(err) + } + + message := new(bytes.Buffer) + message.Write([]byte{0xC3, 0x01}) + binary.Write(message, binary.LittleEndian, dashbaseSchemaChecksum) + message.Write(body) + + return message.Bytes(), nil +} + +func init() { + makeAvroCodec() + makeCRC64Table() + dashbaseSchemaChecksum = avroCRC64([]byte(avroSchema)) +} diff --git a/dashbase/columntype.go b/dashbase/columntype.go new file mode 100644 index 0000000000000..d9d0ca48dd882 --- /dev/null +++ b/dashbase/columntype.go @@ -0,0 +1,95 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "fmt" + "time" + + "github.com/juju/errors" + "github.com/pingcap/tidb/util/types" +) + +// ColumnType is the column type +type ColumnType string + +const ( + // TypeMeta is the `meta` type in Dashbase + TypeMeta ColumnType = "meta" + + // TypeTime is the `time` type in Dashbase + TypeTime ColumnType = "time" + + // TypeNumeric is the `numeric` type in Dashbase + TypeNumeric ColumnType = "numeric" + + // TypeText is the `text` type in Dashbase + TypeText ColumnType = "text" +) + +// KafkaEncoder converts a datum into a type for Dashbase Kafka API +type kafkaEncoder func(types.Datum) interface{} + +type codecDefinition struct { + columnType ColumnType + encoder kafkaEncoder +} + +var codecs = []*codecDefinition{ + { + TypeText, + func(input types.Datum) interface{} { + return input.GetString() + }, + }, + { + TypeMeta, + func(input types.Datum) interface{} { + return input.GetString() + }, + }, + { + TypeNumeric, + func(input types.Datum) interface{} { + return input.GetFloat64() + }, + }, + { + TypeTime, + func(input types.Datum) interface{} { + time, err := input.GetMysqlTime().Time.GoTime(time.Local) + if err != nil { + return 0 + } + return time.Unix() * 1000 + }, + }, +} + +var encoders map[ColumnType]kafkaEncoder + +func init() { + encoders = make(map[ColumnType]kafkaEncoder) + for _, codec := range codecs { + encoders[codec.columnType] = codec.encoder + } +} + +func getEncoder(columnType ColumnType) (kafkaEncoder, error) { + encoder, ok := encoders[columnType] + if !ok { + return nil, errors.Trace(fmt.Errorf("Unsupported Dashbase type %s", columnType)) + } + return encoder, nil +} diff --git a/dashbase/execute.go b/dashbase/execute.go new file mode 100644 index 0000000000000..ca6af2ac1c03d --- /dev/null +++ b/dashbase/execute.go @@ -0,0 +1,149 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "fmt" + "strings" + + "github.com/juju/errors" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/expression" +) + +func Execute(stmt ast.Node) (bool, ast.RecordSet, error) { + switch node := stmt.(type) { + case *ast.InsertStmt: + return executeInsert(node) + default: + return false, nil, nil + } +} + +func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { + tableName := node.Table.TableRefs.Left.(*ast.TableSource).AsName.L + tableSchema := GetTableSchema(tableName) + if tableSchema == nil { + return false, nil, nil + } + + if node.IsReplace { + return true, nil, errors.Errorf("REPLACE is not supported for Dashbase table") + } + if len(node.OnDuplicate) > 0 { + return true, nil, errors.Errorf("ON DUPLICATE is not supported for Dashbase table") + } + + columnsByName := make(map[string]*ColumnDefinition) + for _, col := range tableSchema.columns { + columnsByName[strings.ToLower(col.name)] = col + } + + // Ensure column exists: check INSERT ... SET ... + for _, assignment := range node.Setlist { + if _, exists := columnsByName[assignment.Column.Name.L]; !exists { + return true, nil, fmt.Errorf("unknown column %s", assignment.Column.Name.O) + } + } + // Ensure column exists: check INSERT ... (...) VALUES (...) + if len(node.Columns) > 0 { + if len(node.Columns) != len(node.Lists) { + return true, nil, fmt.Errorf("column count doesn't match value count") + } + for _, column := range node.Columns { + if _, exists := columnsByName[column.Name.L]; !exists { + return true, nil, fmt.Errorf("unknown column %s", column.Name.O) + } + } + } + + var rows [][]*expression.Constant + var columns []*ColumnDefinition + + if len(node.Setlist) > 0 { + // Assign values based on INSERT ... SET ... + row := make([]*expression.Constant, 0) + for _, assignment := range node.Setlist { + if val, ok := assignment.Expr.(*ast.ValueExpr); ok { + row = append(row, &expression.Constant{ + Value: val.Datum, + RetType: &val.Type, + }) + columns = append(columns, columnsByName[assignment.Column.Name.L]) + } else { + return true, nil, fmt.Errorf("Only support constants for Dashbase table") + } + } + rows = append(rows, row) + } else { + // Assign values based on INSERT ... VALUES (....),(....) + hasNamedColumns := true + for _, insertRow := range node.Lists { + maxListItems := len(insertRow) + if len(node.Columns) == 0 { + hasNamedColumns = false + if len(tableSchema.columns) < maxListItems { + maxListItems = len(tableSchema.columns) + } + } + row := make([]*expression.Constant, maxListItems) + for i, item := range insertRow { + if i >= maxListItems { + break + } + var columnName string + if hasNamedColumns { + columnName = node.Columns[i].Name.L + } else { + columnName = strings.ToLower(tableSchema.columns[i].name) + } + if val, ok := item.(*ast.ValueExpr); ok { + row[i] = &expression.Constant{ + Value: val.Datum, + RetType: &val.Type, + } + columns = append(columns, columnsByName[columnName]) + } else { + return true, nil, fmt.Errorf("Only support constants for Dashbase table") + } + } + rows = append(rows, row) + } + } + + var columnEncoders []kafkaEncoder + + for _, column := range columns { + encoder, err := getEncoder(column.dataType) + if err != nil { + return true, nil, errors.Trace(err) + } + columnEncoders = append(columnEncoders, encoder) + } + + for _, row := range rows { + encodedRow := make([]interface{}, len(columns)) + for idx, data := range row { + encodedRow[idx] = columnEncoders[idx](data.Value) + } + + message, err := AvroEncode(columns, encodedRow) + if err != nil { + return true, nil, errors.Trace(err) + } + PublishKafka(strings.ToLower(tableName), message) + } + + return true, ast.RecordSet{}, nil +} diff --git a/dashbase/kafka.go b/dashbase/kafka.go new file mode 100644 index 0000000000000..52db07d7cf7ea --- /dev/null +++ b/dashbase/kafka.go @@ -0,0 +1,39 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "github.com/Shopify/sarama" +) + +var producer sarama.AsyncProducer + +func OpenKafka(hosts []string) error { + config := sarama.NewConfig() + producer2, err := sarama.NewAsyncProducer(hosts, config) + if err != nil { + return err + } + producer = producer2 + return nil +} + +func PublishKafka(topic string, payload []byte) { + message := &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(payload)} + producer.Input() <- message +} + +func CloseKafka() { + producer.Close() +} diff --git a/dashbase/schema.go b/dashbase/schema.go index 766300f5f9c52..d88e61698e913 100644 --- a/dashbase/schema.go +++ b/dashbase/schema.go @@ -23,7 +23,7 @@ import ( type ColumnDefinition struct { name string - dataType string + dataType ColumnType } type TableDefinition struct { @@ -60,7 +60,7 @@ func LoadSchemaFromFile(path string) error { columnType := _columnType.(string) var column ColumnDefinition column.name = columnName - column.dataType = columnType + column.dataType = ColumnType(columnType) table.columns = append(table.columns, &column) } lowerTableName := strings.ToLower(tableName) @@ -68,12 +68,40 @@ func LoadSchemaFromFile(path string) error { if existTable { return fmt.Errorf("Duplicate table schema definition %s", lowerTableName) } + err := validateTableDefinition(&table) + if err != nil { + return errors.Trace(err) + } tableDefinitions[lowerTableName] = &table } return nil } +func validateTableDefinition(table *TableDefinition) error { + if len(table.columns) == 0 { + return fmt.Errorf("There should be at least one column in the table %s", table.name) + } + timeFieldCount := 0 + for _, column := range table.columns { + switch column.dataType { + case TypeTime: + timeFieldCount++ + case TypeMeta, TypeNumeric, TypeText: + // do nothing + default: + return fmt.Errorf("Invalid field type %s for column %s in the table %s", column.dataType, column.name, table.name) + } + } + if timeFieldCount == 0 { + return fmt.Errorf("There should be a time column in the table %s", table.name) + } + if timeFieldCount > 1 { + return fmt.Errorf("There should be only one time column in the table %s", table.name) + } + return nil +} + // GetTableSchema gets defined schema for a given table. func GetTableSchema(tableName string) *TableDefinition { table, ok := tableDefinitions[strings.ToLower(tableName)] From b7312755c4e3435c1470ffd27e8d72b6ab29ff93 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 16 Oct 2017 11:33:04 +0800 Subject: [PATCH 04/13] fix insert --- dashbase/avro.go | 4 ++-- dashbase/columntype.go | 18 ++++++++++++++---- dashbase/execute.go | 29 +++++++++++++++++++++++++++-- domain/domain.go | 3 +++ session.go | 19 +++++++++++++++++++ tidb-server/main.go | 4 ++++ 6 files changed, 69 insertions(+), 8 deletions(-) diff --git a/dashbase/avro.go b/dashbase/avro.go index a4874659e7c36..aef4c74ec9835 100644 --- a/dashbase/avro.go +++ b/dashbase/avro.go @@ -31,7 +31,7 @@ var dashbaseCodec *goavro.Codec var dashbaseSchemaChecksum uint64 func makeCRC64Table() { - table := make([]uint64, 256) + table = make([]uint64, 256) for i := 0; i < 256; i++ { fp := uint64(i) for j := 0; j < 8; j++ { @@ -67,7 +67,7 @@ func AvroEncode(columns []*ColumnDefinition, values []interface{}) ([]byte, erro for idx, column := range columns { switch column.dataType { case TypeTime: - event["timeInMillis"] = values[idx].(uint64) + event["timeInMillis"] = values[idx].(int64) case TypeMeta: metaColumns[column.name] = values[idx].(string) case TypeText: diff --git a/dashbase/columntype.go b/dashbase/columntype.go index d9d0ca48dd882..2a694ebe7cd76 100644 --- a/dashbase/columntype.go +++ b/dashbase/columntype.go @@ -15,7 +15,7 @@ package dashbase import ( "fmt" - "time" + "strconv" "github.com/juju/errors" "github.com/pingcap/tidb/util/types" @@ -62,17 +62,27 @@ var codecs = []*codecDefinition{ { TypeNumeric, func(input types.Datum) interface{} { - return input.GetFloat64() + // return input.GetFloat64() + f, err := strconv.ParseFloat(input.GetString(), 64) + if err != nil { + return 0 + } + return f }, }, { TypeTime, func(input types.Datum) interface{} { - time, err := input.GetMysqlTime().Time.GoTime(time.Local) + i, err := strconv.ParseInt(input.GetString(), 10, 64) if err != nil { return 0 } - return time.Unix() * 1000 + return i + // time, err := input.GetMysqlTime().Time.GoTime(time.Local) + // if err != nil { + // return 0 + // } + // return time.Unix() * 1000 }, }, } diff --git a/dashbase/execute.go b/dashbase/execute.go index ca6af2ac1c03d..6c612b10b9f58 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -18,6 +18,7 @@ import ( "strings" "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" ) @@ -31,8 +32,32 @@ func Execute(stmt ast.Node) (bool, ast.RecordSet, error) { } } +type insertRecordSet struct { + insertedRows int +} + +func (rs insertRecordSet) Next() (*ast.Row, error) { + return nil, nil +} + +func (rs insertRecordSet) Fields() ([]*ast.ResultField, error) { + return nil, nil +} + +func (rs insertRecordSet) Close() error { + return nil +} + func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { - tableName := node.Table.TableRefs.Left.(*ast.TableSource).AsName.L + ts, ok := node.Table.TableRefs.Left.(*ast.TableSource) + if !ok { + return false, nil, nil + } + tn, ok := ts.Source.(*ast.TableName) + if !ok { + return false, nil, nil + } + tableName := tn.Name.L tableSchema := GetTableSchema(tableName) if tableSchema == nil { return false, nil, nil @@ -145,5 +170,5 @@ func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { PublishKafka(strings.ToLower(tableName), message) } - return true, ast.RecordSet{}, nil + return true, insertRecordSet{insertedRows: len(rows)}, nil } diff --git a/domain/domain.go b/domain/domain.go index d2db522d3a23e..15e63ae2eecc4 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -24,6 +24,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/juju/errors" "github.com/ngaut/pools" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" @@ -663,3 +664,5 @@ var ( // ErrInfoSchemaChanged returns the error that information schema is changed. ErrInfoSchemaChanged = terror.ClassDomain.New(codeInfoSchemaChanged, "Information schema is changed.") ) + +var Config *config.Config diff --git a/session.go b/session.go index 49ba16b043d00..d8e94a5ea9898 100644 --- a/session.go +++ b/session.go @@ -32,6 +32,7 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" + "github.com/pingcap/tidb/dashbase" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" @@ -677,6 +678,7 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) { log.Warnf("[%d] parse error:\n%v\n%s", connID, err, sql) return nil, errors.Trace(err) } + sessionExecuteParseDuration.Observe(time.Since(startTS).Seconds()) var rs []ast.RecordSet @@ -685,6 +687,23 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) { startTS := time.Now() // Some executions are done in compile stage, so we reset them before compile. executor.ResetStmtCtx(s, rst) + + if !s.IsSystemSession() && domain.Config.Dashbase.Enabled { + log.Infof("try dashbase execute\n") + caught, r, err := dashbase.Execute(rst) + + if caught { + if err != nil { + log.Warnf("[%d] dashbase session error:\n%v\n%s", connID, errors.ErrorStack(err), s) + err2 := s.RollbackTxn() + terror.Log(errors.Trace(err2)) + return nil, errors.Trace(err) + } + rs = append(rs, r) + continue + } + } + st, err1 := Compile(s, rst) if err1 != nil { log.Warnf("[%d] compile error:\n%v\n%s", connID, err1, sql) diff --git a/tidb-server/main.go b/tidb-server/main.go index 8ee1d13dc7f2b..cb9e0b0d4ac67 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -307,6 +307,7 @@ func validateConfig() { } func setGlobalVars() { + domain.Config = cfg ddlLeaseDuration := parseLease(cfg.Lease) tidb.SetSchemaLease(ddlLeaseDuration) statsLeaseDuration := parseLease(cfg.Performance.StatsLease) @@ -327,6 +328,9 @@ func setupDashbase() { if err := dashbase.LoadSchemaFromFile(cfg.Dashbase.SchemaFile); err != nil { log.Fatalf("Unable to load Dashbase schema definition: %s", err.Error()) } + if err := dashbase.OpenKafka(cfg.Dashbase.KafkaHosts); err != nil { + log.Fatalf("Unable to connect to Kafka: %s", err.Error()) + } } func setupLog() { From 34551b705202747ff0a022da13805d9e78e7bb95 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 16 Oct 2017 15:22:26 +0800 Subject: [PATCH 05/13] fix connection --- dashbase/execute.go | 19 +------------------ session.go | 13 +++++++------ 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/dashbase/execute.go b/dashbase/execute.go index 6c612b10b9f58..a6ca44cc5960f 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -18,7 +18,6 @@ import ( "strings" "github.com/juju/errors" - "github.com/ngaut/log" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" ) @@ -32,22 +31,6 @@ func Execute(stmt ast.Node) (bool, ast.RecordSet, error) { } } -type insertRecordSet struct { - insertedRows int -} - -func (rs insertRecordSet) Next() (*ast.Row, error) { - return nil, nil -} - -func (rs insertRecordSet) Fields() ([]*ast.ResultField, error) { - return nil, nil -} - -func (rs insertRecordSet) Close() error { - return nil -} - func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { ts, ok := node.Table.TableRefs.Left.(*ast.TableSource) if !ok { @@ -170,5 +153,5 @@ func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { PublishKafka(strings.ToLower(tableName), message) } - return true, insertRecordSet{insertedRows: len(rows)}, nil + return true, nil, nil } diff --git a/session.go b/session.go index d8e94a5ea9898..658f30254381f 100644 --- a/session.go +++ b/session.go @@ -683,27 +683,28 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) { var rs []ast.RecordSet for _, rst := range rawStmts { - s.PrepareTxnCtx() - startTS := time.Now() // Some executions are done in compile stage, so we reset them before compile. executor.ResetStmtCtx(s, rst) if !s.IsSystemSession() && domain.Config.Dashbase.Enabled { - log.Infof("try dashbase execute\n") caught, r, err := dashbase.Execute(rst) if caught { if err != nil { log.Warnf("[%d] dashbase session error:\n%v\n%s", connID, errors.ErrorStack(err), s) - err2 := s.RollbackTxn() - terror.Log(errors.Trace(err2)) return nil, errors.Trace(err) } - rs = append(rs, r) + if r != nil { + rs = append(rs, r) + } + s.SetValue(context.QueryString, rst.Text()) continue } } + s.PrepareTxnCtx() + startTS := time.Now() + st, err1 := Compile(s, rst) if err1 != nil { log.Warnf("[%d] compile error:\n%v\n%s", connID, err1, sql) From 87f684bd3c65d79b1685a92d6ce2fdfa0cc770dc Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 16 Oct 2017 15:23:15 +0800 Subject: [PATCH 06/13] disable dashbase by default --- config/config.go | 2 +- config/config.toml.example | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index ac6d0bb032b97..6fdf1844d1ff4 100644 --- a/config/config.go +++ b/config/config.go @@ -130,7 +130,7 @@ var defaultConf = Config{ XPort: 14000, }, Dashbase: Dashbase{ - Enabled: true, + Enabled: false, KafkaHosts: []string{"localhost:9092"}, SchemaFile: "dashbase-schema.toml", }, diff --git a/config/config.toml.example b/config/config.toml.example index c624d18c5e392..8c94f713f888f 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -123,4 +123,4 @@ enabled = false kafka-hosts = ["localhost:9092"] # The path to the file describes table schemas in TOML. -schema-file = "" +schema-file = "dashbase-schema.toml" From bddb75c90be74dd74569b37e10ec21113701ba0a Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 16 Oct 2017 19:39:06 +0800 Subject: [PATCH 07/13] fix unit tests --- session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session.go b/session.go index 546d420095638..76e1c79d38ac3 100644 --- a/session.go +++ b/session.go @@ -734,7 +734,7 @@ func (s *session) Execute(sql string) (recordSets []ast.RecordSet, err error) { compiler := executor.Compiler{} for _, stmtNode := range stmtNodes { // Special logic for dashbase related SQL. - if !s.IsSystemSession() && domain.Config.Dashbase.Enabled { + if !s.IsSystemSession() && domain.Config != nil && domain.Config.Dashbase.Enabled { caught, r, err := dashbase.Execute(stmtNode) if caught { From 15f2cec2849cc8aac3fd58deb6605d09e41aab43 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Wed, 25 Oct 2017 11:16:17 +0800 Subject: [PATCH 08/13] basic query --- config/config.go | 2 + config/config.toml.example | 3 + dashbase/api.go | 68 ++++++++++++++++ dashbase/execute.go | 154 +++++++++++++++++++++++++++++++++++++ 4 files changed, 227 insertions(+) create mode 100644 dashbase/api.go diff --git a/config/config.go b/config/config.go index 3a465a7193ea5..eb5e883e5a8cc 100644 --- a/config/config.go +++ b/config/config.go @@ -101,6 +101,7 @@ type PlanCache struct { type Dashbase struct { Enabled bool `toml:"enabled" json:"enabled"` KafkaHosts []string `toml:"kafka-hosts" json:"kafka-hosts"` + APIURL string `toml:"api-url" json:"api-url"` SchemaFile string `toml:"schema-file" json:"schema-file"` } @@ -145,6 +146,7 @@ var defaultConf = Config{ Dashbase: Dashbase{ Enabled: false, KafkaHosts: []string{"localhost:9092"}, + APIURL: "https://staging.dashbase.io:9876", SchemaFile: "dashbase-schema.toml", }, } diff --git a/config/config.toml.example b/config/config.toml.example index 9a963766a4755..11ff72d535a41 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -127,5 +127,8 @@ enabled = false # The Kafka hosts for inserting data. kafka-hosts = ["localhost:9092"] +# The API host for querying data. +api-host = "localhost:9876" + # The path to the file describes table schemas in TOML. schema-file = "dashbase-schema.toml" diff --git a/dashbase/api.go b/dashbase/api.go new file mode 100644 index 0000000000000..9944a3eddecff --- /dev/null +++ b/dashbase/api.go @@ -0,0 +1,68 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dashbase + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/juju/errors" +) + +type ApiClient struct { + URL string +} + +type ApiSQLResponse struct { + Hits []struct { + TimeInSeconds int64 + Payload struct { + Fields map[string][]string + } + } + ErrorMessage string `json:"message"` +} + +const ( + apiTimeout time.Duration = 30 * time.Second +) + +// Query sends a SQL query to remote Dashbase API client +func (client *ApiClient) Query(SQLStatement string) (*ApiSQLResponse, error) { + param := url.Values{} + param.Add("sql", SQLStatement) + param.Add("timezone", "GMT") + + httpClient := http.Client{Timeout: apiTimeout} + resp, err := httpClient.Get(fmt.Sprintf("%s/v1/sql?%s", client.URL, param.Encode())) + if err != nil { + return nil, errors.Trace(fmt.Errorf("Failed to connect Dashbase API service at %s", client.URL)) + } + defer resp.Body.Close() + + var ret ApiSQLResponse + err = json.NewDecoder(resp.Body).Decode(&ret) + if err != nil { + return nil, errors.Trace(fmt.Errorf("Failed to decode Dashbase API response data")) + } + + if len(ret.ErrorMessage) > 0 { + return nil, errors.Trace(fmt.Errorf("Dashbase error: %s", ret.ErrorMessage)) + } + + return &ret, nil +} diff --git a/dashbase/execute.go b/dashbase/execute.go index a6ca44cc5960f..e50a6576811f5 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -19,13 +19,20 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/util/types" ) func Execute(stmt ast.Node) (bool, ast.RecordSet, error) { switch node := stmt.(type) { case *ast.InsertStmt: return executeInsert(node) + case *ast.SelectStmt: + return executeSelect(node) default: return false, nil, nil } @@ -155,3 +162,150 @@ func executeInsert(node *ast.InsertStmt) (bool, ast.RecordSet, error) { return true, nil, nil } + +type selectResultSet struct { + sql string + tableName string + fields []*ColumnDefinition + resultSetFields []*ast.ResultField // will be assigned after first Next() + fetched bool + rows []executor.Row + cursor int +} + +func (e *selectResultSet) Next() (*ast.Row, error) { + if !e.fetched { + err := e.fetchAll() + e.fetched = true + if err != nil { + return nil, errors.Trace(err) + } + } + if e.cursor >= len(e.rows) { + return nil, nil + } + row := e.rows[e.cursor] + e.cursor++ + return &ast.Row{Data: row}, nil +} + +func (e *selectResultSet) Fields() ([]*ast.ResultField, error) { + return e.resultSetFields, nil +} + +func (e *selectResultSet) Close() error { + return nil +} + +func (e *selectResultSet) fetchAll() error { + client := ApiClient{ + URL: domain.Config.Dashbase.APIURL, + } + + result, err := client.Query(e.sql) + if err != nil { + return errors.Trace(err) + } + + numberOfTargetColumns := len(e.fields) + resultSetFields := make([]*ast.ResultField, 0) + for _, field := range e.fields { + rf := ast.ResultField{ + ColumnAsName: model.NewCIStr(field.name), + TableAsName: model.NewCIStr(e.tableName), + DBName: model.NewCIStr(""), + Table: &model.TableInfo{Name: model.NewCIStr(e.tableName)}, + Column: &model.ColumnInfo{ + FieldType: *types.NewFieldType(mysql.TypeVarString), + Name: model.NewCIStr(field.name), + }, + } + resultSetFields = append(resultSetFields, &rf) + } + e.resultSetFields = resultSetFields + + // TODO: support time column + // TODO: support aggregation column + + for _, hit := range result.Hits { + loweredKeyRow := make(map[string]string) + for key, field := range hit.Payload.Fields { + if len(field) > 0 { + loweredKeyRow[strings.ToLower(key)] = field[0] + } + } + datums := make([]types.Datum, numberOfTargetColumns) + for i, field := range e.fields { + data, ok := loweredKeyRow[strings.ToLower(field.name)] + if !ok { + datums[i] = types.NewDatum(nil) + } else { + datums[i] = types.NewDatum(data) + } + } + e.rows = append(e.rows, datums) + } + return nil +} + +func executeSelect(node *ast.SelectStmt) (bool, ast.RecordSet, error) { + ts, ok := node.From.TableRefs.Left.(*ast.TableSource) + if !ok { + return false, nil, nil + } + tn, ok := ts.Source.(*ast.TableName) + if !ok { + return false, nil, nil + } + tableName := tn.Name.L + tableSchema := GetTableSchema(tableName) + if tableSchema == nil { + return false, nil, nil + } + + columnsByName := make(map[string]*ColumnDefinition) + for _, column := range tableSchema.columns { + columnsByName[strings.ToLower(column.name)] = column + } + + hasWildcard := false + + for _, field := range node.Fields.Fields { + if field.WildCard != nil { + hasWildcard = true + break + } + } + + fields := make([]*ColumnDefinition, 0) + + // field selections + if hasWildcard { + for _, col := range tableSchema.columns { + fields = append(fields, col) + } + } else { + for _, field := range node.Fields.Fields { + if field.WildCard != nil { + continue + } + switch expr := field.Expr.(type) { + case *ast.ColumnNameExpr: + lowerColumnName := expr.Name.Name.L + col, ok := columnsByName[lowerColumnName] + if !ok { + return true, nil, fmt.Errorf("Column %s not found", expr.Name.Name.O) + } + fields = append(fields, col) + default: + // ignore non-column-name ref + } + } + } + + return true, &selectResultSet{ + sql: node.Text(), + tableName: tableName, + fields: fields, + }, nil +} From 627df347a7e959aa55fcd2867cdc939fa4399168 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Wed, 25 Oct 2017 14:02:26 +0800 Subject: [PATCH 09/13] support time column --- dashbase/execute.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/dashbase/execute.go b/dashbase/execute.go index e50a6576811f5..b5cf5130d566e 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -16,6 +16,7 @@ package dashbase import ( "fmt" "strings" + "time" "github.com/juju/errors" "github.com/pingcap/tidb/ast" @@ -210,13 +211,19 @@ func (e *selectResultSet) fetchAll() error { numberOfTargetColumns := len(e.fields) resultSetFields := make([]*ast.ResultField, 0) for _, field := range e.fields { + var fieldType *types.FieldType + if field.dataType == TypeTime { + fieldType = types.NewFieldType(mysql.TypeDatetime) + } else { + fieldType = types.NewFieldType(mysql.TypeString) + } rf := ast.ResultField{ ColumnAsName: model.NewCIStr(field.name), TableAsName: model.NewCIStr(e.tableName), DBName: model.NewCIStr(""), Table: &model.TableInfo{Name: model.NewCIStr(e.tableName)}, Column: &model.ColumnInfo{ - FieldType: *types.NewFieldType(mysql.TypeVarString), + FieldType: *fieldType, Name: model.NewCIStr(field.name), }, } @@ -224,7 +231,6 @@ func (e *selectResultSet) fetchAll() error { } e.resultSetFields = resultSetFields - // TODO: support time column // TODO: support aggregation column for _, hit := range result.Hits { @@ -236,11 +242,19 @@ func (e *selectResultSet) fetchAll() error { } datums := make([]types.Datum, numberOfTargetColumns) for i, field := range e.fields { - data, ok := loweredKeyRow[strings.ToLower(field.name)] - if !ok { - datums[i] = types.NewDatum(nil) + if field.dataType == TypeTime { + datums[i] = types.NewDatum(types.Time{ + Time: types.FromGoTime(time.Unix(hit.TimeInSeconds, 0)), + Type: mysql.TypeDatetime, + TimeZone: time.Local, + }) } else { - datums[i] = types.NewDatum(data) + data, ok := loweredKeyRow[strings.ToLower(field.name)] + if !ok { + datums[i] = types.NewDatum(nil) + } else { + datums[i] = types.NewDatum(data) + } } } e.rows = append(e.rows, datums) From db589a64acfee162c94aa9f3981df5b0066b4ad2 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Thu, 26 Oct 2017 14:07:45 +0800 Subject: [PATCH 10/13] aggregations --- dashbase/api.go | 11 ++++++ dashbase/execute.go | 86 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 77 insertions(+), 20 deletions(-) diff --git a/dashbase/api.go b/dashbase/api.go index 9944a3eddecff..3bbe6888c06da 100644 --- a/dashbase/api.go +++ b/dashbase/api.go @@ -28,12 +28,23 @@ type ApiClient struct { } type ApiSQLResponse struct { + Request struct { + Aggregations map[string]struct { + RequestType string + Col string + Type string + } + } Hits []struct { TimeInSeconds int64 Payload struct { Fields map[string][]string } } + Aggregations map[string]struct { + ResponseType string + Value float64 + } ErrorMessage string `json:"message"` } diff --git a/dashbase/execute.go b/dashbase/execute.go index b5cf5130d566e..c60de78fbf8c5 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -210,6 +210,8 @@ func (e *selectResultSet) fetchAll() error { numberOfTargetColumns := len(e.fields) resultSetFields := make([]*ast.ResultField, 0) + + // selection columns for _, field := range e.fields { var fieldType *types.FieldType if field.dataType == TypeTime { @@ -229,36 +231,80 @@ func (e *selectResultSet) fetchAll() error { } resultSetFields = append(resultSetFields, &rf) } + + // aggregation columns + aggregations := make([]string, 0) + for key := range result.Request.Aggregations { + aggregations = append(aggregations, key) + numberOfTargetColumns++ + rf := ast.ResultField{ + ColumnAsName: model.NewCIStr(key), + TableAsName: model.NewCIStr(e.tableName), + DBName: model.NewCIStr(""), + Table: &model.TableInfo{Name: model.NewCIStr(e.tableName)}, + Column: &model.ColumnInfo{ + FieldType: *types.NewFieldType(mysql.TypeDouble), + Name: model.NewCIStr(key), + }, + } + resultSetFields = append(resultSetFields, &rf) + } + e.resultSetFields = resultSetFields - // TODO: support aggregation column + var iterateHitMax int // how many rows should we iterate + if len(aggregations) > 0 { + iterateHitMax = 1 + } else { + iterateHitMax = len(result.Hits) + } + + for i := 0; i < iterateHitMax; i++ { + datums := make([]types.Datum, 0) - for _, hit := range result.Hits { - loweredKeyRow := make(map[string]string) - for key, field := range hit.Payload.Fields { - if len(field) > 0 { - loweredKeyRow[strings.ToLower(key)] = field[0] + // append field selections + if i >= len(result.Hits) { + for range e.fields { + datums = append(datums, types.NewDatum(nil)) } - } - datums := make([]types.Datum, numberOfTargetColumns) - for i, field := range e.fields { - if field.dataType == TypeTime { - datums[i] = types.NewDatum(types.Time{ - Time: types.FromGoTime(time.Unix(hit.TimeInSeconds, 0)), - Type: mysql.TypeDatetime, - TimeZone: time.Local, - }) - } else { - data, ok := loweredKeyRow[strings.ToLower(field.name)] - if !ok { - datums[i] = types.NewDatum(nil) + } else { + hit := result.Hits[i] + loweredKeyRow := make(map[string]string) + for key, field := range hit.Payload.Fields { + if len(field) > 0 { + loweredKeyRow[strings.ToLower(key)] = field[0] + } + } + for _, field := range e.fields { + if field.dataType == TypeTime { + datums = append(datums, types.NewDatum(types.Time{ + Time: types.FromGoTime(time.Unix(hit.TimeInSeconds, 0)), + Type: mysql.TypeDatetime, + TimeZone: time.Local, + })) } else { - datums[i] = types.NewDatum(data) + data, ok := loweredKeyRow[strings.ToLower(field.name)] + if !ok { + datums = append(datums, types.NewDatum(nil)) + } else { + datums = append(datums, types.NewDatum(data)) + } } } } + + // append aggregation selections + for _, key := range aggregations { + data, ok := result.Aggregations[key] + if !ok { + datums = append(datums, types.NewDatum(nil)) + } else { + datums = append(datums, types.NewDatum(data.Value)) + } + } e.rows = append(e.rows, datums) } + return nil } From 8cae8e56f45ad6586c7918905798d202158b9a47 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 30 Oct 2017 19:15:02 +0800 Subject: [PATCH 11/13] grumble --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 26bb6f24ac454..ee0b8aaed357f 100644 --- a/config/config.go +++ b/config/config.go @@ -196,7 +196,7 @@ var defaultConf = Config{ Dashbase: Dashbase{ Enabled: false, KafkaHosts: []string{"localhost:9092"}, - APIURL: "https://staging.dashbase.io:9876", + APIURL: "http://localhost:9876", SchemaFile: "dashbase-schema.toml", }, } From 602a0d6f9a57b40350034dd75b5476d2ca0ea1e4 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Thu, 2 Nov 2017 15:00:05 +0800 Subject: [PATCH 12/13] fix selecting variable --- dashbase/execute.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dashbase/execute.go b/dashbase/execute.go index c60de78fbf8c5..8f0c95a208d0a 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -309,6 +309,9 @@ func (e *selectResultSet) fetchAll() error { } func executeSelect(node *ast.SelectStmt) (bool, ast.RecordSet, error) { + if node.From == nil { + return false, nil, nil + } ts, ok := node.From.TableRefs.Left.(*ast.TableSource) if !ok { return false, nil, nil From 399111de29f3020b41c618d870d23d4bea0aead5 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Thu, 2 Nov 2017 16:00:46 +0800 Subject: [PATCH 13/13] fix config --- config/config.toml.example | 2 +- dashbase/execute.go | 4 ++-- domain/domain.go | 3 --- session.go | 3 ++- tidb-server/main.go | 1 - 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/config/config.toml.example b/config/config.toml.example index 14106231bcc35..28628cea68329 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -186,7 +186,7 @@ enabled = false kafka-hosts = ["localhost:9092"] # The API host for querying data. -api-host = "localhost:9876" +api-url = "http://localhost:9876" # The path to the file describes table schemas in TOML. schema-file = "dashbase-schema.toml" diff --git a/dashbase/execute.go b/dashbase/execute.go index 8f0c95a208d0a..713e228f31326 100644 --- a/dashbase/execute.go +++ b/dashbase/execute.go @@ -20,7 +20,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/ast" - "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/model" @@ -200,7 +200,7 @@ func (e *selectResultSet) Close() error { func (e *selectResultSet) fetchAll() error { client := ApiClient{ - URL: domain.Config.Dashbase.APIURL, + URL: config.GetGlobalConfig().Dashbase.APIURL, } result, err := client.Query(e.sql) diff --git a/domain/domain.go b/domain/domain.go index 3af77ee2a1adb..1307d21205ec3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -24,7 +24,6 @@ import ( "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/juju/errors" "github.com/ngaut/pools" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" @@ -671,5 +670,3 @@ var ( // ErrInfoSchemaChanged returns the error that information schema is changed. ErrInfoSchemaChanged = terror.ClassDomain.New(codeInfoSchemaChanged, "Information schema is changed.") ) - -var Config *config.Config diff --git a/session.go b/session.go index 585ed22f58994..09e4d3c071022 100644 --- a/session.go +++ b/session.go @@ -31,6 +31,7 @@ import ( "github.com/juju/errors" "github.com/ngaut/pools" "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/dashbase" "github.com/pingcap/tidb/domain" @@ -741,7 +742,7 @@ func (s *session) Execute(sql string) (recordSets []ast.RecordSet, err error) { compiler := executor.Compiler{} for _, stmtNode := range stmtNodes { // Special logic for dashbase related SQL. - if !s.IsSystemSession() && domain.Config != nil && domain.Config.Dashbase.Enabled { + if !s.IsSystemSession() && config.GetGlobalConfig().Dashbase.Enabled { caught, r, err := dashbase.Execute(stmtNode) if caught { diff --git a/tidb-server/main.go b/tidb-server/main.go index ca7e1d6feae9f..bb36e7ca2bccd 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -312,7 +312,6 @@ func validateConfig() { } func setGlobalVars() { - domain.Config = cfg ddlLeaseDuration := parseLease(cfg.Lease) tidb.SetSchemaLease(ddlLeaseDuration) statsLeaseDuration := parseLease(cfg.Performance.StatsLease)