Disclaimer
The stock data used in this article is entirely fictitious. It is purely for demo purposes. Please do not use this data for making any financial decisions.
Abstract
This article will show how to connect a Kafka broker, streaming example stock tick data, to SingleStoreDB. We'll then query the data using English sentences through LangChain, which provides a basic question-and-answer capability for the tick data. We'll build a Python application, through several design iterations, to use OpenAI's Whisper to ask questions through speech and use speech synthesis to reply.
The notebook file, SQL and Python code are available on GitHub.
Introduction
Asking questions of a database system in natural language is not a new idea. However, modern tools like LangChain and OpenAI's Whisper have made it much easier to implement. In this article, we'll see how.
Create a SingleStoreDB Cloud account
A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:
- Workspace Group Name: Whisper Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: whisper-demo
- Size: S-00
- Settings: None selected
Once the workspace is available, we'll make a note of our password, host and port. The host and port will be available from whisper-demo > Connect > SQL IDE > Host. We'll need this information later for a Python application. We'll temporarily allow access from anywhere by configuring the firewall under Whisper Demo Group > Firewall.
Create a Database and Tables
From the left navigation pane, we'll select DEVELOP > Data Studio > Open SQL Editor to create a timeseries_db database, and tick and stock_sentiment tables, as follows:
DROP DATABASE IF EXISTS timeseries_db;
CREATE DATABASE IF NOT EXISTS timeseries_db;
USE timeseries_db;
DROP TABLE IF EXISTS tick;
CREATE TABLE IF NOT EXISTS tick (
symbol VARCHAR(10),
ts DATETIME SERIES TIMESTAMP,
open NUMERIC(18, 2),
high NUMERIC(18, 2),
low NUMERIC(18, 2),
price NUMERIC(18, 2),
volume INT,
KEY(ts)
);
DROP TABLE IF EXISTS stock_sentiment;
CREATE TABLE IF NOT EXISTS stock_sentiment (
headline VARCHAR(250),
positive FLOAT,
negative FLOAT,
neutral FLOAT,
url TEXT,
publisher VARCHAR(30),
ts DATETIME,
symbol VARCHAR(10)
);
Create a Pipeline
Pipelines allow us to create streaming ingest feeds from various sources, such as Kafka, S3 and HDFS, using a single command. With pipelines, we can also perform ETL operations.
For our use case, we'll create a simple pipeline in SingleStoreDB as follows:
CREATE PIPELINE tick
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/stockticker'
BATCH_INTERVAL 45000
INTO TABLE tick
FIELDS TERMINATED BY ','
(symbol,ts,open,high,low,price,volume);
We'll control the rate of data ingestion using BATCH_INTERVAL. Initially, we'll set this to 45000 milliseconds.
We'll configure the pipeline to start from the earliest offset, as follows:
ALTER PIPELINE tick SET OFFSETS EARLIEST;
We'll then test the pipeline before we start running it, as follows:
TEST PIPELINE tick LIMIT 1;
The output should be similar to the following:
+--------+---------------------+-------+-------+-------+-------+--------+
| symbol | ts | open | high | low | price | volume |
+--------+---------------------+-------+-------+-------+-------+--------+
| AIV | 2023-09-05 06:47:53 | 44.89 | 44.89 | 44.88 | 44.89 | 719 |
+--------+---------------------+-------+-------+-------+-------+--------+
Remember, this is fictitious data.
We'll now start the pipeline:
START PIPELINE tick;
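To make the pipeline's field mapping concrete, here is a minimal Python sketch of how one comma-separated message could map onto the seven columns of the tick table. The exact wire format of the Kafka messages is not shown in this article; the sample line below is an assumption modelled on the TEST PIPELINE output, and the Tick class and parse_tick function are illustrative names only.

```python
from datetime import datetime
from decimal import Decimal
from typing import NamedTuple


class Tick(NamedTuple):
    """One row of the tick table (illustrative type, not part of the article's code)."""
    symbol: str
    ts: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    price: Decimal
    volume: int


def parse_tick(line: str) -> Tick:
    """Split one comma-separated message into the seven pipeline columns."""
    symbol, ts, open_, high, low, price, volume = line.strip().split(",")
    return Tick(
        symbol=symbol,
        ts=datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"),
        open=Decimal(open_),
        high=Decimal(high),
        low=Decimal(low),
        price=Decimal(price),
        volume=int(volume),
    )


# Assumed message, modelled on the row returned by TEST PIPELINE.
tick = parse_tick("AIV,2023-09-05 06:47:53,44.89,44.89,44.88,44.89,719")
print(tick.symbol, tick.ts, tick.price, tick.volume)
```

Decimal is used here to mirror the NUMERIC(18, 2) columns without introducing floating-point rounding.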
Load Stock Sentiment Data
We'll now load the data into the stock_sentiment table. The data are derived from Kaggle. First, we'll unpack the zip file that contains the SQL file. Next, we'll load the SQL file into SingleStoreDB Cloud using a MySQL client, as follows:
mysql -u admin -p<password> -h <host> timeseries_db < /path/to/stock_sentiment.sql
Here, <password> and <host> are replaced with the values we obtained from SingleStoreDB Cloud earlier, and /path/to/ is replaced with the actual path to where the SQL file is located.
Load the Notebook
We'll use the notebook file available on GitHub. From the left navigation pane in SingleStoreDB Cloud, we'll select Data Studio. In the top right of the web page will be New Notebook with a pulldown that has two options:
- New Notebook
- Import From File
We'll select the second option, locate the notebook file we downloaded from GitHub and load it into SingleStoreDB Cloud.
Run the Notebook
We'll start by selecting the Connection (whisper-demo) and Database (timeseries_db) using the drop-down menus above the notebook.
Analysing Time Series Data
Part 1 of the notebook contains a set of Time Series operations on the data in the tick table. These operations were described in greater detail in a previous article.
LangChain OnlinePDFLoader
Part 2 of the notebook loads the contents of a PDF document and vector embeddings into a table called fintech_docs. These operations were described in greater detail in a previous article. Replace <PDF document URL> with the hyperlink of your chosen FinTech document:
loader = OnlinePDFLoader("<PDF document URL>")
data = loader.load()
The Fully Qualified Domain Name (FQDN) where the PDF file is located must be added to the firewall by selecting the Data Studio > Firewall option.
We'll use ChatGPT to answer questions on the PDF file, such as:
"What are the best investment opportunities in Blockchain?"
LangChain SQL Agent
Part 3 of the notebook contains LangChain agent operations on the data in the tick and stock_sentiment tables. This will be the main focus of this article.
We'll configure the LangChain toolkit and agent, as follows:
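from langchain_community.agent_toolkits import SQLDatabaseToolkit, create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
db = SQLDatabase.from_uri(connection_url, include_tables = ["tick", "stock_sentiment"])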
llm = ChatOpenAI(model = "gpt-4o-mini", temperature = 0, verbose = False)
toolkit = SQLDatabaseToolkit(db = db, llm = llm)
agent_executor = create_sql_agent(
llm = llm,
toolkit = toolkit,
max_iterations = 15,
max_execution_time = 60,
top_k = 3,
verbose = False
)
We'll now test the code using an example query, as follows:
query1 = (
"From the tick table, which stock symbol saw the\n"
"least volatility in share trading in the dataset?"
)
result1 = run_agent_query(query1, agent_executor, error_string)
print(result1)
Here is some example output:
The stock symbol with the least volatility in share trading is FTR, with a volatility of 0.55.
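The agent chooses its own definition of "volatility" when it writes the SQL, and the generated query is not shown here. One plausible reading is the standard deviation of price per symbol. The following sketch works through that interpretation in plain Python on made-up data; the symbols AAA and BBB and their prices are invented for illustration and do not come from the dataset.

```python
from collections import defaultdict
from statistics import pstdev

# Made-up (symbol, price) pairs, for illustration only.
ticks = [
    ("AAA", 10.00), ("AAA", 10.50), ("AAA", 9.50),
    ("BBB", 20.00), ("BBB", 20.10), ("BBB", 19.90),
]

# Group prices by symbol, as a GROUP BY symbol would.
prices_by_symbol = defaultdict(list)
for symbol, price in ticks:
    prices_by_symbol[symbol].append(price)

# Population standard deviation of price for each symbol.
volatility = {s: pstdev(p) for s, p in prices_by_symbol.items()}

# The symbol with the smallest spread is the "least volatile" under this definition.
least_volatile = min(volatility, key=volatility.get)
print(least_volatile, round(volatility[least_volatile], 2))
```

If the agent were to pick a different measure, such as the high-low range, the ranking could differ, which is one reason to inspect the chain output.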
We'll make this a little more interactive using the following code:
query2 = input("Please enter your question:")
result2 = run_agent_query(query2, agent_executor, error_string)
print(result2)
We'll test this with an example query:
How many records are in the tick table?
Here is some example output:
There are 21,188,124 records in the tick table.
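The run_agent_query helper and error_string are defined in the notebook rather than shown in this article. As a rough guide only, a helper of this kind might look like the sketch below: it invokes the agent and falls back to a fixed message if anything goes wrong. The body, the wording of error_string and the FakeExecutor stand-in are all assumptions, not the notebook's actual code.

```python
def run_agent_query(query, agent_executor, error_string):
    """Run one question through the agent; return error_string on any failure."""
    try:
        # An agent executor's invoke() returns a dict whose "output" key holds the answer.
        result = agent_executor.invoke({"input": query})
        return result["output"]
    except Exception:
        return error_string


class FakeExecutor:
    """Stand-in for the real agent, used only to exercise the helper offline."""

    def invoke(self, inputs):
        if not inputs["input"].strip():
            raise ValueError("empty question")
        return {"input": inputs["input"], "output": "42 rows"}


error_string = "Could not find the answer."  # hypothetical message
print(run_agent_query("How many rows?", FakeExecutor(), error_string))
print(run_agent_query("   ", FakeExecutor(), error_string))
```

Catching the exception keeps an interactive session alive when the agent times out or generates SQL that fails.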
To see the chain output, we'll set verbose to True, as follows:
agent_executor = create_sql_agent(
llm = llm,
toolkit = toolkit,
max_iterations = 15,
max_execution_time = 60,
top_k = 3,
verbose = True
)
An example of chain output from another database was shown in a previous article.
Bonus: Build a Visual Python Application
We'll now build a simple Python application that uses OpenAI's Whisper to ask questions about the database system. An excellent article inspired this application.
Install the Required Software
For this article, the following software was required on a clean install of Ubuntu 22.04.2 running in a VMware Fusion virtual machine:
sudo apt install ffmpeg
sudo apt install libespeak1
sudo apt install portaudio19-dev
sudo apt install python3-tk
sudo apt install python3-pil python3-pil.imagetk
as well as the following packages:
langchain
langchain-community
langchain-openai
matplotlib
openai
openai-whisper
pyaudio
pyttsx3
sqlalchemy-singlestoredb
wave
These can be found in the requirements.txt file on GitHub. Install them as follows:
pip install -r requirements.txt
openai-whisper may take a while to install.
We'll need to provide an OpenAI API Key in our environment. For example:
export OPENAI_API_KEY="<OpenAI API Key>"
Replace <OpenAI API Key> with your key.
In each of the following applications, we have the following code:
s2_user = "<username>"
s2_password = "<password>"
s2_host = "<host>"
s2_port = <port>
s2_db = "<database>"
db = SQLDatabase.from_uri(
f"singlestoredb://{s2_user}:{s2_password}@{s2_host}:{s2_port}/{s2_db}"
"?ssl_ca=/path/to/singlestore_bundle.pem",
include_tables = ["tick", "stock_sentiment"]
)
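The connection URL above can also be assembled by a small helper, which is useful when the password contains characters such as @ or / that would otherwise break the URL. The sketch below percent-encodes the credentials and appends the ssl_ca parameter only when a certificate bundle path is supplied. The function name build_connection_url and the sample values are hypothetical.

```python
from urllib.parse import quote_plus


def build_connection_url(user, password, host, port, database, ssl_ca=None):
    """Assemble a singlestoredb:// URL, adding ssl_ca only when provided."""
    url = (
        f"singlestoredb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )
    if ssl_ca:
        url += f"?ssl_ca={ssl_ca}"
    return url


# Hypothetical values, for illustration only.
print(build_connection_url("admin", "p@ss/word", "example.host", 3306, "timeseries_db"))
print(
    build_connection_url(
        "admin", "p@ss/word", "example.host", 3306, "timeseries_db",
        ssl_ca="/path/to/singlestore_bundle.pem",
    )
)
```

The resulting string can be passed to SQLDatabase.from_uri in place of the inline f-string.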
We'll fill in the values from our environment. The singlestore_bundle.pem file is required if using the Free Shared Tier; otherwise, this line can be removed:
"?ssl_ca=/path/to/singlestore_bundle.pem"
First Iteration
Let's start with a simple visual Python application using Tkinter. We'll also add voice recognition using OpenAI's Whisper. The application records up to 20 seconds of speech. It can be run as follows:
python3 record-transcribe.py
Example output is shown in Figure 1.
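One common way to enforce a recording limit is to read fixed-size buffers from the microphone and stop after a fixed number of reads. The sketch below shows the arithmetic behind a 20-second cap and writes the result as a WAV file using the standard library. The sample rate, buffer size and sample width are assumptions, and silence stands in for microphone input so that the sketch runs without audio hardware; it is not the code from record-transcribe.py.

```python
import io
import wave

RATE = 16000        # samples per second (assumed)
CHUNK = 1024        # frames per buffer read (assumed)
SAMPLE_WIDTH = 2    # bytes per sample, i.e. 16-bit PCM (assumed)
MAX_SECONDS = 20    # the recording cap described above


def max_chunks(rate=RATE, chunk=CHUNK, seconds=MAX_SECONDS):
    """Number of whole buffer reads that fit within the time limit."""
    return (rate * seconds) // chunk


def save_wav(target, chunks, rate=RATE, sample_width=SAMPLE_WIDTH):
    """Write mono PCM chunks to a WAV file path or file-like object."""
    with wave.open(target, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b"".join(chunks))


# Silence stands in for microphone buffers in this sketch.
silent_chunk = b"\x00" * (CHUNK * SAMPLE_WIDTH)
chunks = [silent_chunk for _ in range(max_chunks())]

buffer = io.BytesIO()
save_wav(buffer, chunks)
print(max_chunks(), "buffers,", len(chunks) * CHUNK / RATE, "seconds")
```

With a real microphone, each silent chunk would be replaced by one read of CHUNK frames from the input stream.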
Second Iteration
In the next iteration, we'll add an Audio Waveform. We'll run the program as follows:
python3 record-transcribe-visualise.py
Example output is shown in Figure 2.
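Drawing a waveform comes down to turning the recorded samples into a list of amplitudes over time. The following sketch reads a mono 16-bit WAV with the standard library and returns times and normalised amplitudes. The read_waveform function is a hypothetical helper, and a generated 440 Hz tone stands in for a real recording; it is not the code from record-transcribe-visualise.py.

```python
import array
import io
import math
import sys
import wave


def read_waveform(source):
    """Return (times, amplitudes) for a mono 16-bit WAV, amplitudes in [-1, 1]."""
    with wave.open(source, "rb") as wf:
        if wf.getnchannels() != 1 or wf.getsampwidth() != 2:
            raise ValueError("this sketch handles mono 16-bit PCM only")
        rate = wf.getframerate()
        samples = array.array("h", wf.readframes(wf.getnframes()))
    if sys.byteorder == "big":
        samples.byteswap()  # WAV data is little-endian
    amplitudes = [s / 32768 for s in samples]
    times = [i / rate for i in range(len(samples))]
    return times, amplitudes


# Build a short 440 Hz test tone in memory so the sketch runs without a microphone.
rate = 16000
tone = array.array(
    "h", (int(16000 * math.sin(2 * math.pi * 440 * i / rate)) for i in range(160))
)
if sys.byteorder == "big":
    tone.byteswap()

buffer = io.BytesIO()
with wave.open(buffer, "wb") as wf:
    wf.setnchannels(1)
    wf.setsampwidth(2)
    wf.setframerate(rate)
    wf.writeframes(tone.tobytes())

buffer.seek(0)
times, amplitudes = read_waveform(buffer)
print(len(amplitudes), "samples spanning", times[-1], "seconds")
```

Passing the two lists to matplotlib's plot function would then draw the waveform.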
Third Iteration
In the third and final iteration, we'll replace the application's text-based response with speech synthesis. We'll run the program as follows:
python3 record-transcribe-visualise-speak.py
Example output is shown in Figure 3.
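Since pyttsx3 is in the list of required packages, the speech reply could be produced along the lines of the sketch below. It uses pyttsx3's init, say and runAndWait calls; the speak function and the RecordingEngine stand-in are hypothetical and are included only so that the sketch can be tried without audio hardware.

```python
def speak(text, engine=None):
    """Speak text aloud; create a pyttsx3 engine if none is supplied."""
    if engine is None:
        import pyttsx3  # imported lazily so the sketch loads without audio support
        engine = pyttsx3.init()
    engine.say(text)       # queue the utterance
    engine.runAndWait()    # block until the queue has been spoken


class RecordingEngine:
    """Stand-in exposing the two methods used above, for dry runs."""

    def __init__(self):
        self.spoken = []

    def say(self, text):
        self.spoken.append(text)

    def runAndWait(self):
        pass


engine = RecordingEngine()
speak("There are 21,188,124 records in the tick table.", engine)
print(engine.spoken)
```

Calling speak(result) with no engine argument would use the system's speech synthesiser instead of the stand-in.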
In the code for all three iterations, we can call the OpenAI Whisper API instead of using the local Whisper installation. To do this, we'd uncomment these lines of code in the transcribe_audio function:
# from openai import OpenAI
# client = OpenAI()
# transcript = client.audio.transcriptions.create(
# model = "whisper-1",
# file = audio_file,
# response_format = "text",
# language = "en"
# )
# return transcript.strip()
and comment out these lines of code:
transcript = model.transcribe(filename)
return transcript["text"].strip()
However, calling the OpenAI Whisper API would incur additional costs. The local Whisper installation already provides excellent results.
Summary
In this article, we've seen that without creating vector embeddings, we've been able to access our data quite effectively using LangChain. However, our queries need to be more focused, and we need to understand the database schema before asking questions. Integrating a speech capability enables our applications to be more widely used.