Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add more scheduling options for process runner #9862

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 76 additions & 19 deletions internal/app/machined/pkg/system/runner/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"os"
"path"
"slices"
"strings"
"syscall"
"time"
"unsafe"

"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
Expand Down Expand Up @@ -104,27 +104,22 @@ type commandWrapper struct {
}

func dropCaps(droppedCapabilities []string, launcher *cap.Launcher) error {
droppedCaps := strings.Join(droppedCapabilities, ",")

if droppedCaps != "" {
caps := strings.Split(droppedCaps, ",")
dropCaps := xslices.Map(caps, func(c string) cap.Value {
capability, capErr := cap.FromName(c)
if capErr != nil {
panic(fmt.Errorf("failed to parse capability: %s", capErr))
}

return capability
})

iab := cap.IABGetProc()
if err := iab.SetVector(cap.Bound, true, dropCaps...); err != nil {
return fmt.Errorf("failed to set capabilities: %w", err)
dropCaps := xslices.Map(droppedCapabilities, func(c string) cap.Value {
capability, capErr := cap.FromName(c)
if capErr != nil {
panic(fmt.Errorf("failed to parse capability: %s", capErr))
}

launcher.SetIAB(iab)
return capability
})

iab := cap.IABGetProc()
if err := iab.SetVector(cap.Bound, true, dropCaps...); err != nil {
return fmt.Errorf("failed to set capabilities: %w", err)
}

launcher.SetIAB(iab)

return nil
}

Expand Down Expand Up @@ -309,7 +304,7 @@ func (p *processRunner) build() (commandWrapper, error) {

// Apply cgroup and OOM score after the process is launched.
//
//nolint:gocyclo
//nolint:gocyclo,cyclop
func applyProperties(p *processRunner, pid int) error {
if p.opts.CgroupPath != "" {
path := cgroup.Path(p.opts.CgroupPath)
Expand Down Expand Up @@ -353,6 +348,68 @@ func applyProperties(p *processRunner, pid int) error {
}
}

if p.opts.Priority != 0 {
if err := syscall.Setpriority(syscall.PRIO_PROCESS, pid, p.opts.Priority); err != nil {
return fmt.Errorf("failed to set priority of process %s to %d: %w", p, p.opts.Priority, err)
}
}

if ioPriority, ioPrioritySet := p.opts.IOPriority.Get(); ioPrioritySet {
err := setIOPriority(p, pid, ioPriority)
if err != nil {
return err
}
}

if schedulingPolicy, schedulingPolicySet := p.opts.SchedulingPolicy.Get(); schedulingPolicySet {
err := setSchedulingPolicy(p, pid, schedulingPolicy)
if err != nil {
return err
}
}

return nil
}

func setIOPriority(p *processRunner, pid int, ioPriority runner.IOPriorityParam) error {
if ioPriority.Class > runner.IoprioClassIdle {
return fmt.Errorf("failed to set IO priority of process %s: class %d is not valid", p, ioPriority.Class)
}

if ioPriority.Priority > 7 {
return fmt.Errorf("failed to set IO priority of process %s: priority %d is not valid", p, ioPriority.Priority)
}

classPos := 13 // IOPRIO_CLASS_SHIFT
priorityValue := ioPriority.Class<<classPos | ioPriority.Priority
sysctlWho := uintptr(1) // IOPRIO_WHO_PROCESS, we don't operate on threads or groups
smira marked this conversation as resolved.
Show resolved Hide resolved

ret, _, syscallError := syscall.Syscall(syscall.SYS_IOPRIO_SET, sysctlWho, uintptr(pid), uintptr(priorityValue))
if int(ret) == -1 {
return fmt.Errorf("failed to set IO priority of process %s to %d: syscall failed with %s", p, priorityValue, syscallError.Error())
}

return nil
}

func setSchedulingPolicy(p *processRunner, pid int, schedulingPolicy uint) error {
if schedulingPolicy > runner.SchedulingPolicyDeadline {
return fmt.Errorf("failed to set scheduling policy of process %s: policy %d is not valid", p, schedulingPolicy)
}

options := struct{ Priority int32 }{
Priority: int32(0),
}

if _, _, syscallError := syscall.Syscall(
syscall.SYS_SCHED_SETSCHEDULER,
uintptr(pid),
uintptr(schedulingPolicy),
uintptr(unsafe.Pointer(&options)),
); syscallError != 0 {
return fmt.Errorf("failed to set scheduling policy of process %s to %d: syscall failed with %s", p, schedulingPolicy, syscallError.Error())
}

return nil
}

Expand Down
150 changes: 150 additions & 0 deletions internal/app/machined/pkg/system/runner/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -219,6 +222,153 @@ func (suite *ProcessSuite) TestStopSigKill() {
<-done
}

func (suite *ProcessSuite) TestPriority() {
pidFile := filepath.Join(suite.tmpDir, "talos-test-pid-prio")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to t.Skip() this is os.Getuid() != 0? (to make go test ./... pass without buildkit?)

Copy link
Member Author

@dsseng dsseng Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go test -v ./internal/app/machined/pkg/system/runner/process works for me, it's how I developed this. If we have nice < 17 we can drop nice to 17 (generally) without caps or root

//nolint:errcheck
_ = os.Remove(pidFile)

currentPriority, err := syscall.Getpriority(syscall.PRIO_PROCESS, os.Getpid())
suite.Assert().NoError(err)

if currentPriority <= 3 {
suite.T().Skipf("skipping test, we already have low priority %d", currentPriority)
}

r := process.NewRunner(false, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/sh", "-c", "echo $BASHPID >> " + pidFile + "; trap -- '' SIGTERM; while :; do :; done"},
},
runner.WithLoggingManager(suite.loggingManager),
runner.WithGracefulShutdownTimeout(10*time.Millisecond),
runner.WithPriority(17),
)
suite.Assert().NoError(r.Open())

defer func() { suite.Assert().NoError(r.Close()) }()

done := make(chan error, 1)

go func() {
done <- r.Run(MockEventSink)
}()

time.Sleep(10 * time.Millisecond)

pidString, err := os.ReadFile(pidFile)
suite.Assert().NoError(err)

pid, err := strconv.ParseUint(strings.Trim(string(pidString), "\r\n"), 10, 32)
suite.Assert().NoError(err)

currentPriority, err = syscall.Getpriority(syscall.PRIO_PROCESS, int(pid))
suite.Assert().NoError(err)
// 40..1 corresponds to -20..19 since system call interface must reserve -1 for error
suite.Assert().Equalf(3, currentPriority, "process priority should be 3 (nice 17), got %d", currentPriority)

time.Sleep(1000 * time.Millisecond)

suite.Assert().NoError(r.Stop())
<-done
}

func (suite *ProcessSuite) TestIOPriority() {
pidFile := filepath.Join(suite.tmpDir, "talos-test-pid-ionice")
//nolint:errcheck
_ = os.Remove(pidFile)

//nolint:errcheck
ioprio, _, _ := syscall.Syscall(syscall.SYS_IOPRIO_GET, uintptr(1), uintptr(os.Getpid()), 0)
suite.Assert().NotEqual(-1, int(ioprio))

if ioprio>>13 == runner.IoprioClassIdle {
suite.T().Skipf("skipping test, we already have idle IO priority %d", ioprio)
}

r := process.NewRunner(false, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/sh", "-c", "echo $BASHPID >> " + pidFile + "; trap -- '' SIGTERM; while :; do :; done"},
},
runner.WithLoggingManager(suite.loggingManager),
runner.WithGracefulShutdownTimeout(10*time.Millisecond),
runner.WithIOPriority(runner.IoprioClassIdle, 6),
)
suite.Assert().NoError(r.Open())

defer func() { suite.Assert().NoError(r.Close()) }()

done := make(chan error, 1)

go func() {
done <- r.Run(MockEventSink)
}()

time.Sleep(10 * time.Millisecond)

pidString, err := os.ReadFile(pidFile)
suite.Assert().NoError(err)

pid, err := strconv.ParseUint(strings.Trim(string(pidString), "\r\n"), 10, 32)
suite.Assert().NoError(err)

//nolint:errcheck
ioprio, _, _ = syscall.Syscall(syscall.SYS_IOPRIO_GET, uintptr(1), uintptr(pid), 0)
suite.Assert().NotEqual(-1, int(ioprio))
suite.Assert().Equal(runner.IoprioClassIdle<<13+6, int(ioprio))

time.Sleep(10 * time.Millisecond)

suite.Assert().NoError(r.Stop())
<-done
}

func (suite *ProcessSuite) TestSchedulingPolicy() {
pidFile := filepath.Join(suite.tmpDir, "talos-test-pid-sched")
//nolint:errcheck
_ = os.Remove(pidFile)

pol, _, errno := syscall.Syscall(syscall.SYS_SCHED_GETSCHEDULER, uintptr(os.Getpid()), 0, 0)
suite.Assert().Equal(0, int(errno))

if pol == runner.SchedulingPolicyIdle {
suite.T().Skipf("skipping test, we already have idle scheduling policy")
}

r := process.NewRunner(false, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/sh", "-c", "echo $BASHPID >> " + pidFile + "; trap -- '' SIGTERM; while :; do :; done"},
},
runner.WithLoggingManager(suite.loggingManager),
runner.WithGracefulShutdownTimeout(10*time.Millisecond),
runner.WithSchedulingPolicy(runner.SchedulingPolicyIdle),
)
suite.Assert().NoError(r.Open())

defer func() { suite.Assert().NoError(r.Close()) }()

done := make(chan error, 1)

go func() {
done <- r.Run(MockEventSink)
}()

time.Sleep(10 * time.Millisecond)

pidString, err := os.ReadFile(pidFile)
suite.Assert().NoError(err)

pid, err := strconv.ParseUint(strings.Trim(string(pidString), "\r\n"), 10, 32)
suite.Assert().NoError(err)

pol, _, errno = syscall.Syscall(syscall.SYS_SCHED_GETSCHEDULER, uintptr(pid), 0, 0)
suite.Assert().Equal(0, int(errno))
suite.Assert().Equal(runner.SchedulingPolicyIdle, int(pol))

time.Sleep(10 * time.Millisecond)

suite.Assert().NoError(r.Stop())
<-done
}

func TestProcessSuite(t *testing.T) {
for _, runReaper := range []bool{true, false} {
func(runReaper bool) {
Expand Down
64 changes: 64 additions & 0 deletions internal/app/machined/pkg/system/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type Args struct {
ProcessArgs []string
}

// IOPriorityParam represents the combination of IO scheduling class and priority.
type IOPriorityParam struct {
Class uint
Priority uint
}

// Options is the functional options struct.
type Options struct {
// LoggingManager provides service log handling.
Expand Down Expand Up @@ -82,6 +88,12 @@ type Options struct {
Ctty optional.Optional[int]
// UID is the user id of the process.
UID uint32
// Priority is the niceness value of the process.
Priority int
// IOPriority is the IO priority value and class of the process.
IOPriority optional.Optional[IOPriorityParam]
// SchedulingPolicy is the scheduling policy of the process.
SchedulingPolicy optional.Optional[uint]
}

// Option is the functional option func.
Expand Down Expand Up @@ -249,3 +261,55 @@ func WithMemoryReservation(limit uint64) oci.SpecOpts {
return nil
}
}

// WithPriority sets the priority of the process.
func WithPriority(priority int) Option {
return func(args *Options) {
args.Priority = priority
}
}

const (
// IoprioClassNone represents IOPRIO_CLASS_NONE.
IoprioClassNone = iota
// IoprioClassRt represents IOPRIO_CLASS_RT.
IoprioClassRt
// IoprioClassBe represents IOPRIO_CLASS_BE.
IoprioClassBe
// IoprioClassIdle represents IOPRIO_CLASS_IDLE.
IoprioClassIdle
)

// WithIOPriority sets the IO priority and class of the process.
func WithIOPriority(class, priority uint) Option {
return func(args *Options) {
args.IOPriority = optional.Some(IOPriorityParam{
Class: class,
Priority: priority,
})
}
}

const (
// SchedulingPolicyNormal represents SCHED_NORMAL.
SchedulingPolicyNormal = iota
// SchedulingPolicyFIFO represents SCHED_FIFO.
SchedulingPolicyFIFO
// SchedulingPolicyRR represents SCHED_RR.
SchedulingPolicyRR
// SchedulingPolicyBatch represents SCHED_BATCH.
SchedulingPolicyBatch
// SchedulingPolicyIsoUnimplemented represents SCHED_ISO.
SchedulingPolicyIsoUnimplemented
// SchedulingPolicyIdle represents SCHED_IDLE.
SchedulingPolicyIdle
// SchedulingPolicyDeadline represents SCHED_DEADLINE.
SchedulingPolicyDeadline
)

// WithSchedulingPolicy sets the scheduling policy of the process.
func WithSchedulingPolicy(policy uint) Option {
return func(args *Options) {
args.SchedulingPolicy = optional.Some(policy)
}
}
1 change: 1 addition & 0 deletions internal/app/machined/pkg/system/services/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (d *Dashboard) Runner(r runtime.Runtime) (runner.Runner, error) {
runner.WithSelinuxLabel(constants.SelinuxLabelDashboard),
runner.WithCgroupPath(constants.CgroupDashboard),
runner.WithUID(constants.DashboardUserID),
runner.WithPriority(constants.DashboardPriority),
),
restart.WithType(restart.Forever),
), nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/machinery/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ const (
// We use the same user ID as apid so that the dashboard can write to the machined unix socket.
DashboardUserID = ApidUserID

// DashboardPriority is the priority for the dashboard service.
// Higher nice value for the dashboard to give more CPU time to other services when under load.
DashboardPriority = 10

// TrustdPort is the port for the trustd service.
TrustdPort = 50001

Expand Down
Loading