Skip to content

Commit

Permalink
Merge pull request #68 from navinrathore/zAnalytics
Browse files Browse the repository at this point in the history
Added Google Analytics for telemetry #41
  • Loading branch information
sonalgoyal authored Dec 17, 2021
2 parents aca5ade + f6a7ead commit c244f12
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 16 deletions.
10 changes: 10 additions & 0 deletions client/src/main/java/zingg/client/Arguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class Arguments implements Serializable {
String modelId = "1";
double threshold = 0.5d;
int jobId = 1;
boolean collectMetrics = true;


public double getThreshold() {
Expand Down Expand Up @@ -151,6 +152,7 @@ public static final Arguments createArgumentsFromJSON(String filePath, String ph
true);
LOG.warn("Config Argument is " + filePath);
Arguments args = mapper.readValue(new File(filePath), Arguments.class);
LOG.warn("collectMetrics is " + args.getCollectMetrics());
LOG.warn("phase is " + phase);
checkValid(args, phase);
return args;
Expand Down Expand Up @@ -530,6 +532,14 @@ public void setJobId(int jobId) {
this.jobId = jobId;
}

/**
 * Tells whether analytics collection is enabled for this run.
 *
 * @return the current value of the {@code collectMetrics} flag
 */
public boolean getCollectMetrics() {
    return this.collectMetrics;
}

/**
 * Sets the flag that switches analytics collection on or off.
 *
 * @param collectMetrics new value of the flag; {@code false} turns
 *        analytics collection off
 */
public void setCollectMetrics(boolean collectMetrics) {
this.collectMetrics = collectMetrics;
}

public String[] getPipeNames() {
Pipe[] input = this.getData();
String[] sourceNames = new String[input.length];
Expand Down
44 changes: 34 additions & 10 deletions client/src/main/java/zingg/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import zingg.client.util.Email;
import zingg.client.util.EmailBody;


/**
* This is the main point of interface with the Zingg matching product.
*
Expand Down Expand Up @@ -88,15 +87,37 @@ public static void printBanner() {
String versionStr = "0.3";
LOG.info("");
LOG.info("********************************************************");
LOG.info("* Zingg AI *");
LOG.info("* (C) 2021 Zingg.AI *");
LOG.info("* Zingg AI *");
LOG.info("* (C) 2021 Zingg.AI *");
LOG.info("********************************************************");
LOG.info("");
LOG.info("using: Zingg v" + versionStr);
LOG.info("");
}


/**
 * Logs, at INFO level, a banner stating whether analytics are collected.
 *
 * When collection is enabled the banner explains what is captured and how
 * to switch it off; otherwise a short notice says nothing is collected.
 *
 * @param collectMetrics whether analytics collection is enabled
 */
public static void printAnalyticsBanner(boolean collectMetrics) {
    final String[] bannerLines;
    if (!collectMetrics) {
        bannerLines = new String[] {
            "",
            "********************************************************",
            "* Zingg is not collecting any analytics data *",
            "********************************************************",
            ""
        };
    } else {
        bannerLines = new String[] {
            "",
            "**************************************************************************",
            "* ** Note about analytics collection by Zingg AI ** *",
            "* *",
            "* Please note that Zingg captures a few metrics about application's *",
            "* runtime parameters. However, no user's personal data or application *",
            "* data is captured. If you want to switch off this feature, please *",
            "* set the flag collectMetrics to false in config. For details, please *",
            "* refer to the Zingg docs (https://docs.zingg.ai/docs/analytics.html) *",
            "**************************************************************************",
            ""
        };
    }
    // Emit one log record per banner line, in order.
    for (String bannerLine : bannerLines) {
        LOG.info(bannerLine);
    }
}

public static void main(String... args) {
printBanner();
Client client = null;
Expand All @@ -117,10 +138,12 @@ public static void main(String... args) {
else {
arguments = Arguments.createArgumentsFromJSONString(options.get(ClientOptions.CONF).value, phase);
}

printAnalyticsBanner(arguments.getCollectMetrics());

client = new Client(arguments, options);
client.init();
client.execute();
client.postMetrics();
LOG.warn("Zingg processing has completed");
}
catch(ZinggClientException e) {
Expand Down Expand Up @@ -177,7 +200,12 @@ public Arguments getArguments() {

/**
 * Runs the phase by delegating to the wrapped {@code zingg} instance.
 *
 * @throws ZinggClientException declared for callers; propagated from the delegate
 */
public void execute() throws ZinggClientException {
zingg.execute();
}

/**
 * Asks the wrapped {@code zingg} instance to post its metrics.
 *
 * @throws ZinggClientException declared for callers of this client
 */
public void postMetrics() throws ZinggClientException {
zingg.postMetrics();
}

/**
 * Replaces the arguments held by this client.
 *
 * @param args the arguments to store
 */
public void setArguments(Arguments args) {
this.arguments = args;
}
Expand All @@ -189,8 +217,4 @@ public ClientOptions getOptions() {
/**
 * Replaces the client options held by this client.
 *
 * @param options the options to store
 */
public void setOptions(ClientOptions options) {
this.options = options;
}




}
}
2 changes: 2 additions & 0 deletions client/src/main/java/zingg/client/IZingg.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public void init(Arguments args, String license)

public String getName();

/**
 * Posts the metrics gathered for this run.
 * NOTE(review): implementations are presumably expected to honour the
 * collectMetrics flag of the arguments passed to init - confirm.
 */
public void postMetrics();

}
7 changes: 6 additions & 1 deletion core/src/main/java/zingg/Matcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import zingg.client.ZinggClientException;
import zingg.client.ZinggOptions;
import zingg.util.Analytics;
import zingg.client.util.ColName;
import zingg.client.util.ColValues;
import zingg.util.Metric;
import zingg.client.util.Util;
import zingg.util.BlockingTreeUtil;
import zingg.util.DSUtil;
Expand Down Expand Up @@ -100,7 +102,10 @@ public void execute() throws ZinggClientException {
Dataset<Row> testData = getTestData();
testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
LOG.info("Read " + testData.count());
long count = testData.count();
LOG.info("Read " + count);
Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics());

Dataset<Row> blocked = getBlocked(testData);
LOG.info("Blocked ");
/*blocked = blocked.cache();
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/zingg/Trainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

import zingg.client.ZinggClientException;
import zingg.client.ZinggOptions;
import zingg.util.Analytics;
import zingg.client.util.ColName;
import zingg.client.util.ColValues;
import zingg.util.Metric;
import zingg.client.util.Util;
import zingg.util.BlockingTreeUtil;
import zingg.util.DSUtil;
Expand Down Expand Up @@ -53,6 +55,8 @@ public void execute() throws ZinggClientException {
Model model = ModelUtil.createModel(positives, negatives, new Model(this.featurers), spark);
model.save(args.getModel());
LOG.info("Learnt similarity rules and saved output at " + args.getZinggDir());
Analytics.track(Metric.TRAINING_MATCHES, Metric.approxCount(positives), args.getCollectMetrics());
Analytics.track(Metric.TRAINING_NONMATCHES, Metric.approxCount(negatives), args.getCollectMetrics());
LOG.info("Finished Learning phase");
} catch (Exception e) {
e.printStackTrace();
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/zingg/TrainingDataFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void execute() throws ZinggClientException {
Dataset<Row> data = PipeUtil.read(spark, true, true, args.getData());
LOG.warn("Read input data " + data.count());
//create 20 pos pairs

Dataset<Row> posPairs = null;
Dataset<Row> negPairs = null;
Dataset<Row> trFile = getTraining();
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/zingg/ZinggBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -16,13 +17,16 @@
import zingg.client.MatchType;
import zingg.client.ZinggClientException;
import zingg.client.ZinggOptions;
import zingg.util.Analytics;
import zingg.util.DSUtil;
import zingg.client.util.ListMap;

import zingg.util.Metric;
import zingg.feature.Feature;
import zingg.feature.FeatureFactory;
import zingg.hash.HashFunction;

import zingg.util.HashUtil;
import zingg.util.PipeUtil;

public abstract class ZinggBase implements Serializable, IZingg {

Expand All @@ -34,13 +38,15 @@ public abstract class ZinggBase implements Serializable, IZingg {
protected ZinggOptions zinggOptions;
protected ListMap<DataType, HashFunction> hashFunctions;
protected Map<FieldDefinition, Feature> featurers;
protected long startTime;
public static final String hashFunctionFile = "hashFunctions.json";

public static final Log LOG = LogFactory.getLog(ZinggBase.class);

@Override
public void init(Arguments args, String license)
throws ZinggClientException {
startTime = System.currentTimeMillis();
this.args = args;
try{
spark = SparkSession
Expand Down Expand Up @@ -106,6 +112,17 @@ public void copyContext(ZinggBase b) {
this.hashFunctions = b.hashFunctions;
}

/**
 * Records the end-of-run metrics and posts them as a single analytics event.
 *
 * Tracks the elapsed time in whole seconds since {@code startTime}, the
 * total and filtered field-definition counts, and the input and output
 * pipes as strings, then posts the event named after the current
 * {@code zinggOptions} value. Every call is gated on the
 * {@code collectMetrics} flag read from {@code args}.
 */
public void postMetrics() {
    final boolean enabled = args.getCollectMetrics();

    // Integer division: elapsed time is reported in whole seconds.
    final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
    Analytics.track(Metric.EXEC_TIME, elapsedSeconds, enabled);

    final int totalFields = args.getFieldDefinition().size();
    Analytics.track(Metric.TOTAL_FIELDS_COUNT, totalFields, enabled);

    final int matchFields = DSUtil.getFieldDefinitionFiltered(args, MatchType.DONT_USE).size();
    Analytics.track(Metric.MATCH_FIELDS_COUNT, matchFields, enabled);

    Analytics.track(Metric.DATA_FORMAT, PipeUtil.getPipesAsString(args.getData()), enabled);
    Analytics.track(Metric.OUTPUT_FORMAT, PipeUtil.getPipesAsString(args.getOutput()), enabled);

    Analytics.postEvent(zinggOptions.getValue(), enabled);
}

public Arguments getArgs() {
return this.args;
Expand Down
139 changes: 139 additions & 0 deletions core/src/main/java/zingg/util/Analytics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package zingg.util;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.client.utils.URIBuilder;

/**
 * Best-effort telemetry helper that accumulates named metrics in memory and
 * posts them as one event to the Google Analytics Measurement Protocol
 * endpoint.
 *
 * Every public method takes a {@code collectMetrics} flag and does nothing
 * when it is {@code false}. Failures while posting are logged and never
 * propagated, so telemetry cannot fail the calling job.
 *
 * The metric store is a plain static map with no synchronization; callers
 * are expected to use it from a single thread.
 */
public class Analytics {

    private static final String HOST = "www.google-analytics.com";
    private static final String PATH = "/mp/collect";
    //private static final String PATH = "/debug/mp/collect"; //set the path to validate the POST request
    // NOTE(review): the API secret is committed in source; consider loading it from configuration.
    private static final String API_SECRET = "LWZHm7tASgOl2VHpy_LR8A";
    private static final String API_VERSION = "2";
    private static final String MEASUREMENT_ID = "G-VFQXB5JFC1";

    /** Upper bound for connecting and reading, so telemetry cannot stall the job. */
    private static final int TIMEOUT_MILLIS = 5000;

    // Created eagerly so postEvent never sees a null map when nothing was tracked.
    private static final Map<String, String> metrics = new HashMap<String, String>();
    public static final Log LOG = LogFactory.getLog(Analytics.class);

    /** Returns the shared metric store. */
    private static Map<String, String> getMetrics() {
        return metrics;
    }

    /**
     * Records a metric, replacing any earlier value stored under the same name.
     *
     * @param metricName     key under which the value is stored
     * @param metricValue    value to store
     * @param collectMetrics when {@code false} the call is a no-op
     */
    public static void track(String metricName, String metricValue, boolean collectMetrics) {
        if (collectMetrics) {
            getMetrics().put(metricName, metricValue);
        }
    }

    /**
     * Records a numeric metric; the value is stored as {@code String.valueOf(metricValue)}.
     *
     * @param metricName     key under which the value is stored
     * @param metricValue    value to store
     * @param collectMetrics when {@code false} the call is a no-op
     */
    public static void track(String metricName, double metricValue, boolean collectMetrics) {
        track(metricName, String.valueOf(metricValue), collectMetrics);
    }

    /**
     * Builds one event carrying every tracked metric as a parameter and posts it.
     *
     * The tracked metrics are not cleared afterwards.
     *
     * @param phase          used as the event name
     * @param collectMetrics when {@code false} the call is a no-op
     */
    public static void postEvent(String phase, boolean collectMetrics) {
        if (!collectMetrics) {
            return;
        }
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode rootNode = mapper.createObjectNode();

        // NOTE(review): client_id is a fixed literal, so every installation reports the same id.
        rootNode.put("client_id", "555");

        ObjectNode eventNode = mapper.createObjectNode();
        eventNode.put("name", phase);

        ObjectNode paramNode = mapper.createObjectNode();
        for (Map.Entry<String, String> entry : getMetrics().entrySet()) {
            paramNode.put(entry.getKey(), entry.getValue());
        }
        eventNode.set("params", paramNode);

        ArrayNode eventList = mapper.createArrayNode();
        eventList.add(eventNode);
        rootNode.set("events", eventList);

        sendEvents(rootNode.toString());
    }

    /**
     * Posts the JSON payload to the collection endpoint. Any failure is
     * logged at debug level and otherwise ignored (telemetry is best-effort).
     */
    private static void sendEvents(String param) {
        try {
            URI uri = new URIBuilder()
                    .setScheme("https")
                    .setHost(HOST)
                    .setPath(PATH)
                    .addParameter("api_secret", API_SECRET)
                    .addParameter("v", API_VERSION)
                    .addParameter("measurement_id", MEASUREMENT_ID) // Tracking ID
                    .build();
            String response = executePostRequest(uri.toURL(), param);
            // Only reached when the request completed without an exception.
            LOG.debug("Event tracked.");
            if (!response.isEmpty()) {
                LOG.debug("Analytics response: " + response);
            }
        } catch (IOException | URISyntaxException | RuntimeException e) {
            // Deliberately not rethrown: a telemetry failure must not fail the job.
            LOG.debug("Unable to post analytics event", e);
        }
    }

    /**
     * Sends {@code body} as a UTF-8 JSON POST to {@code url} and returns the
     * response body, with each line terminated by a carriage return.
     *
     * @throws IOException if the connection fails, times out, or the server
     *         answers with an error status
     */
    private static String executePostRequest(URL url, String body) throws IOException {
        // Encode once so the declared length and the bytes written always agree.
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Content-Language", "en-US");
            connection.setConnectTimeout(TIMEOUT_MILLIS);
            connection.setReadTimeout(TIMEOUT_MILLIS);
            connection.setUseCaches(false);
            connection.setDoOutput(true);
            connection.setFixedLengthStreamingMode(payload.length);

            //Send request
            try (DataOutputStream os = new DataOutputStream(connection.getOutputStream())) {
                os.write(payload);
            }

            //Get Response
            StringBuilder response = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    response.append(line);
                    response.append('\r');
                }
            }
            return response.toString();
        } finally {
            connection.disconnect();
        }
    }
}
10 changes: 8 additions & 2 deletions core/src/main/java/zingg/util/DSUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import zingg.client.util.ColValues;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -241,5 +241,11 @@ private static Dataset<Row> getTraining(SparkSession spark, Arguments args, Pipe
if (trFile == null) LOG.warn("No training data found");
return trFile;
}


/**
 * Returns the field definitions whose match type is set and differs from
 * the given type.
 *
 * @param args arguments supplying the field definitions
 * @param type match type to exclude; fields with no match type are
 *        excluded as well
 * @return a new list with the remaining field definitions, in their
 *         original order
 */
public static List<FieldDefinition> getFieldDefinitionFiltered(Arguments args, MatchType type) {
    return args.getFieldDefinition()
            .stream()
            // Honour the requested type instead of a hard-coded DONT_USE.
            .filter(f -> f.getMatchType() != null && !f.getMatchType().equals(type))
            .collect(Collectors.toList());
}
}
Loading

0 comments on commit c244f12

Please sign in to comment.