.. Copyright 2025

.. _DTLMod_Actors:

Simulated actors
################

The simulated actors that form your in situ processing workflow and interact with the DTL can be programmed at
different levels of complexity. This allows you to quickly develop prototypes that focus only on the workflow
structure and the data flow between its components, and then refine each individual workflow component to increase the
realism of your simulator.

Pure S4U actors
***************

The simplest way to program a simulated actor acting as a component of your in situ processing workflow is to rely
solely on the S4U interface of SimGrid. The following code examples show you how to program a simple data publisher
(distributed over multiple ranks) and a subscriber (single rank).

Data publisher
^^^^^^^^^^^^^^

.. code-block:: cpp

   void distributed_publisher()
   {
     // Note: size, l_size, and rank are assumed to be defined beforehand
     // (for instance, derived from the actor's arguments)

     // Connect to the DTL
     auto dtl = DTL::connect();
     // Add a "Data" stream using a "File" engine
     auto s = dtl->add_stream("Data")
                 ->set_engine_type("File")
                 ->set_transport_method("File");
     // Define a 2D array of int distributed over multiple ranks
     auto V = s->define_variable("V", {size, size}, {size*rank, size*rank}, {l_size, l_size}, sizeof(int));
     // Open the stream in "Publish" mode
     auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Publish);

     for (int i = 0; i < 10; i++) {
       // Compute 1e3 floating point operations per element
       sg4::this_actor::execute(V->get_local_size() * 1e3);
       // Then publish "V" to the DTL
       e->begin_transaction();
       e->put(V);
       e->end_transaction();
     }

     // Close the engine
     e->close();
     // Disconnect from the DTL
     DTL::disconnect();
   }

Data subscriber
^^^^^^^^^^^^^^^

.. code-block:: cpp

   void subscriber()
   {
     // Connect to the DTL
     auto dtl = DTL::connect();
     // Add a stream
     auto s = dtl->add_stream("Data");
     // Obtain metadata for variable "V"
     auto V = s->inquire_variable("V");
     // Open the stream in "Subscribe" mode
     auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Subscribe);

     for (int i = 0; i < 10; i++) {
       // Get the latest transaction for variable "V"
       e->begin_transaction();
       e->get(V);
       e->end_transaction();
       // Compute 1e3 floating point operations per element
       sg4::this_actor::execute(V->get_local_size() * 1e3);
     }

     // Close the engine
     e->close();
     // Disconnect from the DTL
     DTL::disconnect();
   }

Mixing MPI and S4U
******************

An actor can also combine MPI calls, simulated through SMPI, with S4U calls to describe its behavior in more detail.
For instance, you can replace your ``distributed_publisher()`` actor with the following ``MPI_publisher()`` actor. Its
core is the ``for`` loop in which each actor computes a billion floating point operations, performs an all-to-all MPI
collective communication, performs more computation, and finally initiates a transaction with the DTL to publish its
share of the variable.

This example also illustrates how SMPI features can be used to reduce the memory footprint of the simulated
execution. Here, actors allocate and free a single buffer shared across all actors instead of allocating a distinct
buffer each.

.. code-block:: cpp

   void MPI_publisher(int argc, char** argv)
   {
     MPI_Init();
     int nranks, rank;
     MPI_Comm_size(MPI_COMM_WORLD, &nranks);
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);

     // Global size of each dimension, and local size on each rank
     double size   = std::stod(argv[1]);
     double l_size = size / sqrt(nranks);

     // Connect to the DTL
     auto dtl = DTL::connect();
     // Add a "Data" stream using a "File" engine
     auto s = dtl->add_stream("Data")
                 ->set_engine_type("File")
                 ->set_transport_method("File");
     // Define a 2D array of int
     auto v = s->define_variable("V", {size, size}, {size*rank, size*rank}, {l_size, l_size}, sizeof(int));
     // Open the stream in "Publish" mode
     auto e = s->open("cluster:file_system:/working_dir/", Stream::Mode::Publish);

     // Allocate a shared data buffer for MPI
     void* data = SMPI_SHARED_MALLOC(size * size);

     for (int it = 0; it < 100; it++) {
       // Compute a GFLOP
       sg4::this_actor::execute(1e9);
       // Perform an all-to-all collective communication
       MPI_Alltoall(data, size * size, MPI_CHAR, data, size * size, MPI_CHAR, MPI_COMM_WORLD);
       // More computation
       sg4::this_actor::execute(500e8);
       // Publish data to the DTL
       e->begin_transaction();
       e->put(v);
       e->end_transaction();
     }

     // Close the engine
     e->close();
     // Disconnect from the DTL
     DTL::disconnect();

     // Release the shared buffer and finalize MPI
     SMPI_SHARED_FREE(data);
     MPI_Finalize();
   }
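The publishers above pass a global shape, a per-rank start offset, and a per-rank count to ``define_variable()``. As
a minimal sketch of one possible way to derive these values, the helper below assumes that the ranks are laid out
row-major on a square grid and that the global size is divisible by the grid side. ``Block2D``, ``grid_side()``, and
``block_for_rank()`` are hypothetical names introduced for illustration only. They are not part of DTLMod or
SimGrid, and this layout is not necessarily the one used in the examples above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper type (not part of DTLMod): the block of a global
// size x size array owned by one rank.
struct Block2D {
  std::size_t start_row;
  std::size_t start_col;
  std::size_t count_rows;
  std::size_t count_cols;
};

// Largest integer whose square does not exceed nranks.
// A linear search is sufficient for realistic rank counts.
inline std::size_t grid_side(std::size_t nranks)
{
  std::size_t side = 0;
  while ((side + 1) * (side + 1) <= nranks)
    side++;
  return side;
}

// Assumption: ranks are placed row-major on a side x side grid and each
// rank owns one l_size x l_size block of the global array.
inline Block2D block_for_rank(std::size_t size, std::size_t nranks, std::size_t rank)
{
  const std::size_t side = grid_side(nranks);
  assert(side * side == nranks && "nranks must be a perfect square");
  assert(size % side == 0 && "size must be divisible by the grid side");
  assert(rank < nranks && "rank must be smaller than nranks");

  const std::size_t l_size = size / side; // local size in each dimension
  const std::size_t row    = rank / side; // row of this rank in the grid
  const std::size_t col    = rank % side; // column of this rank in the grid
  return {row * l_size, col * l_size, l_size, l_size};
}
```

Under these assumptions, an actor could compute ``auto b = block_for_rank(size, nranks, rank);`` and then use
``{b.start_row, b.start_col}`` as the start offset and ``{b.count_rows, b.count_cols}`` as the count when defining
its share of the variable.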