- MPI_REDUCE,
- MPI_IREDUCE,
- MPI_ALLREDUCE and
- MPI_IALLREDUCE.
- MPI_REDUCE_SCATTER,
- MPI_IREDUCE_SCATTER,
- MPI_REDUCE_SCATTER_BLOCK and
- MPI_IREDUCE_SCATTER_BLOCK.
- MPI_SCAN,
- MPI_ISCAN,
- MPI_EXSCAN, and
- MPI_IEXSCAN.
Name          Meaning
MPI_MAX       maximum
MPI_MIN       minimum
MPI_SUM       sum
MPI_PROD      product
MPI_LAND      logical and
MPI_BAND      bit-wise and
MPI_LOR       logical or
MPI_BOR       bit-wise or
MPI_LXOR      logical xor
MPI_BXOR      bit-wise xor
MPI_MAXLOC    max value and location
MPI_MINLOC    min value and location
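MPI_MAXLOC and MPI_MINLOC reduce value/index pairs, so they are used with the predefined pair datatypes such as MPI_DOUBLE_INT. Here is a minimal sketch (the per-rank values below are invented just for illustration) that reports the smallest value and the rank that owns it:
-----------
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* MPI_DOUBLE_INT expects a {double value, int index} pair;
     * the index is conventionally the owning rank. */
    struct { double value; int rank; } local, global;
    local.value = 100.0 - rank;   /* hypothetical per-rank value */
    local.rank  = rank;

    MPI_Reduce(&local, &global, 1, MPI_DOUBLE_INT, MPI_MINLOC, 0, MPI_COMM_WORLD);
    if (rank == 0)
        printf("Minimum value %f was contributed by rank %d\n", global.value, global.rank);

    MPI_Finalize();
    return 0;
}
-----------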
Example 1: Get the memory size on each node and perform an MPI_SUM reduction to calculate the average memory across the cluster.
cat reduce_sum.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <assert.h>
#include <time.h>
#include <string.h>
//Get meminfo function to calc size of RAM
char *get_meminfo()
{
FILE *in=NULL;
char temp[256];
char *ram_size=(char*)malloc(256);
assert(ram_size != NULL);
in=popen("cat /proc/meminfo | grep MemTotal | cut -c15-26", "r");
fgets(temp, 255, in);
strcpy(ram_size, temp);
//printf("In function %s", ram_size);
pclose(in);
return ram_size;
}
int main(int argc, char** argv) {
MPI_Init(NULL, NULL);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// Get the size of RAM locally
char *size_of_RAM=NULL;
int local_val;
size_of_RAM=get_meminfo();
local_val=atoi(size_of_RAM);
// Print the SIZE of memory on each process
printf("Size of Local Memory for process %d , RAM_size_local= %d\n",
world_rank, local_val);
// Reduce all of the local meminfo into the global meminfo
int global_val;
MPI_Reduce(&local_val, &global_val, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
// Print the result
if (world_rank == 0) {
printf("Total global sum = %d, avg = %d\n", global_val,
global_val / (world_size));
}
// Clean up
free(size_of_RAM);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
}
Compile :
mpicc -o reduce_sum reduce_sum.c
--------------------------------------------------------------
$mpirun -np 6 -host exechost03:1,exechost04:1,exechost05:1,exechost06:1,exechost07:1,exechost08:1 reduce_sum
Size of Local Memory for process 3 , RAM_size_local= 32742748
Size of Local Memory for process 4 , RAM_size_local= 32742748
Size of Local Memory for process 2 , RAM_size_local= 65666080
Size of Local Memory for process 1 , RAM_size_local= 65666088
Size of Local Memory for process 5 , RAM_size_local= 65633576
Size of Local Memory for process 0 , RAM_size_local= 65669284
Total global sum = 328120524, avg = 54686754
$
Example 2: Get the memory size on each node and perform an MPI_MIN reduction to find the smallest memory size in the cluster.
cat reduce_min.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <assert.h>
#include <time.h>
#include <string.h>
//Get meminfo function to calc size of RAM
char *get_meminfo()
{
FILE *in=NULL;
char temp[256];
char *ram_size=(char*)malloc(256);
assert(ram_size != NULL);
in=popen("cat /proc/meminfo | grep MemTotal | cut -c15-26", "r");
fgets(temp, 255, in);
strcpy(ram_size, temp);
//printf("In function %s", ram_size);
pclose(in);
return ram_size;
}
int main(int argc, char** argv) {
MPI_Init(NULL, NULL);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// Get the size of RAM locally
char *size_of_RAM=NULL;
int local_val;
size_of_RAM=get_meminfo();
local_val=atoi(size_of_RAM);
// Print the SIZE of memory on each process
printf("Size of Local Memory for process %d , RAM_size_local= %d\n",
world_rank, local_val);
// Reduce all of the local meminfo into the global meminfo
int global_val;
MPI_Reduce(&local_val, &global_val, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
// Print the result
if (world_rank == 0) {
printf("Total global min val = %d\n", global_val);
}
// Clean up
free(size_of_RAM);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
}
--------------
Compile :
mpicc -o reduce_min reduce_min.c
$mpirun -np 6 -host exechost03:1,exechost04:1,exechost05:1,exechost06:1,exechost07:1,exechost08:1 reduce_min
Size of Local Memory for process 3 , RAM_size_local= 32742748
Size of Local Memory for process 1 , RAM_size_local= 65666088
Size of Local Memory for process 2 , RAM_size_local= 65666080
Size of Local Memory for process 4 , RAM_size_local= 32742748
Size of Local Memory for process 5 , RAM_size_local= 65633576
Size of Local Memory for process 0 , RAM_size_local= 65669284
Total global min val = 32742748
$
Example 3: Find the minimum free memory available and the maximum PPN (processes per node) value, and use them to calculate max_bytes_per_rank so that ranks can avoid out-of-memory (OOM) failures.
cat reduce_ppn_Memory.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <assert.h>
#include <time.h>
#include <string.h>
#include <sys/sysinfo.h>
int main(int argc, char** argv) {
MPI_Init(NULL, NULL);
const double gigabyte = 1024 * 1024 *1024;
double local_free_mem, global_min_memory, max_bytes_per_rank;
int len, local_ppn, global_max_ppn, world_rank, world_size;
char name[MPI_MAX_PROCESSOR_NAME];
struct sysinfo si;
sysinfo (&si);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm shared_comm;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shared_comm);
MPI_Get_processor_name(name, &len);
MPI_Comm_size(shared_comm, &local_ppn);
MPI_Comm_free(&shared_comm);
/* Get the size of Free RAM locally */
local_free_mem = (si.freeram * si.mem_unit) / gigabyte;
/* Print the SIZE of memory on each process */
printf("Size of Local Memory for process %d is %f GB and local_ppn is %d on %s \n", world_rank, local_free_mem, local_ppn, name);
/* Reduce all of the local meminfo & ppn into the global meminfo & ppn */
/* Get Minimum Memory available and Maximum value of PPN on all the nodes */
MPI_Reduce(&local_free_mem, &global_min_memory, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD);
MPI_Reduce(&local_ppn, &global_max_ppn, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);
/* Print the result Minimum Free memory available and Maximum PPN value */
if (world_rank == 0) {
printf("After MPI reduction, Minimum Free memory available is %f GB\n", global_min_memory);
printf("After MPI reduction, Maximum PPN value is %d\n", global_max_ppn);
/* max_bytes_per_rank calculation based on 50% of minimum free memory available */
/* divided by max value of PPN on the nodes in cluster. This will help to avoid OOM Failure */
/* in heterogeneous cluster where some of the nodes installed with lesser memory (Example 32GB RAM) */
max_bytes_per_rank = (global_min_memory * 0.5) / global_max_ppn;
printf("Maximum bytes per rank is %f GB \n", max_bytes_per_rank);
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
}
---------------------------------------------------------------------------------------------
Compile:
[sachinpb@exechost04]$ mpicc -o reduce_ppn_Memory reduce_ppn_Memory.c
[sachinpb@exechost04]$
Run executable with 5 procs on 4 nodes :
[sachinpb@exechost04]$ mpirun -np 5 -host exechost04:1,exechost05:1,exechost06:1,exechost07:2 reduce_ppn_Memory
Size of Local Memory for process 0 is 59.349953 GB and local_ppn is 1 on exechost04
Size of Local Memory for process 2 is 29.662674 GB and local_ppn is 1 on exechost06
Size of Local Memory for process 1 is 59.596630 GB and local_ppn is 1 on exechost05
Size of Local Memory for process 3 is 29.628567 GB and local_ppn is 2 on exechost07
Size of Local Memory for process 4 is 29.628567 GB and local_ppn is 2 on exechost07
After MPI reduction, Minimum Free memory available is 29.628567 GB
After MPI reduction, Maximum PPN value is 2
Maximum bytes per rank is 7.407142 GB
------------------------------------------------------------------------------------------------------
Run executable with 13 procs on 8 nodes :
[sachinpb@exechost04]$ mpirun -np 13 -host exechost03:1,exechost04:4,exechost05:1,exechost06:1,exechost07:1,exechost08:3,exechost09:1,exechost10:1 reduce_ppn_Memory
Size of Local Memory for process 7 is 29.717995 GB and local_ppn is 1 on exechost07
Size of Local Memory for process 6 is 29.657806 GB and local_ppn is 1 on exechost06
Size of Local Memory for process 5 is 59.594044 GB and local_ppn is 1 on exechost05
Size of Local Memory for process 11 is 60.412170 GB and local_ppn is 1 on exechost09
Size of Local Memory for process 12 is 46.994701 GB and local_ppn is 1 on exechost10
Size of Local Memory for process 4 is 59.661568 GB and local_ppn is 1 on exechost03
Size of Local Memory for process 1 is 59.044613 GB and local_ppn is 4 on exechost04
Size of Local Memory for process 0 is 59.044613 GB and local_ppn is 4 on exechost04
Size of Local Memory for process 3 is 59.044613 GB and local_ppn is 4 on exechost04
Size of Local Memory for process 2 is 59.044613 GB and local_ppn is 4 on exechost04
Size of Local Memory for process 9 is 60.254044 GB and local_ppn is 3 on exechost08
Size of Local Memory for process 10 is 60.254044 GB and local_ppn is 3 on exechost08
Size of Local Memory for process 8 is 60.254044 GB and local_ppn is 3 on exechost08
After MPI reduction, Minimum Free memory available is 29.657806 GB
After MPI reduction, Maximum PPN value is 4
Maximum bytes per rank is 3.707226 GB
Many parallel applications need the reduced result on all processes rather than only on the root process. In the same way that MPI_Allgather complements MPI_Gather, MPI_Allreduce reduces the values and distributes the result to all processes. Allreduce is a collective operation that combines the vectors owned by the processes into a result vector that is distributed back to every process. In MPI, this operation is accessed by calling the MPI_Allreduce() function.
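For reference, the standard C binding of MPI_Allreduce is:
-----------
int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
                  MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
-----------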
One of the more common applications of the reduction operation is the inner product computation. Typically, you have two vectors x and y with the same distribution, that is, all processes store equal parts of x and y. For example:
local_inprod = 0;
for (i = 0; i < localsize; i++)
    local_inprod += x[i] * y[i];
MPI_Allreduce(&local_inprod, &global_inprod, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
More recently, another application of the Allreduce operation has emerged. Distributed training of deep neural networks (DNNs) uses Allreduce to synchronize the network parameters between separate training processes after each step of gradient descent optimization. This application involves medium to large data sizes, depending on the particular neural network model, which brings new requirements to Allreduce performance.
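A minimal sketch of this pattern (the gradient values and the NPARAMS size below are made up for illustration): each worker computes local gradients, the gradients are summed across workers with MPI_Allreduce, and every worker divides by the number of workers to obtain the averaged gradient.
-----------
#include <stdio.h>
#include <mpi.h>

#define NPARAMS 4   /* hypothetical number of model parameters */

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Each worker computes its own local gradients (dummy values here). */
    double grad[NPARAMS], avg_grad[NPARAMS];
    for (int i = 0; i < NPARAMS; i++)
        grad[i] = rank + 0.1 * i;

    /* Sum the gradients from all workers, then divide by the number of
     * workers so that every rank ends up with the averaged gradient. */
    MPI_Allreduce(grad, avg_grad, NPARAMS, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
    for (int i = 0; i < NPARAMS; i++)
        avg_grad[i] /= size;

    printf("Rank %d: averaged grad[0] = %f\n", rank, avg_grad[0]);
    MPI_Finalize();
    return 0;
}
-----------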
MPI_SCAN is used to perform a prefix reduction on data distributed across the group. A parallel prefix, also known as a scan operation, computes a running reduction over the data held by the processes: it returns, in the receive buffer of the process with rank i, the reduction of the values in the send buffers of processes with ranks 0, ..., i (inclusive). The types of operations supported, their semantics, and the constraints on send and receive buffers are the same as for MPI_REDUCE.
The diagram below illustrates the reduction operation and the parallel prefix (scan) operation.
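As a concrete illustration of the scan semantics, here is a minimal inclusive-scan sketch, in the same style as the MPI_Exscan example further below, that computes the prefix sum of the ranks:
-----------
#include <stdio.h>
#include <mpi.h>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);
    // Get my rank
    int my_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    // Inclusive prefix sum: rank i receives 0 + 1 + ... + i
    int total;
    MPI_Scan(&my_rank, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("[MPI process %d] Inclusive prefix sum = %d.\n", my_rank, total);
    MPI_Finalize();
    return 0;
}
-----------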
MPI_Exscan is an exclusive scan: it performs a prefix reduction across all MPI processes in the given communicator, excluding the calling process. The operation returns, in the recvbuf of the process with rank i, the reduction (calculated according to the function op) of the values in the sendbufs of processes with ranks 0, ..., i-1. Compare this with MPI_Scan, which calculates over the range 0, ..., i (inclusive). The types of operations supported, their semantics, and the constraints on send and receive buffers are the same as for MPI_Reduce. The value in recvbuf on process 0 is undefined, since recvbuf is not significant for process 0; the value of recvbuf on process 1 is always the value in sendbuf on process 0.
-----------
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(int argc, char* argv[])
{
MPI_Init(&argc, &argv);
// Get my rank
int my_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
// Get the sum of all ranks up to the one before mine and print it
int total;
MPI_Exscan(&my_rank, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
// The result on MPI process 0 is undefined, do not print it
if(my_rank == 0)
{
printf("[MPI process 0] Total = undefined.\n");
}
else
{
printf("[MPI process %d] Total = %d.\n", my_rank, total);
}
MPI_Finalize();
return EXIT_SUCCESS;
}
-----------
MPI_IN_PLACE tells MPI_Allreduce to use a single buffer as both the send and the receive buffer. The "in place" operations are provided to reduce unnecessary memory motion by both the MPI implementation and the user. With the "in place" (MPI_IN_PLACE) option, the receive buffer in many of the collective calls becomes a combined send-and-receive buffer (i.e., the send and receive buffers are one and the same buffer). Collective communication applies to intracommunicators. If the operation is rooted (e.g., broadcast, gather, scatter), then the transfer is unidirectional. Non-rooted operations, such as all-to-all, will often occur as part of an exchange, where it makes sense to communicate in both directions at once (i.e., the transfer is bidirectional). Note that the "in place" option does not apply to intercommunicators, since in the intercommunicator case there is no communication from a process to itself.
An intra-communicator refers to a single group, while an inter-communicator refers to a pair of groups. The intra-communicator is simply the group of all processes that share that communicator. Collective communications are most commonly performed with an intra-communicator, though MPI also defines them on inter-communicators with different semantics (discussed below). The "in place" option for intracommunicators is specified by passing the value MPI_IN_PLACE as the sendbuf argument at the root. In that case, the input data is taken at the root from the receive buffer, where it will be replaced by the output data (i.e., use an if-else so that MPI_IN_PLACE is passed as sendbuf only on the root, not on the other ranks).
For example:
-------------------------
if (rank == root)
    rc = MPI_Reduce(MPI_IN_PLACE, buffer, size * count, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
else
    rc = MPI_Reduce(buffer, NULL, size * count, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
NOTE:
For gather and most other collectives, MPI_IN_PLACE should be passed as the sendbuf. For scatter/scatterv, MPI_IN_PLACE should be passed as the recvbuf.
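For a non-rooted collective such as MPI_Allreduce, every rank passes MPI_IN_PLACE as the sendbuf and the value is read from and written back to the same buffer. A minimal sketch:
-----------
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Every rank passes MPI_IN_PLACE: the rank value in 'value' is
     * replaced by the sum of the ranks of all processes. */
    int value = rank;
    MPI_Allreduce(MPI_IN_PLACE, &value, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("Rank %d: in-place allreduce sum of ranks = %d\n", rank, value);

    MPI_Finalize();
    return 0;
}
-----------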
On the other hand, inter-communication is point-to-point communication between processes in different groups. Inter-communicators are more likely to be used by parallel library designers than by application developers. If comm is an intercommunicator, then the result of the reduction of the data provided by processes in group A is stored at each process in group B, and vice versa. Both groups should provide count and datatype arguments that specify the same type signature.
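As an illustration, here is a minimal sketch that splits MPI_COMM_WORLD into two groups (even and odd ranks), builds an intercommunicator between them, and performs an MPI_Allreduce over it; each group receives the sum contributed by the other group. It assumes at least two ranks so that both groups are non-empty.
-----------
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Split the world into group A (even ranks) and group B (odd ranks). */
    int color = rank % 2;
    MPI_Comm local_comm;
    MPI_Comm_split(MPI_COMM_WORLD, color, rank, &local_comm);

    /* Build an intercommunicator between the two groups. The remote
     * leader is the lowest world rank of the other group (0 or 1). */
    MPI_Comm inter_comm;
    MPI_Intercomm_create(local_comm, 0, MPI_COMM_WORLD, 1 - color, 99, &inter_comm);

    /* Each process contributes its world rank; the sum over group A is
     * delivered to every process in group B, and vice versa. */
    int local_val = rank, remote_sum;
    MPI_Allreduce(&local_val, &remote_sum, 1, MPI_INT, MPI_SUM, inter_comm);
    printf("Rank %d (group %c) received remote group's sum = %d\n",
           rank, color ? 'B' : 'A', remote_sum);

    MPI_Comm_free(&inter_comm);
    MPI_Comm_free(&local_comm);
    MPI_Finalize();
    return 0;
}
-----------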
I hope this blog helped in understanding MPI reduction operations and running sample code on your HPC cluster.
--------------------------------------------------------------------------------------------------------------------