Adding a New Model Definition¶
Teradata’s AnalyticOps (AOA) framework provides an easy-to-use web-based user interface (UI) and a command-line interface (CLI) to handle end-to-end pipelining of data science workflows.
In this tutorial, you will complete the tasks required to define a new model in Git. Note: these examples use Python, but the same can be done with R.
To install the AnalyticOps Accelerator CLI, see the installation instructions.
Before adding a new model, a project must be set up and configured on your local machine. This can be achieved by creating a new project or cloning an existing one. In both cases, a Git repository to host the project is required.
Add a New Project¶
Before a new project can be added locally, it must be created in the UI as explained in the User Guide: Create a New Project. When done, it can be added locally using the CLI:
Open a shell terminal and navigate to the path where you want your project to reside.
```
cd <path to project dir>
```
Initialize the project files and structure with the init command:

```
aoa init
```
Then the CLI will prompt you to select a project to initialize locally. Type the index of the newly created project.

```
Available projects:
-------------------
[0] (1dc14dba-49f9-4e31-8c84-abbb2177e14e) Aoa Demo Models
[1] (23e1df4b-b630-47a1-ab80-7ad5385fcd8d) New Project
Select project by index (current selection: none): _
```
Using CLI to Clone an Existing Project¶
To clone an existing project using the CLI:
Open a terminal and navigate to the path where you want your project to reside.
```
cd <path to project dir>
```
Clone the project files and structure with the clone command:

```
aoa clone
```
Then the CLI will prompt you to select a project to initialize locally. Type the index of the selected project.

```
Available projects:
-------------------
[0] (1dc14dba-49f9-4e31-8c84-abbb2177e14e) Aoa Demo Models
[1] (23e1df4b-b630-47a1-ab80-7ad5385fcd8d) Existent Project
Select project by index (current selection: none): _
```
Using CLI to Add a New Model¶
To add a new model using the CLI:
Open a terminal and navigate to the path where your AnalyticsOps project resides.
```
cd <path to project dir>
```
Use the add command from the CLI:

```
aoa add
```
When prompted, enter the desired model name and model description.
```
Enter model name: _
Enter model description: _
```
Choose the desired language by typing its index number from the list.

```
Supported languages:
--------------------
[0] R
[1] python
[2] sql
Select model language by index: _
```
Choose the desired template by typing its index number from the list.

```
Supported templates:
--------------------
[0] empty (default)
[1] pyspark
[2] sklearn
Select template type by index (or leave blank for the default one): _
```
As a result, a message will be shown with the id and name of the newly created model. E.g.:

```
Creating model structure for model: (f7627d7a-3a4b-430b-953c-fc4fa738b642) demo
```
The files and folder structure of this new model will be created under a path with the pattern:

```
<path to project dir>/model_definitions/<model id>
```
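For orientation, the generated layout includes the files referenced throughout this tutorial; roughly (additional metadata files may also be created by the template):

```
<path to project dir>/model_definitions/<model id>/
├── config.json
├── model_modules/
│   ├── requirements.txt
│   ├── training.py
│   ├── evaluation.py
│   └── scoring.py
└── notebooks/
```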
Copy Example Notebook¶
To keep all documents and code for a model in the same location while developing it, it is recommended to save or copy all your notebooks under the path <path to project dir>/model_definitions/<model id>/notebooks
As an example, a notebook can be copied in the terminal with:
```
cp <origin path dir>/example.ipynb <path to project dir>/model_definitions/<model id>/notebooks
```
Define the Model Configuration¶
Once the model is added, it can be configured for AnalyticOps Accelerator.
Create or edit the existing requirements.txt file with all the model's dependencies and versions in the folder <path to project dir>/model_definitions/<model id>/model_modules. E.g.:

```
xgboost==0.90
scikit-learn==0.22.2
shap==0.36.0
matplotlib>=2.2.2
teradataml>=17.0.0.2
nyoka>=4.3.0
aoa==4.1.4
```
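If you plan to run or test the model code locally (for example, from a notebook), these dependencies can be installed into your environment with pip; the path below assumes the layout described above:

```
pip install -r <path to project dir>/model_definitions/<model id>/model_modules/requirements.txt
```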
Create or edit the existing <path to project dir>/model_definitions/<model id>/config.json file with all the model's configuration parameters and values in JSON format. E.g.:

```json
{
    "hyperParameters": {
        "eta": 0.2,
        "max_depth": 6
    }
}
```
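The framework passes these values to your code as the model_conf dictionary (see the training step below). As a quick, purely illustrative check of that mapping, you could inspect the file locally:

```python
# Illustrative only: config.json becomes the "hyperParameters" entry of model_conf
import json

with open("model_definitions/<model id>/config.json") as f:  # substitute your model id
    conf = json.load(f)

print(conf["hyperParameters"])  # {'eta': 0.2, 'max_depth': 6}
```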
Define the Training File¶
Open and edit the file <path to project dir>/model_definitions/<model id>/model_modules/training.py. The contents will be something like:

```python
def train(data_conf, model_conf, **kwargs):
    """Python train method called by AOA framework

    Parameters:
    data_conf (dict): The dataset metadata
    model_conf (dict): The model configuration to use

    Returns:
    None: No return

    """

    hyperparams = model_conf["hyperParameters"]

    # load data & engineer

    print("Starting training...")

    # fit model to training data

    print("Finished training")

    # export model artefacts to models/ folder

    print("Saved trained model")
```
Add all the necessary imports at the beginning of the file (the rest of the file remains unchanged). E.g.:

```python
from xgboost import XGBClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from nyoka import xgboost_to_pmml
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from aoa.stats import stats
from aoa.util.artefacts import save_plot

import joblib
import os
```
Replace the comment # load data & engineer with the code to load the data and do all the necessary engineering before the training step. E.g.:

```python
    # Create context to connect to Vantage
    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)

    feature_names = ["NumTimesPrg", "PlGlcConc", "BloodP", "SkinThick", "TwoHourSerIns", "BMI", "DiPedFunc", "Age"]
    target_name = "HasDiabetes"

    # read training dataset from Teradata and convert to pandas
    train_df = DataFrame(data_conf["table"])
    train_df = train_df.select([feature_names + [target_name]])
    train_pdf = train_df.to_pandas()

    # split data into X and y
    X_train = train_pdf.drop(target_name, 1)
    y_train = train_pdf[target_name]
```
Note: the function train and all its arguments must be present in the function definition. As shown in the example above, the data_conf parameter is a Python dictionary that holds all the info for the dataset, which can be established on the UI. Similarly, the model_conf parameter holds all the info established in the configuration file config.json and overridden on the UI. The env vars AOA_CONN_HOST, AOA_CONN_USERNAME and AOA_CONN_PASSWORD are passed by the system from the connection selected in the UI when executing the task.
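For local experimentation outside the CLI, you can mimic what the framework does by setting the environment variables and building the dictionaries by hand. A minimal sketch, assuming illustrative connection details and table names (not part of the framework):

```python
# Minimal local harness (illustrative values; normally supplied by the AOA framework)
import os
from training import train  # assuming you run this next to training.py

os.environ["AOA_CONN_HOST"] = "vantage.example.com"
os.environ["AOA_CONN_USERNAME"] = "demo_user"
os.environ["AOA_CONN_PASSWORD"] = "demo_password"

data_conf = {"table": "PIMA_TRAIN", "schema": "demo_db"}        # dataset metadata from the UI
model_conf = {"hyperParameters": {"eta": 0.2, "max_depth": 6}}  # mirrors config.json

train(data_conf, model_conf)
```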
Replace the comment # fit model to training data with the code required to train the model. E.g.:

```python
    # fit model to training data
    model = Pipeline([('scaler', MinMaxScaler()),
                      ('xgb', XGBClassifier(eta=hyperparams["eta"],
                                            max_depth=hyperparams["max_depth"]))])
    # xgboost saves feature names but let's store on pipeline for easy access later
    model.feature_names = feature_names
    model.target_name = target_name

    model.fit(X_train, y_train)
```
Replace the comment # export model artefacts to models/ folder with the code to export all the model's artefacts. E.g.:

```python
    # export model artefacts
    joblib.dump(model, "artifacts/output/model.joblib")

    # we can also save as PMML so it can be used for in-Vantage scoring etc.
    xgboost_to_pmml(pipeline=model, col_names=feature_names, target_name=target_name,
                    pmml_f_name="artifacts/output/model.pmml")
```
Note: the path used to store the artefacts must always be artifacts/output/. In the above example, the main artefact is stored as model.joblib and, optionally, as an exportable model in PMML format as model.pmml.
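The framework prepares this folder when it executes the model; if you call train() by hand as in the earlier sketch, you may need to create it yourself first:

```python
import os

# Ensure the expected artefact folder exists before train() writes to it
os.makedirs("artifacts/output", exist_ok=True)
```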
Add, at the end of the train function, the code required for monitoring. E.g.:

```python
    from xgboost import plot_importance
    model["xgb"].get_booster().feature_names = feature_names
    plot_importance(model["xgb"].get_booster(), max_num_features=10)
    save_plot("feature_importance.png")

    feature_importance = model["xgb"].get_booster().get_score(importance_type="weight")
    stats.record_training_stats(train_df,
                                features=feature_names,
                                predictors=[target_name],
                                categorical=[target_name],
                                importance=feature_importance,
                                category_labels={target_name: {0: "false", 1: "true"}})
```
Note: to enable monitoring capabilities for the model, the method stats.record_training_stats must be called with all the required parameters, as shown in the above example.

The resulting file should be:
```python
from xgboost import XGBClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from nyoka import xgboost_to_pmml
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from aoa.stats import stats
from aoa.util.artefacts import save_plot

import joblib
import os


def train(data_conf, model_conf, **kwargs):
    hyperparams = model_conf["hyperParameters"]

    # Create context to connect to Vantage
    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)

    feature_names = ["NumTimesPrg", "PlGlcConc", "BloodP", "SkinThick", "TwoHourSerIns", "BMI", "DiPedFunc", "Age"]
    target_name = "HasDiabetes"

    # read training dataset from Teradata and convert to pandas
    train_df = DataFrame(data_conf["table"])
    train_df = train_df.select([feature_names + [target_name]])
    train_pdf = train_df.to_pandas()

    # split data into X and y
    X_train = train_pdf.drop(target_name, 1)
    y_train = train_pdf[target_name]

    print("Starting training...")

    # fit model to training data
    model = Pipeline([('scaler', MinMaxScaler()),
                      ('xgb', XGBClassifier(eta=hyperparams["eta"],
                                            max_depth=hyperparams["max_depth"]))])
    # xgboost saves feature names but let's store on pipeline for easy access later
    model.feature_names = feature_names
    model.target_name = target_name

    model.fit(X_train, y_train)

    print("Finished training")

    # export model artefacts
    joblib.dump(model, "artifacts/output/model.joblib")

    # we can also save as PMML so it can be used for in-Vantage scoring etc.
    xgboost_to_pmml(pipeline=model, col_names=feature_names, target_name=target_name,
                    pmml_f_name="artifacts/output/model.pmml")

    print("Saved trained model")

    from xgboost import plot_importance
    model["xgb"].get_booster().feature_names = feature_names
    plot_importance(model["xgb"].get_booster(), max_num_features=10)
    save_plot("feature_importance.png")

    feature_importance = model["xgb"].get_booster().get_score(importance_type="weight")
    stats.record_training_stats(train_df,
                                features=feature_names,
                                predictors=[target_name],
                                categorical=[target_name],
                                importance=feature_importance,
                                category_labels={target_name: {0: "false", 1: "true"}})
```
Define the Evaluation File¶
Open and edit the file <path to project dir>/model_definitions/<model id>/model_modules/evaluation.py. The contents will be something like:

```python
def evaluate(data_conf, model_conf, **kwargs):
    """Python evaluate method called by AOA framework

    Parameters:
    data_conf (dict): The dataset metadata
    model_conf (dict): The model configuration to use

    Returns:
    None: No return

    """

    # dump results as json file evaluation.json to models/ folder
    print("Evaluation complete...")
```
Add all the necessary imports at the beginning of the file (the rest of the file remains unchanged). E.g.:

```python
from sklearn import metrics
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from teradataml.dataframe.copy_to import copy_to_sql
from aoa.stats import stats
from aoa.util.artefacts import save_plot

import os
import joblib
import json
import numpy as np
import pandas as pd
```
Define the function and load the model artefact previously stored in the training phase. E.g.:

```python
def evaluate(data_conf, model_conf, **kwargs):
    model = joblib.load('artifacts/input/model.joblib')
```
Note: the function evaluate and all its arguments must be present in the function definition. The path artifacts/output/ used to store the artefacts in the training phase (see Define the Training File) has now become artifacts/input/ in the evaluation phase, and cannot be changed.

Create the connection context using the teradataml package. E.g.:
```python
    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)
```
Note: the parameters data_conf, model_conf and the env vars for the connection are as seen in Define the Training File.

Load and process the data that will be used to evaluate the model. E.g.:
```python
    # Read test dataset from Teradata
    # As this is for demo purposes, we simulate the test dataset changing between executions
    # by introducing a random sample. Note that the sampling is performed in Teradata!
    test_df = DataFrame(data_conf["table"]).sample(frac=0.8)
    test_pdf = test_df.to_pandas()

    X_test = test_pdf[model.feature_names]
    y_test = test_pdf[model.target_name]
```
Score the data to evaluate the model. E.g.:

```python
    print("Scoring")
    y_pred = model.predict(test_pdf[model.feature_names])
```
Generate and store the metrics of the evaluation. E.g.:

```python
    y_pred_tdf = pd.DataFrame(y_pred, columns=[model.target_name])
    y_pred_tdf["PatientId"] = test_pdf["PatientId"].values

    evaluation = {
        'Accuracy': '{:.2f}'.format(metrics.accuracy_score(y_test, y_pred)),
        'Recall': '{:.2f}'.format(metrics.recall_score(y_test, y_pred)),
        'Precision': '{:.2f}'.format(metrics.precision_score(y_test, y_pred)),
        'f1-score': '{:.2f}'.format(metrics.f1_score(y_test, y_pred))
    }

    with open("artifacts/output/metrics.json", "w+") as f:
        json.dump(evaluation, f)

    metrics.plot_confusion_matrix(model, X_test, y_test)
    save_plot('Confusion Matrix')

    metrics.plot_roc_curve(model, X_test, y_test)
    save_plot('ROC Curve')
```
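With the code above, the resulting artifacts/output/metrics.json will contain a flat JSON object of formatted metric strings, e.g. (values are illustrative):

```json
{"Accuracy": "0.77", "Recall": "0.60", "Precision": "0.70", "f1-score": "0.65"}
```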
Note: the outputs of the evaluation phase must always be stored under artifacts/output/.

Finally, generate and record the stats for monitoring. E.g.:
```python
    # xgboost has its own feature importance plot support but let's use shap as an explainability example
    import shap

    shap_explainer = shap.TreeExplainer(model['xgb'])
    shap_values = shap_explainer.shap_values(X_test)

    shap.summary_plot(shap_values, X_test, feature_names=model.feature_names,
                      show=False, plot_size=(12, 8), plot_type='bar')
    save_plot('SHAP Feature Importance')

    feature_importance = pd.DataFrame(list(zip(model.feature_names, np.abs(shap_values).mean(0))),
                                      columns=['col_name', 'feature_importance_vals'])
    feature_importance = feature_importance.set_index("col_name").T.to_dict(orient='records')[0]

    predictions_table = "TMP_{}".format(data_conf["predictions"]).lower()
    copy_to_sql(df=y_pred_tdf, table_name=predictions_table, index=False, if_exists="replace", temporary=True)

    stats.record_evaluation_stats(test_df, DataFrame(predictions_table), feature_importance)
```
Note: to enable monitoring capabilities for the model, the method stats.record_evaluation_stats must be called with all the required parameters, as shown in the above example.

The resulting file should be:
```python
from sklearn import metrics
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from teradataml.dataframe.copy_to import copy_to_sql
from aoa.stats import stats
from aoa.util.artefacts import save_plot

import os
import joblib
import json
import numpy as np
import pandas as pd


def evaluate(data_conf, model_conf, **kwargs):
    model = joblib.load('artifacts/input/model.joblib')

    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)

    # Read test dataset from Teradata
    # As this is for demo purposes, we simulate the test dataset changing between executions
    # by introducing a random sample. Note that the sampling is performed in Teradata!
    test_df = DataFrame(data_conf["table"]).sample(frac=0.8)
    test_pdf = test_df.to_pandas()

    X_test = test_pdf[model.feature_names]
    y_test = test_pdf[model.target_name]

    print("Scoring")
    y_pred = model.predict(test_pdf[model.feature_names])

    y_pred_tdf = pd.DataFrame(y_pred, columns=[model.target_name])
    y_pred_tdf["PatientId"] = test_pdf["PatientId"].values

    evaluation = {
        'Accuracy': '{:.2f}'.format(metrics.accuracy_score(y_test, y_pred)),
        'Recall': '{:.2f}'.format(metrics.recall_score(y_test, y_pred)),
        'Precision': '{:.2f}'.format(metrics.precision_score(y_test, y_pred)),
        'f1-score': '{:.2f}'.format(metrics.f1_score(y_test, y_pred))
    }

    with open("artifacts/output/metrics.json", "w+") as f:
        json.dump(evaluation, f)

    metrics.plot_confusion_matrix(model, X_test, y_test)
    save_plot('Confusion Matrix')

    metrics.plot_roc_curve(model, X_test, y_test)
    save_plot('ROC Curve')

    # xgboost has its own feature importance plot support but let's use shap as an explainability example
    import shap

    shap_explainer = shap.TreeExplainer(model['xgb'])
    shap_values = shap_explainer.shap_values(X_test)

    shap.summary_plot(shap_values, X_test, feature_names=model.feature_names,
                      show=False, plot_size=(12, 8), plot_type='bar')
    save_plot('SHAP Feature Importance')

    feature_importance = pd.DataFrame(list(zip(model.feature_names, np.abs(shap_values).mean(0))),
                                      columns=['col_name', 'feature_importance_vals'])
    feature_importance = feature_importance.set_index("col_name").T.to_dict(orient='records')[0]

    predictions_table = "TMP_{}".format(data_conf["predictions"]).lower()
    copy_to_sql(df=y_pred_tdf, table_name=predictions_table, index=False, if_exists="replace", temporary=True)

    stats.record_evaluation_stats(test_df, DataFrame(predictions_table), feature_importance)
```
Define the Scoring File¶
Open and edit the file <path to project dir>/model_definitions/<model id>/model_modules/scoring.py. The contents will be something like:

```python
def score(data_conf, model_conf, **kwargs):
    """Python score method called by AOA framework batch mode

    Parameters:
    data_conf (dict): The dataset metadata
    model_conf (dict): The model configuration to use

    Returns:
    None: No return

    """


# Uncomment this code if you want to deploy your model as a Web Service (Real-time / Interactive usage)
# class ModelScorer(object):
#     def __init__(self, config=None):
#         self.model = joblib.load('models/iris_knn.joblib')
#
#     def predict(self, data):
#         return self.model.predict([data])
#
```
Add all the necessary imports at the beginning of the file (the rest of the file remains unchanged). E.g.:

```python
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from teradataml.dataframe.copy_to import copy_to_sql
from aoa.stats import stats

import os
import joblib
import pandas as pd
import numpy as np
```
For batch scoring, define the function and load the model artefact previously stored in the training phase. E.g.:

```python
def score(data_conf, model_conf, **kwargs):
    model = joblib.load('artifacts/input/model.joblib')
```
Note: the function score and all its arguments must be present in the function definition; this function is called when doing batch scoring. The path artifacts/output/ used to store the artefacts in the training phase (see Define the Training File) has now become artifacts/input/ in the scoring phase, and cannot be changed.

Create the connection context using the teradataml package. E.g.:
```python
    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)
```
Note: the parameters data_conf, model_conf and the env vars for the connection are as seen in Define the Training File.

Load and process the data that will be scored by the model. E.g.:
```python
    features_tdf = DataFrame(data_conf["table"])

    # convert to pandas to use locally
    features_df = features_tdf.to_pandas()
```
Score the data. E.g.:

```python
    print("Scoring")
    y_pred = model.predict(features_df[model.feature_names])
```
Process and save the scored result (predictions). E.g.:

```python
    print("Finished Scoring")

    # create result dataframe and store in Teradata
    y_pred = pd.DataFrame(y_pred, columns=[model.target_name])
    y_pred["PatientId"] = features_df["PatientId"].values
    copy_to_sql(df=y_pred, table_name=data_conf["predictions"], index=False, if_exists="replace")
```
Generate and record the stats for monitoring. E.g.:

```python
    predictions_tdf = DataFrame(data_conf["predictions"])

    stats.record_scoring_stats(features_tdf, predictions_tdf)
```
Note: to enable monitoring capabilities for the model, the method stats.record_scoring_stats must be called with all the required parameters, as shown in the above example.

For RESTful scoring, define the class and the __init__ method to load the model artefact previously stored in the training phase. E.g.:
```python
# Add code required for RESTful API
class ModelScorer(object):

    def __init__(self, config=None):
        self.model = joblib.load('artifacts/input/model.joblib')

        from prometheus_client import Counter
        self.pred_class_counter = Counter('model_prediction_classes',
                                          'Model Prediction Classes', ['model', 'version', 'clazz'])
```
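If you keep the optional Prometheus counter shown above, it could be incremented per prediction along these lines (illustrative sketch only; the resulting file at the end of this tutorial omits the counter):

```python
    # Illustrative: count predictions per class for Prometheus scraping
    def predict(self, data):
        prediction = self.model.predict([data])
        self.pred_class_counter.labels(model="demo", version="v1",
                                       clazz=str(prediction[0])).inc()
        return prediction
```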
Note: the class ModelScorer and all its methods must be present, as it will be called when doing RESTful scoring.

Next, define the predict method. E.g.:

```python
    def predict(self, data):
        return self.model.predict([data])
```
Note: the method predict of class ModelScorer and all its arguments must be present, as it will be called when doing RESTful scoring.

The resulting file should be:
```python
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from teradataml.dataframe.copy_to import copy_to_sql
from aoa.stats import stats

import os
import joblib
import pandas as pd
import numpy as np


def score(data_conf, model_conf, **kwargs):
    model = joblib.load("artifacts/input/model.joblib")

    create_context(host=os.environ["AOA_CONN_HOST"],
                   username=os.environ["AOA_CONN_USERNAME"],
                   password=os.environ["AOA_CONN_PASSWORD"],
                   database=data_conf["schema"] if "schema" in data_conf and data_conf["schema"] != "" else None)

    features_tdf = DataFrame(data_conf["table"])

    # convert to pandas to use locally
    features_df = features_tdf.to_pandas()

    print("Scoring")
    y_pred = model.predict(features_df[model.feature_names])

    print("Finished Scoring")

    # create result dataframe and store in Teradata
    y_pred = pd.DataFrame(y_pred, columns=[model.target_name])
    y_pred["PatientId"] = features_df["PatientId"].values
    copy_to_sql(df=y_pred, table_name=data_conf["predictions"], index=False, if_exists="replace")

    predictions_tdf = DataFrame(data_conf["predictions"])

    stats.record_scoring_stats(features_tdf, predictions_tdf)


# Add code required for RESTful API
class ModelScorer(object):

    def __init__(self, config=None):
        self.model = joblib.load('artifacts/input/model.joblib')

    def predict(self, data):
        return self.model.predict([data])
```
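As a quick local sanity check of the RESTful entry point (illustrative only, and assuming the trained artefact is available under artifacts/input/), you could do:

```python
from scoring import ModelScorer  # assuming you run this next to scoring.py

scorer = ModelScorer()
# one observation with the eight PIMA features used in training (illustrative values)
print(scorer.predict([6, 148, 72, 35, 0, 33.6, 0.627, 50]))
```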
Using CLI to Validate the Model Files¶
To validate all the files previously created, execute the model with the run command from the CLI. When prompted, type the index of the model to validate.

```
> aoa run
Available models:
-----------------
[0] PySpark PIMA Prediction
[1] Python Demand Forecasting
[2] Python STO Forecasting
[3] R Diabetes Prediction
[4] Python Diabetes Prediction
Select model by index: _
```
It will then prompt you to select the mode. Type the index of the mode you want to validate.

```
Available modes:
----------------
[0] Train
[1] Evaluate
[2] Score (Batch)
Select mode by index: _
```
The next step will prompt you to select the dataset (for the Score mode, it will be the dataset template). Type the index of the dataset you want to use for this execution.

```
Available datasets:
-------------------
[0] Demand Forecast Evaluate
[1] Demand Forecast Train
[2] PIMA Diabetes
[3] PIMA Diabetes Evaluate
[4] PIMA Diabetes Train
[5] STO Synthetic Evaluate
[6] STO Synthetic Train
Select dataset by index: _
```
Finally, the CLI will prompt you to select the dataset connection (you can create local connections with the aoa connection add command; check the manual). Type the index of the connection you want to use for this execution.

```
Available connections:
----------------------
[0] Demo Connection
[1] Vantage Connection
Select connection by index: _
```
The CLI will run your model and fail if there is any bug or error; otherwise, it will exit successfully, meaning your model files have been validated. Repeat these steps for all three modes and their corresponding files: training, evaluation and scoring.
Committing the Code¶
To make your code available to the AOA system, commit it to the configured repository. Start by adding the files you want to commit:
```
git add <path to project dir>/model_definitions/<model id>
```
Note: this will add all the files and folders under the specified model folder.
Now create the commit and set a commit message:

```
git commit -m "First commit for model <model id>"
```
Finally, push the commit to the remote repository on the specified branch:

```
git push origin master
```
Note: this example pushes to the origin remote and the master branch.