/*
 * (c) 2022 by MPI Session working group
 *
 * Authors:
 * - Dominik Huber
 * - Martin Schreiber
 */

//#include
#include "dummy_vars.h"

/*
 * ****************************************
 * Proposal for dynamic MPI Sessions
 * ****************************************
 *
 * This should be seen as what-if source code for the approach of splitting everything developed
 * so far around dynamic resources with MPI Sessions into small pieces.
 *
 * Some quite important remarks:
 *
 * - Instead of creating monolithic API calls, we broke them down into smaller pieces, e.g.,
 *   - a dictionary associated with each PSet to provide information,
 *   - an even more flexible abstraction of PSet operations using lists as input/output,
 *   - operations to move & delete PSets.
 *
 * - ADVANTAGES: The breakdown of monolithic API calls should lead to the following *benefits*:
 *   - It might be easier to support fault tolerance since the API is broken down into smaller pieces.
 *   - Transactional resource changes can be supported easily.
 *
 * - DISADVANTAGES: The breakdown of monolithic API calls can also lead to the following *drawbacks*:
 *   - Additional complexity, which can be hidden by a small library targeting, e.g., only
 *     loop-based execution (see the sketch at the end of this file).
 *   - Obviously, additional overheads.
 *
 * - We follow the approach
 *   - of a "mathematical" perspective on resources as sets and the corresponding operations on sets,
 *   - that each operation is related to PSets and a graph is built with PSets as nodes and operations
 *     as directed hyperedges (see the sketch below).
 *
 * - There is a version that does not require setting up any communicator for resource changes
 *   (MPI_SESSION_BCAST=1). This requires some broadcast to PSet members, hence an additional API.
 *   But this would separate things even further.
 *
 * - New API calls are pointed out with a "NEWAPI" comment in the line above them.
 */

/*
 * Assumptions:
 *
 * - PSets are immutable from the outside.
 *   The only way to modify a PSet is to overwrite it with a new PSet, and ONLY by the application.
 *
 * - There is so far no consensus-based resource change scheme.
 *   Resource changes are managed by one particular MPI process, called the primary process.
 */

// Activate / deactivate dynamic Sessions
#define DYN_SESSIONS 1

/*
 * Terminology
 *
 * - PSet: Set of processes, a.k.a. process set.
 *
 * - URI: URI referring to a PSet. We often refer to a PSet by its URI since we can look up
 *   the PSet with the URI.
 *
 * - Starting PSet: PSet at program start.
 *
 * - Primary rank or process:
 *   There are two different versions:
 *
 *   - Primary rank: Refers to the MPI rank that is in charge of resource changes.
 *     Note that there is a version avoiding this 'rank' since using ranks requires setting up
 *     a communicator first.
 *
 *   - Primary process: Refers to the MPI process that is in charge of the
 *     resource changes for a particular PSet (receiving + processing resource changes).
 */
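/*
 * Illustrative sketch (hypothetical, not part of the proposed API):
 * the graph "with PSets as nodes and operations as directed hyperedges"
 * mentioned above could be represented roughly as below. Such bookkeeping
 * would live inside the MPI runtime / resource manager, not in the
 * application; all names in this block are made up for illustration only.
 */
#if 0
typedef enum
{
    PSET_OP_UNION,
    PSET_OP_DIFFERENCE
    /* ..., further set operations (split, merge, ...) */
} pset_op_kind;

typedef struct
{
    pset_op_kind  op;            // which set operation was applied
    const char  **input_psets;   // NULL-terminated list of input PSet URIs
    const char  **output_psets;  // NULL-terminated list of output PSet URIs
} pset_hyperedge;                // directed hyperedge: input PSets -> output PSets

// The nodes of the graph are simply the PSet URIs themselves;
// the graph is the collection of all hyperedges created by PSet operations.
#endif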
int main()
{
    string main_pset = "";      // main pset used for simulation (a default application pset, size = 0)
    MPI_Group main_group;       // main group
    MPI_Comm main_comm;         // main communicator
    MPI_Request request;        // some request handle

    MPI_Group new_main_group;   // new group
    MPI_Comm new_main_comm;     // new communicator

    /*
     * Start with MPI Sessions
     */
    MPI_Session session;
    MPI_Session_create(&session);

    {
        /*
         * Determine the PSet to which this process belongs from the very beginning on.
         *
         * We need to differentiate between dynamically and non-dynamically started
         * processes.
         */
#if 0
        /*
         * Version where we determine the default PSET
         */
        // NEWAPI
        MPI_Session_get_default_pset(&main_pset);

        // Here, we need to somehow identify the dynamically started processes
        if (main_pset != "")    /* Starting pset is always the empty string */
#elif 1
        /*
         * Version where we determine the default PSET with a special flag
         */
        // NEWAPI
        int dynamic;
        MPI_Session_get_default_pset(&main_pset, &dynamic);

        // Here, we need to somehow identify the dynamically started processes
        if (dynamic)            /* This is a dynamically started process */
#else
        /*
         * Alternative:
         * Get this information from the MPI info object related to the MPI Session
         * MPI_Session_Get_Info(...) --> see Version 2
         *
         * TODO
         */
#endif
        {
            /*
             * This process was created dynamically
             *
             * => Look up further information in the dictionary entry for this particular delta pset.
             *    The counterpart is "MPI_Session_Set_PSet_Info" below.
             *
             * If the information does not yet exist, this call waits for it!
             */
            // NEWAPI
            MPI_Session_Get_PSet_Info(
                session,        // session handle
                main_pset,      // URI to get info for
                info            // Info field to retrieve
            );

            main_pset = info.main_pset;     // Get string with the main pset URI in it

            /*
             * We finally have the main_pset that this dynamic process will be part of.
             */
        }
    }

    /*
     * main_pset (potentially a new dynamic one) => main_comm
     * Create the group and then the communicator
     */
    MPI_Group_from_session_pset(session, main_pset, &main_group);

    /*
     * We also use IComm here since we might sync up with another IComm later on
     */
    MPI_Comm_create_from_group(main_group, "tag", NULL, NULL, &main_comm);

    /*
     * We could entirely avoid using the communicator if we
     * introduced a BCast on the MPI Session / PSet level.
     *
     * That is what this preprocessor directive signals.
     */
#define MPI_SESSION_BCAST 0     // Set to 1 to enable the version *WITH* the MPI Session-based bcast

#if !MPI_SESSION_BCAST
    /*
     * We simply assume here that rank 0 is the primary rank.
     */
    int primary_rank = 0;

    int my_rank;
    MPI_Comm_rank(main_comm, &my_rank);
#endif

    /*
     * +++++++++++++++++++++++++++++++++++++++++++++++++
     * Variables related to the resource change
     * +++++++++++++++++++++++++++++++++++++++++++++++++
     */

#if MPI_SESSION_BCAST
    /*
     * Flag indicating whether we're the primary process
     */
    int primary_process = -1;

    /*
     * Determine whether we're the primary process for this PSET.
     *
     * Note that the primary rank could also be determined differently or be eliminated entirely.
     *
     * Rationale: We can't make this rank-based, since determining a rank
     * first requires setting up all communicators, etc.
     *
     * primary_process is set to true if we're the primary process for this main_pset
     */
    // NEWAPI
    MPI_Session_get_primary_process(&main_pset, &primary_process);
#endif

    /*
     * Phase of the resource change:
     * 1: No resource change
     * 2: Resource change currently in progress
     * 3: Waiting for resources to be ready
     * ...: more phases possible and partly required
     */
    int resource_change_phase = 1;

    /*
     * PSets used during the change
     */
    string rc_change_pset;      // URI with added / removed psets
    string new_main_pset;       // URI with the new main pset

    /*
     * +++++++++++++++++++++++++++++++++++++++++++++++++
     * (END) Variables related to the resource change
     * +++++++++++++++++++++++++++++++++++++++++++++++++
     */
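    /*
     * Hypothetical refinement (not used below): the integer phases could also be
     * given symbolic names, with values matching the numbering used in this example.
     */
    enum
    {
        RC_PHASE_IDLE           = 1,    // no resource change in progress
        RC_PHASE_CHANGE_PENDING = 2,    // resource change currently in progress
        RC_PHASE_WAIT_RESOURCES = 3,    // waiting for the new resources to be ready
        RC_PHASE_SETUP          = 4     // remaining setup (loading data, etc.)
    };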
    /*
     * Main working loop;
     * this could also be, e.g., a node in a tree traversal, etc.
     */
    while (1 /* work */)
    {
        /*
         * Application-dependent load:
         * possibly a balancing step and a work step
         */
        rebalance_step(/* */);
        /*...;*/
        work_step();

#if DYN_SESSIONS
        if (resource_change_phase == 1)
        {
            /*
             * Phase 1:
             *
             * No resource change currently being processed, ready to process the next resource change request
             */

            int res_change_requested = 0;   // Flag indicating whether there's a resource change to be processed

#if MPI_SESSION_BCAST
            if (primary_process)
#else
            if (primary_rank == my_rank)
#endif
            {
                /*
                 * flag: true if there is a resource change
                 */
                int flag;

                // NEWAPI
                MPI_Session_Dyn_Test_res_change(session, main_pset, &flag);

                if (flag)
                {
                    res_change_requested = 1;

                    // TODO: We could already start BCasting the information about a resource change here

                    // Get the resource change and process it
                    // NEWAPI (used also before)
                    MPI_Session_Dyn_Recv_res_change(
                        session,            // session handle
                        main_pset,          // PSet related to this resource change
                        &rc_type,           // OUT: Type of resource change
                        &rc_change_pset,    // OUT: Delta PSet related to the resources (can also be multiple psets)
                        &rc_status          // OUT: Status information
                    );

                    switch (rc_type)
                    {
                    case MPI_RC_ADD:    // (P, P) -> P
                        // Compute the union
                        // NEWAPI
                        MPI_Session_Dyn_pset_create_op(
                            session,                        // session handle
                            MPI_PSET_OP_UNION,              // operation to execute
                            {main_pset, rc_change_pset, 0}, // IN: main and delta pset
                            new_main_pset                   // OUT: New PSet(s)
                        );

                        /*
                         * Publish information about the new psets
                         *
                         * TODO:
                         * We assume that all psets are directly visible to all MPI processes
                         * (can be implemented efficiently by "lazy synchronization").
                         * Therefore, this publishing call is obsolete:
                         */
                        /*
                        // NEWAPI
                        MPI_Session_Dyn_pset_publish_All(
                            [rc_change_pset, new_main_pset]
                        );
                        */

                        /*
                         * Tell the newly started delta psets what the new main pset is
                         */
                        MPI_Info info = {new_main_pset, 0};
                        // NEWAPI
                        MPI_Session_Set_PSet_Info(
                            session,            // session handle
                            rc_change_pset,     // new resources
                            info                // Information about the new main pset URI
                        );
                        break;

                    case MPI_RC_SUB:    // (P, P) -> P
                        // Compute the difference
                        // NEWAPI
                        MPI_Session_Dyn_pset_create_op(
                            session,                        // session handle
                            MPI_PSET_OP_DIFFERENCE,         // operation to execute
                            {main_pset, rc_change_pset, 0}, // IN: main and delta pset
                            new_main_pset                   // OUT: New PSet(s)
                        );

                        /*
                         * Further steps are not required since resources are only removed
                         */
                        break;

#if 0
                    // Future work
                    case MPI_RC_REPLACE:
                        // TODO: P -> P
                        break;

                    case MPI_RC_MERGE:
                        // TODO: P^n -> P
                        break;

                    case MPI_RC_SPLIT:
                        // TODO: P -> P^n
                        // Provide URI to "directory"
                        break;

                    // For future MPI to get rid of splitting groups, etc.
                    // All managed by the RM
#endif

                    default:
                        FatalError("Not supported");
                    }
                }
            }

            /*
             * Now we broadcast the information about whether there is a resource change to all other MPI processes.
             *
             * This could also be made non-blocking:
             * - Do an IBcast
             * - Introduce another resource change phase to do one or two more iterations
             * - Then perform a blocking wait
             *   (But this would only mess up the code even further)
             */
#if !MPI_SESSION_BCAST
            /*
             * This utilizes the COMMUNICATOR, even if we just need a BCast.
             * Maybe we could also use the PSet's dictionary for this to avoid
             * any MPI collective communication.
             */
            MPI_Bcast(&res_change_requested, 1, MPI_INT, primary_rank, main_comm);

            if (res_change_requested)
                resource_change_phase = 2;  // All MPI ranks now perform the resource change
#else
            /*
             * New version avoiding the communicator-based BCast
             */
            // NEWAPI
            MPI_Session_bcast_PSet_info(main_pset, new_main_pset, primary_process);

            if (new_main_pset != "")
                resource_change_phase = 2;
#endif
        }

        if (resource_change_phase == 2)
        {
            /*
             * Phase 2:
             *
             * All other MPI processes involved in the resource change need to be
             * informed about the new URIs and need to create a new communicator.
             *
             * => We broadcast the new URI to all MPI processes in the "old" communicator
             */
#if !MPI_SESSION_BCAST
            // This is not required for MPI_SESSION_BCAST
            MPI_Bcast(new_main_pset, len(new_main_pset)+1, MPI_BYTE, primary_rank, main_comm);
#endif

            /*
             * Check whether the current MPI process is included in the new PSet.
             * If not, the way we handle this in this example is to terminate the program
             * (which does not have to be the case in general).
             *
             * TODO: This looks ugly.
             * TODO: Optimization possible by just looking up the changed resources.
             */
            // NEWAPI
            int included = MPI_Session_Pset_IsProcessIncluded(new_main_pset);
            if (!included)
                break;

            resource_change_phase = 3;
        }

        if (resource_change_phase == 3)
        {
            /*
             * Phase 3:
             * Wait until all other MPI processes are ready with the MPI setup
             */
            int flag;

            /*
             * Test for the MPI Session running on the processes.
             *
             * This ensures that the endpoints for setting up the communicators exist and
             * takes into account all other program startup delays
             * (e.g., if 1 GB of libraries needs to be fetched, if the program has other startup code
             * before setting up MPI Sessions, etc.)
             *
             * TODO - ALTERNATIVE: Make MPI_Group_from_session_pset non-blocking
             */
            // NEWAPI
            MPI_Session_Test_Exists(rc_change_pset, &flag, NULL);

            if (flag)
            {
                /*
                 * main_pset (potentially a new dynamic one) => main_comm
                 * Create the group and then the communicator
                 */
                MPI_Group_from_session_pset(session, new_main_pset, &new_main_group);

                /*
                 * Create the new communicator here
                 */
                MPI_Comm_create_from_group(new_main_group, "tag", NULL, NULL, &new_main_comm);

                main_comm = new_main_comm;
                MPI_Comm_rank(main_comm, &my_rank);

#if MPI_SESSION_BCAST
                if (primary_process)
#else
                if (primary_rank == my_rank)
#endif
                {
                    /*
                     * Trigger the deletion of the PSet.
                     * (This does not force termination of the MPI processes within the PSet.)
                     *
                     * TODO:
                     * - Option 1) Is the PSet URI directly deleted from the list of PSets, OR
                     * - Option 2) is it marked for deletion and deleted sooner or later?
                     *
                     * TODO:
                     * - What happens if an MPI process doesn't belong to any PSet?
                     * - Or do we have something like a "dynamic all world" PSet?
                     */

                    /*
                     * We use a "MOVE" operation here because resource changes are related to PSets.
                     * If we support this move, resource change requests that might already exist for
                     * main_pset can still be processed from here on.
                     */
#if 1
                    // NEWAPI
                    MPI_Session_Dyn_pset_move(new_main_pset, main_pset);
                    // NEWAPI
                    MPI_Session_Dyn_pset_delete(rc_change_pset);
                    /*
                     * WARNING: This will destroy the graph description
                     */
#else
                    /*
                     * Alternative: the PSet could be overwritten by a PSet operation
                     */
#endif

                    /*
                     * main_pset:     uri://atmos/resmain
                     * new_main_pset: uri://atmos/resmain_new
                     *
                     * MPI_Session_Dyn_pset_move:
                     * main_pset: uri://atmos/resmain    # now with the content of resmain_new
                     */
                }

                main_pset = new_main_pset;
                new_main_pset = "";

                /*
                 * Maybe the primary process changed
                 */
#if MPI_SESSION_BCAST
                MPI_Session_get_primary_process(&main_pset, &primary_process);
#endif

                resource_change_phase = 4;  // or an additional phase for other setup delays
            }
        }

        if (resource_change_phase == 4)
        {
            /*
             * Setup the rest (loading data, etc.)
             */
            /* ... */

            resource_change_phase = 1;

            /*
             * TODO: This shouldn't be part of MPI (since MPI is about message passing!), but this is the
             * point where we might inform the resource optimizer that we can accept new resource changes.
             */
        }
#endif
    }

    /*
     * Release all communicators / groups that have been allocated.
     */
    MPI_Session_finalize(&session);
}
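
/*
 * Appendix (illustrative only): a minimal sketch of the "small library
 * targeting loop-based execution" mentioned in the header comment, which
 * could hide the phase machinery of the main loop above from the application.
 * All names in this block (dyn_loop_state, dyn_loop_probe) are hypothetical
 * and not part of this proposal; the sketch merely groups the calls used above.
 */
#if 0
typedef struct
{
    MPI_Session session;                // MPI Session handle
    string      main_pset;              // current main PSet URI
    MPI_Comm    main_comm;              // communicator built from main_pset
    int         resource_change_phase;  // phases 1..4 as in the loop above
    string      rc_change_pset;         // delta PSet of the pending resource change
    string      new_main_pset;          // URI of the new main PSet
} dyn_loop_state;

// Called once per loop iteration; would wrap phases 1-4 of the main loop above.
// Returns 0 if this process is not part of the new main PSet and should leave the loop.
int dyn_loop_probe(dyn_loop_state *state);

/*
 * Usage sketch: the application loop would then shrink to
 *
 *     while (work_left())
 *     {
 *         rebalance_step();
 *         work_step();
 *         if (!dyn_loop_probe(&state))
 *             break;    // this process is not included in the new PSet
 *     }
 */
#endif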