From a2d920189c8cf20d5b0ae67d487821430e89ccfe Mon Sep 17 00:00:00 2001 From: yohamta Date: Wed, 27 Apr 2022 10:43:08 +0900 Subject: [PATCH] Fix to use word DAG --- README.md | 8 +- cmd/{jobctl.go => dagu.go} | 2 +- cmd/{jobctl_test.go => dagu_test.go} | 4 +- cmd/dry.go | 4 +- cmd/retry.go | 8 +- cmd/retry_test.go | 16 +-- cmd/start.go | 12 +- cmd/stop.go | 8 +- .../{complex_job.yaml => complex_dag.yaml} | 0 examples/config_err_step_no_command.yaml | 3 + examples/example_1.yaml | 2 +- internal/admin/config.go | 12 +- internal/admin/definition.go | 2 +- internal/admin/handlers/{job.go => dag.go} | 120 +++++++++--------- internal/admin/handlers/html.go | 2 - internal/admin/handlers/list.go | 28 ++-- .../web/templates/{job.gohtml => dag.gohtml} | 54 ++++---- .../admin/handlers/web/templates/index.gohtml | 49 ++++--- internal/admin/http_test.go | 2 +- internal/admin/loader_test.go | 14 +- internal/admin/routes.go | 22 ++-- internal/agent/agent.go | 52 ++++---- internal/agent/agent_test.go | 46 +++---- internal/config/config.go | 2 +- internal/config/config_test.go | 2 +- internal/controller/controller.go | 30 ++--- internal/controller/controller_test.go | 28 ++-- internal/controller/{job.go => dag.go} | 18 +-- internal/reporter/reporter_test.go | 6 +- tests/admin/.dagu/admin.yaml | 4 +- tests/admin/admin.yaml | 4 +- ...r_get_job.yaml => controller_get_dag.yaml} | 0 32 files changed, 282 insertions(+), 282 deletions(-) rename cmd/{jobctl.go => dagu.go} (93%) rename cmd/{jobctl_test.go => dagu_test.go} (94%) rename examples/{complex_job.yaml => complex_dag.yaml} (100%) create mode 100644 examples/config_err_step_no_command.yaml rename internal/admin/handlers/{job.go => dag.go} (75%) rename internal/admin/handlers/web/templates/{job.gohtml => dag.gohtml} (93%) rename internal/controller/{job.go => dag.go} (75%) rename tests/testdata/{controller_get_job.yaml => controller_get_dag.yaml} (100%) diff --git a/README.md b/README.md index 5ffd7893b..ecfe01aba 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Download the binary from [Releases page](https://github.com/dagu/dagu/releases) - **History**: History of the execution of the pipeline. - ![History](https://user-images.githubusercontent.com/1475839/165418472-385cb8e0-351c-4508-b337-a082ea53b4ec.png) + ![History](https://user-images.githubusercontent.com/1475839/165426067-02c4f72f-e3f0-4cd8-aa38-35fa98f0382f.png) ## Architecture @@ -120,7 +120,7 @@ Please create `~/.dagu/admin.yaml`. ```yaml host: # default : 127.0.0.1 port: # default : 8080 -jobs: # default : current working directory +dags: # default : current working directory # optional command: @@ -136,8 +136,8 @@ Please create `~/.dagu/config.yaml`. All settings can be overridden by individua Creating a global configuration is a convenient way to organize common settings. ```yaml -logDir: # log directory to write standard output from the job steps -histRetentionDays: 3 # job history retention days (not for log files) +logDir: # log directory to write standard output +histRetentionDays: 3 # history retention days (not for log files) # E-mail server config (optional) smtp: diff --git a/cmd/jobctl.go b/cmd/dagu.go similarity index 93% rename from cmd/jobctl.go rename to cmd/dagu.go index 0d03c7e36..7086d1a57 100644 --- a/cmd/jobctl.go +++ b/cmd/dagu.go @@ -39,7 +39,7 @@ func run() error { func makeApp() *cli.App { return &cli.App{ Name: "dagu", - Usage: "Simple command to run a group of jobs", + Usage: "A simple command to run workflows (DAGs)", UsageText: "dagu [options] [args]", Commands: []*cli.Command{ newStartCommand(), diff --git a/cmd/jobctl_test.go b/cmd/dagu_test.go similarity index 94% rename from cmd/jobctl_test.go rename to cmd/dagu_test.go index 527c788a2..10252a5ab 100644 --- a/cmd/jobctl_test.go +++ b/cmd/dagu_test.go @@ -63,7 +63,7 @@ func runAppTestOutput(app *cli.App, test appTest, t *testing.T) { w.Close() if err != nil && !test.errored { - t.Fatalf("job failed unexpectedly %v", err) + t.Fatalf("failed unexpectedly %v", err) return } @@ -87,7 +87,7 @@ func runAppTest(app *cli.App, test appTest, t *testing.T) { err := app.Run(test.args) if err != nil && !test.errored { - t.Fatalf("job failed unexpectedly %v", err) + t.Fatalf("failed unexpectedly %v", err) return } } diff --git a/cmd/dry.go b/cmd/dry.go index 17319c57e..411c1221e 100644 --- a/cmd/dry.go +++ b/cmd/dry.go @@ -42,7 +42,7 @@ func newDryCommand() *cli.Command { func dryRun(cfg *config.Config) error { a := &agent.Agent{Config: &agent.Config{ - Job: cfg, + DAG: cfg, Dry: true, }} listenSignals(func(sig os.Signal) { @@ -51,7 +51,7 @@ func dryRun(cfg *config.Config) error { err := a.Run() if err != nil { - log.Printf("[DRY] job failed %v", err) + log.Printf("[DRY] failed %v", err) } return nil } diff --git a/cmd/retry.go b/cmd/retry.go index e9ccdee35..a99cd35e0 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -48,15 +48,15 @@ func newRetryCommand() *cli.Command { if err != nil { return err } - return retryJob(cfg, status.Status) + return retry(cfg, status.Status) }, } } -func retryJob(cfg *config.Config, status *models.Status) error { +func retry(cfg *config.Config, status *models.Status) error { a := &agent.Agent{ Config: &agent.Config{ - Job: cfg, + DAG: cfg, Dry: false, }, RetryConfig: &agent.RetryConfig{ @@ -70,7 +70,7 @@ func retryJob(cfg *config.Config, status *models.Status) error { err := a.Run() if err != nil { - log.Printf("running job failed. %v", err) + log.Printf("running failed. %v", err) } return nil } diff --git a/cmd/retry_test.go b/cmd/retry_test.go index dca80cfb7..6746bda4d 100644 --- a/cmd/retry_test.go +++ b/cmd/retry_test.go @@ -21,12 +21,12 @@ func Test_retryCommand(t *testing.T) { output: []string{}, }, t) - job, err := controller.FromConfig(configPath) + dag, err := controller.FromConfig(configPath) require.NoError(t, err) - require.Equal(t, job.Status.Status, scheduler.SchedulerStatus_Error) + require.Equal(t, dag.Status.Status, scheduler.SchedulerStatus_Error) db := database.New(database.DefaultConfig()) - status, err := db.FindByRequestId(configPath, job.Status.RequestId) + status, err := db.FindByRequestId(configPath, dag.Status.RequestId) require.NoError(t, err) dw, err := db.NewWriterFor(configPath, status.File) require.NoError(t, err) @@ -44,19 +44,19 @@ func Test_retryCommand(t *testing.T) { app = makeApp() runAppTestOutput(app, appTest{ args: []string{"", "retry", fmt.Sprintf("--req=%s", - job.Status.RequestId), testConfig("cmd_retry.yaml")}, errored: false, + dag.Status.RequestId), testConfig("cmd_retry.yaml")}, errored: false, output: []string{"parameter is x"}, }, t) assert.Eventually(t, func() bool { - job, err = controller.FromConfig(testConfig("cmd_retry.yaml")) + dag, err = controller.FromConfig(testConfig("cmd_retry.yaml")) if err != nil { return false } - return job.Status.Status == scheduler.SchedulerStatus_Success + return dag.Status.Status == scheduler.SchedulerStatus_Success }, time.Millisecond*3000, time.Millisecond*100) - job, err = controller.FromConfig(testConfig("cmd_retry.yaml")) + dag, err = controller.FromConfig(testConfig("cmd_retry.yaml")) require.NoError(t, err) - require.NotEqual(t, status.Status.RequestId, job.Status.RequestId) + require.NotEqual(t, status.Status.RequestId, dag.Status.RequestId) } diff --git a/cmd/start.go b/cmd/start.go index 3c3aba3e4..19686e4d0 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -25,24 +25,24 @@ func newStartCommand() *cli.Command { }, Action: func(c *cli.Context) error { if c.NArg() == 0 { - return errors.New("config file must be specified.") + return errors.New("config file must be specified") } if c.NArg() != 1 { - return errors.New("too many parameters.") + return errors.New("too many parameters") } config_file_path := c.Args().Get(0) cfg, err := cl.Load(config_file_path, c.String("params")) if err != nil { return err } - return startJob(cfg) + return start(cfg) }, } } -func startJob(cfg *config.Config) error { +func start(cfg *config.Config) error { a := &agent.Agent{Config: &agent.Config{ - Job: cfg, + DAG: cfg, Dry: false, }} @@ -52,7 +52,7 @@ func startJob(cfg *config.Config) error { err := a.Run() if err != nil { - log.Printf("running job failed. %v", err) + log.Printf("running failed. %v", err) } return nil } diff --git a/cmd/stop.go b/cmd/stop.go index 67287b1c3..b9893f306 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -26,12 +26,12 @@ func newStopCommand() *cli.Command { if err != nil { return err } - return stopJob(cfg) + return stop(cfg) }, } } -func stopJob(cfg *config.Config) error { +func stop(cfg *config.Config) error { status, err := controller.New(cfg).GetStatus() if err != nil { return err @@ -39,7 +39,7 @@ func stopJob(cfg *config.Config) error { if status.Status != scheduler.SchedulerStatus_Running || !status.Pid.IsRunning() { - log.Printf("job is not running.") + log.Printf("DAG is not running.") return nil } syscall.Kill(int(status.Pid), syscall.SIGINT) @@ -55,6 +55,6 @@ func stopJob(cfg *config.Config) error { } break } - log.Printf("job is stopped.") + log.Printf("DAG is stopped.") return nil } diff --git a/examples/complex_job.yaml b/examples/complex_dag.yaml similarity index 100% rename from examples/complex_job.yaml rename to examples/complex_dag.yaml diff --git a/examples/config_err_step_no_command.yaml b/examples/config_err_step_no_command.yaml new file mode 100644 index 000000000..c659982cb --- /dev/null +++ b/examples/config_err_step_no_command.yaml @@ -0,0 +1,3 @@ +name: test +steps: + - name: step 1 \ No newline at end of file diff --git a/examples/example_1.yaml b/examples/example_1.yaml index a3fbc263b..e474200af 100644 --- a/examples/example_1.yaml +++ b/examples/example_1.yaml @@ -1,4 +1,4 @@ -name: example job +name: example DAG steps: - name: "1" command: echo hello world diff --git a/internal/admin/config.go b/internal/admin/config.go index 0fb6ccb41..c71db5f00 100644 --- a/internal/admin/config.go +++ b/internal/admin/config.go @@ -22,7 +22,7 @@ type Config struct { Host string Port string Env []string - Jobs string + DAGs string Command string WorkDir string IsBasicAuth bool @@ -41,12 +41,12 @@ func (c *Config) setup() error { if c.Command == "" { c.Command = "dagu" } - if c.Jobs == "" { + if c.DAGs == "" { wd, err := os.Getwd() if err != nil { return err } - c.Jobs = wd + c.DAGs = wd } if c.Host == "" { c.Host = "127.0.0.1" @@ -81,14 +81,14 @@ func buildFromDefinition(def *configDefinition) (c *Config, err error) { } c.Port = strconv.Itoa(def.Port) - jd, err := parseVariable(def.Jobs) + jd, err := parseVariable(def.Dags) if err != nil { return nil, err } if !filepath.IsAbs(jd) { - return nil, fmt.Errorf("jobs directory should be absolute path. was %s", jd) + return nil, fmt.Errorf("DAGs directory should be absolute path. was %s", jd) } - c.Jobs, err = filepath.Abs(jd) + c.DAGs, err = filepath.Abs(jd) if err != nil { return nil, err } diff --git a/internal/admin/definition.go b/internal/admin/definition.go index 8204509f1..5bc15604e 100644 --- a/internal/admin/definition.go +++ b/internal/admin/definition.go @@ -4,7 +4,7 @@ type configDefinition struct { Host string Port int Env map[string]string - Jobs string + Dags string Command string WorkDir string IsBasicAuth bool diff --git a/internal/admin/handlers/job.go b/internal/admin/handlers/dag.go similarity index 75% rename from internal/admin/handlers/job.go rename to internal/admin/handlers/dag.go index 508ca2c2a..f3cfcb6bc 100644 --- a/internal/admin/handlers/job.go +++ b/internal/admin/handlers/dag.go @@ -23,21 +23,21 @@ import ( "golang.org/x/text/transform" ) -type jobStatus struct { +type dagStatus struct { Name string Vals []scheduler.NodeStatus } type Log struct { - GridData []*jobStatus + GridData []*dagStatus Logs []*models.StatusFile } -type jobResponse struct { +type dagResponse struct { Title string Charset string - Job *controller.Job - Tab jobTabType + DAG *controller.DAG + Tab dagTabType Graph string Definition string LogData *Log @@ -58,29 +58,29 @@ type stepLog struct { Content string } -type jobTabType int +type dagTabType int const ( - JobTabType_Status jobTabType = iota - JobTabType_Config - JobTabType_History - JobTabType_StepLog - JobTabType_ScLog - JobTabType_None + DAG_TabType_Status dagTabType = iota + DAG_TabType_Config + DAG_TabType_History + DAG_TabType_StepLog + DAG_TabType_ScLog + DAG_TabType_None ) -type jobParameter struct { - Tab jobTabType +type dagParameter struct { + Tab dagTabType Group string File string Step string } -func newJobResponse(cfg string, job *controller.Job, tab jobTabType, - group string) *jobResponse { - return &jobResponse{ +func newDAGResponse(cfg string, dag *controller.DAG, tab dagTabType, + group string) *dagResponse { + return &dagResponse{ Title: cfg, - Job: job, + DAG: dag, Tab: tab, Definition: "", LogData: nil, @@ -88,13 +88,13 @@ func newJobResponse(cfg string, job *controller.Job, tab jobTabType, } } -type JobHandlerConfig struct { - JobsDir string +type DAGHandlerConfig struct { + DAGsDir string LogEncodingCharset string } -func HandleGetJob(hc *JobHandlerConfig) http.HandlerFunc { - renderFunc := useTemplate("job.gohtml", "job") +func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc { + renderFunc := useTemplate("dag.gohtml", "dag") return func(w http.ResponseWriter, r *http.Request) { cfg, err := getPathParameter(r) @@ -103,30 +103,30 @@ func HandleGetJob(hc *JobHandlerConfig) http.HandlerFunc { return } - params := getJobParameter(r) - job, err := controller.FromConfig(filepath.Join(hc.JobsDir, params.Group, cfg)) + params := getDAGParameter(r) + dag, err := controller.FromConfig(filepath.Join(hc.DAGsDir, params.Group, cfg)) if err != nil { encodeError(w, err) return } - c := controller.New(job.Config) - data := newJobResponse(cfg, job, params.Tab, params.Group) + c := controller.New(dag.Config) + data := newDAGResponse(cfg, dag, params.Tab, params.Group) switch params.Tab { - case JobTabType_Status: - data.Graph = models.StepGraph(job.Status.Nodes, params.Tab != JobTabType_Config) - case JobTabType_Config: - steps := models.FromSteps(job.Config.Steps) - data.Graph = models.StepGraph(steps, params.Tab != JobTabType_Config) - data.Definition, _ = config.ReadConfig(path.Join(hc.JobsDir, params.Group, cfg)) - case JobTabType_History: - logs, err := controller.New(job.Config).GetStatusHist(30) + case DAG_TabType_Status: + data.Graph = models.StepGraph(dag.Status.Nodes, params.Tab != DAG_TabType_Config) + case DAG_TabType_Config: + steps := models.FromSteps(dag.Config.Steps) + data.Graph = models.StepGraph(steps, params.Tab != DAG_TabType_Config) + data.Definition, _ = config.ReadConfig(path.Join(hc.DAGsDir, params.Group, cfg)) + case DAG_TabType_History: + logs, err := controller.New(dag.Config).GetStatusHist(30) if err != nil { encodeError(w, err) return } data.LogData = buildLog(logs) - case JobTabType_StepLog: + case DAG_TabType_StepLog: if isJsonRequest(r) { data.StepLog, err = readStepLog(c, params.File, params.Step, hc.LogEncodingCharset) if err != nil { @@ -134,7 +134,7 @@ func HandleGetJob(hc *JobHandlerConfig) http.HandlerFunc { return } } - case JobTabType_ScLog: + case DAG_TabType_ScLog: if isJsonRequest(r) { data.ScLog, err = readSchedulerLog(c, params.File) if err != nil { @@ -157,13 +157,13 @@ func isJsonRequest(r *http.Request) bool { return r.Header.Get("Accept") == "application/json" } -type PostJobHandlerConfig struct { - JobsDir string +type PostDAGHandlerConfig struct { + DAGsDir string Bin string WkDir string } -func HandlePostJobAction(hc *PostJobHandlerConfig) http.HandlerFunc { +func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { action := r.FormValue("action") @@ -176,34 +176,34 @@ func HandlePostJobAction(hc *PostJobHandlerConfig) http.HandlerFunc { return } - file := filepath.Join(hc.JobsDir, group, cfg) - job, err := controller.FromConfig(file) + file := filepath.Join(hc.DAGsDir, group, cfg) + dag, err := controller.FromConfig(file) if err != nil { encodeError(w, err) return } - c := controller.New(job.Config) + c := controller.New(dag.Config) switch action { case "start": - if job.Status.Status == scheduler.SchedulerStatus_Running { + if dag.Status.Status == scheduler.SchedulerStatus_Running { w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("job is already running.")) + w.Write([]byte("DAG is already running.")) return } - err = c.StartJob(hc.Bin, hc.WkDir, "") + err = c.Start(hc.Bin, hc.WkDir, "") if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } case "stop": - if job.Status.Status != scheduler.SchedulerStatus_Running { + if dag.Status.Status != scheduler.SchedulerStatus_Running { w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("job is not running.")) + w.Write([]byte("DAG is not running.")) return } - err = c.StopJob() + err = c.Stop() if err != nil { w.WriteHeader(http.StatusNotFound) w.Write([]byte(err.Error())) @@ -215,7 +215,7 @@ func HandlePostJobAction(hc *PostJobHandlerConfig) http.HandlerFunc { w.Write([]byte("request-id is required.")) return } - err = c.RetryJob(hc.Bin, hc.WkDir, reqId) + err = c.Retry(hc.Bin, hc.WkDir, reqId) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -226,7 +226,7 @@ func HandlePostJobAction(hc *PostJobHandlerConfig) http.HandlerFunc { return } - http.Redirect(w, r, job.File, http.StatusSeeOther) + http.Redirect(w, r, dag.File, http.StatusSeeOther) } } @@ -327,7 +327,7 @@ func readFile(f string, decorder *encoding.Decoder) ([]byte, error) { func buildLog(logs []*models.StatusFile) *Log { ret := &Log{ - GridData: []*jobStatus{}, + GridData: []*dagStatus{}, Logs: logs, } tmp := map[string][]scheduler.NodeStatus{} @@ -344,7 +344,7 @@ func buildLog(logs []*models.StatusFile) *Log { } } for k, v := range tmp { - ret.GridData = append(ret.GridData, &jobStatus{Name: k, Vals: v}) + ret.GridData = append(ret.GridData, &dagStatus{Name: k, Vals: v}) } sort.Slice(ret.GridData, func(i, c int) bool { return strings.Compare(ret.GridData[i].Name, ret.GridData[c].Name) <= 0 @@ -366,14 +366,14 @@ func buildLog(logs []*models.StatusFile) *Log { } for _, h := range []string{constants.OnSuccess, constants.OnFailure, constants.OnCancel, constants.OnExit} { if v, ok := tmp[h]; ok { - ret.GridData = append(ret.GridData, &jobStatus{Name: h, Vals: v}) + ret.GridData = append(ret.GridData, &dagStatus{Name: h, Vals: v}) } } return ret } func getPathParameter(r *http.Request) (string, error) { - re := regexp.MustCompile("/jobs/([^/\\?]+)/?$") + re := regexp.MustCompile("/dags/([^/\\?]+)/?$") m := re.FindStringSubmatch(r.URL.Path) if len(m) < 2 { return "", fmt.Errorf("invalid URL") @@ -381,17 +381,17 @@ func getPathParameter(r *http.Request) (string, error) { return m[1], nil } -func getJobParameter(r *http.Request) *jobParameter { - p := &jobParameter{ - Tab: JobTabType_Status, +func getDAGParameter(r *http.Request) *dagParameter { + p := &dagParameter{ + Tab: DAG_TabType_Status, Group: "", } if tab, ok := r.URL.Query()["t"]; ok { i, err := strconv.Atoi(tab[0]) - if err != nil || i >= int(JobTabType_None) { - p.Tab = JobTabType_Status + if err != nil || i >= int(DAG_TabType_None) { + p.Tab = DAG_TabType_Status } else { - p.Tab = jobTabType(i) + p.Tab = dagTabType(i) } } if group, ok := r.URL.Query()["group"]; ok { diff --git a/internal/admin/handlers/html.go b/internal/admin/handlers/html.go index 5e3b996d1..733ab4d53 100644 --- a/internal/admin/handlers/html.go +++ b/internal/admin/handlers/html.go @@ -23,8 +23,6 @@ var defaultFuncs = template.FuncMap{ //go:embed web/templates/* var assets embed.FS var templatePath = "web/templates/" -var defaultConfig = &struct { -}{} func useTemplate(layout string, name string) func(http.ResponseWriter, interface{}) { files := append(baseTemplates(), path.Join(templatePath, layout)) diff --git a/internal/admin/handlers/list.go b/internal/admin/handlers/list.go index a476f6069..1b53f207f 100644 --- a/internal/admin/handlers/list.go +++ b/internal/admin/handlers/list.go @@ -10,17 +10,17 @@ import ( "github.com/yohamta/dagu/internal/utils" ) -type jobListResponse struct { +type dagListResponse struct { Title string Charset string - Jobs []*controller.Job + DAGs []*controller.DAG Groups []*group Group string Errors []string HasError bool } -type jobListParameter struct { +type dagListParameter struct { Group string } @@ -29,16 +29,16 @@ type group struct { Dir string } -type JobListHandlerConfig struct { - JobsDir string +type DAGListHandlerConfig struct { + DAGsDir string } -func HandleGetList(hc *JobListHandlerConfig) http.HandlerFunc { +func HandleGetList(hc *DAGListHandlerConfig) http.HandlerFunc { renderFunc := useTemplate("index.gohtml", "index") return func(w http.ResponseWriter, r *http.Request) { params := getGetListParameter(r) - dir := filepath.Join(hc.JobsDir, params.Group) - jobs, errs, err := controller.GetJobList(dir) + dir := filepath.Join(hc.DAGsDir, params.Group) + dags, errs, err := controller.GetDAGs(dir) if err != nil { encodeError(w, err) return @@ -54,7 +54,7 @@ func HandleGetList(hc *JobListHandlerConfig) http.HandlerFunc { } hasErr := false - for _, j := range jobs { + for _, j := range dags { if j.Error != nil { hasErr = true break @@ -64,9 +64,9 @@ func HandleGetList(hc *JobListHandlerConfig) http.HandlerFunc { hasErr = true } - data := &jobListResponse{ - Title: "JobList", - Jobs: jobs, + data := &dagListResponse{ + Title: "DAGList", + DAGs: dags, Groups: groups, Group: params.Group, Errors: errs, @@ -80,8 +80,8 @@ func HandleGetList(hc *JobListHandlerConfig) http.HandlerFunc { } } -func getGetListParameter(r *http.Request) *jobListParameter { - p := &jobListParameter{ +func getGetListParameter(r *http.Request) *dagListParameter { + p := &dagListParameter{ Group: "", } if group, ok := r.URL.Query()["group"]; ok { diff --git a/internal/admin/handlers/web/templates/job.gohtml b/internal/admin/handlers/web/templates/dag.gohtml similarity index 93% rename from internal/admin/handlers/web/templates/job.gohtml rename to internal/admin/handlers/web/templates/dag.gohtml index c594d72d6..e715fb202 100644 --- a/internal/admin/handlers/web/templates/job.gohtml +++ b/internal/admin/handlers/web/templates/dag.gohtml @@ -145,12 +145,12 @@ ) : null} @@ -233,19 +233,19 @@ function ConfigTab({ data }) { const mermaidStyle = { display: 'flex', alignItems: 'center', justifyContent: 'center', - width: (data.Job.Config.Steps.length * 100) + "px" + width: (data.DAG.Config.Steps.length * 100) + "px" }; - const handlers = getHandlersFromConfig(data.Job.Config); + const handlers = getHandlersFromConfig(data.DAG.Config); return (
{data.Graph} - - + +
-

{data.Job.Config.ConfigPath}

+

{data.DAG.Config.ConfigPath}

{data.Definition}
@@ -257,16 +257,16 @@ display: 'flex', alignItems: 'flex-start', justifyContent: 'flex-start', - width: (data.Job.Status.Nodes.length * 100) + "px" + width: (data.DAG.Status.Nodes.length * 100) + "px" }; - const handlers = getHandlersFromStatus(data.Job.Status); + const handlers = getHandlersFromStatus(data.DAG.Status); return (
{subTab == 0 ? ( {data.Graph} - ) : ()} - - + ) : ()} + +
); @@ -427,7 +427,7 @@ ] function Timeline({ status }) { if (status.Status == 0 || status.Status == 1) { - // the job is not running or running + // the DAG is not running or running return null; } const graph = React.useMemo(() => { @@ -471,7 +471,7 @@ Request ID - Job Name + DAG Name Started At Finished At Status @@ -506,7 +506,7 @@ { width: "100px" }, {}, ] - function NodeTable({ nodes, file = "", job }) { + function NodeTable({ nodes, file = "", dag }) { const tableStyle = { "tableLayout": "fixed", "wordWrap": "break-word", @@ -537,7 +537,7 @@ - {nodes.map((n, idx) => )} + {nodes.map((n, idx) => )}
@@ -550,7 +550,7 @@ ) } - function NodeTableRow({ rownum, node, file, job }) { + function NodeTableRow({ rownum, node, file }) { const url = encodeURI("?t=" + TAB_ID__STEPLOG + "&group={{.Group}}&file=" + file + "&step=" + node.Step.Name) return ( @@ -577,23 +577,23 @@ } } return { - "start": func('Do you really want to start the job?'), - "stop": func('Do you really want to cancel the job?'), + "start": func('Do you really want to start the DAG?'), + "stop": func('Do you really want to cancel the DAG?'), "retry": func( - "Do you really want to retry the last job (" + - data.Job.Status.RequestId + ") ?"), + "Do you really want to retry the last execution (" + + data.DAG.Status.RequestId + ") ?"), } - }, [data.Job.Status.RequestId]); + }, [data.DAG.Status.RequestId]); const buttonStyle = React.useMemo(() => ({ "start": { width: "100px", backgroundColor: "gray", border: 0, color: "white", }, "stop": { width: "100px", backgroundColor: "gray", border: 0, color: "white", }, "retry": { width: "100px", backgroundColor: "gray", border: 0, color: "white", }, }), []); const buttonState = React.useMemo(() => ({ - "start": data.Job.Status.Status != SCHEDULER_STATUS__RUNNING, - "stop": data.Job.Status.Status == SCHEDULER_STATUS__RUNNING, - "retry": data.Job.Status.Status != SCHEDULER_STATUS__RUNNING - && data.Job.Status.RequestId != "", + "start": data.DAG.Status.Status != SCHEDULER_STATUS__RUNNING, + "stop": data.DAG.Status.Status == SCHEDULER_STATUS__RUNNING, + "retry": data.DAG.Status.Status != SCHEDULER_STATUS__RUNNING + && data.DAG.Status.RequestId != "", }), [data]); return (
@@ -623,7 +623,7 @@
- +
} - console.log({jobs, errors, hasError}) return (
Please check the below errors!
    - {jobs.filter((job) => job.Error).map((job) => { - const url = encodeURI(job.File); - return
  • {job.File}: {job.ErrorT}
  • + {dags.filter((dag) => dag.Error).map((dag) => { + const url = encodeURI(dag.File); + return
  • {dag.File}: {dag.ErrorT}
  • })} {errors.map(e =>
  • {e}
  • )}
@@ -80,23 +79,23 @@
- - + +
); } - function JobItem({ job, group }) { - const url = encodeURI("/jobs/" + job.File + "?group=" + group) + function DAGItem({ dag, group }) { + const url = encodeURI("/dags/" + dag.File + "?group=" + group) return ( - {job.File} - Job - {job.Config.Name} - {job.Config.Description} - {job.Status.StatusText} - {job.Status.Pid == "-1" ? "" : job.Status.Pid} - {job.Status.StartedAt} - {job.Status.FinishedAt} + {dag.File} + DAG + {dag.Config.Name} + {dag.Config.Description} + {dag.Status.StatusText} + {dag.Status.Pid == "-1" ? "" : dag.Status.Pid} + {dag.Status.StartedAt} + {dag.Status.FinishedAt} ) } function StatusTag({ status, children }) { @@ -108,7 +107,7 @@ ) } function GroupItemBack({ group }) { - const url = encodeURI("/jobs/") + const url = encodeURI("/dags/") return ( ../ (upper group) Group @@ -121,7 +120,7 @@ ) } function GroupItem({ group }) { - const url = encodeURI("/jobs/?group=" + group.Name) + const url = encodeURI("/dags/?group=" + group.Name) return ( {group.Name} Group @@ -133,14 +132,14 @@ - ) } - function JobTable({ jobs = [], groups = [], group = "" }) { + function DAGsTable({ dags = [], groups = [], group = "" }) { const root = { Name: "" }; const currGrp = "{{.Group}}"; return ( - + @@ -157,8 +156,8 @@ {groups.map((item) => { return })} - {jobs.filter((job) => !job.Error).map((job) => { - return + {dags.filter((dag) => !dag.Error).map((dag) => { + return })}
JobDAG Type Name Description
diff --git a/internal/admin/http_test.go b/internal/admin/http_test.go index 4139564da..2477affbb 100644 --- a/internal/admin/http_test.go +++ b/internal/admin/http_test.go @@ -52,7 +52,7 @@ func TestHttpServerShutdownWithAPI(t *testing.T) { server := admin.NewServer(&admin.Config{ Host: host, Port: port, - Jobs: dir, + DAGs: dir, }) go func() { diff --git a/internal/admin/loader_test.go b/internal/admin/loader_test.go index bb40802a7..e1af0d98f 100644 --- a/internal/admin/loader_test.go +++ b/internal/admin/loader_test.go @@ -34,7 +34,7 @@ func TestDefaultConfig(t *testing.T) { testConfig(t, cfg, &testWant{ Host: "127.0.0.1", Port: "8000", - Jobs: path.Join(wd), + DAGs: path.Join(wd), Command: "dagu", }) } @@ -47,9 +47,9 @@ func TestHomeAdminConfig(t *testing.T) { testConfig(t, cfg, &testWant{ Host: "localhost", Port: "8081", - Jobs: path.Join(testsDir, "/dagu/jobs"), + DAGs: path.Join(testsDir, "/dagu/dags"), Command: path.Join(testsDir, "/dagu/bin/dagu"), - WorkDir: path.Join(testsDir, "/dagu/jobs"), + WorkDir: path.Join(testsDir, "/dagu/dags"), }) } @@ -61,9 +61,9 @@ func TestLoadAdminConfig(t *testing.T) { testConfig(t, cfg, &testWant{ Host: "localhost", Port: "8082", - Jobs: path.Join(testsDir, "/dagu/jobs"), + DAGs: path.Join(testsDir, "/dagu/dags"), Command: path.Join(testsDir, "/dagu/bin/dagu"), - WorkDir: path.Join(testsDir, "/dagu/jobs"), + WorkDir: path.Join(testsDir, "/dagu/dags"), }) } @@ -71,7 +71,7 @@ func testConfig(t *testing.T, cfg *admin.Config, want *testWant) { t.Helper() assert.Equal(t, want.Host, cfg.Host) assert.Equal(t, want.Port, cfg.Port) - assert.Equal(t, want.Jobs, cfg.Jobs) + assert.Equal(t, want.DAGs, cfg.DAGs) assert.Equal(t, want.WorkDir, cfg.WorkDir) assert.Equal(t, want.Command, cfg.Command) } @@ -79,7 +79,7 @@ func testConfig(t *testing.T, cfg *admin.Config, want *testWant) { type testWant struct { Host string Port string - Jobs string + DAGs string Command string WorkDir string } diff --git a/internal/admin/routes.go b/internal/admin/routes.go index f87d9010a..ad20b3a6d 100644 --- a/internal/admin/routes.go +++ b/internal/admin/routes.go @@ -15,24 +15,24 @@ type route struct { func defaultRoutes(cfg *Config) []*route { return []*route{ {http.MethodGet, `^/?$`, handlers.HandleGetList( - &handlers.JobListHandlerConfig{ - JobsDir: cfg.Jobs, + &handlers.DAGListHandlerConfig{ + DAGsDir: cfg.DAGs, }, )}, - {http.MethodGet, `^/jobs/?$`, handlers.HandleGetList( - &handlers.JobListHandlerConfig{ - JobsDir: cfg.Jobs, + {http.MethodGet, `^/dags/?$`, handlers.HandleGetList( + &handlers.DAGListHandlerConfig{ + DAGsDir: cfg.DAGs, }, )}, - {http.MethodGet, `^/jobs/([^/]+)$`, handlers.HandleGetJob( - &handlers.JobHandlerConfig{ - JobsDir: cfg.Jobs, + {http.MethodGet, `^/dags/([^/]+)$`, handlers.HandleGetDAG( + &handlers.DAGHandlerConfig{ + DAGsDir: cfg.DAGs, LogEncodingCharset: cfg.LogEncodingCharset, }, )}, - {http.MethodPost, `^/jobs/([^/]+)$`, handlers.HandlePostJobAction( - &handlers.PostJobHandlerConfig{ - JobsDir: cfg.Jobs, + {http.MethodPost, `^/dags/([^/]+)$`, handlers.HandlePostDAGAction( + &handlers.PostDAGHandlerConfig{ + DAGsDir: cfg.DAGs, Bin: cfg.Command, WkDir: cfg.WorkDir, }, diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 0006825f2..eb85d5624 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -40,7 +40,7 @@ type Agent struct { } type Config struct { - Job *config.Config + DAG *config.Config Dry bool } @@ -76,7 +76,7 @@ func (a *Agent) Run() error { func (a *Agent) Status() *models.Status { status := models.NewStatus( - a.Job, + a.DAG, a.graph.Nodes(), a.scheduler.Status(a.graph), os.Getpid(), @@ -152,25 +152,25 @@ func (a *Agent) Kill(done chan bool) { func (a *Agent) init() { a.scheduler = scheduler.New( &scheduler.Config{ - LogDir: path.Join(a.Job.LogDir, utils.ValidFilename(a.Job.Name, "_")), - MaxActiveRuns: a.Job.MaxActiveRuns, - DelaySec: a.Job.DelaySec, + LogDir: path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")), + MaxActiveRuns: a.DAG.MaxActiveRuns, + DelaySec: a.DAG.DelaySec, Dry: a.Dry, - OnExit: a.Job.HandlerOn.Exit, - OnSuccess: a.Job.HandlerOn.Success, - OnFailure: a.Job.HandlerOn.Failure, - OnCancel: a.Job.HandlerOn.Cancel, + OnExit: a.DAG.HandlerOn.Exit, + OnSuccess: a.DAG.HandlerOn.Success, + OnFailure: a.DAG.HandlerOn.Failure, + OnCancel: a.DAG.HandlerOn.Cancel, }) a.reporter = reporter.New(&reporter.Config{ Mailer: mail.New( &mail.Config{ - Host: a.Job.Smtp.Host, - Port: a.Job.Smtp.Port, + Host: a.DAG.Smtp.Host, + Port: a.DAG.Smtp.Port, }), }) a.logFilename = filepath.Join( - a.Job.LogDir, fmt.Sprintf("%s.%s.log", - utils.ValidFilename(a.Job.Name, "_"), + a.DAG.LogDir, fmt.Sprintf("%s.%s.log", + utils.ValidFilename(a.DAG.Name, "_"), time.Now().Format("20060102.15:04:05"), )) } @@ -180,7 +180,7 @@ func (a *Agent) setupGraph() (err error) { log.Printf("setup for retry") return a.setupRetry() } - a.graph, err = scheduler.NewExecutionGraph(a.Job.Steps...) + a.graph, err = scheduler.NewExecutionGraph(a.DAG.Steps...) return } @@ -204,23 +204,23 @@ func (a *Agent) setupRequestId() error { func (a *Agent) setupDatabase() (err error) { a.database = database.New(database.DefaultConfig()) - a.dbWriter, a.dbFile, err = a.database.NewWriter(a.Job.ConfigPath, time.Now()) + a.dbWriter, a.dbFile, err = a.database.NewWriter(a.DAG.ConfigPath, time.Now()) return } func (a *Agent) setupSocketServer() (err error) { a.socketServer, err = sock.NewServer( &sock.Config{ - Addr: sock.GetSockAddr(a.Job.ConfigPath), + Addr: sock.GetSockAddr(a.DAG.ConfigPath), HandlerFunc: a.handleHTTP, }) return } func (a *Agent) checkPreconditions() error { - if len(a.Job.Preconditions) > 0 { - log.Printf("checking pre conditions for \"%s\"", a.Job.Name) - if err := config.EvalConditions(a.Job.Preconditions); err != nil { + if len(a.DAG.Preconditions) > 0 { + log.Printf("checking pre conditions for \"%s\"", a.DAG.Name) + if err := config.EvalConditions(a.DAG.Preconditions); err != nil { done := make(chan bool) go a.scheduler.Cancel(a.graph, done) <-done @@ -275,7 +275,7 @@ func (a *Agent) run() error { for node := range done { status := a.Status() a.dbWriter.Write(status) - a.reporter.ReportStep(a.Job, status, node) + a.reporter.ReportStep(a.DAG, status, node) } }() @@ -288,13 +288,13 @@ func (a *Agent) run() error { } a.reporter.ReportSummary(status, lastErr) - if err := a.reporter.ReportMail(a.Job, status); err != nil { + if err := a.reporter.ReportMail(a.DAG, status); err != nil { log.Printf("failed to send mail. %s", err) } if err := a.dbWriter.Close(); err != nil { log.Printf("failed to close db writer. err: %v", err) - } else if err := a.database.Compact(a.Job.ConfigPath, a.dbFile); err != nil { + } else if err := a.database.Compact(a.DAG.ConfigPath, a.dbFile); err != nil { log.Printf("failed to compact data. %s", err) } @@ -307,7 +307,7 @@ func (a *Agent) dryRun() error { go func() { for node := range done { status := a.Status() - a.reporter.ReportStep(a.Job, status, node) + a.reporter.ReportStep(a.DAG, status, node) } }() @@ -323,13 +323,13 @@ func (a *Agent) dryRun() error { } func (a *Agent) checkIsRunning() error { - status, err := controller.New(a.Job).GetStatus() + status, err := controller.New(a.DAG).GetStatus() if err != nil { return err } if status.Status != scheduler.SchedulerStatus_None { - return fmt.Errorf("the job is already running. socket=%s", - sock.GetSockAddr(a.Job.ConfigPath)) + return fmt.Errorf("the DAG is already running. socket=%s", + sock.GetSockAddr(a.DAG.ConfigPath)) } return nil } diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index a69766939..1ce6cb847 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -28,44 +28,44 @@ func TestMain(m *testing.M) { os.Exit(code) } -func TestRunJob(t *testing.T) { - job, err := controller.FromConfig(testConfig("agent_run.yaml")) +func TestRunDAG(t *testing.T) { + dag, err := controller.FromConfig(testConfig("agent_run.yaml")) require.NoError(t, err) - status, err := testJob(t, job) + status, err := testDAG(t, dag) require.NoError(t, err) assert.Equal(t, scheduler.SchedulerStatus_Success, status.Status) } -func TestCancelJob(t *testing.T) { +func TestCancelDAG(t *testing.T) { for _, abort := range []func(*agent.Agent){ func(a *agent.Agent) { a.Signal(syscall.SIGTERM) }, func(a *agent.Agent) { a.Cancel(syscall.SIGTERM) }, func(a *agent.Agent) { a.Kill(nil) }, } { - a, job := testJobAsync(t, testConfig("agent_sleep.yaml")) + a, dag := testDAGAsync(t, testConfig("agent_sleep.yaml")) time.Sleep(time.Millisecond * 100) abort(a) time.Sleep(time.Millisecond * 500) - status, err := controller.New(job.Config).GetLastStatus() + status, err := controller.New(dag.Config).GetLastStatus() require.NoError(t, err) assert.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status) } } func TestPreConditionInvalid(t *testing.T) { - job, err := controller.FromConfig(testConfig("agent_multiple_steps.yaml")) + dag, err := controller.FromConfig(testConfig("agent_multiple_steps.yaml")) require.NoError(t, err) - job.Config.Preconditions = []*config.Condition{ + dag.Config.Preconditions = []*config.Condition{ { Condition: "`echo 1`", Expected: "0", }, } - status, err := testJob(t, job) + status, err := testDAG(t, dag) require.Error(t, err) assert.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status) @@ -75,16 +75,16 @@ func TestPreConditionInvalid(t *testing.T) { } func TestPreConditionValid(t *testing.T) { - job, err := controller.FromConfig(testConfig("agent_with_params.yaml")) + dag, err := controller.FromConfig(testConfig("agent_with_params.yaml")) require.NoError(t, err) - job.Config.Preconditions = []*config.Condition{ + dag.Config.Preconditions = []*config.Condition{ { Condition: "`echo 1`", Expected: "1", }, } - status, err := testJob(t, job) + status, err := testDAG(t, dag) require.NoError(t, err) assert.Equal(t, scheduler.SchedulerStatus_Success, status.Status) @@ -94,9 +94,9 @@ func TestPreConditionValid(t *testing.T) { } func TestOnExit(t *testing.T) { - job, err := controller.FromConfig(testConfig("agent_on_exit.yaml")) + dag, err := controller.FromConfig(testConfig("agent_on_exit.yaml")) require.NoError(t, err) - status, err := testJob(t, job) + status, err := testDAG(t, dag) require.NoError(t, err) assert.Equal(t, scheduler.SchedulerStatus_Success, status.Status) @@ -108,10 +108,10 @@ func TestOnExit(t *testing.T) { func TestRetry(t *testing.T) { cfg := testConfig("agent_retry.yaml") - job, err := controller.FromConfig(cfg) + dag, err := controller.FromConfig(cfg) require.NoError(t, err) - status, err := testJob(t, job) + status, err := testDAG(t, dag) require.Error(t, err) assert.Equal(t, scheduler.SchedulerStatus_Error, status.Status) @@ -120,7 +120,7 @@ func TestRetry(t *testing.T) { } a := &agent.Agent{ Config: &agent.Config{ - Job: job.Config, + DAG: dag.Config, }, RetryConfig: &agent.RetryConfig{ Status: status, @@ -139,10 +139,10 @@ func TestRetry(t *testing.T) { } } -func testJob(t *testing.T, job *controller.Job) (*models.Status, error) { +func testDAG(t *testing.T, dag *controller.DAG) (*models.Status, error) { t.Helper() a := &agent.Agent{Config: &agent.Config{ - Job: job.Config, + DAG: dag.Config, }} err := a.Run() return a.Status(), err @@ -152,19 +152,19 @@ func testConfig(name string) string { return path.Join(testsDir, name) } -func testJobAsync(t *testing.T, file string) (*agent.Agent, *controller.Job) { +func testDAGAsync(t *testing.T, file string) (*agent.Agent, *controller.DAG) { t.Helper() - job, err := controller.FromConfig(file) + dag, err := controller.FromConfig(file) require.NoError(t, err) a := &agent.Agent{Config: &agent.Config{ - Job: job.Config, + DAG: dag.Config, }} go func() { a.Run() }() - return a, job + return a, dag } diff --git a/internal/config/config.go b/internal/config/config.go index 5ce1c6a6c..3bf08e44c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -352,7 +352,7 @@ func loadVariables(strVariables map[string]string) (map[string]string, error) { func assertDef(def *configDefinition) error { if def.Name == "" { - return fmt.Errorf("job name must be specified.") + return fmt.Errorf("DAG name must be specified.") } if len(def.Steps) == 0 { return fmt.Errorf("at least one step must be specified.") diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 98d7f6aa6..18681213a 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -33,7 +33,7 @@ func TestAssertDefinition(t *testing.T) { loader := config.NewConfigLoader() _, err := loader.Load(path.Join(testDir, "config_err_no_name.yaml"), "") - require.Equal(t, err, fmt.Errorf("job name must be specified.")) + require.Equal(t, err, fmt.Errorf("DAG name must be specified.")) _, err = loader.Load(path.Join(testDir, "config_err_no_steps.yaml"), "") require.Equal(t, err, fmt.Errorf("at least one step must be specified.")) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index b4f0228d1..88acdc391 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -20,19 +20,19 @@ import ( ) type Controller interface { - StopJob() error - StartJob(bin string, workDir string, params string) error - RetryJob(bin string, workDir string, reqId string) error + Stop() error + Start(bin string, workDir string, params string) error + Retry(bin string, workDir string, reqId string) error GetStatus() (*models.Status, error) GetLastStatus() (*models.Status, error) GetStatusHist(n int) ([]*models.StatusFile, error) } -func GetJobList(dir string) (jobs []*Job, errs []string, err error) { - jobs = []*Job{} +func GetDAGs(dir string) (dags []*DAG, errs []string, err error) { + dags = []*DAG{} errs = []string{} if !utils.FileExists(dir) { - errs = append(errs, fmt.Sprintf("invalid jobs directory: %s", dir)) + errs = append(errs, fmt.Sprintf("invalid DAGs directory: %s", dir)) return } fis, err := ioutil.ReadDir(dir) @@ -43,17 +43,17 @@ func GetJobList(dir string) (jobs []*Job, errs []string, err error) { if filepath.Ext(fi.Name()) != ".yaml" { continue } - job, err := fromConfig(filepath.Join(dir, fi.Name()), true) + dag, err := fromConfig(filepath.Join(dir, fi.Name()), true) if err != nil { log.Printf("%v", err) - if job == nil { + if dag == nil { errs = append(errs, err.Error()) continue } } - jobs = append(jobs, job) + dags = append(dags, dag) } - return jobs, errs, nil + return dags, errs, nil } var _ Controller = (*controller)(nil) @@ -68,13 +68,13 @@ func New(cfg *config.Config) Controller { } } -func (c *controller) StopJob() error { +func (c *controller) Stop() error { client := sock.Client{Addr: sock.GetSockAddr(c.cfg.ConfigPath)} _, err := client.Request("POST", "/stop") return err } -func (c *controller) StartJob(bin string, workDir string, params string) (err error) { +func (c *controller) Start(bin string, workDir string, params string) (err error) { go func() { args := []string{"start"} if params != "" { @@ -88,14 +88,14 @@ func (c *controller) StartJob(bin string, workDir string, params string) (err er defer cmd.Wait() err = cmd.Start() if err != nil { - log.Printf("failed to start a job: %v", err) + log.Printf("failed to start a DAG: %v", err) } }() time.Sleep(time.Millisecond * 500) return } -func (c *controller) RetryJob(bin string, workDir string, reqId string) (err error) { +func (c *controller) Retry(bin string, workDir string, reqId string) (err error) { go func() { args := []string{"retry"} args = append(args, fmt.Sprintf("--req=%s", reqId)) @@ -107,7 +107,7 @@ func (c *controller) RetryJob(bin string, workDir string, reqId string) (err err defer cmd.Wait() err := cmd.Start() if err != nil { - log.Printf("failed to retry a job: %v", err) + log.Printf("failed to retry a DAG: %v", err) } }() time.Sleep(time.Millisecond * 500) diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 1a9415e97..3758b7bcc 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -34,10 +34,10 @@ func testConfig(name string) string { func TestGetStatus(t *testing.T) { file := testConfig("controller_success.yaml") - job, err := controller.FromConfig(file) + dag, err := controller.FromConfig(file) require.NoError(t, err) - st, err := controller.New(job.Config).GetStatus() + st, err := controller.New(dag.Config).GetStatus() require.NoError(t, err) assert.Equal(t, scheduler.SchedulerStatus_None, st.Status) } @@ -45,11 +45,11 @@ func TestGetStatus(t *testing.T) { func TestGetStatusRunningAndDone(t *testing.T) { file := testConfig("controller_status.yaml") - job, err := controller.FromConfig(file) + dag, err := controller.FromConfig(file) require.NoError(t, err) a := agent.Agent{Config: &agent.Config{ - Job: job.Config, + DAG: dag.Config, }} go func() { @@ -58,30 +58,30 @@ func TestGetStatusRunningAndDone(t *testing.T) { }() time.Sleep(time.Millisecond * 500) - st, err := controller.New(job.Config).GetStatus() + st, err := controller.New(dag.Config).GetStatus() require.NoError(t, err) time.Sleep(time.Millisecond * 50) assert.Equal(t, scheduler.SchedulerStatus_Running, st.Status) assert.Eventually(t, func() bool { - st, _ := controller.New(job.Config).GetLastStatus() + st, _ := controller.New(dag.Config).GetLastStatus() return scheduler.SchedulerStatus_Success == st.Status }, time.Millisecond*1500, time.Millisecond*100) } -func TestGetJob(t *testing.T) { - file := testConfig("controller_get_job.yaml") - job, err := controller.FromConfig(file) +func TestGetDAG(t *testing.T) { + file := testConfig("controller_get_dag.yaml") + dag, err := controller.FromConfig(file) require.NoError(t, err) - assert.Equal(t, "basic success", job.Config.Name) + assert.Equal(t, "basic success", dag.Config.Name) } -func TestGetJobList(t *testing.T) { - jobs, errs, err := controller.GetJobList(testsDir) +func TestGetDAGList(t *testing.T) { + dags, errs, err := controller.GetDAGs(testsDir) require.NoError(t, err) require.Equal(t, 0, len(errs)) - matches, err := filepath.Glob(path.Join(testsDir, "*.yaml")) - assert.Equal(t, len(matches), len(jobs)) + matches, _ := filepath.Glob(path.Join(testsDir, "*.yaml")) + assert.Equal(t, len(matches), len(dags)) } diff --git a/internal/controller/job.go b/internal/controller/dag.go similarity index 75% rename from internal/controller/job.go rename to internal/controller/dag.go index ce1a88624..0cb763baa 100644 --- a/internal/controller/job.go +++ b/internal/controller/dag.go @@ -10,7 +10,7 @@ import ( "github.com/yohamta/dagu/internal/utils" ) -type Job struct { +type DAG struct { File string Dir string Config *config.Config @@ -19,14 +19,14 @@ type Job struct { ErrorT *string } -func FromConfig(file string) (*Job, error) { +func FromConfig(file string) (*DAG, error) { if !utils.FileExists(file) { return nil, fmt.Errorf("file not found: %s", file) } return fromConfig(file, false) } -func fromConfig(file string, headOnly bool) (*Job, error) { +func fromConfig(file string, headOnly bool) (*DAG, error) { cl := config.NewConfigLoader() var cfg *config.Config var err error @@ -37,11 +37,11 @@ func fromConfig(file string, headOnly bool) (*Job, error) { } if err != nil { if cfg != nil { - return newJob(cfg, nil, err), err + return newDAG(cfg, nil, err), err } cfg := &config.Config{ConfigPath: file} cfg.Init() - return newJob(cfg, nil, err), err + return newDAG(cfg, nil, err), err } status, err := New(cfg).GetLastStatus() if err != nil { @@ -49,14 +49,14 @@ func fromConfig(file string, headOnly bool) (*Job, error) { } if !headOnly { if _, err := scheduler.NewExecutionGraph(cfg.Steps...); err != nil { - return newJob(cfg, status, err), err + return newDAG(cfg, status, err), err } } - return newJob(cfg, status, err), nil + return newDAG(cfg, status, err), nil } -func newJob(cfg *config.Config, s *models.Status, err error) *Job { - ret := &Job{ +func newDAG(cfg *config.Config, s *models.Status, err error) *DAG { + ret := &DAG{ File: filepath.Base(cfg.ConfigPath), Dir: filepath.Dir(cfg.ConfigPath), Config: cfg, diff --git a/internal/reporter/reporter_test.go b/internal/reporter/reporter_test.go index 20f5bb433..d1cab8d77 100644 --- a/internal/reporter/reporter_test.go +++ b/internal/reporter/reporter_test.go @@ -24,7 +24,7 @@ func TestReporter(t *testing.T) { t.Run(scenario, func(t *testing.T) { cfg := &config.Config{ - Name: "test-job", + Name: "test DAG", MailOn: config.MailOn{ Failure: true, }, @@ -74,7 +74,7 @@ func testErrorMail(t *testing.T, rp *Reporter, cfg *config.Config, nodes []*mode mock := rp.Mailer.(*mockMailer) require.Contains(t, mock.subject, "Error") - require.Contains(t, mock.subject, "test-job") + require.Contains(t, mock.subject, "test DAG") require.Equal(t, 1, mock.count) } @@ -102,7 +102,7 @@ func testSuccessMail(t *testing.T, rp *Reporter, cfg *config.Config, nodes []*mo mock := rp.Mailer.(*mockMailer) require.Contains(t, mock.subject, "Success") - require.Contains(t, mock.subject, "test-job") + require.Contains(t, mock.subject, "test DAG") require.Equal(t, 1, mock.count) } diff --git a/tests/admin/.dagu/admin.yaml b/tests/admin/.dagu/admin.yaml index 018c4d8f9..a9092590c 100644 --- a/tests/admin/.dagu/admin.yaml +++ b/tests/admin/.dagu/admin.yaml @@ -1,5 +1,5 @@ host: ${HOST} port: 8081 -jobs: "${HOME}/dagu/jobs" +dags: "${HOME}/dagu/dags" command: "${HOME}/dagu/bin/dagu" -workDir: "${HOME}/dagu/jobs" \ No newline at end of file +workDir: "${HOME}/dagu/dags" \ No newline at end of file diff --git a/tests/admin/admin.yaml b/tests/admin/admin.yaml index 7c6466b96..5f97a35ec 100644 --- a/tests/admin/admin.yaml +++ b/tests/admin/admin.yaml @@ -1,6 +1,6 @@ host: ${HOST} port: 8082 -jobs: "${HOME}/dagu/jobs" +dags: "${HOME}/dagu/dags" command: "${HOME}/dagu/bin/dagu" -workDir: "${HOME}/dagu/jobs" +workDir: "${HOME}/dagu/dags" logEncodingCharset: euc-jp \ No newline at end of file diff --git a/tests/testdata/controller_get_job.yaml b/tests/testdata/controller_get_dag.yaml similarity index 100% rename from tests/testdata/controller_get_job.yaml rename to tests/testdata/controller_get_dag.yaml