diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index a1df131121..9d9b8c16c0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnection; @@ -37,8 +36,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.TaskContext; @@ -262,29 +259,4 @@ public static int deserializeShuffleProviderMetaData(ByteBuffer meta) in.close(); } } - - public static String getBufferSizeProperty(CompressionCodec codec) { - return getBufferSizeProperty(codec.getClass().getName()); - } - - public static String getBufferSizeProperty(String className) { - switch (className) { - case "org.apache.hadoop.io.compress.DefaultCodec": - case "org.apache.hadoop.io.compress.BZip2Codec": - case "org.apache.hadoop.io.compress.GzipCodec": - return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; - case "org.apache.hadoop.io.compress.SnappyCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY; - case "org.apache.hadoop.io.compress.ZStandardCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; - case "org.apache.hadoop.io.compress.LzoCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; - case "com.hadoop.compression.lzo.LzoCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; - case "org.apache.hadoop.io.compress.Lz4Codec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY; - default: - return null; - } - } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index a4bbf5aabf..4b91c34f8b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -363,7 +363,7 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS void setupOutputStream(CompressionCodec codec) throws IOException { this.checksumOut = new IFileOutputStream(this.rawOut); if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); + this.compressor = CodecUtils.getCompressor(codec); if (this.compressor != null) { this.compressor.reset(); this.compressedOut = codec.createOutputStream(checksumOut, compressor); @@ -773,7 +773,7 @@ public Reader(InputStream in, long length, checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/* , isCompressed */); if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); + decompressor = CodecUtils.getDecompressor(codec); if (decompressor != null) { this.in = codec.createInputStream(checksumIn, decompressor); } else { @@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen in = checksumIn; Decompressor decompressor = null; if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); + decompressor = CodecUtils.getDecompressor(codec); if (decompressor != null) { decompressor.reset(); in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java index 8e5154f3b0..4035da39e1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java @@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.Compressor; @@ -31,16 +34,18 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + public final class CodecUtils { private static final Logger LOG = LoggerFactory.getLogger(IFile.class); - private static final int DEFAULT_BUFFER_SIZE = 128 * 1024; + @VisibleForTesting + static final int DEFAULT_BUFFER_SIZE = 256 * 1024; private CodecUtils() { } @@ -76,20 +81,21 @@ public static CompressionCodec getCodec(Configuration conf) throws IOException { public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec, IFileInputStream checksumIn, Decompressor decompressor, int compressedLength) throws IOException { - String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec); - Configurable configurableCodec = (Configurable) codec; - int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE : - configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE); - + String bufferSizeProp = getBufferSizeProperty(codec); CompressionInputStream in = null; if (bufferSizeProp != null) { + Configurable configurableCodec = (Configurable) codec; Configuration conf = configurableCodec.getConf(); - int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE); - LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}", - DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize); - synchronized (codec) { + synchronized (conf) { + int defaultBufferSize = getDefaultBufferSize(conf, codec); + int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize); + + int newBufSize = Math.min(compressedLength, defaultBufferSize); + LOG.info("buffer size was set according to min({}, {}) => {}={}", compressedLength, + defaultBufferSize, bufferSizeProp, newBufSize); + conf.setInt(bufferSizeProp, newBufSize); in = codec.createInputStream(checksumIn, decompressor); @@ -117,7 +123,7 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo * issues above for Compressor instances as well, even when we tried to leverage from * smaller buffer size only on decompression paths. */ - configurableCodec.getConf().setInt(bufferSizeProp, originalSize); + conf.setInt(bufferSizeProp, originalSize); } } else { in = codec.createInputStream(checksumIn, decompressor); @@ -125,4 +131,68 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo return in; } + + public static Compressor getCompressor(CompressionCodec codec) { + Configurable configurableCodec = (Configurable) codec; + Configuration conf = configurableCodec.getConf(); + String bufferSizeProp = CodecUtils.getBufferSizeProperty(codec); + LOG.info("getting compressor with buffer size: {}", conf.getInt(bufferSizeProp, getDefaultBufferSize(conf, codec))); + return CodecPool.getCompressor(codec); + } + + public static Decompressor getDecompressor(CompressionCodec codec) { + Configurable configurableCodec = (Configurable) codec; + Configuration conf = configurableCodec.getConf(); + String bufferSizeProp = CodecUtils.getBufferSizeProperty(codec); + LOG.info("getting decompressor with buffer size: {}", conf.getInt(bufferSizeProp, getDefaultBufferSize(conf, codec))); + return CodecPool.getDecompressor(codec); + } + + public static String getBufferSizeProperty(CompressionCodec codec) { + return getBufferSizeProperty(codec.getClass().getName()); + } + + public static String getBufferSizeProperty(String codecClassName) { + switch (codecClassName) { + case "org.apache.hadoop.io.compress.DefaultCodec": + case "org.apache.hadoop.io.compress.BZip2Codec": + case "org.apache.hadoop.io.compress.GzipCodec": + return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; + case "org.apache.hadoop.io.compress.SnappyCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY; + case "org.apache.hadoop.io.compress.ZStandardCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; + case "org.apache.hadoop.io.compress.LzoCodec": + case "com.hadoop.compression.lzo.LzoCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; + case "org.apache.hadoop.io.compress.Lz4Codec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY; + default: + return null; + } + } + + public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) { + return getDefaultBufferSize(conf, codec.getClass().getName()); + } + + public static int getDefaultBufferSize(Configuration conf, String codecClassName) { + switch (codecClassName) { + case "org.apache.hadoop.io.compress.DefaultCodec": + case "org.apache.hadoop.io.compress.BZip2Codec": + case "org.apache.hadoop.io.compress.GzipCodec": + return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; + case "org.apache.hadoop.io.compress.SnappyCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT; + case "org.apache.hadoop.io.compress.ZStandardCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT; + case "org.apache.hadoop.io.compress.LzoCodec": + case "com.hadoop.compression.lzo.LzoCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT; + case "org.apache.hadoop.io.compress.Lz4Codec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT; + default: + return DEFAULT_BUFFER_SIZE; + } + } } \ No newline at end of file diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index bf35955625..960aee345a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -57,7 +57,6 @@ import org.apache.hadoop.util.NativeCodeLoader; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter; import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @@ -66,6 +65,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen; import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair; import org.apache.tez.runtime.library.utils.BufferUtils; +import org.apache.tez.runtime.library.utils.CodecUtils; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -734,7 +734,7 @@ public void testReadToDisk() throws IOException { public void testInMemoryBufferSize() throws IOException { Configurable configurableCodec = (Configurable) codec; int originalCodecBufferSize = - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1); // for smaller amount of data, codec buffer should be sized according to compressed data length List data = KVDataGen.generateTestData(false, rnd.nextInt(100)); @@ -742,7 +742,7 @@ public void testInMemoryBufferSize() throws IOException { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); Assert.assertEquals(originalCodecBufferSize, // original size is repaired - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0)); // buffer size cannot grow infinitely with compressed data size data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100)); @@ -750,7 +750,7 @@ public void testInMemoryBufferSize() throws IOException { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); Assert.assertEquals(originalCodecBufferSize, // original size is repaired - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0)); } @Test(expected = IllegalArgumentException.class) @@ -766,7 +766,7 @@ private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName) Configuration conf = new Configuration(); System.out.println("trying with buffer size: " + bufferSize); - conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize)); + conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize)); CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); CompressionCodec codecToTest = codecFactory.getCodecByClassName(codecClassName); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java new file mode 100644 index 0000000000..20d62f0660 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.utils; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec; +import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestCodecUtils { + + @Test + public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception { + int modifiedBufferSize = 1000; + int numberOfThreads = 1000; + + ExecutorService service = Executors.newFixedThreadPool(numberOfThreads); + + Configuration conf = new Configuration(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true); + CompressionCodec codec = CodecUtils.getCodec(conf); + + Future[] futures = new Future[numberOfThreads]; + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; i++) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + CodecUtils.getDecompressedInputStreamWithBufferSize(codec, + Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + CodecPool.returnDecompressor(decompressor); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + latch.countDown(); + + for (Future f : futures) { + f.get(); + } + } + + @Test + public void testConcurrentCompressorDecompressorCreation() throws Exception { + int modifiedBufferSize = 1000; + int numberOfThreads = 1000; + + ExecutorService service = Executors.newFixedThreadPool(numberOfThreads); + + Configuration conf = new Configuration(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true); + CompressionCodec codec = CodecUtils.getCodec(conf); + + Future[] futures = new Future[numberOfThreads]; + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; i++) { + // let's "randomly" choose from scenarios and test them concurrently + // 1. getDecompressedInputStreamWithBufferSize + if (i % 3 == 0) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Assert.assertEquals( + Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + CodecUtils.getDecompressedInputStreamWithBufferSize(codec, + Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize); + + Assert.assertEquals( + Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + CodecPool.returnDecompressor(decompressor); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + // 2. getCompressor + } else if (i % 3 == 1) { + futures[i] = service.submit(() -> { + waitForLatch(latch); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + Compressor compressor = CodecUtils.getCompressor(codec); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + CodecPool.returnCompressor(compressor); + + }); + // 3. getDecompressor + } else if (i % 3 == 2) { + futures[i] = service.submit(() -> { + waitForLatch(latch); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + + Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY)); + + CodecPool.returnDecompressor(decompressor); + }); + } + } + latch.countDown(); + + for (Future f : futures) { + f.get(); + } + } + + @Test + public void testDefaultBufferSize() { + Configuration conf = new Configuration(); // config with no buffersize set + + Assert.assertEquals(CodecUtils.DEFAULT_BUFFER_SIZE, + CodecUtils.getDefaultBufferSize(conf, new DummyCompressionCodec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new DefaultCodec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new BZip2Codec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new GzipCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new SnappyCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new ZStandardCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new Lz4Codec())); + } + + private void waitForLatch(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +}