A Big Data Approach to Decision Trees - Implementation

8 minute read


A Big Data Approach to Decision Trees by Marco Camargo

MIT License

Copyright (c) 2018 Marco Camargo (myoshiro@outlook.com)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


import time
import datetime
from math import log
from operator import add
import numpy as np

from pyspark import SparkContext
sc = SparkContext.getOrCreate()  # Reuse the running SparkContext if one already exists


Node Class

class node:
    
    def __init__(self, binRDD, parent_node):
        self.data        = binRDD
        self.parent      = parent_node
        self.left_child  = None
        self.right_child = None
        self.best_option = None
        self.depth       = None
        
        # Majority vote: count the label-1 samples and the node size in one pass
        ones, n          = binRDD.map(lambda x : (x[0],1)).reduce(lambda y,z : (y[0]+z[0],y[1]+z[1]))
        if ones/n > 0.5:
            self.label   = 1
        else:
            self.label   = 0
        
    def __str__(self):
        msg = 'Parent: {}'.format(self.parent)
        msg += '\t\tLeft: {}'.format(self.left_child)
        msg += '\t\tRight: {}'.format(self.right_child)
        msg += '\t\tBest Option: {}'.format(self.best_option)
        msg += '\t\tDepth: {}'.format(self.depth)
        msg += '\t\tClass: {}'.format(self.label)
        return msg
    
    def __repr__(self):
        return '(p:{},\tl:{},\tr:{},\tbo:{},\td:{},\tc:{})'.format(self.parent, self.left_child, self.right_child, self.best_option, self.depth, self.label)
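
As a quick sanity check (a toy three-sample RDD, assuming the SparkContext sc created above; not part of the original pipeline), the majority-vote label can be verified directly:

toy  = sc.parallelize([(1, [True, False]), (0, [True, True]), (1, [False, True])])
root = node(toy, None)
print(root.label)   # 1, since two of the three samples carry label 1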
        


Cost Function

def costFunction(RDD, metric = 'Entropy'):
    
    """
        Input: 
            RDD    : distributed database in format (class, [features])
            metric : metric to be returned {'Gini' or 'Entropy'}
        
        Output:
            tuple  : (metric value, input RDD size) 
    """ 
    
    # An empty node contributes zero cost; isEmpty() avoids a full count
    if RDD.isEmpty():
        return (0, 0)

    # Count the label-1 samples and the node size in a single pass
    labelOne, n = ( RDD
                    .map(lambda sample : (sample[0],1))
                    .reduce(lambda x,y : (x[0]+y[0],x[1]+y[1])) )

    p1 = labelOne/n
    p0 = 1 - p1

    if metric == 'Gini':
        metric = 1 - (p0**2 + p1**2)
    elif metric == 'Entropy':
        # lim of x*log(x) as x->0 is 0, so a pure node has zero entropy
        if p0 == 0 or p1 == 0:
            metric = 0
        else:
            metric = (-1)*(p0*log(p0,2) + p1*log(p1,2))

    return (metric, n)
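
For intuition, a small worked check on toy data (not from the original post): with 5 positive and 3 negative samples, p1 = 0.625, so the entropy is -(0.625*log2(0.625) + 0.375*log2(0.375)) ≈ 0.954 bits and the Gini index is 1 - (0.625**2 + 0.375**2) = 0.46875.

toyRDD = sc.parallelize([(1, [])]*5 + [(0, [])]*3)
print(costFunction(toyRDD, 'Entropy'))   # (0.954..., 8)
print(costFunction(toyRDD, 'Gini'))      # (0.46875, 8)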


Information Gain

def infoGain(parentRDD, childsList):
    
    """
        Input: 
            parentRDD  : parent node distributed database in format (class, [features])
            childsList : list of distributed databases in format (class, [features]) for the children nodes
        
        Output:
            real number representing the information gain of the split
    """    
    
    parentEntropy, parentN = costFunction(parentRDD,'Entropy')
    
    # Children entropies weighted by each child's share of the parent samples
    childsEntropy = 0
    
    for child in childsList:
        childEntropy, childN = costFunction(child, 'Entropy')
        childsEntropy += childEntropy*(childN/parentN)
    
    return parentEntropy - childsEntropy
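
The returned value is the usual gain H(parent) - sum_i (n_i/n)*H(child_i). On a balanced toy parent that one feature splits perfectly, the gain should be exactly 1 bit (a hypothetical check):

parent = sc.parallelize([(1, [True]), (1, [True]), (0, [False]), (0, [False])])
left   = parent.filter(lambda s : s[1][0])
right  = parent.filter(lambda s : not s[1][0])
print(infoGain(parent, [left, right]))   # 1.0, since both children are pure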
    


Binarizing Features by Mean Values

def meanRDD(RDD):
    """
        Input: 
            RDD: distributed database in format (class, [features])
        
        Output:
            list with mean values for each feature
    """
    # Sum the feature vectors and count the samples in a single pass
    meanVec, n = (RDD
                  .map(lambda sample : (sample[1], 1))
                  .reduce(lambda x,y : (np.array(x[0])+np.array(y[0]),x[1]+y[1]))
                 )
    
    return meanVec/n


def featsBinarize(RDD):

    """
        Input: 
            RDD: distributed database in format (class, [features])
        
        Output:
            binRDD: distributed database in format (class, [features]) with features binarized by their mean values
    """
    
    mean = meanRDD(RDD)
    
    # A feature becomes True when the sample's value is above the feature mean
    binRDD = (RDD
                 .map(lambda sample : (sample[0], (sample[1]-mean) > 0))
             )
    
    return binRDD
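
A two-sample toy RDD (hypothetical values) shows the effect: the feature means are [2.0, 15.0], so only values above those means become True:

toy = sc.parallelize([(1, [1.0, 10.0]), (0, [3.0, 20.0])])
print(featsBinarize(toy).collect())
# [(1, array([False, False])), (0, array([ True,  True]))]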


Split Node by Given Feature

def SplitByFeat(binRDD, feat_index):
    
    """
        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized
            feat_index : index of the feature to split binRDD
        
        Saída:
           leftRDD     : distributed database in format (class, [features]) with samples where choosen feature is TRUE
           rightRDD    : distributed database in format (class, [features]) with samples where choosen feature is FALSE
    """
    
    #Filho da Esquerda -> o atributo dado é TRUE
    leftRDD  = binRDD.filter(lambda sample : sample[1][feat_index])

    #Filho da Direita -> o atributo dado é FALSE
    rightRDD = binRDD.filter(lambda sample : not sample[1][feat_index])

    return leftRDD, rightRDD    
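
For example (a toy RDD, not from the original post), splitting on feature 0 sends the TRUE rows left and the FALSE rows right:

toy  = sc.parallelize([(1, [True]), (0, [False]), (1, [True])])
l, r = SplitByFeat(toy, 0)
print(l.count(), r.count())   # 2 1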


Evaluate Best Option Feature and Split by It

def evalAndSplit(binRDD):

    """
        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized
        
        Output:
           leftRDD     : distributed database in format (class, [features]) with samples where best option feature is TRUE
           rightRDD    : distributed database in format (class, [features]) with samples where best option feature is FALSE
           bestOption  : index of the feature with maximum information gain
           maxIG       : information gain for the bestOption feature
    """

    candidatesIG = []
    
    # Try every feature and record the information gain of splitting on it
    for i in range(len(binRDD.first()[1])):
        candidatesIG.append(infoGain(binRDD, SplitByFeat(binRDD, i)))
        
    maxIG      = max(candidatesIG)
    bestOption = candidatesIG.index(maxIG)
    
    # Recompute the winning split and return it along with its gain
    left, right = SplitByFeat(binRDD, bestOption)
    
    return left, right, bestOption, maxIG
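
On a toy RDD where feature 0 determines the class and feature 1 is noise, the search should pick feature 0 with a gain of 1 bit (a hypothetical check, assuming the functions above):

toy = sc.parallelize([(1, [True, True]), (1, [True, False]),
                      (0, [False, True]), (0, [False, False])])
left, right, best, ig = evalAndSplit(toy)
print(best, ig)   # 0 1.0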


Building the Tree

def insertNode(this_node_index, parcial_tree, process_queue, maxDepth = 4, minSize = 2):

    """ 
        Input: 
            this_node_index    : node index to be evaluated
            parcial_tree       : dictionary with nodes already in the tree
            process_queue      : queue with nodes that must be evaluated
            maxDepth           : maximum depth for the tree
            minSize            : minimum number of samples for each node
        
        Output:
            String with elapsed time in format HH:MM:SS.
           
    """
    #Time count initialization
    start_time = time.time()
    ###########################################################################################################
    
    
    
    this_node = parcial_tree[this_node_index]
    
    if this_node.depth < maxDepth:
           
        left, right, bestOption, maxIG = evalAndSplit(this_node.data)

        # Only split when it actually reduces the entropy
        if maxIG > 0:

            l_size = left.count()
            r_size = right.count()
            
            if (l_size >= minSize) or (r_size >= minSize):
                
                this_node.best_option = bestOption
                
                if (l_size >= minSize): 
                    
                    # The new child takes the next free index and joins the queue
                    node_index = 1 + sorted(parcial_tree.keys())[-1]
                    l_child = node(left, this_node_index)
                    l_child.depth = 1 + this_node.depth
                    parcial_tree[node_index] = l_child

                    process_queue.append(node_index)
                    this_node.left_child = node_index
                
                if (r_size >= minSize): 
                    
                    node_index = 1 + sorted(parcial_tree.keys())[-1]
                    r_child = node(right, this_node_index)
                    r_child.depth = 1 + this_node.depth
                    parcial_tree[node_index] = r_child

                    process_queue.append(node_index)
                    this_node.right_child = node_index
                    
    ###########################################################################################################
    
    #Calculating elapsed time and printing it on prompt
    elapsed_time = time.time() - start_time
    return ('\tProcessing Time: {}'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))


Training the Model

def train(binRDD, maxDepth = 4, minSize = 2):

    """
        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized for train
            maxDepth   : maximum depth for the tree
            minSize    : minimum number of samples for each node
        
        Output:
            tree       : dictionary with trained decision tree structure in format (index : node)
    """
    
    #Time count initialization and printing on prompt
    start_time = time.time()
    print('Train started at: {}\tMaximum Depth: {}\tMinimum size: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), maxDepth, minSize))
    
    ###########################################################################################################
    
    tree = {}
    process_queue = []
    root = node(binRDD, None)
    root.depth = 0
    
    tree[0] = root
    process_queue.append(0)
    
    while len(process_queue) > 0:
        next_index = process_queue.pop(0)
        exec_time = insertNode(next_index, tree, process_queue, maxDepth, minSize)
        print('Last evaluated node: {}\t\tQueue size: {}{}'.format(next_index,len(process_queue),exec_time))
    
    # Drop the node RDDs so the finished tree is a small, picklable structure
    for i in range(len(tree)):
        tree[i].data = None
    ###########################################################################################################
    
    #Calculating elapsed time and printing it on prompt
    elapsed_time = time.time() - start_time
    print('\nExecution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
          
       
    return tree


Predict Class of a Sample

def predict(features, model):

    """
        Input: 
            features    : features of a sample from a binRDD that class will be predicted
            model       : dictionary with trained decision tree structure in format (index : node)

        Output:
            pred        : predicted class for the given sample
    """
    
    next_node = 0  # Start at the root

    pred = None

    while next_node is not None:

        # Remember the label of the deepest node reached so far
        pred = model[next_node].label

        if model[next_node].best_option is not None:

            # Internal node: the TRUE branch goes left, the FALSE branch goes right
            if features[model[next_node].best_option]:
                next_node = model[next_node].left_child
            else:
                next_node = model[next_node].right_child
        else:

            # Leaf node: stop and return the stored label
            next_node = None        

    return pred
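
Putting the pieces together on a trivially separable toy set (a sketch, assuming the functions above): train should grow a single-split stump whose predictions recover the labels.

toy   = sc.parallelize([(1, [True]), (0, [False])]*2)
stump = train(toy, maxDepth = 2, minSize = 1)
print(predict([True], stump), predict([False], stump))   # 1 0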


Accuracy

def accuracy(binRDD, predictions):
    
    """
        Input: 
            binRDD      : distributed database in format (class, [features]) with features binarized
            predictions : predictions from the model for the given binRDD

        Output:
            accuracy    : model accuracy for prediction
    """
    
    # zip pairs each true label with its prediction; this lines up because
    # predictions was produced by a map over the same binRDD
    num = ( binRDD
               .map(lambda sample: sample[0]) 
               .zip(predictions)              
               .filter(lambda sample : sample[0] == sample[1])
               .count() 
             )
              
    return num/predictions.count()
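
A quick check with hand-made predictions (hypothetical data; both RDDs share the same length and number of slices so that zip lines up): three matches out of four gives 0.75.

labels = sc.parallelize([(1, []), (0, []), (1, []), (0, [])], 2)
preds  = sc.parallelize([1, 0, 0, 0], 2)
print(accuracy(labels, preds))   # 0.75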


Predicting Label of a Dataset

def test(binRDD, model):

    """
        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized
            model      : dictionary with trained decision tree structure in format (index : node)
        
        Output:
            predictions: predictions from the model for the given binRDD
            accuracy   : model accuracy for prediction
    """

    #Time count initialization and printing on prompt
    start_time = time.time()
    print('Test started at: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    
    ###########################################################################################################
          
    predictions = binRDD.map(lambda sample : predict(sample[1], model))
    acc         = accuracy(binRDD, predictions)
       
    ###########################################################################################################
    
    #Calculating elapsed time and printing it on prompt
    elapsed_time = time.time() - start_time
    print('Execution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
    
    return predictions, acc


Main Function

#Reading the data file and formatting it to (class, [features])
dataset = (
            sc.textFile('SUSY.csv') 
            .map(lambda line   : line.split(','))
            .map(lambda row    : [float(element) for element in row])
            .map(lambda sample : (sample[0], sample[1:])) #Formatting the tuple (class, [features])
           ).cache()
#Binarizing the dataset features
binRDD = featsBinarize(dataset)
binRDD.cache()
binRDD.count() #Force evaluation so the cached binRDD is materialized
#Splitting the dataset into train and test
seed = 42
weights = [0.8, 0.2]

binTrain, binTest = binRDD.randomSplit(weights, seed)

print('Train size: {}\tTest size: {}'.format(binTrain.count(), binTest.count()))
#Training the decision tree model on the train split
model = train(binTrain)
#Printing the resulting model
print(model)
#Predicting label for train dataset
predictionsTrain, trainAcc = test(binTrain, model)
#Predicting label for test dataset
predictionsTest, testAcc = test(binTest, model)
#Printing train and test accuracy
print('Train Accuracy: {:.3f}%\tTest Accuracy: {:.3f}%'.format(100*trainAcc, 100*testAcc))