diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java index 4ede9212cd3..cbaf9882868 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; @@ -58,6 +59,10 @@ public class KvEntityStore implements EntityStore { ImmutableMap.of("RocksDBKvBackend", RocksDBKvBackend.class.getCanonicalName()); @Getter @VisibleForTesting private KvBackend backend; + + // Lock to control the concurrency of the entity store, to be more exact, the concurrency of + // accessing the underlying kv store. + private ReentrantReadWriteLock reentrantReadWriteLock; private EntityKeyEncoder entityKeyEncoder; private NameMappingService nameMappingService; private EntitySerDe serDe; @@ -69,6 +74,7 @@ public void initialize(Config config) throws RuntimeException { // instance, We should make it configurable in the future. this.nameMappingService = new KvNameMappingService(backend); this.entityKeyEncoder = new BinaryEntityKeyEncoder(nameMappingService); + this.reentrantReadWriteLock = new ReentrantReadWriteLock(); } @Override @@ -89,14 +95,16 @@ public List list( byte[] endKey = Bytes.increment(Bytes.wrap(startKey)).get(); List> kvs = - backend.scan( - new KvRangeScan.KvRangeScanBuilder() - .start(startKey) - .end(endKey) - .startInclusive(true) - .endInclusive(false) - .limit(Integer.MAX_VALUE) - .build()); + executeWithReadLock( + () -> + backend.scan( + new KvRangeScan.KvRangeScanBuilder() + .start(startKey) + .end(endKey) + .startInclusive(true) + .endInclusive(false) + .limit(Integer.MAX_VALUE) + .build())); for (Pair pairs : kvs) { entities.add(serDe.deserialize(pairs.getRight(), e)); @@ -112,7 +120,7 @@ public boolean exists(NameIdentifier ident, EntityType entityType) throws IOExce return false; } - return backend.get(key) != null; + return executeWithReadLock(() -> backend.get(key) != null); } @Override @@ -120,7 +128,12 @@ public void put(E e, boolean overwritten) throws IOException, EntityAlreadyExistsException { byte[] key = entityKeyEncoder.encode(e.nameIdentifier(), e.type()); byte[] value = serDe.serialize(e); - backend.put(key, value, overwritten); + + executeWithWriteLock( + () -> { + backend.put(key, value, overwritten); + return null; + }); } @Override @@ -128,37 +141,42 @@ public E update( NameIdentifier ident, Class type, EntityType entityType, Function updater) throws IOException, NoSuchEntityException, AlreadyExistsException { byte[] key = entityKeyEncoder.encode(ident, entityType); - return executeInTransaction( - () -> { - byte[] value = backend.get(key); - if (value == null) { - throw new NoSuchEntityException(ident.toString()); - } - - E e = serDe.deserialize(value, type); - E updatedE = updater.apply(e); - if (updatedE.nameIdentifier().equals(ident)) { - backend.put(key, serDe.serialize(updatedE), true); - return updatedE; - } - - // If we have changed the name of the entity, We would do the following steps: - // Check whether the new entities already existed - boolean newEntityExist = exists(updatedE.nameIdentifier(), entityType); - if (newEntityExist) { - throw new AlreadyExistsException( - String.format( - "Entity %s already exist, please check again", updatedE.nameIdentifier())); - } - - // Update the name mapping - nameMappingService.updateName( - generateKeyForMapping(ident), generateKeyForMapping(updatedE.nameIdentifier())); - // Update the entity to store - backend.put(key, serDe.serialize(updatedE), true); - return updatedE; - }); + return executeWithWriteLock( + () -> + executeInTransaction( + () -> { + byte[] value = backend.get(key); + if (value == null) { + throw new NoSuchEntityException(ident.toString()); + } + + E e = serDe.deserialize(value, type); + E updatedE = updater.apply(e); + if (updatedE.nameIdentifier().equals(ident)) { + backend.put(key, serDe.serialize(updatedE), true); + return updatedE; + } + + // If we have changed the name of the entity, We would do the following steps: + // Check whether the new entities already existed + boolean newEntityExist = exists(updatedE.nameIdentifier(), entityType); + if (newEntityExist) { + throw new AlreadyExistsException( + String.format( + "Entity %s already exist, please check again", + updatedE.nameIdentifier())); + } + + // Update the name mapping + nameMappingService.updateName( + generateKeyForMapping(ident), + generateKeyForMapping(updatedE.nameIdentifier())); + + // Update the entity to store + backend.put(key, serDe.serialize(updatedE), true); + return updatedE; + })); } private String concatIdAndName(long[] namespaceIds, String name) { @@ -227,7 +245,7 @@ public E get( throw new NoSuchEntityException(ident.toString()); } - byte[] value = backend.get(key); + byte[] value = executeWithReadLock(() -> backend.get(key)); if (value == null) { throw new NoSuchEntityException(ident.toString()); } @@ -291,44 +309,48 @@ private byte[] replacePrefixTypeInfo(byte[] encode, String subTypePrefix) { @Override public boolean delete(NameIdentifier ident, EntityType entityType, boolean cascade) throws IOException { - if (!exists(ident, entityType)) { - return false; - } + return executeWithWriteLock( + () -> { + if (!exists(ident, entityType)) { + return false; + } - byte[] dataKey = entityKeyEncoder.encode(ident, entityType, true); - List subEntityPrefix = getSubEntitiesPrefix(ident, entityType); - if (subEntityPrefix.isEmpty()) { - // has no sub-entities - return backend.delete(dataKey); - } + byte[] dataKey = entityKeyEncoder.encode(ident, entityType, true); + List subEntityPrefix = getSubEntitiesPrefix(ident, entityType); + if (subEntityPrefix.isEmpty()) { + // has no sub-entities + return backend.delete(dataKey); + } - byte[] directChild = Iterables.getLast(subEntityPrefix); - byte[] endKey = Bytes.increment(Bytes.wrap(directChild)).get(); - List> kvs = - backend.scan( - new KvRangeScan.KvRangeScanBuilder() - .start(directChild) - .end(endKey) - .startInclusive(true) - .endInclusive(false) - .limit(1) - .build()); - - if (!cascade && !kvs.isEmpty()) { - throw new NonEmptyEntityException( - String.format("Entity %s has sub-entities, you should remove sub-entities first", ident)); - } + byte[] directChild = Iterables.getLast(subEntityPrefix); + byte[] endKey = Bytes.increment(Bytes.wrap(directChild)).get(); + List> kvs = + backend.scan( + new KvRangeScan.KvRangeScanBuilder() + .start(directChild) + .end(endKey) + .startInclusive(true) + .endInclusive(false) + .limit(1) + .build()); + + if (!cascade && !kvs.isEmpty()) { + throw new NonEmptyEntityException( + String.format( + "Entity %s has sub-entities, you should remove sub-entities first", ident)); + } - for (byte[] prefix : subEntityPrefix) { - backend.deleteRange( - new KvRangeScan.KvRangeScanBuilder() - .start(prefix) - .startInclusive(true) - .end(Bytes.increment(Bytes.wrap(prefix)).get()) - .build()); - } + for (byte[] prefix : subEntityPrefix) { + backend.deleteRange( + new KvRangeScan.KvRangeScanBuilder() + .start(prefix) + .startInclusive(true) + .end(Bytes.increment(Bytes.wrap(prefix)).get()) + .build()); + } - return backend.delete(dataKey); + return backend.delete(dataKey); + }); } @Override @@ -359,4 +381,27 @@ private static KvBackend createKvEntityBackend(Config config) { "Failed to create and initialize KvBackend by name: " + backendName, e); } } + + @FunctionalInterface + interface IOExecutable { + R execute() throws IOException; + } + + private R executeWithReadLock(IOExecutable executable) throws IOException { + reentrantReadWriteLock.readLock().lock(); + try { + return executable.execute(); + } finally { + reentrantReadWriteLock.readLock().unlock(); + } + } + + private R executeWithWriteLock(IOExecutable executable) throws IOException { + reentrantReadWriteLock.writeLock().lock(); + try { + return executable.execute(); + } finally { + reentrantReadWriteLock.writeLock().unlock(); + } + } } diff --git a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvEntityStorage.java b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvEntityStorage.java index 7659f90f7a4..cfe4bc87002 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvEntityStorage.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvEntityStorage.java @@ -30,10 +30,19 @@ import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; import com.datastrato.gravitino.meta.TableEntity; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; import java.io.IOException; import java.time.Instant; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -736,4 +745,116 @@ void testCreateKvEntityStore() throws IOException { store.put(updatedMetalake2); } } + + @Test + void testConcurrentIssues() throws IOException, ExecutionException, InterruptedException { + Config config = Mockito.mock(Config.class); + File file = Files.createTempDir(); + file.deleteOnExit(); + Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv"); + Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE); + Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto"); + Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(file.getAbsolutePath()); + ThreadPoolExecutor threadPoolExecutor = + new ThreadPoolExecutor( + 10, + 20, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(1000), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("gravitino-t-%d").build()); + + CompletionService future = new ExecutorCompletionService<>(threadPoolExecutor); + + try (EntityStore store = EntityStoreFactory.createEntityStore(config)) { + store.initialize(config); + Assertions.assertTrue(store instanceof KvEntityStore); + store.setSerDe(EntitySerDeFactory.createEntitySerDe(config.get(Configs.ENTITY_SERDE))); + + AuditInfo auditInfo = + new AuditInfo.Builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + BaseMetalake metalake = createBaseMakeLake("metalake", auditInfo); + CatalogEntity catalog = createCatalog(Namespace.of("metalake"), "catalog", auditInfo); + + store.put(metalake); + store.put(catalog); + Assertions.assertNotNull( + store.get(catalog.nameIdentifier(), EntityType.CATALOG, CatalogEntity.class)); + + // Delete the catalog entity, and we try to use multi-thread to delete it and make sure only + // one thread can delete it. + for (int i = 0; i < 10; i++) { + future.submit( + () -> store.delete(NameIdentifier.of("metalake", "catalog"), EntityType.CATALOG)); + } + int totalSuccessNum = 0; + for (int i = 0; i < 10; i++) { + totalSuccessNum += future.take().get() ? 1 : 0; + } + Assertions.assertEquals(1, totalSuccessNum); + + // Try to use multi-thread to put the same catalog entity, and make sure only one thread can + // put it. + for (int i = 0; i < 20; i++) { + future.submit( + () -> { + store.put(catalog); /* overwrite is false, then only one will save it successfully */ + return null; + }); + } + + int totalFailed = 0; + for (int i = 0; i < 20; i++) { + try { + future.take().get(); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof EntityAlreadyExistsException); + totalFailed++; + } + } + Assertions.assertEquals(19, totalFailed); + + // Try to use multi-thread to update the same catalog entity, and make sure only one thread + // can update it. + for (int i = 0; i < 10; i++) { + future.submit( + () -> { + // Ten threads rename the catalog entity from 'catalog' to 'catalog1' at the same + // time. + store.update( + NameIdentifier.of("metalake", "catalog"), + CatalogEntity.class, + EntityType.CATALOG, + e -> { + AuditInfo auditInfo1 = + new AuditInfo.Builder() + .withCreator("creator1") + .withCreateTime(Instant.now()) + .build(); + return createCatalog(Namespace.of("metalake"), "catalog1", auditInfo1); + }); + return null; + }); + } + + totalFailed = 0; + for (int i = 0; i < 10; i++) { + try { + future.take().get(); + } catch (Exception e) { + // It may throw NoSuchEntityException or AlreadyExistsException + // NoSuchEntityException: because old entity has been renamed by the other thread already, + // we can't get the old one. + // AlreadyExistsException: because the entity has been renamed by the other thread + // already, we can't rename it again. + Assertions.assertTrue( + e.getCause() instanceof AlreadyExistsException + || e.getCause() instanceof NoSuchEntityException); + totalFailed++; + } + } + Assertions.assertEquals(9, totalFailed); + } + } }