From 618fc2de43713f8f1029e1137c61d1588b998a4f Mon Sep 17 00:00:00 2001 From: yeyuqiang Date: Tue, 12 Jan 2021 10:55:06 +0800 Subject: [PATCH] Plasma Memory Manager Implementation (#56) --- .../spark/io/pmem/PlasmaMemoryManager.java | 53 +++++++++++++++++++ .../io/pmem/PlasmaMemoryManagerSuite.java | 45 ++++++++++++++++ .../io/pmem/PlasmaOutputInputStreamSuite.java | 16 ++++++ 3 files changed, 114 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/io/pmem/PlasmaMemoryManager.java create mode 100644 core/src/test/java/org/apache/spark/io/pmem/PlasmaMemoryManagerSuite.java diff --git a/core/src/main/java/org/apache/spark/io/pmem/PlasmaMemoryManager.java b/core/src/main/java/org/apache/spark/io/pmem/PlasmaMemoryManager.java new file mode 100644 index 0000000000000..52768e3cadfa9 --- /dev/null +++ b/core/src/main/java/org/apache/spark/io/pmem/PlasmaMemoryManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io.pmem; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * PMem Memory Manager + * Handle whether the system can provide enough pmem memory in numeric + */ +public class PlasmaMemoryManager { + + private long totalMemory; + @VisibleForTesting + AtomicLong memoryUsed = new AtomicLong(0); + + private double useRatio; + + public PlasmaMemoryManager(long totalMemory, double useRatio) { + this.totalMemory = totalMemory; + this.useRatio = useRatio; + } + + // Make pmem can be written when usage is lower than use ratio + public boolean isAvailable() { + return (totalMemory * useRatio - memoryUsed.get()) > 0; + } + + public void increase(long memorySize) { + memoryUsed.getAndAdd(memorySize); + } + + public void release(long memorySize) { + memoryUsed.getAndAdd(memorySize * -1); + } + +} diff --git a/core/src/test/java/org/apache/spark/io/pmem/PlasmaMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/io/pmem/PlasmaMemoryManagerSuite.java new file mode 100644 index 0000000000000..4390a2b6980e3 --- /dev/null +++ b/core/src/test/java/org/apache/spark/io/pmem/PlasmaMemoryManagerSuite.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io.pmem; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class PlasmaMemoryManagerSuite { + + @Test + public void testMultiThreadUseCase() throws InterruptedException { + PlasmaMemoryManager memoryManager = new PlasmaMemoryManager(100000000, 0.8); + int processNum = Runtime.getRuntime().availableProcessors(); + ExecutorService threadPool = Executors.newFixedThreadPool(processNum); + for (int i = 0; i < 100 * processNum; i++) { + threadPool.submit(() -> { + memoryManager.increase(100); + memoryManager.release(100); + }); + } + threadPool.shutdown(); + threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + assertEquals(0, memoryManager.memoryUsed.get()); + } + +} diff --git a/core/src/test/java/org/apache/spark/io/pmem/PlasmaOutputInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/pmem/PlasmaOutputInputStreamSuite.java index 150e954cc1f30..5a00a43e1ff72 100644 --- a/core/src/test/java/org/apache/spark/io/pmem/PlasmaOutputInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/pmem/PlasmaOutputInputStreamSuite.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.io.pmem; import org.apache.spark.SparkConf;