Sunday, March 5, 2017

Sample Spark Application: WordCount with Simple Build Tool (SBT)

Developing and Running a Spark WordCount Application Written in Scala:

Apache Spark can run in its standalone cluster mode, on EC2, on Hadoop YARN, or on Apache Mesos. It can access diverse data sources, including HDFS, Cassandra, HBase, Hive, Tachyon, S3, and any Hadoop data source.

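The WordCount example reads text files and counts how often each word occurs. The input is one or more text files and the output is also text files, each line of which holds a word and the number of times it occurred. With the Spark program in this post, which saves tuples using saveAsTextFile, each output line has the form (word,count).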
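As a rough illustration of the counting logic itself, independent of Spark, the sketch below counts words in a few in-memory lines using only the Scala standard library. The object name WordCountSketch and the sample lines are made up for this illustration; the Spark program later in this post applies the same idea to a distributed dataset.

```scala
// Minimal sketch: word counting with plain Scala collections (no Spark).
// WordCountSketch and the sample input are hypothetical, for illustration only.
object WordCountSketch {
  // Split every line on single spaces, then count the occurrences of each word.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))   // one element per word
      .filter(_.nonEmpty)                 // drop empty tokens caused by repeated spaces
      .groupBy(identity)                  // word -> all of its occurrences
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("to be or not", "to be"))
    // Print one "word count" pair per line, sorted by word for stable output.
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word $n") }
  }
}
```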

Simple Build Tool (sbt) is an open source build tool for Scala and Java projects, similar to Java's Maven or Ant. Its main features include native support for compiling Scala code and integration with many Scala frameworks. sbt is the de facto build tool in the Scala community.

This tutorial describes how to write, compile, and run a simple Spark word count application in Scala, one of the languages supported by Spark.

Prerequisites: download and install the following packages:
1) sbt from http://www.scala-sbt.org/ or git clone https://github.com/sbt/sbt.git
2) Scala from https://www.scala-lang.org/download/all.html
3) Apache Spark from http://spark.apache.org/downloads.html
 ---------------------------------------------------------------------------------
Step 1: Create the project folder Scala_wc_project:
               mkdir Scala_wc_project
Step 2: cd Scala_wc_project

Step 3: mkdir -p project src/{main,test}/{scala,java,resources}

spb@spb-VirtualBox:~/Scala_wc_project$ ls
project  src
Step 4: Create the wordcount.scala program in src/main/scala

spb@spb-VirtualBox:~/Scala_wc_project/src/main/scala$ vi wordcount.scala
-----------------------------------
import org.apache.spark._
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]): Unit = {
    // The input file and the output directory come from the command line.
    val inputFile = args(0)
    val outputFile = args(1)
    val conf = new SparkConf().setAppName("wordCount")
    // Create a Scala Spark Context.
    val sc = new SparkContext(conf)
    // Load our input data.
    val input = sc.textFile(inputFile)
    // Split up into words.
    val words = input.flatMap(line => line.split(" "))
    // Transform into word and count.
    val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    // Save the word count back out to a text file, causing evaluation.
    counts.saveAsTextFile(outputFile)
  }
}
-----------------------------------------------------------------------------------
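One detail worth knowing about the program above: line.split(" ") splits on single spaces only, so consecutive spaces produce empty-string "words" and tabs are not treated as separators. The following is a small sketch of a more forgiving tokenizer. The Tokenizer object and the tokenize name are hypothetical and not part of the original program.

```scala
// Sketch of a whitespace-tolerant tokenizer (hypothetical helper, not in the original program).
object Tokenizer {
  // Split on any run of whitespace (spaces, tabs) and drop empty tokens.
  def tokenize(line: String): Seq[String] =
    line.trim.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val line = "spark  makes\tword count easy"
    // The naive split keeps an empty token and leaves the tab inside a word.
    println(line.split(" ").toSeq)
    // The tokenizer yields only the five real words.
    println(tokenize(line))
  }
}
```

In the Spark program, this could be used as input.flatMap(line => Tokenizer.tokenize(line)) in place of the inline split, if that behaviour is wanted.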
Step 5: Create build.sbt at the root of the project directory
spb@spb-VirtualBox:~/Scala_wc_project$ cat build.sbt
name := "SPB_Word Count"

version := "1.0"

scalaVersion := "2.11.8"

sbtVersion := "0.13.13"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
-------------------------------------------------------------------------------------
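A note on the dependency line: the suffix in a Spark artifact name such as spark-core_2.11 is the Scala binary version the artifact was built for, and it is expected to match the project's scalaVersion. sbt's %% operator appends that suffix automatically. The sketch below mimics the naming rule for Scala 2.x in plain Scala. CrossName, binaryVersion and crossArtifact are hypothetical helpers for illustration, not sbt APIs.

```scala
// Illustration only: how a cross-built artifact name relates to scalaVersion.
// These helpers are hypothetical and are not part of sbt.
object CrossName {
  // For Scala 2.x releases the binary version is "major.minor", e.g. 2.11.8 -> 2.11.
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split("\\.").take(2).mkString(".")

  // Artifact name that "group" %% "name" refers to for the given Scala version.
  def crossArtifact(name: String, scalaVersion: String): String =
    s"${name}_${binaryVersion(scalaVersion)}"

  def main(args: Array[String]): Unit =
    println(crossArtifact("spark-core", "2.11.8"))
}
```

So with scalaVersion 2.11.8, "org.apache.spark" %% "spark-core" % "2.1.0" refers to the same artifact as "org.apache.spark" % "spark-core_2.11" % "2.1.0".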
Step 6: Run the sbt commands clean, update, compile, and package.
This creates the jar file (package) under the target directory.

spb@spb-VirtualBox:~/Scala_wc_project$ sbt clean
[info] Set current project to SPB_Word Count (in build file:/home/spb/Scala_wc_project/)
[success] Total time: 1 s, completed 5 Mar, 2017 7:29:04 PM
spb@spb-VirtualBox:~/Scala_wc_project$ sbt update
[info] Set current project to SPB_Word Count (in build file:/home/spb/Scala_wc_project/)
[info] Updating {file:/home/spb/Scala_wc_project/}scala_wc_project...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[success] Total time: 18 s, completed 5 Mar, 2017 7:29:33 PM
spb@spb-VirtualBox:~/Scala_wc_project$ sbt compile
[info] Set current project to SPB_Word Count (in build file:/home/spb/Scala_wc_project/)
[info] Compiling 1 Scala source to /home/spb/Scala_wc_project/target/scala-2.11/classes...
[success] Total time: 10 s, completed 5 Mar, 2017 7:29:54 PM
spb@spb-VirtualBox:~/Scala_wc_project$
spb@spb-VirtualBox:~/Scala_wc_project$ sbt package
[info] Set current project to SPB_Word Count (in build file:/home/spb/Scala_wc_project/)
[info] Packaging /home/spb/Scala_wc_project/target/scala-2.11/spb_word-count_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 2 s, completed 5 Mar, 2017 7:30:47 PM
spb@spb-VirtualBox:~/Scala_wc_project$

spb@spb-VirtualBox:~/Scala_wc_project/target/scala-2.11$ ls
classes  spb_word-count_2.11-1.0.jar
spb@spb-VirtualBox:~/Scala_wc_project/target/scala-2.11$
-------------------
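Step 7: Project directory structure (only the files and folders created in the steps above are shown; sbt also generates additional working directories):

Scala_wc_project/
    build.sbt
    project/
    src/
        main/
            java/
            resources/
            scala/
                wordcount.scala
        test/
            java/
            resources/
            scala/
    target/
        scala-2.11/
            classes/
            spb_word-count_2.11-1.0.jar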

Step 8: Run the "spb_word-count_2.11-1.0.jar" file with spark-submit, as shown here:

 spark-submit --class WordCount --master local spb_word-count_2.11-1.0.jar /home/spb/data/input.txt /home/spb/data/output8

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/03/05 19:36:32 INFO SparkContext: Running Spark version 2.1.0
17/03/05 19:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 19:36:34 WARN Utils: Your hostname, spb-VirtualBox resolves to a loopback address: 127.0.1.1; using 192.168.1.123 instead (on interface enp0s3)
17/03/05 19:36:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 19:36:34 INFO SecurityManager: Changing view acls to: spb
17/03/05 19:36:34 INFO SecurityManager: Changing modify acls to: spb
17/03/05 19:36:34 INFO SecurityManager: Changing view acls groups to:
17/03/05 19:36:34 INFO SecurityManager: Changing modify acls groups to:
17/03/05 19:36:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spb); groups with view permissions: Set(); users  with modify permissions: Set(spb); groups with modify permissions: Set()
17/03/05 19:36:34 INFO Utils: Successfully started service 'sparkDriver' on port 39733.
17/03/05 19:36:34 INFO SparkEnv: Registering MapOutputTracker
17/03/05 19:36:34 INFO SparkEnv: Registering BlockManagerMaster
17/03/05 19:36:34 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/03/05 19:36:34 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/03/05 19:36:35 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8b32665a-9d41-4c8d-bee4-42b6d5891c15
17/03/05 19:36:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/03/05 19:36:35 INFO SparkEnv: Registering OutputCommitCoordinator
17/03/05 19:36:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/03/05 19:36:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.123:4040
17/03/05 19:36:36 INFO SparkContext: Added JAR file:/home/spb/Scala_wc_project/target/scala-2.11/spb_word-count_2.11-1.0.jar at spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar with timestamp 1488722796080
17/03/05 19:36:36 INFO Executor: Starting executor ID driver on host localhost
17/03/05 19:36:36 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37957.
17/03/05 19:36:36 INFO NettyBlockTransferService: Server created on 192.168.1.123:37957
17/03/05 19:36:36 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/03/05 19:36:36 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.123, 37957, None)
17/03/05 19:36:36 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.123:37957 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.123, 37957, None)
17/03/05 19:36:36 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.123, 37957, None)
17/03/05 19:36:36 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.123, 37957, None)
17/03/05 19:36:37 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)
17/03/05 19:36:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)
17/03/05 19:36:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.123:37957 (size: 22.9 KB, free: 366.3 MB)
17/03/05 19:36:38 INFO SparkContext: Created broadcast 0 from textFile at wordcount.scala:12
17/03/05 19:36:38 INFO FileInputFormat: Total input paths to process : 1
17/03/05 19:36:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/05 19:36:38 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/05 19:36:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/05 19:36:38 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/05 19:36:38 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/03/05 19:36:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
17/03/05 19:36:38 INFO SparkContext: Starting job: saveAsTextFile at wordcount.scala:18
17/03/05 19:36:39 INFO DAGScheduler: Registering RDD 3 (map at wordcount.scala:16)
17/03/05 19:36:39 INFO DAGScheduler: Got job 0 (saveAsTextFile at wordcount.scala:18) with 2 output partitions
17/03/05 19:36:39 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at wordcount.scala:18)
17/03/05 19:36:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/03/05 19:36:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/03/05 19:36:39 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wordcount.scala:16), which has no missing parents
17/03/05 19:36:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)
17/03/05 19:36:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 366.0 MB)
17/03/05 19:36:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.123:37957 (size: 2.7 KB, free: 366.3 MB)
17/03/05 19:36:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
17/03/05 19:36:39 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wordcount.scala:16)
17/03/05 19:36:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
17/03/05 19:36:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6043 bytes)
17/03/05 19:36:39 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 6043 bytes)
17/03/05 19:36:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/05 19:36:39 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/03/05 19:36:39 INFO Executor: Fetching spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar with timestamp 1488722796080
17/03/05 19:36:40 INFO TransportClientFactory: Successfully created connection to /192.168.1.123:39733 after 110 ms (0 ms spent in bootstraps)
17/03/05 19:36:40 INFO Utils: Fetching spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar to /tmp/spark-58cba0fb-b20f-46f4-bb17-3201f9dec45b/userFiles-69c444db-7e90-48cb-805b-12225146686f/fetchFileTemp6212103198181174966.tmp
17/03/05 19:36:40 INFO Executor: Adding file:/tmp/spark-58cba0fb-b20f-46f4-bb17-3201f9dec45b/userFiles-69c444db-7e90-48cb-805b-12225146686f/spb_word-count_2.11-1.0.jar to class loader
17/03/05 19:36:40 INFO HadoopRDD: Input split: file:/home/spb/data/input.txt:0+39
17/03/05 19:36:40 INFO HadoopRDD: Input split: file:/home/spb/data/input.txt:39+40
17/03/05 19:36:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1819 bytes result sent to driver
17/03/05 19:36:41 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1819 bytes result sent to driver
17/03/05 19:36:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1221 ms on localhost (executor driver) (1/2)
17/03/05 19:36:41 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1201 ms on localhost (executor driver) (2/2)
17/03/05 19:36:41 INFO DAGScheduler: ShuffleMapStage 0 (map at wordcount.scala:16) finished in 1.355 s
17/03/05 19:36:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/03/05 19:36:41 INFO DAGScheduler: looking for newly runnable stages
17/03/05 19:36:41 INFO DAGScheduler: running: Set()
17/03/05 19:36:41 INFO DAGScheduler: waiting: Set(ResultStage 1)
17/03/05 19:36:41 INFO DAGScheduler: failed: Set()
17/03/05 19:36:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at wordcount.scala:18), which has no missing parents
17/03/05 19:36:41 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.8 KB, free 366.0 MB)
17/03/05 19:36:41 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.4 KB, free 365.9 MB)
17/03/05 19:36:41 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.123:37957 (size: 26.4 KB, free: 366.2 MB)
17/03/05 19:36:41 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
17/03/05 19:36:41 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at wordcount.scala:18)
17/03/05 19:36:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
17/03/05 19:36:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 5827 bytes)
17/03/05 19:36:41 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 5827 bytes)
17/03/05 19:36:41 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
17/03/05 19:36:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
17/03/05 19:36:41 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
17/03/05 19:36:41 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
17/03/05 19:36:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 22 ms
17/03/05 19:36:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 38 ms
17/03/05 19:36:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
17/03/05 19:36:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
17/03/05 19:36:41 INFO FileOutputCommitter: Saved output of task 'attempt_20170305193638_0001_m_000000_2' to file:/home/spb/data/output8/_temporary/0/task_20170305193638_0001_m_000000
17/03/05 19:36:41 INFO SparkHadoopMapRedUtil: attempt_20170305193638_0001_m_000000_2: Committed
17/03/05 19:36:41 INFO FileOutputCommitter: Saved output of task 'attempt_20170305193638_0001_m_000001_3' to file:/home/spb/data/output8/_temporary/0/task_20170305193638_0001_m_000001
17/03/05 19:36:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1977 bytes result sent to driver
17/03/05 19:36:41 INFO SparkHadoopMapRedUtil: attempt_20170305193638_0001_m_000001_3: Committed
17/03/05 19:36:41 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1890 bytes result sent to driver
17/03/05 19:36:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 519 ms on localhost (executor driver) (1/2)
17/03/05 19:36:41 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 519 ms on localhost (executor driver) (2/2)
17/03/05 19:36:41 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at wordcount.scala:18) finished in 0.532 s
17/03/05 19:36:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/03/05 19:36:41 INFO DAGScheduler: Job 0 finished: saveAsTextFile at wordcount.scala:18, took 2.968627 s
17/03/05 19:36:42 INFO SparkContext: Invoking stop() from shutdown hook
17/03/05 19:36:42 INFO SparkUI: Stopped Spark web UI at http://192.168.1.123:4040
17/03/05 19:36:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/03/05 19:36:42 INFO MemoryStore: MemoryStore cleared
17/03/05 19:36:42 INFO BlockManager: BlockManager stopped
17/03/05 19:36:42 INFO BlockManagerMaster: BlockManagerMaster stopped
17/03/05 19:36:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/03/05 19:36:42 INFO SparkContext: Successfully stopped SparkContext
17/03/05 19:36:42 INFO ShutdownHookManager: Shutdown hook called
17/03/05 19:36:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-58cba0fb-b20f-46f4-bb17-3201f9dec45b
spb@spb-VirtualBox:~/Scala_wc_project/target/scala-2.11$
NOTE:
             --class: The entry point of the application (here, the WordCount object)
             --master: The master URL for the cluster, or local to run on the local machine (e.g. spark://23.195.26.187:7077)
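The --master value can take several forms. The sketch below is a small, non-exhaustive lookup of the common ones as described in the Spark documentation. The MasterUrl object and the describe function are hypothetical names used only to summarise them; they are not part of Spark.

```scala
// Non-exhaustive summary of common --master values (hypothetical helper, not a Spark API).
object MasterUrl {
  // Matches local[N] where N is a number of worker threads.
  private val LocalN = """local\[(\d+)\]""".r

  def describe(master: String): String = master match {
    case "local"                       => "run locally with one worker thread"
    case "local[*]"                    => "run locally with as many worker threads as logical cores"
    case LocalN(n)                     => s"run locally with $n worker threads"
    case "yarn"                        => "connect to a Hadoop YARN cluster"
    case m if m.startsWith("spark://") => "connect to a Spark standalone cluster master"
    case m if m.startsWith("mesos://") => "connect to a Mesos cluster"
    case other                         => s"not covered by this sketch: $other"
  }

  def main(args: Array[String]): Unit =
    Seq("local", "local[4]", "spark://23.195.26.187:7077").foreach { m =>
      println(s"$m -> ${describe(m)}")
    }
}
```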
---------------------------------------------
Step 9: Verify the word count output in the output directory given in the previous step (/home/spb/data/output8).
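For checking the result programmatically, here is a minimal sketch. It assumes the output directory contains part-* files whose lines look like (word,count), which is how saveAsTextFile renders Scala tuples. OutputCheck, parseLine and readCounts are hypothetical helper names, and the default path is the output directory used in Step 8.

```scala
import java.io.File
import scala.io.Source
import scala.util.Try

// Sketch for inspecting the word count output (hypothetical helper names).
object OutputCheck {
  // Parse one output line of the form "(word,count)"; None if the line does not match.
  def parseLine(line: String): Option[(String, Int)] = {
    val trimmed = line.trim
    if (trimmed.startsWith("(") && trimmed.endsWith(")")) {
      val body = trimmed.substring(1, trimmed.length - 1)
      val idx = body.lastIndexOf(',') // the count follows the last comma
      if (idx < 0) None
      else Try(body.substring(idx + 1).toInt).toOption.map(n => (body.substring(0, idx), n))
    } else None
  }

  // Read every part-* file in the output directory and collect the parsed pairs.
  def readCounts(dir: String): Seq[(String, Int)] = {
    val parts = Option(new File(dir).listFiles())
      .getOrElse(Array.empty[File])
      .filter(_.getName.startsWith("part-"))
      .sortBy(_.getName)
    parts.toSeq.flatMap { file =>
      val src = Source.fromFile(file)
      try src.getLines().toList.flatMap(line => parseLine(line))
      finally src.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed default: the output directory from Step 8.
    val dir = if (args.nonEmpty) args(0) else "/home/spb/data/output8"
    // Print the most frequent words first.
    readCounts(dir).sortBy(-_._2).foreach { case (word, n) => println(s"$word $n") }
  }
}
```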

This example gives a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which can contain arbitrary Java, Scala, or Python objects. You create a dataset from external data, then apply parallel operations to it.

----------------- THE END -----------------