Skip to content

Commit

Permalink
[API change] do not accept node URL - always require node ID
Browse files Browse the repository at this point in the history
* when not in cluster map, validate via "self-removed" history
* (security)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jul 25, 2024
1 parent 25495d8 commit 071ddea
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 85 deletions.
4 changes: 2 additions & 2 deletions ais/fspathrgrp.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ func (g *fsprungroup) doDD(action string, flags uint64, mpath string, dontResilv
dsort.Managers.AbortAll(fmt.Errorf("%q %s", action, rmi))

if numAvail == 0 {
s := fmt.Sprintf("%s: lost (via %q) the last available mountpath %q", g.t.si, action, rmi)
nlog.Errorf("%s: lost (via %q) the last available mountpath %q", g.t.si, action, rmi)
g.postDD(rmi, action, nil /*xaction*/, nil /*error*/) // go ahead to disable/detach
g.t.disable(s)
g.t.disable()
return rmi, nil
}

Expand Down
18 changes: 10 additions & 8 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,10 @@ func (h *htrun) stop(wg *sync.WaitGroup, rmFromSmap bool) {
const sleep = time.Second >> 1

if rmFromSmap {
h.unregisterSelf(true)
smap := h.owner.smap.get()
if err := h.rmSelf(smap, true); err != nil && !cos.IsErrConnectionRefused(err) {
nlog.Warningln(err)
}
}
nlog.Infoln("Shutting down HTTP")

Expand Down Expand Up @@ -2023,11 +2026,10 @@ func (h *htrun) pollClusterStarted(config *cmn.Config, psi *meta.Snode) (maxNsti
}
}

func (h *htrun) unregisterSelf(ignoreErr bool) (err error) {
var status int
smap := h.owner.smap.get()
func (h *htrun) rmSelf(smap *smapX, ignoreErr bool) error {
if smap == nil || smap.validate() != nil {
return
nlog.Warningln("cannot remove", h.String(), "(self): local copy of Smap is invalid")
return nil
}
cargs := allocCargs()
{
Expand All @@ -2036,17 +2038,17 @@ func (h *htrun) unregisterSelf(ignoreErr bool) (err error) {
cargs.timeout = apc.DefaultTimeout
}
res := h.call(cargs, smap)
status, err = res.status, res.err
status, err := res.status, res.err
if err != nil {
f := nlog.Errorf
if ignoreErr {
f = nlog.Infof
}
f("%s: failed to unreg self, err: %v(%d)", h.si, err, status)
f("%s: failed to remove self from Smap: %v(%d)", h.si, err, status)
}
freeCargs(cargs)
freeCR(res)
return
return err
}

// via /health handler
Expand Down
52 changes: 34 additions & 18 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2479,7 +2479,7 @@ func (p *proxy) reverseHandler(w http.ResponseWriter, r *http.Request) {
}
// otherwise, warn and go ahead
// (e.g. scenario: shutdown when transitioning through states)
nlog.Warningf("%s: %s status is: %s", p, si.StringEx(), daeStatus)
nlog.Warningln(p.String()+":", si.StringEx(), "status is:", daeStatus)
}

// access control
Expand All @@ -2500,27 +2500,43 @@ func (p *proxy) reverseHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if si == nil {
// TODO: if not primary, restore the original request (including URL.Path but not only)
// and forwardCP
v := &p.rproxy.removed
v.mu.Lock()
si = v.m[nodeID]
v.mu.Unlock()

// do
if si != nil {
p.reverseNodeRequest(w, r, si)
return
}
// special case when the target self-removed itself from cluster map
// after having lost all mountpaths.
nodeURL := r.Header.Get(apc.HdrNodeURL)
if nodeURL == "" {
err = &errNodeNotFound{"cannot rproxy to", nodeID, p.si, smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
if si == nil {
// when failing to find in Smap and (self)removed, both
var s string
if !smap.IsPrimary(p.si) {
s = "non-primary, " // TODO above
}
err = &errNodeNotFound{s + "cannot forward request to node", nodeID, p.si, smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
}

// cleanup in place
go p._clremoved(nodeID)
}
parsedURL, err := url.Parse(nodeURL)
if err != nil {
p.writeErrf(w, r, "%s: invalid URL %q for node %s", p.si, nodeURL, nodeID)

// do
p.reverseNodeRequest(w, r, si)
}

func (p *proxy) _clremoved(sid string) {
time.Sleep(cmn.Rom.MaxKeepalive())
smap := p.owner.smap.get()
if smap.GetNode(sid) == nil {
return
}

p.reverseRequest(w, r, nodeID, parsedURL)
v := &p.rproxy.removed
v.mu.Lock()
delete(v.m, sid)
v.mu.Unlock()
}

//
Expand Down
16 changes: 13 additions & 3 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -2050,18 +2050,28 @@ func (p *proxy) httpcludel(w http.ResponseWriter, r *http.Request) {
return
}
if err := p.isIntraCall(r.Header, false /*from primary*/); err != nil {
err = fmt.Errorf("expecting intra-cluster call for self-initiated removal, got %w", err)
err = fmt.Errorf("expecting intra-cluster call for %q, got %w", apc.ActSelfRemove, err)
p.writeErr(w, r, err)
return
}

cid := r.Header.Get(apc.HdrCallerID)
if cid != sid {
err = fmt.Errorf("expecting self-initiated removal (%s != %s)", cid, sid)
err = fmt.Errorf("expecting %s by %s, got a wrong node ID (%s != %s)", apc.ActSelfRemove, node.StringEx(), cid, sid)
p.writeErr(w, r, err)
return
}
if ecode, err := p.mcastUnreg(&apc.ActMsg{Action: "self-initiated-removal"}, node); err != nil {

if ecode, err := p.mcastUnreg(&apc.ActMsg{Action: apc.ActSelfRemove}, node); err != nil {
p.writeErr(w, r, err, ecode)
} else {
v := &p.rproxy.removed
v.mu.Lock()
if v.m == nil {
v.m = make(meta.NodeMap, 4)
}
v.m[node.ID()] = node
v.mu.Unlock()
}
}

Expand Down
15 changes: 11 additions & 4 deletions ais/prxrev.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ import (
type (
reverseProxy struct {
cloud *httputil.ReverseProxy // unmodified GET requests => storage.googleapis.com
nodes sync.Map // map of reverse proxies keyed by node DaemonIDs
nodes sync.Map // map [SID => reverse proxy instance]
removed struct {
m meta.NodeMap // map [SID => self-disabled node]
mu sync.Mutex
}
primary struct {
rp *httputil.ReverseProxy
url string
sync.Mutex
mu sync.Mutex
}
}
singleRProxy struct {
Expand Down Expand Up @@ -71,7 +75,7 @@ func (p *proxy) forwardCP(w http.ResponseWriter, r *http.Request, msg *apc.ActMs
}
}
primary := &p.rproxy.primary
primary.Lock()
primary.mu.Lock()
if primary.url != smap.Primary.PubNet.URL {
primary.url = smap.Primary.PubNet.URL
uparsed, err := url.Parse(smap.Primary.PubNet.URL)
Expand All @@ -81,7 +85,7 @@ func (p *proxy) forwardCP(w http.ResponseWriter, r *http.Request, msg *apc.ActMs
primary.rp.Transport = rpTransport(config)
primary.rp.ErrorHandler = p.rpErrHandler
}
primary.Unlock()
primary.mu.Unlock()
if len(body) > 0 {
debug.AssertFunc(func() bool {
l, _ := io.Copy(io.Discard, r.Body)
Expand Down Expand Up @@ -141,6 +145,9 @@ func (p *proxy) reverseNodeRequest(w http.ResponseWriter, r *http.Request, si *m
p.reverseRequest(w, r, si.ID(), parsedURL)
}

// usage:
// 1. primary => node in the cluster
// 2. primary => remais
func (p *proxy) reverseRequest(w http.ResponseWriter, r *http.Request, nodeID string, parsedURL *url.URL) {
rproxy := p.rproxy.loadOrStore(nodeID, parsedURL, p.rpErrHandler)
rproxy.ServeHTTP(w, r)
Expand Down
9 changes: 9 additions & 0 deletions ais/test/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,18 @@ func TestDownloadMountpath(t *testing.T) {
tassert.CheckFatal(t, err)
tlog.Logf("Started download job %s, waiting for it to finish\n", id2)

time.Sleep(3 * time.Second)
waitForDownload(t, id2, time.Minute)
objs, err = tools.ListObjectNames(proxyURL, bck, "", 0, true /*cached*/)
tassert.CheckError(t, err)

if len(objs) == 0 {
tlog.Logln("Listed zero objects - waiting a bit, retrying...")
time.Sleep(8 * time.Second)
objs, err = tools.ListObjectNames(proxyURL, bck, "", 0, true /*cached*/)
tassert.CheckError(t, err)
}

tassert.Fatalf(t, len(objs) == objsCnt, "Expected %d objects to be present, got: %d", objsCnt, len(objs))
}

Expand Down
3 changes: 2 additions & 1 deletion ais/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,12 +1034,13 @@ func TestMountpathDisableAll(t *testing.T) {
num: 5000,
numGetsEachFile: 2,
}
baseParams = tools.BaseAPIParams()
)

m.initAndSaveState(true /*cleanup*/)
m.expectTargets(1)

baseParams := tools.BaseAPIParams(m.smap.Primary.PubNet.URL) // NOTE: only primary has self-removed

// Remove all mountpaths on the target
target, _ := m.smap.GetRandTarget()
tname := target.StringEx()
Expand Down
75 changes: 38 additions & 37 deletions ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,43 +1213,6 @@ func (t *target) healthHandler(w http.ResponseWriter, r *http.Request) {
}
}

// unregisters the target and marks it as disabled by an internal event
func (t *target) disable(msg string) {
t.regstate.mu.Lock()

if t.regstate.disabled.Load() {
t.regstate.mu.Unlock()
return // nothing to do
}
if err := t.unregisterSelf(false); err != nil {
t.regstate.mu.Unlock()
nlog.Errorf("%s but failed to remove self from Smap: %v", msg, err)
return
}
t.regstate.disabled.Store(true)
t.regstate.mu.Unlock()
nlog.Errorf("Warning: %s => disabled and removed self from Smap", msg)
}

// registers the target again if it was disabled by and internal event
func (t *target) enable() error {
t.regstate.mu.Lock()

if !t.regstate.disabled.Load() {
t.regstate.mu.Unlock()
return nil
}
if _, err := t.joinCluster(apc.ActSelfJoinTarget); err != nil {
t.regstate.mu.Unlock()
nlog.Infof("%s failed to re-join: %v", t, err)
return err
}
t.regstate.disabled.Store(false)
t.regstate.mu.Unlock()
nlog.Infof("%s is now active", t)
return nil
}

// checks with a given target to see if it has the object.
// target acts as a client - compare with api.HeadObject
func (t *target) headt2t(lom *core.LOM, tsi *meta.Snode, smap *smapX) (ok bool) {
Expand Down Expand Up @@ -1349,6 +1312,44 @@ func (t *target) decommission(action string, opts *apc.ActValRmNode) {
}
}

// disable and remove self from cluster map
func (t *target) disable() {
t.regstate.mu.Lock()

if t.regstate.disabled.Load() {
t.regstate.mu.Unlock()
return // nothing to do
}
smap := t.owner.smap.get()
if err := t.rmSelf(smap, false); err != nil {
t.regstate.mu.Unlock()
nlog.Errorln(t.String(), "failed to remove self from", smap.String()+":", err, "action:", apc.ActSelfRemove)
return
}
t.regstate.disabled.Store(true)
t.regstate.mu.Unlock()
nlog.Warningln(t.String(), "disabled and removed self from", smap.String(), "action:", apc.ActSelfRemove)
}

// registers the target again if it was disabled by and internal event
func (t *target) enable() error {
t.regstate.mu.Lock()

if !t.regstate.disabled.Load() {
t.regstate.mu.Unlock()
return nil
}
if _, err := t.joinCluster(apc.ActSelfJoinTarget); err != nil {
t.regstate.mu.Unlock()
nlog.Infoln(t.String(), "failed to re-join:", err)
return err
}
t.regstate.disabled.Store(false)
t.regstate.mu.Unlock()
nlog.Infoln(t.String(), "is now active")
return nil
}

// stop gracefully, return from rungroup.run
func (t *target) Stop(err error) {
if !nlog.Stopping() {
Expand Down
11 changes: 6 additions & 5 deletions api/apc/actmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ const (

// internal use
const (
ActAddRemoteBck = "add-remote-bck" // add to BMD existing remote bucket, usually on the fly
ActRmNodeUnsafe = "rm-unsafe" // primary => the node to be removed
ActStartGFN = "start-gfn" // get-from-neighbor
ActStopGFN = "stop-gfn" // off
ActCleanupMarkers = "cleanup-markers"
ActAddRemoteBck = "add-remote-bck" // add to BMD existing remote bucket, usually on the fly
ActRmNodeUnsafe = "rm-unsafe" // primary => the node to be removed
ActStartGFN = "start-gfn" // get-from-neighbor
ActStopGFN = "stop-gfn" // off
ActCleanupMarkers = "cleanup-markers" // part of the target joining sequence
ActSelfRemove = "self-initiated-removal" // e.g., when losing last mountpath
)

const (
Expand Down
5 changes: 2 additions & 3 deletions api/apc/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ const (
// Query objects handle header.
HdrHandle = HeaderPrefix + "query-handle"

// Reverse proxy headers.
HdrNodeID = HeaderPrefix + "node-id"
HdrNodeURL = HeaderPrefix + "node-url"
// Reverse proxy header.
HdrNodeID = HeaderPrefix + "node-id"

// uptimes, respectively
HdrNodeUptime = HeaderPrefix + "node-uptime"
Expand Down
5 changes: 1 addition & 4 deletions api/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func GetMountpaths(bp BaseParams, node *meta.Snode) (mpl *apc.MountpathList, err
reqParams.Path = apc.URLPathReverseDae.S // NOTE: reverse, via p.reverseHandler
reqParams.Query = url.Values{apc.QparamWhat: []string{apc.WhatMountpaths}}
reqParams.Header = http.Header{
apc.HdrNodeID: []string{node.ID()},
apc.HdrNodeURL: []string{node.URL(cmn.NetPublic)},
apc.HdrNodeID: []string{node.ID()},
}
}
_, err = reqParams.DoReqAny(&mpl)
Expand All @@ -51,7 +50,6 @@ func AttachMountpath(bp BaseParams, node *meta.Snode, mountpath string, label ..
reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActMountpathAttach, Value: mountpath})
reqParams.Header = http.Header{
apc.HdrNodeID: []string{node.ID()},
apc.HdrNodeURL: []string{node.URL(cmn.NetPublic)},
cos.HdrContentType: []string{cos.ContentJSON},
}
if len(label) > 0 {
Expand All @@ -74,7 +72,6 @@ func EnableMountpath(bp BaseParams, node *meta.Snode, mountpath string) error {
reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActMountpathEnable, Value: mountpath})
reqParams.Header = http.Header{
apc.HdrNodeID: []string{node.ID()},
apc.HdrNodeURL: []string{node.URL(cmn.NetPublic)},
cos.HdrContentType: []string{cos.ContentJSON},
}
}
Expand Down

0 comments on commit 071ddea

Please sign in to comment.