3.12

# Example 3: finding extreme data points

## Data

We consider again the customers data CEnetBig data, which is already stored in the HDFS and contains data about the monthly bills of 1 million customers for 2017 and about the type of product/service that the were having in 2016. You can load it in R using.

CEnetBig=from.dfs("/CEnetBig")


But it is better to pass the data directly to mapreduce script. Its data format is the following:

id 2016_1 2016_2 2016_3 2016_4 2016_5 2016_6 2016_7 2016_8 2016_9 2016_10 2016_11 2016_12 type
100496 157.79 116.14 128.48 127.76 142.93  73.57 130.48 116.55  67.96   69.98  116.28  144.39   4
100910 131.59 137.26 134.65  98.14 119.42  90.22  97.13 105.46 116.65  118.49  138.66  139.56   3
100302 144.23 174.78 124.83 118.82 131.76 101.06  70.04 137.27  89.39  145.76  156.86  136.03 5
100456 123.67 106.47 142.16  75.10  75.35  60.41  79.14 102.06 100.06   89.25   92.03   69.60    2
100306 114.09 124.01  66.66  42.00  72.59  59.28  93.26 100.07  82.72   91.33  100.32  128.94    2
100117 140.07 169.40 154.56 140.57  72.29 124.94 127.50 109.57 106.45  127.06  106.78  171.33    5
100254 131.57 109.13 117.88 154.17  71.38  51.11 119.01  97.44 130.20   98.46  104.80  109.57    3


The last column therefore defines the groups (the type of product/service that they were having in 2016) and the interesting data is in columns 2016_1,…,2016_12.

## Goal

For each group of customers (1:5) find the customer with maximum total amount of bills paid in 2016.

## Method

We have to compute the row sums for all columns 2016_1,…,2016_12 and then find for each group the row that has the maximum sum. This can be efficiently done with mapreduce: in each data chunk we find the top row for each group by map operation and then pass them to the reduce part which instead of sum perform the maximum operation.

### Map

The map function computes the row sums for all data instances in each data chunk and then finds and returns the rows with maximum row sum, for each group.

mapperTopRows = function (., X) {
top5iList=list();
for (i in 1:5){
selectRows=which(X[,14]==i);
rSum=rowSums(X[selectRows,2:13]);
indMax <- which.max(rSum)
top5iList[[i]]=rSum[indMax];
}
keyval(1:5,top5iList);
}


## Reduce

In the reduce part we simply take the maximum of the row maxima over all key-value pairs.

reducerTopRows = function(k, A) {
keyval(k,list(Reduce('max', A)))
}


## Map-Reduce

In this part we perform map and reduce on the data CEnetBig

topRows <-   from.dfs(
mapreduce(
input = "/CEnetBig",
map = mapperTopRows,
reduce = reducerTopRows,
combine = T
)
)


## Results

In topRows\$val we obtain the row maxima for each group:

[[1]]
[1] 1359.73

[[2]]
[1] 1520.28

[[3]]
[1] 1589.35

[[4]]
[1] 1742.83

[[5]]
[1] 1812.06