From d823bbafe8c782d01fdcf9255c2d32f1ff37f672 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 14 Jun 2018 14:41:41 -0600 Subject: [PATCH 1/9] Work on implementing reference counting for http reading --- .../nio/InboundChannelBuffer.java | 44 ++++++++++++++- .../elasticsearch/http/nio/PageByteBuf.java | 53 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index f671b39d4d61b..2f8144a087a56 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; /** @@ -185,6 +186,33 @@ public ByteBuffer[] sliceBuffersFrom(long from) { return buffers; } + public ByteBuffer[] sliceAndRetainPagesFrom(long from) { +// if (from > capacity) { +// throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + +// "], with slice parameters from [" + from + "]"); +// } else if (from == capacity) { +// return EMPTY_BYTE_BUFFER_ARRAY; +// } +// long indexWithOffset = from + offset; +// +// int pageIndex = pageIndex(indexWithOffset); +// int indexInPage = indexInPage(indexWithOffset); +// +// ByteBuffer[] buffers = new Page[pages.size() - pageIndex]; +// Iterator pageIterator = pages.descendingIterator(); +// for (int i = buffers.length - 1; i > 0; --i) { +// buffers[i] = pageIterator.next().byteBuffer.duplicate(); +// } +// ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate(); +// firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); +// buffers[0] = firstPostIndexBuffer; +// +// return buffers; + return null; + } + + + public void incrementIndex(long delta) { if (delta < 0) { throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]"); @@ -232,15 +260,29 @@ public static class Page implements AutoCloseable { private final ByteBuffer byteBuffer; private final Runnable closeable; + private final AtomicInteger referenceCount; public Page(ByteBuffer byteBuffer, Runnable closeable) { + this(byteBuffer, closeable, new AtomicInteger(1)); + } + + private Page(ByteBuffer byteBuffer, Runnable closeable, AtomicInteger referenceCount) { this.byteBuffer = byteBuffer; this.closeable = closeable; + this.referenceCount = referenceCount; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; } @Override public void close() { - closeable.run(); + int newReferenceCount = referenceCount.decrementAndGet(); + assert newReferenceCount >= 0 : "Reference count should never be less than 0. Found: [" + newReferenceCount + "]."; + if (newReferenceCount == 0) { + closeable.run(); + } } } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java new file mode 100644 index 0000000000000..212b3e7bfd144 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java @@ -0,0 +1,53 @@ +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class PageByteBuf extends UnpooledHeapByteBuf { + + private final Runnable releasable; + + private PageByteBuf(byte[] array, Runnable releasable) { + super(UnpooledByteBufAllocator.DEFAULT, array, array.length); + this.releasable = releasable; + } + + public static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { + int componentCount = pages.length; + if (componentCount == 1) { + return byteBufFromPage(pages[0]); + } else { + int maxComponents = Math.max(16, componentCount); + final List components = new ArrayList<>(componentCount); + for (InboundChannelBuffer.Page page: pages) { + components.add(byteBufFromPage(page)); + } + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, maxComponents, components); + } + } + + public static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { + ByteBuffer buffer = page.getByteBuffer(); + assert !buffer.isDirect() && buffer.hasArray() : "Must be a heap buffer with an array"; + int offset = buffer.arrayOffset() + buffer.position(); + PageByteBuf newByteBuf = new PageByteBuf(buffer.array(), page::close); + return newByteBuf.slice(offset, buffer.remaining()); + } + + + @Override + protected void deallocate() { + try { + super.deallocate(); + } finally { + releasable.run(); + } + } +} From 4aa8dba42cdd7a8fd58015d40598abb26149b35b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 14 Jun 2018 14:54:59 -0600 Subject: [PATCH 2/9] WIP --- .../nio/InboundChannelBuffer.java | 81 +++++++++++++------ .../elasticsearch/http/nio/PageByteBuf.java | 9 ++- 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index 2f8144a087a56..7d359aec8ee57 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -42,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable { private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; + private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0]; private final ArrayDeque pages; @@ -153,6 +154,36 @@ public ByteBuffer[] sliceBuffersTo(long to) { return buffers; } + public Page[] sliceAndRetainPagesTo(long to) { + if (to > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters to [" + to + "]"); + } else if (to == 0) { + return EMPTY_BYTE_PAGE_ARRAY; + } + long indexWithOffset = to + offset; + int pageCount = pageIndex(indexWithOffset); + int finalLimit = indexInPage(indexWithOffset); + if (finalLimit != 0) { + pageCount += 1; + } + + Page[] pages = new Page[pageCount]; + Iterator pageIterator = this.pages.iterator(); + Page firstPage = pageIterator.next().duplicate(); + ByteBuffer firstBuffer = firstPage.byteBuffer; + firstBuffer.position(firstBuffer.position() + offset); + pages[0] = firstPage; + for (int i = 1; i < pages.length; i++) { + pages[i] = pageIterator.next().duplicate(); + } + if (finalLimit != 0) { + pages[pages.length - 1].byteBuffer.limit(finalLimit); + } + + return pages; + } + /** * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any @@ -186,29 +217,28 @@ public ByteBuffer[] sliceBuffersFrom(long from) { return buffers; } - public ByteBuffer[] sliceAndRetainPagesFrom(long from) { -// if (from > capacity) { -// throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + -// "], with slice parameters from [" + from + "]"); -// } else if (from == capacity) { -// return EMPTY_BYTE_BUFFER_ARRAY; -// } -// long indexWithOffset = from + offset; -// -// int pageIndex = pageIndex(indexWithOffset); -// int indexInPage = indexInPage(indexWithOffset); -// -// ByteBuffer[] buffers = new Page[pages.size() - pageIndex]; -// Iterator pageIterator = pages.descendingIterator(); -// for (int i = buffers.length - 1; i > 0; --i) { -// buffers[i] = pageIterator.next().byteBuffer.duplicate(); -// } -// ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate(); -// firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); -// buffers[0] = firstPostIndexBuffer; -// -// return buffers; - return null; + public Page[] sliceAndRetainPagesFrom(long from) { + if (from > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters from [" + from + "]"); + } else if (from == capacity) { + return EMPTY_BYTE_PAGE_ARRAY; + } + long indexWithOffset = from + offset; + + int pageIndex = pageIndex(indexWithOffset); + int indexInPage = indexInPage(indexWithOffset); + + Page[] pages = new Page[this.pages.size() - pageIndex]; + Iterator pageIterator = this.pages.descendingIterator(); + for (int i = pages.length - 1; i > 0; --i) { + pages[i] = pageIterator.next().duplicate(); + } + Page firstPostIndexPage = pageIterator.next().duplicate(); + ByteBuffer firstPostIndexBuffer = firstPostIndexPage.byteBuffer; + firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); + pages[0] = firstPostIndexPage; + return pages; } @@ -272,6 +302,11 @@ private Page(ByteBuffer byteBuffer, Runnable closeable, AtomicInteger referenceC this.referenceCount = referenceCount; } + private Page duplicate() { + referenceCount.incrementAndGet(); + return new Page(byteBuffer.duplicate(), closeable, referenceCount); + } + public ByteBuffer getByteBuffer() { return byteBuffer; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java index 212b3e7bfd144..c6e71c0f324ad 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledHeapByteBuf; import org.elasticsearch.nio.InboundChannelBuffer; @@ -21,19 +22,21 @@ private PageByteBuf(byte[] array, Runnable releasable) { public static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { int componentCount = pages.length; - if (componentCount == 1) { + if (componentCount == 0) { + return Unpooled.EMPTY_BUFFER; + } else if (componentCount == 1) { return byteBufFromPage(pages[0]); } else { int maxComponents = Math.max(16, componentCount); final List components = new ArrayList<>(componentCount); - for (InboundChannelBuffer.Page page: pages) { + for (InboundChannelBuffer.Page page : pages) { components.add(byteBufFromPage(page)); } return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, maxComponents, components); } } - public static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { + private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { ByteBuffer buffer = page.getByteBuffer(); assert !buffer.isDirect() && buffer.hasArray() : "Must be a heap buffer with an array"; int offset = buffer.arrayOffset() + buffer.position(); From 9e51ba2365efcefc16aac75d01be44b68b2acfb4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 14 Jun 2018 15:32:12 -0600 Subject: [PATCH 3/9] Hook up reference counting --- .../elasticsearch/transport/Netty4Plugin.java | 2 +- .../http/nio/HttpReadWriteHandler.java | 2 +- .../elasticsearch/http/nio/NettyAdaptor.java | 8 +++++++ .../http/nio/NioHttpServerTransport.java | 16 +++++++++++--- .../elasticsearch/http/nio/PageByteBuf.java | 19 ++++++++++++++++ .../transport/nio/NioTransportPlugin.java | 5 +++-- .../http/nio/NioHttpServerTransportTests.java | 22 ++++++++++--------- .../common/network/NetworkModule.java | 2 +- .../elasticsearch/plugins/NetworkPlugin.java | 2 +- .../common/network/NetworkModuleTests.java | 6 ++--- .../core/LocalStateCompositeXPackPlugin.java | 4 ++-- .../xpack/security/Security.java | 2 +- 12 files changed, 65 insertions(+), 25 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index c6655b58bc3bd..70afcc86ad8f9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -87,8 +87,8 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 05f28e8254aa1..ec01eeb4ba863 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -94,7 +94,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler { @Override public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { - int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex())); + int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex())); Object message; while ((message = adaptor.pollInboundMessage()) != null) { handleRequest(message); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java index cf8c92bff905c..f54501532445f 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java @@ -29,6 +29,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.nio.FlushOperation; +import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.WriteOperation; import java.nio.ByteBuffer; @@ -97,6 +98,13 @@ public int read(ByteBuffer[] buffers) { return byteBuf.readerIndex() - initialReaderIndex; } + public int read(InboundChannelBuffer.Page[] pages) { + ByteBuf byteBuf = PageByteBuf.byteBufFromPages(pages); + int readableBytes = byteBuf.readableBytes(); + nettyChannel.writeInbound(byteBuf); + return readableBytes; + } + public Object pollInboundMessage() { return nettyChannel.readInbound(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 57aaebb16a1a2..b2cedca29cbc9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -32,12 +32,14 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -64,6 +66,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -107,6 +110,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private final BigArrays bigArrays; private final ThreadPool threadPool; private final NamedXContentRegistry xContentRegistry; + private final PageCacheRecycler pageCacheRecycler; private final HttpHandlingSettings httpHandlingSettings; @@ -122,12 +126,14 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; - public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) { + public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, + HttpServerTransport.Dispatcher dispatcher) { super(settings, networkService, threadPool, dispatcher); this.bigArrays = bigArrays; this.threadPool = threadPool; this.xContentRegistry = xContentRegistry; + this.pageCacheRecycler = pageCacheRecycler; ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -344,11 +350,15 @@ private HttpChannelFactory() { @Override public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { NioSocketChannel nioChannel = new NioSocketChannel(channel); + java.util.function.Supplier pageSupplier = () -> { + Recycler.V bytes = pageCacheRecycler.bytePage(false); + return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); + }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, httpHandlingSettings, xContentRegistry, corsConfig, threadPool.getThreadContext()); Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, - InboundChannelBuffer.allocatingInstance()); + new InboundChannelBuffer(pageSupplier)); nioChannel.setContext(context); return nioChannel; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java index c6e71c0f324ad..e4542786ae16d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.http.nio; import io.netty.buffer.ByteBuf; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 1cc94f18dd3c1..1da8e909b2dd8 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -67,12 +67,13 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME, - () -> new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)); + () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, + dispatcher)); } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index c43fc7d072360..6796762279dcf 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -88,12 +88,14 @@ public class NioHttpServerTransportTests extends ESTestCase { private NetworkService networkService; private ThreadPool threadPool; private MockBigArrays bigArrays; + private MockPageCacheRecycler pageRecycler; @Before public void setup() throws Exception { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); - bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + pageRecycler = new MockPageCacheRecycler(Settings.EMPTY); + bigArrays = new MockBigArrays(pageRecycler, new NoneCircuitBreakerService()); } @After @@ -186,7 +188,7 @@ public void dispatchBadRequest(RestRequest request, RestChannel channel, ThreadC throw new AssertionError(); } }; - try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -210,13 +212,13 @@ public void dispatchBadRequest(RestRequest request, RestChannel channel, ThreadC } public void testBindUnavailableAddress() { - try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); - try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry(), new NullDispatcher())) { + try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), new NullDispatcher())) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -259,8 +261,8 @@ public void dispatchBadRequest(final RestRequest request, settings = Settings.builder().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); } - try (NioHttpServerTransport transport = - new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher)) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -300,8 +302,8 @@ public void dispatchBadRequest(final RestRequest request, }; - try (NioHttpServerTransport transport = - new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, + threadPool, xContentRegistry(), dispatcher)) { transport.start(); transport.dispatchRequest(null, null); diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 70d26770a7bdc..cd8141ffa3c91 100644 --- a/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -116,7 +116,7 @@ public NetworkModule(Settings settings, boolean transportClient, List> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher); if (transportClient == false) { for (Map.Entry> entry : httpTransportFactory.entrySet()) { registerHttpTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index df41036ffeabb..d33997fc82b99 100644 --- a/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -71,8 +71,8 @@ default Map> getTransports(Settings settings, Thread * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. */ default Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index ba74e373f8842..8a4eb8e9177f1 100644 --- a/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -159,8 +159,8 @@ public void testRegisterHttpTransport() { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -198,8 +198,8 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { @@ -233,8 +233,8 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher requestDispatcher) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 44fd61e1693ad..796cae375e3a6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -286,14 +286,14 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { Map> transports = new HashMap<>(); filterPlugins(NetworkPlugin.class).stream().forEach(p -> transports.putAll(p.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher))); + pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher))); return transports; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 664745b19204b..c0bd7882c419a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -843,8 +843,8 @@ public Map> getTransports(Settings settings, ThreadP @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, - NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { From 46d7982b276a988bc37a8d9f9e4bbc0d7089c386 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 14 Jun 2018 15:50:05 -0600 Subject: [PATCH 4/9] Add tests --- .../nio/InboundChannelBuffer.java | 36 ++++--------- .../nio/InboundChannelBufferTests.java | 46 +++++++++++++++- .../http/nio/PageByteBufTests.java | 52 +++++++++++++++++++ 3 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index 7d359aec8ee57..1ed1743a8bc0b 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -154,6 +154,16 @@ public ByteBuffer[] sliceBuffersTo(long to) { return buffers; } + /** + * This method will return an array of {@link Page} representing the bytes from the beginning of + * this buffer up through the index argument that was passed. The pages and buffers will be duplicates of + * the internal components, so any modifications to the markers {@link ByteBuffer#position()}, + * {@link ByteBuffer#limit()}, etc will not modify the this class. Additionally, this will internally + * retain the underlying pages, so the pages returned by this method must be closed. + * + * @param to the index to slice up to + * @return the pages + */ public Page[] sliceAndRetainPagesTo(long to) { if (to > capacity) { throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + @@ -217,32 +227,6 @@ public ByteBuffer[] sliceBuffersFrom(long from) { return buffers; } - public Page[] sliceAndRetainPagesFrom(long from) { - if (from > capacity) { - throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + - "], with slice parameters from [" + from + "]"); - } else if (from == capacity) { - return EMPTY_BYTE_PAGE_ARRAY; - } - long indexWithOffset = from + offset; - - int pageIndex = pageIndex(indexWithOffset); - int indexInPage = indexInPage(indexWithOffset); - - Page[] pages = new Page[this.pages.size() - pageIndex]; - Iterator pageIterator = this.pages.descendingIterator(); - for (int i = pages.length - 1; i > 0; --i) { - pages[i] = pageIterator.next().duplicate(); - } - Page firstPostIndexPage = pageIterator.next().duplicate(); - ByteBuffer firstPostIndexBuffer = firstPostIndexPage.byteBuffer; - firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); - pages[0] = firstPostIndexPage; - return pages; - } - - - public void incrementIndex(long delta) { if (delta < 0) { throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]"); diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java index 199a509cbfabb..8dd72e869e8d9 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java @@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES; private final Supplier defaultPageSupplier = () -> - new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); + new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> { + }); public void testNewBufferHasSinglePage() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); @@ -167,6 +168,49 @@ public void testClose() { expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1)); } + public void testCloseRetainedPages() { + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Supplier supplier = () -> { + AtomicBoolean atomicBoolean = new AtomicBoolean(); + queue.add(atomicBoolean); + return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true)); + }; + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier); + channelBuffer.ensureCapacity(PAGE_SIZE * 4); + + assertEquals(4, queue.size()); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2); + + pages[1].close(); + + for (AtomicBoolean closedRef : queue) { + assertFalse(closedRef.get()); + } + + channelBuffer.close(); + + int i = 0; + for (AtomicBoolean closedRef : queue) { + if (i < 1) { + assertFalse(closedRef.get()); + } else { + assertTrue(closedRef.get()); + } + ++i; + } + + pages[0].close(); + + for (AtomicBoolean closedRef : queue) { + assertTrue(closedRef.get()); + } + } + public void testAccessByteBuffers() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java new file mode 100644 index 0000000000000..14f8508e67863 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.test.ESTestCase; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class PageByteBufTests extends ESTestCase { + + public void testReleasingPage() { + AtomicInteger integer = new AtomicInteger(0); + int pageCount = randomInt(10) + 1; + ArrayList pages = new ArrayList<>(); + for (int i = 0; i < pageCount; ++i) { + pages.add(new InboundChannelBuffer.Page(ByteBuffer.allocate(10), integer::incrementAndGet)); + } + + ByteBuf byteBuf = PageByteBuf.byteBufFromPages(pages.toArray(new InboundChannelBuffer.Page[0])); + + assertEquals(0, integer.get()); + byteBuf.retain(); + byteBuf.release(); + assertEquals(0, integer.get()); + ByteBuf secondBuf = byteBuf.retainedSlice(); + byteBuf.release(); + assertEquals(0, integer.get()); + secondBuf.release(); + assertEquals(pageCount, integer.get()); + } +} From 72f45eebd3cae0aed4c3529b8576011f97419af0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 14 Jun 2018 15:50:41 -0600 Subject: [PATCH 5/9] change names --- .../java/org/elasticsearch/http/nio/NettyAdaptor.java | 2 +- .../http/nio/{PageByteBuf.java => PagedByteBuf.java} | 8 ++++---- .../nio/{PageByteBufTests.java => PagedByteBufTests.java} | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) rename plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/{PageByteBuf.java => PagedByteBuf.java} (89%) rename plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/{PageByteBufTests.java => PagedByteBufTests.java} (91%) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java index f54501532445f..41cb72aa32273 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java @@ -99,7 +99,7 @@ public int read(ByteBuffer[] buffers) { } public int read(InboundChannelBuffer.Page[] pages) { - ByteBuf byteBuf = PageByteBuf.byteBufFromPages(pages); + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); int readableBytes = byteBuf.readableBytes(); nettyChannel.writeInbound(byteBuf); return readableBytes; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java similarity index 89% rename from plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java rename to plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java index e4542786ae16d..ce6f37baa02c5 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PageByteBuf.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java @@ -30,16 +30,16 @@ import java.util.ArrayList; import java.util.List; -public class PageByteBuf extends UnpooledHeapByteBuf { +public class PagedByteBuf extends UnpooledHeapByteBuf { private final Runnable releasable; - private PageByteBuf(byte[] array, Runnable releasable) { + private PagedByteBuf(byte[] array, Runnable releasable) { super(UnpooledByteBufAllocator.DEFAULT, array, array.length); this.releasable = releasable; } - public static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { + static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { int componentCount = pages.length; if (componentCount == 0) { return Unpooled.EMPTY_BUFFER; @@ -59,7 +59,7 @@ private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { ByteBuffer buffer = page.getByteBuffer(); assert !buffer.isDirect() && buffer.hasArray() : "Must be a heap buffer with an array"; int offset = buffer.arrayOffset() + buffer.position(); - PageByteBuf newByteBuf = new PageByteBuf(buffer.array(), page::close); + PagedByteBuf newByteBuf = new PagedByteBuf(buffer.array(), page::close); return newByteBuf.slice(offset, buffer.remaining()); } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java similarity index 91% rename from plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java rename to plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java index 14f8508e67863..b354a7d230766 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PageByteBufTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; -public class PageByteBufTests extends ESTestCase { +public class PagedByteBufTests extends ESTestCase { public void testReleasingPage() { AtomicInteger integer = new AtomicInteger(0); @@ -37,7 +37,7 @@ public void testReleasingPage() { pages.add(new InboundChannelBuffer.Page(ByteBuffer.allocate(10), integer::incrementAndGet)); } - ByteBuf byteBuf = PageByteBuf.byteBufFromPages(pages.toArray(new InboundChannelBuffer.Page[0])); + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages.toArray(new InboundChannelBuffer.Page[0])); assertEquals(0, integer.get()); byteBuf.retain(); From da37b44f06df3c39566e68eb868ff2243d67f451 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Jun 2018 09:58:24 -0600 Subject: [PATCH 6/9] Chnages for review --- .../util/concurrent/AbstractRefCounted.java | 4 +- .../common/util/concurrent/RefCounted.java | 2 +- .../nio/InboundChannelBuffer.java | 38 ++++++++++++------ .../elasticsearch/http/nio/PagedByteBuf.java | 2 +- .../http/nio/PagedByteBufTests.java | 39 +++++++++++++++++++ 5 files changed, 68 insertions(+), 17 deletions(-) rename {server => libs/core}/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java (92%) rename {server => libs/core}/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java (95%) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java similarity index 92% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java index e0b8aea178c70..a30e7490ff445 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; - import java.util.concurrent.atomic.AtomicInteger; /** @@ -68,7 +66,7 @@ public final void decRef() { } protected void alreadyClosed() { - throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); + throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]"); } /** diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java similarity index 95% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java rename to libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java index b2cc8b99c63de..1e7bdc0e78faa 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/common/util/concurrent/RefCounted.java @@ -44,7 +44,7 @@ public interface RefCounted { * * @see #decRef * @see #tryIncRef() - * @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented. + * @throws IllegalStateException iff the reference counter can not be incremented. */ void incRef(); diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index 1ed1743a8bc0b..7c718237cd20e 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -19,6 +19,7 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.nio.utils.ExceptionsHelper; import java.nio.ByteBuffer; @@ -27,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; /** @@ -273,22 +273,25 @@ private int indexInPage(long index) { public static class Page implements AutoCloseable { private final ByteBuffer byteBuffer; - private final Runnable closeable; - private final AtomicInteger referenceCount; + // This is reference counted as some implementations want to retain the byte pages by calling + // sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the + // pages, and safely close them when this channel buffer is done with them. The reference count + // would be 1 at that point, meaning that the pages will remain until the implementation closes + // theirs. + private final RefCountedCloseable refCountedCloseable; public Page(ByteBuffer byteBuffer, Runnable closeable) { - this(byteBuffer, closeable, new AtomicInteger(1)); + this(byteBuffer, new RefCountedCloseable(closeable)); } - private Page(ByteBuffer byteBuffer, Runnable closeable, AtomicInteger referenceCount) { + private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) { this.byteBuffer = byteBuffer; - this.closeable = closeable; - this.referenceCount = referenceCount; + this.refCountedCloseable = refCountedCloseable; } private Page duplicate() { - referenceCount.incrementAndGet(); - return new Page(byteBuffer.duplicate(), closeable, referenceCount); + refCountedCloseable.incRef(); + return new Page(byteBuffer.duplicate(), refCountedCloseable); } public ByteBuffer getByteBuffer() { @@ -297,9 +300,20 @@ public ByteBuffer getByteBuffer() { @Override public void close() { - int newReferenceCount = referenceCount.decrementAndGet(); - assert newReferenceCount >= 0 : "Reference count should never be less than 0. Found: [" + newReferenceCount + "]."; - if (newReferenceCount == 0) { + refCountedCloseable.decRef(); + } + + private static class RefCountedCloseable extends AbstractRefCounted { + + private final Runnable closeable; + + private RefCountedCloseable(Runnable closeable) { + super("byte array page"); + this.closeable = closeable; + } + + @Override + protected void closeInternal() { closeable.run(); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java index ce6f37baa02c5..40f3aeecfbc94 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/PagedByteBuf.java @@ -57,7 +57,7 @@ static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) { private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) { ByteBuffer buffer = page.getByteBuffer(); - assert !buffer.isDirect() && buffer.hasArray() : "Must be a heap buffer with an array"; + assert buffer.isDirect() == false && buffer.hasArray() : "Must be a heap buffer with an array"; int offset = buffer.arrayOffset() + buffer.position(); PagedByteBuf newByteBuf = new PagedByteBuf(buffer.array(), page::close); return newByteBuf.slice(offset, buffer.remaining()); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java index b354a7d230766..15bd18ecf6959 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/PagedByteBufTests.java @@ -49,4 +49,43 @@ public void testReleasingPage() { secondBuf.release(); assertEquals(pageCount, integer.get()); } + + public void testBytesAreUsed() { + byte[] bytes1 = new byte[10]; + byte[] bytes2 = new byte[10]; + + for (int i = 0; i < 10; ++i) { + bytes1[i] = (byte) i; + } + + for (int i = 10; i < 20; ++i) { + bytes2[i - 10] = (byte) i; + } + + InboundChannelBuffer.Page[] pages = new InboundChannelBuffer.Page[2]; + pages[0] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes1), () -> {}); + pages[1] = new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes2), () -> {}); + + ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages); + assertEquals(20, byteBuf.readableBytes()); + + for (int i = 0; i < 20; ++i) { + assertEquals((byte) i, byteBuf.getByte(i)); + } + + InboundChannelBuffer.Page[] pages2 = new InboundChannelBuffer.Page[2]; + ByteBuffer firstBuffer = ByteBuffer.wrap(bytes1); + firstBuffer.position(2); + ByteBuffer secondBuffer = ByteBuffer.wrap(bytes2); + secondBuffer.limit(8); + pages2[0] = new InboundChannelBuffer.Page(firstBuffer, () -> {}); + pages2[1] = new InboundChannelBuffer.Page(secondBuffer, () -> {}); + + ByteBuf byteBuf2 = PagedByteBuf.byteBufFromPages(pages2); + assertEquals(16, byteBuf2.readableBytes()); + + for (int i = 2; i < 18; ++i) { + assertEquals((byte) i, byteBuf2.getByte(i - 2)); + } + } } From c11429c7b471986b4ff386e3ee539a89bc3d1dd6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Jun 2018 10:46:01 -0600 Subject: [PATCH 7/9] Fix test --- .../test/java/org/elasticsearch/index/store/StoreTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9352d978e6e46..5a9f64902d3ac 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -39,7 +39,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -148,13 +147,13 @@ public void testRefCount() throws IOException { try { store.incRef(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { } try { store.ensureOpen(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { } } From 29014adc4a7c5ebcf65d75696f284d3e644dd02e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Jun 2018 11:28:33 -0600 Subject: [PATCH 8/9] Cleanup test --- .../org/elasticsearch/index/store/StoreTests.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 5a9f64902d3ac..2cea9bb364684 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -144,18 +144,8 @@ public void testRefCount() throws IOException { store.decRef(); assertThat(store.refCount(), Matchers.equalTo(0)); assertFalse(store.tryIncRef()); - try { - store.incRef(); - fail(" expected exception"); - } catch (IllegalStateException ex) { - - } - try { - store.ensureOpen(); - fail(" expected exception"); - } catch (IllegalStateException ex) { - - } + expectThrows(IllegalStateException.class, store::incRef); + expectThrows(IllegalStateException.class, store::ensureOpen); } public void testVerifyingIndexOutput() throws IOException { From d81fe4441439d79ed50f068a4ebdaa2b83f9e768 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 15 Jun 2018 11:53:41 -0600 Subject: [PATCH 9/9] Fix test --- .../common/util/concurrent/RefCountedTests.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) rename {server => libs/core}/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java (94%) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java similarity index 94% rename from server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java rename to libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java index b2664b134ed8e..ebcf12482dfa7 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java +++ b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.common.util.concurrent; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -70,14 +69,14 @@ public void testRefCount() throws IOException { try { counted.incRef(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]")); } try { counted.ensureOpen(); fail(" expected exception"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } } @@ -116,7 +115,7 @@ public void run() { try { counted.ensureOpen(); fail("expected to be closed"); - } catch (AlreadyClosedException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getMessage(), equalTo("closed")); } assertThat(counted.refCount(), is(0)); @@ -140,7 +139,7 @@ protected void closeInternal() { public void ensureOpen() { if (closed.get()) { assert this.refCount() == 0; - throw new AlreadyClosedException("closed"); + throw new IllegalStateException("closed"); } } }