Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the purge API on node endpoints #3447

Merged
merged 4 commits into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions command/agent/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (s *HTTPServer) NodeSpecificRequest(resp http.ResponseWriter, req *http.Req
case strings.HasSuffix(path, "/drain"):
nodeName := strings.TrimSuffix(path, "/drain")
return s.nodeToggleDrain(resp, req, nodeName)
case strings.HasSuffix(path, "/purge"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP Documentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

nodeName := strings.TrimSuffix(path, "/purge")
return s.nodePurge(resp, req, nodeName)
default:
return s.nodeQuery(resp, req, path)
}
Expand Down Expand Up @@ -142,3 +145,19 @@ func (s *HTTPServer) nodeQuery(resp http.ResponseWriter, req *http.Request,
}
return out.Node, nil
}

func (s *HTTPServer) nodePurge(resp http.ResponseWriter, req *http.Request, nodeID string) (interface{}, error) {
if req.Method != "POST" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to merge and also allow PUT

return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.NodeDeregisterRequest{
NodeID: nodeID,
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.NodeUpdateResponse
if err := s.agent.RPC("Node.Deregister", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}
65 changes: 65 additions & 0 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,71 @@ func TestHTTP_NodeDrain(t *testing.T) {
})
}

func TestHTTP_NodePurge(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Create the node
node := mock.Node()
args := structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
if err := s.Agent.RPC("Node.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Add some allocations to the node
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}

// Make the HTTP request to purge it
req, err := http.NewRequest("POST", "/v1/node/"+node.ID+"/purge", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.NodeSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}

// Check the response
upd := obj.(structs.NodeUpdateResponse)
if len(upd.EvalIDs) == 0 {
t.Fatalf("bad: %v", upd)
}

// Ensure that the node is not present anymore
args1 := structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp1 structs.SingleNodeResponse
if err := s.Agent.RPC("Node.GetNode", &args1, &resp1); err != nil {
t.Fatalf("err: %v", err)
}
if resp1.Node != nil {
t.Fatalf("node still exists after purging: %#v", resp1.Node)
}
})
}

func TestHTTP_NodeQuery(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
Expand Down
24 changes: 22 additions & 2 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,31 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())

// Check node permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}

// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}
// Look for the node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the // Verify the arguments section can you add:

	// Check node write permissions
	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
		return err
	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
		return structs.ErrPermissionDenied
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add a test similar to this: https://github.com/hashicorp/nomad/blob/master/nomad/node_endpoint_test.go#L613

What you need to test is that an ACL with node write/management token work and that another token does not.

snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
Expand All @@ -232,8 +253,7 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}

// Determine if there are any Vault accessors on the node
ws := memdb.NewWatchSet()
accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID)
accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: looking up accessors for node %q failed: %v", args.NodeID, err)
return err
Expand Down
65 changes: 65 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,71 @@ func TestClientEndpoint_Deregister(t *testing.T) {
}
}

func TestClientEndpoint_Deregister_ACL(t *testing.T) {
t.Parallel()
s1, root := testACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the node
node := mock.Node()
node1 := mock.Node()
state := s1.fsm.State()
if err := state.UpsertNode(1, node); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertNode(2, node1); err != nil {
t.Fatalf("err: %v", err)
}

// Create the policy and tokens
validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))

// Deregister without any token and expect it to fail
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil {
t.Fatalf("node de-register succeeded")
}

// Deregister with a valid token
dereg.AuthToken = validToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Check for the node in the FSM
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected node")
}

// Deregister with an invalid token.
dereg1 := &structs.NodeDeregisterRequest{
NodeID: node1.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
dereg1.AuthToken = invalidToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil {
t.Fatalf("rpc should not have succeeded")
}

// Try with a root token
dereg1.AuthToken = root.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil {
t.Fatalf("err: %v", err)
}
}

func TestClientEndpoint_Deregister_Vault(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
Expand Down
45 changes: 45 additions & 0 deletions website/source/api/nodes.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,48 @@ $ curl \
"KnownLeader": false
}
```

## Purge Node

This endpoint purges a node from the system. Nodes can still join the cluster if
they are alive.

| Method | Path | Produces |
| ------- | ------------------------- | -------------------------- |
| `POST` | `/v1/node/:node_id/purge` | `application/json` |

The table below shows this endpoint's support for
[blocking queries](/api/index.html#blocking-queries) and
[required ACLs](/api/index.html#acls).

| Blocking Queries | ACL Required |
| ---------------- | ------------------ |
| `NO` | `node:write` |

### Parameters

- `:node_id` `(string: <required>)`- Specifies the UUID of the node. This must
be the full UUID, not the short 8-character one. This is specified as part of
the path.

### Sample Request

```text
$ curl \
-XPOST https://nomad.rocks/v1/node/fb2170a8-257d-3c64-b14d-bc06cc94e34c/purge
```

### Sample Response

```json
{
"EvalIDs": [
"253ec083-22a7-76c9-b8b6-2bf3d4b27bfb"
],
"EvalCreateIndex": 91,
"NodeModifyIndex": 90,
"Index": 90,
"LastContact": 0,
"KnownLeader": false
}
```