From 30ee960d67448e59250e7ecbbf75b979b5ec48cd Mon Sep 17 00:00:00 2001 From: liwuhuan Date: Thu, 8 Aug 2024 15:44:08 +0800 Subject: [PATCH 01/18] feat: agent new dag context add DaguRequestID & DaguSchedulerLogPath value --- internal/agent/agent.go | 14 ++++++++++++-- internal/dag/context.go | 22 ++++++++++++++-------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index d2a4f21f1..50c1c097c 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -203,7 +203,12 @@ func (a *Agent) Run(ctx context.Context) error { // Start the DAG execution. lastErr := a.scheduler.Schedule( - dag.NewContext(ctx, a.dag, a.dataStore.DAGStore()), + dag.NewContext(ctx, dag.Context{ + DAG: a.dag, + Finder: a.dataStore.DAGStore(), + DaguRequestID: a.requestID, + DaguSchedulerLogPath: a.logFile, + }), a.graph, done, ) @@ -384,7 +389,12 @@ func (a *Agent) dryRun() error { a.logger.Info("Dry-run started", "reqId", a.requestID) lastErr := a.scheduler.Schedule( - dag.NewContext(context.Background(), a.dag, a.dataStore.DAGStore()), + dag.NewContext(context.Background(), dag.Context{ + DAG: a.dag, + Finder: a.dataStore.DAGStore(), + DaguRequestID: a.requestID, + DaguSchedulerLogPath: a.logDir, + }), a.graph, done, ) diff --git a/internal/dag/context.go b/internal/dag/context.go index 271f4f4d8..5f767b90c 100644 --- a/internal/dag/context.go +++ b/internal/dag/context.go @@ -28,25 +28,31 @@ type Finder interface { // Context contains the current DAG and Finder. type Context struct { - DAG *DAG - Finder Finder + DAG *DAG + Finder Finder + DaguSchedulerLogPath string + DaguExecutionLogPath string + DaguRequestID string } // ctxKey is used as the key for storing the DAG in the context. type ctxKey struct{} -// NewContext creates a new context with the DAG and Finder. -func NewContext(ctx context.Context, dag *DAG, finder Finder) context.Context { - return context.WithValue(ctx, ctxKey{}, Context{ - DAG: dag, - Finder: finder, - }) +// NewContext creates a new context with the DAG and Finder and RequestIDEnvKey. +func NewContext(ctx context.Context, dagCtx Context) context.Context { + return context.WithValue(ctx, ctxKey{}, dagCtx) } var ( errFailedCtxAssertion = errors.New("failed to assert DAG context") ) +const ( + ExecutionLogPathEnvKey = "DAGU_EXECUTION_LOG_PATH" + SchedulerLogPathEnvKey = "DAGU_SCHEDULER_LOG_PATH" + RequestIDEnvKey = "DAGU_REQUEST_ID" +) + // GetContext returns the DAG Context from the context. // It returns an error if the context does not contain a DAG Context. func GetContext(ctx context.Context) (Context, error) { From c4f36ee6be275cbaf528425d8a01e56123e87446 Mon Sep 17 00:00:00 2001 From: liwuhuan Date: Thu, 8 Aug 2024 15:44:12 +0800 Subject: [PATCH 02/18] feat: before node setupExec to setUpBuildInContextEnv --- internal/dag/scheduler/node.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index 7436d6b41..2145d0927 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -129,6 +129,10 @@ func (n *Node) State() NodeState { // Execute runs the command synchronously and returns error if any. func (n *Node) Execute(ctx context.Context) error { + if err := n.setUpBuildInContextEnv(ctx); err != nil { + return err + } + cmd, err := n.setupExec(ctx) if err != nil { return err @@ -368,6 +372,27 @@ func (n *Node) setupLog() error { return nil } +func (n *Node) setUpBuildInContextEnv(ctx context.Context) error { + var ( + dagCtx dag.Context + err error + ) + + if dagCtx, err = dag.GetContext(ctx); err != nil { + return err + } + + if err = os.Setenv(dag.ExecutionLogPathEnvKey, n.data.Log); err != nil { + return err + } + + if err = os.Setenv(dag.SchedulerLogPathEnvKey, dagCtx.DaguSchedulerLogPath); err != nil { + return err + } + + return os.Setenv(dag.RequestIDEnvKey, dagCtx.DaguRequestID) +} + func (n *Node) teardown() error { if n.done { return nil From e8d83c65e586cb68f11d6432b2f66ceff7f0c226 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:05:26 +0800 Subject: [PATCH 03/18] fix: agent revert change --- internal/agent/agent.go | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 50c1c097c..fa9e0f05c 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -202,16 +202,7 @@ func (a *Agent) Run(ctx context.Context) error { }() // Start the DAG execution. - lastErr := a.scheduler.Schedule( - dag.NewContext(ctx, dag.Context{ - DAG: a.dag, - Finder: a.dataStore.DAGStore(), - DaguRequestID: a.requestID, - DaguSchedulerLogPath: a.logFile, - }), - a.graph, - done, - ) + lastErr := a.scheduler.Schedule(dag.NewContext(ctx, a.dag, a.dataStore.DAGStore()), a.graph, done) // Update the finished status to the history database. finishedStatus := a.Status() @@ -388,16 +379,7 @@ func (a *Agent) dryRun() error { a.logger.Info("Dry-run started", "reqId", a.requestID) - lastErr := a.scheduler.Schedule( - dag.NewContext(context.Background(), dag.Context{ - DAG: a.dag, - Finder: a.dataStore.DAGStore(), - DaguRequestID: a.requestID, - DaguSchedulerLogPath: a.logDir, - }), - a.graph, - done, - ) + lastErr := a.scheduler.Schedule(dag.NewContext(context.Background(), a.dag, a.dataStore.DAGStore()), a.graph, done) a.reporter.report(a.Status(), lastErr) From 42b18dacd7486adc0838bd50d26e1016601357d4 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:05:46 +0800 Subject: [PATCH 04/18] fix: dag context revert change --- internal/dag/context.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/internal/dag/context.go b/internal/dag/context.go index 5f767b90c..8fe5e6e7e 100644 --- a/internal/dag/context.go +++ b/internal/dag/context.go @@ -28,31 +28,22 @@ type Finder interface { // Context contains the current DAG and Finder. type Context struct { - DAG *DAG - Finder Finder - DaguSchedulerLogPath string - DaguExecutionLogPath string - DaguRequestID string + DAG *DAG + Finder Finder } // ctxKey is used as the key for storing the DAG in the context. type ctxKey struct{} -// NewContext creates a new context with the DAG and Finder and RequestIDEnvKey. -func NewContext(ctx context.Context, dagCtx Context) context.Context { - return context.WithValue(ctx, ctxKey{}, dagCtx) +// NewContext creates a new context with the DAG and Finder. +func NewContext(ctx context.Context, dag *DAG, finder Finder) context.Context { + return context.WithValue(ctx, ctxKey{}, Context{DAG: dag, Finder: finder}) } var ( errFailedCtxAssertion = errors.New("failed to assert DAG context") ) -const ( - ExecutionLogPathEnvKey = "DAGU_EXECUTION_LOG_PATH" - SchedulerLogPathEnvKey = "DAGU_SCHEDULER_LOG_PATH" - RequestIDEnvKey = "DAGU_REQUEST_ID" -) - // GetContext returns the DAG Context from the context. // It returns an error if the context does not contain a DAG Context. func GetContext(ctx context.Context) (Context, error) { From aa568e7b703ed2d266f0da7885078b41620328f7 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:06:12 +0800 Subject: [PATCH 05/18] fix: node revert change --- internal/dag/scheduler/node.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index 2145d0927..7d5d88571 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -129,10 +129,6 @@ func (n *Node) State() NodeState { // Execute runs the command synchronously and returns error if any. func (n *Node) Execute(ctx context.Context) error { - if err := n.setUpBuildInContextEnv(ctx); err != nil { - return err - } - cmd, err := n.setupExec(ctx) if err != nil { return err @@ -371,28 +367,6 @@ func (n *Node) setupLog() error { n.logWriter = bufio.NewWriter(n.logFile) return nil } - -func (n *Node) setUpBuildInContextEnv(ctx context.Context) error { - var ( - dagCtx dag.Context - err error - ) - - if dagCtx, err = dag.GetContext(ctx); err != nil { - return err - } - - if err = os.Setenv(dag.ExecutionLogPathEnvKey, n.data.Log); err != nil { - return err - } - - if err = os.Setenv(dag.SchedulerLogPathEnvKey, dagCtx.DaguSchedulerLogPath); err != nil { - return err - } - - return os.Setenv(dag.RequestIDEnvKey, dagCtx.DaguRequestID) -} - func (n *Node) teardown() error { if n.done { return nil From eb933a57bf379ab0bfbff5dea8b3d40783c9d3db Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:06:37 +0800 Subject: [PATCH 06/18] feat: add build in context constants --- internal/constants/constants.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 2d097c6d9..0031feeb3 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -16,5 +16,9 @@ package constants var ( - Version = "0.0.1" + Version = "0.0.1" + StepDaguExecutionLogPathKeySuffix = "DAGU_EXECUTION_LOG_PATH" + StepDaguExecutionLogPathKeyPrefix = "STEP" + DaguSchedulerLogPathKey = "DAGU_SCHEDULER_LOG_PATH" + DaguRequestIDKey = "DAGU_REQUEST_ID" ) From 190c3b6a51c565870ec02733621ee6bde138f848 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:07:20 +0800 Subject: [PATCH 07/18] feat: export agent process build in context environment --- internal/agent/agent.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index fa9e0f05c..0896e7b55 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -27,13 +27,13 @@ import ( "syscall" "time" - "github.com/daguflow/dagu/internal/logger" - "github.com/daguflow/dagu/internal/persistence" - "github.com/daguflow/dagu/internal/client" + "github.com/daguflow/dagu/internal/constants" "github.com/daguflow/dagu/internal/dag" "github.com/daguflow/dagu/internal/dag/scheduler" + "github.com/daguflow/dagu/internal/logger" "github.com/daguflow/dagu/internal/mailer" + "github.com/daguflow/dagu/internal/persistence" "github.com/daguflow/dagu/internal/persistence/model" "github.com/daguflow/dagu/internal/sock" ) @@ -309,12 +309,28 @@ func (a *Agent) HandleHTTP(w http.ResponseWriter, r *http.Request) { } } +func (a *Agent) setupEnvironmentVariable() error { + var ( + err error + ) + + if err = os.Setenv(constants.DaguSchedulerLogPathKey, a.logFile); err != nil { + return err + } + + return os.Setenv(constants.DaguRequestIDKey, a.requestID) +} + // setup the agent instance for DAG execution. func (a *Agent) setup() error { // Lock to prevent race condition. a.lock.Lock() defer a.lock.Unlock() + if err := a.setupEnvironmentVariable(); err != nil { + return err + } + a.scheduler = a.newScheduler() a.reporter = newReporter( mailer.New(&mailer.NewMailerArgs{ From f9e13bb6a59659e528ea624966f79d02a10d6bba Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:07:48 +0800 Subject: [PATCH 08/18] feat: step add nodeID --- internal/dag/step.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/dag/step.go b/internal/dag/step.go index 5030e0163..4523c506d 100644 --- a/internal/dag/step.go +++ b/internal/dag/step.go @@ -25,6 +25,8 @@ import ( // A step is created from parsing a DAG file written in YAML. // It marshal/unmarshal to/from JSON when it is saved in the execution history. type Step struct { + // nodeID is the unique identifier for the step. JSON tag is omitted to avoid + NodeID int `json:"-"` // Name is the name of the step. Name string `json:"Name"` // Description is the description of the step. From 198573be1a8c090fde6fc8c2589f7e058a3d260e Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:08:13 +0800 Subject: [PATCH 09/18] feat: add generate step special execution log path key function --- internal/util/utils.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/internal/util/utils.go b/internal/util/utils.go index ad152240e..b80206207 100644 --- a/internal/util/utils.go +++ b/internal/util/utils.go @@ -21,10 +21,13 @@ import ( "os" "path/filepath" "regexp" + "strconv" "strings" "time" "github.com/mattn/go-shellwords" + + "github.com/daguflow/dagu/internal/constants" ) var ( @@ -225,3 +228,21 @@ func AddYamlExtension(file string) string { } return file } + +func GenerateStepSpecialExecutionLogPathKey(stepID int) string { + var ( + keyBuilder = strings.Builder{} + ) + + keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeyPrefix) + + keyBuilder.WriteString("_") + + keyBuilder.WriteString(strconv.Itoa(stepID)) + + keyBuilder.WriteString("_") + + keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeySuffix) + + return keyBuilder.String() +} From d00a67850b84df6016a2e926ce6488efce2328bc Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:09:31 +0800 Subject: [PATCH 10/18] feat: command add origin execution log path environment --- internal/dag/executor/command.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/dag/executor/command.go b/internal/dag/executor/command.go index 251cfd8dc..836151fbb 100644 --- a/internal/dag/executor/command.go +++ b/internal/dag/executor/command.go @@ -24,9 +24,9 @@ import ( "sync" "syscall" - "github.com/daguflow/dagu/internal/util" - + "github.com/daguflow/dagu/internal/constants" "github.com/daguflow/dagu/internal/dag" + "github.com/daguflow/dagu/internal/util" ) type commandExecutor struct { @@ -40,9 +40,11 @@ func newCommand(ctx context.Context, step dag.Step) (Executor, error) { if len(step.Dir) > 0 && !util.FileExists(step.Dir) { return nil, fmt.Errorf("directory %q does not exist", step.Dir) } + stepSpecialExecutionLogPathKey := util.GenerateStepSpecialExecutionLogPathKey(step.NodeID) cmd.Dir = step.Dir cmd.Env = append(cmd.Env, os.Environ()...) cmd.Env = append(cmd.Env, step.Variables...) + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, os.Getenv(stepSpecialExecutionLogPathKey))) step.OutputVariables.Range(func(_, value any) bool { cmd.Env = append(cmd.Env, value.(string)) return true From e2957f654b3db9415c3933b1b9fe06c9b0409e88 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:30:21 +0800 Subject: [PATCH 11/18] feat: node replace arguments & export node special execution log path environment --- internal/dag/scheduler/node.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index 7d5d88571..fb7174fe5 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + "github.com/daguflow/dagu/internal/constants" "io" "log" "os" @@ -359,6 +360,9 @@ func (n *Node) setupLog() error { n.logLock.Lock() defer n.logLock.Unlock() var err error + if err = os.Setenv(util.GenerateStepSpecialExecutionLogPathKey(n.id), n.data.State.Log); err != nil { + return err + } n.logFile, err = util.OpenOrCreateFile(n.data.State.Log) if err != nil { n.data.State.Error = err @@ -432,6 +436,14 @@ func (n *Node) init() { return } n.id = getNextNodeID() + n.data.Step.NodeID = n.id + + n.data.Step.CmdWithArgs = strings.ReplaceAll( + n.data.Step.CmdWithArgs, + constants.StepDaguExecutionLogPathKeySuffix, + util.GenerateStepSpecialExecutionLogPathKey(n.id), + ) + if n.data.Step.Variables == nil { n.data.Step.Variables = []string{} } From b876e942ef398379aa80924d6f977cb690f844f2 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Fri, 16 Aug 2024 10:47:48 +0800 Subject: [PATCH 12/18] style: go imports fmt --- internal/dag/scheduler/node.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index fb7174fe5..b0be00693 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "fmt" - "github.com/daguflow/dagu/internal/constants" "io" "log" "os" @@ -29,10 +28,12 @@ import ( "sync" "time" + "golang.org/x/sys/unix" + + "github.com/daguflow/dagu/internal/constants" "github.com/daguflow/dagu/internal/dag" "github.com/daguflow/dagu/internal/dag/executor" "github.com/daguflow/dagu/internal/util" - "golang.org/x/sys/unix" ) // Node is a node in a DAG. It executes a command. From c7454ee5287ee1f92167b4831b8203f21660e19d Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 10:13:58 +0800 Subject: [PATCH 13/18] feat: dag context hold envs --- internal/dag/context.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/internal/dag/context.go b/internal/dag/context.go index 8fe5e6e7e..24ee51f4b 100644 --- a/internal/dag/context.go +++ b/internal/dag/context.go @@ -18,6 +18,10 @@ package dag import ( "context" "errors" + "strconv" + "strings" + + "github.com/daguflow/dagu/internal/constants" ) // Finder finds a DAG by name. @@ -30,6 +34,7 @@ type Finder interface { type Context struct { DAG *DAG Finder Finder + Envs []string } // ctxKey is used as the key for storing the DAG in the context. @@ -37,7 +42,7 @@ type ctxKey struct{} // NewContext creates a new context with the DAG and Finder. func NewContext(ctx context.Context, dag *DAG, finder Finder) context.Context { - return context.WithValue(ctx, ctxKey{}, Context{DAG: dag, Finder: finder}) + return context.WithValue(ctx, ctxKey{}, Context{DAG: dag, Finder: finder, Envs: make([]string, 0)}) } var ( @@ -53,3 +58,25 @@ func GetContext(ctx context.Context) (Context, error) { } return dagCtx, nil } + +func WithDagContext(ctx context.Context, dagContext Context) context.Context { + return context.WithValue(ctx, ctxKey{}, dagContext) +} + +func GenGlobalStepLogEnvKey(stepID int) string { + var ( + keyBuilder = strings.Builder{} + ) + + keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeyPrefix) + + keyBuilder.WriteString("_") + + keyBuilder.WriteString(strconv.Itoa(stepID)) + + keyBuilder.WriteString("_") + + keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeySuffix) + + return keyBuilder.String() +} From 2fe6725848956e1655e360e5e2b65ba6f0d9707f Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 10:15:14 +0800 Subject: [PATCH 14/18] feat: step up node env in execute & pass log path env with dag context --- internal/dag/scheduler/node.go | 16 +++++++++++----- internal/dag/step.go | 2 -- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index b0be00693..8f5573926 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -131,6 +131,16 @@ func (n *Node) State() NodeState { // Execute runs the command synchronously and returns error if any. func (n *Node) Execute(ctx context.Context) error { + dagContext, err := dag.GetContext(ctx) + if err != nil { + return err + } + // set node special log path environment variable + if err = os.Setenv(dag.GenGlobalStepLogEnvKey(n.id), n.data.State.Log); err != nil { + return err + } + dagContext.Envs = append(dagContext.Envs, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, n.data.State.Log)) + ctx = dag.WithDagContext(ctx, dagContext) cmd, err := n.setupExec(ctx) if err != nil { return err @@ -361,9 +371,6 @@ func (n *Node) setupLog() error { n.logLock.Lock() defer n.logLock.Unlock() var err error - if err = os.Setenv(util.GenerateStepSpecialExecutionLogPathKey(n.id), n.data.State.Log); err != nil { - return err - } n.logFile, err = util.OpenOrCreateFile(n.data.State.Log) if err != nil { n.data.State.Error = err @@ -437,12 +444,11 @@ func (n *Node) init() { return } n.id = getNextNodeID() - n.data.Step.NodeID = n.id n.data.Step.CmdWithArgs = strings.ReplaceAll( n.data.Step.CmdWithArgs, constants.StepDaguExecutionLogPathKeySuffix, - util.GenerateStepSpecialExecutionLogPathKey(n.id), + dag.GenGlobalStepLogEnvKey(n.id), ) if n.data.Step.Variables == nil { diff --git a/internal/dag/step.go b/internal/dag/step.go index 4523c506d..5030e0163 100644 --- a/internal/dag/step.go +++ b/internal/dag/step.go @@ -25,8 +25,6 @@ import ( // A step is created from parsing a DAG file written in YAML. // It marshal/unmarshal to/from JSON when it is saved in the execution history. type Step struct { - // nodeID is the unique identifier for the step. JSON tag is omitted to avoid - NodeID int `json:"-"` // Name is the name of the step. Name string `json:"Name"` // Description is the description of the step. From 882cf1418d022b9150368ac1b5d53148a23eced1 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 10:15:30 +0800 Subject: [PATCH 15/18] feat: remove gen log path key in dag package --- internal/util/utils.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/internal/util/utils.go b/internal/util/utils.go index b80206207..ad152240e 100644 --- a/internal/util/utils.go +++ b/internal/util/utils.go @@ -21,13 +21,10 @@ import ( "os" "path/filepath" "regexp" - "strconv" "strings" "time" "github.com/mattn/go-shellwords" - - "github.com/daguflow/dagu/internal/constants" ) var ( @@ -228,21 +225,3 @@ func AddYamlExtension(file string) string { } return file } - -func GenerateStepSpecialExecutionLogPathKey(stepID int) string { - var ( - keyBuilder = strings.Builder{} - ) - - keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeyPrefix) - - keyBuilder.WriteString("_") - - keyBuilder.WriteString(strconv.Itoa(stepID)) - - keyBuilder.WriteString("_") - - keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeySuffix) - - return keyBuilder.String() -} From 2dd46ba5e24184d159005e9d4db3302890a39eb6 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 10:15:46 +0800 Subject: [PATCH 16/18] feat: command env from dag context --- internal/dag/executor/command.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/dag/executor/command.go b/internal/dag/executor/command.go index 836151fbb..6b7032814 100644 --- a/internal/dag/executor/command.go +++ b/internal/dag/executor/command.go @@ -24,7 +24,6 @@ import ( "sync" "syscall" - "github.com/daguflow/dagu/internal/constants" "github.com/daguflow/dagu/internal/dag" "github.com/daguflow/dagu/internal/util" ) @@ -40,11 +39,15 @@ func newCommand(ctx context.Context, step dag.Step) (Executor, error) { if len(step.Dir) > 0 && !util.FileExists(step.Dir) { return nil, fmt.Errorf("directory %q does not exist", step.Dir) } - stepSpecialExecutionLogPathKey := util.GenerateStepSpecialExecutionLogPathKey(step.NodeID) + dagContext, err := dag.GetContext(ctx) + if err != nil { + return nil, err + } + fmt.Printf("dagContext: %v\n", dagContext) cmd.Dir = step.Dir cmd.Env = append(cmd.Env, os.Environ()...) cmd.Env = append(cmd.Env, step.Variables...) - cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, os.Getenv(stepSpecialExecutionLogPathKey))) + cmd.Env = append(cmd.Env, dagContext.Envs...) step.OutputVariables.Range(func(_, value any) bool { cmd.Env = append(cmd.Env, value.(string)) return true From 8fee9531cd48cc33f6e33c7fc9236059d861a359 Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 10:27:07 +0800 Subject: [PATCH 17/18] fix: execute need dag context cause test case run failed --- internal/dag/scheduler/node_test.go | 16 ++++++---- internal/dag/scheduler/scheduler_test.go | 40 +++++++++++++----------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/internal/dag/scheduler/node_test.go b/internal/dag/scheduler/node_test.go index 981badc2d..2c29056fe 100644 --- a/internal/dag/scheduler/node_test.go +++ b/internal/dag/scheduler/node_test.go @@ -29,13 +29,17 @@ import ( "github.com/stretchr/testify/require" ) +func nodeTextCtxWithDagContext() context.Context { + return dag.NewContext(context.Background(), nil, nil) +} + func TestExecute(t *testing.T) { n := &Node{data: NodeData{ Step: dag.Step{ Command: "true", OutputVariables: &dag.SyncMap{}, }}} - require.NoError(t, n.Execute(context.Background())) + require.NoError(t, n.Execute(nodeTextCtxWithDagContext())) require.Nil(t, n.data.State.Error) } @@ -45,7 +49,7 @@ func TestError(t *testing.T) { Command: "false", OutputVariables: &dag.SyncMap{}, }}} - err := n.Execute(context.Background()) + err := n.Execute(nodeTextCtxWithDagContext()) require.True(t, err != nil) require.Equal(t, n.data.State.Error, err) } @@ -64,7 +68,7 @@ func TestSignal(t *testing.T) { }() n.setStatus(NodeStatusRunning) - err := n.Execute(context.Background()) + err := n.Execute(nodeTextCtxWithDagContext()) require.Error(t, err) require.Equal(t, n.State().Status, NodeStatusCancel) @@ -85,7 +89,7 @@ func TestSignalSpecified(t *testing.T) { }() n.setStatus(NodeStatusRunning) - err := n.Execute(context.Background()) + err := n.Execute(nodeTextCtxWithDagContext()) require.Error(t, err) require.Equal(t, n.State().Status, NodeStatusCancel) @@ -346,7 +350,7 @@ func TestRunScript(t *testing.T) { require.Equal(t, n.data.Step.Script, string(b)) require.NoError(t, err) - err = n.Execute(context.Background()) + err = n.Execute(nodeTextCtxWithDagContext()) require.NoError(t, err) err = n.teardown() require.NoError(t, err) @@ -383,7 +387,7 @@ func runTestNode(t *testing.T, n *Node) { err := n.setup(os.Getenv("HOME"), fmt.Sprintf("test-request-id-%d", rand.Int())) require.NoError(t, err) - err = n.Execute(context.Background()) + err = n.Execute(nodeTextCtxWithDagContext()) require.NoError(t, err) err = n.teardown() require.NoError(t, err) diff --git a/internal/dag/scheduler/scheduler_test.go b/internal/dag/scheduler/scheduler_test.go index b50d95596..372de4858 100644 --- a/internal/dag/scheduler/scheduler_test.go +++ b/internal/dag/scheduler/scheduler_test.go @@ -48,6 +48,10 @@ func TestMain(m *testing.M) { os.Exit(code) } +func schedulerTextCtxWithDagContext() context.Context { + return dag.NewContext(context.Background(), nil, nil) +} + func TestScheduler(t *testing.T) { g, err := NewExecutionGraph( logger.Default, @@ -69,7 +73,7 @@ func TestScheduler(t *testing.T) { } }() - err = sc.Schedule(context.Background(), g, done) + err = sc.Schedule(schedulerTextCtxWithDagContext(), g, done) require.Error(t, err) require.Equal(t, counter.Load(), int64(3)) @@ -87,7 +91,7 @@ func TestSchedulerParallel(t *testing.T) { step("2", testCommand), step("3", testCommand), ) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) require.Equal(t, sc.Status(g), StatusSuccess) @@ -177,7 +181,7 @@ func TestSchedulerCancel(t *testing.T) { sc.Cancel(g) }() - _ = sc.Schedule(context.Background(), g, nil) + _ = sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Eventually(t, func() bool { return sc.Status(g) == StatusCancel @@ -201,7 +205,7 @@ func TestSchedulerTimeout(t *testing.T) { ) sc := New(&Config{Timeout: time.Second * 2, LogDir: testHomeDir}) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Error(t, err) require.Equal(t, sc.Status(g), StatusError) @@ -308,7 +312,7 @@ func TestSchedulerRetrySuccess(t *testing.T) { require.Greater(t, retriedAt.Sub(startedAt), time.Millisecond*500) }() - err = sc.Schedule(context.Background(), g, nil) + err = sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) require.Equal(t, sc.Status(g), StatusSuccess) @@ -370,7 +374,7 @@ func TestSchedulerOnExit(t *testing.T) { step("3", testCommand), ) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) nodes := g.Nodes() @@ -392,7 +396,7 @@ func TestSchedulerOnExitOnFail(t *testing.T) { step("3", testCommand), ) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Error(t, err) nodes := g.Nodes() @@ -419,7 +423,7 @@ func TestSchedulerOnSignal(t *testing.T) { sc.Signal(g, syscall.SIGTERM, nil, false) }() - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) nodes := g.Nodes() @@ -451,7 +455,7 @@ func TestSchedulerOnCancel(t *testing.T) { sc.Signal(g, syscall.SIGTERM, done, false) }() - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) <-done // Wait for canceling finished require.Equal(t, sc.Status(g), StatusCancel) @@ -478,7 +482,7 @@ func TestSchedulerOnSuccess(t *testing.T) { step("1", testCommand), ) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) nodes := g.Nodes() @@ -504,7 +508,7 @@ func TestSchedulerOnFailure(t *testing.T) { step("1", testCommandFail), ) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Error(t, err) nodes := g.Nodes() @@ -537,7 +541,7 @@ func TestRepeat(t *testing.T) { sc.Cancel(g) }() - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) nodes := g.Nodes() @@ -560,7 +564,7 @@ func TestRepeatFail(t *testing.T) { }, ) sc := New(&Config{LogDir: testHomeDir}) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Error(t, err) nodes := g.Nodes() @@ -592,7 +596,7 @@ func TestStopRepetitiveTaskGracefully(t *testing.T) { sc.Signal(g, syscall.SIGTERM, done, false) }() - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) <-done @@ -637,7 +641,7 @@ func TestNodeSetupFailure(t *testing.T) { }, ) sc := New(&Config{LogDir: testHomeDir}) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.Error(t, err) require.Equal(t, sc.Status(g), StatusError) @@ -665,7 +669,7 @@ func TestNodeTeardownFailure(t *testing.T) { nodes[0].mu.Unlock() }() - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) // file already closed require.Error(t, err) @@ -683,7 +687,7 @@ func TestTakeOutputFromPrevStep(t *testing.T) { s2.Output = "TOOK_PREV_OUT" g, sc := newTestScheduler(t, &Config{LogDir: testHomeDir}, s1, s2) - err := sc.Schedule(context.Background(), g, nil) + err := sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) require.NoError(t, err) nodes := g.Nodes() @@ -712,7 +716,7 @@ func testSchedule(t *testing.T, steps ...dag.Step) ( MaxActiveRuns: 2, LogDir: testHomeDir, }, steps...) - return g, sc, sc.Schedule(context.Background(), g, nil) + return g, sc, sc.Schedule(schedulerTextCtxWithDagContext(), g, nil) } func newTestScheduler(t *testing.T, cfg *Config, steps ...dag.Step) ( From 2e737f72f882a7aa51a79d034072e185226cc6ea Mon Sep 17 00:00:00 2001 From: halalala222 <1741196223@qq.com> Date: Mon, 19 Aug 2024 16:38:34 +0800 Subject: [PATCH 18/18] fix: remove unless info print --- internal/dag/executor/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/dag/executor/command.go b/internal/dag/executor/command.go index 6b7032814..5d2c7a618 100644 --- a/internal/dag/executor/command.go +++ b/internal/dag/executor/command.go @@ -43,7 +43,7 @@ func newCommand(ctx context.Context, step dag.Step) (Executor, error) { if err != nil { return nil, err } - fmt.Printf("dagContext: %v\n", dagContext) + cmd.Dir = step.Dir cmd.Env = append(cmd.Env, os.Environ()...) cmd.Env = append(cmd.Env, step.Variables...)