Skip to content

Commit

Permalink
refactor persistence package
Browse files Browse the repository at this point in the history
  • Loading branch information
yottahmd committed Feb 27, 2025
1 parent 759d89d commit dbff7dd
Show file tree
Hide file tree
Showing 23 changed files with 128 additions and 147 deletions.
23 changes: 11 additions & 12 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/mailer"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/sock"
)

Expand All @@ -34,7 +33,7 @@ import (
type Agent struct {
dag *digraph.DAG
dry bool
retryTarget *model.Status
retryTarget *persistence.Status
dagStore persistence.DAGStore
client client.Client
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -62,7 +61,7 @@ type Options struct {
// RetryTarget is the target status (history of execution) to retry.
// If it's specified the agent will execute the DAG with the same
// configuration as the specified history.
RetryTarget *model.Status
RetryTarget *persistence.Status
}

// New creates a new Agent.
Expand Down Expand Up @@ -217,7 +216,7 @@ func (a *Agent) PrintSummary(ctx context.Context) {
}

// Status collects the current running status of the DAG and returns it.
func (a *Agent) Status() model.Status {
func (a *Agent) Status() persistence.Status {
// Lock to avoid race condition.
a.lock.RLock()
defer a.lock.RUnlock()
Expand All @@ -229,19 +228,19 @@ func (a *Agent) Status() model.Status {
}

// Create the status object to record the current status.
return model.NewStatusFactory(a.dag).
return persistence.NewStatusFactory(a.dag).
Create(
a.requestID,
schedulerStatus,
os.Getpid(),
a.graph.StartAt(),
model.WithFinishedAt(a.graph.FinishAt()),
model.WithNodes(a.graph.NodeData()),
model.WithLogFilePath(a.logFile),
model.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)),
model.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)),
model.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)),
model.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)),
persistence.WithFinishedAt(a.graph.FinishAt()),
persistence.WithNodes(a.graph.NodeData()),
persistence.WithLogFilePath(a.logFile),
persistence.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)),
persistence.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)),
persistence.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)),
persistence.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)),
)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"testing"

"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/test"

"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -215,7 +215,7 @@ func TestAgent_HandleHTTP(t *testing.T) {
require.Equal(t, http.StatusOK, mockResponseWriter.status)

// Check if the status is returned correctly
status, err := model.StatusFromJSON(mockResponseWriter.body)
status, err := persistence.StatusFromJSON(mockResponseWriter.body)
require.NoError(t, err)
require.Equal(t, scheduler.StatusRunning, status.Status)

Expand Down
16 changes: 8 additions & 8 deletions internal/agent/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/jedib0t/go-pretty/v6/table"
)

Expand All @@ -28,7 +28,7 @@ func newReporter(sender Sender) *reporter {

// reportStep is a function that reports the status of a step.
func (r *reporter) reportStep(
ctx context.Context, dag *digraph.DAG, status model.Status, node *scheduler.Node,
ctx context.Context, dag *digraph.DAG, status persistence.Status, node *scheduler.Node,
) error {
nodeStatus := node.State().Status
if nodeStatus != scheduler.NodeStatusNone {
Expand All @@ -46,7 +46,7 @@ func (r *reporter) reportStep(
}

// report is a function that reports the status of the scheduler.
func (r *reporter) getSummary(_ context.Context, status model.Status, err error) string {
func (r *reporter) getSummary(_ context.Context, status persistence.Status, err error) string {
var buf bytes.Buffer
_, _ = buf.Write([]byte("\n"))
_, _ = buf.Write([]byte("Summary ->\n"))
Expand All @@ -58,7 +58,7 @@ func (r *reporter) getSummary(_ context.Context, status model.Status, err error)
}

// send is a function that sends a report mail.
func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status model.Status, err error) error {
func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status persistence.Status, err error) error {
if err != nil || status.Status == scheduler.StatusError {
if dag.MailOn != nil && dag.MailOn.Failure {
fromAddress := dag.ErrorMail.From
Expand Down Expand Up @@ -91,7 +91,7 @@ var dagHeader = table.Row{
"Error",
}

func renderDAGSummary(status model.Status, err error) string {
func renderDAGSummary(status persistence.Status, err error) string {
dataRow := table.Row{
status.RequestID,
status.Name,
Expand Down Expand Up @@ -122,7 +122,7 @@ var stepHeader = table.Row{
"Error",
}

func renderStepSummary(nodes []*model.Node) string {
func renderStepSummary(nodes []*persistence.Node) string {
stepTable := table.NewWriter()
stepTable.AppendHeader(stepHeader)

Expand All @@ -147,7 +147,7 @@ func renderStepSummary(nodes []*model.Node) string {
return stepTable.Render()
}

func renderHTML(nodes []*model.Node) string {
func renderHTML(nodes []*persistence.Node) string {
var buffer bytes.Buffer
addValFunc := func(val string) {
_, _ = buffer.WriteString(
Expand Down Expand Up @@ -195,7 +195,7 @@ func renderHTML(nodes []*model.Node) string {
}

func addAttachments(
trigger bool, nodes []*model.Node,
trigger bool, nodes []*persistence.Node,
) (attachments []string) {
if trigger {
for _, n := range nodes {
Expand Down
24 changes: 12 additions & 12 deletions internal/agent/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (

"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/stretchr/testify/require"
)

func TestReporter(t *testing.T) {
for scenario, fn := range map[string]func(
t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.Node,
t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*persistence.Node,
){
"create error mail": testErrorMail,
"no error mail": testNoErrorMail,
Expand Down Expand Up @@ -49,7 +49,7 @@ func TestReporter(t *testing.T) {
},
}

nodes := []*model.Node{
nodes := []*persistence.Node{
{
Step: digraph.Step{
Name: "test-step",
Expand All @@ -69,11 +69,11 @@ func TestReporter(t *testing.T) {
}
}

func testErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.Node) {
func testErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*persistence.Node) {
dag.MailOn.Failure = true
dag.MailOn.Success = false

_ = rp.send(context.Background(), dag, model.Status{
_ = rp.send(context.Background(), dag, persistence.Status{
Status: scheduler.StatusError,
Nodes: nodes,
}, fmt.Errorf("Error"))
Expand All @@ -85,11 +85,11 @@ func testErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.
require.Equal(t, 1, mock.count)
}

func testNoErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.Node) {
func testNoErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*persistence.Node) {
dag.MailOn.Failure = false
dag.MailOn.Success = true

err := rp.send(context.Background(), dag, model.Status{
err := rp.send(context.Background(), dag, persistence.Status{
Status: scheduler.StatusError,
Nodes: nodes,
}, nil)
Expand All @@ -100,11 +100,11 @@ func testNoErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode
require.Equal(t, 0, mock.count)
}

func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.Node) {
func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*persistence.Node) {
dag.MailOn.Failure = true
dag.MailOn.Success = true

err := rp.send(context.Background(), dag, model.Status{
err := rp.send(context.Background(), dag, persistence.Status{
Status: scheduler.StatusSuccess,
Nodes: nodes,
}, nil)
Expand All @@ -117,14 +117,14 @@ func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode
require.Equal(t, 1, mock.count)
}

func testRenderSummary(t *testing.T, _ *reporter, dag *digraph.DAG, nodes []*model.Node) {
status := model.NewStatusFactory(dag).Create("request-id", scheduler.StatusError, 0, time.Now())
func testRenderSummary(t *testing.T, _ *reporter, dag *digraph.DAG, nodes []*persistence.Node) {
status := persistence.NewStatusFactory(dag).Create("request-id", scheduler.StatusError, 0, time.Now())
summary := renderDAGSummary(status, errors.New("test error"))
require.Contains(t, summary, "test error")
require.Contains(t, summary, dag.Name)
}

func testRenderTable(t *testing.T, _ *reporter, _ *digraph.DAG, nodes []*model.Node) {
func testRenderTable(t *testing.T, _ *reporter, _ *digraph.DAG, nodes []*persistence.Node) {
summary := renderStepSummary(nodes)
require.Contains(t, summary, nodes[0].Step.Name)
require.Contains(t, summary, nodes[0].Step.Args[0])
Expand Down
23 changes: 11 additions & 12 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/dagu-org/dagu/internal/frontend/gen/restapi/operations/dags"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/sock"
)

Expand Down Expand Up @@ -170,22 +169,22 @@ func (e *client) Retry(_ context.Context, dag *digraph.DAG, requestID string) er
return cmd.Wait()
}

func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*model.Status, error) {
func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*persistence.Status, error) {
client := sock.NewClient(dag.SockAddr())
ret, err := client.Request("GET", "/status")
if err != nil {
if errors.Is(err, sock.ErrTimeout) {
return nil, err
}
// The DAG is not running so return the default status
status := model.NewStatusFactory(dag).CreateDefault()
status := persistence.NewStatusFactory(dag).CreateDefault()
return &status, nil
}
return model.StatusFromJSON(ret)
return persistence.StatusFromJSON(ret)
}

func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, requestID string) (
*model.Status, error,
*persistence.Status, error,
) {
ret, err := e.historyStore.FindByRequestID(ctx, dag.Location, requestID)
if err != nil {
Expand All @@ -199,23 +198,23 @@ func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, req
return &ret.Status, err
}

func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*model.Status, error) {
func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*persistence.Status, error) {
client := sock.NewClient(dag.SockAddr())
ret, err := client.Request("GET", "/status")
if err != nil {
return nil, fmt.Errorf("failed to get status: %w", err)
}
return model.StatusFromJSON(ret)
return persistence.StatusFromJSON(ret)
}

func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.Status, error) {
func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (persistence.Status, error) {
currStatus, _ := e.currentStatus(ctx, dag)
if currStatus != nil {
return *currStatus, nil
}
status, err := e.historyStore.ReadStatusToday(ctx, dag.Location)
if err != nil {
status := model.NewStatusFactory(dag).CreateDefault()
status := persistence.NewStatusFactory(dag).CreateDefault()
if errors.Is(err, persistence.ErrNoStatusDataToday) ||
errors.Is(err, persistence.ErrNoStatusData) {
// No status for today
Expand All @@ -227,21 +226,21 @@ func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.S
return *status, nil
}

func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []model.StatusFile {
func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []persistence.StatusFile {
return e.historyStore.ReadStatusRecent(ctx, dag.Location, n)
}

var errDAGIsRunning = errors.New("the DAG is running")

func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status model.Status) error {
func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status persistence.Status) error {
client := sock.NewClient(dag.SockAddr())
res, err := client.Request("GET", "/status")
if err != nil {
if errors.Is(err, sock.ErrTimeout) {
return err
}
} else {
unmarshalled, _ := model.StatusFromJSON(res)
unmarshalled, _ := persistence.StatusFromJSON(res)
if unmarshalled != nil && unmarshalled.RequestID == status.RequestID &&
unmarshalled.Status == scheduler.StatusRunning {
return errDAGIsRunning
Expand Down
12 changes: 6 additions & 6 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/dagu-org/dagu/internal/client"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/persistence"
"github.com/dagu-org/dagu/internal/sock"
"github.com/dagu-org/dagu/internal/test"
)
Expand All @@ -31,7 +31,7 @@ func TestClient_GetStatus(t *testing.T) {
socketServer, _ := sock.NewServer(
dag.SockAddr(),
func(w http.ResponseWriter, _ *http.Request) {
status := model.NewStatusFactory(dag.DAG).Create(
status := persistence.NewStatusFactory(dag.DAG).Create(
requestID, scheduler.StatusRunning, 0, time.Now(),
)
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -308,11 +308,11 @@ func TestClient_ReadHistory(t *testing.T) {
})
}

func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) model.Status {
func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) persistence.Status {
nodes := []scheduler.NodeData{{State: scheduler.NodeState{Status: nodeStatus}}}
startedAt := model.Time(time.Now())
return model.NewStatusFactory(dag).Create(
requestID, status, 0, *startedAt, model.WithNodes(nodes),
startedAt := persistence.Time(time.Now())
return persistence.NewStatusFactory(dag).Create(
requestID, status, 0, *startedAt, persistence.WithNodes(nodes),
)
}

Expand Down
Loading

0 comments on commit dbff7dd

Please sign in to comment.