execution: bugfix - resolved promises should propagate synchronization
[cdsspec-compiler.git] / execution.cc
index e01181af486e3b518d2bddcb7b78003f7c9b6a68..4496b9c63deeca6fa8198ec086425ba8fa1d6932 100644 (file)
@@ -817,9 +817,10 @@ void ModelExecution::add_future_value(const ModelAction *writer, ModelAction *re
 /**
  * Process a write ModelAction
  * @param curr The ModelAction to process
+ * @param work The work queue, for adding fixup work
  * @return True if the mo_graph was updated or promises were resolved
  */
-bool ModelExecution::process_write(ModelAction *curr)
+bool ModelExecution::process_write(ModelAction *curr, work_queue_t *work)
 {
        /* Readers to which we may send our future value */
        ModelVector<ModelAction *> send_fv;
@@ -832,7 +833,7 @@ bool ModelExecution::process_write(ModelAction *curr)
 
        if (promise) {
                earliest_promise_reader = promise->get_reader(0);
-               updated_promises = resolve_promise(curr, promise);
+               updated_promises = resolve_promise(curr, promise, work);
        } else
                earliest_promise_reader = NULL;
 
@@ -1025,21 +1026,9 @@ void ModelExecution::process_relseq_fixup(ModelAction *curr, work_queue_t *work_
                /* Must synchronize */
                if (!synchronize(release, acquire))
                        return;
-               /* Re-check all pending release sequences */
-               work_queue->push_back(CheckRelSeqWorkEntry(NULL));
-               /* Re-check act for mo_graph edges */
-               work_queue->push_back(MOEdgeWorkEntry(acquire));
-
-               /* propagate synchronization to later actions */
-               action_list_t::reverse_iterator rit = action_trace.rbegin();
-               for (; (*rit) != acquire; rit++) {
-                       ModelAction *propagate = *rit;
-                       if (acquire->happens_before(propagate)) {
-                               synchronize(acquire, propagate);
-                               /* Re-check 'propagate' for mo_graph edges */
-                               work_queue->push_back(MOEdgeWorkEntry(propagate));
-                       }
-               }
+
+               /* Propagate the changed clock vector */
+               propagate_clockvector(acquire, work_queue);
        } else {
                /* Break release sequence with new edges:
                 *   release --mo--> write --mo--> rf */
@@ -1284,7 +1273,7 @@ ModelAction * ModelExecution::check_current_action(ModelAction *curr)
                        if (act->is_read() && !second_part_of_rmw && process_read(act))
                                update = true;
 
-                       if (act->is_write() && process_write(act))
+                       if (act->is_write() && process_write(act, &work_queue))
                                update = true;
 
                        if (act->is_fence() && process_fence(act))
@@ -2062,6 +2051,37 @@ void ModelExecution::get_release_seq_heads(ModelAction *acquire,
        }
 }
 
+/**
+ * @brief Propagate a modified clock vector to actions later in the execution
+ * order
+ *
+ * After an acquire operation lazily completes a release-sequence
+ * synchronization, we must update all clock vectors for operations later than
+ * the acquire in the execution order.
+ *
+ * @param acquire The ModelAction whose clock vector must be propagated
+ * @param work The work queue to which we can add work items, if this
+ * propagation triggers more updates (e.g., to the modification order)
+ */
+void ModelExecution::propagate_clockvector(ModelAction *acquire, work_queue_t *work)
+{
+       /* Re-check all pending release sequences */
+       work->push_back(CheckRelSeqWorkEntry(NULL));
+       /* Re-check read-acquire for mo_graph edges */
+       work->push_back(MOEdgeWorkEntry(acquire));
+
+       /* propagate synchronization to later actions */
+       action_list_t::reverse_iterator rit = action_trace.rbegin();
+       for (; (*rit) != acquire; rit++) {
+               ModelAction *propagate = *rit;
+               if (acquire->happens_before(propagate)) {
+                       synchronize(acquire, propagate);
+                       /* Re-check 'propagate' for mo_graph edges */
+                       work->push_back(MOEdgeWorkEntry(propagate));
+               }
+       }
+}
+
 /**
  * Attempt to resolve all stashed operations that might synchronize with a
  * release sequence for a given location. This implements the "lazy" portion of
@@ -2100,22 +2120,8 @@ bool ModelExecution::resolve_release_sequences(void *location, work_queue_t *wor
                                        updated = true;
 
                if (updated) {
-                       /* Re-check all pending release sequences */
-                       work_queue->push_back(CheckRelSeqWorkEntry(NULL));
-                       /* Re-check read-acquire for mo_graph edges */
-                       if (acquire->is_read())
-                               work_queue->push_back(MOEdgeWorkEntry(acquire));
-
-                       /* propagate synchronization to later actions */
-                       action_list_t::reverse_iterator rit = action_trace.rbegin();
-                       for (; (*rit) != acquire; rit++) {
-                               ModelAction *propagate = *rit;
-                               if (acquire->happens_before(propagate)) {
-                                       synchronize(acquire, propagate);
-                                       /* Re-check 'propagate' for mo_graph edges */
-                                       work_queue->push_back(MOEdgeWorkEntry(propagate));
-                               }
-                       }
+                       /* Propagate the changed clock vector */
+                       propagate_clockvector(acquire, work_queue);
                }
                if (complete) {
                        it = pending_rel_seqs.erase(it);
@@ -2328,15 +2334,20 @@ Promise * ModelExecution::pop_promise_to_resolve(const ModelAction *curr)
  * Resolve a Promise with a current write.
  * @param write The ModelAction that is fulfilling Promises
  * @param promise The Promise to resolve
+ * @param work The work queue, for adding new fixup work
  * @return True if the Promise was successfully resolved; false otherwise
  */
-bool ModelExecution::resolve_promise(ModelAction *write, Promise *promise)
+bool ModelExecution::resolve_promise(ModelAction *write, Promise *promise,
+               work_queue_t *work)
 {
        ModelVector<ModelAction *> actions_to_check;
 
        for (unsigned int i = 0; i < promise->get_num_readers(); i++) {
                ModelAction *read = promise->get_reader(i);
-               read_from(read, write);
+               if (read_from(read, write)) {
+                       /* Propagate the changed clock vector */
+                       propagate_clockvector(read, work);
+               }
                actions_to_check.push_back(read);
        }
        /* Make sure the promise's value matches the write's value */
@@ -2643,7 +2654,7 @@ void ModelExecution::dumpGraph(char *filename) const
        mo_graph->dumpNodes(file);
        ModelAction **thread_array = (ModelAction **)model_calloc(1, sizeof(ModelAction *) * get_num_threads());
 
-       for (action_list_t::iterator it = action_trace.begin(); it != action_trace.end(); it++) {
+       for (action_list_t::const_iterator it = action_trace.begin(); it != action_trace.end(); it++) {
                ModelAction *act = *it;
                if (act->is_read()) {
                        mo_graph->dot_print_node(file, act);