Skip to content

Commit

Permalink
Verify blocking (#1007)
Browse files Browse the repository at this point in the history
* fixing verifyblocking

* working changes

* fixing location name in spark

* junits

---------

Co-authored-by: sania-16 <[email protected]>
Co-authored-by: Sania Goyal <[email protected]>
  • Loading branch information
3 people authored Jan 17, 2025
1 parent a5f24dc commit 4a1eaa9
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class Pipe<D,R,C> implements Serializable{ // St:StructType, Sv:SaveMode
// Pipe name — presumably a logical identifier for this data source/sink; confirm against callers.
String name;
// Storage format string (e.g. parquet/csv) — NOTE(review): exact accepted values set elsewhere.
String format;
// Preprocessor specification — semantics not visible from this fragment.
String preprocessors;
// Arbitrary key/value connection properties (e.g. location); protected so subclasses
// such as SparkPipe can populate entries like FilePipe.LOCATION directly.
protected Map<String, String> props = new HashMap<String, String>();
int id;
// The materialized data for this pipe, wrapped in the engine-agnostic ZFrame.
protected ZFrame<D, R, C> dataset;
String schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void execute() throws ZinggClientException {
LOG.info("Blocked");

ZFrame<D,R,C> blockCounts = blocked.select(ColName.HASH_COL).groupByCount(ColName.HASH_COL, ColName.HASH_COUNTS_COL).sortDescending(ColName.HASH_COUNTS_COL);

blockCounts = blockCounts.cache();

getPipeUtil().write(blockCounts,getVerifyBlockingPipeUtil().getCountsPipe(args));

ZFrame<D,R,C> blockTopRec = blockCounts.select(ColName.HASH_COL,ColName.HASH_COUNTS_COL).limit(noOfBlocks);
Expand Down
16 changes: 14 additions & 2 deletions spark/client/src/main/java/zingg/spark/client/pipe/SparkPipe.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package zingg.spark.client.pipe;

import java.io.IOException;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

import org.apache.spark.sql.Dataset;

import zingg.common.client.pipe.FilePipe;
import zingg.common.client.pipe.Pipe;


Expand Down Expand Up @@ -53,5 +53,17 @@ public void setOverwriteMode() {
setMode(SaveMode.Overwrite.toString());
}

/**
 * Sanitizes a location string for use as a file-system path by removing
 * the characters '-', '@', ',' and ':'.
 *
 * @param name raw location string; must not be null
 * @return {@code name} with every '-', '@', ',' and ':' removed
 */
public static String massageLocation(String name){
    // Single character-class regex replaces the four separate replaceAll
    // calls of the naive version — one pass, one compiled Pattern.
    // (Leading '-' inside [] is a literal hyphen, not a range.)
    return name.replaceAll("[-@,:]", "");
}

/**
 * Stores the given file name as this pipe's location property,
 * sanitizing it first via {@link #massageLocation(String)} so that
 * '-', '@', ',' and ':' characters never reach the location value.
 *
 * @param fileName raw location string; must not be null
 */
public void setLocation(String fileName){
    String sanitized = massageLocation(fileName);
    this.props.put(FilePipe.LOCATION, sanitized);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,5 @@ public Pipe<Dataset<Row>, Row, Column> getStopWordsPipe(String fileName) {
p.setOverwriteMode();
return p;
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

import zingg.common.client.ZFrame;
//import zingg.common.client.pipe.InMemoryPipe;
import zingg.common.client.pipe.Pipe;
import zingg.common.client.util.DFReader;
import zingg.common.client.util.DFWriter;
import zingg.common.client.util.IModelHelper;
import zingg.common.client.util.PipeUtil;
import zingg.spark.client.SparkFrame;
import org.apache.spark.sql.SparkSession;
Expand Down
34 changes: 34 additions & 0 deletions spark/client/src/test/java/zingg/spark/client/TestSparkPipe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package zingg.spark.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import zingg.spark.client.pipe.SparkPipe;

/** Unit tests for {@link SparkPipe} location sanitization. */
public class TestSparkPipe {

    /** massageLocation must strip '-', '@', ',' and ':' from a path. */
    @Test
    public void testMassageLocation(){
        String input = "zin:gg/spar,k/client/blo@cks/-1234";
        String expected = "zingg/spark/client/blocks/1234";
        assertEquals(expected, SparkPipe.massageLocation(input));
    }

    /** setLocation must store the sanitized path under the "location" prop. */
    @Test
    public void testSetLocation(){
        String input = "zin:gg/spar,k/client/blo@cks/-1234";
        String expected = "zingg/spark/client/blocks/1234";
        SparkPipe p = new SparkPipe();
        p.setLocation(input);
        // JUnit convention: expected value first, actual second (the original
        // reversed them, which would garble failure messages). The throwaway
        // HashMap that was immediately overwritten by getProps() is gone too.
        Map<String, String> props = p.getProps();
        assertEquals(expected, props.get("location"));
    }

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package zingg.spark.core.executor;

import org.apache.spark.internal.config.R;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import zingg.common.client.IArguments;
import zingg.common.client.pipe.FilePipe;
import zingg.common.client.pipe.Pipe;
import zingg.common.client.util.IModelHelper;
import zingg.common.client.util.PipeUtilBase;
Expand All @@ -24,7 +22,7 @@ public SparkVerifyBlockingPipes(PipeUtilBase<SparkSession,Dataset<Row>,Row,Colum
/**
 * Builds a parquet-format pipe targeting the verify-blocking output
 * location derived from the run arguments, timestamp and output type;
 * any existing data at that location is overwritten.
 *
 * @param args run arguments used to derive the output location
 * @param type verify-blocking output type segment of the location name
 * @return a configured {@link SparkPipe} ready for writing
 */
public Pipe<Dataset<Row>,Row,Column> getPipeForVerifyBlockingLocation(IArguments args, String type){
	String location = getName(args, timestamp, type);
	SparkPipe pipe = new SparkPipe();
	pipe.setFormat(Pipe.FORMAT_PARQUET);
	pipe.setLocation(location);
	pipe.setOverwriteMode();
	return pipe;
}
Expand Down
3 changes: 0 additions & 3 deletions spark/core/src/test/java/zingg/spark/core/util/TestUtil.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package zingg.spark.core.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.types.DataType;
import org.junit.jupiter.api.Test;

import zingg.common.client.util.*;
Expand Down

0 comments on commit 4a1eaa9

Please sign in to comment.