python file changes for array #632

Merged
merged 9 commits into from Aug 7, 2023
Changes from all commits
Binary file modified .DS_Store
6 changes: 3 additions & 3 deletions test/InMemPipeTestImages.py
@@ -10,7 +10,7 @@

from PIL import Image

-df = (spark.read.json('/home/ubuntu/image_data/listings/metadata'))
+df = (getSparkSession().read.json('/home/ubuntu/image_data/listings/metadata'))

df = (
df
@@ -31,7 +31,7 @@
)

image_metadata = (
-    spark
+    getSparkSession()
.read
.csv(
path='/home/ubuntu/image_data/images/metadata',
@@ -132,7 +132,7 @@ def get_image_embedding(path):
#set the modelid and the zingg dir
args.setModelId("9999")
args.setZinggDir("/tmp/modelSmallImages")
-args.setNumPartitions(8)
+args.setNumPartitions(16)
args.setLabelDataSampleSize(0.2)

inputPipeSmallImages=InMemoryPipe("smallImages")
4 changes: 2 additions & 2 deletions test/InMemPipeTestImagesJson.py
@@ -10,7 +10,7 @@

from PIL import Image

-df = (spark.read.json('/home/ubuntu/image_data/listings/metadata'))
+df = (getSparkSession().read.json('/home/ubuntu/image_data/listings/metadata'))

df = (
df
@@ -31,7 +31,7 @@
)

image_metadata = (
-    spark
+    getSparkSession()
.read
.csv(
path='/home/ubuntu/image_data/images/metadata',
158 changes: 158 additions & 0 deletions test/InMemPipeTestImagesLink.py
@@ -0,0 +1,158 @@
from zingg.client import *
from zingg.pipes import *
from pyspark.sql.types import *
import pandas

import pyspark.sql.functions as fn
from sentence_transformers import SentenceTransformer, util
import torch
import pickle

from PIL import Image

df = (getSparkSession().read.json('/home/ubuntu/image_data/listings/metadata'))

df = (
df
.filter("country='US'")
.select(
'item_id',
'brand',
'bullet_point',
'domain_name',
'marketplace',
'item_keywords',
'item_name',
'product_description',
'main_image_id',
'other_image_id',
'node'
)
)

image_metadata = (
getSparkSession()
.read
.csv(
path='/home/ubuntu/image_data/images/metadata',
sep=',',
header=True,
)
)

@fn.udf(ArrayType(StringType()))
def get_english_values_from_array(array=None):

# prioritized list of english language codes (biased towards us english)
english = ['en_US','en_CA','en_GB','en_AU','en_IN','en_SG','en_AE']

# initialize search
values = []
if array is None: array=[]

# for each potential english code
for e in english:

# for each item in array
for a in array:
# if we found the english variant we want
if a['language_tag']==e:
# get value and stop
values += [a['value']]

# if value has been found, then break
if len(values) > 0: break

return values
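
# For reference, a hypothetical illustration of the selection logic above
# (sample rows are made up; real values come from the listings JSON):
#
#   sample = [
#       {'language_tag': 'de_DE', 'value': 'Schuhe'},
#       {'language_tag': 'en_GB', 'value': 'Trainers'},
#       {'language_tag': 'en_US', 'value': 'Shoes'},
#   ]
#
# Applied to `sample`, the wrapped function returns ['Shoes']: 'en_US'
# outranks 'en_GB' in the priority list, and the German entry never matches.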

model = SentenceTransformer('clip-ViT-B-32',device='cuda')

@fn.udf(ArrayType(DoubleType()))
#@fn.udf(StringType())
def get_image_embedding(path):

embedding = []

if path is not None:

full_path = '/home/ubuntu/image_data/images/small/' + path

# open image and convert to embedding
try:
image = Image.open(full_path).convert('RGB')
embedding = model.encode(image, batch_size=128, convert_to_tensor=False, show_progress_bar=False)
embedding = embedding.tolist()
        except Exception:
            # unreadable or missing image: fall back to the empty embedding
            pass

# return embedding value
return embedding
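
# Standalone sanity check for the encoder outside Spark (the image path is
# hypothetical; clip-ViT-B-32 yields 512-dimensional vectors):
#
#   img = Image.open('/home/ubuntu/image_data/images/small/ab/abc123.jpg').convert('RGB')
#   print(len(model.encode(img, convert_to_tensor=False).tolist()))   # 512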

items = (
df
.alias('a')
.select(
'item_id',
'domain_name',
'marketplace',
get_english_values_from_array('brand')[0].alias('brand'),
get_english_values_from_array('item_name')[0].alias('item_name'),
get_english_values_from_array('product_description')[0].alias('product_description'),
get_english_values_from_array('bullet_point').alias('bulletpoint'),
get_english_values_from_array('item_keywords').alias('item_keywords'),
fn.split( fn.col('node')[0]['node_name'], '/').alias('hierarchy'),
'main_image_id'
)
.join(
image_metadata.alias('b').select('image_id','path'),
on=fn.expr('a.main_image_id=b.image_id'),
how='left'
)
.withColumn('main_image_embedding', get_image_embedding(fn.col('path')))
.drop('main_image_id','image_id','bulletpoint','item_keywords','hierarchy')
)
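
# Sanity check (optional): after the joins and drops, `items` should carry
# exactly the eight columns declared in the field definitions below, with
# main_image_embedding as array<double>.
items.printSchema()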

#build the arguments for zingg
args = Arguments()
#set field definitions
item_id = FieldDefinition("item_id", "string", MatchType.DONT_USE)
domain_name = FieldDefinition("domain_name", "string", MatchType.DONT_USE)
marketplace = FieldDefinition("marketplace", "string", MatchType.DONT_USE)
brand = FieldDefinition("brand","string", MatchType.FUZZY)
item_name = FieldDefinition("item_name", "string", MatchType.TEXT)
product_description = FieldDefinition("product_description", "string", MatchType.DONT_USE)
path = FieldDefinition("path", "string", MatchType.DONT_USE)
main_image_embedding = FieldDefinition("main_image_embedding", "array<double>", MatchType.FUZZY)

#fieldDefs = [item_id, domain_name, marketplace, brand, item_name,product_description, bulletpoint, item_keywords, hierarchy,path, main_image_embedding]
fieldDefs = [item_id, domain_name, marketplace, brand, item_name,product_description,path,main_image_embedding]
args.setFieldDefinition(fieldDefs)
#set the modelid and the zingg dir
args.setModelId("9999")
args.setZinggDir("/tmp/modelSmallImages")
args.setNumPartitions(16)
args.setLabelDataSampleSize(0.2)

items1 = items.limit(100)
items2 = items1.limit(10)

inputPipeSmallImages1=InMemoryPipe("smallImages1")
inputPipeSmallImages1.setDataset(items1)

inputPipeSmallImages2=InMemoryPipe("smallImages2")
inputPipeSmallImages2.setDataset(items2)

args.setData(inputPipeSmallImages1,inputPipeSmallImages2)

#setting outputpipe in 'args'
outputPipe = Pipe("resultSmallImages", "parquet")
outputPipe.addProperty("location", "/tmp/resultSmallImages")
args.setOutput(outputPipe)

options = ClientOptions([ClientOptions.PHASE,"link"])

#Zingg execution for the given phase
zingg = Zingg(args, options)
zingg.initAndExecute()
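
# The link phase writes records linked across the two inputs, keyed by
# z_cluster, to the parquet location set on outputPipe above;
# resAnalysisImages.py (below) reads the same location back for analysis.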


29 changes: 29 additions & 0 deletions test/resAnalysisImages.py
@@ -0,0 +1,29 @@
from zingg.client import *
from zingg.pipes import *
from pyspark.sql.types import *
import pandas

import pyspark.sql.functions as fn
from sentence_transformers import SentenceTransformer, util
import torch
import pickle

from PIL import Image


#read data from parquet
res_df = getSparkSession().read.parquet('/tmp/resultSmallImages')
res_df.count()
res_df.show()

#build a df of z_cluster values that occur more than once
cluster_df = res_df.groupBy('z_cluster').count()
cluster_df = cluster_df.filter("count > 1")
cluster_df = cluster_df.sort(fn.desc('count'))
cluster_df.write.csv('/tmp/res_image_cluster')

#write relevant cols to csv for analysis
res_partial = res_df.select('z_cluster','item_id','brand','item_name','path').sort(fn.desc('item_name'))
res_partial.show()
res_partial.write.csv('/tmp/res_analysis_partial')
#analyse the above DF cluster by cluster, as sketched below
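
#a sketch of that walk-through: inspect the five largest clusters in turn
#(cluster_df is already sorted by count descending)
for row in cluster_df.limit(5).collect():
    res_partial.filter(fn.col('z_cluster') == row['z_cluster']).show(truncate=False)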