featurestore as notebooks upgradable to pipeline jobs (logicalclocks#49)
o-alex authored and tkakantousis committed May 27, 2019
1 parent 800845a commit 45265db
Showing 6 changed files with 860 additions and 0 deletions.
156 changes: 156 additions & 0 deletions tensorflow/notebooks/sysml_pipeline/1_feature_group1.ipynb
@@ -0,0 +1,156 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import random\n",
"import pandas as pd\n",
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)\n",
"from pyspark.sql import Row\n",
"from hops import featurestore\n",
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"from tensorflow.keras import layers\n",
"from hops import experiment\n",
"from tensorflow.python.keras.callbacks import TensorBoard\n",
"from hops import tensorboard"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"area_ids = list(range(1,51))\n",
"house_sizes = []\n",
"house_worths = []\n",
"house_ages = []\n",
"house_area_ids = []\n",
"for i in area_ids:\n",
" for j in list(range(1,100)):\n",
" house_sizes.append(abs(np.random.normal()*1000)/i)\n",
" house_worths.append(abs(np.random.normal()*10000)/i)\n",
" house_ages.append(abs(np.random.normal()*10000)/i)\n",
" house_area_ids.append(i)\n",
"house_ids = list(range(len(house_area_ids)))\n",
"houses_for_sale_data = pd.DataFrame({\n",
" 'area_id':house_area_ids,\n",
" 'house_id':house_ids,\n",
" 'house_worth': house_worths,\n",
" 'house_age': house_ages,\n",
" 'house_size': house_sizes\n",
" })\n",
"houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_for_sale_data_spark_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_for_sale_data_spark_df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").sum()\n",
"count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").count()\n",
"sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, \"area_id\")\n",
"sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \\\n",
" .withColumnRenamed(\"sum(house_age)\", \"sum_house_age\") \\\n",
" .withColumnRenamed(\"sum(house_worth)\", \"sum_house_worth\") \\\n",
" .withColumnRenamed(\"sum(house_size)\", \"sum_house_size\") \\\n",
" .withColumnRenamed(\"count\", \"num_rows\")\n",
"def compute_average_features_house_for_sale(row):\n",
" avg_house_worth = row.sum_house_worth/float(row.num_rows)\n",
" avg_house_size = row.sum_house_size/float(row.num_rows)\n",
" avg_house_age = row.sum_house_age/float(row.num_rows)\n",
" return Row(\n",
" sum_house_worth=row.sum_house_worth, \n",
" sum_house_age=row.sum_house_age,\n",
" sum_house_size=row.sum_house_size,\n",
" area_id = row.area_id,\n",
" avg_house_worth = avg_house_worth,\n",
" avg_house_size = avg_house_size,\n",
" avg_house_age = avg_house_age\n",
" )\n",
"houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(\n",
" lambda row: compute_average_features_house_for_sale(row)\n",
").toDF()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_for_sale_features_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_for_sale_features_df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"featurestore.create_featuregroup(\n",
" houses_for_sale_features_df,\n",
" \"houses_for_sale_featuregroup\",\n",
" description=\"aggregate features of houses for sale per area\",\n",
" descriptive_statistics=False,\n",
" feature_correlation=False,\n",
" feature_histograms=False,\n",
" cluster_analysis=False\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
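Editor's note: the feature-engineering cell above computes per-area averages by summing, counting, joining, and then dividing inside an RDD map. A minimal sketch of an equivalent, more idiomatic alternative follows, assuming the houses_for_sale_data_spark_df defined in the notebook; it produces the same columns with a single groupBy().agg() and no RDD round-trip.

# Hypothetical, more idiomatic alternative to the sum/count/join + RDD-map
# pattern in the notebook above. Assumes houses_for_sale_data_spark_df exists.
from pyspark.sql import functions as F

houses_for_sale_features_df = houses_for_sale_data_spark_df.groupBy("area_id").agg(
    F.sum("house_worth").alias("sum_house_worth"),
    F.sum("house_age").alias("sum_house_age"),
    F.sum("house_size").alias("sum_house_size"),
    F.avg("house_worth").alias("avg_house_worth"),
    F.avg("house_age").alias("avg_house_age"),
    F.avg("house_size").alias("avg_house_size"),
)

Spark resolves the aggregates in one shuffle, which also avoids the Row-object reconstruction that the RDD map requires.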
149 changes: 149 additions & 0 deletions tensorflow/notebooks/sysml_pipeline/2_feature_group2.ipynb
@@ -0,0 +1,149 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import random\n",
"import pandas as pd\n",
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)\n",
"from pyspark.sql import Row\n",
"from hops import featurestore\n",
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"from tensorflow.keras import layers\n",
"from hops import experiment\n",
"from tensorflow.python.keras.callbacks import TensorBoard\n",
"from hops import tensorboard"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"area_ids = list(range(1,51))\n",
"house_purchased_amounts = []\n",
"house_purchases_bidders = []\n",
"house_purchases_area_ids = []\n",
"for i in area_ids:\n",
" for j in list(range(1,1000)):\n",
" house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)\n",
" house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))\n",
" house_purchases_area_ids.append(i)\n",
"house_purchase_ids = list(range(len(house_purchases_bidders)))\n",
"houses_sold_data = pd.DataFrame({\n",
" 'area_id':house_purchases_area_ids,\n",
" 'house_purchase_id':house_purchase_ids,\n",
" 'number_of_bidders': house_purchases_bidders,\n",
" 'sold_for_amount': house_purchased_amounts\n",
" })\n",
"houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_sold_data_spark_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_sold_data_spark_df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sum_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").sum()\n",
"count_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").count()\n",
"sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, \"area_id\")\n",
"sum_count_houses_sold_df = sum_count_houses_sold_df \\\n",
" .withColumnRenamed(\"sum(number_of_bidders)\", \"sum_number_of_bidders\") \\\n",
" .withColumnRenamed(\"sum(sold_for_amount)\", \"sum_sold_for_amount\") \\\n",
" .withColumnRenamed(\"count\", \"num_rows\")\n",
"def compute_average_features_houses_sold(row):\n",
" avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)\n",
" avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)\n",
" return Row(\n",
" sum_number_of_bidders=row.sum_number_of_bidders, \n",
" sum_sold_for_amount=row.sum_sold_for_amount,\n",
" area_id = row.area_id,\n",
" avg_num_bidders = avg_num_bidders,\n",
" avg_sold_for = avg_sold_for\n",
" )\n",
"houses_sold_features_df = sum_count_houses_sold_df.rdd.map(\n",
" lambda row: compute_average_features_houses_sold(row)\n",
").toDF()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_sold_features_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"houses_sold_features_df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"featurestore.create_featuregroup(\n",
" houses_sold_features_df,\n",
" \"houses_sold_featuregroup\",\n",
" description=\"aggregate features of sold houses per area\",\n",
" descriptive_statistics=False,\n",
" feature_correlation=False,\n",
" feature_histograms=False,\n",
" cluster_analysis=False\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
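Editor's note: once both feature groups are registered, they can be read back for ad-hoc inspection. A minimal sketch, assuming featurestore.get_featuregroup() is available in this version of hops-util-py (the notebooks themselves only demonstrate create_featuregroup() and, later, get_features()).

# Sketch: read both feature groups back as Spark dataframes and join them.
# get_featuregroup() is assumed to exist in this hops-util-py version.
from hops import featurestore

for_sale_fg = featurestore.get_featuregroup("houses_for_sale_featuregroup")
sold_fg = featurestore.get_featuregroup("houses_sold_featuregroup")

# Both feature groups share area_id, so they can be joined on it directly.
for_sale_fg.join(sold_fg, "area_id").show(5)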
62 changes: 62 additions & 0 deletions tensorflow/notebooks/sysml_pipeline/3_training_dataset.ipynb
@@ -0,0 +1,62 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import random\n",
"import pandas as pd\n",
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)\n",
"from pyspark.sql import Row\n",
"from hops import featurestore\n",
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"from tensorflow.keras import layers\n",
"from hops import experiment\n",
"from tensorflow.python.keras.callbacks import TensorBoard\n",
"from hops import tensorboard"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"features_df = featurestore.get_features([\"avg_house_age\", \"avg_house_size\", \n",
" \"avg_house_worth\", \"avg_num_bidders\", \n",
" \"avg_sold_for\"])\n",
"featurestore.create_training_dataset(\n",
" features_df, \"predict_house_sold_for_dataset\",\n",
" data_format=\"tfrecords\",\n",
" descriptive_statistics=False,\n",
" feature_correlation=False,\n",
" feature_histograms=False,\n",
" cluster_analysis=False\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
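Editor's note: the training dataset above is materialized as tfrecords, so a downstream training notebook would typically consume it with tf.data. The sketch below shows one way that might look; get_training_dataset_path() and get_training_dataset_tf_record_schema() are assumptions about this hops-util-py version, and the part-file glob pattern is illustrative, not verbatim API.

# Sketch: consume the tfrecords training dataset with TF 1.x-era tf.data.
# The two featurestore helpers and the "part-r-*" file pattern are assumed.
import tensorflow as tf
from hops import featurestore

dataset_name = "predict_house_sold_for_dataset"
dataset_dir = featurestore.get_training_dataset_path(dataset_name)
tf_record_schema = featurestore.get_training_dataset_tf_record_schema(dataset_name)

# Spark writes the dataset as part files; glob them into one TFRecordDataset.
input_files = tf.gfile.Glob(dataset_dir + "/part-r-*")
dataset = tf.data.TFRecordDataset(input_files)

def decode(example_proto):
    # Parse one serialized Example using the schema stored with the dataset.
    return tf.parse_single_example(example_proto, tf_record_schema)

dataset = dataset.map(decode).shuffle(1000).batch(32).repeat()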
(Diff for the remaining 3 of the 6 changed files did not load and is not shown.)
