In the next three videos for the Data Engineering Zoomcamp, we learned about Spark internals and the SQL commands GROUP BY and JOIN.
Spark uses partitioned data. Each executor takes one partition and works on it until it's done, then takes another partition, until all the partitions are done being processed. In the case of a GROUP BY, the first pass can only group whatever is in one partition. The executors do this and store the results in temporary intermediate files. That means that if you are grouping by day, for example, there may be all the days in each parquet file. Each executor groups all the days in one partition, but there are many such groups across all the partitions.
So the output of this first step is a set of temporary files, each belonging to one partition, that has the grouped data in it. The next step is called "reshuffling", where the data from all partitions is combined for the final result.
Here's an example of a GROUP BY command, where we're grouping by the hour and the zone, counting the number of records and adding the revenue for that day and zone:
df_green_revenue = spark.sql("""
SELECT
date_trunc('hour', lpep_pickup_datetime) AS hour,
PULocationID AS zone,
SUM(total_amount) AS amount,
COUNT(1) AS number_records
FROM
green
WHERE
lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
1, 2
""")
Here's a visualization from Spark, showing the two stages of the GROUP BY command.
JOIN works similarly when the files being joined are large and approximately the same size. We made another set of files from yellow taxi data, similar to the above, and then joined them with the Spark command df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')
. This is an outer join, meaning that if the first dataframe doesn't have any data for the hour and zone, but the second one does, we'll keep the record and put Null for the first dataframe. We'll do the same if the second dataframe doesn't have any data for the hour and zone, but the first dataframe does.
Here's the visualization from the Spark UI:
When one file is considerably smaller than the other, Spark can accomplish the join in one pass. It copies the smaller file into each executor along with the partition and does the join completely within the executor.
Here we're joining the zone and borough data from the zones file, which only has 250 or so records.
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)
And here's the Spark UI visualization:
Top comments (0)