Topic #2 Module: Broadcast Communication

Overview

Objective: To implement several versions of a broadcast collective (i.e., a one-to-many communication), and to compare them in various platform setups.

Roadmap

This module consists of 4 activities, each described in its own section below, which should be done in sequence:

  • Activity #1: Implement a naive and a ring-based broadcast.
  • Activity #2: Enhance the ring-based implementation with pipelining.
  • Activity #3: Enhance the ring-based implementation further using asynchronous communication.
  • Activity #4: Implement a binary-tree-based broadcast (with pipelining and asynchronous communication).

In all activities above you will compare your implementations with MPI_Bcast, for different platform configurations.

Skeleton Program

Throughout this module, you will be adding code to a "skeleton program" (download bcast_skeleton.c):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mpi.h>
#include <string.h>

// Seed for the (bad) default random number generator
#define RAND_SEED 842270

// Number of bytes to broadcast
#define NUM_BYTES 100000000

///////////////////////////////////////////////////////
//// program_abort() and print_usage() functions //////
///////////////////////////////////////////////////////

static void program_abort(char *exec_name, char *message);
static void print_usage(char *exec_name);

// Abort, printing the usage information only if the
// first argument is non-NULL (and hopefully set to argv[0]), and
// printing the second argument regardless.
static void program_abort(char *exec_name, char *message) {
  int my_rank;
  MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
  if (my_rank == 0) {
    if (message) {
      fprintf(stderr,"%s",message);
    }
    if (exec_name) {
      print_usage(exec_name);
    }
  }
  MPI_Finalize();
  //MPI_Abort(MPI_COMM_WORLD, 1);
  exit(1);
}

// Print the usage information
static void print_usage(char *exec_name) {
  int my_rank;
  MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);

  if (my_rank == 0) {
    fprintf(stderr,"Usage: smpirun --cfg=smpi/bcast:mpich -np <num processes>\n");
    fprintf(stderr,"              -platform <XML platform file> -hostfile <host file>\n");
    fprintf(stderr,"              %s <bcast implementation name> [-c <chunk size>]\n",exec_name);
    fprintf(stderr,"MPIRUN arguments:\n");
    fprintf(stderr,"\t<num processes>: number of MPI processes\n");
    fprintf(stderr,"\t<XML platform file>: a Simgrid platform description file\n");
    fprintf(stderr,"\t<host file>: MPI host file with host names from the platform file\n");
    fprintf(stderr,"PROGRAM arguments:\n");
    fprintf(stderr,"\t<bcast implementation name>: the name of the broadcast implementation (e.g., naive_bcast)\n");
    fprintf(stderr,"\t[-c <chunk size>]: chunk size in bytes for message splitting (optional)\n");
    fprintf(stderr,"\n");
  }
  return;
}

///////////////////////////
////// Main function //////
///////////////////////////

int main(int argc, char *argv[])
{
  int i,j;
  int chunk_size = NUM_BYTES;
  char *bcast_implementation_name;

  // Parse command-line arguments (not using getopt because not thread-safe
  // and annoying anyway). The code below ignores extraneous command-line
  // arguments, which is lame, but we're not in the business of developing
  // a cool thread-safe command-line argument parser.

  MPI_Init(&argc, &argv);

  // Bcast implementation name
  if (argc < 2) {
    program_abort(argv[0],"Missing <bcast implementation name> argument\n");
  } else {
    bcast_implementation_name = argv[1];
  }

  // Check that the implementation name is valid
  if (strcmp(bcast_implementation_name, "naive_bcast") &&
      strcmp(bcast_implementation_name, "default_bcast") &&
      strcmp(bcast_implementation_name, "ring_bcast") &&
      strcmp(bcast_implementation_name, "pipelined_ring_bcast") &&
      strcmp(bcast_implementation_name, "asynchronous_pipelined_ring_bcast") &&
      strcmp(bcast_implementation_name, "asynchronous_pipelined_bintree_bcast")) {
    char message[256];
    snprintf(message, sizeof(message), "Unknown bcast implementation name '%s'\n", bcast_implementation_name);
    program_abort(NULL,message);
  }

  // Chunk size optional argument
  for (i=1; i < argc; i++) {
    if (!strcmp(argv[i],"-c")) {
      if ((i+1 >= argc) || (sscanf(argv[i+1],"%d",&chunk_size) != 1)) {
        program_abort(argv[0],"Invalid <chunk size> argument\n");
      }
    }
  }
  
  // Determine rank and number of processes
  int num_procs;
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &num_procs);

  // Allocate buffer
  int checksum;
  char *buffer;
  
  if ((buffer = malloc(sizeof(char) * NUM_BYTES)) == NULL) {
    program_abort(argv[0],"Out of memory!");
  }

  // On rank 0 fill the buffer with random data 
  if (0 == rank) { 
    checksum = 0;
    srandom(RAND_SEED);
    for (j = 0; j < NUM_BYTES; j++) {
      buffer[j] = (char) (random() % 256); 
      checksum += buffer[j];
    }
  }

  // Start the timer
  double start_time;
  MPI_Barrier(MPI_COMM_WORLD);
  if (rank == 0) {  
    start_time = MPI_Wtime();
  }

  /////////////////////////////////////////////////////////////////////////////
  //////////////////////////// TO IMPLEMENT: BEGIN ////////////////////////////
  /////////////////////////////////////////////////////////////////////////////

  // char *bcast_implementation_name:   the bcast implementation name (argument #1)
  // int chunk_size:                    the chunk size (optional argument #2)
  // int NUM_BYTES:                     the number of bytes to broadcast
  // char *buffer:                      the buffer to broadcast

  // Process rank 0 should be the source of the broadcast

#include "bcast_solution.c"

  /////////////////////////////////////////////////////////////////////////////
  ///////////////////////////// TO IMPLEMENT: END /////////////////////////////
  /////////////////////////////////////////////////////////////////////////////
 
  // All processes send checksums back to the root, which checks for consistency
  char all_ok = 1;
  if (0 == rank) {
    for (j = 1; j < num_procs; j++) {
      int received_checksum;
      MPI_Recv(&received_checksum, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      // Print a single message in case of a mismatch, but continue
      // receiving other checksums to ensure that all processes
      // reach the MPI_Finalize()
      if ((all_ok == 1) && (checksum != received_checksum)) {
        fprintf(stderr,"\t** Non-matching checksum! **\n");
        all_ok = 0;
      }
    }
  } else {
    int checksum=0;
    for (j = 0; j < NUM_BYTES; j++) {
      checksum += buffer[j];
    }
    MPI_Send(&checksum, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
  }

  // Print out bcast implementation name and wall-clock time, only if the bcast was successful
  MPI_Barrier(MPI_COMM_WORLD);
  if ((0 == rank) && (all_ok)) {
    fprintf(stdout,"implementation: %s | chunksize: %d |  time: %.3lf seconds\n",
		    bcast_implementation_name, 
		    chunk_size,
		    MPI_Wtime() - start_time);
  }

  // Clean-up
  free(buffer);
  MPI_Finalize();

  return 0;
}

This program performs a broadcast of 10^8 bytes (~95 MiB) and takes the following command-line arguments:

  • required argument #1: a string that is the name of the broadcast implementation (see each activity)
  • optional argument #2 ([-c <chunk size>]): an integer that specifies a "chunk size" (see Activity #2)

You should not modify bcast_skeleton.c outside of the "TO IMPLEMENT: BEGIN" and "TO IMPLEMENT: END" comments. The comments therein provide all the information needed to implement your code. Note that you can either write your code directly between these two comments or #include it; it's up to you. Regardless, the goal is to have a single program, bcast_skeleton, that is executed with different command-line arguments for all activities in this module.

The skeleton program does the following for you: parsing of command-line arguments, creation of the broadcast data, checking that the broadcast operation was performed correctly, and timing of the broadcast operation.

What to turn in

You should turn in a single archive (or a github repo) with:

  • All source code
  • XML platform files and hostfiles (see details in the activities)
  • A Makefile that compiles all executables (and has a 'clean' target!)
  • A README file with answers to the questions asked in the activities

Activity #1: Naive and Ring-Based Implementations

In this activity we develop two simple broadcast implementations, in the first two steps below. You can verify the correctness of your broadcast implementations using any SimGrid platform file and corresponding MPI hostfile. In the third step below we evaluate performance on a particular platform.

Look at the section of code in bcast_skeleton.c between the TO IMPLEMENT: BEGIN and TO IMPLEMENT: END comments. The comments list the relevant variables that you will use in your code. You will note, in that section, that bcast_skeleton.c does an #include "bcast_solution.c". So you should create a file bcast_solution.c in which you will write all the code for this assignment. This is more convenient than modifying bcast_skeleton.c directly.

In bcast_solution.c, implement a broadcast implementation named naive_bcast (first command-line argument). This implementation simply has the root of the broadcast (process with rank 0) perform a single point-to-point MPI_Send of the whole buffer to each other process (which performs an MPI_Recv). In other words, the processes are logically organized as a mono-directional star with process of rank 0 at the center. The second command-line argument (chunk size) is ignored.
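The code below is a minimal sketch of this structure, using the variable names provided by the skeleton (the message tag value 0 is an arbitrary choice, not something imposed by the skeleton):

  if (!strcmp(bcast_implementation_name, "naive_bcast")) {
    if (rank == 0) {
      // The root sends the whole buffer to each other process in turn
      for (int dst = 1; dst < num_procs; dst++) {
        MPI_Send(buffer, NUM_BYTES, MPI_CHAR, dst, 0, MPI_COMM_WORLD);
      }
    } else {
      // Every non-root process receives the whole buffer from the root
      MPI_Recv(buffer, NUM_BYTES, MPI_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
  }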

Augment your code in file bcast_solution.c with a broadcast implementation named ring_bcast (first command-line argument). In this implementation, the process of rank 0 (the root of the broadcast) does a single MPI_Send of the whole buffer to the process of rank 1; the process of rank 1 then does a single MPI_Send to the process of rank 2; and so on. In other words, the processes are logically organized as a mono-directional ring. The last process should not send the buffer back to the root process (the root process does not call MPI_Recv). The second command-line argument (chunk size) is ignored.
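Here is a minimal sketch, again assuming the skeleton's variable names and an arbitrary tag:

  if (!strcmp(bcast_implementation_name, "ring_bcast")) {
    if (rank > 0) {
      // Every process but the root receives the buffer from its predecessor
      MPI_Recv(buffer, NUM_BYTES, MPI_CHAR, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    if (rank < num_procs - 1) {
      // Every process but the last forwards the buffer to its successor
      MPI_Send(buffer, NUM_BYTES, MPI_CHAR, rank + 1, 0, MPI_COMM_WORLD);
    }
  }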

Before you proceed with the evaluation of your two implementations, implement a third broadcast implementation, named default_bcast, that simply has all processes call MPI_Bcast. This is the MPI implementation of the broadcast collective communication, which in this module we implement "by hand" using point-to-point communications.
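This implementation is essentially a one-liner, since every process (the root included) makes the same collective call:

  if (!strcmp(bcast_implementation_name, "default_bcast")) {
    MPI_Bcast(buffer, NUM_BYTES, MPI_CHAR, 0, MPI_COMM_WORLD);
  }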

We compare the performance of the three implementations on a 50-processor physical ring. Although some supercomputers in the old days were designed with a ring network topology, this is no longer the case. The main drawback of a physical ring is its very large diameter (i.e., there can be ~n/2 hops between two processors in an n-processor ring). The main advantages are that the degree is low (2), which implies low cost, and that routing is straightforward. For now we assume a simple physical ring so as to better understand broadcast performance.

It is pretty simple to generate a SimGrid platform file for a ring and the corresponding hostfile. You can download a Python script (generate_xml_ring_and_hostfile.py) that generates these two files given a number of processors passed as a command-line argument. Just in case, here are the files generated using this script for 50 processors: ring_50.xml and hostfile_50.txt.
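For instance, generating the two files for a 50-processor ring should look something like:

  python generate_xml_ring_and_hostfile.py 50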

The way to invoke the program is as follows:

  smpirun --cfg=smpi/bcast:mpich -np 50 \
          -platform ring_50.xml -hostfile hostfile_50.txt \
          ./bcast_skeleton <implementation name>
  

The --cfg=smpi/bcast:mpich flag above specifies that we simulate MPI_Bcast (for the default_bcast implementation) as implemented in MPICH. Other options are possible, but it's okay to stick with this implementation.

Answer the following questions:

  • What are the (simulated) wall-clock times of the three implementations on the 50-processor ring?
  • How far are your "by hand" implementations from the default broadcast?
  • You may observe that ring_bcast does not improve much over naive_bcast, which should be surprising to you. After all, naive_bcast sends long-haul messages while ring_bcast doesn't. What do you think the reason is? To answer this question, you can instrument your code and run it on smaller rings to see when events happen and try to understand what's going on. Given that we're using simulation, you should take advantage of it and experiment with all kinds of platform configurations to gain an understanding of the performance behavior. For instance, you can modify link latencies and bandwidths. The MPI_Wtime function is convenient for determining the current (simulated) date; it returns the date as a double precision number and is in fact already used in bcast_skeleton.c. A small instrumentation sketch follows this list.
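As an illustration, one (hypothetical) way to instrument a receive in a ring implementation is to bracket it with calls to MPI_Wtime and print the timestamps; the exact messages and placement are up to you:

  double before = MPI_Wtime();
  MPI_Recv(buffer, NUM_BYTES, MPI_CHAR, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  fprintf(stderr, "[rank %d] recv posted at %.6f, completed at %.6f\n",
          rank, before, MPI_Wtime());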

Warning: SMPI implements sophisticated simulation models that capture many real-world effects (network protocol idiosyncrasies, MPI implementation idiosyncrasies). These effects will be seen in your experiments, just as they would on a real-world platform, and they tend to make performance behavior more difficult to understand. For instance, if you modify the buffer size, you will see non-linear effects on performance (e.g., broadcasting a buffer twice as large will not require exactly twice as much time, and broadcasting a buffer 1 byte larger may increase the broadcast time significantly).

Activity #2: Pipelining

In this activity we enhance the ring-based broadcast so that it uses pipelining to better utilize the network links.

One of the reasons why the default MPI broadcast is fast is that it does pipelining of communication. Augment your code in bcast_solution.c with a new broadcast implementation called pipelined_ring_bcast.

This implementation uses the chunk size passed as the second command-line argument to split the broadcast data buffer into multiple chunks. These chunks are communicated in sequence along the ring. In the first step, process #0 sends chunk #0 to process #1; in the second step, process #1 sends chunk #0 to process #2 and process #0 sends chunk #1 to process #1; and so on. As a result of this pipelining, provided the chunk size is small enough, all network links can be utilized simultaneously. As in the previous activity, use MPI_Send and MPI_Recv for communications.

Note that the chunk size may not divide the message size, in which case the last chunk will be smaller.
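Putting it all together, the structure of this implementation might look like the following sketch (the skeleton's variables are assumed; note how the final, possibly smaller, chunk is computed):

  if (!strcmp(bcast_implementation_name, "pipelined_ring_bcast")) {
    for (int offset = 0; offset < NUM_BYTES; offset += chunk_size) {
      // The last chunk may be smaller than chunk_size
      int this_chunk = (NUM_BYTES - offset < chunk_size) ? NUM_BYTES - offset : chunk_size;
      if (rank > 0) {
        MPI_Recv(buffer + offset, this_chunk, MPI_CHAR, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
      if (rank < num_procs - 1) {
        MPI_Send(buffer + offset, this_chunk, MPI_CHAR, rank + 1, 0, MPI_COMM_WORLD);
      }
    }
  }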

Run pipelined_ring_bcast using chunk sizes of 100,000 bytes, 500,000 bytes, 1,000,000 bytes, 5,000,000 bytes, 10,000,000 bytes, 50,000,000 bytes, and 100,000,000 (i.e., 1 chunk), on a 20-processor ring, a 35-processor ring, and a 50-processor ring. Report the simulated execution times. Remember that the generate_xml_ring_and_hostfile.py Python script can be used to generate platform files and hostfiles.

Answer the following questions:

  • What is the best chunk size for each of the three platforms?
  • Does this chunk size depend on the platform size?
  • You should observe that the wall-clock time is minimized for a chunk size that is neither the smallest nor the largest. Discuss why you think that is.
  • For a 50-processor ring, with the best chunk size determined above, by how much does pipelining help when compared to using a single chunk?
  • How does the performance compare to that of the default MPI broadcast for that platform, as seen in the previous activity?
  • What do you conclude about the use of pipelined communications for the purpose of a ring-based broadcast on a ring-shaped physical platform?

Warning: You will likely see some "odd" behavior. For instance, as the chunk size increases you may see the broadcast time decrease, increase, and then decrease again. Or, if you try very small chunk sizes, smaller than the ones above, you could see that for some particular chunk sizes the broadcast time is pretty low, if perhaps not the lowest across the board. These effects arise because our simulated MPI captures many effects of network protocols and MPI implementations. These effects take us away from simple analytical models of communication performance, in which there is no weirdness, but such models simply don't match the reality of communication behavior in real-world parallel platforms. As a result, experimental results are often confusing and it is difficult to draw valid conclusions without looking at a lot of results.

Activity #3: Asynchronous Communication

In this activity we enhance the ring-based pipelined broadcast so that it uses asynchronous communication, to utilize the network links even better.

In our current implementation of the broadcast, we use MPI_Send and MPI_Recv. One drawback of this approach is that a process can only send or receive at any given time, not both. However, it would make sense for a process to send chunk #i to its successor while receiving chunk #i+1 from its predecessor. Intuitively, this should make it possible to hide network overheads better and perhaps to achieve higher data transfer rates. Augment your code in bcast_solution.c with a new broadcast implementation called asynchronous_pipelined_ring_bcast. This implementation builds on the pipelined_ring_bcast implementation but uses the following MPI asynchronous communication primitives: MPI_Isend and MPI_Wait.

Although in principle simple, adding asynchronous communication is often not done correctly by beginner MPI programmers. Here are common mistakes to avoid when implementing asynchronous_pipelined_ring_bcast:

  • Each call to MPI_Isend must have a matching MPI_Wait call.
  • The call to MPI_Isend for chunk #i must be issued only after the call to MPI_Recv for chunk #i (since one must have received a chunk before forwarding it to one's successor).
  • A call to MPI_Isend immediately followed by its matching MPI_Wait call is equivalent to a single MPI_Send call, i.e., it does not implement asynchronous communication!
Given the above, and given that the intent is that one asynchronously sends chunk #i to one's successor while receiving chunk #i+1 from one's predecessor, you should be able to determine the sequence of communication operations at each process. Note that in all the above the root process is a special case because it only sends and never receives. So is the last process, since it only receives and never sends.
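One sequence that satisfies all of the above is sketched below: the MPI_Wait for the send of chunk #i is issued only after chunk #i+1 has been received, so that a send and the next receive can overlap:

  if (!strcmp(bcast_implementation_name, "asynchronous_pipelined_ring_bcast")) {
    MPI_Request request;
    int send_pending = 0;
    for (int offset = 0; offset < NUM_BYTES; offset += chunk_size) {
      int this_chunk = (NUM_BYTES - offset < chunk_size) ? NUM_BYTES - offset : chunk_size;
      if (rank > 0) {
        // Receive chunk #i while the send of chunk #i-1 may still be in flight
        MPI_Recv(buffer + offset, this_chunk, MPI_CHAR, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
      if (rank < num_procs - 1) {
        // Complete the previous asynchronous send before reusing the request
        if (send_pending) MPI_Wait(&request, MPI_STATUS_IGNORE);
        MPI_Isend(buffer + offset, this_chunk, MPI_CHAR, rank + 1, 0, MPI_COMM_WORLD, &request);
        send_pending = 1;
      }
    }
    // The last send still needs its matching MPI_Wait
    if (send_pending) MPI_Wait(&request, MPI_STATUS_IGNORE);
  }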

Run asynchronous_pipelined_ring_bcast using chunk sizes of 100,000 bytes, 500,000 bytes, 1,000,000 bytes, 5,000,000 bytes, 10,000,000 bytes, 50,000,000 bytes, and 100,000,000 (i.e., 1 chunk) on a 50-processor ring. Report the simulated execution times. Remember that the generate_xml_ring_and_hostfile.py Python script can be used to generate platform files and hostfiles.

Answer the following questions:

  • Is the best chunk size the same as that for pipelined_ring_bcast?
  • When using the best chunk size for each of the two implementations (which may be the same if the answer to the previous question is "yes"), does the use of MPI_Isend help? By how much?
  • Is asynchronous_pipelined_ring_bcast faster than default_bcast or slower? By how much?
  • What do you conclude about the use of asynchronous communications for the purpose of a ring-based broadcast on a ring-shaped physical platform?

Activity #4: Binary-Tree Broadcast

In this activity we shift gears completely and abandon the idea of a ring-based broadcast. Instead we organize the processes as a logical binary tree. The root of the broadcast is also the root of the tree. In practice, many broadcast algorithms are tree-based, and several real-world systems have tree-shaped network topologies. We will evaluate this new broadcast implementation on both a ring-shaped physical platform and a tree-shaped physical platform.

Augment your code in bcast_solution.c with an implementation called asynchronous_pipelined_bintree_bcast that implements the broadcast along a binary tree, splitting the message into chunks for pipelining and using asynchronous communication. The root of the broadcast, the process of rank 0, is also the root of the binary tree. Note that the binary tree may not be complete if the number of processors is not of the form 2^n − 1.

Use a level-order traversal numbering of the nodes in your tree: rank 0 has rank 1 as its left child and rank 2 as its right child, rank 1 has rank 3 as its left child and rank 4 as its right child, etc. The reason why we impose this order is that this is also the order used to number processes in the platform description file used for experimental evaluations (see Step #2). The idea is that the logical binary tree of processes is identical to the physical binary tree of processors (process #i runs on processor #i).
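With this numbering, each process can compute its tree neighbors directly from its rank:

  int left   = 2 * rank + 1;    // left child (exists only if left < num_procs)
  int right  = 2 * rank + 2;    // right child (exists only if right < num_procs)
  int parent = (rank - 1) / 2;  // parent (meaningful only for rank > 0)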

Report the (simulated) wall-clock times of default_bcast, asynchronous_pipelined_ring_bcast, and asynchronous_pipelined_bintree_bcast, on a 50-processor ring platform and a 50-processor binary tree platform. For asynchronous_pipelined_ring_bcast and asynchronous_pipelined_bintree_bcast use the "best chunksize" you determined in Activity #3 for the 50-processor ring platform.

You are provided with a Python script (generate_xml_bintree_and_hostfile.py) that generates the XML file (and hostfile) for the binary tree platform. As with the generate_xml_ring_and_hostfile.py script, the number of processors is passed as a command-line argument. Just in case, here are the files generated using this script for 50 processors: bintree_50.xml and hostfile_50.txt.

Answer the following questions:

  • On the 50-processor ring platform, how does asynchronous_pipelined_bintree_bcast compare to asynchronous_pipelined_ring_bcast? Is this unexpected?
  • On the 50-processor binary tree platform, how do the three implementations compare? Does it seem worth it to implement your own binary tree broadcast on a binary tree physical platform?

Many high-performance computing platforms these days are built using crossbar switches. Although the topology interconnecting these switches can itself be a ring, a tree, or anything else, in many cases sizeable platforms are deployed with only a small number of switches. In this last step we evaluate our broadcast implementations on two commonplace cluster platform configurations.

Report the (simulated) wall-clock times of default_bcast, asynchronous_pipelined_ring_bcast, and asynchronous_pipelined_bintree_bcast, on the following two platforms:

  • A 64-processor cluster based on a single crossbar switch: cluster_crossbar_64.xml.
  • A 64-processor cluster with a shared backbone: cluster_backbone_64.xml.

You can use this hostfile: hostfile_64.txt. For asynchronous_pipelined_ring_bcast and asynchronous_pipelined_bintree_bcast use the "best chunksize" you determined in Activity #3 for the 50-processor ring platform.

Answer the following question:

Overall, does it seem like it's a good idea to use the default MPI broadcast on these more standard platforms, or should one implement one's own? Note that the default broadcast has to pick a particular chunk size, while we are "cheating" by picking an empirically good chunk size for our implementations!

What have we learned?

Congratulations on completing this module. At this point you have learned how to use point-to-point MPI communication, both synchronous and asynchronous; you have learned about the most common collective communication primitive, the broadcast; you have observed firsthand the connection between communication patterns and the underlying network topology; and you have learned how pipelining and asynchronous communication can boost communication performance.

As part of our experimental evaluations, you have observed the overall good performance of the default MPI_Bcast implementation. It would be very enlightening to look at open-source MPI implementations and inspect the code (although it is intricate) to understand how the broadcast is implemented. In general, few developers implement broadcast by hand. However, in some cases (see upcoming modules), it can be useful to implement a broadcast by hand to achieve better performance (in general, better overlap of communication and computation).

What was difficult?

At the end of your report, write a brief "essay" explaining what you found surprising or challenging, and what solutions you used to overcome your difficulties. Are you happy with your implementations? Do you see room for improvement?

The End