Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Javadoc for Load-Balancing #1000

Merged
merged 1 commit into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import java.time.Duration;
Expand All @@ -15,6 +30,8 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

// A copy of this class exists in io.rsocket.loadbalance

class ResolvingOperator<T> implements Disposable {

static final CancellationException ON_DISPOSE = new CancellationException("Disposed");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.loadbalance;

import io.rsocket.util.Clock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* The base implementation of the {@link WeightedStats} interface
* Implementation of {@link WeightedStats} that manages tracking state and exposes the required
* stats.
*
* <p>A sub-class or a different class (delegation) needs to call {@link #startStream()}, {@link
* #stopStream()}, {@link #startRequest()}, and {@link #stopRequest(long)} to drive state tracking.
*
* @since 1.1
* @see WeightedStatsRequestInterceptor
*/
public class BaseWeightedStats implements WeightedStats {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,40 @@
/*
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.loadbalance;

import io.rsocket.core.RSocketConnector;
import io.rsocket.plugins.InterceptorRegistry;

/**
* Extension for {@link LoadbalanceStrategy} which allows pre-setup {@link RSocketConnector} for
* {@link LoadbalanceStrategy} needs
* A {@link LoadbalanceStrategy} with an interest in configuring the {@link RSocketConnector} for
* connecting to load-balance targets in order to hook into request lifecycle and track usage
* statistics.
*
* <p>Currently this callback interface is supported for strategies configured in {@link
* LoadbalanceRSocketClient}.
*
* @since 1.1
*/
public interface ClientLoadbalanceStrategy extends LoadbalanceStrategy {

/**
* Initialize the connector, for example using the {@link InterceptorRegistry}, to intercept
* requests.
*
* @param connector the connector to configure
*/
void initialize(RSocketConnector connector);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,15 +19,16 @@
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.ClientTransport;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* {@link RSocketClient} implementation that uses a {@link LoadbalanceStrategy} to select the {@code
* RSocket} to use for a given request from a pool of possible targets.
* An implementation of {@link RSocketClient backed by a pool of {@code RSocket} instances and using a {@link
* LoadbalanceStrategy} to select the {@code RSocket} to use for a given request.
*
* @since 1.1
*/
Expand All @@ -39,6 +40,7 @@ private LoadbalanceRSocketClient(RSocketPool rSocketPool) {
this.rSocketPool = rSocketPool;
}

/** Return {@code Mono} that selects an RSocket from the underlying pool. */
@Override
public Mono<RSocket> source() {
return Mono.fromSupplier(rSocketPool::select);
Expand Down Expand Up @@ -75,7 +77,7 @@ public void dispose() {
}

/**
* Shortcut to create an {@link LoadbalanceRSocketClient} with round robin loadalancing.
* Shortcut to create an {@link LoadbalanceRSocketClient} with round-robin load balancing.
* Effectively a shortcut for:
*
* <pre class="cdoe">
Expand All @@ -84,8 +86,8 @@ public void dispose() {
* .build();
* </pre>
*
* @param connector the {@link Builder#connector(RSocketConnector) to use
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance across.
* @param connector a "template" for connecting to load balance targets
* @param targetPublisher refreshes the list of load balance targets periodically
* @return the created client instance
*/
public static LoadbalanceRSocketClient create(
Expand All @@ -94,11 +96,10 @@ public static LoadbalanceRSocketClient create(
}

/**
* Return a builder to create an {@link LoadbalanceRSocketClient} with.
* Return a builder for a {@link LoadbalanceRSocketClient}.
*
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance
* across.
* @return the builder instance
* @param targetPublisher refreshes the list of load balance targets periodically
* @return the created builder
*/
public static Builder builder(Publisher<List<LoadbalanceTarget>> targetPublisher) {
return new Builder(targetPublisher);
Expand All @@ -118,10 +119,11 @@ public static class Builder {
}

/**
* The given {@link RSocketConnector} is used as a template to produce the {@code Mono<RSocket>}
* source for each {@link LoadbalanceTarget}. This is done by passing the {@code
* ClientTransport} contained in every target to the {@code connect} method of the given
* connector instance.
* Configure the "template" connector to use for connecting to load balance targets. To
* establish a connection, the {@link LoadbalanceTarget#getTransport() ClientTransport}
* contained in each target is passed to the connector's {@link
* RSocketConnector#connect(ClientTransport) connect} method and thus the same connector with
* the same settings applies to all targets.
*
* <p>By default this is initialized with {@link RSocketConnector#create()}.
*
Expand All @@ -133,7 +135,7 @@ public Builder connector(RSocketConnector connector) {
}

/**
* Switch to using a round-robin strategy for selecting a target.
* Configure {@link RoundRobinLoadbalanceStrategy} as the strategy to use to select targets.
*
* <p>This is the strategy used by default.
*/
Expand All @@ -143,8 +145,7 @@ public Builder roundRobinLoadbalanceStrategy() {
}

/**
* Switch to using a strategy that assigns a weight to each pooled {@code RSocket} based on
* actual usage stats, and uses that to make a choice.
* Configure {@link WeightedLoadbalanceStrategy} as the strategy to use to select targets.
*
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
Expand All @@ -154,7 +155,7 @@ public Builder weightedLoadbalanceStrategy() {
}

/**
* Provide the {@link LoadbalanceStrategy} to use.
* Configure the {@link LoadbalanceStrategy} to use.
*
* <p>By default, {@link RoundRobinLoadbalanceStrategy} is used.
*/
Expand All @@ -165,8 +166,13 @@ public Builder loadbalanceStrategy(LoadbalanceStrategy strategy) {

/** Build the {@link LoadbalanceRSocketClient} instance. */
public LoadbalanceRSocketClient build() {
final RSocketConnector connector = initConnector();
final LoadbalanceStrategy strategy = initLoadbalanceStrategy();
final RSocketConnector connector =
(this.connector != null ? this.connector : RSocketConnector.create());

final LoadbalanceStrategy strategy =
(this.loadbalanceStrategy != null
? this.loadbalanceStrategy
: new RoundRobinLoadbalanceStrategy());

if (strategy instanceof ClientLoadbalanceStrategy) {
((ClientLoadbalanceStrategy) strategy).initialize(connector);
Expand All @@ -175,15 +181,5 @@ public LoadbalanceRSocketClient build() {
return new LoadbalanceRSocketClient(
new RSocketPool(connector, this.targetPublisher, strategy));
}

private RSocketConnector initConnector() {
return (this.connector != null ? this.connector : RSocketConnector.create());
}

private LoadbalanceStrategy initLoadbalanceStrategy() {
return (this.loadbalanceStrategy != null
? this.loadbalanceStrategy
: new RoundRobinLoadbalanceStrategy());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,21 @@
import io.rsocket.RSocket;
import java.util.List;

/**
* Strategy to select an {@link RSocket} given a list of instances for load-balancing purposes. A
* simple implementation might go in round-robin fashion while a more sophisticated strategy might
* check availability, track usage stats, and so on.
*
* @since 1.1
*/
@FunctionalInterface
public interface LoadbalanceStrategy {

RSocket select(List<RSocket> availableRSockets);
/**
* Select an {@link RSocket} from the given non-empty list.
*
* @param sockets the list to choose from
* @return the selected instance
*/
RSocket select(List<RSocket> sockets);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,14 +15,18 @@
*/
package io.rsocket.loadbalance;

import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.ClientTransport;
import org.reactivestreams.Publisher;

/**
* Simple container for a key and a {@link ClientTransport}, representing a specific target for
* loadbalancing purposes. The key is used to compare previous and new targets when refreshing the
* list of target to use. The transport is used to connect to the target.
* Representation for a load-balance target used as input to {@link LoadbalanceRSocketClient} that
* in turn maintains and peridodically updates a list of current load-balance targets. The {@link
* #getKey()} is used to identify a target uniquely while the {@link #getTransport() transport} is
* used to connect to the target server.
*
* @since 1.1
* @see LoadbalanceRSocketClient#create(RSocketConnector, Publisher)
*/
public class LoadbalanceTarget {

Expand All @@ -34,23 +38,22 @@ private LoadbalanceTarget(String key, ClientTransport transport) {
this.transport = transport;
}

/** Return the key for this target. */
/** Return the key that identifies this target uniquely. */
public String getKey() {
return key;
}

/** Return the transport to use to connect to the target. */
/** Return the transport to use to connect to the target server. */
public ClientTransport getTransport() {
return transport;
}

/**
* Create a an instance of {@link LoadbalanceTarget} with the given key and {@link
* ClientTransport}. The key can be anything that can be used to identify identical targets, e.g.
* a SocketAddress, URL, etc.
* Create a new {@link LoadbalanceTarget} with the given key and {@link ClientTransport}. The key
* can be anything that identifies the target uniquely, e.g. SocketAddress, URL, and so on.
*
* @param key the key to use to identify identical targets
* @param transport the transport to use for connecting to the target
* @param key identifies the load-balance target uniquely
* @param transport for connecting to the target
* @return the created instance
*/
public static LoadbalanceTarget from(String key, ClientTransport transport) {
Expand Down
Loading