Skip to content

Commit

Permalink
fix(warehouse): increase async job timeout (#2721)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Nov 24, 2022
1 parent 867ce5b commit 88f1ec1
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 149 deletions.
40 changes: 20 additions & 20 deletions warehouse/jobs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,36 @@ import (
)

// AddWarehouseJobHandler The following handler gets called for adding async
func (asyncWhJob *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *http.Request) {
pkgLogger.Info("[WH-Jobs] Got Async Job Add Request")
pkgLogger.LogRequest(r)
func (a *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *http.Request) {
a.logger.Info("[WH-Jobs] Got Async Job Add Request")
a.logger.LogRequest(r)
body, err := io.ReadAll(r.Body)
if err != nil {
pkgLogger.Errorf("[WH-Jobs]: Error reading body: %v", err)
a.logger.Errorf("[WH-Jobs]: Error reading body: %v", err)
http.Error(w, "can't read body", http.StatusBadRequest)
return
}
defer r.Body.Close()
var startJobPayload StartJobReqPayload
err = json.Unmarshal(body, &startJobPayload)
if err != nil {
pkgLogger.Errorf("[WH-Jobs]: Error unmarshalling body: %v", err)
a.logger.Errorf("[WH-Jobs]: Error unmarshalling body: %v", err)
http.Error(w, "can't unmarshall body", http.StatusBadRequest)
return
}
if !validatePayload(startJobPayload) {
pkgLogger.Errorf("[WH-Jobs]: Invalid Payload %v", err)
a.logger.Errorf("[WH-Jobs]: Invalid Payload %v", err)
http.Error(w, "invalid Payload", http.StatusBadRequest)
return
}
if !asyncWhJob.enabled {
pkgLogger.Errorf("[WH-Jobs]: Error Warehouse Jobs API not initialized %v", err)
if !a.enabled {
a.logger.Errorf("[WH-Jobs]: Error Warehouse Jobs API not initialized %v", err)
http.Error(w, "warehouse jobs api not initialized", http.StatusBadRequest)
return
}
tableNames, err := asyncWhJob.getTableNamesBy(startJobPayload.SourceID, startJobPayload.DestinationID, startJobPayload.JobRunID, startJobPayload.TaskRunID)
tableNames, err := a.getTableNamesBy(startJobPayload.SourceID, startJobPayload.DestinationID, startJobPayload.JobRunID, startJobPayload.TaskRunID)
if err != nil {
pkgLogger.Errorf("[WH-Jobs]: Error extracting tableNames for the job run id: %v", err)
a.logger.Errorf("[WH-Jobs]: Error extracting tableNames for the job run id: %v", err)
http.Error(w, "Error extracting tableNames", http.StatusBadRequest)
return
}
Expand All @@ -73,7 +73,7 @@ func (asyncWhJob *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *
AsyncJobType: startJobPayload.AsyncJobType,
MetaData: metadataJson,
}
id, err := asyncWhJob.addJobsToDB(asyncWhJob.context, &payload)
id, err := a.addJobsToDB(a.context, &payload)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -93,10 +93,10 @@ func (asyncWhJob *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *
_, _ = w.Write(response)
}

func (asyncWhJob *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter, r *http.Request) {
func (a *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
pkgLogger.Info("Got Async Job Status Request")
pkgLogger.LogRequest(r)
a.logger.Info("Got Async Job Status Request")
a.logger.LogRequest(r)
jobRunId := r.URL.Query().Get("job_run_id")
taskRunId := r.URL.Query().Get("task_run_id")

Expand All @@ -110,7 +110,7 @@ func (asyncWhJob *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter,
}
if !validatePayload(payload) {

pkgLogger.Errorf("[WH]: Error Invalid Status Parameters")
a.logger.Errorf("[WH]: Error Invalid Status Parameters")
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
Expand All @@ -120,15 +120,15 @@ func (asyncWhJob *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter,
SourceID: sourceId,
DestinationID: destinationId,
}
pkgLogger.Infof("Got Payload job_run_id %s, task_run_id %s \n", startJobPayload.JobRunID, startJobPayload.TaskRunID)
a.logger.Infof("Got Payload job_run_id %s, task_run_id %s \n", startJobPayload.JobRunID, startJobPayload.TaskRunID)

if !asyncWhJob.enabled {
pkgLogger.Errorf("[WH]: Error Warehouse Jobs API not initialized")
if !a.enabled {
a.logger.Errorf("[WH]: Error Warehouse Jobs API not initialized")
http.Error(w, "warehouse jobs api not initialized", http.StatusBadRequest)
return
}

response := asyncWhJob.getStatusAsyncJob(asyncWhJob.context, &startJobPayload)
response := a.getStatusAsyncJob(a.context, &startJobPayload)

writeResponse, err := json.Marshal(response)
if err != nil {
Expand All @@ -137,7 +137,7 @@ func (asyncWhJob *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter,
}
w.Write(writeResponse)
} else {
pkgLogger.Errorf("[WH]: Error Invalid Method")
a.logger.Errorf("[WH]: Error Invalid Method")
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
Expand Down
Loading

0 comments on commit 88f1ec1

Please sign in to comment.