From 5be3a88cb67d179832dd41448948f8e79b0a6255 Mon Sep 17 00:00:00 2001 From: Vikas Gupta Date: Wed, 27 Sep 2023 16:33:33 +0530 Subject: [PATCH 1/2] refactor writing cluster data --- .../src/main/java/zingg/common/core/executor/Matcher.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 2aae7fea2..b1b627a64 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -277,7 +277,7 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th } graphWithScores = getDSUtil().select(graphWithScores, columns); */ - getPipeUtil().write(graphWithScores, args, args.getOutput()); + writeClusters(graphWithScores); } } catch(Exception e) { @@ -286,6 +286,10 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th } + protected void writeClusters(ZFrame graphWithScores) throws ZinggClientException { + getPipeUtil().write(graphWithScores, args, args.getOutput()); + } + protected ZFrame getGraphWithScores(ZFrame graph, ZFrame score) { ZFramegraphWithScores = getDSUtil().joinZColFirst( score, graph, ColName.ID_COL, false).cache(); From b8448a629426ece57a021b83cd0157fcac8d2554 Mon Sep 17 00:00:00 2001 From: Vikas Gupta Date: Thu, 28 Sep 2023 12:26:42 +0530 Subject: [PATCH 2/2] refactor --- .../zingg/common/core/executor/Matcher.java | 85 ++++++++++--------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index b1b627a64..4fc351c33 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -238,46 +238,8 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th //all clusters consolidated in one place if (args.getOutput() != null) { - //-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure - /*blocked = blocked.drop(ColName.HASH_COL); - blocked = blocked.drop(ColName.SOURCE_COL); - blocked = blocked.cache(); - */ - - dupesActual = dupesActual.cache(); - System.out.println("dupes ------------"); - if (LOG.isDebugEnabled()) { - dupesActual.show(); - } - ZFramegraph = getGraphUtil().buildGraph(blocked, dupesActual).cache(); - //graph.toJavaRDD().saveAsTextFile("/tmp/zgraph"); - System.out.println("graph ------------"); - if (LOG.isDebugEnabled()) { - graph.show(); - } - //write score - ZFramescore = getMinMaxScores(dupesActual, graph).cache(); - //score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg"); - graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache(); - if (LOG.isDebugEnabled()) { - score.show(); - } - ZFrame graphWithScores = getGraphWithScores(graph, score); - //graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores"); - graphWithScores = graphWithScores.drop(ColName.HASH_COL); - graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL); - graphWithScores = graphWithScores.drop(ColName.ID_COL); - graphWithScores = graphWithScores.drop(ColName.SOURCE_COL); - /*String[] cols = graphWithScores.columns(); - List columns = new ArrayList(); - //columns.add(graphWithScores.col(ColName.CLUSTER_COLUMN)); - //go only upto the last col, which is cluster col - for (int i=0; i < cols.length - 1; ++i) { - columns.add(graphWithScores.col(cols[i])); - } - graphWithScores = getDSUtil().select(graphWithScores, columns); - */ - writeClusters(graphWithScores); + ZFrame graphWithScores = getOutput(blocked, dupesActual); + getPipeUtil().write(graphWithScores, args, args.getOutput()); } } catch(Exception e) { @@ -286,8 +248,47 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th } - protected void writeClusters(ZFrame graphWithScores) throws ZinggClientException { - getPipeUtil().write(graphWithScores, args, args.getOutput()); + protected ZFrame getOutput(ZFrame blocked, ZFrame dupesActual) throws Exception { + //-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure + /*blocked = blocked.drop(ColName.HASH_COL); + blocked = blocked.drop(ColName.SOURCE_COL); + blocked = blocked.cache(); + */ + + dupesActual = dupesActual.cache(); + System.out.println("dupes ------------"); + if (LOG.isDebugEnabled()) { + dupesActual.show(); + } + ZFramegraph = getGraphUtil().buildGraph(blocked, dupesActual).cache(); + //graph.toJavaRDD().saveAsTextFile("/tmp/zgraph"); + System.out.println("graph ------------"); + if (LOG.isDebugEnabled()) { + graph.show(); + } + //write score + ZFramescore = getMinMaxScores(dupesActual, graph).cache(); + //score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg"); + graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache(); + if (LOG.isDebugEnabled()) { + score.show(); + } + ZFrame graphWithScores = getGraphWithScores(graph, score); + //graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores"); + graphWithScores = graphWithScores.drop(ColName.HASH_COL); + graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL); + graphWithScores = graphWithScores.drop(ColName.ID_COL); + graphWithScores = graphWithScores.drop(ColName.SOURCE_COL); + /*String[] cols = graphWithScores.columns(); + List columns = new ArrayList(); + //columns.add(graphWithScores.col(ColName.CLUSTER_COLUMN)); + //go only upto the last col, which is cluster col + for (int i=0; i < cols.length - 1; ++i) { + columns.add(graphWithScores.col(cols[i])); + } + graphWithScores = getDSUtil().select(graphWithScores, columns); + */ + return graphWithScores; } protected ZFrame getGraphWithScores(ZFrame graph, ZFrame score) {