changes, use atomic operation for managing dependency counter.
authoryeom <yeom>
Wed, 27 Jan 2010 18:19:52 +0000 (18:19 +0000)
committeryeom <yeom>
Wed, 27 Jan 2010 18:19:52 +0000 (18:19 +0000)
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/mlp_lock.h [new file with mode: 0644]
Robust/src/Runtime/mlp_runtime.h
Robust/src/Runtime/workschedule.c

index afca516538972fab2c68aaa609d88dead55a03e1..803c623ea693739353d7812f872536b020383da2 100644 (file)
@@ -3349,14 +3349,12 @@ public class BuildCode {
     //output.println("     addNewItem( seseCallStack, (void*) seseToIssue);");
 
     // fill in common data
+    output.println("     int localCount=0;");
     output.println("     seseToIssue->common.classID = "+fsen.getIdentifier()+";");
     output.println("     psem_init( &(seseToIssue->common.stallSem) );");
 
     output.println("     seseToIssue->common.forwardList = createQueue();");
-    //eom
-    output.println("     seseToIssue->common.connectedList = createQueue();");
-    //
-    output.println("     seseToIssue->common.unresolvedDependencies = 0;");
+    output.println("     seseToIssue->common.unresolvedDependencies = 10000;");
     output.println("     pthread_cond_init( &(seseToIssue->common.doneCond), NULL );");
     output.println("     seseToIssue->common.doneExecuting = FALSE;");    
     output.println("     pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );");
@@ -3389,6 +3387,11 @@ public class BuildCode {
       }
     }
     
+    // before potentially adding this SESE to other forwarding lists,
+    //  create it's lock and take it immediately
+    output.println("     pthread_mutex_init( &(seseToIssue->common.lock), NULL );");
+    output.println("     pthread_mutex_lock( &(seseToIssue->common.lock) );");
+    
     // count up memory conflict dependencies,
     // eom
     ConflictGraph graph=null;
@@ -3406,6 +3409,7 @@ public class BuildCode {
                        
                        output.println();
                        output.println("     /*add waiting queue element*/");
+                       output.println("     pthread_mutex_lock( &(parentCommon->lock) );");
                        output.println("     struct Queue*  newWaitingItemQueue=createQueue();");
                        
                        Set<WaitingElement> waitingQueueSet=graph.getWaitingElementSetBySESEID(fsen
@@ -3417,7 +3421,7 @@ public class BuildCode {
                                output.println("     WaitingElement* newElement=NULL;");
                                output.println("     struct Queue* list=NULL;");
                                output.println("     struct QueueItem* newQItem=NULL;");
-                               output.println("     pthread_mutex_lock( &(parentCommon->lock) );");
+//                             output.println("     pthread_mutex_lock( &(parentCommon->lock) );");
                                for (Iterator iterator = waitingQueueSet.iterator(); iterator
                                                .hasNext();) {
                                        WaitingElement waitingElement = (WaitingElement) iterator.next();
@@ -3437,20 +3441,19 @@ public class BuildCode {
                                                                        + waitingElement.getWaitingID() + ",newElement);");
                                        output
                                                        .println("     addNewItem(newWaitingItemQueue,newQItem);");
-                                       output
-                                                       .println("     ++(seseToIssue->common.unresolvedDependencies);");
+//                                     output.println("     ++(seseToIssue->common.unresolvedDependencies);");
+                                       output.println("     ++(localCount);");
                                        output
                                        .println();
                                }
-                               output
-                                               .println("     pthread_mutex_unlock( &(parentCommon->lock) );");
+//                             output.println("     pthread_mutex_unlock( &(parentCommon->lock) );");
                                output.println("     }");
                        }
                        
                        output.println("     /*decide whether it is runnable or not in regarding to memory conflicts*/");
                        output.println("     {");
                        output.println("      if( !isEmpty(newWaitingItemQueue) ){");
-                       output.println("         pthread_mutex_lock( &(parentCommon->lock)  );");
+//                     output.println("         pthread_mutex_lock( &(parentCommon->lock)  );");
                        output.println("         int idx;");
                        output.println("         for(idx = 0 ; idx < numRelatedWaitingQueue ; idx++){");
                        output.println("            struct Queue *allocQueue=parentCommon->allocSiteArray[idx].waitingQueue;");
@@ -3458,24 +3461,22 @@ public class BuildCode {
                        output.println("               struct QueueItem* nextQItem=getHead(allocQueue);");
                        output.println("               while( nextQItem != NULL ){");
                        output.println("                  if(contains(newWaitingItemQueue,nextQItem) && isRunnable(allocQueue,nextQItem)){");
-                       output.println("                     if(seseToIssue->common.unresolvedDependencies>0)");
-                       output.println("                     --(seseToIssue->common.unresolvedDependencies);");
+//                     output.println("                     if(seseToIssue->common.unresolvedDependencies>0)");
+//                     output.println("                     --(seseToIssue->common.unresolvedDependencies);");
+                       output.println("                     --(localCount);");
                        output.println("                  }");
                        output.println("                     nextQItem=getNextQueueItem(nextQItem);");
                        output.println("               }");
                        output.println("          }");
                        output.println("        }");
-                       output.println("        pthread_mutex_unlock( &(parentCommon->lock)  );");
+               
                        output.println("     }");
                        output.println("     }");
+                       output.println("        pthread_mutex_unlock( &(parentCommon->lock)  );");
                        output.println();
                        
                }
 
-    // before potentially adding this SESE to other forwarding lists,
-    //  create it's lock and take it immediately
-    output.println("     pthread_mutex_init( &(seseToIssue->common.lock), NULL );");
-    output.println("     pthread_mutex_lock( &(seseToIssue->common.lock) );");
     // eom
 //    output.println("     pthread_mutex_init( &(seseToIssue->common.waitingQueueLock), NULL );");
     //
@@ -3495,7 +3496,8 @@ public class BuildCode {
        output.println("       }");
        output.println("       if( !src->doneExecuting ) {");
        output.println("         addNewItem( src->forwardList, seseToIssue );");
-       output.println("         ++(seseToIssue->common.unresolvedDependencies);");
+//     output.println("         ++(seseToIssue->common.unresolvedDependencies);");
+       output.println("         ++(localCount);");
        output.println("       }");
        output.println("       pthread_mutex_unlock( &(src->lock) );");
        output.println("     }");
@@ -3522,7 +3524,8 @@ public class BuildCode {
        output.println("             seseToIssue != peekItem( src->forwardList ) ) {");
        output.println("           if( !src->doneExecuting ) {");
        output.println("             addNewItem( src->forwardList, seseToIssue );");
-       output.println("             ++(seseToIssue->common.unresolvedDependencies);");
+//     output.println("             ++(seseToIssue->common.unresolvedDependencies);");
+       output.println("             ++(localCount);");
        output.println("           }");
        output.println("         }");
        output.println("         pthread_mutex_unlock( &(src->lock) );");       
@@ -3569,15 +3572,23 @@ public class BuildCode {
       }
  
     }
-
+    
+    // release this SESE for siblings to update its dependencies or,
+    // eventually, for it to mark itself finished
+    output.println("     pthread_mutex_unlock( &(seseToIssue->common.lock) );");
+    
     // if there were no outstanding dependencies, issue here
+    output.println("     if(  atomic_sub_and_test(10000-localCount,&(seseToIssue->common.unresolvedDependencies) ) ) {");
+    output.println("       workScheduleSubmit( (void*)seseToIssue );");
+    output.println("     }");
+    /*
     output.println("     if( seseToIssue->common.unresolvedDependencies == 0 ) {");
     output.println("       workScheduleSubmit( (void*)seseToIssue );");
     output.println("     }");
-
+    */
     // release this SESE for siblings to update its dependencies or,
     // eventually, for it to mark itself finished
-    output.println("     pthread_mutex_unlock( &(seseToIssue->common.lock) );");
+//    output.println("     pthread_mutex_unlock( &(seseToIssue->common.lock) );");
     output.println("   }");
     
   }
@@ -3662,12 +3673,13 @@ public class BuildCode {
     // decrement dependency count for all SESE's on your forwarding list
     output.println("   while( !isEmpty( "+com+".forwardList ) ) {");
     output.println("     SESEcommon* consumer = (SESEcommon*) getItem( "+com+".forwardList );");
-    output.println("     pthread_mutex_lock( &(consumer->lock) );");
-    output.println("     --(consumer->unresolvedDependencies);");
-    output.println("     if( consumer->unresolvedDependencies == 0 ) {");
+//    output.println("     pthread_mutex_lock( &(consumer->lock) );");
+//  output.println("     --(consumer->unresolvedDependencies);");
+//    output.println("     if( consumer->unresolvedDependencies == 0 ) {");
+    output.println("     if( atomic_sub_and_test(1, &(consumer->unresolvedDependencies)) ){");
     output.println("       workScheduleSubmit( (void*)consumer );");
     output.println("     }");
-    output.println("     pthread_mutex_unlock( &(consumer->lock) );");
+//    output.println("     pthread_mutex_unlock( &(consumer->lock) );");
     output.println("   }");
     
     
@@ -3724,15 +3736,16 @@ public class BuildCode {
        output.println("                 }");
        output.println("              }else{");
        output.println("                 if(isResolved){");
-       output.println("                    pthread_mutex_lock( &(seseNextItem->lock) );");
-       output.println("                    if(seseNextItem->unresolvedDependencies > 0){");
-       output.println("                       --(seseNextItem->unresolvedDependencies);");
-       output.println("                       if( seseNextItem->unresolvedDependencies == 0){");
+//     output.println("                    pthread_mutex_lock( &(seseNextItem->lock) );");
+//     output.println("                    if(seseNextItem->unresolvedDependencies > 0){");
+//     output.println("                       --(seseNextItem->unresolvedDependencies);");
+//     output.println("                       if( seseNextItem->unresolvedDependencies == 0){");
        //output.println("                          workScheduleSubmit( (void*)nextItem);");
+       output.println("                       if( atomic_sub_and_test(1, &(seseNextItem->unresolvedDependencies)) ){");
        output.println("                            addNewItem(launchQueue,(void*)seseNextItem);");
        output.println("                       }");
-       output.println("                    }");
-       output.println("                    pthread_mutex_unlock( &(seseNextItem->lock) );");
+//     output.println("                    }");
+//     output.println("                    pthread_mutex_unlock( &(seseNextItem->lock) );");
        output.println("                 }");
        output.println("                 nextQItem=getNextQueueItem(nextQItem);");
        output.println("               }");
diff --git a/Robust/src/Runtime/mlp_lock.h b/Robust/src/Runtime/mlp_lock.h
new file mode 100644 (file)
index 0000000..18bcf4d
--- /dev/null
@@ -0,0 +1,26 @@
+#define LOCK_PREFIX \
+  ".section .smp_locks,\"a\"\n"   \
+  "  .align 4\n"                  \
+  "  .long 661f\n"             /* address */\
+  ".previous\n"                   \
+  "661:\n\tlock; "
+
+
+static inline void atomic_dec(volatile int *v) {
+  __asm__ __volatile__ (LOCK_PREFIX "decl %0"
+                        : "+m" (*v));
+}
+
+static inline void atomic_inc(volatile int *v) {
+  __asm__ __volatile__ (LOCK_PREFIX "incl %0"
+                        : "+m" (*v));
+}
+
+static inline int atomic_sub_and_test(int i, volatile int *v) {
+  unsigned char c;
+
+  __asm__ __volatile__ (LOCK_PREFIX "subl %2,%0; sete %1"
+                        : "+m" (*v), "=qm" (c)
+                        : "ir" (i) : "memory");
+  return c;
+}
index 61ebdd342b80518aa7d34812f942fbbd4effbb62..4ea520e37d7a7ce1697fc65960be3ce8c76b5f6d 100644 (file)
@@ -5,7 +5,7 @@
 #include <pthread.h>
 #include "Queue.h"
 #include "psemaphore.h"
-
+#include "mlp_lock.h"
 
 #ifndef FALSE
 #define FALSE 0
@@ -49,7 +49,7 @@ typedef struct SESEcommon_t {
   pthread_mutex_t lock;
 
   struct Queue*   forwardList;
-  int             unresolvedDependencies;
+  volatile int             unresolvedDependencies;
 
   pthread_cond_t  doneCond;
   int             doneExecuting;
index a83435c32f528d307344b630e78df4b166fd29c8..8b5b3c71bd46617c3aa8d8e0d5f2a80dbfa3a864 100644 (file)
@@ -248,7 +248,8 @@ void workScheduleInit( int numProcessors,
   pthread_mutex_init(&gclistlock, NULL);
   pthread_cond_init(&gccond, NULL);
 
-  numWorkers = numProcessors*5;
+  //numWorkers = numProcessors*5;
+  numWorkers = numProcessors + 1;
   workFunc   = func;
 
   dequeWorkUnits = createQueue();