Skip to content

Commit

Permalink
Supported callbacks on async replication ok & error. Fixed issue #5249
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Nov 20, 2015
1 parent 7e0ce9b commit ab0ff38
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.orientechnologies.orient.core.command.OCommandContext.TIMEOUT_STRATEGY;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.replication.OAsyncReplicationError;
import com.orientechnologies.orient.core.replication.OAsyncReplicationOk;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -32,24 +34,26 @@

/**
* Text based Command Request abstract class.
*
*
* @author Luca Garulli
*
*/
@SuppressWarnings("serial")
public abstract class OCommandRequestAbstract implements OCommandRequestInternal, ODistributedCommand {
protected OCommandResultListener resultListener;
protected OProgressListener progressListener;
protected int limit = -1;
protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong();
protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION;
protected Map<Object, Object> parameters;
protected String fetchPlan = null;
protected boolean useCache = false;
protected boolean cacheableResult = false;
protected int limit = -1;
protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong();
protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION;
protected Map<Object, Object> parameters;
protected String fetchPlan = null;
protected boolean useCache = false;
protected boolean cacheableResult = false;
protected OCommandContext context;
protected OAsyncReplicationOk onAsyncReplicationOk;
protected OAsyncReplicationError onAsyncReplicationError;


private final Set<String> nodesToExclude = new HashSet<String>();
private final Set<String> nodesToExclude = new HashSet<String>();

protected OCommandRequestAbstract() {
}
Expand All @@ -67,7 +71,7 @@ public Map<Object, Object> getParameters() {
}

protected void setParameters(final Object... iArgs) {
if (iArgs != null && iArgs.length > 0)
if (iArgs != null && iArgs.length>0)
parameters = convertToParameters(iArgs);
}

Expand All @@ -78,12 +82,11 @@ protected Map<Object, Object> convertToParameters(Object... iArgs) {
if (iArgs.length == 1 && iArgs[0] instanceof Map) {
params = (Map<Object, Object>) iArgs[0];
} else {
if (iArgs.length == 1 && iArgs[0] != null && iArgs[0].getClass().isArray() && iArgs[0] instanceof Object[] )
if (iArgs.length == 1 && iArgs[0] != null && iArgs[0].getClass().isArray() && iArgs[0] instanceof Object[])
iArgs = (Object[]) iArgs[0];

params = new HashMap<Object, Object>(iArgs.length);

for (int i = 0; i < iArgs.length; ++i) {
for (int i = 0; i<iArgs.length; ++i) {
Object par = iArgs[i];

if (par instanceof OIdentifiable && ((OIdentifiable) par).getIdentity().isValid())
Expand All @@ -96,6 +99,44 @@ protected Map<Object, Object> convertToParameters(Object... iArgs) {
return params;
}

/**
* Defines a callback to call in case of the asynchronous replication succeed.
*/
@Override
public OCommandRequestAbstract onAsyncReplicationOk(final OAsyncReplicationOk iCallback) {
onAsyncReplicationOk = iCallback;
return this;
}


/**
* Defines a callback to call in case of error during the asynchronous replication.
*/
@Override
public OCommandRequestAbstract onAsyncReplicationError(final OAsyncReplicationError iCallback) {
if (iCallback != null) {
onAsyncReplicationError = new OAsyncReplicationError() {
int retry = 0;

@Override
public ACTION onAsyncReplicationError(Throwable iException, final int iRetry) {
switch (iCallback.onAsyncReplicationError(iException, ++retry)) {
case RETRY:
execute();
break;

case IGNORE:

}

return ACTION.IGNORE;
}
};
} else
onAsyncReplicationError = null;
return this;
}

public OProgressListener getProgressListener() {
return progressListener;
}
Expand Down Expand Up @@ -182,4 +223,13 @@ public void addExcludedNode(String node) {
public void removeExcludedNode(String node) {
nodesToExclude.remove(node);
}


public OAsyncReplicationOk getOnAsyncReplicationOk() {
return onAsyncReplicationOk;
}

public OAsyncReplicationError getOnAsyncReplicationError() {
return onAsyncReplicationError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.orientechnologies.orient.core.command;

import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.OExecutionThreadLocal;
import com.orientechnologies.orient.core.exception.OSerializationException;
import com.orientechnologies.orient.core.index.OCompositeKey;
import com.orientechnologies.orient.core.record.impl.ODocument;
Expand All @@ -36,9 +37,8 @@

/**
* Text based Command Request abstract class.
*
*
* @author Luca Garulli
*
*/
@SuppressWarnings("serial")
public abstract class OCommandRequestTextAbstract extends OCommandRequestAbstract implements OCommandRequestText {
Expand All @@ -60,6 +60,10 @@ protected OCommandRequestTextAbstract(final String iText) {
@SuppressWarnings("unchecked")
public <RET> RET execute(final Object... iArgs) {
setParameters(iArgs);

OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationOk = onAsyncReplicationOk;
OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationError = onAsyncReplicationError;

return (RET) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().command(this);
}

Expand Down Expand Up @@ -182,8 +186,7 @@ protected void fromStream(final OMemoryStream buffer) {
parameters.put(p.getKey(), compositeKey);

} else {
final Object value = OCompositeKeySerializer.INSTANCE.deserialize(OStringSerializerHelper.getBinaryContent(p.getValue()),
0);
final Object value = OCompositeKeySerializer.INSTANCE.deserialize(OStringSerializerHelper.getBinaryContent(p.getValue()), 0);

if (p.getKey() instanceof String && Character.isDigit(((String) p.getKey()).charAt(0)))
parameters.put(Integer.parseInt((String) p.getKey()), value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@

package com.orientechnologies.orient.core.command;

import com.orientechnologies.orient.core.replication.OAsyncReplicationError;
import com.orientechnologies.orient.core.replication.OAsyncReplicationOk;

import java.util.Set;

/**
* @author Andrey Lomakin (a.lomakin-at-orientechnologies.com)
* @since 7/2/14
*/
public interface ODistributedCommand {
Set<String> nodesToExclude();
Set<String> nodesToExclude();

/**
* Defines a callback to call in case of the asynchronous replication succeed.
*/
ODistributedCommand onAsyncReplicationOk(OAsyncReplicationOk iCallback);

/**
* Defines a callback to call in case of error during the asynchronous replication.
*/
ODistributedCommand onAsyncReplicationError(OAsyncReplicationError iCallback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@
import com.orientechnologies.common.thread.OSoftThread;
import com.orientechnologies.orient.core.OOrientListenerAbstract;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.replication.OAsyncReplicationError;
import com.orientechnologies.orient.core.replication.OAsyncReplicationOk;

/**
* Thread Local to store execution setting.
*
*
* @author Luca Garulli
*/
public class OExecutionThreadLocal extends ThreadLocal<OExecutionThreadLocal.OExecutionThreadData> {
public class OExecutionThreadData {
volatile public OAsyncReplicationOk onAsyncReplicationOk;
volatile public OAsyncReplicationError onAsyncReplicationError;
}

class OExecutionThreadData {
@Override
protected OExecutionThreadData initialValue() {
return new OExecutionThreadData();
}

public static volatile OExecutionThreadLocal INSTANCE = new OExecutionThreadLocal();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.replication;

/**
* Interface to catch errors on asynchronous replication.
*
* @author Luca Garulli
*/
public interface OAsyncReplicationError {
enum ACTION {IGNORE, RETRY}

/**
* Callback called in case of error during asynchronous replication.
*
* @param iException The exception caught
* @param iRetry The number of retries so far. At every retry, this number is incremented.
* @return RETRY to retry the operation, otherwise IGNORE
*/
ACTION onAsyncReplicationError(Throwable iException, int iRetry);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.replication;

/**
* Interface to catch asynchronous replication operation successfully completed.
*
* @author Luca Garulli
*/
public interface OAsyncReplicationOk {
void onAsyncReplicationOk();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
package com.orientechnologies.orient.core.sql;

import com.orientechnologies.orient.core.command.OCommandRequestTextAbstract;
import com.orientechnologies.orient.core.replication.OAsyncReplicationError;
import com.orientechnologies.orient.core.replication.OAsyncReplicationOk;

/**
* SQL command request implementation. It just stores the request and delegated the execution to the configured OCommandExecutor.
*
*
* @author Luca Garulli
*
*/
@SuppressWarnings("serial")
public class OCommandSQL extends OCommandRequestTextAbstract {
Expand All @@ -45,4 +46,19 @@ public String toString() {
return "sql." + text;// OIOUtils.getStringMaxLength(text, 50, "...");
}

/**
* Defines a callback to call in case of the asynchronous replication succeed.
*/
@Override
public OCommandSQL onAsyncReplicationOk(final OAsyncReplicationOk iCallback) {
return (OCommandSQL) super.onAsyncReplicationOk(iCallback);
}

/**
* Defines a callback to call in case of error during the asynchronous replication.
*/
@Override
public OCommandSQL onAsyncReplicationError(final OAsyncReplicationError iCallback) {
return (OCommandSQL) super.onAsyncReplicationError(iCallback);
}
}
Loading

0 comments on commit ab0ff38

Please sign in to comment.