A Big Data Approach to Decision Trees - Building the Tree

2 minute read

Published:

Building the tree itself does not require parallel computing, because it basically consists of updating tree node properties and managing the processing queue (the list of tree nodes still to be processed) and the model (the dictionary of nodes in the tree).

As the number of nodes in the tree is usually small and can be limited by the maximum depth and minimum size, there is no need to distribute these tasks. The functions that build the tree are insertNode(this_node_index, parcial_tree, process_queue, maxDepth = 4, minSize = 2) and train(binRDD, maxDepth = 4, minSize = 2).

def _appendChild(child_data, parent_index, parent_depth, parcial_tree, process_queue):

    """
        Create a child node, store it in the tree and schedule it for evaluation.

        Input:
            child_data         : data subset assigned to the new child
            parent_index       : index of the node being split
            parent_depth       : depth of the node being split
            parcial_tree       : dictionary with nodes already in the tree (mutated in place)
            process_queue      : queue with nodes that must be evaluated (mutated in place)

        Output:
            Index under which the new child was stored in parcial_tree.
    """
    # Next free index is one past the largest index currently in the tree.
    child_index = 1 + max(parcial_tree)

    child = node(child_data, child_index)
    child.depth = 1 + parent_depth
    child.parent = parent_index

    parcial_tree[child_index] = child
    process_queue.append(child_index)
    return child_index


def insertNode(this_node_index, parcial_tree, process_queue, maxDepth = 4, minSize = 2):

    """
        Evaluate one node of the tree and, when a useful split exists, attach
        its children to the tree and enqueue them for later evaluation.

        A node is only split when its depth is below maxDepth, evalAndSplit
        reports a positive maxIG (presumably the information gain - confirm
        against evalAndSplit) and at least one side of the split holds
        minSize samples or more. A side smaller than minSize gets no child.

        Input:
            this_node_index    : node index to be evaluated
            parcial_tree       : dictionary with nodes already in the tree (mutated in place)
            process_queue      : queue with nodes that must be evaluated (mutated in place)
            maxDepth           : maximum depth for the tree
            minSize            : minimum number of samples for each node

        Output:
            String '\tProcessing Time: HH:MM:SS' with the elapsed time of this call.

    """
    #Time count initialization
    start_time = time.time()
    ###########################################################################################################

    this_node = parcial_tree[this_node_index]

    if this_node.depth < maxDepth:

        left, right, bestOption, maxIG = evalAndSplit(this_node.data)

        if maxIG > 0:

            l_size = left.count()
            r_size = right.count()

            # The original condition tested l_size twice, so a split where only
            # the right side was large enough was silently discarded.
            if (l_size >= minSize) or (r_size >= minSize):

                this_node.best_option = bestOption

                if l_size >= minSize:
                    this_node.left_child = _appendChild(
                        left, this_node_index, this_node.depth, parcial_tree, process_queue)

                if r_size >= minSize:
                    this_node.right_child = _appendChild(
                        right, this_node_index, this_node.depth, parcial_tree, process_queue)

    ###########################################################################################################

    #Calculating elapsed time and returning it formatted for printing
    elapsed_time = time.time() - start_time
    return ('\tProcessing Time: {}'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

def train(binRDD, maxDepth = 4, minSize = 2):

    """
        Build a decision tree by evaluating nodes in first-in, first-out order,
        starting from a root node that holds the whole dataset.

        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized for train
            maxDepth   : maximum depth for the tree
            minSize    : minimum number of samples for each node
        
        Output:
            tree       : dictionary with trained decision tree structure in format (index : node);
                         every node's data attribute is set to None before returning
    """
    # Local import: the file's import section is not guaranteed to provide deque.
    from collections import deque

    #Time count initialization and printing on prompt
    start_time = time.time()
    print('Train started at: {}\tMaximum Depth: {}\tMinimum size: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), maxDepth, minSize))
    
    ###########################################################################################################
    
    # NOTE(review): the root is built with None as second argument while it is
    # stored under index 0; children receive their own index there - confirm
    # against the node class whether the root should receive 0.
    root = node(binRDD, None)
    root.depth = 0
    
    tree = {0: root}
    # deque gives O(1) removal from the front; insertNode only appends to it.
    process_queue = deque([0])
    
    while process_queue:
        next_index = process_queue.popleft()
        exec_time = insertNode(next_index, tree, process_queue, maxDepth, minSize)
        print('Last evaluated node: {}\t\tQueue size: {}{}'.format(next_index,len(process_queue),exec_time))
    
    # Drop the per-node data references; iterating the values does not depend
    # on the indices being exactly 0..len(tree)-1.
    for tree_node in tree.values():
        tree_node.data = None
    ###########################################################################################################
    
    #Calculating elapsed time and printing it on prompt
    elapsed_time = time.time() - start_time
    print('\nExecution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
    
    return tree