fix bugs in multicore version runtime
[IRC.git] / Robust / src / Runtime / multicoretask.c
index 44e910a7d32fad0dfc1259cf5e78f4fee7038c2f..6b7a2e807c002a54219916037586a573e729929c 100644 (file)
@@ -28,10 +28,6 @@ struct RuntimeHash * forward;
 struct RuntimeHash * reverse;
 #endif // if 0: for recovery
 
-#ifdef PROFILE
-void outputProfileData();
-#endif
-
 bool getreadlock(void* ptr);
 void releasereadlock(void* ptr);
 bool getwritelock(void* ptr);
@@ -54,7 +50,6 @@ inline void run(void * arg) {
   bool sendStall = false;
   bool isfirst = true;
   bool tocontinue = false;
-  struct QueueItem * objitem = NULL;
   struct transObjInfo * objInfo = NULL;
   int grount = 0;
   bool allStall = true;
@@ -103,6 +98,10 @@ inline void run(void * arg) {
   isMsgHanging = false;
   isMsgSending = false;
 
+  smemflag = false;
+  bamboo_cur_msp = NULL;
+  bamboo_smem_size = 0;
+
   // create the lock table, lockresult table and obj queue
   locktable.size = 20;
   locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
@@ -225,8 +224,7 @@ inline void run(void * arg) {
 #endif
                          sendStall = false;
                          tocontinue = true;
-                         objitem = getTail(&objqueue);
-                         objInfo = (struct transObjInfo *)objitem->objectptr;
+                         objInfo = (struct transObjInfo *)getItem(&objqueue); 
                          obj = objInfo->objptr;
 #ifdef DEBUG
                          BAMBOO_DEBUGPRINT_REG((int)obj);
@@ -249,10 +247,12 @@ inline void run(void * arg) {
 #ifndef INTERRUPT
                          reside = false;
 #endif
+
                          if(grount == 1) {
                                  int k = 0;
                                  // flush the object
 #ifdef CACHEFLUSH
+                                 BAMBOO_CACHE_FLUSH_RANGE((int)obj,sizeof(int));
                                  BAMBOO_CACHE_FLUSH_RANGE((int)obj, classsize[((struct ___Object___ *)obj)->type]);
 #endif
                                  // enqueue the object
@@ -264,27 +264,40 @@ inline void run(void * arg) {
                                          BAMBOO_DEBUGPRINT_REG(taskindex);
                                          BAMBOO_DEBUGPRINT_REG(paramindex);
                                          struct ___Object___ * tmpptr = (struct ___Object___ *)obj;
-         tprintf("Process %x(%d): receive obj %x(%lld), ptrflag %x\n", corenum, corenum, (int)obj, (long)obj, tmpptr->flag);
+                                         tprintf("Process %x(%d): receive obj %x(%lld), ptrflag %x\n", corenum, corenum, (int)obj, (long)obj, tmpptr->flag);
 #endif
-
                                          enqueueObject_I(obj, queues, 1);
-#ifdef DEBUG
-         BAMBOO_DEBUGPRINT_REG(hashsize(activetasks));
+#ifdef DEBUG                            
+                                         BAMBOO_DEBUGPRINT_REG(hashsize(activetasks));
 #endif
                                  }
-                                 removeItem(&objqueue, objitem);
                                  releasewritelock_I(obj);
                                  RUNFREE(objInfo->queues);
                                  RUNFREE(objInfo);
                          } else {
                                  // can not get lock
-                                 // put it at the end of the queue
-                                 // and try to execute active tasks already enqueued first
-                                 removeItem(&objqueue, objitem);
+                                 // put it at the end of the queue if no update version in the queue
+                                 struct QueueItem * qitem = getHead(&objqueue);
+                                 struct QueueItem * prev = NULL;
+                                 while(qitem != NULL) {
+                                         struct transObjInfo * tmpinfo = (struct transObjInfo *)(qitem->objectptr);
+                                         if(tmpinfo->objptr == obj) {
+                                                 // the same object in the queue, which should be enqueued
+                                                 // recently. Current one is outdate, do not re-enqueue it
+                                                 RUNFREE(objInfo->queues);
+                                                 RUNFREE(objInfo);
+                                                 goto objqueuebreak;
+                                         } else {
+                                                 prev = qitem;
+                                         }
+                                         qitem = getNextQueueItem(prev);
+                                 }
+                                 // try to execute active tasks already enqueued first
                                  addNewItem_I(&objqueue, objInfo);
 #ifdef PROFILE
                                  //isInterrupt = true;
 #endif
+objqueuebreak:
                                  BAMBOO_CLOSE_CRITICAL_SECTION_OBJ_QUEUE();
 #ifdef DEBUG
                                  BAMBOO_DEBUGPRINT(0xf000);
@@ -385,8 +398,8 @@ inline void run(void * arg) {
 #ifdef USEIO
                                                                  totalexetime = BAMBOO_GET_EXE_TIME();
 #else
-                                                                 BAMBOO_DEBUGPRINT(0xbbbbbbbb);
                                                                  BAMBOO_DEBUGPRINT(BAMBOO_GET_EXE_TIME());
+                                                                 BAMBOO_DEBUGPRINT(0xbbbbbbbb);
 #endif
                                                                  // profile mode, send msgs to other cores to request pouring
                                                                  // out progiling data
@@ -1060,6 +1073,19 @@ void addAliasLock(void * ptr, int lock) {
   }
 }
 
+#ifdef PROFILE
+inline void setTaskExitIndex(int index) {
+       taskInfoArray[taskInfoIndex]->exitIndex = index;
+}
+
+inline void addNewObjInfo(void * nobj) {
+       if(taskInfoArray[taskInfoIndex]->newObjs == NULL) {
+               taskInfoArray[taskInfoIndex]->newObjs = createQueue();
+       }
+       addNewItem(taskInfoArray[taskInfoIndex]->newObjs, nobj);
+}
+#endif
+
 /* Message format:
  *      type + Msgbody
  * type: 0 -- transfer object
@@ -1079,6 +1105,8 @@ void addAliasLock(void * ptr, int lock) {
  *       c -- status confirm request
  *       d -- status report msg
  *       e -- terminate
+ *       f -- requiring for new memory
+ *      10 -- response for new memory request
  *
  * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
  * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
@@ -1094,191 +1122,10 @@ void addAliasLock(void * ptr, int lock) {
  *            d + status + corenum (size is always 3 * sizeof(int))
  *            status: 0 -- stall; 1 -- busy
  * TerminateMsg: e (size is always 1 * sizeof(int)
+ * MemoryMsg: f + size + corenum (size is always 3 * sizeof(int))
+ *           10 + base_va + size (size is always 3 * sizeof(int))
  */
 
-#ifdef PROFILE
-// output the profiling data
-void outputProfileData() {
-#ifdef USEIO
-  FILE * fp;
-  char fn[50];
-  int self_y, self_x;
-  char c_y, c_x;
-  int i;
-  int totaltasktime = 0;
-  int preprocessingtime = 0;
-  int objqueuecheckingtime = 0;
-  int postprocessingtime = 0;
-  //int interruptiontime = 0;
-  int other = 0;
-  int averagetasktime = 0;
-  int tasknum = 0;
-
-  for(i = 0; i < 50; i++) {
-    fn[i] = 0;
-  }
-
-  calCoords(corenum, &self_y, &self_x);
-  c_y = (char)self_y + '0';
-  c_x = (char)self_x + '0';
-  strcat(fn, "profile_");
-  strcat(fn, &c_x);
-  strcat(fn, "_");
-  strcat(fn, &c_y);
-  strcat(fn, ".rst");
-
-  if((fp = fopen(fn, "w+")) == NULL) {
-    fprintf(stderr, "fopen error\n");
-    return;
-  }
-
-  fprintf(fp, "Task Name, Start Time, End Time, Duration, Exit Index(, NewObj Name, Num)+\n");
-  // output task related info
-  for(i = 0; i < taskInfoIndex; i++) {
-    TaskInfo* tmpTInfo = taskInfoArray[i];
-    int duration = tmpTInfo->endTime - tmpTInfo->startTime;
-    fprintf(fp, "%s, %d, %d, %d, %d", tmpTInfo->taskName, tmpTInfo->startTime, tmpTInfo->endTime, duration, tmpTInfo->exitIndex);
-       // summarize new obj info
-       if(tmpTInfo->newObjs != NULL) {
-               struct RuntimeHash * nobjtbl = allocateRuntimeHash(5);
-               struct RuntimeIterator * iter = NULL;
-               while(0 == isEmpty(tmpTInfo->newObjs)) {
-                       char * objtype = (char *)(getItem(tmpTInfo->newObjs));
-                       if(RuntimeHashcontainskey(nobjtbl, (int)(objtype))) {
-                               int num = 0;
-                               RuntimeHashget(nobjtbl, (int)objtype, &num);
-                               RuntimeHashremovekey(nobjtbl, (int)objtype);
-                               num++;
-                               RuntimeHashadd(nobjtbl, (int)objtype, num);
-                       } else {
-                               RuntimeHashadd(nobjtbl, (int)objtype, 1);
-                       }
-                       //fprintf(stderr, "new obj!\n");
-               }
-
-               // output all new obj info
-               iter = RuntimeHashcreateiterator(nobjtbl);
-               while(RunhasNext(iter)) {
-                       char * objtype = (char *)Runkey(iter);
-                       int num = Runnext(iter);
-                       fprintf(fp, ", %s, %d", objtype, num);
-               }
-       }
-       fprintf(fp, "\n");
-    if(strcmp(tmpTInfo->taskName, "tpd checking") == 0) {
-      preprocessingtime += duration;
-    } else if(strcmp(tmpTInfo->taskName, "post task execution") == 0) {
-      postprocessingtime += duration;
-    } else if(strcmp(tmpTInfo->taskName, "objqueue checking") == 0) {
-      objqueuecheckingtime += duration;
-    } else {
-      totaltasktime += duration;
-      averagetasktime += duration;
-      tasknum++;
-    }
-  }
-
-  if(taskInfoOverflow) {
-    fprintf(stderr, "Caution: task info overflow!\n");
-  }
-
-  other = totalexetime - totaltasktime - preprocessingtime - postprocessingtime;
-  averagetasktime /= tasknum;
-
-  fprintf(fp, "\nTotal time: %d\n", totalexetime);
-  fprintf(fp, "Total task execution time: %d (%f%%)\n", totaltasktime, ((double)totaltasktime/(double)totalexetime)*100);
-  fprintf(fp, "Total objqueue checking time: %d (%f%%)\n", objqueuecheckingtime, ((double)objqueuecheckingtime/(double)totalexetime)*100);
-  fprintf(fp, "Total pre-processing time: %d (%f%%)\n", preprocessingtime, ((double)preprocessingtime/(double)totalexetime)*100);
-  fprintf(fp, "Total post-processing time: %d (%f%%)\n", postprocessingtime, ((double)postprocessingtime/(double)totalexetime)*100);
-  fprintf(fp, "Other time: %d (%f%%)\n", other, ((double)other/(double)totalexetime)*100);
-
-  fprintf(fp, "\nAverage task execution time: %d\n", averagetasktime);
-
-  fclose(fp);
-#else
-  int i = 0;
-  int j = 0;
-
-  BAMBOO_DEBUGPRINT(0xdddd);
-  // output task related info
-  for(i= 0; i < taskInfoIndex; i++) {
-    TaskInfo* tmpTInfo = taskInfoArray[i];
-    char* tmpName = tmpTInfo->taskName;
-    int nameLen = strlen(tmpName);
-    BAMBOO_DEBUGPRINT(0xddda);
-    for(j = 0; j < nameLen; j++) {
-      BAMBOO_DEBUGPRINT_REG(tmpName[j]);
-    }
-    BAMBOO_DEBUGPRINT(0xdddb);
-    BAMBOO_DEBUGPRINT_REG(tmpTInfo->startTime);
-    BAMBOO_DEBUGPRINT_REG(tmpTInfo->endTime);
-       BAMBOO_DEBUGPRINT_REG(tmpTInfo->exitIndex);
-       if(tmpTInfo->newObjs != NULL) {
-               struct RuntimeHash * nobjtbl = allocateRuntimeHash(5);
-               struct RuntimeIterator * iter = NULL;
-               while(0 == isEmpty(tmpTInfo->newObjs)) {
-                       char * objtype = (char *)(getItem(tmpTInfo->newObjs));
-                       if(RuntimeHashcontainskey(nobjtbl, (int)(objtype))) {
-                               int num = 0;
-                               RuntimeHashget(nobjtbl, (int)objtype, &num);
-                               RuntimeHashremovekey(nobjtbl, (int)objtype);
-                               num++;
-                               RuntimeHashadd(nobjtbl, (int)objtype, num);
-                       } else {
-                               RuntimeHashadd(nobjtbl, (int)objtype, 1);
-                       }
-               }
-
-               // ouput all new obj info
-               iter = RuntimeHashcreateiterator(nobjtbl);
-               while(RunhasNext(iter)) {
-                       char * objtype = (char *)Runkey(iter);
-                       int num = Runnext(iter);
-                       int nameLen = strlen(objtype);
-                       BAMBOO_DEBUGPRINT(0xddda);
-                       for(j = 0; j < nameLen; j++) {
-                               BAMBOO_DEBUGPRINT_REG(objtype[j]);
-                       }
-                       BAMBOO_DEBUGPRINT(0xdddb);
-                       BAMBOO_DEBUGPRINT_REG(num);
-               }
-       }
-    BAMBOO_DEBUGPRINT(0xdddc);
-  }
-
-  if(taskInfoOverflow) {
-    BAMBOO_DEBUGPRINT(0xefee);
-  }
-
-  // output interrupt related info
-  /*for(i = 0; i < interruptInfoIndex; i++) {
-       InterruptInfo* tmpIInfo = interruptInfoArray[i];
-       BAMBOO_DEBUGPRINT(0xddde);
-       BAMBOO_DEBUGPRINT_REG(tmpIInfo->startTime);
-       BAMBOO_DEBUGPRINT_REG(tmpIInfo->endTime);
-       BAMBOO_DEBUGPRINT(0xdddf);
-     }
-
-     if(interruptInfoOverflow) {
-       BAMBOO_DEBUGPRINT(0xefef);
-     }*/
-
-  BAMBOO_DEBUGPRINT(0xeeee);
-#endif
-}
-
-inline void setTaskExitIndex(int index) {
-       taskInfoArray[taskInfoIndex]->exitIndex = index;
-}
-
-inline void addNewObjInfo(void * nobj) {
-       if(taskInfoArray[taskInfoIndex]->newObjs == NULL) {
-               taskInfoArray[taskInfoIndex]->newObjs = createQueue();
-       }
-       addNewItem(taskInfoArray[taskInfoIndex]->newObjs, nobj);
-}
-#endif
-
 /* this function is to process lock requests. 
  * can only be invoked in receiveObject() */
 // if return -1: the lock request is redirected
@@ -1353,18 +1200,19 @@ msg:
       }
       // check if there is an existing duplicate item
       {
-                 struct QueueItem * qitem = getTail(&objqueue);
+                 struct QueueItem * qitem = getHead(&objqueue);
                  struct QueueItem * prev = NULL;
                  while(qitem != NULL) {
                          struct transObjInfo * tmpinfo = (struct transObjInfo *)(qitem->objectptr);
                          if(tmpinfo->objptr == transObj->objptr) {
                                  // the same object, remove outdate one
                                  removeItem(&objqueue, qitem);
+                                 //break;
                          } else {
                                  prev = qitem;
                          }
                          if(prev == NULL) {
-                                 qitem = getTail(&objqueue);
+                                 qitem = getHead(&objqueue);
                          } else {
                                  qitem = getNextQueueItem(prev);
                          }
@@ -1749,6 +1597,63 @@ msg:
                                  BAMBOO_EXIT(0);
          break;
        }
+
+       case 0xf: {
+         // receive a shared memory request msg
+         if(corenum != STARTUPCORE) {
+                 // wrong core to receive such msg
+#ifndef TILERA
+                 BAMBOO_DEBUGPRINT_REG(msgdata[2]);
+#endif
+                 BAMBOO_EXIT(0xa015);
+      } else {
+#ifdef DEBUG
+#ifndef TILERA
+                 BAMBOO_DEBUGPRINT(0xe88a);
+#endif
+#endif
+                 struct bamboo_shared_mem * cur_mem = bamboo_free_msps->head;
+                 void * mem = mspace_calloc(cur_mem->msp, 1, msgdata[1]);
+                 struct bamboo_shared_mem * failmem = cur_mem;
+                 while(mem == NULL) {
+                         // move current head to the tail
+                         bamboo_free_msps->tail->next = cur_mem;
+                         bamboo_free_msps->tail = cur_mem;
+                         bamboo_free_msps->head = cur_mem->next;
+                         cur_mem->next = NULL;
+                         cur_mem = bamboo_free_msps->head;
+                         if(cur_mem == failmem) {
+                                 BAMBOO_EXIT(0xa016);
+                         }
+                         mem = mspace_calloc(cur_mem->msp, 1, msgdata[1]);
+                 }
+                 // send the start_va to request core
+                if(isMsgSending) {
+                         cache_msg_3(msgdata[2], 0x10, mem, msgdata[1]);
+                 } else {
+                         send_msg_3(msgdata[2], 0x10, mem, msgdata[1]);
+                 } 
+      }
+         break;
+       }
+
+       case 0x10: {
+      // receive a shared memory response msg
+#ifdef DEBUG
+#ifndef TILERA
+         BAMBOO_DEBUGPRINT(0xe88b);
+#endif
+#endif
+         if(msgdata[2] == 0) {
+                 bamboo_smem_size = 0;
+                 bamboo_cur_msp = NULL;
+         } else {
+                 bamboo_smem_size = msgdata[2];
+                 bamboo_cur_msp = create_mspace_with_base((void*)msgdata[1], msgdata[2], 0);
+         }
+         smemflag = true;
+         break;
+    }
        
     default:
       break;
@@ -1760,7 +1665,7 @@ msg:
     msglength = 30;
 #ifdef DEBUG
 #ifndef TILERA
-    BAMBOO_DEBUGPRINT(0xe88a);
+    BAMBOO_DEBUGPRINT(0xe88c);
 #endif
 #endif
 
@@ -1777,7 +1682,7 @@ msg:
     // not a whole msg
 #ifdef DEBUG
 #ifndef TILERA
-    BAMBOO_DEBUGPRINT(0xe88b);
+    BAMBOO_DEBUGPRINT(0xe88d);
 #endif
 #endif
 #ifdef PROFILE
@@ -1803,7 +1708,7 @@ int processlockrequest(int locktype, int lock, int obj, int requestcore, int roo
          BAMBOO_DEBUGPRINT_REG(lock);
          BAMBOO_DEBUGPRINT_REG(corenum);
 #endif
-         BAMBOO_EXIT(0xa015);
+         BAMBOO_EXIT(0xa017);
   }
   /*if((corenum == STARTUPCORE) && waitconfirm) {
          waitconfirm = false;
@@ -1926,7 +1831,7 @@ bool getreadlock(void * ptr) {
 #endif
                } else {
                        // conflicts on lockresults
-                       BAMBOO_EXIT(0xa016);
+                       BAMBOO_EXIT(0xa018);
                }
        }
     return true;
@@ -1956,7 +1861,7 @@ void releasereadlock(void * ptr) {
     // reside on this core
     if(!RuntimeHashcontainskey(locktbl, reallock)) {
       // no locks for this object, something is wrong
-      BAMBOO_EXIT(0xa017);
+      BAMBOO_EXIT(0xa019);
     } else {
       int rwlock_obj = 0;
          struct LockValue * lockvalue = NULL;
@@ -2012,7 +1917,7 @@ bool getreadlock_I_r(void * ptr, void * redirectlock, int core, bool cache) {
 #endif
                        } else {
                                // conflicts on lockresults
-                               BAMBOO_EXIT(0xa018);
+                               BAMBOO_EXIT(0xa01a);
                        }
                        return true;
                } else {
@@ -2096,7 +2001,7 @@ bool getwritelock(void * ptr) {
 #endif
                } else {
                        // conflicts on lockresults
-                       BAMBOO_EXIT(0xa019);
+                       BAMBOO_EXIT(0xa01b);
                }
        }
     return true;
@@ -2133,7 +2038,7 @@ void releasewritelock(void * ptr) {
     // reside on this core
     if(!RuntimeHashcontainskey(locktbl, reallock)) {
       // no locks for this object, something is wrong
-      BAMBOO_EXIT(0xa01a);
+      BAMBOO_EXIT(0xa01c);
     } else {
       int rwlock_obj = 0;
          struct LockValue * lockvalue = NULL;
@@ -2173,7 +2078,7 @@ void releasewritelock_r(void * lock, void * redirectlock) {
     // reside on this core
     if(!RuntimeHashcontainskey(locktbl, reallock)) {
       // no locks for this object, something is wrong
-      BAMBOO_EXIT(0xa01b);
+      BAMBOO_EXIT(0xa01d);
     } else {
       int rwlock_obj = 0;
          struct LockValue * lockvalue = NULL;
@@ -2250,7 +2155,7 @@ bool getwritelock_I(void * ptr) {
 #endif
                } else {
                        // conflicts on lockresults
-                       BAMBOO_EXIT(0xa01c);
+                       BAMBOO_EXIT(0xa01e);
                }
                return true;
        }
@@ -2308,7 +2213,7 @@ bool getwritelock_I_r(void * ptr, void * redirectlock, int core, bool cache) {
 #endif
                        } else {
                                // conflicts on lockresults
-                               BAMBOO_EXIT(0xa01d);
+                               BAMBOO_EXIT(0xa01f);
                        }
                        return true;
                } else {
@@ -2354,7 +2259,7 @@ void releasewritelock_I(void * ptr) {
     // reside on this core
     if(!RuntimeHashcontainskey(locktbl, reallock)) {
       // no locks for this object, something is wrong
-      BAMBOO_EXIT(0xa01e);
+      BAMBOO_EXIT(0xa020);
     } else {
       int rwlock_obj = 0;
          struct LockValue * lockvalue = NULL;
@@ -2386,7 +2291,7 @@ void releasewritelock_I_r(void * lock, void * redirectlock) {
     // reside on this core
     if(!RuntimeHashcontainskey(locktbl, reallock)) {
       // no locks for this object, something is wrong
-      BAMBOO_EXIT(0xa01f);
+      BAMBOO_EXIT(0xa021);
     } else {
       int rwlock_obj = 0;
          struct LockValue * lockvalue = NULL;
@@ -2973,7 +2878,7 @@ parameterpresent:
             reverse=NULL;
 #endif  // #if 0: for recovery
          BAMBOO_DEBUGPRINT_REG(x);
-         BAMBOO_EXIT(0xa020);
+         BAMBOO_EXIT(0xa022);
        } else {
 #endif // #ifndef MULTICORE
 #if 0