-
Notifications
You must be signed in to change notification settings - Fork 472
Expand file tree
/
Copy pathspark-combineByKey.txt
More file actions
executable file
·161 lines (120 loc) · 5.81 KB
/
spark-combineByKey.txt
File metadata and controls
executable file
·161 lines (120 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
Some Notes on Spark's combineByKey()
combineByKey(createCombiner, mergeValue, mergeCombiners)
Generic function to combine the elements for each key using
a custom set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
for a “combined type” C. Note that V and C can be different –
for example, one might group an RDD of type (Int, Int) into an
RDD of type (Int, List[Int]).
Users provide three functions:
createCombiner, which turns a V into a C (e.g., creates a one-element list)
V --> C
mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
C, V --> C
mergeCombiners, to combine two C’s into a single one.
C, C --> C
In addition, users can control the partitioning of the output RDD.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def add(a, b): return a + str(b)
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
========================
Average By Key: use combineByKey()
The example below uses data in the form of a list of key-value
tuples: (key, value). I turn that list into a Resilient Distributed
Dataset (RDD) with sc.parallelize, where sc is an instance of
pyspark.SparkContext.
The next step is to use combineByKey to compute the sum and count
for each key in data. Admittedly, using three lambda-functions as
arguments to combineByKey makes the code difficult to read. I will
explain each lambda-function in the next section. The result, sumCount,
is an RDD where its values are in the form of (label, (sum, count)).
To compute the average-by-key, I use the map method to divide the sum
by the count for each key.
Finally, I use the collectAsMap method to return the average-by-key
as a dictionary.
data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )
sumCount = data.combineByKey(lambda value: (value, 1),
lambda x, value: (x[0] + value, x[1] + 1),
lambda x, y: (x[0] + y[0], x[1] + y[1]))
averageByKey =
sumCount.map(lambda (label, (value_sum, count)): (label, value_sum / count))
print averageByKey.collectAsMap()
Result:
{0: 3.0, 1: 10.0}
See here for the above example as an executable script.
The combineByKey Method
In order to aggregate an RDD's elements in parallel, Spark's combineByKey
method requires three functions:
createCombiner
mergeValue
mergeCombiner
Create a Combiner
=================
lambda value: (value, 1)
The first required argument in the combineByKey method is a function to
be used as the very first aggregation step for each key. The argument of
this function corresponds to the value in a key-value pair. If we want to
compute the sum and count using combineByKey, then we can create this
"combiner" to be a tuple in the form of (sum, count). The very first
step in this aggregation is then (value, 1), where value is the first
RDD value that combineByKey comes across and 1 initializes the count.
Merge a Value
=============
lambda x, value: (x[0] + value, x[1] + 1)
The next required function tells combineByKey what to do when a combiner
is given a new value. The arguments to this function are a combiner and
a new value. The structure of the combiner is defined above as a tuple
in the form of (sum, count) so we merge the new value by adding it to the
first element of the tuple while incrementing 1 to the second element of
the tuple.
Merge two Combiners
===================
lambda x, y: (x[0] + y[0], x[1] + y[1])
The final required function tells combineByKey how to merge two combiners.
In this example with tuples as combiners in the form of (sum, count), all
we need to do is add the first and last elements together.
Compute the Average
averageByKey =
sumCount.map(lambda (label, (value_sum, count)):
(label, value_sum / count))
Ultimately the goal is to compute the average-by-key. The result from
combineByKey is an RDD with elements in the form (label, (sum, count)),
so the average-by-key can easily be obtained by using the map method,
mapping (sum, count) to sum / count.
Note: I do not use sum as variable name in the code because it is a
built-in function in Python.
==================
First, let's break the process down:
First, createCombiner creates the initial value (combiner) for a key's
first encounter on a partition if one is not found --> (firstValueEncountered, 1).
So, this is merely initializing a tuple with the first value and a key counter
of 1.
Then, mergeValue is triggered only if a combiner (tuple in our case) has
already been created for the found key on this
partition --> (existingTuple._1 + subSequentValue, existingTuple._2 + 1).
This adds the existing tuple's value (in the first slot) with the newly
encountered value and takes the existing tuple's counter (in the second slot)
and increments it. So, we are
Then, mergeCombiner takes the combiners (tuples) created on each partition
and merges them
together --> (tupleFromPartition._1 + tupleFromPartition2._1,
tupleFromPartition1._2 + tupleFromPartition2._2).
This is merely adding the values from each tuple together and the
counters together into one tuple.
Then, let's break up a subset of your data into partitions and see
it in action:
("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)
Partition 1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Partition 2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Merge partitions together
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
So, you should get back an array something like this:
Array((A, (24, 3)), (B, (25, 3)))