## Monday, December 7, 2020

### Overview of MPI Reduction Operations in an HPC Cluster

The Message Passing Interface (MPI) is the de facto standard for distributed computing in many HPC applications. MPI collective operations involve a group of processes that communicate by message passing in an isolated context known as a communicator. Each process is identified by its rank, an integer from 0 to P − 1, where P is the size of the communicator. In the SPMD (Single Program, Multiple Data) style, every process in the communicator makes the same collective call, and the role each process plays (for example, root or non-root) depends on its rank.

MPI reductions are among the most useful MPI operations and form an important class of computational operations. A reduction combines the values contributed by all processes into a single result by applying an operation. The operation can be either user-defined or taken from the list of predefined operations. In practice, the predefined operations are sufficient for most applications.

Consider a system with N processes, where the goal is to compute the dot product of two N-vectors in parallel. The dot product of two vectors u and v is $u \cdot v = u_1 v_1 + u_2 v_2 + \dots + u_N v_N$. As you can imagine, this is highly parallelizable: with N processes, each process i can compute the intermediate value $u_i v_i$. The program then needs a way to sum all of these values, and this is where the reduction comes into play. We can ask MPI to sum the values and either store the result on a single process (for instance, process 0) or distribute it to every process.
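As a minimal serial sketch of this idea (plain C, no MPI calls), the helper below computes the partial dot product that one rank would own if the two vectors were split into contiguous chunks. Adding up the partial results of all ranks is the job that a sum reduction would perform. The function name `partial_dot` and the chunking scheme are illustrative assumptions, not part of MPI.

```c
#include <stddef.h>

/* Hypothetical helper: the partial dot product owned by `rank` when two
 * length-n vectors are split into `nprocs` contiguous chunks.
 * Ranks below n % nprocs receive one extra element. */
double partial_dot(const double *u, const double *v, size_t n,
                   int rank, int nprocs)
{
    size_t base  = n / (size_t)nprocs;
    size_t extra = n % (size_t)nprocs;
    size_t r     = (size_t)rank;
    size_t begin = r * base + (r < extra ? r : extra);
    size_t len   = base + (r < extra ? 1 : 0);

    double sum = 0.0;
    for (size_t i = begin; i < begin + len; i++)
        sum += u[i] * v[i];
    return sum;
}
```

In an MPI program, each rank would call such a helper with its own rank and the communicator size, then hand the returned value to a reduction with MPI_SUM.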

MPI reduction operations fall into three categories:

1) Global Reduction Operations:
• MPI_REDUCE,
• MPI_IREDUCE,
• MPI_ALLREDUCE and
• MPI_IALLREDUCE.

2) Combined Reduction and Scatter Operations:
• MPI_REDUCE_SCATTER,
• MPI_IREDUCE_SCATTER,
• MPI_REDUCE_SCATTER_BLOCK and
• MPI_IREDUCE_SCATTER_BLOCK.

3) Scan Operations:
• MPI_SCAN,
• MPI_ISCAN,
• MPI_EXSCAN, and
• MPI_IEXSCAN.
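The list above only names the routines. As a rough illustration of what the second category computes, the following serial model (plain C, no MPI calls; the function name is hypothetical) mimics a sum-based MPI_REDUCE_SCATTER_BLOCK: the contributions of all ranks are summed element by element, and rank p then receives block p of the summed vector.

```c
/* Serial model of a sum-based reduce-scatter with equal block sizes.
 * send holds nprocs rows of (nprocs * block) ints; row q is the
 * contribution of rank q. recv holds nprocs rows of `block` ints;
 * row p is what rank p would receive. */
void model_reduce_scatter_block_sum(const int *send, int *recv,
                                    int nprocs, int block)
{
    int row = nprocs * block;   /* number of elements contributed per rank */
    for (int p = 0; p < nprocs; p++) {
        for (int j = 0; j < block; j++) {
            int sum = 0;
            for (int q = 0; q < nprocs; q++)
                sum += send[q * row + p * block + j];
            recv[p * block + j] = sum;
        }
    }
}
```

With two ranks contributing {1, 2} and {10, 20} and a block size of 1, rank 0 would end up with 11 and rank 1 with 22.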

The primary idea of these operations is to collectively compute on a set of input data elements to generate a combined output. MPI_REDUCE is a collective function in which each process provides some input data (e.g., an array of double-precision floating-point numbers). This input data is combined through an MPI operation, as specified by the "op" parameter. Most applications use MPI predefined operations such as summation or maximum, although some applications also use reductions based on user-defined function handlers.

The MPI operator "op" is always assumed to be associative. All predefined operations are also assumed to be commutative. Applications, however, may define their own operations that are associative but not commutative. The "canonical" evaluation order of a reduction is determined by the ranks of the processes in the group. However, an MPI implementation can take advantage of associativity, or of associativity and commutativity, to change the order of evaluation. Doing so may change the result of the reduction for operations that are not strictly associative and commutative, such as floating-point addition.
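The remark about floating-point addition can be seen without MPI at all. The sketch below (plain C; the function names are illustrative) adds the same three values in two different orders, which is the kind of reordering an implementation is allowed to apply inside a reduction. With IEEE 754 doubles and the values 1e20, -1e20 and 1.0, the first order yields 1.0 while the second yields 0.0, because 1.0 is lost when it is added to -1e20.

```c
/* Accumulate from the first element to the last: ((v0 + v1) + v2) + ... */
double sum_ascending(const double *v, int n)
{
    double s = 0.0;
    for (int i = 0; i < n; i++)
        s += v[i];
    return s;
}

/* Same values, accumulated from the last element to the first. */
double sum_descending(const double *v, int n)
{
    double s = 0.0;
    for (int i = n - 1; i >= 0; i--)
        s += v[i];
    return s;
}
```

This is why a floating-point sum obtained from a reduction may differ slightly between runs or between process counts, even though every individual addition is performed correctly.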

The following predefined operations are supplied for MPI_REDUCE and related functions MPI_ALLREDUCE, MPI_REDUCE_SCATTER, and MPI_SCAN.

These operations are invoked by passing one of the following names as op (name, then meaning):
• MPI_MAX: maximum
• MPI_MIN: minimum
• MPI_SUM: sum
• MPI_PROD: product
• MPI_LAND: logical and
• MPI_BAND: bit-wise and
• MPI_LOR: logical or
• MPI_BOR: bit-wise or
• MPI_LXOR: logical xor
• MPI_BXOR: bit-wise xor
• MPI_MAXLOC: max value and location
• MPI_MINLOC: min value and location
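MPI_MAXLOC and MPI_MINLOC differ from the other entries in that they operate on (value, index) pairs rather than on single values. The following is a small serial model of the combine step behind MPI_MAXLOC, written in plain C without MPI calls. The struct and function names are hypothetical; the layout simply mirrors a double followed by an int, as used with the MPI_DOUBLE_INT pair type.

```c
/* Value/index pair: a double followed by an int. */
struct double_int {
    double value;
    int    index;
};

/* Combine step modelled on MPI_MAXLOC: keep the pair with the larger
 * value; if the values are equal, keep the smaller index. */
struct double_int maxloc_combine(struct double_int a, struct double_int b)
{
    if (a.value > b.value)
        return a;
    if (b.value > a.value)
        return b;
    return (a.index < b.index) ? a : b;
}
```

If each rank stores its own rank number in the index field, the reduced pair reports both the maximum and the lowest rank that holds it.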

Example 1: Get the memory on each node and perform MPI_SUM operation to calculate average Memory on the cluster.

    $ cat reduce_sum.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>
    #include <assert.h>
    #include <time.h>
    #include <string.h>

    /* Read MemTotal (in kB) from /proc/meminfo. The caller frees the result. */
    char *get_meminfo(void)
    {
        char temp[256];
        char *ram_size = malloc(sizeof temp);
        assert(ram_size != NULL);

        FILE *in = popen("cat /proc/meminfo | grep MemTotal | cut -c15-26", "r");
        assert(in != NULL);
        if (fgets(temp, sizeof temp, in) == NULL)
            temp[0] = '\0';   /* nothing read: atoll() will return 0 */
        strcpy(ram_size, temp);
        pclose(in);
        return ram_size;
    }

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);

        int world_rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
        int world_size;
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        // Get the size of RAM locally (in kB)
        char *size_of_RAM = get_meminfo();
        // long long: the cluster-wide sum in kB can exceed INT_MAX on larger clusters
        long long local_val = atoll(size_of_RAM);

        // Print the size of memory on each process
        printf("Size of Local Memory for process %d , RAM_size_local= %lld\n",
               world_rank, local_val);

        // Reduce all of the local meminfo values into the global sum on rank 0
        long long global_val = 0;
        MPI_Reduce(&local_val, &global_val, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);

        // Print the result (global_val is only valid on the root)
        if (world_rank == 0) {
            printf("Total global sum = %lld, avg = %lld\n", global_val,
                   global_val / world_size);
        }

        // Clean up
        free(size_of_RAM);

        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Finalize();
        return 0;
    }

Compile:

    $ mpicc -o reduce_sum reduce_sum.c

Run:

    $ mpirun -np 6 -host exechost03:1,exechost04:1,exechost05:1,exechost06:1,exechost07:1,exechost08:1 reduce_sum
    Size of Local Memory for process 3 , RAM_size_local= 32742748
    Size of Local Memory for process 4 , RAM_size_local= 32742748
    Size of Local Memory for process 2 , RAM_size_local= 65666080
    Size of Local Memory for process 1 , RAM_size_local= 65666088
    Size of Local Memory for process 5 , RAM_size_local= 65633576
    Size of Local Memory for process 0 , RAM_size_local= 65669284
    Total global sum = 328120524, avg = 54686754

Example 2: Get the memory on each node and perform MPI_MIN operation.

    $ cat reduce_min.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>
    #include <assert.h>
    #include <time.h>
    #include <string.h>

    /* Read MemTotal (in kB) from /proc/meminfo. The caller frees the result. */
    char *get_meminfo(void)
    {
        char temp[256];
        char *ram_size = malloc(sizeof temp);
        assert(ram_size != NULL);

        FILE *in = popen("cat /proc/meminfo | grep MemTotal | cut -c15-26", "r");
        assert(in != NULL);
        if (fgets(temp, sizeof temp, in) == NULL)
            temp[0] = '\0';   /* nothing read: atoi() will return 0 */
        strcpy(ram_size, temp);
        pclose(in);
        return ram_size;
    }

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);

        int world_rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
        int world_size;
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        // Get the size of RAM locally (in kB)
        char *size_of_RAM = get_meminfo();
        int local_val = atoi(size_of_RAM);

        // Print the size of memory on each process
        printf("Size of Local Memory for process %d , RAM_size_local= %d\n",
               world_rank, local_val);

        // Reduce all of the local meminfo values into the global minimum on rank 0
        int global_val;
        MPI_Reduce(&local_val, &global_val, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);

        // Print the result (global_val is only valid on the root)
        if (world_rank == 0) {
            printf("Total global min val = %d\n", global_val);
        }

        // Clean up
        free(size_of_RAM);

        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Finalize();
        return 0;
    }

Compile:

    $ mpicc -o reduce_min reduce_min.c

Run:

    $ mpirun -np 6 -host exechost03:1,exechost04:1,exechost05:1,exechost06:1,exechost07:1,exechost08:1 reduce_min
    Size of Local Memory for process 3 , RAM_size_local= 32742748
    Size of Local Memory for process 1 , RAM_size_local= 65666088
    Size of Local Memory for process 2 , RAM_size_local= 65666080
    Size of Local Memory for process 4 , RAM_size_local= 32742748
    Size of Local Memory for process 5 , RAM_size_local= 65633576
    Size of Local Memory for process 0 , RAM_size_local= 65669284
    Total global min val = 32742748

Example 3: Find the minimum free memory and the maximum PPN (processes per node) across the nodes, then use them to calculate max_bytes_per_rank and avoid out-of-memory (OOM) failures.

    $ cat reduce_ppn_Memory.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>
    #include <assert.h>
    #include <time.h>
    #include <string.h>
    #include <sys/sysinfo.h>

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);

        const double gigabyte = 1024.0 * 1024.0 * 1024.0;
        double local_free_mem, global_min_memory, max_bytes_per_rank;
        int len, local_ppn, global_max_ppn, world_rank, world_size;
        char name[MPI_MAX_PROCESSOR_NAME];
        struct sysinfo si;
        sysinfo(&si);

        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        /* Ranks on the same node land in the same shared_comm, so its size is this node's PPN */
        MPI_Comm shared_comm;
        MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shared_comm);
        MPI_Get_processor_name(name, &len);
        MPI_Comm_size(shared_comm, &local_ppn);
        MPI_Comm_free(&shared_comm);

        /* Get the size of free RAM locally, in GB */
        local_free_mem = ((double)si.freeram * si.mem_unit) / gigabyte;

        /* Print the size of memory on each process */
        printf("Size of Local Memory for process %d is  %f GB and local_ppn is %d on %s \n",
               world_rank, local_free_mem, local_ppn, name);

        /* Reduce the local free memory and PPN into global values on rank 0: */
        /* the minimum free memory and the maximum PPN over all nodes */
        MPI_Reduce(&local_free_mem, &global_min_memory, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD);
        MPI_Reduce(&local_ppn, &global_max_ppn, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);

        /* Print the minimum free memory available and the maximum PPN value */
        if (world_rank == 0) {
            printf("After MPI reduction, Minimum  Free memory available is  %f GB\n", global_min_memory);
            printf("After MPI reduction, Maximum  PPN value  is  %d\n", global_max_ppn);

            /* max_bytes_per_rank is based on 50% of the minimum free memory available, */
            /* divided by the maximum PPN on the nodes in the cluster. This helps to avoid OOM failures */
            /* in a heterogeneous cluster where some nodes have less memory (for example 32 GB RAM). */
            /* Despite its name, the value is expressed in GB. */
            max_bytes_per_rank = (global_min_memory * 0.5) / global_max_ppn;
            printf("Maximum bytes per rank is %f GB \n", max_bytes_per_rank);
        }

        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Finalize();
        return 0;
    }

Compile:

    [sachinpb@exechost04]$ mpicc -o reduce_ppn_Memory reduce_ppn_Memory.c

Run executable with 5 procs on 4 nodes:

    [sachinpb@exechost04]$ mpirun -np 5 -host exechost04:1,exechost05:1,exechost06:1,exechost07:2 reduce_ppn_Memory
    Size of Local Memory for process 0 is  59.349953 GB and local_ppn is 1 on exechost04
    Size of Local Memory for process 2 is  29.662674 GB and local_ppn is 1 on exechost06
    Size of Local Memory for process 1 is  59.596630 GB and local_ppn is 1 on exechost05
    Size of Local Memory for process 3 is  29.628567 GB and local_ppn is 2 on exechost07
    Size of Local Memory for process 4 is  29.628567 GB and local_ppn is 2 on exechost07
    After MPI reduction, Minimum  Free memory available is  29.628567 GB
    After MPI reduction, Maximum  PPN value  is  2
    Maximum bytes per rank is 7.407142 GB

Run executable with 13 procs on 8 nodes:

    [sachinpb@exechost04]$ mpirun -np 13 -host exechost03:1,exechost04:4,exechost05:1,exechost06:1,exechost07:1,exechost08:3,exechost09:1,exechost10:1 reduce_ppn_Memory
    Size of Local Memory for process 7 is  29.717995 GB and local_ppn is 1 on exechost07
    Size of Local Memory for process 6 is  29.657806 GB and local_ppn is 1 on exechost06
    Size of Local Memory for process 5 is  59.594044 GB and local_ppn is 1 on exechost05
    Size of Local Memory for process 11 is  60.412170 GB and local_ppn is 1 on exechost09
    Size of Local Memory for process 12 is  46.994701 GB and local_ppn is 1 on exechost10
    Size of Local Memory for process 4 is  59.661568 GB and local_ppn is 1 on exechost03
    Size of Local Memory for process 1 is  59.044613 GB and local_ppn is 4 on exechost04
    Size of Local Memory for process 0 is  59.044613 GB and local_ppn is 4 on exechost04
    Size of Local Memory for process 3 is  59.044613 GB and local_ppn is 4 on exechost04
    Size of Local Memory for process 2 is  59.044613 GB and local_ppn is 4 on exechost04
    Size of Local Memory for process 9 is  60.254044 GB and local_ppn is 3 on exechost08
    Size of Local Memory for process 10 is  60.254044 GB and local_ppn is 3 on exechost08
    Size of Local Memory for process 8 is  60.254044 GB and local_ppn is 3 on exechost08
    After MPI reduction, Minimum  Free memory available is  29.657806 GB
    After MPI reduction, Maximum  PPN value  is  4
    Maximum bytes per rank is 3.707226 GB
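The final figure in both runs follows directly from the two reduced values. As a small sketch of that arithmetic (plain C, no MPI; the helper name `mem_per_rank_gb` and the zero-PPN guard are assumptions added for illustration), the budget per rank is a fraction of the smallest free memory divided by the largest PPN:

```c
/* Hypothetical helper mirroring the arithmetic in reduce_ppn_Memory.c:
 * memory budget per rank (GB) = fraction * min_free_gb / max_ppn. */
double mem_per_rank_gb(double min_free_gb, int max_ppn, double fraction)
{
    if (max_ppn <= 0)
        return 0.0;   /* guard against division by zero */
    return (min_free_gb * fraction) / max_ppn;
}
```

With the values reported above, `mem_per_rank_gb(29.628567, 2, 0.5)` gives about 7.407 GB for the 5-process run and `mem_per_rank_gb(29.657806, 4, 0.5)` gives about 3.707 GB for the 13-process run, matching the program output.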

MPI_Allreduce

Many parallel applications need the reduced result on all processes rather than only on the root process. In the same way that MPI_Allgather complements MPI_Gather, MPI_Allreduce reduces the values and distributes the result to all processes. Allreduce is a collective operation that combines vectors owned by the processes into a result vector, which is then distributed back to every process. In MPI, the operation is invoked by calling the MPI_Allreduce() function. The C function prototype is the following:

    int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
                      MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);

The examples above use MPI_Reduce, in which the result of the reduction is delivered to only one process (in this case, process 0). The receive buffer (global_val in Examples 1 and 2) is therefore valid only on process 0; the other processes do not hold a valid value in it. Sometimes you want the result of the reduction on all processes, in which case MPI_Reduce is not suitable and you can use MPI_Allreduce instead. If we had used MPI_Allreduce instead of MPI_Reduce in those examples, every process would hold a valid result and could use it after the communication. In terms of the result, MPI_Allreduce is equivalent to an MPI_Reduce followed by an MPI_Bcast, although an implementation is free to use a more efficient algorithm.

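One of the more common applications of the reduction operation is the inner product computation. Typically, you have two vectors x and y that have the same distribution, that is, all processes store equal parts of x and y. For example:

    /* x, y: local parts of the vectors; localsize: number of local elements */
    double local_inprod = 0.0, global_inprod;
    for (int i = 0; i < localsize; i++)
        local_inprod += x[i] * y[i];
    /* Sum the partial inner products; every rank receives the total. */
    /* comm is the communicator shared by all ranks, e.g. MPI_COMM_WORLD. */
    MPI_Allreduce(&local_inprod, &global_inprod, 1, MPI_DOUBLE, MPI_SUM, comm);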


Recently, one more application of the Allreduce operation has emerged. Distributed training of deep neural networks (DNNs) uses Allreduce to synchronize neural network parameters between separate training processes after each step of gradient descent optimization. This new application involves medium and large data sets whose size depends on the particular neural network model, which places new requirements on Allreduce performance.

MPI_SCAN performs a prefix reduction on data distributed across the group. A parallel prefix, also known as a scan, computes for each process the reduction of the data held by that process and by all lower-ranked processes. The operation returns, in the receive buffer of the process with rank i, the reduction of the values in the send buffers of the processes with ranks 0,...,i (inclusive). The types of operations supported, their semantics, and the constraints on send and receive buffers are the same as for MPI_REDUCE.

[Figure: schematic of the reduction operation and the parallel prefix (scan) operation]

MPI_Exscan is an exclusive scan: it performs a prefix reduction across the MPI processes in the given communicator, excluding the contribution of the calling process. The operation returns, in the recvbuf of the process with rank i, the reduction (calculated according to the function op) of the values in the sendbufs of the processes with ranks 0, ..., i-1. Compare this with MPI_Scan, which calculates over the range 0, ..., i (inclusive). The types of operations supported, their semantics, and the constraints on send and receive buffers are the same as for MPI_Reduce. The value in recvbuf on process 0 is undefined, because recvbuf is not significant on process 0. The value in recvbuf on process 1 is always the value in sendbuf on process 0.

    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>

    int main(int argc, char* argv[])
    {
        MPI_Init(&argc, &argv);

        // Get my rank
        int my_rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

        // Get the sum of all ranks up to the one before mine and print it
        int total;
        MPI_Exscan(&my_rank, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

        // The result on MPI process 0 is undefined, do not print it
        if (my_rank == 0)
        {
            printf("[MPI process 0] Total = undefined.\n");
        }
        else
        {
            printf("[MPI process %d] Total = %d.\n", my_rank, total);
        }

        MPI_Finalize();

        return EXIT_SUCCESS;
    }
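To make the difference between the inclusive and the exclusive scan concrete, here is a serial model in plain C (no MPI calls; the function names are hypothetical). The arrays hold one value per rank: `in[i]` stands for the send buffer of rank i and `out[i]` for what rank i would find in its receive buffer when the operation is a sum.

```c
/* Serial model of MPI_Scan with MPI_SUM: out[i] = in[0] + ... + in[i]. */
void model_scan_sum(const int *in, int *out, int nprocs)
{
    int running = 0;
    for (int i = 0; i < nprocs; i++) {
        running += in[i];
        out[i] = running;
    }
}

/* Serial model of MPI_Exscan with MPI_SUM: out[i] = in[0] + ... + in[i-1].
 * out[0] is deliberately left untouched, mirroring the undefined
 * receive buffer on rank 0. */
void model_exscan_sum(const int *in, int *out, int nprocs)
{
    int running = 0;
    for (int i = 1; i < nprocs; i++) {
        running += in[i - 1];
        out[i] = running;
    }
}
```

When every rank contributes its own rank number on four processes, the inclusive scan yields 0, 1, 3, 6 on ranks 0 to 3, while the exclusive scan yields 0, 1, 3 on ranks 1 to 3 and nothing meaningful on rank 0.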

MPI_IN_PLACE tells MPI_Allreduce to use a single buffer as both the send and the receive buffer. The "in place" operations are provided to reduce unnecessary memory motion by both the MPI implementation and the user. With the "in place" option (MPI_IN_PLACE), the receive buffer in many of the collective calls becomes a send-and-receive buffer, i.e., the send and receive buffers are one and the same. Collective operations can be rooted or non-rooted. If the operation is rooted (e.g., broadcast, gather, scatter), the transfer is unidirectional. Non-rooted operations, such as all-to-all, often occur as part of an exchange, where it makes sense to communicate in both directions at once (i.e., the transfer is bidirectional). Note that the "in place" option does not apply to intercommunicators, since in the intercommunicator case there is no communication from a process to itself.

An intra-communicator refers to a single group, whereas an inter-communicator refers to a pair of groups. The intra-communicator is simply the group of all processes that share that communicator. Collective communications are most commonly performed on an intra-communicator; since MPI-2, most collectives are also defined for inter-communicators, but the "in place" option is not available there. For a rooted reduction on an intracommunicator, the "in place" option is specified by passing the value MPI_IN_PLACE as the sendbuf argument at the root. In that case, the input data is taken at the root from the receive buffer, where it will be replaced by the output data. In other words, use an if-else to pass MPI_IN_PLACE as sendbuf only on the root, not on the other ranks.

For example:

    /* Root: contribution is read from buffer and the result is written back to it */
    if (rank == root)
        rc = MPI_Reduce(MPI_IN_PLACE, buffer, size * count, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
    else
        rc = MPI_Reduce(buffer, NULL, size * count, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);

NOTE: For gather and most other collectives, MPI_IN_PLACE should be passed as the sendbuf. For scatter/scatterv, MPI_IN_PLACE should be passed as the recvbuf at the root.

On the other hand, inter-communication is point-to-point communication between processes in different groups. Inter-communicators are more likely to be used by parallel library designers than by application developers. For MPI_Allreduce, if comm is an intercommunicator, the result of the reduction of the data provided by the processes in group A is stored at each process in group B, and vice versa. Both groups should provide count and datatype arguments that specify the same type signature.

I hope this blog helped in understanding MPI reduction operations  and running sample code on your HPC cluster.

--------------------------------------------------------------------------------------------------------------------

Reference:
https://www.open-mpi.org/doc/v4.0/man3/MPI_Reduce.3.php
https://www.programmersought.com/article/96223924595
https://www.rookiehpc.com/mpi/docs/