Thursday, May 11, 2017

Casbah - Scala toolkit for MongoDB

Casbah is a Scala toolkit for MongoDB. It provides a layer on top of the official mongo-java-driver for better integration with Scala.

The recommended way to get started is with a dependency management system such as sbt:

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"

Casbah is a MongoDB project and continues to improve the interaction between Scala and MongoDB.

Add import:
import com.mongodb.casbah.Imports._
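With the dependency and import in place, a minimal round trip against a local mongod looks like the sketch below. It reuses the finance/stocks names and the name/symbol fields that appear later in this post; the object name CasbahQuickStart and the localhost connection details are illustrative assumptions, not part of the original post:

import com.mongodb.casbah.Imports._

object CasbahQuickStart extends App {
  // Connect to a mongod running on the default host/port
  val mongoClient = MongoClient("localhost", 27017)

  // Databases and collections are created lazily on first write
  val db = mongoClient("finance")
  val stocks = db("stocks")

  // Insert a document built with the MongoDBObject DSL
  stocks.insert(MongoDBObject("name" -> "a", "symbol" -> "a"))

  // Query it back: find() returns a cursor we can iterate
  stocks.find(MongoDBObject("symbol" -> "a")).foreach(println)

  mongoClient.close()
}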

 ---------------------------------------------

You can get the source from:
https://github.com/alvinj/ScalaCasbahConnections

Then you can modify the build.sbt as needed:
-----------------
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$ cat build.sbt
organization := "com.alvinalexander"

name := "ScalatraCasbahMongo"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.8"

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"

libraryDependencies += "com.mongodb.casbah" % "casbah-gridfs_2.8.1" % "2.1.5-1"

libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.24"

resolvers += "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$

spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$ sbt run
[info] Loading project definition from /home/spb/mongoConnector/ScalaCasbahConnections/project
[info] Set current project to ScalatraCasbahMongo (in build file:/home/spb/mongoConnector/ScalaCasbahConnections/)
[info] Compiling 1 Scala source to /home/spb/mongoConnector/ScalaCasbahConnections/target/scala-2.11/classes...
[warn] there was one deprecation warning; re-run with -deprecation for details
[warn] one warning found
[info] Running casbahtests.MainDriver
debug: a
log4j:WARN No appenders could be found for logger (com.mongodb.casbah.commons.conversions.scala.RegisterConversionHelpers$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
debug: b
debug: c
debug: d
debug: e
debug: f
debug: g
debug: h
debug: i
debug: j
debug: k
debug: l
debug: m
debug: n
debug: o
debug: p
debug: q
debug: r
debug: s
debug: t
debug: u
debug: v
debug: w
debug: x
debug: y
debug: z
sleeping at the end
  sleeping: 1
  sleeping: 2
  sleeping: 3
  sleeping: 4
  sleeping: 5
  sleeping: 6
  sleeping: 7
  sleeping: 8
  sleeping: 9
  sleeping: 10
  sleeping: 11
  sleeping: 12
  sleeping: 13
  sleeping: 14
  sleeping: 15
  sleeping: 16
  sleeping: 17
  sleeping: 18
  sleeping: 19
  sleeping: 20
  sleeping: 21
  sleeping: 22
  sleeping: 23
  sleeping: 24
  sleeping: 25
  sleeping: 26
  sleeping: 27
  sleeping: 28
  sleeping: 29
  sleeping: 30
game over
[success] Total time: 62 s, completed 13 Mar, 2017 5:37:31 PM
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$ sbt package
[info] Loading project definition from /home/spb/mongoConnector/ScalaCasbahConnections/project
[info] Set current project to ScalatraCasbahMongo (in build file:/home/spb/mongoConnector/ScalaCasbahConnections/)
[info] Packaging /home/spb/mongoConnector/ScalaCasbahConnections/target/scala-2.11/scalatracasbahmongo_2.11-0.1.0-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 1 s, completed 13 Mar, 2017 5:54:42 PM
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$
------------------------------------------------------------


spb@spb-VirtualBox:~/Scala_project$ mongo
MongoDB shell version: 3.2.12
connecting to: test
Server has startup warnings:
> show dbs
local  0.000GB
mydb   0.000GB
> show dbs
finance  0.000GB
local    0.000GB
mydb     0.000GB
> show collections
> use finance
switched to db finance
> show collections
stocks
> db.stocks.find()
{ "_id" : ObjectId("58cd184edffa1f1829bfbc94"), "name" : "a", "symbol" : "a" }
{ "_id" : ObjectId("58cd184fdffa1f1829bfbc95"), "name" : "b", "symbol" : "b" }
{ "_id" : ObjectId("58cd1850dffa1f1829bfbc96"), "name" : "c", "symbol" : "c" }
{ "_id" : ObjectId("58cd1851dffa1f1829bfbc97"), "name" : "d", "symbol" : "d" }
{ "_id" : ObjectId("58cd1852dffa1f1829bfbc98"), "name" : "e", "symbol" : "e" }
{ "_id" : ObjectId("58cd1853dffa1f1829bfbc99"), "name" : "f", "symbol" : "f" }
{ "_id" : ObjectId("58cd1854dffa1f1829bfbc9a"), "name" : "g", "symbol" : "g" }
{ "_id" : ObjectId("58cd1855dffa1f1829bfbc9b"), "name" : "h", "symbol" : "h" }
{ "_id" : ObjectId("58cd1856dffa1f1829bfbc9c"), "name" : "i", "symbol" : "i" }
{ "_id" : ObjectId("58cd1857dffa1f1829bfbc9d"), "name" : "j", "symbol" : "j" }
{ "_id" : ObjectId("58cd1858dffa1f1829bfbc9e"), "name" : "k", "symbol" : "k" }
{ "_id" : ObjectId("58cd1859dffa1f1829bfbc9f"), "name" : "l", "symbol" : "l" }
{ "_id" : ObjectId("58cd185adffa1f1829bfbca0"), "name" : "m", "symbol" : "m" }
{ "_id" : ObjectId("58cd185bdffa1f1829bfbca1"), "name" : "n", "symbol" : "n" }
{ "_id" : ObjectId("58cd185cdffa1f1829bfbca2"), "name" : "o", "symbol" : "o" }
{ "_id" : ObjectId("58cd185ddffa1f1829bfbca3"), "name" : "p", "symbol" : "p" }
{ "_id" : ObjectId("58cd185edffa1f1829bfbca4"), "name" : "q", "symbol" : "q" }
{ "_id" : ObjectId("58cd185fdffa1f1829bfbca5"), "name" : "r", "symbol" : "r" }
{ "_id" : ObjectId("58cd1860dffa1f1829bfbca6"), "name" : "s", "symbol" : "s" }
{ "_id" : ObjectId("58cd1861dffa1f1829bfbca7"), "name" : "t", "symbol" : "t" }
Type "it" for more
>
-------------------------

----------------

There are two ways of getting data from MongoDB into Apache Spark.

Method 1: Using Casbah (a layer on the MongoDB Java driver)
// Assumes an existing SparkContext `sc` (e.g., in spark-shell)
import com.mongodb.casbah.Imports._

val uriRemote = MongoClientURI("mongodb://RemoteURL:27017/")
val mongoClientRemote = MongoClient(uriRemote)
val dbRemote = mongoClientRemote("dbName")
val collectionRemote = dbRemote("collectionName")
// find() pulls the whole collection through the driver into local memory
val ipMongo = collectionRemote.find
val ipRDD = sc.makeRDD(ipMongo.toList)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")

Method 2: Using the MongoDB Hadoop connector with Spark workers
This is the better version: unlike Method 1, which materializes the entire collection in the driver's memory, the Spark workers read the collection in parallel across multiple cores and fetch the data in much less time.

import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject

val config = new Configuration()
config.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat")
config.set("mongo.input.uri", "mongodb://RemoteURL:27017/dbName.collectionName")
val keyClassName = classOf[Object]
val valueClassName = classOf[BSONObject]
val inputFormatClassName = classOf[com.mongodb.hadoop.MongoInputFormat]
val ipRDD = sc.newAPIHadoopRDD(config, inputFormatClassName, keyClassName, valueClassName)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")
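Note that ipRDD here is a pair RDD of (Object, BSONObject), so saveAsTextFile writes the key/value tuples. If you only want the documents themselves in HDFS, a small optional tweak (not part of the original snippet) is to map to the value side first:

ipRDD.map(_._2.toString).saveAsTextFile("hdfs://path/to/hdfs")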

---------------------------------------------------------------
Reference:

https://web.archive.org/web/20120402085626/http://api.mongodb.org/scala/casbah/current/setting_up.html#setting-up-sbt



Friday, May 5, 2017

Spectrum LSF - Useful Job Submission Scripts

IBM® Spectrum LSF (formerly IBM® Platform™ LSF®) is a complete workload management solution for demanding HPC environments. Featuring intelligent, policy-driven scheduling and easy to use interfaces for job and workflow management, it helps organizations to improve competitiveness by accelerating research and design while controlling costs through superior resource utilization.
source: https://portal.ictp.it/icts/manuals/lsf6/A_terms.html

After installing Spectrum LSF as per the instructions in the references at the end of this post, you can verify the cluster by issuing the lsid, lshosts and bhosts commands.

[lsfadmin@host_machine2 test]$ lsid
IBM Spectrum LSF Community Edition 10.1.0.0, Jun 15 2016
Copyright IBM Corp. 1992, 2016. All rights reserved.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
My cluster name is CI_cluster1
My master name is host_machine2
----------------------------------------------------
[lsfadmin@host_machine2 test]$ lshosts
HOST_NAME      type     model   cpuf   ncpus  maxmem  maxswp  server  RESOURCES
host_machine2  LINUXPP  POWER8  250.0     20    256G       -     Yes  (mg)
host_machine3  LINUXPP  POWER8  250.0     20    256G       -     Yes  ()
host_machine1  LINUXPP  POWER8  250.0     20    256G       -     Yes  ()

[lsfadmin@host_machine2 test]$ bhosts
HOST_NAME      STATUS  JL/U  MAX  NJOBS  RUN  SSUSP  USUSP  RSV
host_machine1  ok         -   20      0    0      0      0    0
host_machine2  ok         -   20      0    0      0      0    0
host_machine3  ok         -   20      0    0      0      0    0
------------------------------------------------------
Now the LSF cluster is ready to accept jobs. bsub submits a job for execution and assigns it a unique numerical job ID. You can build a job file one line at a time, or create it from another file, by running bsub without specifying a job to submit. This starts an interactive session in which bsub reads command lines from standard input and submits them as a single batch job; you are prompted with bsub> for each line.
Example :
[lsfadmin@host_machine2 test]$ bsub
bsub> sleep 100
bsub> Job <1588> is submitted to default queue <normal>.

[lsfadmin@host_machine2 test]$ bjobs
JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
1588    lsfadmi RUN   normal     host_machine2  host_machine1  sleep 100  May  5 05:59
[lsfadmin@host_machine2 test]$

NOTE: job <1588> was submitted to the default queue <normal>.
--------------------------------------------------
Next, submit a job to another queue (option -q, chosen from the list shown by bqueues), with the job name (option -J) "job_sachin", on a specific host (option -m):

[lsfadmin@host_machine2 test]$ bsub -J job_sachin -q short -m host_machine2
bsub> sleep 100
bsub> Job <1590> is submitted to queue <short>.

[lsfadmin@host_machine2 test]$ bjobs -w
JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
1590    lsfadmin RUN   short      host_machine2  host_machine2  job_sachin  May  5 06:07
[lsfadmin@host_machine2 test]$

NOTE: job_sachin (JOBID 1590) was submitted to the queue "short" and ran on host_machine2.
----------------------------------------------------------
Next, let's capture the job's output in a file with option -o:

[lsfadmin@host_machine2 new]$ bsub -J job_sachin -q short -m host_machine2 -o output_file
bsub> sleep 60
bsub> hostname
bsub> Job <1591> is submitted to queue <short>.
[lsfadmin@host_machine2 new]$ bjobs -w
JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
1591    lsfadmin RUN   short      host_machine2  host_machine2  job_sachin  May  5 06:18

NOTE: after the job finishes, it creates output_file with all the details, as shown below:
[lsfadmin@host_machine2 new]$ ls
output_file
[lsfadmin@host_machine2 new]$ cat output_file
host_machine2
------------------------------------------------------------
Sender: LSF System <lsfadmin@host_machine2>
Subject: Job 1591: <job_sachin> in cluster <CI_cluster1> Done
Job <job_sachin> was submitted from host <host_machine2> by user <lsfadmin> in cluster <CI_cluster1>.
Job was executed on host(s) <host_machine2>, in queue <short>, as user <lsfadmin> in cluster <CI_cluster1>.
</home/lsfadmin> was used as the home directory.
</home/lsfadmin/test/new> was used as the working directory.
Started at ...
Results reported on ...
Your job looked like:
# LSBATCH: User input
sleep 60
hostname
Successfully completed.
Resource usage summary:
    CPU time :                 0.34 sec.
    Max Memory :               12 MB
    Average Memory :           12.00 MB
    Total Requested Memory :   -
    Delta Memory :             -
    Max Swap :                 344 MB
    Max Processes :            4
    Max Threads :              5
    Run time :                 62 sec.
    Turnaround time :          61 sec.
The output (if any) is above this job summary.
-----------------------------------------------------------------------------------
 

Let's see both the output file and the error file (option -e) by submitting the wrong command "hostgame" instead of "hostname":
[lsfadmin@host_machine2 new]$ bsub -J job_sachin -q short -m host_machine2 -o output_file -e err_file
bsub> sleep 60
bsub> hostgame
bsub> Job <1592> is submitted to queue <short>.

[lsfadmin@host_machine2 new]$ bjobs -w
JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
1592    lsfadmin RUN   short      host_machine2  host_machine2  job_sachin  May  5 06:24

[lsfadmin@host_machine2 new]$ ls
err_file  output_file

[lsfadmin@host_machine2 new]$ cat err_file
/home/lsfadmin/.lsbatch/1493979887.1592.shell: line 2: hostgame: command not found
[lsfadmin@host_machine2 new]$

[lsfadmin@host_machine2 new]$ cat output_file
Sender: LSF System <lsfadmin@host_machine2>
Subject: Job 1592: <job_sachin> in cluster <CI_cluster1> Exited

Job <job_sachin> was submitted from host <host_machine2> by user <lsfadmin> in cluster <CI_cluster1>.
Job was executed on host(s) <host_machine2>, in queue <short>, as user <lsfadmin> in cluster <CI_cluster1>.
</home/lsfadmin> was used as the home directory.
</home/lsfadmin/test/new> was used as the working directory.
Started at ...
Results reported on ...
Your job looked like:
# LSBATCH: User input
sleep 60
hostgame
Exited with exit code 127.
Resource usage summary:
    CPU time :                 0.34 sec.
    Max Memory :               12 MB
    Average Memory :           12.00 MB
    Total Requested Memory :   -
    Delta Memory :             -
    Max Swap :                 344 MB
    Max Processes :            4
    Max Threads :              5
    Run time :                 74 sec.
    Turnaround time :          61 sec.

The output (if any) is above this job summary.
PS:
Read file <err_file> for stderr output of this job.


NOTE: If you don't specify an error file, both output and errors appear in the same output file. If the LSF job runs on a remote host, you need to copy the output file back from that machine by using the job directive #BSUB -f " outputfile < outputfile".
---------------------------------------------------------
Now we can write a small script to be submitted as an LSF job. You can redirect a script to the standard input of the bsub command, as shown here. Create a file submit.lsf:
-------------------------------------------------------------
[lsfadmin@host_machine2 new]$ cat submit.lsf
#BSUB -n 12                  # Total number of tasks
#BSUB -R "span[ptile=4]"     # Run at most 4 tasks per host
#BSUB -J job_sachin          # Job name
#BSUB -outdir "outputdir/%J_%I"       # Output directory (%J = job ID, %I = array index)
#BSUB -o outputfile          # Output file
#BSUB -f " outputfile < outputfile"   # Copy the output file back from the execution host
#BSUB -q short               # Which queue to use {short, long, parallel, GPU, interactive}
#BSUB -W 0:55                # How much time does your job need (HH:MM)
#BSUB -L /bin/sh             # Shell to use
sleep 30
/opt/xyz/spectrum_mpi/bin/mpirun hostname
----------------------------------------------------------------------------------------------
NOTE: after submission you will see 4 tasks running on each node (due to ptile=4), for a total of 12 processes. The job creates an output directory containing the output file.

[lsfadmin@host_machine2 new]$ bsub < submit.lsf
Job <1596> is submitted to queue <short>.
[lsfadmin@host_machine2 new]$ bjobs
JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
1596    lsfadmi RUN   short      host_machine2  host_machine1  job_sachin  May  5 06:48
                                                                                    host_machine1
                                                                                    host_machine1
                                                                                    host_machine1
                                                                                    host_machine2
                                                                                    host_machine2
                                                                                    host_machine2
                                                                                    host_machine2
                                                                                    host_machine3
                                                                                    host_machine3
                                                                                    host_machine3
                                                                                    host_machine3
[lsfadmin@host_machine2 new]$ ls
outputdir  submit.lsf
[lsfadmin@host_machine2 new]$

[lsfadmin@host_machine2 new]$ cd outputdir/1596_0/
[lsfadmin@host_machine2 1596_0]$ ls
outputfile
[lsfadmin@host_machine2 1596_0]$ cat outputfile
Sender: LSF System <lsfadmin@host_machine1>
Subject: Job 1596: <job_sachin> in cluster <CI_cluster1> Done

Job <job_sachin> was submitted from host <host_machine2> by user <lsfadmin> in cluster <CI_cluster1>.
Job was executed on host(s) <4*host_machine1>, in queue <short>, as user <lsfadmin> in cluster <CI_cluster1>.
                                                <4*host_machine2>
                                                <4*host_machine3>

</home/lsfadmin> was used as the home directory.
</tmp> was used as the working directory.
Started at ...
Results reported on ...
Your job looked like:

host_machine1
host_machine1
host_machine1
host_machine1
host_machine3
host_machine3
host_machine3
host_machine3
host_machine2
host_machine2
host_machine2
host_machine2

----------------
# LSBATCH: User input
#BSUB -n 12                  # Total number of tasks
#BSUB -R "span[ptile=4]"     # Run at most 4 tasks per host
#BSUB -J job_sachin          # Job name
#BSUB -outdir "outputdir/%J_%I"       # Output directory (%J = job ID, %I = array index)
#BSUB -o outputfile          # Output file
#BSUB -f " outputfile < outputfile"   # Copy the output file back from the execution host
#BSUB -q short               # Which queue to use {short, long, parallel, GPU, interactive}
#BSUB -W 0:55                # How much time does your job need (HH:MM)
#BSUB -L /bin/sh             # Shell to use
sleep 30
/opt/xyz/spectrum_mpi/bin/mpirun hostname
Successfully completed.
Resource usage summary:
    CPU time :                 0.73 sec.
    Max Memory :               12 MB
    Average Memory :           11.67 MB
    Total Requested Memory :   -
    Delta Memory :             -
    Max Swap :                 344 MB
    Max Processes :            4
    Max Threads :              5
    Run time :                 31 sec.
    Turnaround time :          43 sec.
The output (if any) is above this job summary.

-------------------------------------------------------------------------------------

Suppose you want LSF to wait until a job finishes before accepting new submissions. Submit the job with bsub -K: bsub then blocks until the job completes before returning.

[lsfadmin@host_machine2 test]$ cat our_wait.sh
#!/bin/bash
# Submit the job in the background; -K makes bsub block until the job finishes
bsub -K -L /bin/bash < job2.sh &
bsub_pid=$!

# Wait until the job's log file appears on the shared filesystem
until [ -e /NFSshare/lsf_logs ]
do
    echo "FileName - Not found, wait for some time"
    sleep 3
done
echo "Finally, lsf_logs now exists!!!"

# Follow the log while the job runs, then stop the tail once bsub returns
tail -f /NFSshare/lsf_logs &
tail_pid=$!
wait $bsub_pid
kill $tail_pid
--------------------------------------------
where job2.sh is:
#BSUB -n 10                  # Total number of tasks
#BSUB -R "span[ptile=4]"     # Run at most 4 tasks per host
#BSUB -J job_sachin          # Job name
#BSUB -o /NFSshare/lsf_logs  # Output file on the shared filesystem
#BSUB -R "rusage[mem=6000]"  # Memory to reserve (MB)
hostname
----------------------------------------
The bpeek command displays the stdout and stderr of a job while it is running. Usually this is only the most recent 10 lines of output. With the -f option, bpeek continues to show additional lines as they are produced. It uses tail -f to do this, so you can stop the display at any time with <Ctrl-C>.

[lsfadmin@host_machine2 test]$ cat my_wait.sh
#!/bin/bash
# Submit the job in the background; -K makes bsub block until the job finishes
bsub -K -L /bin/bash < my_script.sh &
bsub_pid=$!
# Stream the job's output by name while it runs
bpeek -f -J job_sachin &
tail_pid=$!
wait $bsub_pid
kill $tail_pid

---------------------------------------------

How to submit interactive jobs?
You can also submit an interactive job using a pseudo-terminal with shell mode support. When you specify the -Ip option, bsub submits a batch interactive job and creates a pseudo-terminal when the job starts.

bsub -Ip -q interactive_queue  -J job_sachin -n 8 -L /bin/bash -R "span[ptile=4]" "sh $HOME/bin/my_test.sh $1"

-----------------------------THE END ----------------------------------------
Reference :
https://www-03.ibm.com/systems/uk/spectrum-computing/products/lsf/
https://www.ibm.com/support/knowledgecenter/en/SSWRJV_10.1.0/lsf_welcome/lsf_welcome.html
https://www.ibm.com/developerworks/community/wikis/home?lang=en#!/wiki/New%20IBM%20Platform%20LSF%20Wiki

Tuesday, April 18, 2017

Text Mining (tm) with an example of a WordCloud in RStudio

It is estimated that a major part of usable business information is unstructured, often in the form of text data. Text mining provides a collection of methods that help us derive actionable insights from such data.

The main package for text mining in R is tm. The central structure for managing documents in tm is the Corpus, which represents a collection of text documents. A corpus is a large body of natural language text used for accumulating statistics on natural language (the plural is corpora). A lexicon is a collection of information about the words of a language and the lexical categories to which they belong; it is usually structured as a collection of lexical entries, e.g., the same word used as a verb, a noun, and an adjective.

Transformations:
Once we have a corpus, we typically want to modify the documents in it, e.g., stemming, stopword removal, etc. In tm, all of this functionality is subsumed under the concept of a transformation. Transformations are done via the tm_map() function, which applies (maps) a function to all elements of the corpus. All transformations work on single text documents, and tm_map() simply applies them to every document in the corpus.

Eliminating Extra Whitespace
> sample <- tm_map(sample, stripWhitespace)

Convert to Lower Case
> sample <- tm_map(sample, content_transformer(tolower))

Remove Stopwords
> sample <- tm_map(sample, removeWords, stopwords("english"))

Stemming is done by:
> sample <- tm_map(sample, stemDocument)
 
 ------------------------------------------------------------------------------
Wordcloud example 1:

Step 1: Install the package "tm"

Step 2: Install the package "RColorBrewer"

Step 3: Install the package "wordcloud"

Step 4: Load the libraries
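For steps 1 through 4, the equivalent R console commands would look like the short sketch below (a one-time install.packages() call per package, then library() in each session; CRAN mirror prompts are omitted):

# Install the required packages once from CRAN
install.packages(c("tm", "RColorBrewer", "wordcloud"))

# Load the libraries in the current R session
library(tm)
library(RColorBrewer)
library(wordcloud)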

Step 5: Execute the R script:
-------------------------------------------------------------------------------------------------------
# Read the input text, one line per element
my_data_file = readLines("/home/spb/data/input.txt")

library(tm)
# Build a corpus with one document per input line
myCorpus = Corpus(VectorSource(my_data_file))

# Clean the text: lower-case it, then strip punctuation, numbers and stopwords
myCorpus = tm_map(myCorpus, content_transformer(tolower))
myCorpus = tm_map(myCorpus, removePunctuation)
myCorpus = tm_map(myCorpus, removeNumbers)
myCorpus = tm_map(myCorpus, removeWords, stopwords("english"))

# Term-document matrix: one row per term, one column per document
myTDM = TermDocumentMatrix(myCorpus, control = list(minWordLength = 1))

m = as.matrix(myTDM)

# Total frequency of each term, sorted with the most frequent first
v = sort(rowSums(m), decreasing = TRUE)

library(wordcloud)
set.seed(4363)   # fixed seed so the layout is reproducible

# Plot every term that occurs at least 50 times
wordcloud(names(v), v, min.freq = 50)
-----------------------------------------------------------------------------------------------------
Step 6: Wordcloud visualization:
 


--------------------------------------------------------------------------------------- 
Wordcloud example 2: colored words, placed in random order:
wordcloud(names(v), v, min.freq = 50, colors = brewer.pal(7, "Dark2"), random.order = TRUE)


 -------------------------------------
Wordcloud example 3: colored words, with the most frequent terms in the centre:
wordcloud(names(v), v, min.freq = 50, colors = brewer.pal(7, "Dark2"), random.order = FALSE)