Abstract
Continuing our series on using Apache Spark with SingleStore, we'll look at a simple example of how to read the data in a set of local text files, create vector embeddings and save the file data and embeddings in SingleStore using Spark's Structured Streaming.
The notebook file used in this article is available on GitHub.
Create a SingleStore Cloud account
A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:
- Workspace Group Name: Spark Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: spark-demo
- Size: S-00
We'll make a note of the password and store it in the secrets vault using the name password
. We'll also save our OpenAI API Key using the name OPENAI_API_KEY
.
Create a new notebook
From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.
In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.
We'll call the notebook spark_streaming_demo, select a Blank notebook template from the available options, and save it in the Personal location.
Fill out the notebook
First, let's install Java:
!conda install -y --quiet -c conda-forge openjdk=8
Next, we'll install some libraries:
!pip install openai==0.28 --quiet
!pip install nltk --quiet
!pip install pyspark --quiet
We'll enter our OpenAI API Key
:
os.environ["OPENAI_API_KEY"] = get_secret("OPENAI_API_KEY")
Now we'll create two directories to store some jar files and data files:
os.makedirs("jars", exist_ok = True)
os.makedirs("data", exist_ok = True)
We'll now download some jar files, as follows:
def download_jar(url, destination):
response = requests.get(url)
with open(destination, "wb") as f:
f.write(response.content)
jar_urls = [
("https://repo1.maven.org/maven2/com/singlestore/singlestore-jdbc-client/1.2.4/singlestore-jdbc-client-1.2.4.jar", "jars/singlestore-jdbc-client-1.2.4.jar"),
("https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.8-spark-3.5.0/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar", "jars/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar"),
("https://repo1.maven.org/maven2/org/apache/commons/commons-dbcp2/2.12.0/commons-dbcp2-2.12.0.jar", "jars/commons-dbcp2-2.12.0.jar"),
("https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar", "jars/commons-pool2-2.12.0.jar"),
("https://repo1.maven.org/maven2/io/spray/spray-json_3/1.3.6/spray-json_3-1.3.6.jar", "jars/spray-json_3-1.3.6.jar")
]
for url, destination in jar_urls:
download_jar(url, destination)
print("JAR files downloaded successfully")
These jar files include the SingleStore JDBC Client and the SingleStore Spark Connector, as well as several other jar files needed for connectivity and data management.
Now we are ready to create a SparkSession
:
from pyspark.sql import SparkSession
# Create a Spark session
spark = (SparkSession
.builder
.config("spark.jars", ",".join([destination for _, destination in jar_urls]))
.appName("Spark Streaming Test")
.getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
We need some data and we need to create some files. We can create a small number of files with each file containing a meaningful sentence, as follows:
# Download NLTK
nltk.download("punkt_tab")
nltk.download("averaged_perceptron_tagger")
nltk.download("wordnet")
nltk.download("omw")
# Define the directory to save the files
output_dir = "data"
# Generate meaningful sentences
def generate_meaningful_sentence():
# Choose a random set of synonyms from WordNet
synset = random.choice(list(wn.all_synsets()))
# Generate a sentence
definition = synset.definition()
tokens = word_tokenize(definition)
# Capitalise the first word and end with a period
tokens[0] = tokens[0].capitalize()
tokens[-1] = tokens[-1] + "."
return " ".join(tokens)
# Number of files to generate
num_files = 5
# Number of sentences in each file
num_sentences_per_file = 1
# Generate text files
for i in range(num_files):
file_path = os.path.join(output_dir, f"file_{i+1}.txt")
with open(file_path, "w") as file:
for _ in range(num_sentences_per_file):
# Generate a meaningful sentence
sentence = generate_meaningful_sentence()
file.write(sentence + "\n")
We can easily check the contents of the files using the following:
# Specify the directory containing the .txt files
directory = Path("data")
# Loop through each .txt file in the directory
for file_path in directory.glob("*.txt"):
print(f"File: {file_path}")
# Read and print the contents of the file
with file_path.open() as file:
print(file.read())
print("----------------------")
Example output:
File: data/file_1.txt
A small vehicle with four wheels in which a baby or child is pushed around.
----------------------
File: data/file_2.txt
Make a duplicate or duplicates of.
----------------------
File: data/file_3.txt
A symbol in a logical or mathematical expression that can be replaced by the name of any member of specified set.
----------------------
File: data/file_4.txt
Lean dried meat pounded fine and mixed with melted fat ; used especially by North American Indians.
----------------------
File: data/file_5.txt
Take liquid out of a container or well.
----------------------
A database is required, so we'll create one:
DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;
We'll also create a table to store the file data and vector embeddings:
USE spark_demo;
DROP TABLE IF EXISTS streaming;
CREATE TABLE IF NOT EXISTS streaming (
value TEXT,
file_name TEXT,
embedding VECTOR(1536) NOT NULL
);
We'll now prepare the connection to SingleStore:
from sqlalchemy import *
db_connection = create_engine(connection_url)
url = db_connection.url
Now we'll create the Spark connection to SingleStore:
password = get_secret("password")
host = url.host
port = url.port
cluster = host + ":" + str(port)
We also need to set some configuration parameters:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
Now we'll define and register a UDF to convert the text in each file to vector embeddings, as follows:
openai.api_key = os.environ.get("OPENAI_API_KEY")
# Generate embeddings for text
def generate_embeddings(text):
# Generate embeddings for text using OpenAI
return openai.Embedding.create(
input = text,
engine = "text-embedding-3-small"
).data[0].embedding
# Register the function as a UDF
generate_embeddings_udf = udf(generate_embeddings, StringType())
Now we are ready to read the data in each file, convert the text to embeddings and write the data to SingleStore:
input_dir = output_dir
# Read from the directory
df = (spark.readStream
.format("text")
.option("path", input_dir)
.load()
.withColumn("file_name", input_file_name())
)
# Apply the function to the DataFrame to generate embeddings for each row
df_with_embeddings = df.withColumn("embedding", generate_embeddings_udf("value"))
# Write each batch of data to SingleStore
def write_to_singlestore(df_with_embeddings, epoch_id):
(df_with_embeddings.write
.format("singlestore")
.option("loadDataCompression", "LZ4")
.mode("append")
.save("spark_demo.streaming")
)
# Write the streaming DataFrame to SingleStore using foreachBatch
query = (df_with_embeddings.writeStream
.foreachBatch(write_to_singlestore)
.start()
)
# Wait for the query to finish processing
while query.isActive:
time.sleep(1)
if not query.status["isDataAvailable"]:
query.stop()
We can use an SQL statement to quickly view the data:
USE spark_demo;
SELECT
SUBSTR(value, 1, 30) AS value,
SUBSTR(file_name, LENGTH(file_name) - 9) AS file_name,
SUBSTR(JSON_ARRAY_UNPACK(embedding :> BLOB), 1, 50) AS embedding
FROM streaming;
Finally, we can stop Spark:
spark.stop()
Summary
In this short article, we used Spark's Structured Streaming to read data from text files located in a local directory, created vector embeddings for the data in each file and stored the file data and embeddings in SingleStore.
We could improve the code to keep monitoring the directory for new files that may arrive. We could also improve the code to manage larger quantities of text in each data file by chunking the data, which would require some schema changes so that we could correctly manage all the data for each data file.
Top comments (0)