Skip to content

Commit

Permalink
Allow continue on errors (#102)
Browse files Browse the repository at this point in the history
This PR contains a retry mechanism in case of a failure.
RGHibernate tries to write a whole batch as a single transaction.
When a failure occurs, it tries to execute the updates one by one. If an update fails, it is moved to a DLQ stream together with details about the error, where it can be reviewed manually.
  • Loading branch information
shkediy authored Jul 4, 2023
1 parent efef63c commit 51aaf1b
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 110 deletions.
245 changes: 192 additions & 53 deletions src/main/java/com/redislabs/Connector.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,28 @@
package com.redislabs;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;

import gears.ExecutionMode;
import gears.GearsBuilder;
import gears.LogLevel;
import gears.operations.*;
import gears.readers.StreamReader;
import gears.readers.StreamReader.FailurePolicy;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.TransactionException;
import org.hibernate.boot.registry.StandardServiceInitiator;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl;
import org.hibernate.exception.JDBCConnectionException;
import org.hibernate.service.Service;
import org.hibernate.service.spi.ServiceException;
import org.hibernate.service.spi.ServiceRegistryImplementor;

import gears.ExecutionMode;
import gears.GearsBuilder;
import gears.LogLevel;
import gears.operations.AccumulateOperation;
import gears.operations.ForeachOperation;
import gears.operations.MapOperation;
import gears.operations.OnRegisteredOperation;
import gears.operations.OnUnregisteredOperation;
import gears.readers.StreamReader;
import gears.readers.StreamReader.FailurePolicy;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Connector implements ForeachOperation<ArrayList<HashMap<String,Object>>>,
AccumulateOperation<HashMap<String,Object>, ArrayList<HashMap<String, Object>>>,
Expand All @@ -41,7 +33,8 @@ public class Connector implements ForeachOperation<ArrayList<HashMap<String,Obje
public static final String ENTETY_NAME_STR = "__entityName__";
public static final String EVENT_STR = "__event__";
public static final String SOURCE_STR = "__source__";

public static final String DLQ_STR = "_DLQ";

class RGHibernateStandardServiceInitiator implements StandardServiceInitiator<Service>{

private Map values;
Expand Down Expand Up @@ -103,19 +96,21 @@ static Connector getConnector(String name) {
private int batchSize;
private int duration;
private int retryInterval;
private boolean errorsToDLQ;

public transient ConcurrentLinkedDeque<WriteThroughMD> queue = null;

public Connector() {}

public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval) {
public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval, boolean errorsToDLQ) {
this.name = name;
this.xmlDef = xmlDef;
this.errorsToDLQ = errorsToDLQ;

System.setProperty("javax.xml.bind.JAXBContextFactory", "org.eclipse.persistence.jaxb.JAXBContextFactory");
Thread.currentThread().setContextClassLoader(WriteBehind.class.getClassLoader());
StandardServiceRegistryImpl tempRegistry = (StandardServiceRegistryImpl)new StandardServiceRegistryBuilder()
.configure( InMemoryURLFactory.getInstance().build("configuration", this.xmlDef))
.configure( InMemoryURLFactory.getInstance().build("configuration", xmlDef))
.build();

RGHibernateStandardServiceInitiator initiator = this.new RGHibernateStandardServiceInitiator();
Expand Down Expand Up @@ -146,9 +141,9 @@ public Connector(String name, String xmlDef, int batchSize, int duration, int re
.map(this)
.accumulate(this)
.foreach(this)
.map(ArrayList<HashMap<String, Object>>::size)
.map(ArrayList::size)
.register(ExecutionMode.ASYNC_LOCAL, this, this);

connector = RGHibernate.getOrCreate(name);
connector.setXmlConf(xmlDef);
connectors.put(this.name, this);
Expand Down Expand Up @@ -202,7 +197,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {

String sourceName = map.get(SOURCE_STR);

Source source = (WriteSource)Source.getSource(sourceName);
Source source = Source.getSource(sourceName);

Map<String, Object> newMap = new HashMap<>();
PropertyData idProperty = source.getIdProperty();
Expand All @@ -212,7 +207,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
try {
convertedVal = idProperty.convertToObject(val);
}catch (Exception e) {
String msg = String.format("Can not conver id property %s val %s, error='%s'", idProperty.getName(), val, e.toString());
String msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), val, e);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}
Expand All @@ -231,7 +226,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
pm = source.getPropertyMapping(key);
convertedVal = pm.convertToObject(val);
}catch (Exception e) {
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e.toString());
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}
Expand All @@ -245,55 +240,93 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
return r;
}


@Override
public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception {
String lastStreamId = null;
String msg = null;
Exception cause = null;
int lastCommittedIdx = -1;
synchronized (this.connector) {
try {
Session session = connector.getSession();
Transaction transaction = session.beginTransaction();
boolean isMerge = true;

for(Map<String, Object> r: record) {
lastStreamId = new String((byte[])r.get("id"));
Map<String, Object> map = (Map<String, Object>) r.get("value");
String sourceName = (String)map.remove(SOURCE_STR);

String event = (String)map.remove(EVENT_STR);
if(event.charAt(0) != 'd') {
if(!isMerge) {

for (int idx = 0; idx < record.size(); ++idx) {
Map<String, Object> r = record.get(idx);
lastStreamId = new String((byte[]) r.get("id"));
Map<String, Object> value = (Map<String, Object>) r.get("value");
Map<String, Object> map = value.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()));
String sourceName = (String) map.remove(SOURCE_STR);

String event = (String) map.remove(EVENT_STR);
if (event.charAt(0) != 'd') {
if (!isMerge) {
transaction.commit();
session.clear();
transaction = session.beginTransaction();
isMerge = true;
lastCommittedIdx = idx - 1;
}
session.merge((String)map.remove(ENTETY_NAME_STR), map);
}else {
if(isMerge) {
session.merge((String) map.remove(ENTETY_NAME_STR), map);
} else {
if (isMerge) {
transaction.commit();
session.clear();
transaction = session.beginTransaction();
isMerge = false;
lastCommittedIdx = idx - 1;
}
Source source = Source.getSource(sourceName);
Object o = session.get((String)map.remove(ENTETY_NAME_STR), (Serializable)map.get(source.getIdProperty().getName()));
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName()));
// o can be null on hdel that removed the last field
if(o != null) {
if (o != null) {
session.delete(o);
}
}
}

transaction.commit();
session.clear();
}catch (Exception e) {
msg = String.format("Failed commiting transaction error='%s'", e.toString());
}
catch (TransactionException|JDBCConnectionException|ServiceException ex) {
msg = String.format("Failed committing transaction error='%s'", ex);
GearsBuilder.log(msg, LogLevel.WARNING);
lastStreamId = null;
cause = ex;
connector.closeSession();

}
catch (Exception e) {
msg = String.format("Failed committing transaction error='%s'", e);
GearsBuilder.log(msg, LogLevel.WARNING);

lastStreamId = null;
cause = e;

if ( errorsToDLQ ) {
if ( connector.getSession().getTransaction().isActive() ) {
connector.getSession().getTransaction().rollback();
}

connector.getSession().clear();

try {
retry(record.subList(lastCommittedIdx + 1, record.size()));
msg = null;
}
catch (Exception ex) {
msg = String.format("Failed committing transaction error='%s'", ex);
GearsBuilder.log(msg, LogLevel.WARNING);
cause = ex;
}

}
else {
connector.closeSession();
}
}

while(!queue.isEmpty()) {
Expand All @@ -310,9 +343,108 @@ public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception
throw new Exception(msg, cause);
}
}

}


private void retry(List<HashMap<String, Object>> record) throws Exception {
for(Map<String, Object> r: record) {
try {
Session session = connector.getSession();
Transaction transaction = session.beginTransaction();

Map<String, Object> value = (Map<String, Object>) r.get("value");
Map<String, Object> map = value.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()));
String sourceName = (String) map.remove(SOURCE_STR);

String event = (String) map.remove(EVENT_STR);
if (event.charAt(0) != 'd') {
session.merge((String) map.remove(ENTETY_NAME_STR), map);
} else {
Source source = Source.getSource(sourceName);
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName()));
// o can be null on hdel that removed the last field
if (o != null) {
session.delete(o);
}
}
transaction.commit();
session.clear();
} catch (TransactionException|JDBCConnectionException|ServiceException ex) {
connector.closeSession();
throw ex;
} catch (Exception e) {
String msg = String.format("Failed retrying transaction error='%s'", e);
GearsBuilder.log(msg, LogLevel.WARNING);

if (connector.getSession().getTransaction().isActive()) {
connector.getSession().getTransaction().rollback();
}

connector.getSession().clear();
putInDLQ(r, e);
}
}
}

private void putInDLQ(Map<String, Object> r, Exception e) throws Exception {
String streamName = new String((byte[])r.get("key"));
Map<String, Object> map = (Map<String, Object>) r.get("value");

Map<String, String> newMap = new HashMap<>();

newMap.put("error", e.getMessage());
Throwable ex = e;
while (ex.getCause() != null)
ex = ex.getCause();
newMap.put("cause", ex.getLocalizedMessage());

String[] command;
Stream<String> commandStreamInit = Stream.of("XADD", DLQ_STR + streamName, "*");

String msg = null;
String sourceName = (String)map.remove(SOURCE_STR);
Source source = Source.getSource(sourceName);
PropertyData idProperty = source.getIdProperty();
Object idVal = map.remove(idProperty.getName());
String idStr = null;
try {
idStr = idProperty.convertToStr(idVal);
}catch (Exception propEx) {
msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), idVal, propEx);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}

newMap.put(idProperty.getName(), idStr);
newMap.put(ENTETY_NAME_STR, map.remove(ENTETY_NAME_STR).toString());
newMap.put(EVENT_STR, map.remove(EVENT_STR).toString());
newMap.put(SOURCE_STR, sourceName);

for(String key : map.keySet()) {
Object val = map.get(key);

PropertyData pm = null;
String convertedVal = null;
try {
pm = source.getPropertyMapping(key);
convertedVal = pm.convertToStr(val);
} catch (Exception propEx) {
msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, propEx);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}


newMap.put(key, convertedVal);
}

Stream<String> fieldsStream = newMap.entrySet().stream()
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));

command = Stream.concat(commandStreamInit, fieldsStream).toArray(String[]::new);
GearsBuilder.execute(command);
}

public Object getObject(String entetyName, Serializable pk) throws Exception {
Object o = null;
synchronized (this.connector) {
Expand All @@ -321,7 +453,7 @@ public Object getObject(String entetyName, Serializable pk) throws Exception {
session.clear();
o = session.get(entetyName, pk);
}catch(Exception e) {
String msg = String.format("Failed fetching data from databse, error='%s'", e.toString());
String msg = String.format("Failed fetching data from databse, error='%s'", e);
GearsBuilder.log(msg, LogLevel.WARNING);
connector.closeSession();
throw e;
Expand Down Expand Up @@ -395,6 +527,10 @@ public int getRetryInterval() {
return retryInterval;
}

public boolean getErrorsToDLQ() {
return errorsToDLQ;
}

public void setName(String name) {
this.name = name;
}
Expand Down Expand Up @@ -438,6 +574,9 @@ public void setDuration(int duration) {
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}


public void setErrorsToDLQ(boolean errorsToDLQ) {
this.errorsToDLQ = errorsToDLQ;
}

}
6 changes: 3 additions & 3 deletions src/main/java/com/redislabs/PropertyData.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.redislabs;

import java.io.Serializable;
import java.sql.Date;

import org.hibernate.type.AbstractStandardBasicType;
import org.hibernate.type.DbTimestampType;
import org.hibernate.type.Type;

import java.io.Serializable;
import java.sql.Date;

public class PropertyData implements Serializable{

/**
Expand Down
Loading

0 comments on commit 51aaf1b

Please sign in to comment.