Given by Geoffrey C. Fox and Nancy McCracken for Computational Science for Simulations, Fall Semester 1998. Foils prepared 19 September 1998
September 9, 1998 |
Geoffrey Fox, Nancy McCracken
|
This covers MPI from a user's point of view and is meant to be a supplement to other online resources from the MPI Forum, David Walker's Tutorial, Ian Foster's Book "Designing and Building Parallel Programs", and Gropp, Lusk, and Skjellum's "Using MPI" |
An overview is based on a subset of 6 routines that cover send/receive, environment inquiry (for rank and total number of processors), initialization, and finalization, with simple examples |
Processor Groups, Collective Communication and Computation, Topologies, and Derived Datatypes are also discussed |
MPI collected ideas from many previous message passing systems and put them into a "standard" so we could write portable (runs on all current machines) and scalable (runs on future machines we can think of) parallel software |
MPI plays the same role for message passing systems that HPF plays for data parallel languages
|
HPF runs on SIMD and MIMD machines and is high level as it expresses a style of programming or problem architecture |
MPI runs on MIMD machines (in principle it could run on SIMD machines, but this would be unnatural and inefficient) -- it expresses a machine architecture |
Traditional Software Model is
|
So in this analogy MPI is the universal "machine language" of parallel processing |
MPI can be built efficiently at low risk, whereas an HPF compiler is a difficult project with many unsolved issues |
An MPI program defines a set of processes, each executing the same program (SPMD)
|
... that communicate by calling MPI messaging functions
|
... and can be constructed in a modular fashion
|
Also
|
A standard message-passing library
|
MPI defines a language-independent interface
|
Bindings are defined for different languages
|
Multiple implementations
|
Began at Williamsburg Workshop in April 1992 |
Organized at Supercomputing 92 (November 92) |
Followed HPF Forum format and process
|
Pre-final draft distributed at Supercomputing 93 |
Two-month public comment period |
Final version of draft in May 1994 |
Widely available now on the Web, ftp sites, netlib |
Public and optimized Vendor implementations available for Unix and Windows NT |
Further MPI Forum meetings through 1995 and 1996 to discuss additions to the standard |
Standard announced at Supercomputing 1996 |
Broad Participation |
Vendors
|
Message Passing Library writers
|
Application specialists and consultants |
Companies:
|
Laboratories:
|
Universities:
|
MPI was designed by the Kitchen Sink approach and has 129 functions and each has many arguments
|
It is not a complete operating environment and does not have ability to create and spawn processes etc. |
PVM is the previous dominant approach
|
However it seems clear that MPI has been adopted as the standard messaging system by parallel computer vendors |
Questions:
|
First generation message passing systems only allowed one to transmit information originating in a contiguous array of bytes
|
MPI specifies the buffer by starting address, datatype, and count
|
Combinations of elementary datatypes into a derived user-defined datatype allow clean communication of collections of disparate types in a single MPI call. |
Elimination of length (in bytes) in favor of count (of items of a given type) is clearer |
Specifying application-oriented layouts allows maximal use of special hardware and optimized memory use |
However this wonderful technology is problematical in Java where layout of data structures in memory is not defined in most cases
|
1st generation message passing systems used hardware addresses
|
MPI supports process groups
|
All communication takes place in groups
|
We find a good example when we consider typical Matrix Algorithm
|
Consider a block decomposition of 16 by 16 matrices B and C as for Laplace's equation. (Efficient Decomposition as we will see later) |
Each sum operation involves a subset(group) of 4 processors |
1st generation message passing systems used an integer "tag" (a.k.a. "type" or "id") to match messages when received
|
Calls Sub1 and Sub2 are from different libraries |
Same sequence of calls on all processes, with no global synch
|
We follow with two cases showing possibility of error with messages getting mixed up between subroutine calls |
Each library was self-consistent
|
Interaction between the libraries killed them
|
The lesson:
|
Other examples teach other lessons:
|
Generalize tag to tag and communicator |
A separate communication context for each family of messages
|
No wild cards allowed in communicator, for security |
Communicator allocated by the system, for security |
Tags retained for use within a context
|
All MPI routines are prefixed by MPI_
|
MPI constants are in upper case as are MPI datatypes, e.g. MPI_FLOAT for floating point number in C |
Specify overall constants with
|
C routines are actually integer functions and always return an integer status (error) code |
Fortran routines are really subroutines and return the status code as their last argument
|
There is a set of predefined constants in the include files for each language, and these include: |
MPI_SUCCESS -- successful return code |
MPI_COMM_WORLD (everything) and MPI_COMM_SELF(current process) are predefined reserved communicators in C and Fortran |
Fortran elementary datatypes are:
|
C elementary datatypes are:
|
MPI_Init (argc, argv) -- initialize |
MPI_Comm_rank (comm, rank) -- find process label (rank) in group |
MPI_Comm_size(comm, size) -- find total number of processes |
MPI_Send (sndbuf,count,datatype,dest,tag,comm) -- send a message |
MPI_Recv (recvbuf,count,datatype,source,tag,comm,status) -- receive a message |
MPI_Finalize( ) -- End Up |
This MUST be called to set up MPI before any other MPI routines may be called |
For C: int MPI_Init(int *argc, char **argv[] )
|
For Fortran: call MPI_INIT(mpierr)
|
This allows you to identify each process by a unique integer called the rank, which runs from 0 to N-1 when there are N processes |
If we divide the region 0 to 1 by domain decomposition into N parts, the process with rank r controls the subregion from r/N to (r+1)/N |
This returns, in the integer size, the number of processes in the given communicator comm (remember this specifies the processor group) |
For C: int MPI_Comm_size(MPI_Comm comm,int *size) |
For Fortran: call MPI_COMM_SIZE (comm, size, mpierr)
|
Before exiting an MPI application, it is courteous to clean up the MPI state and MPI_FINALIZE does this. No MPI routine may be called in a given process after that process has called MPI_FINALIZE |
for C: int MPI_Finalize() |
for Fortran:call MPI_FINALIZE(mpierr)
|
/* all processes execute this program */ |
#include <stdio.h> |
#include <mpi.h> |
int main(int argc, char *argv[]) |
{  int ierror, rank, size; |
   ierror = MPI_Init(&argc, &argv); |
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);   /* find this process's label (rank) */ |
   MPI_Comm_size(MPI_COMM_WORLD, &size);   /* find the total number of processes */ |
   printf("Hello from process %d of %d\n", rank, size);   /* example output */ |
   MPI_Finalize(); |
   return 0; |
} |
Parallel I/O has technical issues -- how best to optimize access to a file whose contents may be stored on N different disks which can deliver data in parallel -- and |
semantic issues -- what does printf in C (and PRINT in Fortran) mean? |
The meaning of printf/PRINT is both undefined and changing
|
Today, memory costs have declined and ALL mainstream MIMD distributed memory machines whether clusters of workstations/PC's or integrated systems such as T3D/ Paragon/ SP-2 have enough memory on each node to run UNIX or Windows NT |
Thus printf today typically means that the node on which it runs will write the output to the "standard output" file for that node
|
If on the other hand you want a single stream of output with the information in order
|
Then in general you need to communicate information from nodes 1 to N-1 to node 0 and let node 0 sort it and output in required order |
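For example (a sketch, not in the original foils; the line length of 64 characters and the tag 0 are arbitrary choices), each node could format its output line locally, nodes 1 to N-1 send theirs to node 0, and node 0 prints all lines in rank order: |
#include <stdio.h> |
#include "mpi.h" |
int main(int argc, char *argv[]) |
{  char line[64];  int rank, size, i;  MPI_Status status; |
   MPI_Init(&argc, &argv); |
   MPI_Comm_rank(MPI_COMM_WORLD, &rank); |
   MPI_Comm_size(MPI_COMM_WORLD, &size); |
   sprintf(line, "result from node %d", rank);   /* each node formats its own line */ |
   if (rank != 0)                                /* nodes 1..N-1 send their line to node 0 */ |
      MPI_Send(line, 64, MPI_CHAR, 0, 0, MPI_COMM_WORLD); |
   else {                                        /* node 0 prints all lines in rank order */ |
      printf("%s\n", line); |
      for (i = 1; i < size; i++) { |
         MPI_Recv(line, 64, MPI_CHAR, i, 0, MPI_COMM_WORLD, &status); |
         printf("%s\n", line); |
      } |
   } |
   MPI_Finalize(); |
   return 0; |
} |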
MPI-IO standard links I/O to MPI in a standard fashion |
call MPI_SEND (sndbuf, count, datatype, dest, tag, comm, mpierr) sends count items of type datatype, starting at address sndbuf, to the processor with rank dest; tag labels the message and comm specifies the communicator (processor group) |
integer count, datatype, dest, tag, comm, mpierr |
real sndbuf(50) |
comm = MPI_COMM_WORLD |
tag = 0 |
count = 50 |
datatype = MPI_REAL |
call MPI_SEND (sndbuf, count, datatype, dest, tag, comm, mpierr) |
call MPI_RECV (recvbuf, buffer_len, datatype, source_rank, tag, comm, return_status, mpierr) |
Note that return_status is used after completion of the receive to find the actual received length (buffer_len is the maximum length allowed), the actual source processor source_rank, and the actual message tag |
In C the syntax is |
int error_message = MPI_Recv (void *recvbuf, int buffer_len, MPI_Datatype datatype, int source_rank, int tag, MPI_Comm comm, MPI_Status *return_status) |
integer status(MPI_STATUS_SIZE)   ! An array to store the status of the received information |
integer mpierr, count, datatype, source, tag, comm |
real recvbuf(100) |
count = 100 |
datatype = MPI_REAL |
comm = MPI_COMM_WORLD |
source = MPI_ANY_SOURCE   ! accept any source processor |
tag = MPI_ANY_TAG         ! accept any message tag |
call MPI_RECV (recvbuf, count, datatype, source, tag, comm, status, mpierr)
|
/* All processes execute this program */ |
#include <stdio.h> |
#include <string.h> |
#include "mpi.h" |
int main( int argc, char **argv ) |
{  char message[20];                /* example message buffer */ |
   int rank, size, i; |
   MPI_Status status; |
   MPI_Init( &argc, &argv ); |
   MPI_Comm_rank( MPI_COMM_WORLD, &rank ); |
   MPI_Comm_size( MPI_COMM_WORLD, &size ); |
   if( rank == 0 ) {                /* We are on "root" -- Process 0 */ |
      strcpy(message, "Hello, world"); |
      for( i = 1; i < size; i++ )   /* send the message to every other process */ |
         MPI_Send(message, 20, MPI_CHAR, i, 99, MPI_COMM_WORLD); |
   } |
   else {                           /* Any processor except root -- Process 0 */ |
      MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status); |
   } |
   printf("This is a message from node %d saying %s\n", rank, message); |
   MPI_Finalize(); |
   return 0; |
} |
In C, status is a structure of type MPI_Status; status.MPI_SOURCE and status.MPI_TAG give the actual source and tag of the received message |
In Fortran, the status is an integer array of size MPI_STATUS_SIZE, and the elements status(MPI_SOURCE) and status(MPI_TAG) give the actual source and tag |
In C and Fortran, the number of elements (called count) in the message can be found from a call to |
call MPI_GET_COUNT (IN status, IN datatype, OUT count, OUT error_message) |
|
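For example, in C (a fragment assuming MPI has already been initialized; the buffer size of 100 is arbitrary): |
int recvbuf[100], nreceived; |
MPI_Status status; |
MPI_Recv(recvbuf, 100, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); |
MPI_Get_count(&status, MPI_INT, &nreceived);   /* actual number of MPI_INT items received */ |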
Provides standard interfaces to common global operations
|
A collective operation uses a process group
|
Message tags not needed (generated internally) |
All collective operations are blocking. |
MPI_BARRIER(comm) Global Synchronization within a given communicator |
MPI_BCAST Global Broadcast |
MPI_GATHER Concatenate data from all processors in a communicator into one process
|
MPI_SCATTER takes data from one processor and scatters over all processors |
MPI_ALLTOALL sends data from all processes to all other processes |
MPI_SENDRECV exchanges data between two processors -- often used to implement "shifts"
|
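As a sketch of MPI_SENDRECV used for a shift in C (a periodic shift around a ring of processes, ordered by rank, is just an example): every process passes one double to its right-hand neighbor and receives one from its left-hand neighbor |
int rank, size, right, left; |
double sendval, recvval; |
MPI_Status status; |
MPI_Comm_rank(MPI_COMM_WORLD, &rank); |
MPI_Comm_size(MPI_COMM_WORLD, &size); |
right = (rank + 1) % size;                  /* neighbor on the right (periodic) */ |
left  = (rank - 1 + size) % size;           /* neighbor on the left */ |
sendval = 1.0 * rank;                       /* example value to pass along */ |
MPI_Sendrecv(&sendval, 1, MPI_DOUBLE, right, 0, |
             &recvval, 1, MPI_DOUBLE, left,  0, |
             MPI_COMM_WORLD, &status);      /* send right, receive from the left */ |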
#include "mpi.h" |
main( int argc, char **argv ) |
{ char message[20];
|
Note that all processes issue the broadcast operation, process 0 sends the message and all processes receive the message. |
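Filled out as a complete program, this broadcast example might look like the following (a sketch; the "Hello, world" text is just an example consistent with the 20-character buffer declared above): |
#include <stdio.h> |
#include <string.h> |
#include "mpi.h" |
int main( int argc, char **argv ) |
{  char message[20]; |
   int rank; |
   MPI_Init( &argc, &argv ); |
   MPI_Comm_rank( MPI_COMM_WORLD, &rank ); |
   if( rank == 0 )                          /* only the root fills in the message */ |
      strcpy(message, "Hello, world"); |
   MPI_Bcast(message, 20, MPI_CHAR, 0, MPI_COMM_WORLD);   /* every process calls the broadcast */ |
   printf("This is node %d saying %s\n", rank, message); |
   MPI_Finalize(); |
   return 0; |
} |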
One can often perform computation as part of a collective communication |
MPI_REDUCE performs a reduction operation, with the operation chosen from a predefined set including MPI_SUM, MPI_PROD, MPI_MAX, MPI_MIN, logical and bitwise operations, and MPI_MAXLOC/MPI_MINLOC (maximum/minimum together with its location) |
MPI_ALLREDUCE is same as MPI_REDUCE but it stores result in all -- not just one -- processors |
MPI_SCAN performs reductions with result for processor r depending on data in processors 0 to r |
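For example, a running (prefix) sum with MPI_SCAN in C (a minimal sketch; the contributed values are arbitrary): |
int rank, myvalue, prefix; |
MPI_Comm_rank(MPI_COMM_WORLD, &rank); |
myvalue = rank + 1;                          /* each process contributes one integer */ |
MPI_Scan(&myvalue, &prefix, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); |
/* prefix on process r now holds the sum of the contributions from processes 0..r */ |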
[Figures: two diagrams of four processors, each with a send buffer of size 2, showing the processors and memory locations before and after a collective operation] |
All to All Communication with i'th location in j'th processor being sent to j'th location in i'th processor |
Processor 0 1 2 3 |
Start (a0,a1,a2,a3) (b0,b1,b2,b3) (c0,c1,c2,c3) (d0,d1,d2,d3) |
After (a0,b0,c0,d0) (a1,b1,c1,d1) (a2,b2,c2,d2) (a3,b3,c3,d3) |
There are extensions such as MPI_ALLTOALLV to handle the case where data is stored in a noncontiguous fashion in each processor and where each processor sends different amounts of data to other processors |
Many MPI routines have such "vector" extensions |
"ALL" routines deliver results to all participating processes |
Routines ending in "V" allow different sized inputs on different processors |
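A sketch of the fixed-size case in C, where each process sends one integer to every process (MPI_ALLTOALLV instead takes arrays of counts and displacements; the buffer size of 64 and the values are arbitrary): |
int sendbuf[64], recvbuf[64], rank, nproc, j; |
MPI_Comm_rank(MPI_COMM_WORLD, &rank); |
MPI_Comm_size(MPI_COMM_WORLD, &nproc);      /* assumed here to be at most 64 */ |
for (j = 0; j < nproc; j++) |
   sendbuf[j] = rank*100 + j;               /* element j is destined for process j */ |
MPI_Alltoall(sendbuf, 1, MPI_INT, recvbuf, 1, MPI_INT, MPI_COMM_WORLD); |
/* recvbuf[i] now holds the value that process i sent to this process */ |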
call MPI_COMM_RANK( comm, rank, ierr ) |
if (rank .eq. 0) then |
   read *, n |
end if |
call MPI_BCAST(n, 1, MPI_INTEGER, 0, comm, ierr ) |
! Each process computes its own range of numbers to sum |
lo = rank*n+1 |
hi = lo+n-1 |
sum = 0.0d0 |
do i = lo, hi |
   sum = sum + 1.0d0 / i |
end do |
call MPI_ALLREDUCE( sum, sumout, 1, MPI_DOUBLE_PRECISION, |
&                   MPI_SUM, comm, ierr) |
#include "mpi.h" |
#include <math.h> |
int main (argc, argv) |
int argc; char *argv[]; |
{ |
int n, myid, numprocs, i, rc; |
double PI25DT = 3.14159265358979323842643; |
double mypi, pi, h, sum, x, a; |
MPI_Init(&argc, &argv); |
MPI_Comm_size (MPI_COMM_WORLD, &numprocs); |
MPI_Comm_rank (MPI_COMM_WORLD, &myid); |
{ if (myid == 0) |
{ printf ("Enter the number of intervals: (0 quits) "); |
scanf ("%d", &n); |
} |
MPI_Bcast (&n, 1, MPI_INT, 0, MPI_COMMWORLD); |
if (n == 0) break; |
h = 1.0 / (double) n; |
sum = 0.0; |
for (i = myid+1; i <= n; i += numprocs) |
{ x = h * ((double) i - 0.5); sum += 4.0 / 1.0 + x*x): } |
mypi = h * sum; |
MPI_Reduce (&mypi, &pi,1, MPI_DOUBLE,MPI_SUM, 0,MPI_COMMWORLD); |
if (myid == 0) |
printf("pi is approximately %.16f, Error is %.16f\n",pi, fabs(pi-PI35DT); } |
MPI_Finalize; } |
Where does data go when you send it?
|
B) is more efficient than A), but not always correct |
Copies are not needed if
|
MPI provides modes to arrange this
|
All combinations are legal
|
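One common way to avoid relying on system buffering is to post nonblocking receives and sends (MPI_IRECV, MPI_ISEND) and complete them with MPI_WAIT; a minimal C sketch, where the partner rank other is assumed to have been determined already and the buffer length of 100 is arbitrary: |
void exchange(int other, MPI_Comm comm)   /* 'other' = assumed rank of the partner process */ |
{  double sendbuf[100], recvbuf[100];     /* assume sendbuf has been filled with data */ |
   MPI_Request sreq, rreq; |
   MPI_Status status; |
   MPI_Irecv(recvbuf, 100, MPI_DOUBLE, other, 0, comm, &rreq);   /* post the receive first */ |
   MPI_Isend(sendbuf, 100, MPI_DOUBLE, other, 0, comm, &sreq); |
   /* ... computation can overlap the communication here ... */ |
   MPI_Wait(&sreq, &status); |
   MPI_Wait(&rreq, &status); |
} |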
MPI provides routines that give structure to collections of processes. Although it also has graph topologies, here we concentrate on Cartesian topologies. |
A Cartesian topology is a mesh |
Example of a 3 x 4 mesh with arrows pointing at the right neighbors: |
(0,0)  (0,1)  (0,2)  (0,3) |
(1,0)  (1,1)  (1,2)  (1,3) |
(2,0)  (2,1)  (2,2)  (2,3) |
The routine MPI_Cart_create creates a Cartesian decomposition of the processes, with the number of dimensions given by the ndim argument. It returns a new communicator (in comm2d in example below) with the same processes as in the input communicator, but different topology. |
ndim = 2; |
dims[0] = 3; dims[1] = 4; |
periods[0] = 0; periods[1] = 0; // periodic is false |
reorder = 1; // reordering is true |
ierr = MPI_Cart_create (MPI_COMM_WORLD, ndim, |
dims, periods, reorder, &comm2d);
|
Given the rank of the process in the new communicator (returned by MPI_Comm_rank on comm2d in a variable myrank), this routine gives a two element (for a two dimensional topology) array (coords in the example below) with the (i, j) coordinates of this process in the new Cartesian communicator. |
ierr = MPI_Cart_coords (comm2d, myrank, ndim, coords); |
coords[0] and coords[1] will be the i and j coordinates. |
The routine MPI_Cart_shift finds the neighbors in a given direction of the new communicator. |
dir = 0;   // the dimension to shift along; dimensions are numbered from 0 in both C and Fortran |
disp = 1;  // a displacement of 1 specifies the nearest neighbor on either side |
ierr = MPI_Cart_shift (comm2d, dir, disp, &nbrbottom, &nbrtop); |
This returns the process numbers (ranks) of the bottom and top neighbors. |
If a process in a non-periodic mesh is on the border and has no neighbor, then the value MPI_PROC_NULL is returned. This process value can be used in a send/recv, but the operation will have no effect. |
In a periodic mesh, the processes at the edge of the mesh wrap around in their dimension to find their neighbors; for example, the right neighbor of a process in the last column is the process in the first column of the same row. |
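A sketch in C of how the returned neighbor ranks are typically used together with MPI_Sendrecv; on the border of a non-periodic mesh nbrtop or nbrbottom is MPI_PROC_NULL and the corresponding transfer simply does nothing (NY, outrow and inrow are placeholders): |
#define NY 16 |
double outrow[NY], inrow[NY];   /* assume outrow holds the row to be passed "up" */ |
MPI_Status status; |
MPI_Sendrecv(outrow, NY, MPI_DOUBLE, nbrtop,    0, |
             inrow,  NY, MPI_DOUBLE, nbrbottom, 0, |
             comm2d, &status);  /* send to the top neighbor, receive the matching row from the bottom */ |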
These are an elegant solution to a problem we struggled with a lot in the early days -- all message passing is naturally built on buffers holding contiguous data |
However often (usually) the data is not stored contiguously. One can address this with a set of small MPI_SEND commands but we want messages to be as big as possible as latency is so high |
One can copy all the data elements into a single buffer and transmit this but this is tedious for the user and not very efficient |
It has extra memory to memory copies which are often quite slow |
So derived datatypes can be used to set up arbitrary memory templates with variable offsets and primitive datatypes. Derived datatypes can then be used in "ordinary" MPI calls in place of primitive datatypes such as MPI_REAL or MPI_FLOAT |
Derived datatypes should be declared integer in Fortran and MPI_Datatype in C |
They generally have the form { (type0,disp0), (type1,disp1) ... (type(n-1),disp(n-1)) } with a list of primitive datatypes type(i) and displacements (from the start of the buffer) disp(i) |
call MPI_TYPE_CONTIGUOUS (count, oldtype, newtype, ierr)
|
one must use call MPI_TYPE_COMMIT(derivedtype, ierr) before one can use the type derivedtype in a communication call |
call MPI_TYPE_FREE(derivedtype, ierr) frees up space used by this derived type |
integer derivedtype, ... |
call MPI_TYPE_CONTIGUOUS(10, MPI_REAL, derivedtype, ierr) |
call MPI_TYPE_COMMIT(derivedtype, ierr) |
call MPI_SEND(data, 1, derivedtype, dest, tag, MPI_COMM_WORLD, ierr) |
call MPI_TYPE_FREE(derivedtype, ierr) |
is equivalent to the simpler single call |
call MPI_SEND(data, 10, MPI_REAL, dest, tag, MPI_COMM_WORLD, ierr) |
and each sends 10 contiguous real values at location data to process dest |
MPI_TYPE_VECTOR (count, blocklen, stride, oldtype, newtype, ierr)
|
Suppose in Fortran we have an array dimensioned u(0:nxblock+1, 0:nyblock+1), so that the elements of each column are contiguous in memory; then |
MPI_Type_vector( nyblock, 1, nxblock+2, MPI_REAL, rowtype, ierr ) |
defines rowtype, a row of nyblock elements (columns 1:nyblock for a fixed row index), successive elements being separated in memory by a stride of nxblock+2 reals |
In Jacobi-like algorithms, each processor stores its own nxblock by nyblock array of variables as well as guard rings containing the rows and columns from its neighbours. One loads these guard rings at the start of each computation iteration and only updates points internal to the array |
Guard Rings |
Display Fortran version (interchange rows and columns for C) |
integer nbrtop, nbrbottom, nbrleft, nbrright |
! These are the processor ranks of the 4 nearest neighbors (top, bottom, left and right respectively) -- found from MPI_CART_SHIFT (see later) |
integer rowtype, coltype   ! The new derived types |
call MPI_TYPE_CONTIGUOUS(nxblock, MPI_REAL, coltype, ierr) |
call MPI_TYPE_COMMIT(coltype, ierr) |
call MPI_TYPE_VECTOR(nyblock, 1, nxblock+2, MPI_REAL, rowtype, ierr) |
call MPI_TYPE_COMMIT(rowtype, ierr) |
! Now transmit the internal edge rows and columns to the guard rings on the neighbors |
call MPI_SEND( u(1,1), 1, coltype, nbrleft, 0, comm, ierr) |
call MPI_SEND( u(1,nyblock), 1, coltype, nbrright, 0, comm, ierr) |
call MPI_SEND( u(1,1), 1, rowtype, nbrtop, 0, comm, ierr) |
call MPI_SEND( u(nxblock,1), 1, rowtype, nbrbottom, 0, comm, ierr) |
Array of indices, useful for gather/scatter |
MPI_TYPE_INDEXED (count, blocklens, indices, oldtype, newtype, ierr)
|
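A sketch of the C form (the block lengths, the displacements, and the variables data, dest and tag are placeholders): pick out three blocks of an array of doubles, of lengths 2, 1 and 3, starting at offsets 0, 4 and 7: |
int blocklens[3] = {2, 1, 3};                /* made-up block lengths */ |
int indices[3]   = {0, 4, 7};                /* made-up displacements, in units of the old type */ |
MPI_Datatype indexedtype; |
MPI_Type_indexed(3, blocklens, indices, MPI_DOUBLE, &indexedtype); |
MPI_Type_commit(&indexedtype); |
MPI_Send(data, 1, indexedtype, dest, tag, MPI_COMM_WORLD);   /* sends the 6 selected doubles from data */ |
MPI_Type_free(&indexedtype); |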
Partitioning
|
Communication
|
Agglomeration
|
Mapping
|
Used to numerically solve a Partial Differential Equation (PDE) on a square mesh -- below is Poisson's Equation |
Method:
|
[Figure: Poisson's equation (the Laplacian of u equals f) on a square region, with a surface plot of the right-hand side f(x,y) against x and y] |
Partitioning is simple
|
Communication is simple
|
Agglomeration works along dimensions
|
Mapping: Cartesian grid directly supported by MPI virtual topologies |
For generality, write as the 2-D version
|
Adjust array bounds, iterate over local array
|
[Figure: the local array, with indices running from 0 to nxlocal and 0 to nylocal, for nx by ny points in an nxp by nyp processor decomposition] |
PARAMETER(nxblock=nx/nxp, nyblock=ny/nyp, nxlocal=nxblock+1, nylocal=nyblock+1) |
REAL u(0:nxlocal,0:nylocal), |
unew(0:nxlocal,0:nylocal), |
f(0:nxlocal,0:nylocal) |
INTEGER dims(2), coords(2) |
LOGICAL periods(2), reorder |
dims(1) = nxp; dims(2) = nyp |
periods(1) = .false.; periods(2) = .false. |
reorder = .true. |
ndim = 2 |
call MPI_CART_CREATE(MPI_COMM_WORLD, |
ndim,dims,periods, |
reorder,comm2d,ierr) |
CALL MPI_COMM_RANK( comm2d, myrank, ierr ) |
CALL MPI_CART_COORDS( comm2d, myrank, 2, coords, ierr ) |
CALL MPI_CART_SHIFT( comm2d, 1, 1, nbrleft, nbrright, ierr ) |
CALL MPI_CART_SHIFT( comm2d, 0, 1, nbrbottom, nbrtop, ierr ) |
CALL MPI_Type_vector( nyblock, |
1, nxlocal+1, MPI_REAL, rowtype, ierr) |
CALL MPI_Type_commit( |
rowtype, ierr ); |
dx = 1.0/nx; dy = 1.0/ny; err = tol * 1.0e6 |
DO j = 0, nylocal
|
END DO |
DO WHILE (err > tol)
|
myerr = 0.0 |
DO j=1, nylocal-1
|
END DO |
CALL MPI_ALLREDUCE(myerr,err, |
1,MPI_REAL,MPI_MAX,MPI_COMM_WORLD,ierr) |
DO j=1, nylocal-1
|
END DO |
END DO |
The elapsed (wall-clock) time between two points in an MPI program can be computed using MPI_Wtime, which takes no arguments and returns the elapsed time in seconds (double MPI_Wtime() in C, DOUBLE PRECISION MPI_WTIME() in Fortran) |
The times are local; the attribute MPI_WTIME_IS_GLOBAL may be used to determine if the times are also synchronized with each other for all processes in MPI_COMM_WORLD. |
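A typical C usage (do_work() is a hypothetical routine standing for the code being timed): |
double t_start, t_elapsed; |
t_start = MPI_Wtime();                  /* wall-clock time in seconds */ |
do_work();                              /* hypothetical section being timed */ |
t_elapsed = MPI_Wtime() - t_start; |
printf("elapsed time = %f seconds\n", t_elapsed); |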
The MPI Forum produced a new standard which includes MPI 1.2 clarifications of and corrections to MPI 1.1 |
MPI-2 new topics are: dynamic process management, one-sided communication (put/get), parallel I/O, and |
additional language bindings for C++ and Fortran-90 |
Goal is to provide model for portable file system allowing for optimization of parallel I/O
|
Parallel I/O system provides high-level interface supporting transfers of global data structures between process memories and files. |
Significant optimizations required include:
|
Other optimizations also achieved by
|
I/O access modes defined by data partitioning expressed with derived datatypes |
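A sketch in C of this style of access (an illustration only: datafile.out is a made-up file name, and filetype, localbuf and localcount are assumed to be a committed derived datatype describing this process's partition of the file, the local buffer, and its length): |
MPI_File fh; |
MPI_Status status; |
MPI_File_open(MPI_COMM_WORLD, "datafile.out", |
              MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh); |
MPI_File_set_view(fh, 0, MPI_FLOAT, filetype, "native", MPI_INFO_NULL);   /* this process sees only its own partition of the file */ |
MPI_File_write_all(fh, localbuf, localcount, MPI_FLOAT, &status);         /* collective write of the local data */ |
MPI_File_close(&fh); |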