Skip to content

Commit

Permalink
fix errors
Browse files Browse the repository at this point in the history
  • Loading branch information
yottahmd committed Feb 28, 2025
1 parent 3239ed4 commit 1d0b644
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 43 deletions.
33 changes: 19 additions & 14 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,17 @@ func (a *Agent) Run(ctx context.Context) error {
// Make a connection to the database.
// It should close the connection to the history database when the DAG
// execution is finished.
if err := a.setupDatabase(ctx); err != nil {
return err
historyRecord := a.setupHistoryRecord(ctx)
if err := historyRecord.Open(ctx); err != nil {
return fmt.Errorf("failed to open history record: %w", err)
}
defer func() {
if err := a.historyStore.Close(ctx); err != nil {
if err := historyRecord.Close(ctx); err != nil {
logger.Error(ctx, "Failed to close history store", "err", err)
}
}()

if err := a.historyStore.Write(ctx, a.Status()); err != nil {
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Failed to write status", "err", err)
}

Expand All @@ -136,6 +137,7 @@ func (a *Agent) Run(ctx context.Context) error {
if err := a.setupSocketServer(ctx); err != nil {
return fmt.Errorf("failed to setup unix socket server: %w", err)
}

listenerErrCh := make(chan error)
go execWithRecovery(ctx, func() {
err := a.socketServer.Serve(ctx, listenerErrCh)
Expand Down Expand Up @@ -164,7 +166,7 @@ func (a *Agent) Run(ctx context.Context) error {
go execWithRecovery(ctx, func() {
for node := range done {
status := a.Status()
if err := a.historyStore.Write(ctx, status); err != nil {
if err := historyRecord.Write(ctx, status); err != nil {
logger.Error(ctx, "Failed to write status", "err", err)
}
if err := a.reporter.reportStep(ctx, a.dag, status, node); err != nil {
Expand All @@ -180,7 +182,7 @@ func (a *Agent) Run(ctx context.Context) error {
if a.finished.Load() {
return
}
if err := a.historyStore.Write(ctx, a.Status()); err != nil {
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Status write failed", "err", err)
}
})
Expand All @@ -192,7 +194,7 @@ func (a *Agent) Run(ctx context.Context) error {
// Update the finished status to the history database.
finishedStatus := a.Status()
logger.Info(ctx, "DAG execution finished", "status", finishedStatus.Status)
if err := a.historyStore.Write(ctx, a.Status()); err != nil {
if err := historyRecord.Write(ctx, a.Status()); err != nil {
logger.Error(ctx, "Status write failed", "err", err)
}

Expand Down Expand Up @@ -435,14 +437,13 @@ func (a *Agent) setupGraphForRetry(ctx context.Context) error {
return nil
}

// setup database prepare database connection and remove old history data.
func (a *Agent) setupDatabase(ctx context.Context) error {
func (a *Agent) setupHistoryRecord(ctx context.Context) persistence.HistoryRecord {
location, retentionDays := a.dag.Location, a.dag.HistRetentionDays
if err := a.historyStore.RemoveOld(ctx, location, retentionDays); err != nil {
logger.Error(ctx, "History data cleanup failed", "err", err)
}

return a.historyStore.Open(ctx, a.dag.Location, time.Now(), a.requestID)
return a.historyStore.NewRecord(ctx, location, time.Now(), a.requestID)
}

// setupSocketServer create socket server instance.
Expand Down Expand Up @@ -535,13 +536,17 @@ func (o *dbClient) GetDAG(ctx context.Context, name string) (*digraph.DAG, error
}

func (o *dbClient) GetStatus(ctx context.Context, name string, requestID string) (*digraph.Status, error) {
status, err := o.historyStore.FindByRequestID(ctx, name, requestID)
historyRecord, err := o.historyStore.FindByRequestID(ctx, name, requestID)
if err != nil {
return nil, err
}
status, err := historyRecord.ReadStatus(ctx)
if err != nil {
return nil, err
}

outputVariables := map[string]string{}
for _, node := range status.Status.Nodes {
for _, node := range status.Nodes {
if node.Step.OutputVariables != nil {
node.Step.OutputVariables.Range(func(_, value any) bool {
// split the value by '=' to get the key and value
Expand All @@ -556,7 +561,7 @@ func (o *dbClient) GetStatus(ctx context.Context, name string, requestID string)

return &digraph.Status{
Outputs: outputVariables,
Name: status.Status.Name,
Params: status.Status.Params,
Name: status.Name,
Params: status.Params,
}, nil
}
69 changes: 51 additions & 18 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,54 +186,86 @@ func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*persisten
func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, requestID string) (
*persistence.Status, error,
) {
ret, err := e.historyStore.FindByRequestID(ctx, dag.Location, requestID)
record, err := e.historyStore.FindByRequestID(ctx, dag.Location, requestID)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to find status by request id: %w", err)
}
historyStatus, err := record.ReadStatus(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read status: %w", err)
}

// If the DAG is running, set the status to error if the request ID does not match
// Because the DAG execution must be stopped
// TODO: Handle different request IDs for the same DAG
status, _ := e.GetCurrentStatus(ctx, dag)
if status != nil && status.RequestID != requestID {
// if the request id is not matched then correct the status
ret.Status.CorrectRunningStatus()
historyStatus.SetStatusToErrorIfRunning()
}
return &ret.Status, err

return historyStatus, err
}

func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*persistence.Status, error) {
client := sock.NewClient(dag.SockAddr())
ret, err := client.Request("GET", "/status")
statusJSON, err := client.Request("GET", "/status")
if err != nil {
return nil, fmt.Errorf("failed to get status: %w", err)
}
return persistence.StatusFromJSON(ret)

return persistence.StatusFromJSON(statusJSON)
}

func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (persistence.Status, error) {
currStatus, _ := e.currentStatus(ctx, dag)
if currStatus != nil {
return *currStatus, nil
}
status, err := e.historyStore.ReadStatusToday(ctx, dag.Location)

var latestStatus *persistence.Status

record, err := e.historyStore.ReadToday(ctx, dag.Location)
if err != nil {
status := persistence.NewStatusFactory(dag).CreateDefault()
if errors.Is(err, persistence.ErrNoStatusDataToday) ||
errors.Is(err, persistence.ErrNoStatusData) {
// No status for today
return status, nil
}
return status, err
goto handleError
}
status.CorrectRunningStatus()
return *status, nil

latestStatus, err = record.ReadStatus(ctx)
if err != nil {
goto handleError
}

latestStatus.SetStatusToErrorIfRunning()
return *latestStatus, nil

handleError:

if errors.Is(err, persistence.ErrNoStatusDataToday) ||
errors.Is(err, persistence.ErrNoStatusData) {
// No status for today
return persistence.NewStatusFactory(dag).CreateDefault(), nil
}

return persistence.NewStatusFactory(dag).CreateDefault(), err
}

func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []persistence.StatusFile {
return e.historyStore.ReadStatusRecent(ctx, dag.Location, n)
records := e.historyStore.ReadRecent(ctx, dag.Location, n)

var ret []persistence.StatusFile
for _, record := range records {
if statusFile, err := record.Read(ctx); err == nil {
ret = append(ret, *statusFile)
}
}

return ret
}

var errDAGIsRunning = errors.New("the DAG is running")

func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status persistence.Status) error {
client := sock.NewClient(dag.SockAddr())

res, err := client.Request("GET", "/status")
if err != nil {
if errors.Is(err, sock.ErrTimeout) {
Expand All @@ -246,6 +278,7 @@ func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status pers
return errDAGIsRunning
}
}

return e.historyStore.Update(ctx, dag.Location, status.RequestID, status)
}

Expand Down
8 changes: 5 additions & 3 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ func TestClient_GetStatus(t *testing.T) {
cli := th.Client

// Open the history store and write a status before updating it.
err := th.HistoryStore.Open(ctx, dag.Location, now, requestID)
record := th.HistoryStore.NewRecord(ctx, dag.Location, now, requestID)

err := record.Open(ctx)
require.NoError(t, err)

status := testNewStatus(dag.DAG, requestID, scheduler.StatusSuccess, scheduler.NodeStatusSuccess)

err = th.HistoryStore.Write(ctx, status)
err = record.Write(ctx, status)
require.NoError(t, err)
_ = th.HistoryStore.Close(ctx)
_ = record.Close(ctx)

// Get the status and check if it is the same as the one we wrote.
statusToCheck, err := cli.GetStatusByRequestID(ctx, dag.DAG, requestID)
Expand Down
8 changes: 7 additions & 1 deletion internal/cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func runRetry(ctx *Context, args []string) error {
return fmt.Errorf("failed to resolve absolute path for %s: %w", specFilePath, err)
}

status, err := ctx.historyStore().FindByRequestID(ctx, absolutePath, requestID)
historyRecord, err := ctx.historyStore().FindByRequestID(ctx, absolutePath, requestID)
if err != nil {
logger.Error(ctx, "Failed to retrieve historical execution", "requestID", requestID, "err", err)
return fmt.Errorf("failed to retrieve historical execution for request ID %s: %w", requestID, err)
Expand All @@ -55,6 +55,12 @@ func runRetry(ctx *Context, args []string) error {
digraph.WithBaseConfig(ctx.cfg.Paths.BaseConfig),
}

status, err := historyRecord.Read(ctx)
if err != nil {
logger.Error(ctx, "Failed to read status", "err", err)
return fmt.Errorf("failed to read status: %w", err)
}

if status.Status.Params != "" {
// backward compatibility
loadOpts = append(loadOpts, digraph.WithParams(status.Status.Params))
Expand Down
12 changes: 8 additions & 4 deletions internal/cmd/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ func TestStatusCommand(t *testing.T) {
}()

require.Eventually(t, func() bool {
status := th.HistoryStore.ReadStatusRecent(th.Context, dagFile.Location, 1)
if len(status) < 1 {
historyRecords := th.HistoryStore.ReadRecent(th.Context, dagFile.Location, 1)
if len(historyRecords) < 1 {
return false
}
println(status[0].Status.Status.String())
return scheduler.StatusRunning == status[0].Status.Status
status, err := historyRecords[0].ReadStatus(th.Context)
if err != nil {
return false
}

return scheduler.StatusRunning == status.Status
}, time.Second*3, time.Millisecond*50)

// Check the current status.
Expand Down
2 changes: 2 additions & 0 deletions internal/persistence/jsondb/jsondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,11 @@ func (db *JSONDB) Rename(_ context.Context, oldKey, newKey string) error {
log.Printf("failed to rename %s to %s: %s", m, f, err)
}
}

if files, _ := os.ReadDir(oldDir); len(files) == 0 {
_ = os.Remove(oldDir)
}

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion internal/persistence/jsondb/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,13 @@ func (hr *HistoryRecord) Read(_ context.Context) (*persistence.StatusFile, error
// parseLocked reads the status file and returns the last valid status.
// Must be called with a lock (read or write) already held.
func (hr *HistoryRecord) parseLocked() (*persistence.Status, error) {
f, err := os.Open(hr.file)
return ParseStatusFile(hr.file)
}

// ParseStatusFile reads the status file and returns the last valid status.
// TODO: Remove this function and use HistoryRecord.ReadStatus instead.
func ParseStatusFile(file string) (*persistence.Status, error) {
f, err := os.Open(file)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrReadFailed, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type Status struct {
ParamsList []string `json:"ParamsList,omitempty"`
}

func (st *Status) CorrectRunningStatus() {
func (st *Status) SetStatusToErrorIfRunning() {
if st.Status == scheduler.StatusRunning {
st.Status = scheduler.StatusError
st.StatusText = st.Status.String()
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCorrectRunningStatus(t *testing.T) {
dag := &digraph.DAG{Name: "test"}
requestID := "request-id-testII"
status := persistence.NewStatusFactory(dag).Create(requestID, scheduler.StatusRunning, 0, time.Now())
status.CorrectRunningStatus()
status.SetStatusToErrorIfRunning()
require.Equal(t, scheduler.StatusError, status.Status)
}

Expand Down

0 comments on commit 1d0b644

Please sign in to comment.