
Neylson Crepalde for AWS Community Builders


How to run Amazon EMR Serverless with --packages flag

In this previous post, we showed how to run Delta Lake on Amazon EMR Serverless. Since then, a new release (6.7.0) has come out with the --packages flag implemented. This makes getting things done with Spark a lot easier. However, the --packages flag requires some extra networking setup that many data scientists and engineers are not familiar with. Our goal is to show, step by step, how to do it.

First, some concept explanations

When using Spark with Java dependencies, we have two options: (1) build the .jar files and place them on the cluster manually, or (2) pass the dependencies to the --packages flag so Spark can download them automatically from Maven. As of release 6.7.0 of EMR Serverless, this flag is available for use.
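For reference, this is the standard spark-submit syntax: dependencies are given as Maven coordinates in groupId:artifactId:version form (the script name below is just a placeholder):

# Dependencies are Maven coordinates (groupId:artifactId:version);
# separate multiple packages with commas
spark-submit \
  --packages io.delta:delta-core_2.12:2.0.0 \
  my_job.py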

The problem is that the Spark cluster must reach the internet to download packages from Maven. Amazon EMR Serverless, by default, lives outside any VPC and therefore cannot reach the internet. To fix that, you must create your EMR application inside a VPC. However, EMR applications can only be created in private subnets, which (by the way...) don't reach the internet and cannot reach S3 😭... How do we fix this?

Step one: networking

The diagram below shows the whole network structure that is necessary:

Network diagram

This is easily created in the AWS VPC console. Click the Create VPC button and select VPC and more. AWS does the heavy lifting and provides a design for a VPC with 2 public subnets, 2 private subnets, an internet gateway, the necessary route tables, and an S3 gateway endpoint (so resources inside the VPC can reach S3).

Create VPC

You can set the Number of availability zones to 1 if you want, but for high availability you should work with at least 2 AZs.

Next, make sure you select at least one NAT gateway, which is what lets the private subnets reach the internet. Below is the screen with the final setup:

Final VPC configuration

Hit Create VPC and the VPC itself is done.
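If you prefer to script this, here is a simplified sketch of the same pieces with the AWS CLI. All IDs are placeholders, the region in the S3 endpoint service name is an assumption, and the route table creation and associations are omitted; the console wizard wires all of this (plus the second AZ) together for you:

# Simplified sketch; replace the placeholder IDs with real ones
aws ec2 create-vpc --cidr-block 10.0.0.0/16
aws ec2 create-subnet --vpc-id vpc-xxx --cidr-block 10.0.0.0/24    # public
aws ec2 create-subnet --vpc-id vpc-xxx --cidr-block 10.0.128.0/24  # private
aws ec2 create-internet-gateway
aws ec2 attach-internet-gateway --internet-gateway-id igw-xxx --vpc-id vpc-xxx
# The NAT gateway lives in a PUBLIC subnet and needs an Elastic IP
aws ec2 allocate-address --domain vpc
aws ec2 create-nat-gateway --subnet-id subnet-public --allocation-id eipalloc-xxx
# The private subnets route 0.0.0.0/0 through the NAT gateway
aws ec2 create-route --route-table-id rtb-private \
  --destination-cidr-block 0.0.0.0/0 --nat-gateway-id nat-xxx
# Gateway endpoint so private subnets can reach S3 without the internet
aws ec2 create-vpc-endpoint --vpc-id vpc-xxx \
  --service-name com.amazonaws.us-east-1.s3 --route-table-ids rtb-private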

Creating network resources

The last thing is to create a Security Group that allows outbound traffic to the internet. Go back to VPC in the AWS console and click Security Groups in the left panel. Then click Create security group. Name your security group, clear the pre-selected VPC, and choose the one you just created. By default, security groups allow no inbound traffic and all outbound traffic, so we can leave the rules as they are. Create the security group et voilà!

Create security group
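The CLI equivalent is a single call (the group name here is just a suggestion); groups created this way also start with no inbound rules and an allow-all outbound rule:

aws ec2 create-security-group \
  --group-name emr-serverless-sg \
  --description "Outbound-only security group for EMR Serverless" \
  --vpc-id vpc-xxx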

Step two: IAM Roles and Policies

You need two roles: a service-linked role and a job execution role that grants access to S3 and Glue. We already discussed this in this previous post, in the Setup - Authentication section. Check it out. We will also need a dataset to work with; the famous Titanic dataset should do it. You can download it here.
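If you would rather script the job execution role, a sketch looks like the following. The role name matches the one used in the job submission later; the two managed policies are deliberately broad stand-ins for a demo and should be scoped down for real workloads:

# Trust policy: EMR Serverless jobs assume this role
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "emr-serverless.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
EOF
aws iam create-role --role-name EMRServerlessJobRole \
  --assume-role-policy-document file://trust-policy.json
# Broad demo permissions for S3 and Glue; scope these down in production
aws iam attach-role-policy --role-name EMRServerlessJobRole \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name EMRServerlessJobRole \
  --policy-arn arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess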

Step three: Create EMR Studio and an EMR Serverless Application

First, we must create an EMR Studio. If you don't have any Studios created yet, this is very straightforward: after clicking Get started on the EMR Serverless home page, you can have a Studio created automatically.

Second, you have to create an EMR Serverless application. Set a name and (remember!) choose release 6.7.0. To set up networking, check Choose custom settings and scroll down to Network connections.

Create application

In Network connections, choose the VPC you created, the two private subnets, and the security group.

EMR application networking configuration
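The same application can be created with the CLI; the subnet and security group IDs below are placeholders:

aws emr-serverless create-application \
  --name delta-packages-demo \
  --type SPARK \
  --release-label emr-6.7.0 \
  --network-configuration '{
    "subnetIds": ["subnet-private-1", "subnet-private-2"],
    "securityGroupIds": ["sg-xxx"]
  }'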

Step four: Spark code

Now we prepare a simple PySpark script that simulates some modifications to the dataset: we will insert two new passengers (Ney and Sarah) and update two passengers who were presumed dead but found alive, Mr. Owen Braund and Mr. William Allen. Below is the code to do that.



from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Import Delta's Python API only after the SparkSession is created,
# so the delta-core package pulled in via --packages is available
from delta.tables import DeltaTable

print("Reading CSV file from S3...")

schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic", 
    header=True, schema=schema, sep=";"
)

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")


print("UPSERT...")
# UPSERT
(
    old.alias("old")
    .merge(new.alias("new"), 
    "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")

(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)



This .py file should be uploaded to S3.
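For example, with the AWS CLI (the destination prefix matches the entryPoint used in the next step):

aws s3 cp emrserverless_delta_titanic.py s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py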

Step five: GO!

Now we submit the job for execution. We can do it with the AWS CLI:



aws emr-serverless start-job-run \
--name Delta-Upsert \
--application-id <YOUR-APPLICATION-ID> \
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
--job-driver '{
  "sparkSubmit": {
    "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py", 
    "sparkSubmitParameters": "--packages io.delta:delta-core_2.12:2.0.0"
  }
}' \
--configuration-overrides '{
  "monitoringConfiguration": {
    "s3MonitoringConfiguration": {
      "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"
    }
  }
}'


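start-job-run returns a job run ID; while the job runs, you can poll its state from the CLI (both IDs below are placeholders):

# The state moves through SUBMITTED and RUNNING to SUCCESS (or FAILED)
aws emr-serverless get-job-run \
  --application-id <YOUR-APPLICATION-ID> \
  --job-run-id <YOUR-JOB-RUN-ID>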

That's it! When the job is done, go to your log folder and check the logs (look for your application ID, the job run ID, and the SPARK_DRIVER logs). You should see something like this:



Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+



Happy coding and build on!
