Skip to content

Commit

Permalink
Merge branch 'master' into 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jprante committed Dec 20, 2014
2 parents e6a6537 + be34dea commit a7971e1
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 96 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Creating a JDBC river is easy:

- install the plugin

- download a JDBC driver jar from your vendor's site (for example MySQL) and put the jar into the folder of the plugin `$ES_HOME/plugins/river-jdbc`.
- download a JDBC driver jar from your vendor's site (for example MySQL) and put the jar into the folder of the plugin `$ES_HOME/plugins/jdbc`.

Assuming you have a table of name `orders`, you can issue this simple command from the command line

Expand Down Expand Up @@ -159,7 +159,7 @@ Internet access (of course)
5. Add MySQL JDBC driver jar to JDBC river plugin directory and set access permission for .jar file (at least chmod 644)

`cp mysql-connector-java-5.1.33-bin.jar $ES_HOME/plugins/jdbc/`
`chmod 644 $ES_HOME/plugins/jdbc/`
`chmod 644 $ES_HOME/plugins/jdbc/*`

6. Start elasticsearch from terminal window

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package org.xbib.elasticsearch.plugin.jdbc.cron;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;

import java.util.Date;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
Expand All @@ -29,6 +32,9 @@
* to calculate future execution times for scheduled tasks.
*/
public class CronThreadPoolExecutor extends ScheduledThreadPoolExecutor implements CronExecutorService {

private final static ESLogger logger = ESLoggerFactory.getLogger("river.jdbc.CronThreadPoolExecutor");

/**
* Constructs a new CronThreadPoolExecutor.
*
Expand Down Expand Up @@ -74,11 +80,8 @@ public Future<?> schedule(final Runnable task, final CronExpression expression)
if (task == null) {
throw new NullPointerException();
}
this.setCorePoolSize(this.getCorePoolSize() + 1);
setCorePoolSize(getCorePoolSize() + 1);
Runnable scheduleTask = new Runnable() {
/**
* @see Runnable#run()
*/
@Override
public void run() {
Date now = new Date();
Expand All @@ -92,13 +95,10 @@ public void run() {
}
time = expression.getNextValidTimeAfter(now);
}
} catch (RejectedExecutionException e) {
//
} catch (CancellationException e) {
//
} catch (InterruptedException e) {
//
Thread.currentThread().interrupt();
} catch (RejectedExecutionException | CancellationException e) {
logger.error(e.getMessage(), e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private List<Future<?>> schedule(Thread thread) {
Long seconds = settings.getAsTime("interval", TimeValue.timeValueSeconds(0)).seconds();
if (schedule != null && schedule.length > 0) {
CronThreadPoolExecutor cronThreadPoolExecutor =
new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 1));
for (String cron : schedule) {
futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private List<Future<?>> schedule(Thread thread) {
List<Future<?>> futures = newLinkedList();
if (schedule != null && schedule.length > 0) {
CronThreadPoolExecutor cronThreadPoolExecutor =
new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 4));
new CronThreadPoolExecutor(settings.getAsInt("threadpoolsize", 1));
for (String cron : schedule) {
futures.add(cronThreadPoolExecutor.schedule(thread, new CronExpression(cron)));
}
Expand Down
118 changes: 75 additions & 43 deletions src/main/java/org/xbib/elasticsearch/plugin/jdbc/state/RiverState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
*/
public class RiverState implements Streamable, ToXContent, Comparable<RiverState> {

private final static DateTime EMPTY_DATETIME = new DateTime(0L);

/**
* The name of the river instance
*/
Expand All @@ -61,12 +59,12 @@ public class RiverState implements Streamable, ToXContent, Comparable<RiverState
/*
* The time of the last river activity
*/
private DateTime begin;
private DateTime lastActiveBegin;

/*
* The time when the last river activity ended
*/
private DateTime end;
private DateTime lastActiveEnd;

/**
* A custom map for more information about the river
Expand Down Expand Up @@ -120,43 +118,23 @@ public DateTime getStarted() {
* @return this state
*/
public RiverState setLastActive(DateTime begin, DateTime end) {
if (begin != null) {
this.begin = begin;
}
if (end != null) {
this.end = end;
}
this.lastActiveBegin = begin;
this.lastActiveEnd = end;
return this;
}

/**
* @return the begin of the last river activity
*/
public DateTime getLastActiveBegin() {
return begin != null ? begin : EMPTY_DATETIME;
return lastActiveBegin;
}

/**
* @return the end of the last river activity
*/
public DateTime getLastActiveEnd() {
return end != null ? end : EMPTY_DATETIME;
}

/**
* Was the river active at a certain time? Only the last activity can be checked.
*
* @param instant the time to check
* @return true if river was active, false if not
*/
public boolean wasActiveAt(DateTime instant) {
return instant != null
&& begin != null && begin.getMillis() != 0L && begin.isBefore(instant)
&& (end == null || end.getMillis() == 0L || end.isAfter(instant));
}

public boolean wasInactiveAt(DateTime instant) {
return !wasActiveAt(instant);
return lastActiveEnd;
}

public RiverState setCounter(Integer counter) {
Expand All @@ -178,19 +156,51 @@ public Map<String, Object> getCustom() {
return (Map<String, Object>) this.map.get("custom");
}

public boolean isAborted() {
return map.containsKey("aborted") ? (Boolean) map.get("aborted") : false;
}

public boolean isSuspended() {
return map.containsKey("suspended") ? (Boolean) map.get("suspended") : false;
}

public RiverState setLastStartDate(long lastStartDate) {
this.map.put("lastStartDate", lastStartDate);
return this;
}

public long getLastStartDate() {
return (long)this.map.get("lastStartDate");
}

public RiverState setLastEndDate(long lastEndDate) {
this.map.put("lastEndDate", lastEndDate);
return this;
}

public long getLastEndDate() {
return (long)this.map.get("lastEndDate");
}

public RiverState setLastExecutionStartDate(long lastExecutionStartDate) {
this.map.put("lastExecutionStartDate", lastExecutionStartDate);
return this;
}

public long getLastExecutionStartDate() {
return (long)this.map.get("lastExecutionStartDate");
}

public RiverState setLastExecutionEndDate(long lastExecutionEndDate) {
this.map.put("lastExecutionEndDate", lastExecutionEndDate);
return this;
}

public long getLastExecutionEndDate() {
return (long)this.map.get("lastExecutionEndDate");
}

public RiverState fromXContent(XContentParser parser) throws IOException {
DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.dateOptionalTimeParser().withZone(DateTimeZone.UTC);
Long startTimestamp = 0L;
Long begin = 0L;
Long end = 0L;
Long begin = null;
Long end = null;
String name = null;
String type = null;
String currentFieldName = null;
Expand All @@ -213,11 +223,11 @@ public RiverState fromXContent(XContentParser parser) throws IOException {
break;
case "last_active_begin":
begin = parser.text() != null && !"null".equals(parser.text()) ?
dateTimeFormatter.parseMillis(parser.text()) : 0L;
dateTimeFormatter.parseMillis(parser.text()) : null;
break;
case "last_active_end":
end = parser.text() != null && !"null".equals(parser.text()) ?
dateTimeFormatter.parseMillis(parser.text()) : 0L;
dateTimeFormatter.parseMillis(parser.text()) : null;
break;
}
} else if (token == START_OBJECT) {
Expand All @@ -228,7 +238,8 @@ public RiverState fromXContent(XContentParser parser) throws IOException {
.setName(name)
.setType(type)
.setStarted(new DateTime(startTimestamp))
.setLastActive(new DateTime(begin), new DateTime(end))
.setLastActive(begin != null ? new DateTime(begin) : null,
end != null ? new DateTime(end) : null)
.setMap(map);
}

Expand All @@ -250,19 +261,40 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void readFrom(StreamInput in) throws IOException {
this.name = in.readOptionalString();
this.type = in.readOptionalString();
this.started = new DateTime(in.readLong());
this.begin = new DateTime(in.readLong());
this.end = new DateTime(in.readLong());
if (in.readBoolean()) {
this.started = new DateTime(in.readLong());
}
if (in.readBoolean()) {
this.lastActiveBegin = new DateTime(in.readLong());
}
if (in.readBoolean()) {
this.lastActiveEnd = new DateTime(in.readLong());
}
map = in.readMap();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(name);
out.writeOptionalString(type);
out.writeLong(started != null ? started.getMillis() : 0L);
out.writeLong(begin != null ? begin.getMillis() : 0L);
out.writeLong(end != null ? end.getMillis() : 0L);
if (started != null) {
out.writeBoolean(true);
out.writeLong(started.getMillis());
} else {
out.writeBoolean(false);
}
if (lastActiveBegin != null) {
out.writeBoolean(true);
out.writeLong(lastActiveBegin.getMillis());
} else {
out.writeBoolean(false);
}
if (lastActiveEnd != null) {
out.writeBoolean(true);
out.writeLong(lastActiveEnd.getMillis());
} else {
out.writeBoolean(false);
}
out.writeMap(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ public class SimpleRiverContext implements RiverContext {

private long lastRowCount;

private long lastStartDate;

private long lastEndDate;

private long lastExecutionStartDate;

private long lastExecutionEndDate;

private Map<String, Object> columnNameMap;

private Map<String, Object> lastRow = new HashMap<String, Object>();
Expand Down Expand Up @@ -337,39 +329,39 @@ public long getLastRowCount() {
}

public SimpleRiverContext setLastStartDate(long lastStartDate) {
this.lastStartDate = lastStartDate;
riverState.setLastStartDate(lastStartDate);
return this;
}

public long getLastStartDate() {
return lastStartDate;
return riverState.getLastStartDate();
}

public SimpleRiverContext setLastEndDate(long lastEndDate) {
this.lastEndDate = lastEndDate;
riverState.setLastEndDate(lastEndDate);
return this;
}

public long getLastEndDate() {
return lastEndDate;
return riverState.getLastEndDate();
}

public SimpleRiverContext setLastExecutionStartDate(long lastExecutionStartDate) {
this.lastExecutionStartDate = lastExecutionStartDate;
riverState.setLastExecutionStartDate(lastExecutionStartDate);
return this;
}

public long getLastExecutionStartDate() {
return lastExecutionStartDate;
return riverState.getLastExecutionStartDate();
}

public SimpleRiverContext setLastExecutionEndDate(long lastExecutionEndDate) {
this.lastExecutionEndDate = lastExecutionEndDate;
riverState.setLastExecutionEndDate(lastExecutionEndDate);
return this;
}

public long getLastExecutionEndDate() {
return lastExecutionEndDate;
return riverState.getLastExecutionEndDate();
}

public SimpleRiverContext setColumnNameMap(Map<String, Object> columnNameMap) {
Expand Down Expand Up @@ -473,10 +465,10 @@ public Map<String, Object> asMap() {
.field("shouldignorenull", shouldIgnoreNull)
.field("lastResultSetMetadata", lastResultSetMetadata)
.field("lastDatabaseMetadata", lastDatabaseMetadata)
.field("lastStartDate", lastStartDate)
.field("lastEndDate", lastEndDate)
.field("lastExecutionStartDate", lastExecutionStartDate)
.field("lastExecutionEndDate", lastExecutionEndDate)
.field("lastStartDate", riverState.getLastStartDate())
.field("lastEndDate", riverState.getLastEndDate())
.field("lastExecutionStartDate", riverState.getLastExecutionStartDate())
.field("lastExecutionEndDate", riverState.getLastExecutionEndDate())
.field("columnNameMap", columnNameMap)
.field("lastRow", lastRow)
.field("sql", sql)
Expand Down
Loading

0 comments on commit a7971e1

Please sign in to comment.