refactor(backend): sync 1.3.0 code #4581
zhangzhw8 authored May 24, 2024
2 parents fd69590 + 688c647 commit 39fb587
Showing 243 changed files with 9,133 additions and 2,434 deletions.
@@ -0,0 +1,95 @@
package kafkacmd

import (
"encoding/json"
"fmt"

"dbm-services/bigdata/db-tools/dbactuator/internal/subcmd"
"dbm-services/bigdata/db-tools/dbactuator/pkg/components/kafka"
"dbm-services/bigdata/db-tools/dbactuator/pkg/rollback"
"dbm-services/bigdata/db-tools/dbactuator/pkg/util"
"dbm-services/common/go-pubpkg/logger"

"github.com/spf13/cobra"
)

// CheckBrokerEmptyAct defines the action that checks whether a Kafka broker is empty
type CheckBrokerEmptyAct struct {
*subcmd.BaseOptions // embedded base options
Service kafka.DecomBrokerComp // Kafka service component
}

// CheckBrokerEmptyCommand creates and returns a cobra command that checks whether a Kafka broker is empty
func CheckBrokerEmptyCommand() *cobra.Command {
act := CheckBrokerEmptyAct{
BaseOptions: subcmd.GBaseOptions, // initialize base options
}
cmd := &cobra.Command{
Use: "check_broker_empty",
Short: "检查Broker为空",
Example: fmt.Sprintf(`dbactuator kafka check_broker_empty %s`, subcmd.CmdBaseExapmleStr),
Run: func(cmd *cobra.Command, args []string) {
util.CheckErr(act.Validate()) // validate parameters
if act.RollBack {
util.CheckErr(act.Rollback()) // perform the rollback
return
}
util.CheckErr(act.Init()) // initialize
util.CheckErr(act.Run()) // run the check
},
}
return cmd
}

// Validate validates the parameters of CheckBrokerEmptyAct
func (d *CheckBrokerEmptyAct) Validate() (err error) {
return d.BaseOptions.Validate() // delegate to the base options' validation
}

// Init initializes the CheckBrokerEmptyAct struct
func (d *CheckBrokerEmptyAct) Init() (err error) {
logger.Info("CheckBrokerEmptyAct Init")
if err = d.Deserialize(&d.Service.Params); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
d.Service.GeneralParam = subcmd.GeneralRuntimeParam // set the general runtime parameters
return d.Service.Init() // initialize the service component
}

// Rollback performs the rollback operation
func (d *CheckBrokerEmptyAct) Rollback() (err error) {
var r rollback.RollBackObjects
if err = d.DeserializeAndValidate(&r); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
err = r.RollBack() // invoke the rollback method of the rollback objects
if err != nil {
logger.Error("roll back failed %s", err.Error())
}
return
}

// Run executes the check that verifies the Kafka broker is empty
func (d *CheckBrokerEmptyAct) Run() (err error) {
steps := subcmd.Steps{
{
FunName: "检查Broker为空",
Func: d.Service.DoEmptyCheck, // function to execute
},
}

if err := steps.Run(); err != nil {
rollbackCtxb, rerr := json.Marshal(d.Service.RollBackContext) // serialize the rollback context
if rerr != nil {
logger.Error("json Marshal %s", rerr.Error())
fmt.Printf("<ctx>Can't RollBack<ctx>\n")
}
fmt.Printf("<ctx>%s<ctx>\n", string(rollbackCtxb)) // print the rollback context
return err
}

logger.Info("check_broker_empty successfully") // 打印成功信息
return nil
}
@@ -1,25 +1,29 @@
package kafkacmd

// Todo
import (
// import the internal subcommand package and the template utilities
. "dbm-services/bigdata/db-tools/dbactuator/internal/subcmd"
"dbm-services/bigdata/db-tools/dbactuator/pkg/util/templates"

"github.com/spf13/cobra"
)

// NewKafkaCommand creates a new set of Kafka commands.
// It returns a *cobra.Command that serves as the root command for Kafka operations.
func NewKafkaCommand() *cobra.Command {
// create the root command "kafka", which will hold a series of subcommands
cmds := &cobra.Command{
Use: "kafka [kafka operation]",
Short: "Kafka Operation Command Line Interface",
RunE: ValidateSubCommand(),
Use: "kafka [kafka operation]", // 使用说明
Short: "Kafka Operation Command Line Interface", // 简短描述
RunE: ValidateSubCommand(), // 当运行无子命令时,验证是否提供了子命令
}

// define a group of subcommands
groups := templates.CommandGroups{
{
Message: "kafka operation sets",
Message: "kafka operation sets", // 组描述
Commands: []*cobra.Command{
// add the Kafka-related subcommands
InitCommand(),
DecompressKafkaPkgCommand(),
InstallSupervisorCommand(),
@@ -37,9 +41,14 @@ func NewKafkaCommand() *cobra.Command {
ReconfigRemoveCommand(),
RestartBrokerCommand(),
ReplaceBrokerCommand(),
CheckBrokerEmptyCommand(),
},
},
}

// add the command groups to the root command
groups.Add(cmds)

// return the root command
return cmds
}
@@ -2,7 +2,7 @@ package kafka

import (
"fmt"
"io/ioutil"
"os"
"strings"
"time"

@@ -85,7 +85,7 @@ func (d *DecomBrokerComp) DoReplaceBrokers() (err error) {
// /data/kafkaenv/host.json
jsonFile := fmt.Sprintf("%s/%s.json", cst.DefaultKafkaEnv, broker)
logger.Info("jsonfile: %s", jsonFile)
if err = os.WriteFile(jsonFile, []byte(topicJSON), 0644); err != nil {
logger.Error("write %s failed, %v", jsonFile, err)
return err
}
@@ -120,43 +120,34 @@ func (d *DecomBrokerComp) DoReplaceBrokers() (err error) {
return nil
}

// DoDecomBrokers decommissions (scales in) Kafka brokers
func (d *DecomBrokerComp) DoDecomBrokers() (err error) {
var id string
// connect to Zookeeper
zkHost := d.Params.ZookeeperIP + ":2181"

conn, _, err := zk.Connect([]string{zkHost}, 10*time.Second)
if err != nil {
logger.Error("Connect zk failed, %s", err)
return err
}
defer conn.Close()

// collect the IDs of the brokers to be decommissioned
var excludeIds []string
for _, broker := range d.Params.ExcludeBrokers {
id, err = kafkautil.GetBrokerIDByHost(conn, broker)
if err != nil {
logger.Error("cant get %s broker id, %s", broker, err)
continue
}
excludeIds = append(excludeIds, id)
}
// if a host being decommissioned is no longer in the Kafka cluster (e.g. the machine failed and was shut down), do not generate an execution plan
if len(excludeIds) == 0 {
logger.Info("缩容的broker不在集群里面.")
return nil
}
logger.Info("excludeIds: %v", excludeIds)
// fetch the topics and write them to a JSON file
b, err := kafkautil.WriteTopicJSON(zkHost)
if err != nil {
return err
@@ -165,23 +156,23 @@ func (d *DecomBrokerComp) DoDecomBrokers() error {
logger.Info("No need to do reassignment.")
return nil
}

logger.Info("Creating topic.json file")
topicJSONFile := fmt.Sprintf("%s/topic.json", cst.DefaultKafkaEnv)
if err = os.WriteFile(topicJSONFile, b, 0644); err != nil {
logger.Error("write %s failed, %s", topicJSONFile, err)
return err
}

// generate the partition-replica reassignment plan and write it to a JSON file
logger.Info("Creating plan.json file")
err = kafkautil.GenReassignmentJSON(conn, zkHost, excludeIds)
if err != nil {
logger.Error("Create plan.json failed %s", err)
return err
}

// execute the partition-replica reassignment
logger.Info("Execute the plan")
planJSONFile := cst.PlanJSONFile
err = kafkautil.DoReassignPartitions(zkHost, planJSONFile)
if err != nil {
logger.Error("Execute partitions reassignment failed %s", err)
@@ -191,46 +182,81 @@ func (d *DecomBrokerComp) DoDecomBrokers() error {
return nil
}

// DoPartitionCheck checks the status of the Kafka partition reassignment.
// It keeps polling until all partitions have been moved or the maximum number of retries is reached.
func (d *DecomBrokerComp) DoPartitionCheck() (err error) {
// maximum number of retries: 288 checks at 5-minute intervals (24 hours)
const MaxRetry = 288
count := 0 // retry counter
zkHost := d.Params.ZookeeperIP + ":2181" // Zookeeper connection string
jsonFile := cst.PlanJSONFile // reassignment plan file
topicJSONFile := fmt.Sprintf("%s/topic.json", cst.DefaultKafkaEnv) // Kafka topic JSON file

// poll the reassignment status in a loop
for {
count++ // increment the retry counter
logger.Info("检查搬迁状态,次数[%d]", count)

// if topic.json does not exist, there is no reassignment progress to check
if !osutil.FileExist(topicJSONFile) {
logger.Info("[%s] no exist, no need to check progress.")
break
logger.Info("[%s] no exist, no need to check progress.", topicJSONFile)
break // 退出循环
}

// call kafkautil.CheckReassignPartitions to check the reassignment progress
out, err := kafkautil.CheckReassignPartitions(zkHost, jsonFile)
if err != nil {
// if the check fails, log the error and return
logger.Error("检查partition搬迁进度失败 %v", err)
return err
}
// log the current reassignment progress
logger.Info("当前进度: [%s]", out)
// an empty output means the reassignment is complete
if len(out) == 0 {
logger.Info("数据搬迁完成")
break // exit the loop
}

// if the maximum number of retries is reached, log it and return a timeout error
if count == MaxRetry {
logger.Error("检查数据搬迁超时,可以选择重试")
return fmt.Errorf("检查扩容状态超时,可以选择重试")
}
// wait 5 minutes before checking again
time.Sleep(300 * time.Second)
}

// log once the reassignment has finished
logger.Info("分区已搬空, 若有新增topic, 请检查分区分布")
logger.Info("清理计划文件")
extraCmd := fmt.Sprintf("rm -f %s %s", jsonFile, topicJSONFile)
// 构建清理计划文件的命令
extraCmd := fmt.Sprintf("rm -f %s %s %s", jsonFile, topicJSONFile, cst.RollbackFile)
logger.Info("cmd: [%s]", extraCmd)
// run the cleanup command
osutil.ExecShellCommandBd(false, extraCmd)

// done, return nil
return nil
}

// DoEmptyCheck checks that the broker data directories are empty
func (d *DecomBrokerComp) DoEmptyCheck() (err error) {
// read the data directories from the config file
dataDirs, err := kafkautil.ReadDataDirs(cst.KafkaConfigFile)
if err != nil {
logger.Error("Error reading data directories from config file: %v", err)
return err
}
// check whether the broker is empty
empty, err := kafkautil.IsBrokerEmpty(dataDirs)
if err != nil {
logger.Error("Error checking if the broker is empty: %v", err)
return err
}
if !empty {
return fmt.Errorf("the broker is not empty")
}
return nil
}
