ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MPI — 2. 点对点通信

2021-09-20 19:01:21  阅读:171  来源: 互联网

标签:27 14 16 int 点对点 通信 MPI array


点对点通信

  • MPI中数据通信类似邮件发送

一个进程发送数据的拷贝到另一个进程/一组进程,其它进程则接收数据拷贝

  • 数据通信要求

发送方:(必须知道的)
1、发送数据谁,给哪个进程
2、发送什么数据,包括数据类型和数据的数量
3、用户定义的tag(类似邮件的主题,告诉接收方接收的是什么类型的数据)

接收方:(可能知道)
1、谁发送的(如果接收方不知道,那么sender rank就是MPI_ANY_SOURCE,即任意进程都可以发送)
2、接收到什么数据(可以是部分数据,不是完整的?为什么不接收完整的呢?)
3、用户定义的tag(如果接收方不知道,那么可以是MPI_ANY_TAG)

  • MPI数据描述

1、数据类型
C/C++ 中常规数据类型大写在其前面加上MPI_, 代表MPI中对应C/C++中的常规数据类型

MPI_INT --> int
MPI_DOUBLE --> double
MPI_CHAR --> char

也可以创建复杂的数据类型

2、数据总量
MPI_Send, MPI_Recv 传输数据需要指定传输数据的总量

int MPI_Send(const void *buf, int count, MPR_Datatype datatype, int dest, int tag, MPI_Comm comm)
int MPI_Recv(void *buf, int count, MPR_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

MPI_Status声明如下:

typedef struct MPI_Status {
    int count_lo;
    int count_hi_and_cancelled;
    int MPI_SOURCE;
    int MPI_TAG;
    int MPI_ERROR;
} MPI_Status;

以一维数组排序为例说明,将数组拆分为N份,分到N个不同进程中进行排序后,然后在第一个进程中进行合并。c++代码如下:

#include <mpi.h>
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <vector>
using namespace std;

#define random(x) (rand()%x)

template <typename Dtype>
void print_array(const vector<Dtype> &array) {
    int size = array.size();
    if (size == 0) {
        cout << "It is an empty array." << endl;
        return;
    }
    for (int i=0; i<size-1; i++) {
        cout << array[i] << ",";
    }
    cout << array[size-1] << endl;
}

template <typename Dtype>
void quick_sort(vector<Dtype> &array, int start, int end) {
    if (start >= end) {
        return;
    }
    //cout << "quick_sort: " << start << " " << end << endl;    
    int low = start;
    int high = end;
    Dtype key = array[low];
    while (low < high) {
        while (array[high] >= key && low < high )
            high--;
        array[low] = array[high];
        //cout << low << " " << high << endl;
        //break;
        while (array[low] <= key && low < high)
            low++;
        array[high] = array[low];
        
    }
    //cout << low << " " << high << endl;
    array[low] = key;
    quick_sort(array, start, low-1);
    quick_sort(array, low+1, end);
}


template <typename Dtype>
void merge(const Dtype *ptr1, int num1, const Dtype *ptr2, int num2, vector<Dtype> &array_merged) {

    if (ptr1 == nullptr || ptr2 == nullptr || num1 == 0 || num2 == 0) {
        cout << "null point or zero size." << endl;
        return;
    }

    array_merged = vector<Dtype>(num1+num2, 0);
    int i=0, i1=0, i2=0;
    for (i=0; i< num1+num2; i++ ) {
        if (i1 < num1 && i2 < num2) {
            if (ptr1[i1] < ptr2[i2]) {
                array_merged[i]=ptr1[i1];
                i1 ++;
            }
            else {
                array_merged[i]=ptr2[i2];
                i2 ++;
            }
        }
        else {
            if (i1 < num1) {
                array_merged[i]=ptr1[i1];
                i1 ++;
            } 
            else {
                array_merged[i]=ptr2[i2];
                i2 ++;
            } 
        }

    }
}


int main(int argc, char **argv) {
    /*
    Split an array to K chunks, and each of K processes in a communicator sorts one chunk.
    Finally, the first process merge the sorted chunks recived from the others.
    */
   
    if (argc < 3) {
        cout << "Missing parameters" << endl;
        return 0;
    }
 
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
   
    int N = atoi(argv[1]);
    int M = atoi(argv[2]);

    if (N < rank) {
        cout << "The length of the array is less than the number of processes.";
        return 0;
    }

    vector<int> array(N,-1);
    int num_per_chunk = N / size;
    int num_last_chunk = num_per_chunk;
    if (N % size != 0)
        num_last_chunk += N % size;

    //cout << "num_per_chunk: " << num_per_chunk << ", num_last_chunk: " << num_last_chunk << endl;

    if (rank == 0) {
        for (int i=0; i< N; i++)
            array[i] = random(M);
        cout << "initial array: " << endl;
        print_array(array);

        // send K-1 chunks to the other processes in the communicator. (K=size)
        for (int k=1; k<size-1; k++) {
            MPI_Send(&array[k*num_per_chunk], num_per_chunk, MPI_INT, k, 0, MPI_COMM_WORLD);     
        }
        MPI_Send(&array[N-num_last_chunk], num_last_chunk, MPI_INT, size-1, 0, MPI_COMM_WORLD);     

        // sort the first chunk        
        quick_sort(array, 0, num_per_chunk-1);    

        // recive K-1 chunks from the other processes 
        MPI_Status status;
        for (int k=1; k<size-1; k++) {
            MPI_Recv(&array[k*num_per_chunk], num_per_chunk, MPI_INT, k, 0, MPI_COMM_WORLD, &status);     
        }
        MPI_Recv(&array[N-num_last_chunk], num_last_chunk, MPI_INT, size-1, 0, MPI_COMM_WORLD, &status);     
       
        cout << "gathered array: " << endl; 
        print_array(array);
        
        // merge the K sorted chunks
        vector<int> merged_array(num_last_chunk);
        memcpy(merged_array.data(), &array[N-num_last_chunk], num_last_chunk*sizeof(int));
        //print_array(merged_array);
        
        for (int k=0; k<size-1; k++){
            vector<int> temp_array = merged_array;
            merge(&array[k*num_per_chunk], num_per_chunk, &merged_array[0], merged_array.size(), temp_array);
            merged_array = temp_array;
            //print_array(temp_array);
        }
        
        cout << "sorted array: " << endl;
        print_array(merged_array);
        

    } else {
        MPI_Status status;
        int num = 0;
        if (rank < size-1)
            num = num_per_chunk;
        else
            num = num_last_chunk;
        // receive a chunk of the array
        MPI_Recv(&array[0], num, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);

        // sort the received chunk
        quick_sort(array, 0, num-1);

        // send back teh sorted chunk
        MPI_Send(&array[0], num, MPI_INT, 0, 0, MPI_COMM_WORLD);
            
    }
  

    MPI_Finalize();
    return 0;
}

执行如下命令,打印原始数组,从各个进程gather数组,最终排序后数组的元素

# mpiexec -np 17 ./sort_any_procs 123 30
initial array: 
13,16,27,25,23,25,16,12,9,1,2,7,20,19,23,16,0,6,22,16,11,8,27,9,2,20,2,13,7,25,29,12,12,18,29,27,13,16,1,22,9,3,21,29,14,7,8,14,5,0,23,16,1,20,26,3,2,20,16,1,15,15,14,27,26,5,16,9,13,17,24,15,12,15,14,27,14,14,3,20,7,18,6,8,8,24,3,11,14,19,12,0,26,18,19,22,16,6,24,29,15,10,14,28,17,21,17,2,27,12,22,26,1,20,26,1,15,29,4,29,10,9,21
gathered array: 
13,16,16,23,25,25,27,1,2,7,9,12,19,20,0,6,11,16,16,22,23,2,2,8,9,13,20,27,7,12,12,18,25,29,29,1,3,9,13,16,22,27,5,7,8,14,14,21,29,0,1,3,16,20,23,26,1,2,14,15,15,16,20,5,9,13,16,17,26,27,12,14,14,15,15,24,27,3,6,7,8,14,18,20,3,8,11,12,14,19,24,0,6,16,18,19,22,26,10,14,15,17,24,28,29,2,12,17,21,22,26,27,1,1,4,9,10,15,20,21,26,29,29
sorted array: 
0,0,0,1,1,1,1,1,1,2,2,2,2,2,3,3,3,3,4,5,5,6,6,6,7,7,7,7,8,8,8,8,9,9,9,9,9,10,10,11,11,12,12,12,12,12,12,13,13,13,13,14,14,14,14,14,14,14,14,15,15,15,15,15,15,16,16,16,16,16,16,16,16,16,17,17,17,18,18,18,19,19,19,20,20,20,20,20,20,21,21,21,22,22,22,22,23,23,23,24,24,24,25,25,25,26,26,26,26,26,27,27,27,27,27,27,28,29,29,29,29,29,29

标签:27,14,16,int,点对点,通信,MPI,array
来源: https://www.cnblogs.com/wolfling/p/15144258.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有