Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional tests #663

Merged
merged 15 commits into from
Aug 14, 2024
Prev Previous commit
Next Next commit
wip
yottahmd committed Aug 12, 2024
commit 935d2d2fb5aaaf4a596b23f1bbec37fe5bdff91e
25 changes: 13 additions & 12 deletions cmd/dry.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"

"github.com/daguflow/dagu/internal/agent"
@@ -45,32 +44,35 @@ func dryCmd() *cobra.Command {

params, err := cmd.Flags().GetString("params")
if err != nil {
initLogger.Error("Parameter retrieval failed", "error", err)
os.Exit(1)
initLogger.Fatal("Parameter retrieval failed", "error", err)
}

workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "dry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})

if err != nil {
initLogger.Error(
initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
os.Exit(1)
}
defer logFile.Close()

@@ -98,11 +100,10 @@ func dryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Workflow execution failed",
agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
56 changes: 0 additions & 56 deletions cmd/logging.go

This file was deleted.

30 changes: 14 additions & 16 deletions cmd/restart.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"
"time"

@@ -57,18 +56,16 @@ func restartCmd() *cobra.Command {
specFilePath := args[0]
workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "")
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)

if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
initLogger.Error("Workflow stop operation failed",
initLogger.Fatal("Workflow stop operation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Wait for the specified amount of time before restarting.
@@ -77,35 +74,37 @@ func restartCmd() *cobra.Command {
// Retrieve the parameter of the previous execution.
params, err := getPreviousExecutionParams(cli, workflow)
if err != nil {
initLogger.Error("Previous execution parameter retrieval failed",
initLogger.Fatal("Previous execution parameter retrieval failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Start the DAG with the same parameter.
// Need to reload the DAG file with the parameter.
workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params)
if err != nil {
initLogger.Error("Workflow reload failed",
initLogger.Fatal("Workflow reload failed",
"error", err,
"file", specFilePath,
"params", params)
os.Exit(1)
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "restart_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

@@ -133,11 +132,10 @@ func restartCmd() *cobra.Command {

listenSignals(cmd.Context(), agt)
if err := agt.Run(cmd.Context()); err != nil {
agentLogger.Error("Workflow restart failed",
agentLogger.Fatal("Workflow restart failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
26 changes: 13 additions & 13 deletions cmd/retry.go
Original file line number Diff line number Diff line change
@@ -56,44 +56,45 @@ func retryCmd() *cobra.Command {
specFilePath := args[0]
absoluteFilePath, err := filepath.Abs(specFilePath)
if err != nil {
initLogger.Error("Absolute path resolution failed",
initLogger.Fatal("Absolute path resolution failed",
"error", err,
"file", specFilePath)
os.Exit(1)
}

status, err := historyStore.FindByRequestID(absoluteFilePath, requestID)
if err != nil {
initLogger.Error("Historical execution retrieval failed",
initLogger.Fatal("Historical execution retrieval failed",
"error", err,
"requestID", requestID,
"file", absoluteFilePath)
os.Exit(1)
}

// Start the DAG with the same parameters with the execution that
// is being retried.
workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params)
if err != nil {
initLogger.Error("Workflow specification load failed",
initLogger.Fatal("Workflow specification load failed",
"error", err,
"file", specFilePath,
"params", status.Status.Params)
os.Exit(1)
}

newRequestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "retry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: newRequestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

@@ -126,8 +127,7 @@ func retryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Failed to start workflow", "error", err)
os.Exit(1)
agentLogger.Fatal("Failed to start workflow", "error", err)
}
},
}
4 changes: 1 addition & 3 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/logger"
@@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
sc := scheduler.New(cfg, logger, cli)
if err := sc.Start(ctx); err != nil {
logger.Error(
logger.Fatal(
"Scheduler initialization failed",
"error",
err,
"specsDirectory",
cfg.DAGs,
)
os.Exit(1)
}
},
}
4 changes: 1 addition & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/frontend"
@@ -52,8 +51,7 @@ func serverCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
server := frontend.New(cfg, logger, cli)
if err := server.Serve(cmd.Context()); err != nil {
logger.Error("Server initialization failed", "error", err)
os.Exit(1)
logger.Fatal("Server initialization failed", "error", err)
}
},
}
24 changes: 12 additions & 12 deletions cmd/start.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"

"github.com/daguflow/dagu/internal/agent"
@@ -52,32 +51,34 @@ func startCmd() *cobra.Command {

params, err := cmd.Flags().GetString("params")
if err != nil {
initLogger.Error("Parameter retrieval failed", "error", err)
os.Exit(1)
initLogger.Fatal("Parameter retrieval failed", "error", err)
}

workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("start_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "start_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})
if err != nil {
initLogger.Error(
initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
os.Exit(1)
}
defer logFile.Close()

@@ -111,11 +112,10 @@ func startCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Workflow execution failed",
agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Loading