Skip to content

Commit

Permalink
Support synchronous AssertAndFetch for sleep package.
Browse files Browse the repository at this point in the history
Some synchronization patterns require the ability to simultaneously wake and
sleep a goroutine. For the sleep package, this is the case when a waker must be
asserted when a subsequent fetch is imminent.

Currently, this operation results in significant P churn in the runtime, which
ping-pongs execution between multiple system threads and cores and consumes a
significant amount of host CPU (and because of the context switches, this can
be significantly worse with mitigations for side-channel vulnerabilities).

The solution is to introduce a dedicated mechanism for a synchronous switch
which does not wake another runtime P (see golang/go#32113). This can be used
by the `AssertAndFetch` API in the sleep package.

The benchmark results for this package are very similar to raw channel
operations for all cases, with the exception of operations that do not wait.
The primary advantage is more precise control over scheduling. This will be
used in a subsequent change.

```
BenchmarkGoAssertNonWaiting
BenchmarkGoAssertNonWaiting-8                   261364384                4.976 ns/op
BenchmarkGoSingleSelect
BenchmarkGoSingleSelect-8                       20946358                57.77 ns/op
BenchmarkGoMultiSelect
BenchmarkGoMultiSelect-8                         6071697               197.0 ns/op
BenchmarkGoWaitOnSingleSelect
BenchmarkGoWaitOnSingleSelect-8                  4978051               235.4 ns/op
BenchmarkGoWaitOnMultiSelect
BenchmarkGoWaitOnMultiSelect-8                   2309224               520.2 ns/op

BenchmarkSleeperAssertNonWaiting
BenchmarkSleeperAssertNonWaiting-8              447325033                2.657 ns/op
BenchmarkSleeperSingleSelect
BenchmarkSleeperSingleSelect-8                  21488844                55.19 ns/op
BenchmarkSleeperMultiSelect
BenchmarkSleeperMultiSelect-8                   21851674                54.89 ns/op
BenchmarkSleeperWaitOnSingleSelect
BenchmarkSleeperWaitOnSingleSelect-8             2860327               416.4 ns/op
BenchmarkSleeperWaitOnSingleSelectSync
BenchmarkSleeperWaitOnSingleSelectSync-8         2741733               427.1 ns/op
BenchmarkSleeperWaitOnMultiSelect
BenchmarkSleeperWaitOnMultiSelect-8              2867484               418.1 ns/op
BenchmarkSleeperWaitOnMultiSelectSync
BenchmarkSleeperWaitOnMultiSelectSync-8          2789158               427.9 ns/op
```

PiperOrigin-RevId: 406873844
  • Loading branch information
amscanne authored and gvisor-bot committed Nov 1, 2021
1 parent 9776edb commit 4669f70
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 304 deletions.
240 changes: 181 additions & 59 deletions pkg/sleep/sleep_test.go

Large diffs are not rendered by default.

149 changes: 91 additions & 58 deletions pkg/sleep/sleep_unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
//
// // Called repeatedly.
// for {
// switch id, _ := s.Fetch(true); id {
// case constant1:
// switch w, _ := s.Fetch(true); w {
// case &w1:
// // Do work triggered by w1 being asserted.
// case constant2:
// case &w2:
// // Do work triggered by w2 being asserted.
// }
// }
Expand Down Expand Up @@ -119,20 +119,27 @@ type Sleeper struct {
waitingG uintptr
}

// AddWaker associates the given waker to the sleeper. id is the value to be
// returned when the sleeper is woken by the given waker.
func (s *Sleeper) AddWaker(w *Waker, id int) {
// AddWaker associates the given waker to the sleeper.
func (s *Sleeper) AddWaker(w *Waker) {
if w.allWakersNext != nil {
// This implies it's owned by another sleeper.
panic("waker has non-nil allWakersNext")
}
if w.next != nil {
// This implies it's still queued by another sleeper.
panic("waker has non-nil next")
}

// Add the waker to the list of all wakers.
w.allWakersNext = s.allWakers
s.allWakers = w
w.id = id

// Try to associate the waker with the sleeper. If it's already
// asserted, we simply enqueue it in the "ready" list.
for {
p := (*Sleeper)(atomic.LoadPointer(&w.s))
if p == &assertedSleeper {
s.enqueueAssertedWaker(w)
s.enqueueAssertedWaker(w, true /* wakep */)
return
}

Expand All @@ -143,8 +150,11 @@ func (s *Sleeper) AddWaker(w *Waker, id int) {
}

// nextWaker returns the next waker in the notification list, blocking if
// needed.
func (s *Sleeper) nextWaker(block bool) *Waker {
// needed. The parameter wakepOrSleep indicates that if the operation does not
// block, then we will need to explicitly wake a runtime P.
//
// Precondition: wakepOrSleep may be true iff block is true.
func (s *Sleeper) nextWaker(block, wakepOrSleep bool) *Waker {
// Attempt to replenish the local list if it's currently empty.
if s.localList == nil {
for atomic.LoadPointer(&s.sharedList) == nil {
Expand All @@ -168,6 +178,10 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
break
}

// Since we are sleeping for sure, we no longer
// need to wakep once we get a value.
wakepOrSleep = false

// Try to commit the sleep and report it to the
// tracer as a select.
//
Expand Down Expand Up @@ -198,6 +212,11 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
w := s.localList
s.localList = w.next

// Do we need to wake a P?
if wakepOrSleep {
sync.Wakep()
}

return w
}

Expand All @@ -213,42 +232,60 @@ func commitSleep(g uintptr, waitingG unsafe.Pointer) bool {
return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(waitingG), preparingG, g)
}

// Fetch fetches the next wake-up notification. If a notification is immediately
// available, it is returned right away. Otherwise, the behavior depends on the
// value of 'block': if true, the current goroutine blocks until a notification
// arrives, then returns it; if false, returns 'ok' as false.
//
// When 'ok' is true, the value of 'id' corresponds to the id associated with
// the waker; when 'ok' is false, 'id' is undefined.
// fetch is the backing implementation for Fetch and AssertAndFetch.
//
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
// allowed to call this method.
func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
// Precondition: per nextWaker.
//go:nosplit
func (s *Sleeper) fetch(block, wakepOrSleep bool) *Waker {
for {
w := s.nextWaker(block)
w := s.nextWaker(block, wakepOrSleep)
if w == nil {
return -1, false
return nil
}

// Reassociate the waker with the sleeper. If the waker was
// still asserted we can return it, otherwise try the next one.
old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
if old == &assertedSleeper {
return w.id, true
return w
}
}
}

// Fetch fetches the next wake-up notification. If a notification is immediately
// available, it is returned right away. Otherwise, the behavior depends on the
// value of 'block': if true, the current goroutine blocks until a notification
// arrives, then returns it; if false, returns 'w' as nil.
//
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
// allowed to call this method.
func (s *Sleeper) Fetch(block bool) *Waker {
	// Plain fetch: wakepOrSleep is false because no waker was asserted as
	// part of this call, so there is no freshly-woken work that would
	// require an additional P (that case is handled by AssertAndFetch).
	return s.fetch(block, false /* wakepOrSleep */)
}

// AssertAndFetch asserts the given waker and fetches the next wake-up
// notification in a single, synchronous operation. Note that this will always
// be blocking, since there is no value in joining a non-blocking operation.
//
// N.B. Like Fetch, this method is *not* thread-safe. This will also yield the
// current P to the next goroutine, avoiding associated scheduling overhead.
//
// +checkescape:all
//go:nosplit
func (s *Sleeper) AssertAndFetch(n *Waker) *Waker {
	// Assert without waking a P: the imminent fetch below will either
	// consume the notification directly or go to sleep, so kicking another
	// thread awake here would only cause churn. The fetch then passes
	// wakepOrSleep=true so a P is woken only if the fetch does not block.
	n.assert(false /* wakep */)
	return s.fetch(true /* block */, true /* wakepOrSleep */)
}

// Done is used to indicate that the caller won't use this Sleeper anymore. It
// removes the association with all wakers so that they can be safely reused
// by another sleeper after Done() returns.
func (s *Sleeper) Done() {
// Remove all associations that we can, and build a list of the ones
// we could not. An association can be removed right away from waker w
// if w.s has a pointer to the sleeper, that is, the waker is not
// asserted yet. By atomically switching w.s to nil, we guarantee that
// subsequent calls to Assert() on the waker will not result in it being
// queued to this sleeper.
// Remove all associations that we can, and build a list of the ones we
// could not. An association can be removed right away from waker w if
// w.s has a pointer to the sleeper, that is, the waker is not asserted
// yet. By atomically switching w.s to nil, we guarantee that
// subsequent calls to Assert() on the waker will not result in it
// being queued.
var pending *Waker
w := s.allWakers
for w != nil {
Expand All @@ -260,39 +297,34 @@ func (s *Sleeper) Done() {
pending = w
break
}

if atomic.CompareAndSwapPointer(&w.s, t, nil) {
w.allWakersNext = nil
break
}
}
w = next
}
s.allWakers = nil

// The associations that we could not remove are either asserted, or in
// the process of being asserted, or have been asserted and cleared
// before being pulled from the sleeper lists. We must wait for them all
// to make it to the sleeper lists, so that we know that the wakers
// before being pulled from the sleeper lists. We must wait for them
// all to make it to the sleeper lists, so that we know that the wakers
// won't do any more work towards waking this sleeper up.
for pending != nil {
pulled := s.nextWaker(true)

// Remove the waker we just pulled from the list of associated
// wakers.
prev := &pending
for w := *prev; w != nil; w = *prev {
if pulled == w {
*prev = w.allWakersNext
break
}
prev = &w.allWakersNext
}
for w := pending; w != nil; w = w.allWakersNext {
s.nextWaker(true, false) // Dequeue exactly one waiter.
}
for w := pending; w != nil; {
next := w.allWakersNext
w.allWakersNext = nil
w = next
}
s.allWakers = nil
}

// enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list
// of wakers that want to notify the sleeper.
func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
//go:nosplit
func (s *Sleeper) enqueueAssertedWaker(w *Waker, wakep bool) {
// Add the new waker to the front of the list.
for {
v := (*Waker)(atomic.LoadPointer(&s.sharedList))
Expand All @@ -312,7 +344,7 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
case 0, preparingG:
default:
// We managed to get a G. Wake it up.
sync.Goready(g, 0)
sync.Goready(g, 0, wakep)
}
}

Expand Down Expand Up @@ -349,15 +381,11 @@ type Waker struct {
// allWakersNext is used to form a linked list of all wakers associated
// to a given sleeper.
allWakersNext *Waker

// id is the value to be returned to sleepers when they wake up due to
// this waker being asserted.
id int
}

// Assert moves the waker to an asserted state, if it isn't asserted yet. When
// asserted, the waker will cause its matching sleeper to wake up.
func (w *Waker) Assert() {
// assert is the implementation for Assert.
//go:nosplit
func (w *Waker) assert(wakep bool) {
// Nothing to do if the waker is already asserted. This check allows us
// to complete this case (already asserted) without any interlocked
// operations on x86.
Expand All @@ -367,13 +395,18 @@ func (w *Waker) Assert() {

// Mark the waker as asserted, and wake up a sleeper if there is one.
switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s {
case nil:
case &assertedSleeper:
case nil, &assertedSleeper:
default:
s.enqueueAssertedWaker(w)
s.enqueueAssertedWaker(w, wakep)
}
}

// Assert moves the waker to an asserted state, if it isn't asserted yet. When
// asserted, the waker will cause its matching sleeper to wake up.
func (w *Waker) Assert() {
	// wakep is true: a standalone assert should kick a P so the woken
	// sleeper can actually run (see assert for the internal variant used
	// by AssertAndFetch, which avoids this).
	w.assert(true /* wakep */)
}

// Clear moves the waker to then non-asserted state and returns whether it was
// asserted before being cleared.
//
Expand Down
2 changes: 2 additions & 0 deletions pkg/sync/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ go_library(
"race_amd64.s",
"race_arm64.s",
"race_unsafe.go",
"runtime_amd64.s",
"runtime_arm64.s",
"runtime_unsafe.go",
"rwmutex_unsafe.go",
"seqcount.go",
Expand Down
25 changes: 25 additions & 0 deletions pkg/sync/runtime_amd64.s
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build amd64 && go1.8 && !go1.19 && !goexperiment.staticlockranking
// +build amd64,go1.8,!go1.19,!goexperiment.staticlockranking

#include "textflag.h"

// func addrOfSpinning() *int32
//
// Returns the address of runtime.sched.nmspinning.
TEXT ·addrOfSpinning(SB),NOSPLIT,$0-8
	// The offset specified here is the nmspinning value in sched.
	// NOTE(review): the constant 92 must match the offset of nmspinning
	// within runtime.schedt for the Go versions admitted by the build
	// tags above — confirm when raising the !go1.19 ceiling.
	LEAQ runtime·sched(SB), AX // AX = &runtime.sched
	ADDQ $92, AX               // AX = &runtime.sched.nmspinning
	MOVQ AX, ret+0(FP)         // Return the address.
	RET
26 changes: 26 additions & 0 deletions pkg/sync/runtime_arm64.s
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build arm64 && go1.8 && !go1.19 && !goexperiment.staticlockranking
// +build arm64,go1.8,!go1.19,!goexperiment.staticlockranking

#include "textflag.h"

// func addrOfSpinning() *int32
//
// Returns the address of runtime.sched.nmspinning.
//
// Note: MOVQ/ADDQ are amd64 mnemonics and do not exist in the Go arm64
// assembler; the arm64 equivalents are MOVD and ADD.
TEXT ·addrOfSpinning(SB),NOSPLIT,$0-8
	// The offset specified here is the nmspinning value in sched.
	// NOTE(review): the constant 92 must match the offset of nmspinning
	// within runtime.schedt for the Go versions admitted by the build
	// tags above — confirm when raising the !go1.19 ceiling.
	MOVD $runtime·sched(SB), R0 // R0 = &runtime.sched
	MOVD $92, R1                // R1 = offset of nmspinning
	ADD R1, R0, R0              // R0 = &runtime.sched.nmspinning
	MOVD R0, ret+0(FP)          // Return the address.
	RET
29 changes: 28 additions & 1 deletion pkg/sync/runtime_unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sync
import (
"fmt"
"reflect"
"sync/atomic"
"unsafe"
)

Expand All @@ -35,11 +36,37 @@ func Gopark(unlockf func(uintptr, unsafe.Pointer) bool, lock unsafe.Pointer, rea
//go:linkname gopark runtime.gopark
func gopark(unlockf func(uintptr, unsafe.Pointer) bool, lock unsafe.Pointer, reason uint8, traceEv byte, traceskip int)

// Wakep is runtime.wakep.
//
// It is used to wake a P after readying a goroutine without one (see the
// wakep parameter on Goready below).
//
//go:nosplit
func Wakep() {
	wakep()
}

// addrOfSpinning returns the global location of runtime.sched.nmspinning.
// It is implemented in assembly (runtime_amd64.s / runtime_arm64.s), since
// this runtime field cannot be reached via linkname.
func addrOfSpinning() *int32

// nmspinning is the global location of runtime.sched.nmspinning.
// Resolved once at package initialization.
var nmspinning = addrOfSpinning()

//go:linkname wakep runtime.wakep
func wakep()

// Goready is runtime.goready.
//
// The additional wakep argument controls whether a new thread will be kicked to
// execute the P. This should be true in most circumstances. However, if the
// current thread is about to sleep, then this can be false for efficiency.
//
//go:nosplit
func Goready(gp uintptr, traceskip int) {
func Goready(gp uintptr, traceskip int, wakep bool) {
if !wakep {
atomic.AddInt32(nmspinning, 1)
}
goready(gp, traceskip)
if !wakep {
atomic.AddInt32(nmspinning, -1)
}
}

//go:linkname goready runtime.goready
Expand Down
Loading

0 comments on commit 4669f70

Please sign in to comment.