diff --git a/cgroup2/manager.go b/cgroup2/manager.go
index 4a4292d5..dde13dc6 100644
--- a/cgroup2/manager.go
+++ b/cgroup2/manager.go
@@ -870,14 +870,7 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 			newSystemdProperty("TasksMax", uint64(resources.Pids.Max)))
 	}
 
-	statusChan := make(chan string, 1)
-	if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err == nil {
-		select {
-		case <-statusChan:
-		case <-time.After(time.Second):
-			logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
-		}
-	} else if !isUnitExists(err) {
+	if err := startUnit(conn, group, properties, pid == -1); err != nil {
 		return &Manager{}, err
 	}
 
@@ -886,6 +879,66 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 	}, nil
 }
 
+// startUnit asks systemd to start the transient unit named group with the
+// given properties, then waits for the job's completion signal.
+//
+// An "already exists" error is treated as success when ignoreExists is true.
+// Otherwise the possibly failed leftover unit is reset once and the start is
+// retried; if the unit still exists, that error is returned. A completion
+// result other than "done" is returned as an error, while receiving no result
+// within 30 seconds is only logged.
+func startUnit(conn *systemdDbus.Conn, group string, properties []systemdDbus.Property, ignoreExists bool) error {
+	ctx := context.TODO()
+
+	// Buffered so the completion result can be delivered even after this
+	// function has stopped waiting. The channel is intentionally never closed:
+	// this function is the receiver, and a result sent after the timeout below
+	// would panic on a closed channel.
+	statusChan := make(chan string, 1)
+
+	retried := false
+	for {
+		_, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan)
+		if err == nil {
+			break
+		}
+		if !isUnitExists(err) {
+			return err
+		}
+		if ignoreExists {
+			return nil
+		}
+		if retried {
+			return err
+		}
+
+		// When a unit of the same name already exists, it may be a leftover failed unit.
+		// If we reset it once, systemd can try to remove it.
+		retried = true
+		attemptFailedUnitReset(conn, group)
+	}
+
+	select {
+	case s := <-statusChan:
+		if s != "done" {
+			attemptFailedUnitReset(conn, group)
+			return fmt.Errorf("error creating systemd unit `%s`: got `%s`", group, s)
+		}
+	case <-time.After(30 * time.Second):
+		logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
+	}
+
+	return nil
+}
+
+// attemptFailedUnitReset asks systemd to reset the failed state of the unit
+// named group. It is best effort: a failure is logged and otherwise ignored.
+func attemptFailedUnitReset(conn *systemdDbus.Conn, group string) {
+	if err := conn.ResetFailedUnitContext(context.TODO(), group); err != nil {
+		logrus.Warnf("Unable to reset failed unit: %v", err)
+	}
+}
+
 func LoadSystemd(slice, group string) (*Manager, error) {
 	if slice == "" {
 		slice = defaultSlice
diff --git a/cgroup2/manager_test.go b/cgroup2/manager_test.go
index a538b07c..117ad7b8 100644
--- a/cgroup2/manager_test.go
+++ b/cgroup2/manager_test.go
@@ -30,6 +30,49 @@ import (
 	"go.uber.org/goleak"
 )
 
+func setupForNewSystemd(t *testing.T) (cmd *exec.Cmd, group string) {
+	cmd = exec.Command("cat")
+	err := cmd.Start()
+	require.NoError(t, err, "failed to start cat process")
+	proc := cmd.Process
+	require.NotNil(t, proc, "process was nil")
+
+	group = fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)
+
+	return
+}
+
+func TestErrorsWhenUnitAlreadyExists(t *testing.T) {
+	checkCgroupMode(t)
+
+	cmd, group := setupForNewSystemd(t)
+	proc := cmd.Process
+
+	_, err := NewSystemd("", group, proc.Pid, &Resources{})
+	require.NoError(t, err, "Failed to init new cgroup manager")
+
+	_, err = NewSystemd("", group, proc.Pid, &Resources{})
+	if err == nil {
+		t.Fatal("Expected recreating cgroup manager should fail")
+	} else if !isUnitExists(err) {
+		t.Fatalf("Failed to init cgroup manager with unexpected error: %s", err)
+	}
+}
+
+// kubelet relies on this behavior to make sure a slice exists
+func TestIgnoreUnitExistsWhenPidNegativeOne(t *testing.T) {
+	checkCgroupMode(t)
+
+	cmd, group := setupForNewSystemd(t)
+	proc := cmd.Process
+
+	_, err := NewSystemd("", group, proc.Pid, &Resources{})
+	require.NoError(t, err, "Failed to init new cgroup manager")
+
+	_, err = NewSystemd("", group, -1, &Resources{})
+	require.NoError(t, err, "Expected to be able to recreate cgroup manager")
+}
+
 //nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
 func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
 	checkCgroupMode(t)