Skip to content

Commit

Permalink
now pfop supports --workflow-template-id and --type
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Oct 11, 2024
1 parent 02f2c99 commit 9b28851
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# 2.16.0
## 新增
1. create-share / share-ls / share-cp 命令支持创建对文件夹的共享,列举和下载
2. 上传命令支持额外的 accelerate 选项
2. 上传命令支持额外的 --accelerate 选项
3. fop 命令支持 --workflow-template-id 和 --type 选项

## 更新
1. listbucket2 现在即使使用了 --limit 参数得到的 marker 也是精确的
Expand Down
15 changes: 9 additions & 6 deletions cmd/fop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ var preFopStatusCmdBuilder = func(cfg *iqshell.Config) *cobra.Command {
return cmd
}

var preFopCmdBuilder = func(cfg *iqshell.Config) *cobra.Command {
var info = operations.PreFopInfo{}
var pfopCmdBuilder = func(cfg *iqshell.Config) *cobra.Command {
var info = operations.PfopInfo{}
var cmd = &cobra.Command{
Use: "pfop <Bucket> <Key> <fopCommand>",
Short: "Issue a request to process file in bucket",
Expand All @@ -42,14 +42,17 @@ var preFopCmdBuilder = func(cfg *iqshell.Config) *cobra.Command {
if len(args) > 2 {
info.Fops = args[2]
}
operations.PreFop(cfg, info)
operations.Pfop(cfg, info)
},
}
cmd.Flags().StringVarP(&info.Pipeline, "pipeline", "p", "", "task pipeline")
cmd.Flags().StringVarP(&info.NotifyURL, "notify-url", "u", "", "notfiy url")
cmd.Flags().StringVarP(&info.WorkflowTemplateID, "workflow-template-id", "", "", "Workflow template ID")

cmd.Flags().BoolVarP(&info.NotifyForce, "force", "y", false, "force execute")
cmd.Flags().BoolVarP(&info.NotifyForce, "force-old", "f", false, "force execute, deprecated")
cmd.Flags().BoolVarP(&info.Force, "force", "y", false, "force execute")
cmd.Flags().BoolVarP(&info.Force, "force-old", "f", false, "force execute, deprecated")

cmd.Flags().Int64VarP(&info.Type, "type", "", 0, "task type")
_ = cmd.Flags().MarkDeprecated("force-old", "use --force instead")

return cmd
Expand All @@ -61,7 +64,7 @@ func init() {

func fopCmdLoader(superCmd *cobra.Command, cfg *iqshell.Config) {
superCmd.AddCommand(
preFopCmdBuilder(cfg),
pfopCmdBuilder(cfg),
preFopStatusCmdBuilder(cfg),
)
}
11 changes: 7 additions & 4 deletions docs/pfop.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
# 格式
```
qshell pfop [--pipeline <Pipeline>] <Bucket> <Key> <Fops>
```
qshell pfop [--pipeline <Pipeline>] --workflow-template-id <WorkflowTemplateID> <Bucket> <Key>
```

# 帮助文档
可以在命令行输入如下命令获取帮助文档:
```
// 简单描述
$ qshell pfop -h
$ qshell pfop -h
// 详细文档(此文档)
$ qshell pfop --doc
Expand All @@ -21,13 +22,15 @@ $ qshell pfop --doc
# 参数
- Bucket:空间名,可以为公开空间或者私有空间【必选】
- Key:空间中文件的名称【必选】
- Fops:数据处理命令列表,以;分隔,可以指定多个数据处理命令。
- Fops:数据处理命令列表,以;分隔,可以指定多个数据处理命令。如果没有指定 `--workflow-template-id` 选项,则必选。
如: `avthumb/mp4|saveas/cWJ1Y2tldDpxa2V5;avthumb/flv|saveas/cWJ1Y2tldDpxa2V5Mg==`,是将上传的视频文件同时转码成mp4格式和flv格式后另存。

# 选项
- -p/--pipeline:处理队列名称, 如果没有制定该选项,默认使用公有队列【可选】
- -u/--notify-url:处理结果通知接收 URL,七牛将会向你设置的 URL 发起 Content-Type: application/json 的 POST 请求。【可选】
- -y/--force:强制执行数据处理。当服务端发现 fops 指定的数据处理结果已经存在,那就认为已经处理成功,避免重复处理浪费资源。 增加此选项(--force),则可强制执行数据处理并覆盖原结果。【可选】
- --workflow-template-id:工作流模版 ID【可选】
- --type:任务类型【可选】

# 示例
1 把 qiniutest 空间下的文件 `test.avi` 转码成 `mp4` 文件,转码后的结果保存到 `qiniutest` 空间中
Expand Down
40 changes: 29 additions & 11 deletions iqshell/storage/object/fop.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package object

import (
"context"

"github.com/qiniu/go-sdk/v7/storage"
"github.com/qiniu/qshell/v2/iqshell/common/account"
"github.com/qiniu/qshell/v2/iqshell/common/alert"
Expand All @@ -26,16 +28,18 @@ func PreFopStatus(info PreFopStatusApiInfo) (storage.PrefopRet, *data.CodeError)
return ret, data.ConvertError(e)
}

type PreFopApiInfo struct {
Bucket string `json:"bucket"`
Key string `json:"key"`
Fops string `json:"fops"`
Pipeline string `json:"pipeline"`
NotifyURL string `json:"notify_url"`
NotifyForce bool `json:"notify_force"`
type PfopApiInfo struct {
Bucket string
Key string
Fops string
Pipeline string
NotifyURL string
Force bool
Type int64
WorkflowTemplateID string
}

func PreFop(info PreFopApiInfo) (string, *data.CodeError) {
func Pfop(info PfopApiInfo) (string, *data.CodeError) {
if len(info.Bucket) == 0 {
return "", alert.CannotEmptyError("bucket", "")
}
Expand All @@ -44,16 +48,30 @@ func PreFop(info PreFopApiInfo) (string, *data.CodeError) {
return "", alert.CannotEmptyError("key", "")
}

if len(info.Fops) == 0 {
if len(info.Fops) == 0 && len(info.WorkflowTemplateID) == 0 {
return "", alert.CannotEmptyError("fops", "")
}

opManager, err := getOperationManager(info.Bucket)
if err != nil {
return "", err
}
persistentId, e := opManager.Pfop(info.Bucket, info.Key, info.Fops, info.Pipeline, info.NotifyURL, info.NotifyForce)
return persistentId, data.ConvertError(e)
force := int64(0)
if info.Force {
force = 1
}
pfopRequest := storage.PfopRequest{
BucketName: info.Bucket,
ObjectName: info.Key,
Fops: info.Fops,
NotifyUrl: info.NotifyURL,
Force: force,
Type: info.Type,
Pipeline: info.Pipeline,
WorkflowTemplateID: info.WorkflowTemplateID,
}
pfopRet, e := opManager.PfopV2(context.Background(), &pfopRequest)
return pfopRet.PersistentID, data.ConvertError(e)
}

func getOperationManager(bucket string) (*storage.OperationManager, *data.CodeError) {
Expand Down
14 changes: 7 additions & 7 deletions iqshell/storage/object/operations/fop.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ func PreFopStatus(cfg *iqshell.Config, info PreFopStatusInfo) {
})
if err != nil {
data.SetCmdStatusError()
log.ErrorF("pre fog status error:%v", err)
log.ErrorF("prefop status error:%v", err)
return
}

log.Alert(ret.String())
}

type PreFopInfo object.PreFopApiInfo
type PfopInfo object.PfopApiInfo

func (info *PreFopInfo) Check() *data.CodeError {
func (info *PfopInfo) Check() *data.CodeError {
if len(info.Bucket) == 0 {
return alert.CannotEmptyError("Bucket", "")
}
Expand All @@ -51,23 +51,23 @@ func (info *PreFopInfo) Check() *data.CodeError {
return alert.CannotEmptyError("Key", "")
}

if len(info.Fops) == 0 {
if len(info.Fops) == 0 && len(info.WorkflowTemplateID) == 0 {
return alert.CannotEmptyError("Fops", "")
}
return nil
}

func PreFop(cfg *iqshell.Config, info PreFopInfo) {
func Pfop(cfg *iqshell.Config, info PfopInfo) {
if shouldContinue := iqshell.CheckAndLoad(cfg, iqshell.CheckAndLoadInfo{
Checker: &info,
}); !shouldContinue {
return
}

persistentId, err := object.PreFop(object.PreFopApiInfo(info))
persistentId, err := object.Pfop(object.PfopApiInfo(info))
if err != nil {
data.SetCmdStatusError()
log.ErrorF("pre fog error:%v", err)
log.ErrorF("pfop error:%v", err)
return
}
log.Alert(persistentId)
Expand Down

0 comments on commit 9b28851

Please sign in to comment.