diff --git a/.gitignore b/.gitignore index 1661ec7..f64efdb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ *.log bin/* .vscode -external/curve-go-rpc/* external/website/* docker/pigeon docker/website diff --git a/.gitmodules b/.gitmodules index 1c3b344..1cdcc1a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ -[submodule "external/curve-go-rpc"] - path = external/curve-go-rpc - url = git@github.com:SeanHai/curve-go-rpc.git [submodule "external/website"] path = external/website url = git@github.com:opencurve/curve-dashboard.git diff --git a/api/curvebs/agent/agent.go b/api/curvebs/agent/agent.go index 593d5c7..9dd1762 100644 --- a/api/curvebs/agent/agent.go +++ b/api/curvebs/agent/agent.go @@ -23,8 +23,8 @@ package agent import ( + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" metrics "github.com/opencurve/curve-manager/internal/metrics/core" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" "github.com/opencurve/curve-manager/internal/snapshotclone" "github.com/opencurve/curve-manager/internal/storage" "github.com/opencurve/pigeon" @@ -75,7 +75,7 @@ func InitClients(logger *pigeon.Logger) error { } // init mds rpc client - bsrpc.Init(clusterAddrs.Addrs) + bshttp.Init(clusterAddrs.Addrs) // init metric client metrics.Init(clusterAddrs.Addrs) @@ -86,7 +86,7 @@ func InitClients(logger *pigeon.Logger) error { if currentClusterId <= 0 && clusterAddrs.ClusterId > 0 { currentClusterId = clusterAddrs.ClusterId return initAlerts(alertExpirationDays, logger) - } + } if currentClusterId > 0 { stopAlertTasks() currentClusterId = clusterAddrs.ClusterId diff --git a/api/curvebs/agent/cluster.go b/api/curvebs/agent/cluster.go index c2f9e24..b12d688 100644 --- a/api/curvebs/agent/cluster.go +++ b/api/curvebs/agent/cluster.go @@ -28,8 +28,8 @@ import ( comm "github.com/opencurve/curve-manager/api/common" "github.com/opencurve/curve-manager/internal/common" "github.com/opencurve/curve-manager/internal/errno" + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" "github.com/opencurve/curve-manager/internal/metrics/bsmetric" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" "github.com/opencurve/pigeon" ) @@ -47,7 +47,7 @@ type ClusterStatus struct { func GetClusterSpace(l *pigeon.Logger, rId string) (interface{}, errno.Errno) { result := Space{} // get logical pools form mds - pools, err := bsrpc.GMdsClient.ListLogicalPool() + pools, err := bshttp.GMdsClient.ListLogicalPool() if err != nil { l.Error("GetClusterSpace bsrpc.ListLogicalPool failed", pigeon.Field("error", err), @@ -111,7 +111,7 @@ func GetClusterPerformance(r *pigeon.Request, start, end, interval uint64) (inte func GetClusterStatus(l *pigeon.Logger, rId string) interface{} { clusterStatus := ClusterStatus{} // 1. get pool numbers in cluster - pools, err := bsrpc.GMdsClient.ListLogicalPool() + pools, err := bshttp.GMdsClient.ListLogicalPool() if err != nil { clusterStatus.Healthy = false clusterStatus.PoolNum = 0 diff --git a/api/curvebs/agent/copyset.go b/api/curvebs/agent/copyset.go index 6575cc7..46e6ef4 100644 --- a/api/curvebs/agent/copyset.go +++ b/api/curvebs/agent/copyset.go @@ -29,8 +29,8 @@ import ( set "github.com/deckarep/golang-set/v2" "github.com/opencurve/curve-manager/internal/common" + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" "github.com/opencurve/curve-manager/internal/metrics/bsmetric" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" ) const ( @@ -121,7 +121,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error { if err != nil { return err } - copysets, err := bsrpc.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port)) + copysets, err := bshttp.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port)) if err != nil { return fmt.Errorf("GetCopySetsInChunkServer failed, %s", err) } @@ -134,7 +134,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error { if len(copysets) > 0 { logicalPoolId = copysets[0].LogicalPoolId } - memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds) + memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds) if err != nil { return fmt.Errorf("GetChunkServerListInCopySets failed, %s", err) } @@ -184,7 +184,7 @@ func (cs *Copyset) ifChunkServerInCopysets(csAddr string, groupIds *set.Set[stri logicalPoolId = getPoolIdFormGroupId(ngid) copysetIds = append(copysetIds, getCopysetIdFromGroupId(ngid)) } - memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds) + memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds) if err != nil { return nil, fmt.Errorf("GetChunkServerListInCopySets failed, %s", err) } @@ -365,7 +365,7 @@ func (cs *Copyset) checkCopysetsOnChunkServer(csAddr string, status []map[string func (cs *Copyset) checkCopysetsWithMds() (bool, error) { // get copysets in cluster - csInfos, err := bsrpc.GMdsClient.GetCopySetsInCluster() + csInfos, err := bshttp.GMdsClient.GetCopySetsInCluster() if err != nil { return false, fmt.Errorf("GetCopySetsInCluster failed, %s", err) } @@ -410,7 +410,7 @@ func (cs *Copyset) checkCopysetsWithMds() (bool, error) { func (cs *Copyset) checkCopysetsInCluster() (bool, error) { healthy := true // 2.1 get chunkservers in cluster - chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster() + chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster() if err != nil { return false, fmt.Errorf("GetChunkServerInCluster failed, %s", err) } diff --git a/api/curvebs/agent/service_status.go b/api/curvebs/agent/service_status.go index 5f1e8fb..d3cca4d 100644 --- a/api/curvebs/agent/service_status.go +++ b/api/curvebs/agent/service_status.go @@ -24,13 +24,12 @@ package agent import ( "fmt" + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" - "github.com/SeanHai/curve-go-rpc/rpc/curvebs" comm "github.com/opencurve/curve-manager/api/common" "github.com/opencurve/curve-manager/internal/common" "github.com/opencurve/curve-manager/internal/errno" "github.com/opencurve/curve-manager/internal/metrics/bsmetric" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" "github.com/opencurve/pigeon" ) @@ -91,7 +90,7 @@ func GetSnapShotCloneServerStatus(r *pigeon.Request) (interface{}, errno.Errno) func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errno) { var result ChunkServerStatus // get chunkserver form mds - chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster() + chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster() if err != nil { l.Error("GetChunkServerStatus bsrpc.GetChunkServerInCluster failed", pigeon.Field("error", err), @@ -103,7 +102,7 @@ func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errn var endponits []string for _, cs := range chunkservers { endpoint := fmt.Sprintf("%s:%d", cs.HostIp, cs.Port) - if cs.OnlineStatus == curvebs.ONLINE_STATUS { + if cs.OnlineStatus == bshttp.ONLINE_STATUS { online += 1 endponits = append(endponits, endpoint) } else { diff --git a/api/curvebs/agent/test_mdsLeader.go b/api/curvebs/agent/test_mdsLeader.go new file mode 100644 index 0000000..01b83fe --- /dev/null +++ b/api/curvebs/agent/test_mdsLeader.go @@ -0,0 +1,40 @@ +package agent + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/common" + "net/url" + "testing" +) + +func TestMdsClient_ListPhysicalPool_http(t *testing.T) { + +} +func TestGetCurrentClusterServicesAddr() (clusterServicesAddr, error) { + ret := clusterServicesAddr{} + httpClient := common.GetHttpClient() + url := (&url.URL{ + Scheme: "http", + Host: "127.0.0.1:11000", + Path: "/", + RawQuery: fmt.Sprintf("%s=%s", "method", CLUSTER_SERVICES_ADDRESS), + }).String() + + resp, err := resty.NewWithClient(httpClient).R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "Curve-Manager"). + Execute("GET", url) + if err != nil { + return ret, fmt.Errorf("getClusterServicesAddr failed: %v", err) + } + + respStruct := admHttpResponse{} + err = json.Unmarshal([]byte(resp.String()), &respStruct) + if err != nil { + return ret, fmt.Errorf("Unmarshal getClusterServicesAddr response failed, resp = %s, err = %v", resp.String(), err) + } + return respStruct.Data, nil +} diff --git a/api/curvebs/agent/topology.go b/api/curvebs/agent/topology.go index e65af33..072beae 100644 --- a/api/curvebs/agent/topology.go +++ b/api/curvebs/agent/topology.go @@ -23,15 +23,15 @@ package agent import ( + "github.com/opencurve/curve-manager/internal/http/curvebs" "sort" - "github.com/SeanHai/curve-go-rpc/rpc/curvebs" comm "github.com/opencurve/curve-manager/api/common" "github.com/opencurve/curve-manager/internal/common" "github.com/opencurve/curve-manager/internal/errno" + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" "github.com/opencurve/curve-manager/internal/metrics/bsmetric" metricomm "github.com/opencurve/curve-manager/internal/metrics/common" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" "github.com/opencurve/pigeon" ) @@ -92,7 +92,7 @@ func listChunkServer(pools *[]Pool, size int) error { for zIndex, zone := range pool.Zones { for sIndex, server := range zone.Servers { go func(id uint32, addr *Server) { - chunkservers, err := bsrpc.GMdsClient.ListChunkServer(id) + chunkservers, err := bshttp.GMdsClient.ListChunkServer(id) ret <- common.QueryResult{ Key: addr, Result: chunkservers, @@ -125,7 +125,7 @@ func listZoneServer(pools *[]Pool, size int) error { for pIndex, pool := range *pools { for zIndex, zone := range pool.Zones { go func(id uint32, addr *Zone) { - servers, err := bsrpc.GMdsClient.ListZoneServer(id) + servers, err := bshttp.GMdsClient.ListZoneServer(id) ret <- common.QueryResult{ Key: addr, Result: servers, @@ -165,7 +165,7 @@ func listPoolZone(pools *[]Pool) error { number := 0 for index, pool := range *pools { go func(id uint32, addr *Pool) { - zones, err := bsrpc.GMdsClient.ListPoolZone(id) + zones, err := bshttp.GMdsClient.ListPoolZone(id) ret <- common.QueryResult{ Key: addr, Result: zones, @@ -194,7 +194,7 @@ func listPoolZone(pools *[]Pool) error { func getPoolSpace(pools *[]PoolInfo) error { // get can be recycled space - _, recycledSize, err := bsrpc.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR) + _, recycledSize, err := bshttp.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR) if err != nil { return err } @@ -312,7 +312,7 @@ func sortTopology(pools []Pool) { func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) { result := []PoolInfo{} // get info from mds - pools, err := bsrpc.GMdsClient.ListLogicalPool() + pools, err := bshttp.GMdsClient.ListLogicalPool() if err != nil { r.Logger().Error("ListLogicalPool bsrpc.ListLogicalPool failed", pigeon.Field("error", err), @@ -353,7 +353,7 @@ func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) { } func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint64) (interface{}, errno.Errno) { - pool, err := bsrpc.GMdsClient.GetLogicalPool(poolId) + pool, err := bshttp.GMdsClient.GetLogicalPool(poolId) if err != nil { r.Logger().Error("GetLogicalPool bsrpc.GetLogicalPool failed", pigeon.Field("poolId", poolId), @@ -411,7 +411,7 @@ func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint6 func ListTopology(r *pigeon.Request) (interface{}, errno.Errno) { result := []Pool{} - logicalPools, err := bsrpc.GMdsClient.ListLogicalPool() + logicalPools, err := bshttp.GMdsClient.ListLogicalPool() if err != nil { r.Logger().Error("ListTopology bsrpc.ListLogicalPool failed", pigeon.Field("error", err), diff --git a/api/curvebs/agent/volume.go b/api/curvebs/agent/volume.go index cdeddc4..e7db885 100644 --- a/api/curvebs/agent/volume.go +++ b/api/curvebs/agent/volume.go @@ -33,13 +33,12 @@ import ( "strings" "time" - "github.com/SeanHai/curve-go-rpc/rpc/curvebs" comm "github.com/opencurve/curve-manager/api/common" "github.com/opencurve/curve-manager/internal/common" "github.com/opencurve/curve-manager/internal/errno" + bshttp "github.com/opencurve/curve-manager/internal/http/curvebs" "github.com/opencurve/curve-manager/internal/metrics/bsmetric" metricomm "github.com/opencurve/curve-manager/internal/metrics/common" - bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs" "github.com/opencurve/curve-manager/internal/snapshotclone" "github.com/opencurve/pigeon" ) @@ -75,14 +74,14 @@ type VolumePoolInfo struct { Alloc uint32 `json:"alloc" binding:"required"` } type VolumeInfo struct { - Info curvebs.FileInfo `json:"info" binding:"required"` + Info bshttp.FileInfo `json:"info" binding:"required"` Pools []VolumePoolInfo `json:"pools"` Performance []metricomm.UserPerformance `json:"performance" binding:"required"` } type ListVolumeInfo struct { - Total int `json:"total" binding:"required"` - Info []curvebs.FileInfo `json:"info" binding:"required"` + Total int `json:"total" binding:"required"` + Info []bshttp.FileInfo `json:"info" binding:"required"` } func getUpPath(dir string) string { @@ -122,7 +121,7 @@ func getAuthInfoOfRoot() (*AuthInfo, string) { }, "" } -func sortFile(files []curvebs.FileInfo, orderKey string, direction int) { +func sortFile(files []bshttp.FileInfo, orderKey string, direction int) { sort.Slice(files, func(i, j int) bool { switch orderKey { case ORDER_BY_CTIME: @@ -150,7 +149,7 @@ func getVolumeAllocSize(dir string, volumes *[]VolumeInfo) error { ret := make(chan common.QueryResult, size) for index, volume := range *volumes { go func(vname string, addr *VolumeInfo) { - _, poolSize, err := bsrpc.GMdsClient.GetFileAllocatedSize(vname) + _, poolSize, err := bshttp.GMdsClient.GetFileAllocatedSize(vname) ret <- common.QueryResult{ Key: addr, Result: poolSize, @@ -181,12 +180,12 @@ func getVolumeAllocSize(dir string, volumes *[]VolumeInfo) error { } func getVolumePoolInfo(volumes *[]VolumeInfo) error { - pools, err := bsrpc.GMdsClient.ListLogicalPool() + pools, err := bshttp.GMdsClient.ListLogicalPool() if err != nil { return fmt.Errorf("getVolumePoolInfo failed, %s", err) } - poolMap := make(map[uint32]*curvebs.LogicalPool) + poolMap := make(map[uint32]*bshttp.LogicalPool) for index, pool := range pools { poolMap[pool.Id] = &pools[index] } @@ -234,9 +233,9 @@ func getVolumeSpaceSize(dir string, size int, volumes *[]VolumeInfo) error { } ret := make(chan common.QueryResult, size) for index, volume := range *volumes { - if volume.Info.FileType == curvebs.INODE_DIRECTORY { + if volume.Info.FileType == bshttp.INODE_DIRECTORY { go func(vname string, addr *VolumeInfo) { - size, err := bsrpc.GMdsClient.GetFileSize(vname) + size, err := bshttp.GMdsClient.GetFileSize(vname) ret <- common.QueryResult{ Key: addr, Result: size, @@ -264,7 +263,7 @@ func findVolumeMountPoints(dir string, volumes *[]VolumeInfo) error { ret := make(chan common.QueryResult, size) for index, volume := range *volumes { go func(vname string, addr *VolumeInfo) { - infos, err := bsrpc.GMdsClient.FindFileMountPoint(vname) + infos, err := bshttp.GMdsClient.FindFileMountPoint(vname) ret <- common.QueryResult{ Key: addr, Result: infos, @@ -288,7 +287,7 @@ func findVolumeMountPoints(dir string, volumes *[]VolumeInfo) error { func ListVolume(r *pigeon.Request, size, page uint32, path, key string, direction int) (interface{}, errno.Errno) { listVolumeInfo := ListVolumeInfo{ - Info: []curvebs.FileInfo{}, + Info: []bshttp.FileInfo{}, } authInfo, err := getAuthInfoOfRoot() if err != "" { @@ -297,7 +296,7 @@ func ListVolume(r *pigeon.Request, size, page uint32, path, key string, directio pigeon.Field("requestId", r.HeadersIn[comm.HEADER_REQUEST_ID])) return nil, errno.GET_ROOT_AUTH_FAILED } - fileInfos, e := bsrpc.GMdsClient.ListDir(path, authInfo.userName, authInfo.signatrue, authInfo.date) + fileInfos, e := bshttp.GMdsClient.ListDir(path, authInfo.userName, authInfo.signatrue, authInfo.date) if e != nil { r.Logger().Error("ListVolume bsrpc.ListDir failed", pigeon.Field("path", path), @@ -309,7 +308,7 @@ func ListVolume(r *pigeon.Request, size, page uint32, path, key string, directio return nil, errno.LIST_VOLUME_FAILED } // exclude /RecycleBin and /clone - var tmpSlice []curvebs.FileInfo + var tmpSlice []bshttp.FileInfo if path == ROOT_DIR { for i := range fileInfos { if fileInfos[i].FileName == RECYCLEBIN_NAME || fileInfos[i].FileName == CLONE_NAME { @@ -338,7 +337,7 @@ func ListVolume(r *pigeon.Request, size, page uint32, path, key string, directio tmpVolumes = append(tmpVolumes, VolumeInfo{ Info: v, }) - if v.FileType == curvebs.INODE_DIRECTORY { + if v.FileType == bshttp.INODE_DIRECTORY { dirSize++ } } @@ -380,7 +379,7 @@ func GetVolume(r *pigeon.Request, volumeName string, start, end, interval uint64 pigeon.Field("requestId", r.HeadersIn[comm.HEADER_REQUEST_ID])) return nil, errno.GET_ROOT_AUTH_FAILED } - fileInfo, e := bsrpc.GMdsClient.GetFileInfo(volumeName, authInfo.userName, authInfo.signatrue, authInfo.date) + fileInfo, e := bshttp.GMdsClient.GetFileInfo(volumeName, authInfo.userName, authInfo.signatrue, authInfo.date) if e != nil { if e.Error() == FILE_NOT_EXIST || e.Error() == PARAM_ERROR { return nil, errno.OK @@ -417,7 +416,7 @@ func GetVolume(r *pigeon.Request, volumeName string, start, end, interval uint64 } // get performance of the volume - if start != 0 && end != 0 && interval != 0 { + if start != 0 && end != 0 && interval != 0 { e = getVolumePerformance(path, &volumes, start, end, interval) if e != nil { r.Logger().Error("GetVolume getVolumePerformance failed", @@ -434,7 +433,7 @@ func GetVolume(r *pigeon.Request, volumeName string, start, end, interval uint64 return volumes[0], errno.OK } -func needDelete(file *curvebs.FileInfo, expiration uint64) bool { +func needDelete(file *bshttp.FileInfo, expiration uint64) bool { if file == nil { return false } @@ -459,7 +458,7 @@ func CleanRecycleBin(r *pigeon.Request, expiration uint64) errno.Errno { return errno.GET_ROOT_AUTH_FAILED } - fileInfos, e := bsrpc.GMdsClient.ListDir(RECYCLEBIN_DIR, authInfo.userName, authInfo.signatrue, authInfo.date) + fileInfos, e := bshttp.GMdsClient.ListDir(RECYCLEBIN_DIR, authInfo.userName, authInfo.signatrue, authInfo.date) if e != nil { r.Logger().Error("CleanRecycleBin bsrpc.ListDir failed", pigeon.Field("error", e), @@ -470,7 +469,7 @@ func CleanRecycleBin(r *pigeon.Request, expiration uint64) errno.Errno { for _, file := range fileInfos { if needDelete(&file, expiration) { fileName := path.Join(RECYCLEBIN_DIR, file.FileName) - e = bsrpc.GMdsClient.DeleteFile(fileName, authInfo.userName, authInfo.signatrue, 0, authInfo.date, true) + e = bshttp.GMdsClient.DeleteFile(fileName, authInfo.userName, authInfo.signatrue, 0, authInfo.date, true) if e != nil { r.Logger().Error("CleanRecycleBin bsrpc.DeleteFile failed", pigeon.Field("fileName", fileName), @@ -499,7 +498,7 @@ func CreateNameSpace(r *pigeon.Request, name, user, passwrord string) errno.Errn if user == authInfo.userName && passwrord != "" { sig = authInfo.signatrue } - e := bsrpc.GMdsClient.CreateFile(name, curvebs.INODE_DIRECTORY, user, sig, 0, authInfo.date, 0, 0) + e := bshttp.GMdsClient.CreateFile(name, bshttp.INODE_DIRECTORY, user, sig, 0, authInfo.date, 0, 0) if e != nil { r.Logger().Error("CreateNameSpace failed", pigeon.Field("name", name), @@ -524,7 +523,7 @@ func CreateVolume(r *pigeon.Request, name, user string, passwrord string, length if user == authInfo.userName && passwrord != "" { sig = authInfo.signatrue } - e := bsrpc.GMdsClient.CreateFile(name, curvebs.INODE_PAGEFILE, user, sig, length*common.GiB, authInfo.date, stripUnit, stripCount) + e := bshttp.GMdsClient.CreateFile(name, bshttp.INODE_PAGEFILE, user, sig, length*common.GiB, authInfo.date, stripUnit, stripCount) if e != nil { r.Logger().Error("CreateVolume failed", pigeon.Field("name", name), @@ -548,7 +547,7 @@ func ExtendVolume(r *pigeon.Request, name string, length uint64) errno.Errno { pigeon.Field("requestId", r.HeadersIn[comm.HEADER_REQUEST_ID])) return errno.GET_ROOT_AUTH_FAILED } - e := bsrpc.GMdsClient.ExtendFile(name, authInfo.userName, authInfo.signatrue, length*common.GiB, authInfo.date) + e := bshttp.GMdsClient.ExtendFile(name, authInfo.userName, authInfo.signatrue, length*common.GiB, authInfo.date) if e != nil { r.Logger().Error("ExtendVolume failed", pigeon.Field("name", name), @@ -569,8 +568,8 @@ func VolumeThrottle(r *pigeon.Request, name, throttleType string, limit, burst, pigeon.Field("requestId", r.HeadersIn[comm.HEADER_REQUEST_ID])) return errno.GET_ROOT_AUTH_FAILED } - e := bsrpc.GMdsClient.UpdateFileThrottleParams(name, authInfo.userName, authInfo.signatrue, authInfo.date, - curvebs.ThrottleParams{ + e := bshttp.GMdsClient.UpdateFileThrottleParams(name, authInfo.userName, authInfo.signatrue, authInfo.date, + bshttp.ThrottleParams{ Type: throttleType, Limit: limit, Burst: burst, @@ -593,8 +592,8 @@ func VolumeThrottle(r *pigeon.Request, name, throttleType string, limit, burst, func deleteVolume(r *pigeon.Request, volumes *map[string]string, force bool, auth *AuthInfo) bool { success := true for name, ftype := range *volumes { - if ftype == curvebs.INODE_DIRECTORY { - fileInfos, e := bsrpc.GMdsClient.ListDir(name, auth.userName, auth.signatrue, auth.date) + if ftype == bshttp.INODE_DIRECTORY { + fileInfos, e := bshttp.GMdsClient.ListDir(name, auth.userName, auth.signatrue, auth.date) if e != nil { r.Logger().Error("DeleteVolume ListDir failed", pigeon.Field("path", name), @@ -607,7 +606,7 @@ func deleteVolume(r *pigeon.Request, volumes *map[string]string, force bool, aut } deleteVolume(r, &v, force, auth) } - e := bsrpc.GMdsClient.DeleteFile(name, auth.userName, auth.signatrue, 0, auth.date, force) + e := bshttp.GMdsClient.DeleteFile(name, auth.userName, auth.signatrue, 0, auth.date, force) if e != nil { r.Logger().Error("DeleteVolume failed", pigeon.Field("name", name), @@ -652,7 +651,7 @@ func RecoverVolume(r *pigeon.Request, ids map[string]uint64) errno.Errno { } success := true for name, id := range ids { - e := bsrpc.GMdsClient.RecoverFile(name, authInfo.userName, authInfo.signatrue, id, authInfo.date) + e := bshttp.GMdsClient.RecoverFile(name, authInfo.userName, authInfo.signatrue, id, authInfo.date) if e != nil { r.Logger().Error("RecoverVolume failed", pigeon.Field("name", name), diff --git a/go.mod b/go.mod index 84c9c05..75ce217 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/opencurve/curve-manager go 1.18 require ( - github.com/SeanHai/curve-go-rpc v0.0.0-20230327062842-ff4a19bed139 github.com/deckarep/golang-set/v2 v2.1.0 github.com/go-resty/resty/v2 v2.7.0 github.com/google/uuid v1.3.0 @@ -114,4 +113,3 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/SeanHai/curve-go-rpc => ./external/curve-go-rpc diff --git a/go.sum b/go.sum index 0a98c39..902d328 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= +github.com/SeanHai/curve-go-rpc v0.0.0-20230327062842-ff4a19bed139/go.mod h1:eQugQLCNkJCNxk7oddevOnkgHHaxwURgEkhYlKeetEU= github.com/Shopify/logrus-bugsnag v0.0.0-20170309145241-6dbc35f2c30d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d h1:UrqY+r/OJnIp5u0s1SbQ8dVfLCZJsnvazdBP5hS4iRs= github.com/Wine93/grace v0.0.0-20221021033009-7d0348013a3c h1:LfrAhPNuQo38pAX5gBoRRskbskCgbISKC7CnWlirC78= diff --git a/internal/http/baseHttp/base.go b/internal/http/baseHttp/base.go new file mode 100644 index 0000000..92a7fe1 --- /dev/null +++ b/internal/http/baseHttp/base.go @@ -0,0 +1,125 @@ +package baseHttp + +import ( + "fmt" + "github.com/go-resty/resty/v2" + "net/url" + "time" +) + +type HttpResult struct { + Key interface{} + Err error + Result interface{} +} + +type BaseHttp struct { + Client *resty.Client + Timeout time.Duration + RetryTimes uint32 +} + +var ( + GMetricClient *BaseHttp +) + +func (cli *BaseHttp) SendHTTP(host []string, path string) *HttpResult { + + size := len(host) + if size == 0 { + return &HttpResult{ + Key: "", + Err: fmt.Errorf("empty addr"), + Result: nil, + } + } + results := make(chan HttpResult, size) + for _, host := range host { + url := (&url.URL{ + Scheme: "http", + Host: host, + Path: path, + }).String() + + resp, err := cli.Client.R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "curl/7.52.1"). + Execute("GET", url) + results <- HttpResult{ + Key: host, + Err: err, + Result: resp, + } + } + var count = 0 + var httpErr string + for res := range results { + if res.Err == nil { + return &res + } + count++ + httpErr = fmt.Sprintf("%s;%s:%s", httpErr, res.Key, res.Err.Error()) + if count >= size { + break + } + + } + return &HttpResult{ + Key: "", + Err: fmt.Errorf(httpErr), + Result: nil, + } + +} + +func (cli *BaseHttp) SendHTTPByPost(host []string, path string, body any) *HttpResult { + + size := len(host) + if size == 0 { + return &HttpResult{ + Key: "", + Err: fmt.Errorf("empty addr"), + Result: nil, + } + } + results := make(chan HttpResult, size) + for _, host := range host { + url := (&url.URL{ + Scheme: "http", + Host: host, + Path: path, + }).String() + + resp, err := cli.Client.R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "curl/7.52.1"). + SetBody(body). + Execute("Post", url) + results <- HttpResult{ + Key: host, + Err: err, + Result: resp, + } + } + var count = 0 + var httpErr string + for res := range results { + if res.Err == nil { + return &res + } + count++ + httpErr = fmt.Sprintf("%s;%s:%s", httpErr, res.Key, res.Err.Error()) + if count >= size { + break + } + + } + return &HttpResult{ + Key: "", + Err: fmt.Errorf(httpErr), + Result: nil, + } + +} diff --git a/internal/http/common/common.go b/internal/http/common/common.go new file mode 100644 index 0000000..3dc36e5 --- /dev/null +++ b/internal/http/common/common.go @@ -0,0 +1,96 @@ +package common + +import ( + "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + GiB = 1024 * 1024 * 1024 + TIME_FORMAT = "2006-01-02 15:04:05" +) + +type CopysetInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + LogicalPoolId *uint32 `protobuf:"varint,1,req,name=logicalPoolId" json:"logicalPoolId,omitempty"` + CopysetId *uint32 `protobuf:"varint,2,req,name=copysetId" json:"copysetId,omitempty"` + Scaning *bool `protobuf:"varint,3,opt,name=scaning" json:"scaning,omitempty"` + LastScanSec *uint64 `protobuf:"varint,4,opt,name=lastScanSec" json:"lastScanSec,omitempty"` + LastScanConsistent *bool `protobuf:"varint,5,opt,name=lastScanConsistent" json:"lastScanConsistent,omitempty"` +} + +type ChunkServerLocation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChunkServerID *uint32 `protobuf:"varint,1,req,name=chunkServerID" json:"chunkServerID,omitempty"` + HostIp *string `protobuf:"bytes,2,req,name=hostIp" json:"hostIp,omitempty"` + Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` + ExternalIp *string `protobuf:"bytes,4,opt,name=externalIp" json:"externalIp,omitempty"` +} + +func (x *CopysetInfo) GetLogicalPoolId() uint32 { + if x != nil && x.LogicalPoolId != nil { + return *x.LogicalPoolId + } + return 0 +} + +func (x *CopysetInfo) GetCopysetId() uint32 { + if x != nil && x.CopysetId != nil { + return *x.CopysetId + } + return 0 +} + +func (x *CopysetInfo) GetScaning() bool { + if x != nil && x.Scaning != nil { + return *x.Scaning + } + return false +} + +func (x *CopysetInfo) GetLastScanSec() uint64 { + if x != nil && x.LastScanSec != nil { + return *x.LastScanSec + } + return 0 +} + +func (x *CopysetInfo) GetLastScanConsistent() bool { + if x != nil && x.LastScanConsistent != nil { + return *x.LastScanConsistent + } + return false +} + +func (x *ChunkServerLocation) GetChunkServerID() uint32 { + if x != nil && x.ChunkServerID != nil { + return *x.ChunkServerID + } + return 0 +} + +func (x *ChunkServerLocation) GetHostIp() string { + if x != nil && x.HostIp != nil { + return *x.HostIp + } + return "" +} + +func (x *ChunkServerLocation) GetPort() uint32 { + if x != nil && x.Port != nil { + return *x.Port + } + return 0 +} + +func (x *ChunkServerLocation) GetExternalIp() string { + if x != nil && x.ExternalIp != nil { + return *x.ExternalIp + } + return "" +} diff --git a/internal/http/curvebs/curvebs.go b/internal/http/curvebs/curvebs.go new file mode 100644 index 0000000..753897c --- /dev/null +++ b/internal/http/curvebs/curvebs.go @@ -0,0 +1,54 @@ +package curvebs + +import ( + "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/common" + "net/url" + "strings" +) + +var ( + GMdsClient *MdsClient +) + +const ( + CURVEBS_MDS_ADDRESS = "mds.address" + + DEFAULT_RPC_TIMEOUT_MS = 500 + DEFAULT_RPC_RETRY_TIMES = 3 +) + +func Init(cfg map[string]string) { + addrs := findLeader(cfg) + GMdsClient = NewMdsClient(MdsClientOption{ + TimeoutMs: DEFAULT_RPC_TIMEOUT_MS, + RetryTimes: DEFAULT_RPC_RETRY_TIMES, + Addrs: strings.Split(addrs, common.CURVEBS_ADDRESS_DELIMITER), + }) +} + +func findLeader(cfg map[string]string) string { + mds_addr := cfg[CURVEBS_MDS_ADDRESS] + Addrs := strings.Split(mds_addr, common.CURVEBS_ADDRESS_DELIMITER) + for _, addr := range Addrs { + httpClient := common.GetHttpClient() + url := (&url.URL{ + Scheme: "http", + Host: addr, + Path: "/", + }).String() + resp, err := resty.NewWithClient(httpClient).R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "Curve-Manager"). + Execute("GET", url) + if err != nil { + continue + } + if resp.Body() != nil { + return addr + } + + } + return mds_addr +} diff --git a/internal/http/curvebs/mds.go b/internal/http/curvebs/mds.go new file mode 100644 index 0000000..94a6aef --- /dev/null +++ b/internal/http/curvebs/mds.go @@ -0,0 +1,30 @@ +package curvebs + +import ( + "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/common" + "github.com/opencurve/curve-manager/internal/http/baseHttp" + "time" +) + +type MdsClientOption struct { + TimeoutMs int + RetryTimes uint32 + Addrs []string +} + +type MdsClient struct { + addrs []string + baseClient_http baseHttp.BaseHttp +} + +func NewMdsClient(option MdsClientOption) *MdsClient { + return &MdsClient{ + addrs: option.Addrs, + baseClient_http: baseHttp.BaseHttp{ + Client: resty.NewWithClient(common.GetHttpClient()), + Timeout: time.Duration(option.TimeoutMs * int(time.Millisecond)), + RetryTimes: option.RetryTimes, + }, + } +} diff --git a/internal/http/curvebs/namespace.go b/internal/http/curvebs/namespace.go new file mode 100644 index 0000000..f006083 --- /dev/null +++ b/internal/http/curvebs/namespace.go @@ -0,0 +1,523 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +/* +* Project: Curve-Go-RPC +* Created Date: 2023-03-03 +* Author: wanghai (SeanHai) + */ + +package curvebs + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/http/common" + "github.com/opencurve/curve-manager/internal/http/nameserver2" + "strconv" + "time" +) + +const ( + // file type + INODE_DIRECTORY = "INODE_DIRECTORY" + INODE_PAGEFILE = "INODE_PAGEFILE" + INODE_APPENDFILE = "INODE_APPENDFILE" + INODE_APPENDECFILE = "INODE_APPENDECFILE" + INODE_SNAPSHOT_PAGEFILE = "INODE_SNAPSHOT_PAGEFILE" + + // file status + FILE_CREATED = "kFileCreated" + FILE_DELETING = "kFileDeleting" + FILE_CLONING = "kFileCloning" + FILE_CLONEMETA_INSTALLED = "kFileCloneMetaInstalled" + FILE_CLONED = "kFileCloned" + FILE_BEIING_CLONED = "kFileBeingCloned" + + // throttle type + IOPS_TOTAL = "IOPS_TOTAL" + IOPS_READ = "IOPS_READ" + IOPS_WRITE = "IOPS_WRITE" + BPS_TOTAL = "BPS_TOTAL" + BPS_READ = "BPS_READ" + BPS_WRITE = "BPS_WRITE" + + // apis + GET_FILE_ALLOC_SIZE_FUNC = "GetAllocatedSize" + LIST_DIR_FUNC = "ListDir" + GET_FILE_INFO = "GetFileInfo" + GET_FILE_SIZE = "GetFileSize" + DELETE_FILE = "DeleteFile" + CREATE_FILE = "CreateFile" + EXTEND_FILE = "ExtendFile" + RECOVER_FILE = "RecoverFile" + UPDATE_FILE_THROTTLE_PARAMS = "UpdateFileThrottleParams" + FIND_FILE_MOUNTPOINT = "FindFileMountPoint" + + GET_FILE_ALLOC_SIZE_FUNC_http = "GetAllocatedSize" + LIST_DIR_FUNC_http = "ListDir" + GET_FILE_INFO_http = "GetFileInfo" + GET_FILE_SIZE_http = "GetFileSize" + DELETE_FILE_http = "DeleteFile" + CREATE_FILE_http = "CreateFile" + EXTEND_FILE_http = "ExtendFile" + RECOVER_FILE_http = "RecoverFile" + UPDATE_FILE_THROTTLE_PARAMS_http = "UpdateFileThrottleParams" + FIND_FILE_MOUNTPOINT_http = "FindFileMountPoint" +) + +type ThrottleParams struct { + Type string `json:"type"` + Limit uint64 `json:"limit"` + Burst uint64 `json:"burst"` + BurstLength uint64 `json:"burstLength"` +} + +type FileInfo struct { + Id uint64 `json:"id"` + FileName string `json:"fileName"` + ParentId uint64 `json:"parentId"` + FileType string `json:"fileType"` + Owner string `json:"owner"` + ChunkSize uint32 `json:"chunkSize"` + SegmentSize uint32 `json:"segmentSize"` + Length uint64 `json:"length"` + AllocateSize uint64 `json:"alloc"` + Ctime string `json:"ctime"` + SeqNum uint64 `json:"seqNum"` + FileStatus string `json:"fileStatus"` + OriginalFullPathName string `json:"originalFullPathName"` + CloneSource string `json:"cloneSource"` + CloneLength uint64 `json:"cloneLength"` + StripeUnit uint64 `json:"stripeUnit"` + StripeCount uint64 `json:"stripeCount"` + ThrottleParams []ThrottleParams `json:"throttleParams"` + Epoch uint64 `json:"epoch"` + MountPoints []string `json:"mountPoints"` +} + +func (cli *MdsClient) GetFileAllocatedSize(filename string) (uint64, map[uint32]uint64, error) { + var host = cli.addrs + //todo checkHost + var path = GET_FILE_ALLOC_SIZE_FUNC_http + path = path + "FileName=" + filename + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return 0, nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *nameserver2.GetAllocatedSizeResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return 0, nil, err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return 0, nil, fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + infos := make(map[uint32]uint64) + for k, v := range response.GetAllocSizeMap() { + infos[k] = v / common.GiB + } + return response.GetAllocatedSize() / common.GiB, infos, nil +} + +func getFileType(t string) nameserver2.FileType { + switch t { + case INODE_DIRECTORY: + return nameserver2.FileType_INODE_DIRECTORY + case INODE_PAGEFILE: + return nameserver2.FileType_INODE_PAGEFILE + case INODE_APPENDFILE: + return nameserver2.FileType_INODE_APPENDFILE + case INODE_APPENDECFILE: + return nameserver2.FileType_INODE_APPENDECFILE + case INODE_SNAPSHOT_PAGEFILE: + return nameserver2.FileType_INODE_SNAPSHOT_PAGEFILE + default: + return -1 + } +} + +func getFileTypeStr(t nameserver2.FileType) string { + switch t { + case nameserver2.FileType_INODE_DIRECTORY: + return INODE_DIRECTORY + case nameserver2.FileType_INODE_PAGEFILE: + return INODE_PAGEFILE + case nameserver2.FileType_INODE_APPENDFILE: + return INODE_APPENDFILE + case nameserver2.FileType_INODE_APPENDECFILE: + return INODE_APPENDECFILE + case nameserver2.FileType_INODE_SNAPSHOT_PAGEFILE: + return INODE_SNAPSHOT_PAGEFILE + default: + return INVALID + } +} + +func getFileStatus(s nameserver2.FileStatus) string { + switch s { + case nameserver2.FileStatus_kFileCreated: + return FILE_CREATED + case nameserver2.FileStatus_kFileDeleting: + return FILE_DELETING + case nameserver2.FileStatus_kFileCloning: + return FILE_CLONING + case nameserver2.FileStatus_kFileCloneMetaInstalled: + return FILE_CLONEMETA_INSTALLED + case nameserver2.FileStatus_kFileCloned: + return FILE_CLONED + case nameserver2.FileStatus_kFileBeingCloned: + return FILE_BEIING_CLONED + default: + return INVALID + } +} + +func getThrottleTypeStr(t nameserver2.ThrottleType) string { + switch t { + case nameserver2.ThrottleType_IOPS_TOTAL: + return IOPS_TOTAL + case nameserver2.ThrottleType_IOPS_READ: + return IOPS_READ + case nameserver2.ThrottleType_IOPS_WRITE: + return IOPS_WRITE + case nameserver2.ThrottleType_BPS_TOTAL: + return BPS_TOTAL + case nameserver2.ThrottleType_BPS_READ: + return BPS_READ + case nameserver2.ThrottleType_BPS_WRITE: + return BPS_WRITE + default: + return INVALID + } +} + +func getThrottleType(t string) nameserver2.ThrottleType { + switch t { + case IOPS_TOTAL: + return nameserver2.ThrottleType_IOPS_TOTAL + case IOPS_READ: + return nameserver2.ThrottleType_IOPS_READ + case IOPS_WRITE: + return nameserver2.ThrottleType_IOPS_WRITE + case BPS_TOTAL: + return nameserver2.ThrottleType_BPS_TOTAL + case BPS_READ: + return nameserver2.ThrottleType_BPS_READ + case BPS_WRITE: + return nameserver2.ThrottleType_BPS_WRITE + default: + return 0 + } +} + +func (cli *MdsClient) ListDir(filename, owner, sig string, date uint64) ([]FileInfo, error) { + var host = cli.addrs + //todo check URL + var path = LIST_DIR_FUNC_http + path = path + "FileName=" + filename + "&Owner=" + owner + "&Date=" + strconv.Itoa(int(date)) + if sig != "" { + path = fmt.Sprintf("%s %s %d", path, "Signature=", &sig) + } + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *nameserver2.ListDirResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return nil, fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + infos := []FileInfo{} + for _, v := range response.GetFileInfo() { + var info FileInfo + info.Id = v.GetId() + info.FileName = v.GetFileName() + info.ParentId = v.GetParentId() + info.FileType = getFileTypeStr(v.GetFileType()) + info.Owner = v.GetOwner() + info.ChunkSize = v.GetChunkSize() + info.SegmentSize = v.GetSegmentSize() + info.Length = v.GetLength() / common.GiB + info.Ctime = time.Unix(int64(v.GetCtime()/1000000), 0).Format(common.TIME_FORMAT) + info.SeqNum = v.GetSeqNum() + info.FileStatus = getFileStatus(v.GetFileStatus()) + info.OriginalFullPathName = v.GetOriginalFullPathName() + info.CloneSource = v.GetCloneSource() + info.CloneLength = v.GetCloneLength() + info.StripeUnit = v.GetStripeUnit() + info.StripeCount = v.GetStripeCount() + info.ThrottleParams = []ThrottleParams{} + for _, p := range v.GetThrottleParams().GetThrottleParams() { + var param ThrottleParams + param.Type = getThrottleTypeStr(p.GetType()) + param.Limit = p.GetLimit() + param.Burst = p.GetBurst() + param.BurstLength = p.GetBurstLength() + info.ThrottleParams = append(info.ThrottleParams, param) + } + info.Epoch = v.GetEpoch() + infos = append(infos, info) + } + return infos, nil +} + +func (cli *MdsClient) GetFileInfo(filename, owner, sig string, date uint64) (FileInfo, error) { + info := FileInfo{} + var host = cli.addrs + // todo check URL + var path = GET_FILE_SIZE_http + path = path + "FileName=" + filename + "&Owner=" + owner + "&Date=" + strconv.Itoa(int(date)) + if sig != "" { + path = fmt.Sprintf("%s %s %d", path, "Signature=", &sig) + } + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return info, ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.GetFileInfoResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return info, err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return info, fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + v := response.GetFileInfo() + info.Id = v.GetId() + info.FileName = v.GetFileName() + info.ParentId = v.GetParentId() + info.FileType = getFileTypeStr(v.GetFileType()) + info.Owner = v.GetOwner() + info.ChunkSize = v.GetChunkSize() + info.SegmentSize = v.GetSegmentSize() + info.Length = v.GetLength() / common.GiB + info.Ctime = time.Unix(int64(v.GetCtime()/1000000), 0).Format(common.TIME_FORMAT) + info.SeqNum = v.GetSeqNum() + info.FileStatus = getFileStatus(v.GetFileStatus()) + info.OriginalFullPathName = v.GetOriginalFullPathName() + info.CloneSource = v.GetCloneSource() + info.CloneLength = v.GetCloneLength() + info.StripeUnit = v.GetStripeUnit() + info.StripeCount = v.GetStripeCount() + info.ThrottleParams = []ThrottleParams{} + for _, p := range v.GetThrottleParams().GetThrottleParams() { + var param ThrottleParams + param.Type = getThrottleTypeStr(p.GetType()) + param.Limit = p.GetLimit() + param.Burst = p.GetBurst() + param.BurstLength = p.GetBurstLength() + info.ThrottleParams = append(info.ThrottleParams, param) + } + info.Epoch = v.GetEpoch() + return info, nil +} + +func (cli *MdsClient) GetFileSize(fileName string) (uint64, error) { + var size uint64 + var host = cli.addrs + var path = GET_FILE_SIZE_http + // todo checkURL + path = fmt.Sprintf("%s %s %s", path, "FileName=", fileName) + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return size, ret.Err + } + + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.GetFileSizeResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return 0, err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return size, fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + size = response.GetFileSize() / common.GiB + return size, nil +} + +func (cli *MdsClient) DeleteFile(filename, owner, sig string, fileId, date uint64, forceDelete bool) error { + var host = cli.addrs + var path = DELETE_FILE_http + //todo checkURL + path = path + "FileName=" + filename + "&Date=" + strconv.Itoa(int(date)) + "&ForceDelete=" + strconv.FormatBool(forceDelete) + if sig != "" { + path = fmt.Sprintf("%s %s %s", path, "Signature=", sig) + } + if fileId != 0 { + path = fmt.Sprintf("%s %s %d", path, "FileId=", &fileId) + } + + ret := cli.baseClient_http.SendHTTP(host, path) + + if ret.Err != nil { + return ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.DeleteFileResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + return nil +} + +func (cli *MdsClient) RecoverFile(filename, owner, sig string, fileId, date uint64) error { + var host = cli.addrs + var path = RECOVER_FILE_http + //todo checkURL + path = path + "FileName=" + filename + "&Owner=" + owner + "&Date" + strconv.Itoa(int(date)) + if sig != "" { + path = fmt.Sprintf("%s %s %s", path, "?Signature=", sig) + } + if fileId != 0 { + path = fmt.Sprintf("%s %s %d", path, "?FileId=", &fileId) + } + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.RecoverFileResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + return nil +} + +func (cli *MdsClient) CreateFile(filename, ftype, owner, sig string, length, date, stripeUnit, stripeCount uint64) error { + var host = cli.addrs + var path = CREATE_FILE_http + //todo: generating param + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.CreateFileResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + return nil +} + +func (cli *MdsClient) ExtendFile(filename, owner, sig string, newSize, date uint64) error { + var host = cli.addrs + var path = EXTEND_FILE_http + //todo: checkURL + path = path + "FileName=" + filename + "&NewSize=" + strconv.Itoa(int(newSize)) + "&Owner=" + owner + "&Date" + strconv.Itoa(int(date)) + if sig != "" { + path = fmt.Sprintf("%s %s %s", path, "Signature=", sig) + } + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.ExtendFileResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + return nil +} + +func (cli *MdsClient) UpdateFileThrottleParams(filename, owner, sig string, date uint64, params ThrottleParams) error { + var host = cli.addrs + var path = UPDATE_FILE_THROTTLE_PARAMS_http + + //todo : generating paramt + if sig != "" { + path = fmt.Sprintf("%s %s %s", path, "Signature=", sig) + } + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.UpdateFileThrottleParamsResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + return nil +} + +func (cli *MdsClient) FindFileMountPoint(filename string) ([]string, error) { + info := []string{} + + var host = cli.addrs + //todo: checkURL + var path = FIND_FILE_MOUNTPOINT_http + path = fmt.Sprintf("%s %s %s", path, "FileName=", filename) + ret := cli.baseClient_http.SendHTTP(host, path) + + if ret.Err != nil { + return nil, ret.Err + } + tmp := ret.Result.(*resty.Response).String() + var response *nameserver2.FindFileMountPointResponse + err := json.Unmarshal([]byte(tmp), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != nameserver2.StatusCode_kOK { + return info, fmt.Errorf(nameserver2.StatusCode_name[int32(statusCode)]) + } + for _, v := range response.GetClientInfo() { + info = append(info, fmt.Sprintf("%s:%d", v.GetIp(), v.GetPort())) + } + return info, nil +} diff --git a/internal/http/curvebs/topology.go b/internal/http/curvebs/topology.go new file mode 100644 index 0000000..a51be4d --- /dev/null +++ b/internal/http/curvebs/topology.go @@ -0,0 +1,632 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +/* +* Project: Curve-Go-RPC +* Created Date: 2023-03-03 +* Author: wanghai (SeanHai) + */ + +package curvebs + +import ( + "encoding/json" + _ "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + _ "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/http/baseHttp" + "github.com/opencurve/curve-manager/internal/http/common" + _ "github.com/opencurve/curve-manager/internal/http/common" + "github.com/opencurve/curve-manager/internal/http/statuscode" + "github.com/opencurve/curve-manager/internal/http/topology" + "strconv" + _ "strconv" + "time" + _ "time" +) + +const ( + // invalid type + INVALID = "INVALID" + + // logical pool type + PAGEFILE_TYPE = "PAGEFILE" + APPENDFILE_TYPE = "APPENDFILE" + APPENDECFILE_TYPE = "APPENDECFILE" + + // logical pool allocate status + ALLOW_STATUS = "ALLOW" + DENY_STATUS = "DENY" + + // chunkserver status + READWRITE_STATUS = "READWRITE" + PENDDING_STATUS = "PENDDING" + RETIRED_STATUS = "RETIRED" + + // chunkserver disk status + DISKNORMAL_STATUS = "DISKNORMAL" + DISKERROR_STATUS = "DISKERROR" + + // chunkserver online status + ONLINE_STATUS = "ONLINE" + OFFLINE_STATUS = "OFFLINE" + UNSTABLE_STATUS = "UNSTABLE" + + // apis + LIST_PHYSICAL_POOL_FUNC = "ListPhysicalPool" + LIST_LOGICAL_POOL_FUNC = "ListLogicalPool" + LIST_POOL_ZONE_FUNC = "ListPoolZone" + LIST_ZONE_SERVER_FUNC = "ListZoneServer" + LIST_CHUNKSERVER_FUNC = "ListChunkServer" + GET_CHUNKSERVER_IN_CLUSTER_FUNC = "GetChunkServerInCluster" + GET_COPYSET_IN_CHUNKSERVER_FUNC = "GetCopySetsInChunkServer" + GET_CHUNKSERVER_LIST_IN_COPYSETS = "GetChunkServerListInCopySets" + GET_COPYSETS_IN_CLUSTER = "GetCopySetsInCluster" + GET_LOGICAL_POOL = "GetLogicalPool" + + //http path + HTTP_Service = "TopologyService" + LIST_PHYSICAL_POOL_FUNC_HTTP = "ListPhysicalPool" + LIST_LOGICAL_POOL_FUNC_HTTP = "ListLogicalPool" + LIST_POOL_ZONE_FUNC_HTTP = "ListPoolZone" + LIST_ZONE_SERVER_FUNC_HTTP = "ListZoneServer" + LIST_CHUNKSERVER_FUNC_HTTP = "ListChunkServer" + GET_CHUNKSERVER_IN_CLUSTER_FUNC_HTTP = "GetChunkServerInCluster" + GET_COPYSET_IN_CHUNKSERVER_FUNC_HTTP = "GetCopySetsInChunkServer" + GET_CHUNKSERVER_LIST_IN_COPYSETS_HTTP = "GetChunkServerListInCopySets" + GET_COPYSETS_IN_CLUSTER_HTTP = "GetCopySetsInCluster" + GET_LOGICAL_POOL_HTTP = "GetLogicalPool" +) + +type PhysicalPool struct { + Id uint32 `json:"id" binding:"required"` + Name string `json:"name" binding:"required"` + Desc string `json:"desc"` +} + +type LogicalPool struct { + Id uint32 `json:"id" binding:"required"` + Name string `json:"name" binding:"required"` + PhysicalPoolId uint32 `json:"physicalPoolId" binding:"required"` + Type string `json:"type" binding:"required"` + CreateTime string `json:"createTime" binding:"required"` + AllocateStatus string `json:"allocateStatus" binding:"required"` + ScanEnable bool `json:"scanEnable"` +} + +type Zone struct { + Id uint32 `json:"id" binding:"required"` + Name string `json:"name" binding:"required"` + PhysicalPoolId uint32 `json:"physicalPoolId" binding:"required"` + PhysicalPoolName string `json:"physicalName" binding:"required"` + Desc string `json:"desc"` +} + +type Server struct { + Id uint32 `json:"id" binding:"required"` + HostName string `json:"hostName" binding:"required"` + InternalIp string `json:"internalIp" binding:"required"` + InternalPort uint32 `json:"internalPort" binding:"required"` + ExternalIp string `json:"externalIp" binding:"required"` + ExternalPort uint32 `json:"externalPort" binding:"required"` + ZoneId uint32 `json:"zoneId" binding:"required"` + ZoneName string `json:"zoneName" binding:"required"` + PhysicalPoolId uint32 `json:"physicalPoolId" binding:"required"` + PhysicalPoolName string `json:"physicalName" binding:"required"` + Desc string `json:"desc"` +} + +type ChunkServer struct { + Id uint32 `json:"id" binding:"required"` + DiskType string `json:"diskType" binding:"required"` + HostIp string `json:"hostIp" binding:"required"` + Port uint32 `json:"port" binding:"required"` + Status string `json:"status" binding:"required"` + DiskStatus string `json:"diskStatus" binding:"required"` + OnlineStatus string `json:"onlineStatus" binding:"required"` + MountPoint string `json:"mountPoint" binding:"required"` + DiskCapacity string `json:"diskCapacity" binding:"required"` + DiskUsed string `json:"diskUsed" binding:"required"` + ExternalIp string `json:"externalIp"` +} + +type CopySetInfo struct { + LogicalPoolId uint32 `json:"logicalPoolId" binding:"required"` + CopysetId uint32 `json:"copysetId" binding:"required"` + Scanning bool `json:"scanning"` + LastScanSec uint64 `json:"lastScanSec"` + LastScanConsistent bool `json:"lastScanConsistent"` +} + +type ChunkServerLocation struct { + ChunkServerId uint32 `json:"chunkServerId" binding:"required"` + HostIp string `json:"hostIp" binding:"required"` + Port uint32 `josn:"port" binding:"required"` + ExternalIp string `json:"externalIp"` +} +type CopySetServerInfo struct { + CopysetId uint32 `json:"copysetId" binding:"required"` + CsLocs []ChunkServerLocation `json:"csLocs" binding:"required"` +} + +func (cli *MdsClient) ListPhysicalPool() ([]PhysicalPool, error) { + var host = cli.addrs + var path = HTTP_Service + "/" + LIST_PHYSICAL_POOL_FUNC_HTTP + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.ListPhysicalPoolResponse + res := json.Unmarshal([]byte(v), &response) + //response := ret.Result.(*topology.ListPhysicalPoolResponse) + //statusCode := response.GetStatusCode() + if res != nil { + return nil, res + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + + var infos []PhysicalPool + for _, pool := range response.GetPhysicalPoolInfos() { + info := PhysicalPool{} + info.Id = pool.GetPhysicalPoolID() + info.Name = pool.GetPhysicalPoolName() + info.Desc = pool.GetDesc() + infos = append(infos, info) + } + return infos, nil +} + +func getLogicalPoolType(t topology.LogicalPoolType) string { + switch t { + case topology.PAGEFILE: + return PAGEFILE_TYPE + case topology.APPENDFILE: + return APPENDFILE_TYPE + case topology.APPENDECFILE: + return APPENDECFILE_TYPE + default: + return INVALID + } +} + +func getLogicalPoolAllocateStatus(s topology.AllocateStatus) string { + switch s { + case topology.ALLOW: + return ALLOW_STATUS + case topology.DENY: + return DENY_STATUS + default: + return INVALID + } +} + +func (cli *MdsClient) ListLogicalPool() ([]LogicalPool, error) { + // list physical pool and get pool id + physicalPools, err := cli.ListPhysicalPool() + if err != nil { + return nil, err + } + size := len(physicalPools) + results := make(chan baseHttp.HttpResult, size) + for _, pool := range physicalPools { + go func(id uint32) { + var host = cli.addrs + // todo: checkURL + var path = HTTP_Service + "/" + LIST_LOGICAL_POOL_FUNC_HTTP + "?" + "physicalPoolID=" + path = fmt.Sprintf("%s %d", path, &id) + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + results <- baseHttp.HttpResult{ + Key: id, + Err: fmt.Errorf("%s: %v", ret.Key, ret.Err), + Result: nil, + } + } else { + v := ret.Result.(*resty.Response).String() + var response *topology.ListLogicalPoolResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + results <- baseHttp.HttpResult{ + Key: id, + Err: err, + Result: nil, + } + } else { + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + results <- baseHttp.HttpResult{ + Key: id, + Err: fmt.Errorf("%s", statuscode.TopoStatusCode_name[statusCode]), + Result: nil, + } + } else { + var pools []LogicalPool + for _, pool := range response.GetLogicalPoolInfos() { + info := LogicalPool{} + info.Id = pool.GetLogicalPoolID() + info.Name = pool.GetLogicalPoolName() + info.PhysicalPoolId = pool.GetPhysicalPoolID() + info.Type = getLogicalPoolType(pool.GetType()) + info.CreateTime = time.Unix(int64(pool.GetCreateTime()), 0).Format(common.TIME_FORMAT) + info.AllocateStatus = getLogicalPoolAllocateStatus(pool.GetAllocateStatus()) + info.ScanEnable = pool.GetScanEnable() + pools = append(pools, info) + } + results <- baseHttp.HttpResult{ + Key: id, + Err: nil, + Result: &pools, + } + } + } + } + }(pool.Id) + } + + pools := []LogicalPool{} + count := 0 + for res := range results { + if res.Err != nil { + return nil, fmt.Errorf("physical pool id: %d; %v", res.Key, res.Err) + } + pools = append(pools, (*res.Result.(*[]LogicalPool))...) + count++ + if count >= size { + break + } + } + return pools, nil +} + +func (cli *MdsClient) GetLogicalPool(poolId uint32) (LogicalPool, error) { + info := LogicalPool{} + var host = cli.addrs + var path = HTTP_Service + "/" + GET_LOGICAL_POOL_HTTP + //todo: checkURL + path = fmt.Sprintf("%s %s %d", path, "?LogicPoolId=", &poolId) + + ret := cli.baseClient_http.SendHTTP(host, path) + + if ret.Err != nil { + return info, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.GetLogicalPoolResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return info, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return info, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + pool := response.GetLogicalPoolInfo() + info.Id = pool.GetLogicalPoolID() + info.Name = pool.GetLogicalPoolName() + info.PhysicalPoolId = pool.GetPhysicalPoolID() + info.Type = getLogicalPoolType(pool.GetType()) + info.CreateTime = time.Unix(int64(pool.GetCreateTime()), 0).Format(common.TIME_FORMAT) + info.AllocateStatus = getLogicalPoolAllocateStatus(pool.GetAllocateStatus()) + info.ScanEnable = pool.GetScanEnable() + return info, nil +} + +// list zones of physical pool + +func (cli *MdsClient) ListPoolZone(poolId uint32) ([]Zone, error) { + var host = cli.addrs + //todo checkURL + var path = LIST_POOL_ZONE_FUNC_HTTP + path = fmt.Sprintf("%s %s %d", path, "PhysicalPoolId=", &poolId) + + ret := cli.baseClient_http.SendHTTP(host, path) + + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.ListPoolZoneResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + + infos := []Zone{} + for _, zone := range response.GetZones() { + info := Zone{} + info.Id = zone.GetZoneID() + info.Name = zone.GetZoneName() + info.PhysicalPoolId = zone.GetPhysicalPoolID() + info.PhysicalPoolName = zone.GetPhysicalPoolName() + info.Desc = zone.GetDesc() + infos = append(infos, info) + } + return infos, nil +} + +// list servers of zone + +func (cli *MdsClient) ListZoneServer(zoneId uint32) ([]Server, error) { + + var host = cli.addrs + var path = LIST_ZONE_SERVER_FUNC_HTTP + path = fmt.Sprintf("%s %s %d", path, "ZoneId=", &zoneId) + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.ListZoneServerResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + + infos := []Server{} + for _, server := range response.GetServerInfo() { + info := Server{} + info.Id = server.GetServerID() + info.HostName = server.GetHostName() + info.InternalIp = server.GetInternalIp() + info.InternalPort = server.GetInternalPort() + info.ExternalIp = server.GetExternalIp() + info.ExternalPort = server.GetExternalPort() + info.ZoneId = server.GetZoneID() + info.ZoneName = server.GetZoneName() + info.PhysicalPoolId = server.GetPhysicalPoolID() + info.PhysicalPoolName = server.GetPhysicalPoolName() + info.Desc = server.GetDesc() + infos = append(infos, info) + } + return infos, nil +} + +// list chunkservers of server + +func getChunkServerStatus(s topology.ChunkServerStatus) string { + switch s { + case topology.READWRITE: + return READWRITE_STATUS + case topology.PENDDING: + return PENDDING_STATUS + case topology.RETIRED: + return RETIRED_STATUS + default: + return INVALID + } +} + +func getDiskStatus(s topology.DiskState) string { + switch s { + case topology.DISKNORMAL: + return DISKNORMAL_STATUS + case topology.DISKERROR: + return DISKERROR_STATUS + default: + return INVALID + } +} + +func getOnlineStatus(s topology.OnlineState) string { + switch s { + case topology.ONLINE: + return ONLINE_STATUS + case topology.OFFLINE: + return OFFLINE_STATUS + case topology.UNSTABLE: + return UNSTABLE_STATUS + default: + return INVALID + } +} + +func (cli *MdsClient) ListChunkServer(serverId uint32) ([]ChunkServer, error) { + var host = cli.addrs + var path = HTTP_Service + "/" + LIST_CHUNKSERVER_FUNC_HTTP + //todo checkURL + path = fmt.Sprintf("%s %s %d", path, "?serverId=", &serverId) + ret := cli.baseClient_http.SendHTTP(host, path) + + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.ListChunkServerResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + + infos := []ChunkServer{} + for _, cs := range response.GetChunkServerInfos() { + if cs.GetStatus() == topology.RETIRED { + continue + } + info := ChunkServer{} + info.Id = cs.GetChunkServerID() + info.DiskType = cs.GetDiskType() + info.HostIp = cs.GetHostIp() + info.Port = cs.GetPort() + info.Status = getChunkServerStatus(cs.GetStatus()) + info.DiskStatus = getDiskStatus(cs.GetDiskStatus()) + info.OnlineStatus = getOnlineStatus(cs.GetOnlineState()) + info.MountPoint = cs.GetMountPoint() + info.DiskCapacity = strconv.FormatUint(cs.GetDiskCapacity()/common.GiB, 10) + info.DiskUsed = strconv.FormatUint(cs.GetDiskUsed()/common.GiB, 10) + info.ExternalIp = cs.GetExternalIp() + infos = append(infos, info) + } + return infos, nil +} + +func (cli *MdsClient) GetChunkServerInCluster() ([]ChunkServer, error) { + var host = cli.addrs + //todo check URL service + var path = HTTP_Service + GET_CHUNKSERVER_IN_CLUSTER_FUNC_HTTP + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.GetChunkServerInClusterResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + infos := []ChunkServer{} + for _, cs := range response.GetChunkServerInfos() { + if cs.GetStatus() == topology.RETIRED { + continue + } + info := ChunkServer{} + info.Id = cs.GetChunkServerID() + info.DiskType = cs.GetDiskType() + info.HostIp = cs.GetHostIp() + info.Port = cs.GetPort() + info.Status = getChunkServerStatus(cs.GetStatus()) + info.DiskStatus = getDiskStatus(cs.GetDiskStatus()) + info.OnlineStatus = getOnlineStatus(cs.GetOnlineState()) + info.MountPoint = cs.GetMountPoint() + info.DiskCapacity = strconv.FormatUint(cs.GetDiskCapacity()/common.GiB, 10) + info.DiskUsed = strconv.FormatUint(cs.GetDiskUsed()/common.GiB, 10) + info.ExternalIp = cs.GetExternalIp() + infos = append(infos, info) + } + return infos, nil +} + +func (cli *MdsClient) GetCopySetsInChunkServer(ip string, port uint32) ([]CopySetInfo, error) { + var host = cli.addrs + //todo checkURL + var path = GET_COPYSET_IN_CHUNKSERVER_FUNC_HTTP + + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.GetCopySetsInChunkServerResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + infos := []CopySetInfo{} + for _, cs := range response.GetCopysetInfos() { + info := CopySetInfo{} + info.LogicalPoolId = cs.GetLogicalPoolId() + info.CopysetId = cs.GetCopysetId() + info.Scanning = cs.GetScaning() + info.LastScanSec = cs.GetLastScanSec() + info.LastScanConsistent = cs.GetLastScanConsistent() + infos = append(infos, info) + } + return infos, nil +} + +func (cli *MdsClient) GetChunkServerListInCopySets(logicalPoolId uint32, copysetIds []uint32) ([]CopySetServerInfo, error) { + var host = cli.addrs + //todo checkURL + var path = HTTP_Service + "/" + GET_CHUNKSERVER_LIST_IN_COPYSETS_HTTP + path = fmt.Sprintf("%s %s %d", path, "LogicPoolId=", &logicalPoolId) + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.GetChunkServerListInCopySetsResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + infos := []CopySetServerInfo{} + for _, csInfo := range response.GetCsInfo() { + info := CopySetServerInfo{} + info.CopysetId = csInfo.GetCopysetId() + for _, locs := range csInfo.GetCsLocs() { + var l ChunkServerLocation + l.ChunkServerId = locs.GetChunkServerID() + l.HostIp = locs.GetHostIp() + l.Port = locs.GetPort() + l.ExternalIp = locs.GetExternalIp() + info.CsLocs = append(info.CsLocs, l) + } + infos = append(infos, info) + } + return infos, nil +} + +func (cli *MdsClient) GetCopySetsInCluster() ([]CopySetInfo, error) { + var host = cli.addrs + //todo checkURL + var path = HTTP_Service + "/" + LIST_PHYSICAL_POOL_FUNC_HTTP + ret := cli.baseClient_http.SendHTTP(host, path) + if ret.Err != nil { + return nil, ret.Err + } + v := ret.Result.(*resty.Response).String() + var response *topology.GetCopySetsInClusterResponse + err := json.Unmarshal([]byte(v), &response) + if err != nil { + return nil, err + } + statusCode := response.GetStatusCode() + if statusCode != int32(statuscode.TopoStatusCode_Success) { + return nil, fmt.Errorf(statuscode.TopoStatusCode_name[statusCode]) + } + infos := []CopySetInfo{} + for _, csInfo := range response.GetCopysetInfos() { + info := CopySetInfo{} + info.CopysetId = csInfo.GetCopysetId() + info.LastScanConsistent = csInfo.GetLastScanConsistent() + info.LastScanSec = csInfo.GetLastScanSec() + info.LogicalPoolId = csInfo.GetLogicalPoolId() + info.Scanning = csInfo.GetScaning() + infos = append(infos, info) + } + return infos, nil +} diff --git a/internal/http/curvebs/topology_test.go b/internal/http/curvebs/topology_test.go new file mode 100644 index 0000000..65a990b --- /dev/null +++ b/internal/http/curvebs/topology_test.go @@ -0,0 +1,93 @@ +package curvebs + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + "github.com/opencurve/curve-manager/internal/common" + "net/url" + "strings" + "testing" +) + +var ( + clientOption MdsClientOption = MdsClientOption{ + TimeoutMs: 500, + RetryTimes: 3, + Addrs: []string{"192.168.170.138:6702"}, + } +) + +const ( + CLUSTER_SERVICES_ADDRESS = "cluster.service.addr" +) + +type admHttpResponse struct { + ErrorCode string `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` + Data clusterServicesAddr `json:"data"` +} +type clusterServicesAddr struct { + ClusterId int `json:"clusterId"` + Addrs map[string]string `json:"addrs"` +} + +func TestMdsClient_ListPhysicalPool_http(t *testing.T) { + info, err := GetCurrentClusterServicesAddr() + if err != nil { + + } + mds_addr := info.Addrs[CURVEBS_MDS_ADDRESS] + if mds_addr != "" { + Addrs := strings.Split(mds_addr, common.CURVEBS_ADDRESS_DELIMITER) + for _, addr := range Addrs { + httpClient := common.GetHttpClient() + url := (&url.URL{ + Scheme: "http", + Host: addr, + Path: "/", + RawQuery: fmt.Sprintf("%s=%s", "method", CLUSTER_SERVICES_ADDRESS), + }).String() + + resp, err := resty.NewWithClient(httpClient).R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "Curve-Manager"). + Execute("GET", url) + if resp.Body() != nil { + + } + if err != nil { + + } + } + } + +} + +func GetCurrentClusterServicesAddr() (clusterServicesAddr, error) { + ret := clusterServicesAddr{} + httpClient := common.GetHttpClient() + url := (&url.URL{ + Scheme: "http", + Host: "127.0.0.1:11000", + Path: "/", + RawQuery: fmt.Sprintf("%s=%s", "method", CLUSTER_SERVICES_ADDRESS), + }).String() + + resp, err := resty.NewWithClient(httpClient).R(). + SetHeader("Connection", "Keep-Alive"). + SetHeader("Content-Type", "application/json"). + SetHeader("User-Agent", "Curve-Manager"). + Execute("GET", url) + if err != nil { + return ret, fmt.Errorf("getClusterServicesAddr failed: %v", err) + } + + respStruct := admHttpResponse{} + err = json.Unmarshal([]byte(resp.String()), &respStruct) + if err != nil { + return ret, fmt.Errorf("Unmarshal getClusterServicesAddr response failed, resp = %s, err = %v", resp.String(), err) + } + return respStruct.Data, nil +} diff --git a/internal/http/nameserver2/nameserver2.go b/internal/http/nameserver2/nameserver2.go new file mode 100644 index 0000000..133610b --- /dev/null +++ b/internal/http/nameserver2/nameserver2.go @@ -0,0 +1,668 @@ +package nameserver2 + +import "google.golang.org/protobuf/runtime/protoimpl" + +type FileType int32 + +const ( + FileType_INODE_DIRECTORY FileType = 0 + FileType_INODE_PAGEFILE FileType = 1 + FileType_INODE_APPENDFILE FileType = 2 + FileType_INODE_APPENDECFILE FileType = 3 + FileType_INODE_SNAPSHOT_PAGEFILE FileType = 4 +) + +var ( + StatusCode_name = map[int32]string{ + 0: "kOK", + 101: "kFileExists", + 102: "kFileNotExists", + 103: "kNotDirectory", + 104: "kParaError", + 105: "kShrinkBiggerFile", + 106: "kExtentUnitError", + 107: "kSegmentNotAllocated", + 108: "kSegmentAllocateError", + 109: "kDirNotExist", + 110: "kNotSupported", + 111: "kOwnerAuthFail", + 112: "kDirNotEmpty", + 120: "kFileUnderSnapShot", + 121: "kFileNotUnderSnapShot", + 122: "kSnapshotDeleting", + 123: "kSnapshotFileNotExists", + 124: "kSnapshotFileDeleteError", + 125: "kSessionNotExist", + 126: "kFileOccupied", + 127: "kCloneFileNameIllegal", + 128: "kCloneStatusNotMatch", + 129: "kCommonFileDeleteError", + 130: "kFileIdNotMatch", + 131: "kFileUnderDeleting", + 132: "kFileLengthNotSupported", + 133: "kDeleteFileBeingCloned", + 134: "kClientVersionNotMatch", + 135: "kSnapshotFrozen", + 136: "kSnapshotCloneConnectFail", + 137: "kSnapshotCloneServerNotInit", + 138: "kRecoverFileCloneMetaInstalled", + 139: "kRecoverFileError", + 140: "kEpochTooOld", + 501: "kStorageError", + 502: "KInternalError", + } + StatusCode_value = map[string]int32{ + "kOK": 0, + "kFileExists": 101, + "kFileNotExists": 102, + "kNotDirectory": 103, + "kParaError": 104, + "kShrinkBiggerFile": 105, + "kExtentUnitError": 106, + "kSegmentNotAllocated": 107, + "kSegmentAllocateError": 108, + "kDirNotExist": 109, + "kNotSupported": 110, + "kOwnerAuthFail": 111, + "kDirNotEmpty": 112, + "kFileUnderSnapShot": 120, + "kFileNotUnderSnapShot": 121, + "kSnapshotDeleting": 122, + "kSnapshotFileNotExists": 123, + "kSnapshotFileDeleteError": 124, + "kSessionNotExist": 125, + "kFileOccupied": 126, + "kCloneFileNameIllegal": 127, + "kCloneStatusNotMatch": 128, + "kCommonFileDeleteError": 129, + "kFileIdNotMatch": 130, + "kFileUnderDeleting": 131, + "kFileLengthNotSupported": 132, + "kDeleteFileBeingCloned": 133, + "kClientVersionNotMatch": 134, + "kSnapshotFrozen": 135, + "kSnapshotCloneConnectFail": 136, + "kSnapshotCloneServerNotInit": 137, + "kRecoverFileCloneMetaInstalled": 138, + "kRecoverFileError": 139, + "kEpochTooOld": 140, + "kStorageError": 501, + "KInternalError": 502, + } +) + +type StatusCode int32 + +const ( + // 执行成功 + StatusCode_kOK StatusCode = 0 + // 文件已存在 + StatusCode_kFileExists StatusCode = 101 + // 文件不存在 + StatusCode_kFileNotExists StatusCode = 102 + // 非目录类型 + StatusCode_kNotDirectory StatusCode = 103 + // 传入参数错误 + StatusCode_kParaError StatusCode = 104 + // 缩小文件,目前不支持缩小文件 + StatusCode_kShrinkBiggerFile StatusCode = 105 + // 扩容单位错误,非segment size整数倍 + StatusCode_kExtentUnitError StatusCode = 106 + // segment未分配 + StatusCode_kSegmentNotAllocated StatusCode = 107 + // segment分配失败 + StatusCode_kSegmentAllocateError StatusCode = 108 + // 目录不存在 + StatusCode_kDirNotExist StatusCode = 109 + // 功能不支持 + StatusCode_kNotSupported StatusCode = 110 + // owner认证失败 + StatusCode_kOwnerAuthFail StatusCode = 111 + // 目录非空 + StatusCode_kDirNotEmpty StatusCode = 112 + // 文件已处于快照中 + StatusCode_kFileUnderSnapShot StatusCode = 120 + // 文件不在快照中 + StatusCode_kFileNotUnderSnapShot StatusCode = 121 + // 快照删除中 + StatusCode_kSnapshotDeleting StatusCode = 122 + // 快照文件不存在 + StatusCode_kSnapshotFileNotExists StatusCode = 123 + // 快照文件删除失败 + StatusCode_kSnapshotFileDeleteError StatusCode = 124 + // session不存在 + StatusCode_kSessionNotExist StatusCode = 125 + // 文件已被占用 + StatusCode_kFileOccupied StatusCode = 126 + StatusCode_kCloneFileNameIllegal StatusCode = 127 + StatusCode_kCloneStatusNotMatch StatusCode = 128 + // 文件删除失败 + StatusCode_kCommonFileDeleteError StatusCode = 129 + // 文件id不匹配 + StatusCode_kFileIdNotMatch StatusCode = 130 + // 文件在删除中 + StatusCode_kFileUnderDeleting StatusCode = 131 + // 文件长度不符合要求 + StatusCode_kFileLengthNotSupported StatusCode = 132 + // 文件正在被克隆 + StatusCode_kDeleteFileBeingCloned StatusCode = 133 + // client版本不匹配 + StatusCode_kClientVersionNotMatch StatusCode = 134 + // snapshot功能禁用中 + StatusCode_kSnapshotFrozen StatusCode = 135 + // 快照克隆服务连不上 + StatusCode_kSnapshotCloneConnectFail StatusCode = 136 + // 快照克隆服务未初始化 + StatusCode_kSnapshotCloneServerNotInit StatusCode = 137 + // recover file status is CloneMetaInstalled + StatusCode_kRecoverFileCloneMetaInstalled StatusCode = 138 + // recover file fail + StatusCode_kRecoverFileError StatusCode = 139 + // epoch too old + StatusCode_kEpochTooOld StatusCode = 140 + // 元数据存储错误 + StatusCode_kStorageError StatusCode = 501 + // 内部错误 + StatusCode_KInternalError StatusCode = 502 +) + +type GetAllocatedSizeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` + // 文件或目录的分配大小 + AllocatedSize *uint64 `protobuf:"varint,2,opt,name=allocatedSize" json:"allocatedSize,omitempty"` + // key是逻辑池id,value是分配大小 + AllocSizeMap map[uint32]uint64 `protobuf:"bytes,3,rep,name=allocSizeMap" json:"allocSizeMap,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` +} + +type FileStatus int32 + +const ( + // 文件创建完成 + FileStatus_kFileCreated FileStatus = 0 + // 文件删除中 + FileStatus_kFileDeleting FileStatus = 1 + // 文件正在克隆 + FileStatus_kFileCloning FileStatus = 2 + // 文件元数据安装完毕 + FileStatus_kFileCloneMetaInstalled FileStatus = 3 + // 文件克隆完成 + FileStatus_kFileCloned FileStatus = 4 + // 文件正在被克隆 + FileStatus_kFileBeingCloned FileStatus = 5 +) + +// Enum value maps for FileStatus. +var ( + FileStatus_name = map[int32]string{ + 0: "kFileCreated", + 1: "kFileDeleting", + 2: "kFileCloning", + 3: "kFileCloneMetaInstalled", + 4: "kFileCloned", + 5: "kFileBeingCloned", + } + FileStatus_value = map[string]int32{ + "kFileCreated": 0, + "kFileDeleting": 1, + "kFileCloning": 2, + "kFileCloneMetaInstalled": 3, + "kFileCloned": 4, + "kFileBeingCloned": 5, + } +) + +type ThrottleType int32 + +const ( + ThrottleType_IOPS_TOTAL ThrottleType = 1 + ThrottleType_IOPS_READ ThrottleType = 2 + ThrottleType_IOPS_WRITE ThrottleType = 3 + ThrottleType_BPS_TOTAL ThrottleType = 4 + ThrottleType_BPS_READ ThrottleType = 5 + ThrottleType_BPS_WRITE ThrottleType = 6 +) + +// Enum value maps for ThrottleType. +var ( + ThrottleType_name = map[int32]string{ + 1: "IOPS_TOTAL", + 2: "IOPS_READ", + 3: "IOPS_WRITE", + 4: "BPS_TOTAL", + 5: "BPS_READ", + 6: "BPS_WRITE", + } + ThrottleType_value = map[string]int32{ + "IOPS_TOTAL": 1, + "IOPS_READ": 2, + "IOPS_WRITE": 3, + "BPS_TOTAL": 4, + "BPS_READ": 5, + "BPS_WRITE": 6, + } +) + +type ListDirResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` + FileInfo []*FileInfo `protobuf:"bytes,2,rep,name=fileInfo" json:"fileInfo,omitempty"` +} + +func (x *GetAllocatedSizeResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *GetAllocatedSizeResponse) GetAllocatedSize() uint64 { + if x != nil && x.AllocatedSize != nil { + return *x.AllocatedSize + } + return 0 +} + +func (x *GetAllocatedSizeResponse) GetAllocSizeMap() map[uint32]uint64 { + if x != nil { + return x.AllocSizeMap + } + return nil +} + +func (x *ListDirResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *ListDirResponse) GetFileInfo() []*FileInfo { + if x != nil { + return x.FileInfo + } + return nil +} + +type FileInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id *uint64 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + FileName *string `protobuf:"bytes,2,opt,name=fileName" json:"fileName,omitempty"` + ParentId *uint64 `protobuf:"varint,3,opt,name=parentId" json:"parentId,omitempty"` + FileType *FileType `protobuf:"varint,4,opt,name=fileType,enum=curve.mds.FileType" json:"fileType,omitempty"` + Owner *string `protobuf:"bytes,5,opt,name=owner" json:"owner,omitempty"` + ChunkSize *uint32 `protobuf:"varint,6,opt,name=chunkSize" json:"chunkSize,omitempty"` + SegmentSize *uint32 `protobuf:"varint,7,opt,name=segmentSize" json:"segmentSize,omitempty"` + Length *uint64 `protobuf:"varint,8,opt,name=length" json:"length,omitempty"` + Ctime *uint64 `protobuf:"varint,9,opt,name=ctime" json:"ctime,omitempty"` + SeqNum *uint64 `protobuf:"varint,10,opt,name=seqNum" json:"seqNum,omitempty"` + FileStatus *FileStatus `protobuf:"varint,11,opt,name=fileStatus,enum=curve.mds.FileStatus" json:"fileStatus,omitempty"` + // 用于文件转移到回收站的情况下恢复场景下的使用, + // RecycleBin(回收站)目录下使用/其他场景下不使用 + OriginalFullPathName *string `protobuf:"bytes,12,opt,name=originalFullPathName" json:"originalFullPathName,omitempty"` + // cloneSource 当前用于存放克隆源(当前主要用于curvefs) + // 后期可以考虑存放 s3相关信息 + CloneSource *string `protobuf:"bytes,13,opt,name=cloneSource" json:"cloneSource,omitempty"` + // cloneLength 克隆源文件的长度,用于clone过程中进行extent + CloneLength *uint64 `protobuf:"varint,14,opt,name=cloneLength" json:"cloneLength,omitempty"` + StripeUnit *uint64 `protobuf:"varint,15,opt,name=stripeUnit" json:"stripeUnit,omitempty"` + StripeCount *uint64 `protobuf:"varint,16,opt,name=stripeCount" json:"stripeCount,omitempty"` + ThrottleParams *FileThrottleParams `protobuf:"bytes,17,opt,name=throttleParams" json:"throttleParams,omitempty"` + Epoch *uint64 `protobuf:"varint,18,opt,name=epoch" json:"epoch,omitempty"` +} + +func (x *FileInfo) GetId() uint64 { + if x != nil && x.Id != nil { + return *x.Id + } + return 0 +} + +func (x *FileInfo) GetFileName() string { + if x != nil && x.FileName != nil { + return *x.FileName + } + return "" +} + +func (x *FileInfo) GetParentId() uint64 { + if x != nil && x.ParentId != nil { + return *x.ParentId + } + return 0 +} + +func (x *FileInfo) GetFileType() FileType { + if x != nil && x.FileType != nil { + return *x.FileType + } + return FileType_INODE_DIRECTORY +} + +func (x *FileInfo) GetOwner() string { + if x != nil && x.Owner != nil { + return *x.Owner + } + return "" +} + +func (x *FileInfo) GetChunkSize() uint32 { + if x != nil && x.ChunkSize != nil { + return *x.ChunkSize + } + return 0 +} + +func (x *FileInfo) GetSegmentSize() uint32 { + if x != nil && x.SegmentSize != nil { + return *x.SegmentSize + } + return 0 +} + +func (x *FileInfo) GetLength() uint64 { + if x != nil && x.Length != nil { + return *x.Length + } + return 0 +} + +func (x *FileInfo) GetCtime() uint64 { + if x != nil && x.Ctime != nil { + return *x.Ctime + } + return 0 +} + +func (x *FileInfo) GetSeqNum() uint64 { + if x != nil && x.SeqNum != nil { + return *x.SeqNum + } + return 0 +} + +func (x *FileInfo) GetFileStatus() FileStatus { + if x != nil && x.FileStatus != nil { + return *x.FileStatus + } + return FileStatus_kFileCreated +} + +func (x *FileInfo) GetOriginalFullPathName() string { + if x != nil && x.OriginalFullPathName != nil { + return *x.OriginalFullPathName + } + return "" +} + +func (x *FileInfo) GetCloneSource() string { + if x != nil && x.CloneSource != nil { + return *x.CloneSource + } + return "" +} + +func (x *FileInfo) GetCloneLength() uint64 { + if x != nil && x.CloneLength != nil { + return *x.CloneLength + } + return 0 +} + +func (x *FileInfo) GetStripeUnit() uint64 { + if x != nil && x.StripeUnit != nil { + return *x.StripeUnit + } + return 0 +} + +func (x *FileInfo) GetStripeCount() uint64 { + if x != nil && x.StripeCount != nil { + return *x.StripeCount + } + return 0 +} + +func (x *FileInfo) GetThrottleParams() *FileThrottleParams { + if x != nil { + return x.ThrottleParams + } + return nil +} + +func (x *FileInfo) GetEpoch() uint64 { + if x != nil && x.Epoch != nil { + return *x.Epoch + } + return 0 +} + +type FileThrottleParams struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ThrottleParams []*ThrottleParams `protobuf:"bytes,1,rep,name=throttleParams" json:"throttleParams,omitempty"` +} + +func (x *FileThrottleParams) GetThrottleParams() []*ThrottleParams { + if x != nil { + return x.ThrottleParams + } + return nil +} + +type ThrottleParams struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type *ThrottleType `protobuf:"varint,1,req,name=type,enum=curve.mds.ThrottleType" json:"type,omitempty"` + Limit *uint64 `protobuf:"varint,2,req,name=limit" json:"limit,omitempty"` + Burst *uint64 `protobuf:"varint,3,opt,name=burst" json:"burst,omitempty"` + BurstLength *uint64 `protobuf:"varint,4,opt,name=burstLength" json:"burstLength,omitempty"` +} + +func (x *ThrottleParams) GetType() ThrottleType { + if x != nil && x.Type != nil { + return *x.Type + } + return ThrottleType_IOPS_TOTAL +} + +func (x *ThrottleParams) GetLimit() uint64 { + if x != nil && x.Limit != nil { + return *x.Limit + } + return 0 +} + +func (x *ThrottleParams) GetBurst() uint64 { + if x != nil && x.Burst != nil { + return *x.Burst + } + return 0 +} + +func (x *ThrottleParams) GetBurstLength() uint64 { + if x != nil && x.BurstLength != nil { + return *x.BurstLength + } + return 0 +} + +type GetFileInfoResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` + FileInfo *FileInfo `protobuf:"bytes,2,opt,name=fileInfo" json:"fileInfo,omitempty"` +} + +type GetFileSizeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` + // 文件或目录的file length + FileSize *uint64 `protobuf:"varint,2,opt,name=fileSize" json:"fileSize,omitempty"` +} + +func (x *GetFileSizeResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *GetFileSizeResponse) GetFileSize() uint64 { + if x != nil && x.FileSize != nil { + return *x.FileSize + } + return 0 +} + +func (x *GetFileInfoResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *GetFileInfoResponse) GetFileInfo() *FileInfo { + if x != nil { + return x.FileInfo + } + return nil +} + +type DeleteFileResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` +} +type RecoverFileResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` +} + +func (x *DeleteFileResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *RecoverFileResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +type CreateFileResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` +} + +func (x *CreateFileResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +type ExtendFileResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` +} + +func (x *ExtendFileResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +type UpdateFileThrottleParamsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` +} + +func (x *UpdateFileThrottleParamsResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +type FindFileMountPointResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *StatusCode `protobuf:"varint,1,req,name=statusCode,enum=curve.mds.StatusCode" json:"statusCode,omitempty"` + ClientInfo []*ClientInfo `protobuf:"bytes,2,rep,name=clientInfo" json:"clientInfo,omitempty"` +} + +func (x *FindFileMountPointResponse) GetStatusCode() StatusCode { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return StatusCode_kOK +} + +func (x *FindFileMountPointResponse) GetClientInfo() []*ClientInfo { + if x != nil { + return x.ClientInfo + } + return nil +} + +type ClientInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ip *string `protobuf:"bytes,1,req,name=ip" json:"ip,omitempty"` + Port *uint32 `protobuf:"varint,2,req,name=port" json:"port,omitempty"` +} + +func (x *ClientInfo) GetIp() string { + if x != nil && x.Ip != nil { + return *x.Ip + } + return "" +} + +func (x *ClientInfo) GetPort() uint32 { + if x != nil && x.Port != nil { + return *x.Port + } + return 0 +} diff --git a/internal/http/statuscode/statuscode.go b/internal/http/statuscode/statuscode.go new file mode 100644 index 0000000..07f27f8 --- /dev/null +++ b/internal/http/statuscode/statuscode.go @@ -0,0 +1,74 @@ +package statuscode + +type TopoStatusCode int32 + +const ( + TopoStatusCode_Success TopoStatusCode = 0 + TopoStatusCode_InternalError TopoStatusCode = -1 + TopoStatusCode_InvalidParam TopoStatusCode = -2 + TopoStatusCode_InitFail TopoStatusCode = -3 + TopoStatusCode_StorgeFail TopoStatusCode = -4 + TopoStatusCode_IdDuplicated TopoStatusCode = -5 + TopoStatusCode_ChunkServerNotFound TopoStatusCode = -6 + TopoStatusCode_ServerNotFound TopoStatusCode = -7 + TopoStatusCode_ZoneNotFound TopoStatusCode = -8 + TopoStatusCode_PhysicalPoolNotFound TopoStatusCode = -9 + TopoStatusCode_LogicalPoolNotFound TopoStatusCode = -10 + TopoStatusCode_CopySetNotFound TopoStatusCode = -11 + TopoStatusCode_GenCopysetErr TopoStatusCode = -12 + TopoStatusCode_AllocateIdFail TopoStatusCode = -13 + TopoStatusCode_CannotRemoveWhenNotEmpty TopoStatusCode = -14 + TopoStatusCode_IpPortDuplicated TopoStatusCode = -15 + TopoStatusCode_NameDuplicated TopoStatusCode = -16 + TopoStatusCode_CreateCopysetNodeOnChunkServerFail TopoStatusCode = -17 + TopoStatusCode_CannotRemoveNotRetired TopoStatusCode = -18 + TopoStatusCode_LogicalPoolExist TopoStatusCode = -19 +) + +// Enum value maps for TopoStatusCode. +var ( + TopoStatusCode_name = map[int32]string{ + 0: "Success", + -1: "InternalError", + -2: "InvalidParam", + -3: "InitFail", + -4: "StorgeFail", + -5: "IdDuplicated", + -6: "ChunkServerNotFound", + -7: "ServerNotFound", + -8: "ZoneNotFound", + -9: "PhysicalPoolNotFound", + -10: "LogicalPoolNotFound", + -11: "CopySetNotFound", + -12: "GenCopysetErr", + -13: "AllocateIdFail", + -14: "CannotRemoveWhenNotEmpty", + -15: "IpPortDuplicated", + -16: "NameDuplicated", + -17: "CreateCopysetNodeOnChunkServerFail", + -18: "CannotRemoveNotRetired", + -19: "LogicalPoolExist", + } + TopoStatusCode_value = map[string]int32{ + "Success": 0, + "InternalError": -1, + "InvalidParam": -2, + "InitFail": -3, + "StorgeFail": -4, + "IdDuplicated": -5, + "ChunkServerNotFound": -6, + "ServerNotFound": -7, + "ZoneNotFound": -8, + "PhysicalPoolNotFound": -9, + "LogicalPoolNotFound": -10, + "CopySetNotFound": -11, + "GenCopysetErr": -12, + "AllocateIdFail": -13, + "CannotRemoveWhenNotEmpty": -14, + "IpPortDuplicated": -15, + "NameDuplicated": -16, + "CreateCopysetNodeOnChunkServerFail": -17, + "CannotRemoveNotRetired": -18, + "LogicalPoolExist": -19, + } +) diff --git a/internal/http/topology/topology.go b/internal/http/topology/topology.go new file mode 100644 index 0000000..0d1b186 --- /dev/null +++ b/internal/http/topology/topology.go @@ -0,0 +1,644 @@ +package topology + +import ( + "github.com/opencurve/curve-manager/internal/http/common" + "google.golang.org/protobuf/runtime/protoimpl" +) + +type LogicalPoolType int32 + +const ( + PAGEFILE LogicalPoolType = 0 + APPENDFILE LogicalPoolType = 1 + APPENDECFILE LogicalPoolType = 2 +) + +type AllocateStatus int32 + +const ( + ALLOW AllocateStatus = 0 + DENY AllocateStatus = 1 +) + +type ChunkServerStatus int32 + +const ( + READWRITE ChunkServerStatus = 0 + PENDDING ChunkServerStatus = 1 + RETIRED ChunkServerStatus = 2 +) + +type DiskState int32 + +const ( + DISKNORMAL DiskState = 0 + DISKERROR DiskState = 1 +) + +type OnlineState int32 + +const ( + ONLINE OnlineState = 0 + OFFLINE OnlineState = 1 + UNSTABLE OnlineState = 2 +) + +type ListPhysicalPoolResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + PhysicalPoolInfos []*PhysicalPoolInfo `protobuf:"bytes,2,rep,name=physicalPoolInfos" json:"physicalPoolInfos,omitempty"` +} + +type PhysicalPoolInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PhysicalPoolID *uint32 `protobuf:"varint,1,req,name=physicalPoolID" json:"physicalPoolID,omitempty"` + PhysicalPoolName *string `protobuf:"bytes,2,req,name=physicalPoolName" json:"physicalPoolName,omitempty"` + Desc *string `protobuf:"bytes,3,opt,name=desc" json:"desc,omitempty"` +} + +func (x *ListPhysicalPoolResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *ListPhysicalPoolResponse) GetPhysicalPoolInfos() []*PhysicalPoolInfo { + if x != nil { + return x.PhysicalPoolInfos + } + return nil +} + +func (x *PhysicalPoolInfo) GetPhysicalPoolID() uint32 { + if x != nil && x.PhysicalPoolID != nil { + return *x.PhysicalPoolID + } + return 0 +} + +func (x *PhysicalPoolInfo) GetPhysicalPoolName() string { + if x != nil && x.PhysicalPoolName != nil { + return *x.PhysicalPoolName + } + return "" +} + +func (x *PhysicalPoolInfo) GetDesc() string { + if x != nil && x.Desc != nil { + return *x.Desc + } + return "" +} + +type ListLogicalPoolResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + LogicalPoolInfos []*LogicalPoolInfo `protobuf:"bytes,2,rep,name=logicalPoolInfos" json:"logicalPoolInfos,omitempty"` +} + +func (x *ListLogicalPoolResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *ListLogicalPoolResponse) GetLogicalPoolInfos() []*LogicalPoolInfo { + if x != nil { + return x.LogicalPoolInfos + } + return nil +} + +type LogicalPoolInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + LogicalPoolID *uint32 `protobuf:"varint,1,req,name=logicalPoolID" json:"logicalPoolID,omitempty"` + LogicalPoolName *string `protobuf:"bytes,2,req,name=logicalPoolName" json:"logicalPoolName,omitempty"` + PhysicalPoolID *uint32 `protobuf:"varint,3,req,name=physicalPoolID" json:"physicalPoolID,omitempty"` + Type *LogicalPoolType `protobuf:"varint,4,req,name=type,enum=curve.mds.topology.LogicalPoolType" json:"type,omitempty"` + CreateTime *uint64 `protobuf:"varint,5,req,name=createTime" json:"createTime,omitempty"` + RedundanceAndPlaceMentPolicy []byte `protobuf:"bytes,6,req,name=redundanceAndPlaceMentPolicy" json:"redundanceAndPlaceMentPolicy,omitempty"` //json body + UserPolicy []byte `protobuf:"bytes,7,req,name=userPolicy" json:"userPolicy,omitempty"` //json body + AllocateStatus *AllocateStatus `protobuf:"varint,8,req,name=allocateStatus,enum=curve.mds.topology.AllocateStatus" json:"allocateStatus,omitempty"` + ScanEnable *bool `protobuf:"varint,9,opt,name=scanEnable" json:"scanEnable,omitempty"` +} + +func (x *LogicalPoolInfo) GetLogicalPoolID() uint32 { + if x != nil && x.LogicalPoolID != nil { + return *x.LogicalPoolID + } + return 0 +} + +func (x *LogicalPoolInfo) GetLogicalPoolName() string { + if x != nil && x.LogicalPoolName != nil { + return *x.LogicalPoolName + } + return "" +} + +func (x *LogicalPoolInfo) GetPhysicalPoolID() uint32 { + if x != nil && x.PhysicalPoolID != nil { + return *x.PhysicalPoolID + } + return 0 +} + +func (x *LogicalPoolInfo) GetType() LogicalPoolType { + if x != nil && x.Type != nil { + return *x.Type + } + return PAGEFILE +} + +func (x *LogicalPoolInfo) GetCreateTime() uint64 { + if x != nil && x.CreateTime != nil { + return *x.CreateTime + } + return 0 +} + +func (x *LogicalPoolInfo) GetRedundanceAndPlaceMentPolicy() []byte { + if x != nil { + return x.RedundanceAndPlaceMentPolicy + } + return nil +} + +func (x *LogicalPoolInfo) GetUserPolicy() []byte { + if x != nil { + return x.UserPolicy + } + return nil +} + +func (x *LogicalPoolInfo) GetAllocateStatus() AllocateStatus { + if x != nil && x.AllocateStatus != nil { + return *x.AllocateStatus + } + return ALLOW +} + +func (x *LogicalPoolInfo) GetScanEnable() bool { + if x != nil && x.ScanEnable != nil { + return *x.ScanEnable + } + return false +} + +type GetLogicalPoolResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + LogicalPoolInfo *LogicalPoolInfo `protobuf:"bytes,2,opt,name=logicalPoolInfo" json:"logicalPoolInfo,omitempty"` +} + +func (x *GetLogicalPoolResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *GetLogicalPoolResponse) GetLogicalPoolInfo() *LogicalPoolInfo { + if x != nil { + return x.LogicalPoolInfo + } + return nil +} + +type ListPoolZoneResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + Zones []*ZoneInfo `protobuf:"bytes,2,rep,name=zones" json:"zones,omitempty"` +} + +func (x *ListPoolZoneResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *ListPoolZoneResponse) GetZones() []*ZoneInfo { + if x != nil { + return x.Zones + } + return nil +} + +type ZoneInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ZoneID *uint32 `protobuf:"varint,1,req,name=zoneID" json:"zoneID,omitempty"` + ZoneName *string `protobuf:"bytes,2,req,name=zoneName" json:"zoneName,omitempty"` + PhysicalPoolID *uint32 `protobuf:"varint,3,req,name=physicalPoolID" json:"physicalPoolID,omitempty"` + PhysicalPoolName *string `protobuf:"bytes,4,req,name=physicalPoolName" json:"physicalPoolName,omitempty"` + Desc *string `protobuf:"bytes,5,opt,name=desc" json:"desc,omitempty"` +} + +func (x *ZoneInfo) GetZoneID() uint32 { + if x != nil && x.ZoneID != nil { + return *x.ZoneID + } + return 0 +} + +func (x *ZoneInfo) GetZoneName() string { + if x != nil && x.ZoneName != nil { + return *x.ZoneName + } + return "" +} + +func (x *ZoneInfo) GetPhysicalPoolID() uint32 { + if x != nil && x.PhysicalPoolID != nil { + return *x.PhysicalPoolID + } + return 0 +} + +func (x *ZoneInfo) GetPhysicalPoolName() string { + if x != nil && x.PhysicalPoolName != nil { + return *x.PhysicalPoolName + } + return "" +} + +func (x *ZoneInfo) GetDesc() string { + if x != nil && x.Desc != nil { + return *x.Desc + } + return "" +} + +type ListZoneServerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + ServerInfo []*ServerInfo `protobuf:"bytes,2,rep,name=serverInfo" json:"serverInfo,omitempty"` +} + +func (x *ListZoneServerResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *ListZoneServerResponse) GetServerInfo() []*ServerInfo { + if x != nil { + return x.ServerInfo + } + return nil +} + +type ServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ServerID *uint32 `protobuf:"varint,1,req,name=serverID" json:"serverID,omitempty"` + HostName *string `protobuf:"bytes,2,req,name=hostName" json:"hostName,omitempty"` + InternalIp *string `protobuf:"bytes,3,req,name=internalIp" json:"internalIp,omitempty"` + InternalPort *uint32 `protobuf:"varint,4,req,name=internalPort" json:"internalPort,omitempty"` + ExternalIp *string `protobuf:"bytes,5,req,name=externalIp" json:"externalIp,omitempty"` + ExternalPort *uint32 `protobuf:"varint,6,req,name=externalPort" json:"externalPort,omitempty"` + ZoneID *uint32 `protobuf:"varint,7,req,name=zoneID" json:"zoneID,omitempty"` + ZoneName *string `protobuf:"bytes,8,req,name=zoneName" json:"zoneName,omitempty"` + PhysicalPoolID *uint32 `protobuf:"varint,9,req,name=physicalPoolID" json:"physicalPoolID,omitempty"` + PhysicalPoolName *string `protobuf:"bytes,10,req,name=physicalPoolName" json:"physicalPoolName,omitempty"` + Desc *string `protobuf:"bytes,11,req,name=desc" json:"desc,omitempty"` +} + +func (x *ServerInfo) GetServerID() uint32 { + if x != nil && x.ServerID != nil { + return *x.ServerID + } + return 0 +} + +func (x *ServerInfo) GetHostName() string { + if x != nil && x.HostName != nil { + return *x.HostName + } + return "" +} + +func (x *ServerInfo) GetInternalIp() string { + if x != nil && x.InternalIp != nil { + return *x.InternalIp + } + return "" +} + +func (x *ServerInfo) GetInternalPort() uint32 { + if x != nil && x.InternalPort != nil { + return *x.InternalPort + } + return 0 +} + +func (x *ServerInfo) GetExternalIp() string { + if x != nil && x.ExternalIp != nil { + return *x.ExternalIp + } + return "" +} + +func (x *ServerInfo) GetExternalPort() uint32 { + if x != nil && x.ExternalPort != nil { + return *x.ExternalPort + } + return 0 +} + +func (x *ServerInfo) GetZoneID() uint32 { + if x != nil && x.ZoneID != nil { + return *x.ZoneID + } + return 0 +} + +func (x *ServerInfo) GetZoneName() string { + if x != nil && x.ZoneName != nil { + return *x.ZoneName + } + return "" +} + +func (x *ServerInfo) GetPhysicalPoolID() uint32 { + if x != nil && x.PhysicalPoolID != nil { + return *x.PhysicalPoolID + } + return 0 +} + +func (x *ServerInfo) GetPhysicalPoolName() string { + if x != nil && x.PhysicalPoolName != nil { + return *x.PhysicalPoolName + } + return "" +} + +func (x *ServerInfo) GetDesc() string { + if x != nil && x.Desc != nil { + return *x.Desc + } + return "" +} + +type ListChunkServerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + ChunkServerInfos []*ChunkServerInfo `protobuf:"bytes,2,rep,name=chunkServerInfos" json:"chunkServerInfos,omitempty"` +} + +func (x *ListChunkServerResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *ListChunkServerResponse) GetChunkServerInfos() []*ChunkServerInfo { + if x != nil { + return x.ChunkServerInfos + } + return nil +} + +type ChunkServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChunkServerID *uint32 `protobuf:"varint,1,req,name=chunkServerID" json:"chunkServerID,omitempty"` + DiskType *string `protobuf:"bytes,2,req,name=diskType" json:"diskType,omitempty"` + HostIp *string `protobuf:"bytes,3,req,name=hostIp" json:"hostIp,omitempty"` + Port *uint32 `protobuf:"varint,4,req,name=port" json:"port,omitempty"` + Status *ChunkServerStatus `protobuf:"varint,5,req,name=status,enum=curve.mds.topology.ChunkServerStatus" json:"status,omitempty"` + DiskStatus *DiskState `protobuf:"varint,6,req,name=diskStatus,enum=curve.mds.topology.DiskState" json:"diskStatus,omitempty"` + OnlineState *OnlineState `protobuf:"varint,7,req,name=onlineState,enum=curve.mds.topology.OnlineState" json:"onlineState,omitempty"` + MountPoint *string `protobuf:"bytes,8,req,name=mountPoint" json:"mountPoint,omitempty"` + DiskCapacity *uint64 `protobuf:"varint,9,req,name=diskCapacity" json:"diskCapacity,omitempty"` + DiskUsed *uint64 `protobuf:"varint,10,req,name=diskUsed" json:"diskUsed,omitempty"` + ExternalIp *string `protobuf:"bytes,11,opt,name=externalIp" json:"externalIp,omitempty"` +} + +func (x *ChunkServerInfo) GetChunkServerID() uint32 { + if x != nil && x.ChunkServerID != nil { + return *x.ChunkServerID + } + return 0 +} + +func (x *ChunkServerInfo) GetDiskType() string { + if x != nil && x.DiskType != nil { + return *x.DiskType + } + return "" +} + +func (x *ChunkServerInfo) GetHostIp() string { + if x != nil && x.HostIp != nil { + return *x.HostIp + } + return "" +} + +func (x *ChunkServerInfo) GetPort() uint32 { + if x != nil && x.Port != nil { + return *x.Port + } + return 0 +} + +func (x *ChunkServerInfo) GetStatus() ChunkServerStatus { + if x != nil && x.Status != nil { + return *x.Status + } + return READWRITE +} + +func (x *ChunkServerInfo) GetDiskStatus() DiskState { + if x != nil && x.DiskStatus != nil { + return *x.DiskStatus + } + return DISKNORMAL +} + +func (x *ChunkServerInfo) GetOnlineState() OnlineState { + if x != nil && x.OnlineState != nil { + return *x.OnlineState + } + return ONLINE +} + +func (x *ChunkServerInfo) GetMountPoint() string { + if x != nil && x.MountPoint != nil { + return *x.MountPoint + } + return "" +} + +func (x *ChunkServerInfo) GetDiskCapacity() uint64 { + if x != nil && x.DiskCapacity != nil { + return *x.DiskCapacity + } + return 0 +} + +func (x *ChunkServerInfo) GetDiskUsed() uint64 { + if x != nil && x.DiskUsed != nil { + return *x.DiskUsed + } + return 0 +} + +func (x *ChunkServerInfo) GetExternalIp() string { + if x != nil && x.ExternalIp != nil { + return *x.ExternalIp + } + return "" +} + +type GetChunkServerInClusterResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + ChunkServerInfos []*ChunkServerInfo `protobuf:"bytes,2,rep,name=chunkServerInfos" json:"chunkServerInfos,omitempty"` +} + +func (x *GetChunkServerInClusterResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *GetChunkServerInClusterResponse) GetChunkServerInfos() []*ChunkServerInfo { + if x != nil { + return x.ChunkServerInfos + } + return nil +} + +type GetCopySetsInChunkServerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + CopysetInfos []*common.CopysetInfo `protobuf:"bytes,2,rep,name=copysetInfos" json:"copysetInfos,omitempty"` +} + +func (x *GetCopySetsInChunkServerResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *GetCopySetsInChunkServerResponse) GetCopysetInfos() []*common.CopysetInfo { + if x != nil { + return x.CopysetInfos + } + return nil +} + +type GetChunkServerListInCopySetsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + CsInfo []*CopySetServerInfo `protobuf:"bytes,2,rep,name=csInfo" json:"csInfo,omitempty"` +} + +func (x *GetChunkServerListInCopySetsResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *GetChunkServerListInCopySetsResponse) GetCsInfo() []*CopySetServerInfo { + if x != nil { + return x.CsInfo + } + return nil +} + +type CopySetServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CopysetId *uint32 `protobuf:"varint,1,req,name=copysetId" json:"copysetId,omitempty"` + CsLocs []*common.ChunkServerLocation `protobuf:"bytes,2,rep,name=csLocs" json:"csLocs,omitempty"` +} + +func (x *CopySetServerInfo) GetCopysetId() uint32 { + if x != nil && x.CopysetId != nil { + return *x.CopysetId + } + return 0 +} + +func (x *CopySetServerInfo) GetCsLocs() []*common.ChunkServerLocation { + if x != nil { + return x.CsLocs + } + return nil +} + +type GetCopySetsInClusterResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode *int32 `protobuf:"zigzag32,1,req,name=statusCode" json:"statusCode,omitempty"` + CopysetInfos []*common.CopysetInfo `protobuf:"bytes,2,rep,name=copysetInfos" json:"copysetInfos,omitempty"` +} + +func (x *GetCopySetsInClusterResponse) GetStatusCode() int32 { + if x != nil && x.StatusCode != nil { + return *x.StatusCode + } + return 0 +} + +func (x *GetCopySetsInClusterResponse) GetCopysetInfos() []*common.CopysetInfo { + if x != nil { + return x.CopysetInfos + } + return nil +} diff --git a/internal/metrics/bsmetric/mds_test.go b/internal/metrics/bsmetric/mds_test.go new file mode 100644 index 0000000..e96d7d6 --- /dev/null +++ b/internal/metrics/bsmetric/mds_test.go @@ -0,0 +1,11 @@ +package bsmetric + +import ( + "testing" +) + +func TestMdsStatus(t *testing.T) { + c, err := GetMdsStatus() + print(c) + print(err) +} diff --git a/internal/rpc/curvebs/curvebs.go b/internal/rpc/curvebs/curvebs.go deleted file mode 100644 index 279a25d..0000000 --- a/internal/rpc/curvebs/curvebs.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -* Copyright (c) 2023 NetEase Inc. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. - */ - -/* -* Project: Curve-Manager -* Created Date: 2023-03-07 -* Author: wanghai (SeanHai) - */ -package curvebs - -import ( - "strings" - - bsrpc "github.com/SeanHai/curve-go-rpc/rpc/curvebs" - "github.com/opencurve/curve-manager/internal/common" -) - -var ( - GMdsClient *bsrpc.MdsClient -) - -const ( - CURVEBS_MDS_ADDRESS = "mds.address" - - DEFAULT_RPC_TIMEOUT_MS = 500 - DEFAULT_RPC_RETRY_TIMES = 3 -) - -func Init(cfg map[string]string) { - addrs := cfg[CURVEBS_MDS_ADDRESS] - GMdsClient = bsrpc.NewMdsClient(bsrpc.MdsClientOption{ - TimeoutMs: DEFAULT_RPC_TIMEOUT_MS, - RetryTimes: DEFAULT_RPC_RETRY_TIMES, - Addrs: strings.Split(addrs, common.CURVEBS_ADDRESS_DELIMITER), - }) -}