
Abdullah Paracha for AWS Community Builders


Real-Time User Behavior Insights with Amazon Kinesis and Apache Flink: A Guide to Sessionizing Clickstream Data

👉🏻 This walkthrough simulates a real-world use case: sessionizing clickstream data using Amazon Managed Service for Apache Flink and storing the sessions in an Amazon DynamoDB table with AWS Lambda.
Note: Some processes and resources have been pre-configured.

  1. Use Case Scenario
  2. AWS Tools and Resources
  3. Process and Architecture
  4. Step by Step Configuration and Walkthrough
    • Starting a Managed Apache Flink Studio Notebook Application
    • Connecting to the Virtual Machine using EC2 Instance Connect and Simulating a Real-Time Clickstream
    • Sessionizing the Clickstream Data using Amazon Managed Service for Apache Flink
    • Storing Sessions in an Amazon DynamoDB Table with AWS Lambda
  5. Conclusion

Streaming data is data that is generated continuously by thousands of data sources, such as log files generated by customers using your mobile or web applications, ecommerce purchases, in-game player activity, and information from social networks. [1]

Sessionizing clickstream data involves grouping user interactions into sessions, where each session represents a sequence of activities by a user within a specific time window. Use cases for sessionized clickstream data include user behavior analysis, personalized recommendations, performance measurement, and fraud detection.

Sessionizing clickstream data allows for real-time processing and analysis, providing immediate insights into user behavior and enabling prompt actions based on these insights. This real-time capability is essential for applications that require up-to-the-minute data to enhance user experiences and operational efficiency.

In the following use case, we will simulate real-time clickstream data, sessionize it using Amazon Managed Service for Apache Flink, and store the session data in an Amazon DynamoDB table with AWS Lambda.

1. Use Case Scenario:

XShop, an online retail company, wants to enhance its customer experience by gaining deeper insights into user behavior on its website. They aim to achieve this by analyzing clickstream data in real-time to understand user interactions, optimize marketing strategies, and improve site performance.

As the data engineering team of XShop, we aim to build a solution on AWS that collects clickstream data and stores session data for downstream processing and analysis.

2. AWS tools and resources:

Amazon Managed Service for Apache Flink is a fully managed service that enables you to perform analysis using SQL and other tools on streaming data in real time. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up. You pay only for the resources you use. [2]

Use cases for Amazon Managed Service for Apache Flink include:

  • Streaming extract, transform, and load (ETL) jobs
  • Real-time log analysis
  • Ad-tech and digital marketing analysis
  • Stateful processing

Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. You can use Amazon DynamoDB to create a database table that can store and retrieve any amount of data, and serve any level of request traffic. [3]

AWS Lambda is a compute service that runs your code in response to events and automatically manages the compute resources. You can combine AWS Lambda with other AWS services, preprocess data before feeding it into a machine learning model, or execute code in response to changes in data. [4]

In this use case, we will:

  • Use Amazon Kinesis and Amazon Managed Service for Apache Flink to analyze clickstream data
  • Create an AWS Lambda function that adds records to an Amazon DynamoDB table
  • Configure Amazon Kinesis to send results to your AWS Lambda function.

3. Process & Architecture:

As the data engineering team, we will use Amazon Kinesis and Amazon Managed Service for Apache Flink to sessionize sample clickstream data and output it to DynamoDB using an AWS Lambda function.


4. Step-by-Step configuration & walkthrough:

4.1 Starting a Managed Apache Flink Studio Notebook Application

  • Currently, we have 2 data streams in Kinesis:

(1) click-stream data, which will ingest incoming click events from a data source

(2) session-stream data, which will ingest sessionized click stream data to be consumed by an AWS Lambda function


  • In Managed Apache Flink, create a studio notebook to process the click events and sessionize them using Structured Query Language (SQL).


4.2 Connecting to the Virtual Machine using EC2 Instance Connect and Simulating a Real-Time Clickstream:

  • Connect to an EC2 instance


  • Create a JSON file for a click event in the shell


echo '{
  "user_id": "$USER_ID",
  "event_timestamp": "$EVENT_TIMESTAMP",
  "event_name": "$EVENT_NAME",
  "event_type": "click",
  "device_type": "desktop"
}' > click.json



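Hint:
💡 About this command:

  • The built-in Bash command echo prints a JSON template. The single quotes keep the $USER_ID, $EVENT_TIMESTAMP, and $EVENT_NAME placeholders literal so they can be filled in later.
  • Bash redirection (>) writes the output of the echo command to a file called click.json, creating the file if it doesn't exist.

  • Put records into Kinesis to simulate a clickstream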


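DATA_STREAM="click-stream"
USER_IDS=(user1 user2 user3)
EVENTS=(checkout search category detail navigate)
session_interval=15   # seconds to pause between simulated sessions
click_interval=2      # seconds to pause between clicks
for i in $(seq 1 5000); do
    echo "Iteration: ${i}"
    # Pick a random user and event; export them so envsubst can see the values
    export USER_ID="${USER_IDS[RANDOM%${#USER_IDS[@]}]}"
    export EVENT_NAME="${EVENTS[RANDOM%${#EVENTS[@]}]}"
    export EVENT_TIMESTAMP=$(($(date +%s) * 1000))   # Unix time in milliseconds
    # Fill the placeholders in the template with the exported variables
    JSON=$(envsubst < click.json)
    echo "$JSON"
    # With AWS CLI v2, add: --cli-binary-format raw-in-base64-out
    aws kinesis put-record --stream-name "$DATA_STREAM" --data "${JSON}" --partition-key 1 --region us-west-2
    # Every 60th iteration, pause longer to simulate the end of a session
    if ! ((i % 60)); then
        echo "Sleeping for ${session_interval} seconds" && sleep ${session_interval}
    else
        echo "Sleeping for ${click_interval} second(s)" && sleep ${click_interval}
    fi
done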



Hint:
💡 The script contains:

  • Setup of sample user IDs and event types at the beginning
  • A loop that executes 5000 times, with a sleep statement in each iteration
  • Statements that randomly select a user ID and an event type, and assign them along with the current timestamp to variables
  • A statement that uses the envsubst command to substitute the defined environment variables into the JSON template
  • A statement invoking the AWS command-line interface tool, putting the templated JSON record into the Kinesis data stream
  • A condition at the end of the loop that either sleeps for a few seconds or, periodically, for longer, simulating the end of a session
  • Output of the templated JSON and the JSON response from Kinesis for each record put into the data stream
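
As a rough Python analogue of the Bash generator described above, the sketch below renders the same JSON template and reproduces the sleep schedule without calling AWS. The helper names make_click and pause_seconds are hypothetical and not part of the original walkthrough; string.Template stands in for envsubst here.

```python
import json
import random
import time
from string import Template

# Same shape as click.json; Template's $NAME placeholders play the role of envsubst.
CLICK_TEMPLATE = Template(
    '{"user_id": "$USER_ID", "event_timestamp": "$EVENT_TIMESTAMP", '
    '"event_name": "$EVENT_NAME", "event_type": "click", "device_type": "desktop"}'
)
USER_IDS = ["user1", "user2", "user3"]
EVENTS = ["checkout", "search", "category", "detail", "navigate"]


def make_click(now_ms, rng=random):
    """Render one click event as a JSON string (hypothetical helper)."""
    return CLICK_TEMPLATE.substitute(
        USER_ID=rng.choice(USER_IDS),
        EVENT_NAME=rng.choice(EVENTS),
        EVENT_TIMESTAMP=now_ms,
    )


def pause_seconds(iteration, click_interval=2, session_interval=15):
    """Mirror the Bash condition: a longer pause on every 60th iteration."""
    return session_interval if iteration % 60 == 0 else click_interval


if __name__ == "__main__":
    # Print a few sample events instead of sending them to Kinesis.
    for i in range(1, 4):
        record = make_click(int(time.time()) * 1000)
        print(i, json.loads(record), "next pause:", pause_seconds(i))
```

To send a record for real, the rendered string could be passed to the boto3 Kinesis client's put_record call (StreamName, Data, PartitionKey); that step is left out so the sketch runs offline.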


  • Select the studio notebook, open in Apache Zeppelin and create a new notebook


  • Create tables for the Kinesis Data Streams


%flink.ssql(type = update)
DROP TABLE IF EXISTS click_stream;
CREATE TABLE click_stream (
  user_id STRING,
  event_timestamp BIGINT,
  event_name STRING,
  event_type STRING,
  device_type STRING,
  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(event_timestamp/1000)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kinesis',
  'stream' = 'click-stream',
  'aws.region' = 'us-west-2',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
DROP TABLE IF EXISTS session_stream;
CREATE TABLE session_stream (
  user_id STRING,
  session_timestamp BIGINT,
  session_time TIMESTAMP(3),
  session_id STRING,
  event_type STRING,
  device_type STRING
) PARTITIONED BY (user_id) WITH (
  'connector' = 'kinesis',
  'stream' = 'session-stream',
  'aws.region' = 'us-west-2',
  'format' = 'json'
);



Hint:
💡 In the WITH clause for each table, configuration options set up a connector. In this case, the connector is kinesis, denoting that these tables are backed by Amazon Kinesis Data Streams. The value of the stream option for each table matches the name of one of the Kinesis data streams observed earlier.
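
The click_stream table also derives an event_time column from event_timestamp and declares a five-second watermark. The sketch below shows the arithmetic involved, under the assumption that timestamps are interpreted in UTC (Flink uses the session time zone for FROM_UNIXTIME, which may differ). The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def event_time_from_millis(event_timestamp_ms):
    """Integer-divide by 1000 to drop the milliseconds, then build a timestamp.

    Assumption: UTC is used for illustration only.
    """
    return datetime.fromtimestamp(event_timestamp_ms // 1000, tz=timezone.utc)


def watermark(latest_event_time, delay_seconds=5):
    """The watermark trails the latest observed event time by a fixed delay."""
    return latest_event_time - timedelta(seconds=delay_seconds)


if __name__ == "__main__":
    latest = event_time_from_millis(1700000000999)
    print("event_time:", latest.isoformat())
    print("watermark: ", watermark(latest).isoformat())
```

The delay gives late events up to five seconds to arrive before the engine treats that point in event time as complete.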

  • View data from the stream


%flink.ssql
SELECT * FROM click_stream LIMIT 10;


  • The data is being put into the click-stream Kinesis data stream by the Bash code run earlier on the Amazon EC2 instance.


4.3 Sessionizing the Clickstream Data using Amazon Managed Service for Apache Flink:

Amazon Managed Service for Apache Flink allows you to continuously run SQL on streaming data, processing the data in real-time, and sending the results to many different destinations.

We will use SQL to determine the beginning of a new session for a user in your simulated click stream.

  • Identify events that start a new session


%flink.ssql
SELECT 
  *, 
  CASE WHEN event_timestamp - LAG(event_timestamp) OVER (
    PARTITION BY user_id 
    ORDER BY 
      event_time
  ) >= (10 * 1000) THEN 1 ELSE 0 END as new_session 
FROM 
  click_stream;



💡 This SQL query uses the LAG SQL function to compare a record with the previous record. The PARTITION BY user_id clause restricts comparisons to records with the same user_id.

The ORDER BY event_time clause means records are compared in ascending order of event time.

When the difference between the event_timestamp fields of two consecutive records is ten seconds or more, the new_session field is 1, denoting the start of a new session. When new_session is 0, the event is a continuation of a session.

Notice that the comparison multiplies ten by a thousand. This is because the event_timestamp field is Unix time in milliseconds, so ten seconds has to be expressed as 10 * 1000 milliseconds.
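
The same gap rule can be sketched in plain Python to make the LAG comparison concrete. This is an illustration on an in-memory list, not how Flink evaluates the query; flag_new_sessions is a hypothetical helper, and events are ordered by event_timestamp as a simplification of ORDER BY event_time.

```python
SESSION_GAP_MS = 10 * 1000  # ten seconds, in milliseconds


def flag_new_sessions(events):
    """Add a new_session flag to each event, comparing per user with the previous event.

    events: list of dicts with "user_id" and "event_timestamp" (milliseconds).
    """
    previous = {}  # user_id -> timestamp of that user's previous event (the LAG value)
    flagged = []
    for event in sorted(events, key=lambda e: e["event_timestamp"]):
        last = previous.get(event["user_id"])
        # LAG is NULL for a user's first event, so the CASE falls through to ELSE 0.
        is_new = last is not None and event["event_timestamp"] - last >= SESSION_GAP_MS
        flagged.append({**event, "new_session": 1 if is_new else 0})
        previous[event["user_id"]] = event["event_timestamp"]
    return flagged


if __name__ == "__main__":
    sample = [
        {"user_id": "user1", "event_timestamp": 0},
        {"user_id": "user1", "event_timestamp": 2000},
        {"user_id": "user1", "event_timestamp": 12000},
    ]
    for row in flag_new_sessions(sample):
        print(row)
```

One consequence worth noting: a user's very first event is flagged 0 rather than 1, because there is no earlier record to compare against.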

  • To get only the new session boundaries


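%flink.ssql
-- Column mapping below is inferred from the session_stream table definition
INSERT INTO session_stream
SELECT
  user_id,
  event_timestamp AS session_timestamp,
  event_time AS session_time,
  user_id || '_' || CAST(
    SUM(new_session) OVER (
      PARTITION BY user_id
      ORDER BY
        event_time
    ) AS STRING
  ) AS session_id,
  event_type,
  device_type
FROM
  (
    SELECT
      *,
      CASE WHEN event_timestamp - LAG(event_timestamp) OVER (
        PARTITION BY user_id
        ORDER BY
          event_time
      ) >= (10 * 1000) THEN 1 ELSE 0 END AS new_session
    FROM
      click_stream
  )
WHERE
  new_session = 1;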



This query wraps the previous query with an insert statement that puts records into the session_stream table. Because this query produces no output and is expected to be long-running, you will see a Duration displayed at the bottom of the paragraph that shows how long the query has been running.
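
To see what the session_id expression in the query above produces, here is a small Python sketch of the filter plus running sum. It assumes the input rows already carry a new_session flag and arrive in event-time order; assign_session_ids is a hypothetical name used only for illustration.

```python
def assign_session_ids(flagged_events):
    """Keep session boundaries and number them per user.

    flagged_events: dicts with "user_id" and "new_session", in event-time order.
    """
    running_total = {}  # user_id -> SUM(new_session) so far for that user
    sessions = []
    for event in flagged_events:
        if event["new_session"] != 1:  # WHERE new_session = 1
            continue
        user = event["user_id"]
        running_total[user] = running_total.get(user, 0) + 1
        # user_id || '_' || CAST(running sum AS STRING)
        sessions.append({**event, "session_id": f"{user}_{running_total[user]}"})
    return sessions


if __name__ == "__main__":
    sample = [
        {"user_id": "user1", "new_session": 0},
        {"user_id": "user1", "new_session": 1},
        {"user_id": "user1", "new_session": 1},
    ]
    for row in assign_session_ids(sample):
        print(row["session_id"])
```

Because the filter keeps only boundary rows, each user's identifiers simply count upward with every new session that user starts.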


💡 Amazon Managed Service for Apache Flink can be customized using Java JAR files you provide yourself. This means you could use Java code to load or deliver data to and from any network-accessible system. As an example, data could be loaded from a legacy API that has a Java library available.

4.4 Storing Sessions in an Amazon DynamoDB Table with AWS Lambda:

  • In the AWS Lambda console, in the function's code source section, update the source code:


import base64
from json import loads
import boto3
dynamodb_client = boto3.client('dynamodb')
table_name = "Table_Name"   # change to your table name
def lambda_handler(event, context):
    """Write each sessionized record in the Kinesis event to DynamoDB."""
    records = event['Records']
    output = []
    success = 0
    failure = 0
    for record in records:
        try:
            # Kinesis delivers each record's payload base64-encoded
            payload = base64.b64decode(record['kinesis']['data'])
            data_item = loads(payload)
            # DynamoDB's low-level API expects typed attributes ('S' = string)
            ddb_item = {
                'session_id': {'S': data_item['session_id']},
                'session_time': {'S': data_item['session_time']},
                'user_id': {'S': data_item['user_id']}
            }
            dynamodb_client.put_item(TableName=table_name, Item=ddb_item)
            success += 1
            output.append({'recordId': record['eventID'], 'result': 'Ok'})
        except Exception as error:
            # Log the cause instead of failing silently, then continue with the next record
            print('Failed to deliver record {0}: {1}'.format(record['eventID'], error))
            failure += 1
            output.append({'recordId': record['eventID'], 'result': 'DeliveryFailed'})
    print('Successfully delivered {0} records, failed to deliver {1} records'.format(success, failure))
    return {'records': output}


  • Configure the session data stream events as an event source for this function

Trigger configuration:

  • Select a source: Select Kinesis
  • Kinesis stream: Select session-stream


  • In DynamoDB, go to the Explore Items section


We are processing the records in the session-stream Kinesis data stream with an AWS Lambda function and putting the records into a DynamoDB table.
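
The decoding step can be tried locally without AWS. The sketch below builds a synthetic Kinesis record and converts it into the DynamoDB item shape used by the function. The helpers to_ddb_item and fake_kinesis_record are hypothetical, and the sample session values are made up for illustration.

```python
import base64
import json


def to_ddb_item(kinesis_record):
    """Decode one Kinesis record and map it to a DynamoDB item with string attributes."""
    payload = base64.b64decode(kinesis_record["kinesis"]["data"])
    data_item = json.loads(payload)
    return {
        "session_id": {"S": data_item["session_id"]},
        "session_time": {"S": data_item["session_time"]},
        "user_id": {"S": data_item["user_id"]},
    }


def fake_kinesis_record(session):
    """Build a minimal stand-in for a record in event['Records'] (test data only)."""
    data = base64.b64encode(json.dumps(session).encode("utf-8")).decode("ascii")
    return {"eventID": "shardId-000000000000:0", "kinesis": {"data": data}}


if __name__ == "__main__":
    # Made-up sample values, only for illustration.
    session = {
        "user_id": "user1",
        "session_id": "user1_1",
        "session_time": "2024-07-01 12:00:00",
    }
    print(to_ddb_item(fake_kinesis_record(session)))
```

Keeping the mapping in a pure function like this makes it possible to check the item shape before wiring up the Kinesis trigger.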


5. Conclusion:

In this walkthrough, we have:

  • Started an Amazon Managed Service for Apache Flink Studio notebook
  • Simulated a real-time clickstream
  • Sessionized the clickstream using the Studio notebook
  • Processed the sessionized output stream with a Lambda function
  • Sent the processed data to a DynamoDB table using the Lambda function

References and Further Reading:

[1] What is streaming data? https://aws.amazon.com/streaming-data/

[2] Amazon Managed Service for Apache Flink https://aws.amazon.com/managed-service-apache-flink/

[3] Amazon DynamoDB https://aws.amazon.com/dynamodb/

[4] AWS Lambda https://aws.amazon.com/pm/lambda/
