4 #include "structdefs.h"
6 #include "checkpoint.h"
8 #include "SimpleHash.h"
9 #include "GenericHashtable.h"
10 #include <sys/select.h>
11 #include <sys/types.h>
20 #include <raw_compiler_defs.h>
21 //#include <libints.h>
22 #elif defined THREADSIMULATE
23 // use POSIX message queue
24 // for each core, its message queue named as
30 extern int injectfailures;
31 extern float failurechance;
37 #define TOTALCORE raw_get_num_tiles()
41 #include "instrument.h"
// Task tables and the task descriptor currently being processed.
44 struct genhashtable * activetasks;
45 struct genhashtable * failedtasks;
46 struct taskparamdescriptor * currtpd;
48 struct RuntimeHash * forward;
49 struct RuntimeHash * reverse;
// Per-core bookkeeping arrays, indexed by core number. The startup core scans
// them when it checks whether every core has stalled and whether the total of
// sent objects equals the total of received objects.
52 int corestatus[NUMCORES]; // records status of each core
55 int numsendobjs[NUMCORES]; // records how many objects a core has sent out
56 int numreceiveobjs[NUMCORES]; // records how many objects a core has received
// Lock table storage plus a pointer to it. locktbl is declared a second time
// further down for the THREADSIMULATE variant.
// NOTE(review): the conditionals guarding the two declarations are not visible here.
58 struct RuntimeHash locktable;
59 static struct RuntimeHash* locktbl = &locktable;
60 void * curr_heapbase=0;
61 void * curr_heaptop=0;
63 int self_numreceiveobjs;
// Queue of received transObjInfo records waiting to be enqueued into parameter queues.
70 struct Queue objqueue;
75 void calCoords(int core_num, int* coordY, int* coordX);
76 #elif defined THREADSIMULATE
77 static struct RuntimeHash* locktbl;
85 struct thread_data thread_data_array[NUMCORES];
// Thread-specific key; run() stores the calling thread's core number under it.
87 static pthread_key_t key;
88 static pthread_rwlock_t rwlock_tbl;
89 static pthread_rwlock_t rwlock_init;
// Forward declarations of the messaging and object-lock helpers.
94 bool transStallMsg(int targetcore);
95 void transTerminateMsg(int targetcore);
97 bool getreadlock(void* ptr);
98 void releasereadlock(void* ptr);
100 bool getreadlock_I(void* ptr);
101 void releasereadlock_I(void* ptr);
103 bool getwritelock(void* ptr);
104 void releasewritelock(void* ptr);
// Program entry point.
// Visible RAW-side work: derive this core's number from its tile position,
// zero the per-core counters on the startup core, initialise the lock table
// and the pending-object queue, then turn user interrupts on.
// Visible THREADSIMULATE-side work: open one POSIX message queue per core,
// create the shared lock table and its rwlock, and start one thread per core
// that executes run().
// NOTE(review): the preprocessor conditionals separating the two variants are
// not visible in this excerpt.
109 int main(int argc, char **argv) {
115 bool sendStall = false;
117 bool tocontinue = false;
118 struct QueueItem * objitem = NULL;
119 struct transObjInfo * objInfo = NULL;
121 bool allStall = true;
125 raw_test_pass(0xee01);
// Core number is x + 4*y; calCoords() performs the inverse mapping.
127 corenum = raw_get_abs_pos_x() + 4 * raw_get_abs_pos_y();
129 // initialize the arrays
130 if(STARTUPCORE == corenum) {
131 // startup core to initialize corestatus[]
132 for(i = 0; i < NUMCORES; ++i) {
134 numsendobjs[i] = 0; // assume all variables in RAW are local variables! MAY BE WRONG!!!
135 numreceiveobjs[i] = 0;
138 self_numsendobjs = 0;
139 self_numreceiveobjs = 0;
140 for(i = 0; i < 30; ++i) {
148 raw_test_pass(0xee02);
151 // create the lock table, lockresult table and obj queue
// The lock table is set up by hand with 20 buckets and empty list pointers.
153 locktable.bucket = (struct RuntimeNode **) RUNMALLOC_I(sizeof(struct RuntimeNode *)*20);
154 /* Set allocation blocks*/
155 locktable.listhead=NULL;
156 locktable.listtail=NULL;
158 locktable.numelements = 0;
165 objqueue.head = NULL;
166 objqueue.tail = NULL;
168 raw_test_pass(0xee03);
172 if (corenum < NUMCORES) {
175 //setup_interrupts();
176 //start_gdn_avail_ints(recvMsg);
177 raw_user_interrupts_on();
179 raw_test_pass(0xee04);
184 #elif defined THREADSIMULATE
188 pthread_t threads[NUMCORES];
191 // initialize three arrays and msg queue array
192 char * pathhead = "/msgqueue_";
193 int targetlen = strlen(pathhead);
194 for(i = 0; i < NUMCORES; ++i) {
197 numreceiveobjs[i] = 0;
// Build the decimal suffix of the queue name: one digit, or two digits below 100.
202 corenumstr[0] = i + '0';
203 corenumstr[1] = '\0';
206 corenumstr[1] = i %10 + '0';
207 corenumstr[0] = (i / 10) + '0';
208 corenumstr[2] = '\0';
211 printf("Error: i >= 100\n");
215 char path[targetlen + sourcelen + 1];
216 strcpy(path, pathhead);
217 strncat(path, corenumstr, sourcelen);
// Each queue is created here and opened read-only and non-blocking.
218 int oflags = O_RDONLY|O_CREAT|O_NONBLOCK;
219 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
221 mqd[i]= mq_open(path, oflags, omodes, NULL);
223 printf("[Main] mq_open %s fails: %d, error: %s\n", path, mqd[i], strerror(errno));
226 printf("[Main] mq_open %s returns: %d\n", path, mqd[i]);
231 pthread_key_create(&key, NULL);
233 // create the lock table and initialize its mutex
234 locktbl = allocateRuntimeHash(20);
235 int rc_locktbl = pthread_rwlock_init(&rwlock_tbl, NULL);
// The format now contains %s so the strerror() text is actually printed.
236 printf("[Main] initialize the rwlock for lock table: %d error: %s\n", rc_locktbl, strerror(rc_locktbl));
// Fill in one thread_data record per core and start its thread.
238 for(i = 0; i < NUMCORES; ++i) {
239 thread_data_array[i].corenum = i;
240 thread_data_array[i].argc = argc;
241 thread_data_array[i].argv = argv;
242 thread_data_array[i].numsendobjs = 0;
243 thread_data_array[i].numreceiveobjs = 0;
244 printf("[main] creating thread %d\n", i);
// NOTE(review): run is declared as returning void, while pthread_create expects
// a start routine that returns void pointer.
245 rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]);
247 printf("[main] ERROR; return code from pthread_create() is %d\n", rc[i]);
253 //pthread_exit(NULL);
// Per-core execution body. In the THREADSIMULATE variant this is the thread
// start routine: it records the thread's core number under the thread-specific
// key, then creates the active-task table, creates the startup object on the
// startup core, and processes incoming messages.
// The startup core additionally checks whether all cores are stalled and
// whether the number of sent objects equals the number received; other cores
// report to it with transStallMsg().
// NOTE(review): the preprocessor conditionals are not visible in this excerpt;
// the raw_* calls presumably belong to the RAW build only.
257 void run(void* arg) {
258 struct thread_data * my_tdata = (struct thread_data *)arg;
// The core number is stored directly in the pointer-sized slot, not behind a pointer.
259 pthread_setspecific(key, (void *)my_tdata->corenum);
260 int argc = my_tdata->argc;
261 char** argv = my_tdata->argv;
262 printf("[run, %d] Thread %d runs: %x\n", my_tdata->corenum, my_tdata->corenum, (int)pthread_self());
268 GC_init(); // Initialize the garbage collector
276 initializeexithandler();
278 raw_test_pass(0xee05);
280 /* Create table for failed tasks */
282 if(corenum > NUMCORES - 1) {
290 raw_test_pass(0xee06);
293 /*failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
294 (int (*)(void *,void *)) &comparetpd);*/
297 raw_test_pass(0xee07);
299 /* Create queue of active tasks */
300 activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd,
301 (int (*)(void *,void *)) &comparetpd);
303 raw_test_pass(0xee08);
306 /* Process task information */
309 raw_test_pass(0xee09);
312 if(STARTUPCORE == corenum) {
313 /* Create startup object */
314 createstartupobject(argc, argv);
317 raw_test_pass(0xee0a);
322 raw_test_pass(0xee0b);
// receiveObject() returns -1 in the RAW version when nothing was received.
327 while(receiveObject() != -1) {
331 // check if there are new active tasks can be executed
335 while(receiveObject() != -1) {
340 raw_test_pass(0xee0c);
343 // check if there are some pending objects, if yes, enqueue them and executetasks again
346 raw_test_pass(0xee0d);
// Drain the pending-object queue; interrupts are switched off while an entry is handled.
348 while(!isEmpty(&objqueue)) {
351 raw_user_interrupts_off();
354 raw_test_pass(0xeee1);
358 objitem = getTail(&objqueue);
359 //obj = objitem->objectptr;
360 objInfo = (struct transObjInfo *)objitem->objectptr;
361 obj = objInfo->objptr;
363 raw_test_pass_reg((int)obj);
365 // grab lock and flush the obj
372 raw_test_pass_reg(grount);
384 raw_invalidate_cache_range(obj, classsize[((struct ___Object___ *)obj)->type]);
386 /*for(k = 0; k < classsize[((struct ___Object___ *)obj)->type]; ++k) {
387 invalidateAddr(obj + k);
389 // enqueue the object
390 for(k = 0; k < objInfo->length; ++k) {
391 int taskindex = objInfo->queues[2 * k];
392 int paramindex = objInfo->queues[2 * k + 1];
393 struct parameterwrapper ** queues = &(paramqueues[corenum][taskindex][paramindex]);
395 raw_test_pass_reg(taskindex);
396 raw_test_pass_reg(paramindex);
398 enqueueObject_I(obj, queues, 1);
400 removeItem(&objqueue, objitem);
401 releasereadlock_I(obj);
402 RUNFREE(objInfo->queues);
404 /*enqueueObject_I(obj, NULL, 0);
405 removeItem(&objqueue, objitem);
406 releasereadlock_I(obj);*/
409 // put it at the end of the queue
410 // and try to execute active tasks already enqueued first
411 removeItem(&objqueue, objitem);
412 addNewItem_I(&objqueue, objInfo);
414 raw_user_interrupts_on();
419 raw_user_interrupts_on();
422 raw_test_pass(0xee0e);
426 raw_test_pass(0xee0f);
// Termination check performed by the startup core.
431 if(STARTUPCORE == corenum) {
434 raw_test_pass(0xee10);
439 raw_user_interrupts_off();
441 corestatus[corenum] = 0;
442 numsendobjs[corenum] = self_numsendobjs;
443 numreceiveobjs[corenum] = self_numreceiveobjs;
444 // check the status of all cores
447 raw_test_pass_reg(NUMCORES);
449 for(i = 0; i < NUMCORES; ++i) {
451 raw_test_pass(0xe000 + corestatus[i]);
453 if(corestatus[i] != 0) {
459 // check if the sum of send objs and receive obj are the same
461 // no->go on executing
// sumsendobj ends up as total sent minus total received; zero means nothing is in flight.
463 for(i = 0; i < NUMCORES; ++i) {
464 sumsendobj += numsendobjs[i];
466 raw_test_pass(0xf000 + numsendobjs[i]);
469 for(i = 0; i < NUMCORES; ++i) {
470 sumsendobj -= numreceiveobjs[i];
472 raw_test_pass(0xf000 + numreceiveobjs[i]);
475 if(0 == sumsendobj) {
478 raw_test_pass(0xee11);
480 raw_test_pass(raw_get_cycle());
481 raw_test_done(1); // All done.
485 raw_user_interrupts_on();
490 raw_test_pass(0xee12);
493 // wait for some time
496 raw_test_pass(0xee13);
501 raw_test_pass(0xee14);
504 // send StallMsg to startup core
506 raw_test_pass(0xee15);
508 sendStall = transStallMsg(STARTUPCORE);
514 raw_test_pass(0xee16);
521 #elif defined THREADSIMULATE
522 /* Start executing the tasks */
526 // check if there are new objects coming
527 bool sendStall = false;
// The core number is read back from the thread-specific slot set at the top of run().
529 int numofcore = pthread_getspecific(key);
531 switch(receiveObject()) {
533 printf("[run, %d] receive an object\n", numofcore);
535 // received an object
536 // check if there are new active tasks can be executed
541 //printf("[run, %d] no msg\n", numofcore);
543 if(STARTUPCORE == numofcore) {
544 corestatus[numofcore] = 0;
545 // check the status of all cores
546 bool allStall = true;
547 for(i = 0; i < NUMCORES; ++i) {
548 if(corestatus[i] != 0) {
554 // check if the sum of send objs and receive obj are the same
556 // no->go on executing
558 for(i = 0; i < NUMCORES; ++i) {
559 sumsendobj += numsendobjs[i];
561 for(i = 0; i < NUMCORES; ++i) {
562 sumsendobj -= numreceiveobjs[i];
564 if(0 == sumsendobj) {
// Shutdown: take the table write lock, destroy every per-object rwlock, free the table.
568 int rc_tbl = pthread_rwlock_wrlock(&rwlock_tbl);
// The format now contains %s so the strerror() text is actually printed.
569 printf("[run, %d] getting the write lock for locktbl: %d error: %s\n", numofcore, rc_tbl, strerror(rc_tbl));
570 struct RuntimeIterator* it_lock = RuntimeHashcreateiterator(locktbl);
571 while(0 != RunhasNext(it_lock)) {
572 int key = Runkey(it_lock);
573 pthread_rwlock_t* rwlock_obj = (pthread_rwlock_t*)Runnext(it_lock);
574 int rc_des = pthread_rwlock_destroy(rwlock_obj);
// The format now contains %s so the strerror() text is actually printed.
575 printf("[run, %d] destroy the rwlock for object: %d error: %s\n", numofcore, key, strerror(rc_des));
578 freeRuntimeHash(locktbl);
582 // destroy all message queues
583 char * pathhead = "/msgqueue_";
584 int targetlen = strlen(pathhead);
585 for(i = 0; i < NUMCORES; ++i) {
589 corenumstr[0] = i + '0';
590 corenumstr[1] = '\0';
593 corenumstr[1] = i %10 + '0';
594 corenumstr[0] = (i / 10) + '0';
595 corenumstr[2] = '\0';
598 printf("Error: i >= 100\n");
602 char path[targetlen + sourcelen + 1];
603 strcpy(path, pathhead);
604 strncat(path, corenumstr, sourcelen);
608 printf("[run, %d] terminate!\n", numofcore);
615 // send StallMsg to startup core
616 sendStall = transStallMsg(STARTUPCORE);
622 printf("[run, %d] receive a stall msg\n", numofcore);
623 // receive a Stall Msg, do nothing
624 assert(STARTUPCORE == numofcore); // only startup core can receive such msg
629 printf("[run, %d] receive a terminate msg\n", numofcore);
630 // receive a terminate Msg
// NOTE(review): the two lines below use corenum while the rest of this branch
// uses numofcore; confirm which identifier is intended in this variant.
631 assert(STARTUPCORE != corenum); // only non-startup core can receive such msg
632 mq_close(mqd[corenum]);
638 printf("[run, %d] Error: invalid message type.\n", numofcore);
// Builds the startup object from the command line.
// Allocates the startup object and a string array with argc-1 slots, wraps
// argv[1] onwards in String objects stored in that array, marks the object
// with isolate = 1 and version = 0, sets its initial flag through
// flagorandinit() and hands it to enqueueObject().
// Two call forms appear for each allocator (with and without a leading NULL
// argument). NOTE(review): presumably these are alternative build variants;
// the selecting conditionals are not visible here.
648 void createstartupobject(int argc, char ** argv) {
651 /* Allocate startup object */
653 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE);
654 struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1);
656 struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE);
657 struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1);
659 /* Build array of strings */
660 startupobject->___parameters___=stringarray;
// argv[0] is skipped; argument i is stored in slot i-1.
661 for(i=1;i<argc;i++) {
662 int length=strlen(argv[i]);
664 struct ___String___ *newstring=NewString(NULL, argv[i],length);
666 struct ___String___ *newstring=NewString(argv[i],length);
// Element storage is addressed as the memory directly after the ___length___ field.
668 ((void **)(((char *)& stringarray->___length___)+sizeof(int)))[i-1]=newstring;
671 startupobject->isolate = 1;
672 startupobject->version = 0;
674 /* Set initialized flag for startup object */
675 flagorandinit(startupobject,1,0xFFFFFFFF);
676 enqueueObject(startupobject, NULL, 0);
679 raw_flush_entire_cache();
// Hash function for task/parameter descriptors: starts from the task pointer
// (cast to int) and XORs in every parameter pointer.
// NOTE(review): the pointer-to-int casts truncate where pointers are wider than int.
683 int hashCodetpd(struct taskparamdescriptor *ftd) {
684 int hash=(int)ftd->task;
686 for(i=0;i<ftd->numParameters;i++){
687 hash^=(int)ftd->parameterArray[i];
// Comparison function for task/parameter descriptors: compares the task
// pointers first, then the parameter pointers position by position, iterating
// over ftd1->numParameters entries.
// NOTE(review): the returned values are on lines not visible in this excerpt.
692 int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) {
694 if (ftd1->task!=ftd2->task)
696 for(i=0;i<ftd1->numParameters;i++)
697 if(ftd1->parameterArray[i]!=ftd2->parameterArray[i])
702 /* This function sets a tag. */
// Works in two parts:
//   1. adds tagd to the tag list held in obj->___tags___, which is either a
//      single tag descriptor or an array of descriptors;
//   2. adds obj to the object set held in tagd->flagptr, which is either a
//      single object or an array of objects.
// In both parts a full array is replaced by a larger one and the old entries
// are copied over.
// Two signatures appear (with and without a leading ptr argument); the
// variant with ptr passes a ptrarray to the allocator and re-reads obj and
// tagd from it afterwards. NOTE(review): the conditionals selecting the
// variant are not visible in this excerpt.
704 void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
706 void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
708 struct ArrayObject * ao=NULL;
709 struct ___Object___ * tagptr=obj->___tags___;
711 raw_test_pass(0xebb0);
715 raw_test_pass(0xebb1);
717 obj->___tags___=(struct ___Object___ *)tagd;
719 /* Have to check if it is already set */
720 if (tagptr->type==TAGTYPE) {
721 struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr;
723 raw_test_pass(0xebb2);
727 raw_test_pass(0xebb3);
732 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
733 struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL);
734 obj=(struct ___Object___ *)ptrarray[2];
735 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
736 td=(struct ___TagDescriptor___ *) obj->___tags___;
739 raw_test_pass(0xebb4);
741 ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL);
744 raw_test_pass(0xebb5);
// Single tag becomes a two-entry array: the existing tag, then the new one.
746 ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td);
747 ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd);
748 obj->___tags___=(struct ___Object___ *) ao;
749 ao->___cachedCode___=2;
751 raw_test_pass(0xebb6);
756 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
758 raw_test_pass(0xebb7);
// ___cachedCode___ is used as the count of occupied slots in the array.
760 for(i=0;i<ao->___cachedCode___;i++) {
761 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i);
763 raw_test_pass(0xebb8);
767 raw_test_pass(0xebb9);
772 if (ao->___cachedCode___<ao->___length___) {
774 raw_test_pass(0xebba);
776 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd);
777 ao->___cachedCode___++;
779 raw_test_pass(0xebbb);
783 int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd};
784 struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
785 obj=(struct ___Object___ *)ptrarray[2];
786 tagd=(struct ___TagDescriptor___ *) ptrarray[3];
787 ao=(struct ArrayObject *)obj->___tags___;
789 struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___);
792 raw_test_pass(0xebbc);
794 aonew->___cachedCode___=ao->___length___+1;
795 for(i=0;i<ao->___length___;i++) {
797 raw_test_pass(0xebbd);
799 ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i));
802 raw_test_pass(0xebbe);
804 ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd);
806 raw_test_pass(0xebbf);
// Second part: register obj in the tag descriptor's object set.
813 struct ___Object___ * tagset=tagd->flagptr;
815 raw_test_pass(0xb008);
819 raw_test_pass(0xb009);
822 } else if (tagset->type!=OBJECTARRAYTYPE) {
824 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
825 struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
826 obj=(struct ___Object___ *)ptrarray[2];
827 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
829 struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL);
831 ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr);
832 ARRAYSET(ao, struct ___Object___ *, 1, obj);
833 ao->___cachedCode___=2;
834 tagd->flagptr=(struct ___Object___ *)ao;
836 raw_test_pass(0xb00a);
839 struct ArrayObject *ao=(struct ArrayObject *) tagset;
840 if (ao->___cachedCode___<ao->___length___) {
842 raw_test_pass(0xb00b);
844 ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj);
848 int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd};
849 struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
850 obj=(struct ___Object___ *)ptrarray[2];
851 tagd=(struct ___TagDescriptor___ *)ptrarray[3];
852 ao=(struct ArrayObject *)tagd->flagptr;
// The replacement array must hold all ao->___length___ old entries plus the new
// object, so it is sized like the other growth paths in this function.
854 struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___);
856 aonew->___cachedCode___=ao->___cachedCode___+1;
857 for(i=0;i<ao->___length___;i++) {
858 ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i));
860 ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj);
861 tagd->flagptr=(struct ___Object___ *) aonew;
863 raw_test_pass(0xb00c);
870 /* This function clears a tag. */
// Mirror of tagset(): removes tagd from the tag list in obj->___tags___ and
// removes obj from the object set in tagd->flagptr. Array entries are removed
// by moving the last occupied entry into the vacated slot and clearing the
// last slot. Inconsistencies are reported through the ERROR printf messages.
// Two signatures appear (with and without a leading ptr argument).
// NOTE(review): the conditionals selecting the variant are not visible here.
872 void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
874 void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) {
876 /* We'll assume that tag is always there.
877 Need to statically check for this of course. */
878 struct ___Object___ * tagptr=obj->___tags___;
880 if (tagptr->type==TAGTYPE) {
881 if ((struct ___TagDescriptor___ *)tagptr==tagd)
882 obj->___tags___=NULL;
885 printf("ERROR 1 in tagclear\n");
889 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
891 for(i=0;i<ao->___cachedCode___;i++) {
892 struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i);
// Shrink the occupied count, then fill slot i from the former last entry.
894 ao->___cachedCode___--;
895 if (i<ao->___cachedCode___)
896 ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___));
897 ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL);
898 if (ao->___cachedCode___==0)
899 obj->___tags___=NULL;
904 printf("ERROR 2 in tagclear\n");
// Second part: remove obj from the tag descriptor's object set.
910 struct ___Object___ *tagset=tagd->flagptr;
911 if (tagset->type!=OBJECTARRAYTYPE) {
916 printf("ERROR 3 in tagclear\n");
920 struct ArrayObject *ao=(struct ArrayObject *) tagset;
922 for(i=0;i<ao->___cachedCode___;i++) {
923 struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i);
925 ao->___cachedCode___--;
926 if (i<ao->___cachedCode___)
927 ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___));
928 ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL);
929 if (ao->___cachedCode___==0)
935 printf("ERROR 4 in tagclear\n");
943 /* This function allocates a new tag. */
// Two variants appear: one takes a ptr argument and allocates with
// mygcmalloc(), the other allocates with FREEMALLOC(). Both request
// classsize[TAGTYPE] bytes.
// NOTE(review): how index is used and what is returned are on lines not
// visible in this excerpt.
945 struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) {
946 struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]);
948 struct ___TagDescriptor___ * allocate_tag(int index) {
949 struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]);
958 /* This function updates the flag for object ptr. It or's the flag
959 with the or mask and and's it with the andmask. */
// Forward declaration: flagbody() is called by flagorand(), intflagorand() and
// flagorandinit() before its definition appears.
961 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew);
// Three-way comparison of two ints by subtraction: negative, zero or positive
// when *val1 is less than, equal to or greater than *val2.
// NOTE(review): the subtraction can overflow for operands of opposite sign and
// large magnitude; confirm that callers only pass small flag values.
963 int flagcomp(const int *val1, const int *val2) {
964 return (*val1)-(*val2);
// Updates the flag word of the object at ptr and passes the result to
// flagbody() with isnew = false, together with the caller-supplied queues.
// The old flag is read from the second int of the object and ORed with ormask.
// NOTE(review): the step that applies andmask is on a line not visible here.
967 void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) {
969 int oldflag=((int *)ptr)[1];
970 int flag=ormask|oldflag;
973 raw_test_pass_reg((int)ptr);
974 raw_test_pass(0xaa000000 + oldflag);
975 raw_test_pass(0xaa000000 + flag);
977 flagbody(ptr, flag, queues, length, false);
// Variant of flagorand() that reports a bool and skips the update when the new
// flag equals the old one; otherwise calls flagbody() with no explicit queues
// and isnew = false.
// NOTE(review): the returned values and the andmask step are on lines not
// visible in this excerpt.
981 bool intflagorand(void * ptr, int ormask, int andmask) {
983 int oldflag=((int *)ptr)[1];
984 int flag=ormask|oldflag;
986 if (flag==oldflag) /* Don't do anything */
989 flagbody(ptr, flag, NULL, 0, false);
// Sets the initial flag of a freshly created object: same flag computation as
// flagorand(), but flagbody() is called with isnew = true and no queues.
// NOTE(review): the andmask step is on a line not visible in this excerpt.
995 void flagorandinit(void * ptr, int ormask, int andmask) {
996 int oldflag=((int *)ptr)[1];
997 int flag=ormask|oldflag;
1000 raw_test_pass(0xaa100000 + oldflag);
1001 raw_test_pass(0xaa100000 + flag);
1003 flagbody(ptr,flag,NULL,0,true);
// Common tail of the flag-update functions.
// When the object is not new and no queues were supplied, the queue list is
// looked up from objectqueues/numqueues for the current core and the object's
// type. The object is then removed from the object set of every queue in the
// list, and any enterflags array returned by ObjectHashget() is freed.
// NOTE(review): where the new flag value is stored into the object is on a
// line not visible in this excerpt.
1006 void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) {
1007 struct parameterwrapper * flagptr = NULL;
1009 struct parameterwrapper ** queues = vqueues;
1010 int length = vlength;
1012 int UNUSED, UNUSED2;
1013 int * enterflags = NULL;
1014 if((!isnew) && (queues == NULL)) {
1015 #ifdef THREADSIMULATE
// The thread's core number comes from the thread-specific slot.
1016 int numofcore = pthread_getspecific(key);
1017 queues = objectqueues[numofcore][ptr->type];
1018 length = numqueues[numofcore][ptr->type];
1021 if(corenum < NUMCORES) {
1023 queues = objectqueues[corenum][ptr->type];
1024 length = numqueues[corenum][ptr->type];
1034 raw_test_pass(0xbb000000 + ptr->flag);
1037 /*Remove object from all queues */
1038 for(i = 0; i < length; ++i) {
1039 flagptr = queues[i];
1040 ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
1041 ObjectHashremove(flagptr->objectset, (int)ptr);
1042 if (enterflags!=NULL)
1043 RUNFREE(enterflags);
// Offers an object to every parameter queue it could belong to.
// If vqueues is NULL the queue list is taken from objectqueues/numqueues for
// the current core and the object's type. For each parameter wrapper the
// object's tags are checked against the wrapper's required tags, then each
// (andmask, checkmask) pair is tested against the object's flag; on a match
// enqueuetasks() is called for that wrapper.
1047 void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1048 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1051 struct QueueItem *tmpptr;
1052 struct parameterwrapper * parameter=NULL;
1055 struct parameterwrapper * prevptr=NULL;
1056 struct ___Object___ *tagptr=NULL;
1057 struct parameterwrapper ** queues = vqueues;
1058 int length = vlength;
1060 if(corenum > NUMCORES - 1) {
1064 if(queues == NULL) {
1065 #ifdef THREADSIMULATE
1066 int numofcore = pthread_getspecific(key);
1067 queues = objectqueues[numofcore][ptr->type];
1068 length = numqueues[numofcore][ptr->type];
1070 queues = objectqueues[corenum][ptr->type];
1071 length = numqueues[corenum][ptr->type];
1074 tagptr=ptr->___tags___;
1076 /* Outer loop iterates through all parameter queues an object of
1077 this type could be in. */
1078 for(j = 0; j < length; ++j) {
1079 parameter = queues[j];
1081 if (parameter->numbertags>0) {
1083 goto nextloop;//that means the object has no tag but that param needs tag
1084 else if(tagptr->type==TAGTYPE) {//one tag
1085 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1086 for(i=0;i<parameter->numbertags;i++) {
1087 //slotid is parameter->tagarray[2*i];
1088 int tagid=parameter->tagarray[2*i+1];
1089 if (tagid!=tagptr->flag)
1090 goto nextloop; /*We don't have this tag */
1092 } else {//multiple tags
1093 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1094 for(i=0;i<parameter->numbertags;i++) {
1095 //slotid is parameter->tagarray[2*i];
1096 int tagid=parameter->tagarray[2*i+1];
// NOTE(review): this loop uses j, the same name as the outer queue index; a
// local declaration may exist on a line not visible here. If not, the outer
// loop index is clobbered. Confirm.
1098 for(j=0;j<ao->___cachedCode___;j++) {
1099 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag)
// intarray holds (andmask, checkmask) pairs, one pair per term.
1110 for(i=0;i<parameter->numberofterms;i++) {
1111 int andmask=parameter->intarray[i*2];
1112 int checkmask=parameter->intarray[i*2+1];
1113 if ((ptr->flag&andmask)==checkmask) {
1115 raw_test_pass(0xcc000000 + andmask);
1116 raw_test_pass_reg((int)ptr);
1117 raw_test_pass(0xcc000000 + ptr->flag);
1118 raw_test_pass(0xcc000000 + checkmask);
1120 enqueuetasks(parameter, prevptr, ptr, NULL, 0);
// Same matching logic as enqueueObject(), but matching wrappers are passed to
// enqueuetasks_I(). The visible call site in run() invokes it after
// raw_user_interrupts_off() and before raw_user_interrupts_on().
// NOTE(review): presumably the _I suffix marks the interrupts-disabled
// variant; confirm against the runtime's conventions.
1132 void enqueueObject_I(void * vptr, struct parameterwrapper ** vqueues, int vlength) {
1133 struct ___Object___ *ptr = (struct ___Object___ *)vptr;
1136 struct QueueItem *tmpptr;
1137 struct parameterwrapper * parameter=NULL;
1140 struct parameterwrapper * prevptr=NULL;
1141 struct ___Object___ *tagptr=NULL;
1142 struct parameterwrapper ** queues = vqueues;
1143 int length = vlength;
1145 if(corenum > NUMCORES - 1) {
1149 if(queues == NULL) {
1150 #ifdef THREADSIMULATE
1151 int numofcore = pthread_getspecific(key);
1152 queues = objectqueues[numofcore][ptr->type];
1153 length = numqueues[numofcore][ptr->type];
1155 queues = objectqueues[corenum][ptr->type];
1156 length = numqueues[corenum][ptr->type];
1160 raw_test_pass(0xeaa1);
1161 raw_test_pass_reg(queues);
1162 raw_test_pass_reg(length);
1164 tagptr=ptr->___tags___;
1166 /* Outer loop iterates through all parameter queues an object of
1167 this type could be in. */
1168 for(j = 0; j < length; ++j) {
1169 parameter = queues[j];
1171 if (parameter->numbertags>0) {
1173 raw_test_pass(0xeaa2);
1174 raw_test_pass_reg(tagptr);
1177 goto nextloop;//that means the object has no tag but that param needs tag
1178 else if(tagptr->type==TAGTYPE) {//one tag
1179 struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr;
1181 raw_test_pass(0xeaa3);
1183 for(i=0;i<parameter->numbertags;i++) {
1184 //slotid is parameter->tagarray[2*i];
1185 int tagid=parameter->tagarray[2*i+1];
1186 if (tagid!=tagptr->flag) {
1188 raw_test_pass(0xeaa4);
1190 goto nextloop; /*We don't have this tag */
1193 } else {//multiple tags
1194 struct ArrayObject * ao=(struct ArrayObject *) tagptr;
1196 raw_test_pass(0xeaa5);
1198 for(i=0;i<parameter->numbertags;i++) {
1199 //slotid is parameter->tagarray[2*i];
1200 int tagid=parameter->tagarray[2*i+1];
// NOTE(review): this loop uses j, the same name as the outer queue index; a
// local declaration may exist on a line not visible here. Confirm.
1202 for(j=0;j<ao->___cachedCode___;j++) {
1203 if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) {
1208 raw_test_pass(0xeaa6);
// intarray holds (andmask, checkmask) pairs, one pair per term.
1218 for(i=0;i<parameter->numberofterms;i++) {
1219 int andmask=parameter->intarray[i*2];
1220 int checkmask=parameter->intarray[i*2+1];
1222 raw_test_pass(0xeaa7);
1223 raw_test_pass(0xcc000000 + andmask);
1224 raw_test_pass_reg(ptr);
1225 raw_test_pass(0xcc000000 + ptr->flag);
1226 raw_test_pass(0xcc000000 + checkmask);
1228 if ((ptr->flag&andmask)==checkmask) {
1230 raw_test_pass(0xeaa8);
1232 enqueuetasks_I(parameter, prevptr, ptr, NULL, 0);
1243 // helper function to compute the coordinates of a core from the core number
// Cores are numbered row by row on a grid four cores wide: X is the remainder
// and Y the quotient of core_num divided by 4. This is the inverse of the
// x + 4 * y computation used in main() to obtain corenum.
// Both results are written through the output pointers coordY and coordX.
1244 void calCoords(int core_num, int* coordY, int* coordX) {
1245 *coordX = core_num % 4;
1246 *coordY = core_num / 4;
1250 /* Message format for RAW version:
1252 * type: 0 -- transfer object
1253 * 1 -- transfer stall msg
1259 * ObjMsg: 0 + size of msg + obj's address + (task index + param index)+
1260 * StallMsg: 1 + corenum + sendobjs + receiveobjs (size is always 4 * sizeof(int))
1261 * LockMsg: 2 + lock type + obj pointer + request core (size is always 4 * sizeof(int))
1262 * 3/4/5 + lock type + obj pointer (size is always 3 * sizeof(int))
1263 * lock type: 0 -- read; 1 -- write
1266 // transfer an object to targetcore
// The target core and the (task index, param index) pairs come from transObj.
// RAW path: builds a dynamic-network header for the target core's coordinates,
// sends the index pairs with gdn_send() and increments self_numsendobjs.
// THREADSIMULATE path: opens the target core's POSIX message queue by name,
// copies transObj and its queues array, wraps the copy in a freshly allocated
// ___Object___ through its original field, sends that with mq_send() and
// increments the sender's send counter.
// NOTE(review): the preprocessor conditionals separating the two paths are not
// visible in this excerpt.
1268 void transferObject(struct transObjInfo * transObj) {
1269 void * obj = transObj->objptr;
1270 int type=((int *)obj)[0];
1271 int size=classsize[type];
1272 int targetcore = transObj->targetcore;
1273 //assert(type < NUMCLASSES); // can only transfer normal object
1277 int self_y, self_x, target_y, target_x;
1279 // for 32 bit machine, the size of fixed part is always 3 words
1280 //int msgsize = sizeof(int) * 2 + sizeof(void *);
// Message size in words: 3 fixed words plus two words per index pair.
1281 int msgsize = 3 + transObj->length * 2;
1284 struct ___Object___ * newobj = (struct ___Object___ *)obj;
1285 /*if(0 == newobj->isolate) {
1289 calCoords(corenum, &self_y, &self_x);
1290 calCoords(targetcore, &target_y, &target_x);
1291 // Build the message header
1292 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1294 target_y, target_x);
1295 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1297 raw_test_pass(0xbbbb);
1298 raw_test_pass(0xb000 + targetcore); // targetcore
1306 raw_test_pass_reg(msgsize);
1310 raw_test_pass_reg(obj);
1312 for(i = 0; i < transObj->length; ++i) {
1313 int taskindex = transObj->queues[2*i];
1314 int paramindex = transObj->queues[2*i+1];
1315 gdn_send(taskindex);
1317 raw_test_pass_reg(taskindex);
1319 gdn_send(paramindex);
1321 raw_test_pass_reg(paramindex);
1325 raw_test_pass(0xffff);
1327 ++(self_numsendobjs);
1328 #elif defined THREADSIMULATE
1329 int numofcore = pthread_getspecific(key);
1331 // use POSIX message queue to transfer objects between cores
// Build the decimal suffix of the queue name: one digit, or two digits below 100.
1335 if(targetcore < 10) {
1336 corenumstr[0] = targetcore + '0';
1337 corenumstr[1] = '\0';
1339 } else if(targetcore < 100) {
1340 corenumstr[1] = targetcore % 10 + '0';
1341 corenumstr[0] = (targetcore / 10) + '0';
1342 corenumstr[2] = '\0';
1345 printf("Error: targetcore >= 100\n");
1349 char * pathhead = "/msgqueue_";
1350 int targetlen = strlen(pathhead);
1351 char path[targetlen + sourcelen + 1];
1352 strcpy(path, pathhead);
1353 strncat(path, corenumstr, sourcelen);
// The sender opens the existing queue write-only and non-blocking.
1354 int oflags = O_WRONLY|O_NONBLOCK;
1355 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1356 mqdnum = mq_open(path, oflags, omodes, NULL);
1358 printf("[transferObject, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1362 /*struct ___Object___ * newobj = (struct ___Object___ *)obj;
1363 if(0 == newobj->isolate) {
1364 newobj = RUNMALLOC(size);
1365 memcpy(newobj, obj, size);
1366 newobj->original=obj;
// The transfer record and its queues array are copied before being handed over.
1368 struct transObjInfo * tmptransObj = RUNMALLOC(sizeof(struct transObjInfo));
1369 memcpy(tmptransObj, transObj, sizeof(struct transObjInfo));
1370 int * tmpqueue = RUNMALLOC(sizeof(int)*2*tmptransObj->length);
1371 memcpy(tmpqueue, tmptransObj->queues, sizeof(int)*2*tmptransObj->length);
1372 tmptransObj->queues = tmpqueue;
// The message body is a bare ___Object___ whose original field carries the copy.
1373 struct ___Object___ * newobj = RUNMALLOC(sizeof(struct ___Object___));
1374 newobj->type = ((struct ___Object___ *)obj)->type;
1375 newobj->original = (struct ___Object___ *)tmptransObj;
1378 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1380 printf("[transferObject, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
// The startup core counts directly in numsendobjs; the other counter lives in thread_data_array.
1384 if(numofcore == STARTUPCORE) {
1385 ++numsendobjs[numofcore];
1387 ++(thread_data_array[numofcore].numsendobjs);
1389 printf("[transferObject, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1393 // send stall message (this core's send and receive counts) to targetcore
// RAW path: builds a dynamic-network header for the target core and sends
// self_numsendobjs and self_numreceiveobjs with gdn_send().
// THREADSIMULATE path: packs the core number and the thread's send/receive
// counters into the flag, ___cachedHash___ and ___cachedCode___ fields of a
// freshly allocated ___Object___ and sends it to the target core's POSIX
// message queue; the target must be STARTUPCORE.
// NOTE(review): the bool return values and the preprocessor conditionals are
// on lines not visible in this excerpt.
1395 bool transStallMsg(int targetcore) {
1398 int self_y, self_x, target_y, target_x;
1399 // for 32 bit machine, the size is always 4 words
1400 //int msgsize = sizeof(int) * 4;
1403 calCoords(corenum, &self_y, &self_x);
1404 calCoords(targetcore, &target_y, &target_x);
1405 // Build the message header
1406 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1408 target_y, target_x);
1409 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1411 raw_test_pass(0xbbbb);
1412 raw_test_pass(0xb000 + targetcore); // targetcore
1420 raw_test_pass_reg(corenum);
1422 gdn_send(self_numsendobjs);
1424 raw_test_pass_reg(self_numsendobjs);
1426 gdn_send(self_numreceiveobjs);
1428 raw_test_pass_reg(self_numreceiveobjs);
1429 raw_test_pass(0xffff);
1432 #elif defined THREADSIMULATE
1433 struct ___Object___ *newobj = RUNMALLOC(sizeof(struct ___Object___));
1434 // use the first four int field to hold msgtype/corenum/sendobj/receiveobj
1436 int numofcore = pthread_getspecific(key);
1437 newobj->flag = numofcore;
1438 newobj->___cachedHash___ = thread_data_array[numofcore].numsendobjs;
1439 newobj->___cachedCode___ = thread_data_array[numofcore].numreceiveobjs;
1441 // use POSIX message queue to send stall msg to startup core
1442 assert(targetcore == STARTUPCORE);
// Build the decimal suffix of the queue name: one digit, or two digits below 100.
1446 if(targetcore < 10) {
1447 corenumstr[0] = targetcore + '0';
1448 corenumstr[1] = '\0';
1450 } else if(targetcore < 100) {
1451 corenumstr[1] = targetcore % 10 + '0';
1452 corenumstr[0] = (targetcore / 10) + '0';
1453 corenumstr[2] = '\0';
1456 printf("Error: targetcore >= 100\n");
1460 char * pathhead = "/msgqueue_";
1461 int targetlen = strlen(pathhead);
1462 char path[targetlen + sourcelen + 1];
1463 strcpy(path, pathhead);
1464 strncat(path, corenumstr, sourcelen);
// The sender opens the existing queue write-only and non-blocking.
1465 int oflags = O_WRONLY|O_NONBLOCK;
1466 int omodes = S_IRWXU|S_IRWXG|S_IRWXO;
1467 mqdnum = mq_open(path, oflags, omodes, NULL);
1469 printf("[transStallMsg, %d] mq_open %s fail: %d, error: %s\n", numofcore, path, mqdnum, strerror(errno));
1474 ret=mq_send(mqdnum, (void *)newobj, sizeof(struct ___Object___), 0); // send the object into the queue
1476 printf("[transStallMsg, %d] mq_send to %s returned: %d, error: %s\n", numofcore, path, ret, strerror(errno));
1480 printf("[transStallMsg, %d] mq_send to %s returned: $%x\n", numofcore, path, ret);
1481 printf("<transStallMsg> to %s index: %d, sendobjs: %d, receiveobjs: %d\n", path, newobj->flag, newobj->___cachedHash___, newobj->___cachedCode___);
1488 // receive object transferred from other cores
1489 // or the terminate message from other cores
1490 // NOTICE: following format is for threadsimulate version only
1491 // RAW version please see previous description
1492 // format: type + object
1493 // type: -1--stall msg
1495 // return value: 0--received an object
1496 // 1--received nothing
1497 // 2--received a Stall Msg
1498 // 3--received a lock Msg
1499 // RAW version: -1 -- received nothing
1500 // otherwise -- received msg type
//
// Overview of the logic below:
//  - RAW path: words are read with gdn_receive() into msgdata[] until
//    msglength words have arrived; the completed message is then handled as
//    an object transfer, a stall report, a lock request, a lock grant/deny
//    reply, or a lock release.
//  - THREADSIMULATE path: one message is read from mqd[numofcore] and is
//    treated either as a stall report (first int == -1) or as a transferred
//    object that is copied and enqueued.
// NOTE(review): raw_test_pass()/raw_test_pass_reg() look like trace output and
// raw_test_done() like a fatal exit with an error code -- confirm against the
// RAW support library.
1501 int receiveObject() {
1505 int self_y, self_x, target_y, target_x;
// No word is pending on the input.
1507 if(gdn_input_avail() == 0) {
1509 if(corenum < NUMCORES) {
1510 raw_test_pass(0xd001);
1517 raw_test_pass(0xcccc);
// Keep reading while words are available and the current message is still
// short of msglength words; each word lands in msgdata[msgdataindex].
1519 while((gdn_input_avail() != 0) && (msgdataindex < msglength)) {
1520 msgdata[msgdataindex] = gdn_receive();
// msgdata[0] is the message type; the branches separate types > 2,
// types 1..2 and type 0.
1521 if(msgdataindex == 0) {
1522 if(msgdata[0] > 2) {
1524 } else if(msgdata[0] > 0) {
// For a type-0 message the second word carries the message length.
1527 } else if((msgdataindex == 1) && (msgdata[0] == 0)) {
1528 msglength = msgdata[msgdataindex];
1531 raw_test_pass_reg(msgdata[msgdataindex]);
// NOTE(review): the block comment opened on the next line appears to hold an
// earlier variant of this loop that allocated msgdata per message.
1535 /*if(msgdataindex == 0) {
1537 msgtype = gdn_receive();
1544 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1545 msgdata[msgdataindex] = msgtype;
1548 raw_test_pass_reg(msgtype);
1550 } else if((msgdataindex == 1) && (msgtype == 0)) {
1551 // object transfer msg
1552 msglength = gdn_receive();
1553 msgdata = (int *)RUNMALLOC_I(msglength * sizeof(int));
1554 msgdata[0] = msgtype;
1555 msgdata[msgdataindex] = msglength;
1557 raw_test_pass_reg(msgdata[msgdataindex]);
1560 msgdata[msgdataindex] = gdn_receive();
1562 raw_test_pass_reg(msgdata[msgdataindex]);
1568 raw_test_pass(0xffff);
// Every expected word has arrived: decode the message and act on it.
1570 if(msgdataindex == msglength) {
1571 // received a whole msg
1572 int type, data1, data2; // will receive at least 3 words including type
1578 // receive a object transfer msg
// The object pointer and its queue index pairs are wrapped in a transObjInfo
// that is appended to objqueue for later enqueueing.
1579 struct transObjInfo * transObj = RUNMALLOC_I(sizeof(struct transObjInfo));
1581 if(corenum > NUMCORES - 1) {
1582 raw_test_done(0xa00a);
1584 // store the object and its corresponding queue info, enqueue it later
1585 transObj->objptr = (void *)data2; // data1 is now size of the msg
// Three leading words precede the pairs, so (msglength - 3) / 2 pairs follow.
1586 transObj->length = (msglength - 3) / 2;
1587 transObj->queues = RUNMALLOC_I(sizeof(int)*(msglength - 3));
// Copy the pairs out of msgdata[3..]; the THREADSIMULATE path below reads
// them back as (task index, parameter index).
1588 for(k = 0; k < transObj->length; ++k) {
1589 transObj->queues[2*k] = msgdata[3+2*k];
1591 raw_test_pass_reg(transObj->queues[2*k]);
1593 transObj->queues[2*k+1] = msgdata[3+2*k+1];
1595 raw_test_pass_reg(transObj->queues[2*k+1]);
1598 //memcpy(transObj->queues, msgdata[3], sizeof(int)*(msglength - 3));
1599 addNewItem_I(&objqueue, (void *)transObj);
1600 ++(self_numreceiveobjs);
1602 raw_test_pass(0xe881);
// NOTE(review): second enqueue form that stores data2 directly; what selects
// between the two forms is not evident from the surrounding lines.
1605 addNewItem_I(&objqueue, (void *)data2);
1606 ++(self_numreceiveobjs);
1608 raw_test_pass(0xe881);
1614 // receive a stall msg
1615 if(corenum != STARTUPCORE) {
1616 // non startup core can not receive stall msg
1618 raw_test_done(0xa001);
// data1 is the reporting core: clear its status and record its send and
// receive counters as reported in the message.
1620 if(data1 < NUMCORES) {
1622 raw_test_pass(0xe882);
1624 corestatus[data1] = 0;
1625 numsendobjs[data1] = data2;
1626 numreceiveobjs[data1] = msgdata[3];
1631 // receive lock request msg
1632 // for 32 bit machine, the size is always 3 words
1633 //int msgsize = sizeof(int) * 3;
1635 // lock request msg, handle it right now
1636 // check to see if there is a lock exist in locktbl for the required obj
// data1 is the lock type and data2 the lock target (see the reply below).
// NOTE(review): data3 is presumably the requesting core -- confirm.
1637 int data3 = msgdata[3];
1639 if(!RuntimeHashcontainskey(locktbl, data2)) {
1640 // no locks for this object
1641 // first time to operate on this shared object
1642 // create a lock for it
1643 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1645 raw_test_pass(0xe883);
// New entry: 1 records a first reader, -1 records a writer.
1648 RuntimeHashadd_I(locktbl, data2, 1);
1650 RuntimeHashadd_I(locktbl, data2, -1);
1655 raw_test_pass(0xe884);
// Entry exists: fetch the current lock word.
1657 RuntimeHashget(locktbl, data2, &rwlock_obj);
1659 raw_test_pass_reg(rwlock_obj);
// A lock word of 0 means the object is currently unlocked.
1661 if(0 == rwlock_obj) {
// The table entry is rewritten by removing the key and adding it again.
1667 RuntimeHashremovekey(locktbl, data2);
1668 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1669 } else if((rwlock_obj > 0) && (data1 == 0)) {
1670 // read lock request and there are only read locks
1672 RuntimeHashremovekey(locktbl, data2);
1673 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1678 raw_test_pass_reg(rwlock_obj);
// Send the reply back: header word, result word (4 = deny, 3 = grant),
// lock type, lock target.
1682 calCoords(corenum, &self_y, &self_x);
1683 calCoords(targetcore, &target_y, &target_x);
1684 // Build the message header
1685 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1687 target_y, target_x);
1688 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1690 raw_test_pass(0xbbbb);
1691 raw_test_pass(0xb000 + targetcore); // targetcore
1694 // deny the lock request
1695 gdn_send(4); // lock request
1700 // grant the lock request
1701 gdn_send(3); // lock request
1706 gdn_send(data1); // lock type
1708 raw_test_pass_reg(data1);
1710 gdn_send(data2); // lock target
1712 raw_test_pass_reg(data2);
1713 raw_test_pass(0xffff);
1718 // receive lock grant msg
1719 if(corenum > NUMCORES - 1) {
1720 raw_test_done(0xa00b);
// The reply must refer to the object this core is currently waiting on.
1722 if(lockobj == data2) {
1729 // conflicts on lockresults
1730 raw_test_done(0xa002);
1735 // receive lock grant/deny msg
1736 if(corenum > NUMCORES - 1) {
1737 raw_test_done(0xa00c);
1739 if(lockobj == data2) {
1746 // conflicts on lockresults
1747 raw_test_done(0xa003);
1752 // receive lock release msg
1753 if(!RuntimeHashcontainskey(locktbl, data2)) {
1754 // no locks for this object, something is wrong
1755 raw_test_done(0xa004);
1758 RuntimeHashget(locktbl, data2, &rwlock_obj);
1760 raw_test_pass(0xe885);
1761 raw_test_pass_reg(rwlock_obj);
// Store the updated lock word back (remove + add).
1768 RuntimeHashremovekey(locktbl, data2);
1769 RuntimeHashadd_I(locktbl, data2, rwlock_obj);
1771 raw_test_pass_reg(rwlock_obj);
// Reset the receive buffer for the next message. As written, this loop
// stops before index 0, so msgdata[0] is not overwritten by it.
1781 for(msgdataindex--;msgdataindex > 0; --msgdataindex) {
1782 msgdata[msgdataindex] = -1;
1788 raw_test_pass(0xe886);
// More words are already waiting on the input.
1790 if(gdn_input_avail() != 0) {
1797 raw_test_pass(0xe887);
1801 #elif defined THREADSIMULATE
// NOTE(review): pthread_getspecific() returns void *; it is stored in an int
// here without a cast.
1802 int numofcore = pthread_getspecific(key);
1803 // use POSIX message queue to transfer object
// Size the receive buffer from the queue's mq_msgsize attribute.
1805 struct mq_attr mqattr;
1806 mq_getattr(mqd[numofcore], &mqattr);
1807 void * msgptr =RUNMALLOC(mqattr.mq_msgsize);
1808 msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue
1814 //printf("msg: %s\n",msgptr);
// A leading int of -1 marks a stall report; the sender's index and counters
// travel in the flag, ___cachedHash___ and ___cachedCode___ fields.
1815 if(((int*)msgptr)[0] == -1) {
1817 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1818 int index = tmpptr->flag;
1819 corestatus[index] = 0;
1820 numsendobjs[index] = tmpptr->___cachedHash___;
1821 numreceiveobjs[index] = tmpptr->___cachedCode___;
1822 printf("<receiveObject> index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]);
1825 } /*else if(((int*)msgptr)[0] == -2) {
// Count the received object: the startup core uses numreceiveobjs[], the
// other update uses the per-thread counter in thread_data_array[].
1830 if(numofcore == STARTUPCORE) {
1831 ++(numreceiveobjs[numofcore]);
1833 ++(thread_data_array[numofcore].numreceiveobjs);
// The message's 'original' field points at the sender's transObjInfo, which
// in turn points at the object being transferred.
1835 struct ___Object___ * tmpptr = (struct ___Object___ *)msgptr;
1836 struct transObjInfo * transObj = (struct transObjInfo *)tmpptr->original;
1837 tmpptr = (struct ___Object___ *)(transObj->objptr);
// Make a private copy of the object, sized by its class.
1838 int type = tmpptr->type;
1839 int size=classsize[type];
1840 struct ___Object___ * newobj=RUNMALLOC(size);
1841 memcpy(newobj, tmpptr, size);
// When isolate is 0 the copy keeps a link back to the source object.
1842 if(0 == newobj->isolate) {
1843 newobj->original=tmpptr;
// Enqueue the copy on every (task, parameter) queue named in the message.
1848 for(k = 0; k < transObj->length; ++k) {
1849 int taskindex = transObj->queues[2 * k];
1850 int paramindex = transObj->queues[2 * k + 1];
1851 struct parameterwrapper ** queues = &(paramqueues[numofcore][taskindex][paramindex]);
1852 enqueueObject(newobj, queues, 1);
1854 RUNFREE(transObj->queues);
// getreadlock: request a read lock on the object at 'ptr'.
// RAW path: the core that holds the lock word is derived from the address;
// a local lock is updated directly in locktbl with interrupts disabled, a
// remote one is requested by message (2 = lock request, 0 = read lock).
// THREADSIMULATE path: a per-object pthread_rwlock_t is looked up in (or
// added to) locktbl under rwlock_tbl and then try-read-locked.
1861 bool getreadlock(void * ptr) {
1864 int self_y, self_x, target_y, target_x;
// NOTE(review): (int)ptr assumes a pointer fits in an int, consistent with
// the 32-bit remark below.
1865 int targetcore = ((int)ptr >> 5) % TOTALCORE;
1866 // for 32 bit machine, the size is always 4 words
1867 //int msgsize = sizeof(int) * 4;
1877 if(targetcore == corenum) {
1878 // reside on this core
// Interrupts are turned off while locktbl is examined and modified.
1881 raw_user_interrupts_off();
1883 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1884 // no locks for this object
1885 // first time to operate on this shared object
1886 // create a lock for it
1887 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
1888 RuntimeHashadd_I(locktbl, (int)ptr, 1);
1891 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// Anything other than -1 (write-locked) allows another reader.
1892 if(-1 != rwlock_obj) {
// Store the updated lock word back (remove + add).
1894 RuntimeHashremovekey(locktbl, (int)ptr);
1895 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
1901 raw_user_interrupts_on();
// The recorded lockobj must match the object being locked.
1903 if(lockobj == (int)ptr) {
1914 // conflicts on lockresults
1915 raw_test_done(0xa005);
// Lock word lives on another core: send it a request message.
1920 calCoords(corenum, &self_y, &self_x);
1921 calCoords(targetcore, &target_y, &target_x);
1922 // Build the message header
1923 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
1925 target_y, target_x);
1926 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
1928 raw_test_pass(0xbbbb);
1929 raw_test_pass(0xb000 + targetcore); // targetcore
1931 gdn_send(2); // lock request
1935 gdn_send(0); // read lock
1941 raw_test_pass_reg(ptr);
1945 raw_test_pass_reg(corenum);
1946 raw_test_pass(0xffff);
1949 #elif defined THREADSIMULATE
// NOTE(review): pthread_getspecific() returns void *; it is stored in an int
// here without a cast.
1950 int numofcore = pthread_getspecific(key);
// Non-blocking attempt to read-lock the lock table itself.
1952 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
// NOTE(review): every printf in this section passes strerror(...) as an
// extra argument with no matching conversion, so the error text is never
// printed.
1953 printf("[getreadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1957 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1958 // no locks for this object
1959 // first time to operate on this shared object
1960 // create a lock for it
// Drop the table read lock, build a new rwlock, then try to take the table
// write lock in order to insert it.
1961 rc = pthread_rwlock_unlock(&rwlock_tbl);
1962 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1963 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
1964 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
1965 rc = pthread_rwlock_init(rwlock, NULL);
1966 printf("[getreadlock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
1967 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
1968 printf("[getreadlock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// Re-check under the write lock: another thread may have inserted an entry
// in the meantime.
1973 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
1975 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
// NOTE(review): getwritelock calls pthread_rwlock_destroy on its unused
// rwlock before switching to the existing entry; no matching call is
// visible here -- confirm.
1978 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
1980 rc = pthread_rwlock_unlock(&rwlock_tbl);
1981 printf("[getreadlock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// Non-blocking attempt to read-lock the object itself.
1983 rc = pthread_rwlock_tryrdlock(rwlock);
1984 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
// Entry already present: fetch its rwlock, release the table, then try to
// read-lock the object.
1991 pthread_rwlock_t* rwlock_obj = NULL;
1992 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
1993 rc = pthread_rwlock_unlock(&rwlock_tbl);
1994 printf("[getreadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
1995 int rc_obj = pthread_rwlock_tryrdlock(rwlock_obj);
1996 printf("[getreadlock, %d] getting read lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasereadlock: give back a read lock on the object at 'ptr'.
// RAW path: a local lock word is rewritten in locktbl with interrupts
// disabled; a remote one is released by message (5 = lock release,
// 0 = read lock). THREADSIMULATE path: the object's rwlock is unlocked
// while the table read lock is held.
2006 void releasereadlock(void * ptr) {
2009 int self_y, self_x, target_y, target_x;
// Same address-to-core mapping as the matching lock request.
2010 int targetcore = ((int)ptr >> 5) % TOTALCORE;
2011 // for 32 bit machine, the size is always 3 words
2012 //int msgsize = sizeof(int) * 3;
2015 if(targetcore == corenum) {
2017 raw_user_interrupts_off();
2019 // reside on this core
2020 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2021 // no locks for this object, something is wrong
2022 raw_test_done(0xa006);
2025 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// Store the updated lock word back (remove + add).
2027 RuntimeHashremovekey(locktbl, (int)ptr);
2028 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2031 raw_user_interrupts_on();
// Lock word lives on another core: send it a release message.
2036 calCoords(corenum, &self_y, &self_x);
2037 calCoords(targetcore, &target_y, &target_x);
2038 // Build the message header
2039 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2041 target_y, target_x);
2042 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2044 raw_test_pass(0xbbbb);
2045 raw_test_pass(0xb000 + targetcore); // targetcore
2047 gdn_send(5); // lock release
2051 gdn_send(0); // read lock
2057 raw_test_pass_reg(ptr);
2058 raw_test_pass(0xffff);
2060 #elif defined THREADSIMULATE
// NOTE(review): pthread_getspecific() returns void *; it is stored in an int
// here without a cast.
2061 int numofcore = pthread_getspecific(key);
// Blocking read lock on the lock table (the acquire paths use try-locks).
2062 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
// NOTE(review): the printf calls below pass strerror(...) as an extra
// argument with no matching conversion, so the error text is never printed.
2063 printf("[releasereadlock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2064 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2065 printf("[releasereadlock, %d] Error: try to release a lock without previously grab it\n", numofcore);
// Unlock the object's rwlock, then release the table.
2068 pthread_rwlock_t* rwlock_obj = NULL;
2069 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2070 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2071 printf("[releasereadlock, %d] unlocked object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2072 rc = pthread_rwlock_unlock(&rwlock_tbl);
2073 printf("[releasereadlock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// getreadlock_I: read-lock request with the same table update and message
// sequence as getreadlock, but without the raw_user_interrupts_off()/on()
// bracket around the local update.
// NOTE(review): presumably intended for callers that already run with
// interrupts disabled -- confirm.
2078 bool getreadlock_I(void * ptr) {
2080 int self_y, self_x, target_y, target_x;
// Core holding the lock word, derived from the object address.
2081 int targetcore = ((int)ptr >> 5) % TOTALCORE;
2082 // for 32 bit machine, the size is always 4 words
2083 //int msgsize = sizeof(int) * 4;
2093 if(targetcore == corenum) {
2094 // reside on this core
2096 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2097 // no locks for this object
2098 // first time to operate on this shared object
2099 // create a lock for it
2100 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2101 RuntimeHashadd_I(locktbl, (int)ptr, 1);
2104 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// Anything other than -1 (write-locked) allows another reader.
2105 if(-1 != rwlock_obj) {
// Store the updated lock word back (remove + add).
2107 RuntimeHashremovekey(locktbl, (int)ptr);
2108 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
// The recorded lockobj must match the object being locked.
2113 if(lockobj == (int)ptr) {
2124 // conflicts on lockresults
2125 raw_test_done(0xa005);
// Lock word lives on another core: send it a request message.
2130 calCoords(corenum, &self_y, &self_x);
2131 calCoords(targetcore, &target_y, &target_x);
2132 // Build the message header
2133 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2135 target_y, target_x);
2136 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2138 raw_test_pass(0xbbbb);
2139 raw_test_pass(0xb000 + targetcore); // targetcore
2141 gdn_send(2); // lock request
2145 gdn_send(0); // read lock
2151 raw_test_pass_reg(ptr);
2155 raw_test_pass_reg(corenum);
2156 raw_test_pass(0xffff);
// releasereadlock_I: read-lock release with the same table update and
// message sequence as releasereadlock, but without the
// raw_user_interrupts_off()/on() bracket around the local update.
// NOTE(review): presumably intended for callers that already run with
// interrupts disabled -- confirm.
2161 void releasereadlock_I(void * ptr) {
2163 int self_y, self_x, target_y, target_x;
// Core holding the lock word, derived from the object address.
2164 int targetcore = ((int)ptr >> 5) % TOTALCORE;
2165 // for 32 bit machine, the size is always 3 words
2166 //int msgsize = sizeof(int) * 3;
2169 if(targetcore == corenum) {
2170 // reside on this core
2171 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2172 // no locks for this object, something is wrong
2173 raw_test_done(0xa006);
2176 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
// Store the updated lock word back (remove + add).
2178 RuntimeHashremovekey(locktbl, (int)ptr);
2179 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
// Lock word lives on another core: send it a release message.
2184 calCoords(corenum, &self_y, &self_x);
2185 calCoords(targetcore, &target_y, &target_x);
2186 // Build the message header
2187 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2189 target_y, target_x);
2190 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2192 raw_test_pass(0xbbbb);
2193 raw_test_pass(0xb000 + targetcore); // targetcore
2195 gdn_send(5); // lock release
2199 gdn_send(0); // read lock
2205 raw_test_pass_reg(ptr);
2206 raw_test_pass(0xffff);
// getwritelock: request a write lock on the object at 'ptr'.
// RAW path: the core that holds the lock word is derived from the address;
// a local lock is taken directly in locktbl with interrupts disabled, a
// remote one is requested by message (2 = lock request, 1 = write lock).
// THREADSIMULATE path: a per-object pthread_rwlock_t is looked up in (or
// added to) locktbl under rwlock_tbl and then try-write-locked.
2212 bool getwritelock(void * ptr) {
2215 int self_y, self_x, target_y, target_x;
// NOTE(review): (int)ptr assumes a pointer fits in an int, consistent with
// the 32-bit remark below.
2216 int targetcore = ((int)ptr >> 5) % TOTALCORE;
2217 // for 32 bit machine, the size is always 4 words
2218 //int msgsize = sizeof(int) * 4;
2222 //raw_user_interrupts_off();
2224 //targetcore = ((int)ptr) % tc;
2226 //raw_user_interrupts_on();
2230 raw_test_pass(0xe551);
2231 raw_test_pass_reg(ptr);
2232 raw_test_pass_reg(targetcore);
2233 raw_test_pass_reg(tc);
2243 if(targetcore == corenum) {
2244 // reside on this core
// Interrupts are turned off while locktbl is examined and modified.
2247 raw_user_interrupts_off();
2249 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2250 // no locks for this object
2251 // first time to operate on this shared object
2252 // create a lock for it
2253 // the lock is an integer: 0 -- stall, >0 -- read lock, -1 -- write lock
2255 raw_test_pass(0xe552);
2257 RuntimeHashadd_I(locktbl, (int)ptr, -1);
2260 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2262 raw_test_pass(0xe553);
2263 raw_test_pass_reg(rwlock_obj);
// A write lock is only possible when the lock word is 0 (unlocked).
2265 if(0 == rwlock_obj) {
// Store the updated lock word back (remove + add).
2267 RuntimeHashremovekey(locktbl, (int)ptr);
2268 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2274 raw_user_interrupts_on();
2277 raw_test_pass(0xe554);
2278 raw_test_pass_reg(lockresult);
// The recorded lockobj must match the object being locked.
2280 if(lockobj == (int)ptr) {
2297 // conflicts on lockresults
2298 raw_test_done(0xa007);
2304 raw_test_pass(0xe555);
// Lock word lives on another core: send it a request message.
2306 calCoords(corenum, &self_y, &self_x);
2307 calCoords(targetcore, &target_y, &target_x);
2308 // Build the message header
2309 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2311 target_y, target_x);
2312 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2314 raw_test_pass(0xbbbb);
2315 raw_test_pass(0xb000 + targetcore); // targetcore
2317 gdn_send(2); // lock request
2321 gdn_send(1); // write lock
2327 raw_test_pass_reg(ptr);
2331 raw_test_pass_reg(corenum);
2332 raw_test_pass(0xffff);
2335 #elif defined THREADSIMULATE
// NOTE(review): pthread_getspecific() returns void *; it is stored in an int
// here without a cast.
2336 int numofcore = pthread_getspecific(key);
// Non-blocking attempt to read-lock the lock table itself.
2338 int rc = pthread_rwlock_tryrdlock(&rwlock_tbl);
// NOTE(review): every printf in this section passes strerror(...) as an
// extra argument with no matching conversion, so the error text is never
// printed.
2339 printf("[getwritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2343 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2344 // no locks for this object
2345 // first time to operate on this shared object
2346 // create a lock for it
// Drop the table read lock, build a new rwlock, then try to take the table
// write lock in order to insert it.
2347 rc = pthread_rwlock_unlock(&rwlock_tbl);
2348 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2349 pthread_rwlock_t* rwlock = (pthread_rwlock_t *)RUNMALLOC(sizeof(pthread_rwlock_t));
2350 memcpy(rwlock, &rwlock_init, sizeof(pthread_rwlock_t));
2351 rc = pthread_rwlock_init(rwlock, NULL);
2352 printf("[getwritelock, %d] initialize the rwlock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
2353 rc = pthread_rwlock_trywrlock(&rwlock_tbl);
2354 printf("[getwritelock, %d] getting the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// NOTE(review): presumably the cleanup for a failed table write lock -- the
// new rwlock is discarded.
2356 pthread_rwlock_destroy(rwlock);
// Re-check under the write lock: another thread may have inserted an entry
// in the meantime.
2360 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2362 RuntimeHashadd(locktbl, (int)ptr, (int)rwlock);
// An entry appeared meanwhile: discard the new rwlock and use the stored one.
2364 pthread_rwlock_destroy(rwlock);
2366 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock);
2368 rc = pthread_rwlock_unlock(&rwlock_tbl);
2369 printf("[getwritelock, %d] release the write lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// Non-blocking attempt to write-lock the object itself.
2371 rc = pthread_rwlock_trywrlock(rwlock);
2372 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc, strerror(rc));
// Entry already present: fetch its rwlock, release the table, then try to
// write-lock the object.
2379 pthread_rwlock_t* rwlock_obj = NULL;
2380 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2381 rc = pthread_rwlock_unlock(&rwlock_tbl);
2382 printf("[getwritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2383 int rc_obj = pthread_rwlock_trywrlock(rwlock_obj);
2384 printf("[getwritelock, %d] getting write lock for object %d: %d error: \n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
// releasewritelock: give back a write lock on the object at 'ptr'.
// RAW path: a local lock word is rewritten in locktbl with interrupts
// disabled; a remote one is released by message (5 = lock release,
// 1 = write lock). THREADSIMULATE path: the object's rwlock is unlocked
// while the table read lock is held.
2395 void releasewritelock(void * ptr) {
2398 int self_y, self_x, target_y, target_x;
// Same address-to-core mapping as the matching lock request.
2399 int targetcore = ((int)ptr >> 5) % TOTALCORE;
2400 // for 32 bit machine, the size is always 3 words
2401 //int msgsize = sizeof(int) * 3;
2404 if(targetcore == corenum) {
2406 raw_user_interrupts_off();
2408 // reside on this core
2409 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2410 // no locks for this object, something is wrong
2411 raw_test_done(0xa008);
2415 raw_test_pass(0xe662);
2417 RuntimeHashget(locktbl, (int)ptr, &rwlock_obj);
2419 raw_test_pass_reg(rwlock_obj);
// Store the updated lock word back (remove + add).
2422 RuntimeHashremovekey(locktbl, (int)ptr);
2423 RuntimeHashadd_I(locktbl, (int)ptr, rwlock_obj);
2425 raw_test_pass_reg(rwlock_obj);
2429 raw_user_interrupts_on();
2435 raw_test_pass(0xe663);
// Lock word lives on another core: send it a release message.
2437 calCoords(corenum, &self_y, &self_x);
2438 calCoords(targetcore, &target_y, &target_x);
2439 // Build the message header
2440 msgHdr = construct_dyn_hdr(0, msgsize, 0, // msgsize word sent.
2442 target_y, target_x);
2443 gdn_send(msgHdr); // Send the message header to EAST to handle fab(n - 1).
2445 raw_test_pass(0xbbbb);
2446 raw_test_pass(0xb000 + targetcore);
2448 gdn_send(5); // lock release
2452 gdn_send(1); // write lock
2458 raw_test_pass_reg(ptr);
2459 raw_test_pass(0xffff);
2461 #elif defined THREADSIMULATE
// NOTE(review): pthread_getspecific() returns void *; it is stored in an int
// here without a cast.
2462 int numofcore = pthread_getspecific(key);
// Blocking read lock on the lock table (the acquire paths use try-locks).
2463 int rc = pthread_rwlock_rdlock(&rwlock_tbl);
// NOTE(review): the printf calls below pass strerror(...) as an extra
// argument with no matching conversion, so the error text is never printed.
2464 printf("[releasewritelock, %d] getting the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
2465 if(!RuntimeHashcontainskey(locktbl, (int)ptr)) {
2466 printf("[releasewritelock, %d] Error: try to release a lock without previously grab it\n", numofcore);
// Unlock the object's rwlock, then release the table.
2469 pthread_rwlock_t* rwlock_obj = NULL;
2470 RuntimeHashget(locktbl, (int)ptr, (int*)&rwlock_obj);
2471 int rc_obj = pthread_rwlock_unlock(rwlock_obj);
2472 printf("[releasewritelock, %d] unlocked object %d: %d error:\n", numofcore, (int)ptr, rc_obj, strerror(rc_obj));
2473 rc = pthread_rwlock_unlock(&rwlock_tbl);
2474 printf("[releasewritelock, %d] release the read lock for locktbl: %d error: \n", numofcore, rc, strerror(rc));
// enqueuetasks: record 'ptr' in the parameter wrapper's object set and add
// to activetasks every parameter combination produced by the wrapper's
// iterators that is not already present there.
//   parameter     - wrapper whose objectset, slot and iterators are used
//   prevptr       - not referenced in the lines shown here
//   ptr           - object being enqueued
//   enterflags    - passed to ObjectHashadd with numenterflags; NULL is
//                   also signalled through its last argument
//   numenterflags - number of entries in enterflags
// NOTE(review): every occurrence of "¶meter" below looks like a mis-encoded
// "&parameter" (address of parameter->iterators[j]) -- confirm and repair.
2478 int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2479 void * taskpointerarray[MAXTASKPARAMS];
2481 int numparams=parameter->task->numParameters;
// One slot is filled directly from 'ptr'; the iterators supply the others.
2482 int numiterators=parameter->task->numTotal-1;
2487 struct taskdescriptor * task=parameter->task;
2489 ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2491 /* Add enqueued object to parameter vector */
2492 taskpointerarray[parameter->slot]=ptr;
2494 /* Reset iterators */
2495 for(j=0;j<numiterators;j++) {
2496 toiReset(¶meter->iterators[j]);
2499 /* Find initial state */
// Advance each iterator in turn; one with no next element is reset.
2500 for(j=0;j<numiterators;j++) {
2502 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2503 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2505 /* Need to backtrack */
2506 toiReset(¶meter->iterators[j]);
2510 /* Nothing to enqueue */
2517 /* Enqueue current state */
// Snapshot the current parameter vector into a new descriptor.
2519 struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor));
2521 tpd->numParameters=numiterators+1;
2522 tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1));
2523 for(j=0;j<=numiterators;j++){
2524 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
// Only add combinations that are not already in activetasks; otherwise
// the parameter array is released again.
2527 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2528 genputtable(activetasks, tpd, tpd);
2530 RUNFREE(tpd->parameterArray);
2534 /* This loop iterates to the next parameter combination */
2535 if (numiterators==0)
// Start from the last iterator.
2538 for(j=numiterators-1; j<numiterators;j++) {
2540 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2541 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2543 /* Need to backtrack */
2544 toiReset(¶meter->iterators[j]);
2548 /* Nothing more to enqueue */
// enqueuetasks_I: same flow as enqueuetasks, but built on the _I variants
// ObjectHashadd_I, RUNMALLOC_I and genputtable_I.
// NOTE(review): presumably the version to call with interrupts disabled;
// gencontains and RUNFREE are used here without an _I variant -- confirm.
//   parameter     - wrapper whose objectset, slot and iterators are used
//   prevptr       - not referenced in the lines shown here
//   ptr           - object being enqueued
//   enterflags    - passed to ObjectHashadd_I with numenterflags; NULL is
//                   also signalled through its last argument
//   numenterflags - number of entries in enterflags
// NOTE(review): every occurrence of "¶meter" below looks like a mis-encoded
// "&parameter" (address of parameter->iterators[j]) -- confirm and repair.
2557 int enqueuetasks_I(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) {
2558 void * taskpointerarray[MAXTASKPARAMS];
2560 int numparams=parameter->task->numParameters;
// One slot is filled directly from 'ptr'; the iterators supply the others.
2561 int numiterators=parameter->task->numTotal-1;
2566 struct taskdescriptor * task=parameter->task;
2568 ObjectHashadd_I(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper
2570 /* Add enqueued object to parameter vector */
2571 taskpointerarray[parameter->slot]=ptr;
2573 /* Reset iterators */
2574 for(j=0;j<numiterators;j++) {
2575 toiReset(¶meter->iterators[j]);
2578 /* Find initial state */
// Advance each iterator in turn; one with no next element is reset.
2579 for(j=0;j<numiterators;j++) {
2581 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2582 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2584 /* Need to backtrack */
2585 toiReset(¶meter->iterators[j]);
2589 /* Nothing to enqueue */
2595 /* Enqueue current state */
// Snapshot the current parameter vector into a new descriptor.
2597 struct taskparamdescriptor *tpd=RUNMALLOC_I(sizeof(struct taskparamdescriptor));
2599 tpd->numParameters=numiterators+1;
2600 tpd->parameterArray=RUNMALLOC_I(sizeof(void *)*(numiterators+1));
2601 for(j=0;j<=numiterators;j++){
2602 tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters
// Only add combinations that are not already in activetasks; otherwise
// the parameter array is released again.
2605 if ((/*!gencontains(failedtasks, tpd)&&*/!gencontains(activetasks,tpd))) {
2606 genputtable_I(activetasks, tpd, tpd);
2608 RUNFREE(tpd->parameterArray);
2612 /* This loop iterates to the next parameter combination */
2613 if (numiterators==0)
// Start from the last iterator.
2616 for(j=numiterators-1; j<numiterators;j++) {
2618 if(toiHasNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)))
2619 toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed));
2621 /* Need to backtrack */
2622 toiReset(¶meter->iterators[j]);
2626 /* Nothing more to enqueue */
2635 /* Handler for signals. The signals catch null pointer errors and
2636 arithmetic errors. */
// Installed through sigaction with SA_SIGINFO in executetasks, hence the
// three-argument signature; 'info' and 'uap' are not used in the lines
// shown here.
2638 void myhandler(int sig, siginfo_t *info, void *uap) {
2641 printf("sig=%d\n",sig);
// Unblock the signal that was just delivered before jumping out of the
// handler, so that it can be delivered again later.
2644 sigemptyset(&toclear);
2645 sigaddset(&toclear, sig);
2646 sigprocmask(SIG_UNBLOCK, &toclear,NULL);
// Transfer control to the setjmp(error_handler) point in executetasks,
// which sees the value 1.
2647 longjmp(error_handler,1);
// Hash table keyed by file descriptor. executetasks allocates it and uses
// it to look up the object pointer associated with a descriptor that
// select() reports as ready.
2653 struct RuntimeHash *fdtoobject;
// addreadfd: add 'fd' to the readfds set.
// NOTE(review): presumably also raises maxreadfd when needed (removereadfd
// lowers it) -- confirm.
2655 void addreadfd(int fd) {
2658 FD_SET(fd, &readfds);
// removereadfd: remove 'fd' from the readfds set and, when it was the
// highest descriptor in use, lower maxreadfd.
2661 void removereadfd(int fd) {
2662 FD_CLR(fd, &readfds);
// maxreadfd is one past the highest watched descriptor.
2663 if (maxreadfd==(fd+1)) {
// Scan downwards while the descriptor just below maxreadfd is not in the set.
2665 while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
2676 void executetasks() {
2677 void * taskpointerarray[MAXTASKPARAMS+OFFSET];
2680 struct ___Object___ * tmpparam = NULL;
2681 struct parameterdescriptor * pd=NULL;
2682 struct parameterwrapper *pw=NULL;
2692 raw_test_pass(0xe991);
2697 /* Set up signal handlers */
2698 struct sigaction sig;
2699 sig.sa_sigaction=&myhandler;
2700 sig.sa_flags=SA_SIGINFO;
2701 sigemptyset(&sig.sa_mask);
2703 /* Catch bus errors, segmentation faults, and floating point exceptions*/
2704 sigaction(SIGBUS,&sig,0);
2705 sigaction(SIGSEGV,&sig,0);
2706 sigaction(SIGFPE,&sig,0);
2707 sigaction(SIGPIPE,&sig,0);
2716 fdtoobject=allocateRuntimeHash(100);
2720 /* Map first block of memory to protected, anonymous page */
2721 mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
2725 while((hashsize(activetasks)>0)||(maxreadfd>0)) {
2729 raw_test_pass(0xe992);
2732 /* Check if any filedescriptors have IO pending */
2735 struct timeval timeout={0,0};
2739 numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
2741 /* Process ready fd's */
2743 for(fd=0;fd<maxreadfd;fd++) {
2744 if (FD_ISSET(fd, &tmpreadfds)) {
2745 /* Set ready flag on object */
2747 // printf("Setting fd %d\n",fd);
2748 if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
2749 if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */
2750 enqueueObject(objptr, NULL, 0);
2759 /* See if there are any active tasks */
2760 if (hashsize(activetasks)>0) {
2762 currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks);
2763 genfreekey(activetasks, currtpd);
2765 /* Check if this task has failed, allow a task that contains optional objects to fire */
2766 /*if (gencontains(failedtasks, currtpd)) {
2767 // Free up task parameter descriptor
2768 RUNFREE(currtpd->parameterArray);
2772 numparams=currtpd->task->numParameters;
2773 numtotal=currtpd->task->numTotal;
2775 #ifdef THREADSIMULATE
2776 int isolateflags[numparams];
2778 /* Make sure that the parameters are still in the queues */
2779 for(i=0;i<numparams;i++) {
2780 void * parameter=currtpd->parameterArray[i];
2783 raw_test_pass(0xe993);
2786 if(((struct ___Object___ *)parameter)->type == STARTUPTYPE) {
2788 taskpointerarray[i+OFFSET]=parameter;
2792 // require locks for this parameter if it is not a startup object
2793 getwritelock(parameter);
2797 raw_user_interrupts_off();
2804 while(receiveObject() != -1) {
2808 grount = lockresult;
2817 raw_user_interrupts_on();
2822 raw_test_pass(0xe994);
2824 // can not get the lock, try later
2825 for(j = 0; j < i; ++j) {
2826 releasewritelock(taskpointerarray[j+OFFSET]);
2828 genputtable(activetasks, currtpd, currtpd);
2829 if(hashsize(activetasks) == 1) {
2830 // only one task right now, wait a little while before next try
2838 raw_invalidate_cache_range((int)parameter, classsize[((struct ___Object___ *)parameter)->type]);
2840 for(tmp = 0; tmp < classsize[((struct ___Object___ *)parameter)->type]; ++tmp) {
2841 invalidateAddr(parameter + tmp);
2845 tmpparam = (struct ___Object___ *)parameter;
2846 #ifdef THREADSIMULATE
2847 if(((struct ___Object___ *)parameter)->type == STARTUPTYPE) {
2849 taskpointerarray[i+OFFSET]=parameter;
2853 if(0 == tmpparam->isolate) {
2854 isolateflags[i] = 0;
2855 // shared object, need to flush with current value
2856 //if(!getreadlock(tmpparam->original)) {
2857 // // fail to get read lock of the original object, try this task later
2858 if(!getwritelock(tmpparam->original)) {
2859 // fail to get write lock, release all obtained locks and try this task later
2861 for(j = 0; j < i; ++j) {
2862 if(0 == isolateflags[j]) {
2863 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2866 genputtable(activetasks, currtpd, currtpd);
2869 if(tmpparam->version != tmpparam->original->version) {
2870 // some task on another core has changed this object
2871 // flush this object
2872 //memcpy(tmpparam, tmpparam->original, classsize[tmpparam->type]);
2873 // release all obtained locks
2875 for(j = 0; j < i; ++j) {
2876 if(0 == isolateflags[j]) {
2877 releasewritelock(((struct ___Object___ *)taskpointerarray[j+OFFSET])->original);
2880 releasewritelock(tmpparam->original);
2882 // dequeue this object
2883 int numofcore = pthread_getspecific(key);
2884 struct parameterwrapper ** queues = objectqueues[numofcore][tmpparam->type];
2885 int length = numqueues[numofcore][tmpparam->type];
2886 for(j = 0; j < length; ++j) {
2887 struct parameterwrapper * pw = queues[j];
2888 if(ObjectHashcontainskey(pw->objectset, (int)tmpparam)) {
2890 int UNUSED, UNUSED2;
2892 ObjectHashget(pw->objectset, (int) tmpparam, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2893 ObjectHashremove(pw->objectset, (int)tmpparam);
2894 if (enterflags!=NULL)
2898 // try to enqueue it again to check if it feeds other tasks;
2899 //enqueueObject(tmpparam, NULL, 0);
2900 // Free up task parameter descriptor
2901 RUNFREE(currtpd->parameterArray);
2906 isolateflags[i] = 1;
2909 pd=currtpd->task->descriptorarray[i];
2910 pw=(struct parameterwrapper *) pd->queue;
2911 /* Check that object is still in queue */
2913 if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) {
2915 raw_test_pass(0xe995);
2917 // release grabbed locks
2918 for(j = 0; j < i; ++j) {
2919 releasewritelock(taskpointerarray[j+OFFSET]);
2921 releasewritelock(parameter);
2922 RUNFREE(currtpd->parameterArray);
2928 /* Check if the object's flags still meets requirements */
2932 for(tmpi = 0; tmpi < pw->numberofterms; ++tmpi) {
2933 andmask=pw->intarray[tmpi*2];
2934 checkmask=pw->intarray[tmpi*2+1];
2936 raw_test_pass(0xdd000000 + andmask);
2937 raw_test_pass_reg((int)parameter);
2938 raw_test_pass(0xdd000000 + ((struct ___Object___ *)parameter)->flag);
2939 raw_test_pass(0xdd000000 + checkmask);
2941 if((((struct ___Object___ *)parameter)->flag&andmask)==checkmask) {
2947 // flags are never suitable
2948 // remove this obj from the queue
2950 int UNUSED, UNUSED2;
2953 raw_test_pass(0xe996);
2955 ObjectHashget(pw->objectset, (int) parameter, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2);
2956 ObjectHashremove(pw->objectset, (int)parameter);
2957 if (enterflags!=NULL)
2959 // release grabbed locks
2960 for(j = 0; j < i; ++j) {
2961 releasewritelock(taskpointerarray[j+OFFSET]);
2963 releasewritelock(parameter);
2964 RUNFREE(currtpd->parameterArray);
2972 /* Check that object still has necessary tags */
2973 for(j=0;j<pd->numbertags;j++) {
2974 int slotid=pd->tagarray[2*j]+numparams;
2975 struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid];
2976 if (!containstag(parameter, tagd)) {
2978 raw_test_pass(0xe997);
2980 RUNFREE(currtpd->parameterArray);
2986 taskpointerarray[i+OFFSET]=parameter;
2989 for(;i<numtotal;i++) {
2990 taskpointerarray[i+OFFSET]=currtpd->parameterArray[i];
2993 #ifdef THREADSIMULATE
2994 for(i = 0; i < numparams; ++i) {
2995 if(0 == isolateflags[i]) {
2996 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
2997 if(tmpparam != tmpparam->original) {
2998 taskpointerarray[i+OFFSET] = tmpparam->original;
3007 /* Checkpoint the state */
3008 forward=allocateRuntimeHash(100);
3009 reverse=allocateRuntimeHash(100);
3010 //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse);
3013 if (x=setjmp(error_handler)) {
3018 printf("Fatal Error=%d, Recovering!\n",x);
3022 genputtable(failedtasks,currtpd,currtpd);
3023 //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse);
3025 freeRuntimeHash(forward);
3026 freeRuntimeHash(reverse);
3034 raw_test_pass_reg(x);
3036 raw_test_done(0xa009);
3041 /*if (injectfailures) {
3042 if ((((double)random())/RAND_MAX)<failurechance) {
3043 printf("\nINJECTING TASK FAILURE to %s\n", currtpd->task->name);
3044 longjmp(error_handler,10);
3047 /* Actually call task */
3049 ((int *)taskpointerarray)[0]=currtpd->numParameters;
3050 taskpointerarray[1]=NULL;
3055 printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
3057 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
3059 printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount));
3062 ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray);
3065 raw_test_pass(0xe998);
3066 raw_test_pass_reg(lock);
3071 for(i = 0; i < numparams; ++i) {
3073 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
3075 raw_test_pass(0xe999);
3076 raw_test_pass(0xdd100000 + tmpparam->flag);
3078 releasewritelock(tmpparam);
3080 #elif defined THREADSIMULATE
3081 for(i = 0; i < numparams; ++i) {
3082 if(0 == isolateflags[i]) {
3083 struct ___Object___ * tmpparam = (struct ___Object___ *)taskpointerarray[i+OFFSET];
3084 releasewritelock(tmpparam);
3092 freeRuntimeHash(forward);
3093 freeRuntimeHash(reverse);
3097 // Free up task parameter descriptor
3098 RUNFREE(currtpd->parameterArray);
3107 raw_test_pass(0xe99a);
3108 raw_test_pass_reg(lock);
3116 raw_test_pass(0xe999);
3120 /* Registers tag iterators for the tags attached to one task parameter.
 *
 * pd            - descriptor whose tagarray is walked; entries are read in
 *                 pairs: tagarray[2*i] is a tag slot id and tagarray[2*i+1]
 *                 is a tag id.
 * index         - stored as the tagobjectslot of every iterator filled in here.
 * parameter     - wrapper whose iterators[] array is written.
 * iteratorcount - pointer to the index of the iterator entry being filled.
 * statusarray   - per-slot flags; tag slots are addressed at slotid+numparams.
 * numparams     - offset added to each tag slot id.
 *
 * NOTE(review): this listing omits some lines of the function (the
 * declaration of i, block closers, and presumably the statement that
 * advances *iteratorcount) -- confirm against the full file.
 */
3121 void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) {
3124 for(i=0;i<pd->numbertags;i++) {
/* Each tag occupies two consecutive ints: slot id, then tag id. */
3125 int slotid=pd->tagarray[2*i];
3126 int tagid=pd->tagarray[2*i+1];
/* Only tag slots whose status flag is still 0 get an iterator. */
3128 if (statusarray[slotid+numparams]==0) {
/* Fill the current iterator entry as a tag iterator. */
3129 parameter->iterators[*iteratorcount].istag=1;
3130 parameter->iterators[*iteratorcount].tagid=tagid;
3131 parameter->iterators[*iteratorcount].slot=slotid+numparams;
3132 parameter->iterators[*iteratorcount].tagobjectslot=index;
/* Mark the tag slot so later calls do not create a second iterator for it. */
3133 statusarray[slotid+numparams]=1;
/* Fills the iterator entry at *iteratorcount as an object iterator for
 * parameter slot `index`.
 *
 * parameter     - wrapper whose iterators[] array is written.
 * index         - parameter slot the iterator binds; also marked in statusarray.
 * pd            - descriptor of that parameter; supplies the object set
 *                 (through pd->queue) and the tag list.
 * iteratorcount - pointer to the index of the iterator entry being filled.
 * statusarray   - per-slot flags; tag slots are addressed at slotid+numparams.
 * numparams     - offset added to each tag slot id.
 *
 * NOTE(review): the declaration/initialisation of tagcount and i, the
 * statement that advances tagcount, and any update of *iteratorcount are not
 * visible in this listing -- confirm against the full file.
 */
3140 void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) {
/* pd->queue is cast to the parameter wrapper that owns the object set. */
3143 struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset;
3145 parameter->iterators[*iteratorcount].istag=0;
3146 parameter->iterators[*iteratorcount].slot=index;
3147 parameter->iterators[*iteratorcount].objectset=objectset;
/* Mark this parameter slot as handled. */
3148 statusarray[index]=1;
3150 for(i=0;i<pd->numbertags;i++) {
3151 int slotid=pd->tagarray[2*i];
/* NOTE(review): tagid has no use in the lines visible here. */
3152 int tagid=pd->tagarray[2*i+1];
3153 if (statusarray[slotid+numparams]!=0) {
3154 /* This tag has already been enqueued, use it to narrow search */
3155 parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams;
/* Record how many tag bindings were collected for this iterator. */
3159 parameter->iterators[*iteratorcount].numtags=tagcount;
3164 /* This function builds the iterators for a task & parameter */
/* task      - task whose parameter descriptors are processed.
 * index     - slot of the initial parameter; it is marked as handled up
 *             front and gets no object iterator of its own in the visible code.
 * parameter - wrapper that receives the iterator entries.
 *
 * The visible code works in stages: tags of the initial parameter, then
 * parameters that carry an already-marked tag, then parameters with any tags,
 * then every remaining parameter.
 *
 * NOTE(review): this listing omits lines between the stages (declarations of
 * i and j, block closers, and whatever control flow follows each
 * processobject/processtags pair) -- confirm against the full file.
 * NOTE(review): statusarray has MAXTASKPARAMS entries but is indexed by the
 * helpers at slotid+numparams; no bound check is visible here -- verify that
 * MAXTASKPARAMS covers parameters plus tag slots.
 */
3166 void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) {
3167 int statusarray[MAXTASKPARAMS];
3169 int numparams=task->numParameters;
3170 int iteratorcount=0;
/* Start with every parameter and tag slot flagged as unhandled. */
3171 for(i=0;i<MAXTASKPARAMS;i++) statusarray[i]=0;
3173 statusarray[index]=1; /* Initial parameter */
3174 /* Process tags for initial iterator */
3176 processtags(task->descriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams);
3180 /* Check for objects with existing tags */
3181 for(i=0;i<numparams;i++) {
3182 if (statusarray[i]==0) {
3183 struct parameterdescriptor *pd=task->descriptorarray[i];
3185 for(j=0;j<pd->numbertags;j++) {
3186 int slotid=pd->tagarray[2*j];
/* A non-zero status at the tag slot means that tag slot is already marked. */
3187 if(statusarray[slotid+numparams]!=0) {
3188 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
3189 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
3196 /* Next do objects w/ unbound tags*/
3198 for(i=0;i<numparams;i++) {
3199 if (statusarray[i]==0) {
3200 struct parameterdescriptor *pd=task->descriptorarray[i];
3201 if (pd->numbertags>0) {
3202 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
3203 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
3209 /* Nothing with a tag enqueued */
3211 for(i=0;i<numparams;i++) {
3212 if (statusarray[i]==0) {
3213 struct parameterdescriptor *pd=task->descriptorarray[i];
3214 processobject(parameter, i, pd, &iteratorcount, statusarray, numparams);
3215 processtags(pd, i, parameter, &iteratorcount, statusarray, numparams);
3228 #ifdef THREADSIMULATE
3229 int numofcore = pthread_getspecific(key);
3230 for(i=0;i<numtasks[numofcore];i++) {
3231 struct taskdescriptor * task=taskarray[numofcore][i];
3234 if(corenum > NUMCORES - 1) {
3238 for(i=0;i<numtasks[corenum];i++) {
3239 struct taskdescriptor * task=taskarray[corenum][i];
3242 printf("%s\n", task->name);
3244 for(j=0;j<task->numParameters;j++) {
3245 struct parameterdescriptor *param=task->descriptorarray[j];
3246 struct parameterwrapper *parameter=param->queue;
3247 struct ObjectHash * set=parameter->objectset;
3248 struct ObjectIterator objit;
3250 printf(" Parameter %d\n", j);
3252 ObjectHashiterator(set, &objit);
3253 while(ObjhasNext(&objit)) {
3254 struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit);
3255 struct ___Object___ * tagptr=obj->___tags___;
3256 int nonfailed=Objdata4(&objit);
3257 int numflags=Objdata3(&objit);
3258 int flags=Objdata2(&objit);
3261 printf(" Contains %lx\n", obj);
3262 printf(" flag=%d\n", obj->flag);
3265 } else if (tagptr->type==TAGTYPE) {
3267 printf(" tag=%lx\n",tagptr);
3272 struct ArrayObject *ao=(struct ArrayObject *)tagptr;
3273 for(;tagindex<ao->___cachedCode___;tagindex++) {
3275 printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex));
3285 /* This function processes the task information to create queues for
3286 each parameter type. */
/* For every task of the current core this allocates an object set for each
 * parameter wrapper, links the wrapper back to its task, and then builds the
 * iterators for each parameter.
 *
 * NOTE(review): the body of the corenum range check, the matching
 * #else/#endif lines and the declarations of i and j are not visible in this
 * listing -- confirm against the full file.
 */
3288 void processtasks() {
/* Guard on the core number; what happens inside is not visible here. */
3291 if(corenum > NUMCORES - 1) {
3295 #ifdef THREADSIMULATE
/* NOTE(review): pthread_getspecific() returns void *, so this assignment is
 * an implicit pointer-to-int conversion -- confirm this is intended and safe
 * on the target platform. */
3296 int numofcore = pthread_getspecific(key);
3297 for(i=0;i<numtasks[numofcore];i++) {
3298 struct taskdescriptor *task=taskarray[numofcore][i];
/* Variant of the same loop that indexes by corenum instead of numofcore. */
3300 for(i=0;i<numtasks[corenum];i++) {
3301 struct taskdescriptor * task=taskarray[corenum][i];
3305 /* Build objectsets */
3306 for(j=0;j<task->numParameters;j++) {
3307 struct parameterdescriptor *param=task->descriptorarray[j];
3308 struct parameterwrapper *parameter=param->queue;
3309 parameter->objectset=allocateObjectHash(10);
3310 parameter->task=task;
3313 /* Build iterators for parameters */
/* Done in a second loop, after the object-set loop above. */
3314 for(j=0;j<task->numParameters;j++) {
3315 struct parameterdescriptor *param=task->descriptorarray[j];
3316 struct parameterwrapper *parameter=param->queue;
3317 builditerators(task, j, parameter);
/* Resets a tag/object iterator.
 *
 * NOTE(review): only part of this function is visible in this listing. The
 * first branch and the body of the numtags>0 branch are missing; the only
 * visible action is re-initialising the embedded object-hash iterator over
 * it->objectset, presumably in the final branch -- confirm against the full
 * file.
 */
3322 void toiReset(struct tagobjectiterator * it) {
3325 } else if (it->numtags>0) {
/* Point the embedded hash iterator at the iterator's own object set. */
3328 ObjectHashiterator(it->objectset, &it->it);
/* Tests whether the iterator `it` has another candidate for its slot, given
 * the bindings already stored in objectarray.
 *
 * it          - iterator being queried; it->tagobjindex is updated by the
 *               array scans below.
 * objectarray - current bindings, indexed by slot (parameters and tags).
 * OPTARG(...) - macro defined elsewhere; presumably adds an optional
 *               `failed` parameter -- TODO confirm.
 *
 * NOTE(review): the opening branch test, the declarations of i and tagindex,
 * and the return statements of most branches are not visible in this
 * listing, so the exact return value per branch must be confirmed against
 * the full file. The only visible return is the final one, which forwards
 * ObjhasNext() on the embedded hash iterator.
 * NOTE(review): tagptr and objptr are dereferenced without a NULL check in
 * the visible lines -- confirm callers guarantee they are set.
 */
3332 int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) {
3335 /* Get object with tags */
3336 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3337 struct ___Object___ *tagptr=obj->___tags___;
/* ___tags___ is either a single tag descriptor (TAGTYPE) or an array of them. */
3338 if (tagptr->type==TAGTYPE) {
3339 if ((it->tagobjindex==0)&& /* First object */
3340 (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */
/* Array case: resume scanning at the saved index for a tag with a matching id. */
3345 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3346 int tagindex=it->tagobjindex;
3347 for(;tagindex<ao->___cachedCode___;tagindex++) {
3348 struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex);
3349 if (td->flag==it->tagid) {
3350 it->tagobjindex=tagindex; /* Found right type of tag */
3356 } else if (it->numtags>0) {
3357 /* Use tags to locate appropriate objects */
/* The first bound tag supplies the candidate object(s) through flagptr. */
3358 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3359 struct ___Object___ *objptr=tag->flagptr;
/* flagptr is either a single object or an array of objects. */
3361 if (objptr->type!=OBJECTARRAYTYPE) {
3362 if (it->tagobjindex>0)
/* The single candidate must be in this iterator's object set ... */
3364 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
/* ... and is checked against every remaining bound tag. */
3366 for(i=1;i<it->numtags;i++) {
3367 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3368 if (!containstag(objptr,tag2))
3373 struct ArrayObject *ao=(struct ArrayObject *) objptr;
/* Array case: resume at the saved index and test each element the same way. */
3376 for(tagindex=it->tagobjindex;tagindex<ao->___cachedCode___;tagindex++) {
3377 struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex);
3378 if (!ObjectHashcontainskey(it->objectset, (int) objptr))
3380 for(i=1;i<it->numtags;i++) {
3381 struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]];
3382 if (!containstag(objptr,tag2))
/* Save the scan position in the iterator. */
3385 it->tagobjindex=tagindex;
3390 it->tagobjindex=tagindex;
/* Plain object iteration: defer to the embedded hash iterator. */
3394 return ObjhasNext(&it->it);
/* Checks whether object `ptr` is among the objects referenced by `tag`.
 *
 * ptr - object to look for.
 * tag - tag descriptor whose flagptr is examined.
 *
 * When tag->flagptr has type OBJECTARRAYTYPE it is treated as an ArrayObject
 * and its first ___cachedCode___ elements are compared against ptr.
 *
 * NOTE(review): the declaration of j, the return statements and the branch
 * for a non-array flagptr are not visible in this listing -- confirm the
 * return values against the full file.
 */
3398 int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) {
3400 struct ___Object___ * objptr=tag->flagptr;
3401 if (objptr->type==OBJECTARRAYTYPE) {
3402 struct ArrayObject *ao=(struct ArrayObject *)objptr;
/* Linear scan over the array elements, comparing by pointer identity. */
3403 for(j=0;j<ao->___cachedCode___;j++) {
3404 if (ptr==ARRAYGET(ao, struct ___Object___*, j))
3412 void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) {
3413 /* hasNext has all of the intelligence */
3416 /* Get object with tags */
3417 struct ___Object___ *obj=objectarray[it->tagobjectslot];
3418 struct ___Object___ *tagptr=obj->___tags___;
3419 if (tagptr->type==TAGTYPE) {
3421 objectarray[it->slot]=tagptr;
3423 struct ArrayObject *ao=(struct ArrayObject *) tagptr;
3424 objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++);
3426 } else if (it->numtags>0) {
3427 /* Use tags to locate appropriate objects */
3428 struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]];
3429 struct ___Object___ *objptr=tag->flagptr;
3430 if (objptr->type!=OBJECTARRAYTYPE) {
3432 objectarray[it->slot]=objptr;
3434 struct ArrayObject *ao=(struct ArrayObject *) objptr;
3435 objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++);
3438 /* Iterate object */
3439 objectarray[it->slot]=(void *)Objkey(&it->it);