A Big Data Approach to Decision Trees - Implementation
A Big Data Approach to Decision Trees by Marco Camargo
MIT License
Copyright (c) 2018 Marco Camargo (myoshiro@outlook.com)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Header
import time
import datetime
from math import log
from operator import add
import numpy as np
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
Node Class
class node:
def __init__(self, binRDD, parent_node):
self.data = binRDD
self.parent = parent_node
self.left_child = None
self.right_child = None
self.best_option = None
self.depth = None
        #Majority vote: label the node 1 if more than half of its samples are positive
        ones, n = binRDD.map(lambda x : (x[0],1)).reduce(lambda y,z : (y[0]+z[0],y[1]+z[1]))
        if ones/n > 0.5:
            self.label = 1
        else:
            self.label = 0
def __str__(self):
msg = 'Parent: {}'.format(self.parent)
msg += '\t\tLeft: {}'.format(self.left_child)
msg += '\t\tRight: {}'.format(self.right_child)
msg += '\t\tBest Option: {}'.format(self.best_option)
msg += '\t\tDepth: {}'.format(self.depth)
msg += '\t\tClass: {}'.format(self.label)
return msg
def __repr__(self):
return '(p:{},\tl:{},\tr:{},\tbo:{},\td:{},\tc:{})'.format(self.parent, self.left_child, self.right_child, self.best_option, self.depth, self.label)
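As a quick sanity check, here is a minimal usage sketch (the toy RDD below is hypothetical): a node labels itself by majority vote over the class column.
#Hypothetical toy data: three positive samples, one negative
toy = sc.parallelize([(1, [True]), (1, [False]), (0, [True]), (1, [True])])
root = node(toy, None)
print(root.label) #1, since 3/4 of the samples are positive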
Cost Function
def costFunction(RDD, metric = 'Entropy'):
"""
Input:
RDD : distributed database in format (class, [features])
        metric : metric to be returned {'Gini' or 'Entropy'}
Output:
tuple : (metric, input RDD size)
"""
if RDD.count() > 0:
labelOne, n = ( RDD
.map(lambda sample : (sample[0],1))
.reduce(lambda x,y : (x[0]+y[0],x[1]+y[1])) )
p1 = labelOne/n
p0 = 1 - p1
if metric == 'Gini':
metric = 1 - (p0**2 + p1**2)
elif metric == 'Entropy':
            #lim x*log(x) = 0 as x --> 0, so a pure node has zero entropy
if p0 == 0 or p1 == 0:
metric = 0
else:
metric = (-1)*(p0*log(p0,2) + p1*log(p1,2))
return (metric, n)
else:
return (0, 0)
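A quick sketch of the expected outputs on a hypothetical, perfectly balanced toy set: entropy should be 1 bit and Gini impurity 0.5.
#Hypothetical balanced toy set: half the samples are class 1
toyBalanced = sc.parallelize([(0, [True]), (1, [True]), (0, [False]), (1, [False])])
print(costFunction(toyBalanced, 'Entropy')) #(1.0, 4)
print(costFunction(toyBalanced, 'Gini')) #(0.5, 4)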
Information Gain
def infoGain(parentRDD, childsList):
"""
Input:
parentRDD : parent node distributed database in format (class, [features])
        childsList : list of distributed databases in format (class, [features]) for the child nodes
    Output:
        real number representing the information gain
"""
parentEntropy, parentN = costFunction(parentRDD,'Entropy')
childsCosts = []
childsEntropy = 0
for child in childsList:
childsCosts.append(costFunction(child, 'Entropy'))
for child in childsCosts:
childsEntropy += child[0]*(child[1]/parentN)
return parentEntropy - childsEntropy
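To illustrate with a hypothetical perfect split: both children end up pure (zero entropy), so the gain equals the parent entropy, 1 bit here.
#Hypothetical perfect split: feature 0 exactly matches the class
parent = sc.parallelize([(1, [True]), (1, [True]), (0, [False]), (0, [False])])
left = parent.filter(lambda s : s[1][0])
right = parent.filter(lambda s : not s[1][0])
print(infoGain(parent, [left, right])) #1.0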
Binarizing Features by Mean Values
def meanRDD(RDD):
"""
Input:
RDD: distributed database in format (class, [features])
Output:
list with mean values for each feature
"""
meanVec, n = (RDD
.map(lambda sample : (sample[1], 1))
.reduce(lambda x,y : (np.array(x[0])+np.array(y[0]),x[1]+y[1]))
)
return meanVec/n
def featsBinarize(RDD):
"""
Input:
RDD: distributed database in format (class, [features])
Output:
binRDD: distributed database in format (class, [features]) with features binarized by their mean values
"""
mean = meanRDD(RDD)
binRDD = (RDD
.map(lambda sample : (sample[0], (sample[1]-mean) > 0))
)
return binRDD
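A minimal sketch on a hypothetical two-feature set: each feature becomes TRUE exactly when it exceeds its column mean.
#Hypothetical raw data with feature means (2.0, 20.0)
toyRaw = sc.parallelize([(0, [1.0, 10.0]), (1, [3.0, 30.0])])
print(meanRDD(toyRaw)) #[ 2. 20.]
print(featsBinarize(toyRaw).collect()) #[(0, array([False, False])), (1, array([ True,  True]))]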
Split Node by Given Feature
def SplitByFeat(binRDD, feat_index):
"""
Input:
binRDD : distributed database in format (class, [features]) with features binarized
feat_index : index of the feature to split binRDD
    Output:
        leftRDD : distributed database in format (class, [features]) with samples where the chosen feature is TRUE
        rightRDD : distributed database in format (class, [features]) with samples where the chosen feature is FALSE
    """
    #Left child -> the given feature is TRUE
    leftRDD = binRDD.filter(lambda sample : sample[1][feat_index])
    #Right child -> the given feature is FALSE
    rightRDD = binRDD.filter(lambda sample : not sample[1][feat_index])
return leftRDD, rightRDD
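A sketch splitting a hypothetical binarized toy set on feature 0; this binToy is reused in the sketches below.
#Hypothetical binarized toy set; feature 0 exactly matches the class
binToy = sc.parallelize([(1, [True, False]), (1, [True, True]),
                         (0, [False, True]), (0, [False, False])])
left, right = SplitByFeat(binToy, 0)
print(left.count(), right.count()) #2 2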
Evaluate Best Option Feature and Split by It
def evalAndSplit(binRDD):
"""
Input:
binRDD : distributed database in format (class, [features]) with features binarized
Output:
leftRDD : distributed database in format (class, [features]) with samples where best option feature is TRUE
rightRDD : distributed database in format (class, [features]) with samples where best option feature is FALSE
bestOption : index of the feature with maximum information gain
maxIG : information gain for the bestOption feature
"""
candidatesIG = []
for i in range(len(binRDD.first()[1])):
candidatesIG.append(infoGain(binRDD,(SplitByFeat(binRDD, i))))
maxIG = max(candidatesIG)
bestOption = candidatesIG.index(maxIG)
left, right = SplitByFeat(binRDD, bestOption)
return left, right, bestOption, maxIG
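On binToy from the previous sketch, feature 0 should win with the maximum possible gain. Note that each candidate feature launches its own Spark jobs over binRDD, which is why the main script caches the binarized RDD before training.
#Feature 0 separates binToy perfectly, so it is the best option with gain 1.0
l, r, bestOption, maxIG = evalAndSplit(binToy)
print(bestOption, maxIG) #0 1.0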
Building the Tree
def insertNode(this_node_index, partial_tree, process_queue, maxDepth = 4, minSize = 2):
    """
    Input:
        this_node_index : node index to be evaluated
        partial_tree : dictionary with nodes already in the tree
        process_queue : queue with nodes that must be evaluated
        maxDepth : maximum depth for the tree
        minSize : minimum number of samples for each node
    Output:
        String with elapsed time in format HH:MM:SS.
    """
    #Time count initialization
    start_time = time.time()
    ###########################################################################################################
    this_node = partial_tree[this_node_index]
    if this_node.depth < maxDepth:
        left, right, bestOption, maxIG = evalAndSplit(this_node.data)
        if maxIG > 0:
            l_size = left.count()
            r_size = right.count()
            #Split only if at least one child is large enough
            if (l_size >= minSize) or (r_size >= minSize):
                this_node.best_option = bestOption
                if (l_size >= minSize):
                    node_index = 1 + sorted(partial_tree.keys())[-1]
                    l_child = node(left, node_index)
                    l_child.depth = 1 + this_node.depth
                    l_child.parent = this_node_index
                    partial_tree[node_index] = l_child
                    process_queue.append(node_index)
                    this_node.left_child = node_index
                if (r_size >= minSize):
                    node_index = 1 + sorted(partial_tree.keys())[-1]
                    r_child = node(right, node_index)
                    r_child.depth = 1 + this_node.depth
                    r_child.parent = this_node_index
                    partial_tree[node_index] = r_child
process_queue.append(node_index)
this_node.right_child = node_index
###########################################################################################################
#Calculating elapsed time and printing it on prompt
elapsed_time = time.time() - start_time
return ('\tProcessing Time: {}'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
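A single expansion step on a hypothetical root built over binToy: both children meet minSize, so both get indices, enter the tree, and are appended to the queue.
#Hypothetical one-step expansion of a root node over binToy
tree = {0 : node(binToy, None)}
tree[0].depth = 0
queue = []
print(insertNode(0, tree, queue))
print(sorted(tree.keys()), queue) #[0, 1, 2] [1, 2]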
Training the Model
def train(binRDD, maxDepth = 4, minSize = 2):
"""
Input:
binRDD : distributed database in format (class, [features]) with features binarized for train
maxDepth : maximum depth for the tree
        minSize : minimum number of samples for each node
Output:
tree : dictionary with trained decision tree structure in format (index : node)
"""
#Time count initialization and printing on prompt
start_time = time.time()
print('Train started at: {}\tMaximum Depth: {}\tMinimum size: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), maxDepth, minSize))
###########################################################################################################
tree = {}
process_queue = []
root = node(binRDD, None)
root.depth = 0
tree[0] = root
process_queue.append(0)
while len(process_queue) > 0:
next_index = process_queue.pop(0)
exec_time = insertNode(next_index, tree, process_queue, maxDepth, minSize)
print('Last evaluated node: {}\t\tQueue size: {}{}'.format(next_index,len(process_queue),exec_time))
for i in range(len(tree)):
tree[i].data = None
###########################################################################################################
#Calculating elapsed time and printing it on prompt
elapsed_time = time.time() - start_time
print('\nExecution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
return tree
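An end-to-end training sketch on binToy (hypothetical parameters; the exact reprs printed depend on your setup). The toyModel below is reused in the prediction sketches.
#Hypothetical end-to-end training on the toy set
toyModel = train(binToy, maxDepth = 2, minSize = 1)
print(toyModel)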
Predict Class of a Sample
def predict(features, model):
"""
Input:
        features : features of a sample from a binRDD whose class will be predicted
model : dictionary with trained decision tree structure in format (index : node)
Output:
pred : predicted class for the given sample
"""
next_node = 0
pred = None
    while next_node is not None:
        pred = model[next_node].label
        if model[next_node].best_option is not None:
if features[model[next_node].best_option]:
next_node = model[next_node].left_child
else:
next_node = model[next_node].right_child
else:
next_node = None
return pred
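Using toyModel from the training sketch: the walk goes left when the node's best-option feature is TRUE, right otherwise, and stops at a leaf.
#Hypothetical single-sample predictions with toyModel
print(predict([True, False], toyModel)) #1 (feature 0 is True -> left leaf)
print(predict([False, True], toyModel)) #0 (feature 0 is False -> right leaf)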
Accuracy
def accuracy(binRDD, predictions):
"""
Input:
        binRDD : distributed database in format (class, [features]) with features binarized
predictions : predictions from the model for the given binRDD
Output:
accuracy : model accuracy for prediction
"""
num = ( binRDD
.map(lambda sample: sample[0])
.zip(predictions)
.filter(lambda sample : sample[0] == sample[1])
.count()
)
return num/predictions.count()
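A sketch of the alignment assumption here: RDD.zip requires both RDDs to have the same partitioning and the same number of elements per partition, which holds because predictions is produced by a map over the same binRDD.
#Hypothetical accuracy check; toyPreds derives from binToy itself, so zip stays aligned
toyPreds = binToy.map(lambda sample : predict(sample[1], toyModel))
print(accuracy(binToy, toyPreds)) #1.0 on this perfectly separable toy set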
Predicting Label of a Dataset
def test(binRDD, model):
"""
Input:
        binRDD : distributed database in format (class, [features]) with features binarized
model : dictionary with trained decision tree structure in format (index : node)
Output:
predictions: predictions from the model for the given binRDD
        accuracy : model accuracy for prediction
"""
#Time count initialization and printing on prompt
start_time = time.time()
    print('Test started at: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
###########################################################################################################
predictions = binRDD.map(lambda sample : predict(sample[1], model))
acc = accuracy(binRDD, predictions)
###########################################################################################################
#Calculating elapsed time and printing it on prompt
elapsed_time = time.time() - start_time
print('Execution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
return predictions, acc
Main Function
#Reading the data file and formatting it to (class, [features])
dataset = (
    sc.textFile('SUSY.csv')
    .map(lambda line : line.split(','))
    .map(lambda row : [float(element) for element in row])
    .map(lambda sample : (sample[0], sample[1:])) #Formatting tuple (class, [features])
    ).cache()
#Binarizing features of the dataset
binRDD = featsBinarize(dataset)
binRDD.cache()
binRDD.count()
#Splitting the dataset into train and test
seed = 42
weights = [0.8, 0.2]
binTrain, binTest = binRDD.randomSplit(weights, seed)
binTrain.cache()
binTest.cache()
binTrain.count()
binTest.count()
#Training the decision tree model
model = train(binTrain)
#Printing the resulting model
print(model)
#Predicting label for train dataset
predictionsTrain, trainAcc = test(binTrain, model)
#Predicting label for test dataset
predictionsTest, testAcc = test(binTest, model)
#Printing train and test accuracy
print('Train Accuracy: {:.3f}%\tTest Accuracy: {:.3f}%'.format(100*trainAcc, 100*testAcc))