
Michael Aglietti

Posted on • Edited on • Originally published at thatdot.com

Ingesting From Multiple Data Sources into Quine Streaming Graphs

As part of the ongoing series in which I explore different ways to use the ingest stream to load data into Quine, I want to cover one of Quine's specialities: building a streaming graph from multiple data sources. This time, we'll work with CSV data exported from IMDb to answer the question: "Which actors have acted in and directed the same movie?"

The CSV Files

Usually, if someone says that they have data, most likely it's going to be in CSV format or pretty darn close to it. (Or JSON, but that is another blog post.) In our case, we have two files filled with data in CSV format. Let's inspect what's inside.

File 1: movieData.csv

The movieData.csv file contains records for actors, movies, and the actor's relationship to the movie. Conveniently, each record type has a schema, flattened into rows during export.

Should we separate the data back into discrete files and then load them? No, we can set up separate ingest streams to act on each data type in the file. Effectively, we will separate the "jobs to do" into Cypher queries and stream in the data.

File 2: ratingData.csv

Our second file, ratingData.csv, is very straightforward. It contains 100,000 rows of movie ratings. Adding the ratings data into our model completes our discovery phase for the supplied data.

Original implied schema of IMDB data.

The CypherCsv Ingest Stream

The Quine API documentation defines the schema of the File Ingest Format ingest stream for us. The schema is robust and accommodates CSV, JSON, and line file types. Please take a moment to read through the documentation. Be sure to select type: FileIngest -> format: CypherCsv using the API documentation dropdowns.

I define ingest streams to transform and load the movie data into Quine. Quine ingest streams behave independently and in parallel when processing files. This means that we can have multiple ingest streams operating on a single file. This is the case for the movieData.csv file because there are several operations that we need to perform on multiple types of data.
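To picture several ingest streams working over one file, here is a minimal Python sketch. It is not how Quine is implemented; it only shows each "job" reading every row and keeping the ones its filter accepts. The sample rows, the column subset, and the names `STREAMS` and `run_stream` are my own assumptions for illustration.

```python
import csv
import io

# Hypothetical sample in the flattened shape described above; the real
# movieData.csv has many more columns and rows.
SAMPLE = """Entity,movieId,tmdbId,title,name,Work
Movie,1,,Example Movie,,
Person,,31,,Example Person,
Join,1,31,,,Acting
"""

# One filter per "job to do". Each mirrors the WHERE clause of an ingest query.
STREAMS = {
    "movie": lambda r: r["Entity"] == "Movie",
    "person": lambda r: r["Entity"] == "Person",
    "acted": lambda r: r["Entity"] == "Join" and r["Work"] == "Acting",
    "directed": lambda r: r["Entity"] == "Join" and r["Work"] == "Directing",
}

def run_stream(name, text):
    """Read every row, as one stream would, and keep only the rows its filter accepts."""
    keep = STREAMS[name]
    return [row for row in csv.DictReader(io.StringIO(text)) if keep(row)]

for name in STREAMS:
    print(name, len(run_stream(name, SAMPLE)))
```

Each filter sees the whole file, so no job depends on another job having run first.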

Movie Rows

The first ingest stream that I set up addresses the Movie rows in the movieData.csv file. There are 9125 movies in the data set. The ingest query creates two kinds of nodes from each Movie row: a Movie node and a Genre node for each of the movie's genres. I store all of the movie data as properties on the Movie node.

WITH $that AS row
MATCH (m) WHERE row.Entity = 'Movie' AND id(m) = idFrom("Movie", row.movieId)
SET
  m:Movie,
  m.tmdbId = row.tmdbId,
  m.imdbId = row.imdbId,
  m.imdbRating = toFloat(row.imdbRating),
  m.released = row.released,
  m.title = row.title,
  m.year = toInteger(row.year),
  m.poster = row.poster,
  m.runtime = toInteger(row.runtime),
  m.countries = split(coalesce(row.countries,""), "|"),
  m.imdbVotes = toInteger(row.imdbVotes),
  m.revenue = toInteger(row.revenue),
  m.plot = row.plot,
  m.url = row.url,
  m.budget = toInteger(row.budget),
  m.languages = split(coalesce(row.languages,""), "|"),
  m.movieId = row.movieId
WITH m,split(coalesce(row.genres,""), "|") AS genres
UNWIND genres AS genre
WITH m, genre
MATCH (g) WHERE id(g) = idFrom("Genre", genre)
SET g.genre = genre, g:Genre
MERGE (m:Movie)-[:IN_GENRE]->(g:Genre)

Quine passes each line to the ingest stream via the variable $that, which I alias as row. The MATCH succeeds when the row.Entity value is Movie, and it selects the node whose id is returned by the idFrom() function. SET is used to give the node a label and to store metadata as node properties.
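The useful property of idFrom() is that the same input values always yield the same node id, so any ingest query can find the Movie node again from row.movieId alone. A rough Python analogy follows. The helper `id_from` is hypothetical and uses a name-based UUID as a stand-in; Quine uses its own scheme, so these values will not match real Quine ids.

```python
import uuid

def id_from(*parts):
    """Derive a repeatable UUID from an ordered list of values.

    Illustrative stand-in only: this is NOT Quine's idFrom() algorithm.
    """
    # Join with a separator unlikely to appear in the data so that
    # ("ab", "c") and ("a", "bc") do not collide.
    key = "\x1f".join(str(p) for p in parts)
    return uuid.uuid5(uuid.NAMESPACE_OID, key)

first = id_from("Movie", "1")
again = id_from("Movie", "1")
other = id_from("Person", "1")
print(first == again, first == other)
```

Because the id is computed rather than looked up, rows can arrive in any order and still land on the same node.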

Each movie row has a pipe (|) delimited list of genres in the genres column. I split the column value apart and create a Genre node for each genre in the list, labeled and containing the genre as a property.
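As a small illustration of the split-and-default step, this Python sketch mirrors split(coalesce(row.genres, ""), "|") for a dictionary-shaped row. The function name `split_genres` and the sample values are assumptions of mine.

```python
def split_genres(row):
    """Mirror split(coalesce(row.genres, ""), "|") for a dict-shaped row."""
    value = row.get("genres")
    # Fall back to an empty string when the column is missing or null.
    return (value if value is not None else "").split("|")

print(split_genres({"genres": "Adventure|Animation|Comedy"}))
# → ['Adventure', 'Animation', 'Comedy']
print(split_genres({"genres": None}))
# → ['']
```

In Python, a missing value yields a list holding one empty string rather than an empty list. It is worth checking how your own data behaves here, since an empty entry would otherwise become a Genre with no name.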

Finally, the Movie node is related to the Genre node with MERGE.

Person Rows

The second ingest stream addresses the Person rows in the same way I did for the Movie rows. There are 19047 person records in the movieData.csv file.

WITH $that AS row
MATCH (p) WHERE row.Entity = "Person" AND id(p) = idFrom("Person", row.tmdbId)
SET
  p:Person,
  p.imdbId = row.imdbId,
  p.bornIn = row.bornIn,
  p.name = row.name,
  p.bio = row.bio,
  p.poster = row.poster,
  p.url = row.url,
  p.tmdbId = row.tmdbId,
  p.born = CASE row.born WHEN "" THEN null ELSE datetime(row.born + "T00:00:00Z") END,
  p.died = CASE row.died WHEN "" THEN null ELSE datetime(row.died + "T00:00:00Z") END

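The ingest query in this ingest stream matches when the row.Entity is Person, selects a node using the idFrom() function, and stores the Person metadata in node properties. Empty born and died values are stored as null; all others are converted to a datetime at midnight UTC.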
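The CASE expressions for born and died are easy to misread, so here is the same idea as a hedged Python sketch. I am assuming the CSV stores dates as YYYY-MM-DD, which is what appending "T00:00:00Z" suggests; `parse_day` is a hypothetical helper name.

```python
from datetime import datetime, timezone

def parse_day(value):
    """Return midnight UTC for a 'YYYY-MM-DD' string, or None when it is empty."""
    if value == "":
        return None  # matches the CASE ... WHEN "" THEN null branch
    # Assumed date format; adjust if the export uses a different layout.
    return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)

print(parse_day("1930-05-31"))
print(parse_day(""))
```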

Join Rows

Looking at the rows that have Join in the Entity column leads me to believe that the data in this CSV file originated from a relational database. There are two types of joins in the file, acting and directing, identified by the value in the Work column. The ingest queries below process them.

Acted In

WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Acting"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (r) WHERE id(r) = idFrom("Role", row.tmdbId, row.movieId, row.role)
SET 
  r.role = row.role, 
  r.movie = row.movieId, 
  r.tmdbId = row.tmdbId, 
  r:Role
MERGE (p:Person)-[:PLAYED]->(r:Role)<-[:HAS_ROLE]-(m:Movie)
MERGE (p:Person)-[:ACTED_IN]->(m:Movie)

Acting join rows create relationships between Person, Role, and Movie nodes. Two paths are created from each Person node. The first path, (p)-[:PLAYED]->(r)<-[:HAS_ROLE]-(m), relates actors (Person) to the roles they have played and relates each movie (Movie) to its roles. The second path directly relates an actor to the movies they acted in.
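To make the two paths concrete, this sketch lists the relationships that a single acting join row implies. Plain tuples stand in for the idFrom() keys, and the function name `acted_edges` and the sample row are assumptions for illustration only.

```python
def acted_edges(row):
    """Return the (source, relationship, target) triples one Acting join row implies."""
    person = ("Person", row["tmdbId"])
    movie = ("Movie", row["movieId"])
    # The Role key combines person, movie, and role name, as in the ingest query.
    role = ("Role", row["tmdbId"], row["movieId"], row["role"])
    return [
        (person, "PLAYED", role),
        (movie, "HAS_ROLE", role),
        (person, "ACTED_IN", movie),
    ]

# Hypothetical join row.
for edge in acted_edges({"tmdbId": "31", "movieId": "1", "role": "Example Role"}):
    print(edge)
```

Keying the Role on all three values means the same actor playing two roles in one movie gets two Role nodes but still only one ACTED_IN relationship.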

Directed

WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Directing"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MERGE (p:Person)-[:DIRECTED]->(m:Movie)

The Directed ingest query matches join rows and creates a path relating directors with the movies they have directed.

Ratings

WITH $that AS row
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (u) WHERE id(u) = idFrom("User", row.userId)
MATCH (rtg) WHERE id(rtg) = idFrom("Rating", row.movieId, row.userId, row.rating)
SET u.name = row.name, u:User
SET rtg.rating = row.rating,
  rtg.timestamp = toInteger(row.timestamp),
  rtg:Rating
MERGE (u:User)-[:SUBMITTED]->(rtg:Rating)<-[:HAS_RATING]-(m:Movie)
MERGE (u:User)-[:RATED]->(m:Movie)

The last ingest query processes rows from the ratingData.csv file. The query creates User and Rating nodes, then relates them together.

Running the Recipe

As my project progressed, I developed a Quine recipe to load my CSV files and perform the analysis. Running the recipe requires a couple of Quine options to pass in the locations of the CSV files and an updated configuration setting.

java \
-Dquine.in-memory-soft-node-limit=30000 \
-jar ../releases/latest -r movieData \
--recipe-value movie_file=movieData.csv \
--recipe-value rating_file=ratingData.csv

Ingesting the CSV files produces the following data set in Quine:

The data model in Quine for the IMDB data.

The orange Movie and Person nodes are created directly from the Entity column in movieData.csv. The User node is from ratingData.csv and the green nodes were derived from data stored within an entity row. The ActedDirected relationship is built by the standing query in the recipe.

Answering the Question

Getting all of this data into Quine was only part of the challenge. Remember the question that we were asked: "Which actors have acted in and directed the same movie?"

Quine is a streaming graph. If we were to connect the ingest streams to a streaming source rather than CSV files, the standing query inside the recipe that I developed would answer the question for movies in the past as well as movies in the future.
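The difference between a one-off query and a standing query is easier to see in a toy model. The Python sketch below is my own simplification, not Quine's internal mechanism: it watches relationships arrive one at a time and reports a (person, movie) pair the moment both ACTED_IN and DIRECTED have been seen. The class name and the ids are hypothetical.

```python
from collections import defaultdict

class ActedAndDirected:
    """Emit a (person, movie) match once both relationships have been seen."""

    def __init__(self):
        self.seen = defaultdict(set)  # (person, movie) -> relationship types seen
        self.emitted = set()          # pairs already reported

    def add_edge(self, person, rel, movie):
        pair = (person, movie)
        self.seen[pair].add(rel)
        complete = {"ACTED_IN", "DIRECTED"} <= self.seen[pair]
        if complete and pair not in self.emitted:
            self.emitted.add(pair)
            return pair  # the pattern just became complete
        return None

sq = ActedAndDirected()
print(sq.add_edge("p1", "ACTED_IN", "m1"))  # → None
print(sq.add_edge("p2", "DIRECTED", "m1"))  # → None
print(sq.add_edge("p1", "DIRECTED", "m1"))  # → ('p1', 'm1')
```

The match fires when the last missing piece arrives, regardless of which relationship came first or how long ago the other one was ingested.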

Our standing query matches when the pattern is complete: an actor (Person) has both ACTED_IN and DIRECTED the same movie.

MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie) 
WHERE id(a) = id(m)
RETURN id(m) as movieId, m.title as Movie, id(p) as personId, p.name as Actor

When the standing query completes a match, it processes the movie id and person id through the output query and actions.

standingQueries:
  - pattern:
      type: Cypher
      mode: MultipleValues
      query: |-
        MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie) 
        WHERE id(a) = id(m)
        RETURN id(m) as movieId, m.title as Movie, id(p) as personId, p.name as Actor
    outputs:
      set-ActedDirected:
        type: CypherQuery
        query: |-
          MATCH (m),(p)
          WHERE strId(m) = $that.data.movieId AND strId(p) = $that.data.personId
          MERGE (p:Person)-[:ActedDirected]->(m:Movie)
      log-actor-director:
        type: WriteToFile
        path: "ActorDirector.jsonl"

My standing query creates a new ActedDirected relationship between the Person and Movie nodes, then logs the relationship.
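Once the log file exists, a few lines of Python are enough to tally it. This is a sketch under assumptions: I assume each line of ActorDirector.jsonl is a JSON object with a data field holding the columns from the RETURN clause (movieId, Movie, personId, Actor). The helper `distinct_actors` and the sample lines are hypothetical.

```python
import json

def distinct_actors(lines):
    """Collect the distinct personId values from standing-query output lines."""
    people = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        people.add(record["data"]["personId"])
    return people

# Hypothetical sample: one person matched on two different movies.
sample = [
    '{"data": {"Actor": "A", "Movie": "X", "movieId": "m1", "personId": "p1"}}',
    '{"data": {"Actor": "A", "Movie": "Y", "movieId": "m2", "personId": "p1"}}',
]
print(len(distinct_actors(sample)))  # → 1
```

To run it against the real output, pass an open file handle instead of the sample list, since iterating a file yields one line at a time.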

Four hundred ninety-one actors acted in and directed the same movie in our data set.

{
    "data": {
        "Actor": "Clint Eastwood",
        "Movie": "Unforgiven",
        "movieId": "4a6d64c8-9c90-3362-b443-4d2e7b2fb9d1",
        "personId": "4638a820-3b68-3fc7-9fa7-341e876b701e"
    }
}

Conclusion

Phew, we made it through! And we learned a lot along the way.

  • CSV data is streamed into Quine
  • Quine can read from external files and streaming providers
  • You can ingest multiple streams at once, movies and reviewers, and combine them into one streaming graph
  • Always separate ingest queries using the jobs to be done framework

Quine is open source if you want to run this analysis for yourself. Download a precompiled version or build it yourself from the codebase (Quine GitHub). I published the recipe that I developed at https://quine.io/recipes. The page has instructions for downloading the CSV files and running the recipe.

Have a question, suggestion, or improvement? I welcome your feedback! Please drop in to Quine Slack and let me know. I'm always happy to discuss Quine or answer questions.
