[#785] persistence: feat: support concurrent execution of the same DAG #868

Draft · wants to merge 25 commits into main
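The diff below replaces the agent's use of a single shared history-store connection with per-run history records, which appears to be the core change that lets two executions of the same DAG record status independently. The following sketch is inferred only from the call sites visible in this diff (NewRecord, FindByRequestID, Open, Write, ReadStatus, Close); parameter names are illustrative and the actual declarations in internal/persistence may differ.

// Hypothetical sketch inferred from the call sites in this diff; not the
// actual internal/persistence declarations.
type HistoryStore interface {
	// NewRecord creates a per-run history record keyed by the request ID.
	NewRecord(ctx context.Context, dagLocation string, startedAt time.Time, requestID string) HistoryRecord
	// FindByRequestID looks up the record of a past run.
	FindByRequestID(ctx context.Context, dagName string, requestID string) (HistoryRecord, error)
	// RemoveOld deletes history beyond the retention period.
	RemoveOld(ctx context.Context, dagLocation string, retentionDays int) error
}

type HistoryRecord interface {
	Open(ctx context.Context) error
	Write(ctx context.Context, status Status) error
	ReadStatus(ctx context.Context) (*Status, error)
	Close(ctx context.Context) error
}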
5 changes: 4 additions & 1 deletion .gitignore
@@ -22,4 +22,7 @@ tmp/*
coverage.*

# Debug files
__debug_bin*
__debug_bin*

# Misc files
.aider*
139 changes: 90 additions & 49 deletions internal/agent/agent.go
@@ -21,7 +21,6 @@ import (
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/mailer"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/sock"
)

@@ -34,7 +33,7 @@ import (
type Agent struct {
dag *digraph.DAG
dry bool
retryTarget *model.Status
retryTarget *persistence.Status
dagStore persistence.DAGStore
client client.Client
scheduler *scheduler.Scheduler
@@ -62,7 +61,7 @@ type Options struct {
// RetryTarget is the target status (history of execution) to retry.
// If it's specified the agent will execute the DAG with the same
// configuration as the specified history.
RetryTarget *model.Status
RetryTarget *persistence.Status
}

// New creates a new Agent.
@@ -91,14 +90,23 @@ func New(

// Run sets up the scheduler and runs the DAG.
func (a *Agent) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := a.setup(ctx); err != nil {
return err
return fmt.Errorf("agent setup failed: %w", err)
}

// Create a new context for the DAG execution
// Create a new context for the DAG execution with all necessary information
dbClient := newDBClient(a.historyStore, a.dagStore)
ctx = digraph.NewContext(ctx, a.dag, dbClient, a.requestID, a.logFile, a.dag.Params)

// Add structured logging context
ctx = logger.WithValues(ctx,
"dag", a.dag.Name,
"requestID", a.requestID,
)

// It should not run the DAG if the condition is unmet.
if err := a.checkPreconditions(ctx); err != nil {
logger.Info(ctx, "Preconditions are not met", "err", err)
@@ -119,16 +127,17 @@ func (a *Agent) Run(ctx context.Context) error {
// Make a connection to the database.
// It should close the connection to the history database when the DAG
// execution is finished.
if err := a.setupDatabase(ctx); err != nil {
return err
historyRecord := a.setupHistoryRecord(ctx)
if err := historyRecord.Open(ctx); err != nil {
return fmt.Errorf("failed to open history record: %w", err)
}
defer func() {
if err := a.historyStore.Close(ctx); err != nil {
if err := historyRecord.Close(ctx); err != nil {
logger.Error(ctx, "Failed to close history store", "err", err)
}
}()

if err := a.historyStore.Write(ctx, a.Status()); err != nil {
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Failed to write status", "err", err)
}

@@ -137,6 +146,7 @@ func (a *Agent) Run(ctx context.Context) error {
if err := a.setupSocketServer(ctx); err != nil {
return fmt.Errorf("failed to setup unix socket server: %w", err)
}

listenerErrCh := make(chan error)
go execWithRecovery(ctx, func() {
err := a.socketServer.Serve(ctx, listenerErrCh)
@@ -165,7 +175,7 @@ func (a *Agent) Run(ctx context.Context) error {
go execWithRecovery(ctx, func() {
for node := range done {
status := a.Status()
if err := a.historyStore.Write(ctx, status); err != nil {
if err := historyRecord.Write(ctx, status); err != nil {
logger.Error(ctx, "Failed to write status", "err", err)
}
if err := a.reporter.reportStep(ctx, a.dag, status, node); err != nil {
@@ -181,7 +191,7 @@ func (a *Agent) Run(ctx context.Context) error {
if a.finished.Load() {
return
}
if err := a.historyStore.Write(ctx, a.Status()); err != nil {
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Status write failed", "err", err)
}
})
@@ -192,8 +202,8 @@

// Update the finished status to the history database.
finishedStatus := a.Status()
logger.Info(ctx, "DAG execution finished", "status", finishedStatus.Status)
if err := a.historyStore.Write(ctx, a.Status()); err != nil {
logger.Info(ctx, "DAG execution finished", "status", finishedStatus.Status.String())
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Status write failed", "err", err)
}

@@ -217,7 +227,7 @@ func (a *Agent) PrintSummary(ctx context.Context) {
}

// Status collects the current running status of the DAG and returns it.
func (a *Agent) Status() model.Status {
func (a *Agent) Status() persistence.Status {
// Lock to avoid race condition.
a.lock.RLock()
defer a.lock.RUnlock()
@@ -229,19 +239,19 @@ func (a *Agent) Status() model.Status {
}

// Create the status object to record the current status.
return model.NewStatusFactory(a.dag).
return persistence.NewStatusFactory(a.dag).
Create(
a.requestID,
schedulerStatus,
os.Getpid(),
a.graph.StartAt(),
model.WithFinishedAt(a.graph.FinishAt()),
model.WithNodes(a.graph.NodeData()),
model.WithLogFilePath(a.logFile),
model.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)),
model.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)),
model.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)),
model.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)),
persistence.WithFinishedAt(a.graph.FinishAt()),
persistence.WithNodes(a.graph.NodeData()),
persistence.WithLogFilePath(a.logFile),
persistence.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)),
persistence.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)),
persistence.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)),
persistence.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)),
)
}

@@ -376,33 +386,45 @@ func (a *Agent) dryRun(ctx context.Context) error {
// process by sending a SIGKILL to force the process to be shutdown.
// if processes do not terminate after MaxCleanUp time, it sends KILL signal.
func (a *Agent) signal(ctx context.Context, sig os.Signal, allowOverride bool) {
logger.Info(ctx, "Sending signal to running child processes", "signal", sig)
done := make(chan bool)
logger.Info(ctx, "Sending signal to running child processes",
"signal", sig.String(),
"allowOverride", allowOverride,
"maxCleanupTime", a.dag.MaxCleanUpTime/time.Second)

signalCtx, cancel := context.WithTimeout(ctx, a.dag.MaxCleanUpTime)
defer cancel()

done := make(chan bool, 1)
go func() {
a.scheduler.Signal(ctx, a.graph, sig, done, allowOverride)
}()
timeout := time.NewTimer(a.dag.MaxCleanUpTime)
tick := time.NewTimer(time.Second * 5)
defer timeout.Stop()
defer tick.Stop()

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-done:
logger.Info(ctx, "All child processes have been terminated")
return
case <-timeout.C:
logger.Info(ctx, "Time reached to max cleanup time")
logger.Info(ctx, "Sending KILL signal to running child processes.")

case <-signalCtx.Done():
logger.Info(ctx, "Max cleanup time reached, sending SIGKILL to force termination")
// Force kill with SIGKILL and don't wait for completion
a.scheduler.Signal(ctx, a.graph, syscall.SIGKILL, nil, false)
return
case <-tick.C:
logger.Info(ctx, "Sending signal again")

case <-ticker.C:
logger.Info(ctx, "Resending signal to processes that haven't terminated",
"signal", sig.String())
a.scheduler.Signal(ctx, a.graph, sig, nil, false)
tick.Reset(time.Second * 5)
default:
logger.Info(ctx, "Waiting for child processes to exit...")
time.Sleep(time.Second * 3)

case <-time.After(500 * time.Millisecond):
// Quick check to avoid busy waiting, but still responsive
if a.graph != nil && !a.graph.IsRunning() {
logger.Info(ctx, "No running processes detected, termination complete")
return
}
}
}
}
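The rewritten signal handling above swaps the hand-rolled max-cleanup timer and the 3-second sleep in a default branch for a deadline context (context.WithTimeout with MaxCleanUpTime), a 5-second ticker that resends the original signal, and a 500 ms poll of graph.IsRunning(); once the deadline expires it escalates to SIGKILL without waiting for confirmation. The done channel is now buffered with capacity 1, presumably so the goroutine calling scheduler.Signal does not block if the loop has already returned through the timeout path.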
@@ -436,14 +458,13 @@ func (a *Agent) setupGraphForRetry(ctx context.Context) error {
return nil
}

// setup database prepare database connection and remove old history data.
func (a *Agent) setupDatabase(ctx context.Context) error {
func (a *Agent) setupHistoryRecord(ctx context.Context) persistence.HistoryRecord {
location, retentionDays := a.dag.Location, a.dag.HistRetentionDays
if err := a.historyStore.RemoveOld(ctx, location, retentionDays); err != nil {
logger.Error(ctx, "History data cleanup failed", "err", err)
}

return a.historyStore.Open(ctx, a.dag.Location, time.Now(), a.requestID)
return a.historyStore.NewRecord(ctx, location, time.Now(), a.requestID)
}

// setupSocketServer creates a socket server instance.
@@ -483,18 +504,34 @@ func (a *Agent) checkIsAlreadyRunning(ctx context.Context) error {
return nil
}

// execWithRecovery executes a function with panic recovery and detailed error reporting
// It captures stack traces and provides structured error information for debugging
func execWithRecovery(ctx context.Context, fn func()) {
defer func() {
if panicObj := recover(); panicObj != nil {
err, ok := panicObj.(error)
if !ok {
err = fmt.Errorf("panic: %v", panicObj)
stack := debug.Stack()

// Convert panic object to error
var err error
switch v := panicObj.(type) {
case error:
err = v
case string:
err = fmt.Errorf("panic: %s", v)
default:
err = fmt.Errorf("panic: %v", v)
}
st := string(debug.Stack())
logger.Error(ctx, "Panic occurred", "err", err, "st", st)

// Log with structured information
logger.Error(ctx, "Recovered from panic",
"error", err.Error(),
"errorType", fmt.Sprintf("%T", panicObj),
"stackTrace", stack,
"fullStack", string(stack))
}
}()

// Execute the function
fn()
}

@@ -536,13 +573,17 @@ func (o *dbClient) GetDAG(ctx context.Context, name string) (*digraph.DAG, error
}

func (o *dbClient) GetStatus(ctx context.Context, name string, requestID string) (*digraph.Status, error) {
status, err := o.historyStore.FindByRequestID(ctx, name, requestID)
historyRecord, err := o.historyStore.FindByRequestID(ctx, name, requestID)
if err != nil {
return nil, err
}
status, err := historyRecord.ReadStatus(ctx)
if err != nil {
return nil, err
}

outputVariables := map[string]string{}
for _, node := range status.Status.Nodes {
for _, node := range status.Nodes {
if node.Step.OutputVariables != nil {
node.Step.OutputVariables.Range(func(_, value any) bool {
// split the value by '=' to get the key and value
@@ -557,7 +598,7 @@ func (o *dbClient) GetStatus(ctx context.Context, name string, requestID string)

return &digraph.Status{
Outputs: outputVariables,
Name: status.Status.Name,
Params: status.Status.Params,
Name: status.Name,
Params: status.Params,
}, nil
}
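Because each run now writes through its own record rather than a store-level connection, two runs of the same DAG can be recorded side by side. Below is a minimal sketch of that usage under the signatures shown in this diff; the persistence.HistoryStore type name and the request IDs are assumptions made for illustration.

// Hypothetical illustration only: two concurrent runs of the same DAG each
// get an independent history record keyed by their own request ID.
package example

import (
	"context"
	"time"

	"github.com/dagu-org/dagu/internal/digraph"
	"github.com/dagu-org/dagu/internal/persistence"
)

func openTwoRuns(ctx context.Context, store persistence.HistoryStore, dag *digraph.DAG) ([]persistence.HistoryRecord, error) {
	records := []persistence.HistoryRecord{
		store.NewRecord(ctx, dag.Location, time.Now(), "request-id-a"),
		store.NewRecord(ctx, dag.Location, time.Now(), "request-id-b"),
	}
	for _, rec := range records {
		// Each record is opened independently, so a failure in one run's
		// record does not affect the other run.
		if err := rec.Open(ctx); err != nil {
			return nil, err
		}
	}
	// Each run would then call rec.Write(ctx, status) as it progresses and
	// rec.Close(ctx) when it finishes.
	return records, nil
}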
4 changes: 2 additions & 2 deletions internal/agent/agent_test.go
@@ -6,11 +6,11 @@ import (
"testing"

"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/test"

"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/stretchr/testify/require"
)

@@ -215,7 +217,7 @@ func TestAgent_HandleHTTP(t *testing.T) {
require.Equal(t, http.StatusOK, mockResponseWriter.status)

// Check if the status is returned correctly
status, err := model.StatusFromJSON(mockResponseWriter.body)
status, err := persistence.StatusFromJSON(mockResponseWriter.body)
require.NoError(t, err)
require.Equal(t, scheduler.StatusRunning, status.Status)
