Example 3: Finding the extreme data points

Data

We consider again the customer dataset CEnetBig, which is already stored in HDFS. It contains the monthly bills of 1 million customers for 2016 and the type of product/service each customer had in 2016. You can load it into R's memory with:

library(rmr2)   # provides from.dfs(), mapreduce() and keyval()
CEnetBig = from.dfs("/CEnetBig")

However, it is better to pass the HDFS path directly to mapreduce(), so that the data never have to be loaded into R's memory as a whole. Each row holds a customer id, the twelve monthly bills of 2016 and the product type; the first rows look like this:

id     2016_1 2016_2 2016_3 2016_4 2016_5 2016_6 2016_7 2016_8 2016_9 2016_10 2016_11 2016_12 type
100496 157.79 116.14 128.48 127.76 142.93  73.57 130.48 116.55  67.96   69.98  116.28  144.39    4
100910 131.59 137.26 134.65  98.14 119.42  90.22  97.13 105.46 116.65  118.49  138.66  139.56    3
100302 144.23 174.78 124.83 118.82 131.76 101.06  70.04 137.27  89.39  145.76  156.86  136.03    5
100456 123.67 106.47 142.16  75.10  75.35  60.41  79.14 102.06 100.06   89.25   92.03   69.60    2
100306 114.09 124.01  66.66  42.00  72.59  59.28  93.26 100.07  82.72   91.33  100.32  128.94    2
100117 140.07 169.40 154.56 140.57  72.29 124.94 127.50 109.57 106.45  127.06  106.78  171.33    5
100254 131.57 109.13 117.88 154.17  71.38  51.11 119.01  97.44 130.20   98.46  104.80  109.57    3

The last column, type, defines the groups (the type of product/service the customer had in 2016), and the values of interest are the monthly bills in columns 2016_1 to 2016_12, i.e. columns 2 to 13 of the data.

Goal

For each group of customers (types 1 to 5), find the customer with the maximum total amount of bills paid in 2016.
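When the data fit in memory, the goal can be computed directly, which gives a reference result to check the map-reduce output against. The following is a minimal base-R sketch on the seven sample rows shown in the Data section (read from text, so neither Hadoop nor rmr2 is needed); the helper name topCustomerPerType is hypothetical.

```r
# The seven sample rows from the Data section, read from text (no Hadoop needed).
# check.names = FALSE keeps column names such as "2016_1" unchanged.
X <- read.table(header = TRUE, check.names = FALSE, text = "
id     2016_1 2016_2 2016_3 2016_4 2016_5 2016_6 2016_7 2016_8 2016_9 2016_10 2016_11 2016_12 type
100496 157.79 116.14 128.48 127.76 142.93  73.57 130.48 116.55  67.96   69.98  116.28  144.39    4
100910 131.59 137.26 134.65  98.14 119.42  90.22  97.13 105.46 116.65  118.49  138.66  139.56    3
100302 144.23 174.78 124.83 118.82 131.76 101.06  70.04 137.27  89.39  145.76  156.86  136.03    5
100456 123.67 106.47 142.16  75.10  75.35  60.41  79.14 102.06 100.06   89.25   92.03   69.60    2
100306 114.09 124.01  66.66  42.00  72.59  59.28  93.26 100.07  82.72   91.33  100.32  128.94    2
100117 140.07 169.40 154.56 140.57  72.29 124.94 127.50 109.57 106.45  127.06  106.78  171.33    5
100254 131.57 109.13 117.88 154.17  71.38  51.11 119.01  97.44 130.20   98.46  104.80  109.57    3
")

# Hypothetical helper: for each type present, the customer with the largest 2016 total
topCustomerPerType <- function(X) {
  total <- rowSums(X[, 2:13])                      # columns 2016_1 ... 2016_12
  best <- tapply(seq_len(nrow(X)), X[, 14],        # row indices grouped by type
                 function(rows) rows[which.max(total[rows])])
  idx <- as.integer(best)
  data.frame(type = as.integer(names(best)), id = X[idx, 1],
             total = unname(total[idx]))
}

topCustomerPerType(X)
```

On these rows it selects customer 100456 for type 2, 100910 for type 3, 100496 for type 4 and 100117 for type 5; type 1 does not occur in the sample.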

Method

We compute the row sum over columns 2016_1 to 2016_12 for every customer and then, for each group, find the row with the largest sum. This is well suited to map-reduce: the map step finds the largest row sum of each group within its data chunk, and the reduce step, which here takes the maximum instead of the sum, combines these partial results per group. Since the maximum of the chunk maxima equals the maximum over all rows, the result is exact.
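The scheme can be emulated in memory to see that it gives the same answer as a direct computation. This is a sketch in base R only, not the rmr2 job itself: the data are random and merely mimic the 14-column CEnetBig layout, and splitting into 8 contiguous chunks is an assumption standing in for HDFS blocks.

```r
# In-memory emulation of the map-reduce scheme in base R (no Hadoop, no rmr2).
# Random data that only mimic the CEnetBig layout: id, 12 monthly bills, type.
set.seed(42)
n <- 1000
X <- data.frame(id = 100000 + seq_len(n),
                matrix(round(runif(n * 12, 40, 180), 2), nrow = n),
                type = sample(1:5, n, replace = TRUE))

# "map": one chunk -> list of chunk maxima, named by group key
mapChunk <- function(chunk) {
  total <- rowSums(chunk[, 2:13])
  lapply(split(total, chunk[, 14]), max)
}

# 8 contiguous chunks of 125 rows, standing in for HDFS blocks
chunks <- split(X, ceiling(seq_len(n) / 125))
mapped <- lapply(chunks, mapChunk)

# "shuffle + reduce": collect the values of each key and keep the largest
reduced <- sapply(as.character(1:5), function(k)
  max(unlist(lapply(mapped, `[`, k))))

# reference: the same maxima computed over all rows at once
direct <- tapply(rowSums(X[, 2:13]), X[, 14], max)
all.equal(unname(reduced), as.vector(direct))
```

all.equal() returns TRUE: however the rows are split into chunks, the maximum of the chunk maxima is the overall maximum.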

Map

The map function computes the row sums (the 2016 totals) of all customers in its data chunk and, for each group, emits the largest one with the group number (1 to 5) as key. Note that it returns only the sums, not the ids of the customers who produced them.

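mapperTopRows = function(., X) {
  # X is one chunk of CEnetBig: column 1 = id, columns 2:13 = bills 2016_1 to 2016_12, column 14 = type
  top5iList = list()
  for (i in 1:5) {
    selectRows = which(X[, 14] == i)
    # drop = FALSE keeps a single selected row two-dimensional, so rowSums() still works
    rSum = rowSums(X[selectRows, 2:13, drop = FALSE])
    # largest 2016 total of group i in this chunk; -Inf if the chunk has no customer of type i
    top5iList[[i]] = if (length(rSum) > 0) max(rSum) else -Inf
  }
  keyval(1:5, top5iList)
}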
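mapperTopRows emits only the largest sum of each group, so the customer who produced it is lost. A minimal sketch of one way to keep it, assuming it is acceptable to emit a small vector c(id, total) per group instead of a single number: the function mapTopRowsWithId is hypothetical and written in plain R so it can be tried without rmr2; in an actual job its values would still have to be wrapped in keyval() as in mapperTopRows, which is not tested here.

```r
# Hypothetical variant of mapperTopRows that keeps the customer id (plain R, no rmr2).
# For one chunk X (column 1 = id, 2:13 = monthly bills, 14 = type) it returns a list
# named by group; each element is c(id = ..., total = ...) for the chunk's top customer.
mapTopRowsWithId <- function(X) {
  total <- rowSums(X[, 2:13, drop = FALSE])
  out <- list()
  for (i in 1:5) {
    rows <- which(X[, 14] == i)
    if (length(rows) == 0) next          # no customer of this type in the chunk
    best <- rows[which.max(total[rows])]
    out[[as.character(i)]] <- c(id = X[best, 1], total = unname(total[best]))
  }
  out
}

# Small made-up chunk: customers 1 and 2 have type 2, customer 3 has type 5
chunk <- data.frame(id = c(1, 2, 3),
                    matrix(c(rep(10, 12), rep(20, 12), rep(5, 12)),
                           nrow = 3, byrow = TRUE),
                    type = c(2, 2, 5))
mapTopRowsWithId(chunk)
```

For this chunk the function returns c(id = 2, total = 240) for type 2 and c(id = 3, total = 60) for type 5; types absent from the chunk are skipped rather than emitted.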

Reduce

For each key (group), the reduce function takes the maximum of the chunk maxima emitted by the mappers.

reducerTopRows = function(k, A) {
  # A holds every chunk maximum emitted for group k; keep the largest one
  keyval(k, list(Reduce(max, A)))
}
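If the mapper emits (id, total) pairs instead of plain sums, the reduce step has to compare the totals and keep the whole pair of the winner. The sketch below, with the hypothetical name reduceTopRowWithId, is plain R without rmr2. It also shows why running the reducer as a combiner is safe here: keeping the larger candidate is associative, so reducing partial groups first and then their results gives the same answer as reducing all candidates at once.

```r
# Hypothetical reducer for the (id, total) pairs of one group: keep the
# candidate with the largest total (plain R, no rmr2).
reduceTopRowWithId <- function(candidates) {
  totals <- vapply(candidates, function(v) v[["total"]], numeric(1))
  candidates[[which.max(totals)]]
}

# Made-up candidates for one group, coming from four chunks
cands <- list(c(id = 101, total = 1400.5), c(id = 102, total = 1512.0),
              c(id = 103, total = 1390.2), c(id = 104, total = 1499.9))

once     <- reduceTopRowWithId(cands)                    # reduce everything at once
combined <- reduceTopRowWithId(list(                     # combine halves first
  reduceTopRowWithId(cands[1:2]),
  reduceTopRowWithId(cands[3:4])))
identical(once, combined)
```

Both ways return customer 102 with total 1512; in case of a tie, which.max keeps the first candidate.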

Map-Reduce

Finally, we run map and reduce on CEnetBig, passing its HDFS path directly as input:

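topRows <- from.dfs(
  mapreduce(
    input = "/CEnetBig",       # HDFS path: the mappers read the data chunk by chunk
    map = mapperTopRows,
    reduce = reducerTopRows,
    combine = TRUE             # reuse the reducer as a combiner; valid because max is associative
  )
)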

Results

topRows$key contains the group labels and topRows$val the corresponding maximum 2016 totals:

[[1]]
[1] 1359.73

[[2]]
[1] 1520.28

[[3]]
[1] 1589.35

[[4]]
[1] 1742.83

[[5]]
[1] 1812.06


This article is from the free online course:

Managing Big Data with R and Hadoop

Partnership for Advanced Computing in Europe (PRACE)