1#ifndef LIBPRESIO_DISTRIBUTED_MANAGER_H
2#define LIBPRESIO_DISTRIBUTED_MANAGER_H
4#include <libdistributed/libdistributed_work_queue.h>
5#include <libdistributed/libdistributed_work_queue_options.h>
6#include <std_compat/optional.h>
24compat::optional<std::vector<size_t>>
distributed_build_groups(
const unsigned int size,
const unsigned int n_workers_groups,
const unsigned int n_masters,
const unsigned int root);
48 max_masters(max_masters),
49 max_ranks_per_worker(max_ranks_per_worker),
61 template <
class TaskRandomIt,
class MasterFn,
class WorkerFn>
62 int work_queue(TaskRandomIt begin, TaskRandomIt end, WorkerFn&& workerfn, MasterFn&& masterfn) {
65 MPI_Initialized(&initalized);
67 return set_error(1,
"MPI must be initialized");
69 distributed::queue::work_queue_options<typename distributed::queue::iterator_to_request_type<TaskRandomIt>::type> options(comm);
70 options.set_root(root);
71 options.set_groups(groups);
72 distributed::queue::work_queue( options, begin, end, std::forward<WorkerFn>(workerfn), std::forward<MasterFn>(masterfn));
84 int send(T
const& t,
int dest,
int tag=0) {
85 return distributed::comm::send(t, dest, tag, comm);
98 int recv(T& t,
int source,
int tag=0, MPI_Status* s=
nullptr) {
99 return distributed::comm::recv(t, source, tag, comm, s);
111 return distributed::comm::bcast(t, bcast_root, comm);
121 return distributed::comm::bcast(t, root, comm);
129 MPI_Comm_size(comm, &size);
138 MPI_Comm_rank(comm, &rank);
144 set(opts,
"distributed:root",
"which rank should be considered the root?");
145 set(opts,
"distributed:mpi_comm",
"which MPI communicator to use");
146 set(opts,
"distributed:n_masters",
"How many ranks are assigned to as task masters?");
147 set(opts,
"distributed:n_worker_groups",
"How many groups of workers are there?");
148 set(opts,
"distributed:groups",
"maps the each rank to either a worker or master processes group a la MPI_Comm_split");
158 set(opts,
"distributed:root", root);
159 set(opts,
"distributed:mpi_comm", (
void*)&comm);
160 if(max_masters > 1 || max_masters == 0) {
161 set(opts,
"distributed:n_masters", n_masters);
163 if(max_ranks_per_worker > 1 || max_ranks_per_worker == 0) {
164 set(opts,
"distributed:n_worker_groups", n_workers);
165 if(max_masters > 1 || max_masters == 0) {
166 set(opts,
"distributed:groups",
pressio_data(groups.begin(), groups.end()));
176 get(options,
"distributed:root", &root);
184 MPI_Initialized(&initialized);
186 MPI_Comm_size(comm, &size);
190 auto workers_set =
get(options,
"distributed:n_worker_groups", &n_workers);
191 auto masters_set =
get(options,
"distributed:n_masters", &n_masters);
193 groups = groups_data.
to_vector<
size_t>();
194 n_workers = compat::nullopt;
195 n_masters = compat::nullopt;
199 n_workers = size - *n_masters;
202 groups = std::move(*tmp);
208 groups = std::move(*tmp);
211 n_workers = size - *n_masters;
214 groups = std::move(*tmp);
223 const char*
prefix()
const override {
return "distributed"; }
226 MPI_Comm comm = MPI_COMM_WORLD;
227 std::vector<size_t> groups;
228 unsigned int root = 0;
229 unsigned int max_masters = 1;
230 unsigned int max_ranks_per_worker = 1;
231 compat::optional<unsigned int> n_workers, n_masters;
Definition: configurable.h:17
enum pressio_options_key_status get(pressio_options const &options, StringType &&key, PointerType value) const
Definition: configurable.h:127
Definition: distributed_manager.h:34
int send(T const &t, int dest, int tag=0)
Definition: distributed_manager.h:84
struct pressio_options get_documentation() const override
Definition: distributed_manager.h:142
int work_queue(TaskRandomIt begin, TaskRandomIt end, WorkerFn &&workerfn, MasterFn &&masterfn)
Definition: distributed_manager.h:62
int recv(T &t, int source, int tag=0, MPI_Status *s=nullptr)
Definition: distributed_manager.h:98
virtual int set_options(struct pressio_options const &options) override
Definition: distributed_manager.h:175
static size_t unlimited
Definition: distributed_manager.h:39
pressio_distributed_manager(unsigned int max_ranks_per_worker=1, unsigned int max_masters=1)
Definition: distributed_manager.h:46
int bcast(T &t)
Definition: distributed_manager.h:120
struct pressio_options get_options() const override
Definition: distributed_manager.h:156
int comm_rank() const
Definition: distributed_manager.h:136
int comm_size() const
Definition: distributed_manager.h:127
const char * prefix() const override
Definition: distributed_manager.h:223
int bcast(T &t, int bcast_root)
Definition: distributed_manager.h:110
int set_error(int code, std::string const &msg)
interface for configurable types
C++ pressio_data interface.
compat::optional< std::vector< size_t > > distributed_build_groups(const unsigned int size, const unsigned int n_workers_groups, const unsigned int n_masters, const unsigned int root)
int distributed_world_size()
interface for types which can report and have errors
C++ pressio_options and pressio_option interfaces.
A single option value for a compressor.
A set of options for a compressor.
@ pressio_options_key_set
Definition: pressio_options.h:29
std::vector< T > to_vector() const
Definition: data.h:484
Definition: options.h:352