A Big Data Approach to Decision Trees - Building the Tree
Building the tree itself does not require parallel computing, because it mostly consists of updating node properties, controlling the processing queue (the list of tree nodes still to be processed) and maintaining the model (a dictionary of the nodes in the tree).
Since the number of nodes in the tree is usually small and can be controlled through the maximum depth and minimum node size, there is no need to distribute these tasks. The functions that build the tree are insertNode(this_node_index, parcial_tree, process_queue, maxDepth = 4, minSize = 2) and train(binRDD, maxDepth = 4, minSize = 2).
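Both functions operate on the node class defined earlier in this series. For reference only, a minimal sketch of the attributes this post relies on could look like the following (the actual implementation from the previous part may differ):

class node(object):
    #Minimal sketch for reference; the real class from the previous post may hold more attributes
    def __init__(self, data, index):
        self.data = data          #RDD of (class, [features]) samples reaching this node
        self.index = index        #index of the node in the tree dictionary
        self.depth = None         #depth of the node, set by the caller
        self.parent = None        #index of the parent node
        self.left_child = None    #index of the left child, if any
        self.right_child = None   #index of the right child, if any
        self.best_option = None   #split selected by evalAndSplit for this node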
import time
import datetime

def insertNode(this_node_index, parcial_tree, process_queue, maxDepth = 4, minSize = 2):
    """
    Input:
        this_node_index : index of the node to be evaluated
        parcial_tree : dictionary with the nodes already in the tree
        process_queue : queue with the nodes that still must be evaluated
        maxDepth : maximum depth for the tree
        minSize : minimum number of samples for each node
    Output:
        String with the elapsed time in format HH:MM:SS.
    """
    #Time count initialization
    start_time = time.time()
    ###########################################################################################################
    this_node = parcial_tree[this_node_index]
    if this_node.depth < maxDepth:
        left, right, bestOption, maxIG = evalAndSplit(this_node.data)
        if maxIG > 0:
            l_size = left.count()
            r_size = right.count()
            if (l_size >= minSize) or (r_size >= minSize):
                this_node.best_option = bestOption
                if (l_size >= minSize):
                    node_index = 1 + sorted(parcial_tree.keys())[-1]
                    l_child = node(left, node_index)
                    l_child.depth = 1 + this_node.depth
                    l_child.parent = this_node_index
                    parcial_tree[node_index] = l_child
                    process_queue.append(node_index)
                    this_node.left_child = node_index
                if (r_size >= minSize):
                    node_index = 1 + sorted(parcial_tree.keys())[-1]
                    r_child = node(right, node_index)
                    r_child.depth = 1 + this_node.depth
                    r_child.parent = this_node_index
                    parcial_tree[node_index] = r_child
                    process_queue.append(node_index)
                    this_node.right_child = node_index
    ###########################################################################################################
    #Calculating elapsed time and returning it as a string
    elapsed_time = time.time() - start_time
    return ('\tProcessing Time: {}'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
def train(binRDD, maxDepth = 4, minSize = 2):
    """
    Input:
        binRDD : distributed dataset in format (class, [features]) with binarized features for training
        maxDepth : maximum depth for the tree
        minSize : minimum number of samples for each node
    Output:
        tree : dictionary with the trained decision tree structure in format (index : node)
    """
    #Time count initialization and printing on prompt
    start_time = time.time()
    print('Train started at: {}\tMaximum Depth: {}\tMinimum size: {}\n'.format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), maxDepth, minSize))
    ###########################################################################################################
    tree = {}
    process_queue = []
    root = node(binRDD, None)
    root.depth = 0
    tree[0] = root
    process_queue.append(0)
    while len(process_queue) > 0:
        next_index = process_queue.pop(0)
        exec_time = insertNode(next_index, tree, process_queue, maxDepth, minSize)
        print('Last evaluated node: {}\t\tQueue size: {}{}'.format(next_index, len(process_queue), exec_time))
    #Release the node data RDDs, keeping only the tree structure
    for i in range(len(tree)):
        tree[i].data = None
    ###########################################################################################################
    #Calculating elapsed time and printing it on prompt
    elapsed_time = time.time() - start_time
    print('\nExecution time: {}\n'.format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))
    return tree
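As a hypothetical usage sketch, assuming binRDD was produced by the binarization step from the previous part of this series (an RDD of (class, [binary features]) rows), training and inspecting the resulting structure could look like this:

#Hypothetical usage sketch; binRDD is assumed to come from the earlier binarization step
tree = train(binRDD, maxDepth = 4, minSize = 2)

#Each entry maps a node index to its node object; the data RDDs were released, so only the structure remains
for index in sorted(tree.keys()):
    n = tree[index]
    print('Node {}: depth {}, parent {}, children ({}, {}), split {}'.format(index, n.depth, n.parent, n.left_child, n.right_child, n.best_option))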