Skip to content

Commit

Permalink
proxy: remove capnslog (etcd-io#11614)
Browse files Browse the repository at this point in the history
* proxy: remove capnslog

* CHANGELOG: function signature change
  • Loading branch information
jingyih authored Feb 12, 2020
1 parent 71e3220 commit bb29615
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 76 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- Deprecated `etcd_debugging_mvcc_txn_total` Prometheus metric. Use `etcd_mvcc_txn_total` instead.
- Deprecated `etcd_debugging_mvcc_range_total` Prometheus metric. Use `etcd_mvcc_range_total` instead.
- Master branch `/version` outputs `3.5.0-pre`, instead of `3.4.0+git`.
- Changed `proxy` package function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11614).
- Previously, `NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{})`, now `NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{})`.
- Previously, `Register(c *clientv3.Client, prefix string, addr string, ttl int)`, now `Register(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{}`.
- Previously, `NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler`, now `NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler`.

### Metrics, Monitoring

Expand Down
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func startProxy(cfg *config) error {

return clientURLs
}
ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
ph = embed.WrapCORS(cfg.ec.CORS, ph)

if cfg.isReadonlyProxy() {
Expand Down
4 changes: 2 additions & 2 deletions etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
if grpcProxyResolverPrefix != "" {
grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
}
clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
clusterp, _ := grpcproxy.NewClusterProxy(lg, client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
leasep, _ := grpcproxy.NewLeaseProxy(client)
mainp := grpcproxy.NewMaintenanceProxy(client)
authp := grpcproxy.NewAuthProxy(client)
Expand Down
4 changes: 3 additions & 1 deletion integration/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/proxy/grpcproxy"
"go.etcd.io/etcd/proxy/grpcproxy/adapter"

"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -56,7 +58,7 @@ func toGRPC(c *clientv3.Client) grpcAPI {
wp, wpch := grpcproxy.NewWatchProxy(c)
lp, lpch := grpcproxy.NewLeaseProxy(c)
mp := grpcproxy.NewMaintenanceProxy(c)
clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs
clp, _ := grpcproxy.NewClusterProxy(zap.NewExample(), c, "", "") // without registering proxy URLs
authp := grpcproxy.NewAuthProxy(c)
lockp := grpcproxy.NewLockProxy(c)
electp := grpcproxy.NewElectionProxy(c)
Expand Down
12 changes: 9 additions & 3 deletions proxy/grpcproxy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
Expand All @@ -34,6 +35,7 @@ import (
const resolveRetryRate = 1

type clusterProxy struct {
lg *zap.Logger
clus clientv3.Cluster
ctx context.Context
gr *naming.GRPCResolver
Expand All @@ -49,8 +51,12 @@ type clusterProxy struct {
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
// The returned channel is closed when there is grpc-proxy endpoint registered
// and the client's context is canceled so the 'register' loop returns.
func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
if lg == nil {
lg = zap.NewNop()
}
cp := &clusterProxy{
lg: lg,
clus: c.Cluster,
ctx: c.Ctx(),
gr: &naming.GRPCResolver{Client: c},
Expand Down Expand Up @@ -78,7 +84,7 @@ func (cp *clusterProxy) resolve(prefix string) {
for rm.Wait(cp.ctx) == nil {
wa, err := cp.gr.Resolve(prefix)
if err != nil {
plog.Warningf("failed to resolve %q (%v)", prefix, err)
cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err))
continue
}
cp.monitor(wa)
Expand All @@ -89,7 +95,7 @@ func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
for cp.ctx.Err() == nil {
ups, err := wa.Next()
if err != nil {
plog.Warningf("clusterProxy watcher error (%v)", err)
cp.lg.Warn("clusterProxy watcher error", zap.Error(err))
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
return
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/grpcproxy/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand All @@ -34,7 +35,7 @@ func TestClusterProxyMemberList(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t)
cts := newClusterProxyServer(zap.NewExample(), []string{clus.Members[0].GRPCAddr()}, t)
defer cts.close(t)

cfg := clientv3.Config{
Expand Down Expand Up @@ -88,7 +89,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) {
}
}

func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer {
func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *clusterproxyTestServer {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
Expand All @@ -113,8 +114,8 @@ func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestSe
cts.server.Serve(cts.l)
}()

Register(client, "test-prefix", cts.l.Addr().String(), 7)
cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix")
Register(lg, client, "test-prefix", cts.l.Addr().String(), 7)
cts.cp, cts.donec = NewClusterProxy(lg, client, cts.l.Addr().String(), "test-prefix")
cts.caddr = cts.l.Addr().String()
pb.RegisterClusterServer(cts.server, cts.cp)
close(servec)
Expand Down
19 changes: 0 additions & 19 deletions proxy/grpcproxy/logger.go

This file was deleted.

19 changes: 12 additions & 7 deletions proxy/grpcproxy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/clientv3/naming"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
Expand All @@ -32,17 +33,17 @@ const registerRetryRate = 1
// Register registers itself as a grpc-proxy server by writing prefixed-key
// with session of specified TTL (in seconds). The returned channel is closed
// when the client's context is canceled.
func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
func Register(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)

donec := make(chan struct{})
go func() {
defer close(donec)

for rm.Wait(c.Ctx()) == nil {
ss, err := registerSession(c, prefix, addr, ttl)
ss, err := registerSession(lg, c, prefix, addr, ttl)
if err != nil {
plog.Warningf("failed to create a session %v", err)
lg.Warn("failed to create a session", zap.Error(err))
continue
}
select {
Expand All @@ -51,8 +52,8 @@ func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan st
return

case <-ss.Done():
plog.Warning("session expired; possible network partition or server restart")
plog.Warning("creating a new session to rejoin")
lg.Warn("session expired; possible network partition or server restart")
lg.Warn("creating a new session to rejoin")
continue
}
}
Expand All @@ -61,7 +62,7 @@ func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan st
return donec
}

func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
if err != nil {
return nil, err
Expand All @@ -72,7 +73,11 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
return nil, err
}

plog.Infof("registered %q with %d-second lease", addr, ttl)
lg.Info(
"registered session with lease",
zap.String("addr", addr),
zap.Int("lease-ttl", ttl),
)
return ss, nil
}

Expand Down
3 changes: 2 additions & 1 deletion proxy/grpcproxy/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"go.uber.org/zap"
gnaming "google.golang.org/grpc/naming"
)

Expand All @@ -44,7 +45,7 @@ func TestRegister(t *testing.T) {
t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
}

donec := Register(cli, testPrefix, paddr, 5)
donec := Register(zap.NewExample(), cli, testPrefix, paddr, 5)

ups, err = wa.Next()
if err != nil {
Expand Down
37 changes: 29 additions & 8 deletions proxy/httpproxy/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net/url"
"sync"
"time"

"go.uber.org/zap"
)

// defaultRefreshInterval is the default proxyRefreshIntervalMs value
Expand All @@ -31,8 +33,12 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
if lg == nil {
lg = zap.NewNop()
}
d := &director{
lg: lg,
uf: urlsFunc,
failureWait: failureWait,
}
Expand All @@ -56,7 +62,7 @@ func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterv
for _, e := range es {
sl = append(sl, e.URL.String())
}
plog.Infof("endpoints found %q", sl)
lg.Info("endpoints found", zap.Strings("endpoints", sl))
})
}
time.Sleep(ri)
Expand All @@ -68,6 +74,7 @@ func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterv

type director struct {
sync.Mutex
lg *zap.Logger
ep []*endpoint
uf GetProxyURLs
failureWait time.Duration
Expand All @@ -81,10 +88,10 @@ func (d *director) refresh() {
for _, u := range urls {
uu, err := url.Parse(u)
if err != nil {
plog.Printf("upstream URL invalid: %v", err)
d.lg.Info("upstream URL invalid", zap.Error(err))
continue
}
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait))
}

// shuffle array to avoid connections being "stuck" to a single endpoint
Expand All @@ -109,8 +116,9 @@ func (d *director) endpoints() []*endpoint {
return filtered
}

func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{
lg: lg,
URL: u,
Available: true,
failFunc: timedUnavailabilityFunc(failureWait),
Expand All @@ -122,6 +130,7 @@ func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
type endpoint struct {
sync.Mutex

lg *zap.Logger
URL url.URL
Available bool

Expand All @@ -138,10 +147,17 @@ func (ep *endpoint) Failed() {
ep.Available = false
ep.Unlock()

plog.Printf("marked endpoint %s unavailable", ep.URL.String())
if ep.lg != nil {
ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String()))
}

if ep.failFunc == nil {
plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
if ep.lg != nil {
ep.lg.Info(
"no failFunc defined, endpoint will be unavailable forever",
zap.String("endpoint", ep.URL.String()),
)
}
return
}

Expand All @@ -152,7 +168,12 @@ func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
return func(ep *endpoint) {
time.AfterFunc(wait, func() {
ep.Available = true
plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
if ep.lg != nil {
ep.lg.Info(
"marked endpoint available, to retest connectivity",
zap.String("endpoint", ep.URL.String()),
)
}
})
}
}
4 changes: 3 additions & 1 deletion proxy/httpproxy/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sort"
"testing"
"time"

"go.uber.org/zap"
)

func TestNewDirectorScheme(t *testing.T) {
Expand Down Expand Up @@ -53,7 +55,7 @@ func TestNewDirectorScheme(t *testing.T) {
uf := func() []string {
return tt.urls
}
got := newDirector(uf, time.Minute, time.Minute)
got := newDirector(zap.NewExample(), uf, time.Minute, time.Minute)

var gep []string
for _, ep := range got.ep {
Expand Down
11 changes: 8 additions & 3 deletions proxy/httpproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"go.uber.org/zap"
"golang.org/x/net/http2"
)

Expand All @@ -43,17 +44,21 @@ type GetProxyURLs func() []string
// NewHandler creates a new HTTP handler, listening on the given transport,
// which will proxy requests to an etcd cluster.
// The handler will periodically update its view of the cluster.
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
if lg == nil {
lg = zap.NewNop()
}
if t.TLSClientConfig != nil {
// Enable http2, see Issue 5033.
err := http2.ConfigureTransport(t)
if err != nil {
plog.Infof("Error enabling Transport HTTP/2 support: %v", err)
lg.Info("Error enabling Transport HTTP/2 support", zap.Error(err))
}
}

p := &reverseProxy{
director: newDirector(urlsFunc, failureWait, refreshInterval),
lg: lg,
director: newDirector(lg, urlsFunc, failureWait, refreshInterval),
transport: t,
}

Expand Down
Loading

0 comments on commit bb29615

Please sign in to comment.