Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional tests #663

Merged
merged 15 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ tmp/*
.idea

# Directory for local development
local/
.local/
10 changes: 2 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ VERSION=
SCRIPT_DIR=$(abspath $(dir $(lastword $(MAKEFILE_LIST))))

# Directories for miscellaneous files for the local environment
LOCAL_DIR=$(SCRIPT_DIR)/local
LOCAL_DIR=$(SCRIPT_DIR)/.local
LOCAL_BIN_DIR=$(LOCAL_DIR)/bin

# Configuration directory
Expand Down Expand Up @@ -111,6 +111,7 @@ run-server-https: ${SERVER_CERT_FILE} ${SERVER_KEY_FILE}
test:
@echo "${COLOR_GREEN}Running tests...${COLOR_RESET}"
@GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
@go clean -testcache
@${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./...

# test-coverage runs all tests with coverage.
Expand All @@ -119,13 +120,6 @@ test-coverage:
@GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
@${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} -coverprofile="coverage.txt" -covermode=atomic ./...

# test-clean cleans the test cache and run all tests.
test-clean: build-bin
@echo "${COLOR_GREEN}Running tests...${COLOR_RESET}"
@GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
@go clean -testcache
@${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./...

# lint runs the linter.
lint: golangci-lint

Expand Down
25 changes: 13 additions & 12 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"

"github.com/daguflow/dagu/internal/agent"
Expand Down Expand Up @@ -45,32 +44,35 @@ func dryCmd() *cobra.Command {

params, err := cmd.Flags().GetString("params")
if err != nil {
initLogger.Error("Parameter retrieval failed", "error", err)
os.Exit(1)
initLogger.Fatal("Parameter retrieval failed", "error", err)
}

workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "dry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})

if err != nil {
initLogger.Error(
initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -98,11 +100,10 @@ func dryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Workflow execution failed",
agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Expand Down
56 changes: 0 additions & 56 deletions cmd/logging.go

This file was deleted.

30 changes: 14 additions & 16 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"
"time"

Expand Down Expand Up @@ -57,18 +56,16 @@ func restartCmd() *cobra.Command {
specFilePath := args[0]
workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "")
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)

if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
initLogger.Error("Workflow stop operation failed",
initLogger.Fatal("Workflow stop operation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Wait for the specified amount of time before restarting.
Expand All @@ -77,35 +74,37 @@ func restartCmd() *cobra.Command {
// Retrieve the parameter of the previous execution.
params, err := getPreviousExecutionParams(cli, workflow)
if err != nil {
initLogger.Error("Previous execution parameter retrieval failed",
initLogger.Fatal("Previous execution parameter retrieval failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Start the DAG with the same parameter.
// Need to reload the DAG file with the parameter.
workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params)
if err != nil {
initLogger.Error("Workflow reload failed",
initLogger.Fatal("Workflow reload failed",
"error", err,
"file", specFilePath,
"params", params)
os.Exit(1)
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "restart_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -133,11 +132,10 @@ func restartCmd() *cobra.Command {

listenSignals(cmd.Context(), agt)
if err := agt.Run(cmd.Context()); err != nil {
agentLogger.Error("Workflow restart failed",
agentLogger.Fatal("Workflow restart failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Expand Down
26 changes: 13 additions & 13 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,44 +56,45 @@ func retryCmd() *cobra.Command {
specFilePath := args[0]
absoluteFilePath, err := filepath.Abs(specFilePath)
if err != nil {
initLogger.Error("Absolute path resolution failed",
initLogger.Fatal("Absolute path resolution failed",
"error", err,
"file", specFilePath)
os.Exit(1)
}

status, err := historyStore.FindByRequestID(absoluteFilePath, requestID)
if err != nil {
initLogger.Error("Historical execution retrieval failed",
initLogger.Fatal("Historical execution retrieval failed",
"error", err,
"requestID", requestID,
"file", absoluteFilePath)
os.Exit(1)
}

// Start the DAG with the same parameters with the execution that
// is being retried.
workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params)
if err != nil {
initLogger.Error("Workflow specification load failed",
initLogger.Fatal("Workflow specification load failed",
"error", err,
"file", specFilePath,
"params", status.Status.Params)
os.Exit(1)
}

newRequestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "retry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: newRequestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -126,8 +127,7 @@ func retryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Failed to start workflow", "error", err)
os.Exit(1)
agentLogger.Fatal("Failed to start workflow", "error", err)
}
},
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/logger"
Expand Down Expand Up @@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
sc := scheduler.New(cfg, logger, cli)
if err := sc.Start(ctx); err != nil {
logger.Error(
logger.Fatal(
"Scheduler initialization failed",
"error",
err,
"specsDirectory",
cfg.DAGs,
)
os.Exit(1)
}
},
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/frontend"
Expand Down Expand Up @@ -52,8 +51,7 @@ func serverCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
server := frontend.New(cfg, logger, cli)
if err := server.Serve(cmd.Context()); err != nil {
logger.Error("Server initialization failed", "error", err)
os.Exit(1)
logger.Fatal("Server initialization failed", "error", err)
}
},
}
Expand Down
Loading
Loading