Csvpipe #402

Merged: 5 commits, Jul 18, 2022
90 changes: 90 additions & 0 deletions TestJaccSimFunction.java
@@ -0,0 +1,90 @@
package zingg.similarity.function;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class TestJaccSimFunction {

@Test
public void testFirstStringNull() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call(null, "text 2"));
}

@Test
public void testFirstStringEmpty() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("", "text 2"));
}

@Test
public void testSecondStringNull() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("text 1", null));
}

@Test
public void testSecondStringEmpty() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("text 1", ""));
}

@Test
public void testBothEmpty() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("", ""));
}

@Test
public void testBothNull() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call(null, null));
}

@Test
public void testBothSame() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("sample text", "sample text"));
}

@Test
public void testBothSameButCaseDifferent() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
assertEquals(1d, strDistanceFn.call("sample text", "sAmPle TeXt"));
}

@Test
public void testBothNotEmptyDifferent() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
Double score = strDistanceFn.call("sample text first", "sample text second");
assertEquals(0.5d, score);
}

@Test
public void testSpecificInputsDifferent() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
String first = "sonicwall client/server ";
String second = "sonicwall businesses ";
Double score = strDistanceFn.call(first, second);
assertEquals(0.25d, score);
}

@Test
public void testInputsSameWithSlashes() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
String first = "sample/string";
String second = "sample/string";
Double score = strDistanceFn.call(first, second);
assertEquals(1d, score);
}

@Test
public void testInputsDifferentWithSlashesAndColons() {
StringSimilarityDistanceFunction strDistanceFn = new JaccSimFunction("test");
String first = "slashes/and:colons.,";
String second = "slashes and colons";
Double score = strDistanceFn.call(first, second);
assertEquals(1d, score);
}
}
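
The expected values above are consistent with a case-insensitive, token-set Jaccard score in which null or empty inputs score 1.0 (so missing data never penalizes a pair) and tokens are split on whitespace and punctuation such as slashes and colons: "sample text first" vs "sample text second" share 2 of 4 distinct tokens, hence 0.5. A minimal Python sketch of that behaviour, for illustration only (not Zingg's actual JaccSimFunction implementation):

import re

def jacc_sim(first, second):
    # Null/empty inputs score 1.0, mirroring the null/empty tests above.
    if not first or not second:
        return 1.0
    # Lower-case, then split on whitespace and common punctuation.
    tokenize = lambda s: set(re.split(r"[\s/:.,]+", s.lower())) - {""}
    a, b = tokenize(first), tokenize(second)
    return len(a & b) / len(a | b)

assert jacc_sim("sample text first", "sample text second") == 0.5
assert jacc_sim("sonicwall client/server ", "sonicwall businesses ") == 0.25
assert jacc_sim("slashes/and:colons.,", "slashes and colons") == 1.0
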
12 changes: 4 additions & 8 deletions examples/amazon-google/AmazonGoogle.py
@@ -23,21 +23,17 @@
#in that case, replace df with input df
dfAmazon = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/Amazon.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
inputPipeAmazon = CsvPipe("testAmazon")
inputPipeAmazon.setLocation("examples/amazon-google/Amazon.csv")
inputPipeAmazon.setSchema(dfSchemaAmazon)
inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/amazon-google/Amazon.csv")

dfGoogle = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/GoogleProducts.csv")
dfSchemaGoogle = str(dfGoogle.schema.json())
inputPipeGoogle = CsvPipe("testGoogle")
inputPipeGoogle.setLocation("examples/amazon-google/GoogleProducts.csv")
inputPipeGoogle.setSchema(dfSchemaGoogle)
inputPipeGoogle = CsvPipe("testGoogle", dfSchemaGoogle, "examples/amazon-google/GoogleProducts.csv")

args.setData(inputPipeAmazon,inputPipeGoogle)

#setting outputpipe in 'args'
outputPipe = CsvPipe("resultAmazonGoogle")
outputPipe.setLocation("/tmp/AwsGoogleOutput")
outputPipe = CsvPipe("resultAmazonGoogle", None, "/tmp/AwsGoogleOutput")

args.setOutput(outputPipe)

options = ClientOptions()
9 changes: 2 additions & 7 deletions examples/febrl/FebrlExample.py
@@ -28,18 +28,13 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")

inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/febrl/test.csv")

dfSchema = str(df.schema.json())
inputPipe.setSchema(dfSchema)
inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")

args.setData(inputPipe)

#setting outputpipe in 'args'
outputPipe = CsvPipe("csv")
outputPipe.setLocation("/tmp")
outputPipe = CsvPipe("csv", None, "/tmp")

args.setOutput(outputPipe)

13 changes: 4 additions & 9 deletions examples/iTunes-amazon/iTunesAmazon.py
@@ -28,22 +28,17 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
dfiTunes = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/iTunesMusic.csv")
dfSchema = str(dfiTunes.schema.json())
inputPipeiTunes = CsvPipe("testiTunes")
inputPipeiTunes.setLocation("examples/iTunes-amazon/iTunesMusic.csv")
inputPipeiTunes.setSchema(dfSchema)
dfSchemaiTunes = str(dfiTunes.schema.json())
inputPipeiTunes = CsvPipe("testiTunes", dfSchemaiTunes, "examples/iTunes-amazon/iTunesMusic.csv")

dfAmazon = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/AmazonMusic.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
inputPipeAmazon = CsvPipe("testAmazon")
inputPipeAmazon.setLocation("examples/iTunes-amazon/AmazonMusic.csv")
inputPipeAmazon.setSchema(dfSchemaAmazon)
inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/iTunes-amazon/AmazonMusic.csv")

args.setData(inputPipeiTunes,inputPipeAmazon)

#setting outputpipe in 'args'
outputPipe = CsvPipe("iTunesAmazonresult")
outputPipe.setLocation("/tmp/iTunesAmazonOutput")
outputPipe = CsvPipe("iTunesAmazonresult", None, "/tmp/iTunesAmazonOutput")

args.setOutput(outputPipe)

10 changes: 4 additions & 6 deletions examples/ncVoters5M/ncVoters.py
@@ -22,16 +22,14 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
df = spark.read.format("csv").schema("recid string, givenname string, surname string, suburb string, postcode double ").load("examples/ncVoters5M/5Party-ocp20/")
dfSchemaA = str(df.schema.json())
dfSchema = str(df.schema.json())
inputPipe = CsvPipe("test", dfSchema, "examples/ncVoters5M/5Party-ocp20/")

inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/ncVoters5M/5Party-ocp20/")
inputPipe.setSchema(dfSchemaA)
args.setData(inputPipe)

#setting outputpipe in 'args'
outputPipe = CsvPipe("ncVotersResult")
outputPipe.setLocation("/tmp/ncVotersOutput")
outputPipe = CsvPipe("ncVotersResult", None, "/tmp/ncVotersOutput")

args.setOutput(outputPipe)

options = ClientOptions()
14 changes: 9 additions & 5 deletions python/zingg/pipes/pipes.py
@@ -19,9 +19,17 @@ class CsvPipe(Pipe):

:param name: name of the pipe.
:type name: String
:param schema: (optional) JSON schema of the data, e.g. str(df.schema.json())
:type schema: String or None
:param location: (optional) location from which the data is read
:type location: String or None
"""
def __init__(self, name):
def __init__(self, name, schema = None, location = None):
    Pipe.__init__(self, name, Format.CSV.type())
    if schema is not None:
        Pipe.setSchema(self, schema)
    if location is not None:
        Pipe.addProperty(self, FilePipe.LOCATION, location)
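
A quick usage sketch of the new optional arguments (equivalent to the older setSchema/setLocation calls; dfSchema here stands for a schema-JSON string as built in the examples above):

inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")   # schema and location up front
outputPipe = CsvPipe("csv", None, "/tmp")                          # no schema, location only

# The existing setters remain available for anything left unset:
outputPipe.setHeader("true")
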

def setDelimiter(self, delimiter):
""" This method is used to define delimiter of CsvPipe
@@ -42,10 +42,6 @@ def setLocation(self, location):
def setHeader(self, header):
Pipe.addProperty(self, FilePipe.HEADER, header)

class BigQueryPipe(Pipe):
""" Pipe Class for working with BigQuery pipeline

65 changes: 65 additions & 0 deletions test/testFebrl/testFebrl.py
@@ -0,0 +1,65 @@
import unittest
from unittest.case import TestCase


from zingg import *
from zingg.pipes import *

args = Arguments()
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)
add1 = FieldDefinition("add1","string", MatchType.FUZZY)
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)
city = FieldDefinition("city", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dob = FieldDefinition("dob", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]

args.setFieldDefinition(fieldDefs)
args.setModelId("100")
args.setZinggDir("models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")
inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/febrl/test.csv")
dfSchema = str(df.schema.json())
inputPipe.setSchema(dfSchema)

outputPipe = CsvPipe("result")
outputPipe.setLocation("/tmp/pythonTest")

args.setData(inputPipe)
args.setOutput(outputPipe)

options = ClientOptions()
options.setPhase("trainMatch")

#testing

class Accuracy_recordCount(TestCase):
    def test_recordCount(self):
        client = Zingg(args, options)
        client.initAndExecute()
        pMarkedDF = client.getPandasDfFromDs(client.getMarkedRecords())
        labelledData = spark.createDataFrame(pMarkedDF)

        total_marked = pMarkedDF.shape[0]

        # marked record count test
        self.assertEqual(total_marked, 76)

        pMarkedDF.drop(pMarkedDF[pMarkedDF[ColName.PREDICTION_COL] == -1].index, inplace=True)
        acc = (pMarkedDF[ColName.MATCH_FLAG_COL] == pMarkedDF[ColName.PREDICTION_COL]).mean()

        # accuracy test
        self.assertGreater(acc, 0.9)
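
To make the final assertions concrete: rows still carrying a -1 prediction are dropped, and accuracy is the fraction of remaining rows where the human label agrees with the model's prediction. A toy pandas illustration (the literal column names "z_isMatch" and "z_prediction" are assumptions standing in for the ColName constants):

import pandas as pd

df = pd.DataFrame({
    "z_isMatch":    [1, 1, 0, 0, -1],   # human labels; -1 = unlabelled
    "z_prediction": [1, 0, 0, 0, -1],   # model output; -1 = no prediction
})
df = df[df["z_prediction"] != -1]       # keep only predicted rows
acc = (df["z_isMatch"] == df["z_prediction"]).mean()
print(acc)  # 0.75: three of the four predicted rows agree
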