Skip to content

Commit

Permalink
Merge pull request #609 from zinggAI/issue607
Browse files Browse the repository at this point in the history
issue #607 refactor matcher
  • Loading branch information
sonalgoyal authored Jun 14, 2023
2 parents 8dadbc4 + 40fa2d0 commit 7ff2bbb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
2 changes: 2 additions & 0 deletions common/client/src/main/java/zingg/common/client/ZFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,6 @@ public interface ZFrame<D, R, C> {

/**
 * Returns the fields of this frame as an array of {@link FieldData}.
 *
 * NOTE(review): the ordering and exact contents of the array are decided by
 * the implementing class and are not visible from this interface - confirm
 * against the concrete frame before relying on them.
 *
 * @return the {@code FieldData} entries for this frame
 */
public FieldData[] fields();

/**
 * Returns the maximum value of the named column of this frame.
 *
 * The result is deliberately untyped ({@code Object}); callers have to cast
 * it to whatever type the column holds.
 *
 * NOTE(review): behaviour for an empty frame or an unknown column is left to
 * the implementation - confirm before relying on it.
 *
 * @param colName name of the column to take the maximum over
 * @return the maximum value of {@code colName}
 */
public Object getMaxVal(String colName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ public Matcher() {
}

/**
 * Reads the input data configured in {@code args} through the pipe util.
 *
 * Kept as a separate protected method so the read step can be replaced by
 * subclasses.
 *
 * @return the frame produced by {@code getPipeUtil().read(...)}
 * @throws ZinggClientException if the underlying read fails
 */
protected ZFrame<D,R,C> getTestData() throws ZinggClientException{
    // Only the four-argument read(...) call is kept. The block previously
    // declared the local 'data' twice (the pre-change three-argument call
    // followed by this one), which does not compile.
    // NOTE(review): the meaning of the individual boolean flags is defined by
    // the pipe util's read(...) overload, which is not visible here - confirm.
    return getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData());
}

/**
 * Delegates to {@code getDSUtil().getFieldDefColumnsDS(...)} for the given
 * frame, passing this executor's {@code args} and {@code true} as the third
 * argument.
 *
 * NOTE(review): the meaning of the {@code true} flag is defined by the DS
 * util and is not visible here - confirm.
 *
 * @param testDataOriginal the frame as originally read
 * @return the frame returned by the DS util
 */
protected ZFrame<D, R, C> getFieldDefColumnsDS(ZFrame<D, R, C> testDataOriginal) {
    final ZFrame<D, R, C> fieldDefColumns =
            getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
    return fieldDefColumns;
}


protected ZFrame<D,R,C> getBlocked( ZFrame<D,R,C> testData) throws Exception, ZinggClientException{
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy<R>> tree = getBlockingTreeUtil().readBlockingTree(args);
Expand Down Expand Up @@ -81,11 +86,12 @@ protected ZFrame<D,R,C> getBlocks(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throw
return blocked.select(ColName.ID_COL, ColName.HASH_COL);
}

@Override
public void execute() throws ZinggClientException {
try {
// read input, filter, remove self joins
ZFrame<D,R,C> testDataOriginal = getTestData();
testDataOriginal = getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
testDataOriginal = getFieldDefColumnsDS(testDataOriginal);
ZFrame<D,R,C> testData = getStopWords().preprocessForStopWords(testDataOriginal);
testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,11 @@ public FieldData[] fields() {
}
return fieldDataArr;
}


/**
 * Computes the maximum of {@code colName} by aggregating {@code df} with
 * {@code functions.max} and returning the first column of the first result
 * row.
 *
 * @param colName name of the column to take the maximum over
 * @return the value at index 0 of the aggregated row
 */
@Override
public Object getMaxVal(String colName) {
    // NOTE(review): presumably null when the column has no non-null values -
    // confirm against Spark's max() semantics.
    return df.agg(functions.max(colName)).head().get(0);
}

}
5 changes: 5 additions & 0 deletions spark/client/src/test/java/zingg/client/TestSparkFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ public void testWithColumnforAnotherColumn() {
assertTrueCheckingExceptOutput(sf2, df.withColumn(newCol, col(oldCol)), "SparkFrame.withColumn(c, Column) output is not as expected");
}

/**
 * Checks that {@code getMaxVal} on the z-score fixture frame returns 400 for
 * the cluster column.
 */
@Test
public void testGetMaxVal(){
    SparkFrame frame = getZScoreDF();
    Object maxCluster = frame.getMaxVal(ColName.CLUSTER_COLUMN);
    assertEquals(400, maxCluster);
}

@Test
public void testGroupByMinMax(){
Expand Down

0 comments on commit 7ff2bbb

Please sign in to comment.