Skip to content

Commit

Permalink
refactor: reap zombie process and opt-out with no-reap flag (#172)
Browse files Browse the repository at this point in the history
* fix: random waitid error

fix #171

* fix(reap): forward signal

* refactor: modify reaper to get supercronic exitStatus

* fix(reaper): unify signal list & fix signal forward

* chore: replace ioutil to io

* fix(test): ci timeout

* opt-out with no-reap flag

Co-authored-by: Josh Raker <[email protected]>

* fix: typo on signal

* fix: args pass to supercronic

* fix(test): remove removed flag

* chore: remove misleading comment

---------

Co-authored-by: Josh Raker <[email protected]>
  • Loading branch information
qianlongzt and joshraker authored Oct 10, 2024
1 parent bfb0d97 commit 8b4edf5
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 22 deletions.
4 changes: 2 additions & 2 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cron
import (
"context"
"fmt"
"io/ioutil"
"io"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -41,7 +41,7 @@ func (hook *testHook) Levels() []logrus.Level {

func newTestLogger() (*logrus.Entry, chan *logrus.Entry) {
logger := logrus.New()
logger.Out = ioutil.Discard
logger.Out = io.Discard
logger.Level = logrus.DebugLevel

channel := make(chan *logrus.Entry, TEST_CHANNEL_BUFFER_SIZE)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/evalphobia/logrus_sentry v0.8.2
github.com/fsnotify/fsnotify v1.7.0
github.com/prometheus/client_golang v1.20.2
github.com/ramr/go-reaper v0.2.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVho
github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/ramr/go-reaper v0.2.1 h1:zww+wlQOvTjBZuk1920R/e0GFEb6O7+B0WQLV6dM924=
github.com/ramr/go-reaper v0.2.1/go.mod h1:AVypdzrcCXjSc/JYnlXl8TsB+z84WyFzxWE8Jh0MOJc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
1 change: 1 addition & 0 deletions integration/normal.crontab
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* * * * * * * echo 1
32 changes: 26 additions & 6 deletions integration/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,34 @@ wait_for() {
! run_supercronic -test "${BATS_TEST_DIRNAME}/invalid.crontab"
}

@test "reap zombie process" {
@test "it run as pid 1 and reap zombie process" {
out="${WORK_DIR}/zombie-crontab-out"

# run in new process namespace
sudo timeout 10s unshare --fork --pid --mount-proc \
${BATS_TEST_DIRNAME}/../supercronic "${BATS_TEST_DIRNAME}/zombie.crontab" &
${BATS_TEST_DIRNAME}/../supercronic "${BATS_TEST_DIRNAME}/zombie.crontab" >"$out" 2>&1 &
local pid=$!
sleep 1.5
run bash -c "ps axo pid=,stat=|grep Z"
sleep 3

kill -TERM ${pid}

[[ "$status" -eq 1 ]]
# todo: use other method to detect zombie cleanup
wait_for grep "reaper cleanup: pid=" "$out"
}


@test "it run as pid 1 and normal crontab no error" {
out="${WORK_DIR}/normal-crontab-out"

# sleep 30 seconds occur found bug
# FIXME: other way to detect
sudo timeout 30s unshare --fork --pid --mount-proc \
"${BATS_TEST_DIRNAME}/../supercronic" "${BATS_TEST_DIRNAME}/normal.crontab" >"$out" 2>&1 &
# https://github.com/aptible/supercronic/issues/171
local pid=$!
local foundErr

sleep 29.5
kill -TERM ${pid}
grep "waitid: no child processes" "$out" && foundErr=1
[[ $foundErr != 1 ]]
}
2 changes: 1 addition & 1 deletion integration/zombie.crontab
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* * * * * * * /bin/sleep 1 & exec /bin/sleep 0
* * * * * * * /bin/sleep 0.1 & exec /bin/sleep 0
6 changes: 3 additions & 3 deletions log/hook/splitstream.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package hook

import (
"github.com/sirupsen/logrus"
"io"
"io/ioutil"

"github.com/sirupsen/logrus"
)

type writerHook struct {
Expand All @@ -25,7 +25,7 @@ func (h *writerHook) Fire(entry *logrus.Entry) error {
}

func RegisterSplitLogger(logger *logrus.Logger, outWriter io.Writer, errWriter io.Writer) {
logger.SetOutput(ioutil.Discard)
logger.SetOutput(io.Discard)

logger.AddHook(&writerHook{
writer: outWriter,
Expand Down
27 changes: 20 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/aptible/supercronic/prometheus_metrics"
"github.com/evalphobia/logrus_sentry"
"github.com/fsnotify/fsnotify"
reaper "github.com/ramr/go-reaper"
"github.com/sirupsen/logrus"
)

Expand All @@ -31,6 +30,8 @@ func main() {
json := flag.Bool("json", false, "enable JSON logging")
test := flag.Bool("test", false, "test crontab (does not run jobs)")
inotify := flag.Bool("inotify", false, "use inotify to detect crontab file changes")
// If this flag changes, update forkExec to disable reaping in the child process
disableReap := flag.Bool("no-reap", false, "disable reaping of dead processes, note: reaping requires pid 1")
prometheusListen := flag.String(
"prometheus-listen-address",
"",
Expand Down Expand Up @@ -101,7 +102,19 @@ func main() {
os.Exit(2)
return
}

if !*disableReap {
if os.Getpid() == 1 {
// Clean up zombie processes caused by incorrect crontab commands
// Use forkExec to avoid random waitid errors
// https://github.com/aptible/supercronic/issues/88
// https://github.com/aptible/supercronic/issues/171
logrus.Info("reaping dead processes")
forkExec()
return
}

logrus.Warn("process reaping disabled, not pid 1")
}
crontabFileName := flag.Args()[0]

var watcher *fsnotify.Watcher
Expand Down Expand Up @@ -165,12 +178,8 @@ func main() {
}()
}

// Start background reaping of orphaned child processes.
go reaper.Reap()
// _ = reaper.Reap

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
signal.Notify(termChan, signalList...)

if *inotify {
go func() {
Expand Down Expand Up @@ -266,3 +275,7 @@ func readCrontabAtPath(path string) (*crontab.Crontab, error) {

return crontab.ParseCrontab(file)
}

var signalList = []os.Signal{
syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2,
}
123 changes: 123 additions & 0 deletions reaper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"os"
"os/signal"
"syscall"

"github.com/sirupsen/logrus"
)

func forkExec() {

// run supercronic in other process
pwd, err := os.Getwd()
if err != nil {
logrus.Fatalf("Failed to get current working directory: %s", err.Error())
return
}

pattrs := &syscall.ProcAttr{
Dir: pwd,
Env: os.Environ(),
Files: []uintptr{
uintptr(syscall.Stdin),
uintptr(syscall.Stdout),
uintptr(syscall.Stderr),
},
}
args := make([]string, 0, len(os.Args)+1)
// disable reaping for supercronic, avoid no sense warning
args = append(args, os.Args[0], "-no-reap")
args = append(args, os.Args[1:]...)

pid, err := syscall.ForkExec(args[0], args, pattrs)
if err != nil {
logrus.Fatalf("Failed to fork exec: %s", err.Error())
return
}

// forward signal to supercronic
signalToFork(pid)
// got supercronic exit status
wstatus := reapChildren(pid)
os.Exit(wstatus.ExitStatus())
}

func signalToFork(pid int) {
p, err := os.FindProcess(pid)
if err != nil {
logrus.Fatalf("Failed findProcess supercronic pid:%d,%s", pid, err.Error())
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, signalList...)
go func() {
for {
s := <-termChan
if err := p.Signal(s); err != nil {
logrus.Errorf("Failed to send signal to supercronic: %s", err.Error())
}
}
}()
}

// copy from https://github.com/ramr/go-reaper
// modify for wait exit status of supercronic
// without modify, supercronic exit status may not be obtained

// Be a good parent - clean up behind the children.
func reapChildren(superCrondPid int) syscall.WaitStatus {
var notifications = make(chan os.Signal, 1)

go sigChildHandler(notifications)

// all child
const rpid = -1
var wstatus syscall.WaitStatus

for {
var sig = <-notifications
logrus.Debugf("reaper received signal %v\n", sig)
for {
pid, err := syscall.Wait4(rpid, &wstatus, 0, nil)
for syscall.EINTR == err {
pid, err = syscall.Wait4(pid, &wstatus, 0, nil)
}

if syscall.ECHILD == err {
break
}

if superCrondPid == pid {
logrus.Debugf("supercronic exit, pid=%d, wstatus=%+v, err=%+v\n", pid, wstatus, err)
return wstatus
}
// note: change output need change test
logrus.Warnf("reaper cleanup: pid=%d, wstatus=%+v\n",
pid, wstatus)
}
}

}

// Handle death of child (SIGCHLD) messages. Pushes the signal onto the
// notifications channel if there is a waiter.
func sigChildHandler(notifications chan os.Signal) {
var sigs = make(chan os.Signal, 3)
signal.Notify(sigs, syscall.SIGCHLD)

for {
var sig = <-sigs
select {
case notifications <- sig: /* published it. */
default:
/*
* Notifications channel full - drop it to the
* floor. This ensures we don't fill up the SIGCHLD
* queue. The reaper just waits for any child
* process (pid=-1), so we ain't loosing it!! ;^)
*/
}
}

} /* End of function sigChildHandler. */

0 comments on commit 8b4edf5

Please sign in to comment.