fix bugs #44

Merged
4 commits merged on Apr 27, 2021
2 changes: 1 addition & 1 deletion business/ck_rebalance.go
@@ -64,7 +64,7 @@ func (this *CKRebalance) InitCKConns() (err error) {
func (this *CKRebalance) GetTables() (err error) {
host := this.Hosts[0]
db := this.CKConns[host]
if this.Databases, this.DBTables,err = common.GetMergeTreeTables(db, host); err != nil{
if this.Databases, this.DBTables,err = common.GetMergeTreeTables("MergeTree", db, host); err != nil{
err = errors.Wrapf(err, "")
return
}
6 changes: 3 additions & 3 deletions common/util.go
@@ -120,13 +120,13 @@ func ConnectClickHouse(host string, port int, database string, user string, pass
return db, nil
}

func GetMergeTreeTables(db *sql.DB, host string) ([]string, map[string][]string, error) {
func GetMergeTreeTables(engine string, db *sql.DB, host string) ([]string, map[string][]string, error) {
var rows *sql.Rows
var databases []string
var err error
dbtables := make(map[string][]string)
query := fmt.Sprintf("SELECT DISTINCT database, name FROM system.tables WHERE (engine LIKE '%%MergeTree%%') AND (database != 'system') ORDER BY database")
log.Logger.Infof("host %s: query: %s", host, query)
query := fmt.Sprintf("SELECT DISTINCT database, name FROM system.tables WHERE (match(engine, '%s')) AND (database != 'system') ORDER BY database", engine)
log.Logger.Debugf("host %s: query: %s", host, query)
if rows, err = db.Query(query); err != nil {
err = errors.Wrapf(err, "")
return nil, nil, err
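
Note on the change above: GetMergeTreeTables now takes the engine filter as its first argument and matches it with ClickHouse's match() function, so the argument is a regular expression rather than a fixed LIKE pattern; that is what lets ck_rebalance.go pass "MergeTree" explicitly. A minimal caller-side sketch, assuming db, host and log.Logger are set up as elsewhere in ckman:

// List every table whose engine matches the given regular expression, grouped
// by database. "MergeTree" also matches ReplicatedMergeTree, SummingMergeTree,
// etc., since match() performs unanchored regex matching.
databases, dbTables, err := common.GetMergeTreeTables("MergeTree", db, host)
if err != nil {
	log.Logger.Errorf("host %s: %v", host, err)
} else {
	for _, database := range databases {
		log.Logger.Infof("host %s: database %s has tables %v", host, database, dbTables[database])
	}
}
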
109 changes: 86 additions & 23 deletions controller/clickhouse.go
@@ -3,13 +3,12 @@ package controller
import (
"database/sql"
"fmt"
"github.com/housepower/ckman/business"
"github.com/housepower/ckman/common"
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"strconv"

"github.com/housepower/ckman/business"
"github.com/pkg/errors"

"github.com/housepower/ckman/service/nacos"

_ "github.com/ClickHouse/clickhouse-go"
@@ -239,6 +238,26 @@ func (ck *ClickHouseController) CreateTable(c *gin.Context) {
return
}

//sync zookeeper path
var conf model.CKManClickHouseConfig
con, ok := clickhouse.CkClusters.Load(clusterName)
if !ok {
model.WrapMsg(c, model.CLUSTER_NOT_EXIST, model.GetMsg(c, model.CLUSTER_NOT_EXIST),
fmt.Sprintf("cluster %s does not exist", clusterName))
return
}

conf = con.(model.CKManClickHouseConfig)
err = clickhouse.GetReplicaZkPath(&conf)

if err = ck.syncDownClusters(c); err != nil {
return
}
clickhouse.CkClusters.Store(clusterName, conf)
if err = ck.syncUpClusters(c); err != nil {
return
}

model.WrapMsg(c, model.SUCCESS, model.GetMsg(c, model.SUCCESS), nil)
}

@@ -307,18 +326,36 @@ func (ck *ClickHouseController) DeleteTable(c *gin.Context) {
return
}

var conf model.CKManClickHouseConfig
con, ok := clickhouse.CkClusters.Load(clusterName)
if !ok {
model.WrapMsg(c, model.CLUSTER_NOT_EXIST, model.GetMsg(c, model.CLUSTER_NOT_EXIST),
fmt.Sprintf("cluster %s does not exist", clusterName))
return
}

conf = con.(model.CKManClickHouseConfig)

params.Cluster = ckService.Config.Cluster
params.Name = c.Query("tableName")
params.DB = c.Query("database")
if params.DB == "" {
params.DB = ckService.Config.DB
}

if err := ckService.DeleteTable(&params); err != nil {
if err := ckService.DeleteTable(&conf, &params); err != nil {
model.WrapMsg(c, model.DELETE_CK_TABLE_FAIL, model.GetMsg(c, model.DELETE_CK_TABLE_FAIL), err)
return
}

if err = ck.syncDownClusters(c); err != nil {
return
}
clickhouse.CkClusters.Store(clusterName, conf)
if err = ck.syncUpClusters(c); err != nil {
return
}

model.WrapMsg(c, model.SUCCESS, model.GetMsg(c, model.SUCCESS), nil)
}

@@ -501,13 +538,28 @@ func (ck *ClickHouseController) StopCluster(c *gin.Context) {
return
}

//before stopping the cluster, we need to sync the zookeeper paths
err := clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, model.GetMsg(c, model.STOP_CK_CLUSTER_FAIL), err)
return
}

clickhouse.CkServices.Delete(clusterName)
err := deploy.StopCkCluster(&conf)
err = deploy.StopCkCluster(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, model.GetMsg(c, model.STOP_CK_CLUSTER_FAIL), err)
return
}

if err = ck.syncDownClusters(c); err != nil {
return
}
clickhouse.CkClusters.Store(clusterName, conf)
if err = ck.syncUpClusters(c); err != nil {
return
}

model.WrapMsg(c, model.SUCCESS, model.GetMsg(c, model.SUCCESS), nil)
}

@@ -545,6 +597,7 @@ func (ck *ClickHouseController) DestroyCluster(c *gin.Context) {
if err = ck.syncDownClusters(c); err != nil {
return
}
clickhouse.CkClusters.Delete(clusterName)
clickhouse.CkServices.Delete(clusterName)
if err = ck.syncUpClusters(c); err != nil {
return
@@ -895,22 +948,32 @@ func (ck *ClickHouseController) PingCluster(c *gin.Context) {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), "can't find any host")
return
}
var failList []string
for _, host := range conf.Hosts {
connect,err := common.ConnectClickHouse(host, conf.Port, req.Database, req.User, req.Password)
if err != nil {
failList = append(failList, host)
log.Logger.Error("err: %+v", err)
continue

var err error
var db *sql.DB
shardAvailable := true
for _, shard := range conf.Shards{
failNum := 0
for _, replica := range shard.Replicas {
host := replica.Ip
db,err = common.ConnectClickHouse(host, conf.Port, req.Database, req.User, req.Password)
if err != nil {
log.Logger.Error("err: %+v", err)
failNum++
continue
}
if err = db.Ping(); err != nil {
log.Logger.Error("err: %+v", err)
failNum++
continue
}
}
if err = connect.Ping(); err != nil {
failList = append(failList, host)
log.Logger.Error("err: %+v", err)
continue
if failNum == len(shard.Replicas) {
shardAvailable = false
}
}
if len(failList) > 0 {
err := fmt.Errorf("failList: %v", failList)

if !shardAvailable {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), err)
return
}
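
The ping check above no longer fails on any single dead host: availability is now judged per shard, and the cluster fails only when some shard has no reachable replica at all. A condensed sketch of that rule follows; the helper name and the plain string parameters are illustrative only, not part of this PR:

// clusterReachable reports whether every shard has at least one replica that
// both connects and answers Ping. A dead replica is tolerated as long as a
// sibling replica in the same shard is alive.
func clusterReachable(conf *model.CKManClickHouseConfig, database, user, password string) bool {
	for _, shard := range conf.Shards {
		alive := 0
		for _, replica := range shard.Replicas {
			db, err := common.ConnectClickHouse(replica.Ip, conf.Port, database, user, password)
			if err == nil && db.Ping() == nil {
				alive++
			}
		}
		if alive == 0 {
			return false // every replica of this shard failed
		}
	}
	return true
}
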
@@ -999,7 +1062,7 @@ func (ck *ClickHouseController) ArchiveToHDFS(c *gin.Context) {
conf = con.(model.CKManClickHouseConfig)

if len(conf.Hosts) == 0 {
model.WrapMsg(c, model.PURGER_TABLES_FAIL, model.GetMsg(c, model.PURGER_TABLES_FAIL),
model.WrapMsg(c, model.ARCHIVE_TO_HDFS_FAIL, model.GetMsg(c, model.ARCHIVE_TO_HDFS_FAIL),
errors.Errorf("can't find any host"))
return
}
@@ -1027,19 +1090,19 @@ func (ck *ClickHouseController) ArchiveToHDFS(c *gin.Context) {

archive.FillArchiveDefault()
if err := archive.InitConns(); err != nil {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), err)
model.WrapMsg(c, model.ARCHIVE_TO_HDFS_FAIL, model.GetMsg(c, model.ARCHIVE_TO_HDFS_FAIL), err)
}

if err := archive.GetSortingInfo(); err != nil {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), err)
model.WrapMsg(c, model.ARCHIVE_TO_HDFS_FAIL, model.GetMsg(c, model.ARCHIVE_TO_HDFS_FAIL), err)
}

if err := archive.ClearHDFS(); err != nil {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), err)
model.WrapMsg(c, model.ARCHIVE_TO_HDFS_FAIL, model.GetMsg(c, model.ARCHIVE_TO_HDFS_FAIL), err)
}

if err := archive.ExportToHDFS(); err != nil {
model.WrapMsg(c, model.PING_CK_CLUSTER_FAIL, model.GetMsg(c, model.PING_CK_CLUSTER_FAIL), err)
model.WrapMsg(c, model.ARCHIVE_TO_HDFS_FAIL, model.GetMsg(c, model.ARCHIVE_TO_HDFS_FAIL), err)
}
model.WrapMsg(c, model.SUCCESS, model.GetMsg(c, model.SUCCESS), nil)
}
79 changes: 75 additions & 4 deletions deploy/ck.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/xml"
"fmt"
"github.com/housepower/ckman/service/zookeeper"
"io/ioutil"
"os"
"path"
@@ -622,6 +623,20 @@ func DestroyCkCluster(conf *model.CKManClickHouseConfig) error {
return err
}


//clear zkNode
service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return err
}
zooPaths := clickhouse.ConvertZooPath(conf)
if len(zooPaths) > 0 {
for _, zooPath := range zooPaths {
if err := service.DeleteAll(zooPath); err != nil {
return err
}
}
}
return nil
}
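
DestroyCkCluster now also removes each replicated table's metadata from ZooKeeper; without this, redeploying a cluster under the same name could collide with stale znodes. The cleanup goes through zookeeper.NewZkService and DeleteAll as shown above. As a rough, generic sketch of what a recursive delete has to do (assuming the github.com/samuel/go-zookeeper/zk client; this is not ckman's actual implementation), children must be removed depth-first because ZooKeeper refuses to delete a znode that still has children:

// deleteAll removes path and everything underneath it, children first.
func deleteAll(conn *zk.Conn, path string) error {
	children, _, err := conn.Children(path)
	if err != nil {
		if err == zk.ErrNoNode {
			return nil // already gone, nothing to delete
		}
		return err
	}
	for _, child := range children {
		if err := deleteAll(conn, path+"/"+child); err != nil {
			return err
		}
	}
	return conn.Delete(path, -1) // version -1 matches any version
}
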

@@ -733,17 +748,73 @@ func AddCkClusterNode(conf *model.CKManClickHouseConfig, req *model.AddNodeReq)
}

func DeleteCkClusterNode(conf *model.CKManClickHouseConfig, ip string) error {
// find the node index
// If the node's shard has only one replica and it is not the last shard, we don't allow deleting it.
available := false
needDrop := false
shardNum := 0
var err error
for i, shard := range conf.Shards{
for _,replica := range shard.Replicas{
if replica.Ip == ip {
shardNum = i + 1
available = true
if i + 1 == len(conf.Shards){
if len(shard.Replicas) == 1{
needDrop = true
}
} else {
if len(shard.Replicas) == 1{
err = fmt.Errorf("can't delete node which only 1 replica in shard")
}
}
break
}
}
}

if !available {
err = fmt.Errorf("can't find this ip in cluster")
}

if err != nil {
log.Logger.Errorf("can't delete this node: %v", err)
return err
}

//delete zookeeper path if needed
if needDrop {
log.Logger.Infof("the node %s is the only replica in shard, so we should delete zoopath", ip)
if err = clickhouse.GetReplicaZkPath(conf); err != nil {
return err
}
var zooPaths []string
for _, path := range conf.ZooPath {
zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum), -1)
zooPaths = append(zooPaths, zooPath)
}

service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return err
}

for _, path := range zooPaths {
log.Logger.Debugf("zoopath: %s", path)
err := service.DeleteAll(path)
if err != nil {
return err
}
}
}

index := 0
for index < len(conf.Hosts) {
if conf.Hosts[index] == ip {
break
}
index++
}
if index >= len(conf.Hosts) {
return errors.Errorf("can'f find node %s on cluster %s", ip, conf.Cluster)
}

// stop the node
deploy := &CKDeploy{
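
When the node being removed is the only replica of its shard, the branch above expands the stored ZooKeeper path templates and deletes them so the shard's replication metadata does not leak. The expansion is plain macro substitution; a hypothetical example with an invented cluster name, shard number and table:

// "{cluster}" and "{shard}" in the stored template are replaced with the real
// cluster name and the 1-based shard number before the znode is deleted:
//   /clickhouse/tables/{cluster}/{shard}/my_table  ->  /clickhouse/tables/test/3/my_table
zooPath := strings.Replace("/clickhouse/tables/{cluster}/{shard}/my_table", "{cluster}", "test", -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", 3), -1)
log.Logger.Debugf("zoopath to delete: %s", zooPath)
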
37 changes: 19 additions & 18 deletions model/deploy_ck.go
@@ -63,24 +63,25 @@ type CkImportConfig struct {
}

type CKManClickHouseConfig struct {
Mode string `json:"mode"`
Hosts []string `json:"hosts"`
Names []string `json:"names"`
Port int `json:"port"`
HttpPort int `json:"httpPort"`
User string `json:"user"`
Password string `json:"password"`
DB string `json:"database"`
Cluster string `json:"cluster"`
ZkNodes []string `json:"zkNodes"`
ZkPort int `json:"zkPort"`
ZkStatusPort int `json:"zkStatusPort"`
IsReplica bool `json:"isReplica"`
Version string `json:"version"`
SshUser string `json:"sshUser"`
SshPassword string `json:"sshPassword"`
Shards []CkShard `json:"shards"`
Path string `json:"path"`
Mode string `json:"mode"`
Hosts []string `json:"hosts"`
Names []string `json:"names"`
Port int `json:"port"`
HttpPort int `json:"httpPort"`
User string `json:"user"`
Password string `json:"password"`
DB string `json:"database"`
Cluster string `json:"cluster"`
ZkNodes []string `json:"zkNodes"`
ZkPort int `json:"zkPort"`
ZkStatusPort int `json:"zkStatusPort"`
IsReplica bool `json:"isReplica"`
Version string `json:"version"`
SshUser string `json:"sshUser"`
SshPassword string `json:"sshPassword"`
Shards []CkShard `json:"shards"`
Path string `json:"path"`
ZooPath map[string]string `json:"zooPath"`
}

func (config *CkDeployConfig) Normalize() {
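
The new ZooPath field lets the path templates discovered by GetReplicaZkPath be persisted with the cluster config and reused when a node or the whole cluster is torn down. A hypothetical value, purely for illustration (the real keys and templates are whatever GetReplicaZkPath extracts from the running cluster):

// Example only: one entry per replicated table, template stored with macros intact.
conf.ZooPath = map[string]string{
	"default.my_replicated_table": "/clickhouse/tables/{cluster}/{shard}/my_replicated_table",
}
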