Skip to content

Commit

Permalink
Update Javadoc in loadbalance package
Browse files Browse the repository at this point in the history
Closes gh-954

Signed-off-by: Rossen Stoyanchev <[email protected]>
  • Loading branch information
rstoyanchev committed Apr 19, 2021
1 parent c80b3cb commit 831d5bf
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 118 deletions.
17 changes: 17 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import java.time.Duration;
Expand All @@ -15,6 +30,8 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

// A copy of this class exists in io.rsocket.loadbalance

class ResolvingOperator<T> implements Disposable {

static final CancellationException ON_DISPOSE = new CancellationException("Disposed");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.loadbalance;

import io.rsocket.util.Clock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* The base implementation of the {@link WeightedStats} interface
* Implementation of {@link WeightedStats} that manages tracking state and exposes the required
* stats.
*
* <p>A sub-class or a different class (delegation) needs to call {@link #startStream()}, {@link
* #stopStream()}, {@link #startRequest()}, and {@link #stopRequest(long)} to drive state tracking.
*
* @since 1.1
* @see WeightedStatsRequestInterceptor
*/
public class BaseWeightedStats implements WeightedStats {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,40 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.loadbalance;

import io.rsocket.core.RSocketConnector;
import io.rsocket.plugins.InterceptorRegistry;

/**
* Extension for {@link LoadbalanceStrategy} which allows pre-setup {@link RSocketConnector} for
* {@link LoadbalanceStrategy} needs
* A {@link LoadbalanceStrategy} with an interest in configuring the {@link RSocketConnector} for
* connecting to load-balance targets in order to hook into request lifecycle and track usage
* statistics.
*
* <p>Currently this callback interface is supported for strategies configured in {@link
* LoadbalanceRSocketClient}.
*
* @since 1.1
*/
public interface ClientLoadbalanceStrategy extends LoadbalanceStrategy {

/**
* Initialize the connector, for example using the {@link InterceptorRegistry}, to intercept
* requests.
*
* @param connector the connector to configure
*/
void initialize(RSocketConnector connector);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,15 +19,16 @@
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.ClientTransport;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* {@link RSocketClient} implementation that uses a {@link LoadbalanceStrategy} to select the {@code
* RSocket} to use for a given request from a pool of possible targets.
* An implementation of {@link RSocketClient backed by a pool of {@code RSocket} instances and using a {@link
* LoadbalanceStrategy} to select the {@code RSocket} to use for a given request.
*
* @since 1.1
*/
Expand All @@ -39,6 +40,7 @@ private LoadbalanceRSocketClient(RSocketPool rSocketPool) {
this.rSocketPool = rSocketPool;
}

/** Return {@code Mono} that selects an RSocket from the underlying pool. */
@Override
public Mono<RSocket> source() {
return Mono.fromSupplier(rSocketPool::select);
Expand Down Expand Up @@ -75,7 +77,7 @@ public void dispose() {
}

/**
* Shortcut to create an {@link LoadbalanceRSocketClient} with round robin loadalancing.
* Shortcut to create an {@link LoadbalanceRSocketClient} with round-robin load balancing.
* Effectively a shortcut for:
*
* <pre class="cdoe">
Expand All @@ -84,8 +86,8 @@ public void dispose() {
* .build();
* </pre>
*
* @param connector the {@link Builder#connector(RSocketConnector) to use
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance across.
* @param connector a "template" for connecting to load balance targets
* @param targetPublisher refreshes the list of load balance targets periodically
* @return the created client instance
*/
public static LoadbalanceRSocketClient create(
Expand All @@ -94,11 +96,10 @@ public static LoadbalanceRSocketClient create(
}

/**
* Return a builder to create an {@link LoadbalanceRSocketClient} with.
* Return a builder for a {@link LoadbalanceRSocketClient}.
*
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance
* across.
* @return the builder instance
* @param targetPublisher refreshes the list of load balance targets periodically
* @return the created builder
*/
public static Builder builder(Publisher<List<LoadbalanceTarget>> targetPublisher) {
return new Builder(targetPublisher);
Expand All @@ -118,10 +119,11 @@ public static class Builder {
}

/**
* The given {@link RSocketConnector} is used as a template to produce the {@code Mono<RSocket>}
* source for each {@link LoadbalanceTarget}. This is done by passing the {@code
* ClientTransport} contained in every target to the {@code connect} method of the given
* connector instance.
* Configure the "template" connector to use for connecting to load balance targets. To
* establish a connection, the {@link LoadbalanceTarget#getTransport() ClientTransport}
* contained in each target is passed to the connector's {@link
* RSocketConnector#connect(ClientTransport) connect} method and thus the same connector with
* the same settings applies to all targets.
*
* <p>By default this is initialized with {@link RSocketConnector#create()}.
*
Expand All @@ -133,7 +135,7 @@ public Builder connector(RSocketConnector connector) {
}

/**
* Switch to using a round-robin strategy for selecting a target.
* Configure {@link RoundRobinLoadbalanceStrategy} as the strategy to use to select targets.
*
* <p>This is the strategy used by default.
*/
Expand All @@ -143,8 +145,7 @@ public Builder roundRobinLoadbalanceStrategy() {
}

/**
* Switch to using a strategy that assigns a weight to each pooled {@code RSocket} based on
* actual usage stats, and uses that to make a choice.
* Configure {@link WeightedLoadbalanceStrategy} as the strategy to use to select targets.
*
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
Expand All @@ -154,7 +155,7 @@ public Builder weightedLoadbalanceStrategy() {
}

/**
* Provide the {@link LoadbalanceStrategy} to use.
* Configure the {@link LoadbalanceStrategy} to use.
*
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
Expand All @@ -165,8 +166,13 @@ public Builder loadbalanceStrategy(LoadbalanceStrategy strategy) {

/** Build the {@link LoadbalanceRSocketClient} instance. */
public LoadbalanceRSocketClient build() {
final RSocketConnector connector = initConnector();
final LoadbalanceStrategy strategy = initLoadbalanceStrategy();
final RSocketConnector connector =
(this.connector != null ? this.connector : RSocketConnector.create());

final LoadbalanceStrategy strategy =
(this.loadbalanceStrategy != null
? this.loadbalanceStrategy
: new RoundRobinLoadbalanceStrategy());

if (strategy instanceof ClientLoadbalanceStrategy) {
((ClientLoadbalanceStrategy) strategy).initialize(connector);
Expand All @@ -175,15 +181,5 @@ public LoadbalanceRSocketClient build() {
return new LoadbalanceRSocketClient(
new RSocketPool(connector, this.targetPublisher, strategy));
}

private RSocketConnector initConnector() {
return (this.connector != null ? this.connector : RSocketConnector.create());
}

private LoadbalanceStrategy initLoadbalanceStrategy() {
return (this.loadbalanceStrategy != null
? this.loadbalanceStrategy
: new RoundRobinLoadbalanceStrategy());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,21 @@
import io.rsocket.RSocket;
import java.util.List;

/**
* Strategy to select an {@link RSocket} given a list of instances for load-balancing purposes. A
* simple implementation might go in round-robin fashion while a more sophisticated strategy might
* check availability, track usage stats, and so on.
*
* @since 1.1
*/
@FunctionalInterface
public interface LoadbalanceStrategy {

RSocket select(List<RSocket> availableRSockets);
/**
* Select an {@link RSocket} from the given non-empty list.
*
* @param sockets the list to choose from
* @return the selected instance
*/
RSocket select(List<RSocket> sockets);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,14 +15,18 @@
*/
package io.rsocket.loadbalance;

import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.ClientTransport;
import org.reactivestreams.Publisher;

/**
* Simple container for a key and a {@link ClientTransport}, representing a specific target for
* loadbalancing purposes. The key is used to compare previous and new targets when refreshing the
* list of target to use. The transport is used to connect to the target.
* Representation for a load-balance target used as input to {@link LoadbalanceRSocketClient} that
* in turn maintains and peridodically updates a list of current load-balance targets. The {@link
* #getKey()} is used to identify a target uniquely while the {@link #getTransport() transport} is
* used to connect to the target server.
*
* @since 1.1
* @see LoadbalanceRSocketClient#create(RSocketConnector, Publisher)
*/
public class LoadbalanceTarget {

Expand All @@ -34,23 +38,22 @@ private LoadbalanceTarget(String key, ClientTransport transport) {
this.transport = transport;
}

/** Return the key for this target. */
/** Return the key that identifies this target uniquely. */
public String getKey() {
return key;
}

/** Return the transport to use to connect to the target. */
/** Return the transport to use to connect to the target server. */
public ClientTransport getTransport() {
return transport;
}

/**
* Create a an instance of {@link LoadbalanceTarget} with the given key and {@link
* ClientTransport}. The key can be anything that can be used to identify identical targets, e.g.
* a SocketAddress, URL, etc.
* Create a new {@link LoadbalanceTarget} with the given key and {@link ClientTransport}. The key
* can be anything that identifies the target uniquely, e.g. SocketAddress, URL, and so on.
*
* @param key the key to use to identify identical targets
* @param transport the transport to use for connecting to the target
* @param key identifies the load-balance target uniquely
* @param transport for connecting to the target
* @return the created instance
*/
public static LoadbalanceTarget from(String key, ClientTransport transport) {
Expand Down
Loading

0 comments on commit 831d5bf

Please sign in to comment.