Skip to content

Commit

Permalink
Merge pull request #8407 from heyitsanthony/v2v3
Browse files Browse the repository at this point in the history
v2 emulation over v3
  • Loading branch information
Anthony Romano authored Aug 31, 2017
2 parents 4c725ce + 32bfd9e commit 1b19a5c
Show file tree
Hide file tree
Showing 34 changed files with 1,922 additions and 694 deletions.
1 change: 1 addition & 0 deletions e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
}
args = addV2Args(args)
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")
}
Expand Down
19 changes: 19 additions & 0 deletions e2e/v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !v2v3

package e2e

func addV2Args(args []string) []string { return args }
21 changes: 21 additions & 0 deletions e2e/v2v3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build v2v3

package e2e

func addV2Args(args []string) []string {
return append(args, "--experimental-enable-v2v3", "v2/")
}
1 change: 1 addition & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type Config struct {
// Experimental flags

ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down
9 changes: 8 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/etcdserver/api/v2v3"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/debugutil"
Expand Down Expand Up @@ -409,7 +411,12 @@ func (e *Etcd) serve() (err error) {
// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
if len(e.Config().ExperimentalEnableV2V3) > 0 {
srv := v2v3.NewServer(v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
h = v2http.NewClientHandler(srv, e.Server.Cfg.ReqTimeout())
} else {
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
}
} else {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, e.Server)
Expand Down
14 changes: 3 additions & 11 deletions etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
}
}

func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
toTTLOptions(r)
func applyRequest(req *pb.Request, applyV2 etcdserver.ApplierV2) {
r := (*etcdserver.RequestV2)(req)
r.TTLOptions()
switch r.Method {
case "POST":
applyV2.Post(r)
Expand All @@ -236,15 +237,6 @@ func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
}
}

func toTTLOptions(r *pb.Request) store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
}
return ttlOptions
}

func writeStore(w io.Writer, st store.Store) uint64 {
all, err := st.Get("/1", true, true)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func newConfig() *config {

fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", cfg.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
fs.BoolVar(&cfg.EnableV2, "enable-v2", true, "Accept etcd V2 client requests.")
fs.StringVar(&cfg.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")

// proxy
fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,7 @@ auth flags:
experimental flags:
--experimental-corrupt-check-time '0s'
duration of time between cluster corruption check passes.
--experimental-enable-v2v3 ''
serve v2 requests through the v3 backend under a given prefix.
`
)
3 changes: 0 additions & 3 deletions etcdserver/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type Cluster interface {
// Member retrieves a particular member based on ID, or nil if the
// member does not exist in the cluster
Member(id types.ID) *membership.Member
// IsIDRemoved checks whether the given ID has been removed from this
// cluster at some point in the past
IsIDRemoved(id types.ID) bool
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}
2 changes: 1 addition & 1 deletion etcdserver/api/etcdhttp/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (

// HandleBasic adds handlers to a mux for serving JSON etcd client requests
// that do not access the v2 store.
func HandleBasic(mux *http.ServeMux, server *etcdserver.EtcdServer) {
func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) {
mux.HandleFunc(varsPath, serveVars)
mux.HandleFunc(configPath+"/local/log", logHandleFunc)
HandleMetricsHealth(mux, server)
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/api/etcdhttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
)

// HandleMetricsHealth registers metrics and health handlers.
func HandleMetricsHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(pathMetrics, prometheus.Handler())
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
Expand All @@ -44,7 +44,7 @@ func HandlePrometheus(mux *http.ServeMux) {
}

// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}

Expand Down Expand Up @@ -74,7 +74,7 @@ type Health struct {
Errors []string `json:"errors,omitempty"`
}

func checkHealth(srv *etcdserver.EtcdServer) Health {
func checkHealth(srv etcdserver.ServerV2) Health {
h := Health{Health: false}

as := srv.Alarms()
Expand Down
9 changes: 2 additions & 7 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@ const (
)

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
func NewPeerHandler(s etcdserver.ServerPeer) http.Handler {
return newPeerHandler(s.Cluster(), s.RaftHandler(), s.LeaseHandler())
}

func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
Expand Down
1 change: 0 additions & 1 deletion etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (c *fakeCluster) Members() []*membership.Member {
return []*membership.Member(ms)
}
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil }

// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
Expand Down
34 changes: 17 additions & 17 deletions etcdserver/api/v2http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,21 @@ const (
)

// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
func NewClientHandler(server etcdserver.ServerPeer, timeout time.Duration) http.Handler {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, server)
handleV2(mux, server, timeout)
return requestLogger(mux)
}

func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Duration) {
func handleV2(mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) {
sec := auth.NewStore(server, timeout)
kh := &keysHandler{
sec: sec,
server: server,
cluster: server.Cluster(),
timer: server,
timeout: timeout,
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}

sh := &statsHandler{
Expand All @@ -78,15 +77,15 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du
cluster: server.Cluster(),
timeout: timeout,
clock: clockwork.NewRealClock(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}

mah := &machinesHandler{cluster: server.Cluster()}

sech := &authHandler{
sec: sec,
cluster: server.Cluster(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}
mux.HandleFunc("/", http.NotFound)
mux.Handle(keysPrefix, kh)
Expand All @@ -102,9 +101,8 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du

type keysHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timer etcdserver.RaftTimer
timeout time.Duration
clientCertAuthEnabled bool
}
Expand Down Expand Up @@ -142,15 +140,15 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, noValueOnSuccess, h.timer); err != nil {
if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil {
// Should never be reached
plog.Errorf("error writing event (%v)", err)
}
reportRequestCompleted(rr, resp, startTime)
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cancel()
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
handleKeyWatch(ctx, w, resp, rr.Stream)
default:
writeKeyError(w, errors.New("received response with no Event/Watcher!"))
}
Expand All @@ -170,7 +168,7 @@ func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

type membersHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timeout time.Duration
clock clockwork.Clock
Expand Down Expand Up @@ -503,14 +501,15 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
// writeKeyEvent trims the prefix of key path in a single Event under
// StoreKeysPrefix, serializes it and writes the resulting JSON to the given
// ResponseWriter, along with the appropriate headers.
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noValueOnSuccess bool, rt etcdserver.RaftTimer) error {
func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error {
ev := resp.Event
if ev == nil {
return errors.New("cannot write empty Event!")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))

if ev.IsCreated() {
w.WriteHeader(http.StatusCreated)
Expand Down Expand Up @@ -552,7 +551,8 @@ func writeKeyError(w http.ResponseWriter, err error) {
}
}

func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response, stream bool) {
wa := resp.Watcher
defer wa.Remove()
ech := wa.EventChan()
var nch <-chan bool
Expand All @@ -562,8 +562,8 @@ func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher

w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
Expand Down
Loading

0 comments on commit 1b19a5c

Please sign in to comment.