Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update data model for infinispan #2156

Merged
merged 7 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,71 +10,66 @@
package org.zowe.apiml.caching.service.infinispan.storage;

import lombok.extern.slf4j.Slf4j;
import org.infinispan.Cache;
import org.zowe.apiml.caching.model.KeyValue;
import org.zowe.apiml.caching.service.Messages;
import org.zowe.apiml.caching.service.Storage;
import org.zowe.apiml.caching.service.StorageException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

@Slf4j
public class InfinispanStorage implements Storage {


private final Cache<String, Map<String, KeyValue>> cache;
private final ConcurrentMap<String, KeyValue> cache;

public InfinispanStorage(Cache<String, Map<String, KeyValue>> cache) {
public InfinispanStorage(ConcurrentMap<String, KeyValue> cache) {
this.cache = cache;
}

@Override
public KeyValue create(String serviceId, KeyValue toCreate) {
toCreate.setServiceId(serviceId);
log.info("Writing record: {}|{}|{}", serviceId, toCreate.getKey(), toCreate.getValue());

Map<String, KeyValue> serviceCache = cache.computeIfAbsent(serviceId, k -> new HashMap<>());
KeyValue serviceCache = cache.putIfAbsent(serviceId + toCreate.getKey(), toCreate);
Comment on lines +34 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for using serviceId + toCreate.getKey() as the key, rather than a nested map structure like the in memory implementation? This seems to add complexity to the code in the read/delete all for service endpoints, and creates unintuitive coupling between the CRUD operations to calculate the correct key.

I think a comment explaining this might be useful in this file.

Copy link
Member Author

@achmelo achmelo Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested map structure was used in the original implementation of Infinispan storage and it turned out to be slow and error-prone in HA setup. Cache object can be mutated by another instance of Caching service when it is not directly used, e.g.

Map<String, KeyValue> serviceCache = cache.computeIfAbsent(serviceId, k -> new HashMap<>());
KeyValue entry = serviceCache.put(toCreate.getKey(), toCreate);
        cache.put(serviceId, serviceCache);

after conputeIfAbsent is returned, the cache lock is released and can be acquired by another instance. When cache.put() is called, the object could not be the same. It could be also an issue for services with too many entries. Each call for serviceId will return a large object that will not be necessarily useful.
There is a trade-off for read/delete all methods but I think that those will not be used extensively. If this will not be the case, we can adjust them.


if (serviceCache.containsKey(toCreate.getKey())) {
if (serviceCache != null) {
throw new StorageException(Messages.DUPLICATE_KEY.getKey(), Messages.DUPLICATE_KEY.getStatus(), toCreate.getKey());
}
KeyValue entry = serviceCache.put(toCreate.getKey(), toCreate);
cache.put(serviceId, serviceCache);
return entry;
return null;
}

@Override
public KeyValue read(String serviceId, String key) {
log.info("Reading record for service {} under key {}", serviceId, key);
Map<String, KeyValue> serviceCache = cache.get(serviceId);
if (serviceCache != null && serviceCache.containsKey(key)) {
return serviceCache.get(key);
KeyValue serviceCache = cache.get(serviceId + key);
if (serviceCache != null) {
return serviceCache;
} else {
throw new StorageException(Messages.KEY_NOT_IN_CACHE.getKey(), Messages.KEY_NOT_IN_CACHE.getStatus(), key, serviceId);
}
}

@Override
public KeyValue update(String serviceId, KeyValue toUpdate) {
toUpdate.setServiceId(serviceId);
log.info("Updating record for service {} under key {}", serviceId, toUpdate);
Map<String, KeyValue> serviceCache = cache.get(serviceId);
if (serviceCache == null || !serviceCache.containsKey(toUpdate.getKey())) {
KeyValue serviceCache = cache.put(serviceId + toUpdate.getKey(), toUpdate);
if (serviceCache == null) {
throw new StorageException(Messages.KEY_NOT_IN_CACHE.getKey(), Messages.KEY_NOT_IN_CACHE.getStatus(), toUpdate.getKey(), serviceId);
}
serviceCache.put(toUpdate.getKey(), toUpdate);
cache.put(serviceId, serviceCache);
return toUpdate;

}

@Override
public KeyValue delete(String serviceId, String toDelete) {
log.info("Removing record for service {} under key {}", serviceId, toDelete);
Map<String, KeyValue> serviceCache = cache.get(serviceId);
KeyValue entry;
if (serviceCache.containsKey(toDelete)) {
entry = serviceCache.remove(toDelete);
cache.put(serviceId, serviceCache);
KeyValue entry = cache.remove(serviceId + toDelete);
if (entry != null) {
return entry;
} else {
throw new StorageException(Messages.KEY_NOT_IN_CACHE.getKey(), Messages.KEY_NOT_IN_CACHE.getStatus(), toDelete, serviceId);
Expand All @@ -84,12 +79,22 @@ public KeyValue delete(String serviceId, String toDelete) {
@Override
public Map<String, KeyValue> readForService(String serviceId) {
log.info("Reading all records for service {} ", serviceId);
return cache.get(serviceId);
Map<String, KeyValue> result = new HashMap<>();
cache.forEach((key, value) -> {
if (serviceId.equals(value.getServiceId())) {
result.put(value.getKey(), value);
}
});
return result;
}

@Override
public void deleteForService(String serviceId) {
log.info("Removing all records for service {} ", serviceId);
cache.remove(serviceId);
cache.forEach((key, value) -> {
if (value.getServiceId().equals(serviceId)) {
cache.remove(key);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.zowe.apiml.caching.model.KeyValue;
import org.zowe.apiml.caching.service.StorageException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand All @@ -26,7 +26,7 @@ class InfinispanStorageTest {

public static final KeyValue TO_CREATE = new KeyValue("key1", "val1");
public static final KeyValue TO_UPDATE = new KeyValue("key1", "val2");
Cache<String, Map<String, KeyValue>> cache;
Cache<String, KeyValue> cache;
InfinispanStorage storage;
String serviceId1 = "service1";

Expand All @@ -39,86 +39,101 @@ void setup() {
@Nested
class WhenEntryDoesntExist {

Map<String, KeyValue> serviceStore;
KeyValue keyValue;

@BeforeEach
void createEmptyStore() {
serviceStore = new HashMap<>();
}

@Test
void whenCreate_thenCacheIsUpdated() {
when(cache.computeIfAbsent(any(), any())).thenReturn(serviceStore);
storage.create(serviceId1, TO_CREATE);
verify(cache, times(1)).put(serviceId1, serviceStore);
keyValue = null;
}

@Test
void whenRead_thenExceptionIsThrown() {
String key = TO_CREATE.getKey();
when(cache.get(serviceId1)).thenReturn(serviceStore);
when(cache.get(serviceId1)).thenReturn(keyValue);
assertThrows(StorageException.class, () -> storage.read(serviceId1, key));
}

@Test
void whenDelete_thenExceptionIsThrown() {
void whenUpdate_thenExceptionIsThrown() {
KeyValue entry = new KeyValue("key", "value");
when(cache.get(serviceId1)).thenReturn(keyValue);
assertThrows(StorageException.class, () -> storage.update(serviceId1, entry));
}

String key = TO_CREATE.getKey();
when(cache.get(serviceId1)).thenReturn(serviceStore);
assertThrows(StorageException.class, () -> storage.delete(serviceId1, key));
verify(cache, times(0)).put(serviceId1, serviceStore);
@Test
void whenAddNew_returnNull() {
keyValue = new KeyValue("key", "value");
assertNull(storage.create(serviceId1, keyValue));
}

@Test
void whenUpdate_thenCacheIsUpdated() {
void whenDelete_thenExceptionIsThrown() {

when(cache.get(serviceId1)).thenReturn(serviceStore);
assertThrows(StorageException.class, () -> storage.update(serviceId1, TO_UPDATE));
verify(cache, times(0)).put(serviceId1, serviceStore);
String key = TO_CREATE.getKey();
when(cache.remove(serviceId1 + key)).thenReturn(null);
assertThrows(StorageException.class, () -> storage.delete(serviceId1, key));
}

}


@Nested
class WhenEntryExists {
Map<String, KeyValue> serviceStore;
KeyValue keyValue;

@BeforeEach
void createStoreWithEntry() {
serviceStore = new HashMap<>();
serviceStore.put(TO_CREATE.getKey(), TO_CREATE);
keyValue = TO_CREATE;
}

@Test
void whenCreate_thenExceptionIsThrown() {
when(cache.computeIfAbsent(any(), any())).thenReturn(serviceStore);
void exceptionIsThrown() {
when(cache.putIfAbsent(any(), any())).thenReturn(keyValue);
assertThrows(StorageException.class, () -> storage.create(serviceId1, TO_CREATE));
}

@Test
void whenRead_thenEntryIsReturned() {
when(cache.get(serviceId1)).thenReturn(serviceStore);
void entryIsReturned() {
when(cache.get(serviceId1 + TO_CREATE.getKey())).thenReturn(TO_CREATE);
KeyValue result = storage.read(serviceId1, TO_CREATE.getKey());
assertEquals(TO_CREATE.getValue(), result.getValue());
}

@Test
void whenUpdate_thenCacheIsUpdated() {
void cacheIsUpdated() {

when(cache.get(serviceId1)).thenReturn(serviceStore);
when(cache.put(serviceId1 + TO_UPDATE.getKey(), TO_UPDATE)).thenReturn(TO_UPDATE);
storage.update(serviceId1, TO_UPDATE);
verify(cache, times(1)).put(serviceId1, serviceStore);
assertEquals("val2", serviceStore.get(TO_CREATE.getKey()).getValue());
verify(cache, times(1)).put(serviceId1 + TO_UPDATE.getKey(), TO_UPDATE);
assertEquals("val2", TO_UPDATE.getValue());
}

@Test
void whenDelete_thenCacheIsUpdated() {
void itemIsDeleted() {
ConcurrentMap<String, KeyValue> cache = new ConcurrentHashMap<>();
InfinispanStorage storage = new InfinispanStorage(cache);
assertNull(storage.create(serviceId1, TO_CREATE));
assertEquals(TO_CREATE, storage.delete(serviceId1, TO_CREATE.getKey()));
}

when(cache.get(serviceId1)).thenReturn(serviceStore);
KeyValue result = storage.delete(serviceId1, TO_CREATE.getKey());
verify(cache, times(1)).put(serviceId1, serviceStore);
assertEquals(TO_CREATE.getValue(), result.getValue());
assertNull(serviceStore.get(TO_CREATE.getKey()));
@Test
void returnAll() {
ConcurrentMap<String, KeyValue> cache = new ConcurrentHashMap<>();
InfinispanStorage storage = new InfinispanStorage(cache);
storage.create(serviceId1, new KeyValue("key", "value"));
storage.create(serviceId1, new KeyValue("key2", "value2"));
assertEquals(2, storage.readForService(serviceId1).size());
}

@Test
void removeAll() {
ConcurrentMap<String, KeyValue> cache = new ConcurrentHashMap<>();
InfinispanStorage storage = new InfinispanStorage(cache);
storage.create(serviceId1, new KeyValue("key", "value"));
storage.create(serviceId1, new KeyValue("key2", "value2"));
assertEquals(2, storage.readForService(serviceId1).size());
storage.deleteForService(serviceId1);
assertEquals(0, storage.readForService(serviceId1).size());
}

}
Expand Down
2 changes: 1 addition & 1 deletion config/profiling/run-profiling.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ then
fi
fi

jmeter -Jhost=$host -Jport=$port -Jthreads=$threads -Jdataset=$dataset -Jjmeter.reportgenerator.overall_granularity=1000 -n -t caching-profiling-parametrized.jmx -l $dir/result -e -o $dir/test-results -j $dir/result.log
jmeter -D javax.net.ssl.keyStore=../../keystore/client_cert/client-certs.p12 -D javax.net.ssl.keyStorePassword=password -Jhost=$host -Jport=$port -Jthreads=$threads -Jdataset=$dataset -Jjmeter.reportgenerator.overall_granularity=1000 -n -t caching-profiling-parametrized.jmx -l $dir/result -e -o $dir/test-results -j $dir/result.log