diff --git a/ChangeLog.rst b/ChangeLog.rst index b08bd7ef..e756045d 100644 --- a/ChangeLog.rst +++ b/ChangeLog.rst @@ -1,5 +1,9 @@ Change Log ========== +New in draft +-------------------- +* Change set_many and set_multi api return value. see [pr](https://github.com/pinterest/pymemcache/pull/179) + New in version 1.4.4 -------------------- * pypy3 to travis test matrix diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py index 7ac2224d..2892029c 100644 --- a/pymemcache/client/base.py +++ b/pymemcache/client/base.py @@ -29,7 +29,7 @@ RECV_SIZE = 4096 VALID_STORE_RESULTS = { - b'set': (b'STORED',), + b'set': (b'STORED', b'NOT_STORED'), b'add': (b'STORED', b'NOT_STORED'), b'replace': (b'STORED', b'NOT_STORED'), b'append': (b'STORED', b'NOT_STORED'), @@ -107,11 +107,11 @@ def _check_key(key, allow_unicode_keys, key_prefix=b''): ) elif c == ord(b'\00'): raise MemcacheIllegalInputError( - "Key contains null character: '%r'" % (key,) + "Key contains null character: '%r'" % (key,) ) elif c == ord(b'\r'): raise MemcacheIllegalInputError( - "Key contains carriage return: '%r'" % (key,) + "Key contains carriage return: '%r'" % (key,) ) return key @@ -309,17 +309,20 @@ def set_many(self, values, expire=0, noreply=None): self.default_noreply). Returns: - If no exception is raised, always returns True. Otherwise all, some - or none of the keys have been successfully set. If noreply is True - then a successful return does not guarantee that any keys were - successfully set (just that the keys were successfully sent). + Returns a list of keys that failed to be inserted. + If noreply is True, alwais returns empty list. """ - # TODO: make this more performant by sending all the values first, then # waiting for all the responses. + if noreply is None: + noreply = self.default_noreply + + failed = [] for key, value in six.iteritems(values): - self.set(key, value, expire, noreply) - return True + result = self.set(key, value, expire, noreply) + if not result: + failed.append(key) + return failed set_multi = set_many @@ -656,7 +659,7 @@ def version(self): if not result.startswith(b'VERSION '): raise MemcacheUnknownError( - "Received unexpected response: %s" % (result, )) + "Received unexpected response: %s" % (result, )) return result[8:] @@ -930,7 +933,8 @@ def set(self, key, value, expire=0, noreply=None): def set_many(self, values, expire=0, noreply=None): with self.client_pool.get_and_release(destroy_on_fail=True) as client: - return client.set_many(values, expire=expire, noreply=noreply) + failed = client.set_many(values, expire=expire, noreply=noreply) + return failed set_multi = set_many diff --git a/pymemcache/client/hash.py b/pymemcache/client/hash.py index 9aa95ace..63bae658 100644 --- a/pymemcache/client/hash.py +++ b/pymemcache/client/hash.py @@ -1,6 +1,7 @@ import socket import time import logging +import six from pymemcache.client.base import Client, PooledClient, _check_key from pymemcache.client.rendezvous import RendezvousHash @@ -13,6 +14,7 @@ class HashClient(object): """ A client for communicating with a cluster of memcached servers """ + def __init__( self, servers, @@ -171,34 +173,7 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs): # Connecting to the server fail, we should enter # retry mode except socket.error: - # This client has never failed, lets mark it for failure - if ( - client.server not in self._failed_clients and - self.retry_attempts > 0 - ): - self._failed_clients[client.server] = { - 'failed_time': time.time(), - 'attempts': 0, - } - # We aren't allowing any retries, we should mark the server as - # dead immediately - elif ( - client.server not in self._failed_clients and - self.retry_attempts <= 0 - ): - self._failed_clients[client.server] = { - 'failed_time': time.time(), - 'attempts': 0, - } - logger.debug("marking server as dead %s", client.server) - self.remove_server(*client.server) - # This client has failed previously, we need to update the metadata - # to reflect that we have attempted it again - else: - failed_metadata = self._failed_clients[client.server] - failed_metadata['attempts'] += 1 - failed_metadata['failed_time'] = time.time() - self._failed_clients[client.server] = failed_metadata + self._mark_failed_server(client.server) # if we haven't enabled ignore_exc, don't move on gracefully, just # raise the exception @@ -214,6 +189,95 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs): return default_val + def _safely_run_set_many(self, client, values, *args, **kwargs): + failed = [] + succeeded = [] + try: + if client.server in self._failed_clients: + # This server is currently failing, lets check if it is in + # retry or marked as dead + failed_metadata = self._failed_clients[client.server] + + # we haven't tried our max amount yet, if it has been enough + # time lets just retry using it + if failed_metadata['attempts'] < self.retry_attempts: + failed_time = failed_metadata['failed_time'] + if time.time() - failed_time > self.retry_timeout: + logger.debug( + 'retrying failed server: %s', client.server + ) + succeeded, failed, err = self._set_many( + client, values, *args, **kwargs) + if err is not None: + raise err + # we were successful, lets remove it from the failed + # clients + self._failed_clients.pop(client.server) + return failed + return values.keys() + else: + # We've reached our max retry attempts, we need to mark + # the sever as dead + logger.debug('marking server as dead: %s', client.server) + self.remove_server(*client.server) + + succeeded, failed, err = self._set_many( + client, values, *args, **kwargs + ) + if err is not None: + raise err + + return failed + + # Connecting to the server fail, we should enter + # retry mode + except socket.error: + self._mark_failed_server(client.server) + + # if we haven't enabled ignore_exc, don't move on gracefully, just + # raise the exception + if not self.ignore_exc: + raise + + return list(set(values.keys()) - set(succeeded)) + except Exception: + # any exceptions that aren't socket.error we need to handle + # gracefully as well + if not self.ignore_exc: + raise + + return list(set(values.keys()) - set(succeeded)) + + def _mark_failed_server(self, server): + # This client has never failed, lets mark it for failure + if ( + server not in self._failed_clients and + self.retry_attempts > 0 + ): + self._failed_clients[server] = { + 'failed_time': time.time(), + 'attempts': 0, + } + # We aren't allowing any retries, we should mark the server as + # dead immediately + elif ( + server not in self._failed_clients and + self.retry_attempts <= 0 + ): + self._failed_clients[server] = { + 'failed_time': time.time(), + 'attempts': 0, + } + logger.debug("marking server as dead %s", server) + self.remove_server(*server) + # This client has failed previously, we need to update the metadata + # to reflect that we have attempted it again + else: + failed_metadata = self._failed_clients[server] + failed_metadata['attempts'] += 1 + failed_metadata['failed_time'] = time.time() + self._failed_clients[server] = failed_metadata + def _run_cmd(self, cmd, key, default_val, *args, **kwargs): client = self._get_client(key) @@ -227,6 +291,22 @@ def _run_cmd(self, cmd, key, default_val, *args, **kwargs): client, func, default_val, *args, **kwargs ) + def _set_many(self, client, values, *args, **kwargs): + failed = [] + succeeded = [] + + try: + for key, value in six.iteritems(values): + result = client.set(key, value, *args, **kwargs) + if result: + succeeded.append(key) + else: + failed.append(key) + except Exception as e: + return succeeded, failed, e + + return succeeded, failed, None + def set(self, key, *args, **kwargs): return self._run_cmd('set', key, False, *args, **kwargs) @@ -241,13 +321,13 @@ def decr(self, key, *args, **kwargs): def set_many(self, values, *args, **kwargs): client_batches = {} - end = [] + failed = [] - for key, value in values.items(): + for key, value in six.iteritems(values): client = self._get_client(key) if client is None: - end.append(False) + failed.append(key) continue if client.server not in client_batches: @@ -257,15 +337,12 @@ def set_many(self, values, *args, **kwargs): for server, values in client_batches.items(): client = self.clients['%s:%s' % server] - new_args = list(args) - new_args.insert(0, values) - result = self._safely_run_func( - client, - client.set_many, False, *new_args, **kwargs + + failed += self._safely_run_set_many( + client, values, *args, **kwargs ) - end.append(result) - return all(end) + return failed set_multi = set_many diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py index 2c173ead..a20b740f 100644 --- a/pymemcache/test/test_client.py +++ b/pymemcache/test/test_client.py @@ -156,13 +156,13 @@ def test_set_noreply(self): def test_set_many_success(self): client = self.make_client([b'STORED\r\n']) result = client.set_many({b'key': b'value'}, noreply=False) - assert result is True + assert result == [] def test_set_multi_success(self): # Should just map to set_many client = self.make_client([b'STORED\r\n']) result = client.set_multi({b'key': b'value'}, noreply=False) - assert result is True + assert result == [] def test_add_stored(self): client = self.make_client([b'STORED\r', b'\n']) @@ -602,7 +602,7 @@ def _set(): def test_set_many_socket_handling(self): client = self.make_client([b'STORED\r\n']) result = client.set_many({b'key': b'value'}, noreply=False) - assert result is True + assert result == [] assert client.sock.closed is False assert len(client.sock.send_bufs) == 1 @@ -738,18 +738,26 @@ def _default_noreply_true(self, cmd, args, response): result = getattr(client, cmd)(*args) assert result is True + def _default_noreply_true_and_empty_list(self, cmd, args, response): + client = self.make_client(response, default_noreply=True) + result = getattr(client, cmd)(*args) + assert result == [] + def test_default_noreply_set(self): with pytest.raises(MemcacheUnknownError): self._default_noreply_false( - 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) + 'set', (b'key', b'value'), [b'UNKNOWN\r\n']) + self._default_noreply_false( + 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) self._default_noreply_true( 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) def test_default_noreply_set_many(self): with pytest.raises(MemcacheUnknownError): - self._default_noreply_false( - 'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n']) - self._default_noreply_true( + client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False) + result = client.set_many({b'key': b'value'}) + assert result == [b'key'] + self._default_noreply_true_and_empty_list( 'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n']) def test_default_noreply_add(self): @@ -854,18 +862,25 @@ def _default_noreply_true(self, cmd, args, response): result = getattr(client, cmd)(*args) assert result is True + def _default_noreply_true_and_empty_list(self, cmd, args, response): + client = self.make_client(response, default_noreply=True) + result = getattr(client, cmd)(*args) + assert result == [] + def test_default_noreply_set(self): with pytest.raises(MemcacheUnknownError): self._default_noreply_false( - 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) + 'set', (b'key', b'value'), [b'UNKNOWN\r\n']) + self._default_noreply_false( + 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) self._default_noreply_true( 'set', (b'key', b'value'), [b'NOT_STORED\r\n']) def test_default_noreply_set_many(self): with pytest.raises(MemcacheUnknownError): - self._default_noreply_false( - 'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n']) - self._default_noreply_true( + client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False) + client.set_many({b'key': b'value'}) + self._default_noreply_true_and_empty_list( 'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n']) def test_default_noreply_add(self): @@ -1008,5 +1023,5 @@ def test_recv(self): b'key1 0 6\r\nval', socket.error(errno.EINTR, "Interrupted system call"), b'ue1\r\nEND\r\n', - ]) + ]) assert client[b'key1'] == b'value1' diff --git a/pymemcache/test/test_client_hash.py b/pymemcache/test/test_client_hash.py index e8437d3d..cea71bdf 100644 --- a/pymemcache/test/test_client_hash.py +++ b/pymemcache/test/test_client_hash.py @@ -200,7 +200,7 @@ def test_no_servers_left_with_set_many(self): ) result = client.set_many({'foo': 'bar'}) - assert result is False + assert result == ['foo'] def test_no_servers_left_with_get_many(self): from pymemcache.client.hash import HashClient @@ -213,4 +213,44 @@ def test_no_servers_left_with_get_many(self): result = client.get_many(['foo', 'bar']) assert result == {'foo': False, 'bar': False} + def test_ignore_exec_set_many(self): + values = { + 'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3' + } + + with pytest.raises(MemcacheUnknownError): + client = self.make_client(*[ + [b'STORED\r\n', b'UNKNOWN\r\n', b'STORED\r\n'], + [b'STORED\r\n', b'UNKNOWN\r\n', b'STORED\r\n'], + ]) + client.set_many(values, noreply=False) + + client = self.make_client(*[ + [b'STORED\r\n', b'UNKNOWN\r\n', b'STORED\r\n'], + ], ignore_exc=True) + result = client.set_many(values, noreply=False) + + assert len(result) == 2 + + def test_noreply_set_many(self): + values = { + 'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3' + } + + client = self.make_client(*[ + [b'STORED\r\n', b'NOT_STORED\r\n', b'STORED\r\n'], + ]) + result = client.set_many(values, noreply=False) + assert len(result) == 1 + + client = self.make_client(*[ + [b'STORED\r\n', b'NOT_STORED\r\n', b'STORED\r\n'], + ]) + result = client.set_many(values, noreply=True) + assert result == [] + # TODO: Test failover logic diff --git a/pymemcache/test/utils.py b/pymemcache/test/utils.py index e453a622..7325f2d9 100644 --- a/pymemcache/test/utils.py +++ b/pymemcache/test/utils.py @@ -105,7 +105,7 @@ def set(self, key, value, expire=0, noreply=True): def set_many(self, values, expire=None, noreply=True): for key, value in six.iteritems(values): self.set(key, value, expire, noreply) - return True + return [] set_multi = set_many