Creating a counter that stays synchronized across MPI processes

2021-12-30 00:00:00 thread-safety mpi count c++ mpi-rma

I have quite a bit of experience using the basic comm and group MPI2 methods, and do quite a bit of embarrassingly parallel simulation work using MPI. Up until now, I have structured my code to have a dispatch node, and a bunch of worker nodes. The dispatch node has a list of parameter files that will be run with the simulator. It seeds each worker node with a parameter file. The worker nodes run their simulation, then request another parameter file, which the dispatch node provides. Once all parameter files have been run, the dispatch node shuts down each worker node, before shutting itself down.

The parameter files are typically named "Par_N.txt", where N is the identifying integer (e.g. N = 1-1000). So I was thinking, if I could create a counter, and could have this counter synchronized across all of my nodes, I could eliminate the need to have a dispatch node and make the system a bit simpler. As simple as this sounds in theory, in practice I suspect it is a bit more difficult, as I'd need to ensure the counter is locked while being changed, etc. I also thought there might be a built-in way for MPI to handle this. Any thoughts? Am I overthinking this?

Accepted Answer

Implementing a shared counter isn't trivial, but once you do it and have it in a library somewhere you can do a lot with it.

In the Using MPI-2 book, which you should have to hand if you're going to implement this stuff, one of the examples (the code is available online) is a shared counter. The "non-scalable" one should work well out to several dozens of processes: the counter is an array of integers, indexed 0..size-1, one per rank, and the "get next work item #" operation consists of locking the window, reading everyone else's contribution to the counter (in this case, how many items they've taken), updating your own (++), closing the window, and calculating the total. This is all done with passive one-sided operations. (The better-scaling version just uses a tree rather than a 1-d array.)
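To make that concrete: with four ranks whose counter slots currently read {3, 5, 2, 4}, a call from rank 1 bumps its own slot to 6 and returns 3 + 6 + 2 + 4 = 15, so that rank takes work item 15. Because the read-and-update happens inside a single exclusive lock on the window, no two ranks can come away with the same total.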

So the usage would be: you have, say, rank 0 host the counter, and everyone keeps doing work units and updating the counter to get the next one until there's no more work; then you wait at a barrier or something and finalize.
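Concretely, a worker loop built on the counter code in the update below might look something like the following sketch - it's essentially the second test in that code with the sleep swapped for your simulation, and run_all_parameter_files(), NFILES and run_simulation() are names I'm making up here, not anything from your code:

#include <mpi.h>
#include <stdio.h>

#define NFILES 1000   /* Par_1.txt .. Par_1000.txt, as in the question */

void run_all_parameter_files(void) {
    struct mpi_counter_t *c = create_counter(0);   /* rank 0 hosts the counter */
    char fname[64];
    int n;

    /* every rank keeps grabbing the next index until the list is exhausted */
    while ((n = increment_counter(c, 1)) <= NFILES) {
        snprintf(fname, sizeof(fname), "Par_%d.txt", n);
        /* run_simulation(fname); */
    }

    MPI_Barrier(MPI_COMM_WORLD);   /* wait for everyone to run out of work */
    delete_counter(&c);
}

Every rank runs the same loop; whichever rank calls increment_counter() first simply gets the next file, so no dispatch node is needed.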

Once you have something like this working - using a shared value to get the next available work unit - you can generalize to more sophisticated approaches. So, as suzterpatt suggested, everyone taking "their share" of work units at the start works great, but what do you do if some finish faster than others? The usual answer now is work-stealing: everyone keeps their list of work units in a deque, and when one rank runs out of work, it steals work units from the other end of someone else's deque, until there's no more work left. This is really the completely distributed version of master-worker, where there's no longer a single master partitioning the work. Once you have a single shared counter working, you can make mutexes from that, and from there you can implement the deque. But if the simple shared counter works well enough, you may not need to go there.
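As for the mutexes, here's a rough sketch of one way to do it - note this leans on the MPI-3 atomics (MPI_Compare_and_swap / MPI_Fetch_and_op) and a simple spin, rather than the waitlist-based MPI-2 mutex from the book, and the names are mine, so treat it as an illustration of the idea rather than anything polished:

#include <mpi.h>
#include <stdlib.h>

/* 0 = free, 1 = held; the flag lives on hostrank only */
struct mpi_mutex_t {
    MPI_Win win;
    int hostrank;
    int *flag;
};

struct mpi_mutex_t *create_mutex(int hostrank) {
    struct mpi_mutex_t *m = (struct mpi_mutex_t *)malloc(sizeof(struct mpi_mutex_t));
    int rank;

    m->hostrank = hostrank;
    m->flag = NULL;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rank == hostrank) {
        MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &(m->flag));
        *(m->flag) = 0;
        MPI_Win_create(m->flag, sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(m->win));
    } else {
        MPI_Win_create(m->flag, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(m->win));
    }
    return m;
}

void mutex_lock(struct mpi_mutex_t *m) {
    int one = 1, zero = 0, prev;
    do {   /* spin: try to swap 0 -> 1 on the host's flag until we win */
        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, m->hostrank, 0, m->win);
        MPI_Compare_and_swap(&one, &zero, &prev, MPI_INT, m->hostrank, 0, m->win);
        MPI_Win_unlock(m->hostrank, m->win);
    } while (prev != 0);
}

void mutex_unlock(struct mpi_mutex_t *m) {
    int zero = 0, prev;
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, m->hostrank, 0, m->win);
    /* atomically put the flag back to 0 */
    MPI_Fetch_and_op(&zero, &prev, MPI_INT, m->hostrank, 0, MPI_REPLACE, m->win);
    MPI_Win_unlock(m->hostrank, m->win);
}

With mutex_lock()/mutex_unlock() in hand, each rank's deque can live in a window owned by that rank and be guarded by its own mutex, which is all the machinery work-stealing needs.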

Update: OK, so here's a hacky attempt at doing the shared counter - my version of the simple one in the MPI-2 book. It seems to work, but I wouldn't say anything much stronger than that (I haven't played with this stuff for a long time). There's a simple counter implementation (corresponding to the non-scaling version in the MPI-2 book) and two simple tests, one corresponding roughly to your work case: each rank updates the counter to get a work item, then does the "work" (sleeps for a random amount of time). At the end of each test, the counter data structure is printed out, which is the # of increments each rank has done.

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int hostrank;   /* rank that physically holds the counter array */
    int myval;      /* running total of this rank's own increments */
    int *data;      /* per-rank counts; allocated on hostrank only */
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        /* the host rank owns the counter array: one slot per rank */
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        /* everyone else exposes an empty window into the same window object */
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count->myval = 0;

    return count;
}

/* atomically add `increment` to this rank's slot and return the new global total */
int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    /* lock the host's window, add our increment to our own slot, and
       read everyone else's slot so we can compute the current total */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {
        if (i == count->rank) {
            MPI_Accumulate(&increment, 1, MPI_INT, count->hostrank, i, 1, MPI_INT,
                           MPI_SUM, count->win);
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT,
                    count->win);
        }
    }

    MPI_Win_unlock(count->hostrank, count->win);

    /* our own slot was accumulated into, not read, so use the locally
       tracked value when summing the total */
    count->myval += increment;
    vals[count->rank] = count->myval;

    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

void test1() {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d
", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


void test2() {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
            printf("%d working on item %d...\n", rank, result);
            sleep(random() % 10);
        } else {
            printf("%d done\n", rank);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}
