Skip to content

Commit

Permalink
Add reported_at timestamps to events (flyteorg#529)
Browse files Browse the repository at this point in the history
* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* updated pod timestamp recording and added recorded_at on task event

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl and flyteplugins

Signed-off-by: Daniel Rammer <[email protected]>

* setting occurred_at when node inputs begin resolution

Signed-off-by: Daniel Rammer <[email protected]>

* added reported_at to node execution events

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl and flyteplugins dependencies

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl and flyteplugins deps

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 24, 2023
1 parent b4cde1f commit e462412
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 11 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flyteplugins v1.0.42
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.43
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q=
github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.42 h1:vkBe8hzG7jlpqE6ZBKf7cENjkN8n+t6ff+EpxYeRrec=
github.com/flyteorg/flyteplugins v1.0.42/go.mod h1:5in2y2zWO6fbheoPJ44wNRppfVpjkWXCs0dy+oA232o=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.43 h1:uI/Y88xqJKfvfuxfu0Sw9CNZ7iu3+HUwwRhxh558cbs=
github.com/flyteorg/flyteplugins v1.0.43/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (c *nodeExecutor) finalize(ctx context.Context, h handler.Node, nCtx handle
func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executors.DAGStructure, nCtx *nodeExecContext, _ handler.Node) (executors.NodeStatus, error) {
logger.Debugf(ctx, "Node not yet started, running pre-execute")
defer logger.Debugf(ctx, "Node pre-execute completed")
occurredAt := time.Now()
p, err := c.preExecute(ctx, dag, nCtx)
if err != nil {
logger.Errorf(ctx, "failed preExecute for node. Error: %s", err.Error())
Expand All @@ -547,6 +548,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor
if np != nodeStatus.GetPhase() {
// assert np == Queued!
logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String())
p = p.WithOccuredAt(occurredAt)

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(),
Expand Down Expand Up @@ -691,6 +693,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
Message: err.Error(),
},
},
ReportedAt: ptypes.TimestampNow(),
})

if err != nil {
Expand Down Expand Up @@ -1152,6 +1155,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E
},
},
ProducerId: c.clusterID,
ReportedAt: ptypes.TimestampNow(),
})
if err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) {
if errors2.IsCausedBy(err, errors.IllegalStateError) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ func (p PhaseInfo) WithInfo(i *ExecutionInfo) PhaseInfo {
}
}

func (p PhaseInfo) WithOccuredAt(t time.Time) PhaseInfo {
return PhaseInfo{
p: p.p,
occurredAt: t,
err: p.err,
info: p.info,
reason: p.reason,
}
}

var PhaseInfoUndefined = PhaseInfo{p: EPhaseUndefined}

func phaseInfo(p EPhase, err *core.ExecutionError, info *ExecutionInfo, reason string) PhaseInfo {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}

barrierTick := uint32(0)
occurredAt := time.Now()
// STEP 2: If no cache-hit and not transitioning to PhaseWaitingForCache, then lets invoke the plugin and wait for a transition out of undefined
if pluginTrns.execInfo.TaskNodeInfo == nil || (pluginTrns.pInfo.Phase() != pluginCore.PhaseWaitingForCache &&
pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.CacheStatus != core.CatalogCacheStatus_CACHE_HIT) {
Expand Down Expand Up @@ -724,6 +725,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
ClusterID: t.clusterID,
OccurredAt: occurredAt,
})
if err != nil {
return handler.UnknownTransition, err
Expand All @@ -750,6 +752,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
ClusterID: t.clusterID,
OccurredAt: occurredAt,
})
if err != nil {
logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error())
Expand Down
26 changes: 21 additions & 5 deletions pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package task

import (
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -9,9 +11,10 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
"github.com/golang/protobuf/ptypes"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"

"github.com/golang/protobuf/ptypes"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
)

// This is used by flyteadmin to indicate that map tasks now report subtask metadata individually.
Expand Down Expand Up @@ -78,15 +81,27 @@ type ToTaskExecutionEventInputs struct {
PluginID string
ResourcePoolInfo []*event.ResourcePoolInfo
ClusterID string
OccurredAt time.Time
}

func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

tm := ptypes.TimestampNow()
var err error
var occurredAt *timestamppb.Timestamp
if i := input.Info.Info(); i != nil && i.OccurredAt != nil {
tm, err = ptypes.TimestampProto(*i.OccurredAt)
occurredAt, err = ptypes.TimestampProto(*i.OccurredAt)
} else {
occurredAt, err = ptypes.TimestampProto(input.OccurredAt)
}

if err != nil {
return nil, err
}

reportedAt := ptypes.TimestampNow()
if i := input.Info.Info(); i != nil && i.ReportedAt != nil {
occurredAt, err = ptypes.TimestampProto(*i.ReportedAt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,11 +142,12 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio
Phase: ToTaskEventPhase(input.Info.Phase()),
PhaseVersion: input.Info.Version(),
ProducerId: input.ClusterID,
OccurredAt: tm,
OccurredAt: occurredAt,
TaskType: input.TaskType,
Reason: input.Info.Reason(),
Metadata: metadata,
EventVersion: taskExecutionEventVersion,
ReportedAt: reportedAt,
}

if input.Info.Phase().IsSuccess() && input.OutputWriter != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
OccurredAt: occurredTime,
ProducerId: clusterID,
EventVersion: nodeExecutionEventVersion,
ReportedAt: ptypes.TimestampNow(),
}
} else {
nev = &event.NodeExecutionEvent{
Expand All @@ -122,6 +123,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
OccurredAt: occurredTime,
ProducerId: clusterID,
EventVersion: nodeExecutionEventVersion,
ReportedAt: ptypes.TimestampNow(),
}
}

Expand Down

0 comments on commit e462412

Please sign in to comment.