A Big Data Approach to Decision Trees - Auxiliary Functions

1 minute read

Published:

meanRDD(RDD): returns the mean value of every feature by mapping (class, [features]) to ([features], 1), reducing with element-wise addition, and dividing the per-feature sums by the number of samples n.

def meanRDD(RDD):
    """
        Compute the mean value of every feature over the whole dataset.

        Input: 
            RDD: distributed database in format (class, [features])
        
        Output:
            numpy array with the mean value of each feature

        Note:
            an empty RDD is not handled here; reduce() has nothing to
            combine in that case and the error it raises is propagated.
    """
    # Turn every feature vector into an array *before* reducing. reduce() on a
    # single-sample RDD hands back that sample untouched, so a plain list would
    # otherwise reach the division below and fail with a TypeError.
    featSum, n = (RDD
                  .map(lambda sample: (np.asarray(sample[1]), 1))
                  .reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
                 )

    # Element-wise division of the per-feature sums by the sample count
    return featSum / n

featsBinarize(RDD): binarizes the features by mapping (class, [features]) to (class, [features] - [means] > 0), so each feature becomes True when it is above its mean and False otherwise.

def featsBinarize(RDD):

    """
        Binarize every feature against its dataset-wide mean.

        Input: 
            RDD: distributed database in format (class, [features])
        
        Output:
            binRDD: distributed database in format (class, [features]) where each
                    feature is replaced by the result of (feature - mean) > 0
    """

    # The means are computed once, up front; the mapper below only reads them
    featMeans = meanRDD(RDD)

    def aboveMean(sample):
        # Keep the class label, compare the features element-wise to the means
        return (sample[0], (sample[1] - featMeans) > 0)

    return RDD.map(aboveMean)

accuracy(binRDD, predictions): maps (class, [features]) to (class), zips it with the predictions, keeps the pairs where the class equals the prediction, counts them, and divides by the total number of samples to return the accuracy.

def accuracy(binRDD, predictions):
    
    """
        Fraction of samples whose predicted class equals the true class.

        Input: 
            binRDD      : distributed database in format (class, [features]) with features binarized for train 
            predictions : predictions from the model for the given binRDD

        Output:
            accuracy    : number of matching (class, prediction) pairs divided by
                          the number of predictions
    """

    # True class of every sample, paired positionally with its prediction
    labels = binRDD.map(lambda row: row[0])
    paired = labels.zip(predictions)

    # Count the pairs in which the prediction matches the class
    hits = paired.filter(lambda pair: pair[0] == pair[1]).count()

    total = predictions.count()
    return hits/total

test(binRDD, model): testing function that maps each sample through a non-parallel function predict(features, model), which goes through the given tree and returns a predicted class.

def test(binRDD, model):

    """
        Run a trained tree over a dataset, report timing and return the results.

        Input: 
            binRDD     : distributed database in format (class, [features]) with features binarized for train 
            model      : dictionary with trained decision tree structure in format (index : node)
        
        Output:
            predictions: predictions from the model for the given binRDD
            accuracy   : model accuracy for prediction
    """

    # Remember when we started and announce it on the prompt
    started = time.time()
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('Test started at:: {}\n'.format(stamp))

    # Each sample's features are passed to predict() together with the model;
    # the class label (sample[0]) is only used afterwards, inside accuracy()
    predictions = binRDD.map(lambda sample: predict(sample[1], model))
    acc = accuracy(binRDD, predictions)

    # Report the elapsed wall-clock time as HH:MM:SS
    elapsed = time.gmtime(time.time() - started)
    print('Execution time: {}\n'.format(time.strftime("%H:%M:%S", elapsed)))

    return predictions, acc