5.6 组间通信子 |
本节介绍了组间通信子的概念以及MPI中支持这一概念的部分.它描述了对于书写包含用户级服务器程序的支持.
迄今所描述的所有点对点通信都涉及同一组内进程间的通信.如我们在本章前面所注意到的一样, 这种类型的通信叫"组内通信"并且所用的通信子叫"组内通信子".
在模块化或多学科的应用中,不同的进程组执行不同的模块而且不同模块内的进程在流水线内或更一般的模块图中相互间进行通信. 在这些应用中, 为一个进程指定目标进程的最自然的方式是利用目标组内的目标进程。在包含内部用户级服务器的应用中,每个服务器可以是为一个或更多客户机提供服务的进程组,同时客户机可以是使用一个或更多服务器服务的进程组. 在这些应用中,通过目标组内的序列号指定目标进程仍是最自然的.如前所述,这种类型的通信叫作"组间通信",所使用的通信子叫"组间通信子".
一个组间通信是不同组内进程间的点对点通信. 包含初始化组间通信操作进程的组叫"本地组",即接收者在接收里而发送者在发送里.包含目标进程的组叫"远程组",即接收者在发送里而发送者在接收里. 正如在组内通信中一样, 通过使用(communicator,rank)对来指定目标进程. 不象组内通信,序列号是与远程组相关的.
所有的组间通信子构造子都是阻塞的,并且要求本地和远程组间互不相连以便避免死锁.
这里是组间通信和组间通信子的特性摘要:
函数MPI_COMM_TEST_INTER可被用于检测一个通信子是组间还是组内通信子.组间通信子可被用作一些其它通信子访问函数的参数.组间通信子不能用作一些组内通信子构造函数(例如 MPI_COMM_CREATE)的输入.
对实现者的建议:为实现点对点通信,在每个进程中可用一个多元组来描述通信子,包 括:
group
send_context
receive_context
source.
对于组间通信子,group描述了远程组, 而source则是本地组中进程的序列号.对于组内通信子,group是通信子组(远程等于本地), source是此组内的进程序列号, send_context和receive_context是相同的. 一个组可用序列号--绝对地址转换表来描述.
不同时考虑本地和远程组内的进程就不能合理地讨论组间通信子.想象组p内的进程P,该进程拥有通信子Cp,和组q内的进程Q,该进程拥有通信子Cq.则:
假定P使用组间通信子向Q发送一个消息.则P使用group表来找到Q的绝对地址;sourec和send_context被追加到此消息上.
假定Q用一个使用组间通信子的显式的源参数获得了一个接收.则Q将receive_context匹配到消息上下文上而将源参数匹配到消息源上.
同样的算法对于组内通信子也是适宜的.
为支持组间通信子访问子和构造子,有必要用附加的结构来补充这个模型,此模型应能够存储本地通信组的信息,并拥有附加的安全上下文.(对实现者的建议结束)
MPI_COMM_TEST_INTER(comm,flag)
输入: comm 通信子(句柄)
输出: flag (逻辑值)
int MPI_Comm_test_inter(MPI_Comm comm,int *flag)
MPI_COMM_TEST_INTER(COMM,FLAG,IERROR)
INTEGER COMM,IERROR LOGICAL FLAG
这个本地函数允许调用进程确定一个通信子是组间通信子还是组内通信子.如果是组间通信子则返回true,否则返回false.
在组内通信下,当一个组间通信子被用作上面所描述的通信子访问子的输入参数时,下表描述了动作.
MPI_COMM_*函数动作 (在组间通信模式下) MPI_COMM_SIZE 返回本地组的大小 |
进一步讲,MPI_COMM_COMPARE对于组间通信是有效的.两个通信子必须或者是组内或者是组间通信子, 否则返回MPI_UNEQUAL.两个对应的本地和远程组必须正确比较以得到结果MPI_CONGRUENT和 MPI_SIMILAR.特别是,很可能得到结果MPI_SIMILAR,这是因为本地或远程组是近似但不同的.
下面的访问子提供了对组间通信子远程组的一致性访问:
下面的都是本地操作.
MPI_COMM_REMOTE_SIZE(comm,size)
输入: comm 组间通信子(句柄)
输出: size comm的远程组中的进程数(整数)
int MPI_COMM_Comm_remote_size(MPI_Comm comm,int *size)
MPI_COMM_REMOTE_SIZE(COMM,SIZE,IERROR)
INTEGER COMM,SIZE,IERROR
MPI_COMM_REMOTE_GROUP(comm,group)
输入: comm 组间通信子(句柄)
输出: group 对应于comm的远程组(句柄)
int MPI_Comm_remote_group(MPI_Comm comm,MPI_Group *group)
MPI_COMM_REMOTE_GROUP(COMM,GROUP,IERROR)
INTEGER COMM,GROUP,IERROR
基本原理:对于组间通信子的本地和远程组的对称访问是重要的,因此,象MPI_COMM_REMOTE_SIZE一样提供了这个函数.(基本原理结束)
本节介绍了四个阻塞的组间通信子操作.MPI_INTERCOMM_CREATE用于将两个组内通信子绑定到 一个组间通信子上;函数MPI_INTERCOMM_MERGE通过合并组间通信子的本地和远程组创建一个 组内通信子.函数MPI_COMM_DUP和MPI_COMM_FREE,如前所述,分别复制和释放一个组间通信子.
禁止绑定到一个组间通信子上的本地和远程组发生重迭.如果有重迭则程序出错并可能发生死锁.(若一个进程是多线索的,且MPI调用仅阻塞一个线索,而非一个进程,则可支持"双重成员 ".由用户负责确保代表一个进程两个角色的调用被两个独立的线索所执行.)
函数MPI_INTERCOMM_CREATE可在下列条件下被用来从两个已存在的组内通信子中创建一个 组间通信子:每一组中至少有一个可选成员("组首领")有能力与其它组的被选成员通信;即存在一个两个首领都属于它的"平等"通信子,且每个首领都知道在此平等通信子中其它首领的序列号 (两个首领可以是同一进程).而且,每个组的成员知道它们首领的序列号。
从两个组内通信子中构造一个组间通信子需要在本地组和远程组中进行分离的集合操作,就象本地组的进程和远程组的进程间的点对点通信一样.
在标准的MPI实现中(在初始化时使用静态进程分配),通信子MPI_COMM_WORLD(或其复制品) 可作为此平等通信子. 在动态MPI实现中,在这里,例如,一个进程可在MPI执行过程中派生新的子进程,父进程可作为旧通信空间与包含父子进程的新通信世界间的桥梁.
第六章所描述的应用拓扑函数不适合于组间通信子. 要求此能力的用户应该利用 MPI_INTERCOMM_MERGE来建立一个组内通信子,然后将图与笛卡尔拓扑能力应用到组内通信子上,创建一个适当的拓扑定位的组内通信子.对于这种情况,在不丢失一般性的前提下也可设计一个自己的应用拓扑机制.
MPI_INTERCOMM_CREATE(local_comm,local_leader,peer_comm,remote_leader ,tag,ne wintercomm)
输入: local_comm 本地组内通信子(句柄)
local_leader local_comm内的本地组首领的序列号(整数)
peer_comm "平等"组内通信子;仅在local_leader中有意义(句柄)
remote_leader peer_comm中远程组首领的序列号;仅在local_leader中有意义(句柄)
tag "安全"标志(整数)
输出: newintercomm 新的组间通信子(句柄)
int MPI_Intercomm_create(MPI_Comm local_comm,int local_leader,MPI_Comm peer_comm,int remote_leader,int tag,MPI_Comm *newintercomm)
MPI_INTERCOMM_CREATE(LOCAL_COMM,LOCAL_LEADER,PEER_COMM,REMOTE_LEADER ,TAG,N EWINTERCOMM,IERROR)
INTEGER LOCAL_COMM,LOCAL_LEADER,PEER_COMM,REMOTE_LEADER,TAG ,NEWINTERCOMM,IERROR
本调用创建了一个组间通信子.它对于本地和远程组的合并是集合性的.进程应该在每个组中提供相同的local_comm和local_leader参数.对于remote_leader,local_leader和tag不允许通配符.
本调用通过通信子peer_comm和首领间的标志tag来使用点对点通信.这样,就必须关心在 peer_comm上是否有可能干扰这个通信的挂起通信.
对用户的建议: 我们推荐使用现成的平等通信子,例如MPI_COMM_WORLD的复制品,以避免与平等通信子发生干扰.(对用户的建议结束)
MPI_INTERCOMM_MERGE(intercomm,high,newintracomm)
输入: intercomm 组间通信子(句柄)
high (逻辑值)
输出: newintracomm 新的组内通信子(句柄)
int MPI_Intercomm_merge(MPI_Comm intercomm,int high,MPI_Comm *newintracomm)
MPI_INTERCOMM_MERGE(INTERCOMM,HIGH,INTRACOMM,IERROR)
INTEGER INTERCOMM,INTRACOMM,IERROR LOGICAL HIGH
此函数从两个与intercomm相关的组的并中创建了一个组内通信子.对于两个组中每一个,其中的所有进程都应提供相同的high值.如果一个组中的进程提供值high=false,而另一个组中的进程提供值high=true,则并将"低"组排在"高"组的前面.如果所有的进程提供相同的high参数, 则并的顺序是任意的.此调用在两个组的并中是阻塞和集合性.
对实现者的建议:MPI_INTERCOMM_MERGE,MPI_COMM_FREE和MPI_COMM_DUP的实现类似于 MPI_INTERCOMM_CREATE的实现,所不同的是:用于组首领间通信的上下文是输入组间通信子所私有的上下文而非桥梁通信子内部的上下文.
例1:三组"流水线"
组0与组1通信.组1与组2通信.因此,组0需要一个组间通信子,组1需要两个组间通信子,而组2需要一个组间通信子.
main(int argc,char**argv)
{
MPI_Comm myComm;/*本地子组的组内通信子*/
MPI_Comm myFirstComm;/*组间通信子*/
MPI_Comm mySecondComm;/*第二个组间通信子(仅用于组1)*/
int membershipKey;
int rank;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
/*用户代码必须在范围[0,1,2]中产生membershipKey*/
membershipKey=rank%3;
/*为本地子组建立组内通信子*/
MPI_Comm_split(MPI_COMM_WORLD,membershipKey,rank,&myComm);
/*建立组间通信子,标志是硬编码*/
if(membershipKey==0)
{
/*组0与组1通信*/
MPI_Intercomm_create(myComm,0,MPI_COMM_WORLD,1, 1,&myFirstComm);
}
else if (membershipKey==1)
{
/*组1与组0和2通信*/
MPI_Intercomm_create(myComm,0,MPI_COMM_WORLD,0, 1,&myFirstComm);
MPI_Intercomm_ereate(myComm,0,MPI_COMM_WORLD,2, 12,&mySecondComm);
}
else if (membershipKey==2)
{
/*组2与组1通信*/
MPI_Intercomm_create(myComm,0,MPI_COMM_WORLD,1, 12,&myFirstComm);
}
/*做工作...*/
switch(membershipKey)/*释放适宜的通信子*/
{
case 1:
MPI_COMM_free(&mySecondComm);
case 0:
case 2:
MPI_COMM_free(&myFirstComm);
break;
}
MPI_Finalize();
}
例2:三组"环"
组0与组1通信.组1与组2通信.组0与组2通信.因此,每组需要两个通信子.
main(int argc,char **argv)
{
MPI_Comm myComm;/*本地子组的组内通信子*/
MPI_Comm myFirstComm;/*组间通信子*/
MPI_Comm mySecondComm;
MPI_Status status;
int membershipKey;
int rank;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
...
/*用户代码必须在范围[0,1,2]内产生membershipKey*/
membershipKey=rank % 3;
/*为本地子组建立组内通信子*/
MPI_Comm_split(MPI_COMM_WORLD,membershipKey,rank,&myComm); /
*建立组间通信子,标志是硬编码*/
if (membershipKey== 0)
{
/*组0与组1和2通信*/
MPI_Intercomm_create(myComm, 0,MPI_COMM_WORLD,1, 1,&myFirstComm);
MPI_Intercomm_create(myComm, 0,MPI_COMM_WORLD,2, 2,&mySecondComm);
}
else if (membershipKey==1)
{
/*组1与组0和2通信*/
MPI_Intercomm_create(myComm,0, MPI_COMM_WORLD, 0, 1,&myFirstComm);
MPI_Intercomm_create(myComm,0, MPI_COMM_WORLD, 2, 12,&mySecondComm);
}
else if (membershipKey==2)
{
/*组2与组0和1通信*/
MPI_Intercomm_create(myComm, 0,MPI_COMM_WORLD,0, 2,&myFirstComm);
MPI_Intercomm_create(myComm,0,MPI_COMM_WORLD,1, 12,&mySecondComm);
}
/*做一些工作*/
/*然后在中止前释放通信子*/
MPI_Comm_free(&myFirstComm);
MPI_Comm_free(&mySecondComm);
MPI_Comm_free(&myComm);
MPI_Finalize();
}
例3:为组间通信建立名字服务
后面的过程举例说明了能够创建名字服务的进程,名字服务通过一个包含服务器通信子的集合和一个由两个组共同选定的标志名来建立组间通信子.
当所有MPI进程执行了MPI_INIT后,每个进程调用下面所定义的函数例子Init_serve().接着, 如果new_world返回NULL,得到NULL的进程则需要在一个重新激活的循环Do_serve()内实现一个服务器函数. 其它的每个进程通过使用new_world作为新的有效的"全局"通信子来作它们预先规划好的计算。当不再需要服务器时,某一个指定的进程调用Undo_Server()来清除服务器.
此方法包括以下特征:
#define INIT_SERVER_TAG_1 666
#define UNDO_SERVER_TAG_1 777
static int server_key_val;
/*对于server_comm的属性管理,复制回调*/
void handle_copy_fn(MPI_Comm *oldcomm,int *keyval, void *extra_state,
void *attribute_val_in,void **attribute_val_out,int *flag)
{
/*复制句柄*/
*attributez_val_out=attribute_val_in;
*flag=1;/*指出此拷贝将要发生*/
}
int Init_server(peer_comm,rank_of_server,server_comm,new_world)
MPI_Comm peer_comm;
int rank_of_server;
MPI_Comm *server_comm;
MPI_Comm *new_world;/*新的有效的通信子,无服务器*/
{
MPI_Comm temp_comm, lone_xomm;
MPI_Group peer_group, temp_group;
int rank_in_peer_comm,size,color,key=0;
int peer_leader,peer_leader_rank_in_temp_comm;
MPI_Comm_rank(peer_comm,&rank_in_peer_comm);
MPI_Comm_size(peer_comm,*size);
if((size<2) ||(0>rank_of_server)||(rank_of_server >= siae))
return (MPI_ERR_OTHER);
/*通过将peer_comm分成服务器进程和其它进程,来创建两个通信子*/
peer_leader=(rank_of_server+1)%size;/*任意选择*/
if((color=(rank_in_peer_comm==rank_of_server)))
{
MPI_Comm_split(peer_comm,color,key,&lone_comm);
MPI_Intercomm_create(lone_comm,0,peer_comm,peer_leader, INIT_SERVER_TAG_1,server_comm);
MPI_Comm_free(&lone_comm);
*new_world=(MPI_Comm)0;
}
else
{
MPI_Comm_Split(peer_comm,color,key,&temp_comm);
MPI_Comm_group(peer_comm,&peer_group);
MPI_Comm_group(temp_comm,&temp_group);
MPI_Group_translate_ranks(peer_group, 1, &peer_leader,temp_group,&peer_leader_rank_in_temp_comm);
MPI_Intercomm_create(cemp_comm,peer_leader_rank_in_temp_comm, peer_comm,rank_of_server,
INIT_SERVER_TAG_1,server_comm);
/*将new_world通信属性附到server_comm上*/
/*多线索的关键部分*/
if(server_keyval==MPI_KEYVAL_INVALID)
{
/*为服务器keyval获得进程本地名字*/
MPI_Attr_keyval_create(handle_copy_fn,NULL, &Server_keyval,NULL);
}
*new_world=temp_comm;
/*在组间通信子上的组内通信子缓冲句柄*/
MPI_Attr_put(server_comm,server_keyval,(void *)(*new_world));
}
return(MPI_SUCCESS);
}
实际的服务器进程将交付运行下列代码:
int Do_server(server_comm)
MPI_Comm server_comm;
{
void init_queue();
int en_queue(),de_queue();/*为后面的匹配保存整数三元组(不显示fns)*/
MPI_Comm comm;
MPI_Status status;
int client_tag,client_source;
int client_rank_in_new_world,pairs_rank_in_new_world;
int buffer[10],count=1;
void *queue;
init_queue(&queue);
for(;;)
{
MPI_Recv(buffer,count,MPI_INT,MPI_ANY_SOURCE,MPI_ANY_TAG, server_comm,&status);
/*从任意客户机中接收*/
/*确定客户机*/
client_tag=status.MPI_TAG;
client_source=status.MPI_SOURCE;
client_rank_in_new_world=buffer[0];
if(client_tag==UNDO_SERVER_TAG_1)/*中止进程的客户机*/
{
while (de_queue,MPI_ANY_TAG,&pairs_rank_in_new_world, &pairs_rank_in_server))
;
MPI_Intercomm_free(&server_comm);
break;
}
if(de_queue(queue,client_tag,&pairs_rank_in_new_world, &pairs_rank_in_server))
{
/*用一些标志进行成对匹配,告诉它们相互的情况*/
buffer[0]=pairs_rank_in_new_world;
MPI_Send(buffer,1,MPI_INT,client_src,client_tag, server_comm);
buffer[0]=client_rank_in_new_world;
MPI_Send(buffer,1,MPI_INT,pairs_rank_in_server,client_tag, server_comm);
}
else
en_queue(queue,client_tag,client_source, client_rank_in_new_world);
}
}
当服务器不再需要时由一个专门的进程来负责结束它.调用Undo_server将中止服务器函数.
int Undo_server(server_comm)/*结束服务器的客户机例程*/
MPI_Comm *server_comm;
{
int buffer = 0;
MPI_Send(&buffer,1, MPI_INT,0, UNDO_SERBER_TAG_1,*server_comm);
MPI_Intercomm_free(server_comm);
}
下面是一个用于组间通信的阻塞的名字服务,其语义约束与MPI_Intercomm_create相同,但是简化了语法.它仅使用了创建名字服务的功能.
int Intercomm_name_create(local_comm,server_comm,tag,comm)
MPI_Comm local_comm, server_comm;
int tag;
MPI_Comm *comm;
{
int error;
int found;/*attribute acquisitio mgmt for new_world */
/*server_comm中的comm*/
void *val;
MPI_Comm new_world;
int brffer[10],rank;
int local_leader=0;
MPI_Attr_get(server_comm,server_keyval,&val,&found);
new_world=(MPI_Comm)val;/*收回缓冲句柄*/
MPI_Comm_rank(server_comm,&rank);/*本地组中的序列号*/
if(rank==local_leader)
{
buffer[0]=rank;
MPI_Send(&buffer,1,MPI_INT,0,tag,server_comm);
MPI_Recv(&buffer,1,MPI_INT,0,tag,server_comm);
}
error =MPI_Intercomm_create(local_leader,local_comm,buffer[0], new_world,tag,comm);
return(error);
}
Copyright: NPACT |