diff --git a/.gitignore b/.gitignore
index 3465c14c..61aafd53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
 example/example
 cmd/cgctl/cgctl
+
+*.swp
+coverage.txt
diff --git a/cgroup2/manager.go b/cgroup2/manager.go
index 22bf3d7b..dbffdf33 100644
--- a/cgroup2/manager.go
+++ b/cgroup2/manager.go
@@ -865,14 +865,7 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 			newSystemdProperty("TasksMax", uint64(resources.Pids.Max)))
 	}
 
-	statusChan := make(chan string, 1)
-	if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err == nil {
-		select {
-		case <-statusChan:
-		case <-time.After(time.Second):
-			logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
-		}
-	} else if !isUnitExists(err) {
+	if err := startUnit(conn, group, properties, pid == -1); err != nil {
 		return &Manager{}, err
 	}
 
@@ -881,6 +874,70 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
 	}, nil
 }
 
+// startUnit asks systemd, via conn, to start the transient unit named group
+// with the given properties and then waits for the job's completion signal.
+//
+// If StartTransientUnitContext fails with a "unit already exists" error (as
+// classified by isUnitExists), the outcome depends on ignoreExists: when true
+// the error is swallowed and nil is returned; when false a failed-unit reset
+// is attempted once and the start is retried, and a second "exists" error is
+// returned to the caller. Any other start error is returned immediately.
+//
+// A completion status other than "done" triggers a reset attempt and is
+// returned as an error. If no status arrives within 30 seconds a warning is
+// logged and nil is returned.
+func startUnit(conn *systemdDbus.Conn, group string, properties []systemdDbus.Property, ignoreExists bool) error {
+	ctx := context.TODO()
+
+	// Buffered so the job result can be delivered even when nobody is
+	// receiving any more. The channel is deliberately never closed here: on
+	// the timeout path below this function returns while the job may still be
+	// pending, and a result sent later on a closed channel would panic.
+	statusChan := make(chan string, 1)
+
+	retried := false
+	for {
+		_, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan)
+		if err == nil {
+			break
+		}
+		if !isUnitExists(err) {
+			return err
+		}
+		if ignoreExists {
+			return nil
+		}
+		if retried {
+			return err
+		}
+		retried = true
+
+		// When a unit of the same name already exists, it may be a leftover failed unit.
+		// If we reset it once, systemd can try to remove it.
+		attemptFailedUnitReset(conn, group)
+	}
+
+	select {
+	case s := <-statusChan:
+		if s != "done" {
+			attemptFailedUnitReset(conn, group)
+			return fmt.Errorf("error creating systemd unit `%s`: got `%s`", group, s)
+		}
+	case <-time.After(30 * time.Second):
+		logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
+	}
+
+	return nil
+}
+
+// attemptFailedUnitReset asks systemd to reset the failed state of the unit
+// named group. It is best-effort: a failure is only logged as a warning.
+func attemptFailedUnitReset(conn *systemdDbus.Conn, group string) {
+	if err := conn.ResetFailedUnitContext(context.TODO(), group); err != nil {
+		logrus.Warnf("Unable to reset failed unit: %v", err)
+	}
+}
+
 func LoadSystemd(slice, group string) (*Manager, error) {
 	if slice == "" {
 		slice = defaultSlice
diff --git a/cgroup2/manager_test.go b/cgroup2/manager_test.go
index b4515296..ad963450 100644
--- a/cgroup2/manager_test.go
+++ b/cgroup2/manager_test.go
@@ -29,15 +29,8 @@ import (
 	"go.uber.org/goleak"
 )
 
-//nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
-func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
-	checkCgroupMode(t)
-
-	cmd := exec.Command("cat")
-	stdin, err := cmd.StdinPipe()
-	if err != nil {
-		t.Fatalf("Failed to create cat process: %v", err)
-	}
+func setupForNewSystemd(t *testing.T) (cmd *exec.Cmd, group string) {
+	cmd = exec.Command("cat")
 	if err := cmd.Start(); err != nil {
 		t.Fatalf("Failed to start cat process: %v", err)
 	}
@@ -46,6 +39,62 @@ func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
 		t.Fatal("Process is nil")
 	}
 
+	group = fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)
+
+	return
+}
+
+func TestErrorsWhenUnitAlreadyExists(t *testing.T) {
+	checkCgroupMode(t)
+
+	cmd, group := setupForNewSystemd(t)
+	proc := cmd.Process
+
+	_, err := NewSystemd("", group, proc.Pid, &Resources{})
+	if err != nil {
+		t.Fatalf("Failed to init new cgroup manager: %v", err)
+	}
+
+	_, err = NewSystemd("", group, proc.Pid, &Resources{})
+	if err == nil {
+		t.Fatal("Expected recreating cgroup manager should fail")
+	} else if !isUnitExists(err) {
+		t.Fatalf("Failed to init cgroup manager with unexpected error: %v", err)
+	}
+}
+
+// kubelet relies on this behavior to make sure a slice exists
+func TestIgnoreUnitExistsWhenPidNegativeOne(t *testing.T) {
+	checkCgroupMode(t)
+
+	cmd, group := setupForNewSystemd(t)
+	proc := cmd.Process
+
+	_, err := NewSystemd("", group, proc.Pid, &Resources{})
+	if err != nil {
+		t.Fatalf("Failed to init new cgroup manager: %v", err)
+	}
+
+	_, err = NewSystemd("", group, -1, &Resources{})
+	if err != nil {
+		t.Fatalf("Expected to be able to recreate cgroup manager: %v", err)
+	}
+}
+
+//nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
+func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
+	checkCgroupMode(t)
+
+	cmd := exec.Command("cat")
+	stdin, err := cmd.StdinPipe()
+	require.NoError(t, err, "failed to create cat process")
+
+	err = cmd.Start()
+	require.NoError(t, err, "failed to start cat process")
+
+	proc := cmd.Process
+	require.NotNil(t, proc, "process was nil")
+
 	group := fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)
 	c, err := NewSystemd("", group, proc.Pid, &Resources{})
 	if err != nil {