Work with Hive in Spark

Spark version: 2.1.0
Hive version: 2.1.1
Hadoop version: 2.7.3

For data exchange or integration with other applications, Spark can read from and write to Hive tables. Here are the configurations to make that work. Hive is a data warehouse technology built on HDFS storage. Since its inception, people have found both its value and its limitations. I think the most valuable legacy of Hive is that many newer applications build on the Hive metastore; the Hive query engine itself is no longer that important. Here, however, I want to show how these products can work together.

Step 1: Setup configuration files

With Hadoop, Hive, and Spark installed as detailed in prior blogs, make the Hadoop and Hive configurations available to Spark by doing one of the following (a quick check from the Spark shell is shown after the list):

a. set the environment variables HADOOP_CONF_DIR, which is normally $HADOOP_HOME/etc/hadoop, and HIVE_CONF_DIR, which is normally $HIVE_HOME/conf
b. copy hive-site.xml, core-site.xml, and hdfs-site.xml to the $SPARK_HOME/conf folder
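
Once the Spark shell is up (Step 2 below), one quick way to confirm these files were picked up is to inspect the Hadoop configuration the session is using. This is only an illustrative sanity check; the exact namenode URL depends on your core-site.xml:

// fs.defaultFS comes from core-site.xml; if it still reads "file:///",
// Spark did not find the Hadoop configuration files
println(spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))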

Step 2: Enable Spark session with Hive support

A few quick preparation steps first:

– start Hadoop by running start-dfs.sh on nnode1
– start Yarn by running start-yarn.sh on nnode2
– start the Hive metastore on nnode1 by running "hive --service metastore"
– start the default Spark command line by running spark-shell

Once everything is started and the Spark shell is running, execute the following code to enable Hive support. Note that a default Spark session is already created when the Spark shell starts. The enableHiveSupport method sets the required configuration, and the getOrCreate method makes sure no new SparkSession object is created.

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark Hive Example").enableHiveSupport().getOrCreate()
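
As a quick sanity check that the session is talking to the Hive metastore rather than Spark's built-in in-memory catalog, you can list databases and tables through the Catalog API. A minimal sketch; the databases and tables shown will depend on your own Hive installation:

// Databases and tables seen through the session catalog; with Hive support
// enabled, these come from the Hive metastore started earlier
spark.catalog.listDatabases().show(truncate = false)
spark.catalog.listTables("default").show(truncate = false)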

Step 3: Load data into Hive tables

The Spark installation comes with a few sample data files under $SPARK_HOME/examples/src/main/resources. We will copy them to HDFS so that they can be loaded into Hive. In a separate terminal window, run the following on nnode1 as the hadoop user:

hadoop fs -mkdir data
hadoop fs -mkdir data/spark-examples
hadoop fs -put $SPARK_HOME/examples/src/main/resources/* data/spark-examples

Next, go back to the Spark shell and execute the following to create a table called "src" in the "default" database. The table schema is defined according to the sample data file kv1.txt, which contains a list of key-value pairs.

import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA INPATH 'data/spark-examples/kv1.txt' INTO TABLE src")

LOAD DATA INPATH moves the source file into the Hive warehouse, so verify that kv1.txt is no longer under the data/spark-examples folder:

hadoop fs -ls data/spark-examples/

Step 4: Query the Hive table

Hive tables can be queried in Spark with the sql() method. Note that HiveQL supports roughly the SQL-92 standard, but that covers most common SQL syntax.

sql("SELECT * FROM src").show()
sql("SELECT COUNT(*) FROM src").show()

Step 5: Manipulate Hive data using DataFrames

The following code is adapted from the Hive Tables example in the Apache Spark documentation, where you can find more details.

// Case class used to build a DataFrame of test records below
case class Record(key: Int, value: String)
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// Register a temporary view and join it with the Hive table
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()