Build An Automated Machine Learning Pipeline On AWS

Machine Learning(ML) is the art of using historical data to predict the future. But how can you use historic, 'ground truth' data when the 'ground' is constantly moving? In this post, Senior Cloud Data Architect, Yoyu Li, shares how she used AWS to build a fully automated machine learning model using live-streaming data.

If you are reading this article, you are probably already familiar with the basic concepts. A typical ML workflow looks like this:

  1. First, we define a problem
  2. Then, we collect some data and label them
  3. Using this 'ground truth' data, data scientists work to find the best feature set, algorithm and parameters to create a model
  4. Finally, we deploy the model for prediction.

However, a problem we may encounter when applying this workflow in production is in situations when the  'ground' is constantly moving, like in the stock market for example.

Can we continuously update the ML model based on the most recent data by automating the process?

In this article, we are going to look at a fictional scenario where we are building an ML model to predict when he best time would be to buy bitcoin (in order to make a profit, of course!). As you probably know, the market of bitcoin is extremely volatile and seemingly irrational. It is impossible to use yesterday's pattern to predict how the market would go today. So, to increase the accuracy of the prediction, we want the ML model to be as 'fresh' as possible. To address this challenge, we present a fully automated ML model training pipeline using Live Streaming Data.

Before we dive into the architecture, keep in mind that, most likely, there is no winning pattern at all in bitcoin trading. The best the model can do is minimise loss. This demo is for experimental purposes only, so do not apply the model in real-world trading.

Architecture Overview

The demo is built on AWS and relies heavily on AWS's ML toolkit, Sagemaker, which we are going to dive deeper into. The diagram below offers an overview of the architecture:

  • Streaming Client: The streaming client is a node.js application that subscribes to IG's real-time streaming data source for Bitcoin price. The application then stores the market data in a MongoDB database, and generates mock-up transaction data, which we will use for ML purposes later. Together with the MongoDB instance, the application is then packaged and deployed as a docker container on Amazon ECS, powered by Fargate (which allows you to run containers without managing any underlying compute resources). For security, we also use AWS Secrets Manager to store the user credentials for IG. 
  • S3 Notification + Object Watcher Lambda Function: Whenever the Streaming Client has accumulated enough data points (in our demo, that is 10000 completed transactions), it exports two CSV files to Amazon S3 - one for historical prices, the other for transactions. Once the objects arrive in S3, a Lambda function receives the notification and triggers a Step Function execution. 
  • Step Functions Orchestration: We use AWS Step Functions to orchestrate our ETL, model training, tuning and deploy pipeline. When a Step Function execution starts, it first triggers a Lambda function, the Input File Checker, to see whether both CSV files are in S3, and pass the object names and other parameters to subsequent jobs. 
  • Glue ETL Jobs: As part of the pipeline, the Glue ETL Job joins the transaction data with the price data, extracts features, fills missing values and transforms the data into a binary format, which is required by the ML algorithm we chose to use. It also splits the data into training, validation and test datasets. 
  • Sagemaker Model Training and Hyperparameter Tuning: With the training, validation and test datasets we get from the ETL job, we launch Sagermaker Model Training service to prepare the model. For this demo, we use one of Sagemaker's built-in algorithms, Linear LearnerSagemaker Hyperparameter Tuning is a fully managed and automated service that helps us to find the best hyperparameters for a given ML algorithm, by training, adjusting and retraining the models through multiple Model Training jobs. We can set an objective for each tuning job, for example, whether we want to maximise the precision, the recall, or the F1 score for binary classification. By the end of the tuning, the service will suggest the best model in this batch, however, we can also access the algorithm metrics to select based on our own model selection criteria. 
  • Sagemaker Model Hosting: Once we have selected the best model (in the demo, we use another Lambda function, the Model Selection function), we can deploy the model and expose an endpoint for inference. 

This event-driven architecture is capable of converting streaming data into small batches and kicking off an ML process as soon as a sufficient amount of data points are collected. The pipeline automates the model training, tuning and deploy process. By the end of the pipeline, the model behind the inference endpoint will be 'refreshed' with the latest training data. Downstream applications should be able to use this endpoint for more up-to-date prediction. 

Deep Dives

The Data and The ML Problem Statement

As stated before, the purpose of this ML model is to predict when is the best time to buy bitcoin. There are several ways to frame this problem, for example, you can build a forecast model to predict the future price of bitcoin, or a regression model to predict how likely a given order could profit. Here, we framed this question into a binary-classification problem - whether buying bitcoin at a certain point of time is likely to result in profit or loss - so we label each transaction as either profit (1) or loss (0), and we are building a model to predict whether the profitability of a future order is likely to be positive or negative.

The streaming data source contains the real-time price of Bitcoin in USD and some other information like the opening price, the high and the low of the day. Below is an example of the price data:

As we don't have real transaction data, we need to mock up the transactions. Each second, a trading bot places several fake orders in the database. Each order has a lower and upper threshold at which points a sell action will be triggered. For instance, we have made an order to buy Bitcoin at $6512.5, and there is a lower limit of -$10 and upper limit or +$10. If the price rises to $6522.5, a sell order will be triggered, which results in $10 of profit; and if the price declines to $6502.5, a sell order will also be triggered to limit the loss. Of course, in the real world traders' behaviour is much more sophisticated. We simplified the order pattern for this demo, but the process of training and tuning ML models should be similar. 

Below is an example of the transaction data:

For the purpose of this article, we will not go in-depth into the Streaming Client application itself. We can just assume every now and then, there will be two CSV files being generated - one for all the completed transactions in a certain period, and the other for the prices. 

Feature Extraction

In this demo, we use Glue ETL Jobs for data processing. Glue is a fully managed big data ETL tool on AWS. With Glue, we can run Spark or plain Python ETL Jobs on-demand, without paying for the infrastructure when we don't need it. Although in our case, the data volume is small enough to be handled by a Lambda function, for the sake of the demo, we still use Glue ETL for processing. 

The exploratory data analysis and ETL prototyping were done in a local Jupyter environment. In the git repository, there is a sample notebook glue/glue-etl.ipynb which gives a flavour of this process (Although in reality, the notebooks look much messier). After several experimentations, the final feature set used in the demo is as follows:

The feature set is created by joining the transaction data and prices. Each line of the table contains the price information at the time point when the order was made, the prices in the past 10 minutes, and the label (whether the transaction was profit-making).  It excluded columns like timestamp that can be noise for the purpose, and kept several columns like bid price, offer price from the price table. And the columns 0 to 599 are the historical price at minus x second(s) in relation to the time when the order was made. By having the historical price points as features, we re-frame a time series prediction problem into a conventional supervised machine learning problem

In real-world practice, we may want to include the price information of a longer period (e.g. the past several hours or days), or at a different resolution (e.g. every minute or every hour) depending on the trading frequency we intend to make. Here, for the convenience of the demo, we assume a very high trading frequency, therefore, we use the price data per second in the past 10 minutes for prediction. 

Finally, we convert all the absolute prices into relative prices against the mid price (the average of bid and offer price) at the time, as shown below. Using this technique has seen better prediction results in the experimental training models. Later, we will configure Sagemaker to normalise the data within the features.

The Glue ETL Job also splits the data into training, validation and test datasets (70/15/15) and exports them as RecordIO-Protobuf binary format, as required by the Linear Learner algorithm. 

The ETL script is then deployed to Glue ETL as a .py file, and the python dependencies can be deployed as an egg or wheel file. Once the Job is triggered, Glue will run the ETL script on an ad-hoc basis. 

A Closer Look into Hyperparameter Tuning and Model Selection

Once we've got the datasets, we can start training the model using Sagemaker Model Training jobs. However, there are many tunable hyperparameters for each ML algorithm. We have already introduced that we can use Sagemaker Hyperparameter Tuning to find the best combination for a given ML algorithm. 

Hyperparameter Tuning is an Amazon implementation of Bayesian Hyperparameter Optimization. It starts the tuning process by using a baseline combination of hyperparameters; and based on the test results of the first models, it runs a Bayesian Search to find new combinations that are likely to generate better results and train the models again; by repeating this train-adjust-retrain process, in theory, the algorithm can find the optimal combination for the given objective. 

How does this work in practice? As Sagemaker is a fully managed service, all we need to do is to define an objective and which parameters are tunable. Sagemaker will be running the optimisation process and spin up training jobs for us.

First of all, we need to define a training job template, where we specify some static hyperparameters - the ones which are not tunable and will be applied to all training jobs - e.g. the feature dimension and the predictor type.  

"StaticHyperParameters": {
     "feature_dim": "605",
     "epochs": "15",
     "normalize_data": "true",
     "normalize_label": "false",
     "predictor_type": "binary_classifier",
     "binary_classifier_model_selection_criteria": "precision_at_target_recall",
     "target_recall": "0.25"
    }

 

Here we also define the binary_classifier_model_selection_criteria to precision_at_target_recall and the target_recall to 25%. This means we want the precision (how many orders that the model predicts would be profitable actually make money) to be as high as possible, whereas we don't care too much about the recall (the proportion of profit-making opportunities that the model can identify). Intuitively we know that we do not need the model to be able to identify all the profit-making opportunities; but for those opportunities that are flagged as positive, they are better to be true

Then we define a set of tunable hyperparameters in the tuning job configuration. Some of these parameters are categorical, e.g. use_bias can be either true or false; some of them can fit in a range, e.g. the weight of the positive examples. The tuning job will try different combinations of the hyperparameters within these possible ranges.

"ParameterRanges": {
      "CategoricalParameterRanges": [
        {
            "Name": "use_bias",
            "Values": [ "true","false" ]
        }
      ],
      "ContinuousParameterRanges": [
        {
          "MaxValue": "1",
          "MinValue": "0.0000001",
          "ScalingType":"Logarithmic",
          "Name": "l1"
        },
        {
          "MaxValue": "1",
          "MinValue": "0.00001",
          "ScalingType":"Logarithmic",
          "Name": "learning_rate"
        },
        {
          "MaxValue": "10000",
          "MinValue": "0.00001",
          "ScalingType":"Logarithmic",
          "Name": "positive_example_weight_mult"
        },
        {
          "MaxValue": "1",
          "MinValue": "0.0000001",
          "ScalingType":"Logarithmic",
          "Name": "wd"
        }
      ],
      "IntegerParameterRanges": [
        {
          "MaxValue": "1000",
          "MinValue": "100",
          "ScalingType":"Logarithmic",
          "Name": "mini_batch_size"
        }
      ]
    }

 

Finally, we define an objective for the training. As explained before, we want to maximise the precision for the purpose of the prediction. Also, we want to have a maximum of 10 training jobs, with 2 jobs running in parallel at each stage. In theory, we can find the optimal combination of hyperparameters if we run the training jobs an infinite number of times. But obviously, time and budget will be the constraints.

"HyperParameterTuningJobObjective": {     
"MetricName": "test:precision",
      "Type": "Maximize"
    },
"Strategy":"Bayesian",
"ResourceLimits": {
      "MaxNumberOfTrainingJobs": 10,
      "MaxParallelTrainingJobs": 2
    }

 

For the volume of data we have and the machine type we use, each training job takes about 3 minutes, including the time needed to provision the resources. So the whole tuning process takes about 15 minutes to complete. 

After the tuning job completes, we can view the algorithm metrics in CloudWatch. Here, each of the dots represents a training job, and the Y-axis shows the objective metric - test:precision. We can see the precision increased steadily through the tuning:

From the graph, we can also see the 6th training job even achieved a 100% precision. Although from experience we know this is not necessarily a good sign, it could either mean there is an overfit or the recall is very low. And indeed, as we drill down further into the metrics, we find out the recall of this job is only 1.7%.

test:binary_f_beta 0.033

Test:precision  1

Test:recall  0.017

 

Nevertheless, since this is the best tuning job according to our model selection criteria, the automation pipeline will deploy this model for prediction.

Inference Endpoint Utilisation

Now, we can put this endpoint into use to produce some prediction results. For this, in the demo, we use the Sagemaker Python SDK. A complete example can be found in notebooks/linear_learner.ipynb in the git repo.

from sagemaker.predictor import RealTimePredictor
linear_predictor=RealTimePredictor('endpoint-name',sagemaker_session)
linear_predictor.predict(input_data)

 

As a response, the Linear Learner model gives us a predicted_label and a score

{'predictions': [{'score': 0.1967127025127411, 'predicted_label': 0.0}]}

 

We can evaluate the model using more data we have. Here is the confusion matrix of all the predictions the model has made:

Not impressed? However, if we set a lower threshold to the score(so only the most certain predictions count), we can actually find the prediction is pretty good. Here is the confusion matrix of all predictions with a score higher than 0.75. Out of 122 most certain predictions the model has made, 106 of them are true positives. Not bad?

Orchestration

So far, we have walked through the data ETL, model training, tuning and deploy process. To link everything together, it's worth taking a look back at the orchestration tool, Step Functions. The Step Functions service is the primary orchestration tool in AWS. It integrates with a wide range of AWS services, and can pass parameters (the state) between services. 

In our case, we utilise some of the built-in Step Function job types like Glue’s StartJobRun, Sagemaker’s CreateHyperParameterTuningJob and so on. This way, we do not need to write additional code just to submit such jobs, instead, we can simply use Step Functions' declarative JSON syntax to define the pipeline. And the execution and state transition will be managed by Step Functions service. It also offers a nice visual presentation for each execution. Below is a graph representation of a successful Step Function execution generated by the service.

Try it yourself

If you wish to play around the demo or use it at the basis for your own ML problem solving, the complete code can be found at: https://github.com/

The demo is built using configuration automation tool Terraform, and the deployment process is fully automated. So you can try it yourself by following the instructions in the README.md.

Conclusion

In this article, we discovered the power of Sagemaker for automated model training and hyperparameter tuning. We also introduced a couple of handy services on AWS for data engineering such as Glue ETL and Step Functions orchestration. The fully-managed services make it entirely possible for even a small team to achieve automated near-real-time model training and prediction. 

However, it's also worth pointing out what can not be automated. Looking back to the entire process, we still need adequate ML knowledge to translate a real-world problem into an ML problem - whether it is a classification or a regression problem for instance. We still need data scientists' expertise, especially during the exploratory analysis, to determine which feature set as well as which base model to use. Finally, limited to my own ML experience, I find it hard to interpret some of the hyperparameter tuning results, which certainly offers scope for improvement. 

Nevertheless, not only does the demo showcase the possibility of automating the learning process, but the architecture has also taken into account the cost, security, and operational complexity of running a system like this. It should serve as a satisfactory baseline for solving similar ML problems. 

 

For more information on how our Data Insights team could help your business, click here.

  • machine-learning
  • aws
  • Data Insights
  • Tech Blog