Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #81 from zonybob/gh-13-backpressure
Browse files Browse the repository at this point in the history
Enhance Backpressure Support in Mesos Response Stream
  • Loading branch information
BenWhitehead authored Aug 4, 2017
2 parents 8d6c277 + 7e81fea commit 25959f8
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,23 @@ public final class MesosClient<Send, Receive> {
@NotNull
private final AtomicReference<String> mesosStreamId = new AtomicReference<>(null);

@NotNull
private final Observable.Transformer<byte[], byte[]> backpressureTransformer;

MesosClient(
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer
) {
this.mesosUri = mesosUri;
this.receiveCodec = receiveCodec;
this.subscribe = subscribe;
this.streamProcessor = streamProcessor;
this.backpressureTransformer = backpressureTransformer;

userAgent = new UserAgent(
applicationUserAgentEntry,
Expand Down Expand Up @@ -132,6 +137,7 @@ public AwaitableSubscription openStream() {
.subscribeOn(Rx.io())
.flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType()))
.lift(new RecordIOOperator())
.compose(backpressureTransformer)
.observeOn(Rx.compute())
/* Begin temporary back-pressure */
.buffer(250, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.mesosphere.mesos.rx.java.util.MessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import rx.BackpressureOverflow;
import rx.Observable;
import rx.functions.Action0;

import java.net.URI;
import java.util.Optional;
Expand All @@ -42,8 +45,11 @@ public final class MesosClientBuilder<Send, Receive> {
private MessageCodec<Receive> receiveCodec;
private Send subscribe;
private Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor;
private Observable.Transformer<byte[], byte[]> backpressureTransformer;

private MesosClientBuilder() {}
private MesosClientBuilder() {
backpressureTransformer = observable -> observable;
}

/**
* Create a new instance of MesosClientBuilder
Expand Down Expand Up @@ -162,6 +168,83 @@ public MesosClientBuilder<Send, Receive> processStream(
return this;
}

/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by an unbounded buffer rather than a
* MissingBackpressureException.
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer();
return this;
}


/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by a bounded buffer rather than a
* MissingBackpressureException. If the buffer is overflown, a {@link java.nio.BufferOverflowException}
* is thrown.
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @param capacity number of slots available in the buffer.
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
final long capacity
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity);
return this;
}

/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by a bounded buffer rather than a
* MissingBackpressureException. If the buffer is overflown, your own custom onOverflow callback
* will be invoked, and the overflow will mitigate the issue based on the {@link BackpressureOverflow.Strategy}
* that you select.
*
* <ul>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
* to signal the overflow to the producer.</li>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @param strategy how should the {@code Observable} react to buffer overflows.
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
final long capacity,
@Nullable final Action0 onOverflow,
@NotNull final BackpressureOverflow.Strategy strategy
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy);
return this;
}

/**
* Builds the instance of {@link MesosClient} that has been configured by this builder.
* All items are expected to have non-null values, if any item is null an exception will be thrown.
Expand All @@ -175,7 +258,8 @@ public final MesosClient<Send, Receive> build() {
checkNotNull(sendCodec),
checkNotNull(receiveCodec),
checkNotNull(subscribe),
checkNotNull(streamProcessor)
checkNotNull(streamProcessor),
checkNotNull(backpressureTransformer)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2016 Mesosphere, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mesosphere.mesos.rx.java;

import com.mesosphere.mesos.rx.java.test.StringMessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntries;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.exceptions.MissingBackpressureException;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public final class MesosClientBackpressureIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientBackpressureIntegrationTest.class);

static int msgNo = 0;

@Rule
public Timeout timeoutRule = new Timeout(10_000, TimeUnit.MILLISECONDS);

@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);

final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < 20000; i++) {
response.writeBytes(hbytes);
response.writeBytes(hmsg);
}
return response.flush();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri).build();

try {
client.openStream().await();
fail("Expect an exception to be propagated up due to backpressure");
} catch (MissingBackpressureException e) {
// expected
e.printStackTrace();
assertThat(e.getMessage()).isNullOrEmpty();
} finally {
server.shutdown();
}
}

@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
msgNo = 0;
final int numMessages = 20000;
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < numMessages; i++) {
writeRecordIOMessage(response, heartbeatMessage);
}
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri)
.onBackpressureBuffer()
.build();

try {
client.openStream().await();
} finally {
// 20000 heartbeats PLUS 1 subscribe
assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
server.shutdown();
}
}

private void writeRecordIOMessage(HttpServerResponse<ByteBuf> response, String msg) {
response.writeBytesAndFlush(String.format("%d\n", msg.getBytes().length).getBytes(StandardCharsets.UTF_8));
response.writeBytesAndFlush(msg.getBytes(StandardCharsets.UTF_8));
}

private static MesosClientBuilder<String, String> createClientForStreaming(final URI uri) {
return MesosClientBuilder.<String, String>newBuilder()
.sendCodec(StringMessageCodec.UTF8_STRING)
.receiveCodec(StringMessageCodec.UTF8_STRING)
.mesosUri(uri)
.applicationUserAgentEntry(UserAgentEntries.literal("test", "test"))
.processStream(events ->
events
.doOnNext(e -> LOGGER.debug(++msgNo + " : " + e))
.map(e -> Optional.empty()))
.subscribe("subscribe");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ public void awaitCall(final int callCount) throws InterruptedException {
sem.acquire(callCount);
}

/**
* Block the invoking thread until {@code callCount} {@link Call}s are received by the server or
* until a specified amount of time elapses.
* <p>
*
* @param callCount The number of events to block and wait for
* @param timeout The amount of time to wait for the events before timing out
* @param unit The {@link TimeUnit} for the timeout
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitCall(final int callCount, final long timeout, final TimeUnit unit) throws InterruptedException {
sem.tryAcquire(callCount, timeout, unit);
}

/**
* Block the invoking thread until a "Subscribe call" is received by the server as determined by the
* {@code isSubscribePredicate} provided to the constructor.
Expand Down

0 comments on commit 25959f8

Please sign in to comment.