Skip to content

Commit

Permalink
support JSON with reserved SPL keywords: issue #12 and #72
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark-Oliver Heger committed Aug 25, 2017
1 parent df2acc4 commit 84b688f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class JSONToTuple extends AbstractOperator
private boolean wasTargetSpecified = false;
private boolean hasOptionalOut = false;
private TupleAttribute<Tuple,String> inputJsonAttribute = null;
private boolean wasPrefixToIgnoreSpecified = false;
private String prefixToIgnore = null;

@Parameter(name=INPUT_JSON_ATTRIBUTE_PARAM,optional=true, description="The input stream attribute (not the name of the attribute) which contains the input JSON string. This attribute must be of `rstring` or `ustring` type. Default is the sole input attribute when the schema has one attribute otherwise `jsonString`. Replaces parameter `jsonStringAttribute`.")
public void setInputJson(TupleAttribute<Tuple,String> in) {
Expand Down Expand Up @@ -89,6 +91,25 @@ public void setTargetAttribute(String value) {
public void setIgnoreParsingError(boolean value) {
ignoreParsingError = value;
}

@Parameter(optional=true, description=
"Specifies a string that, if present, is removed from the start of an attribute name." +
"You can use this method for JSON that contains elements or attributes with SPL or C++ keywords." +
"For example:\\n" +
"\\n" +
" stream <rstring __graph> A = JSONToTuple(Input) {\\n" +
" param ignorePrefix : \\\"__\\\";\\n" +
" }\\n" +
"\\n" +
"This example accepts JSON of the following form: \\n" +
"\\n" +
" {\\\"graph\\\" : \\\"value\\\"}\\n" +
"\\n" +
"Since graph is an SPL keyword, stream<rstring graph> A = JSONToTuple is not valid SPL.")
public void setPrefixToIgnore(String value) {
this.prefixToIgnore = value;
wasPrefixToIgnoreSpecified=true;
}

@ContextCheck
public static boolean checkOptionalPortSchema(OperatorContextChecker checker) {
Expand Down Expand Up @@ -131,6 +152,9 @@ public void initialize(OperatorContext op) throws Exception {
Arrays.asList(MetaType.TUPLE, MetaType.LIST, MetaType.BLIST, MetaType.SET, MetaType.BSET));
l.log(TraceLevel.INFO, "Will populate target field: " + targetAttribute); //$NON-NLS-1$
}
if (wasPrefixToIgnoreSpecified) {
JSONToTupleConverter.setPrefixToIgnore(this.prefixToIgnore);
}
}

public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class TupleToJSON extends AbstractOperator {
TupleAttribute<Tuple,?> rootAttr = null;
private String rootAttribute = null;
private Type rootAttributeType =null;
private boolean wasPrefixToIgnoreSpecified = false;
private String prefixToIgnore = null;

private static Logger l = Logger.getLogger(TupleToJSON.class.getCanonicalName());

Expand All @@ -63,6 +65,25 @@ public void setRootAttribute(String value) {
this.rootAttribute = value;
}

@Parameter(optional=true, description=
"Specifies a string that, if present, is removed from the start of an attribute name." +
"You can use this method for JSON that contains elements or attributes with SPL or C++ keywords." +
"For example:\\n" +
"\\n" +
" stream <rstring jsonString> A = TupleToJSON(Input) {\\n" +
" param ignorePrefix : \\\"__\\\";\\n" +
" }\\n" +
"\\n" +
"This example accepts JSON of the following form: \\n" +
"\\n" +
" {\\\"graph\\\" : \\\"value\\\"}\\n" +
"\\n" +
"Since graph is an SPL keyword, stream<rstring graph> as input schema of TupleToJSON is not valid SPL.")
public void setPrefixToIgnore(String value) {
this.prefixToIgnore = value;
wasPrefixToIgnoreSpecified=true;
}

@Override
public void initialize(OperatorContext op) throws Exception {
super.initialize(op);
Expand Down Expand Up @@ -108,8 +129,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception
}
OutputTuple op = ops.newTuple();
op.assign(tuple);//copy over all relevant attributes form the source tuple
op.setString(jsonStringAttribute, jsonData);

op.setString(jsonStringAttribute, ((wasPrefixToIgnoreSpecified) ? jsonData.replaceAll("\""+this.prefixToIgnore, "\"") : jsonData));
ops.submit(op);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public class JSONToTupleConverter {

private static Logger l = Logger.getLogger(JSONToTupleConverter.class.getCanonicalName());

private static String prefixToIgnore = null; // null means prefixToIgnore is disabled

public static void setPrefixToIgnore (String value) {
prefixToIgnore = value;
}

/**
* Convert JSON value to an SPL tuple attribute value.
*
Expand Down Expand Up @@ -385,26 +391,28 @@ public static Tuple jsonToTuple(JSONObject jbase, StreamSchema schema) throws Ex
public static Map<String, Object> jsonToAtributeMap(JSONObject jbase, StreamSchema schema) throws Exception {
Map<String, Object> attrmap = new HashMap<String, Object>();
for(Attribute attr : schema) {
String name = attr.getName();
String nameToSearch = attr.getName();
if ((prefixToIgnore != null) && (attr.getName().startsWith(prefixToIgnore))) {
nameToSearch = attr.getName().substring(prefixToIgnore.length());
}
try {
if(l.isLoggable(TraceLevel.DEBUG)) {
l.log(TraceLevel.DEBUG, "Checking for: " + name); //$NON-NLS-1$
l.log(TraceLevel.DEBUG, "Checking for: " + nameToSearch); //$NON-NLS-1$
}
Object childobj = jbase.get(name);
Object childobj = jbase.get(nameToSearch);
if(childobj==null) {
if(l.isLoggable(TraceLevel.DEBUG)) {
l.log(TraceLevel.DEBUG, "Not Found: " + name); //$NON-NLS-1$
l.log(TraceLevel.DEBUG, "Not Found: " + nameToSearch); //$NON-NLS-1$
}
continue;
}
Object obj = jsonToAttribute(name, attr.getType(), childobj, null);
Object obj = jsonToAttribute(attr.getName(), attr.getType(), childobj, null);
if(obj!=null)
attrmap.put(name, obj);
}catch(Exception e) {
l.log(TraceLevel.ERROR, "Error converting object: " + name, e); //$NON-NLS-1$
attrmap.put(attr.getName(), obj);
} catch(Exception e) {
l.log(TraceLevel.ERROR, "Error converting object: " + attr.getName(), e); //$NON-NLS-1$
throw e;
}

}
return attrmap;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ args=-t ../com.ibm.streamsx.json --data-directory=data
ns=com.ibm.streamsx.json.tests
outputdir=./output

tests=BasicTest ListTest SetOfListTest InputSpecificationTest RootAttributeTest
tests=BasicTest ListTest SetOfListTest InputSpecificationTest RootAttributeTest ReservedKeywordTest

rntest=./scripts/testRunner.sh
ftest=./scripts/expectFail.sh

all: BasicTest ListTest SetOfListTest NullBasicTest RecordArrayListTest InputSpecificationTest RootAttributeTest CompileFailtest EmptyStringTest
all: BasicTest ListTest SetOfListTest NullBasicTest RecordArrayListTest InputSpecificationTest RootAttributeTest CompileFailtest EmptyStringTest ReservedKeywordTest
@echo "Tests Passed"

compile: ${tests}
Expand Down
24 changes: 24 additions & 0 deletions tests/com.ibm.streamsx.json.tests/SimpleTests.spl
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,27 @@ composite NullBasicTest {

}

composite ReservedKeywordTest {

type
MyType = rstring __type, rstring b, rstring __graph;

graph
stream<MyType> ExpectedS = Beacon() {
param
iterations : 1u;
output ExpectedS : __type="a", b="b", __graph="c";
}
stream<rstring jsonString> JsonS = Beacon() {
param
iterations : 1u;
output JsonS : jsonString = "{\"type\" : \"a\", \"b\" : \"b\", \"graph\" : \"c\"}";
}
() as SinkOp = VerifierJTOT(JsonS; ExpectedS) {} // verify JSONToTuple

() as SinkOp1 = Verifier(ExpectedS) {} // Verify TupleToJSON and JSONToTuple in a sequence
config
tracing : debug;

}

16 changes: 10 additions & 6 deletions tests/com.ibm.streamsx.json.tests/Verfier.spl
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ composite Verifier(input InputS) {
graph

stream<rstring jsonString> JsonS =
com.ibm.streamsx.json::TupleToJSON(InputS)
{}
com.ibm.streamsx.json::TupleToJSON(InputS) {
param prefixToIgnore: "__";
}

stream<MyType> OutputS =
com.ibm.streamsx.json::JSONToTuple(JsonS)
{}
com.ibm.streamsx.json::JSONToTuple(JsonS) {
param prefixToIgnore: "__";
}

() as SinkOp = Custom(InputS; OutputS) {
logic
Expand Down Expand Up @@ -75,8 +78,9 @@ composite VerifierJTOT(input JsonS, ExpectedS) {
graph

stream<MyType> OutputS =
com.ibm.streamsx.json::JSONToTuple(JsonS)
{}
com.ibm.streamsx.json::JSONToTuple(JsonS) {
param prefixToIgnore: "__";
}

() as SinkOp = Custom(ExpectedS; OutputS) {
logic
Expand Down

0 comments on commit 84b688f

Please sign in to comment.