diff --git a/examples/06_benchmarks/benchmark_utils.py b/examples/06_benchmarks/benchmark_utils.py
index 561eea4d40..867b4c66ee 100644
--- a/examples/06_benchmarks/benchmark_utils.py
+++ b/examples/06_benchmarks/benchmark_utils.py
@@ -5,13 +5,20 @@
 import numpy as np
 import pandas as pd
 from tempfile import TemporaryDirectory
-from pyspark.ml.recommendation import ALS
-from pyspark.sql.types import StructType, StructField
-from pyspark.sql.types import FloatType, IntegerType, LongType
-from fastai.collab import collab_learner, CollabDataBunch
 import surprise
 import cornac
 
+try:
+    from pyspark.ml.recommendation import ALS
+    from pyspark.sql.types import StructType, StructField
+    from pyspark.sql.types import FloatType, IntegerType, LongType
+except ImportError:
+    pass  # skip this import if we are not in a Spark environment
+try:
+    from fastai.collab import collab_learner, CollabDataBunch
+except ImportError:
+    pass  # skip this import if we are not in a GPU environment
+from recommenders.utils.timer import Timer
 from recommenders.utils.constants import (
     COL_DICT,
     DEFAULT_K,
@@ -22,27 +29,12 @@
     DEFAULT_TIMESTAMP_COL,
     SEED,
 )
-from recommenders.utils.timer import Timer
-from recommenders.utils.spark_utils import start_or_get_spark
 from recommenders.models.sar import SAR
-from recommenders.models.ncf.ncf_singlenode import NCF
-from recommenders.models.ncf.dataset import Dataset as NCFDataset
 from recommenders.models.surprise.surprise_utils import (
     predict,
     compute_ranking_predictions,
 )
-from recommenders.models.fastai.fastai_utils import (
-    cartesian_product,
-    score,
-)
 from recommenders.models.cornac.cornac_utils import predict_ranking
-from recommenders.models.deeprec.models.graphrec.lightgcn import LightGCN
-from recommenders.models.deeprec.DataModel.ImplicitCF import ImplicitCF
-from recommenders.models.deeprec.deeprec_utils import prepare_hparams
-from recommenders.evaluation.spark_evaluation import (
-    SparkRatingEvaluation,
-    SparkRankingEvaluation,
-)
 from recommenders.evaluation.python_evaluation import (
     map_at_k,
     ndcg_at_k,
@@ -51,6 +43,26 @@
 )
 from recommenders.evaluation.python_evaluation import rmse, mae, rsquared, exp_var
 
+try:
+    from recommenders.utils.spark_utils import start_or_get_spark
+    from recommenders.evaluation.spark_evaluation import (
+        SparkRatingEvaluation,
+        SparkRankingEvaluation,
+    )
+except (ImportError, NameError):
+    pass  # skip this import if we are not in a Spark environment
+try:
+    from recommenders.models.deeprec.deeprec_utils import prepare_hparams
+    from recommenders.models.fastai.fastai_utils import (
+        cartesian_product,
+        score,
+    )
+    from recommenders.models.deeprec.models.graphrec.lightgcn import LightGCN
+    from recommenders.models.deeprec.DataModel.ImplicitCF import ImplicitCF
+    from recommenders.models.ncf.ncf_singlenode import NCF
+    from recommenders.models.ncf.dataset import Dataset as NCFDataset
+except ImportError:
+    pass  # skip this import if we are not in a GPU environment
 
 # Helpers
 tmp_dir = TemporaryDirectory()
@@ -235,8 +247,6 @@ def recommend_k_fastai(model, test, train, top_k=DEFAULT_K, remove_seen=True):
 
 
 def prepare_training_ncf(df_train, df_test):
-    #df_train.sort_values(["userID"], axis=0, ascending=[True], inplace=True)
-    #df_test.sort_values(["userID"], axis=0, ascending=[True], inplace=True)
    train = df_train.sort_values(["userID"], axis=0, ascending=[True])
    test = df_test.sort_values(["userID"], axis=0, ascending=[True])
    test = test[df_test["userID"].isin(train["userID"].unique())]
diff --git a/examples/06_benchmarks/movielens.ipynb b/examples/06_benchmarks/movielens.ipynb
index 28dc5ceb1c..28b7381159 100644
--- a/examples/06_benchmarks/movielens.ipynb
+++ b/examples/06_benchmarks/movielens.ipynb
@@ -69,7 +69,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 2,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -81,7 +81,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 2,
+   "execution_count": 11,
    "metadata": {},
    "outputs": [
    {
@@ -90,17 +90,19 @@
      "text": [
       "System version: 3.7.13 (default, Mar 29 2022, 02:18:16) \n",
       "[GCC 7.5.0]\n",
+      "Number of cores: 6\n",
       "NumPy version: 1.21.6\n",
       "Pandas version: 1.3.5\n",
-      "PySpark version: 3.2.2\n",
       "Surprise version: 1.1.1\n",
-      "PyTorch version: 1.12.1+cu102\n",
-      "Fast AI version: 1.0.61\n",
       "Cornac version: 1.14.2\n",
-      "TensorFlow version: 2.7.4\n",
+      "PySpark version: 3.2.2\n",
       "CUDA version: 10.2\n",
       "CuDNN version: 7605\n",
-      "Number of cores: 6\n"
+      "TensorFlow version: 2.7.4\n",
+      "PyTorch version: 1.12.1+cu102\n",
+      "Fast AI version: 1.0.61\n",
+      "The autoreload extension is already loaded. To reload it, use:\n",
+      "  %reload_ext autoreload\n"
     ]
    }
   ],
@@ -108,38 +110,62 @@
    "import os\n",
    "import sys\n",
    "import json\n",
-   "import pandas as pd\n",
    "import numpy as np\n",
+   "import pandas as pd\n",
    "import seaborn as sns\n",
-   "import pyspark\n",
-   "import tensorflow as tf # NOTE: TF needs to be imported before PyTorch, otherwise we get a weird initialization error\n",
-   "tf.get_logger().setLevel('ERROR') # only show error messages\n",
-   "import torch\n",
-   "import fastai\n",
+   "import scrapbook as sb\n",
    "import surprise\n",
    "import cornac\n",
    "\n",
-   "from recommenders.utils.spark_utils import start_or_get_spark\n",
-   "from recommenders.utils.general_utils import get_number_processors\n",
-   "from recommenders.utils.gpu_utils import get_cuda_version, get_cudnn_version\n",
-   "from recommenders.datasets import movielens\n",
-   "from recommenders.datasets.python_splitters import python_stratified_split\n",
-   "from recommenders.models.fastai.fastai_utils import hide_fastai_progress_bar\n",
+   "try:\n",
+   "    import pyspark\n",
+   "except ImportError:\n",
+   "    pass # skip this import if we are not in a Spark environment\n",
+   "\n",
+   "try:\n",
+   "    import tensorflow as tf # NOTE: TF needs to be imported before PyTorch, otherwise we get an error\n",
+   "    tf.get_logger().setLevel('ERROR') # only show error messages\n",
+   "    import torch\n",
+   "    import fastai\n",
+   "except ImportError:\n",
+   "    pass # skip this import if we are not in a GPU environment\n",
    "\n",
+   "current_path = os.path.join(os.getcwd(), \"examples\", \"06_benchmarks\") # To execute the notebook programmatically from root folder\n",
+   "sys.path.append(current_path)\n",
    "from benchmark_utils import * \n",
    "\n",
+   "from recommenders.datasets import movielens\n",
+   "from recommenders.utils.general_utils import get_number_processors\n",
+   "from recommenders.datasets.python_splitters import python_stratified_split\n",
+   "try:\n",
+   "    from recommenders.utils.spark_utils import start_or_get_spark\n",
+   "except ImportError:\n",
+   "    pass # skip this import if we are not in a Spark environment\n",
+   "try:\n",
+   "    from recommenders.utils.gpu_utils import get_cuda_version, get_cudnn_version\n",
+   "    from recommenders.models.fastai.fastai_utils import hide_fastai_progress_bar\n",
+   "    hide_fastai_progress_bar()\n",
+   "except ImportError:\n",
+   "    pass # skip this import if we are not in a GPU environment\n",
+   "\n",
    "print(f\"System version: {sys.version}\")\n",
+   "print(f\"Number of cores: {get_number_processors()}\")\n",
    "print(f\"NumPy version: {np.__version__}\")\n",
    "print(f\"Pandas version: {pd.__version__}\")\n",
-   "print(f\"PySpark version: {pyspark.__version__}\")\n",
    "print(f\"Surprise version: {surprise.__version__}\")\n",
-   "print(f\"PyTorch version: {torch.__version__}\")\n",
-   "print(f\"Fast AI version: {fastai.__version__}\")\n",
    "print(f\"Cornac version: {cornac.__version__}\")\n",
-   "print(f\"TensorFlow version: {tf.__version__}\")\n",
-   "print(f\"CUDA version: {get_cuda_version()}\")\n",
-   "print(f\"CuDNN version: {get_cudnn_version()}\")\n",
-   "print(f\"Number of cores: {get_number_processors()}\")\n",
+   "try:\n",
+   "    print(f\"PySpark version: {pyspark.__version__}\")\n",
+   "except NameError:\n",
+   "    pass # skip this import if we are not in a Spark environment\n",
+   "try:\n",
+   "    print(f\"CUDA version: {get_cuda_version()}\")\n",
+   "    print(f\"CuDNN version: {get_cudnn_version()}\")\n",
+   "    print(f\"TensorFlow version: {tf.__version__}\")\n",
+   "    print(f\"PyTorch version: {torch.__version__}\")\n",
+   "    print(f\"Fast AI version: {fastai.__version__}\")\n",
+   "except NameError:\n",
+   "    pass # skip this import if we are not in a GPU environment\n",
    "\n",
    "%load_ext autoreload\n",
    "%autoreload 2"
@@ -147,34 +173,31 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "spark = start_or_get_spark(\"PySpark\", memory=\"32g\")\n",
-    "spark.conf.set(\"spark.sql.analyzer.failAmbiguousSelfJoin\", \"false\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 4,
+   "execution_count": 6,
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Hide fastai progress bar\n",
-    "hide_fastai_progress_bar()"
+    "try:\n",
+    "    spark = start_or_get_spark(\"PySpark\", memory=\"32g\")\n",
+    "    spark.conf.set(\"spark.sql.analyzer.failAmbiguousSelfJoin\", \"false\")\n",
+    "except NameError:\n",
+    "    pass # skip this import if we are not in a Spark environment"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 5,
+   "execution_count": 9,
    "metadata": {},
    "outputs": [],
    "source": [
    "# fix random seeds to make sure out runs are reproducible\n",
    "np.random.seed(SEED)\n",
-    "torch.manual_seed(SEED)\n",
-    "torch.cuda.manual_seed_all(SEED)"
+    "try:\n",
+    "    tf.random.set_seed(SEED)\n",
+    "    torch.manual_seed(SEED)\n",
+    "    torch.cuda.manual_seed_all(SEED)\n",
+    "except NameError:\n",
+    "    pass # skip this import if we are not in a GPU environment"
    ]
   },
   {
@@ -314,13 +337,18 @@
    "}\n",
    "\n",
    "lightgcn_param = {\n",
-    "    \"yaml_file\": os.path.join(\"..\",\"..\",\"recommenders\", \"models\", \"deeprec\", \"config\", \"lightgcn.yaml\"),\n",
+    "    \"model_type\": \"lightgcn\",\n",
     "    \"n_layers\": 3,\n",
     "    \"batch_size\": 1024,\n",
+    "    \"embed_size\": 64,\n",
+    "    \"decay\": 0.0001,\n",
     "    \"epochs\": 20,\n",
     "    \"learning_rate\": 0.005,\n",
     "    \"eval_epoch\": 5,\n",
     "    \"top_k\": DEFAULT_K,\n",
+    "    \"metrics\": [\"recall\", \"ndcg\", \"precision\", \"map\"],\n",
+    "    \"save_model\":False,\n",
+    "    \"MODEL_DIR\":\".\",\n",
    "}\n",
    "\n",
    "params = {\n",
@@ -778,7 +806,8 @@
    "    summary = generate_summary(data_size, algo, DEFAULT_K, time_train, time_rating, ratings, time_ranking, rankings)\n",
    "    df_results.loc[df_results.shape[0] + 1] = summary\n",
    "    \n",
-   "print(\"\\nComputation finished\")\n"
+   "print(\"\\nComputation finished\")\n",
+   "sb.glue(\"results\", df_results[\"nDCG@k\"].tolist())"
   ]
  },
 {
@@ -1175,9 +1204,9 @@
 ],
 "metadata": {
  "kernelspec": {
-   "display_name": "reco",
+   "display_name": "Python 3.7.13 ('reco')",
   "language": "python",
-   "name": "conda-env-reco-py"
+   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
@@ -1193,7 +1222,7 @@
  },
  "vscode": {
   "interpreter": {
-    "hash": "5659d8898e613821d32ce6048323094401019f14577fd202084feb69010a20de"
+    "hash": "2d9774f375d93db4a064c1fe757ad48aac12718e2d70725b436da9188beb8cf3"
   }
  }
 },
diff --git a/recommenders/models/deeprec/models/graphrec/lightgcn.py b/recommenders/models/deeprec/models/graphrec/lightgcn.py
index 7f17bf95f3..60042464dd 100644
--- a/recommenders/models/deeprec/models/graphrec/lightgcn.py
+++ b/recommenders/models/deeprec/models/graphrec/lightgcn.py
@@ -301,25 +301,25 @@ def run_eval(self):
             if metric == "map":
                 ret.append(
                     map_at_k(
-                        self.data.test, topk_scores, relevancy_method=None, k=self.top_k
+                        self.data.test, topk_scores, k=self.top_k
                     )
                 )
             elif metric == "ndcg":
                 ret.append(
                     ndcg_at_k(
-                        self.data.test, topk_scores, relevancy_method=None, k=self.top_k
+                        self.data.test, topk_scores, k=self.top_k
                     )
                 )
             elif metric == "precision":
                 ret.append(
                     precision_at_k(
-                        self.data.test, topk_scores, relevancy_method=None, k=self.top_k
+                        self.data.test, topk_scores, k=self.top_k
                     )
                 )
             elif metric == "recall":
                 ret.append(
                     recall_at_k(
-                        self.data.test, topk_scores, relevancy_method=None, k=self.top_k
+                        self.data.test, topk_scores, k=self.top_k
                     )
                 )
         return ret
diff --git a/tests/ci/azureml_tests/test_groups.py b/tests/ci/azureml_tests/test_groups.py
index 6b7c5b5e42..6817500467 100644
--- a/tests/ci/azureml_tests/test_groups.py
+++ b/tests/ci/azureml_tests/test_groups.py
@@ -44,7 +44,7 @@
         "tests/smoke/examples/test_notebooks_python.py::test_cornac_bpr_smoke",  # 16.62s
         "tests/integration/examples/test_notebooks_python.py::test_cornac_bpr_integration",  # 165.72s
     ],
-    "group_cpu_002": [  # Total group time: 1742.32s (didn't add xlearn)
+    "group_cpu_002": [  # Total group time: 1800.32s (didn't add xlearn)
         #
         "tests/smoke/examples/test_notebooks_python.py::test_baseline_deep_dive_smoke",  # 15.98s
         "tests/integration/examples/test_notebooks_python.py::test_baseline_deep_dive_integration",  # 170.73s
@@ -54,6 +54,7 @@
         #
         "tests/integration/examples/test_notebooks_python.py::test_geoimc_integration",  # 1006.19s
         #
+        "tests/integration/examples/test_notebooks_python.py::test_benchmark_movielens_cpu",  # 58s
         # FIXME: Add experimental tests in a later iteration
         # "tests/integration/examples/test_notebooks_python.py::test_xlearn_fm_integration",  # 255.73s
     ],
@@ -119,15 +120,17 @@
         "tests/smoke/examples/test_notebooks_gpu.py::test_npa_smoke",  # 366.22s
         "tests/integration/examples/test_notebooks_gpu.py::test_npa_quickstart_integration",  # 810.92s
     ],
-    "group_gpu_007": [  # Total group time: 620.89s
+    "group_gpu_007": [  # Total group time: 846.89s
         "tests/unit/examples/test_notebooks_gpu.py::test_gpu_vm",  # 0.76s (Always the first test to check the GPU works)
         "tests/smoke/examples/test_notebooks_gpu.py::test_naml_smoke",  # 620.13s
+        #
+        "tests/integration/examples/test_notebooks_gpu.py::test_benchmark_movielens_gpu",  # 226s
         # FIXME: Reduce test time https://github.com/microsoft/recommenders/issues/1731
         # "tests/integration/examples/test_notebooks_gpu.py::test_naml_quickstart_integration",  # 2033.85s
         # FIXME: https://github.com/microsoft/recommenders/issues/1716
         # "tests/integration/examples/test_notebooks_gpu.py::test_sasrec_quickstart_integration",  # 448.06s + 614.69s
     ],
-    "group_spark_001": [  # Total group time: 845.16s
+    "group_spark_001": [  # Total group time: 987.16s
         "tests/smoke/recommenders/dataset/test_movielens.py::test_load_spark_df",  # 4.33s
         "tests/integration/recommenders/datasets/test_movielens.py::test_load_spark_df",  # 25.58s + 101.99s + 139.23s
         #
@@ -137,6 +140,7 @@
         #
         "tests/smoke/examples/test_notebooks_pyspark.py::test_als_pyspark_smoke",  # 49.53s
         "tests/integration/examples/test_notebooks_pyspark.py::test_als_pyspark_integration",  # 110.58s
+        "tests/integration/examples/test_notebooks_pyspark.py::test_benchmark_movielens_pyspark",  # 142s
     ],
 }
 
diff --git a/tests/conftest.py b/tests/conftest.py
index d5212fcff5..7ffa163494 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -335,6 +335,9 @@ def notebooks():
         "nni_tuning_svd": os.path.join(
             folder_notebooks, "04_model_select_and_optimize", "nni_surprise_svd.ipynb"
         ),
+        "benchmark_movielens": os.path.join(
+            folder_notebooks, "06_benchmarks", "movielens.ipynb"
+        ),
     }
 
     return paths
diff --git a/tests/integration/examples/test_notebooks_gpu.py b/tests/integration/examples/test_notebooks_gpu.py
index 78f0bf54f2..1a539a215c 100644
--- a/tests/integration/examples/test_notebooks_gpu.py
+++ b/tests/integration/examples/test_notebooks_gpu.py
@@ -14,7 +14,7 @@
 from recommenders.utils.gpu_utils import get_number_gpus
 
 
-TOL = 0.5
+TOL = 0.1
 ABS_TOL = 0.05
 
 
@@ -709,8 +709,6 @@ def test_sasrec_quickstart_integration(
         "model_name": model_name,
         "seed": seed,
     }
-
-    print("Executing notebook ... ")
     pm.execute_notebook(
         notebook_path,
         output_notebook,
@@ -723,3 +721,32 @@
 
     for key, value in expected_values.items():
         assert results[key] == pytest.approx(value, rel=TOL, abs=ABS_TOL)
+
+
+@pytest.mark.gpu
+@pytest.mark.notebooks
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "size, algos, expected_values_ndcg",
+    [
+        (
+            ["100k"],
+            ["ncf", "fastai", "bivae", "lightgcn"],
+            [0.382793, 0.147583, 0.471722, 0.412664]
+        ),
+    ],
+)
+def test_benchmark_movielens_gpu(notebooks, output_notebook, kernel_name, size, algos, expected_values_ndcg):
+    notebook_path = notebooks["benchmark_movielens"]
+    pm.execute_notebook(
+        notebook_path,
+        output_notebook,
+        kernel_name=kernel_name,
+        parameters=dict(data_sizes=size, algorithms=algos),
+    )
+    results = sb.read_notebook(output_notebook).scraps.dataframe.set_index("name")[
+        "data"
+    ]
+    assert len(results["results"]) == 4
+    for i, value in enumerate(expected_values_ndcg):
+        assert results["results"][i] == pytest.approx(value, rel=TOL, abs=ABS_TOL)
diff --git a/tests/integration/examples/test_notebooks_pyspark.py b/tests/integration/examples/test_notebooks_pyspark.py
index dd5cc538b6..9b1a7b9558 100644
--- a/tests/integration/examples/test_notebooks_pyspark.py
+++ b/tests/integration/examples/test_notebooks_pyspark.py
@@ -62,3 +62,32 @@ def test_mmlspark_lightgbm_criteo_integration(notebooks, output_notebook, kernel
     ]
 
     assert results["auc"] == pytest.approx(0.68895, rel=TOL, abs=ABS_TOL)
+
+
+@pytest.mark.spark
+@pytest.mark.notebooks
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "size, algos, expected_values_ndcg",
+    [
+        (
+            ["100k"],
+            ["als"],
+            [0.035812]
+        ),
+    ],
+)
+def test_benchmark_movielens_pyspark(notebooks, output_notebook, kernel_name, size, algos, expected_values_ndcg):
+    notebook_path = notebooks["benchmark_movielens"]
+    pm.execute_notebook(
+        notebook_path,
+        output_notebook,
+        kernel_name=kernel_name,
+        parameters=dict(data_sizes=size, algorithms=algos),
+    )
+    results = sb.read_notebook(output_notebook).scraps.dataframe.set_index("name")[
+        "data"
+    ]
+    assert len(results["results"]) == 1
+    for i, value in enumerate(expected_values_ndcg):
+        assert results["results"][i] == pytest.approx(value, rel=TOL, abs=ABS_TOL)
\ No newline at end of file
diff --git a/tests/integration/examples/test_notebooks_python.py b/tests/integration/examples/test_notebooks_python.py
index 98dcef806e..4cd9fd0a55 100644
--- a/tests/integration/examples/test_notebooks_python.py
+++ b/tests/integration/examples/test_notebooks_python.py
@@ -1,7 +1,6 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
 
-import sys
 import pytest
 
 try:
@@ -309,3 +308,31 @@
     ]
 
     assert results["auc_score"] == pytest.approx(0.75, rel=TOL, abs=ABS_TOL)
+
+
+@pytest.mark.notebooks
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "size, algos, expected_values_ndcg",
+    [
+        (
+            ["100k"],
+            ["svd", "sar", "bpr"],
+            [0.094444, 0.393818, 0.444990]
+        ),
+    ],
+)
+def test_benchmark_movielens_cpu(notebooks, output_notebook, kernel_name, size, algos, expected_values_ndcg):
+    notebook_path = notebooks["benchmark_movielens"]
+    pm.execute_notebook(
+        notebook_path,
+        output_notebook,
+        kernel_name=kernel_name,
+        parameters=dict(data_sizes=size, algorithms=algos),
+    )
+    results = sb.read_notebook(output_notebook).scraps.dataframe.set_index("name")[
+        "data"
+    ]
+    assert len(results["results"]) == 3
+    for i, value in enumerate(expected_values_ndcg):
+        assert results["results"][i] == pytest.approx(value, rel=TOL, abs=ABS_TOL)
\ No newline at end of file