diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java index 0a7641247e1..7da5603db4c 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java @@ -17,7 +17,6 @@ package org.apache.celeborn.common.network.util; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -124,19 +123,11 @@ private static PooledByteBufAllocator createPooledByteBufAllocator( * parameter value. */ public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator( - CelebornConf conf, AbstractSource source, boolean allowCache) { + CelebornConf conf, boolean allowCache) { final int index = allowCache ? 0 : 1; if (_sharedPooledByteBufAllocator[index] == null) { _sharedPooledByteBufAllocator[index] = createPooledByteBufAllocator(true, allowCache, conf.networkAllocatorArenas()); - if (source != null) { - new NettyMemoryMetrics( - _sharedPooledByteBufAllocator[index], - "shared-pool-" + index, - conf.networkAllocatorVerboseMetric(), - source, - Collections.emptyMap()); - } } return _sharedPooledByteBufAllocator[index]; } @@ -148,27 +139,29 @@ public static PooledByteBufAllocator getPooledByteBufAllocator( public static PooledByteBufAllocator getPooledByteBufAllocator( TransportConf conf, AbstractSource source, boolean allowCache, int coreNum) { + PooledByteBufAllocator allocator; if (conf.getCelebornConf().networkShareMemoryAllocator()) { - return getSharedPooledByteBufAllocator( - conf.getCelebornConf(), - source, - allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache()); - } - int arenas; - if (coreNum != 0) { - arenas = coreNum; + allocator = + getSharedPooledByteBufAllocator( + conf.getCelebornConf(), + allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache()); } else { - arenas = conf.getCelebornConf().networkAllocatorArenas(); + int arenas; + if (coreNum != 0) { + arenas = coreNum; + } else { + arenas = conf.getCelebornConf().networkAllocatorArenas(); + } + allocator = createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, arenas); } - PooledByteBufAllocator allocator = - createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, arenas); if (source != null) { - String poolName = "default-netty-pool"; + String poolName; Map labels = new HashMap<>(); - String moduleName = conf.getModuleName(); - if (!moduleName.isEmpty()) { - poolName = moduleName; - int index = allocatorsIndex.compute(moduleName, (k, v) -> v == null ? 0 : v + 1); + if (conf.getCelebornConf().networkMemoryAllocatorAllowCache()) { + poolName = allowCache ? "netty-shared-cache-pool" : "netty-shared-non-cache-pool"; + } else { + poolName = conf.getModuleName(); + int index = allocatorsIndex.compute(poolName, (k, v) -> v == null ? 0 : v + 1); labels.put("allocatorIndex", String.valueOf(index)); } new NettyMemoryMetrics( diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index c2f1c66a1d7..af9bb3fbcda 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -132,7 +132,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, workerSource) val storageBufferAllocator: PooledByteBufAllocator = - NettyUtils.getPooledByteBufAllocator(new TransportConf("StorageManager", conf), null, true) + NettyUtils.getPooledByteBufAllocator( + new TransportConf("StorageManager", conf), + workerSource, + true) // (mountPoint -> LocalFlusher) private val ( diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java index 1d39839b0b9..1314e32af3b 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java @@ -36,7 +36,6 @@ import org.apache.celeborn.common.meta.FileInfo; import org.apache.celeborn.common.meta.MemoryFileInfo; import org.apache.celeborn.common.meta.ReduceFileMeta; -import org.apache.celeborn.common.metrics.source.AbstractSource; import org.apache.celeborn.common.network.util.NettyUtils; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.service.deploy.worker.WorkerSource; @@ -59,9 +58,8 @@ public long[] prepare(int mapCount) { byte[] batchHeader = new byte[16]; fileInfo = new MemoryFileInfo(userIdentifier, true, new ReduceFileMeta(8 * 1024 * 1024)); - AbstractSource source = Mockito.mock(AbstractSource.class); PooledByteBufAllocator allocator = - NettyUtils.getSharedPooledByteBufAllocator(new CelebornConf(), source, false); + NettyUtils.getSharedPooledByteBufAllocator(new CelebornConf(), false); CompositeByteBuf buffer = allocator.compositeBuffer(); fileInfo.setBuffer(buffer);