Skip to content

Commit

Permalink
exp/lighthorizon: Set a default number of workers. (#4465)
Browse files Browse the repository at this point in the history
* Default to the number of CPUs if worker count isn't specified
* Set a timeout on the reduce job to avoid test suite hanging indefinitely
  • Loading branch information
Shaptic authored Jul 20, 2022
1 parent 66228cb commit 033de79
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
4 changes: 4 additions & 0 deletions exp/lighthorizon/index/backend/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type FileBackend struct {
}

func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
if parallel <= 0 {
parallel = 1
}

return &FileBackend{
dir: dir,
parallel: parallel,
Expand Down
10 changes: 5 additions & 5 deletions exp/lighthorizon/index/cmd/reduce.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Combines indices that were built separately in different folders into a single
# set of indices.
Expand All @@ -22,7 +22,7 @@ fi

if [[ ! -d "$2" ]]; then
echo "Warning: index dest ('$2') does not exist, creating..."
mkdir -p $2
mkdir -p "$2"
fi

MAP_JOB_COUNT=$(ls $1 | grep -E 'job_[0-9]+' | wc -l)
Expand Down Expand Up @@ -50,8 +50,8 @@ do
AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \
REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \
INDEX_SOURCE_ROOT=file://$1 INDEX_TARGET=file://$2 \
./reduce &
timeout -k 30s 10s ./reduce &

echo "pid=$!"
pids+=($!)
done
Expand All @@ -72,4 +72,4 @@ done

rm ./reduce # cleanup
echo "All jobs succeeded!"
exit 0
exit 0
5 changes: 5 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/url"
"path/filepath"
"runtime"

"github.com/aws/aws-sdk-go/aws"

Expand All @@ -15,6 +16,10 @@ func Connect(backendUrl string) (Store, error) {
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
if config.Workers <= 0 {
config.Workers = uint32(runtime.NumCPU()) - 1
}

parsed, err := url.Parse(config.Url)
if err != nil {
return nil, err
Expand Down

0 comments on commit 033de79

Please sign in to comment.