build_mercury_image_parquet.py
from urllib.request import urlopen
from io import BytesIO

import numpy as np
from PIL import Image
from pyspark.ml.feature import StandardScaler          # used by the disabled scaling steps below
from pyspark.ml.linalg import DenseVector, VectorUDT   # used by the disabled vectorization steps below
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.appName("build_mercury_image_parquet").getOrCreate()
def image_nparray(v):
    """Fetch an image from the iDigBio media API, resize it to 256x256,
    and return its pixels as a flat list of floats (None on failure)."""
    try:
        resp = urlopen("http://api.idigbio.org/v2/media?size=webview&filereference=" + v)
        pil_image = Image.open(BytesIO(resp.read())).resize((256, 256), Image.LANCZOS)
        array = np.asarray(pil_image).astype(float)
        return array.ravel().tolist()
    except Exception:
        # Return None rather than np.nan so failures stay consistent with the
        # ArrayType(FloatType()) return type declared for the UDF below.
        return None
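
# A successful call returns 256 * 256 * 3 = 196608 floats for an RGB image
# (grayscale or RGBA inputs yield a different length), which is why the
# disabled filtering step below keeps only length-196608 vectors.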
# Load the upstream records (media URIs plus the "contaminated" label).
joined_df = spark.read.parquet("/outputs/idigbio-media-20171112T013207-mercury-usecase.parquet")

# Wrap image_nparray as a Spark UDF so each media URI is fetched and decoded
# in parallel across the executors.
imgnpa_udf = F.udf(image_nparray, T.ArrayType(T.FloatType()))
npdf = joined_df.select("accessuri", "contaminated", imgnpa_udf("accessuri").alias("imgnpa"))
# Exploratory steps, left disabled: convert the pixel arrays to ML vectors,
# keep only full-size images, and standardize the features.
#to_vectors = F.udf(lambda x: DenseVector(x), VectorUDT())
#vector_df = npdf.withColumn("vector_images", to_vectors("imgnpa"))
#get_vector_length = F.udf(lambda vec: vec.size, T.IntegerType())
#vector_length_df = vector_df.withColumn("vector_length", get_vector_length("vector_images"))
##vector_length_df.describe().show()
#equal_df = vector_length_df.filter(F.col("vector_length") == 196608)
#same_length_df = equal_df.select("vector_images", "contaminated")
# StandardScaler standardizes features by scaling to unit variance and/or removing
# the mean, using column summary statistics over the samples in the training set.
# Standardization can improve the convergence rate during optimization and keeps
# features with very large variances from exerting an outsized influence on training.
#scaler = StandardScaler(inputCol="vector_images", outputCol="scaled_features", withStd=True, withMean=False)
##fitted_scaler = scaler.fit(same_length_df)
##scaled_df = fitted_scaler.transform(same_length_df)
#input_data = np.array(same_length_df.select('vector_images').collect())
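
# A minimal sketch of how the disabled steps would fit together if re-enabled;
# the DataFrame and column names mirror the commented-out code above. Filtering
# on array length first avoids running the vector UDF on failed (None) fetches.
#
#   to_vectors = F.udf(lambda x: DenseVector(x), VectorUDT())
#   same_length_df = (npdf
#       .filter(F.size("imgnpa") == 196608)
#       .withColumn("vector_images", to_vectors("imgnpa"))
#       .select("vector_images", "contaminated"))
#   scaler = StandardScaler(inputCol="vector_images", outputCol="scaled_features",
#                           withStd=True, withMean=False)
#   scaled_df = scaler.fit(same_length_df).transform(same_length_df)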
# Persist the image arrays alongside the labels for downstream modeling.
(npdf
    .write
    .mode("overwrite")
    .parquet("/outputs/idigbio-media-20171112T013207-mercury-images.parquet")
)
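
# Usage sketch (assumes a working Spark installation; cluster configuration is
# not specified in this script):
#   spark-submit build_mercury_image_parquet.py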