Skip to content

Commit

Permalink
[dbnode] Add placement set handler for perform set operations (#2108)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jan 30, 2020
1 parent 3709171 commit 217cfe8
Show file tree
Hide file tree
Showing 14 changed files with 824 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestConfigGetBootstrappersHandler(t *testing.T) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestConfigGetBootstrappersHandlerNotFound(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestConfigSetBootstrappersHandler(t *testing.T) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestConfigSetBootstrappersHandlerNoValues(t *testing.T) {
Expand Down
22 changes: 6 additions & 16 deletions src/query/api/v1/handler/database/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package database

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -204,7 +203,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) {
Expand Down Expand Up @@ -362,7 +361,7 @@ func TestLocalTypeWithNumShards(t *testing.T) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}
func TestLocalWithBlockSizeNanos(t *testing.T) {
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -472,7 +471,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) {
Expand Down Expand Up @@ -587,7 +586,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) {
`, desiredBlockSize, desiredBlockSize)

assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestClusterTypeHosts(t *testing.T) {
Expand Down Expand Up @@ -845,7 +844,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}

func TestClusterTypeHostsWithIsolationGroup(t *testing.T) {
Expand Down Expand Up @@ -977,7 +976,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) {
}
`
assert.Equal(t, stripAllWhitespace(expectedResponse), string(body),
xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body))))
xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body))))
}
func TestClusterTypeMissingHostnames(t *testing.T) {
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -1048,15 +1047,6 @@ func stripAllWhitespace(str string) string {
}, str)
}

func mustPrettyJSON(t *testing.T, str string) string {
var unmarshalled map[string]interface{}
err := json.Unmarshal([]byte(str), &unmarshalled)
require.NoError(t, err)
pretty, err := json.MarshalIndent(unmarshalled, "", " ")
require.NoError(t, err)
return string(pretty)
}

func withEndline(str string) string {
return str + "\n"
}
9 changes: 9 additions & 0 deletions src/query/api/v1/handler/placement/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ func RegisterRoutes(
r.HandleFunc(M3DBReplaceURL, replaceFn).Methods(ReplaceHTTPMethod)
r.HandleFunc(M3AggReplaceURL, replaceFn).Methods(ReplaceHTTPMethod)
r.HandleFunc(M3CoordinatorReplaceURL, replaceFn).Methods(ReplaceHTTPMethod)

// Set
var (
setHandler = NewSetHandler(opts)
setFn = applyMiddleware(setHandler.ServeHTTP, defaults, opts.instrumentOptions)
)
r.HandleFunc(M3DBSetURL, setFn).Methods(SetHTTPMethod)
r.HandleFunc(M3AggSetURL, setFn).Methods(SetHTTPMethod)
r.HandleFunc(M3CoordinatorSetURL, setFn).Methods(SetHTTPMethod)
}

func newPlacementCutoverNanosFn(
Expand Down
155 changes: 155 additions & 0 deletions src/query/api/v1/handler/placement/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package placement

import (
"net/http"
"path"
"time"

"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/gogo/protobuf/jsonpb"
"go.uber.org/zap"
)

const (
// SetHTTPMethod is the HTTP method for the the upsert endpoint.
SetHTTPMethod = http.MethodPost

setPathName = "set"
)

var (
// M3DBSetURL is the url for the m3db replace handler (method POST).
M3DBSetURL = path.Join(handler.RoutePrefixV1,
M3DBServicePlacementPathName, setPathName)

// M3AggSetURL is the url for the m3aggregator replace handler (method
// POST).
M3AggSetURL = path.Join(handler.RoutePrefixV1,
handleroptions.M3AggregatorServiceName, setPathName)

// M3CoordinatorSetURL is the url for the m3coordinator replace handler
// (method POST).
M3CoordinatorSetURL = path.Join(handler.RoutePrefixV1,
handleroptions.M3CoordinatorServiceName, setPathName)
)

// SetHandler is the type for placement replaces.
type SetHandler Handler

// NewSetHandler returns a new SetHandler.
func NewSetHandler(opts HandlerOptions) *SetHandler {
return &SetHandler{HandlerOptions: opts, nowFn: time.Now}
}

func (h *SetHandler) ServeHTTP(
svc handleroptions.ServiceNameAndDefaults,
w http.ResponseWriter,
r *http.Request,
) {
ctx := r.Context()
logger := logging.WithContext(ctx, h.instrumentOptions)

req, pErr := h.parseRequest(r)
if pErr != nil {
xhttp.Error(w, pErr.Inner(), pErr.Code())
return
}

serviceOpts := handleroptions.NewServiceOptions(svc,
r.Header, h.m3AggServiceOptions)
service, _, err := ServiceWithAlgo(h.clusterClient,
serviceOpts, h.nowFn(), nil)
if err != nil {
logger.Error("unable to create placement service", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

curPlacement, err := service.Placement()
if err == kv.ErrNotFound {
logger.Error("placement not found", zap.Any("req", req), zap.Error(err))
xhttp.Error(w, err, http.StatusNotFound)
return
}
if err != nil {
logger.Error("unable to get current placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

var (
placementProto = req.Placement
placementVersion int
)
newPlacement, err := placement.NewPlacementFromProto(req.Placement)
if err != nil {
logger.Error("unable to create new placement from proto", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

dryRun := !req.Confirm
if dryRun {
logger.Info("performing dry run for set placement, not confirmed")
placementVersion = curPlacement.Version() + 1
} else {
logger.Info("performing live run for set placement, confirmed")
// Ensure the placement we're updating is still the one on which we validated
// all shards are available.
updatedPlacement, err := service.CheckAndSet(newPlacement,
curPlacement.Version())
if err != nil {
logger.Error("unable to update placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

placementVersion = updatedPlacement.Version()
}

resp := &admin.PlacementSetResponse{
Placement: placementProto,
Version: int32(placementVersion),
DryRun: dryRun,
}

xhttp.WriteProtoMsgJSONResponse(w, resp, logger)
}

func (h *SetHandler) parseRequest(r *http.Request) (*admin.PlacementSetRequest, *xhttp.ParseError) {
defer r.Body.Close()

req := &admin.PlacementSetRequest{}
if err := jsonpb.Unmarshal(r.Body, req); err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

return req, nil
}
Loading

0 comments on commit 217cfe8

Please sign in to comment.