Import

parent 3894390a
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
#pragma once
#include <errno.h>
#include <mpi.h>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <numeric>
#include <vector>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/mpi/packed_iarchive.hpp>
#include <boost/mpi/packed_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
#include "common.h"
namespace unsafe_mpi {
template <typename T, typename transmit_type=uint64_t>
static void allgatherv(const boost::mpi::communicator &comm, const std::vector<T> &in, std::vector<T> &out) {
// Trivial (enough) datatypes can be transmit directly via MPI_Allgatherv
// For all others, we have to serialize them using boost::serialize
if (is_trivial_enough<T>::value) {
allgatherv_unsafe<T, transmit_type>(comm, in, out);
} else {
allgatherv_serialize<T>(comm, in, out);
}
}
template <typename T>
static void allgatherv_serialize(const boost::mpi::communicator &comm, const std::vector<T> &in, std::vector<T> &out) {
// Step 1: serialize input data
boost::mpi::packed_oarchive oa(comm);
if (!in.empty())
oa << in;
// Step 2: exchange sizes (archives' .size() is measured in bytes)
// Need to cast to int because this is what MPI uses as size_t...
const int in_size = static_cast<int>(in.size()),
transmit_size = (in.empty() ? 0 : static_cast<int>(oa.size()));
std::vector<int> in_sizes(comm.size()), transmit_sizes(comm.size());
boost::mpi::all_gather(comm, in_size, in_sizes.data());
boost::mpi::all_gather(comm, transmit_size, transmit_sizes.data());
// Step 3: calculate displacements from sizes (prefix sum)
std::vector<int> displacements(comm.size() + 1);
displacements[0] = sizeof(boost::mpi::packed_iarchive);
for (int i = 1; i <= comm.size(); ++i) {
displacements[i] = displacements[i-1] + transmit_sizes[i-1];
}
// Step 4: allocate space for result and MPI_Allgatherv
char* recv = new char[displacements.back()];
// If in.empty(), transmit_size is 0 so we don't really care
auto sendptr = const_cast<void*>(oa.address());
int status = MPI_Allgatherv(sendptr, transmit_size, MPI_PACKED, recv,
transmit_sizes.data(), displacements.data(),
MPI_PACKED, comm);
if (status != 0) {
ERR << "MPI_Allgatherv returned " << status << ", errno " << errno << std::endl;
return;
}
// Step 5: deserialize received data
// Preallocate storage to prevent reallocations
std::vector<T> temp;
size_t largest_size = *std::max_element(in_sizes.begin(), in_sizes.end());
temp.reserve(largest_size);
out.reserve(std::accumulate(in_sizes.begin(), in_sizes.end(), 0));
// Deserialize archives one by one, inserting elements at the end of ̀out̀
for (int i = 0; i < comm.size(); ++i) {
if (in_sizes[i] == 0) {
// We can ignore processes which didn't have anything to send
continue;
}
boost::mpi::packed_iarchive archive(comm);
archive.resize(transmit_sizes[i]);
memcpy(archive.address(), recv + displacements[i], transmit_sizes[i]);
temp.clear();
temp.resize(in_sizes[i]);
archive >> temp;
out.insert(out.end(), temp.begin(), temp.end());
}
}
template <typename T, typename transmit_type=uint64_t>
static void allgatherv_unsafe(const boost::mpi::communicator &comm, const std::vector<T> &in, std::vector<T> &out) {
static_assert((sizeof(T)/sizeof(transmit_type)) * sizeof(transmit_type) == sizeof(T),
"Invalid transmit_type for element type (sizeof(transmit_type) is not a multiple of sizeof(T))");
// Step 1: exchange sizes
// We need to compute the displacement array, specifying for each PE
// at which position in out to place the data received from it
// Need to cast to int because this is what MPI uses as size_t...
const int factor = sizeof(T) / sizeof(transmit_type);
const int in_size = static_cast<int>(in.size()) * factor;
std::vector<int> sizes(comm.size());
boost::mpi::all_gather(comm, in_size, sizes.data());
// Step 2: calculate displacements from sizes
// Compute prefix sum to compute displacements from sizes
std::vector<int> displacements(comm.size() + 1);
displacements[0] = 0;
std::partial_sum(sizes.begin(), sizes.end(), displacements.begin() + 1);
// divide by factor by which T is larger than transmit_type
out.resize(displacements.back() / factor);
// Step 3: MPI_Allgatherv
const transmit_type *sendptr = reinterpret_cast<const transmit_type*>(in.data());
transmit_type *recvptr = reinterpret_cast<transmit_type*>(out.data());
const MPI_Datatype datatype = boost::mpi::get_mpi_datatype<transmit_type>();
int status = MPI_Allgatherv(sendptr, in_size, datatype, recvptr,
sizes.data(), displacements.data(),
datatype, comm);
if (status != 0) {
ERR << "MPI_Allgatherv returned " << status << ", errno " << errno << std::endl;
}
}
}
#pragma once
#include <mpi.h>
#include <errno.h>
#include <cstdint>
#include <vector>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/mpi/datatype.hpp>
#include <boost/mpi/packed_iarchive.hpp>
#include <boost/mpi/packed_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
#include "common.h"
namespace unsafe_mpi {
template <typename T, typename transmit_type = uint64_t>
void broadcast(const boost::mpi::communicator &comm, std::vector<T> &data, int root) {
const bool trivial = is_trivial_enough<T>::value;
static_assert(!trivial || boost::mpi::is_mpi_datatype<T>() ||
((sizeof(T)/sizeof(transmit_type)) * sizeof(transmit_type) == sizeof(T)),
"Invalid transmit_type for element type (sizeof(transmit_type) is not a multiple of sizeof(T))");
if (comm.size() < 2) return;
if (trivial) {
// MPI only supports "int" as size type, and the MPI Forum's reply to
// the issue can be summed up as "deal with it" (they refer to user-
// defined contiguous data types, e.g. ones that hold 1024 elements)
// MPI really hasn't moved on since the 90's...
int size = static_cast<int>(data.size());
// broadcast size and allocate space
boost::mpi::broadcast<int>(comm, size, root);
data.resize(size); // harmless on root, required on others
// broadcast elements as transmit_type
auto ptr = reinterpret_cast<transmit_type*>(data.data());
size = static_cast<int>(size * sizeof(T)/sizeof(transmit_type));
boost::mpi::broadcast(comm, ptr, size, root);
} else if (boost::mpi::is_mpi_datatype<T>()) {
// We can use Boost.MPI directly to transmit MPI datatypes
// But send size and data separately to avoid vector serialization
// I don't think this codepath will ever be called, as all
// native MPI datatypes should be trivial enough.
int size = static_cast<int>(data.size());
boost::mpi::broadcast(comm, size, root);
data.resize(size); // harmless on root, required on others
boost::mpi::broadcast<T>(comm, data.data(), size, root);
} else {
// Boost.MPI doesn't use MPI_Broadcast for types it doesn't know. WTF.
// Therefore, we need to do the archive broadcast ourselves.
if (comm.rank() == root) {
// Serialize data
boost::mpi::packed_oarchive oa(comm);
oa << data;
// Broadcast archive size
int archive_size = static_cast<int>(oa.size());
boost::mpi::broadcast(comm, archive_size, root);
// Broadcast archive data
auto sendptr = const_cast<void*>(oa.address());
int status = MPI_Bcast(sendptr, archive_size, MPI_PACKED, root, comm);
if (status != 0) {
ERR << "MPI_Bcast returned non-zero value " << status
<< ", errno: " << errno << std::endl;
}
} else {
// Receive archive size and allocate space
int archive_size;
boost::mpi::broadcast(comm, archive_size, root);
boost::mpi::packed_iarchive ia(comm);
ia.resize(archive_size);
// Receive broadcast archive data
auto recvptr = ia.address();
MPI_Bcast(recvptr, archive_size, MPI_PACKED, root, comm);
if (status != 0) {
ERR << "MPI_Bcast returned non-zero value " << status
<< ", errno: " << errno << std::endl;
return;
}
// Unpack received data
ia >> data;
}
}
}
}
#pragma once
#include <type_traits>
#include <utility>
namespace unsafe_mpi {
// Sort-of unsafe MPI operations on "trivial enough" (i.e. standard layout) data
// mostly because std::pair isn't technically a trivial type, but we want to treat it like one
template <typename T>
struct is_trivial_enough : public std::is_trivial<T> {};
// Pairs are trivial enough [TM] if both components are trivial enough [TM]
template <typename U, typename V>
struct is_trivial_enough<std::pair<U,V>> :
public std::integral_constant<bool,
is_trivial_enough<U>::value && is_trivial_enough<V>::value
> {};
}
#pragma once
#include <errno.h>
#include <mpi.h>
#include <cstdint>
#include <cstring>
#include <numeric>
#include <vector>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/mpi/packed_iarchive.hpp>
#include <boost/mpi/packed_oarchive.hpp>
#Include <boost/serialization/string.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
#include "common.h"
namespace unsafe_mpi {
template <typename T, typename transmit_type = uint64_t>
static void gatherv(const boost::mpi::communicator &comm, const std::vector<T> &in, std::vector<T> &out, const int root) {
if (is_trivial_enough<T>::value) {
gatherv_trivial<T, transmit_type>(comm, in, out, root);
} else {
gatherv_serialize<T>(comm, in, out, root);
}
}
template <typename T, typename transmit_type = uint64_t>
static void gatherv_trivial(const boost::mpi::communicator &comm,
const std::vector<T> &in, std::vector<T> &out,
const int root) {
static_assert((sizeof(T)/sizeof(transmit_type)) * sizeof(transmit_type) == sizeof(T),
"Invalid transmit_type for element type (sizeof(transmit_type) is not a multiple of sizeof(T))");
// exchange sizes
const int factor = sizeof(T) / sizeof(transmit_type);
const int sendsize = static_cast<int>(in.size() * factor);
const auto datatype = boost::mpi::get_mpi_datatype<transmit_type>();
const auto sendptr = reinterpret_cast<const transmit_type*>(in.data());
if (comm.rank() == root) {
// Receive sizes
std::vector<int> sizes;
sizes.reserve(comm.size());
boost::mpi::gather(comm, sendsize, sizes, root);
// Calculate displacements from spaces
std::vector<int> displacements(sizes.size() + 1);
std::partial_sum(sizes.begin(), sizes.end(), displacements.begin() + 1);
const int outsize = displacements.back();
// Allocate space
// in terms of #elements -> divide by factor
out.resize(outsize / factor);
auto recvptr = reinterpret_cast<transmit_type*>(out.data());
MPI_Gatherv(sendptr, sendsize, datatype,
recvptr, sizes.data(), displacements.data(),
datatype, root, comm);
} else {
// send size, then gather
boost::mpi::gather(comm, sendsize, root);
MPI_Gatherv(sendptr, sendsize, datatype,
nullptr, nullptr, nullptr,
datatype, root, comm);
}
}
// UNTESTED, mostly copied from allgatherv
template <typename T>
static void gatherv_serialize(const boost::mpi::communicator &comm, const std::vector<T> &in, std::vector<T> &out, const int root) {
// Step 1: serialize input data
boost::mpi::packed_oarchive oa(comm);
if (!in.empty())
oa << in;
// Step 2: exchange sizes (archives' .size() is measured in bytes)
// Need to cast to int because this is what MPI uses as size_t...
const int in_size = static_cast<int>(in.size()),
transmit_size = (in.empty() ? 0 : static_cast<int>(oa.size()));
// If in.empty(), transmit_size is 0 so we don't really care
auto sendptr = const_cast<void*>(oa.address());
if (comm.rank() == root) {
std::vector<int> in_sizes(comm.size()), transmit_sizes(comm.size());
boost::mpi::gather<int>(comm, in_size, in_sizes.data(), root);
boost::mpi::gather<int>(comm, transmit_size, transmit_sizes.data(), root);
// Step 3: calculate displacements from sizes (prefix sum)
std::vector<int> displacements(comm.size() + 1);
displacements[0] = sizeof(boost::mpi::packed_iarchive);
for (int i = 1; i <= comm.size(); ++i) {
displacements[i] = displacements[i-1] + transmit_sizes[i-1];
}
// Step 4: allocate space for result and MPI_Allgatherv
char* recv = new char[displacements.back()];
int status = MPI_Gatherv(sendptr, transmit_size, MPI_PACKED, recv,
transmit_sizes.data(), displacements.data(),
MPI_PACKED, root, comm);
if (status != 0) {
ERR << "MPI_Allgatherv returned " << status << ", errno " << errno << std::endl;
return;
}
// Step 5: deserialize received data
// Preallocate storage to prevent reallocations
std::vector<T> temp;
size_t largest_size = *std::max_element(in_sizes.begin(), in_sizes.end());
temp.reserve(largest_size);
out.reserve(std::accumulate(in_sizes.begin(), in_sizes.end(), 0));
// Deserialize archives one by one, inserting elements at the end of ̀out̀
for (int i = 0; i < comm.size(); ++i) {
if (in_sizes[i] == 0) {
// We can ignore processes which didn't have anything to send
continue;
}
boost::mpi::packed_iarchive archive(comm);
archive.resize(transmit_sizes[i]);
memcpy(archive.address(), recv + displacements[i], transmit_sizes[i]);
temp.clear();
temp.resize(in_sizes[i]);
archive >> temp;
out.insert(out.end(), temp.begin(), temp.end());
}
} else {
boost::mpi::gather<int>(comm, in_size, root);
boost::mpi::gather<int>(comm, transmit_size, root);
int status = MPI_Gatherv(sendptr, transmit_size, MPI_PACKED,
nullptr, nullptr, nullptr,
MPI_PACKED, root, comm);
if (status != 0) {
ERR << "MPI_Allgatherv returned " << status << ", errno " << errno << std::endl;
return;
}
}
}
}
#pragma once
#include <cstdint>
#include <vector>
#include <boost/mpi/communicator.hpp>
#include "common.h"
namespace unsafe_mpi {
// Send `size` elements of type `T` starting at `data` to `dest` via `comm` with `tag`,
// using trivial type `transmit_type` if `T` is Standard Layout
template <typename T, typename transmit_type = uint64_t>
static void send(const boost::mpi::communicator &comm, int dest, int tag, const T *data, const size_t size) {
const bool trivial = is_trivial_enough<T>::value;
static_assert(!trivial || (sizeof(T)/sizeof(transmit_type)) * sizeof(transmit_type) == sizeof(T),
"Invalid transmit_type for element type (sizeof(transmit_type) is not a multiple of sizeof(T))");
// send size
comm.send(dest, tag, size);
// send actual data
if (trivial) {
auto sendptr = reinterpret_cast<const transmit_type*>(data);
auto sendsize = size * sizeof(T)/sizeof(transmit_type);
comm.send(dest, tag, sendptr, sendsize);
} else {
comm.send(dest, tag, data, size);
}
}
// convenience wrapper for vectors
template <typename T, typename transmit_type = uint64_t>
static void send(const boost::mpi::communicator &comm, int dest, int tag, const std::vector<T> &data) {
send<T, transmit_type>(comm, dest, tag, data.data(), data.size());
}
template <typename T, typename transmit_type = uint64_t>
static void recv(const boost::mpi::communicator &comm, int src, int tag, std::vector<T> &data) {
const bool trivial = is_trivial_enough<T>::value;
static_assert(!trivial || (sizeof(T)/sizeof(transmit_type)) * sizeof(transmit_type) == sizeof(T),
"Invalid transmit_type for element type (sizeof(transmit_type) is not a multiple of sizeof(T))");
auto size = data.size(); // for the type deduction
// receive size and resize
comm.recv(src, tag, size);
data.resize(size);
// receive actual data
if (trivial) {
auto recvptr = reinterpret_cast<transmit_type*>(data.data());
auto recvsize = size * sizeof(T)/sizeof(transmit_type);
comm.recv(src, tag, recvptr, recvsize);
} else {
comm.recv(src, tag, data.data(), size);
}
}
}
#pragma once
#include "common.h"
#include "point-to-point.h"
// Collectives
#include "broadcast.h"
#include "allgatherv.h"
#include "gatherv.h"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment