Thursday, April 13, 2023

IBM Spectrum Symphony and LSF with Apache Hadoop

IBM Spectrum Symphony (formerly known as IBM Platform Symphony) is a high-performance computing (HPC) and grid computing software platform that enables organizations to process large amounts of data and run compute-intensive applications at scale. It provides a distributed computing infrastructure for a wide range of data-intensive workloads, such as scientific simulations, financial modeling, and big data analytics. As a parallel services middleware and cluster manager, it is widely used in banks for risk analytics and data analytics in shared, multi-user, multi-application, multi-job environments. IBM Spectrum Symphony also works with IBM Spectrum LSF (for batch workloads) in the same cluster, allowing batch and parallel services workloads to share the same resources.

Some of the key features of IBM Spectrum Symphony include:

  1. Distributed computing: The platform allows organizations to distribute computing workloads across a large number of nodes, which can be located in different data centers or cloud environments.
  2. Resource management: IBM Spectrum Symphony provides a resource management framework that allows organizations to allocate and manage compute, storage, and network resources more efficiently.
  3. High availability: The platform is designed to provide high availability and fault tolerance, ensuring that applications can continue to run even if individual nodes or components fail.
  4. Performance optimization: IBM Spectrum Symphony includes a range of performance optimization features, such as load balancing and data caching, which can help organizations to achieve faster processing times and better overall performance.
  5. Support for multiple programming languages: The platform supports a wide range of programming languages, including Java, Python, and C++, which makes it easy for developers to build and deploy applications on the platform.

IBM Spectrum LSF (Load Sharing Facility) is another software platform that is often used in conjunction with IBM Spectrum Symphony to manage and optimize workloads in a distributed computing environment. LSF provides a range of features for resource management, workload scheduling, and job prioritization, which can help organizations to improve performance and efficiency.

When used together, IBM Spectrum Symphony and IBM Spectrum LSF can provide a comprehensive solution for managing and optimizing large-scale distributed computing environments. IBM Spectrum Symphony provides the distributed computing infrastructure and application management capabilities, while IBM Spectrum LSF provides the workload management and optimization features.

Some of the key features of LSF that complement IBM Spectrum Symphony include:
  1. Advanced job scheduling: LSF provides sophisticated job scheduling capabilities, allowing organizations to prioritize and schedule jobs based on a wide range of criteria, such as resource availability, job dependencies, and user priorities.
  2. Resource allocation: LSF can manage the allocation of resources, ensuring that jobs are run on the most appropriate nodes and that resources are used efficiently.
  3. Job monitoring: LSF provides real-time monitoring of job progress and resource usage, allowing organizations to quickly identify and resolve issues that may impact performance.
  4. Integration with other tools: LSF can be integrated with a wide range of other HPC tools and applications, including IBM Spectrum Symphony, providing a seamless workflow for managing complex computing workloads.
Integrating LSF with Hadoop can help organizations to optimize the use of their resources and achieve better performance when running Hadoop workloads.  
Apache Hadoop ("Hadoop") is a framework for large-scale distributed data storage and processing on computer clusters that uses the Hadoop Distributed File System ("HDFS") for data storage and the MapReduce programming model for data processing. Because MapReduce workloads might represent only a small fraction of the overall workload but typically require their own standalone environment, MapReduce is difficult to support within traditional HPC clusters. However, the parallel file systems that HPC clusters typically use are sufficient for initial MapReduce workloads, so MapReduce workloads can run as regular parallel jobs in an HPC cluster environment. Use the IBM Spectrum LSF integration with Apache Hadoop to submit Hadoop MapReduce workloads as regular LSF parallel jobs.
To run your Hadoop application through LSF, submit it as an LSF job. Once the LSF job starts to run, the Hadoop connector script (lsfhadoop.sh) automatically provisions an open source Hadoop cluster within the resources LSF has allocated, then submits the actual MapReduce workload into this Hadoop cluster. Because each LSF Hadoop job gets its own cluster, the integration provides a multi-tenant environment that allows multiple users to share the common pool of HPC cluster resources. LSF collects resource usage for MapReduce workloads as it does for normal LSF parallel jobs and has full control of the job life cycle. After the job is complete, LSF shuts down the Hadoop cluster.

By default, the Apache Hadoop integration configures the Hadoop cluster with direct access to shared file systems and does not require HDFS. This allows you to use existing file systems in your HPC cluster without having to immediately invest in a new file system. Through the existing shared file system, data can be stored in common share locations, which avoids the typical data stage-in and stage-out steps with HDFS.
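
A minimal sketch of such a submission is shown below. The slot count, output file name, example jar, and shared-file-system directories are placeholders, and the exact connector usage may vary by LSF version:

bsub -n 8 -o hadoop_job.%J.out lsfhadoop.sh hadoop jar hadoop-mapreduce-examples.jar wordcount /shared/input /shared/output

Here LSF allocates 8 slots, lsfhadoop.sh provisions a temporary Hadoop cluster on those slots, runs the wordcount job against directories on the shared file system, and shuts the cluster down when the job completes.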

The general steps to integrate LSF with Hadoop:
  1. Install and configure LSF: The first step is to install and configure LSF on the Hadoop cluster. This involves setting up LSF daemons on the cluster nodes and configuring LSF to work with the Hadoop Distributed File System (HDFS).
  2. Configure Hadoop for LSF: Hadoop needs to be configured to use LSF as its resource manager. This involves setting the yarn.resourcemanager.scheduler.class property in the Hadoop configuration file to com.ibm.platform.lsf.yarn.LSFYarnScheduler.
  3. Configure LSF for Hadoop: LSF needs to be configured to work with Hadoop by setting up the necessary environment variables and resource limits. This includes setting the LSF_SERVERDIR and LSF_LIBDIR environment variables to the LSF installation directory and configuring LSF resource limits to ensure that Hadoop jobs have access to the necessary resources.
  4. Submit Hadoop jobs to LSF: Hadoop jobs can be submitted to LSF using the yarn command-line tool with the -Dmapreduce.job.submithostname and -Dmapreduce.job.queuename options set to the LSF submit host and queue, respectively (see the sketch after this list).
  5. Monitor Hadoop jobs in LSF: LSF provides a web-based user interface and command-line tools for monitoring and managing Hadoop jobs running on the cluster. This allows users to monitor job progress, resource usage, and other metrics, and to take corrective action if necessary.
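
The sketch below illustrates steps 2 through 4 under stated assumptions: the scheduler class name is the one quoted above, the LSF installation paths and the host, queue, and jar names are placeholders, and the job's main class is assumed to parse -D options via ToolRunner. Verify the exact property and variable names against your LSF and Hadoop documentation.

# Step 2: yarn-site.xml entry pointing YARN at the LSF scheduler (illustrative only)
# <property>
#   <name>yarn.resourcemanager.scheduler.class</name>
#   <value>com.ibm.platform.lsf.yarn.LSFYarnScheduler</value>
# </property>

# Step 3: LSF environment variables (example installation paths)
export LSF_SERVERDIR=/opt/lsf/10.1/linux2.6-glibc2.3-x86_64/etc
export LSF_LIBDIR=/opt/lsf/10.1/linux2.6-glibc2.3-x86_64/lib

# Step 4: submit a MapReduce job through the yarn CLI, naming the LSF submit host and queue
yarn jar my_hadoop_job.jar \
    -Dmapreduce.job.submithostname=lsf-submit-host \
    -Dmapreduce.job.queuename=hadoop_queue \
    input_dir output_dir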

LSF can be used as a standalone workload management software for Hadoop clusters, without the need for IBM Spectrum Symphony. LSF provides advanced job scheduling and resource management capabilities, which can be used to manage and optimize Hadoop workloads running on large HPC clusters. By integrating LSF with Hadoop, organizations can ensure that Hadoop jobs have access to the necessary resources and are scheduled and managed efficiently, improving overall performance and resource utilization.

In addition, IBM Spectrum Symphony provides capabilities beyond workload management, such as distributed computing infrastructure, data movement, and integration with other data center software. If an organization requires these capabilities, it may choose to use IBM Spectrum Symphony alongside LSF for even greater benefits, but LSF can be used independently as a workload manager for Hadoop clusters.

Submitting LSF jobs to a Hadoop cluster involves creating an LSF job script that launches the Hadoop job and then submitting the job to LSF using the bsub command. LSF then schedules the job to run on the cluster. To submit LSF jobs to a Hadoop cluster, follow these general steps:
  1. Write the Hadoop job: First, you need to write the Hadoop job that you want to run on the cluster. This can be done using any of the Hadoop APIs, such as MapReduce, Spark, or Hive.
  2. Create the LSF job script: Next, you need to create an LSF job script that launches the Hadoop job on the cluster. This script typically includes the Hadoop command to run the job, along with any necessary environment variables, resource requirements, and other LSF-specific settings (see the job script sketch after this list).
  3. Submit the LSF job: Once the job script is ready, you can submit it to LSF using the bsub command. This will add the job to the LSF queue and wait for available resources to run the job.
  4. Monitor the job: LSF provides several tools for monitoring and managing jobs running on the cluster, such as the bjobs command and the LSF web interface. You can use these tools to track job status, resource usage, and output, and to take corrective action if necessary.
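
As a sketch of step 2, a job script might look like the following; the #BSUB directives mirror bsub command-line options, and the queue name, slot count, Hadoop installation path, and jar are placeholders:

#!/bin/bash
# LSF directives (equivalent to bsub command-line options)
#BSUB -J my_hadoop_job
#BSUB -q hadoop_queue
#BSUB -n 8
#BSUB -oo my_hadoop_job.out
#BSUB -eo my_hadoop_job.err

# Placeholder Hadoop installation path
export HADOOP_HOME=/path/to/hadoop
$HADOOP_HOME/bin/hadoop jar my_hadoop_job.jar input_dir output_dir

The script is submitted with "bsub < my_hadoop_job.lsf", and its progress can then be checked with "bjobs -l <jobid>".
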
Example 1: A bsub command that can be used to submit a Hadoop job to an LSF-managed Hadoop cluster:

bsub -J my_hadoop_job -oo my_hadoop_job.out -eo my_hadoop_job.err -R "rusage[mem=4096]" -q hadoop_queue hadoop jar my_hadoop_job.jar input_dir output_dir

where:
-J: Specifies a name for the job. In this case, we're using "my_hadoop_job" as the job name.

-oo: Redirects the standard output of the job to a file. In this case, we're using "my_hadoop_job.out" as the output file.

-eo: Redirects the standard error of the job to a file. In this case, we're using "my_hadoop_job.err" as the error file.

-R: Specifies resource requirements for the job. In this case, we're requesting 4 GB of memory (mem=4096) for the job.

-q: Specifies the LSF queue to submit the job to. In this case, we're using the "hadoop_queue" LSF queue.

After the bsub command options, we specify the Hadoop command to run the job (hadoop jar my_hadoop_job.jar) and the input and output directories for the job (input_dir and output_dir). This submits the Hadoop job to LSF, which then schedules and manages the job on the Hadoop cluster.

Example 2: How to submit a Hadoop job using the bsub command with LSF?

bsub -q hadoop -J "Hadoop Job" -n 10 -o hadoop.log /path/to/hadoop/bin/hadoop jar /path/to/hadoop/examples.jar pi 10 1000

This command will submit a Hadoop job to the LSF scheduler and allocate resources as necessary based on the job's requirements.

where:
-q hadoop specifies that the job should be submitted to the Hadoop queue.
-J "Hadoop Job" specifies a name for the job.
-n 10 specifies the number of cores to use for the job.
-o hadoop.log specifies the name of the output log file.
/path/to/hadoop/bin/hadoop specifies the path to the Hadoop executable.
jar /path/to/hadoop/examples.jar pi 10 1000 specifies the command to run the Hadoop job, which in this case is the pi example program with 10 map tasks and 1000 samples per map.

Example 3: How to submit a wordcount MapReduce job using bsub with LSF ?

bsub -q hadoop -J "MapReduce Job" -n 10 -o mapreduce.log /path/to/hadoop/bin/hadoop jar /path/to/hadoop/examples.jar wordcount /input/data /output/data

where:
-q hadoop specifies that the job should be submitted to the Hadoop queue.
-J "MapReduce Job" specifies a name for the job.
-n 10 specifies the number of cores to use for the job.
-o mapreduce.log specifies the name of the output log file.
/path/to/hadoop/bin/hadoop specifies the path to the Hadoop executable.
jar /path/to/hadoop/examples.jar wordcount /input/data /output/data specifies the command to run the MapReduce job, which in this case is the wordcount example program with input data in /input/data and output data in /output/data.

Example 4: How to submit a terasort MapReduce job using bsub with LSF?

bsub -q hadoop -J "MapReduce Job" -n 20 -o mapreduce.log /path/to/hadoop/bin/hadoop jar /path/to/hadoop/examples.jar terasort -Dmapred.map.tasks=100 -Dmapred.reduce.tasks=50 /input/data /output/data
where:
-q hadoop specifies that the job should be submitted to the Hadoop queue.
-J "MapReduce Job" specifies a name for the job.
-n 20 specifies the number of cores to use for the job.
-o mapreduce.log specifies the name of the output log file.
/path/to/hadoop/bin/hadoop specifies the path to the Hadoop executable.
jar /path/to/hadoop/examples.jar terasort -Dmapred.map.tasks=100 -Dmapred.reduce.tasks=50 /input/data /output/data specifies the command to run the MapReduce job, which in this case is the terasort example program with input data in /input/data and output data in /output/data, and specific configuration parameters to control the number of map and reduce tasks.

Example 5: How to submit a grep MapReduce job using bsub with LSF?

bsub -q hadoop -J "MapReduce Job" -n 30 -o mapreduce.log /path/to/hadoop/bin/hadoop jar /path/to/hadoop/examples.jar grep /input/data /output/data "example.*"
where:
-q hadoop specifies that the job should be submitted to the Hadoop queue.
-J "MapReduce Job" specifies a name for the job.
-n 30 specifies the number of cores to use for the job.
-o mapreduce.log specifies the name of the output log file.
/path/to/hadoop/bin/hadoop specifies the path to the Hadoop executable.
jar /path/to/hadoop/examples.jar grep /input/data /output/data "example.*" specifies the command to run the MapReduce job, which in this case is the grep example program with input data in /input/data, output in /output/data, and a regular expression pattern to search for (the grep example takes the input directory, output directory, and regex as positional arguments).

Example 6: How to submit a non-MapReduce Hadoop job using bsub with LSF?

bsub -q hadoop -J "Hadoop Job" -n 10 -o hadoopjob.log /path/to/hadoop/bin/hadoop fs -rm -r /path/to/hdfs/directory

where:
-q hadoop specifies that the job should be submitted to the Hadoop queue.
-J "Hadoop Job" specifies a name for the job.
-n 10 specifies the number of cores to use for the job.
-o hadoopjob.log specifies the name of the output log file.
/path/to/hadoop/bin/hadoop fs -rm -r /path/to/hdfs/directory specifies the command to run the Hadoop job, which in this case is to remove a directory in HDFS at /path/to/hdfs/directory.
This command will submit a non-MapReduce Hadoop job to the LSF scheduler and allocate resources as necessary based on the job's requirements.


Example 7: If you have a Hadoop cluster with YARN and Spark installed, you can submit Spark jobs to the cluster using bsub as shown in the example.

bsub -q normal -J "Spark Job" -n 20 -o sparkjob.log /path/to/spark/bin/spark-submit --class com.example.MyApp --master yarn --deploy-mode cluster /path/to/my/app.jar arg1 arg2
where:
-q normal specifies that the job should be submitted to the normal queue.
-J "Spark Job" specifies a name for the job.
-n 20 specifies the number of cores to use for the job.
-o sparkjob.log specifies the name of the output log file.
/path/to/spark/bin/spark-submit specifies the path to the spark-submit script.
--class com.example.MyApp specifies the main class of the Spark application.
--master yarn --deploy-mode cluster specifies the mode to run the application in.
/path/to/my/app.jar arg1 arg2 specifies the path to the application jar file and its arguments.

The above example does not explicitly require Hadoop to be installed or used. However, it assumes that the Spark cluster is running in YARN mode, which is typically used in a Hadoop cluster. In general, Spark can be run in various modes, including standalone, YARN, and Mesos. There are various other parameters and configurations that can be specified. Some examples include:
--num-executors: Specifies the number of executor processes to use for the job.
--executor-cores: Specifies the number of cores to allocate per executor.
--executor-memory: Specifies the amount of memory to allocate per executor.
--driver-memory: Specifies the amount of memory to allocate for the driver process.
--queue: Specifies the YARN queue to submit the job to.
--files: Specifies a comma-separated list of files to be distributed with the job.
--archives: Specifies a comma-separated list of archives to be distributed with the job.

These parameters can be used to fine-tune the resource allocation and performance of Spark jobs in a Hadoop cluster. Additionally, there are other options that can be used to configure the behavior of the Spark application itself, such as --conf to specify Spark configuration options and --jars to specify external JAR files to be used by the application.
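
For illustration, the submission from Example 7 could be extended with some of these options. All values, paths, and file names below are placeholders chosen for the sketch, not tuning recommendations:

bsub -q normal -J "Spark Job" -n 20 -o sparkjob.log \
    /path/to/spark/bin/spark-submit \
    --class com.example.MyApp \
    --master yarn --deploy-mode cluster \
    --num-executors 4 --executor-cores 5 --executor-memory 8g \
    --driver-memory 4g \
    --queue default \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --files /path/to/app.properties \
    /path/to/my/app.jar arg1 arg2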

Here is an example LSF configuration file (lsf.conf) that includes settings for running Spark applications:
# LSF Configuration File
# Spark settings
LSB_JOB_REPORT_MAIL=N
LSB_DEFAULTGROUP=spark
LSB_DEFAULTJOBGROUP=spark
LSB_JOB_ACCOUNTING_INTERVAL=60
LSB_SUB_LOGLEVEL=3
LSB_JOB_PROLOGUE="/opt/spark/current/bin/load-spark-env.sh"
LSB_JOB_WRAPPER="mpirun -n 1 $LSF_BINDIR/lsb.wrapper $LSB_BINARY_NAME"
LSB_HOSTS_TASK_MODEL=cpu


An example Spark configuration file (spark-defaults.conf) that includes settings for running Spark applications using LSF:
# Spark Configuration File
# LSF settings
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.queue=default
spark.executor.instances=2
spark.executor.memory=2g
spark.executor.cores=2
spark.driver.memory=1g
spark.driver.cores=1
spark.yarn.am.memory=1g
spark.yarn.am.cores=1
spark.yarn.maxAppAttempts=2
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://namenode:8020/spark-event-logs
spark.history.fs.logDirectory=hdfs://namenode:8020/spark-event-logs
spark.scheduler.mode=FAIR
spark.serializer=org.apache.spark.serializer.KryoSerializer

This configuration file sets several parameters for running Spark applications on a YARN cluster managed by LSF, including specifying the number of executor instances, executor memory, and executor cores, as well as setting the queue and memory allocation for the Spark ApplicationMaster.



Using LSF as the scheduler for Hadoop can provide better resource utilization, job scheduling, queuing, integration with other workloads, and monitoring and management capabilities than the built-in YARN scheduler. This can help improve the performance, scalability, and efficiency of Hadoop clusters, especially in large, complex environments.
  1. Better resource utilization: LSF has advanced resource allocation and scheduling algorithms that can improve resource utilization in Hadoop clusters. This can lead to better performance and reduced infrastructure costs.
  2. Better job scheduling: LSF has more advanced job scheduling features than YARN, such as support for job dependencies, job preemption, and priority-based job scheduling. This can help optimize job execution and reduce waiting times.
  3. Advanced queuing: LSF allows for more flexible and advanced queuing mechanisms, including job prioritization and preemption, multiple queues with different priorities, and customizable scheduling policies (see the queue configuration sketch after this list).
  4. Integration with other workloads: LSF is a general-purpose job scheduler that can be used to manage a wide range of workloads, including Hadoop, MPI, and other distributed computing frameworks. This allows for better integration and coordination of workloads on the same infrastructure.
  5. Advanced monitoring and management: LSF provides more advanced monitoring and management tools than YARN, including web-based interfaces, command-line tools, and APIs for job management, resource monitoring, and performance analysis.
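
As an illustration of the queuing point above, an lsb.queues fragment might define two queues with different priorities, where the higher-priority queue can preempt the lower one. The queue names, priority values, and descriptions are placeholders:

Begin Queue
QUEUE_NAME   = hadoop
PRIORITY     = 60
PREEMPTION   = PREEMPTIVE[normal]
DESCRIPTION  = Higher-priority queue for Hadoop and other data-processing jobs
End Queue

Begin Queue
QUEUE_NAME   = normal
PRIORITY     = 30
DESCRIPTION  = Default queue for general batch work
End Queue
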
LSF is a versatile job scheduler that can be used for a wide range of workloads, including batch and real-time scheduling. While LSF is often used for batch scheduling workloads, it can also be used for real-time scheduling workloads like Apache Kafka, thanks to its advanced scheduling capabilities and its ability to integrate with other distributed computing frameworks.

LSF has advanced scheduling capabilities that can help optimize the allocation of resources for real-time workloads, including support for job prioritization, preemption, and multiple queues with different priorities. This can help ensure that real-time workloads are allocated the necessary resources in a timely and efficient manner.

Furthermore, LSF has integration capabilities with other distributed computing frameworks like Apache Kafka. For example, LSF can be used to manage the resource allocation and scheduling of Kafka brokers, consumers, and producers. This can help optimize the performance and scalability of Kafka clusters.
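
As a rough sketch (not an official integration), a Kafka broker could be run as a long-lived LSF job; the queue name, resource values, and Kafka installation paths are placeholders:

bsub -q kafka -J kafka-broker-1 -n 4 -R "rusage[mem=8192]" -o kafka-broker-1.%J.log /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

Consumer and producer processes could be submitted the same way, so that LSF accounts for their resource usage alongside batch and Hadoop jobs.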

Examples of applications that use real-time scheduling:
  1. A major financial services company uses Hadoop and LSF to process real-time financial data. LSF is used to manage the allocation of compute resources for Hadoop, including managing the cluster's memory, CPU, and disk resources. This setup enables the company to process real-time financial data with low latency and high throughput.
  2. A large e-commerce company uses Hadoop and LSF to process large volumes of customer data in real-time. LSF is used to schedule and manage jobs across multiple Hadoop clusters, optimizing the allocation of resources to ensure that real-time processing is prioritized. This setup enables the company to personalize customer experiences and deliver targeted marketing campaigns in real-time.
  3. A global telecommunications company uses Hadoop and LSF to process real-time data from its network infrastructure. LSF is used to manage job scheduling and resource allocation, ensuring that data is processed quickly and efficiently. This setup enables the company to monitor and optimize network performance in real-time, providing a better customer experience.

Overall, the combination of Hadoop and LSF can provide a powerful and flexible platform for processing both historical as well as real-time data in production environments. By leveraging the advanced resource management and scheduling capabilities of LSF, organizations can optimize performance, reduce latency, and improve the overall efficiency of their Hadoop clusters.
