Apache Spark Shell Scala Examples

posted on Nov 20th, 2016

Apache Spark

Apache Spark is an open source cluster computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Prerequisites

1) A machine with Ubuntu 14.04 LTS operating system

2) Apache Hadoop 2.6.4 pre-installed (How to install Hadoop on Ubuntu 14.04)

3) Apache Spark 1.6.1 pre-installed (How to install Spark on Ubuntu 14.04)

Spark Shell Scala Examples

Step 1 - Change the directory to /usr/local/spark/sbin.

$ cd /usr/local/spark/sbin

Step 2 - Start all Spark daemons.

$ ./start-all.sh

Step 3 - Use jps (the JVM Process Status tool) to verify that the daemons are running. Note that jps only reports JVMs for which it has access permissions.

$ jps
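If the daemons started successfully, the jps listing should include entries along these lines (process IDs omitted here; the exact set depends on which Hadoop daemons you started):

NameNode
DataNode
SecondaryNameNode
ResourceManager
NodeManager
Master
Worker
Jps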


Step 4 - Use the following command to open the Spark shell.

$ spark-shell


Broadcast Variables. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
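The broadcast value can be read back with the value method, including inside tasks running on the executors. A minimal sketch of what the shell session looks like (the resN names will differ in your session):

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
scala> sc.parallelize(Array(1, 2, 3)).map(x => x * broadcastVar.value.sum).collect()
res1: Array[Int] = Array(6, 12, 18)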

Accumulators. Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.

scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
scala> accum.value
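Because the tasks add 1 + 2 + 3 + 4 into the accumulator, accum.value should return 10; the shell prints something like the following (the resN name depends on your session):

res2: Int = 10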

Spark SQL

Read the JSON Document

employee.json

{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
$ hdfs dfs -copyFromLocal /home/hduser/Desktop/employee.json /user/hduser/
scala> val dfs = sqlContext.read.json("/user/hduser/employee.json")
scala> dfs.show()
scala> dfs.printSchema()
scala> dfs.select("name").show()
scala> dfs.filter(dfs("age") > 23).show()
scala> dfs.groupBy("age").count().show()
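Run against the sample employee.json above, the show() and printSchema() calls should produce output roughly like this. Note that every value in the JSON file is quoted, so all three columns are inferred as strings:

+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 39|1203|  amith|
| 23|1204|  javed|
| 23|1205| prudvi|
+---+----+-------+

root
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)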

1) By default, the SparkContext object is initialized with the name sc when the spark-shell starts.

2) Store the employee.json file in HDFS (run the hdfs dfs -copyFromLocal command from a regular terminal, not from the Spark shell).

3) Read the JSON document and generate a DataFrame named dfs from it.

4) If you want to see the data in the DataFrame, use this command.

5) If you want to see the structure (schema) of the DataFrame, use this command.

6) Use this command to fetch only the name column from the DataFrame.

7) Use the following command to find the employees whose age is greater than 23 (age > 23).

8) Use the following command to count the number of employees who are of the same age.
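The same DataFrame can also be queried with SQL by first registering it as a temporary table, just like the text-file example below. A brief sketch using the Spark 1.6 API (the table name employee_json is only an example):

scala> dfs.registerTempTable("employee_json")
scala> sqlContext.sql("SELECT name, age FROM employee_json WHERE age > 23").show()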

Read the Text Document

employee.txt

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

$ hdfs dfs -copyFromLocal /home/hduser/Desktop/employee.txt /user/hduser/
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicits._
scala> case class Employee(id: Int, name: String, age: Int)
scala> val empl = sc.textFile("/user/hduser/employee.txt").map(_.split(",")).map(e => Employee(e(0).trim.toInt, e(1).trim, e(2).trim.toInt)).toDF()
scala> empl.registerTempTable("employee")
scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()
scala> val agefilter = sqlContext.sql("SELECT * FROM employee WHERE age>=20 AND age <= 35")
scala> agefilter.show()
scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
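The same age filter can also be written with the DataFrame API instead of SQL. A small sketch, equivalent to the agefilter query above:

scala> empl.filter(empl("age") >= 20 && empl("age") <= 35).show()
scala> empl.filter(empl("age") >= 20 && empl("age") <= 35).select("id").show()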

1) Store the employee.txt file in HDFS.

2) By default, the SparkContext object is initialized with the name sc when the spark-shell starts.

3) Import the SQL implicits used to convert an RDD to a DataFrame.

4) Define a schema for the employee records using a case class.

5) Generate a DataFrame named empl by reading the data from employee.txt, splitting each line on commas, mapping the fields to Employee objects, and calling toDF().

6) Register the DataFrame as a temporary table named employee.

7) The variable allrecords captures the result of selecting all records.

8) To display those records, call the show() method on it.

9) The variable agefilter stores the records of employees whose age is between 20 and 35.

10) To see the result data of the agefilter DataFrame, call show() on it.

11) Fetch the ID values from the agefilter result by field index (t(0) is the first column).

Hive Tables

Spark SQL ships with Hive support through HiveContext, which inherits from SQLContext. Using HiveContext, you can create and look up tables in the Hive metastore and write queries on them using HiveQL. Users who do not have an existing Hive deployment can still create a HiveContext: when it is not configured by hive-site.xml, the context automatically creates a metastore called metastore_db and a folder called warehouse in the current directory.

employee.txt

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS employee(id INT, name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY'\n'")
scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/hduser/Desktop/employee.txt' INTO TABLE employee")
scala> val result = sqlContext.sql("FROM employee SELECT id, name, age")
scala> result.show()

1) Initialize the HiveContext in the Spark shell.

2) Create a table named employee with the fields id, name, and age using the HiveQL CREATE TABLE statement.

3) Load the employee record data from employee.txt into the employee table.

4) You can now execute SQL queries against the table. Use the following command to fetch all records with a HiveQL SELECT query.

5) To display the record data, call the show() method on the result DataFrame.
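Any other HiveQL query can be issued the same way. For example, a small sketch that counts employees per age (the alias total is just illustrative):

scala> sqlContext.sql("SELECT age, COUNT(*) AS total FROM employee GROUP BY age").show()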
