Today we learned how you can use Spark to execute SQL queries. As an example, we took the revenue query from week 4 and ran it through Spark SQL.
First, we had to download all the 2020 and 2021 green and yellow taxi trip data and convert it to parquet files on our virtual machine. This caused a problem that took me some time to figure out. I ran out of storage space on my virtual machine, and rather than increasing the disk size, I increased the RAM, which had no effect other than to make the machine more expensive, since RAM is the most expensive part of a virtual machine.
So let's start from the beginning. First we opened a Jupyter notebook, imported Spark, and created a Spark session (the local[*] master tells Spark to run locally, using all available cores).
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
Next, we used pandas to read a sample of one file and get a rough schema, then edited the schema in VSCode in order to tell Spark exactly what it's reading. I skipped this step, simply copying what was already in the Jupyter notebook (a sketch of that pandas step follows the schema below). Here's part of the schema, plus the import statement that we used:
from pyspark.sql import types
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    [etc.]
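For the record, the pandas step I skipped looks roughly like this: read a small sample of one CSV with pandas, hand it to Spark, and print the schema Spark infers, then paste and edit the result. The filename here is just an example:

import pandas as pd

# Read a small sample of one CSV file (filename is only an example)
df_sample = pd.read_csv('data/raw/green/2020/01/green_tripdata_2020_01.csv.gz', nrows=100)

# Show the schema Spark infers from the sample, then edit the types by hand
spark.createDataFrame(df_sample).schema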
Then we wrote a function to fetch the CSV files from the repository so that we could convert them to parquet. I set up a directory structure to hold the files: in the working directory I made a directory called data, and inside it two directories, raw and pq. Both are subdivided by type (green or yellow), year, and month, with one file going into each month directory.
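Setting up that tree by hand is tedious, so here is a small sketch that creates the raw directories (the layout just mirrors the paths used in the loop below; Spark creates the matching data/pq directories itself when it writes):

import os

for service in ['green', 'yellow']:
    for year in [2020, 2021]:
        for month in range(1, 13):
            # One directory per month for the downloaded CSV files
            os.makedirs(f'data/raw/{service}/{year}/{month:02d}', exist_ok=True)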
Then we copied and pasted the following code four times, once for each type and each year (2020 and 2021); a version wrapped in a function is sketched after the loop.
year = 2020
for month in range(1, 13):
    print(f'processing data for {year}/{month}')
    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'
    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)
    df_green \
        .repartition(4) \
        .write.parquet(output_path)
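To avoid the copy-and-paste, the same loop can be wrapped in a small helper and called once per service and year. This is just a sketch; it assumes a yellow_schema defined the same way as green_schema above:

def csv_to_parquet(spark, service, year, schema):
    # Convert one year of raw CSVs for one service (green or yellow) to parquet
    for month in range(1, 13):
        print(f'processing data for {service} {year}/{month:02d}')
        input_path = f'data/raw/{service}/{year}/{month:02d}/'
        output_path = f'data/pq/{service}/{year}/{month:02d}/'
        df = spark.read \
            .option("header", "true") \
            .schema(schema) \
            .csv(input_path)
        df.repartition(4).write.parquet(output_path)

for year in [2020, 2021]:
    csv_to_parquet(spark, 'green', year, green_schema)
    csv_to_parquet(spark, 'yellow', year, yellow_schema)  # yellow_schema assumed to be defined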
This went well for the green data, and for the yellow data in 2021. But for the yellow taxi trips in January and February of 2020 (pre-pandemic, so the files are much bigger), I ran out of space, which at the time I took to be a memory problem. I asked ChatGPT how to fix this, and it told me to go to Google Cloud, stop the virtual machine, edit it, and choose a new machine type with more memory. This was wrong. I did that, changing from a machine with 16 GB of RAM to one with 32 GB, and nothing changed. I did it again, this time going from 32 GB to 64 GB. Nothing! I still ran out of storage. I looked at the FAQ, and I searched the Slack channel for similar problems. There was a similar problem, and the solution was to increase the disk size. Well, I had already done that, right? (No: I had increased the RAM, not the disk.) So I gave up for the day.
The next day I tried different things: cleaning up my disk (there wasn't much else on it), typing all kinds of terminal commands to view disk usage, and so on. Finally I posted my dilemma in the Slack channel. While waiting for an answer, I looked back over the questions on Slack and found the one I had seen before. This time I noticed that the instructions were to increase the disk size rather than change the machine type. I went back and increased the disk size, and it worked! Meanwhile, Bruno Oliveira answered my question; I replied that I had found the answer, and he told me definitely not to change the machine type. So I went back, edited the virtual machine, and set it back to 16 GB of RAM.
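For future reference, free disk space can also be checked from inside the notebook with Python's standard library (just a convenience; df -h in the terminal shows the same thing):

import shutil

# Disk usage of the filesystem that holds the current directory
total, used, free = shutil.disk_usage('.')
print(f'free: {free / 2**30:.1f} GiB of {total / 2**30:.1f} GiB')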
Now I was ready to start the video where we would reproduce what we did in week 4. This was pretty simple. First I read the parquet files into Spark dataframes. There was no need to specify a schema this time, because the parquet files already carry the correct schema. In order to union the two dataframes together, I needed to change a few things. First, I had to rename the pickup and dropoff datetime columns, removing the "lpep" prefix from the green taxi data and the "tpep" prefix from the yellow taxi data.
df_green = spark.read.parquet('data/pq/green/*/*')
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
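The yellow data gets the same treatment, with the "tpep" prefix stripped instead:

df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')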
Next, I made a list of columns common to both types. There are different ways to do this, but I wanted to preserve the order of the columns in the original data.
common_columns = []
yellow_columns = set(df_yellow.columns)
for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)
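The same thing can be written as a list comprehension, which also preserves the green column order:

yellow_columns = set(df_yellow.columns)
common_columns = [col for col in df_green.columns if col in yellow_columns]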
Then I had to add a column for service_type, green or yellow. This required importing the Spark functions module and using the F.lit() ("literal") function: withColumn expects a Column expression rather than a plain Python value, and F.lit wraps a constant in one.
from pyspark.sql import functions as F
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))
Now I was ready to union the two datasets:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)
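unionAll is the older name for this operation; on Spark 2 and later the same thing is spelled union. Either way, rows are matched up by column position rather than by name, which is exactly why we selected common_columns in the same order for both dataframes first:

df_trips_data = df_green_sel.union(df_yellow_sel)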
And now I could count the data and see how much I had.
df_trips_data.groupBy('service_type').count().show()
+------------+--------+
|service_type| count|
+------------+--------+
| green| 2304517|
| yellow|39649199|
+------------+--------+
These were the same numbers that the instructor had, which means I loaded my data correctly.
Finally, I ran the SQL query from week 4. To do this, we first have to register the dataframe as a table so SQL can refer to it by name.
df_trips_data.registerTempTable('trips_data')
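registerTempTable still works, but it is the older name for this; on recent Spark versions the equivalent call is:

df_trips_data.createOrReplaceTempView('trips_data')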
To test this, we ran the groupBy above as a SQL command:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY
    service_type
""").show()
The results were the same as for the dataframe groupBy function.
The final SQL query generates monthly revenue totals for each zone and each service type.
df_result = spark.sql("""
SELECT
    -- Revenue grouping
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month,
    service_type,
    -- Revenue calculation
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,
    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")
df_result.show()
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone| revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
| 250|2020-02-01 00:00:00| green| 15359.960000000061| 1282.5| 117.5| 56.01| 590.3200000000003| 180.0000000000011| 17598.43999999995| 11.0| 1.2394957983193278| 4.962810650887575|
| 158|2020-02-01 00:00:00| green| 124.36| 8.25| 0.5| 0.0| 2.8| 0.8999999999999999| 136.81| null| null| 11.090000000000002|
| 15|2020-03-01 00:00:00| green| 1682.2299999999996| 5.5| 6.5| 0.0| 79.56| 19.800000000000026| 1802.44| 0.0| 1.0| 7.910298507462685|
| 229|2020-03-01 00:00:00| green| 676.3600000000001| 0.0| 1.0| 0.0| 42.839999999999996| 8.099999999999998| 728.3| 0.0| 1.0| 7.691111111111111|
| 137|2020-10-01 00:00:00| green| 3480.8299999999995| 0.0| 0.0| 297.0| 250.92000000000013| 32.40000000000005| 4061.1499999999996| null| null| 9.184722222222224|
| 44|2020-10-01 00:00:00| green| 146.5| 3.5| 2.0| 5.5| 0.0| 1.2| 158.7| 0.0| 1.0| 12.0475|
| 152|2020-02-01 00:00:00| green| 37630.82| 1541.25| 1398.5| 2836.6600000000035| 538.3700000000003| 903.8999999999543| 45984.60000000085| 1336.25| 1.2240055826936498| 2.72640336678537|
| 136|2020-03-01 00:00:00| green| 12226.930000000018| 60.5| 89.5| 32.16| 485.2000000000003| 163.20000000000047| 13123.939999999977| 8.25| 1.1637426900584795| 6.4872972972972995|
| 119|2020-03-01 00:00:00| green| 11026.2| 100.5| 108.5| 40.459999999999994| 525.8000000000003| 156.0000000000002| 12021.45999999997| 8.25| 1.0924369747899159| 5.552723880597017|
| 153|2020-03-01 00:00:00| green| 2039.5400000000006| 15.0| 30.5| 29.790000000000006| 33.4| 34.800000000000026| 2204.7799999999993| 0.0| 1.1333333333333333| 4.165042016806725|
| 26|2020-03-01 00:00:00| green| 9467.539999999999| 52.5| 52.0| 0.0| 184.35000000000005| 122.99999999999919| 9929.640000000001| 0.0| 1.1397849462365592| 5.237601918465228|
| 243|2019-12-01 00:00:00| green| 13.99| 0.0| 0.0| 1.0| 0.0| 0.3| 15.29| 0.0| 1.0| 0.0|
| 28|2020-03-01 00:00:00| green| 5891.060000000002| 29.25| 53.0| 80.32000000000001| 195.8400000000001| 84.29999999999956| 6368.820000000014| 2.75| 1.2043795620437956| 5.802602739726027|
| 45|2020-03-01 00:00:00| green| 2403.39| 11.0| 5.0| 0.0| 64.0| 21.30000000000003| 2506.6399999999994| 0.0| 1.2| 9.125633802816903|
| 102|2020-03-01 00:00:00| green| 2874.7399999999984| 26.5| 17.5| 40.22| 104.04| 46.19999999999992| 3135.650000000001| 5.5| 1.1481481481481481| 4.857922077922078|
| 48|2020-10-01 00:00:00| green| 1504.43| 0.0| 0.0| 115.5| 75.72999999999999| 12.600000000000009| 1708.2600000000007| null| null| 9.061428571428573|
| 20|2020-01-01 00:00:00| green| 11375.42| 681.0| 131.0| 90.61999999999999| 479.8600000000003| 147.29999999999987| 12923.199999999997| 11.0| 1.2297872340425533| 4.872311926605508|
| 130|2020-03-01 00:00:00| green| 60166.09999999997| 1436.5| 1759.0| 4972.6999999999925| 731.1700000000004| 1234.1999999999043| 70458.47000000314| 74.25| 1.1157574079202437| 3.8739044205495867|
| 18|2020-03-01 00:00:00| green| 9502.130000000005| 69.5| 100.0| 78.06| 414.14000000000027| 136.19999999999945| 10401.979999999987| 5.5| 1.1727272727272726| 5.481708860759492|
| 265|2020-10-01 00:00:00| green| 5726.509999999998| 16.25| 33.0| 329.21999999999997| 213.6800000000001| 38.69999999999999| 6359.310000000006| 2.75| 1.178082191780822| 12.57108527131784|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
only showing top 20 rows
Writing this result out produced a ton of tiny parquet files (one per partition), so the last thing we did was coalesce it into one file; coalesce reduces the number of partitions without a full shuffle:
df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')
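As a quick sanity check, the coalesced report can be read back in and inspected:

df_report = spark.read.parquet('data/report/revenue/')
df_report.printSchema()
print(df_report.count())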