4 #include "structdefs.h"
6 #include "checkpoint.h"
8 #include "SimpleHash.h"
9 #include "GenericHashtable.h"
10 #include <sys/select.h>
11 #include <sys/types.h>
20 #include <raw_compiler_defs.h>
21 //#include <libints.h>
22 #elif defined THREADSIMULATE
23 // use POSIX message queue
24 // for each core, its message queue named as
30 extern int injectfailures;
31 extern float failurechance;
37 #define TOTALCORE raw_get_num_tiles()
41 #include "instrument.h"
// File-scope runtime state and forward declarations.
// NOTE(review): several declarations below appear twice (e.g. locktbl); presumably
// they sit in different arms of the RAW / THREADSIMULATE conditional, whose opening
// directive is not visible in this listing -- confirm.

// Task tables. activetasks is allocated later in this file with hashCodetpd and
// comparetpd as callbacks; the allocation of failedtasks is commented out there.
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
48 struct RuntimeHash * forward;
49 struct RuntimeHash * reverse;
// Per-core bookkeeping read by the startup core when it decides whether to finish:
// it looks for non-zero corestatus entries and compares total sends to total receives.
52 int corestatus[NUMCORES]; // records status of each core
55 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
56 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
// Statically allocated lock table; locktbl aliases it in this variant.
58 struct RuntimeHash locktable;
59 static struct RuntimeHash* locktbl = &locktable;
60 void * curr_heapbase=0;
61 void * curr_heaptop=0;
// This core's own receive counter (reported to the startup core in the stall message).
63 int self_numreceiveobjs;
// Queue of received transObjInfo records waiting to be enqueued into parameter queues.
70 struct Queue objqueue;
75 void calCoords(int core_num, int* coordY, int* coordX);
77 #elif defined THREADSIMULATE
// In this variant locktbl is heap-allocated in main via allocateRuntimeHash(20).
78 static struct RuntimeHash* locktbl;
// One record per simulated core; main fills it in and hands its address to the thread.
86 struct thread_data thread_data_array[NUMCORES];
// Thread-specific key under which each thread stores its core number.
88 static pthread_key_t key;
// rwlock_tbl is initialised in main and write-locked before the lock table is torn down.
89 static pthread_rwlock_t rwlock_tbl;
90 static pthread_rwlock_t rwlock_init;
95 bool transStallMsg(int targetcore);
96 void transTerminateMsg(int targetcore);
98 bool getreadlock(void* ptr);
99 void releasereadlock(void* ptr);
// NOTE(review): the _I variants are called in this file between
// raw_user_interrupts_off/on pairs; presumably "_I" means "interrupts already off" -- confirm.
101 bool getreadlock_I(void* ptr);
102 void releasereadlock_I(void* ptr);
104 bool getwritelock(void* ptr);
105 void releasewritelock(void* ptr);
// flushAll: runs a 512-iteration loop that calls flushCacheline on `base` and on
// `base|off1`, bracketed by raw_user_interrupts_off/on and by raw_test_pass markers
// (0xec00 before the loop, 0xec02 at the end).
// NOTE(review): the declarations of i, base and off1 and the way base advances per
// iteration are not visible in this listing; the interrupt calls may be conditionally
// compiled.
109 void flushAll(void) {
113 raw_user_interrupts_off();
115 raw_test_pass(0xec00);
116 for(i = 0; i < 512; ++i) {
119 flushCacheline(base);
120 flushCacheline(base|off1);
123 raw_user_interrupts_on();
125 raw_test_pass(0xec02);
131 raw_test_pass(0xefee);
132 raw_user_interrupts_off();
133 raw_test_pass(0xef00);
135 raw_test_pass(0xefff);
136 raw_user_interrupts_on();
137 raw_test_pass(0xefef);
// main: program entry point. Two setup paths are visible, separated by the
// THREADSIMULATE directive below:
//   * RAW path: derive this tile's core number, zero the per-core counters (startup
//     core only), set up the lock table and the object queue, enable interrupts.
//   * Thread-simulation path: create one POSIX message queue and one pthread per core.
// NOTE(review): parts of this function are not visible in this listing (declarations,
// closing braces, the opening conditional); comments describe visible statements only.
142 int main(int argc, char **argv) {
148 bool sendStall = false;
150 bool tocontinue = false;
151 struct QueueItem * objitem = NULL;
152 struct transObjInfo * objInfo = NULL;
154 bool allStall = true;
157 raw_test_pass_reg(&locktable);
158 raw_test_pass_reg(&msglength);
159 raw_test_pass_reg(&i);
160 raw_test_pass(0xee01);
// Core number = x + 4*y; the factor 4 matches the % 4 and / 4 used by calCoords.
161 corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
163 // initialize the arrays
164 if(STARTUPCORE == corenum) {
165 // startup core to initialize corestatus[]
166 for(i = 0; i < NUMCORES; ++i) {
168 numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
169 numreceiveobjs[i] = 0;
172 self_numsendobjs = 0;
173 self_numreceiveobjs = 0;
// NOTE(review): the body of this 30-iteration loop is not visible here.
174 for(i = 0; i < 30; ++i) {
181 raw_test_pass(0xee02);
183 // create the lock table, lockresult table and obj queue
// The lock table is set up in place: room for 20 bucket pointers, empty list, no elements.
185 locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
186 /* Set allocation blocks*/
187 locktable.listhead=NULL;
188 locktable.listtail=NULL;
190 locktable.numelements = 0;
197 objqueue.head = NULL;
198 objqueue.tail = NULL;
199 raw_test_pass(0xee03);
202 if (corenum < NUMCORES) {
205 //setup_interrupts();
206 //start_gdn_avail_ints(recvMsg);
207 raw_user_interrupts_on();
208 raw_test_pass(0xee04);
212 #elif defined THREADSIMULATE
216 pthread_t threads[NUMCORES];
219 // initialize three arrays and msg queue array
// Queue names are "/msgqueue_" followed by the decimal core number (one or two digits).
220 char * pathhead = "/msgqueue_";
221 int targetlen = strlen(pathhead);
222 for(i = 0; i < NUMCORES; ++i) {
225 numreceiveobjs[i] = 0;
// One-digit core number.
230 corenumstr[0] = i + '0';
231 corenumstr[1] = '\0';
// Two-digit core number.
234 corenumstr[1] = i %10 + '0';
235 corenumstr[0] = (i / 10) + '0';
236 corenumstr[2] = '\0';
239 printf("Error: i >= 100\n");
// Buffer holds prefix + digits + terminating NUL.
243 char path[targetlen + sourcelen + 1];
244 strcpy(path, pathhead);
245 strncat(path, corenumstr, sourcelen);
// Each queue is opened for reading, created if absent, and non-blocking.
246 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
247 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
249 mqd[i]= mq_open(path, oflags, omodes, NULL);
251 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
254 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
// Key under which each thread later stores its core number.
259 pthread_key_create(&key, NULL);
261 // create the lock table and initialize its mutex
262 locktbl = allocateRuntimeHash(20);
263 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
// NOTE(review): the format string has one %d but two arguments follow; the strerror
// text is never printed. A %s after "error: " looks intended.
264 printf("[Main] initialize the rwlock for lock table: %d error: \n", rc_locktbl, strerror(rc_locktbl));
// Fill in each core's thread_data record and start its thread running run().
266 for(i = 0; i < NUMCORES; ++i) {
267 thread_data_array[i].corenum = i;
268 thread_data_array[i].argc = argc;
269 thread_data_array[i].argv = argv;
270 thread_data_array[i].numsendobjs = 0;
271 thread_data_array[i].numreceiveobjs = 0;
272 printf("[main] creating thread %d\n", i);
// NOTE(review): run is defined in this file as `void run(void* arg)`, whereas
// pthread_create expects a start routine returning void * -- confirm and align.
273 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
275 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
281 //pthread_exit(NULL);
// run: per-core main loop. `arg` is a struct thread_data * (see the cast below).
// Visible stages:
//   1. record the core number as thread-specific data, initialise GC and exit handler;
//   2. create the active-task table, the startup object, and process task info;
//   3. RAW section: drain incoming messages, move received objects from objqueue into
//      parameter queues, and run the stall/termination protocol;
//   4. THREADSIMULATE section: the same protocol driven by a switch on receiveObject().
// NOTE(review): the statements that mention raw_* calls and the global `corenum` are
// presumably shared with, or compiled into, the RAW build's main(); the conditionals
// that decide this are not visible in this listing -- confirm.
285 void run(void* arg) {
286 struct thread_data * my_tdata = (struct thread_data *)arg;
// The core number itself (not a pointer to it) is stored as the thread-specific value.
287 pthread_setspecific(key, (void *)my_tdata->corenum);
288 int argc = my_tdata->argc;
289 char** argv = my_tdata->argv;
290 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
296 GC_init(); // Initialize the garbage collector
304 initializeexithandler();
306 raw_test_pass(0xee05);
308 /* Create table for failed tasks */
310 if(corenum > NUMCORES - 1) {
317 raw_test_pass(0xee06);
319 /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
320 (int (*)(void *,void *)) &comparetpd);*/
323 raw_test_pass(0xee07);
325 /* Create queue of active tasks */
326 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
327 (int (*)(void *,void *)) &comparetpd);
329 raw_test_pass(0xee08);
332 /* Process task information */
335 raw_test_pass(0xee09);
338 /* Create startup object */
339 createstartupobject(argc, argv);
341 raw_test_pass(0xee0a);
345 raw_test_pass(0xee0b);
// Keep calling receiveObject() until it returns -1.
349 while(receiveObject() != -1) {
353 // check if there are new active tasks can be executed
357 while(receiveObject() != -1) {
361 raw_test_pass(0xee0c);
363 // check if there are some pending objects, if yes, enqueue them and executetasks again
365 raw_test_pass(0xee0d);
// Process every queued transObjInfo; interrupts are switched off around each item.
366 while(!isEmpty(&objqueue)) {
369 raw_user_interrupts_off();
371 raw_test_pass(0xeee1);
374 objitem = getTail(&objqueue);
375 //obj = objitem->objectptr;
376 objInfo = (struct transObjInfo *)objitem->objectptr;
377 obj = objInfo->objptr;
378 raw_test_pass_reg((int)obj);
379 // grab lock and flush the obj
385 raw_test_pass_reg(grount);
// Invalidate classsize[type] consecutive addresses starting at obj.
397 for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
398 invalidateAddr(obj + k);
400 // enqueue the object
// objInfo->queues holds (task index, parameter index) pairs, `length` of them.
401 for(k = 0; k < objInfo->length; ++k) {
402 int taskindex = objInfo->queues[2 * k];
403 int paramindex = objInfo->queues[2 * k + 1];
404 struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
405 raw_test_pass_reg(taskindex);
406 raw_test_pass_reg(paramindex);
407 enqueueObject_I(obj, queues, 1);
409 removeItem(&objqueue, objitem);
410 releasereadlock_I(obj);
411 RUNFREE(objInfo->queues);
413 /*enqueueObject_I(obj, NULL, 0);
414 removeItem(&objqueue, objitem);
415 releasereadlock_I(obj);*/
418 // put it at the end of the queue
419 // and try to execute active tasks already enqueued first
420 removeItem(&objqueue, objitem);
421 addNewItem_I(&objqueue, objInfo);
425 raw_user_interrupts_on();
427 raw_test_pass(0xee0e);
// Startup core: publish its own status and counters, then inspect every core.
432 if(STARTUPCORE == corenum) {
434 raw_test_pass(0xee0f);
438 raw_user_interrupts_off();
440 corestatus[corenum] = 0;
441 numsendobjs[corenum] = self_numsendobjs;
442 numreceiveobjs[corenum] = self_numreceiveobjs;
443 // check the status of all cores
445 raw_test_pass_reg(NUMCORES);
446 for(i = 0; i < NUMCORES; ++i) {
447 raw_test_pass(0xe000 + corestatus[i]);
448 if(corestatus[i] != 0) {
454 // check if the sum of send objs and receive obj are the same
456 // no->go on executing
// sumsendobj ends up as (total sent) - (total received); zero means nothing is in flight.
458 for(i = 0; i < NUMCORES; ++i) {
459 sumsendobj += numsendobjs[i];
460 raw_test_pass(0xf000 + numsendobjs[i]);
462 for(i = 0; i < NUMCORES; ++i) {
463 sumsendobj -= numreceiveobjs[i];
464 raw_test_pass(0xf000 + numreceiveobjs[i]);
466 if(0 == sumsendobj) {
468 raw_test_pass(0xee10);
469 raw_test_done(1); // All done.
473 raw_user_interrupts_on();
477 raw_test_pass(0xee11);
479 // wait for some time
481 raw_test_pass(0xee12);
484 raw_test_pass(0xee13);
486 // send StallMsg to startup core
487 raw_test_pass(0xee14);
488 sendStall = transStallMsg(STARTUPCORE);
493 raw_test_pass(0xee15);
499 #elif defined THREADSIMULATE
500 /* Start executing the tasks */
504 // check if there are new objects coming
505 bool sendStall = false;
// Recover the core number stored with pthread_setspecific (a pointer value read as int).
507 int numofcore = pthread_getspecific(key);
// NOTE(review): the case labels of this switch are not visible in this listing.
509 switch(receiveObject()) {
511 printf("[run, %d] receive an object\n", numofcore);
513 // received an object
514 // check if there are new active tasks can be executed
519 //printf("[run, %d] no msg\n", numofcore);
521 if(STARTUPCORE == numofcore) {
522 corestatus[numofcore] = 0;
523 // check the status of all cores
524 bool allStall = true;
525 for(i = 0; i < NUMCORES; ++i) {
526 if(corestatus[i] != 0) {
532 // check if the sum of send objs and receive obj are the same
534 // no->go on executing
536 for(i = 0; i < NUMCORES; ++i) {
537 sumsendobj += numsendobjs[i];
539 for(i = 0; i < NUMCORES; ++i) {
540 sumsendobj -= numreceiveobjs[i];
542 if(0 == sumsendobj) {
// Shutdown: take the table write lock, destroy every per-object rwlock, free the table.
546 int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
// NOTE(review): two %d conversions but three arguments; the strerror text is not printed.
547 printf("[run, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc_tbl, strerror(rc_tbl));
548 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
549 while(0 != RunhasNext(it_lock)) {
// NOTE(review): this local `key` shadows the file-scope pthread key of the same name.
550 int key = Runkey(it_lock);
551 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
552 int rc_des = pthread_rwlock_destroy(rwlock_obj);
// NOTE(review): as above, strerror(rc_des) has no matching conversion in the format.
553 printf("[run, %d] destroy the rwlock for object: %d error: \n", numofcore, key, strerror(rc_des));
556 freeRuntimeHash(locktbl);
560 // destroy all message queues
561 char * pathhead = "/msgqueue_";
562 int targetlen = strlen(pathhead);
563 for(i = 0; i < NUMCORES; ++i) {
567 corenumstr[0] = i + '0';
568 corenumstr[1] = '\0';
571 corenumstr[1] = i %10 + '0';
572 corenumstr[0] = (i / 10) + '0';
573 corenumstr[2] = '\0';
576 printf("Error: i >= 100\n");
580 char path[targetlen + sourcelen + 1];
581 strcpy(path, pathhead);
582 strncat(path, corenumstr, sourcelen);
586 printf("[run, %d] terminate!\n", numofcore);
593 // send StallMsg to startup core
594 sendStall = transStallMsg(STARTUPCORE);
600 printf("[run, %d] receive a stall msg\n", numofcore);
601 // receive a Stall Msg, do nothing
602 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
607 printf("[run, %d] receive a terminate msg\n", numofcore);
608 // receive a terminate Msg
// NOTE(review): the next two lines use the global corenum, while the rest of this
// section uses the thread-local numofcore -- confirm which is intended.
609 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
610 mq_close(mqd[corenum]);
616 printf("[run, %d] Error: invalid message type.\n", numofcore);
// createstartupobject: builds the startup object from the command line and enqueues it.
//   argc, argv - program arguments; argv[1..argc-1] become the object's string array.
// Steps visible below: allocate the startup object and an array of argc-1 strings,
// wrap each argument with NewString, mark the object (isolate = 1, version = 0),
// initialise its flag through flagorandinit, and hand it to enqueueObject.
// NOTE(review): each allocation appears twice, once with a leading NULL argument and
// once without; presumably these are alternative arms of a conditional that is not
// visible in this listing -- confirm.
626 void createstartupobject(int argc, char ** argv) {
629 /* Allocate startup object */
631 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
632 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
634 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
635 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
637 /* Build array of strings */
638 startupobject->___parameters___=stringarray;
// argv[0] is skipped; argument i is stored in slot i-1.
639 for(i=1;i<argc;i++) {
640 int length=strlen(argv[i]);
642 struct ___String___ *newstring=NewString(NULL, argv[i],length);
644 struct ___String___ *newstring=NewString(argv[i],length);
// The element storage is addressed as the bytes right after the ___length___ field.
646 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
649 startupobject->isolate = 1;
650 startupobject->version = 0;
652 /* Set initialized flag for startup object */
653 flagorandinit(startupobject,1,0xFFFFFFFF);
// NULL/0 makes enqueueObject look the queues up from the object's type.
654 enqueueObject(startupobject, NULL, 0);
// hashCodetpd: hash callback for task/parameter descriptors (it is passed to
// genallocatehashtable when activetasks is created).
// The hash starts from the task pointer cast to int and is XOR-ed with every entry of
// parameterArray (numParameters entries), so descriptors with the same task and the
// same parameters hash alike.
// NOTE(review): the pointer-to-int casts drop the upper bits where pointers are wider
// than int; the return statement is not visible in this listing.
660 int hashCodetpd(struct taskparamdescriptor *ftd) {
661 int hash=(int)ftd->task;
663 for(i=0;i<ftd->numParameters;i++){
664 hash^=(int)ftd->parameterArray[i];
// comparetpd: equality callback for task/parameter descriptors (it is passed to
// genallocatehashtable when activetasks is created).
// It checks the task pointers first, then compares parameterArray element by element
// over ftd1->numParameters entries.
// NOTE(review): the values returned on mismatch and on match are not visible in this
// listing; ftd2->numParameters is never consulted in the visible code.
669 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
671 if (ftd1->task!=ftd2->task)
673 for(i=0;i<ftd1->numParameters;i++)
674 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
679 /* This function sets a tag. */
// tagset: attaches tag descriptor `tagd` to object `obj` and records `obj` in the
// descriptor's object set (tagd->flagptr).
//   obj->___tags___ holds either a single tag descriptor (type == TAGTYPE) or an
//   ArrayObject of descriptors whose used count is kept in ___cachedCode___.
//   tagd->flagptr holds either a single object or an ArrayObject of objects
//   (type == OBJECTARRAYTYPE), again with the used count in ___cachedCode___.
// NOTE(review): two signatures are visible, with and without a leading `ptr`
// parameter, and each allocation appears in a matching pair; presumably these are
// alternative arms of a conditional that is not visible in this listing -- confirm.
// In the `ptr` variant obj and tagd are re-read from ptrarray after each allocation,
// presumably because the allocator may move them -- confirm.
681 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
683 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
685 struct ArrayObject * ao=NULL;
686 struct ___Object___ * tagptr=obj->___tags___;
688 raw_test_pass(0xebb0);
692 raw_test_pass(0xebb1);
// Store the descriptor directly in the object's tag slot.
694 obj->___tags___=(struct ___Object___ *)tagd;
696 /* Have to check if it is already set */
697 if (tagptr->type==TAGTYPE) {
698 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
700 raw_test_pass(0xebb2);
704 raw_test_pass(0xebb3);
// Promote the single descriptor to an array holding the old and the new descriptor.
709 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
710 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
711 obj=(struct ___Object___ *)ptrarray[2];
712 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
713 td=(struct ___TagDescriptor___ *) obj->___tags___;
716 raw_test_pass(0xebb4);
718 ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
721 raw_test_pass(0xebb5);
723 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
724 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
725 obj->___tags___=(struct ___Object___ *) ao;
726 ao->___cachedCode___=2;
728 raw_test_pass(0xebb6);
// The object already carries an array of descriptors.
733 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
735 raw_test_pass(0xebb7);
// Walk the descriptors already stored in the array.
737 for(i=0;i<ao->___cachedCode___;i++) {
738 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
740 raw_test_pass(0xebb8);
744 raw_test_pass(0xebb9);
// Room left in the array: append in place.
749 if (ao->___cachedCode___<ao->___length___) {
751 raw_test_pass(0xebba);
753 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
754 ao->___cachedCode___++;
756 raw_test_pass(0xebbb);
// Array full: allocate one that is TAGARRAYINTERVAL longer, copy, then append.
760 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
761 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
762 obj=(struct ___Object___ *)ptrarray[2];
763 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
764 ao=(struct ArrayObject *)obj->___tags___;
766 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
769 raw_test_pass(0xebbc);
771 aonew->___cachedCode___=ao->___length___+1;
772 for(i=0;i<ao->___length___;i++) {
774 raw_test_pass(0xebbd);
776 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
779 raw_test_pass(0xebbe);
781 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
783 raw_test_pass(0xebbf);
// Second half: record obj in the descriptor's object set.
790 struct ___Object___ * tagset=tagd->flagptr;
792 raw_test_pass(0xb008);
796 raw_test_pass(0xb009);
// flagptr currently refers to a single object: promote it to a two-element array.
799 } else if (tagset->type!=OBJECTARRAYTYPE) {
801 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
802 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
803 obj=(struct ___Object___ *)ptrarray[2];
804 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
806 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
808 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
809 ARRAYSET(ao, struct ___Object___ *, 1, obj);
810 ao->___cachedCode___=2;
811 tagd->flagptr=(struct ___Object___ *)ao;
813 raw_test_pass(0xb00a);
// flagptr is already an object array: append in place, or grow and copy.
816 struct ArrayObject *ao=(struct ArrayObject *) tagset;
817 if (ao->___cachedCode___<ao->___length___) {
819 raw_test_pass(0xb00b);
821 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
825 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
826 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
827 obj=(struct ___Object___ *)ptrarray[2];
828 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
829 ao=(struct ArrayObject *)tagd->flagptr;
// NOTE(review): unlike the allocation four statements above, this variant requests
// only OBJECTARRAYINTERVAL elements, yet the copy loop below writes ao->___length___
// elements plus one more. Adding ao->___length___ to the size looks intended -- confirm.
831 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
833 aonew->___cachedCode___=ao->___cachedCode___+1;
834 for(i=0;i<ao->___length___;i++) {
835 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
837 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
838 tagd->flagptr=(struct ___Object___ *) aonew;
840 raw_test_pass(0xb00c);
847 /* This function clears a tag. */
// tagclear: removes tag descriptor `tagd` from object `obj`, and removes `obj` from the
// descriptor's object set (tagd->flagptr).
// Removal from an array is done by moving the last used element into the freed slot,
// clearing the old last slot, and decrementing the used count (___cachedCode___).
// Inconsistencies are reported with the "ERROR n in tagclear" messages.
// NOTE(review): two signatures are visible, with and without a leading `ptr`
// parameter; presumably they are alternative arms of a conditional that is not visible
// in this listing. The match tests inside both loops are also not visible.
849 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
851 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
853 /* We'll assume that tag is always there.
854 Need to statically check for this of course. */
855 struct ___Object___ * tagptr=obj->___tags___;
// Single descriptor stored directly in the object.
857 if (tagptr->type==TAGTYPE) {
858 if ((struct ___TagDescriptor___ *)tagptr==tagd)
859 obj->___tags___=NULL;
862 printf("ERROR 1 in tagclear\n");
// Array of descriptors.
866 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
868 for(i=0;i<ao->___cachedCode___;i++) {
869 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
871 ao->___cachedCode___--;
// Fill the hole with the former last element unless the hole was the last element.
872 if (i<ao->___cachedCode___)
873 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
874 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
// An emptied array is dropped from the object entirely.
875 if (ao->___cachedCode___==0)
876 obj->___tags___=NULL;
881 printf("ERROR 2 in tagclear\n");
// Second half: remove obj from the descriptor's object set.
887 struct ___Object___ *tagset=tagd->flagptr;
888 if (tagset->type!=OBJECTARRAYTYPE) {
893 printf("ERROR 3 in tagclear\n");
897 struct ArrayObject *ao=(struct ArrayObject *) tagset;
899 for(i=0;i<ao->___cachedCode___;i++) {
900 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
902 ao->___cachedCode___--;
903 if (i<ao->___cachedCode___)
904 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
905 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
906 if (ao->___cachedCode___==0)
912 printf("ERROR 4 in tagclear\n");
920 /* This function allocates a new tag. */
// allocate_tag: allocates a tag descriptor of classsize[TAGTYPE] bytes.
// Two variants are visible: one takes a `ptr` that is passed to mygcmalloc as a
// struct garbagelist *, the other allocates with FREEMALLOC.
// NOTE(review): presumably these are alternative arms of a conditional that is not
// visible in this listing; the use of `index` and the return statement are also not
// visible.
922 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
923 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
925 struct ___TagDescriptor___ * allocate_tag(int index) {
926 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
935 /* This function updates the flag for object ptr. It or's the flag
936 with the or mask and and's it with the andmask. */
938 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
// flagcomp: three-way comparison of two ints through pointers; the result is negative,
// zero or positive as *val1 is less than, equal to or greater than *val2.
// NOTE(review): the difference can overflow int when the operands have opposite signs
// and large magnitude (e.g. INT_MIN versus a positive value); a form such as
// (a > b) - (a < b) avoids that. No caller is visible in this listing.
940 int flagcomp(const int *val1, const int *val2) {
941 return (*val1)-(*val2);
// flagorand: updates the flag word of the object at `ptr` and re-evaluates its queues.
//   ptr            - object; its flag is read as the second int of the object
//   ormask         - bits OR-ed into the old flag
//   andmask        - NOTE(review): its application is not visible in this listing
//   queues, length - parameter queues forwarded to flagbody
// The old and new flag values are reported via raw_test_pass before flagbody runs with
// isnew == false.
944 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
946 int oldflag=((int *)ptr)[1];
947 int flag=ormask|oldflag;
950 raw_test_pass(0xaa000000 + oldflag);
951 raw_test_pass(0xaa000000 + flag);
953 flagbody(ptr, flag, queues, length, false);
// intflagorand: variant of flagorand that supplies no queues (NULL, 0) to flagbody and
// does nothing when the computed flag equals the old one.
//   ptr     - object; its flag is read as the second int of the object
//   ormask  - bits OR-ed into the old flag
//   andmask - NOTE(review): its application is not visible in this listing
// NOTE(review): the bool values returned on either path are not visible here.
957 bool intflagorand(void * ptr, int ormask, int andmask) {
959 int oldflag=((int *)ptr)[1];
960 int flag=ormask|oldflag;
962 if (flag==oldflag) /* Don't do anything */
965 flagbody(ptr, flag, NULL, 0, false);
// flagorandinit: flag update used for a freshly created object (createstartupobject
// calls it with ormask 1 and andmask 0xFFFFFFFF). It computes the new flag like
// flagorand but calls flagbody with isnew == true and no queues.
// NOTE(review): the application of andmask is not visible in this listing.
971 void flagorandinit(void * ptr, int ormask, int andmask) {
972 int oldflag=((int *)ptr)[1];
973 int flag=ormask|oldflag;
976 raw_test_pass(0xaa100000 + oldflag);
977 raw_test_pass(0xaa100000 + flag);
979 flagbody(ptr,flag,NULL,0,true);
// flagbody: shared tail of the flag-update functions.
//   ptr     - object whose flag changes
//   flag    - new flag value (NOTE(review): the store into ptr->flag is not visible here)
//   vqueues - parameter queues the object may sit in, or NULL
//   vlength - number of entries in vqueues
//   isnew   - true for a freshly created object
// When the object is not new and no queues were given, the queues registered for the
// object's type on the current core are used instead. The object is then removed from
// the object set of every queue, and any enterflags array returned by ObjectHashget
// is freed.
982 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
983 struct parameterwrapper * flagptr = NULL;
985 struct parameterwrapper ** queues = vqueues;
986 int length = vlength;
989 int * enterflags = NULL;
990 if((!isnew) && (queues == NULL)) {
991 #ifdef THREADSIMULATE
// Thread simulation: the core number is the thread-specific value stored by run().
992 int numofcore = pthread_getspecific(key);
993 queues = objectqueues[numofcore][ptr->type];
994 length = numqueues[numofcore][ptr->type];
997 if(corenum < NUMCORES) {
999 queues = objectqueues[corenum][ptr->type];
1000 length = numqueues[corenum][ptr->type];
1010 raw_test_pass(0xbb000000 + ptr->flag);
1013 /*Remove object from all queues */
1014 for(i = 0; i < length; ++i) {
1015 flagptr = queues[i];
// Fetch the stored entry first so its enterflags array can be released after removal.
1016 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1017 ObjectHashremove(flagptr->objectset, (int)ptr);
1018 if (enterflags!=NULL)
1019 RUNFREE(enterflags);
// enqueueObject: offers object `vptr` to every parameter queue it may satisfy.
//   vptr    - the object (treated as struct ___Object___ *)
//   vqueues - candidate parameter queues, or NULL to use the queues registered for the
//             object's type on the current core
//   vlength - number of entries in vqueues
// For each candidate parameter the object's tags are checked against the parameter's
// tag requirements, then its flag is tested against each (andmask, checkmask) pair;
// on a match enqueuetasks is called for that parameter.
// NOTE(review): several control-flow lines (the nextloop label, breaks, the no-tag
// test) are not visible in this listing.
1023 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1024 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1027 struct QueueItem *tmpptr;
1028 struct parameterwrapper * parameter=NULL;
1031 struct parameterwrapper * prevptr=NULL;
1032 struct ___Object___ *tagptr=NULL;
1033 struct parameterwrapper ** queues = vqueues;
1034 int length = vlength;
1036 if(corenum > NUMCORES - 1) {
1040 if(queues == NULL) {
1041 #ifdef THREADSIMULATE
1042 int numofcore = pthread_getspecific(key);
1043 queues = objectqueues[numofcore][ptr->type];
1044 length = numqueues[numofcore][ptr->type];
1046 queues = objectqueues[corenum][ptr->type];
1047 length = numqueues[corenum][ptr->type];
1050 tagptr=ptr->___tags___;
1052 /* Outer loop iterates through all parameter queues an object of
1053 this type could be in. */
1054 for(j = 0; j < length; ++j) {
1055 parameter = queues[j];
// Tag check applies only to parameters that require tags.
1057 if (parameter->numbertags>0) {
1059 goto nextloop;//that means the object has no tag but that param needs tag
1060 else if(tagptr->type==TAGTYPE) {//one tag
1061 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
// tagarray holds (slot id, tag id) pairs; only the tag id is compared here.
1062 for(i=0;i<parameter->numbertags;i++) {
1063 //slotid is parameter->tagarray[2*i];
1064 int tagid=parameter->tagarray[2*i+1];
1065 if (tagid!=tagptr->flag)
1066 goto nextloop; /*We don't have this tag */
1068 } else {//multiple tags
1069 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1070 for(i=0;i<parameter->numbertags;i++) {
1071 //slotid is parameter->tagarray[2*i];
1072 int tagid=parameter->tagarray[2*i+1];
1074 for(j=0;j<ao->___cachedCode___;j++) {
1075 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
// Flag check: intarray holds (andmask, checkmask) pairs, numberofterms of them.
1086 for(i=0;i<parameter->numberofterms;i++) {
1087 int andmask=parameter->intarray[i*2];
1088 int checkmask=parameter->intarray[i*2+1];
1089 if ((ptr->flag&andmask)==checkmask) {
1091 raw_test_pass(0xcc000000 + andmask);
1092 raw_test_pass_reg((int)ptr);
1093 raw_test_pass(0xcc000000 + ptr->flag);
1094 raw_test_pass(0xcc000000 + checkmask);
1096 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
// enqueueObject_I: same matching logic as enqueueObject, but it calls enqueuetasks_I
// and emits additional raw_test_pass trace markers (0xeaa1 .. 0xeaa8).
//   vptr    - the object (treated as struct ___Object___ *)
//   vqueues - candidate parameter queues, or NULL to use the queues registered for the
//             object's type on the current core
//   vlength - number of entries in vqueues
// NOTE(review): the visible caller invokes it between raw_user_interrupts_off and
// raw_user_interrupts_on; presumably "_I" marks the interrupts-disabled variant --
// confirm. Several control-flow lines are not visible in this listing.
1108 void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1109 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1112 struct QueueItem *tmpptr;
1113 struct parameterwrapper * parameter=NULL;
1116 struct parameterwrapper * prevptr=NULL;
1117 struct ___Object___ *tagptr=NULL;
1118 struct parameterwrapper ** queues = vqueues;
1119 int length = vlength;
1121 if(corenum > NUMCORES - 1) {
1125 if(queues == NULL) {
1126 #ifdef THREADSIMULATE
1127 int numofcore = pthread_getspecific(key);
1128 queues = objectqueues[numofcore][ptr->type];
1129 length = numqueues[numofcore][ptr->type];
1131 queues = objectqueues[corenum][ptr->type];
1132 length = numqueues[corenum][ptr->type];
1136 raw_test_pass(0xeaa1);
1137 raw_test_pass_reg(queues);
1138 raw_test_pass_reg(length);
1140 tagptr=ptr->___tags___;
1142 /* Outer loop iterates through all parameter queues an object of
1143 this type could be in. */
1144 for(j = 0; j < length; ++j) {
1145 parameter = queues[j];
// Tag check applies only to parameters that require tags.
1147 if (parameter->numbertags>0) {
1149 raw_test_pass(0xeaa2);
1150 raw_test_pass_reg(tagptr);
1153 goto nextloop;//that means the object has no tag but that param needs tag
1154 else if(tagptr->type==TAGTYPE) {//one tag
1155 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1157 raw_test_pass(0xeaa3);
1159 for(i=0;i<parameter->numbertags;i++) {
1160 //slotid is parameter->tagarray[2*i];
1161 int tagid=parameter->tagarray[2*i+1];
1162 if (tagid!=tagptr->flag) {
1164 raw_test_pass(0xeaa4);
1166 goto nextloop; /*We don't have this tag */
1169 } else {//multiple tags
1170 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1172 raw_test_pass(0xeaa5);
1174 for(i=0;i<parameter->numbertags;i++) {
1175 //slotid is parameter->tagarray[2*i];
1176 int tagid=parameter->tagarray[2*i+1];
1178 for(j=0;j<ao->___cachedCode___;j++) {
1179 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
1184 raw_test_pass(0xeaa6);
// Flag check: intarray holds (andmask, checkmask) pairs, numberofterms of them.
1194 for(i=0;i<parameter->numberofterms;i++) {
1195 int andmask=parameter->intarray[i*2];
1196 int checkmask=parameter->intarray[i*2+1];
1198 raw_test_pass(0xeaa7);
1199 raw_test_pass(0xcc000000 + andmask);
1200 raw_test_pass_reg(ptr);
1201 raw_test_pass(0xcc000000 + ptr->flag);
1202 raw_test_pass(0xcc000000 + checkmask);
1204 if ((ptr->flag&andmask)==checkmask) {
1206 raw_test_pass(0xeaa8);
1208 enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
1219 // helper function to compute the coordinates of a core from the core number
// Cores are numbered row by row on a grid that is 4 columns wide:
//   *coordX = core_num % 4 (column), *coordY = core_num / 4 (row).
// This is the inverse of the corenum = x + 4 * y computation in main.
//   core_num - core number to convert
//   coordY   - out: row index
//   coordX   - out: column index
1220 void calCoords(int core_num, int* coordY, int* coordX) {
1221 *coordX = core_num % 4;
1222 *coordY = core_num / 4;
1226 /* Message format for RAW version:
1228 * type: 0 -- transfer object
1229 * 1 -- transfer stall msg
1235 * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
1236 * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
1237 * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
1238 * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
1239 * lock type: 0 -- read; 1 -- write
1242 // transfer an object to targetcore
// transferObject: sends the object described by `transObj` to transObj->targetcore.
//   transObj->objptr - the object to send
//   transObj->queues - (task index, parameter index) pairs, transObj->length of them
// RAW section: a dynamic-network message of 3 + 2*length words is sent with gdn_send
//   (header, then the index pairs in order) and self_numsendobjs is incremented.
// THREADSIMULATE section: the transObjInfo and its queue array are copied, wrapped in
//   a freshly allocated ___Object___ whose `original` field points at the copy, and
//   that wrapper is sent through the target core's POSIX message queue; the sender's
//   send counter is then incremented.
// NOTE(review): several statements (declarations, error-handling branches, the sends
// of the first message words) are not visible in this listing.
1244 void transferObject(struct transObjInfo * transObj) {
1245 void * obj = transObj->objptr;
1246 int type=((int *)obj)[0];
1247 int size=classsize[type];
1248 int targetcore = transObj->targetcore;
1249 //assert(type < NUMCLASSES); // can only transfer normal object
1253 int self_y, self_x, target_y, target_x;
1255 // for 32 bit machine, the size of fixed part is always 3 words
1256 //int msgsize = sizeof(int) * 2 + sizeof(void *);
1257 int msgsize = 3 + transObj->length * 2;
1260 struct ___Object___ * newobj = (struct ___Object___ *)obj;
1261 /*if(0 == newobj->isolate) {
1265 calCoords(corenum, &self_y, &self_x);
1266 calCoords(targetcore, &target_y, &target_x);
1267 // Build the message header
1268 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1270 target_y, target_x);
1271 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1272 raw_test_pass(0xbbbb);
1273 raw_test_pass(0xb000 + targetcore); // targetcore
1277 raw_test_pass_reg(msgsize);
1279 raw_test_pass_reg(obj);
1280 //gdn_send(isshared);
1281 //raw_test_pass_reg(isshared);
// Payload: one (task index, parameter index) pair per destination queue.
1282 for(i = 0; i < transObj->length; ++i) {
1283 int taskindex = transObj->queues[2*i];
1284 int paramindex = transObj->queues[2*i+1];
1285 gdn_send(taskindex);
1286 raw_test_pass_reg(taskindex);
1287 gdn_send(paramindex);
1288 raw_test_pass_reg(paramindex);
1290 raw_test_pass(0xffff);
1291 ++(self_numsendobjs);
1292 #elif defined THREADSIMULATE
1293 int numofcore = pthread_getspecific(key);
1295 // use POSIX message queue to transfer objects between cores
// Build the decimal suffix of the target queue name (one or two digits).
1299 if(targetcore < 10) {
1300 corenumstr[0] = targetcore + '0';
1301 corenumstr[1] = '\0';
1303 } else if(targetcore < 100) {
1304 corenumstr[1] = targetcore % 10 + '0';
1305 corenumstr[0] = (targetcore / 10) + '0';
1306 corenumstr[2] = '\0';
1309 printf("Error: targetcore >= 100\n");
1313 char * pathhead = "/msgqueue_";
1314 int targetlen = strlen(pathhead);
1315 char path[targetlen + sourcelen + 1];
1316 strcpy(path, pathhead);
1317 strncat(path, corenumstr, sourcelen);
// NOTE(review): O_CREAT is not set, so per POSIX the mode and attribute arguments of
// mq_open are not used here.
1318 int oflags = O_WRONLY|O_NONBLOCK;
1319 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1320 mqdnum = mq_open(path, oflags, omodes, NULL);
1322 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1326 /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
1327 if(0 == newobj->isolate) {
1328 newobj = RUNMALLOC(size);
1329 memcpy(newobj, obj, size);
1330 newobj->original=obj;
// Copy the descriptor and its queue array so the message owns its own data.
1332 struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
1333 memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
1334 int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
1335 memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
1336 tmptransObj->queues = tmpqueue;
// The wrapper carries the object's type; its `original` field points at the copied descriptor.
1337 struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
1338 newobj->type = ((struct ___Object___ *)obj)->type;
1339 newobj->original = (struct ___Object___ *)tmptransObj;
1342 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1344 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
// The startup core counts in the shared array; other cores count in their thread_data.
1348 if(numofcore == STARTUPCORE) {
1349 ++numsendobjs[numofcore];
1351 ++(thread_data_array[numofcore].numsendobjs);
1353 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1357 // send a stall message (not a terminate message) to targetcore
// transStallMsg: reports this core's send/receive counters to `targetcore`.
// RAW section: sends a dynamic-network message carrying self_numsendobjs and
//   self_numreceiveobjs (the words sent before them are not visible in this listing).
// THREADSIMULATE section: packs the core number and the two counters into the flag,
//   ___cachedHash___ and ___cachedCode___ fields of a freshly allocated ___Object___
//   and sends it through the target's POSIX message queue; the target must be
//   STARTUPCORE (asserted).
// NOTE(review): the bool return values and the msgsize definition are not visible in
// this listing.
1359 bool transStallMsg(int targetcore) {
1362 int self_y, self_x, target_y, target_x;
1363 // for 32 bit machine, the size is always 4 words
1364 //int msgsize = sizeof(int) * 4;
1367 calCoords(corenum, &self_y, &self_x);
1368 calCoords(targetcore, &target_y, &target_x);
1369 // Build the message header
1370 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1372 target_y, target_x);
1373 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1374 raw_test_pass(0xbbbb);
1375 raw_test_pass(0xb000 + targetcore); // targetcore
1379 raw_test_pass_reg(corenum);
1380 gdn_send(self_numsendobjs);
1381 raw_test_pass_reg(self_numsendobjs);
1382 gdn_send(self_numreceiveobjs);
1383 raw_test_pass_reg(self_numreceiveobjs);
1384 raw_test_pass(0xffff);
1386 #elif defined THREADSIMULATE
1387 struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
1388 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
1390 int numofcore = pthread_getspecific(key);
1391 newobj->flag = numofcore;
1392 newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
1393 newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
1395 // use POSIX message queue to send stall msg to startup core
1396 assert(targetcore == STARTUPCORE);
// Build the decimal suffix of the target queue name (one or two digits).
1400 if(targetcore < 10) {
1401 corenumstr[0] = targetcore + '0';
1402 corenumstr[1] = '\0';
1404 } else if(targetcore < 100) {
1405 corenumstr[1] = targetcore % 10 + '0';
1406 corenumstr[0] = (targetcore / 10) + '0';
1407 corenumstr[2] = '\0';
1410 printf("Error: targetcore >= 100\n");
1414 char * pathhead = "/msgqueue_";
1415 int targetlen = strlen(pathhead);
1416 char path[targetlen + sourcelen + 1];
1417 strcpy(path, pathhead);
1418 strncat(path, corenumstr, sourcelen);
// NOTE(review): O_CREAT is not set, so per POSIX the mode and attribute arguments of
// mq_open are not used here.
1419 int oflags = O_WRONLY|O_NONBLOCK;
1420 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1421 mqdnum = mq_open(path, oflags, omodes, NULL);
1423 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1428 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1430 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1434 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1435 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
// receiveObject: poll this core's incoming message channel and handle what
// has arrived. The RAW build reads words with gdn_receive(); the
// THREADSIMULATE build reads one entry from this core's POSIX message queue.
1442 // receive object transferred from other cores
1443 // or the terminate message from other cores
1444 // NOTICE: following format is for threadsimulate version only
1445 // RAW version please see previous description
1446 // format: type + object
1447 // type: -1--stall msg
1449 // return value: 0--received an object
1450 // 1--received nothing
1451 // 2--received a Stall Msg
1452 // 3--received a lock Msg
1453 // RAW version: -1 -- received nothing
1454 // otherwise -- received msg type
1455 int receiveObject() {
1459 int self_y, self_x, target_y, target_x;
// RAW build: first check whether any input word is waiting.
1461 if(gdn_input_avail() == 0) {
1462 if(corenum < NUMCORES) {
1463 raw_test_pass(0xd001);
1467 raw_test_pass(0xcccc);
// Copy available words into msgdata until msglength words have been stored.
1468 while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
1469 msgdata[msgdataindex] = gdn_receive();
// Word 0 is the message type; the branches below distinguish types > 2,
// types 1..2 and type 0. For type 0 the second word supplies msglength.
1470 if(msgdataindex == 0) {
1471 if(msgdata[0] > 2) {
1473 } else if(msgdata[0] > 0) {
1476 } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
1477 msglength = msgdata[msgdataindex];
1479 raw_test_pass_reg(msgdata[msgdataindex]);
1482 /*if(msgdataindex == 0) {
1484 msgtype = gdn_receive();
1491 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1492 msgdata[msgdataindex] = msgtype;
1494 raw_test_pass_reg(msgtype);
1495 } else if((msgdataindex == 1) && (msgtype == 0)) {
1496 // object transfer msg
1497 msglength = gdn_receive();
1498 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1499 msgdata[0] = msgtype;
1500 msgdata[msgdataindex] = msglength;
1501 raw_test_pass_reg(msgdata[msgdataindex]);
1503 msgdata[msgdataindex] = gdn_receive();
1504 raw_test_pass_reg(msgdata[msgdataindex]);
1508 raw_test_pass(0xffff);
1509 if(msgdataindex == msglength) {
1510 // received a whole msg
1511 int type, data1, data2; // will receive at least 3 words including type
1517 // receive a object transfer msg
1518 struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
1520 if(corenum > NUMCORES - 1) {
1521 raw_test_done(0xa00a);
1523 // store the object and its corresponding queue info, enqueue it later
1524 transObj->objptr = (void *)data2; // data1 is now size of the msg
1525 transObj->length = (msglength - 3) / 2;
1526 transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
1527 for(k = 0; k < transObj->length; ++k) {
1528 transObj->queues[2*k] = msgdata[3+2*k];
1529 raw_test_pass_reg(transObj->queues[2*k]);
1530 transObj->queues[2*k+1] = msgdata[3+2*k+1];
1531 raw_test_pass_reg(transObj->queues[2*k+1]);
1533 //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
1534 addNewItem_I(&objqueue, (void *)transObj);
1535 ++(self_numreceiveobjs);
1536 raw_test_pass(0xe881);
1538 addNewItem_I(&objqueue, (void *)data2);
1539 ++(self_numreceiveobjs);
1540 raw_test_pass(0xe881);*/
1544 // receive a stall msg
1545 if(corenum != STARTUPCORE) {
1546 // non startup core can not receive stall msg
1548 raw_test_done(0xa001);
// data1 indexes the per-core tables: clear that core's status and record the
// send/receive counters carried in the message.
1550 if(data1 < NUMCORES) {
1551 raw_test_pass(0xe882);
1552 corestatus[data1] = 0;
1553 numsendobjs[data1] = data2;
1554 numreceiveobjs[data1] = msgdata[3];
1559 // receive lock request msg
1560 // for 32 bit machine, the size is always 3 words
1561 //int msgsize = sizeof(int) * 3;
1563 // lock request msg, handle it right now
1564 // check to see if there is a lock exist in locktbl for the required obj
// NOTE(review): data3 is presumably the requesting core -- confirm.
1565 int data3 = msgdata[3];
1567 if(!RuntimeHashcontainskey(locktbl, data2)) {
1568 // no locks for this object
1569 // first time to operate on this shared object
1570 // create a lock for it
1571 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1572 raw_test_pass(0xe883);
1574 RuntimeHashadd_I(locktbl, data2, 1);
1576 RuntimeHashadd_I(locktbl, data2, -1);
// An entry already exists: fetch the current lock value and decide from it.
1580 raw_test_pass(0xe884);
1581 RuntimeHashget(locktbl, data2, &rwlock_obj);
1582 raw_test_pass_reg(rwlock_obj);
1583 if(0 == rwlock_obj) {
// The stored value is replaced by removing the key and adding it again.
1589 RuntimeHashremovekey(locktbl, data2);
1590 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1591 } else if((rwlock_obj > 0) && (data1 == 0)) {
1592 // read lock request and there are only read locks
1594 RuntimeHashremovekey(locktbl, data2);
1595 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1599 raw_test_pass_reg(rwlock_obj);
// Send the answer back: header, reply type (4 or 3), lock type, lock target.
1602 calCoords(corenum, &self_y, &self_x);
1603 calCoords(targetcore, &target_y, &target_x);
1604 // Build the message header
1605 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1607 target_y, target_x);
1608 gdn_send(msgHdr); // Send the message header first
1609 raw_test_pass(0xbbbb);
1610 raw_test_pass(0xb000 + targetcore); // targetcore
1612 // deny the lock request
1613 gdn_send(4); // msg type 4: lock denied
1616 // grant the lock request
1617 gdn_send(3); // msg type 3: lock granted
1620 gdn_send(data1); // lock type
1621 raw_test_pass_reg(data1);
1622 gdn_send(data2); // lock target
1623 raw_test_pass_reg(data2);
1624 raw_test_pass(0xffff);
1628 // receive lock grant msg
1629 if(corenum > NUMCORES - 1) {
1630 raw_test_done(0xa00b);
// The reply must refer to the object this core is currently waiting on.
1632 if(lockobj == data2) {
1639 // conflicts on lockresults
1640 raw_test_done(0xa002);
1645 // receive lock grant/deny msg
1646 if(corenum > NUMCORES - 1) {
1647 raw_test_done(0xa00c);
1649 if(lockobj == data2) {
1656 // conflicts on lockresults
1657 raw_test_done(0xa003);
1662 // receive lock release msg
1663 if(!RuntimeHashcontainskey(locktbl, data2)) {
1664 // no locks for this object, something is wrong
1665 raw_test_done(0xa004);
1668 RuntimeHashget(locktbl, data2, &rwlock_obj);
1669 raw_test_pass(0xe885);
1670 raw_test_pass_reg(rwlock_obj);
// Store the updated lock value (remove + re-add).
1676 RuntimeHashremovekey(locktbl, data2);
1677 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1678 raw_test_pass_reg(rwlock_obj);
// Walk msgdataindex back down, overwriting the consumed words with -1.
1687 for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
1688 msgdata[msgdataindex] = -1;
1693 raw_test_pass(0xe886);
1697 raw_test_pass(0xe887);
1700 #elif defined THREADSIMULATE
// The per-thread value stored under `key` is this thread's core number.
1701 int numofcore = pthread_getspecific(key);
1702 // use POSIX message queue to transfer object
1704 struct mq_attr mqattr;
// Size the receive buffer from the queue's own mq_msgsize attribute.
1705 mq_getattr(mqd[numofcore], &mqattr);
1706 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
1707 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
1713 //printf("msg: %s\n",msgptr);
// First int == -1 marks a stall message: the payload is an ___Object___ whose
// flag / ___cachedHash___ / ___cachedCode___ fields carry core index,
// send count and receive count.
1714 if(((int*)msgptr)[0] == -1) {
1716 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1717 int index = tmpptr->flag;
1718 corestatus[index] = 0;
1719 numsendobjs[index] = tmpptr->___cachedHash___;
1720 numreceiveobjs[index] = tmpptr->___cachedCode___;
1721 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1724 } /*else if(((int*)msgptr)[0] == -2) {
// Count the received object: the startup core uses the global table, other
// cores use their thread_data_array entry.
1729 if(numofcore == STARTUPCORE) {
1730 ++(numreceiveobjs[numofcore]);
1732 ++(thread_data_array[numofcore].numreceiveobjs);
// The message's `original` field points at the transObjInfo describing the
// transferred object; a private copy of that object is made with memcpy.
1734 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1735 struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
1736 tmpptr = (struct ___Object___ *)(transObj->objptr);
1737 int type = tmpptr->type;
1738 int size=classsize[type];
1739 struct ___Object___ * newobj=RUNMALLOC(size);
1740 memcpy(newobj, tmpptr, size);
// Non-isolated copies remember the object they were copied from.
1741 if(0 == newobj->isolate) {
1742 newobj->original=tmpptr;
// queues[] holds (task index, parameter index) pairs; enqueue the copy into
// each named parameter queue of this core.
1747 for(k = 0; k < transObj->length; ++k) {
1748 int taskindex = transObj->queues[2 * k];
1749 int paramindex = transObj->queues[2 * k + 1];
1750 struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
1751 enqueueObject(newobj, queues, 1);
1753 RUNFREE(transObj->queues);
// getreadlock: try to obtain a read lock on the object at `ptr`.
// RAW build: the core responsible for the lock is ((int)ptr) % TOTALCORE. If
// that is this core, the local lock table is updated with user interrupts
// off; otherwise a lock-request message (type 2, read = 0) is sent to it.
// THREADSIMULATE build: each object has a pthread_rwlock_t stored in locktbl,
// which is itself protected by rwlock_tbl.
// NOTE(review): the return statements are not shown here; presumably true
// means the lock was obtained -- confirm.
1760 bool getreadlock(void * ptr) {
1763 int self_y, self_x, target_y, target_x;
1764 int targetcore = ((int)ptr) % TOTALCORE;
1765 // for 32 bit machine, the size is always 4 words
1766 //int msgsize = sizeof(int) * 4;
1776 if(targetcore == corenum) {
1777 // reside on this core
1780 raw_user_interrupts_off();
1782 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1783 // no locks for this object
1784 // first time to operate on this shared object
1785 // create a lock for it
1786 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1787 RuntimeHashadd_I(locktbl, (int)ptr, 1);
// Entry exists: a read lock is possible unless a writer (-1) holds it.
1790 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1791 if(-1 != rwlock_obj) {
// Store the updated value by removing the key and adding it again.
1793 RuntimeHashremovekey(locktbl, (int)ptr);
1794 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1800 raw_user_interrupts_on();
1802 if(lockobj == (int)ptr) {
1813 // conflicts on lockresults
1814 raw_test_done(0xa005);
// Lock lives on another core: send it a read-lock request.
1819 calCoords(corenum, &self_y, &self_x);
1820 calCoords(targetcore, &target_y, &target_x);
1821 // Build the message header
1822 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1824 target_y, target_x);
1825 gdn_send(msgHdr); // Send the message header first
1826 raw_test_pass(0xbbbb);
1827 raw_test_pass(0xb000 + targetcore); // targetcore
1828 gdn_send(2); // lock request
1830 gdn_send(0); // read lock
1833 raw_test_pass_reg(ptr);
1835 raw_test_pass_reg(corenum);
1836 raw_test_pass(0xffff);
1838 #elif defined THREADSIMULATE
1839 int numofcore = pthread_getspecific(key);
// Non-blocking attempt to read-lock the table itself. Every diagnostic below
// prints strerror(rc) through %s (the format previously had no conversion
// for it, so the error text was never shown).
1841 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1842 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1846 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1847 // no locks for this object
1848 // first time to operate on this shared object
1849 // create a lock for it
// Drop the table read lock, build a fresh rwlock, then take the table write
// lock so the new entry can be inserted.
1850 rc = pthread_rwlock_unlock(&rwlock_tbl);
1851 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1852 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1853 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1854 rc = pthread_rwlock_init(rwlock, NULL);
1855 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1856 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1857 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// Re-check under the write lock: the table was unlocked in between, so the
// key may have been added meanwhile; in that case use the stored lock.
1862 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1864 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1867 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
1869 rc = pthread_rwlock_unlock(&rwlock_tbl);
1870 printf("[getreadlock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1872 rc = pthread_rwlock_tryrdlock(rwlock);
1873 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
// Object already has a lock: fetch it, release the table, try to read-lock it.
1880 pthread_rwlock_t* rwlock_obj = NULL;
1881 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1882 rc = pthread_rwlock_unlock(&rwlock_tbl);
1883 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1884 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1885 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasereadlock: give back a read lock on the object at `ptr`.
// RAW build: if the lock is kept on this core (((int)ptr) % TOTALCORE ==
// corenum) the local table entry is updated with user interrupts off;
// otherwise a lock-release message (type 5, read = 0) is sent to that core.
// THREADSIMULATE build: unlocks the object's pthread rwlock found in locktbl.
1895 void releasereadlock(void * ptr) {
1898 int self_y, self_x, target_y, target_x;
1899 int targetcore = ((int)ptr) % TOTALCORE;
1900 // for 32 bit machine, the size is always 3 words
1901 //int msgsize = sizeof(int) * 3;
1904 if(targetcore == corenum) {
1906 raw_user_interrupts_off();
1908 // reside on this core
1909 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1910 // no locks for this object, something is wrong
1911 raw_test_done(0xa006);
// Fetch the current value and store the updated one (remove + re-add).
1914 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1916 RuntimeHashremovekey(locktbl, (int)ptr);
1917 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1920 raw_user_interrupts_on();
// Lock lives on another core: send it a release message.
1925 calCoords(corenum, &self_y, &self_x);
1926 calCoords(targetcore, &target_y, &target_x);
1927 // Build the message header
1928 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1930 target_y, target_x);
1931 gdn_send(msgHdr); // Send the message header first
1932 raw_test_pass(0xbbbb);
1933 raw_test_pass(0xb000 + targetcore); // targetcore
1934 gdn_send(5); // lock release
1936 gdn_send(0); // read lock
1939 raw_test_pass_reg(ptr);
1940 raw_test_pass(0xffff);
1941 #elif defined THREADSIMULATE
1942 int numofcore = pthread_getspecific(key);
// Hold the table read lock while the object's lock is looked up and unlocked.
// The diagnostics print strerror(rc) through %s (the format previously had
// no conversion for it).
1943 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1944 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1945 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1946 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1949 pthread_rwlock_t* rwlock_obj = NULL;
1950 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1951 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1952 printf("[releasereadlock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1953 rc = pthread_rwlock_unlock(&rwlock_tbl);
1954 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// getreadlock_I: variant of getreadlock whose visible body performs the same
// lock-table update and message send but contains no
// raw_user_interrupts_off()/on() calls.
// NOTE(review): presumably meant to be called with interrupts already
// disabled -- confirm against callers.
1959 bool getreadlock_I(void * ptr) {
1961 int self_y, self_x, target_y, target_x;
// The core responsible for the lock is derived from the pointer value.
1962 int targetcore = ((int)ptr) % TOTALCORE;
1963 // for 32 bit machine, the size is always 4 words
1964 //int msgsize = sizeof(int) * 4;
1974 if(targetcore == corenum) {
1975 // reside on this core
1977 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1978 // no locks for this object
1979 // first time to operate on this shared object
1980 // create a lock for it
1981 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1982 RuntimeHashadd_I(locktbl, (int)ptr, 1);
// Entry exists: a read lock is possible unless a writer (-1) holds it.
1985 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1986 if(-1 != rwlock_obj) {
// Store the updated value by removing the key and adding it again.
1988 RuntimeHashremovekey(locktbl, (int)ptr);
1989 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1994 if(lockobj == (int)ptr) {
2005 // conflicts on lockresults
2006 raw_test_done(0xa005);
// Lock lives on another core: send it a read-lock request.
2011 calCoords(corenum, &self_y, &self_x);
2012 calCoords(targetcore, &target_y, &target_x);
2013 // Build the message header
2014 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2016 target_y, target_x);
2017 gdn_send(msgHdr); // Send the message header first
2018 raw_test_pass(0xbbbb);
2019 raw_test_pass(0xb000 + targetcore); // targetcore
2020 gdn_send(2); // lock request
2022 gdn_send(0); // read lock
2025 raw_test_pass_reg(ptr);
2027 raw_test_pass_reg(corenum);
2028 raw_test_pass(0xffff);
// releasereadlock_I: variant of releasereadlock whose visible body contains no
// raw_user_interrupts_off()/on() calls.
// NOTE(review): presumably meant to be called with interrupts already
// disabled -- confirm against callers.
2032 void releasereadlock_I(void * ptr) {
2034 int self_y, self_x, target_y, target_x;
// The core responsible for the lock is derived from the pointer value.
2035 int targetcore = ((int)ptr) % TOTALCORE;
2036 // for 32 bit machine, the size is always 3 words
2037 //int msgsize = sizeof(int) * 3;
2040 if(targetcore == corenum) {
2041 // reside on this core
2042 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2043 // no locks for this object, something is wrong
2044 raw_test_done(0xa006);
// Fetch the current value and store the updated one (remove + re-add).
2047 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2049 RuntimeHashremovekey(locktbl, (int)ptr);
2050 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
// Lock lives on another core: send it a release message.
2055 calCoords(corenum, &self_y, &self_x);
2056 calCoords(targetcore, &target_y, &target_x);
2057 // Build the message header
2058 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2060 target_y, target_x);
2061 gdn_send(msgHdr); // Send the message header first
2062 raw_test_pass(0xbbbb);
2063 raw_test_pass(0xb000 + targetcore); // targetcore
2064 gdn_send(5); // lock release
2066 gdn_send(0); // read lock
2069 raw_test_pass_reg(ptr);
2070 raw_test_pass(0xffff);
// getwritelock: try to obtain a write lock on the object at `ptr`.
// RAW build: the core responsible for the lock is ((int)ptr) % TOTALCORE. If
// that is this core, the local lock table is updated with user interrupts
// off; otherwise a lock-request message (type 2, write = 1) is sent to it.
// THREADSIMULATE build: each object has a pthread_rwlock_t stored in locktbl,
// which is itself protected by rwlock_tbl.
// NOTE(review): the return statements are not shown here; presumably true
// means the lock was obtained -- confirm.
2074 bool getwritelock(void * ptr) {
2077 int self_y, self_x, target_y, target_x;
2078 int targetcore = ((int)ptr) % TOTALCORE;
2079 // for 32 bit machine, the size is always 4 words
2080 //int msgsize = sizeof(int) * 4;
2090 if(targetcore == corenum) {
2091 // reside on this core
2094 raw_user_interrupts_off();
2096 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2097 // no locks for this object
2098 // first time to operate on this shared object
2099 // create a lock for it
2100 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2101 raw_test_pass(0xe552);
2102 RuntimeHashadd_I(locktbl, (int)ptr, -1);
// Entry exists: the write lock is only taken when the stored value is 0.
2105 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2106 raw_test_pass(0xe553);
2107 raw_test_pass_reg(rwlock_obj);
2108 if(0 == rwlock_obj) {
// Store the updated value by removing the key and adding it again.
2110 RuntimeHashremovekey(locktbl, (int)ptr);
2111 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2117 raw_user_interrupts_on();
2119 raw_test_pass(0xe554);
2120 raw_test_pass_reg(lockresult);
2121 if(lockobj == (int)ptr) {
2134 // conflicts on lockresults
2135 raw_test_done(0xa007);
// Lock lives on another core: send it a write-lock request.
2140 raw_test_pass(0xe555);
2141 calCoords(corenum, &self_y, &self_x);
2142 calCoords(targetcore, &target_y, &target_x);
2143 // Build the message header
2144 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2146 target_y, target_x);
2147 gdn_send(msgHdr); // Send the message header first
2148 raw_test_pass(0xbbbb);
2149 raw_test_pass(0xb000 + targetcore); // targetcore
2150 gdn_send(2); // lock request
2152 gdn_send(1); // write lock
2155 raw_test_pass_reg(ptr);
2157 raw_test_pass_reg(corenum);
2158 raw_test_pass(0xffff);
2160 #elif defined THREADSIMULATE
2161 int numofcore = pthread_getspecific(key);
// Non-blocking attempt to read-lock the table itself. Every diagnostic below
// prints strerror(rc) through %s (the format previously had no conversion
// for it, so the error text was never shown).
2163 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
2164 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2168 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2169 // no locks for this object
2170 // first time to operate on this shared object
2171 // create a lock for it
// Drop the table read lock, build a fresh rwlock, then take the table write
// lock so the new entry can be inserted.
2172 rc = pthread_rwlock_unlock(&rwlock_tbl);
2173 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2174 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
2175 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
2176 rc = pthread_rwlock_init(rwlock, NULL);
2177 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
2178 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
2179 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// The freshly built rwlock is destroyed on the paths where it is not stored.
2181 pthread_rwlock_destroy(rwlock);
// Re-check under the write lock: the table was unlocked in between, so the
// key may have been added meanwhile; in that case use the stored lock.
2185 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2187 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
2189 pthread_rwlock_destroy(rwlock);
2191 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
2193 rc = pthread_rwlock_unlock(&rwlock_tbl);
2194 printf("[getwritelock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2196 rc = pthread_rwlock_trywrlock(rwlock);
2197 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
// Object already has a lock: fetch it, release the table, try to write-lock it.
2204 pthread_rwlock_t* rwlock_obj = NULL;
2205 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2206 rc = pthread_rwlock_unlock(&rwlock_tbl);
2207 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2208 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
2209 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasewritelock: give back a write lock on the object at `ptr`.
// RAW build: if the lock is kept on this core (((int)ptr) % TOTALCORE ==
// corenum) the local table entry is updated with user interrupts off;
// otherwise a lock-release message (type 5, write = 1) is sent to that core.
// THREADSIMULATE build: unlocks the object's pthread rwlock found in locktbl.
2220 void releasewritelock(void * ptr) {
2223 int self_y, self_x, target_y, target_x;
2224 int targetcore = ((int)ptr) % TOTALCORE;
2225 // for 32 bit machine, the size is always 3 words
2226 //int msgsize = sizeof(int) * 3;
2229 if(targetcore == corenum) {
2231 raw_user_interrupts_off();
2233 // reside on this core
2234 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2235 // no locks for this object, something is wrong
2236 raw_test_done(0xa008);
// Fetch the current value and store the updated one (remove + re-add).
2239 raw_test_pass(0xe662);
2240 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2241 raw_test_pass_reg(rwlock_obj);
2243 RuntimeHashremovekey(locktbl, (int)ptr);
2244 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2245 raw_test_pass_reg(rwlock_obj);
2248 raw_user_interrupts_on();
// Lock lives on another core: send it a release message.
2253 raw_test_pass(0xe663);
2254 calCoords(corenum, &self_y, &self_x);
2255 calCoords(targetcore, &target_y, &target_x);
2256 // Build the message header
2257 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2259 target_y, target_x);
2260 gdn_send(msgHdr); // Send the message header first
2261 raw_test_pass(0xbbbb);
2262 raw_test_pass(0xb000 + targetcore);
2263 gdn_send(5); // lock release
2265 gdn_send(1); // write lock
2268 raw_test_pass_reg(ptr);
2269 raw_test_pass(0xffff);
2270 #elif defined THREADSIMULATE
2271 int numofcore = pthread_getspecific(key);
// Hold the table read lock while the object's lock is looked up and unlocked.
// The diagnostics print strerror(rc) through %s (the format previously had
// no conversion for it).
2272 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
2273 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2274 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2275 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
2278 pthread_rwlock_t* rwlock_obj = NULL;
2279 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2280 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2281 printf("[releasewritelock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2282 rc = pthread_rwlock_unlock(&rwlock_tbl);
2283 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// enqueuetasks: register `ptr` in `parameter`'s object set and add to
// activetasks every parameter combination, produced by the task's iterators,
// that is not already present there.
//   parameter     - parameter wrapper (queue) the object is entering
//   prevptr       - not referenced in the statements shown here
//   ptr           - the object being enqueued
//   enterflags    - flag array stored with the object (may be NULL)
//   numenterflags - number of entries in enterflags
// The iterator arguments are written as &parameter->iterators[j]; the text had
// been corrupted to a pilcrow character by a bad "&para" entity decode.
2287 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2288 void * taskpointerarray[MAXTASKPARAMS];
2290 int numparams=parameter->task->numParameters;
// One slot is filled by `ptr` itself, the remaining numTotal-1 by iterators.
2291 int numiterators=parameter->task->numTotal-1;
2296 struct taskdescriptor * task=parameter->task;
2298 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2300 /* Add enqueued object to parameter vector */
2301 taskpointerarray[parameter->slot]=ptr;
2303 /* Reset iterators */
2304 for(j=0;j<numiterators;j++) {
2305 toiReset(&parameter->iterators[j]);
2308 /* Find initial state */
2309 for(j=0;j<numiterators;j++) {
2311 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2312 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2314 /* Need to backtrack */
2315 toiReset(&parameter->iterators[j]);
2319 /* Nothing to enqueue */
2326 /* Enqueue current state */
// Snapshot the current combination into a freshly allocated descriptor.
2328 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
2330 tpd->numParameters=numiterators+1;
2331 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
2332 for(j=0;j<=numiterators;j++){
2333 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
// Only insert combinations that are not already active; a duplicate
// descriptor's parameter array is released instead.
2336 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2337 genputtable(activetasks, tpd, tpd);
2339 RUNFREE(tpd->parameterArray);
2343 /* This loop iterates to the next parameter combination */
2344 if (numiterators==0)
// Advance starting from the last iterator.
2347 for(j=numiterators-1; j<numiterators;j++) {
2349 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2350 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2352 /* Need to backtrack */
2353 toiReset(&parameter->iterators[j]);
2357 /* Nothing more to enqueue */
// enqueuetasks_I: same procedure as enqueuetasks, but built on the _I helper
// variants (ObjectHashadd_I, RUNMALLOC_I, genputtable_I).
// NOTE(review): the _I suffix presumably marks routines safe to call with
// interrupts disabled -- confirm.
//   parameter     - parameter wrapper (queue) the object is entering
//   prevptr       - not referenced in the statements shown here
//   ptr           - the object being enqueued
//   enterflags    - flag array stored with the object (may be NULL)
//   numenterflags - number of entries in enterflags
// The iterator arguments are written as &parameter->iterators[j]; the text had
// been corrupted to a pilcrow character by a bad "&para" entity decode.
2366 int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2367 void * taskpointerarray[MAXTASKPARAMS];
2369 int numparams=parameter->task->numParameters;
// One slot is filled by `ptr` itself, the remaining numTotal-1 by iterators.
2370 int numiterators=parameter->task->numTotal-1;
2375 struct taskdescriptor * task=parameter->task;
2377 ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2379 /* Add enqueued object to parameter vector */
2380 taskpointerarray[parameter->slot]=ptr;
2382 /* Reset iterators */
2383 for(j=0;j<numiterators;j++) {
2384 toiReset(&parameter->iterators[j]);
2387 /* Find initial state */
2388 for(j=0;j<numiterators;j++) {
2390 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2391 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2393 /* Need to backtrack */
2394 toiReset(&parameter->iterators[j]);
2398 /* Nothing to enqueue */
2404 /* Enqueue current state */
// Snapshot the current combination into a freshly allocated descriptor.
2406 struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
2408 tpd->numParameters=numiterators+1;
2409 tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
2410 for(j=0;j<=numiterators;j++){
2411 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
// Only insert combinations that are not already active; a duplicate
// descriptor's parameter array is released instead.
2414 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2415 genputtable_I(activetasks, tpd, tpd);
2417 RUNFREE(tpd->parameterArray);
2421 /* This loop iterates to the next parameter combination */
2422 if (numiterators==0)
// Advance starting from the last iterator.
2425 for(j=numiterators-1; j<numiterators;j++) {
2427 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2428 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2430 /* Need to backtrack */
2431 toiReset(&parameter->iterators[j]);
2435 /* Nothing more to enqueue */
2444 /* Handler for signals. The signals catch null pointer errors and
2445 arithmetic errors. */
// Installed through sigaction with SA_SIGINFO, hence the three-argument form;
// `info` and `uap` are not used in the statements shown here.
2447 void myhandler(int sig, siginfo_t *info, void *uap) {
2450 printf("sig=%d\n",sig);
// Unblock the signal being handled before jumping out of the handler.
2453 sigemptyset(&toclear);
2454 sigaddset(&toclear, sig);
2455 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
// Resume at the setjmp(error_handler) site, which then sees the value 1.
2456 longjmp(error_handler,1);
2462 struct RuntimeHash *fdtoobject;
// addreadfd: add file descriptor `fd` to the readfds set.
2464 void addreadfd(int fd) {
2467 FD_SET(fd, &readfds);
// removereadfd: remove file descriptor `fd` from the readfds set and, when it
// was the highest one tracked, shrink maxreadfd accordingly.
2470 void removereadfd(int fd) {
2471 FD_CLR(fd, &readfds);
// maxreadfd is one past the highest descriptor, so only that case needs work.
2472 if (maxreadfd==(fd+1)) {
// Scan downwards while the descriptor just below maxreadfd is not in the set.
2474 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
// executetasks: main task loop. While there are active tasks (or read file
// descriptors being watched) it polls the descriptors, takes one task
// parameter descriptor from activetasks, re-validates and locks each
// parameter, invokes the task function and finally releases the locks.
2485 void executetasks() {
// Argument vector handed to the task; parameters start at index OFFSET.
2486 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
2489 struct ___Object___ * tmpparam = NULL;
2490 struct parameterdescriptor * pd=NULL;
2491 struct parameterwrapper *pw=NULL;
2500 raw_test_pass(0xe991);
2504 /* Set up signal handlers */
2505 struct sigaction sig;
2506 sig.sa_sigaction=&myhandler;
2507 sig.sa_flags=SA_SIGINFO;
2508 sigemptyset(&sig.sa_mask);
2510 /* Catch bus errors, segmentation faults, and floating point exceptions*/
2511 sigaction(SIGBUS,&sig,0);
2512 sigaction(SIGSEGV,&sig,0);
2513 sigaction(SIGFPE,&sig,0);
2514 sigaction(SIGPIPE,&sig,0);
// Table used below to look up the object registered for a file descriptor.
2523 fdtoobject=allocateRuntimeHash(100);
2527 /* Map first block of memory to protected, anonymous page */
2528 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
2532 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
2535 raw_test_pass(0xe992);
2537 /* Check if any filedescriptors have IO pending */
// A zero timeout makes select() return immediately (pure poll).
2540 struct timeval timeout={0,0};
2544 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
2546 /* Process ready fd's */
2548 for(fd=0;fd<maxreadfd;fd++) {
2549 if (FD_ISSET(fd, &tmpreadfds)) {
2550 /* Set ready flag on object */
2552 // printf("Setting fd %d\n",fd);
2553 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
2554 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
2555 enqueueObject(objptr, NULL, 0);
2564 /* See if there are any active tasks */
2565 if (hashsize(activetasks)>0) {
// Take the first task descriptor out of the active set.
2567 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
2568 genfreekey(activetasks, currtpd);
2570 /* Check if this task has failed, allow a task that contains optional objects to fire */
2571 /*if (gencontains(failedtasks, currtpd)) {
2572 // Free up task parameter descriptor
2573 RUNFREE(currtpd->parameterArray);
2577 numparams=currtpd->task->numParameters;
2578 numtotal=currtpd->task->numTotal;
2580 #ifdef THREADSIMULATE
2581 int isolateflags[numparams];
2583 /* Make sure that the parameters are still in the queues */
2584 for(i=0;i<numparams;i++) {
2585 void * parameter=currtpd->parameterArray[i];
2587 raw_test_pass(0xe993);
2588 // require locks for this parameter
2589 getwritelock(parameter);
// Keep handling incoming messages until receiveObject() reports -1.
2598 while(receiveObject() != -1) {
2602 grount = lockresult;
2612 raw_test_pass(0xe994);
2613 // can not get the lock, try later
2614 for(j = 0; j < i; ++j) {
2615 releasewritelock(taskpointerarray[j+OFFSET]);
// Put the task back so it is retried on a later iteration.
2617 genputtable(activetasks, currtpd, currtpd);
// Invalidate every address covered by the object (classsize[type] of them).
2623 for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
2624 invalidateAddr(parameter + tmp);
2628 tmpparam = (struct ___Object___ *)parameter;
2629 #ifdef THREADSIMULATE
// isolate == 0 marks a shared object: its `original` must be write-locked.
2630 if(0 == tmpparam->isolate) {
2631 isolateflags[i] = 0;
2632 // shared object, need to flush with current value
2633 //if(!getreadlock(tmpparam->original)) {
2634 // // fail to get read lock of the original object, try this task later
2635 if(!getwritelock(tmpparam->original)) {
2636 // fail to get write lock, release all obtained locks and try this task later
2638 for(j = 0; j < i; ++j) {
2639 if(0 == isolateflags[j]) {
2640 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2643 genputtable(activetasks, currtpd, currtpd);
// A version mismatch means the local copy is stale relative to the original.
2646 if(tmpparam->version != tmpparam->original->version) {
2647 // some task on another core has changed this object
2648 // flush this object
2649 //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
2650 // release all obtained locks
2652 for(j = 0; j < i; ++j) {
2653 if(0 == isolateflags[j]) {
2654 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2657 releasewritelock(tmpparam->original);
2659 // dequeue this object
2660 int numofcore = pthread_getspecific(key);
2661 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
2662 int length = numqueues[numofcore][tmpparam->type];
// Remove the stale copy from every queue of this core that still holds it.
2663 for(j = 0; j < length; ++j) {
2664 struct parameterwrapper * pw = queues[j];
2665 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
2667 int UNUSED, UNUSED2;
2669 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2670 ObjectHashremove(pw->objectset, (int)tmpparam);
2671 if (enterflags!=NULL)
2675 // try to enqueue it again to check if it feeds other tasks;
2676 //enqueueObject(tmpparam, NULL, 0);
2677 // Free up task parameter descriptor
2678 RUNFREE(currtpd->parameterArray);
2683 isolateflags[i] = 1;
2686 pd=currtpd->task->descriptorarray[i];
2687 pw=(struct parameterwrapper *) pd->queue;
2688 /* Check that object is still in queue */
2690 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
2692 raw_test_pass(0xe995);
2694 RUNFREE(currtpd->parameterArray);
2700 /* Check if the object's flags still meets requirements */
// intarray holds (andmask, checkmask) pairs, one pair per term.
2704 for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
2705 andmask=pw->intarray[tmpi*2];
2706 checkmask=pw->intarray[tmpi*2+1];
2707 raw_test_pass(0xdd000000 + andmask);
2708 raw_test_pass_reg((int)parameter);
2709 raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
2710 raw_test_pass(0xdd000000 + checkmask);
2711 if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
2717 // flags are never suitable
2718 // remove this obj from the queue
2720 int UNUSED, UNUSED2;
2722 raw_test_pass(0xe996);
2723 ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2724 ObjectHashremove(pw->objectset, (int)parameter);
2725 if (enterflags!=NULL)
2727 // release grabbed locks
2728 for(j = 0; j < i; ++j) {
2729 releasewritelock(taskpointerarray[j+OFFSET]);
2731 releasewritelock(parameter);
2732 RUNFREE(currtpd->parameterArray);
2740 /* Check that object still has necessary tags */
2741 for(j=0;j<pd->numbertags;j++) {
// Tag slots are stored after the numparams object slots.
2742 int slotid=pd->tagarray[2*j]+numparams;
2743 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
2744 if (!containstag(parameter, tagd)) {
2746 raw_test_pass(0xe997);
2748 RUNFREE(currtpd->parameterArray);
// Parameter passed every check: place it in the task's argument vector.
2754 taskpointerarray[i+OFFSET]=parameter;
// Copy the remaining (non-object) entries through unchanged.
2757 for(;i<numtotal;i++) {
2758 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
2761 #ifdef THREADSIMULATE
// For shared parameters, hand the task the original object, not the copy.
2762 for(i = 0; i < numparams; ++i) {
2763 if(0 == isolateflags[i]) {
2764 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2765 if(tmpparam != tmpparam->original) {
2766 taskpointerarray[i+OFFSET] = tmpparam->original;
2775 /* Checkpoint the state */
2776 forward=allocateRuntimeHash(100);
2777 reverse=allocateRuntimeHash(100);
2778 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
// setjmp returns non-zero when myhandler longjmps here after a signal.
2781 if (x=setjmp(error_handler)) {
2786 printf("Fatal Error=%d, Recovering!\n",x);
2790 genputtable(failedtasks,currtpd,currtpd);
2791 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
2793 freeRuntimeHash(forward);
2794 freeRuntimeHash(reverse);
2801 raw_test_pass_reg(x);
2802 raw_test_done(0xa009);
2807 /*if (injectfailures) {
2808 if ((((double)random())/RAND_MAX)<failurechance) {
2809 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
2810 longjmp(error_handler,10);
2813 /* Actually call task */
// Slot 0 carries the parameter count (stored as an int), slot 1 is NULL.
2815 ((int *)taskpointerarray)[0]=currtpd->numParameters;
2816 taskpointerarray[1]=NULL;
2820 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2822 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2824 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2827 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
// Task finished: release the write lock held on each parameter.
2830 for(i = 0; i < numparams; ++i) {
2832 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2833 raw_test_pass(0xe998);
2834 raw_test_pass(0xdd100000 + tmpparam->flag);
2835 releasewritelock(tmpparam);
2837 #elif defined THREADSIMULATE
// Only shared (non-isolated) parameters were locked in this build.
2838 for(i = 0; i < numparams; ++i) {
2839 if(0 == isolateflags[i]) {
2840 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2841 releasewritelock(tmpparam);
2848 freeRuntimeHash(forward);
2849 freeRuntimeHash(reverse);
2853 // Free up task parameter descriptor
2854 RUNFREE(currtpd->parameterArray);
2868 /* This function processes an object's tags.
 *
 * pd->tagarray is read as pairs: entry 2*i is a tag slot id and entry
 * 2*i+1 is the tag id.  Tag slots are indexed after the numparams object
 * slots, hence the slotid+numparams offset into statusarray.
 *
 * For every tag whose slot is still unclaimed (status 0), the iterator at
 * parameter->iterators[*iteratorcount] is configured as a tag iterator
 * (istag=1) for that slot, `index` is recorded as the slot of the object
 * whose tags will be searched, and the slot is marked as claimed.
 *
 * pd            - descriptor of the parameter whose tags are processed
 * index         - slot of that parameter in the object array
 * parameter     - wrapper owning the iterators array that is filled in
 * iteratorcount - pointer to the position of the iterator to fill
 * statusarray   - per-slot flags; non-zero means the slot is already handled
 * numparams     - number of object parameters (offset of the tag slots)
 */
2869 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
2872 for(i=0;i<pd->numbertags;i++) {
2873 int slotid=pd->tagarray[2*i];
2874 int tagid=pd->tagarray[2*i+1];
/* Only tag slots nobody has claimed yet get a tag iterator */
2876 if (statusarray[slotid+numparams]==0) {
2877 parameter->iterators[*iteratorcount].istag=1;
2878 parameter->iterators[*iteratorcount].tagid=tagid;
2879 parameter->iterators[*iteratorcount].slot=slotid+numparams;
2880 parameter->iterators[*iteratorcount].tagobjectslot=index;
2881 statusarray[slotid+numparams]=1;
/* Configures the iterator at parameter->iterators[*iteratorcount] as a
 * plain object iterator (istag=0) for slot `index`.  Candidates come from
 * the object set of pd's queue.  The slot is then marked as handled.
 *
 * Tags of pd whose slots are already handled are recorded in the
 * iterator's tagbindings so they can narrow the search; numtags is set
 * from tagcount, the index used for those tagbindings entries.
 *
 * parameter     - wrapper owning the iterators array that is filled in
 * index         - slot of the parameter being enqueued
 * pd            - descriptor of that parameter
 * iteratorcount - pointer to the position of the iterator to fill
 * statusarray   - per-slot flags; non-zero means the slot is already handled
 * numparams     - number of object parameters (offset of the tag slots)
 */
2888 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
/* pd->queue is treated as the parameterwrapper that holds the object set */
2891 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
/* Object iterator for this parameter slot */
2893 parameter->iterators[*iteratorcount].istag=0;
2894 parameter->iterators[*iteratorcount].slot=index;
2895 parameter->iterators[*iteratorcount].objectset=objectset;
2896 statusarray[index]=1;
/* Collect the tags of this parameter that are already bound to a slot */
2898 for(i=0;i<pd->numbertags;i++) {
2899 int slotid=pd->tagarray[2*i];
2900 int tagid=pd->tagarray[2*i+1];
2901 if (statusarray[slotid+numparams]!=0) {
2902 /* This tag has already been enqueued, use it to narrow search */
2903 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
2907 parameter->iterators[*iteratorcount].numtags=tagcount;
2912 /* This function builds the iterators for a task & parameter */
/* Starting from the parameter at `index` (marked handled up front), the
 * remaining parameters are enqueued in three passes, each pass only
 * looking at slots whose status is still 0:
 *   1. parameters that have a tag whose slot is already handled,
 *   2. parameters that have at least one tag,
 *   3. everything that is left.
 * Each enqueued parameter goes through processobject() followed by
 * processtags().
 *
 * task      - task whose parameters are being ordered
 * index     - slot of the initial parameter
 * parameter - wrapper whose iterators array is filled in
 */
2914 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
/* Object slots use indices [0,numparams); tag slots are addressed as
 * slotid+numparams.
 * NOTE(review): presumably MAXTASKPARAMS bounds parameters plus tag slots;
 * confirm, since nothing here checks slotid+numparams against it. */
2915 int statusarray[MAXTASKPARAMS];
2917 int numparams=task->numParameters;
2918 int iteratorcount=0;
2919 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
2921 statusarray[index]=1; /* Initial parameter */
2922 /* Process tags for initial iterator */
2924 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
2928 /* Check for objects with existing tags */
2929 for(i=0;i<numparams;i++) {
2930 if (statusarray[i]==0) {
2931 struct parameterdescriptor *pd=task->descriptorarray[i];
/* Enqueue this parameter if one of its tags is already bound */
2933 for(j=0;j<pd->numbertags;j++) {
2934 int slotid=pd->tagarray[2*j];
2935 if(statusarray[slotid+numparams]!=0) {
2936 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2937 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2944 /* Next do objects w/ unbound tags*/
2946 for(i=0;i<numparams;i++) {
2947 if (statusarray[i]==0) {
2948 struct parameterdescriptor *pd=task->descriptorarray[i];
2949 if (pd->numbertags>0) {
2950 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2951 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2957 /* Nothing with a tag enqueued */
2959 for(i=0;i<numparams;i++) {
2960 if (statusarray[i]==0) {
2961 struct parameterdescriptor *pd=task->descriptorarray[i];
2962 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2963 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
/* Debug dump: for every task of the current core, prints the task name
 * and, for each parameter, the objects currently held in that parameter's
 * object set together with their flag word and tags.
 * Under THREADSIMULATE the core number is read from thread-specific data;
 * otherwise corenum is used. */
2976 #ifdef THREADSIMULATE
/* NOTE(review): pthread_getspecific() returns void *; storing it in an int
 * relies on an implicit pointer-to-int conversion -- confirm the value
 * stored under `key` is a small integer and consider an explicit cast. */
2977 int numofcore = pthread_getspecific(key);
2978 for(i=0;i<numtasks[numofcore];i++) {
2979 struct taskdescriptor * task=taskarray[numofcore][i];
/* Core numbers beyond NUMCORES-1 are special-cased before indexing
 * numtasks/taskarray with corenum */
2982 if(corenum > NUMCORES - 1) {
2986 for(i=0;i<numtasks[corenum];i++) {
2987 struct taskdescriptor * task=taskarray[corenum][i];
2990 printf("%s\n", task->name);
2992 for(j=0;j<task->numParameters;j++) {
2993 struct parameterdescriptor *param=task->descriptorarray[j];
2994 struct parameterwrapper *parameter=param->queue;
2995 struct ObjectHash * set=parameter->objectset;
2996 struct ObjectIterator objit;
2998 printf(" Parameter %d\n", j);
/* Walk every entry of this parameter's object set */
3000 ObjectHashiterator(set, &objit);
3001 while(ObjhasNext(&objit)) {
3002 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
3003 struct ___Object___ * tagptr=obj->___tags___;
/* Extra data words stored with the entry */
3004 int nonfailed=Objdata4(&objit);
3005 int numflags=Objdata3(&objit);
3006 int flags=Objdata2(&objit);
/* NOTE(review): %lx is used with pointer arguments here and in the tag
 * prints below; %p is the matching conversion -- confirm. */
3009 printf(" Contains %lx\n", obj);
3010 printf(" flag=%d\n", obj->flag);
/* ___tags___ refers either to a single tag (TAGTYPE) or to an array of
 * tag descriptors */
3013 } else if (tagptr->type==TAGTYPE) {
3015 printf(" tag=%lx\n",tagptr);
3020 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
/* ___cachedCode___ serves as the upper bound on the tag entries scanned */
3021 for(;tagindex<ao->___cachedCode___;tagindex++) {
3023 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
3033 /* This function processes the task information to create queues for
3034 each parameter type. */
/* For every task of the current core, two passes run over its parameters:
 * first each parameter's queue wrapper gets a fresh object set and a back
 * pointer to the task, then builditerators() is run for each parameter.
 * Under THREADSIMULATE the core number is read from thread-specific data;
 * otherwise corenum is used. */
3036 void processtasks() {
/* Core numbers beyond NUMCORES-1 are special-cased before indexing
 * numtasks/taskarray with corenum */
3039 if(corenum > NUMCORES - 1) {
3043 #ifdef THREADSIMULATE
/* NOTE(review): pthread_getspecific() returns void *; storing it in an int
 * relies on an implicit pointer-to-int conversion -- confirm the value
 * stored under `key` is a small integer and consider an explicit cast. */
3044 int numofcore = pthread_getspecific(key);
3045 for(i=0;i<numtasks[numofcore];i++) {
3046 struct taskdescriptor *task=taskarray[numofcore][i];
3048 for(i=0;i<numtasks[corenum];i++) {
3049 struct taskdescriptor * task=taskarray[corenum][i];
3053 /* Build objectsets */
3054 for(j=0;j<task->numParameters;j++) {
3055 struct parameterdescriptor *param=task->descriptorarray[j];
3056 struct parameterwrapper *parameter=param->queue;
/* 10 is the size argument handed to allocateObjectHash */
3057 parameter->objectset=allocateObjectHash(10);
3058 parameter->task=task;
3061 /* Build iterators for parameters */
3062 for(j=0;j<task->numParameters;j++) {
3063 struct parameterdescriptor *param=task->descriptorarray[j];
3064 struct parameterwrapper *parameter=param->queue;
3065 builditerators(task, j, parameter);
/* Resets a tag/object iterator so iteration can start over.
 * The iterator kinds are told apart by a chain of branches; one of them is
 * the tag-bound object case (it->numtags>0).  The hash iterator it->it is
 * restarted over it->objectset.
 * NOTE(review): presumably the first branch handles tag iterators
 * (it->istag) and the hash iterator restart belongs to the final, plain
 * object case -- confirm against the full branch chain. */
3070 void toiReset(struct tagobjectiterator * it) {
3073 } else if (it->numtags>0) {
3076 ObjectHashiterator(it->objectset, &it->it);
/* Tells whether iterator `it` can produce another binding, given the
 * slots already filled in objectarray.  Three kinds of iterator are
 * handled:
 *   - tag iterator: searches the tags of the object in slot
 *     it->tagobjectslot for one whose flag equals it->tagid;
 *   - object iterator with bound tags (it->numtags>0): searches the
 *     objects attached to the first bound tag for one that is in
 *     it->objectset and carries every other bound tag;
 *   - plain object iterator: defers to the hash iterator it->it.
 * it->tagobjindex is used as the resume position for the array scans.
 *
 * it          - iterator to query
 * objectarray - slots bound so far, indexed by iterator slot numbers
 */
3080 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
3083 /* Get object with tags */
3084 struct ___Object___ *obj=objectarray[it->tagobjectslot];
/* NOTE(review): tagptr is dereferenced right away without a NULL check;
 * presumably a tag iterator is only built for objects that carry tags --
 * confirm. */
3085 struct ___Object___ *tagptr=obj->___tags___;
/* ___tags___ is either a single tag or an array of tag descriptors */
3086 if (tagptr->type==TAGTYPE) {
3087 if ((it->tagobjindex==0)&& /* First object */
3088 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
3093 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
/* Resume the scan where the previous call left off */
3094 int tagindex=it->tagobjindex;
3095 for(;tagindex<ao->___cachedCode___;tagindex++) {
3096 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
3097 if (td->flag==it->tagid) {
3098 it->tagobjindex=tagindex; /* Found right type of tag */
3104 } else if (it->numtags>0) {
3105 /* Use tags to locate appropriate objects */
3106 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3107 struct ___Object___ *objptr=tag->flagptr;
/* flagptr is either one object or an array of objects */
3109 if (objptr->type!=OBJECTARRAYTYPE) {
/* Single object: tagobjindex>0 is tested first, then set membership,
 * then the remaining bound tags */
3110 if (it->tagobjindex>0)
3112 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3114 for(i=1;i<it->numtags;i++) {
3115 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3116 if (!containstag(objptr,tag2))
3121 struct ArrayObject *ao=(struct ArrayObject *) objptr;
/* Array of objects: each candidate must be in the object set and carry
 * every other bound tag */
3124 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
3125 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
3126 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3128 for(i=1;i<it->numtags;i++) {
3129 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3130 if (!containstag(objptr,tag2))
/* Remember the scan position in the iterator */
3133 it->tagobjindex=tagindex;
3138 it->tagobjindex=tagindex;
/* Plain object iterator: ask the hash iterator */
3142 return ObjhasNext(&it->it);
/* Checks whether object `ptr` is among the objects attached to `tag`.
 * tag->flagptr is examined: when it is an object array, the entries below
 * ___cachedCode___ are compared against ptr one by one.
 * NOTE(review): presumably returns non-zero on a match and zero otherwise,
 * and compares ptr with flagptr directly in the non-array case -- confirm.
 *
 * ptr - object to look for
 * tag - tag descriptor whose attached objects are searched
 */
3146 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
3148 struct ___Object___ * objptr=tag->flagptr;
3149 if (objptr->type==OBJECTARRAYTYPE) {
3150 struct ArrayObject *ao=(struct ArrayObject *)objptr;
3151 for(j=0;j<ao->___cachedCode___;j++) {
3152 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
3160 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
3161 /* hasNext has all of the intelligence */
3164 /* Get object with tags */
3165 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3166 struct ___Object___ *tagptr=obj->___tags___;
3167 if (tagptr->type==TAGTYPE) {
3169 objectarray[it->slot]=tagptr;
3171 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3172 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
3174 } else if (it->numtags>0) {
3175 /* Use tags to locate appropriate objects */
3176 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3177 struct ___Object___ *objptr=tag->flagptr;
3178 if (objptr->type!=OBJECTARRAYTYPE) {
3180 objectarray[it->slot]=objptr;
3182 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3183 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
3186 /* Iterate object */
3187 objectarray[it->slot]=(void *)Objkey(&it->it);