DEV Community

Timothy Spann.   πŸ‡ΊπŸ‡¦
Timothy Spann. πŸ‡ΊπŸ‡¦

Posted on • Originally published at datainmotion.dev on

Ingesting Websocket Data for Live Stock Streams with Cloudera Flow Management Powered by Apache NiFi

Ingesting Websocket Data for Live Stock Streams with Cloudera Flow Management Powered by Apache NiFi

The stocks I follow have a lot of trades and changes throughout the day, I would like to capture all of this data and make it available to my colleagues. I will push it to Kafka and make it available via a topic and I may also push it to Slack or Dischord or a webpage or dashboard or Cloudera Visual App dashboard. We'll see what people request.

We will read websockets from wss://ws.finnhub.io?token=YOURTOKEN. You will need to sign up for a finnhub.io account to get this data. The API is well documented and very easy to use with Apache NiFi.

As updates happen we receive websocket calls and send them to Kafka for use in Flink SQL, Kafka Connect, Spark Streaming, Kafka Streams, Python, .Java Spring Boot Apps, NET Apps and NIFi.

Definition of Fields

s

Symbol.

p

Last price.

t

UNIX milliseconds timestamp.

v

Volume.

c

List of trade conditions. A comprehensive list of trade conditions code can be found here

Incoming Websocket Text Message Processing

We parse out the fields we want, then rename them for something readable. Then we build a new JSON field that matches our trades schema then we push to Kafka.

First step we need to setup a controller pool to connect to finnhub's web socket API.

We can see data in flight via NiFi Provenance.

The detailed steps and settings for converting raw websocket text messages to final messages to send to Kafka.

Raw Data From Websockets Text Message

Formatted JSON Data Before Converting and Sending to Kafka Topic (trades)

We can view the final clean data in Kafka via Cloudera Streams Messaging Manager (SMM)

Schema

https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/trades.avsc

Happy Holidays from Tim and the Streaming Felines!

Reference

Related Youtube Video

Top comments (1)

Collapse
 
ajazurrahman07 profile image
ajazurrahman07

hi Tim,
i am trying to replicate the example. I was not able to get through the first step i.e. connect websocket. Can you please share the nifi settings you are using in connect websocket to reterive the data from the api. Also if i want to connect to websocket using wss, how can i specify an bearer authorization in connectwebsocket.