Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  - Abstract Elasticsearch responses (JSONObject)
  - Simplify settings
  - Improve API
  - Add tests
  - Remove elasticsearch-test
  - Add README
  • Loading branch information
ggrossetie committed Apr 29, 2016
1 parent fb5064e commit a2cd50a
Show file tree
Hide file tree
Showing 17 changed files with 468 additions and 320 deletions.
2 changes: 2 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
= Flamingo
Elasticsearch data migrations made easy.
13 changes: 0 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
<joda-time.version>2.9.3</joda-time.version>
<assertj-core.version>2.4.1</assertj-core.version>
<elasticsearch.version>1.7.5</elasticsearch.version>
<elasticsearch-test.version>1.7.5-erdf-1</elasticsearch-test.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -48,18 +47,6 @@
<version>${elasticsearch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tlrx</groupId>
<artifactId>elasticsearch-test</artifactId>
<version>${elasticsearch-test.version}</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.indusbox.flamingo.elasticsearch;
package org.indusbox.flamingo;

import java.io.File;
import java.util.Properties;

public class ElasticsearchSettings {
import com.google.common.base.Strings;

private String indexName;
public class FlamingoSettings {

private File scriptsDir;

private String indexName = ".flamingo";

private String protocol;
private String hostName;
Expand All @@ -13,21 +18,35 @@ public class ElasticsearchSettings {
private String username;
private String password;

public static ElasticsearchSettings fromConfig(Properties config) {
return new ElasticsearchSettings()
public static FlamingoSettings fromConfig(Properties config) {
FlamingoSettings flamingoSettings = new FlamingoSettings()
.setScriptsDir(new File(config.getProperty("flamingo.scriptsDir")))
.setUsername(config.getProperty("elasticsearch.user"))
.setPassword(config.getProperty("elasticsearch.password"))
.setProtocol(config.getProperty("elasticsearch.protocol"))
.setHostName(config.getProperty("elasticsearch.host"))
.setPort(Integer.valueOf(config.getProperty("elasticsearch.port")))
.setIndexName(config.getProperty("elasticsearch.index"));
.setPort(Integer.valueOf(config.getProperty("elasticsearch.port")));
String indexNameValue = config.getProperty("elasticsearch.index");
if (!Strings.isNullOrEmpty(indexNameValue)) {
flamingoSettings.setIndexName(indexNameValue);
}
return flamingoSettings;
}

public File getScriptsDir() {
return scriptsDir;
}

public FlamingoSettings setScriptsDir(File scriptsDir) {
this.scriptsDir = scriptsDir;
return this;
}

public String getIndexName() {
return indexName;
}

public ElasticsearchSettings setIndexName(String indexName) {
public FlamingoSettings setIndexName(String indexName) {
this.indexName = indexName;
return this;
}
Expand All @@ -36,7 +55,7 @@ public String getProtocol() {
return protocol;
}

public ElasticsearchSettings setProtocol(String protocol) {
public FlamingoSettings setProtocol(String protocol) {
this.protocol = protocol;
return this;
}
Expand All @@ -45,7 +64,7 @@ public String getHostName() {
return hostName;
}

public ElasticsearchSettings setHostName(String hostName) {
public FlamingoSettings setHostName(String hostName) {
this.hostName = hostName;
return this;
}
Expand All @@ -54,7 +73,7 @@ public int getPort() {
return port;
}

public ElasticsearchSettings setPort(int port) {
public FlamingoSettings setPort(int port) {
this.port = port;
return this;
}
Expand All @@ -63,7 +82,7 @@ public String getUsername() {
return username;
}

public ElasticsearchSettings setUsername(String username) {
public FlamingoSettings setUsername(String username) {
this.username = username;
return this;
}
Expand All @@ -72,7 +91,7 @@ public String getPassword() {
return password;
}

public ElasticsearchSettings setPassword(String password) {
public FlamingoSettings setPassword(String password) {
this.password = password;
return this;
}
Expand Down
137 changes: 64 additions & 73 deletions src/main/java/org/indusbox/flamingo/Migration.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
import java.util.Objects;
import java.util.Properties;

import org.indusbox.flamingo.elasticsearch.ElasticsearchClient;
import org.indusbox.flamingo.elasticsearch.ElasticsearchSettings;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
Expand All @@ -23,93 +17,90 @@

public class Migration {

public static void main(String[] args) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException, ParseException {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
Properties config = ConfigurationLoader.load();
MigrationSettings migrationSettings = MigrationSettings.fromConfig(config);
migrate(migrationSettings);
FlamingoSettings settings = FlamingoSettings.fromConfig(config);
migrate(settings);
}

public static void migrate(MigrationSettings migrationSettings) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, ParseException {
ElasticsearchSettings elasticsearchSettings = migrationSettings.getElasticsearchSettings();
ElasticsearchClient esClient = new ElasticsearchClient(elasticsearchSettings);
String esIndexName = elasticsearchSettings.getIndexName();
String esTypeName = migrationSettings.getTypeName();
List<File> scripts = ScriptFile.getScripts(migrationSettings.getScriptsDir());
public static int migrate(FlamingoSettings settings) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ScriptManager scriptManager = new ScriptManager(settings);
List<File> scripts = ScriptFile.getScripts(settings.getScriptsDir());
if (scripts.isEmpty()) {
System.out.println("No script, no migration");
System.exit(0);
return 0;
} else {
if (!esClient.indexExists()) {
if (!scriptManager.indexFlamingoExists()) {
System.out.println("Index doesn't exist, creating index");
if (!esClient.createIndex()) {
throw new RuntimeException("Error while creating index " + esIndexName);
}
}
if (!esClient.typeExists()) {
if (!esClient.createType()) {
throw new RuntimeException("Error while creating type " + esTypeName + " in index " + esIndexName);
if (!scriptManager.createFlamingoIndex()) {
throw new RuntimeException("Error while creating index " + settings.getIndexName());
}
}

JSONObject latestSuccessfulScript = esClient.getLatestSuccessfulScript();
System.out.println("latestSuccessfulScript " + latestSuccessfulScript);
Long count = esClient.count();
Long count = scriptManager.count();
if (count == 0) {
// No script were executed, starting migration from the beginning
for (File script : scripts) {
esClient.executeScript(script);
scriptManager.executeScript(script);
}
} else {
JSONArray scriptObjects = esClient.list(count);
// Check consistency (exists + checksum)
for (Object scriptObject : scriptObjects) {
JSONObject scriptJson = (JSONObject) scriptObject;
final String fileName = (String) ((JSONObject) scriptJson.get("_source")).get("fileName");
final String checksum = (String) ((JSONObject) scriptJson.get("_source")).get("checksum");
Optional<File> scriptFound = Iterables.tryFind(scripts, new Predicate<File>() {
@Override
public boolean apply(File input) {
return fileName.equals(input.getName());
}
});
if (scriptFound.isPresent()) {
String currentChecksum = Files.hash(scriptFound.get(), Hashing.md5()).toString();
if (!Objects.equals(checksum, currentChecksum)) {
throw new IllegalStateException("Abort migration. Checksum is different for script " + fileName + "!");
}
} else {
throw new IllegalStateException("Abort migration. Script " + fileName + " doesn't exist anymore!");
return scripts.size();
}
// No more than one fail script
List<ScriptMetadata> failScripts = scriptManager.getFailScripts();
if (failScripts.size() > 1) {
throw new IllegalStateException("Abort migration. More than one failed script!");
}
// Check consistency (exists + checksum)
List<ScriptMetadata> scriptsMetadata = scriptManager.list(count);
for (ScriptMetadata scriptMetadata : scriptsMetadata) {
final String fileName = scriptMetadata.getFileName();
final String checksum = scriptMetadata.getChecksum();
Optional<File> scriptFound = Iterables.tryFind(scripts, new Predicate<File>() {
@Override
public boolean apply(File input) {
return fileName.equals(input.getName());
}
}
// No more than one fail script
JSONArray failScripts = esClient.getFailScripts();
if (failScripts.size() > 1) {
throw new IllegalStateException("Abort migration. More than one failed script!");
}
if (!failScripts.isEmpty()) {
JSONObject failScriptJson = (JSONObject) failScripts.get(0);
JSONObject latestScript = esClient.getLatestScript();
String id = (String) failScriptJson.get("_id");
if (!Objects.equals(id, latestScript.get("_id"))) {
throw new IllegalArgumentException("Abort migration. Fail script must be the latest script!");
});
if (scriptFound.isPresent()) {
String currentChecksum = Files.hash(scriptFound.get(), Hashing.md5()).toString();
if (!Objects.equals(checksum, currentChecksum)) {
throw new IllegalStateException("Abort migration. Checksum is different for script " + fileName + "!");
}
final String fileName = (String) ((JSONObject) failScriptJson.get("_source")).get("fileName");
// Retry failed script
System.out.println("Retrying failed script " + fileName);
File script = Iterables.find(scripts, new Predicate<File>() {
@Override
public boolean apply(File input) {
return fileName.equals(input.getName());
}
});
esClient.updateScript(script, id);
scripts.remove(scriptFound.get());
} else {
throw new IllegalStateException("Abort migration. Script " + fileName + " doesn't exist anymore!");
}
for (File script : scripts) {
if (esClient.searchScript(script.getName()) == null) {
esClient.executeScript(script);
}
if (!failScripts.isEmpty()) {
ScriptMetadata failScript = failScripts.get(0);
ScriptMetadata latestScript = scriptManager.getLatestScript();
String id = failScript.getId();
if (!Objects.equals(id, latestScript.getId())) {
throw new IllegalArgumentException("Abort migration. Fail script must be the latest script!");
}
final String fileName = failScript.getFileName();
// Retry failed script
System.out.println("Retrying failed script " + fileName);
File script = Iterables.find(scripts, new Predicate<File>() {
@Override
public boolean apply(File input) {
return fileName.equals(input.getName());
}
});
scriptManager.updateScript(script, id);
}
if (scripts.isEmpty()) {
System.out.println("No new script to apply");
return 0;
}
int result = 0;
for (File script : scripts) {
if (!scriptManager.scriptExists(script.getName())) {
scriptManager.executeScript(script);
result++;
}
}
return result;
}
}
}
54 changes: 0 additions & 54 deletions src/main/java/org/indusbox/flamingo/MigrationSettings.java

This file was deleted.

Loading

0 comments on commit a2cd50a

Please sign in to comment.