\n",
@@ -951,226 +949,223 @@
"333 1 33 4 1997-11-03 07:38:19"
]
},
- "execution_count": 18,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 18
}
],
- "source": [
- "data_test[data_test[COL_USER] == 1].head(10)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Timestamps of train data are all precedent to those in test data."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### 3.3.2 Min-rating filter"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"A min-rating filter is applied to data before it is split by using chronological splitter. The reason of doing this is that, for multi-split, there should be sufficient number of ratings for user/item in the data.\n",
"\n",
"For example, the following means splitting only applies to users that have at least 10 ratings."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 19,
- "metadata": {},
- "outputs": [],
"source": [
"data_train, data_test = python_chrono_split(\n",
" data, filter_by=\"user\", min_rating=10, ratio=0.7,\n",
" col_user=COL_USER, col_item=COL_ITEM, col_timestamp=COL_TIMESTAMP\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Number of rows in the yielded splits of data may not sum to the original ones as users with fewer than 10 ratings are filtered out in the splitting."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 20,
- "metadata": {},
+ "source": [
+ "data_train.shape[0] + data_test.shape[0], data.shape[0]"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/plain": [
"(100000, 100000)"
]
},
- "execution_count": 20,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 20
}
],
- "source": [
- "data_train.shape[0] + data_test.shape[0], data.shape[0]"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.3 Stratified split"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Chronogically splitting method takes in a dataset and splits it by either user or item. The split is stratified so that the same set of users or items will appear in both training and testing data sets. \n",
"\n",
"Similar to chronological splitter, `filter_by` and `min_rating_filter` also apply to the stratified splitter.\n",
"\n",
"The following example shows the split of the sample data with a ratio of 0.7, and for each user there should be at least 10 ratings."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
- "metadata": {},
- "outputs": [],
"source": [
"data_train, data_test = python_stratified_split(\n",
" data, filter_by=\"user\", min_rating=10, ratio=0.7,\n",
" col_user=COL_USER, col_item=COL_ITEM\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
- "metadata": {},
+ "source": [
+ "data_train.shape[0] + data_test.shape[0], data.shape[0]"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/plain": [
"(100000, 100000)"
]
},
- "execution_count": 13,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 13
}
],
- "source": [
- "data_train.shape[0] + data_test.shape[0], data.shape[0]"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.4 Data split in scale"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Spark DataFrame is used for scalable splitting. This allows splitting operation performed on large dataset that is distributed across Spark cluster."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"For example, the below illustrates how to do a random split on the given Spark DataFrame. For simplicity reason, the same MovieLens data, which is in Pandas DataFrame, is transformed into Spark DataFrame and used for splitting."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 21,
- "metadata": {},
- "outputs": [],
"source": [
"spark = start_or_get_spark()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 22,
- "metadata": {},
- "outputs": [],
"source": [
"data_spark = spark.read.csv(filepath)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 20,
- "metadata": {},
- "outputs": [],
"source": [
"data_spark_train, data_spark_test = spark_random_split(data_spark, ratio=0.7)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Interestingly, it was noticed that Spark random split does not guarantee a deterministic result. This sometimes leads to issues when data is relatively small while users seek for a precision split. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 22,
- "metadata": {},
+ "source": [
+ "data_spark_train.count(), data_spark_test.count()"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/plain": [
"(69995, 30005)"
]
},
- "execution_count": 22,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 22
}
],
- "source": [
- "data_spark_train.count(), data_spark_test.count()"
- ]
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"spark.stop()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## References"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"1. Dimitris Paraschakis et al, \"Comparative Evaluation of Top-N Recommenders in e-Commerce: An Industrial Perspective\", IEEE ICMLA, 2015, Miami, FL, USA.\n",
"2. Guy Shani and Asela Gunawardana, \"Evaluating Recommendation Systems\", Recommender Systems Handbook, Springer, 2015. \n",
"3. Apache Spark, url: https://spark.apache.org/."
- ]
+ ],
+ "metadata": {}
}
],
"metadata": {
diff --git a/examples/02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb b/examples/02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb
index 919ad584c6..96a9d03284 100644
--- a/examples/02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb
+++ b/examples/02_model_content_based_filtering/mmlspark_lightgbm_criteo.ipynb
@@ -2,16 +2,15 @@
"cells": [
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"
"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"# Content-Based Personalization with LightGBM on Spark\n",
"\n",
@@ -23,47 +22,34 @@
"[MMLSpark](https://github.com/Azure/mmlspark) library, which allows LightGBM to be called in a Spark environment and be computed distributely.\n",
"\n",
"This scenario is a good example of **implicit feedback**, where binary labels indicate the interaction between a user and an item. This contrasts with explicit feedback, where the user explicitely rate the content, for example from 1 to 5. \n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Global Settings and Imports"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"This notebook can be run in a Spark environment in a DSVM or in Azure Databricks. For more details about the installation process, please refer to the [setup instructions](../../SETUP.md).\n",
"\n",
"**NOTE for Azure Databricks:**\n",
"* A python script is provided to simplify setting up Azure Databricks with the correct dependencies. Run ```python tools/databricks_install.py -h``` for more details.\n",
"* MMLSpark should not be run on a cluster with autoscaling enabled. Disable the flag in the Azure Databricks Cluster configuration before running this notebook."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 2,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "MMLSpark version: com.microsoft.ml.spark:mmlspark_2.11:0.18.1\n",
- "System version: 3.6.10 |Anaconda, Inc.| (default, May 8 2020, 02:54:21) \n",
- "[GCC 7.3.0]\n",
- "PySpark version: 2.4.3\n"
- ]
- }
- ],
"source": [
"import os\n",
"import sys\n",
"\n",
-
"\n",
"import pyspark\n",
"from pyspark.ml import PipelineModel\n",
@@ -71,37 +57,42 @@
"import papermill as pm\n",
"import scrapbook as sb\n",
"\n",
- "from recommenders.utils.spark_utils import start_or_get_spark\n",
"from recommenders.utils.notebook_utils import is_databricks\n",
+ "from recommenders.utils.spark_utils import start_or_get_spark\n",
"from recommenders.datasets.criteo import load_spark_df\n",
"from recommenders.datasets.spark_splitters import spark_random_split\n",
"\n",
"# Setup MML Spark\n",
- "if not is_databricks():\n",
- " # get the maven coordinates for MML Spark from databricks_install script\n",
- " from tools.databricks_install import MMLSPARK_INFO\n",
- " packages = [MMLSPARK_INFO[\"maven\"][\"coordinates\"]]\n",
- " repo = MMLSPARK_INFO[\"maven\"].get(\"repo\")\n",
- " spark = start_or_get_spark(packages=packages, repository=repo)\n",
- " dbutils = None\n",
- " print(\"MMLSpark version: {}\".format(MMLSPARK_INFO['maven']['coordinates']))\n",
+ "from recommenders.utils.spark_utils import MMLSPARK_REPO, MMLSPARK_PACKAGE\n",
+ "packages = [MMLSPARK_PACKAGE]\n",
+ "repos = [MMLSPARK_REPO]\n",
+ "spark = start_or_get_spark(packages=packages, repositories=repos)\n",
+ "dbutils = None\n",
+ "print(\"MMLSpark version: {}\".format(MMLSPARK_PACKAGE))\n",
"\n",
"from mmlspark.train import ComputeModelStatistics\n",
"from mmlspark.lightgbm import LightGBMClassifier\n",
"\n",
"print(\"System version: {}\".format(sys.version))\n",
"print(\"PySpark version: {}\".format(pyspark.version.__version__))"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "MMLSpark version: com.microsoft.ml.spark:mmlspark_2.11:0.18.1\n",
+ "System version: 3.6.10 |Anaconda, Inc.| (default, May 8 2020, 02:54:21) \n",
+ "[GCC 7.3.0]\n",
+ "PySpark version: 2.4.3\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 3,
- "metadata": {
- "tags": [
- "parameters"
- ]
- },
- "outputs": [],
"source": [
"# Criteo data size, it can be \"sample\" or \"full\"\n",
"DATA_SIZE = \"sample\"\n",
@@ -116,32 +107,43 @@
"\n",
"# Model name\n",
"MODEL_NAME = 'lightgbm_criteo.mml'"
- ]
+ ],
+ "outputs": [],
+ "metadata": {
+ "tags": [
+ "parameters"
+ ]
+ }
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Data Preparation\n",
"\n",
"The [Criteo Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge) (Criteo DAC) dataset is a well-known industry benchmarking dataset for developing CTR prediction models, and is used frequently by research papers. The original dataset contains over 45M rows, but there is also a down-sampled dataset which has 100,000 rows (this can be used by setting `DATA_SIZE = \"sample\"`). Each row corresponds to a display ad served by Criteo and the first column is indicates whether this ad has been clicked or not.
\n",
"The dataset contains 1 label column and 39 feature columns, where 13 columns are integer values (int00-int12) and 26 columns are categorical features (cat00-cat25).
\n",
"What the columns represent is not provided, but for this case we can consider the integer and categorical values as features representing the user and / or item content. The label is binary and is an example of implicit feedback indicating a user's interaction with an item. With this dataset we can demonstrate how to build a model that predicts the probability of a user interacting with an item based on available user and item content features.\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 4,
- "metadata": {},
+ "source": [
+ "raw_data = load_spark_df(size=DATA_SIZE, spark=spark, dbutils=dbutils)\n",
+ "# visualize data\n",
+ "raw_data.limit(2).toPandas().head()"
+ ],
"outputs": [
{
- "name": "stderr",
"output_type": "stream",
+ "name": "stderr",
"text": [
"100%|██████████| 8.58k/8.58k [00:01<00:00, 5.15kKB/s]\n"
]
},
{
+ "output_type": "execute_result",
"data": {
"text/html": [
"
\n",
@@ -255,59 +257,53 @@
"[2 rows x 40 columns]"
]
},
- "execution_count": 4,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 4
}
],
- "source": [
- "raw_data = load_spark_df(size=DATA_SIZE, spark=spark, dbutils=dbutils)\n",
- "# visualize data\n",
- "raw_data.limit(2).toPandas().head()"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### Feature Processing\n",
"The feature data provided has many missing values across both integer and categorical feature fields. In addition the categorical features have many distinct values, so effectively cleaning and representing the feature data is an important step prior to training a model.
\n",
"One of the simplest ways of managing both features that have missing values as well as high cardinality is to use the hashing trick. The [FeatureHasher](http://spark.apache.org/docs/latest/ml-features.html#featurehasher) transformer will pass integer values through and will hash categorical features into a sparse vector of lower dimensionality, which can be used effectively by LightGBM.
\n",
"First, the dataset is splitted randomly for training and testing and feature processing is applied to each dataset."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 5,
- "metadata": {},
- "outputs": [],
"source": [
"raw_train, raw_test = spark_random_split(raw_data, ratio=0.8, seed=42)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 6,
- "metadata": {},
- "outputs": [],
"source": [
"columns = [c for c in raw_data.columns if c != 'label']\n",
"feature_processor = FeatureHasher(inputCols=columns, outputCol='features')"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 7,
- "metadata": {},
- "outputs": [],
"source": [
"train = feature_processor.transform(raw_train)\n",
"test = feature_processor.transform(raw_test)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Model Training\n",
"In MMLSpark, the LightGBM implementation for binary classification is invoked using the `LightGBMClassifier` class and specifying the objective as `\"binary\"`. In this instance, the occurrence of positive labels is quite low, so setting the `isUnbalance` flag to true helps account for this imbalance.
\n",
@@ -319,13 +315,12 @@
"- `learningRate`: the learning rate for training across trees\n",
"- `featureFraction`: the fraction of features used for training a tree\n",
"- `earlyStoppingRound`: round at which early stopping can be applied to avoid overfitting"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 8,
- "metadata": {},
- "outputs": [],
"source": [
"lgbm = LightGBMClassifier(\n",
" labelCol=\"label\",\n",
@@ -341,41 +336,54 @@
" featureFraction=FEATURE_FRACTION,\n",
" earlyStoppingRound=EARLY_STOPPING_ROUND\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### Model Training and Evaluation"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 9,
- "metadata": {},
- "outputs": [],
"source": [
"model = lgbm.fit(train)\n"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
- "metadata": {},
- "outputs": [],
"source": [
"predictions = model.transform(test)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 11,
- "metadata": {},
+ "source": [
+ "evaluator = (\n",
+ " ComputeModelStatistics()\n",
+ " .setScoredLabelsCol(\"prediction\")\n",
+ " .setLabelCol(\"label\")\n",
+ " .setEvaluationMetric(\"AUC\")\n",
+ ")\n",
+ "\n",
+ "result = evaluator.transform(predictions)\n",
+ "auc = result.select(\"AUC\").collect()[0][0]\n",
+ "result.show()"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+---------------+------------------+\n",
"|evaluation_type| AUC|\n",
@@ -386,77 +394,66 @@
]
}
],
- "source": [
- "evaluator = (\n",
- " ComputeModelStatistics()\n",
- " .setScoredLabelsCol(\"prediction\")\n",
- " .setLabelCol(\"label\")\n",
- " .setEvaluationMetric(\"AUC\")\n",
- ")\n",
- "\n",
- "result = evaluator.transform(predictions)\n",
- "auc = result.select(\"AUC\").collect()[0][0]\n",
- "result.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
- "metadata": {},
+ "source": [
+ "# Record results with papermill for tests\n",
+ "sb.glue(\"auc\", auc)"
+ ],
"outputs": [
{
+ "output_type": "display_data",
"data": {
"application/papermill.record+json": {
"auc": 0.6870253907336659
}
},
- "metadata": {},
- "output_type": "display_data"
+ "metadata": {}
}
],
- "source": [
- "# Record results with papermill for tests\n",
- "sb.glue(\"auc\", auc)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Model Saving \n",
"The full pipeline for operating on raw data including feature processing and model prediction can be saved and reloaded for use in another workflow."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# save model\n",
"pipeline = PipelineModel(stages=[feature_processor, model])\n",
"pipeline.write().overwrite().save(MODEL_NAME)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# cleanup spark instance\n",
"if not is_databricks():\n",
" spark.stop()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Additional Reading\n",
"\\[1\\] Guolin Ke, Qi Meng, Thomas Finley, Taifeng Wang, Wei Chen, Weidong Ma, Qiwei Ye, and Tie-Yan Liu. 2017. LightGBM: A highly efficient gradient boosting decision tree. In Advances in Neural Information Processing Systems. 3146–3154. https://papers.nips.cc/paper/6907-lightgbm-a-highly-efficient-gradient-boosting-decision-tree.pdf
\n",
"\\[2\\] MML Spark: https://mmlspark.blob.core.windows.net/website/index.html
\n"
- ]
+ ],
+ "metadata": {}
}
],
"metadata": {
diff --git a/examples/03_evaluate/als_movielens_diversity_metrics.ipynb b/examples/03_evaluate/als_movielens_diversity_metrics.ipynb
index bf684cf9d5..4b6fa73ac9 100644
--- a/examples/03_evaluate/als_movielens_diversity_metrics.ipynb
+++ b/examples/03_evaluate/als_movielens_diversity_metrics.ipynb
@@ -2,16 +2,15 @@
"cells": [
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"
Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"
Licensed under the MIT License."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"# Apply Diversity Metrics \n",
"## -- Compare ALS and Random Recommenders on MovieLens (PySpark)\n",
@@ -41,11 +40,11 @@
"The comparision results show that the ALS recommender outperforms the random recommender on ranking metrics (Precision@k, Recall@k, NDCG@k, and\tMean average precision), while the random recommender outperforms ALS recommender on diversity metrics. This is because ALS is optimized for estimating the item rating as accurate as possible, therefore it performs well on accuracy metrics including rating and ranking metrics. As a side effect, the items being recommended tend to be popular items, which are the items mostly sold or viewed. It leaves the [long-tail items](https://github.com/microsoft/recommenders/blob/main/GLOSSARY.md) having less chance to get introduced to the users. This is the reason why ALS is not performing as well as a random recommender on diversity metrics. \n",
"\n",
"From the algorithmic point of view, items in the tail suffer from the cold-start problem, making them hard for recommendation systems to use. However, from the business point of view, oftentimes the items in the tail can be highly profitable, since, depending on supply, business can apply a higher margin to them. Recommendation systems that optimize metrics like novelty and diversity, can help to find users willing to get these long tail items. Usually there is a trade-off between one type of metric vs. another. One should decide which set of metrics to optimize based on business scenarios."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"**Coverage**\n",
"\n",
@@ -65,11 +64,11 @@
"p(i|R) = \\frac{|M_r (i)|}{|\\textrm{reco_df}|}\n",
"$$\n",
"and $M_r (i)$ denotes the users who are recommended item $i$.\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"\n",
"**Diversity**\n",
@@ -89,11 +88,11 @@
"$$\n",
"\\textrm{diversity} = 1 - \\textrm{IL}\n",
"$$\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"\n",
"**Novelty**\n",
@@ -112,11 +111,11 @@
"$$\n",
"\\textrm{novelty} = \\sum_{i \\in N_r} \\frac{|M_r (i)|}{|\\textrm{reco_df}|} \\textrm{novelty}(i)\n",
"$$\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"**Serendipity**\n",
"\n",
@@ -131,30 +130,19 @@
"\\textrm{serendipity} = \\frac{1}{|M|} \\sum_{u \\in M_r}\n",
"\\frac{1}{|N_r (u)|} \\sum_{i \\in N_r (u)} \\big(1 - \\textrm{expectedness}(i|u) \\big) \\, \\textrm{relevance}(i)\n",
"$$\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 1,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "System version: 3.6.13 |Anaconda, Inc.| (default, Jun 4 2021, 14:25:59) \n",
- "[GCC 7.5.0]\n",
- "Spark version: 2.4.8\n"
- ]
- }
- ],
"source": [
"# set the environment path to find Recommenders\n",
"%load_ext autoreload\n",
@@ -165,16 +153,14 @@
"import pyspark\n",
"from pyspark.ml.recommendation import ALS\n",
"import pyspark.sql.functions as F\n",
- "from pyspark.sql import SparkSession\n",
- "from pyspark.sql.types import StringType, FloatType, IntegerType, LongType, StructType, StructField\n",
- "from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover\n",
+ "from pyspark.sql.types import FloatType, IntegerType, LongType, StructType, StructField\n",
+ "from pyspark.ml.feature import Tokenizer, StopWordsRemover\n",
"from pyspark.ml.feature import HashingTF, CountVectorizer, VectorAssembler\n",
"\n",
"from recommenders.utils.timer import Timer\n",
"from recommenders.datasets import movielens\n",
- "from recommenders.utils.notebook_utils import is_jupyter\n",
"from recommenders.datasets.spark_splitters import spark_random_split\n",
- "from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation, SparkDiversityEvaluation\n",
+ "from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkDiversityEvaluation\n",
"from recommenders.utils.spark_utils import start_or_get_spark\n",
"\n",
"from pyspark.sql.window import Window\n",
@@ -185,25 +171,31 @@
"\n",
"print(\"System version: {}\".format(sys.version))\n",
"print(\"Spark version: {}\".format(pyspark.__version__))\n"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "System version: 3.6.13 |Anaconda, Inc.| (default, Jun 4 2021, 14:25:59) \n",
+ "[GCC 7.5.0]\n",
+ "Spark version: 2.4.8\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"\n",
"Set the default parameters."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 2,
- "metadata": {
- "tags": [
- "parameters"
- ]
- },
- "outputs": [],
"source": [
"# top k items to recommend\n",
"TOP_K = 10\n",
@@ -215,22 +207,26 @@
"COL_USER=\"UserId\"\n",
"COL_ITEM=\"MovieId\"\n",
"COL_RATING=\"Rating\""
- ]
+ ],
+ "outputs": [],
+ "metadata": {
+ "tags": [
+ "parameters"
+ ]
+ }
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 1. Set up Spark context\n",
"\n",
"The following settings work well for debugging locally on VM - change when running on a cluster. We set up a giant single executor with many threads and specify memory cap. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 3,
- "metadata": {},
- "outputs": [],
"source": [
"# the following settings work well for debugging locally on VM - change when running on a cluster\n",
"# set up a giant single executor with many threads and specify memory cap\n",
@@ -238,30 +234,45 @@
"spark = start_or_get_spark(\"ALS PySpark\", memory=\"16g\")\n",
"\n",
"spark.conf.set(\"spark.sql.crossJoin.enabled\", \"true\")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2. Download the MovieLens dataset"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 4,
- "metadata": {},
+ "source": [
+ "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
+ "schema = StructType(\n",
+ " (\n",
+ " StructField(COL_USER, IntegerType()),\n",
+ " StructField(COL_ITEM, IntegerType()),\n",
+ " StructField(COL_RATING, FloatType()),\n",
+ " StructField(\"Timestamp\", LongType()),\n",
+ " )\n",
+ ")\n",
+ "\n",
+ "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, title_col=\"title\", genres_col=\"genres\")\n",
+ "data.show()"
+ ],
"outputs": [
{
- "name": "stderr",
"output_type": "stream",
+ "name": "stderr",
"text": [
"100%|██████████| 4.81k/4.81k [00:00<00:00, 17.1kKB/s]\n"
]
},
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+-------+------+------+---------+--------------------+------+\n",
"|MovieId|UserId|Rating|Timestamp| title|genres|\n",
@@ -292,88 +303,73 @@
]
}
],
- "source": [
- "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
- "schema = StructType(\n",
- " (\n",
- " StructField(COL_USER, IntegerType()),\n",
- " StructField(COL_ITEM, IntegerType()),\n",
- " StructField(COL_RATING, FloatType()),\n",
- " StructField(\"Timestamp\", LongType()),\n",
- " )\n",
- ")\n",
- "\n",
- "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, title_col=\"title\", genres_col=\"genres\")\n",
- "data.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### Split the data using the Spark random splitter provided in utilities"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 5,
- "metadata": {},
+ "source": [
+ "train_df, test_df = spark_random_split(data.select(COL_USER, COL_ITEM, COL_RATING), ratio=0.75, seed=123)\n",
+ "print (\"N train_df\", train_df.cache().count())\n",
+ "print (\"N test_df\", test_df.cache().count())"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"N train_df 75066\n",
"N test_df 24934\n"
]
}
],
- "source": [
- "train_df, test_df = spark_random_split(data.select(COL_USER, COL_ITEM, COL_RATING), ratio=0.75, seed=123)\n",
- "print (\"N train_df\", train_df.cache().count())\n",
- "print (\"N test_df\", test_df.cache().count())"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### Get all possible user-item pairs"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Note: We assume that training data contains all users and all catalog items. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 6,
- "metadata": {},
- "outputs": [],
"source": [
"users = train_df.select(COL_USER).distinct()\n",
"items = train_df.select(COL_ITEM).distinct()\n",
"user_item = users.crossJoin(items)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data\n",
"\n",
"To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (`nonnegative = False`) in order to allow for both positive and negative preferences towards movies.\n",
"Timing will vary depending on the machine being used to train."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 7,
- "metadata": {},
- "outputs": [],
"source": [
"header = {\n",
" \"userCol\": COL_USER,\n",
@@ -392,51 +388,42 @@
" seed=42,\n",
" **header\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 8,
- "metadata": {},
+ "source": [
+ "with Timer() as train_time:\n",
+ " model = als.fit(train_df)\n",
+ "\n",
+ "print(\"Took {} seconds for training.\".format(train_time.interval))"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"Took 4.012367556002573 seconds for training.\n"
]
}
],
- "source": [
- "with Timer() as train_time:\n",
- " model = als.fit(train_df)\n",
- "\n",
- "print(\"Took {} seconds for training.\".format(train_time.interval))"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"In the movie recommendation use case, recommending movies that have been rated by the users does not make sense. Therefore, the rated movies are removed from the recommended items.\n",
"\n",
"In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 9,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "1464853\n",
- "9430\n"
- ]
- }
- ],
"source": [
"# Score all user-item pairs\n",
"dfs_pred = model.transform(user_item)\n",
@@ -457,22 +444,31 @@
"top_k_reco = top_all.select(\"*\", F.row_number().over(window).alias(\"rank\")).filter(F.col(\"rank\") <= TOP_K).drop(\"rank\")\n",
" \n",
"print(top_k_reco.count())"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "1464853\n",
+ "9430\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 4. Random Recommender\n",
"\n",
"We define a recommender which randomly recommends unseen items to each user. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
- "metadata": {},
- "outputs": [],
"source": [
"# random recommender\n",
"window = Window.partitionBy(COL_USER).orderBy(F.rand())\n",
@@ -493,20 +489,20 @@
" .filter(F.col(\"score\") <= TOP_K)\n",
" .drop(COL_RATING)\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 5. ALS vs Random Recommenders Performance Comparison"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 11,
- "metadata": {},
- "outputs": [],
"source": [
"def get_ranking_results(ranking_eval):\n",
" metrics = {\n",
@@ -527,13 +523,13 @@
" \"serendipity\": diversity_eval.serendipity()\n",
" }\n",
" return metrics "
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
- "metadata": {},
- "outputs": [],
"source": [
"def generate_summary(data, algo, k, ranking_metrics, diversity_metrics):\n",
" summary = {\"Data\": data, \"Algo\": algo, \"K\": k}\n",
@@ -548,20 +544,20 @@
" summary.update(ranking_metrics)\n",
" summary.update(diversity_metrics)\n",
" return summary"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### ALS Recommender Performance Results"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
- "metadata": {},
- "outputs": [],
"source": [
"als_ranking_eval = SparkRankingEvaluation(\n",
" test_df, \n",
@@ -575,13 +571,13 @@
")\n",
"\n",
"als_ranking_metrics = get_ranking_results(als_ranking_eval)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 17,
- "metadata": {},
- "outputs": [],
"source": [
"als_diversity_eval = SparkDiversityEvaluation(\n",
" train_df = train_df, \n",
@@ -591,29 +587,29 @@
")\n",
"\n",
"als_diversity_metrics = get_diversity_results(als_diversity_eval)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 18,
- "metadata": {},
- "outputs": [],
"source": [
"als_results = generate_summary(MOVIELENS_DATA_SIZE, \"als\", TOP_K, als_ranking_metrics, als_diversity_metrics)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### Random Recommender Performance Results"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 19,
- "metadata": {},
- "outputs": [],
"source": [
"random_ranking_eval = SparkRankingEvaluation(\n",
" test_df,\n",
@@ -626,13 +622,13 @@
")\n",
"\n",
"random_ranking_metrics = get_ranking_results(random_ranking_eval)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 20,
- "metadata": {},
- "outputs": [],
"source": [
"random_diversity_eval = SparkDiversityEvaluation(\n",
" train_df = train_df, \n",
@@ -642,43 +638,48 @@
")\n",
" \n",
"random_diversity_metrics = get_diversity_results(random_diversity_eval)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 21,
- "metadata": {},
- "outputs": [],
"source": [
"random_results = generate_summary(MOVIELENS_DATA_SIZE, \"random\", TOP_K, random_ranking_metrics, random_diversity_metrics)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### Result Comparison"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 22,
- "metadata": {},
- "outputs": [],
"source": [
"cols = [\"Data\", \"Algo\", \"K\", \"Precision@k\", \"Recall@k\", \"NDCG@k\", \"Mean average precision\",\"catalog_coverage\", \"distributional_coverage\",\"novelty\", \"diversity\", \"serendipity\" ]\n",
"df_results = pd.DataFrame(columns=cols)\n",
"\n",
"df_results.loc[1] = als_results \n",
"df_results.loc[2] = random_results "
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 23,
- "metadata": {},
+ "source": [
+ "df_results"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/html": [
"
\n",
@@ -762,36 +763,31 @@
"2 0.893001 "
]
},
- "execution_count": 23,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 23
}
],
- "source": [
- "df_results"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### Conclusion\n",
"The comparision results show that the ALS recommender outperforms the random recommender on ranking metrics (Precision@k, Recall@k, NDCG@k, and\tMean average precision), while the random recommender outperforms ALS recommender on diversity metrics. This is because ALS is optimized for estimating the item rating as accurate as possible, therefore it performs well on accuracy metrics including rating and ranking metrics. As a side effect, the items being recommended tend to be popular items, which are the items mostly sold or viewed. It leaves the long-tail less popular items having less chance to get introduced to the users. This is the reason why ALS is not performing as well as a random recommender on diversity metrics. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 6. Calculate diversity metrics using item feature vector based item-item similarity\n",
"In the above section we calculate diversity metrics using item co-occurrence count based item-item similarity. In the scenarios when item features are available, we may want to calculate item-item similarity based on item feature vectors. In this section, we show how to calculate diversity metrics using item feature vector based item-item similarity."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 24,
- "metadata": {},
- "outputs": [],
"source": [
"# Get movie features \"title\" and \"genres\"\n",
"movies = (\n",
@@ -801,13 +797,13 @@
" .withColumn(\"title\", F.regexp_replace(F.col(\"title\"), \"[\\(),:^0-9]\", \"\")) # remove year from title\n",
" .drop(\"count\") # remove unused columns\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 25,
- "metadata": {},
- "outputs": [],
"source": [
"# tokenize \"title\" column\n",
"title_tokenizer = Tokenizer(inputCol=\"title\", outputCol=\"title_words\")\n",
@@ -816,16 +812,38 @@
"# remove stop words\n",
"remover = StopWordsRemover(inputCol=\"title_words\", outputCol=\"text\")\n",
"clean_data = remover.transform(tokenized_data).drop(\"title\", \"title_words\")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 26,
- "metadata": {},
+ "source": [
+ "# convert text input into feature vectors\n",
+ "\n",
+ "# step 1: perform HashingTF on column \"text\"\n",
+ "text_hasher = HashingTF(inputCol=\"text\", outputCol=\"text_features\", numFeatures=1024)\n",
+ "hashed_data = text_hasher.transform(clean_data)\n",
+ "\n",
+ "# step 2: fit a CountVectorizerModel from column \"genres\".\n",
+ "count_vectorizer = CountVectorizer(inputCol=\"genres\", outputCol=\"genres_features\")\n",
+ "count_vectorizer_model = count_vectorizer.fit(hashed_data)\n",
+ "vectorized_data = count_vectorizer_model.transform(hashed_data)\n",
+ "\n",
+ "# step 3: assemble features into a single vector\n",
+ "assembler = VectorAssembler(\n",
+ " inputCols=[\"text_features\", \"genres_features\"],\n",
+ " outputCol=\"features\",\n",
+ ")\n",
+ "feature_data = assembler.transform(vectorized_data).select(\"MovieId\", \"features\")\n",
+ "\n",
+ "feature_data.show(10, False)"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+-------+---------------------------------------------+\n",
"|MovieId|features |\n",
@@ -846,49 +864,18 @@
]
}
],
- "source": [
- "# convert text input into feature vectors\n",
- "\n",
- "# step 1: perform HashingTF on column \"text\"\n",
- "text_hasher = HashingTF(inputCol=\"text\", outputCol=\"text_features\", numFeatures=1024)\n",
- "hashed_data = text_hasher.transform(clean_data)\n",
- "\n",
- "# step 2: fit a CountVectorizerModel from column \"genres\".\n",
- "count_vectorizer = CountVectorizer(inputCol=\"genres\", outputCol=\"genres_features\")\n",
- "count_vectorizer_model = count_vectorizer.fit(hashed_data)\n",
- "vectorized_data = count_vectorizer_model.transform(hashed_data)\n",
- "\n",
- "# step 3: assemble features into a single vector\n",
- "assembler = VectorAssembler(\n",
- " inputCols=[\"text_features\", \"genres_features\"],\n",
- " outputCol=\"features\",\n",
- ")\n",
- "feature_data = assembler.transform(vectorized_data).select(\"MovieId\", \"features\")\n",
- "\n",
- "feature_data.show(10, False)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"The *features* column is represented with a SparseVector object. For example, in the feature vector (1043,[128,544,1025],[1.0,1.0,1.0]), 1043 is the vector length, indicating the vector consisting of 1043 item features. The values at index positions 128,544,1025 are 1.0, and the values at other positions are all 0. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 27,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "0.8738984131037538\n",
- "0.8873467159479473\n"
- ]
- }
- ],
"source": [
"als_eval = SparkDiversityEvaluation(\n",
" train_df = train_df, \n",
@@ -903,22 +890,22 @@
"als_serendipity=als_eval.serendipity()\n",
"print(als_diversity)\n",
"print(als_serendipity)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 28,
- "metadata": {},
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
- "0.8978120851519519\n",
- "0.8937850286817351\n"
+ "0.8738984131037538\n",
+ "0.8873467159479473\n"
]
}
],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
"source": [
"random_eval = SparkDiversityEvaluation(\n",
" train_df = train_df, \n",
@@ -933,18 +920,28 @@
"random_serendipity=random_eval.serendipity()\n",
"print(random_diversity)\n",
"print(random_serendipity)"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "0.8978120851519519\n",
+ "0.8937850286817351\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"It's interesting that the value of diversity and serendipity changes when using different item-item similarity calculation approach, for both ALS algorithm and random recommender. The diversity and serendipity of random recommender are still higher than ALS algorithm. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### References\n",
"The metric definitions / formulations are based on the following references:\n",
@@ -952,17 +949,18 @@
"- G. Shani and A. Gunawardana, Evaluating recommendation systems, Recommender Systems Handbook pp. 257-297, 2010.\n",
"- E. Yan, Serendipity: Accuracy’s unpopular best friend in recommender Systems, eugeneyan.com, April 2020\n",
"- Y.C. Zhang, D.Ó. Séaghdha, D. Quercia and T. Jambor, Auralist: introducing serendipity into music recommendation, WSDM 2012\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# cleanup spark instance\n",
"spark.stop()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
}
],
"metadata": {
diff --git a/examples/05_operationalize/als_movie_o16n.ipynb b/examples/05_operationalize/als_movie_o16n.ipynb
index 28684cd821..5aa1bb1425 100644
--- a/examples/05_operationalize/als_movie_o16n.ipynb
+++ b/examples/05_operationalize/als_movie_o16n.ipynb
@@ -2,16 +2,15 @@
"cells": [
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Copyright (c) Microsoft Corporation. All rights reserved.\n",
"\n",
"Licensed under the MIT License."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"# Building a Real-time Recommendation API\n",
"\n",
@@ -40,38 +39,29 @@
"1. [Service Creation](#1-Service-Creation)\n",
"2. [Training and evaluation](#2-Training)\n",
"3. [Operationalization](#3.-Operationalize-the-Recommender-Service)"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Setup\n",
"To run this notebook on Azure Databricks, you should setup Azure Databricks by following the appropriate sections in the repository [SETUP instructions](../../SETUP.md) and import this notebook into your Azure Databricks Workspace (see instructions [here](https://docs.azuredatabricks.net/user-guide/notebooks/notebook-manage.html#import-a-notebook)).\n",
"\n",
"Please note: This notebook **REQUIRES** that you add the dependencies to support **operationalization**. See [SETUP](../../SETUP.md) for details.\n"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## 0 File Imports"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 1,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Azure SDK version: 1.0.69\n"
- ]
- }
- ],
"source": [
"import os\n",
"import sys\n",
@@ -95,24 +85,46 @@
"from pyspark.sql.types import StructType, StructField\n",
"from pyspark.sql.types import FloatType, IntegerType, LongType\n",
"\n",
- "from recommenders.utils.timer import Timer\n",
- "from recommenders.utils.spark_utils import start_or_get_spark\n",
"from recommenders.datasets import movielens\n",
"from recommenders.datasets.cosmos_cli import find_collection, read_collection, read_database, find_database\n",
"from recommenders.datasets.download_utils import maybe_download\n",
"from recommenders.datasets.spark_splitters import spark_random_split\n",
"from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation\n",
"from recommenders.utils.notebook_utils import is_databricks\n",
+ "from recommenders.utils.timer import Timer\n",
+ "from recommenders.utils.spark_utils import start_or_get_spark\n",
"\n",
"print(\"Azure SDK version:\", azureml.core.VERSION)"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Azure SDK version: 1.0.69\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 3,
- "metadata": {},
+ "source": [
+ "# Start spark session if needed\n",
+ "if not is_databricks():\n",
+ " cosmos_connector = (\n",
+ " \"https://search.maven.org/remotecontent?filepath=com/microsoft/azure/\"\n",
+ " \"azure-cosmosdb-spark_2.3.0_2.11/1.3.3/azure-cosmosdb-spark_2.3.0_2.11-1.3.3-uber.jar\"\n",
+ " )\n",
+ " jar_filepath = maybe_download(url=cosmos_connector, filename=\"cosmos.jar\")\n",
+ " spark = start_or_get_spark(\"ALS\", memory=\"10g\", jars=[jar_filepath])\n",
+ " sc = spark.sparkContext\n",
+ "print(sc)"
+ ],
"outputs": [
{
+ "output_type": "display_data",
"data": {
"text/html": [
"\n",
@@ -136,26 +148,13 @@
""
]
},
- "metadata": {},
- "output_type": "display_data"
+ "metadata": {}
}
],
- "source": [
- "# Start spark session if needed\n",
- "if not is_databricks():\n",
- " cosmos_connector = (\n",
- " \"https://search.maven.org/remotecontent?filepath=com/microsoft/azure/\"\n",
- " \"azure-cosmosdb-spark_2.3.0_2.11/1.3.3/azure-cosmosdb-spark_2.3.0_2.11-1.3.3-uber.jar\"\n",
- " )\n",
- " jar_filepath = maybe_download(url=cosmos_connector, filename=\"cosmos.jar\")\n",
- " spark = start_or_get_spark(\"ALS\", memory=\"10g\", jars=[jar_filepath])\n",
- " sc = spark.sparkContext\n",
- "display(sc)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## 1 Service Creation\n",
"Modify the **Subscription ID** to the subscription you would like to deploy to and set the resource name variables.\n",
@@ -169,20 +168,19 @@
"\n",
"1. [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/)\n",
"1. [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"**Add your Azure subscription ID**"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 4,
- "metadata": {},
- "outputs": [],
"source": [
"# Add your subscription ID\n",
"subscription_id = \"\"\n",
@@ -196,43 +194,43 @@
"\n",
"# AzureML service and Azure Kubernetes Service prefix\n",
"service_name = \"mvl-als\""
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# Login for Azure CLI so that AzureML can use Azure CLI login credentials\n",
"!az login"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# Change subscription if needed\n",
"!az account set --subscription {subscription_id}"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
- "outputs": [],
"source": [
"# Check account\n",
"!az account show"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 9,
- "metadata": {},
- "outputs": [],
"source": [
"# CosmosDB\n",
"# account_name for CosmosDB cannot have \"_\" and needs to be less than 31 chars\n",
@@ -243,26 +241,26 @@
"# AzureML resource names\n",
"model_name = \"{}-reco.mml\".format(service_name)\n",
"aks_name = \"{}-aks\".format(service_name)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
- "metadata": {},
- "outputs": [],
"source": [
"# top k items to recommend\n",
"TOP_K = 10\n",
"\n",
"# Select MovieLens data size: 100k, 1m, 10m, or 20m\n",
"MOVIELENS_DATA_SIZE = '100k'"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 11,
- "metadata": {},
- "outputs": [],
"source": [
"userCol = \"UserId\"\n",
"itemCol = \"MovieId\"\n",
@@ -270,23 +268,21 @@
"\n",
"train_data_path = \"train\"\n",
"test_data_path = \"test\""
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 1.1 Import or create the AzureML Workspace. \n",
"This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {
- "scrolled": false
- },
- "outputs": [],
"source": [
"ws = Workspace.create(\n",
" name=workspace_name,\n",
@@ -295,31 +291,24 @@
" location=location,\n",
" exist_ok=True\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {
+ "scrolled": false
+ }
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 1.2 Create a Cosmos DB to store recommendation results\n",
"\n",
"This step will take some time to create CosmosDB resources."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Database created\n",
- "Collection created\n"
- ]
- }
- ],
"source": [
"# explicitly pass subscription_id in case user has multiple subscriptions\n",
"client = get_client_from_cli_profile(\n",
@@ -379,34 +368,57 @@
" Collection=cosmos_collection, \n",
" Upsert=True\n",
")"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Database created\n",
+ "Collection created\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## 2 Training\n",
"\n",
"Next, we train an [Alternating Least Squares model](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) on [MovieLens](https://grouplens.org/datasets/movielens/) dataset.\n",
"\n",
"### 2.1 Download the MovieLens dataset"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 14,
- "metadata": {},
+ "source": [
+ "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
+ "schema = StructType(\n",
+ " (\n",
+ " StructField(userCol, IntegerType()),\n",
+ " StructField(itemCol, IntegerType()),\n",
+ " StructField(ratingCol, FloatType()),\n",
+ " )\n",
+ ")\n",
+ "\n",
+ "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)\n",
+ "data.show()"
+ ],
"outputs": [
{
- "name": "stderr",
"output_type": "stream",
+ "name": "stderr",
"text": [
"100%|██████████| 4.81k/4.81k [00:00<00:00, 11.0kKB/s]\n"
]
},
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+------+-------+------+\n",
"|UserId|MovieId|Rating|\n",
@@ -437,64 +449,50 @@
]
}
],
- "source": [
- "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
- "schema = StructType(\n",
- " (\n",
- " StructField(userCol, IntegerType()),\n",
- " StructField(itemCol, IntegerType()),\n",
- " StructField(ratingCol, FloatType()),\n",
- " )\n",
- ")\n",
- "\n",
- "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)\n",
- "data.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2.2 Split the data into train, test\n",
"There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult [this guide](../01_prepare_data/data_split.ipynb)."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 15,
- "metadata": {},
+ "source": [
+ "train, test = spark_random_split(data, ratio=0.75, seed=42)\n",
+ "print(\"N train\", train.cache().count())\n",
+ "print(\"N test\", test.cache().count())"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"N train 75031\n",
"N test 24969\n"
]
}
],
- "source": [
- "train, test = spark_random_split(data, ratio=0.75, seed=42)\n",
- "print(\"N train\", train.cache().count())\n",
- "print(\"N test\", test.cache().count())"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2.3 Train the ALS model on the training data\n",
"\n",
"To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to estimate the model are set based on [this page](http://mymedialite.net/examples/datasets.html).\n",
"\n",
"Under most circumstances, you would explore the hyperparameters and choose an optimal set based on some criteria. For additional details on this process, please see additional information in the deep dives [here](../04_model_select_and_optimize/tuning_spark_als.ipynb)."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 16,
- "metadata": {},
- "outputs": [],
"source": [
"als = ALS(\n",
" rank=10,\n",
@@ -508,36 +506,45 @@
" itemCol=itemCol,\n",
" ratingCol=ratingCol,\n",
")"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 17,
- "metadata": {},
- "outputs": [],
"source": [
"model = als.fit(train)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2.4 Get top-k recommendations for our testing data\n",
"\n",
"In the movie recommendation use case, recommending movies that have been rated by the users do not make sense. Therefore, the rated movies are removed from the recommended items.\n",
"\n",
"In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 18,
- "metadata": {},
+ "source": [
+ "# Get the cross join of all user-item pairs and score them.\n",
+ "users = train.select(userCol).distinct()\n",
+ "items = train.select(itemCol).distinct()\n",
+ "user_item = users.crossJoin(items)\n",
+ "dfs_pred = model.transform(user_item)\n",
+ "dfs_pred.show()"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+------+-------+----------+\n",
"|UserId|MovieId|prediction|\n",
@@ -568,23 +575,27 @@
]
}
],
- "source": [
- "# Get the cross join of all user-item pairs and score them.\n",
- "users = train.select(userCol).distinct()\n",
- "items = train.select(itemCol).distinct()\n",
- "user_item = users.crossJoin(items)\n",
- "dfs_pred = model.transform(user_item)\n",
- "dfs_pred.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 19,
- "metadata": {},
+ "source": [
+ "# Remove seen items.\n",
+ "dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
+ " train.alias(\"train\"),\n",
+ " (dfs_pred[userCol]==train[userCol]) & (dfs_pred[itemCol]==train[itemCol]),\n",
+ " how='outer'\n",
+ ")\n",
+ "top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.\"+ratingCol].isNull()) \\\n",
+ " .select(\"pred.\"+userCol, \"pred.\"+itemCol, \"pred.prediction\")\n",
+ "\n",
+ "top_all.show()"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+------+-------+----------+\n",
"|UserId|MovieId|prediction|\n",
@@ -615,36 +626,34 @@
]
}
],
- "source": [
- "# Remove seen items.\n",
- "dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
- " train.alias(\"train\"),\n",
- " (dfs_pred[userCol]==train[userCol]) & (dfs_pred[itemCol]==train[itemCol]),\n",
- " how='outer'\n",
- ")\n",
- "top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.\"+ratingCol].isNull()) \\\n",
- " .select(\"pred.\"+userCol, \"pred.\"+itemCol, \"pred.prediction\")\n",
- "\n",
- "top_all.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2.5 Evaluate how well ALS performs\n",
"\n",
"Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP@K](https://en.wikipedia.org/wiki/Evaluation_measures_\\(information_retrieval\\) or [nDCG@K](https://en.wikipedia.org/wiki/Discounted_cumulative_gain). For a full guide on what metrics to evaluate your recommender with, consult [this guide]../03_evaluate/evaluation.ipynb)."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 20,
- "metadata": {},
+ "source": [
+ "cols = {\n",
+ " 'col_user': userCol,\n",
+ " 'col_item': itemCol,\n",
+ " 'col_rating': ratingCol,\n",
+ " 'col_prediction': \"prediction\",\n",
+ "}\n",
+ "\n",
+ "test.show()"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+------+-------+------+\n",
"|UserId|MovieId|Rating|\n",
@@ -675,35 +684,11 @@
]
}
],
- "source": [
- "cols = {\n",
- " 'col_user': userCol,\n",
- " 'col_item': itemCol,\n",
- " 'col_rating': ratingCol,\n",
- " 'col_prediction': \"prediction\",\n",
- "}\n",
- "\n",
- "test.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 21,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Model:\tALS\n",
- "Top K:\t10\n",
- "MAP:\t0.003698\n",
- "NDCG:\t0.034331\n",
- "Precision@K:\t0.039343\n",
- "Recall@K:\t0.014976\n"
- ]
- }
- ],
"source": [
"# Evaluate Ranking Metrics\n",
"rank_eval = SparkRankingEvaluation(\n",
@@ -721,25 +706,26 @@
" \"Precision@K:\\t%f\" % rank_eval.precision_at_k(),\n",
" \"Recall@K:\\t%f\" % rank_eval.recall_at_k(), sep='\\n'\n",
")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 22,
- "metadata": {},
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
- "Model:\tALS rating prediction\n",
- "RMSE:\t0.95\n",
- "MAE:\t0.740282\n",
- "Explained variance:\t0.289807\n",
- "R squared:\t0.285394\n"
+ "Model:\tALS\n",
+ "Top K:\t10\n",
+ "MAP:\t0.003698\n",
+ "NDCG:\t0.034331\n",
+ "Precision@K:\t0.039343\n",
+ "Recall@K:\t0.014976\n"
]
}
],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
"source": [
"# Evaluate Rating Metrics\n",
"prediction = model.transform(test)\n",
@@ -756,52 +742,71 @@
" \"Explained variance:\\t%f\" % rating_eval.exp_var(),\n",
" \"R squared:\\t%f\" % rating_eval.rsquared(), sep='\\n'\n",
")"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Model:\tALS rating prediction\n",
+ "RMSE:\t0.95\n",
+ "MAE:\t0.740282\n",
+ "Explained variance:\t0.289807\n",
+ "R squared:\t0.285394\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 2.6 Save the model"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 23,
- "metadata": {},
- "outputs": [],
"source": [
"(model\n",
" .write()\n",
" .overwrite()\n",
" .save(model_name))"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## 3. Operationalize the Recommender Service\n",
"Once the model is built with desirable performance, it will be operationalized to run as a REST endpoint to be utilized by a real time service. We will utilize [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/), [Azure Machine Learning Service](https://azure.microsoft.com/en-us/services/machine-learning-service/), and [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) to operationalize the recommender service."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.1 Create a look-up for Recommendations in Cosmos DB\n",
"\n",
"First, the Top-10 recommendations for each user as predicted by the model are stored as a lookup table in Cosmos DB. At runtime, the service will return the Top-10 recommendations as precomputed and stored in Cosmos DB:"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 24,
- "metadata": {},
+ "source": [
+ "recs = model.recommendForAllUsers(10)\n",
+ "recs_topk = recs.withColumn(\"id\", recs[userCol].cast(\"string\")) \\\n",
+ " .select(\"id\", \"recommendations.\" + itemCol)\n",
+ "recs_topk.show()"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"+---+--------------------+\n",
"| id| MovieId|\n",
@@ -832,18 +837,11 @@
]
}
],
- "source": [
- "recs = model.recommendForAllUsers(10)\n",
- "recs_topk = recs.withColumn(\"id\", recs[userCol].cast(\"string\")) \\\n",
- " .select(\"id\", \"recommendations.\" + itemCol)\n",
- "recs_topk.show()"
- ]
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 25,
- "metadata": {},
- "outputs": [],
"source": [
"# Save data to CosmosDB\n",
"(recs_topk.coalesce(1)\n",
@@ -852,22 +850,22 @@
" .mode('overwrite')\n",
" .options(**dbsecrets)\n",
" .save())"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.2 Configure Azure Machine Learning\n",
"\n",
"Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** should be created. In the script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 26,
- "metadata": {},
- "outputs": [],
"source": [
"score_sparkml = \"\"\"\n",
"import json\n",
@@ -902,29 +900,20 @@
"\n",
"with open(\"score_sparkml.py\", \"w\") as file:\n",
" file.write(score_sparkml)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Register your model:"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 27,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Registering model mvl-als-reco.mml\n",
- "mvl-als-reco.mml AML trained model 1\n"
- ]
- }
- ],
"source": [
"mymodel = Model.register(\n",
" model_path=model_name, # this points to a local file\n",
@@ -934,27 +923,36 @@
")\n",
"\n",
"print(mymodel.name, mymodel.description, mymodel.version)"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Registering model mvl-als-reco.mml\n",
+ "mvl-als-reco.mml AML trained model 1\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.3 Deploy the model as a Service on AKS"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### 3.3.1 Create an Environment for your model:"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 28,
- "metadata": {},
- "outputs": [],
"source": [
"env = Environment(name='sparkmlenv')\n",
"\n",
@@ -981,31 +979,21 @@
" SparkPackage(\"com.microsoft.azure\", artifact=\"azure-storage\", version=\"2.0.0\"),\n",
" SparkPackage(group=\"org.apache.hadoop\", artifact=\"hadoop-azure\", version=\"2.7.0\")\n",
"]"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### 3.3.2 Create an AKS Cluster to run your container\n",
"This may take 20 to 30 minutes depending on the cluster size."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 29,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Creating.......................................................................................................\n",
- "SucceededProvisioning operation finished, operation \"Succeeded\"\n",
- "Succeeded\n"
- ]
- }
- ],
"source": [
"# Verify that cluster does not exist already\n",
"try:\n",
@@ -1022,29 +1010,30 @@
" aks_target.wait_for_completion(show_output = True)\n",
" print(aks_target.provisioning_state)\n",
" # To check any error logs, print(aks_target.provisioning_errors)"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Creating.......................................................................................................\n",
+ "SucceededProvisioning operation finished, operation \"Succeeded\"\n",
+ "Succeeded\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"#### 3.3.3 Deploy the container image to AKS:"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 30,
- "metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Running....................................................................................................................\n",
- "SucceededAKS service creation operation finished, operation \"Succeeded\"\n"
- ]
- }
- ],
"source": [
"# Create an Inferencing Configuration with your environment and scoring script\n",
"inference_config = InferenceConfig(\n",
@@ -1070,25 +1059,61 @@
" # Retrieve existing service.\n",
" aks_service = Webservice(ws, name=service_name)\n",
" print(\"Retrieved existing service\")"
- ]
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Running....................................................................................................................\n",
+ "SucceededAKS service creation operation finished, operation \"Succeeded\"\n"
+ ]
+ }
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 3.4 Call the AKS model service\n",
"After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results.\n",
"The following script demonstrates how to call the recommendation service API and view the result for the given user ID:"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 31,
- "metadata": {},
+ "source": [
+ "import json\n",
+ "\n",
+ "scoring_url = aks_service.scoring_uri\n",
+ "service_key = aks_service.get_keys()[0]\n",
+ "\n",
+ "input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\"}\"]'.encode()\n",
+ "\n",
+ "req = urllib.request.Request(scoring_url, data=input_data)\n",
+ "req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
+ "req.add_header(\"Content-Type\",\"application/json\")\n",
+ "\n",
+ "with Timer() as t: \n",
+ " with urllib.request.urlopen(req) as result:\n",
+ " res = result.read()\n",
+ " resj = json.loads(\n",
+ " # Cleanup to parse into a json object\n",
+ " res.decode(\"utf-8\")\n",
+ " .replace(\"\\\\\", \"\")\n",
+ " .replace('\"', \"\")\n",
+ " .replace(\"'\", '\"')\n",
+ " )\n",
+ " print(json.dumps(resj, indent=4))\n",
+ " \n",
+ "print(\"Full run took %.2f seconds\" % t.interval)"
+ ],
"outputs": [
{
- "name": "stdout",
"output_type": "stream",
+ "name": "stdout",
"text": [
"{\n",
" \"MovieId\": [\n",
@@ -1114,36 +1139,10 @@
]
}
],
- "source": [
- "import json\n",
- "\n",
- "scoring_url = aks_service.scoring_uri\n",
- "service_key = aks_service.get_keys()[0]\n",
- "\n",
- "input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\"}\"]'.encode()\n",
- "\n",
- "req = urllib.request.Request(scoring_url, data=input_data)\n",
- "req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
- "req.add_header(\"Content-Type\",\"application/json\")\n",
- "\n",
- "with Timer() as t: \n",
- " with urllib.request.urlopen(req) as result:\n",
- " res = result.read()\n",
- " resj = json.loads(\n",
- " # Cleanup to parse into a json object\n",
- " res.decode(\"utf-8\")\n",
- " .replace(\"\\\\\", \"\")\n",
- " .replace('\"', \"\")\n",
- " .replace(\"'\", '\"')\n",
- " )\n",
- " print(json.dumps(resj, indent=4))\n",
- " \n",
- "print(\"Full run took %.2f seconds\" % t.interval)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## Appendix - Realtime scoring with AzureML\n",
"\n",
@@ -1193,14 +1192,15 @@
" \n",
" ...\n",
" ```"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
- "metadata": {},
+ "source": [],
"outputs": [],
- "source": []
+ "metadata": {}
}
],
"metadata": {
diff --git a/recommenders/utils/spark_utils.py b/recommenders/utils/spark_utils.py
index 7f5d0cff8d..6e6f625944 100644
--- a/recommenders/utils/spark_utils.py
+++ b/recommenders/utils/spark_utils.py
@@ -2,7 +2,6 @@
# Licensed under the MIT License.
import os
-import sys
try:
@@ -11,6 +10,9 @@
pass # skip this import if we are in pure python environment
+MMLSPARK_PACKAGE = "com.microsoft.ml.spark:mmlspark_2.11:0.18.1"
+MMLSPARK_REPO = "https://mvnrepository.com/artifact"
+
def start_or_get_spark(
app_name="Sample",
url="local[*]",
@@ -18,18 +20,18 @@ def start_or_get_spark(
config=None,
packages=None,
jars=None,
- repository=None,
+ repositories=None,
):
"""Start Spark if not started
Args:
- app_name (str): Set name of the application
+ app_name (str): set name of the application
url (str): URL for spark master
- memory (str): Size of memory for spark driver
+ memory (str): size of memory for spark driver
config (dict): dictionary of configuration options
packages (list): list of packages to install
jars (list): list of jar files to add
- repository (str): The maven repository
+ repositories (list): list of maven repositories
Returns:
object: Spark context.
@@ -40,8 +42,8 @@ def start_or_get_spark(
submit_args = "--packages {} ".format(",".join(packages))
if jars is not None:
submit_args += "--jars {} ".format(",".join(jars))
- if repository is not None:
- submit_args += "--repositories {}".format(repository)
+ if repositories is not None:
+ submit_args += "--repositories {}".format(",".join(repositories))
if submit_args:
os.environ["PYSPARK_SUBMIT_ARGS"] = "{} pyspark-shell".format(submit_args)
diff --git a/tools/databricks_install.py b/tools/databricks_install.py
index 7e0ffffcee..e1421eb1da 100644
--- a/tools/databricks_install.py
+++ b/tools/databricks_install.py
@@ -3,7 +3,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
-# This script installs Recommenders/recommenders as an egg library onto a Databricks Workspace
+# This script installs Recommenders/recommenders from PyPI onto a Databricks Workspace
# Optionally, also installs a version of mmlspark as a maven library, and prepares the cluster
# for operationalization
@@ -12,7 +12,7 @@
import os
from pathlib import Path
-import shutil
+import pkg_resources
import sys
import time
from urllib.request import urlretrieve
@@ -26,6 +26,7 @@
from databricks_cli.libraries.api import LibrariesApi
from databricks_cli.dbfs.dbfs_path import DbfsPath
+from recommenders.utils.spark_utils import MMLSPARK_PACKAGE, MMLSPARK_REPO
CLUSTER_NOT_FOUND_MSG = """
Cannot find the target cluster {}. Please check if you entered the valid id.
@@ -46,12 +47,13 @@
"3": "https://search.maven.org/remotecontent?filepath=com/microsoft/azure/azure-cosmosdb-spark_2.2.0_2.11/1.1.1/azure-cosmosdb-spark_2.2.0_2.11-1.1.1-uber.jar",
"4": "https://search.maven.org/remotecontent?filepath=com/microsoft/azure/azure-cosmosdb-spark_2.3.0_2.11/1.2.2/azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar",
"5": "https://search.maven.org/remotecontent?filepath=com/microsoft/azure/azure-cosmosdb-spark_2.4.0_2.11/1.3.5/azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar",
+ "6": "https://search.maven.org/remotecontent?filepath=com/microsoft/azure/azure-cosmosdb-spark_2.4.0_2.11/3.7.0/azure-cosmosdb-spark_2.4.0_2.11-3.7.0-uber.jar"
}
MMLSPARK_INFO = {
"maven": {
- "coordinates": "com.microsoft.ml.spark:mmlspark_2.11:0.18.1",
- "repo": "https://mvnrepository.com/artifact",
+ "coordinates": MMLSPARK_PACKAGE,
+ "repo": MMLSPARK_REPO,
}
}
@@ -68,50 +70,35 @@
5 * 60 / PENDING_SLEEP_INTERVAL
) # wait a maximum of 5 minutes...
-## Additional dependencies met below.
-
-
-def create_egg(
- path_to_recommenders_repo_root=os.getcwd(),
- local_eggname="Recommenders.egg",
- overwrite=False,
-):
- """
- Packages files in the recommenders directory as a .egg file that can be uploaded to dbfs and installed as a library on a databricks cluster.
-
- Args:
- path_to_recommenders_repo_root (str): the (relative or absolute) path to the root of the recommenders repository
- local_eggname (str): the basename of the egg you want to create (NOTE: must have .egg extension)
- overwrite (bool): whether to overwrite local_eggname if it already exists.
+## dependencies from PyPI
+PYPI_PREREQS = [
+ "pip==21.2.4",
+ "setuptools==54.0.0",
+ "numpy==1.18.0"
+]
- Returns:
- the path to the created egg file.
- """
- # create the zip archive:
- myzipfile = shutil.make_archive(
- "recommenders",
- "zip",
- root_dir=path_to_recommenders_repo_root,
- base_dir="recommenders",
- )
+PYPI_EXTRA_DEPS = [
+ "azure-cli-core==2.0.75",
+ "azure-mgmt-cosmosdb==0.8.0",
+ "azureml-sdk[databricks]",
+ "azure-storage-blob<=2.1.0",
+]
- # overwrite egg if it previously existed
- if os.path.exists(local_eggname) and overwrite:
- os.unlink(local_eggname)
- os.rename(myzipfile, local_eggname)
- return local_eggname
+PYPI_O16N_LIBS = [
+ "pydocumentdb>=2.3.3",
+]
+## Additional dependencies met below.
def dbfs_file_exists(api_client, dbfs_path):
- """
- Checks to determine whether a file exists.
+ """Checks to determine whether a file exists.
Args:
api_client (ApiClient object): Object used for authenticating to the workspace
dbfs_path (str): Path to check
Returns:
- True if file exists on dbfs, False otherwise.
+ bool: True if file exists on dbfs, False otherwise.
"""
try:
DbfsApi(api_client).list_files(dbfs_path=DbfsPath(dbfs_path))
@@ -121,6 +108,21 @@ def dbfs_file_exists(api_client, dbfs_path):
return file_exists
+def get_installed_libraries(api_client, cluster_id):
+ """Returns the installed PyPI packages and the ones that failed.
+
+ Args:
+ api_client (ApiClient object): object used for authenticating to the workspace
+ cluster_id (str): id of the cluster
+
+ Returns:
+ Dict[str, str]: dictionary of {package: status}
+ """
+ cluster_status = LibrariesApi(api_client).cluster_status(cluster_id)
+ libraries = {lib["library"]["pypi"]["package"]: lib["status"] for lib in cluster_status["library_statuses"] if "pypi" in lib["library"]}
+ return {pkg_resources.Requirement.parse(package).name: libraries[package] for package in libraries}
+
+
def prepare_for_operationalization(
cluster_id, api_client, dbfs_path, overwrite, spark_version
):
@@ -172,7 +174,7 @@ def prepare_for_operationalization(
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""
- This script packages the recommenders directory into a .egg file and installs it onto a databricks cluster.
+ This script installs the recommenders package from PyPI onto a databricks cluster.
Optionally, this script may also install the mmlspark library, and it may also install additional libraries useful
for operationalization. This script requires that you have installed databricks-cli in the python environment in
which you are running this script, and that have you have already configured it with a profile.
@@ -189,11 +191,6 @@ def prepare_for_operationalization(
help="The path to the root of the recommenders repository. Default assumes that the script is run in the root of the repository",
default=".",
)
- parser.add_argument(
- "--eggname",
- help="Name of the egg you want to generate. Useful if you want to name based on branch or date.",
- default="Recommenders.egg",
- )
parser.add_argument(
"--dbfs-path",
help="The directory on dbfs that want to place files in",
@@ -221,32 +218,8 @@ def prepare_for_operationalization(
)
args = parser.parse_args()
- # Check for extension of eggname
- if not args.eggname.endswith(".egg"):
- args.eggname += ".egg"
-
# make sure path_to_recommenders is on sys.path to allow for import
sys.path.append(args.path_to_recommenders)
- from tools.generate_conda_file import PIP_BASE, CONDA_BASE
-
- ## depend on PIP_BASE:
- PYPI_RECO_LIB_DEPS = [CONDA_BASE["tqdm"]]
-
- PYPI_O16N_LIBS = [
- "azure-cli==2.0.56",
- "azureml-sdk[databricks]==1.0.69",
- PIP_BASE["pydocumentdb"],
- ]
-
- #################
- # Create the egg:
- #################
-
- print("Preparing Recommenders library file ({})...".format(args.eggname))
- myegg = create_egg(
- args.path_to_recommenders, local_eggname=args.eggname, overwrite=args.overwrite
- )
- print("Created: {}".format(myegg))
############################
# Interact with Databricks:
@@ -267,29 +240,6 @@ def prepare_for_operationalization(
)
)
- # Upload the egg:
- upload_path = Path(args.dbfs_path, args.eggname).as_posix()
-
- # Check if file exists to alert user.
- print("Uploading {} to databricks at {}".format(args.eggname, upload_path))
- if dbfs_file_exists(my_api_client, upload_path):
- if args.overwrite:
- print("Overwriting file at {}".format(upload_path))
- else:
- raise IOError(
- """
- {} already exists on databricks cluster.
- This is likely an older version of the library.
- Please use the '--overwrite' flag to proceed.
- """.format(
- upload_path
- )
- )
-
- DbfsApi(my_api_client).cp(
- recursive=False, src=myegg, dst=upload_path, overwrite=args.overwrite
- )
-
# steps below require the cluster to be running. Check status
try:
status = ClusterApi(my_api_client).get_cluster(args.cluster_id)
@@ -329,21 +279,48 @@ def prepare_for_operationalization(
)
sys.exit()
+
+ # install prerequisites
+ print(
+ "Installing required Python libraries onto databricks cluster {}".format(
+ args.cluster_id
+ )
+ )
+ libs2install = [{"pypi": {"package": i}} for i in PYPI_PREREQS]
+ LibrariesApi(my_api_client).install_libraries(args.cluster_id, libs2install)
+
# install the library and its dependencies
print(
- "Installing the recommenders module onto databricks cluster {}".format(
+ "Installing the recommenders package onto databricks cluster {}".format(
args.cluster_id
)
)
- libs2install = [{"egg": upload_path}]
- # PYPI dependencies:
- libs2install.extend([{"pypi": {"package": i}} for i in PYPI_RECO_LIB_DEPS])
+ LibrariesApi(my_api_client).install_libraries(args.cluster_id, [{"pypi": {"package": "recommenders"}}])
+
+ # pip cannot handle everything together, so wait until recommenders package is installed
+ installed_libraries = get_installed_libraries(my_api_client, args.cluster_id)
+ while "recommenders" not in installed_libraries:
+ time.sleep(PENDING_SLEEP_INTERVAL)
+ installed_libraries = get_installed_libraries(my_api_client, args.cluster_id)
+ while installed_libraries["recommenders"] != "INSTALLED":
+ time.sleep(PENDING_SLEEP_INTERVAL)
+ installed_libraries = get_installed_libraries(my_api_client, args.cluster_id)
+ if installed_libraries["recommenders"] == "FAILED":
+ raise Exception("recommenders package failed to install")
+
+ # additional PyPI dependencies:
+ libs2install = [{"pypi": {"package": i}} for i in PYPI_EXTRA_DEPS]
# add mmlspark if selected.
if args.mmlspark:
print("Installing MMLSPARK package...")
libs2install.extend([MMLSPARK_INFO])
- print(libs2install)
+ print(
+ "Installing {} onto databricks cluster {}".format(
+ libs2install,
+ args.cluster_id
+ )
+ )
LibrariesApi(my_api_client).install_libraries(args.cluster_id, libs2install)
# prepare for operationalization if desired: