-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow continue on errors #102
Changes from 5 commits
0506cca
0beee18
d6446d3
f0f5530
8d99583
5057a77
55610e0
3c00a85
f4f16a2
603fb14
0bfb2bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,28 @@ | ||
package com.redislabs; | ||
|
||
import java.io.Serializable; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedDeque; | ||
import java.util.stream.Collectors; | ||
|
||
import gears.ExecutionMode; | ||
import gears.GearsBuilder; | ||
import gears.LogLevel; | ||
import gears.operations.*; | ||
import gears.readers.StreamReader; | ||
import gears.readers.StreamReader.FailurePolicy; | ||
import org.hibernate.Session; | ||
import org.hibernate.Transaction; | ||
import org.hibernate.TransactionException; | ||
import org.hibernate.boot.registry.StandardServiceInitiator; | ||
import org.hibernate.boot.registry.StandardServiceRegistryBuilder; | ||
import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl; | ||
import org.hibernate.exception.JDBCConnectionException; | ||
import org.hibernate.service.Service; | ||
import org.hibernate.service.spi.ServiceException; | ||
import org.hibernate.service.spi.ServiceRegistryImplementor; | ||
|
||
import gears.ExecutionMode; | ||
import gears.GearsBuilder; | ||
import gears.LogLevel; | ||
import gears.operations.AccumulateOperation; | ||
import gears.operations.ForeachOperation; | ||
import gears.operations.MapOperation; | ||
import gears.operations.OnRegisteredOperation; | ||
import gears.operations.OnUnregisteredOperation; | ||
import gears.readers.StreamReader; | ||
import gears.readers.StreamReader.FailurePolicy; | ||
import java.io.Serializable; | ||
import java.util.*; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedDeque; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
public class Connector implements ForeachOperation<ArrayList<HashMap<String,Object>>>, | ||
AccumulateOperation<HashMap<String,Object>, ArrayList<HashMap<String, Object>>>, | ||
|
@@ -41,7 +33,8 @@ public class Connector implements ForeachOperation<ArrayList<HashMap<String,Obje | |
public static final String ENTETY_NAME_STR = "__entityName__"; | ||
public static final String EVENT_STR = "__event__"; | ||
public static final String SOURCE_STR = "__source__"; | ||
|
||
public static final String DLQ_STR = "_DLQ"; | ||
|
||
class RGHibernateStandardServiceInitiator implements StandardServiceInitiator<Service>{ | ||
|
||
private Map values; | ||
|
@@ -103,19 +96,21 @@ static Connector getConnector(String name) { | |
private int batchSize; | ||
private int duration; | ||
private int retryInterval; | ||
private boolean errorsToDLQ; | ||
|
||
public transient ConcurrentLinkedDeque<WriteThroughMD> queue = null; | ||
|
||
public Connector() {} | ||
|
||
public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval) { | ||
public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval, boolean errorsToDLQ) { | ||
this.name = name; | ||
this.xmlDef = xmlDef; | ||
this.errorsToDLQ = errorsToDLQ; | ||
|
||
System.setProperty("javax.xml.bind.JAXBContextFactory", "org.eclipse.persistence.jaxb.JAXBContextFactory"); | ||
Thread.currentThread().setContextClassLoader(WriteBehind.class.getClassLoader()); | ||
StandardServiceRegistryImpl tempRegistry = (StandardServiceRegistryImpl)new StandardServiceRegistryBuilder() | ||
.configure( InMemoryURLFactory.getInstance().build("configuration", this.xmlDef)) | ||
.configure( InMemoryURLFactory.getInstance().build("configuration", xmlDef)) | ||
.build(); | ||
|
||
RGHibernateStandardServiceInitiator initiator = this.new RGHibernateStandardServiceInitiator(); | ||
|
@@ -146,9 +141,9 @@ public Connector(String name, String xmlDef, int batchSize, int duration, int re | |
.map(this) | ||
.accumulate(this) | ||
.foreach(this) | ||
.map(ArrayList<HashMap<String, Object>>::size) | ||
.map(ArrayList::size) | ||
.register(ExecutionMode.ASYNC_LOCAL, this, this); | ||
|
||
connector = RGHibernate.getOrCreate(name); | ||
connector.setXmlConf(xmlDef); | ||
connectors.put(this.name, this); | ||
|
@@ -202,7 +197,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception { | |
|
||
String sourceName = map.get(SOURCE_STR); | ||
|
||
Source source = (WriteSource)Source.getSource(sourceName); | ||
Source source = Source.getSource(sourceName); | ||
|
||
Map<String, Object> newMap = new HashMap<>(); | ||
PropertyData idProperty = source.getIdProperty(); | ||
|
@@ -212,7 +207,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception { | |
try { | ||
convertedVal = idProperty.convertToObject(val); | ||
}catch (Exception e) { | ||
String msg = String.format("Can not conver id property %s val %s, error='%s'", idProperty.getName(), val, e.toString()); | ||
String msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), val, e); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
throw new Exception(msg); | ||
} | ||
|
@@ -231,7 +226,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception { | |
pm = source.getPropertyMapping(key); | ||
convertedVal = pm.convertToObject(val); | ||
}catch (Exception e) { | ||
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e.toString()); | ||
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
throw new Exception(msg); | ||
} | ||
|
@@ -245,55 +240,79 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception { | |
return r; | ||
} | ||
|
||
|
||
@Override | ||
public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception { | ||
String lastStreamId = null; | ||
String msg = null; | ||
Exception cause = null; | ||
int lastCommittedIdx = -1; | ||
synchronized (this.connector) { | ||
try { | ||
Session session = connector.getSession(); | ||
Transaction transaction = session.beginTransaction(); | ||
boolean isMerge = true; | ||
|
||
for(Map<String, Object> r: record) { | ||
lastStreamId = new String((byte[])r.get("id")); | ||
Map<String, Object> map = (Map<String, Object>) r.get("value"); | ||
String sourceName = (String)map.remove(SOURCE_STR); | ||
|
||
String event = (String)map.remove(EVENT_STR); | ||
if(event.charAt(0) != 'd') { | ||
if(!isMerge) { | ||
|
||
for (int idx = 0; idx < record.size(); ++idx) { | ||
Map<String, Object> r = record.get(idx); | ||
lastStreamId = new String((byte[]) r.get("id")); | ||
Map<String, Object> value = (Map<String, Object>) r.get("value"); | ||
Map<String, Object> map = value.entrySet().stream() | ||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue())); | ||
String sourceName = (String) map.remove(SOURCE_STR); | ||
|
||
String event = (String) map.remove(EVENT_STR); | ||
if (event.charAt(0) != 'd') { | ||
if (!isMerge) { | ||
transaction.commit(); | ||
session.clear(); | ||
transaction = session.beginTransaction(); | ||
isMerge = true; | ||
lastCommittedIdx = idx - 1; | ||
} | ||
session.merge((String)map.remove(ENTETY_NAME_STR), map); | ||
}else { | ||
if(isMerge) { | ||
session.merge((String) map.remove(ENTETY_NAME_STR), map); | ||
} else { | ||
if (isMerge) { | ||
transaction.commit(); | ||
session.clear(); | ||
transaction = session.beginTransaction(); | ||
isMerge = false; | ||
lastCommittedIdx = idx - 1; | ||
} | ||
Source source = Source.getSource(sourceName); | ||
Object o = session.get((String)map.remove(ENTETY_NAME_STR), (Serializable)map.get(source.getIdProperty().getName())); | ||
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName())); | ||
// o can be null on hdel that removed the last field | ||
if(o != null) { | ||
if (o != null) { | ||
session.delete(o); | ||
} | ||
} | ||
} | ||
|
||
transaction.commit(); | ||
session.clear(); | ||
}catch (Exception e) { | ||
msg = String.format("Failed commiting transaction error='%s'", e.toString()); | ||
} | ||
catch (TransactionException|JDBCConnectionException|ServiceException ex) { | ||
msg = String.format("Failed committing transaction error='%s'", ex); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
lastStreamId = null; | ||
cause = ex; | ||
connector.closeSession(); | ||
|
||
} | ||
catch (Exception e) { | ||
msg = String.format("Failed committing transaction error='%s'", e); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
|
||
lastStreamId = null; | ||
cause = e; | ||
|
||
if ( connector.getSession().getTransaction().isActive() ) { | ||
connector.getSession().getTransaction().rollback(); | ||
} | ||
|
||
connector.getSession().clear(); | ||
retry(record.subList(lastCommittedIdx + 1, record.size())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should do the retry only if it was requested when register the connector. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added an option to use DLQ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not see where you use this new value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, I moved it to separate exceptions and forgot to put it back in. Just did |
||
msg = null; | ||
} | ||
|
||
while(!queue.isEmpty()) { | ||
|
@@ -310,9 +329,106 @@ public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception | |
throw new Exception(msg, cause); | ||
} | ||
} | ||
|
||
} | ||
|
||
|
||
private void retry(List<HashMap<String, Object>> record) throws Exception { | ||
for(Map<String, Object> r: record) { | ||
try { | ||
Session session = connector.getSession(); | ||
Transaction transaction = session.beginTransaction(); | ||
|
||
Map<String, Object> value = (Map<String, Object>) r.get("value"); | ||
Map<String, Object> map = value.entrySet().stream() | ||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue())); | ||
String sourceName = (String) map.remove(SOURCE_STR); | ||
|
||
String event = (String) map.remove(EVENT_STR); | ||
if (event.charAt(0) != 'd') { | ||
session.merge((String) map.remove(ENTETY_NAME_STR), map); | ||
} else { | ||
Source source = Source.getSource(sourceName); | ||
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName())); | ||
// o can be null on hdel that removed the last field | ||
if (o != null) { | ||
session.delete(o); | ||
} | ||
} | ||
transaction.commit(); | ||
session.clear(); | ||
} catch (Exception e) { | ||
String msg = String.format("Failed retrying transaction error='%s'", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if you fail here because of connection error? |
||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
|
||
//connector.closeSession(); | ||
if (connector.getSession().getTransaction().isActive()) { | ||
connector.getSession().getTransaction().rollback(); | ||
} | ||
|
||
connector.getSession().clear(); | ||
putInDLQ(r, e); | ||
} | ||
} | ||
} | ||
|
||
private void putInDLQ(Map<String, Object> r, Exception e) throws Exception { | ||
String streamName = new String((byte[])r.get("key")); | ||
Map<String, Object> map = (Map<String, Object>) r.get("value"); | ||
|
||
Map<String, String> newMap = new HashMap<>(); | ||
|
||
newMap.put("error", e.getMessage()); | ||
Throwable ex = e; | ||
while (ex.getCause() != null) | ||
ex = ex.getCause(); | ||
newMap.put("cause", ex.getLocalizedMessage()); | ||
|
||
String[] command; | ||
Stream<String> commandStreamInit = Stream.of("XADD", DLQ_STR + streamName, "*"); | ||
MeirShpilraien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
String msg = null; | ||
String sourceName = (String)map.remove(SOURCE_STR); | ||
Source source = Source.getSource(sourceName); | ||
PropertyData idProperty = source.getIdProperty(); | ||
Object idVal = map.remove(idProperty.getName()); | ||
String idStr = null; | ||
try { | ||
idStr = idProperty.convertToStr(idVal); | ||
}catch (Exception propEx) { | ||
msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), idVal, propEx); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
throw new Exception(msg); | ||
} | ||
|
||
newMap.put(idProperty.getName(), idStr); | ||
newMap.put(ENTETY_NAME_STR, map.remove(ENTETY_NAME_STR).toString()); | ||
newMap.put(EVENT_STR, map.remove(EVENT_STR).toString()); | ||
newMap.put(SOURCE_STR, sourceName); | ||
|
||
for(String key : map.keySet()) { | ||
Object val = map.get(key); | ||
|
||
PropertyData pm = null; | ||
String convertedVal = null; | ||
try { | ||
pm = source.getPropertyMapping(key); | ||
convertedVal = pm.convertToStr(val); | ||
} catch (Exception propEx) { | ||
msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, propEx); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
throw new Exception(msg); | ||
} | ||
|
||
|
||
newMap.put(key, convertedVal); | ||
} | ||
|
||
Stream<String> fieldsStream = newMap.entrySet().stream() | ||
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())); | ||
|
||
command = Stream.concat(commandStreamInit, fieldsStream).toArray(String[]::new); | ||
GearsBuilder.execute(command); | ||
} | ||
|
||
public Object getObject(String entetyName, Serializable pk) throws Exception { | ||
Object o = null; | ||
synchronized (this.connector) { | ||
|
@@ -321,7 +437,7 @@ public Object getObject(String entetyName, Serializable pk) throws Exception { | |
session.clear(); | ||
o = session.get(entetyName, pk); | ||
}catch(Exception e) { | ||
String msg = String.format("Failed fetching data from databse, error='%s'", e.toString()); | ||
String msg = String.format("Failed fetching data from databse, error='%s'", e); | ||
GearsBuilder.log(msg, LogLevel.WARNING); | ||
connector.closeSession(); | ||
throw e; | ||
|
@@ -395,6 +511,10 @@ public int getRetryInterval() { | |
return retryInterval; | ||
} | ||
|
||
public boolean getErrorsToDLQ() { | ||
MeirShpilraien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return errorsToDLQ; | ||
} | ||
|
||
public void setName(String name) { | ||
this.name = name; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there is no really need to set the message here right? And then no need to set it to NULL after? Also I do not see where you check the
errorsToDLQ
value?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I moved it to separate exceptions and forgot to put it back in. Just did