From 3a540597802b105e86999cc3715f36f2e9fa4b7a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 16 Jun 2020 11:06:02 -0700 Subject: [PATCH] Presto - Reduce number of queries run (#98) --- go/tasks/plugins/presto/execution_state.go | 49 +++++++--------------- 1 file changed, 15 insertions(+), 34 deletions(-) diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 5268cf4499..701b25d6a7 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -123,10 +123,10 @@ func HandleExecutionState( newState, transformError = MonitorQuery(ctx, tCtx, currentState, executionsCache) case PhaseQuerySucceeded: - if currentState.QueryCount < 4 { + if currentState.QueryCount < 1 { // If there are still Presto statements to execute, increment the query count, reset the phase to 'queued' // and continue executing the remaining statements. In this case, we won't request another allocation token - // as the 5 statements that get executed are all considered to be part of the same "query" + // as the 2 statements that get executed are all considered to be part of the same "query" currentState.PreviousPhase = currentState.CurrentPhase currentState.CurrentPhase = PhaseQueued } else { @@ -304,7 +304,18 @@ func GetNextQuery( user = tCtx.TaskExecutionMetadata().GetNamespace() } - statement = fmt.Sprintf(`CREATE TABLE hive.flyte_temporary_tables."%s_temp" AS %s`, tempTableName, statement) + externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "") + if err != nil { + return Query{}, err + } + + queryWrapTemplate := ` + CREATE TABLE hive.flyte_temporary_tables."%s_temp" + WITH (format = 'PARQUET', external_location = '%s') + AS (%s) + ` + + statement = fmt.Sprintf(queryWrapTemplate, tempTableName, externalLocation, statement) prestoQuery := Query{ Statement: statement, @@ -317,46 +328,16 @@ func GetNextQuery( }, TempTableName: tempTableName + "_temp", ExternalTableName: tempTableName + "_external", + ExternalLocation: externalLocation.String(), } return prestoQuery, nil case 1: - externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "") - if err != nil { - return Query{}, err - } - - statement := fmt.Sprintf(` -CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s") -WITH (format = 'PARQUET', external_location = '%s')`, - currentState.CurrentPrestoQuery.ExternalTableName, - currentState.CurrentPrestoQuery.TempTableName, - externalLocation, - ) - currentState.CurrentPrestoQuery.Statement = statement - currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String() - return currentState.CurrentPrestoQuery, nil - - case 2: - statement := ` -INSERT INTO hive.flyte_temporary_tables."%s" -SELECT * -FROM hive.flyte_temporary_tables."%s"` - statement = fmt.Sprintf(statement, currentState.CurrentPrestoQuery.ExternalTableName, currentState.CurrentPrestoQuery.TempTableName) - currentState.CurrentPrestoQuery.Statement = statement - return currentState.CurrentPrestoQuery, nil - - case 3: statement := fmt.Sprintf(`DROP TABLE hive.flyte_temporary_tables."%s"`, currentState.CurrentPrestoQuery.TempTableName) currentState.CurrentPrestoQuery.Statement = statement return currentState.CurrentPrestoQuery, nil - case 4: - statement := fmt.Sprintf(`DROP TABLE hive.flyte_temporary_tables."%s"`, currentState.CurrentPrestoQuery.ExternalTableName) - currentState.CurrentPrestoQuery.Statement = statement - return currentState.CurrentPrestoQuery, nil - default: return currentState.CurrentPrestoQuery, nil }