Simulated actors
The simulated actors that for your in situ processing workflow and interact with the DTL can be programmed at different level of complexity. This allows you to quickly develop prototypes just focusing on the workflow structure and the data flow between its components and then complexify each individual workflow component to increase the realism of your simulator.
Pure S4U actors
The simplest way to program a simulated actor acting as a component of your in situ processing workflow is to rely solely on the S4U interface of Simgrid <https://simgrid.org/doc/latest/app_s4u.html>. The following code examples show you how to program simple data publisher (distributed over mutiple ranks) and subscriber (single rank).
Data publisher
void distributed_publisher() {
// Connect from the DTL
auto dtl = DTL::connect()
// Add a ``Data'' stream using a ``File'' engine
auto s = dtl->add_stream("Data")
->set_engine_type("File")
->set_transport_method("File");
// Define a 2D array of int distributed over multiple ranks
auto V = s->define_variable("V", {size, size}, {size*rank, size*rank}, {l_size, l_size}, sizeof(int));
// Open the stream in ``Publish'' mode
auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Publish);
for (int i = 0; i < 10 ; i++) {
// Compute 1e3 floating point operations per element
sg4::this_actor::execute(V->get_local_size() * 1e3);
// Then publish ``V'' to the DTL
e->begin_transaction();
e->put(V);
e->end_transaction();
}
// Close the engine
e->close();
// Disconnect from the DTL
DTL::disconnect();
}
Data subscriber
void subscriber() {
// Connect from the DTL
auto dtl = DTL::connect()
// Add a stream
auto s = dtl->add_stream("Data");
// Obtain metadata for variable ``V''
auto V = s->inquire_variable("V");
// Open the stream in ``Subscribe'' mode
auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Subscribe);
for (int i = 0; i < 10 ; i++) {
// Get the latest transaction for variable ``V''
e->begin_transaction();
e->get(V);
e->end_transaction();
// Compute 1e3 floating point operations per element
sg4::this_actor::execute(V->get_local_size() * 1e3);
}
// Close the engine
e->close();
// Disconnect from the DTL
DTL::disconnect();
}
Mixing MPI and S4U
For instance, you can replace your distributed_publisher() actor by the following MPI_publisher whose core is
the for loop in which each actor computes a billion floating point operations, performs an all-to-all MPI
collective communication, performs more computation, and finally initiates a transaction with the DTL to publish its
share of the variable.
This example also illustrates how SMPI features can be used to reduce the memory footprint of the simulated execution. Here, actors allocate and free a single buffer shared across all actors instead of allocating a distinct buffer each.
void MPI_publisher(int argc, char** argv) {
MPI_Init();
int nranks, rank;
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
double size = std::stod(argv[1]);
double l_size = size / sqrt(nranks);
// Connect to the DTL
auto dtl = DTL::connect();
// Add a ``Data'' stream using a ``File'' engine
auto s = dtl->add_stream("Data")
->set_engine_type("File")
->set_transport_method("File");
// Define a 2D array of int
auto v = s->define_variable("V", {size, size}, {size*rank, size*rank}, {l_size, l_size}, sizeof(int));
// Open the stream in ``Publish'' mode
auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Publish);
// Allocate a shared data buffer for MPI
void* data = SMPI_SHARED_MALLOC(size * size);
for (int it = 0; it < 100; it++) {
// Compute a GFLOP
sg4::this_actor::execute(1e9);
// Perform an all-to-all collective communication
MPI_Alltoall(data, size * size, MPI_CHAR, data, size * size, MPI_CHAR, MPI_COMM_WORLD);
// More computation
sg4::this_actor::execute(500e8);
// publish data to the DTL
e->begin_transaction();
e->put(v);
e->end_transaction();
}
e->close();
DTL::disconnect();
SMPI_SHARED_FREE(data);
MPI_Finalize();
}