Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow continue on errors #102

Merged
merged 11 commits into from
Jul 4, 2023
224 changes: 172 additions & 52 deletions src/main/java/com/redislabs/Connector.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,28 @@
package com.redislabs;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;

import gears.ExecutionMode;
import gears.GearsBuilder;
import gears.LogLevel;
import gears.operations.*;
import gears.readers.StreamReader;
import gears.readers.StreamReader.FailurePolicy;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.TransactionException;
import org.hibernate.boot.registry.StandardServiceInitiator;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl;
import org.hibernate.exception.JDBCConnectionException;
import org.hibernate.service.Service;
import org.hibernate.service.spi.ServiceException;
import org.hibernate.service.spi.ServiceRegistryImplementor;

import gears.ExecutionMode;
import gears.GearsBuilder;
import gears.LogLevel;
import gears.operations.AccumulateOperation;
import gears.operations.ForeachOperation;
import gears.operations.MapOperation;
import gears.operations.OnRegisteredOperation;
import gears.operations.OnUnregisteredOperation;
import gears.readers.StreamReader;
import gears.readers.StreamReader.FailurePolicy;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Connector implements ForeachOperation<ArrayList<HashMap<String,Object>>>,
AccumulateOperation<HashMap<String,Object>, ArrayList<HashMap<String, Object>>>,
Expand All @@ -41,7 +33,8 @@ public class Connector implements ForeachOperation<ArrayList<HashMap<String,Obje
public static final String ENTETY_NAME_STR = "__entityName__";
public static final String EVENT_STR = "__event__";
public static final String SOURCE_STR = "__source__";

public static final String DLQ_STR = "_DLQ";

class RGHibernateStandardServiceInitiator implements StandardServiceInitiator<Service>{

private Map values;
Expand Down Expand Up @@ -103,19 +96,21 @@ static Connector getConnector(String name) {
private int batchSize;
private int duration;
private int retryInterval;
private boolean errorsToDLQ;

public transient ConcurrentLinkedDeque<WriteThroughMD> queue = null;

public Connector() {}

public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval) {
public Connector(String name, String xmlDef, int batchSize, int duration, int retryInterval, boolean errorsToDLQ) {
this.name = name;
this.xmlDef = xmlDef;
this.errorsToDLQ = errorsToDLQ;

System.setProperty("javax.xml.bind.JAXBContextFactory", "org.eclipse.persistence.jaxb.JAXBContextFactory");
Thread.currentThread().setContextClassLoader(WriteBehind.class.getClassLoader());
StandardServiceRegistryImpl tempRegistry = (StandardServiceRegistryImpl)new StandardServiceRegistryBuilder()
.configure( InMemoryURLFactory.getInstance().build("configuration", this.xmlDef))
.configure( InMemoryURLFactory.getInstance().build("configuration", xmlDef))
.build();

RGHibernateStandardServiceInitiator initiator = this.new RGHibernateStandardServiceInitiator();
Expand Down Expand Up @@ -146,9 +141,9 @@ public Connector(String name, String xmlDef, int batchSize, int duration, int re
.map(this)
.accumulate(this)
.foreach(this)
.map(ArrayList<HashMap<String, Object>>::size)
.map(ArrayList::size)
.register(ExecutionMode.ASYNC_LOCAL, this, this);

connector = RGHibernate.getOrCreate(name);
connector.setXmlConf(xmlDef);
connectors.put(this.name, this);
Expand Down Expand Up @@ -202,7 +197,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {

String sourceName = map.get(SOURCE_STR);

Source source = (WriteSource)Source.getSource(sourceName);
Source source = Source.getSource(sourceName);

Map<String, Object> newMap = new HashMap<>();
PropertyData idProperty = source.getIdProperty();
Expand All @@ -212,7 +207,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
try {
convertedVal = idProperty.convertToObject(val);
}catch (Exception e) {
String msg = String.format("Can not conver id property %s val %s, error='%s'", idProperty.getName(), val, e.toString());
String msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), val, e);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}
Expand All @@ -231,7 +226,7 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
pm = source.getPropertyMapping(key);
convertedVal = pm.convertToObject(val);
}catch (Exception e) {
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e.toString());
String msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, e);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}
Expand All @@ -245,55 +240,79 @@ public HashMap<String, Object> map(HashMap<String, Object> r) throws Exception {
return r;
}


@Override
public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception {
String lastStreamId = null;
String msg = null;
Exception cause = null;
int lastCommittedIdx = -1;
synchronized (this.connector) {
try {
Session session = connector.getSession();
Transaction transaction = session.beginTransaction();
boolean isMerge = true;

for(Map<String, Object> r: record) {
lastStreamId = new String((byte[])r.get("id"));
Map<String, Object> map = (Map<String, Object>) r.get("value");
String sourceName = (String)map.remove(SOURCE_STR);

String event = (String)map.remove(EVENT_STR);
if(event.charAt(0) != 'd') {
if(!isMerge) {

for (int idx = 0; idx < record.size(); ++idx) {
Map<String, Object> r = record.get(idx);
lastStreamId = new String((byte[]) r.get("id"));
Map<String, Object> value = (Map<String, Object>) r.get("value");
Map<String, Object> map = value.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()));
String sourceName = (String) map.remove(SOURCE_STR);

String event = (String) map.remove(EVENT_STR);
if (event.charAt(0) != 'd') {
if (!isMerge) {
transaction.commit();
session.clear();
transaction = session.beginTransaction();
isMerge = true;
lastCommittedIdx = idx - 1;
}
session.merge((String)map.remove(ENTETY_NAME_STR), map);
}else {
if(isMerge) {
session.merge((String) map.remove(ENTETY_NAME_STR), map);
} else {
if (isMerge) {
transaction.commit();
session.clear();
transaction = session.beginTransaction();
isMerge = false;
lastCommittedIdx = idx - 1;
}
Source source = Source.getSource(sourceName);
Object o = session.get((String)map.remove(ENTETY_NAME_STR), (Serializable)map.get(source.getIdProperty().getName()));
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName()));
// o can be null on hdel that removed the last field
if(o != null) {
if (o != null) {
session.delete(o);
}
}
}

transaction.commit();
session.clear();
}catch (Exception e) {
msg = String.format("Failed commiting transaction error='%s'", e.toString());
}
catch (TransactionException|JDBCConnectionException|ServiceException ex) {
msg = String.format("Failed committing transaction error='%s'", ex);
GearsBuilder.log(msg, LogLevel.WARNING);
lastStreamId = null;
cause = ex;
connector.closeSession();

}
catch (Exception e) {
msg = String.format("Failed committing transaction error='%s'", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is no really need to set the message here right? And then no need to set it to NULL after? Also I do not see where you check the errorsToDLQ value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I moved it to separate exceptions and forgot to put it back in. Just did

GearsBuilder.log(msg, LogLevel.WARNING);

lastStreamId = null;
cause = e;

if ( connector.getSession().getTransaction().isActive() ) {
connector.getSession().getTransaction().rollback();
}

connector.getSession().clear();
retry(record.subList(lastCommittedIdx + 1, record.size()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do the retry only if it was requested when register the connector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an option to use DLQ

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see where you use this new value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I moved it to separate exceptions and forgot to put it back in. Just did

msg = null;
}

while(!queue.isEmpty()) {
Expand All @@ -310,9 +329,106 @@ public void foreach(ArrayList<HashMap<String, Object>> record) throws Exception
throw new Exception(msg, cause);
}
}

}


private void retry(List<HashMap<String, Object>> record) throws Exception {
for(Map<String, Object> r: record) {
try {
Session session = connector.getSession();
Transaction transaction = session.beginTransaction();

Map<String, Object> value = (Map<String, Object>) r.get("value");
Map<String, Object> map = value.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()));
String sourceName = (String) map.remove(SOURCE_STR);

String event = (String) map.remove(EVENT_STR);
if (event.charAt(0) != 'd') {
session.merge((String) map.remove(ENTETY_NAME_STR), map);
} else {
Source source = Source.getSource(sourceName);
Object o = session.get((String) map.remove(ENTETY_NAME_STR), (Serializable) map.get(source.getIdProperty().getName()));
// o can be null on hdel that removed the last field
if (o != null) {
session.delete(o);
}
}
transaction.commit();
session.clear();
} catch (Exception e) {
String msg = String.format("Failed retrying transaction error='%s'", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you fail here because of connection error?

GearsBuilder.log(msg, LogLevel.WARNING);

//connector.closeSession();
if (connector.getSession().getTransaction().isActive()) {
connector.getSession().getTransaction().rollback();
}

connector.getSession().clear();
putInDLQ(r, e);
}
}
}

private void putInDLQ(Map<String, Object> r, Exception e) throws Exception {
String streamName = new String((byte[])r.get("key"));
Map<String, Object> map = (Map<String, Object>) r.get("value");

Map<String, String> newMap = new HashMap<>();

newMap.put("error", e.getMessage());
Throwable ex = e;
while (ex.getCause() != null)
ex = ex.getCause();
newMap.put("cause", ex.getLocalizedMessage());

String[] command;
Stream<String> commandStreamInit = Stream.of("XADD", DLQ_STR + streamName, "*");
MeirShpilraien marked this conversation as resolved.
Show resolved Hide resolved

String msg = null;
String sourceName = (String)map.remove(SOURCE_STR);
Source source = Source.getSource(sourceName);
PropertyData idProperty = source.getIdProperty();
Object idVal = map.remove(idProperty.getName());
String idStr = null;
try {
idStr = idProperty.convertToStr(idVal);
}catch (Exception propEx) {
msg = String.format("Can not convert id property %s val %s, error='%s'", idProperty.getName(), idVal, propEx);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}

newMap.put(idProperty.getName(), idStr);
newMap.put(ENTETY_NAME_STR, map.remove(ENTETY_NAME_STR).toString());
newMap.put(EVENT_STR, map.remove(EVENT_STR).toString());
newMap.put(SOURCE_STR, sourceName);

for(String key : map.keySet()) {
Object val = map.get(key);

PropertyData pm = null;
String convertedVal = null;
try {
pm = source.getPropertyMapping(key);
convertedVal = pm.convertToStr(val);
} catch (Exception propEx) {
msg = String.format("Can not find property mapping for %s val %s, error='%s'", key, val, propEx);
GearsBuilder.log(msg, LogLevel.WARNING);
throw new Exception(msg);
}


newMap.put(key, convertedVal);
}

Stream<String> fieldsStream = newMap.entrySet().stream()
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));

command = Stream.concat(commandStreamInit, fieldsStream).toArray(String[]::new);
GearsBuilder.execute(command);
}

public Object getObject(String entetyName, Serializable pk) throws Exception {
Object o = null;
synchronized (this.connector) {
Expand All @@ -321,7 +437,7 @@ public Object getObject(String entetyName, Serializable pk) throws Exception {
session.clear();
o = session.get(entetyName, pk);
}catch(Exception e) {
String msg = String.format("Failed fetching data from databse, error='%s'", e.toString());
String msg = String.format("Failed fetching data from databse, error='%s'", e);
GearsBuilder.log(msg, LogLevel.WARNING);
connector.closeSession();
throw e;
Expand Down Expand Up @@ -395,6 +511,10 @@ public int getRetryInterval() {
return retryInterval;
}

public boolean getErrorsToDLQ() {
MeirShpilraien marked this conversation as resolved.
Show resolved Hide resolved
return errorsToDLQ;
}

public void setName(String name) {
this.name = name;
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/redislabs/PropertyData.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.redislabs;

import java.io.Serializable;
import java.sql.Date;

import org.hibernate.type.AbstractStandardBasicType;
import org.hibernate.type.TimestampType;
import org.hibernate.type.Type;

import java.io.Serializable;
import java.sql.Date;

public class PropertyData implements Serializable{

/**
Expand Down Expand Up @@ -52,7 +52,7 @@ public Object convertToObject(String val) {
}

public String convertToStr(Object val) {
if(this.type instanceof TimestampType && val instanceof java.sql.Timestamp) {
if(this.type instanceof TimestampType && (val instanceof java.sql.Timestamp)) {
// try first to parse as timestamp
try {
return Long.toString(((java.sql.Timestamp)val).getTime());
Expand Down
Loading