The dataset
Let's say you have a simple log file containing billions of financial transactions, and you'd like to count the occurrences of each credit card type in order to build some analytics (your consumers' behaviour...).
id | sender | receiver | card_type | amount |
---|---|---|---|---|
1 | Bob | Alan | Visa | 42.00 |
2 | Alan | Dylan | Mastercard | 69.40 |
3 | Lucie | Eric | Visa | 11.90 |
The map phase
First off, you have to build a PCollection of transactions from this source text file:
// Representation of a transaction
type Transaction struct {
    Id       int
    Sender   string
    Receiver string
    CardType string
    Amount   float64
}
// NOTHING -> PCollection<Transaction>
// (strictly speaking, textio.Read yields the raw lines as strings;
// see the parsing sketch just below)
transactions := textio.Read(s, "../path/to/file.txt")
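In a real pipeline, textio.Read gives you a PCollection of raw text lines, so a small parsing step comes next. Here is a minimal sketch, assuming the log file is comma-separated in the column order shown above (strings and strconv are standard-library packages; error handling is omitted for brevity):
// NOTHING -> PCollection<string> (one element per line of the file)
lines := textio.Read(s, "../path/to/file.txt")
// PCollection<string> -> PCollection<Transaction>
// Assumes comma-separated lines such as: 1,Bob,Alan,Visa,42.00
transactions := beam.ParDo(s, func(line string) Transaction {
    fields := strings.Split(line, ",")
    id, _ := strconv.Atoi(fields[0])
    amount, _ := strconv.ParseFloat(fields[4], 64)
    return Transaction{
        Id:       id,
        Sender:   fields[1],
        Receiver: fields[2],
        CardType: fields[3],
        Amount:   amount,
    }
}, lines)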
Then, you must extract the credit card type from each transaction and build a key-value pair where the key is the card type and the value is 1. This lets you run a reduce function later to aggregate the sum.
// NOTHING -> PCollection<Transaction>
transactions := textio.Read(s, "../path/to/file.txt")
// New
// PCollection<Transaction> -> PCollection<KV<string, int>>
pairedWithOne := beam.ParDo(s, func(tsx Transaction) (string, int) {
    return tsx.CardType, 1
}, transactions)
What might this look like on the cluster?
Let's say you have a cluster of 3 worker nodes. Each node holds a chunk (or partition) of your data and the user code (UDFs) to apply to each element. You will get a bunch of key-value pairs on each node, like this:
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, 1) | (Mastercard, 1) | (Visa, 1) |
(Mastercard, 1) | (Visa, 1) | (Mastercard, 1) |
(Mastercard, 1) | (Visa, 1) | (Visa, 1) |
(Visa, 1) | (Maestro, 1) | (Mastercard, 1) |
(Visa, 1) | (Mastercard, 1) | (Maestro, 1) |
(Maestro, 1) | (Mastercard, 1) | (Visa, 1) |
The reduce phase
Now, you want to group all values with the same key (the credit card type) together in order to aggregate a sum. So you'd use a GroupByKey followed by a reducer.
// NOTHING -> PCollection<Transaction>
transactions := textio.Read(s, "../path/to/file.txt")
// PCollection<Transaction> -> PCollection<KV<string, int>>
pairedWithOne := beam.ParDo(s, func(tsx Transaction) (string, int) {
    return tsx.CardType, 1
}, transactions)
// New
// PCollection<KV<string, int>> -> PCollection<GBK<string, iter<int>>>
groupedByKey := beam.GroupByKey(s, pairedWithOne)
// PCollection<GBK<string, iter<int>>> -> PCollection<KV<string, int>>
// After a GroupByKey, the DoFn receives the key and an iterator over its values.
counted := beam.ParDo(s, func(key string, values func(*int) bool) (string, int) {
    sum, v := 0, 0
    for values(&v) {
        sum += v
    }
    return key, sum
}, groupedByKey)
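So far these snippets assume an existing Beam scope s. For completeness, here is a minimal sketch of how the surrounding pipeline might be created and executed, using the v2 Go SDK import paths and the beamx helper (your setup, runner and paths may differ):
package main

import (
    "context"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
    beam.Init()
    p, s := beam.NewPipelineWithRoot()

    // Build the transforms described above on the scope s.
    transactions := textio.Read(s, "../path/to/file.txt")
    _ = transactions // ... ParDo, GroupByKey, and so on

    // beamx.Run picks the runner from the --runner flag (the direct runner by default).
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf("pipeline failed: %v", err)
    }
}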
What might this look like on the cluster?
Right before the reduce phase, key-value pairs move across the network so that identical keys end up on the same machine. In this example, we assume each worker node ends up hosting a single key. Each key holds an iterable of integers as its value, which represents the number of times the system has seen that credit card type.
GroupByKey
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, [1, 1, 1, 1, 1, 1, 1, 1]) | (Mastercard, [1, 1, 1, 1, 1, 1, 1]) | (Maestro, [1, 1, 1]) |
Reduction
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, 8) | (Mastercard, 7) | (Maestro, 3) |
Houston, we have a problem
But didn't we talk about shuffling? The movement of data across the network just before the reduce phase is called shuffling. Shuffling a small dataset might be okay, but on a huge dataset it introduces latency: too much network communication kills performance.
So you do not want to send all of your data across the network if it's not absolutely required.
So, in our case, the GroupByKey was a naive approach. We have elements of the same type and we want to calculate a sum. Can we do it in a more efficient way? Can we reduce before the shuffle, so that less data is sent over the network? Yes!
SumPerKey/CombinePerKey
SumPerKey and CombinePerKey combine a GroupByKey with a reduction over all the values grouped under each key. This is far more efficient than using the two separately, and you can use them here because you are aggregating values of the same type, so that's perfectly fine. (A CombinePerKey sketch follows the SumPerKey example below.)
Using GroupByKey
// PCollection<KV<string, int>> -> PCollection<GBK<string, iter<int>>>
groupedByKey := beam.GroupByKey(s, pairedWithOne)
// PCollection<GBK<string, iter<int>>> -> PCollection<KV<string, int>>
counted := beam.ParDo(s, func(key string, values func(*int) bool) (string, int) {
    sum, v := 0, 0
    for values(&v) {
        sum += v
    }
    return key, sum
}, groupedByKey)
Using SumPerKey
// PCollection<KV<string, int>> -> PCollection<KV<string, int>>
counted := stats.SumPerKey(s, pairedWithOne)
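Using CombinePerKey (sketch)
For reference, the equivalent CombinePerKey form, where you provide the binary combine function yourself, might look like this; the function must be associative and commutative, and it keeps the value type (int in, int out), which is exactly the constraint discussed here:
// PCollection<KV<string, int>> -> PCollection<KV<string, int>>
counted := beam.CombinePerKey(s, func(a, b int) int {
    return a + b
}, pairedWithOne)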
What might this look like on the cluster?
Because SumPerKey is an associative (and commutative) reduction, magic happens: it reduces the data on the mapper side first, before sending the aggregated results downstream. And because the results are already pre-aggregated, far less data is sent over the network for the final reduction. So you get the same output, but faster.
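Concretely, associativity is what makes those per-worker partial sums legal. A tiny plain-Go illustration, using the Visa numbers from the tables below:
// Each worker sums its local Visa ones first...
worker1, worker2, worker3 := 1+1+1, 1+1, 1+1+1 // 3, 2, 3
// ...and the final reduction only has to add three small partial results.
total := worker1 + worker2 + worker3 // 8, the same as summing all eight 1s in one place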
Mappers will build key-value pairs
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, 1) | (Mastercard, 1) | (Visa, 1) |
(Mastercard, 1) | (Visa, 1) | (Mastercard, 1) |
(Mastercard, 1) | (Visa, 1) | (Visa, 1) |
(Visa, 1) | (Maestro, 1) | (Mastercard, 1) |
(Visa, 1) | (Mastercard, 1) | (Maestro, 1) |
(Maestro, 1) | (Mastercard, 1) | (Visa, 1) |
Mappers will group elements by key
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, [1, 1, 1]) | (Mastercard, [1, 1, 1]) | (Visa, [1, 1, 1]) |
(Mastercard, [1, 1]) | (Visa, [1, 1]) | (Mastercard, [1, 1]) |
(Maestro, [1]) | (Maestro, [1]) | (Maestro, [1]) |
Mappers will reduce the data they own locally
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, 3) | (Mastercard, 3) | (Visa, 3) |
(Mastercard, 2) | (Visa, 2) | (Mastercard, 2) |
(Maestro, 1) | (Maestro, 1) | (Maestro, 1) |
And then, here comes the shuffle phase followed by the reduce phase.
Aggregated output
Worker1 | Worker2 | Worker3 |
---|---|---|
(Visa, 8) | (Mastercard, 7) | (Maestro, 3) |
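If you want to persist this aggregated output, a minimal sketch (the output path and the line format are assumptions, and fmt is a standard-library package) could format each pair as text and write it back out:
// PCollection<KV<string, int>> -> PCollection<string>
formatted := beam.ParDo(s, func(cardType string, count int) string {
    return fmt.Sprintf("%s,%d", cardType, count)
}, counted)
// PCollection<string> -> text file(s)
textio.Write(s, "../path/to/output.txt", formatted)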
To sum up
Not every problem that can be solved with GroupByKey can be solved with SumPerKey / CombinePerKey: they require combining all your values into another value of the exact same type.
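For example (a hedged sketch, not part of the original pipeline): the median amount per card type needs every value at once and is not built by repeatedly combining two amounts into one, so a simple same-type combiner does not fit, but GroupByKey does. (sort is a standard-library package; this loads all values for a key into memory, so it is only a didactic illustration.)
// PCollection<Transaction> -> PCollection<KV<string, float64>>
amounts := beam.ParDo(s, func(tsx Transaction) (string, float64) {
    return tsx.CardType, tsx.Amount
}, transactions)
// PCollection<GBK<string, iter<float64>>> -> PCollection<KV<string, float64>>
medians := beam.ParDo(s, func(cardType string, values func(*float64) bool) (string, float64) {
    var all []float64
    var a float64
    for values(&a) {
        all = append(all, a)
    }
    sort.Float64s(all)
    return cardType, all[len(all)/2]
}, beam.GroupByKey(s, amounts))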
But just keep in mind that you can greatly optimise your data processing pipeline by reducing the shuffle as much as possible.
Happy coding. 🔥