Skip to content

Commit

Permalink
removes deprecated errorConsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed May 10, 2020
1 parent cec7a78 commit faa0a85
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 251 deletions.
6 changes: 2 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
private Resume resume;

public ClientRSocketFactory() {
this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace));
this(RSocketConnector.create());
}

public ClientRSocketFactory(RSocketConnector connector) {
Expand Down Expand Up @@ -395,7 +395,6 @@ public ClientRSocketFactory fragment(int mtu) {

/** @deprecated this is deprecated with no replacement. */
public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
connector.errorConsumer(errorConsumer);
return this;
}

Expand All @@ -417,7 +416,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor {
private Resume resume;

public ServerRSocketFactory() {
this(RSocketServer.create().errorConsumer(Throwable::printStackTrace));
this(RSocketServer.create());
}

public ServerRSocketFactory(RSocketServer server) {
Expand Down Expand Up @@ -499,7 +498,6 @@ public ServerRSocketFactory fragment(int mtu) {

/** @deprecated this is deprecated with no replacement. */
public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
server.errorConsumer(errorConsumer);
return this;
}

Expand Down
16 changes: 0 additions & 16 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public class RSocketConnector {
private int mtu = 0;
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

private Consumer<Throwable> errorConsumer = ex -> {};

private RSocketConnector() {}

/**
Expand Down Expand Up @@ -436,17 +434,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
@Deprecated
public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
Objects.requireNonNull(errorConsumer);
this.errorConsumer = errorConsumer;
return this;
}

/**
* The final step to connect with the transport to use as input and the resulting {@code
* Mono<RSocket>} as output. Each subscriber to the returned {@code Mono} starts a new connection
Expand Down Expand Up @@ -524,7 +511,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
new RSocketRequester(
multiplexer.asClientConnection(),
payloadDecoder,
errorConsumer,
StreamIdSupplier.clientSupplier(),
mtu,
(int) keepAliveInterval.toMillis(),
Expand Down Expand Up @@ -564,7 +550,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
CLIENT_TAG,
wrappedConnection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
: ResponderLeaseHandler.None;

Expand All @@ -573,7 +558,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
multiplexer.asServerConnection(),
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
responderLeaseHandler,
mtu);

Expand Down
46 changes: 29 additions & 17 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -74,9 +76,8 @@
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
*/
class RSocketRequester implements RSocket {
private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class);

private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
referenceCounted -> {
Expand All @@ -93,9 +94,14 @@ class RSocketRequester implements RSocket {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
}

private volatile Throwable terminationError;

private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");

private final DuplexConnection connection;
private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final IntObjectMap<Subscription> senders;
private final IntObjectMap<Processor<Payload, Payload>> receivers;
Expand All @@ -104,14 +110,12 @@ class RSocketRequester implements RSocket {
private final RequesterLeaseHandler leaseHandler;
private final ByteBufAllocator allocator;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private volatile Throwable terminationError;
private final MonoProcessor<Void> onClose;
private final Scheduler serialScheduler;

RSocketRequester(
DuplexConnection connection,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
StreamIdSupplier streamIdSupplier,
int mtu,
int keepAliveTickPeriod,
Expand All @@ -122,7 +126,6 @@ class RSocketRequester implements RSocket {
this.connection = connection;
this.allocator = connection.alloc();
this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
this.mtu = mtu;
this.leaseHandler = leaseHandler;
Expand All @@ -140,7 +143,7 @@ class RSocketRequester implements RSocket {
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
connection.receive().subscribe(this::handleIncomingFrames, e -> {});

if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
KeepAliveSupport keepAliveSupport =
Expand Down Expand Up @@ -396,7 +399,6 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
payload.release();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
return Mono.error(t);
}
return handleChannel(payload, flux);
Expand Down Expand Up @@ -446,7 +448,6 @@ protected void hookOnNext(Payload payload) {
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId));
receiver.onError(t);
Expand Down Expand Up @@ -610,9 +611,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
break;
default:
// Ignore unknown frames. Throwing an error will close the socket.
errorConsumer.accept(
new IllegalStateException(
"Client received supported frame on stream 0: " + frame.toString()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Requester unsupported frame on stream 0: " + frame.toString());
}
}
}

Expand Down Expand Up @@ -669,7 +670,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
}
default:
throw new IllegalStateException(
"Client received supported frame on stream " + streamId + ": " + frame.toString());
"Requester received unsupported frame on stream " + streamId + ": " + frame.toString());
}
}

Expand Down Expand Up @@ -725,6 +726,14 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
}
}

private void tryShutdown() {
if (terminationError == null) {
if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
terminate(CLOSED_CHANNEL_EXCEPTION);
}
}
}

private void terminate(Throwable e) {
connection.dispose();
leaseHandler.dispose();
Expand All @@ -737,7 +746,9 @@ private void terminate(Throwable e) {
try {
receiver.onError(e);
} catch (Throwable t) {
errorConsumer.accept(t);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
Expand All @@ -749,14 +760,15 @@ private void terminate(Throwable e) {
try {
sender.cancel();
} catch (Throwable t) {
errorConsumer.accept(t);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
senders.clear();
receivers.clear();
sendProcessor.dispose();
errorConsumer.accept(e);
onClose.onError(e);
}

Expand Down
27 changes: 13 additions & 14 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.*;
Expand All @@ -51,6 +53,8 @@

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements RSocket {
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);

private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
referenceCounted -> {
if (referenceCounted.refCnt() > 0) {
Expand All @@ -70,7 +74,6 @@ class RSocketResponder implements RSocket {
private final io.rsocket.ResponderRSocket responderRSocket;

private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;
private final Disposable leaseHandlerDisposable;
private final MonoProcessor<Void> onClose;
Expand All @@ -88,12 +91,10 @@ class RSocketResponder implements RSocket {
private final UnboundedProcessor<ByteBuf> sendProcessor;
private final ByteBufAllocator allocator;

@SuppressWarnings("deprecation")
RSocketResponder(
DuplexConnection connection,
RSocket requestHandler,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
ResponderLeaseHandler leaseHandler,
int mtu) {
this.connection = connection;
Expand All @@ -107,7 +108,6 @@ class RSocketResponder implements RSocket {
: null;

this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.leaseHandler = leaseHandler;
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
Expand All @@ -119,7 +119,7 @@ class RSocketResponder implements RSocket {

connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

connection.receive().subscribe(this::handleFrame, errorConsumer);
connection.receive().subscribe(this::handleFrame, e -> {});
leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
Expand All @@ -136,7 +136,9 @@ private void handleSendProcessorError(Throwable t) {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});

Expand All @@ -147,7 +149,9 @@ private void handleSendProcessorError(Throwable t) {
try {
subscription.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
Expand Down Expand Up @@ -376,9 +380,7 @@ protected void hookOnSubscribe(Subscription subscription) {
}

@Override
protected void hookOnError(Throwable throwable) {
errorConsumer.accept(throwable);
}
protected void hookOnError(Throwable throwable) {}

@Override
protected void hookFinally(SignalType type) {
Expand Down Expand Up @@ -587,9 +589,7 @@ protected void hookOnSubscribe(Subscription subscription) {
}

@Override
protected void hookOnError(Throwable throwable) {
errorConsumer.accept(throwable);
}
protected void hookOnError(Throwable throwable) {}
});
}

Expand All @@ -603,7 +603,6 @@ private void handleCancelFrame(int streamId) {
}

private void handleError(int streamId, Throwable t) {
errorConsumer.accept(t);
sendProcessor.onNext(ErrorFrameCodec.encode(allocator, streamId, t));
}

Expand Down
20 changes: 1 addition & 19 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public final class RSocketServer {
private int mtu = 0;
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

private Consumer<Throwable> errorConsumer = ex -> {};

private RSocketServer() {}

/** Static factory method to create an {@code RSocketServer}. */
Expand Down Expand Up @@ -247,16 +245,6 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
@Deprecated
public RSocketServer errorConsumer(Consumer<Throwable> errorConsumer) {
this.errorConsumer = errorConsumer;
return this;
}

/**
* Start the server on the given transport.
*
Expand Down Expand Up @@ -387,7 +375,6 @@ private Mono<Void> acceptSetup(
new RSocketRequester(
wrappedMultiplexer.asServerConnection(),
payloadDecoder,
errorConsumer,
StreamIdSupplier.serverSupplier(),
mtu,
setupPayload.keepAliveInterval(),
Expand All @@ -414,19 +401,14 @@ private Mono<Void> acceptSetup(
ResponderLeaseHandler responderLeaseHandler =
leaseEnabled
? new ResponderLeaseHandler.Impl<>(
SERVER_TAG,
connection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
SERVER_TAG, connection.alloc(), leases.sender(), leases.stats())
: ResponderLeaseHandler.None;

RSocket rSocketResponder =
new RSocketResponder(
connection,
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
responderLeaseHandler,
mtu);
})
Expand Down
Loading

0 comments on commit faa0a85

Please sign in to comment.