From ab0ff38c5577a22317a85f8c283429740bedc7cb Mon Sep 17 00:00:00 2001 From: lvca Date: Thu, 19 Nov 2015 23:24:47 -0600 Subject: [PATCH] Supported callbacks on async replication ok & error. Fixed issue #5249 --- .../core/command/OCommandRequestAbstract.java | 78 ++++- .../command/OCommandRequestTextAbstract.java | 11 +- .../core/command/ODistributedCommand.java | 15 +- .../orient/core/db/OExecutionThreadLocal.java | 12 +- .../replication/OAsyncReplicationError.java | 38 +++ .../core/replication/OAsyncReplicationOk.java | 29 ++ .../orient/core/sql/OCommandSQL.java | 20 +- .../asynch/ServerClusterAsyncGraphTest.java | 151 ++++++++++ .../distributed/ODistributedStorage.java | 273 +++++++++--------- 9 files changed, 462 insertions(+), 165 deletions(-) create mode 100644 core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationError.java create mode 100644 core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationOk.java create mode 100755 distributed/src/test/java/com/orientechnologies/orient/server/distributed/asynch/ServerClusterAsyncGraphTest.java diff --git a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java index b7ea9b7c72f..af244f1b338 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java @@ -23,6 +23,8 @@ import com.orientechnologies.orient.core.command.OCommandContext.TIMEOUT_STRATEGY; import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.db.record.OIdentifiable; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.replication.OAsyncReplicationOk; import java.util.Collections; import java.util.HashMap; @@ -32,24 +34,26 @@ /** * Text based Command Request abstract class. - * + * * @author Luca Garulli - * */ @SuppressWarnings("serial") public abstract class OCommandRequestAbstract implements OCommandRequestInternal, ODistributedCommand { protected OCommandResultListener resultListener; protected OProgressListener progressListener; - protected int limit = -1; - protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong(); - protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION; - protected Map parameters; - protected String fetchPlan = null; - protected boolean useCache = false; - protected boolean cacheableResult = false; + protected int limit = -1; + protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong(); + protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION; + protected Map parameters; + protected String fetchPlan = null; + protected boolean useCache = false; + protected boolean cacheableResult = false; protected OCommandContext context; + protected OAsyncReplicationOk onAsyncReplicationOk; + protected OAsyncReplicationError onAsyncReplicationError; + - private final Set nodesToExclude = new HashSet(); + private final Set nodesToExclude = new HashSet(); protected OCommandRequestAbstract() { } @@ -67,7 +71,7 @@ public Map getParameters() { } protected void setParameters(final Object... iArgs) { - if (iArgs != null && iArgs.length > 0) + if (iArgs != null && iArgs.length>0) parameters = convertToParameters(iArgs); } @@ -78,12 +82,11 @@ protected Map convertToParameters(Object... iArgs) { if (iArgs.length == 1 && iArgs[0] instanceof Map) { params = (Map) iArgs[0]; } else { - if (iArgs.length == 1 && iArgs[0] != null && iArgs[0].getClass().isArray() && iArgs[0] instanceof Object[] ) + if (iArgs.length == 1 && iArgs[0] != null && iArgs[0].getClass().isArray() && iArgs[0] instanceof Object[]) iArgs = (Object[]) iArgs[0]; params = new HashMap(iArgs.length); - - for (int i = 0; i < iArgs.length; ++i) { + for (int i = 0; i convertToParameters(Object... iArgs) { return params; } + /** + * Defines a callback to call in case of the asynchronous replication succeed. + */ + @Override + public OCommandRequestAbstract onAsyncReplicationOk(final OAsyncReplicationOk iCallback) { + onAsyncReplicationOk = iCallback; + return this; + } + + + /** + * Defines a callback to call in case of error during the asynchronous replication. + */ + @Override + public OCommandRequestAbstract onAsyncReplicationError(final OAsyncReplicationError iCallback) { + if (iCallback != null) { + onAsyncReplicationError = new OAsyncReplicationError() { + int retry = 0; + + @Override + public ACTION onAsyncReplicationError(Throwable iException, final int iRetry) { + switch (iCallback.onAsyncReplicationError(iException, ++retry)) { + case RETRY: + execute(); + break; + + case IGNORE: + + } + + return ACTION.IGNORE; + } + }; + } else + onAsyncReplicationError = null; + return this; + } + public OProgressListener getProgressListener() { return progressListener; } @@ -182,4 +223,13 @@ public void addExcludedNode(String node) { public void removeExcludedNode(String node) { nodesToExclude.remove(node); } + + + public OAsyncReplicationOk getOnAsyncReplicationOk() { + return onAsyncReplicationOk; + } + + public OAsyncReplicationError getOnAsyncReplicationError() { + return onAsyncReplicationError; + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestTextAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestTextAbstract.java index ba1c1ccec41..38c314014fd 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestTextAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestTextAbstract.java @@ -20,6 +20,7 @@ package com.orientechnologies.orient.core.command; import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal; +import com.orientechnologies.orient.core.db.OExecutionThreadLocal; import com.orientechnologies.orient.core.exception.OSerializationException; import com.orientechnologies.orient.core.index.OCompositeKey; import com.orientechnologies.orient.core.record.impl.ODocument; @@ -36,9 +37,8 @@ /** * Text based Command Request abstract class. - * + * * @author Luca Garulli - * */ @SuppressWarnings("serial") public abstract class OCommandRequestTextAbstract extends OCommandRequestAbstract implements OCommandRequestText { @@ -60,6 +60,10 @@ protected OCommandRequestTextAbstract(final String iText) { @SuppressWarnings("unchecked") public RET execute(final Object... iArgs) { setParameters(iArgs); + + OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationOk = onAsyncReplicationOk; + OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationError = onAsyncReplicationError; + return (RET) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().command(this); } @@ -182,8 +186,7 @@ protected void fromStream(final OMemoryStream buffer) { parameters.put(p.getKey(), compositeKey); } else { - final Object value = OCompositeKeySerializer.INSTANCE.deserialize(OStringSerializerHelper.getBinaryContent(p.getValue()), - 0); + final Object value = OCompositeKeySerializer.INSTANCE.deserialize(OStringSerializerHelper.getBinaryContent(p.getValue()), 0); if (p.getKey() instanceof String && Character.isDigit(((String) p.getKey()).charAt(0))) parameters.put(Integer.parseInt((String) p.getKey()), value); diff --git a/core/src/main/java/com/orientechnologies/orient/core/command/ODistributedCommand.java b/core/src/main/java/com/orientechnologies/orient/core/command/ODistributedCommand.java index d72d7548bec..6bad8a45ac4 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/command/ODistributedCommand.java +++ b/core/src/main/java/com/orientechnologies/orient/core/command/ODistributedCommand.java @@ -20,6 +20,9 @@ package com.orientechnologies.orient.core.command; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.replication.OAsyncReplicationOk; + import java.util.Set; /** @@ -27,5 +30,15 @@ * @since 7/2/14 */ public interface ODistributedCommand { - Set nodesToExclude(); + Set nodesToExclude(); + + /** + * Defines a callback to call in case of the asynchronous replication succeed. + */ + ODistributedCommand onAsyncReplicationOk(OAsyncReplicationOk iCallback); + + /** + * Defines a callback to call in case of error during the asynchronous replication. + */ + ODistributedCommand onAsyncReplicationError(OAsyncReplicationError iCallback); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/OExecutionThreadLocal.java b/core/src/main/java/com/orientechnologies/orient/core/db/OExecutionThreadLocal.java index 2dafc3c3682..333dbccaf24 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/OExecutionThreadLocal.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/OExecutionThreadLocal.java @@ -22,15 +22,23 @@ import com.orientechnologies.common.thread.OSoftThread; import com.orientechnologies.orient.core.OOrientListenerAbstract; import com.orientechnologies.orient.core.Orient; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.replication.OAsyncReplicationOk; /** * Thread Local to store execution setting. - * + * * @author Luca Garulli */ public class OExecutionThreadLocal extends ThreadLocal { + public class OExecutionThreadData { + volatile public OAsyncReplicationOk onAsyncReplicationOk; + volatile public OAsyncReplicationError onAsyncReplicationError; + } - class OExecutionThreadData { + @Override + protected OExecutionThreadData initialValue() { + return new OExecutionThreadData(); } public static volatile OExecutionThreadLocal INSTANCE = new OExecutionThreadLocal(); diff --git a/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationError.java b/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationError.java new file mode 100644 index 00000000000..3dcd6725344 --- /dev/null +++ b/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationError.java @@ -0,0 +1,38 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.core.replication; + +/** + * Interface to catch errors on asynchronous replication. + * + * @author Luca Garulli + */ +public interface OAsyncReplicationError { + enum ACTION {IGNORE, RETRY} + + /** + * Callback called in case of error during asynchronous replication. + * + * @param iException The exception caught + * @param iRetry The number of retries so far. At every retry, this number is incremented. + * @return RETRY to retry the operation, otherwise IGNORE + */ + ACTION onAsyncReplicationError(Throwable iException, int iRetry); +} diff --git a/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationOk.java b/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationOk.java new file mode 100644 index 00000000000..1a43cede3d0 --- /dev/null +++ b/core/src/main/java/com/orientechnologies/orient/core/replication/OAsyncReplicationOk.java @@ -0,0 +1,29 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.core.replication; + +/** + * Interface to catch asynchronous replication operation successfully completed. + * + * @author Luca Garulli + */ +public interface OAsyncReplicationOk { + void onAsyncReplicationOk(); +} diff --git a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandSQL.java b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandSQL.java index 4b257fa97a8..1fde8e369f6 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandSQL.java +++ b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandSQL.java @@ -20,12 +20,13 @@ package com.orientechnologies.orient.core.sql; import com.orientechnologies.orient.core.command.OCommandRequestTextAbstract; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.replication.OAsyncReplicationOk; /** * SQL command request implementation. It just stores the request and delegated the execution to the configured OCommandExecutor. - * + * * @author Luca Garulli - * */ @SuppressWarnings("serial") public class OCommandSQL extends OCommandRequestTextAbstract { @@ -45,4 +46,19 @@ public String toString() { return "sql." + text;// OIOUtils.getStringMaxLength(text, 50, "..."); } + /** + * Defines a callback to call in case of the asynchronous replication succeed. + */ + @Override + public OCommandSQL onAsyncReplicationOk(final OAsyncReplicationOk iCallback) { + return (OCommandSQL) super.onAsyncReplicationOk(iCallback); + } + + /** + * Defines a callback to call in case of error during the asynchronous replication. + */ + @Override + public OCommandSQL onAsyncReplicationError(final OAsyncReplicationError iCallback) { + return (OCommandSQL) super.onAsyncReplicationError(iCallback); + } } diff --git a/distributed/src/test/java/com/orientechnologies/orient/server/distributed/asynch/ServerClusterAsyncGraphTest.java b/distributed/src/test/java/com/orientechnologies/orient/server/distributed/asynch/ServerClusterAsyncGraphTest.java new file mode 100755 index 00000000000..99d91580522 --- /dev/null +++ b/distributed/src/test/java/com/orientechnologies/orient/server/distributed/asynch/ServerClusterAsyncGraphTest.java @@ -0,0 +1,151 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ + +package com.orientechnologies.orient.server.distributed.asynch; + +import com.orientechnologies.common.concur.ONeedRetryException; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.sql.OCommandSQL; +import com.orientechnologies.orient.server.distributed.AbstractServerClusterTest; +import com.orientechnologies.orient.server.distributed.ServerRun; +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; +import junit.framework.Assert; +import org.junit.Test; + +/** + * Check vertex and edge creation are propagated across all the nodes in asynchronous mode. + */ +public class ServerClusterAsyncGraphTest extends AbstractServerClusterTest { + final static int SERVERS = 2; + private OrientVertex v1; + private OrientVertex v2; + private OrientVertex v3; + + public String getDatabaseName() { + return "distributed-graphtest"; + } + + @Test + public void test() throws Exception { + init(SERVERS); + prepare(false); + execute(); + } + + @Override + protected String getDistributedServerConfiguration(final ServerRun server) { + return "asynch-dserver-config-" + server.getServerId() + ".xml"; + } + + @Override + protected void executeTest() throws Exception { + { + OrientGraphFactory factory = new OrientGraphFactory("plocal:target/server0/databases/" + getDatabaseName()); + OrientGraphNoTx g = factory.getNoTx(); + + try { + g.createVertexType("Post"); + g.createVertexType("User"); + g.createEdgeType("Own"); + + g.addVertex("class:User"); + + g.command(new OCommandSQL("insert into Post (content, timestamp) values('test', 1)")).execute(); + } finally { + g.shutdown(); + } + } + + // CHECK VERTEX CREATION ON ALL THE SERVERS + for (int s = 0; s result = g2.command(new OCommandSQL("select from Post")).execute(); + Assert.assertTrue(result.iterator().hasNext()); + Assert.assertNotNull(result.iterator().next()); + + } finally { + g2.shutdown(); + } + } + + { + OrientGraphFactory factory = new OrientGraphFactory("plocal:target/server0/databases/" + getDatabaseName()); + OrientGraphNoTx g = factory.getNoTx(); + try { + g.command(new OCommandSQL("create edge Own from (select from User) to (select from Post)").onAsyncReplicationError(new OAsyncReplicationError() { + @Override + public ACTION onAsyncReplicationError(Throwable iException, int iRetry) { + return iException instanceof ONeedRetryException && iRetry<=3 ? ACTION.RETRY : ACTION.IGNORE; + } + })).execute(); + + } finally { + g.shutdown(); + } + } + + Thread.sleep(2000); + + // CHECK VERTEX CREATION ON ALL THE SERVERS + for (int s = 0; s result = g2.command(new OCommandSQL("select from Own")).execute(); + Assert.assertTrue(result.iterator().hasNext()); + Assert.assertNotNull(result.iterator().next()); + + result = g2.command(new OCommandSQL("select from Post")).execute(); + Assert.assertTrue(result.iterator().hasNext()); + + final OrientVertex v = result.iterator().next(); + Assert.assertNotNull(v); + + final Iterable inEdges = v.getEdges(Direction.IN); + Assert.assertTrue(inEdges.iterator().hasNext()); + Assert.assertNotNull(inEdges.iterator().next()); + + result = g2.command(new OCommandSQL("select from User")).execute(); + Assert.assertTrue(result.iterator().hasNext()); + + final OrientVertex v2 = result.iterator().next(); + Assert.assertNotNull(v2); + + final Iterable outEdges = v2.getEdges(Direction.OUT); + Assert.assertTrue(outEdges.iterator().hasNext()); + Assert.assertNotNull(outEdges.iterator().next()); + + } finally { + g2.shutdown(); + } + } + + } +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java index 35c935f8e6f..693f087e28b 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java @@ -38,6 +38,7 @@ import com.orientechnologies.orient.core.db.ODatabase; import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal; import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal; +import com.orientechnologies.orient.core.db.OExecutionThreadLocal; import com.orientechnologies.orient.core.db.OScenarioThreadLocal; import com.orientechnologies.orient.core.db.OScenarioThreadLocal.RUN_MODE; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; @@ -56,6 +57,8 @@ import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.replication.OAsyncReplicationError; +import com.orientechnologies.orient.core.replication.OAsyncReplicationOk; import com.orientechnologies.orient.core.sql.OCommandExecutorSQLDelegate; import com.orientechnologies.orient.core.sql.OCommandExecutorSQLSelect; import com.orientechnologies.orient.core.sql.OCommandSQL; @@ -100,28 +103,27 @@ public class ODistributedStorage implements OStorage, OFreezableStorage, OAutosh protected final ODistributedServerManager dManager; protected final OAbstractPaginatedStorage wrapped; - protected final TimerTask purgeDeletedRecordsTask; + protected final TimerTask purgeDeletedRecordsTask; protected final ConcurrentHashMap> deletedRecords = new ConcurrentHashMap>(); protected final AtomicLong lastOperationId = new AtomicLong(); protected final BlockingQueue asynchronousOperationsQueue; protected final Thread asynchWorker; - protected volatile boolean running = true; - protected volatile File lastValidBackup = null; + protected volatile boolean running = true; + protected volatile File lastValidBackup = null; public ODistributedStorage(final OServer iServer, final OAbstractPaginatedStorage wrapped) { this.serverInstance = iServer; this.dManager = iServer.getDistributedManager(); this.wrapped = wrapped; - ODistributedServerLog.debug(this, dManager != null ? dManager.getLocalNodeName() : "?", null, - ODistributedServerLog.DIRECTION.NONE, "Installing distributed storage on database '%s'", wrapped.getName()); + ODistributedServerLog.debug(this, dManager != null ? dManager.getLocalNodeName() : "?", null, ODistributedServerLog.DIRECTION.NONE, "Installing distributed storage on database '%s'", wrapped.getName()); purgeDeletedRecordsTask = new TimerTask() { @Override public void run() { final long now = System.currentTimeMillis(); - for (Iterator>> it = deletedRecords.entrySet().iterator(); it.hasNext();) { + for (Iterator>> it = deletedRecords.entrySet().iterator(); it.hasNext(); ) { final Map.Entry> entry = it.next(); try { @@ -129,7 +131,7 @@ public void run() { final long time = entry.getValue().getKey(); final int version = entry.getValue().getValue(); - if (now - time > (OGlobalConfiguration.DISTRIBUTED_ASYNCH_RESPONSES_TIMEOUT.getValueAsLong() * 2)) { + if (now - time>(OGlobalConfiguration.DISTRIBUTED_ASYNCH_RESPONSES_TIMEOUT.getValueAsLong() * 2)) { // DELETE RECORD final OStorageOperationResult result = wrapped.deleteRecord(rid, version, 0, null); if (result == null || !result.getResult()) @@ -143,12 +145,10 @@ public void run() { } }; - Orient.instance().scheduleTask(purgeDeletedRecordsTask, - OGlobalConfiguration.DISTRIBUTED_PURGE_RESPONSES_TIMER_DELAY.getValueAsLong(), - OGlobalConfiguration.DISTRIBUTED_PURGE_RESPONSES_TIMER_DELAY.getValueAsLong()); + Orient.instance().scheduleTask(purgeDeletedRecordsTask, OGlobalConfiguration.DISTRIBUTED_PURGE_RESPONSES_TIMER_DELAY.getValueAsLong(), OGlobalConfiguration.DISTRIBUTED_PURGE_RESPONSES_TIMER_DELAY.getValueAsLong()); final int queueSize = OGlobalConfiguration.DISTRIBUTED_ASYNCH_QUEUE_SIZE.getValueAsInteger(); - if (queueSize <= 0) + if (queueSize<=0) asynchronousOperationsQueue = new LinkedBlockingQueue(); else asynchronousOperationsQueue = new LinkedBlockingQueue(queueSize); @@ -160,9 +160,7 @@ public void run() { try { final OAsynchDistributedOperation operation = asynchronousOperationsQueue.take(); - final Object result = dManager.sendRequest(operation.getDatabaseName(), operation.getClusterNames(), - operation.getNodes(), operation.getTask(), - operation.getCallback() != null ? EXECUTION_MODE.RESPONSE : EXECUTION_MODE.NO_RESPONSE); + final Object result = dManager.sendRequest(operation.getDatabaseName(), operation.getClusterNames(), operation.getNodes(), operation.getTask(), operation.getCallback() != null ? EXECUTION_MODE.RESPONSE : EXECUTION_MODE.NO_RESPONSE); if (operation.getCallback() != null) operation.getCallback().call(result); @@ -170,10 +168,8 @@ public void run() { } catch (InterruptedException e) { final int pendingMessages = asynchronousOperationsQueue.size(); - if (pendingMessages > 0) - ODistributedServerLog.warn(this, dManager != null ? dManager.getLocalNodeName() : "?", null, - ODistributedServerLog.DIRECTION.NONE, - "Received shutdown signal, waiting for asynchronous queue is empty (pending msgs=%d)...", pendingMessages); + if (pendingMessages>0) + ODistributedServerLog.warn(this, dManager != null ? dManager.getLocalNodeName() : "?", null, ODistributedServerLog.DIRECTION.NONE, "Received shutdown signal, waiting for asynchronous queue is empty (pending msgs=%d)...", pendingMessages); Thread.interrupted(); @@ -181,15 +177,12 @@ public void run() { if (running) // ASYNC: IGNORE IT if (e instanceof ONeedRetryException) - ODistributedServerLog.debug(this, dManager != null ? dManager.getLocalNodeName() : "?", null, - ODistributedServerLog.DIRECTION.OUT, "Error on executing asynchronous operation", e); + ODistributedServerLog.debug(this, dManager != null ? dManager.getLocalNodeName() : "?", null, ODistributedServerLog.DIRECTION.OUT, "Error on executing asynchronous operation", e); else - ODistributedServerLog.error(this, dManager != null ? dManager.getLocalNodeName() : "?", null, - ODistributedServerLog.DIRECTION.OUT, "Error on executing asynchronous operation", e); + ODistributedServerLog.error(this, dManager != null ? dManager.getLocalNodeName() : "?", null, ODistributedServerLog.DIRECTION.OUT, "Error on executing asynchronous operation", e); } } - ODistributedServerLog.warn(this, dManager != null ? dManager.getLocalNodeName() : "?", null, - ODistributedServerLog.DIRECTION.NONE, "Shutdown asynchronous queue worker completed"); + ODistributedServerLog.warn(this, dManager != null ? dManager.getLocalNodeName() : "?", null, ODistributedServerLog.DIRECTION.NONE, "Shutdown asynchronous queue worker completed"); } }; asynchWorker.setName("OrientDB Distributed asynch ops node=" + getNodeId() + " db=" + getName()); @@ -231,8 +224,7 @@ public Object command(final OCommandRequestText iCommand) { executor.setProgressListener(iCommand.getProgressListener()); executor.parse(iCommand); - final OCommandExecutor exec = executor instanceof OCommandExecutorSQLDelegate - ? ((OCommandExecutorSQLDelegate) executor).getDelegate() : executor; + final OCommandExecutor exec = executor instanceof OCommandExecutorSQLDelegate ? ((OCommandExecutorSQLDelegate) executor).getDelegate() : executor; if (!exec.isIdempotent()) checkNodeIsMaster(localNodeName, dbCfg); @@ -277,8 +269,7 @@ public Object command(final OCommandRequestText iCommand) { results = executeOnServers(iCommand, involvedClusters, nodeClusterMap); } - final OCommandExecutorSQLSelect select = exec instanceof OCommandExecutorSQLSelect ? (OCommandExecutorSQLSelect) exec - : null; + final OCommandExecutorSQLSelect select = exec instanceof OCommandExecutorSQLSelect ? (OCommandExecutorSQLSelect) exec : null; if (select != null && select.isAnyFunctionAggregates() && !select.hasGroupBy()) { result = mergeResultByAggregation(select, results); @@ -290,8 +281,7 @@ public Object command(final OCommandRequestText iCommand) { } } else { - final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) - : new OSQLCommandTask(iCommand, new HashSet()); + final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) : new OSQLCommandTask(iCommand, new HashSet()); task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.ANY); final Collection nodes = dbCfg.getServers(involvedClusters); @@ -333,8 +323,7 @@ else if (result instanceof Exception) } } - protected Map executeOnServers(final OCommandRequestText iCommand, final Collection involvedClusters, - final Map> nodeClusterMap) { + protected Map executeOnServers(final OCommandRequestText iCommand, final Collection involvedClusters, final Map> nodeClusterMap) { final Map results = new HashMap(nodeClusterMap.size()); @@ -345,14 +334,11 @@ protected Map executeOnServers(final OCommandRequestText iComman if (!dManager.isNodeAvailable(nodeName, getName())) { - ODistributedServerLog.warn(this, dManager.getLocalNodeName(), nodeName, ODistributedServerLog.DIRECTION.OUT, - "Node '%s' is involved in the command '%s' against database '%s', but the node is not active. Excluding it", nodeName, - iCommand, wrapped.getName()); + ODistributedServerLog.warn(this, dManager.getLocalNodeName(), nodeName, ODistributedServerLog.DIRECTION.OUT, "Node '%s' is involved in the command '%s' against database '%s', but the node is not active. Excluding it", nodeName, iCommand, wrapped.getName()); } else { - final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) - : new OSQLCommandTask(iCommand, c.getValue()); + final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) : new OSQLCommandTask(iCommand, c.getValue()); task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.ANY); nodes.clear(); @@ -431,8 +417,7 @@ protected Object mergeResultByAggregation(final OCommandExecutorSQLSelect select return list; } - protected boolean executeLocally(final String localNodeName, final ODistributedConfiguration dbCfg, final OCommandExecutor exec, - final Collection involvedClusters, final Collection nodes) { + protected boolean executeLocally(final String localNodeName, final ODistributedConfiguration dbCfg, final OCommandExecutor exec, final Collection involvedClusters, final Collection nodes) { boolean executeLocally = false; if (exec.isIdempotent()) { // IDEMPOTENT: CHECK IF CAN WORK LOCALLY ONLY @@ -445,7 +430,7 @@ protected boolean executeLocally(final String localNodeName, final ODistributedC maxReadQuorum = Math.max(maxReadQuorum, dbCfg.getReadQuorum(cl)); } - if (nodes.size() == 1 && nodes.iterator().next().equals(localNodeName) && maxReadQuorum <= 1) + if (nodes.size() == 1 && nodes.iterator().next().equals(localNodeName) && maxReadQuorum<=1) executeLocally = true; } else if (nodes.size() == 1 && nodes.iterator().next().equals(localNodeName)) @@ -454,8 +439,7 @@ protected boolean executeLocally(final String localNodeName, final ODistributedC return executeLocally; } - public OStorageOperationResult createRecord(final ORecordId iRecordId, final byte[] iContent, - final int iRecordVersion, final byte iRecordType, final int iMode, final ORecordCallback iCallback) { + public OStorageOperationResult createRecord(final ORecordId iRecordId, final byte[] iContent, final int iRecordVersion, final byte iRecordType, final int iMode, final ORecordCallback iCallback) { resetLastValidBackup(); if (OScenarioThreadLocal.INSTANCE.get() == RUN_MODE.RUNNING_DISTRIBUTED) @@ -507,11 +491,9 @@ public Object call() throws Exception { } if (!masterNode.equals(localNodeName)) - throw new ODistributedException("Error on inserting into cluster '" + clusterName + "' where local node '" + localNodeName - + "' is not the master of it, but it's '" + masterNode + "'"); + throw new ODistributedException("Error on inserting into cluster '" + clusterName + "' where local node '" + localNodeName + "' is not the master of it, but it's '" + masterNode + "'"); - OLogManager.instance().warn(this, "Local node '" + localNodeName + "' is not the master for cluster '" + clusterName - + "' (it's '" + masterNode + "'). Switching to a valid cluster of the same class: '" + newClusterName + "'"); + OLogManager.instance().warn(this, "Local node '" + localNodeName + "' is not the master for cluster '" + clusterName + "' (it's '" + masterNode + "'). Switching to a valid cluster of the same class: '" + newClusterName + "'"); clusterName = newClusterName; } @@ -522,21 +504,18 @@ public Object call() throws Exception { if (executionModeSynch) { // SYNCHRONOUS CALL: REPLICATE IT - final Object masterResult = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, - new OCreateRecordTask(iRecordId, iContent, iRecordVersion, iRecordType), EXECUTION_MODE.RESPONSE); + final Object masterResult = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, new OCreateRecordTask(iRecordId, iContent, iRecordVersion, iRecordType), EXECUTION_MODE.RESPONSE); if (masterResult instanceof ONeedRetryException) throw (ONeedRetryException) masterResult; else if (masterResult instanceof Exception) - throw OException.wrapException(new ODistributedException("Error on execution distributed CREATE_RECORD"), - (Exception) masterResult); + throw OException.wrapException(new ODistributedException("Error on execution distributed CREATE_RECORD"), (Exception) masterResult); // COPY THE CLUSTER POS -> RID final OPlaceholder masterPlaceholder = (OPlaceholder) masterResult; iRecordId.copyFrom(masterPlaceholder.getIdentity()); - return new OStorageOperationResult( - new OPhysicalPosition(masterPlaceholder.getIdentity().getClusterPosition(), masterPlaceholder.getVersion())); + return new OStorageOperationResult(new OPhysicalPosition(masterPlaceholder.getIdentity().getClusterPosition(), masterPlaceholder.getVersion())); } // ASYNCHRONOUS CALL: EXECUTE LOCALLY AND THEN DISTRIBUTE @@ -562,8 +541,7 @@ public Object call() throws Exception { // ASYNCHRONOUSLY REPLICATE IT TO ALL THE OTHER NODES nodes.remove(localNodeName); if (!nodes.isEmpty()) { - asynchronousExecution(new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, - new OCreateRecordTask(iRecordId, iContent, iRecordVersion, iRecordType))); + asynchronousExecution(new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, new OCreateRecordTask(iRecordId, iContent, iRecordVersion, iRecordType))); } // UPDATE RID WITH NEW POSITION @@ -581,8 +559,7 @@ public Object call() throws Exception { } } - public OStorageOperationResult readRecord(final ORecordId iRecordId, final String iFetchPlan, - final boolean iIgnoreCache, final ORecordCallback iCallback) { + public OStorageOperationResult readRecord(final ORecordId iRecordId, final String iFetchPlan, final boolean iIgnoreCache, final ORecordCallback iCallback) { if (deletedRecords.get(iRecordId) != null) // DELETED @@ -595,7 +572,7 @@ public OStorageOperationResult readRecord(final ORecordId iRecordId, final List nodes = dbCfg.getServers(clusterName, null); // CHECK IF LOCAL NODE OWNS THE DATA AND READ-QUORUM = 1: GET IT LOCALLY BECAUSE IT'S FASTER - if (nodes.isEmpty() || nodes.contains(dManager.getLocalNodeName()) && dbCfg.getReadQuorum(clusterName) <= 1) { + if (nodes.isEmpty() || nodes.contains(dManager.getLocalNodeName()) && dbCfg.getReadQuorum(clusterName)<=1) { // DON'T REPLICATE return (OStorageOperationResult) ODistributedAbstractPlugin.runInDistributedMode(new Callable() { @Override @@ -606,8 +583,7 @@ public Object call() throws Exception { } // DISTRIBUTE IT - final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, - new OReadRecordTask(iRecordId), EXECUTION_MODE.RESPONSE); + final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, new OReadRecordTask(iRecordId), EXECUTION_MODE.RESPONSE); if (result instanceof ONeedRetryException) throw (ONeedRetryException) result; @@ -627,8 +603,7 @@ else if (result instanceof Exception) } @Override - public OStorageOperationResult readRecordIfVersionIsNotLatest(final ORecordId rid, final String fetchPlan, - final boolean ignoreCache, final int recordVersion) throws ORecordNotFoundException { + public OStorageOperationResult readRecordIfVersionIsNotLatest(final ORecordId rid, final String fetchPlan, final boolean ignoreCache, final int recordVersion) throws ORecordNotFoundException { if (deletedRecords.get(rid) != null) // DELETED @@ -641,7 +616,7 @@ public OStorageOperationResult readRecordIfVersionIsNotLatest(final final List nodes = dbCfg.getServers(clusterName, null); // CHECK IF LOCAL NODE OWNS THE DATA AND READ-QUORUM = 1: GET IT LOCALLY BECAUSE IT'S FASTER - if (nodes.isEmpty() || nodes.contains(dManager.getLocalNodeName()) && dbCfg.getReadQuorum(clusterName) <= 1) { + if (nodes.isEmpty() || nodes.contains(dManager.getLocalNodeName()) && dbCfg.getReadQuorum(clusterName)<=1) { // DON'T REPLICATE return (OStorageOperationResult) ODistributedAbstractPlugin.runInDistributedMode(new Callable() { @Override @@ -652,8 +627,7 @@ public Object call() throws Exception { } // DISTRIBUTE IT - final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, - new OReadRecordIfNotLatestTask(rid, recordVersion), EXECUTION_MODE.RESPONSE); + final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, new OReadRecordIfNotLatestTask(rid, recordVersion), EXECUTION_MODE.RESPONSE); if (result instanceof ONeedRetryException) throw (ONeedRetryException) result; @@ -678,9 +652,7 @@ public OSBTreeCollectionManager getSBtreeCollectionManager() { } @Override - public OStorageOperationResult updateRecord(final ORecordId iRecordId, final boolean updateContent, - final byte[] iContent, final int iVersion, final byte iRecordType, final int iMode, - final ORecordCallback iCallback) { + public OStorageOperationResult updateRecord(final ORecordId iRecordId, final boolean updateContent, final byte[] iContent, final int iVersion, final byte iRecordType, final int iMode, final ORecordCallback iCallback) { resetLastValidBackup(); if (deletedRecords.get(iRecordId) != null) @@ -724,16 +696,12 @@ public Object call() throws Exception { throw new ORecordNotFoundException("Record with rid " + iRecordId + " was not found in database"); // REPLICATE IT - final Object result = dManager.sendRequest( - getName(), Collections.singleton(clusterName), nodes, new OUpdateRecordTask(iRecordId, - previousContent.getResult().getBuffer(), previousContent.getResult().version, iContent, iVersion, iRecordType), - EXECUTION_MODE.RESPONSE); + final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, new OUpdateRecordTask(iRecordId, previousContent.getResult().getBuffer(), previousContent.getResult().version, iContent, iVersion, iRecordType), EXECUTION_MODE.RESPONSE); if (result instanceof ONeedRetryException) throw (ONeedRetryException) result; else if (result instanceof Exception) - throw OException.wrapException(new ODistributedException("Error on execution distributed UPDATE_RECORD"), - (Exception) result); + throw OException.wrapException(new ODistributedException("Error on execution distributed UPDATE_RECORD"), (Exception) result); // UPDATE LOCALLY return new OStorageOperationResult((Integer) result); @@ -761,9 +729,7 @@ public Object call() throws Exception { // LOAD PREVIOUS CONTENT TO BE USED IN CASE OF UNDO final OStorageOperationResult previousContent = readRecord(iRecordId, null, false, null); - asynchronousExecution( - new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, new OUpdateRecordTask(iRecordId, - previousContent.getResult().getBuffer(), previousContent.getResult().version, iContent, iVersion, iRecordType))); + asynchronousExecution(new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, new OUpdateRecordTask(iRecordId, previousContent.getResult().getBuffer(), previousContent.getResult().version, iContent, iVersion, iRecordType))); } return localResult; @@ -779,8 +745,7 @@ public Object call() throws Exception { } @Override - public OStorageOperationResult deleteRecord(final ORecordId iRecordId, final int iVersion, final int iMode, - final ORecordCallback iCallback) { + public OStorageOperationResult deleteRecord(final ORecordId iRecordId, final int iVersion, final int iMode, final ORecordCallback iCallback) { resetLastValidBackup(); if (OScenarioThreadLocal.INSTANCE.get() == RUN_MODE.RUNNING_DISTRIBUTED) @@ -814,14 +779,12 @@ public Object call() throws Exception { if (executionModeSynch) { // REPLICATE IT - final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, - new ODeleteRecordTask(iRecordId, iVersion), EXECUTION_MODE.RESPONSE); + final Object result = dManager.sendRequest(getName(), Collections.singleton(clusterName), nodes, new ODeleteRecordTask(iRecordId, iVersion), EXECUTION_MODE.RESPONSE); if (result instanceof ONeedRetryException) throw (ONeedRetryException) result; else if (result instanceof Exception) - throw OException.wrapException(new ODistributedException("Error on execution distributed DELETE_RECORD"), - (Exception) result); + throw OException.wrapException(new ODistributedException("Error on execution distributed DELETE_RECORD"), (Exception) result); return new OStorageOperationResult(true); } @@ -845,8 +808,7 @@ public Object call() throws Exception { nodes.remove(localNodeName); if (!nodes.isEmpty()) - asynchronousExecution(new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, - new ODeleteRecordTask(iRecordId, iVersion))); + asynchronousExecution(new OAsynchDistributedOperation(getName(), Collections.singleton(clusterName), nodes, new ODeleteRecordTask(iRecordId, iVersion))); return localResult; @@ -1038,8 +1000,7 @@ public Object call() throws Exception { final int v = executionModeSynch ? record.getVersion() : record.getVersion(); - task = new OUpdateRecordTask(rid, previousContent.getResult().getBuffer(), previousContent.getResult().version, - record.toStream(), v, ORecordInternal.getRecordType(record)); + task = new OUpdateRecordTask(rid, previousContent.getResult().getBuffer(), previousContent.getResult().version, record.toStream(), v, ORecordInternal.getRecordType(record)); break; } @@ -1071,7 +1032,7 @@ public Object call() throws Exception { // AUTO-RETRY IN CASE RECORDS ARE LOCKED Object result = null; - for (int retry = 1; retry <= maxAutoRetry; ++retry) { + for (int retry = 1; retry<=maxAutoRetry; ++retry) { // SYNCHRONOUS CALL: REPLICATE IT result = dManager.sendRequest(getName(), involvedClusters, nodes, txTask, EXECUTION_MODE.RESPONSE); if (!processCommitResult(localNodeName, iTx, txTask, involvedClusters, tmpEntries, nodes, autoRetryDelay, result)) @@ -1082,8 +1043,7 @@ public Object call() throws Exception { } if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "distributed transaction retries exceed maximum auto-retries (%d)", maxAutoRetry); + ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "distributed transaction retries exceed maximum auto-retries (%d)", maxAutoRetry); // ONLY CASE: ODistributedRecordLockedException MORE THAN AUTO-RETRY throw (ODistributedRecordLockedException) result; @@ -1103,59 +1063,69 @@ public Object call() throws Exception { if (!nodes.isEmpty()) { if (executionModeSynch) dManager.sendRequest(getName(), involvedClusters, nodes, txTask, EXECUTION_MODE.RESPONSE); - else + else { + // MANAGE REPLICATION CALLBACK + final OAsyncReplicationOk onAsyncReplicationOk = OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationOk; + final OAsyncReplicationError onAsyncReplicationError = getAsyncReplicationError(); + // ASYNCHRONOUSLY REPLICATE IT TO ALL THE OTHER NODES asynchronousExecution(new OAsynchDistributedOperation(getName(), involvedClusters, nodes, txTask, new OCallable() { @Override public Object call(final Object iArgument) { if (iArgument instanceof OTxTaskResult) { sendTxCompleted(localNodeName, involvedClusters, nodes, (OTxTaskResult) iArgument); + + if (onAsyncReplicationOk != null) + onAsyncReplicationOk.onAsyncReplicationOk(); + return null; } else if (iArgument instanceof Exception) { - final OAbstractRemoteTask undo = txTask.getUndoTaskForLocalStorage(iArgument); + try { + final OAbstractRemoteTask undo = txTask.getUndoTaskForLocalStorage(iArgument); - if (undo != null) - try { + if (undo != null) + try { - final ODatabaseDocumentTx database = new ODatabaseDocumentTx(getURL()); - database.setProperty(ODatabase.OPTIONS.SECURITY.toString(), OSecurityServerUser.class); - database.open("system", "system"); + final ODatabaseDocumentTx database = new ODatabaseDocumentTx(getURL()); + database.setProperty(ODatabase.OPTIONS.SECURITY.toString(), OSecurityServerUser.class); + database.open("system", "system"); - try { + try { + + undo.execute(serverInstance, dManager, database); + } finally { + database.close(); + } - undo.execute(serverInstance, dManager, database); - } finally { - database.close(); + } catch (Exception e) { + ODistributedServerLog.error(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "async distributed transaction failed, cannot revert local transaction. Current node could have a not aligned database. Remote answer: %s", e, iArgument); + throw OException.wrapException(new OTransactionException("Error on execution async distributed transaction, the database could be inconsistent"), (Exception) iArgument); } - } catch (Exception e) { - ODistributedServerLog.error(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "async distributed transaction failed, cannot revert local transaction. Current node could have a not aligned database. Remote answer: %s", e, iArgument); - throw OException.wrapException( - new OTransactionException( - "Error on execution async distributed transaction, the database could be inconsistent"), - (Exception) iArgument); - } - - if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "async distributed transaction failed: %s", iArgument); - - if (iArgument instanceof RuntimeException) - throw (RuntimeException) iArgument; - else - throw OException.wrapException(new OTransactionException("Error on execution async distributed transaction"), - (Exception) iArgument); + if (ODistributedServerLog.isDebugEnabled()) + ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "async distributed transaction failed: %s", iArgument); + + if (iArgument instanceof RuntimeException) + throw (RuntimeException) iArgument; + else + throw OException.wrapException(new OTransactionException("Error on execution async distributed transaction"), (Exception) iArgument); + + } finally { + + if (onAsyncReplicationError != null) + onAsyncReplicationError.onAsyncReplicationError((Throwable) iArgument, 0); + + } } // UNKNOWN RESPONSE TYPE if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "async distributed transaction error, received unknown response type: %s", iArgument); + ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "async distributed transaction error, received unknown response type: %s", iArgument); - throw new OTransactionException( - "Error on committing async distributed transaction, received unknown response type " + iArgument); + throw new OTransactionException("Error on committing async distributed transaction, received unknown response type " + iArgument); } })); + } } } catch (OValidationException e) { @@ -1171,18 +1141,18 @@ protected boolean processCommitResult(String localNodeName, OTransaction iTx, OT final List list = txResult.results; - for (int i = 0; i < txTask.getTasks().size(); ++i) { + for (int i = 0; i 0) + if (autoRetryDelay>0) Thread.sleep(autoRetryDelay); return false; } else if (result instanceof Exception) { // EXCEPTION: LOG IT AND ADD AS NESTED EXCEPTION if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "distributed transaction error: %s", result, result.toString()); + ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "distributed transaction error: %s", result, result.toString()); if (result instanceof OTransactionException || result instanceof ONeedRetryException) throw (RuntimeException) result; @@ -1217,8 +1186,7 @@ protected boolean processCommitResult(String localNodeName, OTransaction iTx, OT } else { // UNKNOWN RESPONSE TYPE if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "distributed transaction error, received unknown response type: %s", result); + ODistributedServerLog.debug(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "distributed transaction error, received unknown response type: %s", result); throw new OTransactionException("Error on committing distributed transaction, received unknown response type " + result); } @@ -1227,13 +1195,11 @@ protected boolean processCommitResult(String localNodeName, OTransaction iTx, OT private void sendTxCompleted(String localNodeName, Set involvedClusters, Set nodes, OTxTaskResult txResult) { // SEND FINAL TX COMPLETE TASK TO UNLOCK RECORDS - final Object completedResult = dManager.sendRequest(getName(), involvedClusters, nodes, new OCompletedTxTask(txResult.locks), - EXECUTION_MODE.RESPONSE); + final Object completedResult = dManager.sendRequest(getName(), involvedClusters, nodes, new OCompletedTxTask(txResult.locks), EXECUTION_MODE.RESPONSE); if (!(completedResult instanceof Boolean) || !((Boolean) completedResult).booleanValue()) { // EXCEPTION: LOG IT AND ADD AS NESTED EXCEPTION - ODistributedServerLog.error(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, - "distributed transaction complete error: %s", completedResult); + ODistributedServerLog.error(this, localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "distributed transaction complete error: %s", completedResult); } } @@ -1269,7 +1235,7 @@ public Collection getClusterInstances() { @Override public int addCluster(final String iClusterName, boolean forceListBased, final Object... iParameters) { - for (int retry = 0; retry < 10; ++retry) { + for (int retry = 0; retry<10; ++retry) { int clId = wrapped.addCluster(iClusterName, false, iParameters); if (OScenarioThreadLocal.INSTANCE.get() == RUN_MODE.DEFAULT) { @@ -1283,9 +1249,7 @@ public int addCluster(final String iClusterName, boolean forceListBased, final O final Object result = command(commandSQL); if (result != null && ((Integer) result).intValue() != clId) { - OLogManager.instance().warn(this, - "Error on creating cluster on distributed nodes: ids are different (local=%d and remote=%d). Retrying %d/%d...", clId, - ((Integer) result).intValue(), retry, 10); + OLogManager.instance().warn(this, "Error on creating cluster on distributed nodes: ids are different (local=%d and remote=%d). Retrying %d/%d...", clId, ((Integer) result).intValue(), retry, 10); wrapped.dropCluster(clId, false); @@ -1479,14 +1443,12 @@ public void release() { } @Override - public List backup(final OutputStream out, final Map options, final Callable callable, - final OCommandOutputListener iListener, final int compressionLevel, final int bufferSize) throws IOException { + public List backup(final OutputStream out, final Map options, final Callable callable, final OCommandOutputListener iListener, final int compressionLevel, final int bufferSize) throws IOException { return wrapped.backup(out, options, callable, iListener, compressionLevel, bufferSize); } @Override - public void restore(final InputStream in, final Map options, final Callable callable, - final OCommandOutputListener iListener) throws IOException { + public void restore(final InputStream in, final Map options, final Callable callable, final OCommandOutputListener iListener) throws IOException { wrapped.restore(in, options, callable, iListener); } @@ -1552,6 +1514,33 @@ protected void asynchronousExecution(final OAsynchDistributedOperation iOperatio asynchronousOperationsQueue.offer(iOperation); } + + protected OAsyncReplicationError getAsyncReplicationError() { + if (OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationError != null) { + + final OAsyncReplicationError subCallback = OExecutionThreadLocal.INSTANCE.get().onAsyncReplicationError; + final ODatabaseDocumentTx currentDatabase = (ODatabaseDocumentTx) ODatabaseRecordThreadLocal.INSTANCE.get(); + final ODatabaseDocumentTx copyDatabase = currentDatabase.copy(); + currentDatabase.activateOnCurrentThread(); + + return new OAsyncReplicationError() { + @Override + public ACTION onAsyncReplicationError(final Throwable iException, final int iRetry) { + copyDatabase.activateOnCurrentThread(); + switch (subCallback.onAsyncReplicationError(iException, iRetry)) { + case RETRY: + break; + + case IGNORE: + } + + return OAsyncReplicationError.ACTION.IGNORE; + } + }; + } else + return null; + } + protected void handleDistributedException(final String iMessage, final Exception e, final Object... iParams) { if (e != null) { if (e instanceof OException)