libpressio 0.93.0
Loading...
Searching...
No Matches
distributed_manager.h
Go to the documentation of this file.
1#ifndef LIBPRESIO_DISTRIBUTED_MANAGER_H
2#define LIBPRESIO_DISTRIBUTED_MANAGER_H
3#include <mpi.h>
4#include <libdistributed/libdistributed_work_queue.h>
5#include <libdistributed/libdistributed_work_queue_options.h>
6#include <std_compat/optional.h>
7#include <utility>
8#include <cassert>
9#include "configurable.h"
10#include "errorable.h"
11#include "options.h"
12#include "data.h"
13#include <pressio_option.h>
14#include <pressio_options.h>
15
24compat::optional<std::vector<size_t>> distributed_build_groups(const unsigned int size, const unsigned int n_workers_groups, const unsigned int n_masters, const unsigned int root);
25
30
35 public:
39 static size_t unlimited;
46 pressio_distributed_manager(unsigned int max_ranks_per_worker = 1, unsigned int max_masters = 1):
48 max_masters(max_masters),
49 max_ranks_per_worker(max_ranks_per_worker),
50 n_workers(0),
51 n_masters(0)
52 {}
53
61 template <class TaskRandomIt, class MasterFn, class WorkerFn>
62 int work_queue(TaskRandomIt begin, TaskRandomIt end, WorkerFn&& workerfn, MasterFn&& masterfn) {
64 int initalized = 0;
65 MPI_Initialized(&initalized);
66 if(!initalized) {
67 return set_error(1, "MPI must be initialized");
68 }
69 distributed::queue::work_queue_options<typename distributed::queue::iterator_to_request_type<TaskRandomIt>::type> options(comm);
70 options.set_root(root);
71 options.set_groups(groups);
72 distributed::queue::work_queue( options, begin, end, std::forward<WorkerFn>(workerfn), std::forward<MasterFn>(masterfn));
73 return error_code();
74 }
75
83 template <class T>
84 int send(T const& t, int dest, int tag=0) {
85 return distributed::comm::send(t, dest, tag, comm);
86 }
87
88
97 template <class T>
98 int recv(T& t, int source, int tag=0, MPI_Status* s=nullptr) {
99 return distributed::comm::recv(t, source, tag, comm, s);
100 }
101
102
109 template <class T>
110 int bcast(T& t, int bcast_root) {
111 return distributed::comm::bcast(t, bcast_root, comm);
112 }
113
119 template <class T>
120 int bcast(T& t) {
121 return distributed::comm::bcast(t, root, comm);
122 }
123
127 int comm_size() const {
128 int size;
129 MPI_Comm_size(comm, &size);
130 return size;
131 }
132
136 int comm_rank() const {
137 int rank;
138 MPI_Comm_rank(comm, &rank);
139 return rank;
140 }
141
142 struct pressio_options get_documentation() const override {
143 pressio_options opts;
144 set(opts, "distributed:root", "which rank should be considered the root?");
145 set(opts, "distributed:mpi_comm", "which MPI communicator to use");
146 set(opts, "distributed:n_masters", "How many ranks are assigned to as task masters?");
147 set(opts, "distributed:n_worker_groups", "How many groups of workers are there?");
148 set(opts, "distributed:groups", "maps the each rank to either a worker or master processes group a la MPI_Comm_split");
149 return opts;
150 }
151
156 struct pressio_options get_options () const override {
157 pressio_options opts;
158 set(opts, "distributed:root", root);
159 set(opts, "distributed:mpi_comm", (void*)&comm);
160 if(max_masters > 1 || max_masters == 0) {
161 set(opts, "distributed:n_masters", n_masters);
162 }
163 if(max_ranks_per_worker > 1 || max_ranks_per_worker == 0) {
164 set(opts, "distributed:n_worker_groups", n_workers);
165 if(max_masters > 1 || max_masters == 0) {
166 set(opts, "distributed:groups", pressio_data(groups.begin(), groups.end()));
167 }
168 }
169 return opts;
170 }
175 virtual int set_options (struct pressio_options const &options) override {
176 get(options, "distributed:root", &root);
177 MPI_Comm* tmp_comm;
178 if(get(options, "distributed:mpi_comm", (void**)&tmp_comm) == pressio_options_key_set) {
179 comm = *tmp_comm;
180 }
181
182 int size;
183 int initialized;
184 MPI_Initialized(&initialized);
185 if(initialized) {
186 MPI_Comm_size(comm, &size);
187 }
188
189 pressio_data groups_data;
190 auto workers_set = get(options, "distributed:n_worker_groups", &n_workers);
191 auto masters_set = get(options, "distributed:n_masters", &n_masters);
192 if(get(options, "distributed:groups", &groups_data) == pressio_options_key_set) {
193 groups = groups_data.to_vector<size_t>();
194 n_workers = compat::nullopt;
195 n_masters = compat::nullopt;
196
197 } else if(workers_set == pressio_options_key_set && masters_set == pressio_options_key_set) {
198 n_masters = 1;
199 n_workers = size - *n_masters;
200 auto tmp = distributed_build_groups(size, *n_workers, *n_masters, root);
201 if(tmp) {
202 groups = std::move(*tmp);
203 }
204 } else if (workers_set == pressio_options_key_set) {
205 n_masters = 1;
206 auto tmp = distributed_build_groups(size, *n_workers, *n_masters, root);
207 if(tmp) {
208 groups = std::move(*tmp);
209 }
210 } else if (masters_set == pressio_options_key_set) {
211 n_workers = size - *n_masters;
212 auto tmp = distributed_build_groups(size, *n_workers, *n_masters, root);
213 if(tmp){
214 groups = std::move(*tmp);
215 }
216 }
217 return 0;
218 }
219
223 const char* prefix() const override { return "distributed"; }
224
225 private:
226 MPI_Comm comm = MPI_COMM_WORLD;
227 std::vector<size_t> groups;
228 unsigned int root = 0;
229 unsigned int max_masters = 1;
230 unsigned int max_ranks_per_worker = 1;
231 compat::optional<unsigned int> n_workers, n_masters;
232};
233#endif /* end of include guard: LIBPRESIO_DISTRIBUTED_MANAGER_H */
Definition: configurable.h:17
enum pressio_options_key_status get(pressio_options const &options, StringType &&key, PointerType value) const
Definition: configurable.h:127
Definition: distributed_manager.h:34
int send(T const &t, int dest, int tag=0)
Definition: distributed_manager.h:84
struct pressio_options get_documentation() const override
Definition: distributed_manager.h:142
int work_queue(TaskRandomIt begin, TaskRandomIt end, WorkerFn &&workerfn, MasterFn &&masterfn)
Definition: distributed_manager.h:62
int recv(T &t, int source, int tag=0, MPI_Status *s=nullptr)
Definition: distributed_manager.h:98
virtual int set_options(struct pressio_options const &options) override
Definition: distributed_manager.h:175
static size_t unlimited
Definition: distributed_manager.h:39
pressio_distributed_manager(unsigned int max_ranks_per_worker=1, unsigned int max_masters=1)
Definition: distributed_manager.h:46
int bcast(T &t)
Definition: distributed_manager.h:120
struct pressio_options get_options() const override
Definition: distributed_manager.h:156
int comm_rank() const
Definition: distributed_manager.h:136
int comm_size() const
Definition: distributed_manager.h:127
const char * prefix() const override
Definition: distributed_manager.h:223
int bcast(T &t, int bcast_root)
Definition: distributed_manager.h:110
int error_code() const
int set_error(int code, std::string const &msg)
interface for configurable types
C++ pressio_data interface.
compat::optional< std::vector< size_t > > distributed_build_groups(const unsigned int size, const unsigned int n_workers_groups, const unsigned int n_masters, const unsigned int root)
int distributed_world_size()
interface for types which can report and have errors
C++ pressio_options and pressio_option interfaces.
A single option value for a compressor.
A set of options for a compressor.
@ pressio_options_key_set
Definition: pressio_options.h:29
Definition: data.h:52
std::vector< T > to_vector() const
Definition: data.h:484
Definition: options.h:352