Skip to content

Commit

Permalink
Distributed: fixed issue #2008 on deploying database
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jan 30, 2014
1 parent 5eff66f commit 2a590b7
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
import com.orientechnologies.orient.core.db.OScenarioThreadLocal.RUN_MODE;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.server.config.OServerUserConfiguration;
import com.orientechnologies.orient.server.distributed.*;
import com.orientechnologies.orient.server.distributed.ODistributedRequest.EXECUTION_MODE;
Expand Down Expand Up @@ -533,9 +532,7 @@ protected void checkLocalNodeInConfiguration() {
}

if (dirty) {
final ODocument doc = cfg.serialize();
manager.updateCachedDatabaseConfiguration(databaseName, doc);
manager.getConfigurationMap().put(OHazelcastPlugin.CONFIG_DATABASE_PREFIX + databaseName, doc);
manager.updateCachedDatabaseConfiguration(databaseName, cfg.serialize(), true, true);
}
}

Expand Down Expand Up @@ -573,9 +570,7 @@ protected void removeNodeInConfiguration(final String iNode, final boolean iForc
}

if (dirty) {
final ODocument doc = cfg.serialize();
manager.updateCachedDatabaseConfiguration(databaseName, doc);
manager.getConfigurationMap().put(OHazelcastPlugin.CONFIG_DATABASE_PREFIX + databaseName, doc);
manager.updateCachedDatabaseConfiguration(databaseName, cfg.serialize(), true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public void entryAdded(EntryEvent<String, Object> iEvent) {
}

} else if (key.startsWith(CONFIG_DATABASE_PREFIX)) {
saveDatabaseConfiguration(key.substring(CONFIG_DATABASE_PREFIX.length()), (ODocument) iEvent.getValue());
updateCachedDatabaseConfiguration(key.substring(CONFIG_DATABASE_PREFIX.length()), (ODocument) iEvent.getValue(), true, false);
OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
}
}
Expand All @@ -493,7 +493,7 @@ public void entryUpdated(EntryEvent<String, Object> iEvent) {
getNodeName(iEvent.getMember()));

installNewDatabases(false);
saveDatabaseConfiguration(dbName, (ODocument) iEvent.getValue());
updateCachedDatabaseConfiguration(dbName, (ODocument) iEvent.getValue(), true, false);
OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
}
}
Expand Down Expand Up @@ -671,15 +671,20 @@ protected void installNewDatabases(final boolean iStartup) {
ODistributedDatabaseChunk chunk = (ODistributedDatabaseChunk) value;

final String fileName = System.getProperty("java.io.tmpdir") + "/orientdb/install_" + databaseName + ".zip";

ODistributedServerLog.warn(this, getLocalNodeName(), r.getKey(), DIRECTION.NONE,
"copying remote database '%s' to: %s", databaseName, fileName);

final File file = new File(fileName);
if (file.exists())
file.delete();

FileOutputStream out = null;
try {
final FileOutputStream out = new FileOutputStream(fileName, false);
out = new FileOutputStream(fileName, false);

out.write(chunk.buffer);
for (int chunkNum = 1; !chunk.last; chunkNum++) {
long fileSize = writeDatabaseChunk(1, chunk, out);
for (int chunkNum = 2; !chunk.last; chunkNum++) {
distrDatabase.setWaitForTaskType(OCopyDatabaseChunkTask.class);

final Map<String, Object> result = (Map<String, Object>) sendRequest(databaseName, null,
Expand All @@ -691,14 +696,25 @@ protected void installNewDatabases(final boolean iStartup) {
continue;
else {
chunk = (ODistributedDatabaseChunk) res.getValue();
out.write(chunk.buffer);
fileSize += writeDatabaseChunk(chunkNum, chunk, out);
}
}
}

ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE,
"database copied correctly, size=%s", OFileUtils.getSizeAsString(fileSize));

} catch (Exception e) {
ODistributedServerLog.error(this, getLocalNodeName(), null, DIRECTION.NONE,
"error on transferring database '%s' to '%s'", e, databaseName, fileName);
} finally {
try {
if (out != null) {
out.flush();
out.close();
}
} catch (IOException e) {
}
}

installDatabase(distrDatabase, databaseName, dbPath, r.getKey(), fileName);
Expand All @@ -720,29 +736,51 @@ protected void installNewDatabases(final boolean iStartup) {
}
}

private void installDatabase(final OHazelcastDistributedDatabase distrDatabase, final String databaseName, final String dbPath,
public void updateCachedDatabaseConfiguration(String iDatabaseName, ODocument cfg, boolean iSaveToDisk, boolean iDeployToCluster) {
super.updateCachedDatabaseConfiguration(iDatabaseName, cfg, iSaveToDisk);

if (iDeployToCluster)
// DEPLOY THE CONFIGURATION TO THE CLUSTER
getConfigurationMap().put(OHazelcastPlugin.CONFIG_DATABASE_PREFIX + iDatabaseName, cfg);
}

protected long writeDatabaseChunk(final int iChunkId, final ODistributedDatabaseChunk chunk, final FileOutputStream out)
throws IOException {

ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "- writing chunk #%d offset=%d size=%s", iChunkId,
chunk.offset, OFileUtils.getSizeAsString(chunk.buffer.length));
out.write(chunk.buffer);

return chunk.buffer.length;
}

protected void installDatabase(final OHazelcastDistributedDatabase distrDatabase, final String databaseName, final String dbPath,
final String iNode, final String iDatabaseCompressedFile) {
ODistributedServerLog.warn(this, getLocalNodeName(), iNode, DIRECTION.IN, "installing database %s in %s...", databaseName,
ODistributedServerLog.warn(this, getLocalNodeName(), iNode, DIRECTION.IN, "installing database '%s' to: %s...", databaseName,
dbPath);

try {
final FileInputStream in = new FileInputStream(iDatabaseCompressedFile);
File f = new File(iDatabaseCompressedFile);

new File(dbPath).mkdirs();
final ODatabaseDocumentTx db = new ODatabaseDocumentTx("local:" + dbPath);

db.restore(in, null, null);
in.close();
final FileInputStream in = new FileInputStream(f);
try {
db.restore(in, null, null);
} finally {
in.close();
}

db.close();
Orient.instance().unregisterStorageByName(db.getName());

ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE,
"installed database %s in %s, setting it online...", databaseName, dbPath);
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "installed database '%s', setting it online...",
databaseName);

distrDatabase.setOnline();

ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "database %s is online", databaseName);
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "database '%s' is online", databaseName);

} catch (IOException e) {
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.IN, "error on copying database '%s' on local server", e,
Expand All @@ -759,7 +797,7 @@ protected ODocument loadDatabaseConfiguration(final String iDatabaseName, final
ODistributedServerLog.info(this, getLocalNodeName(), null, DIRECTION.NONE,
"loaded database configuration from active cluster");

updateCachedDatabaseConfiguration(iDatabaseName, cfg);
updateCachedDatabaseConfiguration(iDatabaseName, cfg, false, false);
return cfg;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
*/
package com.orientechnologies.orient.server.distributed;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.parser.OSystemVariableResolver;
import com.orientechnologies.orient.core.Orient;
Expand All @@ -38,6 +30,14 @@
import com.orientechnologies.orient.server.distributed.conflict.OReplicationConflictResolver;
import com.orientechnologies.orient.server.plugin.OServerPluginAbstract;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
* Abstract plugin to manage the distributed environment.
*
Expand Down Expand Up @@ -197,7 +197,7 @@ protected ODocument loadDatabaseConfiguration(final String iDatabaseName, final
f.read(buffer);

final ODocument doc = (ODocument) new ODocument().fromJSON(new String(buffer), "noMap");
updateCachedDatabaseConfiguration(iDatabaseName, doc);
updateCachedDatabaseConfiguration(iDatabaseName, doc, false);
return doc;

} catch (Exception e) {
Expand All @@ -211,12 +211,51 @@ protected ODocument loadDatabaseConfiguration(final String iDatabaseName, final
return null;
}

public void updateCachedDatabaseConfiguration(final String iDatabaseName, final ODocument cfg) {
public void updateCachedDatabaseConfiguration(final String iDatabaseName, final ODocument cfg, final boolean iSaveToDisk) {
synchronized (cachedDatabaseConfiguration) {
final ODocument oldCfg = cachedDatabaseConfiguration.get(iDatabaseName);
if (oldCfg != null && (oldCfg == cfg || Arrays.equals(oldCfg.toStream(), cfg.toStream())))
// NO CHANGE, SKIP IT
return;

// INCREMENT VERSION
Integer oldVersion = cfg.field("version");
if (oldVersion == null)
oldVersion = 0;
cfg.field("version", oldVersion.intValue() + 1);

// SAVE IN NODE'S LOCAL RAM
cachedDatabaseConfiguration.put(iDatabaseName, cfg);

// PRINT THE NEW CONFIGURATION
OLogManager.instance().info(this, "updated distributed configuration for database: %s:\n----------\n%s\n----------",
iDatabaseName, cfg.toJSON("prettyPrint"));

if (iSaveToDisk) {
// SAVE THE CONFIGURATION TO DISK
FileOutputStream f = null;
try {
File file = getDistributedConfigFile(iDatabaseName);

OLogManager.instance().info(this, "Saving distributed configuration file for database '%s' to: %s", iDatabaseName, file);

if (!file.exists())
file.createNewFile();

f = new FileOutputStream(file);
f.write(cfg.toJSON().getBytes());
f.flush();
} catch (Exception e) {
OLogManager.instance().error(this, "Error on saving distributed configuration file", e);

} finally {
if (f != null)
try {
f.close();
} catch (IOException e) {
}
}
}
}
}

Expand All @@ -235,42 +274,6 @@ public ODistributedConfiguration getDatabaseConfiguration(final String iDatabase
}
}

protected void saveDatabaseConfiguration(final String iDatabaseName, final ODocument cfg) {
synchronized (cachedDatabaseConfiguration) {
final ODocument oldCfg = cachedDatabaseConfiguration.get(iDatabaseName);
if (oldCfg != null && Arrays.equals(oldCfg.toStream(), cfg.toStream()))
// NO CHANGE, SKIP IT
return;
}

// INCREMENT VERSION
Integer oldVersion = cfg.field("version");
if (oldVersion == null)
oldVersion = 0;
cfg.field("version", oldVersion.intValue() + 1);

updateCachedDatabaseConfiguration(iDatabaseName, cfg);

FileOutputStream f = null;
try {
File file = getDistributedConfigFile(iDatabaseName);

OLogManager.instance().config(this, "Saving distributed configuration file for database '%s' in: %s", iDatabaseName, file);

f = new FileOutputStream(file);
f.write(cfg.toJSON().getBytes());
} catch (Exception e) {
OLogManager.instance().error(this, "Error on saving distributed configuration file", e);

} finally {
if (f != null)
try {
f.close();
} catch (IOException e) {
}
}
}

public File getDistributedConfigFile(final String iDatabaseName) {
return new File(serverInstance.getDatabaseDirectory() + iDatabaseName + "/" + FILE_DISTRIBUTED_DB_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,28 @@ public Object execute(final OServer iServer, ODistributedServerManager iManager,
ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT, "deploying database %s...",
databaseName);

final File f = new File(BACKUP_DIRECTORY + "/" + database.getName());
final File f = new File(BACKUP_DIRECTORY + "/backup_" + database.getName() + ".zip");
if (f.exists())
f.delete();
else
f.getParentFile().mkdirs();
f.createNewFile();

ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT,
"creating backup of database '%s' in directory: %s...", databaseName, f.getAbsolutePath());

database.backup(new FileOutputStream(f), null, null);

ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), DIRECTION.OUT,
"sending the compressed database '%s' over the network to node '%s', size=%s...", databaseName, getNodeSource(),
OFileUtils.getSizeAsString(f.length()));

return new ODistributedDatabaseChunk(f, 0, CHUNK_MAX_SIZE);
final ODistributedDatabaseChunk chunk = new ODistributedDatabaseChunk(f, 0, CHUNK_MAX_SIZE);

ODistributedServerLog.warn(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT,
"- transferring chunk #%d offset=%d size=%s...", 1, 0, OFileUtils.getSizeAsNumber(chunk.buffer.length));

return chunk;

} finally {
lock.unlock();
Expand Down

0 comments on commit 2a590b7

Please sign in to comment.