diff --git a/src/main/java/com/redislabs/Connector.java b/src/main/java/com/redislabs/Connector.java index ba27080..5fcb9fa 100644 --- a/src/main/java/com/redislabs/Connector.java +++ b/src/main/java/com/redislabs/Connector.java @@ -1,36 +1,28 @@ package com.redislabs; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.stream.Collectors; - +import gears.ExecutionMode; +import gears.GearsBuilder; +import gears.LogLevel; +import gears.operations.*; +import gears.readers.StreamReader; +import gears.readers.StreamReader.FailurePolicy; import org.hibernate.Session; import org.hibernate.Transaction; +import org.hibernate.TransactionException; import org.hibernate.boot.registry.StandardServiceInitiator; import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl; +import org.hibernate.exception.JDBCConnectionException; import org.hibernate.service.Service; +import org.hibernate.service.spi.ServiceException; import org.hibernate.service.spi.ServiceRegistryImplementor; -import gears.ExecutionMode; -import gears.GearsBuilder; -import gears.LogLevel; -import gears.operations.AccumulateOperation; -import gears.operations.ForeachOperation; -import gears.operations.MapOperation; -import gears.operations.OnRegisteredOperation; -import gears.operations.OnUnregisteredOperation; -import gears.readers.StreamReader; -import gears.readers.StreamReader.FailurePolicy; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class Connector implements ForeachOperation>>, AccumulateOperation, ArrayList>>, @@ -41,7 +33,8 @@ public class Connector implements ForeachOperation{ private Map values; @@ -103,19 +96,21 @@ static Connector getConnector(String name) { private int batchSize; private int duration; private int retryInterval; + private boolean errorsToDLQ; public transient ConcurrentLinkedDeque queue = null; - + public Connector() {} - public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval) { + public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval, boolean errorsToDLQ) { this.name = name; this.xmlDef = xmlDef; + this.errorsToDLQ = errorsToDLQ; System.setProperty("javax.xml.bind.JAXBContextFactory", "org.eclipse.persistence.jaxb.JAXBContextFactory"); Thread.currentThread().setContextClassLoader(WriteBehind.class.getClassLoader()); StandardServiceRegistryImpl tempRegistry = (StandardServiceRegistryImpl)new StandardServiceRegistryBuilder() - .configure( InMemoryURLFactory.getInstance().build("configuration", this.xmlDef)) + .configure( InMemoryURLFactory.getInstance().build("configuration", xmlDef)) .build(); RGHibernateStandardServiceInitiator initiator = this.new RGHibernateStandardServiceInitiator(); @@ -146,9 +141,9 @@ public Connector(String name, String xmlDef, int batchSize, int duration, int re .map(this) .accumulate(this) .foreach(this) - .map(ArrayList>::size) + .map(ArrayList::size) .register(ExecutionMode.ASYNC_LOCAL, this, this); - + connector = RGHibernate.getOrCreate(name); connector.setXmlConf(xmlDef); connectors.put(this.name, this); @@ -202,7 +197,7 @@ public HashMap map(HashMap r) throws Exception { String sourceName = map.get(SOURCE_STR); - Source source = (WriteSource)Source.getSource(sourceName); + Source source = Source.getSource(sourceName); Map newMap = new HashMap<>(); PropertyData idProperty = source.getIdProperty(); @@ -212,7 +207,7 @@ public HashMap map(HashMap r) throws Exception { try { convertedVal = idProperty.convertToObject(val); }catch (Exception e) { - String msg = String.format("Can not conver id property %s val %s, error='%s'", idProperty.getName(), val, e.toString()); + String msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), val, e); GearsBuilder.log(msg, LogLevel.WARNING); throw new Exception(msg); } @@ -231,7 +226,7 @@ public HashMap map(HashMap r) throws Exception { pm = source.getPropertyMapping(key); convertedVal = pm.convertToObject(val); }catch (Exception e) { - String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e.toString()); + String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e); GearsBuilder.log(msg, LogLevel.WARNING); throw new Exception(msg); } @@ -245,55 +240,93 @@ public HashMap map(HashMap r) throws Exception { return r; } + @Override public void foreach(ArrayList> record) throws Exception { String lastStreamId = null; String msg = null; Exception cause = null; + int lastCommittedIdx = -1; synchronized (this.connector) { try { Session session = connector.getSession(); Transaction transaction = session.beginTransaction(); boolean isMerge = true; - - for(Map r: record) { - lastStreamId = new String((byte[])r.get("id")); - Map map = (Map) r.get("value"); - String sourceName = (String)map.remove(SOURCE_STR); - - String event = (String)map.remove(EVENT_STR); - if(event.charAt(0) != 'd') { - if(!isMerge) { + + for (int idx = 0; idx < record.size(); ++idx) { + Map r = record.get(idx); + lastStreamId = new String((byte[]) r.get("id")); + Map value = (Map) r.get("value"); + Map map = value.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue())); + String sourceName = (String) map.remove(SOURCE_STR); + + String event = (String) map.remove(EVENT_STR); + if (event.charAt(0) != 'd') { + if (!isMerge) { transaction.commit(); session.clear(); transaction = session.beginTransaction(); isMerge = true; + lastCommittedIdx = idx - 1; } - session.merge((String)map.remove(ENTETY_NAME_STR), map); - }else { - if(isMerge) { + session.merge((String) map.remove(ENTETY_NAME_STR), map); + } else { + if (isMerge) { transaction.commit(); session.clear(); transaction = session.beginTransaction(); isMerge = false; + lastCommittedIdx = idx - 1; } Source source = Source.getSource(sourceName); - Object o = session.get((String)map.remove(ENTETY_NAME_STR), (Serializable)map.get(source.getIdProperty().getName())); + Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName())); // o can be null on hdel that removed the last field - if(o != null) { + if (o != null) { session.delete(o); } } } - + transaction.commit(); session.clear(); - }catch (Exception e) { - msg = String.format("Failed commiting transaction error='%s'", e.toString()); + } + catch (TransactionException|JDBCConnectionException|ServiceException ex) { + msg = String.format("Failed committing transaction error='%s'", ex); GearsBuilder.log(msg, LogLevel.WARNING); + lastStreamId = null; + cause = ex; connector.closeSession(); + + } + catch (Exception e) { + msg = String.format("Failed committing transaction error='%s'", e); + GearsBuilder.log(msg, LogLevel.WARNING); + lastStreamId = null; cause = e; + + if ( errorsToDLQ ) { + if ( connector.getSession().getTransaction().isActive() ) { + connector.getSession().getTransaction().rollback(); + } + + connector.getSession().clear(); + + try { + retry(record.subList(lastCommittedIdx + 1, record.size())); + msg = null; + } + catch (Exception ex) { + msg = String.format("Failed committing transaction error='%s'", ex); + GearsBuilder.log(msg, LogLevel.WARNING); + cause = ex; + } + + } + else { + connector.closeSession(); + } } while(!queue.isEmpty()) { @@ -310,9 +343,108 @@ public void foreach(ArrayList> record) throws Exception throw new Exception(msg, cause); } } - } - + + private void retry(List> record) throws Exception { + for(Map r: record) { + try { + Session session = connector.getSession(); + Transaction transaction = session.beginTransaction(); + + Map value = (Map) r.get("value"); + Map map = value.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue())); + String sourceName = (String) map.remove(SOURCE_STR); + + String event = (String) map.remove(EVENT_STR); + if (event.charAt(0) != 'd') { + session.merge((String) map.remove(ENTETY_NAME_STR), map); + } else { + Source source = Source.getSource(sourceName); + Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName())); + // o can be null on hdel that removed the last field + if (o != null) { + session.delete(o); + } + } + transaction.commit(); + session.clear(); + } catch (TransactionException|JDBCConnectionException|ServiceException ex) { + connector.closeSession(); + throw ex; + } catch (Exception e) { + String msg = String.format("Failed retrying transaction error='%s'", e); + GearsBuilder.log(msg, LogLevel.WARNING); + + if (connector.getSession().getTransaction().isActive()) { + connector.getSession().getTransaction().rollback(); + } + + connector.getSession().clear(); + putInDLQ(r, e); + } + } + } + + private void putInDLQ(Map r, Exception e) throws Exception { + String streamName = new String((byte[])r.get("key")); + Map map = (Map) r.get("value"); + + Map newMap = new HashMap<>(); + + newMap.put("error", e.getMessage()); + Throwable ex = e; + while (ex.getCause() != null) + ex = ex.getCause(); + newMap.put("cause", ex.getLocalizedMessage()); + + String[] command; + Stream commandStreamInit = Stream.of("XADD", DLQ_STR + streamName, "*"); + + String msg = null; + String sourceName = (String)map.remove(SOURCE_STR); + Source source = Source.getSource(sourceName); + PropertyData idProperty = source.getIdProperty(); + Object idVal = map.remove(idProperty.getName()); + String idStr = null; + try { + idStr = idProperty.convertToStr(idVal); + }catch (Exception propEx) { + msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), idVal, propEx); + GearsBuilder.log(msg, LogLevel.WARNING); + throw new Exception(msg); + } + + newMap.put(idProperty.getName(), idStr); + newMap.put(ENTETY_NAME_STR, map.remove(ENTETY_NAME_STR).toString()); + newMap.put(EVENT_STR, map.remove(EVENT_STR).toString()); + newMap.put(SOURCE_STR, sourceName); + + for(String key : map.keySet()) { + Object val = map.get(key); + + PropertyData pm = null; + String convertedVal = null; + try { + pm = source.getPropertyMapping(key); + convertedVal = pm.convertToStr(val); + } catch (Exception propEx) { + msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, propEx); + GearsBuilder.log(msg, LogLevel.WARNING); + throw new Exception(msg); + } + + + newMap.put(key, convertedVal); + } + + Stream fieldsStream = newMap.entrySet().stream() + .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())); + + command = Stream.concat(commandStreamInit, fieldsStream).toArray(String[]::new); + GearsBuilder.execute(command); + } + public Object getObject(String entetyName, Serializable pk) throws Exception { Object o = null; synchronized (this.connector) { @@ -321,7 +453,7 @@ public Object getObject(String entetyName, Serializable pk) throws Exception { session.clear(); o = session.get(entetyName, pk); }catch(Exception e) { - String msg = String.format("Failed fetching data from databse, error='%s'", e.toString()); + String msg = String.format("Failed fetching data from databse, error='%s'", e); GearsBuilder.log(msg, LogLevel.WARNING); connector.closeSession(); throw e; @@ -395,6 +527,10 @@ public int getRetryInterval() { return retryInterval; } + public boolean getErrorsToDLQ() { + return errorsToDLQ; + } + public void setName(String name) { this.name = name; } @@ -438,6 +574,9 @@ public void setDuration(int duration) { public void setRetryInterval(int retryInterval) { this.retryInterval = retryInterval; } - + + public void setErrorsToDLQ(boolean errorsToDLQ) { + this.errorsToDLQ = errorsToDLQ; + } } diff --git a/src/main/java/com/redislabs/PropertyData.java b/src/main/java/com/redislabs/PropertyData.java index d46b695..692af65 100644 --- a/src/main/java/com/redislabs/PropertyData.java +++ b/src/main/java/com/redislabs/PropertyData.java @@ -1,12 +1,12 @@ package com.redislabs; -import java.io.Serializable; -import java.sql.Date; - import org.hibernate.type.AbstractStandardBasicType; import org.hibernate.type.DbTimestampType; import org.hibernate.type.Type; +import java.io.Serializable; +import java.sql.Date; + public class PropertyData implements Serializable{ /** diff --git a/src/main/java/com/redislabs/RGHibernate.java b/src/main/java/com/redislabs/RGHibernate.java index f6f28ec..07a4559 100644 --- a/src/main/java/com/redislabs/RGHibernate.java +++ b/src/main/java/com/redislabs/RGHibernate.java @@ -1,5 +1,13 @@ package com.redislabs; +import gears.GearsBuilder; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.boot.Metadata; +import org.hibernate.boot.MetadataSources; +import org.hibernate.boot.registry.StandardServiceRegistry; +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; + import java.io.Closeable; import java.io.IOException; import java.io.Serializable; @@ -8,19 +16,7 @@ import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Collection; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; - -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.boot.Metadata; -import org.hibernate.boot.MetadataSources; -import org.hibernate.boot.registry.StandardServiceRegistry; -import org.hibernate.boot.registry.StandardServiceRegistryBuilder; - -import gears.GearsBuilder; +import java.util.*; public class RGHibernate implements Closeable, Serializable { @@ -42,9 +38,9 @@ public static RGHibernate get(String name) { return hibernateConnections.get(name); } - private String name; + private final String name; private String xmlConf; - private Map sources; + private final Map sources; private transient Session session = null; private transient SessionFactory factory = null; private transient StandardServiceRegistry registry = null; @@ -191,7 +187,7 @@ public void close() throws IOException { } } catch (SQLException e) { - GearsBuilder.log(String.format("Exception on deregister drivers, %s", e.toString())); + GearsBuilder.log(String.format("Exception on deregister drivers, %s", e)); } // Closing timer thread for oracle driver not to leak.. diff --git a/src/main/java/com/redislabs/ReadSource.java b/src/main/java/com/redislabs/ReadSource.java index 97b7365..82926b6 100644 --- a/src/main/java/com/redislabs/ReadSource.java +++ b/src/main/java/com/redislabs/ReadSource.java @@ -1,5 +1,11 @@ package com.redislabs; +import gears.ExecutionMode; +import gears.GearsBuilder; +import gears.operations.ForeachOperation; +import gears.readers.KeysReader; +import gears.records.KeysReaderRecord; + import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; @@ -9,12 +15,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import gears.ExecutionMode; -import gears.GearsBuilder; -import gears.operations.ForeachOperation; -import gears.readers.KeysReader; -import gears.records.KeysReaderRecord; - public class ReadSource extends Source implements ForeachOperation{ /** @@ -58,7 +58,7 @@ public void foreach(KeysReaderRecord r) throws Exception { try { res = (Map)getConnectorObj().getObject(getHashPrefix(), (Serializable)pk); } catch (Exception e) { - GearsBuilder.overrideReply(String.format("-ERR %s", e.toString())); + GearsBuilder.overrideReply(String.format("-ERR %s", e)); throw e; } if (res != null) { diff --git a/src/main/java/com/redislabs/Source.java b/src/main/java/com/redislabs/Source.java index dfdff65..fb281a9 100644 --- a/src/main/java/com/redislabs/Source.java +++ b/src/main/java/com/redislabs/Source.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import gears.LogLevel; import org.hibernate.boot.Metadata; import org.hibernate.boot.MetadataSources; import org.hibernate.boot.registry.StandardServiceRegistry; @@ -61,11 +62,11 @@ public Source(String connector, String name, String xmlDef) { this.name = name; this.xmlDef = xmlDef; this.propertyMappings = new HashMap<>(); - + StandardServiceRegistry tempRegistry = new StandardServiceRegistryBuilder() - .configure( InMemoryURLFactory.getInstance().build("configuration", RGHibernate.get(connector).getXmlConf())) + .configure( InMemoryURLFactory.getInstance().build("configuration", RGHibernate.get(connector).getXmlConf())) .build(); - + MetadataSources tempSources = new MetadataSources(tempRegistry); tempSources.addURL(InMemoryURLFactory.getInstance().build("mapping", xmlDef)); Metadata metadata = tempSources.getMetadataBuilder().build(); diff --git a/src/main/java/com/redislabs/WriteBehind.java b/src/main/java/com/redislabs/WriteBehind.java index 43ced6d..6d2e405 100644 --- a/src/main/java/com/redislabs/WriteBehind.java +++ b/src/main/java/com/redislabs/WriteBehind.java @@ -1,22 +1,19 @@ package com.redislabs; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.stream.Collectors; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; - import gears.ExecutionMode; import gears.GearsBuilder; import gears.operations.FlatMapOperation; import gears.readers.CommandReader; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.stream.Collectors; + public class WriteBehind{ public static final int VERSION = 0x00010100; @@ -85,7 +82,7 @@ public static String getStringVersion(int version) { public static void main(String[] args) throws Exception { String verStr = getStringVersion(VERSION); if(args.length == 1 && args[0].equals("version")) { - System.out.print(String.format("%s\r\n", verStr)); + System.out.printf("%s\r\n", verStr); return; } GearsBuilder.log(String.format("RGHibernate %s", verStr)); @@ -109,20 +106,23 @@ public static void main(String[] args) throws Exception { GearsBuilder.CreateGearsBuilder(newConnectorReader, "Register a new connector"). map(r->{ String connectorName = new String((byte[])r[1]); - String connectorXml = new String((byte[])r[5]); + String connectorXml = new String((r.length > 6) ? (byte[])r[6] : (byte[])r[5]); int batchSize = Integer.parseInt(new String((byte[])r[2])); int duration = Integer.parseInt(new String((byte[])r[3])); int retryInterval = Integer.parseInt(new String((byte[])r[4])); + boolean dlq = false; + if (r.length > 6) + dlq = Boolean.parseBoolean(new String((byte[])r[5])); if(Connector.getConnector(connectorName)!=null) { throw new Exception("connector already exists"); } - new Connector(connectorName, connectorXml, batchSize, duration, retryInterval); + new Connector(connectorName, connectorXml, batchSize, duration, retryInterval, dlq); return "OK"; }).register(ExecutionMode.SYNC); // add source registration CommandReader newSourceReader = new CommandReader().setTrigger("SYNC.REGISTERSOURCE"); - GearsBuilder.CreateGearsBuilder(newSourceReader, "Registe a new source"). + GearsBuilder.CreateGearsBuilder(newSourceReader, "Register a new source"). map(r->{ String sourceName = new String((byte[])r[1]); String connectorName = new String((byte[])r[2]); @@ -131,6 +131,7 @@ public static void main(String[] args) throws Exception { String sourceXml = null; boolean isWrite = true; boolean writeThrough = false; + boolean writeOnChange = true; if(writePolicy.equals("WriteThrough")) { isWrite = true; writeThrough = true; @@ -139,11 +140,23 @@ public static void main(String[] args) throws Exception { }catch (Exception e) { throw new Exception("Could not parse timeout argument"); } - sourceXml = new String((byte[])r[5]); + if ( r.length > 6 ) { + writeOnChange = Boolean.parseBoolean(new String((byte[])r[5])); + sourceXml = new String((byte[])r[6]); + } + else { + sourceXml = new String((byte[]) r[5]); + } }else if(writePolicy.equals("WriteBehind")) { isWrite = true; writeThrough = false; - sourceXml = new String((byte[])r[4]); + if ( r.length > 5 ) { + writeOnChange = Boolean.parseBoolean(new String((byte[])r[4])); + sourceXml = new String((byte[])r[5]); + } + else { + sourceXml = new String((byte[]) r[4]); + } }else if(writePolicy.equals("ReadThrough")) { isWrite = false; try { @@ -165,7 +178,7 @@ public static void main(String[] args) throws Exception { } Source s = null; if (isWrite) { - s = new WriteSource(sourceName, connectorName, sourceXml, writeThrough, timeout); + s = new WriteSource(sourceName, connectorName, sourceXml, writeThrough, timeout, writeOnChange); }else { s = new ReadSource(sourceName, connectorName, sourceXml, timeout); } @@ -188,7 +201,7 @@ public static void main(String[] args) throws Exception { // remove connector CommandReader newRemoveConnectorReader = new CommandReader().setTrigger("SYNC.UNREGISTERCONNECTOR"); - GearsBuilder.CreateGearsBuilder(newRemoveConnectorReader, "Unregiste connector"). + GearsBuilder.CreateGearsBuilder(newRemoveConnectorReader, "Unregister connector"). map(r->{ String connectorName = new String((byte[])r[1]); Connector c = Connector.getConnector(connectorName); @@ -245,14 +258,14 @@ public Iterable flatmap(Object[] r) throws Exception { for(Connector c: updateInfo.getConnectors()) { GearsBuilder.log(String.format("Register connector %s", c.getName())); - new Connector(c.getName(), c.getXmlDef(), c.getBatchSize(), c.getDuration(), c.getRetryInterval()); + new Connector(c.getName(), c.getXmlDef(), c.getBatchSize(), c.getDuration(), c.getRetryInterval(), c.getErrorsToDLQ()); } for(Source temp: updateInfo.getSources()) { GearsBuilder.log(String.format("Register source %s to connector %s", temp.getName(), temp.getConnector())); if(temp instanceof WriteSource) { WriteSource s = (WriteSource)temp; - new WriteSource(s.getName(), s.getConnector(), s.getXmlDef(), s.isWriteThrough(), s.getTimeout()); + new WriteSource(s.getName(), s.getConnector(), s.getXmlDef(), s.isWriteThrough(), s.getTimeout(), s.writeOnChange); } else if(temp instanceof ReadSource) { ReadSource s = (ReadSource)temp; new ReadSource(s.getName(), s.getConnector(), s.getXmlDef(), s.getExpire()); diff --git a/src/main/java/com/redislabs/WriteSource.java b/src/main/java/com/redislabs/WriteSource.java index a740b82..b45de28 100644 --- a/src/main/java/com/redislabs/WriteSource.java +++ b/src/main/java/com/redislabs/WriteSource.java @@ -25,23 +25,28 @@ public class WriteSource extends Source{ private int timeout; private boolean writeThrough; + boolean writeOnChange; public WriteSource() { super(); } - public WriteSource(String name, String connector, String xmlDef, boolean writeThrough, int timeout) { + public WriteSource(String name, String connector, String xmlDef, boolean writeThrough, int timeout, boolean writeOnChange) { super(connector, name, xmlDef); this.writeThrough = writeThrough; this.timeout = timeout; + this.writeOnChange = writeOnChange; Connector c = getConnectorObj(); this.streamName = String.format("_Stream-%s-%s", connector, c.getUuid()); KeysReader reader = new KeysReader(). - setPattern(getHashPrefix() + ":*"). - setEventTypes(new String[] {"hset", "hmset", "hincrbyfloat", "hincrby", "hdel", "del", "change"}); + setPattern(getHashPrefix() + ":*"); + if ( writeOnChange ) + reader.setEventTypes(new String[] {"hset", "hmset", "hincrbyfloat", "hincrby", "hdel", "del", "change"}); + else + reader.setEventTypes(new String[] {"hset", "hmset", "hincrbyfloat", "hincrby", "hdel", "del"}); if(writeThrough) { /* @@ -56,7 +61,7 @@ public WriteSource(String name, String connector, String xmlDef, boolean writeTh try { return this.asyncforeach(r); } catch (Exception e) { - GearsBuilder.log(String.format("Error: ", e.toString())); + GearsBuilder.log(String.format("Error: ", e)); return null; } }). @@ -103,11 +108,11 @@ public GearsFuture asyncforeach(KeysReaderRecord record) throws Exceptio streamId = foreachInternal(record); - WriteThroughMD wtMD = new WriteThroughMD(streamId, f, timeout * 1000); + WriteThroughMD wtMD = new WriteThroughMD(streamId, f, timeout * 1000L); Connector c = Connector.getConnector(this.getConnector()); c.queue.add(wtMD); }catch(Exception e) { - GearsBuilder.overrideReply(String.format("-Err %s", e.toString())); + GearsBuilder.overrideReply(String.format("-Err %s", e)); f.setError(e.toString()); }finally { GearsBuilder.releaseRedisGil(); @@ -119,7 +124,7 @@ public void foreach(KeysReaderRecord record) throws Exception { try { foreachInternal(record); } catch(Exception e) { - GearsBuilder.log(String.format("-Err %s", e.toString())); + GearsBuilder.log(String.format("-Err %s", e)); throw e; } } @@ -133,7 +138,7 @@ public String foreachInternal(KeysReaderRecord record) throws Exception { try { getIdProperty().convertToObject(id); }catch (Exception e) { - String msg = String.format("Failed parsing id field \"%s\", value \"%s\", error=\"%s\"", getIdProperty().getName(), id, e.toString()); + String msg = String.format("Failed parsing id field \"%s\", value \"%s\", error=\"%s\"", getIdProperty().getName(), id, e); GearsBuilder.log(msg, LogLevel.WARNING); throw new Exception(msg); } @@ -161,7 +166,7 @@ Connector.ENTETY_NAME_STR, keySplit[0], getIdProperty().getName(), keySplit[1], throw new Exception(String.format("mandatory \"%s\" value is not set", pd.getName())); } }catch(Exception e) { - String msg = String.format("Failed parsing acheme for field \"%s\", value \"%s\", error=\"%s\"", pd.getName(), val, e.toString()); + String msg = String.format("Failed parsing acheme for field \"%s\", value \"%s\", error=\"%s\"", pd.getName(), val, e); GearsBuilder.log(msg, LogLevel.WARNING); throw new Exception(msg); } @@ -212,6 +217,9 @@ public boolean isWriteThrough() { public int getTimeout() { return timeout; } + public boolean getWriteOnChange() { + return writeOnChange; + } public void setTimeout(int timeout) { this.timeout = timeout; @@ -224,4 +232,8 @@ public void setStreamName(String streamName) { public void setWriteThrough(boolean writeThrough) { this.writeThrough = writeThrough; } + + public void setWriteOnChange(boolean writeOnChange) { + this.writeOnChange = writeOnChange; + } }