Skip to content

Commit

Permalink
PROTO
Browse files Browse the repository at this point in the history
  • Loading branch information
256dpi committed Nov 23, 2023
1 parent 94e9b54 commit 1a3c1c3
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 10 deletions.
11 changes: 11 additions & 0 deletions axe/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/256dpi/fire/stick"
)

// TODO: Add "delay" parameter to allow control of the task execution delay.

// Error is used to control retry a cancellation. These errors are expected and
// are not forwarded to the reporter.
type Error struct {
Reason string
Retry bool
Delay time.Duration
}

// E is a shorthand to construct an error. If retry is true the job will be
Expand All @@ -29,6 +32,14 @@ func E(reason string, retry bool) *Error {
}
}

func ED(reason string, retry bool, delay time.Duration) *Error {
return &Error{
Reason: reason,
Retry: retry,
Delay: delay,
}
}

// Error implements the error interface.
func (c *Error) Error() string {
return c.Reason
Expand Down
106 changes: 98 additions & 8 deletions torch/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package torch
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -14,6 +15,31 @@ import (
"github.com/256dpi/fire/stick"
)

// TODO: A computation may fail for some reason. Either there is a problem
// while generating the output or the input is invalid. The former case should
// be handled by the

// TODO: Maybe in a first step we just add the functionality to store an error?
// Most users for computations may need to handle known errors anyway.
// -> Everything else will abort the callback or task as usual.

// Error is used to return computation errors.
type Error struct {
Reason string
}

// E will return a new error with the provided reason.
func E(reason string) *Error {
return &Error{
Reason: reason,
}
}

// Error implements the error interface.
func (e *Error) Error() string {
return e.Reason
}

// Status defines the status of a computation.
type Status struct {
// Progress defines the state of the computation. If the value is less than
Expand All @@ -28,8 +54,18 @@ type Status struct {
Hash string `json:"hash"`

// Valid indicates whether the value is valid. It may be cleared to indicate
// hat the value is outdated and should be recomputed.
// that the value is outdated and should be recomputed.
Valid bool `json:"valid"`

// Attempts defines the number of attempts that were made to compute the
// output.
Attempts int `json:"attempts"`

// TODO: Add "Recompute" flag to force a full re-computation?
// - Allows the system to reset the attempts counter automatically.

// Error defines the error that occurred during the last computation attempt.
Error string `json:"error"`
}

// Hash is a helper function that returns the MD5 hash of the input if present
Expand Down Expand Up @@ -101,22 +137,30 @@ type Computation struct {
// computed. Otherwise, output is released immediately if possible.
KeepOutdated bool

// The maximum number of attempts before the computation is considered
// failed.
MaxAttempts int

// The interval at which the computation should be retried if it fails.
RetryInterval time.Duration

// The interval at which the input is checked for outside changes.
RehashInterval time.Duration

// The interval a which the output is recomputed regardless if the input
// is the same.
// The interval at which the output is recomputed regardless if the input
// is the same or the computation failed.
RecomputeInterval time.Duration
}

// Compute will return an operation that automatically runs the provided
// asynchronous computation. During a check/modifier call, the hash of the input
// is taken to determine if the output needs to be computed. During a scan the
// computation is only invoked when the status is missing, invalid or outdated.
// To force a computation in both cases, the status can be flagged as invalid.
// To force a computation in both cases, the status can be flagged as invalid
// and the attempts cleared.
//
// If no releaser is configured, the computer is also invoked asynchronously to
// compute the output for a zero input (zero hash). If a releaser is configured,
// compute the output for a zero input (zero hash). If a releaser is available,
// it is invoked instead synchronously to release (clear) the current output.
// Optionally, the outdated output can be kept until it is recomputed.
func Compute(comp Computation) *Operation {
Expand All @@ -130,21 +174,39 @@ func Compute(comp Computation) *Operation {
// determine fields
validField := "#" + coal.F(comp.Model, comp.Name) + ".valid"
updatedField := "#" + coal.F(comp.Model, comp.Name) + ".updated"
attemptsField := "#" + coal.F(comp.Model, comp.Name) + ".attempts"

return &Operation{
Name: name,
Model: comp.Model,
Sync: true,
Query: func() bson.M {
// prepare filters
filters := []bson.M{
{comp.Name: nil},
{validField: false},
var filters []bson.M

// add base filter
baseFilter := bson.M{
validField: bson.M{
"$ne": true,
},
}
if comp.MaxAttempts > 0 {
baseFilter[attemptsField] = bson.M{
"$lt": comp.MaxAttempts,
}
}
if comp.RetryInterval > 0 {
// TODO: Only if attempts > 0?
baseFilter[updatedField] = bson.M{
"$lt": time.Now().Add(-comp.RetryInterval),
}
}
filters = append(filters, baseFilter)

// add rehash filter
if comp.RehashInterval > 0 {
filters = append(filters, bson.M{
validField: true,
updatedField: bson.M{
"$lt": time.Now().Add(-comp.RehashInterval),
},
Expand All @@ -154,6 +216,7 @@ func Compute(comp Computation) *Operation {
// add recompute filter
if comp.RecomputeInterval > 0 {
filters = append(filters, bson.M{
// may be valid or invalid
updatedField: bson.M{
"$lt": time.Now().Add(-comp.RecomputeInterval),
},
Expand All @@ -165,6 +228,8 @@ func Compute(comp Computation) *Operation {
}
},
Filter: func(model coal.Model) bool {
// TODO: Check max attempts.

// get status
status := stick.MustGet(model, comp.Name).(*Status)
if status == nil || !status.Valid {
Expand Down Expand Up @@ -260,6 +325,12 @@ func Compute(comp Computation) *Operation {
return nil
}

// get attempts
attempts := 1
if status != nil && !status.Valid {
attempts = status.Attempts + 1
}

// set progress function
ctx.Progress = func(factor float64) error {
// ignore some factors
Expand All @@ -279,6 +350,7 @@ func Compute(comp Computation) *Operation {
comp.Name: &Status{
Progress: factor,
Updated: time.Now(),
Attempts: attempts,
},
},
}, false)
Expand All @@ -293,16 +365,34 @@ func Compute(comp Computation) *Operation {

// compute output
err := comp.Computer(ctx)

// handle failures
var compErr *Error
if errors.As(err, &compErr) {
// update status
ctx.Change("$set", comp.Name, &Status{
Updated: time.Now(),
Attempts: attempts,
Error: compErr.Reason,
})

return nil
}

// return other errors
if err != nil {
return err
}

/* handle success */

// update status
ctx.Change("$set", comp.Name, &Status{
Progress: 1,
Updated: time.Now(),
Hash: hash,
Valid: true,
Attempts: attempts,
})

return nil
Expand Down
18 changes: 18 additions & 0 deletions torch/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestComputeScan(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldUpdated))

Expand Down Expand Up @@ -106,6 +107,7 @@ func TestComputeScan(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldUpdated))

Expand All @@ -114,6 +116,7 @@ func TestComputeScan(t *testing.T) {
oldUpdated = model.Status.Updated

model.Status.Valid = false
model.Status.Attempts = 0
env.Replace(model)

err = env.Process(model)
Expand All @@ -126,6 +129,7 @@ func TestComputeScan(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldUpdated))

Expand All @@ -146,6 +150,7 @@ func TestComputeScan(t *testing.T) {
Updated: model.Status.Updated,
Hash: "",
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldUpdated))
})
Expand Down Expand Up @@ -202,6 +207,7 @@ func TestComputeProcess(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldStatus.Updated))

Expand Down Expand Up @@ -238,6 +244,7 @@ func TestComputeProcess(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldStatus.Updated))

Expand Down Expand Up @@ -265,6 +272,7 @@ func TestComputeProcess(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldStatus.Updated))

Expand All @@ -288,6 +296,7 @@ func TestComputeProcess(t *testing.T) {
Updated: model.Status.Updated,
Hash: "",
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldStatus.Updated))
})
Expand Down Expand Up @@ -335,6 +344,7 @@ func TestComputeProgress(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.Equal(t, []float64{0, 0.25, 0.5, 0.75, 1}, progress)
})
Expand Down Expand Up @@ -374,6 +384,7 @@ func TestComputeReleaser(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)

/* new input */
Expand Down Expand Up @@ -403,6 +414,7 @@ func TestComputeReleaser(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(oldUpdated))

Expand Down Expand Up @@ -458,6 +470,7 @@ func TestComputeKeepOutdated(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)

/* new input */
Expand All @@ -483,6 +496,7 @@ func TestComputeKeepOutdated(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)

/* leftover input */
Expand Down Expand Up @@ -532,6 +546,7 @@ func TestComputeRehashInterval(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)

/* rehash same */
Expand All @@ -558,6 +573,7 @@ func TestComputeRehashInterval(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("What's up?"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(before))
})
Expand Down Expand Up @@ -593,6 +609,7 @@ func TestComputeRecomputeInterval(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)

/* recompute same */
Expand All @@ -610,6 +627,7 @@ func TestComputeRecomputeInterval(t *testing.T) {
Updated: model.Status.Updated,
Hash: Hash("Hello world!"),
Valid: true,
Attempts: 1,
}, model.Status)
assert.True(t, model.Status.Updated.After(updated))

Expand Down
2 changes: 1 addition & 1 deletion torch/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Context struct {
// not yet been fully processed and the handler should be called again
// sometime later. If a synchronous operation is deferred, it will always be
// retried asynchronously.
Defer bool
Defer bool // TODO: Use time.Duration?

// The executed operation.
Operation *Operation
Expand Down
Loading

0 comments on commit 1a3c1c3

Please sign in to comment.