Tuesday, August 30, 2016

JENKINS inside DOCKER container - Building Continuous Integration

Docker is an open-source project that automates the deployment of applications inside software containers by providing an additional layer of abstraction and automation of operating-system-level virtualization on Linux.

Jenkins is the leading open source automation server; it provides hundreds of plugins to support building, deploying, and automating any project.

Docker is the open platform to build, ship and run distributed applications, anywhere. At the core of the Docker solution is a registry service to manage images and the Docker Engine to build, ship and run application containers. Continuous Integration (CI) and Continuous Delivery (CD) practices have emerged as the standard for modern software testing and delivery. The Docker solution accelerates and optimizes CI/CD pipelines, while maintaining a clear separation of concerns between the development and operations teams.

 NOTE: Get the 2.7.2 LTS (Long-Term Support) .war or the latest 2.20 weekly release.

Step 1: Search for docker-io after running "yum update"

                      yum search docker

 NOTE: docker.io is maintained by Ubuntu
       docker-engine is maintained by Docker
cat /etc/yum.repos.d/docker.repo
name=Docker Repository

Step 2: Get information on docker-io

                   yum info docker-io
Step 3 :  Install docker

                yum install docker-io
Step 4: Check that the rpm package is installed

              rpm -qa | grep docker

              yum list installed | grep docker
             docker-io.x86_64                       1.7.1-2.el6                @epel

Step 5:  Start docker

              service docker start
Step 6:  Check docker process

               ps -ef | grep docker
Step 7:  Check docker version

           docker version
Step 8:  Verify docker is installed correctly by running a test image in a container.

[root@xyz sachin]# docker run hello-world

Hello from Docker!
This message shows that your installation appears to be working correctly.

To generate this message, Docker took the following steps:
 1. The Docker client contacted the Docker daemon.
 2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
 3. The Docker daemon created a new container from that image which runs the
    executable that produces the output you are currently reading.
 4. The Docker daemon streamed that output to the Docker client, which sent it
    to your terminal.

To try something more ambitious, you can run an Ubuntu container with:
 $ docker run -it ubuntu bash

Share images, automate workflows, and more with a free Docker Hub account:

For more examples and ideas, visit:

[root@xyz sachin]#

Step 9:   Get Jenkins Image

                docker pull jenkins

Step 10:  Check the list of images

[root@xyz ~]# docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
jenkins             latest              6785d71c011c        2 weeks ago         715.3 MB
hello-world         latest              f0cb9bdcaa69        8 weeks ago         1.848 kB
[root@xyz ~]#

Step 11: Clean up all previous containers:
[root@xyz sachin]# docker stop master1
[root@xyz sachin]# docker rm master1
[root@xyz sachin]# docker rm b5d566eb56a8
[root@xyz sachin]# docker rm 2aed2624515d
[root@xyz sachin]#

[root@xyz sachin]# docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
[root@xyz sachin]#
Step 12 : Remove the previously created Image

[root@xyz sachin]# docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
newtest             latest              6f0bffe3609e        22 hours ago        715.3 MB
jenkins             latest              6785d71c011c        2 weeks ago         715.3 MB
jenkins             2.7.2               6785d71c011c        2 weeks ago         715.3 MB
hello-world         latest              f0cb9bdcaa69        8 weeks ago         1.848 kB
[root@xyz sachin]# docker rmi newtest
Untagged: newtest:latest
Deleted: 6f0bffe3609e15741c0a8a5635ff2ca530a8af0e573a5690b3291ffd76f6a6f9
Deleted: f955dee8ef822202fb0f34b9479a5406b157fe7a41f54f4b3ca6cb5ecc82ff1e
[root@xyz sachin]# docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
jenkins             2.7.2               6785d71c011c        2 weeks ago         715.3 MB
jenkins             latest              6785d71c011c        2 weeks ago         715.3 MB
hello-world         latest              f0cb9bdcaa69        8 weeks ago         1.848 kB
[root@xyz sachin]#

Step 13:   Run the docker container

[root@xyz sachin]# docker run -p 8080:8080 --name=My_jenkins-master -d jenkins
[root@xyz sachin]#

[root@xyz sachin]# docker ps -a
CONTAINER ID        IMAGE               COMMAND                CREATED             STATUS              PORTS                               NAMES
961b31686b20        jenkins             "/bin/tini -- /usr/l   27 seconds ago      Up 25 seconds       0.0.0.0:8080->8080/tcp, 50000/tcp   My_jenkins-master
[root@xyz sachin]#

[root@xyz sachin]# docker stop My_jenkins-master
[root@xyz sachinpb]# docker rm My_jenkins-master
[root@xyz sachinpb]# docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
[root@xyz sachinpb]#

Step 14: Create a Dockerfile

[root@xyz sachin]# cat Dockerfile
FROM jenkins:2.7.2
MAINTAINER spb
ENV JAVA_OPTS="-Xmx8192m"
[root@xyz sachin]#

Step 15: Build the Docker image
[root@xyz sachin]#     docker build -t myjenkins .
Sending build context to Docker daemon 2.048 kB
Sending build context to Docker daemon
Step 0 : FROM jenkins:2.7.2
 ---> 6785d71c011c
Step 1 : MAINTAINER spb
 ---> Running in 2e9649b9d84c
 ---> 1b5e85ca936e
Removing intermediate container 2e9649b9d84c
Step 2 : ENV JAVA_OPTS "-Xmx8192m"
 ---> Running in 96ef7fc01890
 ---> 7225f6c3c55b
Removing intermediate container 96ef7fc01890
Successfully built 7225f6c3c55b
[root@xyz sachin]#

Check your newly built image in the local repository:

[root@xyz sachin]# docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
myjenkins           latest              7225f6c3c55b        12 seconds ago      715.3 MB
jenkins             2.7.2               6785d71c011c        2 weeks ago         715.3 MB
jenkins             latest              6785d71c011c        2 weeks ago         715.3 MB
hello-world         latest              f0cb9bdcaa69        8 weeks ago         1.848 kB
[root@xyz sachin]# docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
[root@xyz sachin]#

Step 16: Run a docker container created from the new image (myjenkins):

[root@xyz sachinpb]# docker run -p 8080:8080 --name=spb_master -d myjenkins
[root@xyz sachinpb]# docker ps -a
CONTAINER ID        IMAGE               COMMAND                CREATED             STATUS              PORTS                               NAMES
ceafb4599eba        myjenkins           "/bin/tini -- /usr/l   10 seconds ago      Up 8 seconds        50000/tcp, 0.0.0.0:8080->8080/tcp   spb_master
[root@xyz sachin]#


Step 17: Inspect the newly created container for environment variables and other configuration details.

[root@xyz sachin]# docker inspect ceafb4599eba
    "Id": "ceafb4599eba6f09a56d211aef21622608a9f9e62c49ce6c85db5bbbfdcca257",
    "Created": "2016-08-31T06:32:35.058410573Z",
    "Path": "/bin/tini",
    "Args": [ ... ],
    "State": {
        "Running": true,
        "Paused": false,
        "Restarting": false,
        "OOMKilled": false,
        "Dead": false,
        "Pid": 11921,
        "ExitCode": 0,
        "Error": "",
        "StartedAt": "2016-08-31T06:32:36.935174498Z",
        "FinishedAt": "0001-01-01T00:00:00Z"
    },
    ...
    "Env": [ ... ],
    ...
[root@xyz sachin]#

Step 18: To access the Jenkins server, you need to unlock Jenkins using the initial admin password:

[root@xyz sachin]# docker exec ceafb4599eba cat /var/jenkins_home/secrets/initialAdminPassword
[root@xyz sachin]#

Organizations that leverage Docker as part of their continuous integration pipeline can expect to increase stability, reduce complexity, and increase the agility of their software development processes. Docker allows users to ensure consistent, repeatable processes, while simultaneously allowing for a reduction in the number of test images that need to be tracked and maintained. The lightweight nature of containers means they can be spun up faster, allowing for more rapid test cycles.

1) https://engineering.riotgames.com/news/putting-jenkins-docker-container
2) https://github.com/jenkinsci/docker/blob/master/Dockerfile
3) https://docs.docker.com/engine/installation/linux/rhel/
4) https://github.com/maxfields2000/dockerjenkins_tutorial

Sunday, November 22, 2015

Apache Spark

"Apache Spark" is an open-source data analytics cluster computing framework originally developed in the AMPLab at UC Berkeley in 2009; it became an Apache open-source project in 2010. Spark fits into the Hadoop open-source community, building on top of the Hadoop Distributed File System (HDFS). However, Spark is not tied to the two-stage MapReduce paradigm, and promises performance up to 100 times faster than Hadoop MapReduce for certain applications. Spark provides primitives for in-memory cluster computing that allow user programs to load data into a cluster's memory and query it repeatedly, making it well suited to machine learning algorithms. Spark became an Apache Top-Level Project in February 2014, having been an Apache Incubator project since June 2013. It has received code contributions from large companies that use Spark, including Yahoo! and Intel, as well as small companies and startups. By March 2014, over 150 individual developers had contributed code to Spark, representing over 30 different companies. Prior to joining the Apache Incubator, versions 0.7 and earlier were licensed under the BSD License.

History Of Apache Spark (source)
Apache Spark is a fast and general cluster computing system for Big Data. Spark is a more generalized system than its predecessor MapReduce: you can run both batch and streaming jobs at the same time. It supersedes MapReduce in speed by adding the capability to process data faster in memory, and it is also more efficient on disk. It leverages in-memory processing using its basic data unit, the RDD (Resilient Distributed Dataset). RDDs hold as much of the dataset as possible in memory for the complete lifecycle of a job, saving on disk I/O; data that exceeds the memory limit is spilled to disk. Spark offers development APIs for the Java, Scala, Python and R languages and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing. Spark runs on Hadoop YARN and Apache Mesos, and it also has its own standalone cluster manager.

Apache Spark Ecosystem (source)

The Spark Core API is the base of the Apache Spark framework; it handles job scheduling, task distribution, memory management, I/O operations and recovery from failures. The main logical data unit in Spark is called the RDD (Resilient Distributed Dataset), which stores data in a distributed way so it can be processed in parallel later. Operations on RDDs are computed lazily, so memory need not be occupied all the time and other jobs can utilize it.

Many problems do not lend themselves to the two-step process of map and reduce. Spark can do map and reduce much faster than Hadoop can. One of the great things about Apache Spark is that it is a single environment with a single API from which you can call machine learning algorithms, do graph processing, or run SQL. Spark's distributed data storage model, resilient distributed datasets (RDDs), guarantees fault tolerance, which in turn minimizes network I/O. RDDs achieve fault tolerance through a notion of lineage: if a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to rebuild just that partition. So you don't need to replicate data to achieve fault tolerance. In Spark, mapper output is kept in the OS buffer cache and reducers pull it to their side and write it directly to their memory, unlike Hadoop, where output gets spilled to disk and read again. Spark's in-memory cache makes it a good fit for machine learning algorithms where you need to use the same data over and over again. Spark can run complex jobs and multi-step data pipelines using directed acyclic graphs (DAGs). Spark is written in Scala and runs on the JVM (Java Virtual Machine).
Overview :
At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. The Spark application (driver) builds a DAG from RDD operations; the DAG is split into tasks that are executed by workers, as shown in the block diagram.
Spark internals(source)

 A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

Spark core API: RDDs, transformations and actions
RDD (Resilient Distributed Dataset) is the main logical data unit in Spark. An RDD is a distributed collection of objects: each RDD is divided into multiple partitions, and each of these partitions can reside in memory or be stored on disk on different machines in a cluster. RDDs are immutable (read-only) data structures. You can't change an original RDD, but you can always transform it into a different RDD with all the changes you want. RDDs can be created in two ways:

1. Parallelizing existing collection.

2. Loading an external dataset from HDFS (or any other Hadoop-supported file system).

Creating SparkContext

To execute any operation in Spark, you have to first create an object of the SparkContext class. A SparkContext represents the connection to our existing Spark cluster and provides the entry point for interacting with Spark. We need to create a SparkContext instance so that we can interact with Spark and distribute our jobs. Spark provides a rich set of operators to manipulate RDDs. An RDD supports two kinds of operations: transformations and actions.

Transformations: Transformations create a new RDD from an existing RDD; map, reduceByKey and filter are examples. Transformations are executed on demand, i.e., they are computed lazily. We will look at lazy evaluation in more detail in the next part.

Lineage Graph: RDDs maintain a graph of one RDD getting transformed into another, called the lineage graph, which helps Spark recompute any intermediate RDD in case of failures. This is how Spark achieves fault tolerance.
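The lineage idea can be sketched in a few lines of plain Python (an illustration of the concept only, not Spark's implementation): instead of replicating derived data, remember how it was computed and rebuild a lost partition from its parent on demand.

```python
# Sketch of lineage-based recovery: a child "RDD" remembers the
# parent partitions and the transformation it was derived with.

parent = [list(range(0, 5)), list(range(5, 10))]  # two partitions of a parent RDD

def transform(x):
    return x * 10                                 # how the child was derived

child = [[transform(x) for x in p] for p in parent]
child[1] = None                                   # simulate losing one partition

# Recovery: re-apply the recorded transformation to just that partition.
if child[1] is None:
    child[1] = [transform(x) for x in parent[1]]

print(child[1])  # [50, 60, 70, 80, 90]
```

No other partition is touched and no replica is needed; the recorded derivation is enough to rebuild exactly the lost piece.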

Hadoop v/s Spark Fault Tolerance
Actions return the final results of RDD computations. An action triggers execution using the lineage graph: the data is loaded into the original RDD, all intermediate transformations are carried out, and the final result is returned to the driver program or written out to the file system.

RDD Transformation and action (source)
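The transformation/action split can be mimicked in plain Python using generators (illustrative only; real RDDs are distributed and fault-tolerant): transformations build a lazy pipeline, and nothing is computed until an action pulls data through it.

```python
# Plain-Python sketch of lazy transformations vs. eager actions.
data = range(1, 11)                         # stand-in for an RDD of 1..10

squared = (x * x for x in data)             # "transformation": nothing runs yet
evens = (x for x in squared if x % 2 == 0)  # chained transformation, still lazy

result = sum(evens)                         # "action": forces the whole pipeline
print(result)                               # 4 + 16 + 36 + 64 + 100 = 220
```

Until `sum` runs, no squares are computed at all, which is exactly the on-demand behavior described above.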
Basic setup instructions.

1) Building Spark
Spark is built using [Apache Maven](http://maven.apache.org/). To build Spark and its example programs, run: mvn -DskipTests clean package (you do not need to do this if you downloaded a pre-built package). More detailed documentation is available from the project site at "http://spark.apache.org/docs/latest/building-spark.html".
2) Interactive Scala Shell: The easiest way to start using Spark is through the Scala shell: "./bin/spark-shell". Try the following command, which should return 1000: scala> sc.parallelize(1 to 1000).count()
3) Interactive Python Shell: Alternatively, if you prefer Python, you can use the Python shell: "./bin/pyspark". Then run the following command, which should also return 1000: sc.parallelize(range(1000)).count()
4) Example Programs: Spark also comes with several sample programs in the `examples` directory. To run one of them, use "./bin/run-example <class> [params]". For example, "./bin/run-example SparkPi" will run the Pi example locally. You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance: MASTER=spark://host:7077 ./bin/run-example SparkPi. Many of the example programs print usage help if no params are given.
5) Running Tests: Testing requires [building Spark](#building-spark). Once Spark is built, tests can be run using "./dev/run-tests". Steps to run automated tests: "https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting".
6) A Note About Hadoop Versions:
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html" for guidance on building a Spark application that works with a particular distribution.
7) Configuration: Please refer to the Configuration guide at "http://spark.apache.org/docs/latest/configuration.html" in the online documentation for an overview on how to configure Spark.

Apache Spark is a fast and general engine for large-scale data processing.

  • Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk
  • Write applications quickly in Java, Scala, Python, R
  • Combine SQL, streaming, and complex analytics
  • Spark runs on Hadoop, Mesos, standalone, or in the cloud.
  • It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
Hadoop  v/s  Spark  Computation Model
Spark does not store all data in memory, but if data is in memory it makes the best use of an LRU cache to process it faster. It is up to 100x faster while computing data in memory and still faster on disk than Hadoop. Spark does not have its own storage system; it relies on HDFS for that. So Hadoop MapReduce is still good for certain batch jobs that do not involve much data pipelining. New technology never completely replaces the old; the two will rather coexist.

Spark is not tied specifically to Hadoop. Although it does work with YARN, it can also work well with Apache Mesos and can also read data from Cassandra. So although Spark may become the real-time engine for Hadoop, it can also live independent of it, with users leveraging its related projects such as Spark SQL, Spark Streaming, and MLlib (Machine Learning). I think this capability means that Spark will soon become more important with Big Data developers, and MapReduce will in turn become the solution for batch processing as opposed to the core paradigm for Hadoop. Specifically for batch use cases, MapReduce for now will be stronger than Spark, especially for very large datasets. - See more at: http://blog.gogrid.com/2014/07/15/mapreduce-dead/
  • https://spark.apache.org/
  • https://cwiki.apache.org/confluence/display/SPARK
  • http://spark.apache.org/documentation.html
  • http://data-informed.com/performing-mapreduce-in-memory-no-hadoop-needed/
  • http://stanford.edu/~rezab/sparkclass/slides/itas_workshop.pdf 
  • http://www.jorditorres.org/spark-ecosystem/
  • http://www.informationweek.com/big-data/big-data-analytics/will-spark-google-dataflow-steal-hadoops-thunder/a/d-id/1278959?page_number=2
  • https://github.com/SatyaNarayan1/spark-workshop/commit/caac3f9b7dd771c65d83398b57acc4e99876b62a 
  • http://www.edupristine.com/blog/apache-spark-vs-hadoop

Sunday, September 14, 2014

IBM - Blue Gene Q Supercomputer

Blue Gene is an IBM project aimed at designing supercomputers that can reach operating speeds in the PFLOPS (petaFLOPS) range, with low power consumption. The project created three generations of supercomputers: Blue Gene/L, Blue Gene/P, and Blue Gene/Q.

Blue Gene systems have often led the TOP500 and Green500  rankings of the most powerful and most power efficient supercomputers, respectively. Blue Gene systems have also consistently scored top positions in the Graph500 list.

The third supercomputer design in the Blue Gene series, Blue Gene/Q has a peak performance of 20 petaflops, reaching a LINPACK benchmark performance of 17 petaflops. This United States supercomputer sat atop the TOP500 list in June 2012. Named Sequoia, the IBM Blue Gene/Q system installed at the Department of Energy's Lawrence Livermore National Laboratory achieved 16.32 petaflop/s performance running the Linpack benchmark using 1,572,864 cores. Sequoia was the first system to be built using more than one million cores.

Sequoia is primarily water cooled and consists of 96 racks; 98,304 compute nodes; 1.6 million cores; and 1.6 petabytes of memory. Sequoia is roughly 90 times more power efficient than Purple and about eight times more than BG/L relative to the peak speeds of these systems. Sequoia is dedicated to NNSA's Advanced Simulation and Computing (ASC) program.

Blue Gene/Q Hardware Overview:
The Blue Gene/Q compute chip is an 18-core chip. The 64-bit PowerPC A2 processor cores are 4-way simultaneously multithreaded and run at 1.6 GHz. Each processor core has a SIMD quad-vector double-precision floating point unit (IBM QPX). 16 processor cores are used for computing, and a 17th core for operating system assist functions such as interrupts, asynchronous I/O, MPI pacing and RAS. The 18th core is a redundant spare used to increase manufacturing yield; the spared-out core is shut down in functional operation.

The processor cores are linked by a crossbar switch to a 32 MB eDRAM L2 cache, operating at half core speed. The L2 cache is multi-versioned, supporting transactional memory and speculative execution, and has hardware support for atomic operations. L2 cache misses are handled by two built-in DDR3 memory controllers running at 1.33 GHz. The chip also integrates logic for chip-to-chip communications in a 5D torus configuration, with 2 GB/s chip-to-chip links. The Blue Gene/Q chip is manufactured on IBM's copper SOI process at 45 nm. It delivers a peak performance of 204.8 GFLOPS at 1.6 GHz, drawing about 55 watts. The chip measures 19×19 mm (359.5 mm²) and comprises 1.47 billion transistors. The chip is mounted on a compute card along with 16 GB DDR3 DRAM (i.e., 1 GB for each user processor core). A Q32 compute drawer holds 32 compute cards, each water cooled.

A "midplane" (crate) of 16 compute drawers will have a total of 512 compute nodes, electrically interconnected in a 5D torus configuration (4x4x4x4x2). Beyond the midplane level, all connections are optical. Racks have two midplanes, thus 32 compute drawers, for a total of 1024 compute nodes, 16,384 user cores and 16 TB RAM. Separate I/O drawers, placed at the top of a rack or in a separate rack, are air cooled and contain 8 compute cards and 8 PCIe expansion slots for InfiniBand or 10 Gigabit Ethernet networking.
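These figures are internally consistent and easy to check: each A2 compute core completes a 4-wide double-precision fused multiply-add (8 flops) per cycle at 1.6 GHz, and a rack holds 1,024 nodes. A quick back-of-the-envelope check, assuming those per-core numbers:

```python
# Back-of-the-envelope checks for the Blue Gene/Q figures above.
clock_hz = 1.6e9          # A2 core clock
compute_cores = 16        # user-visible cores per chip
flops_per_cycle = 4 * 2   # 4-wide QPX SIMD with fused multiply-add = 8 flops

chip_peak_gflops = clock_hz * compute_cores * flops_per_cycle / 1e9
print(chip_peak_gflops)           # 204.8 GFLOPS, matching the chip spec

nodes_per_midplane = 4 * 4 * 4 * 4 * 2   # 5D torus, 4x4x4x4x2
nodes_per_rack = 2 * nodes_per_midplane  # two midplanes per rack
print(nodes_per_rack)                    # 1024 compute nodes
print(nodes_per_rack * 16)               # 16384 user cores
print(nodes_per_rack * 16 // 1024)       # 16 TB RAM at 16 GB per node
```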

Blue Gene Q Architecture

Compiler Invocation on Blue Gene/Q
To run a code on the compute nodes you must compile and link it using a "cross-compiler" on the front-end (login node). A cross-compiler produces an executable that will run on the compute nodes.

If you instead use a native compiler, the resulting executable will run on the front-end but not on the remote nodes. Also, you can use the native compilers to compile and link only serial code because there are no mpich libraries on the front-end.

Currently you should run your applications on the compute nodes only. The native compilers can be used for compiling and linking the serial portion of your parallel application.

The locations and names of the "unwrapped" cross compilers appear below. For each such compiler there is a description of the corresponding mpich-wrapper cross compiler, which is just a wrapper script that makes the cross compiler a bit easier for the user to invoke.

If there is a Thread-safe version of a compiler, its invocation name will have a _r suffix, for example bgxlc_r is the thread-safe version of the unwrapped IBM C cross compiler bgxlc. 

Simple Example: Compile, Link, Run

Step 1 : Compile and link file hello.c which contains a C language MPI program:
                 mpixlc_r -o helloc hello.c
Step 2 : Submit the job interactively:
                 runjob --block BLOCKID --exe /home/spb/helloc -p 16 --np 2048 --env-all --cwd /home/spb > job.output 2>&1

Record-breaking science applications have been run on the BG/Q, the first to cross 10 petaflops of sustained performance. The cosmology simulation framework HACC achieved almost 14 petaflops with a 3.6 trillion particle benchmark run, while the Cardioid code, which models the electrophysiology of the human heart, achieved nearly 12 petaflops with a near real-time simulation, both on Sequoia. 

Examples of applications running on Blue Gene

A typical supercomputer consumes large amounts of electrical power, almost all of which is converted into heat due to thermal design power and CPU power dissipation issues. The packing of thousands of processors together inevitably generates significant amounts of heat density that need to be dealt with. In the Blue Gene system, IBM deliberately used low-power processors to deal with heat density and hot-water cooling to achieve energy efficiency. The energy efficiency of computer systems is generally measured in terms of "FLOPS per Watt". In November 2010, IBM's Blue Gene/Q prototype reached 1684 MFLOPS/Watt. In June 2011 the top 2 spots on the Green500 list were occupied by Blue Gene machines achieving 2097 MFLOPS/W.

Open-Source High-Availability with MySQL Database Fabric

MySQL Fabric is an integrated system for managing a collection of MySQL servers and is the framework on which high-availability and sharding is built. MySQL Fabric is open-source and is intended to be extensible, easy to use, and support procedure execution even in the presence of failure, an execution model we call resilient execution.

MySQL ("My Sequel") is one of the world's most widely used open-source relational database management systems (RDBMS). MySQL itself ships with no GUI tools to administer MySQL databases or manage the data contained within them; the official set of MySQL front-end tools, MySQL Workbench, is actively developed by Oracle and is freely available for use.

Though MySQL began as a low-end alternative to more powerful proprietary databases, it has gradually evolved to support higher-scale needs as well. There are, however, limits to how far performance can scale on a single server ('scaling up'), so on larger scales, multi-server MySQL deployments ('scaling out') are required to provide improved performance and reliability. A typical high-end configuration can include a powerful master database which handles data write operations and is replicated to multiple slaves that handle all read operations. The master server synchronizes continually with its slaves, so in the event of failure a slave can be promoted to become the new master, minimizing downtime. Further improvements in performance can be achieved by caching the results of database queries in memory using memcached, or by breaking down a database into smaller chunks called shards which can be spread across a number of distributed server clusters.

Ensuring high availability requires a certain amount of redundancy in the system. For database systems, the redundancy traditionally takes the form of having a primary server acting as a master, and using replication to keep secondaries available to take over in case the primary fails. This means that the "server" that the application connects to is in reality a collection of servers, not a single server. In a similar manner, if the application is using a sharded database, it is in reality working with a collection of servers, not a single server. In this case, a collection of servers is usually referred to as a farm. MySQL Fabric is an integrated framework for managing farms of MySQL servers with support for both high-availability and sharding.

MySQL Fabric is an extensible framework for managing farms of MySQL Servers. Two features have been implemented - High Availability (HA) and scaling out using data sharding. These features can be used in isolation or in combination.

Introduction to Fabric :

To take advantage of Fabric, an application requires an augmented version of a MySQL connector which accesses Fabric using the XML-RPC protocol. MySQL Connectors are used by the application code to access the database(s), converting instructions from a specific programming language to the MySQL wire protocol, which is used to communicate with the MySQL Server processes. A ‘Fabric-aware’ connector stores a cache of the routing information that it has received from the mysqlfabric process and then uses that information to send transactions or queries to the correct MySQL Server. Currently the three supported Fabric-aware MySQL connectors are for PHP, Python and Java (and in turn the Doctrine and Hibernate Object-Relational Mapping frameworks).

Fabric manages sets of MySQL servers that have Global Transaction Identifiers (GTIDs) enabled to check and maintain consistency among servers. Sets of servers are called high-availability groups. Information about all of the servers and groups is managed by a separate MySQL instance, which cannot be a member of the Fabric high-availability groups. This server instance is called the backing store.

Fabric organizes servers in high-availability groups for managing different shards. For example, if standard asynchronous replication is in use, Fabric may be configured to automatically monitor the status of servers in a group. If the current master in a group fails, it elects a new one if a server in the group can become a master.

Besides the high-availability operations such as failover and switchover, Fabric also permits shard operations such as shard creation and removal.

Fabric is written in Python and includes a special library that implements all of the functionality provided. To interact with Fabric, a special utility named mysqlfabric provides a set of commands you can use to create and manage groups, define and manipulate sharding, and much more.

Both features are implemented in two layers:

The mysqlfabric process which processes any management requests. When using the HA feature, this process can also be made responsible for monitoring the master server and initiating failover to promote a slave to be the new master should it fail.

MySQL Fabric-aware connectors store a cache of the routing information that they have fetched from MySQL Fabric and then use that information to send transactions or queries to the correct MySQL Server.

MySQL Fabric provides high availability and database sharding for MySQL Servers.

High Availability

HA Groups are formed from pools of two or more MySQL Servers; at any point in time, one of those servers is the Primary (MySQL Replication master) and the others are Secondaries (MySQL Replication slaves). The role of a HA Group is to ensure that access to the data held within that group is always available.

While MySQL Replication allows the data to be made safe by duplicating it, a HA solution needs two extra components, and MySQL Fabric provides both:
1) Failure detection and promotion - MySQL Fabric monitors the Primary within the HA group and, should that server fail, selects one of the Secondaries and promotes it to be the Primary.
2) Routing of database requests - The routing of writes to the Primary and the load balancing of reads across the slaves is transparent to the application, even when the topology changes during failover.
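The failure detection and promotion behaviour can be sketched as a small simulation. The class, method and server names below are hypothetical; the real MySQL Fabric also compares GTID positions to pick the most up-to-date Secondary:

```python
# Minimal sketch of MySQL Fabric-style failover within one HA group.
# Class and method names are illustrative, not Fabric's actual API.

class HAGroup:
    def __init__(self, servers):
        self.servers = set(servers)
        self.primary = None

    def promote(self, server=None):
        # Promote an explicit server, or any surviving secondary.
        if server is None:
            candidates = self.servers - {self.primary}
            if not candidates:
                raise RuntimeError("no secondary available for promotion")
            server = sorted(candidates)[0]
        self.primary = server
        return server

    def on_primary_failure(self):
        # Failure detection would normally be driven by periodic pings.
        self.servers.discard(self.primary)
        self.primary = None
        return self.promote()

group = HAGroup({"mysql1:3306", "mysql2:3306", "mysql3:3306"})
group.promote("mysql1:3306")            # initial Primary
new_primary = group.on_primary_failure()
print(new_primary)                      # one of the surviving secondaries
```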

Adding MySQL Servers to Create a HA Farm: At this point, MySQL Fabric is up and running but it has no MySQL Servers to manage. This figure shows what the configuration will look like once MySQL Servers have been added to create a HA server farm.
Three MySQL Servers will make up the managed HA group – each running on a different machine.
Sharding - Scaling out

When nearing the capacity or write-performance limit of a single MySQL Server (or HA group), MySQL Fabric can be used to scale out the database servers by partitioning the data across multiple MySQL Server "groups". Note that a group could contain a single MySQL Server or it could be a HA group.

The administrator defines how data should be sharded between these servers; indicating which table columns should be used as shard keys and whether HASH or RANGE mappings should be used to map from those keys to the correct shard as described below:

1) HASH: A hash function is run on the shard key to generate the shard number. If the values held in the sharding-key column do not have too many repeats, this should result in an even partitioning of rows across the shards.

2) RANGE: The administrator defines an explicit mapping between ranges of values for the sharding key and shards. This gives the user maximum control over how data is partitioned and which rows should be co-located.
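The two mapping types can be illustrated with a minimal sketch. The hash function, shard count and range boundaries here are made up for illustration; Fabric keeps its actual mappings in the backing store:

```python
import hashlib

SHARD_COUNT = 2

def hash_shard(key):
    # HASH mapping: run a hash function on the shard key and reduce
    # the result to a shard number.
    digest = hashlib.md5(str(key).encode()).hexdigest()
    return int(digest, 16) % SHARD_COUNT

# RANGE mapping: explicit lower bounds chosen by the administrator.
RANGES = [(0, "shard-1"), (10000, "shard-2")]

def range_shard(key):
    chosen = None
    for lower_bound, shard in RANGES:
        if key >= lower_bound:
            chosen = shard
    return chosen

print(hash_shard(42))      # 0 or 1, depending on the key
print(range_shard(42))     # shard-1
print(range_shard(20000))  # shard-2
```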

When the application needs to access the sharded database, it sets a property for the connection that specifies the sharding key – the Fabric-aware connector will then apply the correct range or hash mapping and route the transaction to the correct shard.

If further shards/groups are needed then MySQL Fabric can split an existing shard into two and then update the state-store and the caches of routing data held by the connectors. Similarly, a shard can be moved from one HA group to another.

The steps that follow evolve that configuration into one containing two shards as shown in the following figure.

Another HA group (group_id-2) is created from three newly created MySQL Servers, and one of those servers is promoted to be the Primary. At this point, the new HA group exists but is missing the application schema and data. Before allocating a shard to the group, a reset master needs to be executed on the Primary for the group (this is required because changes have already been made on that server – if nothing else, to grant permissions for one or more users to connect remotely). The mysqlfabric group lookup_servers command is used first to check which of the three servers is currently the Primary. The next step is to split the existing shard, specifying the shard id and the name of the HA group where the new shard will be stored. Python code then adds some new rows to the subscribers table: the tables property for the connection is set, and the key property is set to the value of the sub_no column for that table – this is enough information for the Fabric-aware connector to choose the correct shard/HA group. Setting the mode property to fabric.MODE_READWRITE further tells the connector that the transaction should be sent to the Primary within that HA group. The mysql client can then be used to confirm that the new data has been partitioned between the two shards/HA groups.
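The connector-side routing described above can be sketched in pure Python. This simulates the routing cache a Fabric-aware connector holds; the table, server and group names are invented, and the real Connector/Python exposes this through connection properties rather than a route() function:

```python
MODE_READWRITE = "rw"
MODE_READONLY = "ro"

# Simulated routing cache, as a Fabric-aware connector might hold it:
# RANGE lower bounds for test.subscribers -> HA group, plus each
# group's current Primary and Secondaries (all names hypothetical).
ROUTING_CACHE = {
    "test.subscribers": [(0, "group_id-1"), (10000, "group_id-2")],
}
GROUPS = {
    "group_id-1": {"primary": "mysql1:3306", "secondaries": ["mysql2:3306"]},
    "group_id-2": {"primary": "mysql4:3306", "secondaries": ["mysql5:3306"]},
}

def route(tables, key, mode):
    """Pick the server a transaction should be sent to."""
    group = None
    for lower_bound, g in ROUTING_CACHE[tables[0]]:
        if key >= lower_bound:
            group = g
    servers = GROUPS[group]
    if mode == MODE_READWRITE:
        return servers["primary"]      # writes go to the Primary
    return servers["secondaries"][0]   # reads may use a Secondary

# sub_no 17 falls in the first range; sub_no 25000 in the second.
print(route(["test.subscribers"], 17, MODE_READWRITE))     # mysql1:3306
print(route(["test.subscribers"], 25000, MODE_READWRITE))  # mysql4:3306
```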

Current Limitations

1) Sharding is not completely transparent to the application. While the application need not be aware of which server stores a set of rows and it doesn’t need to be concerned when that data is moved, it does need to provide the sharding key when accessing the database.
2) All transactions and queries need to be limited in scope to the rows held in a single shard, together with the global (non-sharded) tables. For example, joins involving multiple shards are not supported. Because the connectors perform the routing function, the extra latency involved in proxy-based solutions is avoided, but it does mean that Fabric-aware connectors are required – at the time of writing these exist for PHP, Python and Java.
3) The MySQL Fabric process itself is not fault-tolerant and must be restarted in the event of it failing. Note that this does not represent a single-point-of-failure for the server farm (HA and/or sharding) as the connectors are able to continue routing operations using their local caches while the MySQL Fabric process is unavailable.

References :
1) http://dev.mysql.com/doc/mysql-utilities/1.4/en/fabric-intro.html
2) http://en.wikipedia.org/wiki/MySQL
3) http://mysqlmusings.blogspot.in/2014/05/mysql-fabric-musings-release-1.4.3.html
4) http://mysqlmusings.blogspot.in/2013/09/brief-introduction-to-mysql-fabric.html 
5) http://www.mysql.com/products/enterprise/fabric.html
6) http://www.paranet.com/blog/bid/133845/Difference-between-Synchronous-Asynchronous-Replication-Table
7) http://www.clusterdb.com/mysql-fabric/mysql-fabric-adding-high-availability-and-scaling-to-mysql
8) http://www.evidian.com/products/high-availability-software-for-application-clustering/shared-nothing-cluster-vs-shared-disk-cluster/

Monday, September 8, 2014

Apache Mesos - Open Source Datacenter Computing

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. Mesos is open-source software originally developed at the University of California, Berkeley. It sits between the application layer and the operating system and makes it easier to deploy and manage applications in large-scale clustered environments. It can run many applications on a dynamically shared pool of nodes. Prominent users of Mesos include Twitter, Airbnb, MediaCrossing, Xogito and Categorize.

Mesos leverages features of the modern kernel – “cgroups” in Linux, “zones” in Solaris – to provide isolation for CPU, memory, I/O, file system, rack locality, etc. The big idea is to treat a large collection of heterogeneous resources as a single pool. Mesos introduces a distributed two-level scheduling mechanism called resource offers: Mesos decides how many resources to offer each framework, while frameworks decide which resources to accept and which computations to run on them. It is a thin resource-sharing layer that enables fine-grained sharing across diverse cluster computing frameworks by giving frameworks a common interface for accessing cluster resources. The idea is to deploy multiple distributed systems to a shared pool of nodes in order to increase resource utilization. A lot of modern workloads and frameworks can run on Mesos, including Hadoop, Memcached, Ruby on Rails, Storm, JBoss Data Grid, MPI, Spark and Node.js, as well as various web servers, databases and application servers.

Mesos - Node Abstraction source

In a similar way that a PC OS manages access to the resources on a desktop computer, Mesos ensures applications have access to the resources they need in a cluster. Instead of setting up numerous server clusters for different parts of an application, Mesos allows you to share a pool of servers that can all run different parts of your application without interfering with each other, with the ability to dynamically allocate resources across the cluster as needed. That means it could easily switch resources away from framework1 (big-data analysis) and allocate them to framework2 (web server) if, say, there is heavy web traffic. It also reduces a lot of the manual steps in deploying applications and can shift workloads around automatically to provide fault tolerance and keep utilization rates high.

Resource sharing across the cluster increases throughput and utilization

                         Mesos  ==>  " Data Center Kernel "
Mesos - One large pool of resources

Mesos is essentially a data center kernel – which means it’s the software that actually isolates the running workloads from each other. It still needs additional tooling to let engineers get their workloads running on the system and to manage when those jobs actually run. Otherwise, some workloads might consume all the resources, or important workloads might get bumped by less-important workloads that happen to require more resources. Hence Mesos needs more than just a kernel: the Chronos scheduler, a cron replacement for automatically starting and stopping services (and handling failures), runs on top of Mesos. The other part of the Mesos ecosystem is Marathon, which provides an API for starting, stopping and scaling services (and Chronos could be one of those services).



Mesos consists of a master process that manages slave daemons running on each cluster node, and frameworks that run tasks on these slaves. The master implements fine-grained sharing across frameworks using resource offers. Each resource offer is a list of free resources on multiple slaves. The master decides how many resources to offer to each framework according to an organizational policy, such as fair sharing or priority. To support a diverse set of inter-framework allocation policies, Mesos lets organizations define their own policies via a pluggable allocation module.

Mesos Architecture with two running Frameworks (Hadoop and MPI)

 Each framework running on Mesos consists of two components: a scheduler that registers with the master to be offered resources, and an executor process that is launched on slave nodes to run the framework’s tasks. While the master determines how many resources to offer to each framework, the frameworks’ schedulers select which of the offered resources to use. When a framework accepts offered resources, it passes Mesos a description of the tasks it wants to launch on them.

Resource Offer
Figure shows an example of how a framework gets scheduled to run tasks. In step (1), slave 1 reports to the master that it has 4 CPUs and 4 GB of memory free. The master then invokes the allocation module, which tells it that framework 1 should be offered all available resources. In step (2), the master sends a resource offer describing these resources to framework 1. In step (3), the framework’s scheduler replies to the master with information about two tasks to run on the slave, using 2 CPUs and 1 GB RAM for the first task, and 1 CPU and 2 GB RAM for the second task. Finally, in step (4), the master sends the tasks to the slave, which allocates appropriate resources to the framework’s executor, which in turn launches the two tasks (depicted with dotted borders). Because 1 CPU and 1 GB of RAM are still free, the allocation module may now offer them to framework 2. This resource offer process repeats when tasks finish and new resources become free.
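The arithmetic in this walkthrough can be checked with a tiny sketch (plain Python, no Mesos involved):

```python
# Walk through the resource-offer example: slave 1 starts with
# 4 CPUs and 4 GB of memory, and framework 1 launches two tasks.
free = {"cpus": 4, "mem_gb": 4}

def launch(task, free):
    # The slave only launches a task if the accepted offer covers it.
    assert task["cpus"] <= free["cpus"] and task["mem_gb"] <= free["mem_gb"]
    free["cpus"] -= task["cpus"]
    free["mem_gb"] -= task["mem_gb"]

launch({"name": "task1", "cpus": 2, "mem_gb": 1}, free)
launch({"name": "task2", "cpus": 1, "mem_gb": 2}, free)

# 1 CPU and 1 GB remain free; the allocation module may now offer
# them to framework 2.
print(free)  # {'cpus': 1, 'mem_gb': 1}
```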

The thin interface provided by Mesos allows it to scale and allows the frameworks to evolve independently. A framework will reject the offers that do not satisfy its constraints and accept the ones that do. In particular, a simple policy called delay scheduling, in which frameworks wait for a limited time to acquire nodes storing their input data, yields nearly optimal data locality.

Mesos Scheduler APIs:
The API consists of two primitives, calls and events, both of which are low-level, unreliable, one-way message passing.

"calls" are messages sent to Mesos:
  • Life cycle management (start, failover, stop) - Register, Reregister, Unregister
  • Resource allocation - Request, Decline, Revive
  • Task management - Launch, Kill, Acknowledge, Reconcile

"events" are messages that the framework receives:
  • Life cycle management - Registered, Reregistered
  • Resource allocation - Offers, Rescind
  • Task management - Update

Scheduler communication with Mesos

  • The scheduler sends a REGISTER call to the Mesos master.
  • The master responds with a REGISTERED event acknowledging the registration.
  • Optionally, the scheduler can send a REQUEST for specific resources.
  • The master allocates some resources for the scheduler, which show up as an OFFERS event.
  • The scheduler can use the offered resources to run tasks, once it decides which tasks it wants to run.
  • The master then launches the tasks on the specified slaves. The scheduler might receive further OFFERS events for more resource allocation. Later, it may get an UPDATE, i.e. the state of a task (asynchronous in nature).
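The exchange above can be mocked up as a toy simulation. The class and message handling below are invented for illustration; they only mirror the call/event names, not Mesos' actual wire protocol:

```python
# Toy simulation of the scheduler <-> master exchange described above.

class Master:
    def __init__(self):
        self.frameworks = {}

    def handle(self, call, framework_id=None, payload=None):
        if call == "REGISTER":
            fid = "framework-%d" % (len(self.frameworks) + 1)
            self.frameworks[fid] = []
            # Respond with REGISTERED, then an OFFER of free resources.
            return [("REGISTERED", fid),
                    ("OFFERS", {"slave1": {"cpus": 4, "mem_gb": 4}})]
        if call == "LAUNCH":
            self.frameworks[framework_id].append(payload)
            # Task state updates arrive asynchronously as UPDATE events.
            return [("UPDATE", {"task": payload, "state": "TASK_RUNNING"})]

master = Master()
events = master.handle("REGISTER")
fid = events[0][1]
events += master.handle("LAUNCH", framework_id=fid, payload="task-1")
print([name for name, _ in events])  # ['REGISTERED', 'OFFERS', 'UPDATE']
```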

Task/Executor isolation:
To get more control over task management, executors are used. The executor decides how it actually wants to run a task, and can run one or more tasks (like threads of a distributed system). The advantage here is that multiple tasks can be assigned to one executor. An interesting point to note is isolation: containers are allocated both for the executor tree with its multiple tasks and for individual tasks, as shown below. The executor's resources change over time, and the resources per container are adjusted dynamically. In summary, Mesos gives elasticity on a per-node basis, or across the cluster, for containers to grow or shrink dynamically. Applications like Spark show great performance for the same reason.

Features of Mesos :
  •     Fault-tolerant replicated master using ZooKeeper
  •     Scalability to 10,000s of nodes
  •     Isolation between tasks with Linux Containers
  •     Multi-resource scheduling (memory and CPU aware)
  •     Java, Python and C++ APIs for developing new parallel applications
  •     Web UI for viewing cluster state

Running at the container level increases performance

Software projects built on Mesos :
Long Running Services:
1) Aurora is a service scheduler that runs on top of Mesos, enabling you to run long-running services that take advantage of Mesos' scalability, fault-tolerance, and resource isolation.
2) Marathon is a private PaaS built on Mesos. It automatically handles hardware or software failures and ensures that an app is “always on”.
3) Singularity is a scheduler (HTTP API and web interface) for running Mesos tasks: long running processes, one-off tasks, and scheduled jobs.
4) SSSP is a simple web application that provides a white-label “Megaupload” for storing and sharing files in S3.
Big Data Processing :
1) Cray Chapel is a productive parallel programming language. The Chapel Mesos scheduler lets you run Chapel programs on Mesos.
2) Dpark is a Python clone of Spark, a MapReduce-like framework written in Python, running on Mesos.
3) Exelixi is a distributed framework for running genetic algorithms at scale.
4) Hadoop: Running Hadoop on Mesos distributes MapReduce jobs efficiently across an entire cluster.
5) Hama is a distributed computing framework based on Bulk Synchronous Parallel computing techniques for massive scientific computations, e.g., matrix, graph and network algorithms.
6) MPI is a message-passing system designed to function on a wide variety of parallel computers.
7) Spark is a fast and general-purpose cluster computing system which makes parallel jobs easy to write.
8) Storm is a distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.
Batch Scheduling:
1) Chronos is a distributed job scheduler that supports complex job topologies. It can be used as a more fault-tolerant replacement for cron.
2) Jenkins is a continuous integration server. The mesos-jenkins plugin allows it to dynamically launch workers on a Mesos cluster depending on the workload.
3) JobServer is a distributed job scheduler and processor which allows developers to build custom batch-processing Tasklets using a point-and-click web UI.
4) Torque is a distributed resource manager providing control over batch jobs and distributed compute nodes.
Data Storage:
1) Cassandra is a highly available distributed database. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it a strong platform for mission-critical data.
2) ElasticSearch is a distributed search engine. Mesos makes it easy to run and scale.
3) Hypertable is a high-performance, scalable, distributed storage and processing system for structured and unstructured data.

  Trends such as cloud computing and big data are moving organizations away from consolidation and into situations where they might have multiple distributed systems dedicated to specific tasks.  With the help of Docker executor for Mesos, Mesos can run and manage Docker containers in conjunction with Chronos and Marathon frameworks. Docker containers provide a consistent, compact and flexible means of packaging application builds. Delivering applications with Docker on Mesos promises a truly elastic, efficient and consistent platform for delivering a range of applications on premises or in the cloud.
References :
1)  http://mesos.apache.org/documentation/latest/
2)  http://static.usenix.org/event/nsdi11/tech/full_papers/Hindman_new.pdf
3)  http://howtojboss.com/2013/09/04/ampd-for-hadoop-alternatives/ 
4)  http://typesafe.com/blog/play-framework-grid-deployment-with-mesos
5) https://mesosphere.io/2013/09/26/docker-on-mesos/