Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fix PhaseStatus that gets displayed for Presto tasks #74

Merged
merged 1 commit into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (p ExecutionPhase) String() string {
}

type ExecutionState struct {
Phase ExecutionPhase
CurrentPhase ExecutionPhase
PreviousPhase ExecutionPhase

// This will store the command ID from Presto
CommandID string `json:"commandId,omitempty"`
Expand Down Expand Up @@ -105,7 +106,7 @@ func HandleExecutionState(
var transformError error
var newState ExecutionState

switch currentState.Phase {
switch currentState.CurrentPhase {
case PhaseNotStarted:
newState, transformError = GetAllocationToken(ctx, tCtx, currentState, metrics)

Expand All @@ -125,8 +126,11 @@ func HandleExecutionState(
// If there are still Presto statements to execute, increment the query count, reset the phase to 'queued'
// and continue executing the remaining statements. In this case, we won't request another allocation token
// as the 5 statements that get executed are all considered to be part of the same "query"
currentState.Phase = PhaseQueued
currentState.PreviousPhase = currentState.CurrentPhase
currentState.CurrentPhase = PhaseQueued
} else {
//currentState.Phase = PhaseQuerySucceeded
currentState.PreviousPhase = currentState.CurrentPhase
transformError = writeOutput(ctx, tCtx, currentState.CurrentPrestoQuery.ExternalLocation)
}
currentState.QueryCount++
Expand Down Expand Up @@ -172,11 +176,11 @@ func GetAllocationToken(
}

if allocationStatus == core.AllocationStatusGranted {
newState.Phase = PhaseQueued
newState.CurrentPhase = PhaseQueued
} else if allocationStatus == core.AllocationStatusExhausted {
newState.Phase = PhaseNotStarted
newState.CurrentPhase = PhaseNotStarted
} else if allocationStatus == core.AllocationStatusNamespaceQuotaExceeded {
newState.Phase = PhaseNotStarted
newState.CurrentPhase = PhaseNotStarted
} else {
return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]",
allocationStatus, uniqueID)
Expand Down Expand Up @@ -389,7 +393,8 @@ func KickOffQuery(
commandID := response.ID
logger.Infof(ctx, "Created Presto ID [%s] for token %s", commandID, uniqueID)
currentState.CommandID = commandID
currentState.Phase = PhaseSubmitted
currentState.PreviousPhase = currentState.CurrentPhase
currentState.CurrentPhase = PhaseSubmitted
currentState.URI = response.NextURI
currentState.CurrentPrestoQueryUUID = uniqueID

Expand Down Expand Up @@ -475,7 +480,8 @@ func MapExecutionStateToPhaseInfo(state ExecutionState) core.PhaseInfo {
var phaseInfo core.PhaseInfo
t := time.Now()

switch state.Phase {
//switch state.Phase {
switch state.CurrentPhase {
case PhaseNotStarted:
phaseInfo = core.PhaseInfoNotReady(t, core.DefaultPhaseVersion, "Haven't received allocation token")
case PhaseQueued:
Expand Down Expand Up @@ -515,7 +521,7 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo {

func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog {
return &idlCore.TaskLog{
Name: fmt.Sprintf("Status: %s [%s]", e.Phase, e.CommandID),
Name: fmt.Sprintf("Status: %s [%s]", e.PreviousPhase, e.CommandID),
MessageFormat: idlCore.TaskLog_UNKNOWN,
Uri: e.URI,
}
Expand Down Expand Up @@ -551,11 +557,11 @@ func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionSt
}

func InTerminalState(e ExecutionState) bool {
return e.Phase == PhaseQuerySucceeded || e.Phase == PhaseQueryFailed
return e.CurrentPhase == PhaseQuerySucceeded || e.CurrentPhase == PhaseQueryFailed
}

func IsNotYetSubmitted(e ExecutionState) bool {
if e.Phase == PhaseNotStarted || e.Phase == PhaseQueued {
if e.CurrentPhase == PhaseNotStarted || e.CurrentPhase == PhaseQueued {
return true
}
return false
Expand Down
32 changes: 16 additions & 16 deletions go/tasks/plugins/presto/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestInTerminalState(t *testing.T) {

for _, tt := range stateTests {
t.Run(tt.phase.String(), func(t *testing.T) {
e := ExecutionState{Phase: tt.phase}
e := ExecutionState{CurrentPhase: tt.phase}
res := InTerminalState(e)
assert.Equal(t, tt.isTerminal, res)
})
Expand All @@ -63,7 +63,7 @@ func TestIsNotYetSubmitted(t *testing.T) {

for _, tt := range stateTests {
t.Run(tt.phase.String(), func(t *testing.T) {
e := ExecutionState{Phase: tt.phase}
e := ExecutionState{CurrentPhase: tt.phase}
res := IsNotYetSubmitted(e)
assert.Equal(t, tt.isNotYetSubmitted, res)
})
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestConstructTaskInfo(t *testing.T) {
assert.NoError(t, err)

e := ExecutionState{
Phase: PhaseQuerySucceeded,
CurrentPhase: PhaseQuerySucceeded,
CommandID: "123",
SyncFailureCount: 0,
URI: u.String(),
Expand All @@ -111,22 +111,22 @@ func TestConstructTaskInfo(t *testing.T) {
func TestMapExecutionStateToPhaseInfo(t *testing.T) {
t.Run("NotStarted", func(t *testing.T) {
e := ExecutionState{
Phase: PhaseNotStarted,
CurrentPhase: PhaseNotStarted,
}
phaseInfo := MapExecutionStateToPhaseInfo(e)
assert.Equal(t, core.PhaseNotReady, phaseInfo.Phase())
})

t.Run("Queued", func(t *testing.T) {
e := ExecutionState{
Phase: PhaseQueued,
CurrentPhase: PhaseQueued,
CreationFailureCount: 0,
}
phaseInfo := MapExecutionStateToPhaseInfo(e)
assert.Equal(t, core.PhaseRunning, phaseInfo.Phase())

e = ExecutionState{
Phase: PhaseQueued,
CurrentPhase: PhaseQueued,
CreationFailureCount: 100,
}
phaseInfo = MapExecutionStateToPhaseInfo(e)
Expand All @@ -136,7 +136,7 @@ func TestMapExecutionStateToPhaseInfo(t *testing.T) {

t.Run("Submitted", func(t *testing.T) {
e := ExecutionState{
Phase: PhaseSubmitted,
CurrentPhase: PhaseSubmitted,
}
phaseInfo := MapExecutionStateToPhaseInfo(e)
assert.Equal(t, core.PhaseRunning, phaseInfo.Phase())
Expand All @@ -157,7 +157,7 @@ func TestGetAllocationToken(t *testing.T) {
mockMetrics := getPrestoExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseQueued, state.Phase)
assert.Equal(t, PhaseQueued, state.CurrentPhase)
})

t.Run("exhausted", func(t *testing.T) {
Expand All @@ -171,7 +171,7 @@ func TestGetAllocationToken(t *testing.T) {
mockMetrics := getPrestoExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseNotStarted, state.Phase)
assert.Equal(t, PhaseNotStarted, state.CurrentPhase)
})

t.Run("namespace exhausted", func(t *testing.T) {
Expand All @@ -185,7 +185,7 @@ func TestGetAllocationToken(t *testing.T) {
mockMetrics := getPrestoExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseNotStarted, state.Phase)
assert.Equal(t, PhaseNotStarted, state.CurrentPhase)
})

t.Run("Request start time, if empty in current state, should be set", func(t *testing.T) {
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestAbort(t *testing.T) {
x = true
}).Return(nil)

err := Abort(ctx, ExecutionState{Phase: PhaseSubmitted, CommandID: "123456"}, mockPresto)
err := Abort(ctx, ExecutionState{CurrentPhase: PhaseSubmitted, CommandID: "123456"}, mockPresto)
assert.NoError(t, err)
assert.True(t, x)
})
Expand All @@ -245,7 +245,7 @@ func TestAbort(t *testing.T) {
x = true
}).Return(nil)

err := Abort(ctx, ExecutionState{Phase: PhaseQuerySucceeded, CommandID: "123456"}, mockPresto)
err := Abort(ctx, ExecutionState{CurrentPhase: PhaseQuerySucceeded, CommandID: "123456"}, mockPresto)
assert.NoError(t, err)
assert.False(t, x)
})
Expand All @@ -272,12 +272,12 @@ func TestMonitorQuery(t *testing.T) {
ctx := context.Background()
tCtx := GetMockTaskExecutionContext()
state := ExecutionState{
Phase: PhaseSubmitted,
CurrentPhase: PhaseSubmitted,
}
var getOrCreateCalled = false
mockCache := &mocks2.AutoRefresh{}
mockCache.OnGetOrCreateMatch(mock.AnythingOfType("string"), mock.Anything).Return(ExecutionStateCacheItem{
ExecutionState: ExecutionState{Phase: PhaseQuerySucceeded},
ExecutionState: ExecutionState{CurrentPhase: PhaseQuerySucceeded},
Identifier: "my_wf_exec_project:my_wf_exec_domain:my_wf_exec_name",
}, nil).Run(func(_ mock.Arguments) {
getOrCreateCalled = true
Expand All @@ -286,7 +286,7 @@ func TestMonitorQuery(t *testing.T) {
newState, err := MonitorQuery(ctx, tCtx, state, mockCache)
assert.NoError(t, err)
assert.True(t, getOrCreateCalled)
assert.Equal(t, PhaseQuerySucceeded, newState.Phase)
assert.Equal(t, PhaseQuerySucceeded, newState.CurrentPhase)
}

func TestKickOffQuery(t *testing.T) {
Expand All @@ -312,7 +312,7 @@ func TestKickOffQuery(t *testing.T) {
state := ExecutionState{}
newState, err := KickOffQuery(ctx, tCtx, state, mockPresto, mockCache)
assert.NoError(t, err)
assert.Equal(t, PhaseSubmitted, newState.Phase)
assert.Equal(t, PhaseSubmitted, newState.CurrentPhase)
assert.Equal(t, "1234567", newState.CommandID)
assert.True(t, getOrCreateCalled)
assert.True(t, prestoCalled)
Expand Down
7 changes: 4 additions & 3 deletions go/tasks/plugins/presto/executions_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ func (p *ExecutionsCache) SyncPrestoQuery(ctx context.Context, batch cache.Batch
return nil, err
}

if newExecutionPhase > executionStateCacheItem.Phase {
if newExecutionPhase > executionStateCacheItem.CurrentPhase {
logger.Infof(ctx, "Moving ExecutionPhase for %s %s from %s to %s", executionStateCacheItem.CommandID,
executionStateCacheItem.Identifier, executionStateCacheItem.Phase, newExecutionPhase)
executionStateCacheItem.Identifier, executionStateCacheItem.CurrentPhase, newExecutionPhase)

executionStateCacheItem.Phase = newExecutionPhase
executionStateCacheItem.PreviousPhase = executionStateCacheItem.CurrentPhase
executionStateCacheItem.CurrentPhase = newExecutionPhase

resp = append(resp, cache.ItemSyncResponse{
ID: query.GetID(),
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/plugins/presto/executions_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestPrestoExecutionsCache_SyncQuboleQuery(t *testing.T) {
}

state := ExecutionState{
Phase: PhaseQuerySucceeded,
CurrentPhase: PhaseQuerySucceeded,
}
cacheItem := ExecutionStateCacheItem{
ExecutionState: state,
Expand Down Expand Up @@ -67,8 +67,8 @@ func TestPrestoExecutionsCache_SyncQuboleQuery(t *testing.T) {
}

state := ExecutionState{
CommandID: "123456",
Phase: PhaseSubmitted,
CommandID: "123456",
CurrentPhase: PhaseSubmitted,
}
cacheItem := ExecutionStateCacheItem{
ExecutionState: state,
Expand All @@ -86,6 +86,6 @@ func TestPrestoExecutionsCache_SyncQuboleQuery(t *testing.T) {
newExecutionState := newCacheItem[0].Item.(ExecutionStateCacheItem)
assert.NoError(t, err)
assert.Equal(t, cache.Update, newCacheItem[0].Action)
assert.Equal(t, PhaseQuerySucceeded, newExecutionState.Phase)
assert.Equal(t, PhaseQuerySucceeded, newExecutionState.CurrentPhase)
})
}