As part of the ongoing series in which I explore different ways to use the ingest stream to load data into Quine, I want to cover one of Quine's specialities: building a streaming graph from multiple data sources. This time, we'll work with CSV data exported from IMDb to answer the question: "Which actors have acted in and directed the same movie?"
The CSV Files
Usually, if someone says that they have data, most likely it's going to be in CSV format or pretty darn close to it. (Or JSON, but that is another blog post.) In our case, we have two files filled with data in CSV format. Let's inspect what's inside.
File 1: movieData.csv
The movieData.csv file contains records for actors, movies, and the actor's relationship to the movie. Conveniently, each record type has a schema, flattened into rows during export.
Should we separate the data back into discrete files and then load them? No, we can set up separate ingest streams to act on each data type in the file. Effectively, we will separate the "jobs to do" into Cypher queries and stream in the data.
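To make the "separate jobs, one file" idea concrete, here is a minimal Python sketch of it. It is not how Quine works internally; it only shows several handlers reading the same rows, with each one keeping the rows whose Entity value it cares about. The sample rows and the trimmed column list are hypothetical stand-ins for the real file.

```python
import csv
import io

# Hypothetical, heavily trimmed sample; the real movieData.csv has many more columns.
SAMPLE = """Entity,movieId,tmdbId,title,name,Work
Movie,1,862,Toy Story,,
Person,,31,,Tom Hanks,
Join,1,31,,,Acting
"""

def route(rows):
    """Show every row to every handler; each handler keeps only its own row type."""
    handled = {"Movie": [], "Person": [], "Join": []}
    for row in rows:
        for entity, bucket in handled.items():
            # Plays the same role as `WHERE row.Entity = 'Movie'` in an ingest query.
            if row["Entity"] == entity:
                bucket.append(row)
    return handled

handled = route(csv.DictReader(io.StringIO(SAMPLE)))
print({entity: len(rows) for entity, rows in handled.items()})
```

Each bucket here corresponds to one ingest query: the file is read as-is, and the filtering lives in the query rather than in a preprocessing step.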
File 2: ratingData.csv
Our second file, ratingData.csv, is very straightforward. It contains 100,000 rows of movie ratings. Adding the ratings data into our model completes our discovery phase for the supplied data.
The CypherCsv Ingest Stream
The Quine API documentation defines the schema of the File Ingest Format ingest stream for us. The schema is robust and accommodates CSV, JSON, and line file types. Please take a moment to read through the documentation. Be sure to select type: FileIngest -> format: CypherCsv using the API documentation dropdowns.
I define ingest streams to transform and load the movie data into Quine. Quine ingest streams run independently and in parallel when processing files, which means that we can have multiple ingest streams operating on a single file. This is the case for the movieData.csv file because there are several operations that we need to perform on multiple types of data.
Movie Rows
The first ingest stream that I set up addresses the Movie rows in the movieData.csv file. There are 9125 movies in the data set. From each Movie row, the ingest query creates two kinds of nodes: a Movie node and a Genre node for each of the movie's genres. I store all of the movie data as properties on the Movie node.
WITH $that AS row
MATCH (m) WHERE row.Entity = 'Movie' AND id(m) = idFrom("Movie", row.movieId)
SET
  m:Movie,
  m.tmdbId = row.tmdbId,
  m.imdbId = row.imdbId,
  m.imdbRating = toFloat(row.imdbRating),
  m.released = row.released,
  m.title = row.title,
  m.year = toInteger(row.year),
  m.poster = row.poster,
  m.runtime = toInteger(row.runtime),
  m.countries = split(coalesce(row.countries,""), "|"),
  m.imdbVotes = toInteger(row.imdbVotes),
  m.revenue = toInteger(row.revenue),
  m.plot = row.plot,
  m.url = row.url,
  m.budget = toInteger(row.budget),
  m.languages = split(coalesce(row.languages,""), "|"),
  m.movieId = row.movieId
WITH m, split(coalesce(row.genres,""), "|") AS genres
UNWIND genres AS genre
WITH m, genre
MATCH (g) WHERE id(g) = idFrom("Genre", genre)
SET g.genre = genre, g:Genre
MERGE (m:Movie)-[:IN_GENRE]->(g:Genre)
Quine passes each line to the ingest stream via the variable $that, which I alias as row. The MATCH succeeds when the row.Entity value is Movie, and it binds m to the node whose id is returned by the idFrom() function. SET is used to give the node a label and to store the movie's metadata as node properties.
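The property of idFrom() that the queries in this post rely on is that the same inputs always produce the same node id, so any query that knows a movie's movieId can address that movie's node. Here is a small Python sketch of that idea. The id_from helper is hypothetical, and uuid5 is only a stand-in that I picked for illustration; I am not claiming it is the hashing scheme Quine uses.

```python
import uuid

def id_from(*parts):
    """Stand-in for idFrom(): identical inputs always give the identical ID.

    Assumption: uuid5 is used purely for illustration; it is not
    Quine's actual ID scheme.
    """
    key = "\x1f".join(str(part) for part in parts)
    return uuid.uuid5(uuid.NAMESPACE_OID, key)

# Two different queries computing the ID for the same movie agree...
first_lookup = id_from("Movie", "1")
second_lookup = id_from("Movie", "1")
print(first_lookup == second_lookup)  # True

# ...while the leading "Movie" / "Person" string keeps the ID spaces apart.
print(id_from("Movie", "1") == id_from("Person", "1"))  # False
```

This is why the order in which rows arrive matters less than you might expect: a query can compute the id of a node it needs without first looking that node up by property.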
Each movie row has a pipe (|) delimited list of genres in the genres column. I split the column value apart and create a Genre node for each genre in the list, labeled and containing the genre as a property. Finally, the Movie node is related to the Genre node with MERGE.
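If the split(coalesce(...)) idiom is unfamiliar, this Python sketch mirrors it for a single column value. The split_genres helper and the sample genre string are hypothetical. One thing it highlights: in Python, splitting an empty string yields a list holding one empty string. I have not verified that Cypher's split() behaves identically in Quine, so treat that as something to check against your own data rather than a statement about Quine.

```python
def split_genres(value):
    """Mirror split(coalesce(row.genres, ""), "|") for one row's genres column."""
    return (value if value is not None else "").split("|")

print(split_genres("Adventure|Animation|Children"))  # ['Adventure', 'Animation', 'Children']
print(split_genres(None))                            # ['']
```

The coalesce step only guards against a missing value; it does not remove empty entries, so a row without genres still produces one (empty) list element in this sketch.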
Person Rows
The second ingest stream addresses the Person rows in the same way I did for the Movie rows. There are 19047 person records in the movieData.csv file.
WITH $that AS row
MATCH (p) WHERE row.Entity = "Person" AND id(p) = idFrom("Person", row.tmdbId)
SET
  p:Person,
  p.imdbId = row.imdbId,
  p.bornIn = row.bornIn,
  p.name = row.name,
  p.bio = row.bio,
  p.poster = row.poster,
  p.url = row.url,
  p.tmdbId = row.tmdbId,
  p.born = CASE row.born WHEN "" THEN null ELSE datetime(row.born + "T00:00:00Z") END,
  p.died = CASE row.died WHEN "" THEN null ELSE datetime(row.died + "T00:00:00Z") END
The ingest query in this ingest stream matches when the row.Entity is Person, creates a node using the idFrom() function, and stores the Person metadata in node properties.
Join Rows
Looking at the rows that have Join in the Entity column leads me to believe that the data in this CSV file originated from a relational database. There are two types of joins in the file, Acted and Directed, identified by the values Acting and Directing in the Work column. The ingest queries below process them.
Acted In
WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Acting"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (r) WHERE id(r) = idFrom("Role", row.tmdbId, row.movieId, row.role)
SET
  r.role = row.role,
  r.movie = row.movieId,
  r.tmdbId = row.tmdbId,
  r:Role
MERGE (p:Person)-[:PLAYED]->(r:Role)<-[:HAS_ROLE]-(m:Movie)
MERGE (p:Person)-[:ACTED_IN]->(m:Movie)
Acted join rows create relationships between Person, Role, and Movie nodes. Two paths are created from each Person node. The first path, (p)-[:PLAYED]->(r)<-[:HAS_ROLE]-(m), relates actors (Person) to the roles they have played and relates each movie (Movie) to its roles. The second path directly relates an actor to the movies they acted in.
Directed
WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Directing"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MERGE (p:Person)-[:DIRECTED]->(m:Movie)
The Directed ingest query matches join rows and creates a path relating directors with the movies they have directed.
Ratings
WITH $that AS row
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (u) WHERE id(u) = idFrom("User", row.userId)
MATCH (rtg) WHERE id(rtg) = idFrom("Rating", row.movieId, row.userId, row.rating)
SET u.name = row.name, u:User
SET rtg.rating = row.rating,
  rtg.timestamp = toInteger(row.timestamp),
  rtg:Rating
MERGE (u:User)-[:SUBMITTED]->(rtg:Rating)<-[:HAS_RATING]-(m:Movie)
MERGE (u:User)-[:RATED]->(m:Movie)
The last ingest query processes rows from the ratingData.csv file. The query creates User and Rating nodes, then relates them to each other and to the Movie being rated.
Running the Recipe
As my project progressed, I developed a Quine recipe to load my CSV files and perform the analysis. Running the recipe requires a couple of Quine options to pass in the locations of the CSV files and an updated configuration setting.
java \
  -Dquine.in-memory-soft-node-limit=30000 \
  -jar ../releases/latest -r movieData \
  --recipe-value movie_file=movieData.csv \
  --recipe-value rating_file=ratingData.csv
Ingesting the CSV files produces the following data set in Quine:
The orange Movie and Person nodes are created directly from the Entity column in movieData.csv. The User node is from ratingData.csv, and the green nodes were derived from data stored within an entity row. The ActedDirected relationship is built by the standing query in the recipe.
Answering the Question
Getting all of this data into Quine was only part of the challenge. Remember the question that we were asked: "Which actors have acted in and directed the same movie?"
Quine is a streaming graph; if we were to connect the ingest streams to a streaming source, rather than CSV files, the standing query inside of the recipe that I developed would answer the question for movies in the past as well as movies in the future.
Our standing query matches when the graph contains a complete pattern in which an actor (Person) both ACTED_IN and DIRECTED the same movie.
MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie)
WHERE id(a) = id(m)
RETURN id(m) as movieId, m.title as Movie, id(p) as personId, p.name as Actor
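Stripped of graph syntax, the pattern above asks for (person, movie) pairs that appear in both the ACTED_IN and the DIRECTED relationships. The Python sketch below computes that as a set intersection over hypothetical edge lists. It is a batch computation for illustration only; the point of a standing query is that Quine evaluates the pattern incrementally as data arrives, which this sketch does not attempt to model.

```python
def acted_and_directed(acted_in, directed):
    """Return the (person, movie) pairs present in both edge collections."""
    return sorted(set(acted_in) & set(directed))

# Hypothetical identifiers standing in for node ids.
acted_in = [("eastwood", "unforgiven"), ("hackman", "unforgiven")]
directed = [("eastwood", "unforgiven"), ("eastwood", "mystic-river")]

print(acted_and_directed(acted_in, directed))  # [('eastwood', 'unforgiven')]
```

The WHERE id(a) = id(m) clause in the Cypher plays the role of the intersection here: it forces the movie reached through ACTED_IN and the movie reached through DIRECTED to be the same node.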
When the standing query completes a match, it processes the movie id and person id through the output query and actions.
standingQueries:
  - pattern:
      type: Cypher
      mode: MultipleValues
      query: |-
        MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie)
        WHERE id(a) = id(m)
        RETURN id(m) as movieId, m.title as Movie, id(p) as personId, p.name as Actor
    outputs:
      set-ActedDirected:
        type: CypherQuery
        query: |-
          MATCH (m),(p)
          WHERE strId(m) = $that.data.movieId AND strId(p) = $that.data.personId
          MERGE (p:Person)-[:ActedDirected]->(m:Movie)
      log-actor-director:
        type: WriteToFile
        path: "ActorDirector.jsonl"
My standing query creates a new ActedDirected relationship between the Person and Movie nodes, then logs the relationship.
Four hundred ninety-one actors acted in and directed the same movie in our data set.
{
  "data": {
    "Actor": "Clint Eastwood",
    "Movie": "Unforgiven",
    "movieId": "4a6d64c8-9c90-3362-b443-4d2e7b2fb9d1",
    "personId": "4638a820-3b68-3fc7-9fa7-341e876b701e"
  }
}
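If you want to check a count like this against your own output file, a few lines of Python are enough. This sketch assumes the log holds one JSON object per line with the data fields shown above; the distinct_people helper is hypothetical, and the embedded sample stands in for reading ActorDirector.jsonl from disk.

```python
import json

# One record in the shape shown above, standing in for the real output file.
SAMPLE_JSONL = """\
{"data": {"Actor": "Clint Eastwood", "Movie": "Unforgiven", "movieId": "4a6d64c8-9c90-3362-b443-4d2e7b2fb9d1", "personId": "4638a820-3b68-3fc7-9fa7-341e876b701e"}}
"""

def distinct_people(lines):
    """Collect the distinct personId values from line-delimited JSON records."""
    people = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        people.add(json.loads(line)["data"]["personId"])
    return people

print(len(distinct_people(SAMPLE_JSONL.splitlines())))  # 1
```

Counting distinct personId values rather than lines matters because one person can act in and direct more than one movie, which produces several records for the same person.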
Conclusion
Phew, we made it through! And we learned a lot along the way.
- CSV data is streamed into Quine
- Quine can read from external files and streaming providers
- You can ingest multiple streams at once, movies and reviewers, and combine them into one streaming graph
- Always separate ingest queries using the jobs to be done framework
Quine is open source if you want to run this analysis for yourself. Download a precompiled version or build it yourself from the codebase (Quine GitHub). I published the recipe that I developed at https://quine.io/recipes. The page has instructions for downloading the CSV files and running the recipe.
Have a question, suggestion, or improvement? I welcome your feedback! Please drop in to Quine Slack and let me know. I'm always happy to discuss Quine or answer questions.