Skip to content

Commit

Permalink
Merge pull request #676 from zinggAI/pipeutilchanges
Browse files Browse the repository at this point in the history
refactor getting/writing cluster data
  • Loading branch information
sonalgoyal authored Sep 29, 2023
2 parents 4443be5 + b8448a6 commit 47bf7da
Showing 1 changed file with 44 additions and 39 deletions.
83 changes: 44 additions & 39 deletions common/core/src/main/java/zingg/common/core/executor/Matcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,45 +238,7 @@ public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) th

//all clusters consolidated in one place
if (args.getOutput() != null) {
//-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure
/*blocked = blocked.drop(ColName.HASH_COL);
blocked = blocked.drop(ColName.SOURCE_COL);
blocked = blocked.cache();
*/

dupesActual = dupesActual.cache();
System.out.println("dupes ------------");
if (LOG.isDebugEnabled()) {
dupesActual.show();
}
ZFrame<D,R,C>graph = getGraphUtil().buildGraph(blocked, dupesActual).cache();
//graph.toJavaRDD().saveAsTextFile("/tmp/zgraph");
System.out.println("graph ------------");
if (LOG.isDebugEnabled()) {
graph.show();
}
//write score
ZFrame<D,R,C>score = getMinMaxScores(dupesActual, graph).cache();
//score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg");
graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache();
if (LOG.isDebugEnabled()) {
score.show();
}
ZFrame<D, R, C> graphWithScores = getGraphWithScores(graph, score);
//graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores");
graphWithScores = graphWithScores.drop(ColName.HASH_COL);
graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL);
graphWithScores = graphWithScores.drop(ColName.ID_COL);
graphWithScores = graphWithScores.drop(ColName.SOURCE_COL);
/*String[] cols = graphWithScores.columns();
List<Column> columns = new ArrayList<Column>();
//columns.add(graphWithScores.col(ColName.CLUSTER_COLUMN));
//go only upto the last col, which is cluster col
for (int i=0; i < cols.length - 1; ++i) {
columns.add(graphWithScores.col(cols[i]));
}
graphWithScores = getDSUtil().select(graphWithScores, columns);
*/
ZFrame<D, R, C> graphWithScores = getOutput(blocked, dupesActual);
getPipeUtil().write(graphWithScores, args, args.getOutput());
}
}
Expand All @@ -286,6 +248,49 @@ public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) th

}

/**
 * Assembles the consolidated cluster output that the caller writes out.
 *
 * <p>Steps, in order: cache {@code dupesActual}; build a graph from
 * {@code blocked} and {@code dupesActual} via {@code getGraphUtil().buildGraph};
 * compute scores via {@code getMinMaxScores}; repartition the graph on
 * {@code ColName.ID_COL} into {@code args.getNumPartitions()} partitions;
 * combine graph and scores via {@code getGraphWithScores}; finally drop the
 * hash, prefixed-id, id and source columns from the result.
 *
 * <p>Diagnostic banners and {@code show()} dumps are emitted only when debug
 * logging is enabled, so nothing is printed to stdout in normal runs.
 *
 * @param blocked     frame passed to the graph builder together with the pairs
 * @param dupesActual frame of pairs used for both graph building and scoring
 * @return the graph joined with its scores, minus the helper columns listed above
 * @throws Exception declared by the original signature; presumably propagated
 *                   from the helper calls - confirm against their contracts
 */
protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> blocked, ZFrame<D, R, C> dupesActual) throws Exception {
    // Legend kept from the original code: -1 is initial suggestion, 1 is add,
    // 0 is deletion, 2 is unsure.
    // NOTE(review): nothing in this method reads these values - confirm the
    // legend still belongs here.

    dupesActual = dupesActual.cache();
    if (LOG.isDebugEnabled()) {
        LOG.debug("dupes ------------");
        dupesActual.show();
    }

    ZFrame<D, R, C> graph = getGraphUtil().buildGraph(blocked, dupesActual).cache();
    if (LOG.isDebugEnabled()) {
        LOG.debug("graph ------------");
        graph.show();
    }

    // Scores are computed against the graph as built, before it is repartitioned.
    ZFrame<D, R, C> score = getMinMaxScores(dupesActual, graph).cache();
    // NOTE(review): the pre-repartition graph was cached above and is not
    // released here - confirm whether that is intentional.
    graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache();
    if (LOG.isDebugEnabled()) {
        score.show();
    }

    ZFrame<D, R, C> graphWithScores = getGraphWithScores(graph, score);
    // Strip the helper columns so they do not appear in the written output.
    graphWithScores = graphWithScores.drop(ColName.HASH_COL);
    graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL);
    graphWithScores = graphWithScores.drop(ColName.ID_COL);
    graphWithScores = graphWithScores.drop(ColName.SOURCE_COL);
    return graphWithScores;
}

protected ZFrame<D, R, C> getGraphWithScores(ZFrame<D, R, C> graph, ZFrame<D, R, C> score) {
ZFrame<D,R,C>graphWithScores = getDSUtil().joinZColFirst(
score, graph, ColName.ID_COL, false).cache();
Expand Down

0 comments on commit 47bf7da

Please sign in to comment.