Skip to content

Commit

Permalink
fix(*): add missing close on process that runs commands as bash, rename abort to close
Browse files (browse the repository at this point in the history)

> open /tmp/tmpbash888507873.bash: too many open files

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 17, 2025
1 parent b794199 commit 751d553
Show file tree
Hide file tree
Showing 28 changed files with 167 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cmd/gpud/command/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func runCommand(ctx context.Context, script string, result *string) error {
return err
}
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
return err
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions components/accelerator/nvidia/query/detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func ListNVIDIAPCIs(ctx context.Context) ([]string, error) {
if err := p.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down
6 changes: 6 additions & 0 deletions components/accelerator/nvidia/query/infiniband/infiniband.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"strings"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
"github.com/leptonai/gpud/pkg/process"
)
Expand Down Expand Up @@ -63,6 +64,11 @@ func CountInfinibandPCIBuses(ctx context.Context) (int, error) {
if err := p.Start(ctx); err != nil {
return 0, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

count := 0

Expand Down
5 changes: 5 additions & 0 deletions components/accelerator/nvidia/query/nvidia_smi_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func RunSMI(ctx context.Context, args ...string) ([]byte, error) {
if err := p.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

// in case of driver issue, the nvidia-smi is stuck in "state:D" -- uninterruptible sleep state
// which may not handle the os kill signal from the context timeout/cancellation
Expand Down
10 changes: 6 additions & 4 deletions components/accelerator/nvidia/query/peermem/peermem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/leptonai/gpud/components/accelerator/nvidia/query/infiniband"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/process"
)

Expand All @@ -35,6 +36,11 @@ func CheckLsmodPeermemModule(ctx context.Context) (*LsmodPeermemModuleOutput, er
if err := proc.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := proc.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

// e.g.,
// sudo lsmod | grep nvidia_peermem
Expand All @@ -59,10 +65,6 @@ func CheckLsmodPeermemModule(ctx context.Context) (*LsmodPeermemModuleOutput, er
return nil, fmt.Errorf("failed to read lsmod output: %w\n\noutput:\n%s", err, strings.Join(lines, "\n"))
}

if perr := proc.Abort(ctx); perr != nil {
return nil, err
}

o := &LsmodPeermemModuleOutput{
IbstatExists: infiniband.IbstatExists(),
InfinibandClassExists: infiniband.CountInfinibandClass() > 0,
Expand Down
10 changes: 7 additions & 3 deletions components/diagnose/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/leptonai/gpud/components/dmesg"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/host"
"github.com/leptonai/gpud/pkg/process"
pkd_systemd "github.com/leptonai/gpud/pkg/systemd"
Expand Down Expand Up @@ -480,6 +481,12 @@ func (o *output) runCommand(ctx context.Context, subDir string, args ...string)
if serr := p.Start(ctx); serr != nil {
return serr
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -491,9 +498,6 @@ func (o *output) runCommand(ctx context.Context, subDir string, args ...string)
})
}
}
if err := p.Abort(ctx); err != nil {
return err
}

return nil
}
6 changes: 6 additions & 0 deletions components/dmesg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

query_config "github.com/leptonai/gpud/components/query/config"
query_log_config "github.com/leptonai/gpud/components/query/log/config"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/dmesg"
"github.com/leptonai/gpud/pkg/process"
)
Expand Down Expand Up @@ -97,6 +98,11 @@ func checkDmesgSupportsSinceFlag(ctx context.Context) bool {
if err := p.Start(ctx); err != nil {
return false
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down
9 changes: 6 additions & 3 deletions components/query/log/tail/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func Scan(ctx context.Context, opts ...OpOption) (int, error) {
if err := p.Start(ctx); err != nil {
return 0, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

select {
case <-ctx.Done():
return 0, ctx.Err()
Expand All @@ -61,9 +67,6 @@ func Scan(ctx context.Context, opts ...OpOption) (int, error) {
if err := f.Sync(); err != nil {
return 0, err
}
if err := p.Abort(ctx); err != nil {
return 0, err
}
}

f, err := os.Open(file)
Expand Down
4 changes: 4 additions & 0 deletions components/query/log/tail/streamer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (sr *commandStreamer) waitCommand() {
sr.dedup.mu.Unlock()
seenPool.Put(sr.dedup)
}

if err := sr.proc.Close(sr.ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

select {
Expand Down
6 changes: 6 additions & 0 deletions manager/controllers/package_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ func runCommand(ctx context.Context, script, arg string, result *string) error {
if err = p.Start(ctx); err != nil {
return err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

if result != nil {
go func() {
lines := make([]string, 0)
Expand Down
6 changes: 6 additions & 0 deletions pkg/disk/findmnt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
"github.com/leptonai/gpud/pkg/process"

Expand All @@ -31,6 +32,11 @@ func FindMnt(ctx context.Context, target string) (*FindMntOutput, error) {
if err := p.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down
10 changes: 10 additions & 0 deletions pkg/disk/lsblk.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func decideLsblkFlag(ctx context.Context) (string, func([]byte, ...OpOption) (Bl
if err := p.Start(ctx); err != nil {
return "", nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down Expand Up @@ -136,6 +141,11 @@ func GetBlockDevices(ctx context.Context, opts ...OpOption) (BlockDevices, error
if err := p.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down
5 changes: 5 additions & 0 deletions pkg/host/machine_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func DmidecodeUUID(ctx context.Context) (string, error) {
if err := p.Start(ctx); err != nil {
return "", err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
uuid := ""
Expand Down
11 changes: 11 additions & 0 deletions pkg/host/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
"github.com/leptonai/gpud/pkg/process"
)
Expand Down Expand Up @@ -59,6 +60,11 @@ func SystemdDetectVirt(ctx context.Context) (VirtualizationEnvironment, error) {
if err := p.Start(ctx); err != nil {
return VirtualizationEnvironment{}, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down Expand Up @@ -116,6 +122,11 @@ func SystemManufacturer(ctx context.Context) (string, error) {
if err := p.Start(ctx); err != nil {
return "", err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

lines := make([]string, 0)
if err := process.Read(
Expand Down
6 changes: 6 additions & 0 deletions pkg/pci/pci.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strings"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
"github.com/leptonai/gpud/pkg/process"

Expand Down Expand Up @@ -37,6 +38,11 @@ func List(ctx context.Context, opts ...OpOption) (Devices, error) {
if err := p.Start(ctx); err != nil {
return nil, err
}
defer func() {
if err := p.Close(ctx); err != nil {
log.Logger.Warnw("failed to abort command", "err", err)
}
}()

scanner := bufio.NewScanner(p.StdoutReader())
devs, err := parseLspciVVV(ctx, scanner, op.nameMatchFunc)
Expand Down
4 changes: 2 additions & 2 deletions pkg/process/examples/bash-script-output-to-file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ echo hello
panic("timeout")
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/process/examples/restart-commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func main() {
panic("timeout")
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/process/examples/simple-command/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func main() {
panic(err)
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
}
4 changes: 2 additions & 2 deletions pkg/process/examples/simple-commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func main() {
panic(err)
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
}
4 changes: 2 additions & 2 deletions pkg/process/examples/stream-blocking-commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func main() {
panic(err)
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
}
4 changes: 2 additions & 2 deletions pkg/process/examples/stream-commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func main() {
panic(err)
}

if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
if err := p.Abort(ctx); err != nil {
if err := p.Close(ctx); err != nil {
panic(err)
}
}
Loading

0 comments on commit 751d553

Please sign in to comment.