Skip to content

Commit

Permalink
Migrate RedisConnectionFactory to Lifecycle beans.
Browse files Browse the repository at this point in the history
Closes: #2503
Original pull request: #2627
  • Loading branch information
christophstrobl authored and mp911de committed Jul 10, 2023
1 parent bc382ae commit 908a4d0
Show file tree
Hide file tree
Showing 27 changed files with 622 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand All @@ -42,9 +43,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
Expand Down Expand Up @@ -85,7 +86,7 @@
* @see JedisClientConfiguration
* @see Jedis
*/
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {

private final static Log log = LogFactory.getLog(JedisConnectionFactory.class);
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
Expand All @@ -104,8 +105,11 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
private @Nullable ClusterTopologyProvider topologyProvider;
private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
Expand Down Expand Up @@ -287,24 +291,80 @@ protected JedisConnection postProcessConnection(JedisConnection connection) {
return connection;
}

public void afterPropertiesSet() {
@Override
public void start() {

clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
}

if (isRedisClusterAware()) {

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);
}
}

if (isRedisClusterAware()) {
@Override
public void stop() {

if (state.compareAndSet(State.STARTED, State.STOPPING)) {
if (getUsePool() && !isRedisClusterAware()) {
if (pool != null) {
try {
this.pool.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis pool", ex);
}
this.pool = null;
}
}

if(this.clusterCommandExecutor != null) {
try {
this.clusterCommandExecutor.destroy();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
if (this.cluster != null) {

this.topologyProvider = null;

try {
cluster.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis cluster", ex);
}
}
state.set(State.STOPPED);
}
}

@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

this.initialized = true;
@Override
public void afterPropertiesSet() {
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
}

JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
Expand Down Expand Up @@ -415,32 +475,8 @@ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,

public void destroy() {

if (getUsePool() && pool != null) {

try {
pool.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis pool", ex);
}
pool = null;
}

if (cluster != null) {

try {
cluster.close();
} catch (Exception ex) {
log.warn("Cannot properly close Jedis cluster", ex);
}

try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}

this.destroyed = true;
stop();
state.set(State.DESTROYED);
}

public RedisConnection getConnection() {
Expand Down Expand Up @@ -866,8 +902,19 @@ private MutableJedisClientConfiguration getMutableConfiguration() {
}

private void assertInitialized() {
Assert.state(this.initialized, "JedisConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "JedisConnectionFactory was destroyed and cannot be used anymore");

State current = state.get();

if (State.STARTED.equals(current)) {
return;
}

switch (current) {
case CREATED, STOPPED -> throw new IllegalStateException(String.format("JedisConnectionFactory has been %s. Use start() to initialize it", current));
case DESTROYED -> throw new IllegalStateException(
"JedisConnectionFactory was destroyed and cannot be used anymore");
default -> throw new IllegalStateException(String.format("JedisConnectionFactory is %s", current));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
Expand Down Expand Up @@ -116,8 +118,8 @@
* @author Andrea Como
* @author Chris Bono
*/
public class LettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
InitializingBean, DisposableBean, SmartLifecycle {

private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
Expand All @@ -144,8 +146,11 @@ public class LettuceConnectionFactory

private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
Expand Down Expand Up @@ -333,33 +338,78 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
}

public void afterPropertiesSet() {
@Override
public void start() {

State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

this.client = createClient();
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
this.client = createClient();

if (isClusterAware()) {
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));

if (isClusterAware()) {

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
}
}
}

@Override
public void stop() {

this.initialized = true;
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
resetConnection();
dispose(connectionProvider);
dispose(reactiveConnectionProvider);
try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
state.set(State.STOPPED);
} catch (Exception e) {

if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}
state.set(State.STOPPED);
}
}

public void destroy() {
@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

resetConnection();
@Override
public void afterPropertiesSet() {
// customization hook. initialization happens in start
}

@Override
public void destroy() {

stop();
client = null;
if (clusterCommandExecutor != null) {

try {
Expand All @@ -368,23 +418,7 @@ public void destroy() {
log.warn("Cannot properly close cluster command executor", ex);
}
}

dispose(connectionProvider);
dispose(reactiveConnectionProvider);

try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {

if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}

this.destroyed = true;
state.set(State.DESTROYED);
}

private void dispose(LettuceConnectionProvider connectionProvider) {
Expand Down Expand Up @@ -532,8 +566,6 @@ public void initConnection() {
*/
public void resetConnection() {

assertInitialized();

Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
.forEach(SharedConnection::resetConnection);

Expand Down Expand Up @@ -1267,8 +1299,19 @@ private RedisClient createBasicClient() {
}

private void assertInitialized() {
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore");

State current = state.get();

if (State.STARTED.equals(current)) {
return;
}

switch (current) {
case CREATED, STOPPED -> throw new IllegalStateException(String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current));
case DESTROYED -> throw new IllegalStateException(
"LettuceConnectionFactory was destroyed and cannot be used anymore");
default -> throw new IllegalStateException(String.format("LettuceConnectionFactory is %s", current));
}
}

private static void applyToAll(RedisURI source, Consumer<RedisURI> action) {
Expand Down
Loading

0 comments on commit 908a4d0

Please sign in to comment.