Skip to content

Commit

Permalink
Add byte array pooling to nio http transport (#31349)
Browse files Browse the repository at this point in the history
This is related to #28898. This PR implements pooling of bytes arrays
when reading from the wire in the http server transport. In order to do
this, we must integrate with netty reference counting. That manner in
which this PR implements this is making Pages in InboundChannelBuffer
reference counted. When we accessing the underlying page to pass to
netty, we retain the page. When netty releases its bytebuf, it releases
the underlying pages we have passed to it.
  • Loading branch information
Tim-Brooks authored Jun 15, 2018
1 parent 605dbbe commit a705e1a
Show file tree
Hide file tree
Showing 19 changed files with 343 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;

import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -68,7 +66,7 @@ public final void decRef() {
}

protected void alreadyClosed() {
throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface RefCounted {
*
* @see #decRef
* @see #tryIncRef()
* @throws org.apache.lucene.store.AlreadyClosedException iff the reference counter can not be incremented.
* @throws IllegalStateException iff the reference counter can not be incremented.
*/
void incRef();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -70,14 +69,14 @@ public void testRefCount() throws IOException {
try {
counted.incRef();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]"));
}

try {
counted.ensureOpen();
fail(" expected exception");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
}
Expand Down Expand Up @@ -116,7 +115,7 @@ public void run() {
try {
counted.ensureOpen();
fail("expected to be closed");
} catch (AlreadyClosedException ex) {
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(counted.refCount(), is(0));
Expand All @@ -140,7 +139,7 @@ protected void closeInternal() {
public void ensureOpen() {
if (closed.get()) {
assert this.refCount() == 0;
throw new AlreadyClosedException("closed");
throw new IllegalStateException("closed");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.nio;

import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.nio.ByteBuffer;
Expand All @@ -41,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable {
private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0];


private final ArrayDeque<Page> pages;
Expand Down Expand Up @@ -152,6 +154,46 @@ public ByteBuffer[] sliceBuffersTo(long to) {
return buffers;
}

/**
* This method will return an array of {@link Page} representing the bytes from the beginning of
* this buffer up through the index argument that was passed. The pages and buffers will be duplicates of
* the internal components, so any modifications to the markers {@link ByteBuffer#position()},
* {@link ByteBuffer#limit()}, etc will not modify the this class. Additionally, this will internally
* retain the underlying pages, so the pages returned by this method must be closed.
*
* @param to the index to slice up to
* @return the pages
*/
public Page[] sliceAndRetainPagesTo(long to) {
if (to > capacity) {
throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
"], with slice parameters to [" + to + "]");
} else if (to == 0) {
return EMPTY_BYTE_PAGE_ARRAY;
}
long indexWithOffset = to + offset;
int pageCount = pageIndex(indexWithOffset);
int finalLimit = indexInPage(indexWithOffset);
if (finalLimit != 0) {
pageCount += 1;
}

Page[] pages = new Page[pageCount];
Iterator<Page> pageIterator = this.pages.iterator();
Page firstPage = pageIterator.next().duplicate();
ByteBuffer firstBuffer = firstPage.byteBuffer;
firstBuffer.position(firstBuffer.position() + offset);
pages[0] = firstPage;
for (int i = 1; i < pages.length; i++) {
pages[i] = pageIterator.next().duplicate();
}
if (finalLimit != 0) {
pages[pages.length - 1].byteBuffer.limit(finalLimit);
}

return pages;
}

/**
* This method will return an array of {@link ByteBuffer} representing the bytes from the index passed
* through the end of this buffer. The buffers will be duplicates of the internal buffers, so any
Expand Down Expand Up @@ -231,16 +273,49 @@ private int indexInPage(long index) {
public static class Page implements AutoCloseable {

private final ByteBuffer byteBuffer;
private final Runnable closeable;
// This is reference counted as some implementations want to retain the byte pages by calling
// sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the
// pages, and safely close them when this channel buffer is done with them. The reference count
// would be 1 at that point, meaning that the pages will remain until the implementation closes
// theirs.
private final RefCountedCloseable refCountedCloseable;

public Page(ByteBuffer byteBuffer, Runnable closeable) {
this(byteBuffer, new RefCountedCloseable(closeable));
}

private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) {
this.byteBuffer = byteBuffer;
this.closeable = closeable;
this.refCountedCloseable = refCountedCloseable;
}

private Page duplicate() {
refCountedCloseable.incRef();
return new Page(byteBuffer.duplicate(), refCountedCloseable);
}

public ByteBuffer getByteBuffer() {
return byteBuffer;
}

@Override
public void close() {
closeable.run();
refCountedCloseable.decRef();
}

private static class RefCountedCloseable extends AbstractRefCounted {

private final Runnable closeable;

private RefCountedCloseable(Runnable closeable) {
super("byte array page");
this.closeable = closeable;
}

@Override
protected void closeInternal() {
closeable.run();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES;
private final Supplier<InboundChannelBuffer.Page> defaultPageSupplier = () ->
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {
});

public void testNewBufferHasSinglePage() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
Expand Down Expand Up @@ -167,6 +168,49 @@ public void testClose() {
expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1));
}

public void testCloseRetainedPages() {
ConcurrentLinkedQueue<AtomicBoolean> queue = new ConcurrentLinkedQueue<>();
Supplier<InboundChannelBuffer.Page> supplier = () -> {
AtomicBoolean atomicBoolean = new AtomicBoolean();
queue.add(atomicBoolean);
return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true));
};
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier);
channelBuffer.ensureCapacity(PAGE_SIZE * 4);

assertEquals(4, queue.size());

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2);

pages[1].close();

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

channelBuffer.close();

int i = 0;
for (AtomicBoolean closedRef : queue) {
if (i < 1) {
assertFalse(closedRef.get());
} else {
assertTrue(closedRef.get());
}
++i;
}

pages[0].close();

for (AtomicBoolean closedRef : queue) {
assertTrue(closedRef.get());
}
}

public void testAccessByteBuffers() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()));
int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex()));
Object message;
while ((message = adaptor.pollInboundMessage()) != null) {
handleRequest(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.channel.embedded.EmbeddedChannel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.WriteOperation;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -97,6 +98,13 @@ public int read(ByteBuffer[] buffers) {
return byteBuf.readerIndex() - initialReaderIndex;
}

public int read(InboundChannelBuffer.Page[] pages) {
ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages);
int readableBytes = byteBuf.readableBytes();
nettyChannel.writeInbound(byteBuf);
return readableBytes;
}

public Object pollInboundMessage() {
return nettyChannel.readInbound();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
Expand All @@ -63,6 +65,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
Expand Down Expand Up @@ -103,6 +106,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope);

private final PageCacheRecycler pageCacheRecycler;

private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
Expand All @@ -115,9 +120,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;

public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) {
public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
HttpServerTransport.Dispatcher dispatcher) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
this.pageCacheRecycler = pageCacheRecycler;

ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
Expand Down Expand Up @@ -329,11 +336,15 @@ private HttpChannelFactory() {
@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioHttpChannel nioChannel = new NioHttpChannel(channel);
java.util.function.Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
InboundChannelBuffer.allocatingInstance());
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
}
Expand Down
Loading

0 comments on commit a705e1a

Please sign in to comment.