From ff984aec81c5c08ce28443d896c0818cfae4f789 Mon Sep 17 00:00:00 2001
From: Adrian-Mahjour
Date: Mon, 22 Nov 2021 11:24:09 -0500
Subject: [PATCH] AutoAI Model Deployment Resources

---
 .../Deploying AutoAI Models to Db2/README.md  |   7 +
 .../helperfunction_experiment.ipynb           | 450 +++++++++++++++++
 .../helperfunction_model.ipynb                | 473 ++++++++++++++++++
 In_Db2_Machine_Learning/README.md             |   1 +
 4 files changed, 931 insertions(+)
 create mode 100644 In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/README.md
 create mode 100644 In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_experiment.ipynb
 create mode 100644 In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_model.ipynb

diff --git a/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/README.md b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/README.md
new file mode 100644
index 0000000..17c4c71
--- /dev/null
+++ b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/README.md
@@ -0,0 +1,7 @@
+# Helper Functions for Deploying AutoAI Models
+
+Use these functions to automatically deploy AutoAI models to Db2 from IBM Cloud Pak for Data.
+
+Use `helperfunction_model.ipynb` to deploy from an AutoAI pipeline notebook.
+
+Use `helperfunction_experiment.ipynb` to deploy from an AutoAI experiment notebook.
\ No newline at end of file
diff --git a/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_experiment.ipynb b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_experiment.ipynb
new file mode 100644
index 0000000..aa08469
--- /dev/null
+++ b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_experiment.ipynb
@@ -0,0 +1,450 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "6d139251a938458081be7742adbd2428"
+   },
+   "source": [
+    "# Helper Function to Automatically Deploy AutoAI Models to Db2 as Python UDFs from an AutoAI Experiment\n",
+    "\n",
+    "This notebook loads the helper function that automatically deploys an AutoAI model as a Db2 Python UDF. This must be used in an AutoAI **experiment notebook**. It should not be used in a model notebook."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "67eab20fedc14e429c56b537426f966f"
+   },
+   "source": [
+    "## Output Handling Functions"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "f7b23d9d893e41c68793dd133a522063"
+   },
+   "source": [
+    "These two functions format the success and error messages displayed by the main function."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "id": "196eb35baa05463fb584ee6bd25dcb76"
+   },
+   "outputs": [],
+   "source": [
+    "# How to output success message\n",
+    "def success_msg(message):\n",
+    "    from IPython.display import HTML, display\n",
+    "    html = '<div class=\"alert alert-block alert-success\">'\n",
+    "    display(HTML(html + message + \"</div>\")) "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {
+    "id": "f337b9cb56a14abeb3aef4655d6c0590"
+   },
+   "outputs": [],
+   "source": [
+    "# How to output error message\n",
+    "def errormsg(message):\n",
+    "    from IPython.display import HTML, display\n",
+    "    html = '<div class=\"alert alert-block alert-danger\">'\n",
+    "    display(HTML(html + message + \"</div>\"))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "1b6d6e5de10141a0a42b0e0f79790dc6"
+   },
+   "source": [
+    "## Helper Function"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {
+    "id": "9d4f0447-db30-41c9-b43a-d01f434c33ce"
+   },
+   "outputs": [],
+   "source": [
+    "def deploy_autoai_model_as_db2pyudf(udf_source_filename, model_filename, jupyterpod_path, db2pod_path, id_col_index, create_function=False, function_name=\"\"):\n",
+    "    \"\"\"\n",
+    "    Deploy an IBM AutoAI Model to Db2 as a Python UDF. This is done in the following steps:\n",
+    "    1. Save the AutoAI model as a joblib file on the shared filesystem between the Jupyter and Db2 pods\n",
+    "    2. Write the Python UDF source file on the shared filesystem between the Jupyter and Db2 pods\n",
+    "    3. Change permissions of UDF source file and joblib file to be accessible by the Db2 fenced process\n",
+    "    4. (Optional) Register the UDF with Db2 through a CREATE OR REPLACE FUNCTION statement\n",
+    "    \n",
+    "    Parameters\n",
+    "    ----------\n",
+    "    udf_source_filename : str\n",
+    "        The filename that you would like to save the UDF source file under. Do not include the full path.\n",
+    "        Example: 'myudf.py'\n",
+    "    model_filename: str\n",
+    "        The filename that you would like to save the AutoAI model under. Do not include the full path. Must be a joblib file.\n",
+    "        Example: 'myautoaimodel.joblib'\n",
+    "    jupyterpod_path: str\n",
+    "        The path in the Jupyter pod to save the model and UDF source file to. \n",
+    "        This should be a path that is shared between the Jupyter pod and the Db2 pod.\n",
+    "        Example: '/mnts/jupyterfiles/adrian/'\n",
+    "    db2pod_path: str\n",
+    "        The path in the Db2 pod where the Db2 fenced process can access the UDF source file and AutoAI model. \n",
+    "        This should be a path that is shared between the Jupyter pod and the Db2 pod.\n",
+    "        Example: '/mnt/blumeta0/adrian/'\n",
+    "    id_col_index: int\n",
+    "        The index (starting at 0) of the column of the input table that contains the unique row id.\n",
+    "        Used to map the output prediction to the input row.\n",
+    "        Example: If the input to the UDF is MY_UDF((SELECT COUNT(*) FROM T1),i.C1,i.C2,i.ID,i.C4), then id_col_index=2\n",
+    "    create_function: boolean, optional (default is False)\n",
+    "        A flag to indicate whether the function should automatically register the UDF with Db2 through a CREATE OR REPLACE FUNCTION statement.
Will overwrite any existing function with the same name.\n", + " If set to true, argument function_name must be provided.\n", + " function_name: str, optional (default is \"\")\n", + " A string for the function name to be registered with Db2 during the CREATE OR REPLACE FUNCTION statement.\n", + " Example: 'MY_UDF'\n", + " \n", + " Example Use\n", + " ----------\n", + " deploy_autoai_model_as_db2pyudf(udf_source_filename='myudf.py', \n", + " model_filename='myautoaimodel.joblib',\n", + " jupyterpod_path='/mnts/jupyterfiles/adrian/',\n", + " db2pod_path='/mnt/blumeta0/adrian/',\n", + " id_col_index=0,\n", + " create_function=True,\n", + " function_name='FLIGHT_PREDICTER')\n", + " \n", + " \"\"\"\n", + " import ibm_db\n", + " import ibm_db_dbi\n", + " import pandas as pd\n", + " from joblib import dump\n", + " from ibm_watson_machine_learning.experiment import AutoAI\n", + " from ibm_watson_machine_learning import APIClient\n", + " \n", + " jupyter_model_path = jupyterpod_path+model_filename\n", + " db2_model_path = db2pod_path+model_filename\n", + " jupyter_udf_path = jupyterpod_path+udf_source_filename\n", + " db2_udf_path = db2pod_path+udf_source_filename\n", + " \n", + " \n", + " ############################\n", + " # 1. Save the AutoAI model #\n", + " ############################\n", + " print('Saving the AutoAI model...')\n", + "\n", + " try:\n", + " # Convert the Lale pipeline to a sklearn pipeline and save as joblib\n", + " scikit_learn_pipeline = pipeline_model.export_to_sklearn_pipeline()\n", + " dump(scikit_learn_pipeline,jupyter_model_path)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Unable to save AutoAI model as joblib file\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print(\"Successfully saved the AutoAI model to path:\", jupyter_model_path)\n", + " print(\"\")\n", + "\n", + "\n", + " ###########################\n", + " # 2. 
Write the Python UDF #\n",
+    "    ###########################\n",
+    "    \n",
+    "    # Import statements\n",
+    "    udf_import = '''\n",
+    "###############\n",
+    "### IMPORTS ###\n",
+    "###############\n",
+    "import nzae\n",
+    "\n",
+    "import numpy as np\n",
+    "from joblib import load\n",
+    "\n",
+    "class full_pipeline(nzae.Ae):\n",
+    "    def _runUdtf(self):\n",
+    "        ######################\n",
+    "        ### INITIALIZATION ###\n",
+    "        ######################\n",
+    "    '''\n",
+    "    \n",
+    "    # Load the model from the filesystem\n",
+    "    udf_loadmodel = '''\n",
+    "        trained_pipeline = load('{}')\n",
+    "    '''.format(db2_model_path)\n",
+    "\n",
+    "    # UDF body - row batching, model scoring, and output\n",
+    "    udf_body = '''\n",
+    "        #######################\n",
+    "        ### DATA COLLECTION ###\n",
+    "        #######################\n",
+    "        # Collect rows into a single batch\n",
+    "        batchsize = 0\n",
+    "        rownum = 0\n",
+    "        row_list = []\n",
+    "        for row in self:\n",
+    "            if (rownum==0):\n",
+    "                # Grab batchsize from first element value (select count (*))\n",
+    "                batchsize=row[0] \n",
+    "            \n",
+    "            # Collect everything but first element (which is select count(*))\n",
+    "            row_list.append(row[1:])\n",
+    "            rownum = rownum+1\n",
+    "\n",
+    "            if rownum==batchsize:\n",
+    "\n",
+    "                ##############################\n",
+    "                ### MODEL SCORING & OUTPUT ###\n",
+    "                ##############################\n",
+    "                \n",
+    "                # Collect data into a numpy array for scoring\n",
+    "                data=np.array(row_list)\n",
+    "                \n",
+    "                # Collect row IDs - TODO can probably just do this in the output step!\n",
+    "                ids=data[:,{}]\n",
+    "                \n",
+    "                # Call our trained pipeline to transform the data and make predictions\n",
+    "                predictions = trained_pipeline.predict(data)\n",
+    "\n",
+    "                # Output the row id and the corresponding prediction\n",
+    "                for x in range(predictions.shape[0]):\n",
+    "                    self.output(int(ids[x]),int(predictions[x]))\n",
+    "                \n",
+    "                # Reset rownum and row_list for next batch\n",
+    "                row_list=[]\n",
+    "                rownum=0\n",
+    "        self.done()\n",
+    "full_pipeline.run()\n",
+    "    '''.format(id_col_index)\n",
+    "    # Write the PyUDF file\n",
+    "    print('Writing Python UDF source file...')\n",
+    "    try:\n",
+    "        with open(jupyter_udf_path, mode='w') as file:\n",
+    "            file.write(udf_import)\n",
+    "\n",
+    "        with open(jupyter_udf_path, mode='a') as file:\n",
+    "            file.write(udf_loadmodel)\n",
+    "\n",
+    "        with open(jupyter_udf_path, mode='a') as file:\n",
+    "            file.write(udf_body)\n",
+    "    except Exception as e:\n",
+    "        errormsg(\"ERROR: Unable to write Python UDF source file\")\n",
+    "        print(e)\n",
+    "        return\n",
+    "    else:\n",
+    "        print(\"Successfully saved the Python UDF source file to path:\", jupyter_udf_path)\n",
+    "        print(\"\")\n",
+    "    \n",
+    "    ##############################\n",
+    "    # 3. Change file permissions # ##TODO: figure out how to handle errors!\n",
+    "    ##############################\n",
+    "    # Change permissions of UDF source file and joblib file to be accessible by the Db2 fenced process\n",
+    "\n",
+    "    print('Changing file permissions...')\n",
+    "    !chmod -R 777 $jupyter_model_path\n",
+    "    !chmod -R 777 $jupyter_udf_path\n",
+    "    print(\"\")\n",
+    "    \n",
+    "    ##########################\n",
+    "    # 4. 
Create UDF function #\n", + " ##########################\n", + " \n", + " if create_function==True:\n", + " print('Automatically registering UDF function...')\n", + " print(\"\")\n", + " \n", + " # First check required optional arguments are specified\n", + " if function_name == \"\":\n", + " errormsg(\"ERROR: Function name not provided!\")\n", + " return\n", + " \n", + " # Connect to Db2\n", + " print('Attempting to make a connection to Db2...')\n", + " try:\n", + " # Get the Db2 credentials from the experiment\n", + " client = APIClient(wml_credentials)\n", + " client.set.default_project(experiment_metadata['project_id'])\n", + " Db2_credentials = client.connections.get_details()['resources'][0]['entity']['properties']\n", + " Db2_dsn = 'DATABASE={};HOSTNAME={};PORT={};PROTOCOL=TCPIP;UID={uid};PWD={pwd}'.format(\n", + " Db2_credentials['database'],\n", + " Db2_credentials['host'],\n", + " Db2_credentials['port'],\n", + " uid=Db2_credentials['username'],\n", + " pwd=Db2_credentials['password']\n", + " )\n", + " Db2_connection = ibm_db.connect(Db2_dsn,\"\",\"\")\n", + " dbi_connection = ibm_db_dbi.Connection(Db2_connection)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Connect to Db2 failed\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('Connection successful!')\n", + " print(\"\")\n", + " \n", + " # Determine input column datatypes\n", + " print('Attempting to determine input column datatypes...')\n", + " try:\n", + " input_table = experiment_metadata['excel_sheet']\n", + " sql = '''SELECT NAME, COLTYPE,LENGTH FROM SYSIBM.SYSCOLUMNS \n", + " WHERE TBCREATOR='{}' AND TBNAME='{}' AND NAME!='{}' ORDER BY COLNO \n", + " '''.format(input_table.split('.')[0],input_table.split('.')[1],experiment_metadata['prediction_column'])\n", + " # Create a string from the mapping. 
This is used in the CREATE FUNCTION statement\n", + " dtypes_df = pd.read_sql(sql,dbi_connection)\n", + " mapping = [str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()+'('+str(dtypes_df['LENGTH'][dtypes_df['NAME']==x].values[0])+')' \n", + " if str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()==\"VARCHAR\" \n", + " else str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip() \n", + " for x in dtypes_df['NAME']]\n", + " input_dtypes_string = ', '.join([x for x in mapping ])\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Error determining input datatypes\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('Successfully determined input column datatypes!')\n", + " print(\"\")\n", + " \n", + " # Automatically execute CREATE FUNCTION statement\n", + " print('Attempting to execute CREATE FUNCTION statement...')\n", + " try:\n", + " sql='''\n", + "CREATE OR REPLACE FUNCTION \n", + "{}(INTEGER,{}) \n", + "RETURNS TABLE (ID INTEGER,PREDICTION SMALLINT)\n", + "LANGUAGE PYTHON PARAMETER STYLE NPSGENERIC FENCED NOT THREADSAFE NO FINAL CALL DISALLOW PARALLEL NO DBINFO \n", + "DETERMINISTIC NO EXTERNAL ACTION CALLED ON NULL INPUT \n", + "NO SQL EXTERNAL NAME '{}'\n", + " '''.format(function_name,input_dtypes_string,db2_udf_path)\n", + "\n", + " print(sql)\n", + " stmt = ibm_db.prepare(Db2_connection, sql)\n", + " ibm_db.execute(stmt)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Unable to execute CREATE FUNCTION statement!\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('UDF registered with Db2!')\n", + " \n", + " \n", + " # Show how to call the UDF\n", + " msg = \"\"\"Execute the following SQL statement to call your UDF to make predictions on input data
\n", + " \n", + " SELECT f.* from <INPUT_TABLE> i,\n", + " TABLE({}((SELECT COUNT(*) from <INPUT_TABLE>),i.C1,i.C2, ...)) f
\n", + "
\n", + " Replace <INPUT_TABLE> with the name of the table that contains the raw data to be scored (e.g., FLIGHTS.DATA)
\n", + " Replace i.C1, i.C2, ... with the input columns (e.g., i.DAY, i.ORIGIN,...)
\n", + " You may choose to replace the first argument (SELECT COUNT(*) from <INPUT_TABLE>) with a custom batchsize.
\n", + " Note that the batchsize must be a clean divisor of the input table. E.g., for a table of 10 rows, you may choose a batchsize of 1, 2, 5, or 10.\"\"\".format(function_name)\n", + " success_msg(msg)\n", + " \n", + " # If Create Function argument not provided, provide steps for manual function registration\n", + " else:\n", + " print('Steps to manually create your UDF function:')\n", + " \n", + " # How to write the CREATE FUNCTION statement\n", + " msg= '''\n", + " Execute the following SQL statement to create your UDF:
\n", + " \n", + " CREATE OR REPLACE FUNCTION \n", + " <UDF_NAME>(INTEGER,<C1 DATATYPE>,<C2 DATATYPE>,...) \n", + " RETURNS TABLE (ID INTEGER,PREDICTION SMALLINT)\n", + " LANGUAGE PYTHON PARAMETER STYLE NPSGENERIC FENCED NOT THREADSAFE NO FINAL CALL \n", + " DISALLOW PARALLEL NO DBINFO DETERMINISTIC NO EXTERNAL ACTION CALLED ON NULL INPUT \n", + " NO SQL EXTERNAL NAME '{}'
\n", + "
\n", + " Replace <UDF_NAME> with a function name (e.g., MY_UDF)
\n", + " Replace <Cn DATATYPE> with the datatype of the nth input column (e.g., VARCHAR(8))\n", + " '''.format(db2_udf_path)\n", + " success_msg(msg)\n", + " \n", + " # How to call the UDF\n", + " msg='''\n", + " Execute the following SQL statement to call your UDF to make predictions on input data:
\n", + " \n", + " SELECT f.* from <INPUT_TABLE> i,\n", + " TABLE(<UDF_NAME>((SELECT COUNT(*) from <INPUT_TABLE>),i.C1,i.C2, ...)) f
\n", + "
\n", + " Replace <UDF_NAME> with the name of your UDF (e.g., MY_UDF)
\n", + " Replace <INPUT_TABLE> with the name of the table that contains the raw data to be scored (e.g., FLIGHTS.DATA)
\n", + " Replace i.C1, i.C2, ... with the input columns (e.g., i.DAY, i.ORIGIN,...)
\n", + " You may choose to replace the first argument (SELECT COUNT(*) from <INPUT_TABLE>) with a custom batchsize.
\n", + " Note that the batchsize must be a clean divisor of the input table. E.g., for a table of 10 rows, you may choose a batchsize of 1, 2, 5, or 10.\n", + " '''\n", + " success_msg(msg)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "id": "2c2999f64edd4c53bee162844b6c2acd" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--------------\n" + ] + }, + { + "data": { + "text/html": [ + "

Function deploy_autoai_model_as_db2pyudf successfully loaded!
\n", + "Run help(deploy_autoai_model_as_db2pyudf) to get function information

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "print('--------------')\n", + "success_msg('''Function deploy_autoai_model_as_db2pyudf successfully loaded!
\n", + "Run help(deploy_autoai_model_as_db2pyudf) to get function information''')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "dff2ba54ab43414582d17ce5d39f33cb" + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.7", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_model.ipynb b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_model.ipynb new file mode 100644 index 0000000..03a81ae --- /dev/null +++ b/In_Db2_Machine_Learning/Deploying AutoAI Models to Db2/helperfunction_model.ipynb @@ -0,0 +1,473 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "6d139251a938458081be7742adbd2428" + }, + "source": [ + "# Helper Function to Automatically Deploy AutoAI Models to Db2 as Python UDF" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "bab2f37c743d45ec884771eb2ebeb171" + }, + "source": [ + "This notebook loads the helper function that automatically deploys an AutoAI Model as a Db2 Python UDF. This must be used an AutoAI **model notebook**. It should not be used in an experiment notebook" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "67eab20fedc14e429c56b537426f966f" + }, + "source": [ + "## Output Handling Functions" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4435e67bd65648d580595fb52effe834" + }, + "source": [ + "These two functions deal with formatting errors or successes within the main function" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "id": "196eb35baa05463fb584ee6bd25dcb76" + }, + "outputs": [], + "source": [ + "# How to output success message\n", + "def success_msg(message):\n", + " from IPython.display import HTML, display\n", + " html = '
+    "    display(HTML(html + message + \"</div>\")) "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {
+    "id": "f337b9cb56a14abeb3aef4655d6c0590"
+   },
+   "outputs": [],
+   "source": [
+    "# How to output error message\n",
+    "def errormsg(message):\n",
+    "    from IPython.display import HTML, display\n",
+    "    html = '<div class=\"alert alert-block alert-danger\">'\n",
+    "    display(HTML(html + message + \"</div>\"))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "1b6d6e5de10141a0a42b0e0f79790dc6"
+   },
+   "source": [
+    "## Helper Function"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {
+    "id": "9d4f0447-db30-41c9-b43a-d01f434c33ce"
+   },
+   "outputs": [],
+   "source": [
+    "def deploy_autoai_model_as_db2pyudf(udf_source_filename, model_filename, jupyterpod_path, db2pod_path, id_col_index, create_function=False, instance_url=\"\", function_name=\"\"):\n",
+    "    \"\"\"\n",
+    "    Deploy an IBM AutoAI Model to Db2 as a Python UDF. This is done in the following steps:\n",
+    "    1. Save the AutoAI model as a joblib file on the shared filesystem between the Jupyter and Db2 pods\n",
+    "    2. Write the Python UDF source file on the shared filesystem between the Jupyter and Db2 pods\n",
+    "    3. Change permissions of UDF source file and joblib file to be accessible by the Db2 fenced process\n",
+    "    4. (Optional) Register the UDF with Db2 through a CREATE OR REPLACE FUNCTION statement\n",
+    "    \n",
+    "    Parameters\n",
+    "    ----------\n",
+    "    udf_source_filename : str\n",
+    "        The filename that you would like to save the UDF source file under. Do not include the full path.\n",
+    "        Example: 'myudf.py'\n",
+    "    model_filename: str\n",
+    "        The filename that you would like to save the AutoAI model under. Do not include the full path. Must be a joblib file.\n",
+    "        Example: 'myautoaimodel.joblib'\n",
+    "    jupyterpod_path: str\n",
+    "        The path in the Jupyter pod to save the model and UDF source file to. \n",
+    "        This should be a path that is shared between the Jupyter pod and the Db2 pod.\n",
+    "        Example: '/mnts/jupyterfiles/adrian/'\n",
+    "    db2pod_path: str\n",
+    "        The path in the Db2 pod where the Db2 fenced process can access the UDF source file and AutoAI model. \n",
+    "        This should be a path that is shared between the Jupyter pod and the Db2 pod.\n",
+    "        Example: '/mnt/blumeta0/adrian/'\n",
+    "    id_col_index: int\n",
+    "        The index (starting at 0) of the column of the input table that contains the unique row id.\n",
+    "        Used to map the output prediction to the input row.\n",
+    "        Example: If the input to the UDF is MY_UDF((SELECT COUNT(*) FROM T1),i.C1,i.C2,i.ID,i.C4), then id_col_index=2\n",
+    "    create_function: boolean, optional (default is False)\n",
+    "        A flag to indicate whether the function should automatically register the UDF with Db2 through a CREATE OR REPLACE FUNCTION statement. Will overwrite any existing function with the same name.\n",
+    "        If set to true, arguments instance_url and function_name must be provided.\n",
+    "    instance_url: str, optional (default is \"\")\n",
+    "        A string of your Cloud Pak for Data instance home url.\n",
+    "        Example: \"https://cpd-cpd-instance.apps.db2ai.cp.fyre.ibm.com\"\n",
+    "    function_name: str, optional (default is \"\")\n",
+    "        A string for the function name to be registered with Db2 during the CREATE OR REPLACE FUNCTION statement.\n",
+    "        Example: 'MY_UDF'\n",
+    "    \n",
+    "    Example Use\n",
+    "    ----------\n",
+    "    deploy_autoai_model_as_db2pyudf(udf_source_filename='myudf.py', \n",
+    "                                    model_filename='myautoaimodel.joblib',\n",
+    "                                    jupyterpod_path='/mnts/jupyterfiles/adrian/',\n",
+    "                                    db2pod_path='/mnt/blumeta0/adrian/',\n",
+    "                                    id_col_index=0,\n",
+    "                                    create_function=True,\n",
+    "                                    instance_url=\"https://cpd-cpd-instance.apps.db2ai.cp.fyre.ibm.com\",\n",
+    "                                    function_name='FLIGHT_PREDICTER')\n",
+    "    \n",
+    "    \"\"\"\n",
+    "    import ibm_db\n",
+    "    import ibm_db_dbi\n",
+    "    from joblib import dump\n",
+    "    import os\n",
+    "    import pandas as pd\n",
+    "    from ibm_watson_machine_learning.experiment import AutoAI\n",
+    "    from ibm_watson_machine_learning import APIClient\n",
+    "    \n",
+    "    jupyter_model_path = jupyterpod_path+model_filename\n",
+    "    db2_model_path = db2pod_path+model_filename\n",
+    "    jupyter_udf_path = jupyterpod_path+udf_source_filename\n",
+    "    db2_udf_path = db2pod_path+udf_source_filename\n",
+    "    \n",
+    "    \n",
+    "    ############################\n",
+    "    # 1. Save the AutoAI model #\n",
+    "    ############################\n",
+    "    print('Saving the AutoAI model...')\n",
+    "\n",
+    "    try:\n",
+    "        dump(pipeline,jupyter_model_path)\n",
+    "    except Exception as e:\n",
+    "        errormsg(\"ERROR: Unable to save AutoAI model as joblib file\")\n",
+    "        print(e)\n",
+    "        return\n",
+    "    else:\n",
+    "        print(\"Successfully saved the AutoAI model to path:\", jupyter_model_path)\n",
+    "        print(\"\")\n",
+    "\n",
+    "\n",
+    "    ###########################\n",
+    "    # 2. Write the Python UDF #\n",
+    "    ###########################\n",
+    "    \n",
+    "    # Import statements\n",
+    "    udf_import = '''\n",
+    "###############\n",
+    "### IMPORTS ###\n",
+    "###############\n",
+    "import nzae\n",
+    "\n",
+    "import numpy as np\n",
+    "from joblib import load\n",
+    "\n",
+    "class full_pipeline(nzae.Ae):\n",
+    "    def _runUdtf(self):\n",
+    "        ######################\n",
+    "        ### INITIALIZATION ###\n",
+    "        ######################\n",
+    "    '''\n",
+    "    \n",
+    "    # Load the model from the filesystem\n",
+    "    udf_loadmodel = '''\n",
+    "        trained_pipeline = load('{}')\n",
+    "    '''.format(db2_model_path)\n",
+    "\n",
+    "    # UDF body - row batching, model scoring, and output\n",
+    "    udf_body = '''\n",
+    "        #######################\n",
+    "        ### DATA COLLECTION ###\n",
+    "        #######################\n",
+    "        # Collect rows into a single batch\n",
+    "        batchsize = 0\n",
+    "        rownum = 0\n",
+    "        row_list = []\n",
+    "        for row in self:\n",
+    "            if (rownum==0):\n",
+    "                # Grab batchsize from first element value (select count (*))\n",
+    "                batchsize=row[0] \n",
+    "            \n",
+    "            # Collect everything but first element (which is select count(*))\n",
+    "            row_list.append(row[1:])\n",
+    "            rownum = rownum+1\n",
+    "\n",
+    "            if rownum==batchsize:\n",
+    "\n",
+    "                ##############################\n",
+    "                ### MODEL SCORING & OUTPUT ###\n",
+    "                ##############################\n",
+    "                \n",
+    "                # Collect data into a numpy array for scoring\n",
+    "                data=np.array(row_list)\n",
+    "                \n",
+    "                # Collect row IDs - TODO can probably just do this in the output step!\n",
+    "                ids=data[:,{}]\n",
+    "                \n",
+    "                # Call our trained pipeline to transform the data and make predictions\n",
+    "                predictions = trained_pipeline.predict(data)\n",
+    "\n",
+    "                # Output the row id and the 
corresponding prediction\n", + " for x in range(predictions.shape[0]):\n", + " self.output(int(ids[x]),int(predictions[x]))\n", + " \n", + " #Reset rownum and row_list for next batch\n", + " row_list=[]\n", + " rownum=0\n", + " self.done()\n", + "full_pipeline.run()\n", + " '''.format(id_col_index)\n", + " # Write the PyUDF file\n", + " print('Writing Python UDF source file...')\n", + " try:\n", + " with open(jupyter_udf_path, mode='w') as file:\n", + " file.write(udf_import)\n", + "\n", + " with open(jupyter_udf_path, mode='a') as file:\n", + " file.write(udf_loadmodel)\n", + "\n", + " with open(jupyter_udf_path, mode='a') as file:\n", + " file.write(udf_body)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Unable to write Python UDF source file\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print(\"Successfully saved the Python UDF source file to path:\", jupyter_udf_path)\n", + " print(\"\")\n", + " \n", + " ##############################\n", + " # 3. Change file permissions # ##TODO: figure out how to handle errors!\n", + " ##############################\n", + " # Change permissions of UDF source file and joblib file to be accessible by the Db2 fenced process\n", + "\n", + " print('Changing file permissions...')\n", + " !chmod -R 777 $jupyter_model_path\n", + " !chmod -R 777 $jupyter_udf_path\n", + " print(\"\")\n", + " \n", + " ##########################\n", + " # 4. Create UDF function #\n", + " ##########################\n", + " \n", + " if create_function==True:\n", + " print('Automatically registering UDF function...')\n", + " print(\"\")\n", + " \n", + " # First check required optional arguments are specified\n", + " if instance_url == \"\":\n", + " errormsg(\"ERROR: Cloud Pak for Data instance URL not provided!\")\n", + " return\n", + " if function_name == \"\":\n", + " errormsg(\"ERROR: Function name not provided!\")\n", + " return\n", + " \n", + " # Connect to Db2\n", + " print('Attempting to make a connection to Db2...')\n", + " try:\n", + " # Get the Db2 credentials from WML\n", + " url = instance_url\n", + " wml_credentials = {\n", + " \"instance_id\": \"openshift\",\n", + " \"token\": os.environ.get(\"USER_ACCESS_TOKEN\"),\n", + " \"url\": url,\n", + " \"version\": \"4.0\"\n", + " }\n", + " client = APIClient(wml_credentials)\n", + " client.set.default_project(experiment_metadata['project_id'])\n", + " Db2_credentials = client.connections.get_details()['resources'][0]['entity']['properties']\n", + " \n", + " # Make a connection to Db2\n", + " Db2_dsn = 'DATABASE={};HOSTNAME={};PORT={};PROTOCOL=TCPIP;UID={uid};PWD={pwd}'.format(\n", + " Db2_credentials['database'],\n", + " Db2_credentials['host'],\n", + " Db2_credentials['port'],\n", + " uid=Db2_credentials['username'],\n", + " pwd=Db2_credentials['password']\n", + " )\n", + " Db2_connection = ibm_db.connect(Db2_dsn,\"\",\"\")\n", + " dbi_connection = ibm_db_dbi.Connection(Db2_connection)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Connect to Db2 failed\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('Connection successful!')\n", + " print(\"\")\n", + " \n", + " # Determine input column datatypes\n", + " print('Attempting to determine input column datatypes...')\n", + " try:\n", + " # Get input table name from experiment metadata\n", + " input_table = experiment_metadata['excel_sheet']\n", + " sql = '''SELECT NAME, COLTYPE,LENGTH FROM SYSIBM.SYSCOLUMNS \n", + " WHERE TBCREATOR='{}' AND TBNAME='{}' AND NAME!='{}' ORDER BY COLNO \n", + " 
'''.format(input_table.split('.')[0],input_table.split('.')[1],experiment_metadata['prediction_column'])\n", + " # Create a string from the mapping. This is used in the CREATE FUNCTION statement\n", + " dtypes_df = pd.read_sql(sql,dbi_connection)\n", + " mapping = [str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()+'('+str(dtypes_df['LENGTH'][dtypes_df['NAME']==x].values[0])+')' \n", + " if str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()==\"VARCHAR\" \n", + " else str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip() \n", + " for x in dtypes_df['NAME']]\n", + " input_dtypes_string = ', '.join([x for x in mapping ])\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Error determining input datatypes\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('Successfully determined input column datatypes!')\n", + " print(\"\")\n", + " \n", + " # Automatically execute CREATE FUNCTION statement\n", + " print('Attempting to execute CREATE FUNCTION statement...')\n", + " try:\n", + " sql='''\n", + "CREATE OR REPLACE FUNCTION \n", + "{}(INTEGER,{}) \n", + "RETURNS TABLE (ID INTEGER,PREDICTION SMALLINT)\n", + "LANGUAGE PYTHON PARAMETER STYLE NPSGENERIC FENCED NOT THREADSAFE NO FINAL CALL DISALLOW PARALLEL NO DBINFO \n", + "DETERMINISTIC NO EXTERNAL ACTION CALLED ON NULL INPUT \n", + "NO SQL EXTERNAL NAME '{}'\n", + " '''.format(function_name,input_dtypes_string,db2_udf_path)\n", + "\n", + " print(sql)\n", + " stmt = ibm_db.prepare(Db2_connection, sql)\n", + " ibm_db.execute(stmt)\n", + " except Exception as e:\n", + " errormsg(\"ERROR: Unable to execute CREATE FUNCTION statement!\")\n", + " print(e)\n", + " return\n", + " else:\n", + " print('UDF registered with Db2!')\n", + " \n", + " \n", + " # Show how to call the UDF\n", + " msg = \"\"\"Execute the following SQL statement to call your UDF to make predictions on input data
\n", + " \n", + " SELECT f.* from <INPUT_TABLE> i,\n", + " TABLE({}((SELECT COUNT(*) from <INPUT_TABLE>),i.C1,i.C2, ...)) f
\n", + "
\n", + " Replace <INPUT_TABLE> with the name of the table that contains the raw data to be scored (e.g., FLIGHTS.DATA)
\n", + " Replace i.C1, i.C2, ... with the input columns (e.g., i.DAY, i.ORIGIN,...)
\n", + " You may choose to replace the first argument (SELECT COUNT(*) from <INPUT_TABLE>) with a custom batchsize.
\n", + " Note that the batchsize must be a clean divisor of the input table. E.g., for a table of 10 rows, you may choose a batchsize of 1, 2, 5, or 10.\"\"\".format(function_name)\n", + " success_msg(msg)\n", + " \n", + " # If Create Function argument not provided, provide steps for manual function registration\n", + " else:\n", + " print('Steps to manually create your UDF function:')\n", + " \n", + " # How to write the CREATE FUNCTION statement\n", + " msg= '''\n", + " Execute the following SQL statement to create your UDF:
\n", + " \n", + " CREATE OR REPLACE FUNCTION \n", + " <UDF_NAME>(INTEGER,<C1 DATATYPE>,<C2 DATATYPE>,...) \n", + " RETURNS TABLE (ID INTEGER,PREDICTION SMALLINT)\n", + " LANGUAGE PYTHON PARAMETER STYLE NPSGENERIC FENCED NOT THREADSAFE NO FINAL CALL \n", + " DISALLOW PARALLEL NO DBINFO DETERMINISTIC NO EXTERNAL ACTION CALLED ON NULL INPUT \n", + " NO SQL EXTERNAL NAME '{}'
\n", + "
\n", + " Replace <UDF_NAME> with a function name (e.g., MY_UDF)
\n", + " Replace <Cn DATATYPE> with the datatype of the nth input column (e.g., VARCHAR(8))\n", + " '''.format(db2_udf_path)\n", + " success_msg(msg)\n", + " \n", + " # How to call the UDF\n", + " msg='''\n", + " Execute the following SQL statement to call your UDF to make predictions on input data:
\n", + " \n", + " SELECT f.* from <INPUT_TABLE> i,\n", + " TABLE(<UDF_NAME>((SELECT COUNT(*) from <INPUT_TABLE>),i.C1,i.C2, ...)) f
\n", + "
\n", + " Replace <UDF_NAME> with the name of your UDF (e.g., MY_UDF)
\n", + " Replace <INPUT_TABLE> with the name of the table that contains the raw data to be scored (e.g., FLIGHTS.DATA)
\n", + " Replace i.C1, i.C2, ... with the input columns (e.g., i.DAY, i.ORIGIN,...)
\n", + " You may choose to replace the first argument (SELECT COUNT(*) from <INPUT_TABLE>) with a custom batchsize.
\n", + " Note that the batchsize must be a clean divisor of the input table. E.g., for a table of 10 rows, you may choose a batchsize of 1, 2, 5, or 10.\n", + " '''\n", + " success_msg(msg)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "id": "2c2999f64edd4c53bee162844b6c2acd" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--------------\n" + ] + }, + { + "data": { + "text/html": [ + "

Function deploy_autoai_model_as_db2pyudf successfully loaded!
\n", + "Run help(deploy_autoai_model_as_db2pyudf) to get function information

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "print('--------------')\n", + "success_msg('''Function deploy_autoai_model_as_db2pyudf successfully loaded!
\n", + "Run help(deploy_autoai_model_as_db2pyudf) to get function information''')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "dff2ba54ab43414582d17ce5d39f33cb" + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.7", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/In_Db2_Machine_Learning/README.md b/In_Db2_Machine_Learning/README.md index 1bcd2e7..0b5d5bc 100644 --- a/In_Db2_Machine_Learning/README.md +++ b/In_Db2_Machine_Learning/README.md @@ -5,3 +5,4 @@ This repository contains the following example use-cases: - **Building a Scoring Pipeline with Db2**: Use in-Db2 machine learning functionality for single row scoring inside the database - **Deploying External Models with Python UDF**: Use Python UDFs to deploy and call externally trained models in Db2 - **Automated AI Model Development with IBM Cloud Pak for Data and Db2**: Catalog a database table from IBM Db2 into Watson Knowledge catalog. Once cataloged, we’ll use this data asset with AutoAI to automatically build a classification model +- **Deploying AutoAI Models to Db2**: Automatically deploy AutoAI ML pipelines to Db2 for scoring as Python UDFs