From de20644d4732249b35f71869d456b2e9b2212ac8 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 28 Dec 2024 20:00:27 -0800 Subject: [PATCH 1/3] optimize sort keys by server in memcache client Signed-off-by: Ben Ye --- pkg/cacheutil/memcached_client.go | 24 ++++-- pkg/cacheutil/memcached_client_test.go | 25 ++---- pkg/cacheutil/memcached_server_selector.go | 99 ++++++++++++++-------- 3 files changed, 91 insertions(+), 57 deletions(-) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index e700ffee7b..309851efcd 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -573,18 +573,28 @@ func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (it func (c *memcachedClient) sortKeysByServer(keys []string) []string { bucketed := make(map[string][]string) - for _, key := range keys { - addr, err := c.selector.PickServer(key) - // If we couldn't determine the correct server, return keys in existing order - if err != nil { - return keys - } + addrs := *(addrsPool.Get().(*[]net.Addr)) + defer func() { + addrs = (addrs)[:0] + addrsPool.Put(&addrs) + }() + err := c.selector.Each(func(addr net.Addr) error { + addrs = append(addrs, addr) + return nil + }) + // No need to pick server and sort keys if no more than 1 server. + if err != nil || len(addrs) <= 1 { + return keys + } + for _, key := range keys { + // Bypass selector and pick server using jump hash. + addr := pickServerWithJumpHash(addrs, key) addrString := addr.String() bucketed[addrString] = append(bucketed[addrString], key) } - var out []string + out := make([]string, 0, len(keys)) for srv := range bucketed { out = append(out, bucketed[srv]...) } diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 4951075720..52cd49b2bc 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -454,13 +454,9 @@ func TestMemcachedClient_sortKeysByServer(t *testing.T) { config.Addresses = []string{"127.0.0.1:11211", "127.0.0.2:11211"} backendMock := newMemcachedClientBackendMock() selector := &mockServerSelector{ - serversByKey: map[string]mockAddr{ - "key1": "127.0.0.1:11211", - "key2": "127.0.0.2:11211", - "key3": "127.0.0.1:11211", - "key4": "127.0.0.2:11211", - "key5": "127.0.0.1:11211", - "key6": "127.0.0.2:11211", + addrs: []mockAddr{ + "127.0.0.1:11211", + "127.0.0.2:11211", }, } @@ -478,8 +474,8 @@ func TestMemcachedClient_sortKeysByServer(t *testing.T) { } sorted := client.sortKeysByServer(keys) - testutil.ContainsStringSlice(t, sorted, []string{"key1", "key3", "key5"}) - testutil.ContainsStringSlice(t, sorted, []string{"key2", "key4", "key6"}) + testutil.ContainsStringSlice(t, sorted, []string{"key1", "key2", "key4"}) + testutil.ContainsStringSlice(t, sorted, []string{"key5", "key3", "key6"}) } type mockAddr string @@ -493,25 +489,20 @@ func (m mockAddr) String() string { } type mockServerSelector struct { - serversByKey map[string]mockAddr + addrs []mockAddr } +// PickServer is not used here. func (m *mockServerSelector) PickServer(key string) (net.Addr, error) { - if srv, ok := m.serversByKey[key]; ok { - return srv, nil - } - panic(fmt.Sprintf("unmapped key: %s", key)) } func (m *mockServerSelector) Each(f func(net.Addr) error) error { - for k := range m.serversByKey { - addr := m.serversByKey[k] + for _, addr := range m.addrs { if err := f(addr); err != nil { return err } } - return nil } diff --git a/pkg/cacheutil/memcached_server_selector.go b/pkg/cacheutil/memcached_server_selector.go index 5426d6af33..b0b1e40880 100644 --- a/pkg/cacheutil/memcached_server_selector.go +++ b/pkg/cacheutil/memcached_server_selector.go @@ -5,6 +5,7 @@ package cacheutil import ( "net" + "strings" "sync" "github.com/bradfitz/gomemcache/memcache" @@ -30,9 +31,8 @@ var ( // with consistent DNS names where the naturally sorted order // is predictable (ie. Kubernetes statefulsets). type MemcachedJumpHashSelector struct { - // To avoid copy and pasting all memcache server list logic, - // we embed it and implement our features on top of it. - servers memcache.ServerList + mu sync.RWMutex + addrs []net.Addr } // SetServers changes a MemcachedJumpHashSelector's set of servers at @@ -53,52 +53,85 @@ func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { copy(sortedServers, servers) natsort.Sort(sortedServers) - return s.servers.SetServers(sortedServers...) + naddr := make([]net.Addr, len(servers)) + var err error + for i, server := range sortedServers { + naddr[i], err = parseStaticAddr(server) + if err != nil { + return err + } + } + + s.mu.Lock() + defer s.mu.Unlock() + s.addrs = naddr + return nil } // PickServer returns the server address that a given item // should be shared onto. func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { - // Unfortunately we can't read the list of server addresses from - // the original implementation, so we use Each() to fetch all of them. - addrs := *(addrsPool.Get().(*[]net.Addr)) - err := s.servers.Each(func(addr net.Addr) error { - addrs = append(addrs, addr) - return nil - }) - if err != nil { - return nil, err - } - - // No need of a jump hash in case of 0 or 1 servers. - if len(addrs) == 0 { - addrs = (addrs)[:0] - addrsPool.Put(&addrs) + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.addrs) == 0 { return nil, memcache.ErrNoServers + } else if len(s.addrs) == 1 { + return s.addrs[0], nil } - if len(addrs) == 1 { - picked := addrs[0] - - addrs = (addrs)[:0] - addrsPool.Put(&addrs) + return pickServerWithJumpHash(s.addrs, key), nil +} - return picked, nil +// Each iterates over each server and calls the given function. +// If f returns a non-nil error, iteration will stop and that +// error will be returned. +func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + for _, def := range s.addrs { + if err := f(def); err != nil { + return err + } } + return nil +} +// pickServerWithJumpHash returns the server address that a given item should be shared onto. +func pickServerWithJumpHash(addrs []net.Addr, key string) net.Addr { // Pick a server using the jump hash. cs := xxhash.Sum64String(key) idx := jumpHash(cs, len(addrs)) picked := (addrs)[idx] + return picked +} - addrs = (addrs)[:0] - addrsPool.Put(&addrs) +// Copied from https://github.com/bradfitz/gomemcache/blob/master/memcache/selector.go#L68. +func parseStaticAddr(server string) (net.Addr, error) { + if strings.Contains(server, "/") { + addr, err := net.ResolveUnixAddr("unix", server) + if err != nil { + return nil, err + } + return newStaticAddr(addr), nil + } + tcpaddr, err := net.ResolveTCPAddr("tcp", server) + if err != nil { + return nil, err + } + return newStaticAddr(tcpaddr), nil +} - return picked, nil +// Copied from https://github.com/bradfitz/gomemcache/blob/master/memcache/selector.go#L45 +// staticAddr caches the Network() and String() values from any net.Addr. +type staticAddr struct { + ntw, str string } -// Each iterates over each server and calls the given function. -// If f returns a non-nil error, iteration will stop and that -// error will be returned. -func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { - return s.servers.Each(f) +func newStaticAddr(a net.Addr) net.Addr { + return &staticAddr{ + ntw: a.Network(), + str: a.String(), + } } + +func (s *staticAddr) Network() string { return s.ntw } +func (s *staticAddr) String() string { return s.str } From 5d2d238849805ef6657ce22e351d6b84be1141d7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 31 Dec 2024 14:09:36 -0800 Subject: [PATCH 2/3] address comments Signed-off-by: Ben Ye --- pkg/cacheutil/memcached_client.go | 25 ++++---------- pkg/cacheutil/memcached_client_test.go | 38 ++++++++++++++++------ pkg/cacheutil/memcached_server_selector.go | 35 +++++++++++++++----- 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 309851efcd..5737fac949 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -102,6 +102,10 @@ type updatableServerSelector interface { // resolve. No attempt is made to connect to the server. If any // error occurs, no changes are made to the internal server list. SetServers(servers ...string) error + + // PickServerForKeys is like PickServer but returns a map of server address + // and corresponding keys. + PickServerForKeys(keys []string) (map[string][]string, error) } // MemcachedClientConfig is the config accepted by RemoteCacheClient. @@ -571,29 +575,12 @@ func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (it // *except* that keys sharded to the same server will be together. The order of keys // returned may change from call to call. func (c *memcachedClient) sortKeysByServer(keys []string) []string { - bucketed := make(map[string][]string) - - addrs := *(addrsPool.Get().(*[]net.Addr)) - defer func() { - addrs = (addrs)[:0] - addrsPool.Put(&addrs) - }() - err := c.selector.Each(func(addr net.Addr) error { - addrs = append(addrs, addr) - return nil - }) + bucketed, err := c.selector.PickServerForKeys(keys) // No need to pick server and sort keys if no more than 1 server. - if err != nil || len(addrs) <= 1 { + if err != nil || len(bucketed) <= 1 { return keys } - for _, key := range keys { - // Bypass selector and pick server using jump hash. - addr := pickServerWithJumpHash(addrs, key) - addrString := addr.String() - bucketed[addrString] = append(bucketed[addrString], key) - } - out := make([]string, 0, len(keys)) for srv := range bucketed { out = append(out, bucketed[srv]...) diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 52cd49b2bc..ab1f5dca95 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -454,9 +454,9 @@ func TestMemcachedClient_sortKeysByServer(t *testing.T) { config.Addresses = []string{"127.0.0.1:11211", "127.0.0.2:11211"} backendMock := newMemcachedClientBackendMock() selector := &mockServerSelector{ - addrs: []mockAddr{ - "127.0.0.1:11211", - "127.0.0.2:11211", + resp: map[string][]string{ + "127.0.0.1:11211": {"key1", "key2", "key4"}, + "127.0.0.2:11211": {"key5", "key3", "key6"}, }, } @@ -476,6 +476,23 @@ func TestMemcachedClient_sortKeysByServer(t *testing.T) { sorted := client.sortKeysByServer(keys) testutil.ContainsStringSlice(t, sorted, []string{"key1", "key2", "key4"}) testutil.ContainsStringSlice(t, sorted, []string{"key5", "key3", "key6"}) + + // 1 server no need to sort. + client.selector = &mockServerSelector{ + resp: map[string][]string{ + "127.0.0.1:11211": {}, + }, + } + sorted = client.sortKeysByServer(keys) + testutil.ContainsStringSlice(t, sorted, []string{"key1", "key2", "key3", "key4", "key5", "key6"}) + + // 0 server no need to sort. + client.selector = &mockServerSelector{ + resp: map[string][]string{}, + err: memcache.ErrCacheMiss, + } + sorted = client.sortKeysByServer(keys) + testutil.ContainsStringSlice(t, sorted, []string{"key1", "key2", "key3", "key4", "key5", "key6"}) } type mockAddr string @@ -489,7 +506,8 @@ func (m mockAddr) String() string { } type mockServerSelector struct { - addrs []mockAddr + resp map[string][]string + err error } // PickServer is not used here. @@ -497,13 +515,13 @@ func (m *mockServerSelector) PickServer(key string) (net.Addr, error) { panic(fmt.Sprintf("unmapped key: %s", key)) } +// Each is not used here. func (m *mockServerSelector) Each(f func(net.Addr) error) error { - for _, addr := range m.addrs { - if err := f(addr); err != nil { - return err - } - } - return nil + panic("not implemented") +} + +func (m *mockServerSelector) PickServerForKeys(keys []string) (map[string][]string, error) { + return m.resp, m.err } func (m *mockServerSelector) SetServers(...string) error { diff --git a/pkg/cacheutil/memcached_server_selector.go b/pkg/cacheutil/memcached_server_selector.go index b0b1e40880..bc1f36706f 100644 --- a/pkg/cacheutil/memcached_server_selector.go +++ b/pkg/cacheutil/memcached_server_selector.go @@ -13,15 +13,6 @@ import ( "github.com/facette/natsort" ) -var ( - addrsPool = sync.Pool{ - New: func() interface{} { - addrs := make([]net.Addr, 0, 64) - return &addrs - }, - } -) - // MemcachedJumpHashSelector implements the memcache.ServerSelector // interface, utilizing a jump hash to distribute keys to servers. // @@ -95,6 +86,32 @@ func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { return nil } +// PickServerForKeys is like PickServer but returns a map of server address +// and corresponding keys. +func (s *MemcachedJumpHashSelector) PickServerForKeys(keys []string) (map[string][]string, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // No need of a jump hash in case of 0 or 1 servers. + if len(s.addrs) <= 0 { + return nil, memcache.ErrNoServers + } + + m := make(map[string][]string, len(keys)) + if len(s.addrs) == 1 { + m[s.addrs[0].String()] = keys + return m, nil + } + + for _, key := range keys { + // Pick a server using the jump hash. + picked := pickServerWithJumpHash(s.addrs, key).String() + m[picked] = append(m[picked], key) + } + + return m, nil +} + // pickServerWithJumpHash returns the server address that a given item should be shared onto. func pickServerWithJumpHash(addrs []net.Addr, key string) net.Addr { // Pick a server using the jump hash. From 38337e8df96e0401164671f71172c0397104cfd7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 31 Dec 2024 14:30:45 -0800 Subject: [PATCH 3/3] remove unused mockAddr Signed-off-by: Ben Ye --- pkg/cacheutil/memcached_client_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index ab1f5dca95..957fd060b0 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -495,16 +495,6 @@ func TestMemcachedClient_sortKeysByServer(t *testing.T) { testutil.ContainsStringSlice(t, sorted, []string{"key1", "key2", "key3", "key4", "key5", "key6"}) } -type mockAddr string - -func (m mockAddr) Network() string { - return "mock" -} - -func (m mockAddr) String() string { - return string(m) -} - type mockServerSelector struct { resp map[string][]string err error