import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
n = 100 #Number of observations in the training set
trueSplitPoint = 5
trueLowerLevel = 15
trueUpperLevel = 30
x = np.random.uniform(0, 10, n)
def generateY(x, trueLowerLevel, trueUpperLevel, trueSplitPoint, sd):
if x < trueSplitPoint:
y = np.random.normal(trueLowerLevel, sd, 1)
else:
y = np.random.normal(trueUpperLevel, sd, 1)
return y[0]
y = [generateY(i, trueLowerLevel, trueUpperLevel, trueSplitPoint, 3) for i in x]
data = pd.DataFrame({'X':x, 'Y':y}) #Put the two arrays into a dataframe to make it easy to work with
data.head(10) #Inspect the first ten rows of our dataset
plt.scatter(data['X'], data['Y'])
plt.show()
Data appears to be generated by two distinct processes (as indeed it is).
For the simplest version of decision tree we want to identify the split point. When predicting the (continuous) label for a new observation, if the feature is less than the split point we assign one value and if it's greater than the split point we assign another value. Thus the set of possible values we can assign has size two. Clearly this is a very simple regression technique and it's unlikely we'd use this in a real scenario, but we've generated the data in a very contrived way so that it should perform reasonably here and we'll see that the decision tree can approximate non-linear functions by using techniques such as Bagging.
Mathematically, we want to learn a function $f: \mathbb{R} \rightarrow \mathbb{R}$, such that:
\begin{equation} f(x)=\begin{cases} c_0, & \text{if $x<c_{split}$}.\\ c_1, & \text{otherwise}. \end{cases} \end{equation}Our aim is to find the best combination of values for $c_0, c_1 \& c_{split}$
We can't use a Maximum-Likelihood style approach because we don't make any probabilistic assumptions. We're going to test lots of split points, then for each split point we're calculate $c_0$ and $c_1$ by taking all of the points on the relevant side of the split point and calculating the mean of those points.
That is
$$c_0|c_{split} = \frac{\sum_{x_i < c_{split}} x_i}{\sum_{x_i < c_{split}} 1}$$$$c_1|c_{split} = \frac{\sum_{x_i > c_{split}} x_i}{\sum_{x_i > c_{split}} 1}$$And then we'll choose the values of $c_0, c_1 \text{and} c_{split}$ which minimise the squared error on the training set
class univariateDecisionTree:
def __init__(self, data, target, feature, trainTestRatio = 0.9):
#data - a pandas dataset
#target - the name of the pandas column which contains the true labels
#feature - the name of the pandas column which we will use to do the regression
#trainTestRatio - the proportion of the entire dataset which we'll use for training
# - the rest will be used for testing
self.target = target
self.feature = feature
#Split up data into a training and testing set
self.train, self.test = train_test_split(data, test_size=1-trainTestRatio)
#Generate mesh of split points to try. We want the mesh to be of a decent size (say 1000 points)
#Think about what reasonable end points of the mesh would be - what would happen if we considered
#a point that was smaller than the smallest point in the training set as a split point?
#np.linspace() is a good function for constructing this 1-d mesh
meshMin = np.min(self.train[self.feature])
meshMax = np.max(self.train[self.feature])
self.splitPointMesh = np.linspace(meshMin, meshMax, 1000)
def computeMeansGivenSplitPoint(self, splitPoint):
#Given a split point, we want to split the training set in two
#One containing all the points below the split point and one containing all the points above the split point
#The means are then the means of the targets in those datasets and they are the values we want to return
belowSplitPoint = self.train.loc[self.train[self.feature] < splitPoint][self.target].mean()
aboveSplitPoint = self.train.loc[self.train[self.feature] >= splitPoint][self.target].mean()
return belowSplitPoint, aboveSplitPoint
def computeSquaredError(self, splitPoint):
#Once we have a split point and a set of means, we need to have some way of identifying whether it's
#a good split point or not
#First apply compuuteMeansGivenSplitPoint to get the values for above and below the dataset
#Then compute the sum of squared errors yielded by assigning the corresponding mean to each point in the training set
#If we add these two sums of squares together then we have a single error number which indicates how good our split point is
c0, c1 = self.computeMeansGivenSplitPoint(splitPoint)
#To get the value of errorBelow, subset the training set to the points below the split points
#Then calculate the squared difference between target and c0 for each observation in the subset
#Then sum them up (This can all be done in one line)
errorBelow = np.sum((self.train.loc[self.train[self.feature] < splitPoint][self.target] - c0)**2)
#errorAbove works in the same way
errorAbove = np.sum((self.train.loc[self.train[self.feature] >= splitPoint][self.target] - c1)**2)
totalError = errorBelow + errorAbove
return totalError
def fitDT(self):
#Calculate the squared error for each split point in the mesh,
#and save the values (splitPoint and two means) obtained by the best one
#Errors for each member of the mesh
errors = [self.computeSquaredError(sp) for sp in self.splitPointMesh]
#Find the member of the mesh which gives the smallest error
bestSplitPoint = self.splitPointMesh[np.argmin(errors)]
#Save a attributes of the class the split point and two means which give the smallest error
self.c0, self.c1 = self.computeMeansGivenSplitPoint(bestSplitPoint)
self.cSplit = bestSplitPoint
return 0
def predict(self, X):
X = np.array(X).reshape((-1,1))
return [self.c0 if x < self.cSplit else self.c1 for x in X]
myModel = univariateDecisionTree(data, 'Y', 'X')
myModel.fitDT()
We'll plot the regression line we obtained against the values we observed in the training set
regLine = myModel.predict(myModel.splitPointMesh)
plt.scatter(myModel.train[myModel.feature], myModel.train[myModel.target], label = 'Training set')
plt.scatter(myModel.splitPointMesh, regLine, label = 'Regression Line')
plt.legend()
plt.show()