From e80f4ec102af642571129129104a2ff804c89271 Mon Sep 17 00:00:00 2001 From: lvca Date: Sat, 1 Feb 2014 15:50:23 +0100 Subject: [PATCH 1/2] Distributed: auto deploy, changed logging levels --- .../server/hazelcast/OHazelcastPlugin.java | 40 +++++++++---------- .../task/OCopyDatabaseChunkTask.java | 2 +- .../distributed/task/ODeployDatabaseTask.java | 6 +-- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index fcb46a04a58..0f52ed8de1b 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -15,6 +15,23 @@ */ package com.orientechnologies.orient.server.hazelcast; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; + import com.hazelcast.config.FileSystemXmlConfig; import com.hazelcast.config.QueueConfig; import com.hazelcast.core.EntryEvent; @@ -59,23 +76,6 @@ import com.orientechnologies.orient.server.distributed.task.ODeployDatabaseTask; import com.orientechnologies.orient.server.network.OServerNetworkListener; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; - /** * Hazelcast implementation for clustering. * @@ -685,7 +685,7 @@ protected void installNewDatabases(final boolean iStartup) { final String fileName = Orient.getTempPath() + "install_" + databaseName + ".zip"; - ODistributedServerLog.warn(this, getLocalNodeName(), r.getKey(), DIRECTION.IN, + ODistributedServerLog.info(this, getLocalNodeName(), r.getKey(), DIRECTION.IN, "copying remote database '%s' to: %s", databaseName, fileName); final File file = new File(fileName); @@ -711,7 +711,7 @@ protected void installNewDatabases(final boolean iStartup) { } } - ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, + ODistributedServerLog.info(this, getLocalNodeName(), null, DIRECTION.NONE, "database copied correctly, size=%s", OFileUtils.getSizeAsString(fileSize)); } catch (Exception e) { @@ -785,7 +785,7 @@ protected void installDatabase(final OHazelcastDistributedDatabase distrDatabase db.close(); Orient.instance().unregisterStorageByName(db.getName()); - ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "installed database '%s', setting it online...", + ODistributedServerLog.info(this, getLocalNodeName(), null, DIRECTION.NONE, "installed database '%s', setting it online...", databaseName); distrDatabase.setOnline(); diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java index 2c44612b0f7..06ef403bcf9 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java @@ -60,7 +60,7 @@ public Object execute(final OServer iServer, ODistributedServerManager iManager, final ODistributedDatabaseChunk result = new ODistributedDatabaseChunk(f, offset, ODeployDatabaseTask.CHUNK_MAX_SIZE); - ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT, + ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT, "- transferring chunk #%d offset=%d size=%s...", chunkNum, result.offset, OFileUtils.getSizeAsNumber(result.buffer.length)); return result; diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeployDatabaseTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeployDatabaseTask.java index a14f5938ce6..face0d25902 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeployDatabaseTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeployDatabaseTask.java @@ -66,18 +66,18 @@ public Object execute(final OServer iServer, ODistributedServerManager iManager, f.getParentFile().mkdirs(); f.createNewFile(); - ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, + ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, "creating backup of database '%s' in directory: %s...", databaseName, f.getAbsolutePath()); database.backup(new FileOutputStream(f), null, null); - ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, + ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, "sending the compressed database '%s' over the network to node '%s', size=%s...", databaseName, getNodeSource(), OFileUtils.getSizeAsString(f.length())); final ODistributedDatabaseChunk chunk = new ODistributedDatabaseChunk(f, 0, CHUNK_MAX_SIZE); - ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT, + ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT, "- transferring chunk #%d offset=%d size=%s...", 1, 0, OFileUtils.getSizeAsNumber(chunk.buffer.length)); return chunk; From 07854f564d47ccd9b01a0e4841b88cc726f9aea4 Mon Sep 17 00:00:00 2001 From: lvca Date: Sun, 2 Feb 2014 02:12:37 +0100 Subject: [PATCH 2/2] Fixed issue #2010 by supporting pessimistic locks on SQL update --- .../orient/client/remote/OStorageRemote.java | 3 +- .../client/remote/OStorageRemoteThread.java | 5 +- .../common/concur/lock/OLockManager.java | 28 ++ .../core/command/OCommandRequestAbstract.java | 36 ++- .../core/config/OStorageConfiguration.java | 2 +- .../orient/core/db/ODatabaseComplex.java | 8 +- .../db/ODatabaseRecordWrapperAbstract.java | 25 +- .../core/db/document/ODatabaseDocumentTx.java | 12 +- .../orient/core/db/graph/OGraphDatabase.java | 4 +- .../orient/core/db/raw/ODatabaseRaw.java | 27 +- .../db/record/ODatabaseRecordAbstract.java | 163 +++++----- .../core/db/record/ODatabaseRecordTx.java | 28 +- .../orient/core/db/record/OIdentifiable.java | 28 +- .../orient/core/db/tool/ODatabaseCompare.java | 8 +- .../core/iterator/OIdentifiableIterator.java | 59 ++-- .../core/iterator/ORecordIteratorClass.java | 10 +- .../core/iterator/ORecordIteratorCluster.java | 8 +- .../iterator/ORecordIteratorClusters.java | 15 +- .../orient/core/record/impl/ODocument.java | 3 +- .../sql/OCommandExecutorSQLAlterClass.java | 2 +- .../OCommandExecutorSQLResultsetAbstract.java | 14 +- .../core/sql/OCommandExecutorSQLSelect.java | 6 +- .../core/sql/OCommandExecutorSQLUpdate.java | 282 ++++++++++-------- .../orient/core/storage/OStorage.java | 18 +- .../orient/core/storage/OStorageEmbedded.java | 6 +- .../local/OStorageConfigurationSegment.java | 3 +- .../storage/impl/local/OStorageLocal.java | 58 ++-- .../impl/local/OStorageLocalTxExecuter.java | 5 +- .../paginated/OLocalPaginatedStorage.java | 70 +++-- .../storage/impl/memory/OStorageMemory.java | 58 ++-- .../orient/core/tx/OTransaction.java | 103 +++---- .../orient/core/tx/OTransactionNoTx.java | 11 +- .../core/tx/OTransactionOptimistic.java | 34 ++- .../OMVRBTreeEntryDataProviderAbstract.java | 2 +- .../provider/OMVRBTreeProviderAbstract.java | 2 +- .../oldsharding/OAutoshardedStorageImpl.java | 7 +- .../orient/OrientElementScanIterable.java | 4 +- .../object/db/OCommandSQLPojoWrapper.java | 9 +- .../orient/object/db/OObjectDatabaseTx.java | 14 +- .../distributed/ODistributedStorage.java | 9 +- .../binary/ONetworkProtocolBinary.java | 40 ++- .../database/auto/ConcurrentUpdatesTest.java | 207 ++++++++++--- .../speed/ReadAllClusterObjectsSpeedTest.java | 9 +- .../orient/console/OConsoleDatabaseApp.java | 4 +- 44 files changed, 888 insertions(+), 561 deletions(-) diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java index d1e913771c2..cbc21fb1418 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java @@ -413,8 +413,7 @@ public ORecordMetadata getRecordMetadata(final ORID rid) { } while (true); } - public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, final boolean iIgnoreCache, - final ORecordCallback iCallback, boolean loadTombstones) { + public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, final boolean iIgnoreCache, final ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { checkConnection(); if (OStorageRemoteThreadLocal.INSTANCE.get().commandExecuting) diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemoteThread.java b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemoteThread.java index 87c73c49279..d2f42953135 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemoteThread.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemoteThread.java @@ -220,11 +220,10 @@ public OStorageOperationResult createRecord(final int iDataSe } } - public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones) { + public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache, ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { pushSession(); try { - return delegate.readRecord(iRid, iFetchPlan, iIgnoreCache, null, loadTombstones); + return delegate.readRecord(iRid, iFetchPlan, iIgnoreCache, null, loadTombstones, LOCKING_STRATEGY.DEFAULT); } finally { popSession(); } diff --git a/commons/src/main/java/com/orientechnologies/common/concur/lock/OLockManager.java b/commons/src/main/java/com/orientechnologies/common/concur/lock/OLockManager.java index 1d7b9cece6c..bf4edbbd0ef 100755 --- a/commons/src/main/java/com/orientechnologies/common/concur/lock/OLockManager.java +++ b/commons/src/main/java/com/orientechnologies/common/concur/lock/OLockManager.java @@ -183,7 +183,32 @@ public void releaseLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iRe lock.readLock().unlock(); else lock.writeLock().unlock(); + } + + public void modifyLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iResourceId, final LOCK iCurrentLockType, + final LOCK iNewLockType) throws OLockException { + if (!enabled || iNewLockType == iCurrentLockType) + return; + final CountableLock lock; + final Object internalLock = internalLock(iResourceId); + synchronized (internalLock) { + lock = map.get(iResourceId); + if (lock == null) + throw new OLockException("Error on releasing a non acquired lock by the requester '" + iRequester + + "' against the resource: '" + iResourceId + "'"); + + if (iCurrentLockType == LOCK.SHARED) + lock.readLock().unlock(); + else + lock.writeLock().unlock(); + + // RE-ACQUIRE IT + if (iNewLockType == LOCK.SHARED) + lock.readLock().lock(); + else + lock.writeLock().lock(); + } } public void clear() { @@ -195,6 +220,9 @@ public int getCountCurrentLocks() { return map.size(); } + public void releaseAllLocksOfRequester(REQUESTER_TYPE iRequester) { + } + protected RESOURCE_TYPE getImmutableResourceId(final RESOURCE_TYPE iResourceId) { return iResourceId; } diff --git a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java index 716db2c9b35..886d49b2bdc 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/command/OCommandRequestAbstract.java @@ -15,13 +15,14 @@ */ package com.orientechnologies.orient.core.command; -import java.util.HashMap; -import java.util.Map; - import com.orientechnologies.common.listener.OProgressListener; import com.orientechnologies.orient.core.command.OCommandContext.TIMEOUT_STRATEGY; import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.db.record.OIdentifiable; +import com.orientechnologies.orient.core.storage.OStorage; + +import java.util.HashMap; +import java.util.Map; /** * Text based Command Request abstract class. @@ -31,15 +32,16 @@ */ @SuppressWarnings("serial") public abstract class OCommandRequestAbstract implements OCommandRequestInternal { - protected OCommandResultListener resultListener; - protected OProgressListener progressListener; - protected int limit = -1; - protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong(); - protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION; - protected Map parameters; - protected String fetchPlan = null; - protected boolean useCache = false; - protected OCommandContext context; + protected OCommandResultListener resultListener; + protected OProgressListener progressListener; + protected int limit = -1; + protected long timeoutMs = OGlobalConfiguration.COMMAND_TIMEOUT.getValueAsLong(); + protected TIMEOUT_STRATEGY timeoutStrategy = TIMEOUT_STRATEGY.EXCEPTION; + protected OStorage.LOCKING_STRATEGY lockStrategy = OStorage.LOCKING_STRATEGY.NONE; + protected Map parameters; + protected String fetchPlan = null; + protected boolean useCache = false; + protected OCommandContext context; protected OCommandRequestAbstract() { } @@ -137,7 +139,7 @@ public long getTimeoutTime() { return timeoutMs; } - public void setTimeout(final long timeout, TIMEOUT_STRATEGY strategy) { + public void setTimeout(final long timeout, final TIMEOUT_STRATEGY strategy) { this.timeoutMs = timeout; this.timeoutStrategy = strategy; } @@ -145,4 +147,12 @@ public void setTimeout(final long timeout, TIMEOUT_STRATEGY strategy) { public TIMEOUT_STRATEGY getTimeoutStrategy() { return timeoutStrategy; } + + public OStorage.LOCKING_STRATEGY getLockingStrategy() { + return lockStrategy; + } + + public void setLockStrategy(final OStorage.LOCKING_STRATEGY lockStrategy) { + this.lockStrategy = lockStrategy; + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/config/OStorageConfiguration.java b/core/src/main/java/com/orientechnologies/orient/core/config/OStorageConfiguration.java index 58c00cb1811..2c572c8653d 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/config/OStorageConfiguration.java +++ b/core/src/main/java/com/orientechnologies/orient/core/config/OStorageConfiguration.java @@ -93,7 +93,7 @@ public OStorageConfiguration(final OStorage iStorage) { * @throws OSerializationException */ public OStorageConfiguration load() throws OSerializationException { - final byte[] record = storage.readRecord(CONFIG_RID, null, false, null, false).getResult().buffer; + final byte[] record = storage.readRecord(CONFIG_RID, null, false, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult().buffer; if (record == null) throw new OStorageException("Cannot load database's configuration. The database seems to be corrupted."); diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseComplex.java b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseComplex.java index 184a0b4b39c..495ebd4e112 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseComplex.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseComplex.java @@ -105,13 +105,16 @@ public enum OPERATION_MODE { /** * Loads a record using a fetch plan. * + * * @param iObject * Record to load * @param iFetchPlan * Fetch plan used + * @param iLockingStrategy * @return The record received */ - public RET load(T iObject, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone); + public RET load(T iObject, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, + OStorage.LOCKING_STRATEGY iLockingStrategy); /** * Loads a record using a fetch plan. @@ -172,7 +175,8 @@ public enum OPERATION_MODE { */ public RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache); - public RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone); + public RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, + OStorage.LOCKING_STRATEGY iLockingStrategy); /** * Saves an entity in synchronous mode. If the entity is not dirty, then the operation will be ignored. For custom entity diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseRecordWrapperAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseRecordWrapperAbstract.java index af84a1efe41..22fa2095718 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseRecordWrapperAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseRecordWrapperAbstract.java @@ -15,14 +15,6 @@ */ package com.orientechnologies.orient.core.db; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - import com.orientechnologies.orient.core.command.OCommandRequest; import com.orientechnologies.orient.core.db.record.ODatabaseRecord; import com.orientechnologies.orient.core.db.record.OIdentifiable; @@ -49,6 +41,14 @@ import com.orientechnologies.orient.core.tx.OTransaction.TXTYPE; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + @SuppressWarnings("unchecked") public abstract class ODatabaseRecordWrapperAbstract extends ODatabaseWrapperAbstract implements ODatabaseComplex> { @@ -226,14 +226,13 @@ public > RET load(final ORID iRecordId, final Str } @Override - public > RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { - return (RET) underlying.load(iRecordId, iFetchPlan, iIgnoreCache, loadTombstone); + public > RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) underlying.load(iRecordId, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT); } @Override - public > RET load(ORecordInternal iObject, String iFetchPlan, boolean iIgnoreCache, - boolean loadTombstone) { - return (RET) underlying.load(iObject, iFetchPlan, iIgnoreCache, loadTombstone); + public > RET load(ORecordInternal iObject, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) underlying.load(iObject, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET getRecord(final OIdentifiable iIdentifiable) { diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java index 5047e66c3e0..0f1e981e213 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java @@ -16,8 +16,6 @@ package com.orientechnologies.orient.core.db.document; -import java.util.*; - import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.Orient; @@ -39,9 +37,17 @@ import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.storage.impl.local.OFreezableStorage; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + @SuppressWarnings("unchecked") public class ODatabaseDocumentTx extends ODatabaseRecordWrapperAbstract implements ODatabaseDocument { public ODatabaseDocumentTx(final String iURL) { @@ -198,7 +204,7 @@ public ORecordIteratorCluster browseCluster(String iClusterName, OClu checkSecurity(ODatabaseSecurityResources.CLUSTER, ORole.PERMISSION_READ, iClusterName); return new ORecordIteratorCluster(this, underlying, getClusterIdByName(iClusterName), startClusterPosition, - endClusterPosition, true, loadTombstones); + endClusterPosition, true, loadTombstones, OStorage.LOCKING_STRATEGY.DEFAULT); } /** diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/graph/OGraphDatabase.java b/core/src/main/java/com/orientechnologies/orient/core/db/graph/OGraphDatabase.java index 2c49fdc6096..dab72ed48a5 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/graph/OGraphDatabase.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/graph/OGraphDatabase.java @@ -30,7 +30,6 @@ import com.orientechnologies.orient.core.db.record.ODatabaseRecordTx; import com.orientechnologies.orient.core.db.record.OIdentifiable; import com.orientechnologies.orient.core.db.record.ridbag.ORidBag; -import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeRidBag; import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.iterator.ORecordIteratorClass; import com.orientechnologies.orient.core.metadata.schema.OClass; @@ -149,7 +148,8 @@ public Iterable browseEdges(final boolean iPolymorphic) { } public Iterable browseElements(final String iClass, final boolean iPolymorphic) { - return new ORecordIteratorClass(this, (ODatabaseRecordAbstract) getUnderlying(), iClass, iPolymorphic, true, false); + return new ORecordIteratorClass(this, (ODatabaseRecordAbstract) getUnderlying(), iClass, iPolymorphic, true, false, + OStorage.LOCKING_STRATEGY.DEFAULT); } public ODocument createVertex() { diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/raw/ODatabaseRaw.java b/core/src/main/java/com/orientechnologies/orient/core/db/raw/ODatabaseRaw.java index 5305bf03fd9..f0d693b7fa0 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/raw/ODatabaseRaw.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/raw/ODatabaseRaw.java @@ -16,22 +16,6 @@ package com.orientechnologies.orient.core.db.raw; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TimeZone; -import java.util.concurrent.Callable; - import com.orientechnologies.common.concur.lock.ONoLock; import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.listener.OListenerManger; @@ -66,6 +50,13 @@ import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.Callable; + /** * Lower level ODatabase implementation. It's extended or wrapped by all the others. * @@ -234,14 +225,14 @@ public long countClusterElements(int[] iClusterIds, boolean countTombstones) { } public OStorageOperationResult read(final ORecordId iRid, final String iFetchPlan, final boolean iIgnoreCache, - boolean loadTombstones) { + final boolean loadTombstones, final OStorage.LOCKING_STRATEGY iLockingStrategy) { if (!iRid.isValid()) return new OStorageOperationResult(null); OFetchHelper.checkFetchPlanValid(iFetchPlan); try { - return storage.readRecord(iRid, iFetchPlan, iIgnoreCache, null, loadTombstones); + return storage.readRecord(iRid, iFetchPlan, iIgnoreCache, null, loadTombstones, iLockingStrategy); } catch (Throwable t) { if (iRid.isTemporary()) diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordAbstract.java index 75f83685c3c..7bd33dbe2fd 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordAbstract.java @@ -15,9 +15,6 @@ */ package com.orientechnologies.orient.core.db.record; -import java.util.*; -import java.util.concurrent.Callable; - import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.Orient; @@ -73,13 +70,22 @@ import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializer; import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializerFactory; import com.orientechnologies.orient.core.sql.OCommandSQL; -import com.orientechnologies.orient.core.storage.*; +import com.orientechnologies.orient.core.storage.ORawBuffer; +import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.ORecordMetadata; +import com.orientechnologies.orient.core.storage.OStorage; +import com.orientechnologies.orient.core.storage.OStorageEmbedded; +import com.orientechnologies.orient.core.storage.OStorageOperationResult; +import com.orientechnologies.orient.core.storage.OStorageProxy; import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext; import com.orientechnologies.orient.core.tx.OTransactionRealAbstract; import com.orientechnologies.orient.core.type.tree.provider.OMVRBTreeRIDProvider; import com.orientechnologies.orient.core.version.ORecordVersion; import com.orientechnologies.orient.core.version.OVersionFactory; +import java.util.*; +import java.util.concurrent.Callable; + @SuppressWarnings("unchecked") public abstract class ODatabaseRecordAbstract extends ODatabaseWrapperAbstract implements ODatabaseRecord { @@ -305,50 +311,56 @@ public void reload() { } public > RET reload(final ORecordInternal iRecord) { - return executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, null, true, false); + return executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, null, true, false, OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET reload(final ORecordInternal iRecord, final String iFetchPlan) { - return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, true, false); + return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, true, false, + OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET reload(final ORecordInternal iRecord, final String iFetchPlan, boolean iIgnoreCache) { - return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false); + return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false, + OStorage.LOCKING_STRATEGY.DEFAULT); } /** * Loads a record using a fetch plan. */ public > RET load(final ORecordInternal iRecord, final String iFetchPlan) { - return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, false, false); + return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, false, false, + OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET load(final ORecordInternal iRecord, final String iFetchPlan, final boolean iIgnoreCache) { - return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false); + return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false, + OStorage.LOCKING_STRATEGY.DEFAULT); } @Override public > RET load(ORecordInternal iRecord, String iFetchPlan, boolean iIgnoreCache, - boolean loadTombstone) { - return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, loadTombstone); + boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) executeReadRecord((ORecordId) iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, loadTombstone, + iLockingStrategy); } public > RET load(final ORID iRecordId) { - return (RET) executeReadRecord((ORecordId) iRecordId, null, null, false, false); + return (RET) executeReadRecord((ORecordId) iRecordId, null, null, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET load(final ORID iRecordId, final String iFetchPlan) { - return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, false, false); + return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } public > RET load(final ORID iRecordId, final String iFetchPlan, final boolean iIgnoreCache) { - return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, iIgnoreCache, false); + return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, iIgnoreCache, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @Override - public > RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { - return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, iIgnoreCache, loadTombstone); + public > RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, + OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) executeReadRecord((ORecordId) iRecordId, null, iFetchPlan, iIgnoreCache, loadTombstone, iLockingStrategy); } /** @@ -458,27 +470,30 @@ public > ORecordIteratorCluster browseCluste } @Override - public > ORecordIteratorCluster browseCluster(String iClusterName, Class iRecordClass, - OClusterPosition startClusterPosition, OClusterPosition endClusterPosition, boolean loadTombstones) { + public > ORecordIteratorCluster browseCluster(final String iClusterName, + final Class iRecordClass, final OClusterPosition startClusterPosition, final OClusterPosition endClusterPosition, + final boolean loadTombstones) { checkSecurity(ODatabaseSecurityResources.CLUSTER, ORole.PERMISSION_READ, iClusterName); setCurrentDatabaseinThreadLocal(); final int clusterId = getClusterIdByName(iClusterName); - return new ORecordIteratorCluster(this, this, clusterId, startClusterPosition, endClusterPosition, true, loadTombstones); + return new ORecordIteratorCluster(this, this, clusterId, startClusterPosition, endClusterPosition, true, loadTombstones, + OStorage.LOCKING_STRATEGY.DEFAULT); } @Override - public > ORecordIteratorCluster browseCluster(String iClusterName, - OClusterPosition startClusterPosition, OClusterPosition endClusterPosition, boolean loadTombstones) { + public > ORecordIteratorCluster browseCluster(final String iClusterName, + final OClusterPosition startClusterPosition, final OClusterPosition endClusterPosition, final boolean loadTombstones) { checkSecurity(ODatabaseSecurityResources.CLUSTER, ORole.PERMISSION_READ, iClusterName); setCurrentDatabaseinThreadLocal(); final int clusterId = getClusterIdByName(iClusterName); - return new ORecordIteratorCluster(this, this, clusterId, startClusterPosition, endClusterPosition, true, loadTombstones); + return new ORecordIteratorCluster(this, this, clusterId, startClusterPosition, endClusterPosition, true, loadTombstones, + OStorage.LOCKING_STRATEGY.DEFAULT); } public ORecordIteratorCluster browseCluster(final String iClusterName) { @@ -676,11 +691,10 @@ public DB checkSecurity(final String iResourceGener } public > RET executeReadRecord(final ORecordId iRid, ORecordInternal iRecord, - final String iFetchPlan, final boolean iIgnoreCache, boolean loadTombstones) { + final String iFetchPlan, final boolean iIgnoreCache, final boolean loadTombstones, + final OStorage.LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); - // setCurrentDatabaseinThreadLocal(); - try { checkSecurity(ODatabaseSecurityResources.CLUSTER, ORole.PERMISSION_READ, getClusterNameById(iRid.getClusterId())); @@ -712,7 +726,7 @@ record = iRecord; return (RET) record; } - final ORawBuffer recordBuffer = underlying.read(iRid, iFetchPlan, iIgnoreCache, loadTombstones).getResult(); + final ORawBuffer recordBuffer = underlying.read(iRid, iFetchPlan, iIgnoreCache, loadTombstones, iLockingStrategy).getResult(); if (recordBuffer == null) return null; @@ -764,12 +778,11 @@ public > RET executeSaveRecord(final ORecordInter setCurrentDatabaseinThreadLocal(); - - final Set lockedIndexes = new HashSet(); + final Set lockedIndexes = new HashSet(); record.setInternalStatus(com.orientechnologies.orient.core.db.record.ORecordElement.STATUS.MARSHALLING); try { - if(record instanceof ODocument) - acquireIndexModificationLock((ODocument) record, lockedIndexes); + if (record instanceof ODocument) + acquireIndexModificationLock((ODocument) record, lockedIndexes); final boolean wasNew = iForceCreate || rid.isNew(); if (wasNew && rid.clusterId == -1) @@ -881,7 +894,7 @@ else if (stream == null || stream.length == 0) // WRAP IT AS ODATABASE EXCEPTION throw new ODatabaseException("Error on saving record in cluster #" + record.getIdentity().getClusterId(), t); } finally { - releaseIndexModificationLock(lockedIndexes); + releaseIndexModificationLock(lockedIndexes); record.setInternalStatus(com.orientechnologies.orient.core.db.record.ORecordElement.STATUS.LOADED); } return (RET) record; @@ -915,14 +928,14 @@ public void executeDeleteRecord(final OIdentifiable record, final ORecordVersion checkSecurity(ODatabaseSecurityResources.CLUSTER, ORole.PERMISSION_DELETE, getClusterNameById(rid.clusterId)); - final Set lockedIndexes = new HashSet(); + final Set lockedIndexes = new HashSet(); setCurrentDatabaseinThreadLocal(); ORecordSerializationContext.pushContext(); try { - if(record instanceof ODocument) - acquireIndexModificationLock((ODocument) record, lockedIndexes); + if (record instanceof ODocument) + acquireIndexModificationLock((ODocument) record, lockedIndexes); - try { + try { // if cache is switched off record will be unreachable after delete. ORecord rec = record.getRecord(); if (iCallTriggers && rec != null) @@ -965,7 +978,7 @@ else if (rec != null) throw new ODatabaseException("Error on deleting record in cluster #" + record.getIdentity().getClusterId(), t); } } finally { - releaseIndexModificationLock(lockedIndexes); + releaseIndexModificationLock(lockedIndexes); ORecordSerializationContext.pullContext(); } } @@ -1187,7 +1200,7 @@ private boolean processReplicaUpdate(ORecordMetadata loadedRecordMetadata) throw replicaToUpdate = mergeWithRecord(rid); callbackHooks(TYPE.BEFORE_REPLICA_UPDATE, replicaToUpdate); } else if (!loadedRecordMetadata.getRecordVersion().isTombstone() && replicaVersion.isTombstone()) { - replicaToUpdate = load(rid, "*:0", false, true); + replicaToUpdate = load(rid, "*:0", false, true, OStorage.LOCKING_STRATEGY.DEFAULT); replicaToUpdate.getRecordVersion().copyFrom(replicaVersion); callbackHooks(TYPE.BEFORE_REPLICA_DELETE, replicaToUpdate); @@ -1266,7 +1279,7 @@ private ORecordInternal mergeWithRecord(ORID rid) { if (rid == null) replicaToAdd = new ODocument(); else - replicaToAdd = load(rid, "*:0", false, true); + replicaToAdd = load(rid, "*:0", false, true, OStorage.LOCKING_STRATEGY.DEFAULT); ((ODocument) replicaToAdd).merge((ODocument) record, false, false); @@ -1279,41 +1292,41 @@ private ORecordInternal mergeWithRecord(ORID rid) { } } - private void releaseIndexModificationLock(Set lockedIndexes) { - final OMetadataDefault metadata = getMetadata(); - if (metadata == null) - return; - - final OIndexManager indexManager = metadata.getIndexManager(); - if(indexManager == null) - return; - - for (String indexName : lockedIndexes) { - OIndex index = indexManager.getIndex(indexName); - index.getInternal().releaseModificationLock(); - } - } - - private void acquireIndexModificationLock(ODocument doc, Set lockedIndexes) { - if (getStorage() instanceof OStorageEmbedded) { - final OClass cls = doc.getSchemaClass(); - if (cls != null) { - final Collection> indexes = cls.getIndexes(); - if (indexes != null) { - final SortedSet> indexesToLock = new TreeSet>(new Comparator>() { - public int compare(OIndex indexOne, OIndex indexTwo) { - return indexOne.getName().compareTo(indexTwo.getName()); - } - }); - - indexesToLock.addAll(indexes); - - for (final OIndex index : indexesToLock) { - index.getInternal().acquireModificationLock(); - lockedIndexes.add(index.getName()); - } - } - } - } - } + private void releaseIndexModificationLock(Set lockedIndexes) { + final OMetadataDefault metadata = getMetadata(); + if (metadata == null) + return; + + final OIndexManager indexManager = metadata.getIndexManager(); + if (indexManager == null) + return; + + for (String indexName : lockedIndexes) { + OIndex index = indexManager.getIndex(indexName); + index.getInternal().releaseModificationLock(); + } + } + + private void acquireIndexModificationLock(ODocument doc, Set lockedIndexes) { + if (getStorage() instanceof OStorageEmbedded) { + final OClass cls = doc.getSchemaClass(); + if (cls != null) { + final Collection> indexes = cls.getIndexes(); + if (indexes != null) { + final SortedSet> indexesToLock = new TreeSet>(new Comparator>() { + public int compare(OIndex indexOne, OIndex indexTwo) { + return indexOne.getName().compareTo(indexTwo.getName()); + } + }); + + indexesToLock.addAll(indexes); + + for (final OIndex index : indexesToLock) { + index.getInternal().acquireModificationLock(); + lockedIndexes.add(index.getName()); + } + } + } + } + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordTx.java b/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordTx.java index ccd537c1d3c..adbe3f28dc0 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordTx.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/record/ODatabaseRecordTx.java @@ -24,6 +24,7 @@ import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransaction.TXSTATUS; import com.orientechnologies.orient.core.tx.OTransaction.TXTYPE; @@ -180,56 +181,59 @@ public OTransaction getTransaction() { @SuppressWarnings("unchecked") @Override public > RET load(final ORecordInternal iRecord, final String iFetchPlan) { - return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, false, false); + return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @SuppressWarnings("unchecked") @Override public > RET load(ORecordInternal iRecord, String iFetchPlan, boolean iIgnoreCache, - boolean loadTombstone) { - return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, loadTombstone); + boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, loadTombstone, iLockingStrategy); } @SuppressWarnings("unchecked") @Override public > RET load(final ORecordInternal iRecord) { - return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, null, false, false); + return (RET) currentTx.loadRecord(iRecord.getIdentity(), iRecord, null, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @SuppressWarnings("unchecked") @Override public > RET load(final ORID iRecordId) { - return (RET) currentTx.loadRecord(iRecordId, null, null, false, false); + return (RET) currentTx.loadRecord(iRecordId, null, null, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @SuppressWarnings("unchecked") @Override public > RET load(final ORID iRecordId, final String iFetchPlan) { - return (RET) currentTx.loadRecord(iRecordId, null, iFetchPlan, false, false); + return (RET) currentTx.loadRecord(iRecordId, null, iFetchPlan, false, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @SuppressWarnings("unchecked") @Override - public > RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { - return (RET) currentTx.loadRecord(iRecordId, null, iFetchPlan, iIgnoreCache, loadTombstone); + public > RET load(final ORID iRecordId, String iFetchPlan, final boolean iIgnoreCache, + final boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { + return (RET) currentTx.loadRecord(iRecordId, null, iFetchPlan, iIgnoreCache, loadTombstone, iLockingStrategy); } @SuppressWarnings("unchecked") @Override - public > RET reload(ORecordInternal iRecord) { + public > RET reload(final ORecordInternal iRecord) { return reload(iRecord, null, false); } @SuppressWarnings("unchecked") @Override - public > RET reload(ORecordInternal iRecord, String iFetchPlan) { + public > RET reload(final ORecordInternal iRecord, final String iFetchPlan) { return reload(iRecord, iFetchPlan, false); } @SuppressWarnings("unchecked") @Override - public > RET reload(ORecordInternal iRecord, String iFetchPlan, boolean iIgnoreCache) { - ORecordInternal record = currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false); + public > RET reload(final ORecordInternal iRecord, final String iFetchPlan, + final boolean iIgnoreCache) { + ORecordInternal record = currentTx.loadRecord(iRecord.getIdentity(), iRecord, iFetchPlan, iIgnoreCache, false, + OStorage.LOCKING_STRATEGY.DEFAULT); if (record != null && iRecord != record) { iRecord.fromStream(record.toStream()); iRecord.getRecordVersion().copyFrom(record.getRecordVersion()); diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/record/OIdentifiable.java b/core/src/main/java/com/orientechnologies/orient/core/db/record/OIdentifiable.java index a2f56ef2298..55e0ec99a36 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/record/OIdentifiable.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/record/OIdentifiable.java @@ -15,11 +15,11 @@ */ package com.orientechnologies.orient.core.db.record; -import java.util.Comparator; - import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.record.ORecord; +import java.util.Comparator; + /** * Base interface for identifiable objects. This abstraction is required to use ORID and ORecord in many points. * @@ -27,17 +27,17 @@ * */ public interface OIdentifiable extends Comparable, Comparator { - /** - * Returns the record identity. - * - * @return ORID instance - */ - public ORID getIdentity(); + /** + * Returns the record identity. + * + * @return ORID instance + */ + public ORID getIdentity(); - /** - * Returns the record instance. - * - * @return ORecord instance - */ - public > T getRecord(); + /** + * Returns the record instance. + * + * @return ORecord instance + */ + public > T getRecord(); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java index 98f71a6638c..72eb796bddd 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java @@ -530,16 +530,16 @@ private boolean compareRecords(ODocumentHelper.RIDMapper ridMapper) { && rid.equals(new ORecordId(storage2.getConfiguration().indexMgrRecordId))) continue; - final ORawBuffer buffer1 = storage1.readRecord(rid, null, true, null, false).getResult(); + final ORawBuffer buffer1 = storage1.readRecord(rid, null, true, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); final ORawBuffer buffer2; if (ridMapper == null) - buffer2 = storage2.readRecord(rid, null, true, null, false).getResult(); + buffer2 = storage2.readRecord(rid, null, true, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); else { final ORID newRid = ridMapper.map(rid); if (newRid == null) - buffer2 = storage2.readRecord(rid, null, true, null, false).getResult(); + buffer2 = storage2.readRecord(rid, null, true, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); else - buffer2 = storage2.readRecord(new ORecordId(newRid), null, true, null, false).getResult(); + buffer2 = storage2.readRecord(new ORecordId(newRid), null, true, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); } if (buffer1 == null && buffer2 == null) diff --git a/core/src/main/java/com/orientechnologies/orient/core/iterator/OIdentifiableIterator.java b/core/src/main/java/com/orientechnologies/orient/core/iterator/OIdentifiableIterator.java index f0db8038158..7133b02f2a3 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/iterator/OIdentifiableIterator.java +++ b/core/src/main/java/com/orientechnologies/orient/core/iterator/OIdentifiableIterator.java @@ -15,10 +15,6 @@ */ package com.orientechnologies.orient.core.iterator; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.db.record.ODatabaseRecord; import com.orientechnologies.orient.core.db.record.OIdentifiable; @@ -31,6 +27,10 @@ import com.orientechnologies.orient.core.storage.OPhysicalPosition; import com.orientechnologies.orient.core.storage.OStorage; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + /** * Iterator class to browse forward and backward the records of a cluster. Once browsed in a direction, the iterator cannot change * it. @@ -38,43 +38,46 @@ * @author Luca Garulli */ public abstract class OIdentifiableIterator implements Iterator, Iterable { - protected final ODatabaseRecord database; - private final ODatabaseRecord lowLevelDatabase; - private final OStorage dbStorage; + protected final ODatabaseRecord database; + private final ODatabaseRecord lowLevelDatabase; + private final OStorage dbStorage; + + protected boolean liveUpdated = false; + protected long limit = -1; + protected long browsedRecords = 0; - protected boolean liveUpdated = false; - protected long limit = -1; - protected long browsedRecords = 0; + private String fetchPlan; + private ORecordInternal reusedRecord = null; // DEFAULT = NOT + // REUSE IT + private Boolean directionForward; - private String fetchPlan; - private ORecordInternal reusedRecord = null; // DEFAULT = NOT - // REUSE IT - private Boolean directionForward; + protected final ORecordId current = new ORecordId(); - protected final ORecordId current = new ORecordId(); + protected OStorage.LOCKING_STRATEGY lockingStrategy = OStorage.LOCKING_STRATEGY.DEFAULT; - protected long totalAvailableRecords; - protected List txEntries; + protected long totalAvailableRecords; + protected List txEntries; - protected int currentTxEntryPosition = -1; + protected int currentTxEntryPosition = -1; - protected OClusterPosition firstClusterEntry = OClusterPositionFactory.INSTANCE.valueOf(0); - protected OClusterPosition lastClusterEntry = OClusterPositionFactory.INSTANCE.getMaxValue(); + protected OClusterPosition firstClusterEntry = OClusterPositionFactory.INSTANCE.valueOf(0); + protected OClusterPosition lastClusterEntry = OClusterPositionFactory.INSTANCE.getMaxValue(); - private OClusterPosition currentEntry = OClusterPosition.INVALID_POSITION; + private OClusterPosition currentEntry = OClusterPosition.INVALID_POSITION; - private int currentEntryPosition = -1; - private OPhysicalPosition[] positionsToProcess = null; + private int currentEntryPosition = -1; + private OPhysicalPosition[] positionsToProcess = null; - private final boolean useCache; - private final boolean iterateThroughTombstones; + private final boolean useCache; + private final boolean iterateThroughTombstones; public OIdentifiableIterator(final ODatabaseRecord iDatabase, final ODatabaseRecord iLowLevelDatabase, final boolean useCache, - final boolean iterateThroughTombstones) { + final boolean iterateThroughTombstones, final OStorage.LOCKING_STRATEGY iLockingStrategy) { database = iDatabase; lowLevelDatabase = iLowLevelDatabase; this.iterateThroughTombstones = iterateThroughTombstones; this.useCache = useCache; + lockingStrategy = iLockingStrategy; dbStorage = lowLevelDatabase.getStorage(); @@ -277,9 +280,9 @@ protected ORecordInternal readCurrentRecord(ORecordInternal iRecord, final try { if (iRecord != null) { iRecord.setIdentity(new ORecordId(current.clusterId, current.clusterPosition)); - iRecord = lowLevelDatabase.load(iRecord, fetchPlan, !useCache, iterateThroughTombstones); + iRecord = lowLevelDatabase.load(iRecord, fetchPlan, !useCache, iterateThroughTombstones, lockingStrategy); } else - iRecord = lowLevelDatabase.load(current, fetchPlan, !useCache, iterateThroughTombstones); + iRecord = lowLevelDatabase.load(current, fetchPlan, !useCache, iterateThroughTombstones, lockingStrategy); } catch (ODatabaseException e) { if (Thread.interrupted()) // THREAD INTERRUPTED: RETURN diff --git a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClass.java b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClass.java index 4398c7e9139..2dc113ef6c8 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClass.java +++ b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClass.java @@ -23,6 +23,7 @@ import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.storage.OStorage; /** * Iterator class to browse forward and backward the records of a cluster. Once browsed in a direction, the iterator cannot change @@ -52,7 +53,14 @@ public ORecordIteratorClass(final ODatabaseRecord iDatabase, final ODatabaseReco public ORecordIteratorClass(final ODatabaseRecord iDatabase, final ODatabaseRecord iLowLevelDatabase, final String iClassName, final boolean iPolymorphic, final boolean iUseCache, final boolean iterateThroughTombstones) { - super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones); + this(iDatabase, iLowLevelDatabase, iClassName, iPolymorphic, iUseCache, iterateThroughTombstones, + OStorage.LOCKING_STRATEGY.DEFAULT); + } + + public ORecordIteratorClass(final ODatabaseRecord iDatabase, final ODatabaseRecord iLowLevelDatabase, final String iClassName, + final boolean iPolymorphic, final boolean iUseCache, final boolean iterateThroughTombstones, + final OStorage.LOCKING_STRATEGY iLockingStrategy) { + super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones, iLockingStrategy); targetClass = database.getMetadata().getSchema().getClass(iClassName); if (targetClass == null) diff --git a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorCluster.java b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorCluster.java index 41c3a1af2f3..d57e869ffbe 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorCluster.java +++ b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorCluster.java @@ -23,6 +23,7 @@ import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; +import com.orientechnologies.orient.core.storage.OStorage; /** * Iterator class to browse forward and backward the records of a cluster. Once browsed in a direction, the iterator cannot change @@ -36,13 +37,13 @@ public class ORecordIteratorCluster> extends OIde public ORecordIteratorCluster(final ODatabaseRecord iDatabase, final ODatabaseRecordAbstract iLowLevelDatabase, final int iClusterId, final boolean iUseCache) { this(iDatabase, iLowLevelDatabase, iClusterId, OClusterPosition.INVALID_POSITION, OClusterPosition.INVALID_POSITION, iUseCache, - false); + false, OStorage.LOCKING_STRATEGY.DEFAULT); } public ORecordIteratorCluster(final ODatabaseRecord iDatabase, final ODatabaseRecordAbstract iLowLevelDatabase, final int iClusterId, final OClusterPosition firstClusterEntry, final OClusterPosition lastClusterEntry, - final boolean iUseCache, final boolean iterateThroughTombstones) { - super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones); + final boolean iUseCache, final boolean iterateThroughTombstones, final OStorage.LOCKING_STRATEGY iLockingStrategy) { + super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones, iLockingStrategy); if (iClusterId == ORID.CLUSTER_ID_INVALID) throw new IllegalArgumentException("The clusterId is invalid"); @@ -79,7 +80,6 @@ public ORecordIteratorCluster(final ODatabaseRecord iDatabase, final ODatabaseRe } begin(); - } @Override diff --git a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClusters.java b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClusters.java index 9f7835d8b8d..a10030c7069 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClusters.java +++ b/core/src/main/java/com/orientechnologies/orient/core/iterator/ORecordIteratorClusters.java @@ -15,9 +15,6 @@ */ package com.orientechnologies.orient.core.iterator; -import java.util.Arrays; -import java.util.NoSuchElementException; - import com.orientechnologies.orient.core.db.record.ODatabaseRecord; import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.id.OClusterPosition; @@ -25,6 +22,10 @@ import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; +import com.orientechnologies.orient.core.storage.OStorage; + +import java.util.Arrays; +import java.util.NoSuchElementException; /** * Iterator to browse multiple clusters forward and backward. Once browsed in a direction, the iterator cannot change it. This @@ -43,15 +44,15 @@ public class ORecordIteratorClusters> extends OId protected ORID endRange; public ORecordIteratorClusters(final ODatabaseRecord iDatabase, final ODatabaseRecord iLowLevelDatabase, final int[] iClusterIds, - final boolean iUseCache, final boolean iterateThroughTombstones) { - super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones); + final boolean iUseCache, final boolean iterateThroughTombstones, final OStorage.LOCKING_STRATEGY iLockingStrategy) { + super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones, iLockingStrategy); clusterIds = iClusterIds; config(); } protected ORecordIteratorClusters(final ODatabaseRecord iDatabase, final ODatabaseRecord iLowLevelDatabase, - final boolean iUseCache, final boolean iterateThroughTombstones) { - super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones); + final boolean iUseCache, final boolean iterateThroughTombstones, final OStorage.LOCKING_STRATEGY iLockingStrategy) { + super(iDatabase, iLowLevelDatabase, iUseCache, iterateThroughTombstones, iLockingStrategy); } public ORecordIteratorClusters setRange(final ORID iBegin, final ORID iEnd) { diff --git a/core/src/main/java/com/orientechnologies/orient/core/record/impl/ODocument.java b/core/src/main/java/com/orientechnologies/orient/core/record/impl/ODocument.java index 82d9b9c6a2b..a27826c0a31 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/record/impl/ODocument.java +++ b/core/src/main/java/com/orientechnologies/orient/core/record/impl/ODocument.java @@ -37,6 +37,7 @@ import com.orientechnologies.orient.core.serialization.serializer.OStringSerializerHelper; import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializerFactory; import com.orientechnologies.orient.core.serialization.serializer.record.string.ORecordSerializerSchemaAware2CSV; +import com.orientechnologies.orient.core.storage.OStorage; import java.io.ByteArrayOutputStream; import java.io.Externalizable; @@ -335,7 +336,7 @@ public ODocument load(final String iFetchPlan, boolean iIgnoreCache) { public ODocument load(final String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { Object result = null; try { - result = getDatabase().load(this, iFetchPlan, iIgnoreCache, loadTombstone); + result = getDatabase().load(this, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT); } catch (Exception e) { throw new ORecordNotFoundException("The record with id '" + getIdentity() + "' was not found", e); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLAlterClass.java b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLAlterClass.java index 5decf1688e3..66f335980ec 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLAlterClass.java +++ b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLAlterClass.java @@ -127,7 +127,7 @@ public Object execute(final Map iArgs) { do { for (OPhysicalPosition position : positions) { final ORecordId identity = new ORecordId(clusterId, position.clusterPosition); - final ORawBuffer record = storage.readRecord(identity, null, true, null, false).getResult(); + final ORawBuffer record = storage.readRecord(identity, null, true, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); if (record.recordType == ODocument.RECORD_TYPE) { final ORecordSerializerSchemaAware2CSV serializer = (ORecordSerializerSchemaAware2CSV) ORecordSerializerFactory diff --git a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLResultsetAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLResultsetAbstract.java index e8d5b1f5592..9eb7aa52f4b 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLResultsetAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLResultsetAbstract.java @@ -44,6 +44,7 @@ import com.orientechnologies.orient.core.sql.operator.OQueryOperatorNotEquals; import com.orientechnologies.orient.core.sql.query.OSQLAsynchQuery; import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; +import com.orientechnologies.orient.core.storage.OStorage; import java.util.ArrayList; import java.util.Collections; @@ -357,9 +358,13 @@ protected void searchInClasses() { database.checkSecurity(ODatabaseSecurityResources.CLASS, ORole.PERMISSION_READ, cls.getName().toLowerCase()); // NO INDEXES: SCAN THE ENTIRE CLUSTER + + OStorage.LOCKING_STRATEGY locking = context != null && context.getVariable("$locking") != null ? (OStorage.LOCKING_STRATEGY) context + .getVariable("$locking") : OStorage.LOCKING_STRATEGY.DEFAULT; + final ORID[] range = getRange(); target = new ORecordIteratorClass>(database, (ODatabaseRecordAbstract) database, cls.getName(), true, - request.isUseCache(), false).setRange(range[0], range[1]); + request.isUseCache(), false, locking).setRange(range[0], range[1]); } protected void searchInClusters() { @@ -398,8 +403,11 @@ protected void searchInClusters() { final ORID[] range = getRange(); - target = new ORecordIteratorClusters>(database, database, clIds, request.isUseCache(), false).setRange( - range[0], range[1]); + final OStorage.LOCKING_STRATEGY locking = context != null && context.getVariable("$locking") != null ? (OStorage.LOCKING_STRATEGY) context + .getVariable("$locking") : OStorage.LOCKING_STRATEGY.DEFAULT; + + target = new ORecordIteratorClusters>(database, database, clIds, request.isUseCache(), false, locking) + .setRange(range[0], range[1]); } protected void applyLimitAndSkip() { diff --git a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLSelect.java b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLSelect.java index efe4fd999e9..833f5cff55d 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLSelect.java +++ b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLSelect.java @@ -368,7 +368,11 @@ protected boolean executeSearchRecord(final OIdentifiable id) { if (!context.checkTimeout()) return false; - final ORecordInternal record = id.getRecord(); + final OStorage.LOCKING_STRATEGY lockingStrategy = context != null && context.getVariable("$locking") != null ? (OStorage.LOCKING_STRATEGY) context + .getVariable("$locking") : OStorage.LOCKING_STRATEGY.DEFAULT; + + final ORecordInternal record = id instanceof ORecordInternal ? (ORecordInternal) id : getDatabase().load( + id.getIdentity(), null, false, false, lockingStrategy); context.updateMetric("recordReads", +1); diff --git a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLUpdate.java b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLUpdate.java index caccb5139de..132aa9c0b47 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLUpdate.java +++ b/core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLUpdate.java @@ -31,6 +31,8 @@ import com.orientechnologies.orient.core.sql.filter.OSQLFilterItem; import com.orientechnologies.orient.core.sql.functions.OSQLFunctionRuntime; import com.orientechnologies.orient.core.sql.query.OSQLAsynchQuery; +import com.orientechnologies.orient.core.storage.OStorage; +import com.orientechnologies.orient.core.storage.OStorageEmbedded; import java.util.ArrayList; import java.util.Collection; @@ -55,6 +57,7 @@ public class OCommandExecutorSQLUpdate extends OCommandExecutorSQLSetAware imple private static final String KEYWORD_REMOVE = "REMOVE"; private static final String KEYWORD_INCREMENT = "INCREMENT"; private static final String KEYWORD_MERGE = "MERGE"; + private static final String KEYWORD_STRATEGY = "STRATEGY"; private Map setEntries = new LinkedHashMap(); private List> addEntries = new ArrayList>(); @@ -62,6 +65,7 @@ public class OCommandExecutorSQLUpdate extends OCommandExecutorSQLSetAware imple private List> removeEntries = new ArrayList>(); private Map incrementEntries = new LinkedHashMap(); private ODocument merge = null; + private String strategy = "NONE"; private OQuery query; private OSQLFilter compiledFilter; @@ -98,9 +102,10 @@ public OCommandExecutorSQLUpdate parse(final OCommandRequest iRequest) { if (parserIsEnded() || (!word.equals(KEYWORD_SET) && !word.equals(KEYWORD_ADD) && !word.equals(KEYWORD_PUT) && !word.equals(KEYWORD_REMOVE) - && !word.equals(KEYWORD_INCREMENT) && !word.equals(KEYWORD_CONTENT) && !word.equals(KEYWORD_MERGE))) + && !word.equals(KEYWORD_INCREMENT) && !word.equals(KEYWORD_CONTENT) && !word.equals(KEYWORD_MERGE) && !word + .equals(KEYWORD_STRATEGY))) throwSyntaxErrorException("Expected keyword " + KEYWORD_SET + "," + KEYWORD_ADD + "," + KEYWORD_CONTENT + "," + KEYWORD_MERGE - + "," + KEYWORD_PUT + "," + KEYWORD_REMOVE + " or " + KEYWORD_INCREMENT); + + "," + KEYWORD_PUT + "," + KEYWORD_REMOVE + " or " + KEYWORD_INCREMENT + " or " + KEYWORD_STRATEGY); while (!parserIsEnded() && !parserGetLastWord().equals(OCommandExecutorSQLAbstract.KEYWORD_WHERE)) { word = parserGetLastWord(); @@ -119,6 +124,8 @@ else if (word.equals(KEYWORD_REMOVE)) parseRemoveFields(); else if (word.equals(KEYWORD_INCREMENT)) parseIncrementFields(); + else if (word.equals(KEYWORD_STRATEGY)) + parseStrategy(); else break; @@ -139,7 +146,7 @@ else if (word.equals(KEYWORD_INCREMENT)) } else if (additionalStatement.equals(OCommandExecutorSQLAbstract.KEYWORD_WHERE) || additionalStatement.equals(OCommandExecutorSQLAbstract.KEYWORD_LIMIT) - || additionalStatement.equals(OCommandExecutorSQLAbstract.KEYWORD_LET)) + || additionalStatement.equals(OCommandExecutorSQLAbstract.KEYWORD_LET) || additionalStatement.equals(KEYWORD_STRATEGY)) query = new OSQLAsynchQuery("select from " + subjectName + " " + additionalStatement + " " + parserText.substring(parserGetCurrentPosition()), this); else if (additionalStatement != null && !additionalStatement.isEmpty()) @@ -168,7 +175,12 @@ public Object execute(final Map iArgs) { query.setUseCache(false); query.setContext(context); + + if (strategy.equals("LOCK")) + query.getContext().setVariable("$locking", OStorage.LOCKING_STRATEGY.KEEP_EXCLUSIVE_LOCK); + getDatabase().query(query, queryArgs); + return recordCount; } @@ -178,160 +190,167 @@ public Object execute(final Map iArgs) { @SuppressWarnings("unchecked") public boolean result(final Object iRecord) { final ODocument record = (ODocument) ((OIdentifiable) iRecord).getRecord(); + try { - if (compiledFilter != null) { - // ADDITIONAL FILTERING - if (!(Boolean) compiledFilter.evaluate(record, null, context)) - return false; - } - - final Set updatedRecords = new HashSet(); - - parameters.reset(); - - if (content != null) { - // REPLACE ALL THE CONTENT - record.clear(); - record.merge(content, false, false); - updatedRecords.add(record); - } + if (compiledFilter != null) { + // ADDITIONAL FILTERING + if (!(Boolean) compiledFilter.evaluate(record, null, context)) + return false; + } - if (merge != null) { - // REPLACE ALL THE CONTENT - record.merge(merge, true, false); - updatedRecords.add(record); - } + final Set updatedRecords = new HashSet(); - // BIND VALUES TO UPDATE - if (!setEntries.isEmpty()) { - Set changedDocuments = OSQLHelper.bindParameters(record, setEntries, parameters, context); - if (changedDocuments != null) - updatedRecords.addAll(changedDocuments); - } + parameters.reset(); - // BIND VALUES TO INCREMENT - for (Map.Entry entry : incrementEntries.entrySet()) { - final Number prevValue = record.field(entry.getKey()); + if (content != null) { + // REPLACE ALL THE CONTENT + record.clear(); + record.merge(content, false, false); + updatedRecords.add(record); + } - if (prevValue == null) - // NO PREVIOUS VALUE: CONSIDER AS 0 - record.field(entry.getKey(), entry.getValue()); - else - // COMPUTING INCREMENT - record.field(entry.getKey(), OType.increment(prevValue, entry.getValue())); + if (merge != null) { + // REPLACE ALL THE CONTENT + record.merge(merge, true, false); + updatedRecords.add(record); + } - updatedRecords.add(record); - } + // BIND VALUES TO UPDATE + if (!setEntries.isEmpty()) { + Set changedDocuments = OSQLHelper.bindParameters(record, setEntries, parameters, context); + if (changedDocuments != null) + updatedRecords.addAll(changedDocuments); + } - Object v; - - // BIND VALUES TO ADD - Collection coll; - Object fieldValue; - for (OPair entry : addEntries) { - coll = null; - if (!record.containsField(entry.getKey())) { - // GET THE TYPE IF ANY - if (record.getSchemaClass() != null) { - OProperty prop = record.getSchemaClass().getProperty(entry.getKey()); - if (prop != null && prop.getType() == OType.LINKSET) - // SET TYPE - coll = new HashSet(); + // BIND VALUES TO INCREMENT + if (!incrementEntries.isEmpty()) { + for (Map.Entry entry : incrementEntries.entrySet()) { + final Number prevValue = record.field(entry.getKey()); + + if (prevValue == null) + // NO PREVIOUS VALUE: CONSIDER AS 0 + record.field(entry.getKey(), entry.getValue()); + else + // COMPUTING INCREMENT + record.field(entry.getKey(), OType.increment(prevValue, entry.getValue())); } - - if (coll == null) - // IN ALL OTHER CASES USE A LIST - coll = new ArrayList(); - - record.field(entry.getKey(), coll); - } else { - fieldValue = record.field(entry.getKey()); - - if (fieldValue instanceof Collection) - coll = (Collection) fieldValue; - else - continue; + updatedRecords.add(record); } - v = entry.getValue(); + Object v; + + // BIND VALUES TO ADD + Collection coll; + Object fieldValue; + for (OPair entry : addEntries) { + coll = null; + if (!record.containsField(entry.getKey())) { + // GET THE TYPE IF ANY + if (record.getSchemaClass() != null) { + OProperty prop = record.getSchemaClass().getProperty(entry.getKey()); + if (prop != null && prop.getType() == OType.LINKSET) + // SET TYPE + coll = new HashSet(); + } - if (v instanceof OSQLFilterItem) - v = ((OSQLFilterItem) v).getValue(record, null, context); - else if (v instanceof OSQLFunctionRuntime) - v = ((OSQLFunctionRuntime) v).execute(record, record, null, context); - else if (v instanceof OCommandRequest) - v = ((OCommandRequest) v).execute(record, null, context); + if (coll == null) + // IN ALL OTHER CASES USE A LIST + coll = new ArrayList(); - coll.add(v); - updatedRecords.add(record); - } + record.field(entry.getKey(), coll); + } else { + fieldValue = record.field(entry.getKey()); - // BIND VALUES TO PUT (AS MAP) - Map map; - OPair pair; - for (Entry> entry : putEntries.entrySet()) { - fieldValue = record.field(entry.getKey()); - - if (fieldValue == null) { - if (record.getSchemaClass() != null) { - final OProperty property = record.getSchemaClass().getProperty(entry.getKey()); - if (property != null - && (property.getType() != null && (!property.getType().equals(OType.EMBEDDEDMAP) && !property.getType().equals( - OType.LINKMAP)))) { - throw new OCommandExecutionException("field " + entry.getKey() + " is not defined as a map"); - } + if (fieldValue instanceof Collection) + coll = (Collection) fieldValue; + else + continue; } - fieldValue = new HashMap(); - record.field(entry.getKey(), fieldValue); - } - - if (fieldValue instanceof Map) { - map = (Map) fieldValue; - pair = entry.getValue(); - - v = pair.getValue(); + v = entry.getValue(); if (v instanceof OSQLFilterItem) v = ((OSQLFilterItem) v).getValue(record, null, context); - else if (pair.getValue() instanceof OSQLFunctionRuntime) + else if (v instanceof OSQLFunctionRuntime) v = ((OSQLFunctionRuntime) v).execute(record, record, null, context); else if (v instanceof OCommandRequest) v = ((OCommandRequest) v).execute(record, null, context); - map.put(pair.getKey(), v); + coll.add(v); updatedRecords.add(record); } - } - // REMOVE FIELD IF ANY - for (OPair entry : removeEntries) { - v = entry.getValue(); - if (v == EMPTY_VALUE) { - record.removeField(entry.getKey()); - updatedRecords.add(record); - } else { + // BIND VALUES TO PUT (AS MAP) + Map map; + OPair pair; + for (Entry> entry : putEntries.entrySet()) { fieldValue = record.field(entry.getKey()); - if (fieldValue instanceof Collection) { - coll = (Collection) fieldValue; - if (coll.remove(v)) - updatedRecords.add(record); - } else if (fieldValue instanceof Map) { + if (fieldValue == null) { + if (record.getSchemaClass() != null) { + final OProperty property = record.getSchemaClass().getProperty(entry.getKey()); + if (property != null + && (property.getType() != null && (!property.getType().equals(OType.EMBEDDEDMAP) && !property.getType().equals( + OType.LINKMAP)))) { + throw new OCommandExecutionException("field " + entry.getKey() + " is not defined as a map"); + } + } + fieldValue = new HashMap(); + record.field(entry.getKey(), fieldValue); + } + + if (fieldValue instanceof Map) { map = (Map) fieldValue; - if (map.remove(v) != null) - updatedRecords.add(record); + + pair = entry.getValue(); + + v = pair.getValue(); + + if (v instanceof OSQLFilterItem) + v = ((OSQLFilterItem) v).getValue(record, null, context); + else if (pair.getValue() instanceof OSQLFunctionRuntime) + v = ((OSQLFunctionRuntime) v).execute(record, record, null, context); + else if (v instanceof OCommandRequest) + v = ((OCommandRequest) v).execute(record, null, context); + + map.put(pair.getKey(), v); + updatedRecords.add(record); } } - } - for (ODocument d : updatedRecords) { - d.setDirty(); - d.save(); - recordCount++; - } + // REMOVE FIELD IF ANY + for (OPair entry : removeEntries) { + v = entry.getValue(); + if (v == EMPTY_VALUE) { + record.removeField(entry.getKey()); + updatedRecords.add(record); + } else { + fieldValue = record.field(entry.getKey()); + + if (fieldValue instanceof Collection) { + coll = (Collection) fieldValue; + if (coll.remove(v)) + updatedRecords.add(record); + } else if (fieldValue instanceof Map) { + map = (Map) fieldValue; + if (map.remove(v) != null) + updatedRecords.add(record); + } + } + } + + for (ODocument d : updatedRecords) { + d.setDirty(); + d.save(); + + recordCount++; + } + return true; - return true; + } finally { + if (strategy.equalsIgnoreCase("LOCK")) + ((OStorageEmbedded) getDatabase().getStorage()).releaseWriteLock(record.getIdentity()); + } } protected void parseMerge() { @@ -440,7 +459,7 @@ private void parseIncrementFields() { @Override public String getSyntax() { - return "UPDATE |cluster:> [SET|ADD|PUT|REMOVE|INCREMENT|CONTENT {}|MERGE {}] [[,] = |]* [WHERE ]"; + return "UPDATE |cluster:> [SET|ADD|PUT|REMOVE|INCREMENT|CONTENT {}|MERGE {}] [[,] = |]* [STRATEGY ] [WHERE ]"; } @Override @@ -459,4 +478,15 @@ protected String getBlock(String fieldValue) { return fieldValue; } + /** + * Parses the lock keyword if found. + */ + protected void parseStrategy() throws OCommandSQLParsingException { + parserNextWord(true); + strategy = parserGetLastWord(); + + if (!strategy.equalsIgnoreCase("NONE") && !strategy.equalsIgnoreCase("LOCK")) + throwParsingException("Invalid " + KEYWORD_STRATEGY + " value set to '" + strategy + + "' but it should be NONE, LOCK or RETRY [WAIT ]. Example: " + KEYWORD_STRATEGY + " RETRY 3 WAIT 100"); + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/OStorage.java b/core/src/main/java/com/orientechnologies/orient/core/storage/OStorage.java index 9b472a8d07a..80ca1bb4711 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/OStorage.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/OStorage.java @@ -15,12 +15,6 @@ */ package com.orientechnologies.orient.core.storage; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - import com.orientechnologies.common.concur.resource.OSharedContainer; import com.orientechnologies.common.concur.resource.OSharedResourceAdaptiveExternal; import com.orientechnologies.orient.core.cache.OLevel2RecordCache; @@ -33,6 +27,12 @@ import com.orientechnologies.orient.core.util.OBackupable; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + /** * This is the gateway interface between the Database side and the storage. Provided implementations are: Local, Remote and Memory. * @@ -56,6 +56,10 @@ public enum STATUS { CLOSED, OPEN, CLOSING } + public enum LOCKING_STRATEGY { + NONE, DEFAULT, KEEP_SHARED_LOCK, KEEP_EXCLUSIVE_LOCK + } + public void open(String iUserName, String iUserPassword, final Map iProperties); public void create(Map iProperties); @@ -86,7 +90,7 @@ public OStorageOperationResult createRecord(int iDataSegmentI ORecordVersion iRecordVersion, byte iRecordType, int iMode, ORecordCallback iCallback); public OStorageOperationResult readRecord(ORecordId iRid, String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones); + ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy); public OStorageOperationResult updateRecord(ORecordId iRecordId, byte[] iContent, ORecordVersion iVersion, byte iRecordType, int iMode, ORecordCallback iCallback); diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/OStorageEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/storage/OStorageEmbedded.java index 4481b5c0d87..04a1134acd4 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/OStorageEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/OStorageEmbedded.java @@ -58,7 +58,7 @@ public OStorageEmbedded(final String iName, final String iFilePath, final String public abstract OCluster getClusterByName(final String iClusterName); protected abstract ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId iRid, boolean iAtomicLock, - boolean loadTombstones); + boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy); /** * Closes the storage freeing the lock manager first. @@ -201,6 +201,10 @@ public void releaseReadLock(final ORID iRid) { lockManager.releaseLock(Thread.currentThread(), iRid, LOCK.SHARED); } + public void releaseAllLocksOfCurrentThread() { + lockManager.releaseAllLocksOfRequester(Thread.currentThread()); + } + @Override public ORecordMetadata getRecordMetadata(ORID rid) { if (rid.isNew()) diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageConfigurationSegment.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageConfigurationSegment.java index 1a43cae878e..c34c0b7f7a9 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageConfigurationSegment.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageConfigurationSegment.java @@ -23,6 +23,7 @@ import com.orientechnologies.orient.core.exception.OSerializationException; import com.orientechnologies.orient.core.serialization.OBinaryProtocol; import com.orientechnologies.orient.core.storage.ORawBuffer; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.storage.fs.OFile; /** @@ -58,7 +59,7 @@ public OStorageConfiguration load() throws OSerializationException { // @COMPATIBILITY0.9.25 // CHECK FOR OLD VERSION OF DATABASE - final ORawBuffer rawRecord = storage.readRecord(CONFIG_RID, null, false, null, false).getResult(); + final ORawBuffer rawRecord = storage.readRecord(CONFIG_RID, null, false, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); if (rawRecord != null) fromStream(rawRecord.buffer); diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocal.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocal.java index 0b14df65ece..41b10b9aec5 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocal.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocal.java @@ -15,18 +15,7 @@ */ package com.orientechnologies.orient.core.storage.impl.local; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - +import com.orientechnologies.common.concur.lock.OLockManager; import com.orientechnologies.common.concur.lock.OLockManager.LOCK; import com.orientechnologies.common.concur.lock.OModificationLock; import com.orientechnologies.common.exception.OException; @@ -77,6 +66,18 @@ import com.orientechnologies.orient.core.version.ORecordVersion; import com.orientechnologies.orient.core.version.OVersionFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + public class OStorageLocal extends OStorageLocalAbstract { private final int DELETE_MAX_RETRIES; private final int DELETE_WAIT_TIME; @@ -1137,9 +1138,10 @@ public V callInRecordLock(Callable callable, ORID rid, boolean exclusiveL } public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones) { + ORecordCallback iCallback, final boolean loadTombstones, final LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); - return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones)); + return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones, + iLockingStrategy)); } public OStorageOperationResult updateRecord(final ORecordId iRid, final byte[] iContent, @@ -1758,7 +1760,8 @@ protected OPhysicalPosition createRecord(final ODataLocal dataSegment, final OCl } @Override - protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId iRid, boolean iAtomicLock, boolean loadTombstones) { + protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId iRid, final boolean iAtomicLock, + final boolean loadTombstones, final LOCKING_STRATEGY iLockingStrategy) { if (!iRid.isPersistent()) throw new IllegalArgumentException("Cannot read record " + iRid + " since the position is invalid in database '" + name + '\''); @@ -1774,7 +1777,19 @@ protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId lock.acquireSharedLock(); try { - lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + case KEEP_SHARED_LOCK: + lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.SHARED); + break; + case NONE: + // DO NOTHING + break; + case KEEP_EXCLUSIVE_LOCK: + throw new IllegalStateException("Exclusive locking not supported on read() in 'local' storage. Use plocal instead."); + // lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.EXCLUSIVE); + } + try { final OPhysicalPosition ppos = iClusterSegment.getPhysicalPosition(new OPhysicalPosition(iRid.clusterPosition)); @@ -1789,7 +1804,16 @@ protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId return new ORawBuffer(data.getRecord(ppos.dataSegmentPos), ppos.recordVersion, ppos.recordType); } finally { - lockManager.releaseLock(Thread.currentThread(), iRid, LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + lockManager.releaseLock(Thread.currentThread(), iRid, OLockManager.LOCK.SHARED); + break; + case NONE: + case KEEP_SHARED_LOCK: + case KEEP_EXCLUSIVE_LOCK: + // DO NOTHING + break; + } } } catch (IOException e) { diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocalTxExecuter.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocalTxExecuter.java index a39f033b227..7836689269d 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocalTxExecuter.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocalTxExecuter.java @@ -35,6 +35,7 @@ import com.orientechnologies.orient.core.storage.OCluster; import com.orientechnologies.orient.core.storage.OPhysicalPosition; import com.orientechnologies.orient.core.storage.ORawBuffer; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransactionAbstract; import com.orientechnologies.orient.core.tx.OTxListener; @@ -113,7 +114,7 @@ protected ORecordVersion updateRecord(final int iTxId, final OCluster iClusterSe final byte[] iContent, final ORecordVersion iVersion, final byte iRecordType) { try { // READ CURRENT RECORD CONTENT - final ORawBuffer buffer = storage.readRecord(iClusterSegment, iRid, true, false); + final ORawBuffer buffer = storage.readRecord(iClusterSegment, iRid, true, false, OStorage.LOCKING_STRATEGY.DEFAULT); if (buffer == null) if (OFastConcurrentModificationException.enabled()) @@ -145,7 +146,7 @@ protected boolean deleteRecord(final int iTxId, final OCluster iClusterSegment, final ORecordId rid = new ORecordId(iClusterSegment.getId(), iPosition); // READ CURRENT RECORD CONTENT - final ORawBuffer buffer = storage.readRecord(iClusterSegment, rid, true, false); + final ORawBuffer buffer = storage.readRecord(iClusterSegment, rid, true, false, OStorage.LOCKING_STRATEGY.DEFAULT); if (buffer != null) { // SAVE INTO THE LOG THE OLD RECORD diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/OLocalPaginatedStorage.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/OLocalPaginatedStorage.java index aeb750d5463..ac6cd543528 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/OLocalPaginatedStorage.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/OLocalPaginatedStorage.java @@ -16,25 +16,6 @@ package com.orientechnologies.orient.core.storage.impl.local.paginated; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - import com.orientechnologies.common.concur.lock.OLockManager; import com.orientechnologies.common.concur.lock.OModificationLock; import com.orientechnologies.common.exception.OException; @@ -91,6 +72,25 @@ import com.orientechnologies.orient.core.version.ORecordVersion; import com.orientechnologies.orient.core.version.OVersionFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + /** * @author Andrey Lomakin * @since 28.03.13 @@ -1173,13 +1173,15 @@ public ORecordMetadata getRecordMetadata(ORID rid) { @Override public OStorageOperationResult readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones) { + ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); - return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones)); + return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones, + iLockingStrategy)); } @Override - protected ORawBuffer readRecord(final OCluster clusterSegment, final ORecordId rid, boolean atomicLock, boolean loadTombstones) { + protected ORawBuffer readRecord(final OCluster clusterSegment, final ORecordId rid, boolean atomicLock, boolean loadTombstones, + LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); if (!rid.isPersistent()) @@ -1193,11 +1195,31 @@ protected ORawBuffer readRecord(final OCluster clusterSegment, final ORecordId r lock.acquireSharedLock(); try { - lockManager.acquireLock(Thread.currentThread(), rid, OLockManager.LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + case KEEP_SHARED_LOCK: + lockManager.acquireLock(Thread.currentThread(), rid, OLockManager.LOCK.SHARED); + break; + case NONE: + // DO NOTHING + break; + case KEEP_EXCLUSIVE_LOCK: + lockManager.acquireLock(Thread.currentThread(), rid, OLockManager.LOCK.EXCLUSIVE); + } + try { return clusterSegment.readRecord(rid.clusterPosition); } finally { - lockManager.releaseLock(Thread.currentThread(), rid, OLockManager.LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + lockManager.releaseLock(Thread.currentThread(), rid, OLockManager.LOCK.SHARED); + break; + case NONE: + case KEEP_SHARED_LOCK: + case KEEP_EXCLUSIVE_LOCK: + // DO NOTHING + break; + } } } catch (IOException e) { diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/memory/OStorageMemory.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/memory/OStorageMemory.java index 9c6fa8334fb..94310f455af 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/memory/OStorageMemory.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/memory/OStorageMemory.java @@ -15,19 +15,7 @@ */ package com.orientechnologies.orient.core.storage.impl.memory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - +import com.orientechnologies.common.concur.lock.OLockManager; import com.orientechnologies.common.concur.lock.OLockManager.LOCK; import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.log.OLogManager; @@ -59,6 +47,19 @@ import com.orientechnologies.orient.core.version.ORecordVersion; import com.orientechnologies.orient.core.version.OVersionFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + /** * Memory implementation of storage. This storage works only in memory and has the following features: *
    @@ -365,18 +366,30 @@ public OStorageOperationResult createRecord(final int iDataSe } public OStorageOperationResult readRecord(final ORecordId iRid, String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones) { - return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones)); + ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { + return new OStorageOperationResult(readRecord(getClusterById(iRid.clusterId), iRid, true, loadTombstones, + iLockingStrategy)); } @Override protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId iRid, final boolean iAtomicLock, - boolean loadTombstones) { + boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { final long timer = Orient.instance().getProfiler().startChrono(); lock.acquireSharedLock(); try { - lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + case KEEP_SHARED_LOCK: + lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.SHARED); + break; + case NONE: + // DO NOTHING + break; + case KEEP_EXCLUSIVE_LOCK: + lockManager.acquireLock(Thread.currentThread(), iRid, LOCK.EXCLUSIVE); + } + try { final OClusterPosition lastPos = iClusterSegment.getLastPosition(); @@ -397,7 +410,16 @@ protected ORawBuffer readRecord(final OCluster iClusterSegment, final ORecordId return new ORawBuffer(dataSegment.readRecord(ppos.dataSegmentPos), ppos.recordVersion, ppos.recordType); } finally { - lockManager.releaseLock(Thread.currentThread(), iRid, LOCK.SHARED); + switch (iLockingStrategy) { + case DEFAULT: + lockManager.releaseLock(Thread.currentThread(), iRid, OLockManager.LOCK.SHARED); + break; + case NONE: + case KEEP_SHARED_LOCK: + case KEEP_EXCLUSIVE_LOCK: + // DO NOTHING + break; + } } } catch (IOException e) { throw new OStorageException("Error on read record in cluster: " + iClusterSegment.getId(), e); diff --git a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransaction.java b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransaction.java index b0e38c2b579..38c2033ee00 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransaction.java +++ b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransaction.java @@ -15,8 +15,6 @@ */ package com.orientechnologies.orient.core.tx; -import java.util.List; - import com.orientechnologies.orient.core.db.ODatabaseComplex.OPERATION_MODE; import com.orientechnologies.orient.core.db.record.ODatabaseRecordTx; import com.orientechnologies.orient.core.db.record.OIdentifiable; @@ -26,83 +24,88 @@ import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.util.List; + public interface OTransaction { - public enum TXTYPE { - NOTX, OPTIMISTIC, PESSIMISTIC - } + public enum TXTYPE { + NOTX, OPTIMISTIC, PESSIMISTIC + } - public enum TXSTATUS { - INVALID, BEGUN, COMMITTING, ROLLBACKING, COMPLETED - } + public enum TXSTATUS { + INVALID, BEGUN, COMMITTING, ROLLBACKING, COMPLETED + } - public void begin(); + public void begin(); - public void commit(); + public void commit(); - public void rollback(); + public void rollback(); - public ODatabaseRecordTx getDatabase(); + public ODatabaseRecordTx getDatabase(); - public void clearRecordEntries(); + public void clearRecordEntries(); - public ORecordInternal loadRecord(ORID iRid, ORecordInternal iRecord, String iFetchPlan, boolean ignoreCache, - boolean loadTombstone); + public ORecordInternal loadRecord(ORID iRid, ORecordInternal iRecord, String iFetchPlan, boolean ignoreCache, + boolean loadTombstone, final OStorage.LOCKING_STRATEGY iLockingStrategy); - public boolean updateReplica(ORecordInternal iRecord); + public boolean updateReplica(ORecordInternal iRecord); - public void saveRecord(ORecordInternal iContent, String iClusterName, OPERATION_MODE iMode, boolean iForceCreate, - ORecordCallback iRecordCreatedCallback, ORecordCallback iRecordUpdatedCallback); + public void saveRecord(ORecordInternal iContent, String iClusterName, OPERATION_MODE iMode, boolean iForceCreate, + ORecordCallback iRecordCreatedCallback, ORecordCallback iRecordUpdatedCallback); - public void deleteRecord(ORecordInternal iRecord, OPERATION_MODE iMode); + public void deleteRecord(ORecordInternal iRecord, OPERATION_MODE iMode); - public int getId(); + public int getId(); - public TXSTATUS getStatus(); + public TXSTATUS getStatus(); - public Iterable getCurrentRecordEntries(); + public Iterable getCurrentRecordEntries(); - public Iterable getAllRecordEntries(); + public Iterable getAllRecordEntries(); - public List getRecordEntriesByClass(String iClassName); + public List getRecordEntriesByClass(String iClassName); - public List getNewRecordEntriesByClusterIds(int[] iIds); + public List getNewRecordEntriesByClusterIds(int[] iIds); - public ORecordInternal getRecord(ORID iRid); + public ORecordInternal getRecord(ORID iRid); - public ORecordOperation getRecordEntry(ORID rid); + public ORecordOperation getRecordEntry(ORID rid); - public List getInvolvedIndexes(); + public List getInvolvedIndexes(); - public ODocument getIndexChanges(); + public ODocument getIndexChanges(); - public void addIndexEntry(OIndex delegate, final String iIndexName, final OTransactionIndexChanges.OPERATION iStatus, - final Object iKey, final OIdentifiable iValue); + public void addIndexEntry(OIndex delegate, final String iIndexName, final OTransactionIndexChanges.OPERATION iStatus, + final Object iKey, final OIdentifiable iValue); - public void clearIndexEntries(); + public void clearIndexEntries(); - public OTransactionIndexChanges getIndexChanges(String iName); + public OTransactionIndexChanges getIndexChanges(String iName); - /** - * Tells if the transaction is active. - * - * @return - */ - public boolean isActive(); + /** + * Tells if the transaction is active. + * + * @return + */ + public boolean isActive(); - public boolean isUsingLog(); + public boolean isUsingLog(); - public void setUsingLog(boolean useLog); + public void setUsingLog(boolean useLog); - public void close(); + public void close(); - /** - * When commit in transaction is performed all new records will change their identity, but index values will contain stale links, - * to fix them given method will be called for each entry. This update local transaction maps too. - * - * @param oldRid Record identity before commit. - * @param newRid Record identity after commit. - */ - public void updateIdentityAfterCommit(final ORID oldRid, final ORID newRid); + /** + * When commit in transaction is performed all new records will change their identity, but index values will contain stale links, + * to fix them given method will be called for each entry. This update local transaction maps too. + * + * @param oldRid + * Record identity before commit. + * @param newRid + * Record identity after commit. + */ + public void updateIdentityAfterCommit(final ORID oldRid, final ORID newRid); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionNoTx.java b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionNoTx.java index c6f7e79f7de..7598aafe860 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionNoTx.java +++ b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionNoTx.java @@ -15,9 +15,6 @@ */ package com.orientechnologies.orient.core.tx; -import java.util.Collection; -import java.util.List; - import com.orientechnologies.common.exception.OException; import com.orientechnologies.orient.core.db.ODatabaseComplex.OPERATION_MODE; import com.orientechnologies.orient.core.db.record.ODatabaseRecordTx; @@ -29,9 +26,13 @@ import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.tx.OTransactionIndexChanges.OPERATION; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.util.Collection; +import java.util.List; + /** * No operation transaction. * @@ -56,11 +57,11 @@ public void close() { } public ORecordInternal loadRecord(final ORID iRid, final ORecordInternal iRecord, final String iFetchPlan, - boolean ignonreCache, boolean loadTombstone) { + final boolean ignonreCache, final boolean loadTombstone, final OStorage.LOCKING_STRATEGY iLockingStrategy) { if (iRid.isNew()) return null; - return database.executeReadRecord((ORecordId) iRid, iRecord, iFetchPlan, ignonreCache, loadTombstone); + return database.executeReadRecord((ORecordId) iRid, iRecord, iFetchPlan, ignonreCache, loadTombstone, iLockingStrategy); } /** diff --git a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java index 5b982d8851d..863e8527eab 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java +++ b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java @@ -16,11 +16,6 @@ package com.orientechnologies.orient.core.tx; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; - import com.orientechnologies.common.concur.OTimeoutException; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.config.OGlobalConfiguration; @@ -47,9 +42,21 @@ import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.storage.ORecordCallback; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.storage.OStorageEmbedded; import com.orientechnologies.orient.core.version.ORecordVersion; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + public class OTransactionOptimistic extends OTransactionRealAbstract { private static final boolean useSBTree = OGlobalConfiguration.INDEX_USE_SBTREE_BY_DEFAULT.getValueAsBoolean(); @@ -217,7 +224,7 @@ public Object call() throws Exception { } } - status = TXSTATUS.COMPLETED; + status = TXSTATUS.COMPLETED; } public void rollback() { @@ -246,16 +253,16 @@ public Void call() throws Exception { v.getRecord().unload(); indexEntries.clear(); - temp2persistent.clear(); - allEntries.clear(); - recordIndexOperations.clear(); - recordEntries.clear(); + temp2persistent.clear(); + allEntries.clear(); + recordIndexOperations.clear(); + recordEntries.clear(); - status = TXSTATUS.COMPLETED; + status = TXSTATUS.COMPLETED; } public ORecordInternal loadRecord(final ORID iRid, final ORecordInternal iRecord, final String iFetchPlan, - boolean ignoreCache, boolean loadTombstone) { + final boolean ignoreCache, final boolean loadTombstone, final OStorage.LOCKING_STRATEGY iLockingStrategy) { checkTransaction(); final ORecordInternal txRecord = getRecord(iRid); @@ -277,7 +284,8 @@ public ORecordInternal loadRecord(final ORID iRid, final ORecordInternal i return null; // DELEGATE TO THE STORAGE, NO TOMBSTONES SUPPORT IN TX MODE - final ORecordInternal record = database.executeReadRecord((ORecordId) iRid, iRecord, iFetchPlan, ignoreCache, false); + final ORecordInternal record = database.executeReadRecord((ORecordId) iRid, iRecord, iFetchPlan, ignoreCache, false, + iLockingStrategy); if (record != null) addRecord(record, ORecordOperation.LOADED, null); diff --git a/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeEntryDataProviderAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeEntryDataProviderAbstract.java index 2f984d2525a..3edda236cb1 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeEntryDataProviderAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeEntryDataProviderAbstract.java @@ -88,7 +88,7 @@ protected void load(final ODatabaseRecord iDb) { } protected void load(final OStorage iStorage) { - final ORawBuffer raw = iStorage.readRecord((ORecordId) record.getIdentity(), null, false, null, false).getResult(); + final ORawBuffer raw = iStorage.readRecord((ORecordId) record.getIdentity(), null, false, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); record.fill((ORecordId) record.getIdentity(), raw.version, raw.buffer, false); fromStream(raw.buffer); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeProviderAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeProviderAbstract.java index 76d179b171c..fd221af37ea 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeProviderAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/type/tree/provider/OMVRBTreeProviderAbstract.java @@ -150,7 +150,7 @@ protected void load(final OStorage iSt) { if (!record.getIdentity().isValid()) // NOTHING TO LOAD return; - ORawBuffer raw = iSt.readRecord((ORecordId) record.getIdentity(), null, false, null, false).getResult(); + ORawBuffer raw = iSt.readRecord((ORecordId) record.getIdentity(), null, false, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); if (raw == null) throw new OConfigurationException("Cannot load map with id " + record.getIdentity()); record.getRecordVersion().copyFrom(raw.version); diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/oldsharding/OAutoshardedStorageImpl.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/oldsharding/OAutoshardedStorageImpl.java index 37da78260f1..8fbb3be2b81 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/oldsharding/OAutoshardedStorageImpl.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/oldsharding/OAutoshardedStorageImpl.java @@ -124,16 +124,15 @@ public OStorageOperationResult createRecord(int iDataSegmentI } @Override - public OStorageOperationResult readRecord(ORecordId iRid, String iFetchPlan, boolean iIgnoreCache, - ORecordCallback iCallback, boolean loadTombstones) { + public OStorageOperationResult readRecord(ORecordId iRid, String iFetchPlan, boolean iIgnoreCache, ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { if (undistributedClusters.contains(iRid.getClusterId())) { - return wrapped.readRecord(iRid, iFetchPlan, iIgnoreCache, iCallback, loadTombstones); + return wrapped.readRecord(iRid, iFetchPlan, iIgnoreCache, iCallback, loadTombstones, LOCKING_STRATEGY.DEFAULT); } final ODHTNode node = serverInstance.findSuccessor(iRid.clusterPosition.longValue()); if (node.isLocal()) - return wrapped.readRecord(iRid, iFetchPlan, iIgnoreCache, iCallback, loadTombstones); + return wrapped.readRecord(iRid, iFetchPlan, iIgnoreCache, iCallback, loadTombstones, LOCKING_STRATEGY.DEFAULT); else return new OStorageOperationResult(node.readRecord(wrapped.getName(), iRid), true); } diff --git a/graphdb/src/main/java/com/tinkerpop/blueprints/impls/orient/OrientElementScanIterable.java b/graphdb/src/main/java/com/tinkerpop/blueprints/impls/orient/OrientElementScanIterable.java index c2236048af9..90043369b7e 100644 --- a/graphdb/src/main/java/com/tinkerpop/blueprints/impls/orient/OrientElementScanIterable.java +++ b/graphdb/src/main/java/com/tinkerpop/blueprints/impls/orient/OrientElementScanIterable.java @@ -1,7 +1,5 @@ package com.tinkerpop.blueprints.impls.orient; -import java.util.Iterator; - import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.db.record.ODatabaseRecordAbstract; import com.orientechnologies.orient.core.iterator.ORecordIteratorClass; @@ -9,6 +7,8 @@ import com.tinkerpop.blueprints.CloseableIterable; import com.tinkerpop.blueprints.Element; +import java.util.Iterator; + /** * @author Luca Garulli (http://www.orientechnologies.com) */ diff --git a/object/src/main/java/com/orientechnologies/orient/object/db/OCommandSQLPojoWrapper.java b/object/src/main/java/com/orientechnologies/orient/object/db/OCommandSQLPojoWrapper.java index 9c3b1e4c94b..eea301a2b4f 100644 --- a/object/src/main/java/com/orientechnologies/orient/object/db/OCommandSQLPojoWrapper.java +++ b/object/src/main/java/com/orientechnologies/orient/object/db/OCommandSQLPojoWrapper.java @@ -15,15 +15,15 @@ */ package com.orientechnologies.orient.object.db; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - import com.orientechnologies.orient.core.command.OCommandContext; import com.orientechnologies.orient.core.command.OCommandContext.TIMEOUT_STRATEGY; import com.orientechnologies.orient.core.command.OCommandRequest; import com.orientechnologies.orient.core.record.impl.ODocument; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + /** * Wraps the execution of a generic command by assuring to convert the result set in POJO where applicable. * @@ -124,6 +124,5 @@ public TIMEOUT_STRATEGY getTimeoutStrategy() { @Override public void setTimeout(long timeout, TIMEOUT_STRATEGY strategy) { command.setTimeout(timeout, strategy); - } } diff --git a/object/src/main/java/com/orientechnologies/orient/object/db/OObjectDatabaseTx.java b/object/src/main/java/com/orientechnologies/orient/object/db/OObjectDatabaseTx.java index 038f1f857d1..ea7e38f0807 100755 --- a/object/src/main/java/com/orientechnologies/orient/object/db/OObjectDatabaseTx.java +++ b/object/src/main/java/com/orientechnologies/orient/object/db/OObjectDatabaseTx.java @@ -19,6 +19,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; + +import com.orientechnologies.orient.core.storage.OStorage; import javassist.util.proxy.Proxy; import javassist.util.proxy.ProxyObject; @@ -279,11 +281,11 @@ public RET detachAll(final Object iPojo, boolean returnNonProxiedInstance) } public RET load(final Object iPojo, final String iFetchPlan, final boolean iIgnoreCache) { - return (RET) load(iPojo, iFetchPlan, iIgnoreCache, false); + return (RET) load(iPojo, iFetchPlan, iIgnoreCache, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @Override - public RET load(Object iPojo, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { + public RET load(Object iPojo, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); if (iPojo == null) return null; @@ -293,7 +295,7 @@ public RET load(Object iPojo, String iFetchPlan, boolean iIgnoreCache, boo try { record.setInternalStatus(ORecordElement.STATUS.UNMARSHALLING); - record = underlying.load(record, iFetchPlan, iIgnoreCache, loadTombstone); + record = underlying.load(record, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT); return (RET) stream2pojo(record, iPojo, iFetchPlan); } finally { @@ -310,17 +312,17 @@ public RET load(final ORID iRecordId, final String iFetchPlan) { } public RET load(final ORID iRecordId, final String iFetchPlan, final boolean iIgnoreCache) { - return (RET) load(iRecordId, iFetchPlan, iIgnoreCache, false); + return (RET) load(iRecordId, iFetchPlan, iIgnoreCache, false, OStorage.LOCKING_STRATEGY.DEFAULT); } @Override - public RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone) { + public RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) { checkOpeness(); if (iRecordId == null) return null; // GET THE ASSOCIATED DOCUMENT - final ODocument record = (ODocument) underlying.load(iRecordId, iFetchPlan, iIgnoreCache, loadTombstone); + final ODocument record = (ODocument) underlying.load(iRecordId, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT); if (record == null) return null; diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java index ae90f7caf30..70af47ba1d4 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java @@ -176,24 +176,23 @@ else if (result instanceof Throwable) } } - public OStorageOperationResult readRecord(final ORecordId iRecordId, final String iFetchPlan, - final boolean iIgnoreCache, final ORecordCallback iCallback, boolean loadTombstones) { + public OStorageOperationResult readRecord(final ORecordId iRecordId, final String iFetchPlan, final boolean iIgnoreCache, final ORecordCallback iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) { if (OScenarioThreadLocal.INSTANCE.get() == RUN_MODE.RUNNING_DISTRIBUTED) // ALREADY DISTRIBUTED - return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones); + return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones, LOCKING_STRATEGY.DEFAULT); try { final String clusterName = getClusterNameByRID(iRecordId); final ODistributedConfiguration dConfig = dManager.getDatabaseConfiguration(getName()); if (!dManager.getDatabaseConfiguration(getName()).isReplicationActive(clusterName)) // DON'T REPLICATE - return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones); + return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones, LOCKING_STRATEGY.DEFAULT); final ODistributedPartitioningStrategy strategy = dManager.getPartitioningStrategy(dConfig.getPartitionStrategy(clusterName)); final ODistributedPartition partition = strategy.getPartition(dManager, getName(), clusterName); if (partition.getNodes().contains(dManager.getLocalNodeName())) // LOCAL NODE OWNS THE DATA: GET IT LOCALLY BECAUSE IT'S FASTER - return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones); + return wrapped.readRecord(iRecordId, iFetchPlan, iIgnoreCache, iCallback, loadTombstones, LOCKING_STRATEGY.DEFAULT); // DISTRIBUTE IT final Object result = dManager.sendRequest(getName(), clusterName, new OReadRecordTask(iRecordId), EXECUTION_MODE.RESPONSE); diff --git a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java index 31a2d556bb2..407e6bd653e 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java +++ b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java @@ -15,13 +15,6 @@ */ package com.orientechnologies.orient.server.network.protocol.binary; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.net.Socket; -import java.net.SocketException; -import java.util.*; -import java.util.Map.Entry; - import com.orientechnologies.common.collection.OMultiValue; import com.orientechnologies.common.concur.lock.OLockException; import com.orientechnologies.common.io.OIOException; @@ -39,7 +32,12 @@ import com.orientechnologies.orient.core.db.raw.ODatabaseRaw; import com.orientechnologies.orient.core.db.record.ODatabaseRecordTx; import com.orientechnologies.orient.core.db.record.OIdentifiable; -import com.orientechnologies.orient.core.exception.*; +import com.orientechnologies.orient.core.exception.OConfigurationException; +import com.orientechnologies.orient.core.exception.ODatabaseException; +import com.orientechnologies.orient.core.exception.OSecurityAccessException; +import com.orientechnologies.orient.core.exception.OSecurityException; +import com.orientechnologies.orient.core.exception.OStorageException; +import com.orientechnologies.orient.core.exception.OTransactionAbortedException; import com.orientechnologies.orient.core.fetch.OFetchContext; import com.orientechnologies.orient.core.fetch.OFetchHelper; import com.orientechnologies.orient.core.fetch.OFetchListener; @@ -60,6 +58,7 @@ import com.orientechnologies.orient.core.storage.OCluster; import com.orientechnologies.orient.core.storage.OPhysicalPosition; import com.orientechnologies.orient.core.storage.ORecordMetadata; +import com.orientechnologies.orient.core.storage.OStorage; import com.orientechnologies.orient.core.storage.OStorageProxy; import com.orientechnologies.orient.core.storage.impl.memory.OStorageMemory; import com.orientechnologies.orient.core.version.ORecordVersion; @@ -74,6 +73,17 @@ import com.orientechnologies.orient.server.plugin.OServerPluginHelper; import com.orientechnologies.orient.server.tx.OTransactionOptimisticProxy; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.Socket; +import java.net.SocketException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + public class ONetworkProtocolBinary extends OBinaryNetworkProtocolAbstract { protected OClientConnection connection; protected OUser account; @@ -387,7 +397,7 @@ private void floorPositions() throws IOException { sendOk(clientTxId); final OPhysicalPosition[] previousPositions = connection.database.getStorage().floorPhysicalPositions(clusterId, - new OPhysicalPosition(clusterPosition)); + new OPhysicalPosition(clusterPosition)); if (previousPositions != null) { channel.writeInt(previousPositions.length); @@ -420,7 +430,7 @@ private void higherPositions() throws IOException { sendOk(clientTxId); OPhysicalPosition[] nextPositions = connection.database.getStorage().higherPhysicalPositions(clusterId, - new OPhysicalPosition(clusterPosition)); + new OPhysicalPosition(clusterPosition)); if (nextPositions != null) { @@ -451,7 +461,7 @@ private void ceilingPositions() throws IOException { sendOk(clientTxId); final OPhysicalPosition[] previousPositions = connection.database.getStorage().ceilingPhysicalPositions(clusterId, - new OPhysicalPosition(clusterPosition)); + new OPhysicalPosition(clusterPosition)); if (previousPositions != null) { channel.writeInt(previousPositions.length); @@ -740,7 +750,7 @@ protected void shutdownConnection() throws IOException { setDataCommandInfo("Shutdowning"); OLogManager.instance().info(this, "Received shutdown command from the remote client %s:%d", channel.socket.getInetAddress(), - channel.socket.getPort()); + channel.socket.getPort()); final String user = channel.readString(); final String passwd = channel.readString(); @@ -1244,9 +1254,6 @@ protected void cleanOutRecord() throws IOException { protected void updateRecord() throws IOException { setDataCommandInfo("Update record"); - if (!isConnectionAlive()) - return; - if (!isConnectionAlive()) return; @@ -1346,7 +1353,8 @@ protected void readRecord() throws IOException { } } else { - final ORecordInternal record = connection.database.load(rid, fetchPlanString, ignoreCache, loadTombstones); + final ORecordInternal record = connection.database.load(rid, fetchPlanString, ignoreCache, loadTombstones, + OStorage.LOCKING_STRATEGY.DEFAULT); beginResponse(); try { diff --git a/tests/src/test/java/com/orientechnologies/orient/test/database/auto/ConcurrentUpdatesTest.java b/tests/src/test/java/com/orientechnologies/orient/test/database/auto/ConcurrentUpdatesTest.java index e0335106f74..572744acbb4 100755 --- a/tests/src/test/java/com/orientechnologies/orient/test/database/auto/ConcurrentUpdatesTest.java +++ b/tests/src/test/java/com/orientechnologies/orient/test/database/auto/ConcurrentUpdatesTest.java @@ -15,34 +15,41 @@ */ package com.orientechnologies.orient.test.database.auto; -import java.util.concurrent.atomic.AtomicLong; - -import org.testng.Assert; -import org.testng.annotations.*; - import com.orientechnologies.common.concur.ONeedRetryException; import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.OCommandSQL; import com.orientechnologies.orient.core.tx.OTransaction.TXTYPE; import com.orientechnologies.orient.enterprise.channel.binary.OResponseProcessingException; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Optional; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.AtomicLong; @Test public class ConcurrentUpdatesTest { - private final static int CYCLES = 50; - private final static int MAX_RETRIES = 100; + private final static int OPTIMISTIC_CYCLES = 100; + private final static int PESSIMISTIC_CYCLES = 100; + private final static int THREADS = 10; + private final static int MAX_RETRIES = 100; protected String url; private boolean level1CacheEnabled; private boolean level2CacheEnabled; private boolean mvccEnabled; + private long startedOn; - private final AtomicLong counter = new AtomicLong(); - private final AtomicLong totalRetries = new AtomicLong(); + private final AtomicLong counter = new AtomicLong(); + private final AtomicLong totalRetries = new AtomicLong(); - class UpdateField implements Runnable { + class OptimisticUpdateField implements Runnable { ODatabaseDocumentTx db; ORID rid1; @@ -50,7 +57,7 @@ class UpdateField implements Runnable { String fieldValue = null; String threadName; - public UpdateField(ODatabaseDocumentTx iDb, ORID iRid1, ORID iRid2, String iThreadName) { + public OptimisticUpdateField(ODatabaseDocumentTx iDb, ORID iRid1, ORID iRid2, String iThreadName) { super(); db = iDb; rid1 = iRid1; @@ -61,7 +68,7 @@ public UpdateField(ODatabaseDocumentTx iDb, ORID iRid1, ORID iRid2, String iThre @Override public void run() { try { - for (int i = 0; i < CYCLES; i++) { + for (int i = 0; i < OPTIMISTIC_CYCLES; i++) { for (int retry = 0; retry < MAX_RETRIES; ++retry) { try { db.begin(TXTYPE.OPTIMISTIC); @@ -99,6 +106,57 @@ public void run() { } } + class PessimisticUpdate implements Runnable { + + ODatabaseDocumentTx db; + String fieldValue = null; + ORID rid; + String threadName; + boolean lock; + + public PessimisticUpdate(ODatabaseDocumentTx iDb, ORID iRid, String iThreadName, boolean iLock) { + super(); + db = iDb; + rid = iRid; + threadName = iThreadName; + lock = iLock; + } + + @Override + public void run() { + try { + for (int i = 0; i < PESSIMISTIC_CYCLES; i++) { + String cmd = "update " + rid + " increment total = 1"; + if (lock) + cmd += " strategy lock"; + + for (int retry = 0; retry < MAX_RETRIES; ++retry) { + try { + db.command(new OCommandSQL(cmd)).execute(); + counter.incrementAndGet(); + break; + + } catch (OResponseProcessingException e) { + Assert.assertTrue(e.getCause() instanceof ONeedRetryException); + + System.out.println("SQL UPDATE - Retry " + Thread.currentThread().getName() + " " + i + " - " + retry + "/" + + MAX_RETRIES + "..."); + // Thread.sleep(retry * 10); + } catch (ONeedRetryException e) { + System.out.println("SQL UPDATE - Retry " + Thread.currentThread().getName() + " " + i + " - " + retry + "/" + + MAX_RETRIES + "..."); + // Thread.sleep(retry * 10); + } + // System.out.println("thread " + threadName + " counter " + counter.get()); + } + } + } catch (Throwable e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + } + } + @Parameters(value = "url") public ConcurrentUpdatesTest(@Optional(value = "memory:test") String iURL) { url = iURL; @@ -119,6 +177,7 @@ public void init() { if ("memory:test".equals(url)) new ODatabaseDocumentTx(url).create().close(); + } @AfterClass @@ -129,57 +188,119 @@ public void deinit() { } @Test - public void concurrentUpdates() throws Exception { - ODatabaseDocumentTx database1 = new ODatabaseDocumentTx(url).open("admin", "admin"); - ODatabaseDocumentTx database2 = new ODatabaseDocumentTx(url).open("admin", "admin"); - ODatabaseDocumentTx database3 = new ODatabaseDocumentTx(url).open("admin", "admin"); + public void concurrentOptimisticUpdates() throws Exception { + System.out.println("Started Test OPTIMISTIC"); + + counter.set(0); + startedOn = System.currentTimeMillis(); - ODocument doc1 = database1.newInstance(); + ODatabaseDocumentTx[] databases = new ODatabaseDocumentTx[THREADS]; + for (int i = 0; i < THREADS; ++i) + databases[i] = new ODatabaseDocumentTx(url).open("admin", "admin"); + + ODocument doc1 = databases[0].newInstance(); doc1.field("INIT", "ok"); - database1.save(doc1); + databases[0].save(doc1); ORID rid1 = doc1.getIdentity(); - ODocument doc2 = database1.newInstance(); + ODocument doc2 = databases[0].newInstance(); doc2.field("INIT", "ok"); - database1.save(doc2); + databases[0].save(doc2); ORID rid2 = doc2.getIdentity(); - UpdateField vUpdate1 = new UpdateField(database1, rid1, rid2, "thread1"); - UpdateField vUpdate2 = new UpdateField(database2, rid2, rid1, "thread2"); - UpdateField vUpdate3 = new UpdateField(database3, rid2, rid1, "thread3"); + OptimisticUpdateField[] ops = new OptimisticUpdateField[THREADS]; + for (int i = 0; i < THREADS; ++i) + ops[i] = new OptimisticUpdateField(databases[i], rid1, rid2, "thread" + i); - Thread vThread1 = new Thread(vUpdate1, "ConcurrentTest1"); - Thread vThread2 = new Thread(vUpdate2, "ConcurrentTest2"); - Thread vThread3 = new Thread(vUpdate3, "ConcurrentTest3"); + Thread[] threads = new Thread[THREADS]; + for (int i = 0; i < THREADS; ++i) + threads[i] = new Thread(ops[i], "ConcurrentTest" + i); - vThread1.start(); - vThread2.start(); - vThread3.start(); + for (int i = 0; i < THREADS; ++i) + threads[i].start(); - vThread1.join(); - vThread2.join(); - vThread3.join(); + for (int i = 0; i < THREADS; ++i) + threads[i].join(); System.out.println("Done! Total updates executed in parallel: " + counter.get() + " average retries: " + ((float) totalRetries.get() / (float) counter.get())); - Assert.assertEquals(counter.get(), CYCLES * 3); + Assert.assertEquals(counter.get(), OPTIMISTIC_CYCLES * THREADS); + + doc1 = databases[0].load(rid1, null, true); + + for (int i = 0; i < THREADS; ++i) + Assert.assertEquals(doc1.field(ops[i].threadName), ops[i].fieldValue, ops[i].threadName); - doc1 = database1.load(rid1, null, true); - Assert.assertEquals(doc1.field(vUpdate1.threadName), vUpdate1.fieldValue, vUpdate1.threadName); - Assert.assertEquals(doc1.field(vUpdate2.threadName), vUpdate2.fieldValue, vUpdate2.threadName); - Assert.assertEquals(doc1.field(vUpdate3.threadName), vUpdate3.fieldValue, vUpdate3.threadName); System.out.println("RESULT doc 1:"); System.out.println(doc1.toJSON()); - doc2 = database1.load(rid2, null, true); - Assert.assertEquals(doc2.field(vUpdate1.threadName), vUpdate1.fieldValue, vUpdate1.threadName); - Assert.assertEquals(doc2.field(vUpdate2.threadName), vUpdate2.fieldValue, vUpdate2.threadName); - Assert.assertEquals(doc2.field(vUpdate3.threadName), vUpdate3.fieldValue, vUpdate3.threadName); + doc2 = databases[0].load(rid2, null, true); + + for (int i = 0; i < THREADS; ++i) + Assert.assertEquals(doc2.field(ops[i].threadName), ops[i].fieldValue, ops[i].threadName); + System.out.println("RESULT doc 2:"); System.out.println(doc2.toJSON()); - database1.close(); - database2.close(); + for (int i = 0; i < THREADS; ++i) + databases[i].close(); + + System.out.println("Test completed in " + (System.currentTimeMillis() - startedOn)); + } + + @Test + public void concurrentPessimisticSQLUpdates() throws Exception { + if (url.startsWith("local:")) + // SKIP TEST WITH LOCAL + return; + sqlUpdate(true); + } + + @Test + public void concurrentOptimisticSQLUpdates() throws Exception { + sqlUpdate(false); + } + + protected void sqlUpdate(boolean lock) throws InterruptedException { + System.out.println("Started Test " + (lock ? "LOCK" : "")); + + counter.set(0); + startedOn = System.currentTimeMillis(); + + ODatabaseDocumentTx[] databases = new ODatabaseDocumentTx[THREADS]; + for (int i = 0; i < THREADS; ++i) + databases[i] = new ODatabaseDocumentTx(url).open("admin", "admin"); + + ODocument doc1 = databases[0].newInstance(); + doc1.field("total", 0); + databases[0].save(doc1); + ORID rid1 = doc1.getIdentity(); + + PessimisticUpdate[] ops = new PessimisticUpdate[THREADS]; + for (int i = 0; i < THREADS; ++i) + ops[i] = new PessimisticUpdate(databases[i], rid1, "thread" + i, lock); + + Thread[] threads = new Thread[THREADS]; + for (int i = 0; i < THREADS; ++i) + threads[i] = new Thread(ops[i], "ConcurrentTest" + i); + + for (int i = 0; i < THREADS; ++i) + threads[i].start(); + + for (int i = 0; i < THREADS; ++i) + threads[i].join(); + + System.out.println("Done! Total sql updates executed in parallel: " + counter.get()); + + Assert.assertEquals(counter.get(), PESSIMISTIC_CYCLES * THREADS); + + doc1 = databases[0].load(rid1, null, true); + Assert.assertEquals(doc1.field("total"), PESSIMISTIC_CYCLES * THREADS); + + for (int i = 0; i < THREADS; ++i) + databases[i].close(); + + System.out.println("Test " + (lock ? "LOCK" : "") + " completed in " + (System.currentTimeMillis() - startedOn)); } } diff --git a/tests/src/test/java/com/orientechnologies/orient/test/database/speed/ReadAllClusterObjectsSpeedTest.java b/tests/src/test/java/com/orientechnologies/orient/test/database/speed/ReadAllClusterObjectsSpeedTest.java index 428f767042f..a1e3b211bc5 100755 --- a/tests/src/test/java/com/orientechnologies/orient/test/database/speed/ReadAllClusterObjectsSpeedTest.java +++ b/tests/src/test/java/com/orientechnologies/orient/test/database/speed/ReadAllClusterObjectsSpeedTest.java @@ -15,14 +15,15 @@ */ package com.orientechnologies.orient.test.database.speed; -import java.io.IOException; -import java.io.UnsupportedEncodingException; - import com.orientechnologies.common.test.SpeedTestMonoThread; import com.orientechnologies.orient.core.db.raw.ODatabaseRaw; import com.orientechnologies.orient.core.id.OClusterPositionFactory; import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.storage.ORawBuffer; +import com.orientechnologies.orient.core.storage.OStorage; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; public class ReadAllClusterObjectsSpeedTest extends SpeedTestMonoThread { private static final String CLUSTER_NAME = "Animal"; @@ -50,7 +51,7 @@ public void cycle() throws UnsupportedEncodingException { for (int i = 0; i < db.countClusterElements(CLUSTER_NAME); ++i) { rid.clusterPosition = OClusterPositionFactory.INSTANCE.valueOf(i); - buffer = db.read(rid, null, false, false).getResult(); + buffer = db.read(rid, null, false, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); if (buffer != null) ++objectsRead; } diff --git a/tools/src/main/java/com/orientechnologies/orient/console/OConsoleDatabaseApp.java b/tools/src/main/java/com/orientechnologies/orient/console/OConsoleDatabaseApp.java index 472d5cae2c2..49877f9096e 100755 --- a/tools/src/main/java/com/orientechnologies/orient/console/OConsoleDatabaseApp.java +++ b/tools/src/main/java/com/orientechnologies/orient/console/OConsoleDatabaseApp.java @@ -973,7 +973,7 @@ public void displayRawRecord(@ConsoleParameter(name = "rid", description = "The checkForDatabase(); ORecordId rid = new ORecordId(iRecordId); - final ORawBuffer buffer = currentDatabase.getStorage().readRecord(rid, null, false, null, false).getResult(); + final ORawBuffer buffer = currentDatabase.getStorage().readRecord(rid, null, false, null, false, OStorage.LOCKING_STRATEGY.DEFAULT).getResult(); if (buffer == null) throw new OException("The record has been deleted"); @@ -1732,7 +1732,7 @@ public void reloadRecordInternal(String iRecordId, String iFetchPlan) { checkForDatabase(); currentRecord = ((ODatabaseRecordAbstract) currentDatabase.getUnderlying()).executeReadRecord(new ORecordId(iRecordId), null, - iFetchPlan, true, false); + iFetchPlan, true, false, OStorage.LOCKING_STRATEGY.DEFAULT); displayRecord(null); message("\nOK");