3.13 打包(Pack)和解包(Unpack) |
一些现存的通信库为发送不连续数据提供打包/解包函数. 在这种系统下用户在发送前显式地把数据包装到一个连续的缓冲区, 在接收之后从连续缓冲区中解包.在多数情况下允许有一个派生数据类型不显式打包和解包. 用户指明要发送的和接收的数据的分布, 通信库直接访问一个不连续的缓冲区. 打包/解包是为了保持和以前的库兼容而设置的. 他们也提供了一些MPI以前不能提供的功能. 例如, 一个消息可以被分成几个部分来接收, 接收后面部分的接收操作可能依赖于前一部分的内容. 另一个用途是即将发出的消息可能被显式地暂存于用户提供的空间, 以此来代替系统缓存策略. 最后, 打包和解包操作有助于在MPI的顶层进一步开发附加的通信库.
MPI_PACK(inbuf, incount, datatype, outbuf, outcount, position, comm )
IN inbuf 输入缓冲区起始(选择)
IN incount 输入数据项数目(整型)
IN datatype 每个输入数据项的类型(句柄)
OUT outbuf 输出缓冲区开始(选择)
IN outcount 输出缓冲区大小, 字节(整型)
INOUT position 缓冲区当前位置, 字节(整型)
IN comm 打包的消息的通信子(句柄)
int MPI_Pack(void* inbuf, int incount, MPI_datatype, void *outbuf, int outcount, int *position, MPI_Comm comm)
MPI_PACK(INBUF,INCOUNT,DATATYPE,OUTBUF,OUTCOUNT,POSITION,COMM, IERROR) INBUF(*),OUTBUF(*)
INTEGER INCOUNT,DATATYPE,OUTCOUNT,POSITION,COMM,IERROR
包装由inbuf,incount,datatype指定的发送缓冲区中的消息到outbuf和outcount指定的缓冲区空间. 输入缓冲区可以是MPI_SEND允许的任何通信缓冲区. 输出缓冲区是一个连续的存储空间, 含有outcount个字节, 起始地址为outbuf (长度是以字节为单位的, 而不是以元素为单位, 就象它是一个为MPI_PACKED类型的通信而准备的缓冲区)
入口参数position的值是输出缓冲区中用于打包的起始地址. 它的值以打包消息的大小来增加, 出口参数position的值是被打包的消息占用的输出缓冲区后面的第一个地址. comm参数是将在后面用于发送打包的消息时用的通信子.
MPI_UNPACK(inbuf, insize, position, outbuf, outcount, datatype, comm )
IN inbuf 输入缓冲区起始(选择)
IN insize 输入数据项数目(整型)
INOUT position 缓冲区当前位置, 字节(整型)
OUT outbuf 输出缓冲区开始(选择)
IN outcount 输出缓冲区大小, 字节(整型)
IN datatype 每个输入数据项的类型(句柄)
IN comm 打包的消息的通信子(句柄)
int MPI_Unpack(void* inbuf, int insize, int *position, void *outbuf, int outcount, MPI_Datatype datatype, MPI_Comm comm)
MPI_UNPACK(INBUF,INSIZE, POSITION,OUTBUF,OUTCOUNT, DATATYPE,COMM, IERROR) INBUF(*),OUTBUF(*)
INTEGER INSIZE, POSITION,OUTCOUNT, , DATATYPE COMM,IERROR
从inbuf和insize指定的缓冲区空间解包一个消息到outbuf,outcount,datatype指定的缓冲区中.输出缓冲区可以是MPI_RECV允许的任何通信缓冲区. 输入缓冲区是一个连续的存储空间, 大小为insize字节, 开始地址为inbuf. 入口参数position的值是输出缓冲区中被打包消息占用的起始地址. 它的值以打包消息的大小来增加, 因此出口参数position的值是输出缓冲区中被解包的消息占用空间后面的第一个地址. comm参数是用于接收消息的通信子.
##对用户的建议##
注意MPI_RECV和MPI_UNPACK的区别: 在MPI_RECV中, count参数指明的是可以接收的最大项数. 实际接收的项数是由接收的消息的长度来决定的. 在MPI_UNPACK中, count参数指明实际打包的项数; 相应消息的"size"是position的增加值. 这种改动的原因是"输入消息的大小" 直到用户决定如何解包之前是不能预先确定的;从解包的项数来确定"消息大小"也是很困难的. 实际上在异构系统中,该数据是不能被确定的一个priori
要理解打包和解包的执行, 可以想象消息中的数据部分是一个序列, 这个序列是通过连接该发送消息中的连续值而占用的.打包操作将缓冲区的这一序列存储,就象向这个缓冲区发送消息.解包操作将这一序列从缓冲区里退回,就象从这个缓冲区里读消息.
几个消息能同时成功地打到一个包装单元里.这是由几个对MPI_PACK的连续相关的调用来完成的, 第一个调用提供 position=0,每个后续的调用以前一个调用输出的position的值作为输入, 而使用和前一个调用相同的inbuf,insize,和comm值.这个打包单元现在包含的信息同一个发送操作导致的存储在缓冲区里的信息一样,这个缓冲区由单独的发送缓冲区"连接"而成的.
一个打包单元可以用MPI_PACKED发送. 任何点到点或者收集式通信功能可以被用来从一个进程移动构成打包单元的字节序列到另一个进程中. 该打包单元这时可以用任何接收操作使用任意数据类型来接收: 类型匹配的规定对于以MPI_PACKED类型发送的消息不再起作用.
以任何类型发送的消息(包括MPI_PACKED类型)都可以用MPI_PACKED类型接收. 这样的消息于是就可以被调用MPI_UNPACK来解包.
一个打包单元(或者一个一般创建,"类型化"发送的消息)可以被解包成几个连续的消息. 这是通过几个连续相关的对MPI_UNPACK调用来实现的, 第一个调用提供 position=0,每个后续的调用以前一个调用输出的position的值作为输入, 而使用和前一个调用相同的inbuf,insize,和comm值.
连接两个打包单元不一定形成一个打包单元; 一个打包单元的子串也不一定是一个打包单元. 因此, 用户不能连接两个打包单元然后把结果当作一个打包单元来解包; 也不能把一个打包单元的子串作为一个打包单元来解包. 每个由几个相关的打包调用序列或由一个普通发送生成的打包单元都必须被作为一个单元, 通过一系列相关的解包调用来解包,.
##基本原理##
对"原子"包和对打包单元解包的限制允许执行者在包装单元的头部增加附加的信息, 例如对发送者结构的描述(在异构环境中用于类型转换) 下面的调用允许用户确定需要多少空间来对一个消息打包, 以便管理缓冲区的空间分配.
MPI_PACK_SIZE( incount, datatype, comm, size )
IN incount 打包调用的计数参数(整型)
IN datatype 打包调用的数据类型参数(句柄)
IN comm 打包调用的通信子参数(句柄)
OUT size 打包消息大小的上界, 字节数(整型)
int MPI_Pack_size(int incount, MPI_Datatype datatype, MPI_Comm comm, int *size)
MPI_PACK_SIZE(INCOUNT,DATATYPE,COMM,SIZE,IERROR)
INTEGER INCOUNT,DATATYPE,COMM,SIZE,IERROR
MPI_PACK_SIZE(incount,datatype,comm,size)调用用size返回position增长的上界值, position的值是受MPI_PACK(inbuf, incount,datatype, outbuf, outcount, position, comm)影响的.
##基本原理##
该调用返回的是上界, 而不是一个精确界, 这是因为包装一个消息所需要的精确空间可能依赖于上下文(例如, 第一个打包单元中包装的消息可能占用相对更多的空间)
例3.36 使用MPI_PACK的例子
int position , i,j,a[2];
char buff[1000];
...
MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
if (myrank ==0) {
/* SENDER CODE */
position =0;
MPI_Pack(&i,1,MPI_INT,buff,1000,&position, MPI_COMM_WORLD);
MPI_Pack(&j,1,MPI_INT,buff,1000,&position, MPI_COMM_WORLD);
MPI_Send(buff,position, MPI_PACKED,1,0,MPI_COMM_WORLD); }
else
/* RECEIVER CODE */
MPI_Recv(a,2, MPI_INT,0,0,MPI_COMM_WORLD)
}
例3.37 一个详细的例子
int position , i;
float a[1000];
char buff[1000];
....
MPI_Comm_rank(MPI_Comm_world,&myrank);
if (myrank ==0) {
/* SENDER CODE */
int len[2];
MPI_Aint disp[2];
MPI_Datatype type[2], newtype;
/* build datatype for i followed by a[0]...a[i-1] */
len[0]=1;
len[1]=i;
MPI_Address( &I,disp);
MPI_Address( a,disp+1);
type[0]=MPI_INT;
type[1]=MPI_FLOAT;
MPI_Type_struct(2,len,disp,type,&newtype);
MPI_Type_commit(&newtype);
/*Pack i followed by a[0]...a[i-1] */
position =0;
MPI_Pack(MPI_BOTTOM, 1,newtype, buff, 1000,&position,MPI_COMM_WORLD);
/* Send */
MPI_Send(buff,postion, PI_PACKED,1,0, MPI_COMM_WORLD)
/* ***** One can replace the last three lines with^M MPI_Send( MPI_BOTTOM,1,newtype, 1, 0,MPI_COMM_WORLD); ***** */
}
else /* myrank ==0 */ {
/* RECIEVER CODE */
MPI_Status status; /* Receive */
MPI_Recv(buff, 1000,MPI_PACKED,0,0,&status); /* Unpack i */
position =0;
MPI_Unpack(buff,1000,&position,&i,1,MPI_INT,MPI_COMM_WORLD);
/* Unpack a[0]...a[i-1] */
MPI_Unpack(buff,1000,&position,a,i,MPI_FLOAT,MPI_COMM_WORLD);
}
例3.38 每个进程向root发送一个计数, 后跟相应个数的字符;
root连接所有字符成为一个字符串.
int count, gsize, counts[64], totalcount, k1, k2, k;
displs[64], position, concat_pos;
char chr[100], *lbuf, *rbuf, *cbuf;
...
MPI_Comm_size(comm, &gsize);
MPI_Comm_rank(comm, &myrank);
/* allocate local pack buffer */
MPI_Pack_size(1, MPI_INT, comm, &k1);
MPI_Pack_size(count, MPI_CHAR, &k2);
k=k1+k2;
lbuf = (char *)malloc(k);
/* pack count, followed by count characters */
position =0;
MPI_Pack(&count, 1, MPI_INT, &lbuf, k, &position, comm);
MPI_Pack(chr, count, MPI_CHAR, &lbuf, k, &position, comm);
if (myrank != root){
/* gather at root sizes of all packed messages */
MPI_Gather( &position, 1, MPI_INT, NULL, NULL,^M NULL, root, comm);
/* gather at root packed messages */
MPI_Gatherv( &buf, postion, MPI_PACKED, NULL, NULL, NULL, NULL,root, comm);
}
else {
/* root code */
/* gather size of all packed messages */
MPI_Gather( &position, 1, MPI_INT, counts, 1, MPI_INT, root, comm);
/* gather all packed messages */
displs[0]=0;
for (i=1; i<gsize; i++)
displs[i]= displs[i-1]+ counts[i-1];
totalcount=disps[gsize-1]+counts[gsize-1];
rbuf=(char *)malloc(totalcount);
cbuf=(char *)malloc(totalcount);
MPI_Gatherv( lbuf, postion, MPI_PACKED, rbuf,^M counts, displs, MPI_PACKED, root, comm);
/* unpack all messages amd concatenate strings */
concat_pos = 0;
for (i=0; i<gsize; i++) {
position =0;
MPI_Unpack(rbuf+displs[i], totalcount-displs[i], &position,&count,1,MPI_INT,comm);
MPI_Unpack(rbuf+displs[i], totalcount-displs[i],&position,cbuf+concat_pos,1,count,MPI_CHAR,comm);
concat_pos+=count; }
cbuf[concat_pos]='\0';
}
Copyright: NPACT |