Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise RedisKeyValueAdapter to support lifecycle #2959

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.4.0-SNAPSHOT</version>
<version>3.4.0-GH-2957-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.keyvalue.core.AbstractKeyValueAdapter;
import org.springframework.data.keyvalue.core.KeyValueAdapter;
Expand Down Expand Up @@ -99,17 +103,19 @@
* @author Mark Paluch
* @author Andrey Muchnik
* @author John Blum
* @author Lucian Torje
* @since 1.7
*/
public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
implements InitializingBean, ApplicationContextAware, ApplicationListener<RedisKeyspaceEvent> {
implements InitializingBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<RedisKeyspaceEvent> {

/**
* Time To Live in seconds that phantom keys should live longer than the actual key.
*/
private static final int PHANTOM_KEY_TTL = 300;

private final Log logger = LogFactory.getLog(getClass());
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

private RedisOperations<?, ?> redisOps;
private RedisConverter converter;
private @Nullable RedisMessageListenerContainer messageListenerContainer;
Expand All @@ -121,6 +127,13 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
private @Nullable String keyspaceNotificationsConfigParameter = null;
private ShadowCopy shadowCopy = ShadowCopy.DEFAULT;

/**
* Lifecycle state of this factory.
*/
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

/**
* Creates new {@link RedisKeyValueAdapter} with default {@link RedisMappingContext} and default
* {@link RedisCustomConversions}.
Expand Down Expand Up @@ -202,7 +215,7 @@ public Object put(Object id, Object item, String keyspace) {
&& this.expirationListener.get() == null) {

if (rdo.getTimeToLive() != null && rdo.getTimeToLive() > 0) {
initKeyExpirationListener();
initKeyExpirationListener(this.messageListenerContainer);
}
}

Expand Down Expand Up @@ -686,6 +699,11 @@ public void setShadowCopy(ShadowCopy shadowCopy) {
this.shadowCopy = shadowCopy;
}

@Override
public boolean isRunning() {
return State.STARTED.equals(this.state.get());
}

/**
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
* @since 1.8
Expand All @@ -696,22 +714,61 @@ public void afterPropertiesSet() {
if (this.managedListenerContainer) {
initMessageListenerContainer();
}
}

@Override
public void start() {

State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);

if (isCreatedOrStopped(current)) {

messageListenerContainer.start();

if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
initKeyExpirationListener(this.messageListenerContainer);
}

this.state.set(State.STARTED);
}
}

private static boolean isCreatedOrStopped(@Nullable State state) {
return State.CREATED.equals(state) || State.STOPPED.equals(state);
}

@Override
public void stop() {

if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
initKeyExpirationListener();
if (state.compareAndSet(State.STARTED, State.STOPPING)) {

KeyExpirationEventMessageListener listener = this.expirationListener.get();
if (listener != null) {

if (this.expirationListener.compareAndSet(listener, null)) {
try {
listener.destroy();
} catch (Exception e) {
logger.warn("Could not destroy KeyExpirationEventMessageListener", e);
}
}
}

messageListenerContainer.stop();
state.set(State.STOPPED);
}
}

public void destroy() throws Exception {

if (this.expirationListener.get() != null) {
this.expirationListener.get().destroy();
}
stop();

if (this.managedListenerContainer && this.messageListenerContainer != null) {
this.messageListenerContainer.destroy();
this.messageListenerContainer = null;
}

this.state.set(State.DESTROYED);
}

@Override
Expand All @@ -729,13 +786,12 @@ private void initMessageListenerContainer() {
this.messageListenerContainer = new RedisMessageListenerContainer();
this.messageListenerContainer.setConnectionFactory(((RedisTemplate<?, ?>) redisOps).getConnectionFactory());
this.messageListenerContainer.afterPropertiesSet();
this.messageListenerContainer.start();
}

private void initKeyExpirationListener() {
private void initKeyExpirationListener(RedisMessageListenerContainer messageListenerContainer) {

if (this.expirationListener.get() == null) {
MappingExpirationListener listener = new MappingExpirationListener(this.messageListenerContainer, this.redisOps,
MappingExpirationListener listener = new MappingExpirationListener(messageListenerContainer, this.redisOps,
this.converter, this.shadowCopy);

listener.setKeyspaceNotificationsConfigParameter(keyspaceNotificationsConfigParameter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
* @author Christoph Strobl
* @since 1.7
*/
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener
implements ApplicationEventPublisherAware {

private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

Expand All @@ -45,6 +45,11 @@ public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerC
super(listenerContainer);
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}

@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
Expand All @@ -67,8 +72,4 @@ protected void publishEvent(RedisKeyExpiredEvent event) {
}
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
Expand All @@ -40,7 +42,7 @@ public abstract class KeyspaceEventMessageListener implements MessageListener, I

private final RedisMessageListenerContainer listenerContainer;

private String keyspaceNotificationsConfigParameter = "EA";
private @Nullable String keyspaceNotificationsConfigParameter = "EA";

/**
* Creates new {@link KeyspaceEventMessageListener}.
Expand All @@ -53,6 +55,26 @@ public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContai
this.listenerContainer = listenerContainer;
}

/**
* Set the configuration string to use for {@literal notify-keyspace-events}.
*
* @param keyspaceNotificationsConfigParameter can be {@literal null}.
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(@Nullable String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}

@Override
public void afterPropertiesSet() {
init();
}

@Override
public void destroy() throws Exception {
listenerContainer.removeMessageListener(this);
}

@Override
public void onMessage(Message message, @Nullable byte[] pattern) {

Expand All @@ -76,20 +98,18 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
*/
public void init() {

if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {
RedisConnectionFactory connectionFactory = listenerContainer.getConnectionFactory();

RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();
if (StringUtils.hasText(keyspaceNotificationsConfigParameter) && connectionFactory != null) {

try {
try (RedisConnection connection = connectionFactory.getConnection()) {

Properties config = connection.getConfig("notify-keyspace-events");
RedisServerCommands commands = connection.serverCommands();
Properties config = commands.getConfig("notify-keyspace-events");

if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
commands.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
}

} finally {
connection.close();
}
}

Expand All @@ -105,23 +125,4 @@ protected void doRegister(RedisMessageListenerContainer container) {
listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}

@Override
public void destroy() throws Exception {
listenerContainer.removeMessageListener(this);
}

/**
* Set the configuration string to use for {@literal notify-keyspace-events}.
*
* @param keyspaceNotificationsConfigParameter can be {@literal null}.
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}

@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,17 @@ void setUp() {
adapter = new RedisKeyValueAdapter(template, mappingContext);
adapter.setEnableKeyspaceEvents(EnableKeyspaceEvents.ON_STARTUP);
adapter.afterPropertiesSet();
adapter.start();

template.execute((RedisCallback<Void>) connection -> {
connection.flushDb();
return null;
});

RedisConnection connection = template.getConnectionFactory().getConnection();

try {
try (RedisConnection connection = template.getConnectionFactory()
.getConnection()) {
connection.setConfig("notify-keyspace-events", "");
connection.setConfig("notify-keyspace-events", "KEA");
} finally {
connection.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void setUp() throws Exception {

adapter = new RedisKeyValueAdapter(template, context);
adapter.afterPropertiesSet();
adapter.start();
}

@AfterEach
Expand Down Expand Up @@ -153,12 +154,32 @@ void shouldInitKeyExpirationListenerOnStartup() throws Exception {
adapter = new RedisKeyValueAdapter(template, context);
adapter.setEnableKeyspaceEvents(EnableKeyspaceEvents.ON_STARTUP);
adapter.afterPropertiesSet();
adapter.start();

KeyExpirationEventMessageListener listener = ((AtomicReference<KeyExpirationEventMessageListener>) getField(adapter,
"expirationListener")).get();
assertThat(listener).isNotNull();
}

@Test // GH-2957
void adapterShouldBeRestartable() throws Exception {

adapter.destroy();

adapter = new RedisKeyValueAdapter(template, context);
adapter.setEnableKeyspaceEvents(EnableKeyspaceEvents.ON_STARTUP);
adapter.afterPropertiesSet();
adapter.start();
adapter.stop();

assertThat(((AtomicReference<KeyExpirationEventMessageListener>) getField(adapter, "expirationListener")).get())
.isNull();

adapter.start();
assertThat(((AtomicReference<KeyExpirationEventMessageListener>) getField(adapter, "expirationListener")).get())
.isNotNull();
}

@Test // DATAREDIS-491
void shouldInitKeyExpirationListenerOnFirstPutWithTtl() throws Exception {

Expand Down
Loading