The Apache Software Foundation's Hadoop and Spark projects form the core of what is probably the most popular open-source Big Data analysis pipeline today. Hadoop's distributed file system, HDFS, breaks files into chunks and replicates those chunks across commodity hardware, resulting in a cheap, scalable, fault-tolerant data storage solution. Spark can then analyse those data chunks in place, reducing network and I/O latency, and is advertised as running up to 100x faster than Hadoop's built-in analysis framework, MapReduce, on in-memory workloads.
But sometimes, you want to present a nice graphical user interface (GUI) so users who aren't as tech savvy can still access and analyse your Big Data stores. There are all sorts of applications for this -- real-time data analysis and modelling of manufacturing processes; discovering daily, weekly, or seasonal trends in consumer or user data; or analyzing scientific data to make live interactive plots using plotly. In this post, I'll show you how to read and analyze distributed data stored on HDFS using Spark, and present those results in R.
(Note: This post assumes that you have HDFS / Spark up and running on your machine. If not, check out my previous post on Installing and Running Hadoop and Spark on Windows.)
The SparkR Shell
Getting R to talk to HDFS through Spark is very easy with the SparkR shell. From the command line, just run:
C:\Users\andrew>sparkR
R version 3.5.1 (2018-07-02) -- "Feather Spray"
Copyright (C) 2018 The R Foundation for Statistical Computing
Platform: x86_64-w64-mingw32/x64 (64-bit)
...
Spark package found in SPARK_HOME: C:\Spark\spark-2.3.2-bin-hadoop2.7
Launching java with spark-submit command C:\Spark\spark-2.3.2-bin-hadoop2.7/bin/spark-submit2.cmd "sparkr-shell" C:\Users\andrew\AppData\Local\Temp\RtmpuMdeVg\backend_port2664d055020
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.2
/_/
SparkSession available as 'spark'.
>
This is an R shell with access to the HDFS. Try running some R commands:
> x <- c(1,2,3)
> x
[1] 1 2 3
Text Files
If your HDFS is set up correctly, you should be able to access plain text files right away using the read.df() method:
> txt <- read.df("hdfs://localhost:9000/example_data/example_text.md", "text")
> txt
SparkDataFrame[value:string]
> head(txt)
value
1 # Apache Spark
2
3 Spark is a fast and general cluster computing system for Big Data. It provides
4 high-level APIs in Scala, Java, Python, and R, and an optimized engine that
5 supports general computation graphs for data analysis. It also supports a
6 rich set of higher-level tools including Spark SQL for SQL and DataFrames,
>
Note that this file is located on my HDFS at /example_data/example_text.md. This is confirmed by running hadoop fs -ls at the cmd prompt:
C:\Users\andrew>hadoop fs -ls /example_data
Found 5 items
-rw-r--r-- 1 andrew supergroup 2848 2018-11-19 16:23 /example_data/example_csv.csv
-rw-r--r-- 1 andrew supergroup 7882 2018-11-19 16:23 /example_data/example_sql_linux.txt
-rw-r--r-- 1 andrew supergroup 13958 2018-11-19 16:23 /example_data/example_sql_windows.txt
-rw-r--r-- 1 andrew supergroup 3809 2018-11-16 16:35 /example_data/example_text.md
-rw-r--r-- 1 andrew supergroup 14381 2018-11-19 16:24 /example_data/example_xlsx.xlsx
The hostname (localhost for me) and port number (9000 for me) are specific to your HDFS setup: they come from the fs.defaultFS setting in core-site.xml, and localhost:9000 is the value commonly used for a single-node installation.
CSV Files
CSV files can be read with read.df() as well, but you have to set source equal to "csv", rather than "text":
> csv <- read.df("hdfs://localhost:9000/example_data/example_csv.csv", "csv")
> csv
SparkDataFrame[_c0:string, _c1:string, _c2:string, _c3:string, _c4:string]
> head(csv)
_c0 _c1 _c2 _c3
1 Time dissolvedO2 pH Temperature
2 2018-01-01 15:00:10 49.56497432219166 7.056500932431841 36.952017501071516
3 2018-01-01 15:00:40 49.04355394077128 7.056606732537641 36.94562695468097
...
Notice that the column headers, above, were read as the first row of data rather than as column names. To fix this, there's another option for read.df(), called header, which we need to set to TRUE (or just T for short):
> csv <- read.df("hdfs://localhost:9000/example_data/example_csv.csv", "csv", header=T)
> csv
SparkDataFrame[Time:string, dissolvedO2:string, pH:string, Temperature:string, AgitatorSpeed:string]
> head(csv)
Time dissolvedO2 pH Temperature
1 2018-01-01 15:00:10 49.56497432219166 7.056500932431841 36.952017501071516
2 2018-01-01 15:00:40 49.04355394077128 7.056606732537641 36.94562695468097
3 2018-01-01 15:01:10 49.7866879539053 7.056590932521841 36.98149607055008
...
>
Also, notice that both csv and txt are SparkDataFrame objects. A SparkDataFrame is a distributed collection of data which SparkR can access and analyse in its distributed form across the HDFS cluster. This means that the data isn't read into the R session; rather, it's treated exactly the way it would be if you ran Scala or Java commands in the Spark shell itself.
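Notice also that every column in the CSV example above came back as a string. As a minimal sketch (assuming the same HDFS path as above, and that your Spark version's CSV reader supports the inferSchema option, which is passed through read.df() as an extra named argument), you can ask Spark to guess the column types while reading:

```r
# Load SparkR from SPARK_HOME (not needed inside the sparkR shell itself)
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]")

# header and inferSchema are forwarded to Spark's CSV data source
csv_typed <- read.df(
  "hdfs://localhost:9000/example_data/example_csv.csv",
  source = "csv",
  header = "true",
  inferSchema = "true"
)

# Print the column names and the types Spark inferred for them
printSchema(csv_typed)
```

Inferring the schema costs an extra pass over the file, so for very large CSV files it may be worth supplying an explicit schema instead.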
SQL Tables
Importing SQL Tables into HDFS
In a previous post, I explained how to copy data files from the local filesystem into HDFS using the hadoop fs -put command, but importing SQL tables is a bit more involved. I'm going to start by assuming that you have MySQL set up and you have a table available in some database:
C:\Users\andrew>mysql -u root -p
Enter password: ***********
...
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| siemens |
| sys |
+--------------------+
5 rows in set (0.00 sec)
mysql> use siemens;
Database changed
mysql> show tables;
+-------------------+
| Tables_in_siemens |
+-------------------+
| simulate |
+-------------------+
1 row in set (0.00 sec)
We're going to use Apache Sqoop to import the simulate table into HDFS:
- Download the most recent version of Sqoop. (I downloaded the binary *.tar.gz file.) Unpack the file and move it into C:\Sqoop.
- Update your system environment variables and add SQOOP_HOME as the directory which you just unpacked into C:\Sqoop (it should be something like C:\Sqoop\sqoop-1.4.7.bin__hadoop-2.6.0). Then add Sqoop to your %PATH% by appending %SQOOP_HOME%\bin.
- Verify the installation by opening a new cmd window and typing:
C:\Users\andrew>sqoop version
...
2018-11-21 13:20:29,408 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Sqoop 1.4.7
git commit id 2328971411f57f0cb683dfb79d19d4d19d185dd8
Compiled by maugli on Thu Dec 21 15:59:58 STD 2017
Great! Now, we need to make sure that Sqoop can talk to MySQL through the Java Database Connectivity API (JDBC). To do that, we need to download Connector/J, the official JDBC driver for MySQL. I downloaded the platform-independent *.zip file (currently mysql-connector-java-5.1.47.zip) and unzipped it into C:\Program Files\MySQL\ for safekeeping. Then, you'll need to copy C:\Program Files\MySQL\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar into C:\Sqoop\sqoop-1.4.7.bin__hadoop-2.6.0\lib\ (making sure to account for version numbers, which might be different).
The final thing to do is to download the Cloudera Sqoop Java library. (I got mine from here.) This should be named something like sqoop-1.4.2-hadoop20.jar. Put this jar into C:\Spark\spark-2.3.2-bin-hadoop2.7\jars. Then, open a cmd window with Administrator permissions. You should now be able to import an SQL table into HDFS.
I'm going to import this SQL table as a Parquet File, but there are other options available, including importing as SequenceFiles, Avro Data files, or just as plain text. The command to import is rather long:
C:\Users\andrew> sqoop import --connect "jdbc:mysql://localhost:3306/Siemens" --table "simulate" --username "root" --password "<password here>" -m 1 --target-dir "hdfs://localhost:9000/big_data/example_SQL" --as-parquetfile --bindir sqoop_jars
Note that my MySQL database is also hosted on localhost and the port is the default MySQL port number, 3306. The database in my case is called siemens (MySQL database names are not case-sensitive on Windows, so Siemens in the connection string refers to the same database) and the table, as noted above, is simulate. I connect to MySQL as root, type out my MySQL password, and set -m 1 (the number of "map" tasks for a parallel import). I want the table to show up under /big_data/example_SQL in my HDFS as a Parquet File, and I want any generated *.class or *.jar files to be dumped to C:\Users\andrew\sqoop_jars.
Note that you can also use an "options" file to pass arguments to sqoop import so you don't have to write out your password as plaintext to the terminal (which is unsafe).
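Here is a rough sketch of what that could look like, written from R so that it stays in the same language as the rest of this post. It writes the import arguments from the command above into an options file (one option or value per line, with # for comments) and leaves the password out entirely, relying on Sqoop's -P flag to prompt for it instead. The file name sqoop_import_options.txt is just a placeholder I made up, and I haven't checked this against every Sqoop version:

```r
# Arguments for `sqoop import`, one option or value per line,
# in the same order they would appear on the command line.
sqoop_options <- c(
  "# Options file for importing the simulate table",
  "import",
  "--connect",
  "jdbc:mysql://localhost:3306/Siemens",
  "--table",
  "simulate",
  "--username",
  "root",
  "-m",
  "1",
  "--target-dir",
  "hdfs://localhost:9000/big_data/example_SQL",
  "--as-parquetfile",
  "--bindir",
  "sqoop_jars"
)

# Hypothetical file name; written to the current working directory
options_file <- "sqoop_import_options.txt"
writeLines(sqoop_options, options_file)

# The command to run at the cmd prompt; -P asks for the password interactively
sqoop_command <- paste("sqoop --options-file", options_file, "-P")
print(sqoop_command)
```

Run the printed command from the directory containing the options file. Since the password never appears in the file or in the command, it doesn't end up in your shell history either.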
We can verify that the table was correctly imported to HDFS with the following command:
C:\Users\andrew>hadoop fs -ls -R /big_data
...
drwxr-xr-x - andrew supergroup 0 2018-11-19 17:46 /big_data/example_SQL
drwxr-xr-x - andrew supergroup 0 2018-11-19 17:46 /big_data/example_SQL/.metadata
-rw-r--r-- 1 andrew supergroup 174 2018-11-19 17:46 /big_data/example_SQL/.metadata/descriptor.properties
-rw-r--r-- 1 andrew supergroup 2350 2018-11-19 17:46 /big_data/example_SQL/.metadata/schema.avsc
drwxr-xr-x - andrew supergroup 0 2018-11-19 17:46 /big_data/example_SQL/.metadata/schemas
-rw-r--r-- 1 andrew supergroup 2350 2018-11-19 17:46 /big_data/example_SQL/.metadata/schemas/1.avsc
drwxr-xr-x - andrew supergroup 0 2018-11-19 17:46 /big_data/example_SQL/.signals
-rw-r--r-- 1 andrew supergroup 0 2018-11-19 17:46 /big_data/example_SQL/.signals/unbounded
-rw-r--r-- 1 andrew supergroup 118032 2018-11-19 17:46 /big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet
Beautiful! Finally, we can read this SQL table from HDFS. Note that importing as a Parquet File means that we have schemas. This is one advantage over importing as some other file type -- Parquet Files maintain a database schema with data types and so on, so we don't need to re-parse these later, which saves time.
Reading SQL Tables from HDFS
Assuming that you've got your SQL table into HDFS as outlined in the previous subsection, we can now read it from the SparkR shell. Simply run:
C:\Users\andrew>sparkR
...
> sql <- read.df("hdfs://localhost:9000/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet", "parquet")
> sql
SparkDataFrame[Time:bigint, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]
> head(sql)
Time dissolvedO2 pH outletN2 outletCO2 outletO2 outletAR
1 1.514819e+12 49.56497 7.056501 60.14372 10.32145 28.80335 0.7314709
2 1.514819e+12 49.26843 7.056541 60.13668 10.32402 28.82944 0.7305011
3 1.514819e+12 49.50221 7.056421 60.09855 10.31529 28.77520 0.7297820
...
>
This again works exactly the same way as it did for plain text files and CSV files, only we had to specify source="parquet".
Any Other Kind of File
Any other kind of file can also be fetched from HDFS using the R package curl, as long as you have an R package that can read it once it is on the local filesystem. I found myself wanting to read Excel files from HDFS. Here's how I did it. First, include whatever libraries you need to read the files, as well as the curl library:
> library(curl)
> library(xlsx)
Fetch the file using http://. Note that the port here is different from the one used above for hdfs://; it is the same port you use in the browser to manage the HDFS cluster (9870 for me).
Any file accessed on HDFS through http:// must have /webhdfs/v1 prepended to its path. So, for example, /example_data/example_xlsx.xlsx would become /webhdfs/v1/example_data/example_xlsx.xlsx. Finally, we need to append ?op=OPEN to the end of the URI so that the WebHDFS server knows that we want to open the file and read it:
> curlfile <- curl_fetch_memory("http://localhost:9870/webhdfs/v1/example_data/example_xlsx.xlsx?op=OPEN")
> tmpfile <- tempfile("name_of_temp_file")
> tt <- file(tmpfile, "wb")
> writeBin(curlfile$content, tt)
> close(tt)
And that's it! We now have a temporary file (saved in the R session's temporary directory, which you can find by running tempdir()) which we can read using the appropriate reader:
> xlsx <- createDataFrame(read.xlsx(tmpfile, sheetIndex=1))
> xlsx
SparkDataFrame[Time:timestamp, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]
> head(xlsx)
Time dissolvedO2 pH outletN2 outletCO2 outletO2
1 2018-01-01 15:00:00 49.56497 7.056501 60.14372 10.32145 28.80335
2 2018-01-01 15:00:30 49.04355 7.056607 60.21205 10.38589 28.80089
3 2018-01-01 15:01:00 49.78669 7.056591 60.24343 10.37360 28.84372
Again, notice that both the SQL Parquet File and this Excel file are imported as SparkDataFrame objects! (Note: to the best of my knowledge, there doesn't seem to be a way to read Excel files into SparkR directly from HDFS. They must be downloaded as temporary files to the local filesystem. But assuming that most of your data is in CSV, text, or SQL-as-Parquet Files, you can still distribute most of your analysis tasks.)
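If you fetch more than one file this way, the URL rule from this section (prefix the path with /webhdfs/v1, then append ?op=OPEN) is easy to wrap up. This is only a small sketch in base R; the function name webhdfs_open_url is my own invention, and the defaults for host and port are just the values from my setup:

```r
# Build a WebHDFS "open" URL from an absolute HDFS path.
# host and port default to the values used in this post; adjust for your cluster.
webhdfs_open_url <- function(path, host = "localhost", port = 9870) {
  # HDFS paths are expected to be absolute, e.g. "/example_data/file.xlsx"
  stopifnot(is.character(path), length(path) == 1, startsWith(path, "/"))
  paste0("http://", host, ":", port, "/webhdfs/v1", path, "?op=OPEN")
}

url <- webhdfs_open_url("/example_data/example_xlsx.xlsx")
print(url)
```

The resulting string can be passed straight to curl_fetch_memory() in place of the hand-written URL above.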
Spark in RStudio
You can also execute any of the above commands in the plain R shell or even RStudio. All you have to do is tell R where the SparkR library is located. This should be in your SPARK_HOME directory. For me, this is:
C:\Users\andrew>echo %SPARK_HOME%
C:\Spark\spark-2.3.2-bin-hadoop2.7
So I can open the plain R shell and get my SQL table from above with:
C:\Users\andrew>R --no-restore --no-save
...
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
...
> sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory="4g"))
Spark package found in SPARK_HOME: C:\Spark\spark-2.3.2-bin-hadoop2.7
Launching java with spark-submit command C:\Spark\spark-2.3.2-bin-hadoop2.7/bin/spark-submit2.cmd --driver-memory "4g" sparkr-shell
...
> sql <- read.df("hdfs://localhost:9000/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet", "parquet")
> sql
SparkDataFrame[Time:bigint, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]
> head(sql)
Time dissolvedO2 pH outletN2 outletCO2 outletO2 outletAR
1 1.514819e+12 49.56497 7.056501 60.14372 10.32145 28.80335 0.7314709
2 1.514819e+12 49.26843 7.056541 60.13668 10.32402 28.82944 0.7305011
3 1.514819e+12 49.50221 7.056421 60.09855 10.31529 28.77520 0.7297820
It's exactly the same! So how can we use this to help build a data analysis GUI in R?
R Shiny
Let's set up a simple R Shiny app to connect to this database. Create two files called server.R and ui.R and paste the following code into them:
server.R
library(shiny)
library(DT)  # provides renderDT()
# Connect to Spark outside the shinyServer() method
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory="4g"))
# Define server logic
shinyServer(function(input, output) {
  output$myTable <- renderDT(
    # get the SparkDataFrame from HDFS; collect() converts SparkDataFrame -> data.frame
    # (input$path already starts with "/", so no extra slash is added here)
    collect(read.df(paste0("hdfs://localhost:9000", input$path), input$type))
  )
})
ui.R
library(shiny)
library(DT)
# Define UI for application that draws a data table
shinyUI(fluidPage(
  # Application title
  titlePanel("Simple SparkDataFrame Example"),
  # Sidebar with text inputs for the file path and file type
  sidebarLayout(
    sidebarPanel(
      textInput("path",
                "HDFS File Path:",
                "/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet"
      ),
      textInput("type",
                "HDFS File Type:",
                "parquet"
      )
    ),
    # Show the table; DTOutput() is the DT counterpart of renderDT()
    mainPanel(
      DTOutput("myTable")
    )
  )
))
Run the app in RStudio and you'll get...
Ta da! It just works. All we have to do is set up the sparkR.session in server.R and we have access to the same commands we ran earlier in the plain R shell. Note that using collect() to convert the SparkDataFrame to a data.frame means that the data has been collected from HDFS and is being held in memory for R to use! Sometimes, this may be what you want (but usually not, if you're working with gigantic datasets). It's best practice to use as many SparkDataFrame operations as you can before converting to an R data.frame.
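To make that last point concrete, here is a minimal sketch of shrinking a SparkDataFrame before calling collect(). So that it runs without any files on HDFS, it uses R's built-in faithful dataset as a stand-in for a table read with read.df(); the threshold of 80 and the 100-row cap are arbitrary values chosen for illustration:

```r
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]")

# Stand-in for a SparkDataFrame loaded from HDFS with read.df()
sdf <- createDataFrame(faithful)

# These steps are all SparkDataFrame operations, so Spark does the work
long_waits <- filter(sdf, sdf$waiting > 80)             # keep matching rows
narrow     <- select(long_waits, "eruptions", "waiting") # keep needed columns
preview    <- limit(narrow, 100)                         # cap the row count

# Only the reduced result is pulled into R's memory
local_df <- collect(preview)
str(local_df)
```

In the Shiny app above, the same pattern would go inside renderDT(): filter, select, and limit the SparkDataFrame first, then collect() only what the table actually needs to display.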
So there you have it! A Hadoop/Spark Big Data back end with a nice R Shiny front end. Perfect for analyzing your data lake and sending nice, polished results to the end user, with all the power of Spark behind your analysis!
Note: A useful package in R for finding which package holds a particular function you're calling (among other things) is sos, which provides findFn():
> findFn("read.df")
found 3 matches
Downloaded 3 links in 1 packages.
Ignoring template.
...which will open the search results in a browser window.