4 #include "structdefs.h"
6 #include "checkpoint.h"
8 #include "SimpleHash.h"
9 #include "GenericHashtable.h"
10 #include <sys/select.h>
11 #include <sys/types.h>
20 #include <raw_compiler_defs.h>
22 #elif defined THREADSIMULATE
23 // use POSIX message queue
24 // for each core, its message queue named as
30 extern int injectfailures;
31 extern float failurechance;
37 #define TOTALCORE raw_get_num_tiles()
41 #include "instrument.h"
// Task bookkeeping shared by the scheduler code in this file. activetasks is
// allocated during start-up with hashCodetpd/comparetpd as its callbacks; the
// matching allocation of failedtasks is commented out there.
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
// NOTE(review): forward/reverse are not referenced in the visible part of
// this file -- confirm their role against the checkpoint code.
48 struct RuntimeHash * forward;
49 struct RuntimeHash * reverse;
// Termination bookkeeping, indexed by core number. The startup core stores 0
// in its own corestatus slot when it has no more work and then scans the
// array for non-zero slots; it also compares the summed send and receive
// counters before declaring the run finished.
52 int corestatus[NUMCORES]; // records status of each core
55 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
56 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
// Lock table: main() fills in the bucket array and list pointers of
// locktable, and locktbl points at it.
58 struct RuntimeHash locktable;
59 static struct RuntimeHash* locktbl = &locktable;
60 void * curr_heapbase=0;
61 void * curr_heaptop=0;
// This core's own receive counter; copied into numreceiveobjs[corenum] by the
// startup core and sent inside stall messages by transStallMsg().
63 int self_numreceiveobjs;
// Queue of received transObjInfo records waiting to be enqueued into the
// parameter queues.
70 struct Queue objqueue;
75 void calCoords(int core_num, int* coordY, int* coordX);
// Declarations used by the THREADSIMULATE code paths below.
77 #elif defined THREADSIMULATE
// Here the lock table is heap-allocated by main() via allocateRuntimeHash(20).
78 static struct RuntimeHash* locktbl;
// Per-thread start-up arguments and counters, filled in by main() before each
// pthread_create() call.
86 struct thread_data thread_data_array[NUMCORES];
// Thread-specific-storage key under which run() stores the simulated core number.
88 static pthread_key_t key;
// rwlock_tbl is initialised in main() and write-locked in run() before the
// lock table is torn down.
89 static pthread_rwlock_t rwlock_tbl;
90 static pthread_rwlock_t rwlock_init;
// Forward declarations for the messaging and locking helpers.
95 bool transStallMsg(int targetcore);
96 void transTerminateMsg(int targetcore);
98 bool getreadlock(void* ptr);
99 void releasereadlock(void* ptr);
// NOTE(review): the _I variants are called from code that runs between
// raw_user_interrupts_off()/on() -- presumably "interrupts already disabled";
// confirm against their definitions.
101 bool getreadlock_I(void* ptr);
102 void releasereadlock_I(void* ptr);
104 bool getwritelock(void* ptr);
105 void releasewritelock(void* ptr);
// Flushes cache lines with user interrupts disabled: 512 loop iterations,
// each calling flushCacheline() on `base` and on `base|off1`. How `base` and
// `off1` are computed is not visible in this listing.
110 void flushAll(void) {
113 raw_user_interrupts_off();
115 raw_test_pass(0xec00);
116 for(i = 0; i < 512; ++i) {
119 flushCacheline(base);
120 flushCacheline(base|off1);
123 raw_user_interrupts_on();
125 raw_test_pass(0xec02);
// NOTE(review): the statements from listing line 129 on emit their own
// progress codes (0xefee..0xefef) around an interrupts-off/on pair; they look
// like the body of a separate routine whose header is not visible -- confirm.
129 raw_test_pass(0xefee);
130 raw_user_interrupts_off();
131 raw_test_pass(0xef00);
133 raw_test_pass(0xefff);
134 raw_user_interrupts_on();
135 raw_test_pass(0xefef);
// Program entry point.
//
// The first part initialises a RAW tile: it derives the core number from the
// tile position, clears the per-core counters on the startup core, sets up
// the lock table and object queue, and enables message interrupts.
// The THREADSIMULATE part creates one POSIX message queue and one pthread per
// simulated core; each thread executes run().
//
// argc/argv are forwarded unchanged to every thread through thread_data_array.
140 int main(int argc, char **argv) {
146 bool sendStall = false;
148 bool tocontinue = false;
149 struct QueueItem * objitem = NULL;
150 struct transObjInfo * objInfo = NULL;
152 bool allStall = true;
// Report a few addresses through the RAW test channel, then compute this
// tile's core number as x + 4 * y (4 tiles per row).
155 raw_test_pass_reg(&locktable);
156 raw_test_pass_reg(&msglength);
157 raw_test_pass_reg(&i);
158 raw_test_pass(0xee01);
159 corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
161 // initialize the arrays
162 if(STARTUPCORE == corenum) {
163 // startup core to initialize corestatus[]
164 for(i = 0; i < NUMCORES; ++i) {
166 numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
167 numreceiveobjs[i] = 0;
// Every core resets its own send/receive counters.
170 self_numsendobjs = 0;
171 self_numreceiveobjs = 0;
// NOTE(review): the body of this 30-iteration loop is not visible here;
// presumably it clears the message buffer -- confirm.
172 for(i = 0; i < 30; ++i) {
179 raw_test_pass(0xee02);
181 // create the lock table, lockresult table and obj queue
// The lock table is set up by hand with 20 bucket pointers and empty lists.
183 locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
184 /* Set allocation blocks*/
185 locktable.listhead=NULL;
186 locktable.listtail=NULL;
188 locktable.numelements = 0;
195 objqueue.head = NULL;
196 objqueue.tail = NULL;
197 raw_test_pass(0xee03);
// Cores with a valid core number register recvMsg as the handler for
// incoming messages and then enable user interrupts.
200 if (corenum < NUMCORES) {
203 start_gdn_avail_ints(recvMsg);
204 raw_user_interrupts_on();
205 raw_test_pass(0xee04);
209 #elif defined THREADSIMULATE
213 pthread_t threads[NUMCORES];
216 // initialize three arrays and msg queue array
// Each simulated core i owns a queue named "/msgqueue_" followed by i in
// decimal (one or two digits; larger values are reported as an error).
217 char * pathhead = "/msgqueue_";
218 int targetlen = strlen(pathhead);
219 for(i = 0; i < NUMCORES; ++i) {
222 numreceiveobjs[i] = 0;
227 corenumstr[0] = i + '0';
228 corenumstr[1] = '\0';
231 corenumstr[1] = i %10 + '0';
232 corenumstr[0] = (i / 10) + '0';
233 corenumstr[2] = '\0';
236 printf("Error: i >= 100\n");
240 char path[targetlen + sourcelen + 1];
241 strcpy(path, pathhead);
242 strncat(path, corenumstr, sourcelen);
// The receiving end is opened read-only and non-blocking, creating the queue
// if it does not exist yet.
243 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
244 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
246 mqd[i]= mq_open(path, oflags, omodes, NULL);
248 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
251 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
// Key under which each thread later stores its simulated core number.
256 pthread_key_create(&key, NULL);
258 // create the lock table and initialize its mutex
259 locktbl = allocateRuntimeHash(20);
260 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
// The format needs a %s so that the strerror() text is actually printed.
261 printf("[Main] initialize the rwlock for lock table: %d error: %s\n", rc_locktbl, strerror(rc_locktbl));
// Start one worker thread per simulated core, each with its own
// thread_data record and zeroed send/receive counters.
263 for(i = 0; i < NUMCORES; ++i) {
264 thread_data_array[i].corenum = i;
265 thread_data_array[i].argc = argc;
266 thread_data_array[i].argv = argv;
267 thread_data_array[i].numsendobjs = 0;
268 thread_data_array[i].numreceiveobjs = 0;
269 printf("[main] creating thread %d\n", i);
270 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
272 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
// Worker entry point handed to pthread_create() by main(); `arg` points at
// this thread's struct thread_data.
//
// After the thread-specific prologue the listing continues with the runtime
// start-up and the scheduling loop: first the RAW variant, then (after the
// "#elif defined THREADSIMULATE" line) the thread-simulated variant.
281 void run(void* arg) {
282 struct thread_data * my_tdata = (struct thread_data *)arg;
// Store this thread's simulated core number in thread-specific storage;
// later code reads it back with pthread_getspecific(key).
283 pthread_setspecific(key, (void *)my_tdata->corenum);
284 int argc = my_tdata->argc;
285 char** argv = my_tdata->argv;
286 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
// Runtime start-up: garbage collector, exit handler, task table and the
// startup object.
292 GC_init(); // Initialize the garbage collector
300 initializeexithandler();
302 raw_test_pass(0xee05);
304 /* Create table for failed tasks */
// NOTE(review): the body of this out-of-range-core branch is not visible.
306 if(corenum > NUMCORES - 1) {
313 raw_test_pass(0xee06);
315 /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
316 (int (*)(void *,void *)) &comparetpd);*/
319 raw_test_pass(0xee07);
321 /* Create queue of active tasks */
322 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
323 (int (*)(void *,void *)) &comparetpd);
325 raw_test_pass(0xee08);
328 /* Process task information */
331 raw_test_pass(0xee09);
334 /* Create startup object */
335 createstartupobject(argc, argv);
337 raw_test_pass(0xee0a);
341 raw_test_pass(0xee0b);
// RAW scheduling loop. receiveObject() is polled until it returns -1, which
// the comment on receiveObject() documents as "received nothing".
345 while(receiveObject() != -1) {
349 // check if there are new active tasks can be executed
353 while(receiveObject() != -1) {
357 raw_test_pass(0xee0c);
359 // check if there are some pending objects, if yes, enqueue them and executetasks again
361 raw_test_pass(0xee0d);
// Work through objqueue with interrupts disabled: take the tail record,
// fetch the transferred object pointer, and try to enqueue it.
362 while(!isEmpty(&objqueue)) {
365 raw_user_interrupts_off();
367 raw_test_pass(0xeee1);
370 objitem = getTail(&objqueue);
371 //obj = objitem->objectptr;
372 objInfo = (struct transObjInfo *)objitem->objectptr;
373 obj = objInfo->objptr;
374 raw_test_pass_reg((int)obj);
375 // grab lock and flush the obj
381 raw_test_pass_reg(grount);
// Invalidate the object's addresses, one step per unit of its class size.
393 for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
394 invalidateAddr(obj + k);
396 // enqueue the object
// objInfo->queues holds (task index, parameter index) pairs; each pair
// selects one parameter queue of this core.
397 for(k = 0; k < objInfo->length; ++k) {
398 int taskindex = objInfo->queues[2 * k];
399 int paramindex = objInfo->queues[2 * k + 1];
400 struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
401 raw_test_pass_reg(taskindex);
402 raw_test_pass_reg(paramindex);
403 enqueueObject_I(obj, queues, 1);
// Done with this record: drop it from objqueue, release the read lock and
// free the pair array.
405 removeItem(&objqueue, objitem);
406 releasereadlock_I(obj);
407 RUNFREE(objInfo->queues);
409 /*enqueueObject_I(obj, NULL, 0);
410 removeItem(&objqueue, objitem);
411 releasereadlock_I(obj);*/
414 // put it at the end of the queue
415 // and try to execute active tasks already enqueued first
416 removeItem(&objqueue, objitem);
417 addNewItem_I(&objqueue, objInfo);
421 raw_user_interrupts_on();
423 raw_test_pass(0xee0e);
// Termination detection on the startup core: publish its own status and
// counters, then check every core's status and whether the number of
// objects sent equals the number received.
428 if(STARTUPCORE == corenum) {
430 raw_test_pass(0xee0f);
434 raw_user_interrupts_off();
436 corestatus[corenum] = 0;
437 numsendobjs[corenum] = self_numsendobjs;
438 numreceiveobjs[corenum] = self_numreceiveobjs;
439 // check the status of all cores
441 raw_test_pass_reg(NUMCORES);
442 for(i = 0; i < NUMCORES; ++i) {
443 raw_test_pass(0xe000 + corestatus[i]);
444 if(corestatus[i] != 0) {
450 // check if the sum of send objs and receive obj are the same
452 // no->go on executing
454 for(i = 0; i < NUMCORES; ++i) {
455 sumsendobj += numsendobjs[i];
456 raw_test_pass(0xf000 + numsendobjs[i]);
458 for(i = 0; i < NUMCORES; ++i) {
459 sumsendobj -= numreceiveobjs[i];
460 raw_test_pass(0xf000 + numreceiveobjs[i]);
// A zero balance means no object is still in flight.
462 if(0 == sumsendobj) {
464 raw_test_pass(0xee10);
465 raw_test_done(1); // All done.
469 raw_user_interrupts_on();
473 raw_test_pass(0xee11);
475 // wait for some time
477 raw_test_pass(0xee12);
480 raw_test_pass(0xee13);
482 // send StallMsg to startup core
483 raw_test_pass(0xee14);
484 sendStall = transStallMsg(STARTUPCORE);
489 raw_test_pass(0xee15);
// Thread-simulated scheduling loop. The message kind returned by
// receiveObject() selects the branch; the case labels themselves are not
// visible in this listing.
495 #elif defined THREADSIMULATE
496 /* Start executing the tasks */
500 // check if there are new objects coming
501 bool sendStall = false;
503 int numofcore = pthread_getspecific(key);
505 switch(receiveObject()) {
507 printf("[run, %d] receive an object\n", numofcore);
509 // received an object
510 // check if there are new active tasks can be executed
515 //printf("[run, %d] no msg\n", numofcore);
// Nothing received: the startup core runs the same termination check as the
// RAW variant above.
517 if(STARTUPCORE == numofcore) {
518 corestatus[numofcore] = 0;
519 // check the status of all cores
520 bool allStall = true;
521 for(i = 0; i < NUMCORES; ++i) {
522 if(corestatus[i] != 0) {
528 // check if the sum of send objs and receive obj are the same
530 // no->go on executing
532 for(i = 0; i < NUMCORES; ++i) {
533 sumsendobj += numsendobjs[i];
535 for(i = 0; i < NUMCORES; ++i) {
536 sumsendobj -= numreceiveobjs[i];
538 if(0 == sumsendobj) {
// Shutdown: take the table write lock, destroy every per-object rwlock
// stored in the lock table, then free the table.
542 int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
// The format needs a %s so that the strerror() text is actually printed.
543 printf("[run, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc_tbl, strerror(rc_tbl));
544 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
545 while(0 != RunhasNext(it_lock)) {
// This local `key` is the hash key of the current entry; it shadows the
// file-level pthread key inside this loop.
546 int key = Runkey(it_lock);
547 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
548 int rc_des = pthread_rwlock_destroy(rwlock_obj);
549 printf("[run, %d] destroy the rwlock for object: %d error: %s\n", numofcore, key, strerror(rc_des));
552 freeRuntimeHash(locktbl);
556 // destroy all message queues
// Rebuild each queue name exactly as main() built it.
557 char * pathhead = "/msgqueue_";
558 int targetlen = strlen(pathhead);
559 for(i = 0; i < NUMCORES; ++i) {
563 corenumstr[0] = i + '0';
564 corenumstr[1] = '\0';
567 corenumstr[1] = i %10 + '0';
568 corenumstr[0] = (i / 10) + '0';
569 corenumstr[2] = '\0';
572 printf("Error: i >= 100\n");
576 char path[targetlen + sourcelen + 1];
577 strcpy(path, pathhead);
578 strncat(path, corenumstr, sourcelen);
582 printf("[run, %d] terminate!\n", numofcore);
589 // send StallMsg to startup core
590 sendStall = transStallMsg(STARTUPCORE);
596 printf("[run, %d] receive a stall msg\n", numofcore);
597 // receive a Stall Msg, do nothing
598 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
603 printf("[run, %d] receive a terminate msg\n", numofcore);
604 // receive a terminate Msg
// Use this thread's own core number (numofcore), as the stall branch above
// does; the global corenum does not identify the calling thread.
605 assert(STARTUPCORE != numofcore); // only non-startup core can receive such msg
606 mq_close(mqd[numofcore]);
612 printf("[run, %d] Error: invalid message type.\n", numofcore);
// Builds the startup object that seeds task execution.
//
// argc/argv are the program arguments; argv[1..argc-1] are copied into a
// string array of length argc-1 stored in the object's ___parameters___
// field. The object is then flagged as initialised and enqueued.
622 void createstartupobject(int argc, char ** argv) {
625 /* Allocate startup object */
// Two allocation variants appear back to back: one passing NULL as a leading
// argument and one without it. NOTE(review): presumably selected by a
// preprocessor conditional that is not visible here -- confirm.
627 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
628 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
630 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
631 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
633 /* Build array of strings */
634 startupobject->___parameters___=stringarray;
// argv[0] is skipped; argument i lands in slot i-1.
635 for(i=1;i<argc;i++) {
636 int length=strlen(argv[i]);
638 struct ___String___ *newstring=NewString(NULL, argv[i],length);
640 struct ___String___ *newstring=NewString(argv[i],length);
// The element storage is addressed as the memory immediately following the
// array's ___length___ field.
642 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
645 startupobject->isolate = 1;
646 startupobject->version = 0;
648 /* Set initialized flag for startup object */
649 flagorandinit(startupobject,1,0xFFFFFFFF);
// NULL/0 makes enqueueObject look up the queues for the object's own type.
650 enqueueObject(startupobject, NULL, 0);
// Hash callback for task-parameter descriptors (passed to
// genallocatehashtable for activetasks). Starts from the task pointer cast
// to int and XORs in every parameter pointer of the descriptor.
656 int hashCodetpd(struct taskparamdescriptor *ftd) {
657 int hash=(int)ftd->task;
659 for(i=0;i<ftd->numParameters;i++){
660 hash^=(int)ftd->parameterArray[i];
// Equality callback for task-parameter descriptors (passed to
// genallocatehashtable for activetasks). Compares the task pointers first
// and then the parameter pointers position by position, using ftd1's
// parameter count. The returned values are not visible in this listing.
665 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
667 if (ftd1->task!=ftd2->task)
669 for(i=0;i<ftd1->numParameters;i++)
670 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
675 /* This function sets a tag. */
// Links object `obj` and tag descriptor `tagd` in both directions:
//   - obj->___tags___ holds either a single tag descriptor or an array of
//     them (___cachedCode___ is the number of used slots);
//   - tagd->flagptr holds either a single object or an array of objects.
// Arrays are grown by allocating a larger one and copying the old entries.
//
// Two signatures appear back to back; the one taking `ptr` passes a ptrarray
// to allocate_newarray and reloads obj/tagd from it afterwards.
// NOTE(review): presumably selected by a preprocessor conditional that is
// not visible in this listing -- confirm.
677 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
679 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
681 struct ArrayObject * ao=NULL;
682 struct ___Object___ * tagptr=obj->___tags___;
684 raw_test_pass(0xebb0);
688 raw_test_pass(0xebb1);
// First tag on this object: store the descriptor directly.
690 obj->___tags___=(struct ___Object___ *)tagd;
692 /* Have to check if it is already set */
// Exactly one tag so far: promote to a two-entry tag array.
693 if (tagptr->type==TAGTYPE) {
694 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
696 raw_test_pass(0xebb2);
700 raw_test_pass(0xebb3);
705 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
706 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
707 obj=(struct ___Object___ *)ptrarray[2];
708 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
709 td=(struct ___TagDescriptor___ *) obj->___tags___;
712 raw_test_pass(0xebb4);
714 ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
717 raw_test_pass(0xebb5);
719 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
720 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
721 obj->___tags___=(struct ___Object___ *) ao;
722 ao->___cachedCode___=2;
724 raw_test_pass(0xebb6);
// Already a tag array: scan the used slots, then append or grow.
729 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
731 raw_test_pass(0xebb7);
733 for(i=0;i<ao->___cachedCode___;i++) {
734 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
736 raw_test_pass(0xebb8);
740 raw_test_pass(0xebb9);
// Free slot available: append in place.
745 if (ao->___cachedCode___<ao->___length___) {
747 raw_test_pass(0xebba);
749 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
750 ao->___cachedCode___++;
752 raw_test_pass(0xebbb);
// Array full: allocate TAGARRAYINTERVAL extra slots and copy everything over.
756 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
757 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
758 obj=(struct ___Object___ *)ptrarray[2];
759 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
760 ao=(struct ArrayObject *)obj->___tags___;
762 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
765 raw_test_pass(0xebbc);
767 aonew->___cachedCode___=ao->___length___+1;
768 for(i=0;i<ao->___length___;i++) {
770 raw_test_pass(0xebbd);
772 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
775 raw_test_pass(0xebbe);
777 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
// NOTE(review): no visible line stores aonew back into obj->___tags___ after
// growing -- confirm this happens in a line missing from the listing.
779 raw_test_pass(0xebbf);
// Second half: record obj on the tag descriptor's side.
786 struct ___Object___ * tagset=tagd->flagptr;
788 raw_test_pass(0xb008);
792 raw_test_pass(0xb009);
// flagptr currently holds a single object: promote to a two-entry array.
795 } else if (tagset->type!=OBJECTARRAYTYPE) {
797 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
798 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
799 obj=(struct ___Object___ *)ptrarray[2];
800 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
802 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
804 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
805 ARRAYSET(ao, struct ___Object___ *, 1, obj);
806 ao->___cachedCode___=2;
807 tagd->flagptr=(struct ___Object___ *)ao;
809 raw_test_pass(0xb00a);
// flagptr already holds an object array: append or grow.
812 struct ArrayObject *ao=(struct ArrayObject *) tagset;
813 if (ao->___cachedCode___<ao->___length___) {
815 raw_test_pass(0xb00b);
817 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
821 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
822 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
823 obj=(struct ___Object___ *)ptrarray[2];
824 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
825 ao=(struct ArrayObject *)tagd->flagptr;
// The replacement must hold all ao->___length___ old entries plus the new
// one, so it is sized like the sibling allocation above
// (OBJECTARRAYINTERVAL + old length) instead of OBJECTARRAYINTERVAL alone.
827 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
829 aonew->___cachedCode___=ao->___cachedCode___+1;
830 for(i=0;i<ao->___length___;i++) {
831 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
833 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
834 tagd->flagptr=(struct ___Object___ *) aonew;
836 raw_test_pass(0xb00c);
843 /* This function clears a tag. */
// Undoes tagset(): removes `tagd` from obj->___tags___ and removes `obj`
// from tagd->flagptr. Array entries are removed by moving the last used
// entry into the vacated slot and clearing the old last slot. The printf
// calls report the four inconsistent-state cases.
//
// Two signatures appear back to back. NOTE(review): presumably selected by a
// preprocessor conditional that is not visible in this listing -- confirm.
845 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
847 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
849 /* We'll assume that tag is always there.
850 Need to statically check for this of course. */
851 struct ___Object___ * tagptr=obj->___tags___;
// Object side, single-tag case: clear the field if it is this tag.
853 if (tagptr->type==TAGTYPE) {
854 if ((struct ___TagDescriptor___ *)tagptr==tagd)
855 obj->___tags___=NULL;
858 printf("ERROR 1 in tagclear\n");
// Object side, tag-array case.
862 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
864 for(i=0;i<ao->___cachedCode___;i++) {
865 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
// Shrink the used count, move the last used entry into slot i (unless slot
// i was the last one) and null out the vacated last slot.
867 ao->___cachedCode___--;
868 if (i<ao->___cachedCode___)
869 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
870 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
871 if (ao->___cachedCode___==0)
872 obj->___tags___=NULL;
877 printf("ERROR 2 in tagclear\n");
// Tag-descriptor side: same scheme applied to tagd->flagptr.
883 struct ___Object___ *tagset=tagd->flagptr;
884 if (tagset->type!=OBJECTARRAYTYPE) {
889 printf("ERROR 3 in tagclear\n");
893 struct ArrayObject *ao=(struct ArrayObject *) tagset;
895 for(i=0;i<ao->___cachedCode___;i++) {
896 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
898 ao->___cachedCode___--;
899 if (i<ao->___cachedCode___)
900 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
901 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
// NOTE(review): the statement guarded by this test is not visible here.
902 if (ao->___cachedCode___==0)
908 printf("ERROR 4 in tagclear\n");
916 /* This function allocates a new tag. */
// Allocates classsize[TAGTYPE] bytes for a tag descriptor. Two variants
// appear back to back: one allocating through mygcmalloc with `ptr` cast to
// a garbage list, one through FREEMALLOC. What is done with `index` is not
// visible in this listing.
918 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
919 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
921 struct ___TagDescriptor___ * allocate_tag(int index) {
922 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
931 /* This function updates the flag for object ptr. It or's the flag
932 with the or mask and and's it with the andmask. */
// Forward declaration: flagbody() is called by flagorand(), intflagorand()
// and flagorandinit() and is defined after them.
934 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
/*
 * Three-way comparison of two flag words, usable as an ordering callback.
 *
 * val1, val2: pointers to the values to compare; both must be non-NULL.
 * Returns a negative value if *val1 < *val2, zero if they are equal and a
 * positive value if *val1 > *val2.
 *
 * The result is built from two comparisons instead of a subtraction, because
 * (*val1)-(*val2) overflows (undefined behaviour, wrong sign) when the
 * operands have opposite signs and large magnitude.
 */
int flagcomp(const int *val1, const int *val2) {
  const int lhs = *val1;
  const int rhs = *val2;
  return (lhs > rhs) - (lhs < rhs);
}
// Updates the flag word of object `ptr` and hands the result to flagbody()
// together with the caller's queue list (isnew = false).
// The current flag is read from the second int of the object and OR-ed with
// ormask. NOTE(review): the line that applies andmask is not visible in this
// listing -- confirm it exists.
940 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
942 int oldflag=((int *)ptr)[1];
943 int flag=ormask|oldflag;
946 raw_test_pass(0xaa000000 + oldflag);
947 raw_test_pass(0xaa000000 + flag);
949 flagbody(ptr, flag, queues, length, false);
// Variant of flagorand() that checks whether the flag word actually changes
// and otherwise calls flagbody() with no explicit queue list (NULL, 0), so
// flagbody() looks the queues up itself. The returned values are not visible
// in this listing.
953 bool intflagorand(void * ptr, int ormask, int andmask) {
955 int oldflag=((int *)ptr)[1];
956 int flag=ormask|oldflag;
958 if (flag==oldflag) /* Don't do anything */
961 flagbody(ptr, flag, NULL, 0, false);
// Flag update for a freshly created object: same computation as flagorand(),
// but flagbody() is called with isnew = true and no queue list.
// createstartupobject() uses it to set the initialised flag.
967 void flagorandinit(void * ptr, int ormask, int andmask) {
968 int oldflag=((int *)ptr)[1];
969 int flag=ormask|oldflag;
972 raw_test_pass(0xaa100000 + oldflag);
973 raw_test_pass(0xaa100000 + flag);
975 flagbody(ptr,flag,NULL,0,true);
// Common tail of the flag-update functions.
//
// ptr     object whose flag is being updated
// flag    new flag value computed by the caller
// vqueues parameter queues the object may currently be in, or NULL
// vlength number of entries in vqueues
// isnew   true when the object was just created
//
// For an existing object with no explicit queue list, the queues registered
// for this core and the object's type are used instead. The object is then
// removed from the object set of every queue in the list.
978 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
979 struct parameterwrapper * flagptr = NULL;
981 struct parameterwrapper ** queues = vqueues;
982 int length = vlength;
985 int * enterflags = NULL;
986 if((!isnew) && (queues == NULL)) {
// Thread-simulated build: the core number comes from thread-specific storage.
987 #ifdef THREADSIMULATE
988 int numofcore = pthread_getspecific(key);
989 queues = objectqueues[numofcore][ptr->type];
990 length = numqueues[numofcore][ptr->type];
// Otherwise the global corenum is used, provided it is a valid core.
993 if(corenum < NUMCORES) {
995 queues = objectqueues[corenum][ptr->type];
996 length = numqueues[corenum][ptr->type];
1006 raw_test_pass(0xbb000000 + ptr->flag);
1009 /*Remove object from all queues */
1010 for(i = 0; i < length; ++i) {
1011 flagptr = queues[i];
// Fetch the stored entry first so that its enterflags array can be freed
// after the entry has been removed.
1012 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1013 ObjectHashremove(flagptr->objectset, (int)ptr);
1014 if (enterflags!=NULL)
1015 RUNFREE(enterflags);
// Offers object `vptr` to every parameter queue in `vqueues` (vlength
// entries). When vqueues is NULL the queues registered for this core and the
// object's type are used. For each queue the object's tags are matched
// against the tags the parameter requires; if they match and one of the
// parameter's (andmask, checkmask) terms accepts the object's flag,
// enqueuetasks() is called for that parameter.
1019 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1020 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1023 struct QueueItem *tmpptr;
1024 struct parameterwrapper * parameter=NULL;
1027 struct parameterwrapper * prevptr=NULL;
1028 struct ___Object___ *tagptr=NULL;
1029 struct parameterwrapper ** queues = vqueues;
1030 int length = vlength;
// NOTE(review): the body of this out-of-range-core branch is not visible.
1032 if(corenum > NUMCORES - 1) {
1036 if(queues == NULL) {
1037 #ifdef THREADSIMULATE
1038 int numofcore = pthread_getspecific(key);
1039 queues = objectqueues[numofcore][ptr->type];
1040 length = numqueues[numofcore][ptr->type];
1042 queues = objectqueues[corenum][ptr->type];
1043 length = numqueues[corenum][ptr->type];
1046 tagptr=ptr->___tags___;
1048 /* Outer loop iterates through all parameter queues an object of
1049 this type could be in. */
1050 for(j = 0; j < length; ++j) {
1051 parameter = queues[j];
// Tag check: only needed when the parameter requires tags.
1053 if (parameter->numbertags>0) {
1055 goto nextloop;//that means the object has no tag but that param needs tag
1056 else if(tagptr->type==TAGTYPE) {//one tag
1057 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1058 for(i=0;i<parameter->numbertags;i++) {
1059 //slotid is parameter->tagarray[2*i];
1060 int tagid=parameter->tagarray[2*i+1];
1061 if (tagid!=tagptr->flag)
1062 goto nextloop; /*We don't have this tag */
1064 } else {//multiple tags
1065 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1066 for(i=0;i<parameter->numbertags;i++) {
1067 //slotid is parameter->tagarray[2*i];
1068 int tagid=parameter->tagarray[2*i+1];
// Look for the required tag id among the object's used tag slots.
1070 for(j=0;j<ao->___cachedCode___;j++) {
1071 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
// Flag check: intarray holds (andmask, checkmask) pairs.
1082 for(i=0;i<parameter->numberofterms;i++) {
1083 int andmask=parameter->intarray[i*2];
1084 int checkmask=parameter->intarray[i*2+1];
1085 if ((ptr->flag&andmask)==checkmask) {
1087 raw_test_pass(0xcc000000 + andmask);
1088 raw_test_pass_reg((int)ptr);
1089 raw_test_pass(0xcc000000 + ptr->flag);
1090 raw_test_pass(0xcc000000 + checkmask);
1092 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
// Same matching logic as enqueueObject(), but calling enqueuetasks_I() and
// emitting additional RAW progress codes. In the visible code it is called
// from the objqueue loop while user interrupts are switched off.
1104 void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1105 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1108 struct QueueItem *tmpptr;
1109 struct parameterwrapper * parameter=NULL;
1112 struct parameterwrapper * prevptr=NULL;
1113 struct ___Object___ *tagptr=NULL;
1114 struct parameterwrapper ** queues = vqueues;
1115 int length = vlength;
// NOTE(review): the body of this out-of-range-core branch is not visible.
1117 if(corenum > NUMCORES - 1) {
// No explicit queue list: use the queues registered for this core and the
// object's type.
1121 if(queues == NULL) {
1122 #ifdef THREADSIMULATE
1123 int numofcore = pthread_getspecific(key);
1124 queues = objectqueues[numofcore][ptr->type];
1125 length = numqueues[numofcore][ptr->type];
1127 queues = objectqueues[corenum][ptr->type];
1128 length = numqueues[corenum][ptr->type];
1132 raw_test_pass(0xeaa1);
1133 raw_test_pass_reg(queues);
1134 raw_test_pass_reg(length);
1136 tagptr=ptr->___tags___;
1138 /* Outer loop iterates through all parameter queues an object of
1139 this type could be in. */
1140 for(j = 0; j < length; ++j) {
1141 parameter = queues[j];
// Tag check: only needed when the parameter requires tags.
1143 if (parameter->numbertags>0) {
1145 raw_test_pass(0xeaa2);
1146 raw_test_pass_reg(tagptr);
1149 goto nextloop;//that means the object has no tag but that param needs tag
1150 else if(tagptr->type==TAGTYPE) {//one tag
1151 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1153 raw_test_pass(0xeaa3);
1155 for(i=0;i<parameter->numbertags;i++) {
1156 //slotid is parameter->tagarray[2*i];
1157 int tagid=parameter->tagarray[2*i+1];
1158 if (tagid!=tagptr->flag) {
1160 raw_test_pass(0xeaa4);
1162 goto nextloop; /*We don't have this tag */
1165 } else {//multiple tags
1166 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1168 raw_test_pass(0xeaa5);
1170 for(i=0;i<parameter->numbertags;i++) {
1171 //slotid is parameter->tagarray[2*i];
1172 int tagid=parameter->tagarray[2*i+1];
// Look for the required tag id among the object's used tag slots.
1174 for(j=0;j<ao->___cachedCode___;j++) {
1175 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
1180 raw_test_pass(0xeaa6);
// Flag check: intarray holds (andmask, checkmask) pairs.
1190 for(i=0;i<parameter->numberofterms;i++) {
1191 int andmask=parameter->intarray[i*2];
1192 int checkmask=parameter->intarray[i*2+1];
1194 raw_test_pass(0xeaa7);
1195 raw_test_pass(0xcc000000 + andmask);
1196 raw_test_pass_reg(ptr);
1197 raw_test_pass(0xcc000000 + ptr->flag);
1198 raw_test_pass(0xcc000000 + checkmask);
1200 if ((ptr->flag&andmask)==checkmask) {
1202 raw_test_pass(0xeaa8);
1204 enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
1215 // helper function to compute the coordinates of a core from the core number
// core_num is split into a column (*coordX = core_num % 4) and a row
// (*coordY = core_num / 4). The row width of 4 matches the way main()
// computes corenum as x + 4 * y. Both output pointers must be valid.
1216 void calCoords(int core_num, int* coordY, int* coordX) {
1217 *coordX = core_num % 4;
1218 *coordY = core_num / 4;
1222 /* Message format for RAW version:
1224 * type: 0 -- transfer object
1225 * 1 -- transfer stall msg
1231 * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
1232 * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
1233 * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
1234 * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
1235 * lock type: 0 -- read; 1 -- write
1238 // transfer an object to targetcore
// transObj describes the transfer: the object pointer, the target core and
// `length` (task index, parameter index) pairs in `queues`.
// RAW part: sends a message of 3 + 2 * length words and increments
// self_numsendobjs.
// THREADSIMULATE part: copies the transfer record, wraps it in a small
// ___Object___ header and posts that to the target core's POSIX message
// queue, then increments the sender's send counter.
1240 void transferObject(struct transObjInfo * transObj) {
1241 void * obj = transObj->objptr;
1242 int type=((int *)obj)[0];
1243 int size=classsize[type];
1244 int targetcore = transObj->targetcore;
1245 //assert(type < NUMCLASSES); // can only transfer normal object
1249 int self_y, self_x, target_y, target_x;
1251 // for 32 bit machine, the size of fixed part is always 3 words
1252 //int msgsize = sizeof(int) * 2 + sizeof(void *);
// Message size in words: 3 fixed words plus two per (task, parameter) pair.
1253 int msgsize = 3 + transObj->length * 2;
1256 struct ___Object___ * newobj = (struct ___Object___ *)obj;
1257 /*if(0 == newobj->isolate) {
1261 calCoords(corenum, &self_y, &self_x);
1262 calCoords(targetcore, &target_y, &target_x);
1263 // Build the message header
1264 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1266 target_y, target_x);
1267 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1268 raw_test_pass(0xbbbb);
1269 raw_test_pass(0xb000 + targetcore); // targetcore
1273 raw_test_pass_reg(msgsize);
1275 raw_test_pass_reg(obj);
1276 //gdn_send(isshared);
1277 //raw_test_pass_reg(isshared);
// Payload tail: one (task index, parameter index) pair per target queue.
1278 for(i = 0; i < transObj->length; ++i) {
1279 int taskindex = transObj->queues[2*i];
1280 int paramindex = transObj->queues[2*i+1];
1281 gdn_send(taskindex);
1282 raw_test_pass_reg(taskindex);
1283 gdn_send(paramindex);
1284 raw_test_pass_reg(paramindex);
1286 raw_test_pass(0xffff);
1287 ++(self_numsendobjs);
1288 #elif defined THREADSIMULATE
1289 int numofcore = pthread_getspecific(key);
1291 // use POSIX message queue to transfer objects between cores
// Build the decimal suffix of the target queue name (one or two digits).
1295 if(targetcore < 10) {
1296 corenumstr[0] = targetcore + '0';
1297 corenumstr[1] = '\0';
1299 } else if(targetcore < 100) {
1300 corenumstr[1] = targetcore % 10 + '0';
1301 corenumstr[0] = (targetcore / 10) + '0';
1302 corenumstr[2] = '\0';
1305 printf("Error: targetcore >= 100\n");
1309 char * pathhead = "/msgqueue_";
1310 int targetlen = strlen(pathhead);
1311 char path[targetlen + sourcelen + 1];
1312 strcpy(path, pathhead);
1313 strncat(path, corenumstr, sourcelen);
// The sending end is opened write-only and non-blocking.
1314 int oflags = O_WRONLY|O_NONBLOCK;
1315 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1316 mqdnum = mq_open(path, oflags, omodes, NULL);
1318 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1322 /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
1323 if(0 == newobj->isolate) {
1324 newobj = RUNMALLOC(size);
1325 memcpy(newobj, obj, size);
1326 newobj->original=obj;
// Copy the transfer record and its pair array, then post a header object
// that carries the object's type and, in `original`, the copied record.
1328 struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
1329 memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
1330 int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
1331 memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
1332 tmptransObj->queues = tmpqueue;
1333 struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
1334 newobj->type = ((struct ___Object___ *)obj)->type;
1335 newobj->original = (struct ___Object___ *)tmptransObj;
1338 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1340 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
// The startup core counts directly in numsendobjs[]; the other branch
// counts in the thread's own thread_data record.
1344 if(numofcore == STARTUPCORE) {
1345 ++numsendobjs[numofcore];
1347 ++(thread_data_array[numofcore].numsendobjs);
1349 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1353 // send stall message to targetcore
// The stall message carries this core's number and its send/receive object
// counters, which the startup core uses for termination detection.
// RAW part: sends the values with gdn_send().
// THREADSIMULATE part: packs them into the fields of a ___Object___ and
// posts it to the target core's POSIX message queue; targetcore must be
// STARTUPCORE there. The returned values are not visible in this listing.
1355 bool transStallMsg(int targetcore) {
1358 int self_y, self_x, target_y, target_x;
1359 // for 32 bit machine, the size is always 4 words
1360 //int msgsize = sizeof(int) * 4;
1363 calCoords(corenum, &self_y, &self_x);
1364 calCoords(targetcore, &target_y, &target_x);
1365 // Build the message header
1366 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1368 target_y, target_x);
1369 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1370 raw_test_pass(0xbbbb);
1371 raw_test_pass(0xb000 + targetcore); // targetcore
1375 raw_test_pass_reg(corenum);
1376 gdn_send(self_numsendobjs);
1377 raw_test_pass_reg(self_numsendobjs);
1378 gdn_send(self_numreceiveobjs);
1379 raw_test_pass_reg(self_numreceiveobjs);
1380 raw_test_pass(0xffff);
1382 #elif defined THREADSIMULATE
1383 struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
1384 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
// flag carries the core number, ___cachedHash___ the send count and
// ___cachedCode___ the receive count.
1386 int numofcore = pthread_getspecific(key);
1387 newobj->flag = numofcore;
1388 newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
1389 newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
1391 // use POSIX message queue to send stall msg to startup core
1392 assert(targetcore == STARTUPCORE);
// Build the decimal suffix of the target queue name (one or two digits).
1396 if(targetcore < 10) {
1397 corenumstr[0] = targetcore + '0';
1398 corenumstr[1] = '\0';
1400 } else if(targetcore < 100) {
1401 corenumstr[1] = targetcore % 10 + '0';
1402 corenumstr[0] = (targetcore / 10) + '0';
1403 corenumstr[2] = '\0';
1406 printf("Error: targetcore >= 100\n");
1410 char * pathhead = "/msgqueue_";
1411 int targetlen = strlen(pathhead);
1412 char path[targetlen + sourcelen + 1];
1413 strcpy(path, pathhead);
1414 strncat(path, corenumstr, sourcelen);
// The sending end is opened write-only and non-blocking.
1415 int oflags = O_WRONLY|O_NONBLOCK;
1416 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1417 mqdnum = mq_open(path, oflags, omodes, NULL);
1419 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1424 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1426 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1430 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1431 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
1438 // receive object transferred from other cores
1439 // or the terminate message from other cores
1440 // NOTICE: following format is for threadsimulate version only
1441 // RAW version please see previous description
1442 // format: type + object
1443 // type: -1--stall msg
1445 // return value: 0--received an object
1446 // 1--received nothing
1447 // 2--received a Stall Msg
1448 // 3--received a lock Msg
1449 // RAW version: -1 -- received nothing
1450 // otherwise -- received msg type
// receiveObject: check this core's incoming message channel and handle a pending message.
// RAW build: words are pulled from the dynamic network with gdn_receive() into msgdata;
// once msgdataindex reaches msglength the message is handled according to its kind
// (object transfer, stall report, lock request, lock grant/deny, lock release).
// THREADSIMULATE build: one message is read from this thread's POSIX message queue
// (mqd[numofcore]); it is either a stall report or a transferred object.
// The return codes are listed in the comment block that precedes this function.
1451 int receiveObject() {
1455 int self_y, self_x, target_y, target_x;
// RAW: test whether any word is waiting on the dynamic network input
1457 if(gdn_input_avail() == 0) {
1458 if(corenum < NUMCORES) {
1459 raw_test_pass(0xd001);
1463 raw_test_pass(0xcccc);
// read words while input is available and the current message is not yet complete
1464 while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
1465 msgdata[msgdataindex] = gdn_receive();
1466 if(msgdataindex == 0) {
1467 if(msgdata[0] > 2) {
1469 } else if(msgdata[0] > 0) {
1472 } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
// for a type-0 message the second word carries the total message length
1473 msglength = msgdata[msgdataindex];
1475 raw_test_pass_reg(msgdata[msgdataindex]);
1478 /*if(msgdataindex == 0) {
1480 msgtype = gdn_receive();
1487 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1488 msgdata[msgdataindex] = msgtype;
1490 raw_test_pass_reg(msgtype);
1491 } else if((msgdataindex == 1) && (msgtype == 0)) {
1492 // object transfer msg
1493 msglength = gdn_receive();
1494 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1495 msgdata[0] = msgtype;
1496 msgdata[msgdataindex] = msglength;
1497 raw_test_pass_reg(msgdata[msgdataindex]);
1499 msgdata[msgdataindex] = gdn_receive();
1500 raw_test_pass_reg(msgdata[msgdataindex]);
1504 raw_test_pass(0xffff);
1505 if(msgdataindex == msglength) {
1506 // received a whole msg
1507 int type, data1, data2; // will receive at least 3 words including type
1513 // receive a object transfer msg
1514 struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
1516 if(corenum > NUMCORES - 1) {
1517 raw_test_done(0xa00a);
1519 // store the object and its corresponding queue info, enqueue it later
1520 transObj->objptr = (void *)data2; // data1 is now size of the msg
1521 transObj->length = (msglength - 3) / 2;
1522 transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
1523 for(k = 0; k < transObj->length; ++k) {
1524 transObj->queues[2*k] = msgdata[3+2*k];
1525 raw_test_pass_reg(transObj->queues[2*k]);
1526 transObj->queues[2*k+1] = msgdata[3+2*k+1];
1527 raw_test_pass_reg(transObj->queues[2*k+1]);
1529 //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
1530 addNewItem_I(&objqueue, (void *)transObj);
1531 ++(self_numreceiveobjs);
1532 raw_test_pass(0xe881);
1534 addNewItem_I(&objqueue, (void *)data2);
1535 ++(self_numreceiveobjs);
1536 raw_test_pass(0xe881);*/
1540 // receive a stall msg
1541 if(corenum != STARTUPCORE) {
1542 // non startup core can not receive stall msg
1544 raw_test_done(0xa001);
1546 if(data1 < NUMCORES) {
1547 raw_test_pass(0xe882);
// record that core data1 has stalled, together with its send/receive counters
1548 corestatus[data1] = 0;
1549 numsendobjs[data1] = data2;
1550 numreceiveobjs[data1] = msgdata[3];
1555 // receive lock request msg
1556 // for 32 bit machine, the size is always 3 words
1557 //int msgsize = sizeof(int) * 3;
1559 // lock request msg, handle it right now
1560 // check to see if there is a lock exist in locktbl for the required obj
1561 int data3 = msgdata[3];
1563 if(!RuntimeHashcontainskey(locktbl, data2)) {
1564 // no locks for this object
1565 // first time to operate on this shared object
1566 // create a lock for it
1567 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1568 raw_test_pass(0xe883);
1570 RuntimeHashadd_I(locktbl, data2, 1);
1572 RuntimeHashadd_I(locktbl, data2, -1);
1576 raw_test_pass(0xe884);
1577 RuntimeHashget(locktbl, data2, &rwlock_obj);
1578 raw_test_pass_reg(rwlock_obj);
1579 if(0 == rwlock_obj) {
// the stored lock word is replaced by removing the key and adding it back
1585 RuntimeHashremovekey(locktbl, data2);
1586 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1587 } else if((rwlock_obj > 0) && (data1 == 0)) {
1588 // read lock request and there are only read locks
1590 RuntimeHashremovekey(locktbl, data2);
1591 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1595 raw_test_pass_reg(rwlock_obj);
// reply to the requesting core over the dynamic network
1598 calCoords(corenum, &self_y, &self_x);
1599 calCoords(targetcore, &target_y, &target_x);
1600 // Build the message header
1601 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1603 target_y, target_x);
1604 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1605 raw_test_pass(0xbbbb);
1606 raw_test_pass(0xb000 + targetcore); // targetcore
1608 // deny the lock request
1609 gdn_send(4); // lock request
1612 // grant the lock request
1613 gdn_send(3); // lock request
1616 gdn_send(data1); // lock type
1617 raw_test_pass_reg(data1);
1618 gdn_send(data2); // lock target
1619 raw_test_pass_reg(data2);
1620 raw_test_pass(0xffff);
1624 // receive lock grant msg
1625 if(corenum > NUMCORES - 1) {
1626 raw_test_done(0xa00b);
1628 if(lockobj == data2) {
1635 // conflicts on lockresults
1636 raw_test_done(0xa002);
1641 // receive lock grant/deny msg
1642 if(corenum > NUMCORES - 1) {
1643 raw_test_done(0xa00c);
1645 if(lockobj == data2) {
1652 // conflicts on lockresults
1653 raw_test_done(0xa003);
1658 // receive lock release msg
1659 if(!RuntimeHashcontainskey(locktbl, data2)) {
1660 // no locks for this object, something is wrong
1661 raw_test_done(0xa004);
1664 RuntimeHashget(locktbl, data2, &rwlock_obj);
1665 raw_test_pass(0xe885);
1666 raw_test_pass_reg(rwlock_obj);
1672 RuntimeHashremovekey(locktbl, data2);
1673 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1674 raw_test_pass_reg(rwlock_obj);
// overwrite the consumed message words with -1 (this loop stops before index 0)
1683 for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
1684 msgdata[msgdataindex] = -1;
1689 raw_test_pass(0xe886);
1693 raw_test_pass(0xe887);
1696 #elif defined THREADSIMULATE
// value stored under the thread-specific key; used below as this thread's core index
1697 int numofcore = pthread_getspecific(key);
1698 // use POSIX message queue to transfer object
1700 struct mq_attr mqattr;
1701 mq_getattr(mqd[numofcore], &mqattr);
// the receive buffer is sized from the queue's mq_msgsize attribute
1702 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
1703 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
1709 //printf("msg: %s\n",msgptr);
// a first int of -1 marks a stall message; its fields carry the sender's index and counters
1710 if(((int*)msgptr)[0] == -1) {
1712 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1713 int index = tmpptr->flag;
1714 corestatus[index] = 0;
1715 numsendobjs[index] = tmpptr->___cachedHash___;
1716 numreceiveobjs[index] = tmpptr->___cachedCode___;
1717 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1720 } /*else if(((int*)msgptr)[0] == -2) {
1725 if(numofcore == STARTUPCORE) {
1726 ++(numreceiveobjs[numofcore]);
1728 ++(thread_data_array[numofcore].numreceiveobjs);
1730 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1731 struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
1732 tmpptr = (struct ___Object___ *)(transObj->objptr);
// copy the transferred object into a new allocation sized by its class
1733 int type = tmpptr->type;
1734 int size=classsize[type];
1735 struct ___Object___ * newobj=RUNMALLOC(size);
1736 memcpy(newobj, tmpptr, size);
1737 if(0 == newobj->isolate) {
1738 newobj->original=tmpptr;
// enqueue the copy on every (task, parameter) queue named in the transfer info
1743 for(k = 0; k < transObj->length; ++k) {
1744 int taskindex = transObj->queues[2 * k];
1745 int paramindex = transObj->queues[2 * k + 1];
1746 struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
1747 enqueueObject(newobj, queues, 1);
1749 RUNFREE(transObj->queues);
// getreadlock: request a read lock on the shared object identified by ptr.
// RAW build: the owning core is ((int)ptr) % TOTALCORE. When that is this core the
// lock word in locktbl is updated locally between raw_user_interrupts_off/on;
// otherwise a lock request (message type 2, lock kind 0 = read) is sent to the
// owning core over the dynamic network.
// THREADSIMULATE build: locktbl maps the object to a pthread_rwlock_t, guarded by
// rwlock_tbl; the per-object lock is created on first use and then taken with
// pthread_rwlock_tryrdlock.
// Callers treat a false return as failure to obtain the lock.
1756 bool getreadlock(void * ptr) {
1759 int self_y, self_x, target_y, target_x;
1760 int targetcore = ((int)ptr) % TOTALCORE;
1761 // for 32 bit machine, the size is always 4 words
1762 //int msgsize = sizeof(int) * 4;
1772 if(targetcore == corenum) {
1773 // reside on this core
1776 raw_user_interrupts_off();
1778 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1779 // no locks for this object
1780 // first time to operate on this shared object
1781 // create a lock for it
1782 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1783 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1786 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1787 if(-1 != rwlock_obj) {
// the stored lock word is replaced by removing the key and adding it back
1789 RuntimeHashremovekey(locktbl, (int)ptr);
1790 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1796 raw_user_interrupts_on();
1798 if(lockobj == (int)ptr) {
1809 // conflicts on lockresults
1810 raw_test_done(0xa005);
1815 calCoords(corenum, &self_y, &self_x);
1816 calCoords(targetcore, &target_y, &target_x);
1817 // Build the message header
1818 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1820 target_y, target_x);
1821 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1822 raw_test_pass(0xbbbb);
1823 raw_test_pass(0xb000 + targetcore); // targetcore
1824 gdn_send(2); // lock request
1826 gdn_send(0); // read lock
1829 raw_test_pass_reg(ptr);
1831 raw_test_pass_reg(corenum);
1832 raw_test_pass(0xffff);
1834 #elif defined THREADSIMULATE
1835 int numofcore = pthread_getspecific(key);
// pthread rwlock calls return the error number directly, so strerror(rc) is the matching text
1837 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
1838 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1842 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1843 // no locks for this object
1844 // first time to operate on this shared object
1845 // create a lock for it
1846 rc = pthread_rwlock_unlock(&rwlock_tbl);
1847 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1848 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1849 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1850 rc = pthread_rwlock_init(rwlock, NULL);
1851 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1852 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1853 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// re-check under the table write lock: another thread may have added the entry meanwhile
1858 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1860 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
1863 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
1865 rc = pthread_rwlock_unlock(&rwlock_tbl);
1866 printf("[getreadlock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1868 rc = pthread_rwlock_tryrdlock(rwlock);
1869 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
1876 pthread_rwlock_t* rwlock_obj = NULL;
1877 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1878 rc = pthread_rwlock_unlock(&rwlock_tbl);
1879 printf("[getreadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1880 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1881 printf("[getreadlock, %d] getting read lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasereadlock: give up a read lock previously obtained on the object ptr.
// RAW build: when the owning core (((int)ptr) % TOTALCORE) is this core, the lock
// word in locktbl is rewritten locally between raw_user_interrupts_off/on;
// otherwise a lock-release message (type 5, lock kind 0 = read) is sent to the owner.
// THREADSIMULATE build: the object's pthread_rwlock_t is looked up in locktbl while
// holding rwlock_tbl for reading, and is then unlocked.
1891 void releasereadlock(void * ptr) {
1894 int self_y, self_x, target_y, target_x;
1895 int targetcore = ((int)ptr) % TOTALCORE;
1896 // for 32 bit machine, the size is always 3 words
1897 //int msgsize = sizeof(int) * 3;
1900 if(targetcore == corenum) {
1902 raw_user_interrupts_off();
1904 // reside on this core
1905 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1906 // no locks for this object, something is wrong
1907 raw_test_done(0xa006);
1910 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// the stored lock word is replaced by removing the key and adding it back
1912 RuntimeHashremovekey(locktbl, (int)ptr);
1913 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1916 raw_user_interrupts_on();
1921 calCoords(corenum, &self_y, &self_x);
1922 calCoords(targetcore, &target_y, &target_x);
1923 // Build the message header
1924 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1926 target_y, target_x);
1927 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1928 raw_test_pass(0xbbbb);
1929 raw_test_pass(0xb000 + targetcore); // targetcore
1930 gdn_send(5); // lock release
1932 gdn_send(0); // read lock
1935 raw_test_pass_reg(ptr);
1936 raw_test_pass(0xffff);
1937 #elif defined THREADSIMULATE
1938 int numofcore = pthread_getspecific(key);
// pthread rwlock calls return the error number directly, so strerror(rc) is the matching text
1939 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
1940 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
1941 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1942 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
1945 pthread_rwlock_t* rwlock_obj = NULL;
1946 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1947 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
1948 printf("[releasereadlock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
1949 rc = pthread_rwlock_unlock(&rwlock_tbl);
1950 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// getreadlock_I: read-lock request for the object ptr, following the same RAW steps
// as getreadlock (local locktbl update when this core owns the object, otherwise a
// type-2 / kind-0 request sent to the owning core).
// NOTE(review): no raw_user_interrupts_off/on calls appear here; presumably this
// variant is meant to run with interrupts already disabled — confirm.
1955 bool getreadlock_I(void * ptr) {
1957 int self_y, self_x, target_y, target_x;
// the owning core is derived from the object's address
1958 int targetcore = ((int)ptr) % TOTALCORE;
1959 // for 32 bit machine, the size is always 4 words
1960 //int msgsize = sizeof(int) * 4;
1970 if(targetcore == corenum) {
1971 // reside on this core
1973 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1974 // no locks for this object
1975 // first time to operate on this shared object
1976 // create a lock for it
1977 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1978 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1981 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
1982 if(-1 != rwlock_obj) {
// the stored lock word is replaced by removing the key and adding it back
1984 RuntimeHashremovekey(locktbl, (int)ptr);
1985 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1990 if(lockobj == (int)ptr) {
2001 // conflicts on lockresults
2002 raw_test_done(0xa005);
// remote owner: send the request over the dynamic network
2007 calCoords(corenum, &self_y, &self_x);
2008 calCoords(targetcore, &target_y, &target_x);
2009 // Build the message header
2010 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2012 target_y, target_x);
2013 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2014 raw_test_pass(0xbbbb);
2015 raw_test_pass(0xb000 + targetcore); // targetcore
2016 gdn_send(2); // lock request
2018 gdn_send(0); // read lock
2021 raw_test_pass_reg(ptr);
2023 raw_test_pass_reg(corenum);
2024 raw_test_pass(0xffff);
// releasereadlock_I: release a read lock on the object ptr, following the same RAW
// steps as releasereadlock (local locktbl update when this core owns the object,
// otherwise a type-5 / kind-0 release message sent to the owning core).
// NOTE(review): no raw_user_interrupts_off/on calls appear here; presumably this
// variant is meant to run with interrupts already disabled — confirm.
2028 void releasereadlock_I(void * ptr) {
2030 int self_y, self_x, target_y, target_x;
// the owning core is derived from the object's address
2031 int targetcore = ((int)ptr) % TOTALCORE;
2032 // for 32 bit machine, the size is always 3 words
2033 //int msgsize = sizeof(int) * 3;
2036 if(targetcore == corenum) {
2037 // reside on this core
2038 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2039 // no locks for this object, something is wrong
2040 raw_test_done(0xa006);
2043 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// the stored lock word is replaced by removing the key and adding it back
2045 RuntimeHashremovekey(locktbl, (int)ptr);
2046 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
// remote owner: send the release over the dynamic network
2051 calCoords(corenum, &self_y, &self_x);
2052 calCoords(targetcore, &target_y, &target_x);
2053 // Build the message header
2054 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2056 target_y, target_x);
2057 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2058 raw_test_pass(0xbbbb);
2059 raw_test_pass(0xb000 + targetcore); // targetcore
2060 gdn_send(5); // lock release
2062 gdn_send(0); // read lock
2065 raw_test_pass_reg(ptr);
2066 raw_test_pass(0xffff);
// getwritelock: request a write lock on the shared object identified by ptr.
// RAW build: the owning core is ((int)ptr) % TOTALCORE. When that is this core the
// lock word in locktbl is updated locally between raw_user_interrupts_off/on
// (a new entry is created with -1, the write-lock value); otherwise a lock request
// (message type 2, lock kind 1 = write) is sent to the owning core.
// THREADSIMULATE build: locktbl maps the object to a pthread_rwlock_t, guarded by
// rwlock_tbl; the per-object lock is created on first use and then taken with
// pthread_rwlock_trywrlock.
// Callers treat a false return as failure to obtain the lock.
2070 bool getwritelock(void * ptr) {
2073 int self_y, self_x, target_y, target_x;
2074 int targetcore = ((int)ptr) % TOTALCORE;
2075 // for 32 bit machine, the size is always 4 words
2076 //int msgsize = sizeof(int) * 4;
2086 if(targetcore == corenum) {
2087 // reside on this core
2090 raw_user_interrupts_off();
2092 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2093 // no locks for this object
2094 // first time to operate on this shared object
2095 // create a lock for it
2096 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2097 raw_test_pass(0xe552);
2098 RuntimeHashadd_I(locktbl, (int)ptr, -1);
2101 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2102 raw_test_pass(0xe553);
2103 raw_test_pass_reg(rwlock_obj);
2104 if(0 == rwlock_obj) {
// the stored lock word is replaced by removing the key and adding it back
2106 RuntimeHashremovekey(locktbl, (int)ptr);
2107 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2113 raw_user_interrupts_on();
2115 raw_test_pass(0xe554);
2116 raw_test_pass_reg(lockresult);
2117 if(lockobj == (int)ptr) {
2130 // conflicts on lockresults
2131 raw_test_done(0xa007);
2136 raw_test_pass(0xe555);
2137 calCoords(corenum, &self_y, &self_x);
2138 calCoords(targetcore, &target_y, &target_x);
2139 // Build the message header
2140 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2142 target_y, target_x);
2143 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2144 raw_test_pass(0xbbbb);
2145 raw_test_pass(0xb000 + targetcore); // targetcore
2146 gdn_send(2); // lock request
2148 gdn_send(1); // write lock
2151 raw_test_pass_reg(ptr);
2153 raw_test_pass_reg(corenum);
2154 raw_test_pass(0xffff);
2156 #elif defined THREADSIMULATE
2157 int numofcore = pthread_getspecific(key);
// pthread rwlock calls return the error number directly, so strerror(rc) is the matching text
2159 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
2160 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2164 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2165 // no locks for this object
2166 // first time to operate on this shared object
2167 // create a lock for it
2168 rc = pthread_rwlock_unlock(&rwlock_tbl);
2169 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2170 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
2171 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
2172 rc = pthread_rwlock_init(rwlock, NULL);
2173 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
2174 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
2175 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2177 pthread_rwlock_destroy(rwlock);
// re-check under the table write lock: another thread may have added the entry meanwhile
2181 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2183 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
2185 pthread_rwlock_destroy(rwlock);
2187 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
2189 rc = pthread_rwlock_unlock(&rwlock_tbl);
2190 printf("[getwritelock, %d] release the write lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2192 rc = pthread_rwlock_trywrlock(rwlock);
2193 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc, strerror(rc));
2200 pthread_rwlock_t* rwlock_obj = NULL;
2201 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2202 rc = pthread_rwlock_unlock(&rwlock_tbl);
2203 printf("[getwritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2204 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
2205 printf("[getwritelock, %d] getting write lock for object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasewritelock: give up a write lock previously obtained on the object ptr.
// RAW build: when the owning core (((int)ptr) % TOTALCORE) is this core, the lock
// word in locktbl is rewritten locally between raw_user_interrupts_off/on;
// otherwise a lock-release message (type 5, lock kind 1 = write) is sent to the owner.
// THREADSIMULATE build: the object's pthread_rwlock_t is looked up in locktbl while
// holding rwlock_tbl for reading, and is then unlocked.
2216 void releasewritelock(void * ptr) {
2219 int self_y, self_x, target_y, target_x;
2220 int targetcore = ((int)ptr) % TOTALCORE;
2221 // for 32 bit machine, the size is always 3 words
2222 //int msgsize = sizeof(int) * 3;
2225 if(targetcore == corenum) {
2227 raw_user_interrupts_off();
2229 // reside on this core
2230 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2231 // no locks for this object, something is wrong
2232 raw_test_done(0xa008);
2235 raw_test_pass(0xe662);
2236 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2237 raw_test_pass_reg(rwlock_obj);
// the stored lock word is replaced by removing the key and adding it back
2239 RuntimeHashremovekey(locktbl, (int)ptr);
2240 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2241 raw_test_pass_reg(rwlock_obj);
2244 raw_user_interrupts_on();
2249 raw_test_pass(0xe663);
2250 calCoords(corenum, &self_y, &self_x);
2251 calCoords(targetcore, &target_y, &target_x);
2252 // Build the message header
2253 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2255 target_y, target_x);
2256 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2257 raw_test_pass(0xbbbb);
2258 raw_test_pass(0xb000 + targetcore);
2259 gdn_send(5); // lock release
2261 gdn_send(1); // write lock
2264 raw_test_pass_reg(ptr);
2265 raw_test_pass(0xffff);
2266 #elif defined THREADSIMULATE
2267 int numofcore = pthread_getspecific(key);
// pthread rwlock calls return the error number directly, so strerror(rc) is the matching text
2268 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
2269 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
2270 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2271 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
2274 pthread_rwlock_t* rwlock_obj = NULL;
2275 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2276 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2277 printf("[releasewritelock, %d] unlocked object %d: %d error: %s\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2278 rc = pthread_rwlock_unlock(&rwlock_tbl);
2279 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: %s\n", numofcore, rc, strerror(rc));
// enqueuetasks: record ptr in the parameter's object set, then enumerate parameter
// combinations for parameter->task with the parameter's iterators and put each
// combination that is not already in activetasks into activetasks.
//   parameter     - wrapper for the task parameter that ptr satisfies
//   prevptr       - not referenced in the statements of this function
//   ptr           - the object being enqueued; stored at slot parameter->slot
//   enterflags / numenterflags - stored with the object via ObjectHashadd
// Each candidate is a freshly allocated taskparamdescriptor holding numiterators+1
// parameters; a candidate already present in activetasks has its array freed.
2283 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2284 void * taskpointerarray[MAXTASKPARAMS];
2286 int numparams=parameter->task->numParameters;
2287 int numiterators=parameter->task->numTotal-1;
2292 struct taskdescriptor * task=parameter->task;
2294 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2296 /* Add enqueued object to parameter vector */
2297 taskpointerarray[parameter->slot]=ptr;
2299 /* Reset iterators */
2300 for(j=0;j<numiterators;j++) {
2301 toiReset(&parameter->iterators[j]);
2304 /* Find initial state */
2305 for(j=0;j<numiterators;j++) {
2307 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2308 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2310 /* Need to backtrack */
2311 toiReset(&parameter->iterators[j]);
2315 /* Nothing to enqueue */
2322 /* Enqueue current state */
2324 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
2326 tpd->numParameters=numiterators+1;
2327 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
2328 for(j=0;j<=numiterators;j++){
2329 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2332 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2333 genputtable(activetasks, tpd, tpd);
2335 RUNFREE(tpd->parameterArray);
2339 /* This loop iterates to the next parameter combination */
2340 if (numiterators==0)
2343 for(j=numiterators-1; j<numiterators;j++) {
2345 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2346 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2348 /* Need to backtrack */
2349 toiReset(&parameter->iterators[j]);
2353 /* Nothing more to enqueue */
// enqueuetasks_I: same enumeration as enqueuetasks, but built on the _I variants of
// the helpers (ObjectHashadd_I, RUNMALLOC_I, genputtable_I).
//   parameter     - wrapper for the task parameter that ptr satisfies
//   prevptr       - not referenced in the statements of this function
//   ptr           - the object being enqueued; stored at slot parameter->slot
//   enterflags / numenterflags - stored with the object via ObjectHashadd_I
// Each candidate is a freshly allocated taskparamdescriptor holding numiterators+1
// parameters; a candidate already present in activetasks has its array freed.
2362 int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2363 void * taskpointerarray[MAXTASKPARAMS];
2365 int numparams=parameter->task->numParameters;
2366 int numiterators=parameter->task->numTotal-1;
2371 struct taskdescriptor * task=parameter->task;
2373 ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2375 /* Add enqueued object to parameter vector */
2376 taskpointerarray[parameter->slot]=ptr;
2378 /* Reset iterators */
2379 for(j=0;j<numiterators;j++) {
2380 toiReset(&parameter->iterators[j]);
2383 /* Find initial state */
2384 for(j=0;j<numiterators;j++) {
2386 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2387 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2389 /* Need to backtrack */
2390 toiReset(&parameter->iterators[j]);
2394 /* Nothing to enqueue */
2400 /* Enqueue current state */
2402 struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
2404 tpd->numParameters=numiterators+1;
2405 tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
2406 for(j=0;j<=numiterators;j++){
2407 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
2410 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2411 genputtable_I(activetasks, tpd, tpd);
2413 RUNFREE(tpd->parameterArray);
2417 /* This loop iterates to the next parameter combination */
2418 if (numiterators==0)
2421 for(j=numiterators-1; j<numiterators;j++) {
2423 if(toiHasNext(&parameter->iterators[j], taskpointerarray OPTARG(failed)))
2424 toiNext(&parameter->iterators[j], taskpointerarray OPTARG(failed));
2426 /* Need to backtrack */
2427 toiReset(&parameter->iterators[j]);
2431 /* Nothing more to enqueue */
2440 /* Handler for signals. The signals catch null pointer errors and
2441 arithmetic errors. */
// sig is the delivered signal number; info and uap are not referenced in the
// statements below. The handler unblocks the signal it was invoked for and then
// jumps back to the setjmp(error_handler) point with value 1, so it does not return.
2443 void myhandler(int sig, siginfo_t *info, void *uap) {
2446 printf("sig=%d\n",sig);
// build a one-signal set and unblock it before leaving the handler via longjmp
2449 sigemptyset(&toclear);
2450 sigaddset(&toclear, sig);
2451 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
2452 longjmp(error_handler,1);
2458 struct RuntimeHash *fdtoobject;
// addreadfd: add file descriptor fd to the readfds set.
2460 void addreadfd(int fd) {
2463 FD_SET(fd, &readfds);
// removereadfd: remove file descriptor fd from the readfds set. When fd was the
// highest tracked descriptor (maxreadfd == fd+1), scan downward for the highest
// descriptor that is still set.
// NOTE(review): presumably the scan lowers maxreadfd one step at a time — confirm.
2466 void removereadfd(int fd) {
2467 FD_CLR(fd, &readfds);
2468 if (maxreadfd==(fd+1)) {
2470 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
// executetasks: main task-dispatch loop. It runs while activetasks is non-empty or
// file descriptors are being watched (maxreadfd > 0). Each round it polls the watched
// descriptors with a zero-timeout select, flags and re-enqueues objects whose
// descriptor is ready, then takes the first task from activetasks, locks and
// re-validates each parameter (still queued, flags match, tags present), calls the
// task through its taskptr, and finally releases the locks and frees the descriptor.
// Signal handlers plus setjmp(error_handler) are used to regain control after a fault.
2481 void executetasks() {
2482 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
2485 struct ___Object___ * tmpparam = NULL;
2486 struct parameterdescriptor * pd=NULL;
2487 struct parameterwrapper *pw=NULL;
2496 raw_test_pass(0xe991);
2500 /* Set up signal handlers */
2501 struct sigaction sig;
2502 sig.sa_sigaction=&myhandler;
2503 sig.sa_flags=SA_SIGINFO;
2504 sigemptyset(&sig.sa_mask);
2506 /* Catch bus errors, segmentation faults, and floating point exceptions*/
2507 sigaction(SIGBUS,&sig,0);
2508 sigaction(SIGSEGV,&sig,0);
2509 sigaction(SIGFPE,&sig,0);
2510 sigaction(SIGPIPE,&sig,0);
// table mapping a file descriptor to the object to flag when it becomes readable
2519 fdtoobject=allocateRuntimeHash(100);
2523 /* Map first block of memory to protected, anonymous page */
2524 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
2528 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
2531 raw_test_pass(0xe992);
2533 /* Check if any filedescriptors have IO pending */
// a {0,0} timeout makes select return immediately (poll, no blocking)
2536 struct timeval timeout={0,0};
2540 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
2542 /* Process ready fd's */
2544 for(fd=0;fd<maxreadfd;fd++) {
2545 if (FD_ISSET(fd, &tmpreadfds)) {
2546 /* Set ready flag on object */
2548 // printf("Setting fd %d\n",fd);
2549 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
2550 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
2551 enqueueObject(objptr, NULL, 0);
2560 /* See if there are any active tasks */
2561 if (hashsize(activetasks)>0) {
// take the first pending task descriptor and remove it from activetasks
2563 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
2564 genfreekey(activetasks, currtpd);
2566 /* Check if this task has failed, allow a task that contains optional objects to fire */
2567 /*if (gencontains(failedtasks, currtpd)) {
2568 // Free up task parameter descriptor
2569 RUNFREE(currtpd->parameterArray);
2573 numparams=currtpd->task->numParameters;
2574 numtotal=currtpd->task->numTotal;
2576 #ifdef THREADSIMULATE
// per-parameter marker: 0 = shared object (locked through ->original), 1 = isolated
2577 int isolateflags[numparams];
2579 /* Make sure that the parameters are still in the queues */
2580 for(i=0;i<numparams;i++) {
2581 void * parameter=currtpd->parameterArray[i];
2583 raw_test_pass(0xe993);
2584 // require locks for this parameter
2585 getwritelock(parameter);
// keep servicing incoming messages until receiveObject reports nothing received (-1)
2594 while(receiveObject() != -1) {
2598 grount = lockresult;
2608 raw_test_pass(0xe994);
2609 // can not get the lock, try later
2610 for(j = 0; j < i; ++j) {
2611 releasewritelock(taskpointerarray[j+OFFSET]);
2613 genputtable(activetasks, currtpd, currtpd);
2619 for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
2620 invalidateAddr(parameter + tmp);
2624 tmpparam = (struct ___Object___ *)parameter;
2625 #ifdef THREADSIMULATE
2626 if(0 == tmpparam->isolate) {
2627 isolateflags[i] = 0;
2628 // shared object, need to flush with current value
2629 //if(!getreadlock(tmpparam->original)) {
2630 // // fail to get read lock of the original object, try this task later
2631 if(!getwritelock(tmpparam->original)) {
2632 // fail to get write lock, release all obtained locks and try this task later
2634 for(j = 0; j < i; ++j) {
2635 if(0 == isolateflags[j]) {
2636 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2639 genputtable(activetasks, currtpd, currtpd);
2642 if(tmpparam->version != tmpparam->original->version) {
2643 // some task on another core has changed this object
2644 // flush this object
2645 //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
2646 // release all obtained locks
2648 for(j = 0; j < i; ++j) {
2649 if(0 == isolateflags[j]) {
2650 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2653 releasewritelock(tmpparam->original);
2655 // dequeue this object
2656 int numofcore = pthread_getspecific(key);
2657 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
2658 int length = numqueues[numofcore][tmpparam->type];
2659 for(j = 0; j < length; ++j) {
2660 struct parameterwrapper * pw = queues[j];
2661 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
2663 int UNUSED, UNUSED2;
2665 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2666 ObjectHashremove(pw->objectset, (int)tmpparam);
2667 if (enterflags!=NULL)
2671 // try to enqueue it again to check if it feeds other tasks;
2672 //enqueueObject(tmpparam, NULL, 0);
2673 // Free up task parameter descriptor
2674 RUNFREE(currtpd->parameterArray);
2679 isolateflags[i] = 1;
2682 pd=currtpd->task->descriptorarray[i];
2683 pw=(struct parameterwrapper *) pd->queue;
2684 /* Check that object is still in queue */
2686 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
2688 raw_test_pass(0xe995);
2690 RUNFREE(currtpd->parameterArray);
2696 /* Check if the object's flags still meets requirements */
// intarray holds (andmask, checkmask) pairs; a term matches when flag&andmask == checkmask
2700 for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
2701 andmask=pw->intarray[tmpi*2];
2702 checkmask=pw->intarray[tmpi*2+1];
2703 raw_test_pass(0xdd000000 + andmask);
2704 raw_test_pass_reg((int)parameter);
2705 raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
2706 raw_test_pass(0xdd000000 + checkmask);
2707 if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
2713 // flags are never suitable
2714 // remove this obj from the queue
2716 int UNUSED, UNUSED2;
2718 raw_test_pass(0xe996);
2719 ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2720 ObjectHashremove(pw->objectset, (int)parameter);
2721 if (enterflags!=NULL)
2723 // release grabbed locks
2724 for(j = 0; j < i; ++j) {
2725 releasewritelock(taskpointerarray[j+OFFSET]);
2727 releasewritelock(parameter);
2728 RUNFREE(currtpd->parameterArray);
2736 /* Check that object still has necessary tags */
2737 for(j=0;j<pd->numbertags;j++) {
2738 int slotid=pd->tagarray[2*j]+numparams;
2739 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
2740 if (!containstag(parameter, tagd)) {
2742 raw_test_pass(0xe997);
2744 RUNFREE(currtpd->parameterArray);
// parameter passed every check: place it in the argument vector after the OFFSET header slots
2750 taskpointerarray[i+OFFSET]=parameter;
// copy the remaining (non-object) entries up to numtotal
2753 for(;i<numtotal;i++) {
2754 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
2757 #ifdef THREADSIMULATE
// for shared objects, hand the task the original object rather than the local copy
2758 for(i = 0; i < numparams; ++i) {
2759 if(0 == isolateflags[i]) {
2760 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2761 if(tmpparam != tmpparam->original) {
2762 taskpointerarray[i+OFFSET] = tmpparam->original;
2771 /* Checkpoint the state */
2772 forward=allocateRuntimeHash(100);
2773 reverse=allocateRuntimeHash(100);
2774 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
// a non-zero setjmp result means control came back through longjmp(error_handler, ...)
2777 if (x=setjmp(error_handler)) {
2782 printf("Fatal Error=%d, Recovering!\n",x);
2786 genputtable(failedtasks,currtpd,currtpd);
2787 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
2789 freeRuntimeHash(forward);
2790 freeRuntimeHash(reverse);
2797 raw_test_pass_reg(x);
2798 raw_test_done(0xa009);
2803 /*if (injectfailures) {
2804 if ((((double)random())/RAND_MAX)<failurechance) {
2805 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
2806 longjmp(error_handler,10);
2809 /* Actually call task */
// slot 0 carries the parameter count and slot 1 is cleared before the call
2811 ((int *)taskpointerarray)[0]=currtpd->numParameters;
2812 taskpointerarray[1]=NULL;
2816 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2818 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
2820 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
2823 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
// after the task returns, release the write locks taken on its parameters
2826 for(i = 0; i < numparams; ++i) {
2828 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2829 raw_test_pass(0xe998);
2830 raw_test_pass(0xdd100000 + tmpparam->flag);
2831 releasewritelock(tmpparam);
2833 #elif defined THREADSIMULATE
2834 for(i = 0; i < numparams; ++i) {
2835 if(0 == isolateflags[i]) {
2836 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2837 releasewritelock(tmpparam);
2844 freeRuntimeHash(forward);
2845 freeRuntimeHash(reverse);
2849 // Free up task parameter descriptor
2850 RUNFREE(currtpd->parameterArray);
2863 freeRuntimeHash(fdtoobject);
2867 /* This function processes the tags of an object */
/* pd->tagarray is read as consecutive (slot id, tag id) pairs, one pair per
   tag (pd->numbertags pairs in total).  Tag slots are addressed after the
   numparams object slots, which is why every tag slot is used as
   slotid+numparams.
   For each tag whose slot is still 0 in statusarray, the iterator entry at
   parameter->iterators[*iteratorcount] is filled in as a tag iterator and the
   slot is marked as handled.
     pd            - descriptor of the parameter whose tags are examined
     index         - slot of the object the tag will be read from
                     (stored as tagobjectslot)
     parameter     - wrapper that owns the iterators array being filled
     iteratorcount - in/out index of the iterator entry to fill
     statusarray   - per-slot marks; 0 means the slot is not handled yet
     numparams     - number of object parameters, i.e. offset of the tag slots */
2868 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
2871 for(i=0;i<pd->numbertags;i++) {
2872 int slotid=pd->tagarray[2*i];
2873 int tagid=pd->tagarray[2*i+1];
/* Only tags that no earlier iterator binds get an iterator of their own */
2875 if (statusarray[slotid+numparams]==0) {
2876 parameter->iterators[*iteratorcount].istag=1;
2877 parameter->iterators[*iteratorcount].tagid=tagid;
2878 parameter->iterators[*iteratorcount].slot=slotid+numparams;
2879 parameter->iterators[*iteratorcount].tagobjectslot=index;
/* Mark the tag slot so that later passes treat this tag as bound */
2880 statusarray[slotid+numparams]=1;
/* Fills parameter->iterators[*iteratorcount] as an object iterator
   (istag=0) for the parameter in slot index and marks that slot as handled
   in statusarray.
   The iterator draws its candidates from the object set of the queue that
   belongs to pd.  Every tag of pd whose slot is already marked in
   statusarray is recorded in tagbindings so that it can narrow the search;
   the number of recorded bindings is stored in numtags.
     parameter     - wrapper that owns the iterators array being filled
     index         - object slot this iterator will bind
     pd            - descriptor of the parameter bound by this iterator
     iteratorcount - in/out index of the iterator entry to fill
     statusarray   - per-slot marks; tag slots start at offset numparams
     numparams     - number of object parameters of the task */
2887 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
/* pd->queue is used as the parameterwrapper holding the object set of pd */
2890 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
2892 parameter->iterators[*iteratorcount].istag=0;
2893 parameter->iterators[*iteratorcount].slot=index;
2894 parameter->iterators[*iteratorcount].objectset=objectset;
2895 statusarray[index]=1;
/* Collect the tags of pd that an earlier iterator has already bound */
2897 for(i=0;i<pd->numbertags;i++) {
2898 int slotid=pd->tagarray[2*i];
2899 int tagid=pd->tagarray[2*i+1];
2900 if (statusarray[slotid+numparams]!=0) {
2901 /* This tag has already been enqueued, use it to narrow search */
2902 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
2906 parameter->iterators[*iteratorcount].numtags=tagcount;
2911 /* This function builds the iterators for a task & parameter */
/* index is the slot of the initial parameter: it is marked as handled up
   front and never receives an object iterator of its own; only its tags are
   processed.  The remaining parameters are then scheduled in three passes of
   decreasing selectivity:
     1. parameters that carry a tag which is already bound,
     2. parameters that carry tags, none of which is bound yet,
     3. all parameters that are still unhandled.
   statusarray records what has been handled: entries below numparams stand
   for object parameters, entries from numparams upwards for tag slots. */
2913 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
2914 int statusarray[MAXTASKPARAMS];
2916 int numparams=task->numParameters;
2917 int iteratorcount=0;
/* 0 means not handled yet */
2918 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
2920 statusarray[index]=1; /* Initial parameter */
2921 /* Process tags for initial iterator */
2923 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
2927 /* Check for objects with existing tags */
2928 for(i=0;i<numparams;i++) {
2929 if (statusarray[i]==0) {
2930 struct parameterdescriptor *pd=task->descriptorarray[i];
2932 for(j=0;j<pd->numbertags;j++) {
2933 int slotid=pd->tagarray[2*j];
/* A bound tag lets the object iterator search through that tag */
2934 if(statusarray[slotid+numparams]!=0) {
2935 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2936 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2943 /* Next do objects w/ unbound tags*/
2945 for(i=0;i<numparams;i++) {
2946 if (statusarray[i]==0) {
2947 struct parameterdescriptor *pd=task->descriptorarray[i];
2948 if (pd->numbertags>0) {
2949 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2950 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
2956 /* Nothing with a tag enqueued */
2958 for(i=0;i<numparams;i++) {
2959 if (statusarray[i]==0) {
2960 struct parameterdescriptor *pd=task->descriptorarray[i];
2961 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
2962 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
/* Debug dump: for every task of the current core, print the task name and,
   per parameter, each object queued in the object set of that parameter
   together with its flag word and its tag(s).
   Under THREADSIMULATE the core number is read from thread-specific storage
   (key); otherwise corenum selects the task list. */
2975 #ifdef THREADSIMULATE
/* NOTE(review): pthread_getspecific returns void *, which is assigned to an
   int here without a cast - confirm this is intended */
2976 int numofcore = pthread_getspecific(key);
2977 for(i=0;i<numtasks[numofcore];i++) {
2978 struct taskdescriptor * task=taskarray[numofcore][i];
2981 if(corenum > NUMCORES - 1) {
2985 for(i=0;i<numtasks[corenum];i++) {
2986 struct taskdescriptor * task=taskarray[corenum][i];
2989 printf("%s\n", task->name);
2991 for(j=0;j<task->numParameters;j++) {
2992 struct parameterdescriptor *param=task->descriptorarray[j];
2993 struct parameterwrapper *parameter=param->queue;
2994 struct ObjectHash * set=parameter->objectset;
2995 struct ObjectIterator objit;
2997 printf(" Parameter %d\n", j);
/* Walk every entry of the object set of this parameter */
2999 ObjectHashiterator(set, &objit);
3000 while(ObjhasNext(&objit)) {
3001 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
3002 struct ___Object___ * tagptr=obj->___tags___;
/* Extra data words stored with the entry */
3003 int nonfailed=Objdata4(&objit);
3004 int numflags=Objdata3(&objit);
3005 int flags=Objdata2(&objit);
/* NOTE(review): %lx is given a pointer argument in the printf calls below;
   %p, or a cast to unsigned long, would match the argument type */
3008 printf(" Contains %lx\n", obj);
3009 printf(" flag=%d\n", obj->flag);
3012 } else if (tagptr->type==TAGTYPE) {
3014 printf(" tag=%lx\n",tagptr);
/* Otherwise ___tags___ is treated as an array of tag descriptors and
   ___cachedCode___ bounds the walk over it */
3019 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
3020 for(;tagindex<ao->___cachedCode___;tagindex++) {
3022 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
3032 /* This function processes the task information to create queues for
3033 each parameter type. */
/* For every task of the current core this runs two passes over the
   parameters of the task: the first gives each parameter queue a newly
   allocated object hash and a back pointer to its task, the second builds
   the iterators.  The passes are kept separate because processobject reads
   the objectset of other parameter queues while iterators are being built,
   so all object sets have to exist first. */
3035 void processtasks() {
3038 if(corenum > NUMCORES - 1) {
3042 #ifdef THREADSIMULATE
/* The core number comes from thread-specific storage in this configuration.
   NOTE(review): pthread_getspecific returns void *, which is assigned to an
   int here without a cast - confirm this is intended */
3043 int numofcore = pthread_getspecific(key);
3044 for(i=0;i<numtasks[numofcore];i++) {
3045 struct taskdescriptor *task=taskarray[numofcore][i];
3047 for(i=0;i<numtasks[corenum];i++) {
3048 struct taskdescriptor * task=taskarray[corenum][i];
3052 /* Build objectsets */
3053 for(j=0;j<task->numParameters;j++) {
3054 struct parameterdescriptor *param=task->descriptorarray[j];
3055 struct parameterwrapper *parameter=param->queue;
3056 parameter->objectset=allocateObjectHash(10);
3057 parameter->task=task;
3060 /* Build iterators for parameters */
3061 for(j=0;j<task->numParameters;j++) {
3062 struct parameterdescriptor *param=task->descriptorarray[j];
3063 struct parameterwrapper *parameter=param->queue;
3064 builditerators(task, j, parameter);
/* Resets a tag/object iterator before it is walked.  What is reset depends
   on the kind of iterator; iterators with bound tags (numtags>0) are handled
   in their own branch.  The hash iterator it->it is (re)initialised over
   it->objectset by ObjectHashiterator. */
3069 void toiReset(struct tagobjectiterator * it) {
3072 } else if (it->numtags>0) {
3075 ObjectHashiterator(it->objectset, &it->it);
/* Tests whether iterator it can supply another binding, given the slots of
   objectarray that earlier iterators have already filled.
   Three situations are handled:
     - tag lookup: the tags of the object in slot it->tagobjectslot are
       searched for one whose flag equals it->tagid,
     - object lookup through bound tags (it->numtags>0): candidates come from
       the flagptr of the first bound tag and must be members of
       it->objectset and carry every other bound tag,
     - otherwise the answer is ObjhasNext on the hash iterator it->it.
   it->tagobjindex is the resume position inside tag or object arrays and is
   updated as the search advances. */
3079 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
3082 /* Get object with tags */
3083 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3084 struct ___Object___ *tagptr=obj->___tags___;
/* ___tags___ is either one tag descriptor (TAGTYPE) or an array of them */
3085 if (tagptr->type==TAGTYPE) {
3086 if ((it->tagobjindex==0)&& /* First object */
3087 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
3092 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
/* Resume the scan where the previous call stopped */
3093 int tagindex=it->tagobjindex;
3094 for(;tagindex<ao->___cachedCode___;tagindex++) {
3095 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
3096 if (td->flag==it->tagid) {
3097 it->tagobjindex=tagindex; /* Found right type of tag */
3103 } else if (it->numtags>0) {
3104 /* Use tags to locate appropriate objects */
3105 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3106 struct ___Object___ *objptr=tag->flagptr;
/* flagptr is either a single object or an array of objects */
3108 if (objptr->type!=OBJECTARRAYTYPE) {
3109 if (it->tagobjindex>0)
/* The candidate must be queued for this parameter */
3111 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
/* Binding 0 supplied the candidate; verify the remaining bound tags */
3113 for(i=1;i<it->numtags;i++) {
3114 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3115 if (!containstag(objptr,tag2))
3120 struct ArrayObject *ao=(struct ArrayObject *) objptr;
/* Scan the object array starting from the saved position */
3123 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
3124 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
3125 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3127 for(i=1;i<it->numtags;i++) {
3128 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3129 if (!containstag(objptr,tag2))
3132 it->tagobjindex=tagindex;
3137 it->tagobjindex=tagindex;
/* Plain object iterator: defer to the hash iterator */
3141 return ObjhasNext(&it->it);
/* Checks whether object ptr is among the objects referenced by tag.
   tag->flagptr is inspected: when it is an object array, its first
   ___cachedCode___ elements are compared against ptr one by one.
   NOTE(review): presumably returns non-zero when ptr is found - callers use
   the result as a truth value; confirm against the return statements. */
3145 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
3147 struct ___Object___ * objptr=tag->flagptr;
3148 if (objptr->type==OBJECTARRAYTYPE) {
3149 struct ArrayObject *ao=(struct ArrayObject *)objptr;
3150 for(j=0;j<ao->___cachedCode___;j++) {
3151 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
3159 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
3160 /* hasNext has all of the intelligence */
3163 /* Get object with tags */
3164 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3165 struct ___Object___ *tagptr=obj->___tags___;
3166 if (tagptr->type==TAGTYPE) {
3168 objectarray[it->slot]=tagptr;
3170 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3171 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
3173 } else if (it->numtags>0) {
3174 /* Use tags to locate appropriate objects */
3175 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3176 struct ___Object___ *objptr=tag->flagptr;
3177 if (objptr->type!=OBJECTARRAYTYPE) {
3179 objectarray[it->slot]=objptr;
3181 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3182 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
3185 /* Iterate object */
3186 objectarray[it->slot]=(void *)Objkey(&it->it);