Skip to content

Commit

Permalink
close EC streams when idle, reopen on demand
Browse files Browse the repository at this point in the history
* refactor and amend housekeeper
  - use UnregInterval consistently across
  - add `UnregIf`
  - reduce work chan capacity to 48 (was 512); add "channel full" check
* target: implement on-EC/off-EC handler
  - TODO: revisit 1m delay
* part three, prev. commit: 1114868

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 10, 2024
1 parent a91636b commit 0642c85
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 96 deletions.
6 changes: 3 additions & 3 deletions ais/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func initDaemon(version, buildTime string) cos.Runner {

daemon.rg = &rungroup{rs: make(map[string]cos.Runner, 6)}
hk.Init()
daemon.rg.add(hk.DefaultHK)
daemon.rg.add(hk.HK)

// K8s
k8s.Init()
Expand Down Expand Up @@ -361,11 +361,11 @@ func (g *rungroup) runAll(mainRunner cos.Runner) error {
g.errCh = make(chan runRet, len(g.rs))

// run all, housekeeper first
go g.run(hk.DefaultHK)
go g.run(hk.HK)
runtime.Gosched()
hk.WaitStarted()
for _, r := range g.rs {
if r.Name() == hk.DefaultHK.Name() {
if r.Name() == hk.HK.Name() {
continue
}
go g.run(r)
Expand Down
18 changes: 15 additions & 3 deletions ais/tgtec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"net/http"
"os"
"strconv"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/hk"
)

func (t *target) ecHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -67,21 +69,31 @@ func (t *target) sendECMetafile(w http.ResponseWriter, r *http.Request, bck *met
}

func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
apiItems, err := t.parseURL(w, r, apc.URLPathEC.L, 1, false)
const (
hkname = "close-ec-streams" + hk.NameSuffix
postpone = time.Minute
)
items, err := t.parseURL(w, r, apc.URLPathEC.L, 1, false)
if err != nil {
return
}
action := apiItems[0]
action := items[0]
switch action {
case apc.ActEcOpen:
hk.UnregIf(hkname, closeEc) // just in case
ec.ECM.OpenStreams(false /*with refc*/)
case apc.ActEcClose:
ec.ECM.CloseStreams(false /*with refc*/)
hk.Reg(hkname, closeEc, postpone)
default:
t.writeErr(w, r, errActEc(action))
}
}

func closeEc(int64) time.Duration {
ec.ECM.CloseStreams(false /*with refc*/)
return hk.UnregInterval
}

func errActEc(act string) error {
return fmt.Errorf(fmtErrInvaldAction, act, []string{apc.ActEcOpen, apc.ActEcClose})
}
2 changes: 1 addition & 1 deletion bench/tools/aisloader/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func Start(version, buildtime string) (err error) {
// empty config to use memsys constants;
// alternatively: "memsys": { "min_free": "2gb", ... }
hk.Init()
go hk.DefaultHK.Run()
go hk.HK.Run()
hk.WaitStarted()

config := &cmn.Config{}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.22.3

require (
github.com/NVIDIA/aistore v1.3.24-0.20240906170600-713920710479
github.com/NVIDIA/aistore v1.3.24-0.20240909232046-a91636bc334a
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.0
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.24-0.20240906170600-713920710479 h1:JUPW/hrRVa9zqxjJNBXrWdscGyVBv1dhsT9nNPi4V+8=
github.com/NVIDIA/aistore v1.3.24-0.20240906170600-713920710479/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
github.com/NVIDIA/aistore v1.3.24-0.20240909232046-a91636bc334a h1:3pqUBkkTtvMa+6+YqA8o199Wvucqc9uJaAtndLJk2vo=
github.com/NVIDIA/aistore v1.3.24-0.20240909232046-a91636bc334a/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
5 changes: 3 additions & 2 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ func (mgr *Manager) OpenStreams(withRefc bool) {
mgr.respBundle.Store(bundle.New(client, respSbArgs))
}

func (mgr *Manager) CloseStreams(withRefc bool) {
if withRefc {
func (mgr *Manager) CloseStreams(justRefc bool) {
if justRefc {
mgr._refc.Dec()
return
}
if !mgr.bundleEnabled.CAS(true, false) {
return
Expand Down
2 changes: 1 addition & 1 deletion hk/common_durations.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package hk provides mechanism for registering cleanup
// functions which are invoked at specified intervals.
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved.
*/
package hk

Expand Down
Loading

0 comments on commit 0642c85

Please sign in to comment.