Skip to content

Commit

Permalink
added init so that preprocs can be prepared before invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
sonalgoyal committed Jan 20, 2025
1 parent 53ce7b7 commit 469c914
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface IPreprocessor<S,D,R,C,T> extends Serializable{

public void setContext(IContext<S,D,R,C,T> c);

public void init();

public IContext<S,D,R,C,T> getContext();

public void setFieldDefinition(FieldDefinition fd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ default ZFrame<D,R,C> preprocess(ZFrame<D,R,C> df) throws ZinggClientException {
IPreprocessor ip = getPreprocMap().get(o).getDeclaredConstructor().newInstance();
//setting context and field defn
ip.setContext(getContext());
ip.init();;
ip.setFieldDefinition(def);
dfp = ip.preprocess(dfp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,34 @@ public SparkStopWordsRemover(){

public SparkStopWordsRemover(IContext<SparkSession, Dataset<Row>, Row, Column,DataType> context) {
super(context);
this.udfName = registerUDF();
registerUDF();
}

public SparkStopWordsRemover(IContext<SparkSession, Dataset<Row>, Row, Column,DataType> context, FieldDefinition fd) {
super(context,fd);
this.udfName = registerUDF();
registerUDF();
}

@Override
protected ZFrame<Dataset<Row>, Row, Column> removeStopWordsFromDF(ZFrame<Dataset<Row>, Row, Column> ds,
String fieldName, String pattern) {
this.udfName = registerUDF();
Dataset<Row> dfAfterRemoval = ds.df().withColumn(fieldName,callUDF(udfName, ds.df().col(fieldName),lit(pattern)));
Dataset<Row> dfAfterRemoval = ds.df().withColumn(fieldName,callUDF(udfName, ds.df().col(fieldName),lit(pattern)));
return new SparkFrame(dfAfterRemoval);
}

protected String registerUDF() {
protected void registerUDF() {
RemoveStopWordsUDF removeStopWordsUDF = new RemoveStopWordsUDF();
// Each field will have different pattern
String udfName = removeStopWordsUDF.getName();
this.udfName = removeStopWordsUDF.getName();
// register the UDF
SparkSession zSession = getContext().getSession();

SparkFnRegistrar.registerUDF2(zSession, udfName, removeStopWordsUDF, DataTypes.StringType);
return udfName;
}

@Override
public void init() {
registerUDF();
}

}

0 comments on commit 469c914

Please sign in to comment.