Athena Spark is hands down my favorite Spark implementations on AWS. First off, it's a managed service and serverless, meaning you don't need to worry about clusters and you only pay for what you use. Secondly it autoscales for a given workload and very successfully hides the complexity of Spark. Last but not least it's instant - you get a useable Spark session literally within the time it takes for the Notebook editor to render. It's magical!
But what if we want to leverage this magic for production workloads? Specifically scheduled jobs, where an interactive Spark environment just doesn't fit? Although the current version of the service is definitely optimized for interactive experience, running scheduled jobs through Athena is both possible, and magical.
Here is a quick walkthrough of how you can run Athena Spark jobs through good ol' boto3.
First we establish a client connection and start a session. Note this session could be reused for several jobs, although the instantiation time is so fast there is probably not much reason to persist it.
import boto3
client = boto3.client('athena')
calculation_response = client.start_session(
Description='job_session',
WorkGroup='spark_jobs',
EngineConfiguration={
'CoordinatorDpuSize': 1,
'MaxConcurrentDpus': 20 }
)
session_id = calculation_response.get('SessionId')
session_state = calculation_response.get('State')
print(session_id)
print(session_state)
Once the session is established we can start submitting work. Instead of referencing an existing notebook, you will instead submit your code as a string. Yes, this seems a bit weird, but after all Python is an interpreted language, stored in plain text, i.e. a string, so get over it 😃! It would be great if you could reference an S3 URI and hopefully they will provide additional options in the future.
I'd recommend maintaining this code as a separate .py file that could be mocked/tested outside this "driver" code.
with open("complicated_spark_job.py","r") as f:
notebook_code = f.read()
execution_response = client.start_calculation_execution(
SessionId=session_id,
Description='daily job',
CodeBlock=notebook_code,
)
calc_exec_id = execution_response.get('CalculationExecutionId')
We can then iterate and monitor our Spark jobs progress.
while True:
exec_status_response = client.get_calculation_execution_status(
CalculationExecutionId=calc_exec_id
)
exec_state = exec_status_response.get('Status').get('State')
print(exec_state)
if exec_state in ['CANCELED','COMPLETED','FAILED']:
break
sleep(5)
When the job completes we can terminate our session, if we have no other work to submit. Don't worry, if you don't forcibly terminate it will time out after a few minutes of idle time.
client.terminate_session(
SessionId=session_id)
In addition to Athena Spark being fast and easy, it's also very cost effective. DPU's are priced at $0.35 per hour, rounded to the second. So a 1 hour 20 DPU workload (which is allot of processing power) would cost you about 7 bucks!
Top comments (0)