1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
16 /* Per thread transaction variables */
17 __thread objstr_t *t_cache;
18 __thread objstr_t *t_reserve;
19 __thread struct objlist * newobjs;
22 int numTransCommit = 0;
23 int numTransAbort = 0;
25 int nSoftAbortCommit = 0;
26 int nSoftAbortAbort = 0;
30 /* Thread variable for locking/unlocking */
31 __thread threadrec_t *trec;
32 __thread struct objlist * lockedobjs;
34 int typesCausingAbort[TOTALNUMCLASSANDARRAY];
35 /******Keep track of objects and types causing aborts******/
36 /* TODO uncomment for later use
37 #define DEBUGSTMSTAT(args...) { \
42 #define DEBUGSTMSTAT(args...)
44 #define DEBUGSTMSTAT(args...)
48 #define DEBUGSTM(x...) printf(x);
50 #define DEBUGSTM(x...)
54 void * A_memcpy (void * dest, const void * src, size_t count);
56 #define A_memcpy memcpy
60 /*** Global variables *****/
61 objlockstate_t *objlockscope;
64 * params: object header
65 * Increments the abort count for each object
/* ABORTCOUNT: takes an object header x. From the visible lines: once
 * x->abortCount is above MAXABORTS and x is not already flagged risky
 * (riskyflag != 1), x is given its own pthread mutex through x->objlock.
 * NOTE(review): some lines of this function are not visible in this extract
 * (the increment promised by the header comment and any riskyflag update);
 * confirm against the full file. */
67 void ABORTCOUNT(objheader_t * x) {
69 if (x->abortCount > MAXABORTS && (x->riskyflag != 1)) {
70 //makes riskflag sticky
// lockedobjstore is held while the shared objlockscope slot pool is touched
71 pthread_mutex_lock(&lockedobjstore);
// use the next unused mutex slot of the current pool block if one is left
72 if (objlockscope->offset<MAXOBJLIST) {
73 x->objlock=&(objlockscope->lock[objlockscope->offset++]);
// a new pool block is allocated and linked in front of the current one
// NOTE(review): the malloc result is dereferenced with no visible NULL check
75 objlockstate_t *tmp=malloc(sizeof(objlockstate_t));
76 tmp->next=objlockscope;
// the object takes slot 0 of the new block
78 x->objlock=&(tmp->lock[0]);
81 pthread_mutex_unlock(&lockedobjstore);
// the chosen mutex is initialised after the pool lock has been released
82 pthread_mutex_init(x->objlock, NULL);
83 //should put a memory barrier here
89 /* ==================================================
91 * This function starts up the transaction runtime.
92 * ==================================================
98 /* ======================================
100 * - create an object store of given size
101 * ======================================
/* objstrCreate: obtains one zero-filled allocation that holds an objstr_t
 * header followed by `size` bytes, and points `top` just past the header.
 * NOTE(review): the declaration of tmp, the rest of the error path, any
 * further field setup and the return are on lines not visible here. */
103 objstr_t *objstrCreate(unsigned int size) {
// header and payload come from a single calloc
105 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
106 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
// tmp + 1 steps over exactly one objstr_t, i.e. to the first payload byte
111 tmp->top = tmp + 1; //points to end of objstr_t structure!
116 while(t_cache->next!=NULL) {
117 objstr_t *next=t_cache->next;
118 t_cache->next=t_reserve;
122 t_cache->top=t_cache+1;
125 //free entire list, starting at store
126 void objstrDelete(objstr_t *store) {
128 while (store != NULL) {
136 /* =================================================
138 * This function initializes things required in the
140 * =================================================
143 //Transaction start is currently free...committing and aborting are not
146 /* =======================================================
148 * This function creates objects in the transaction record
149 * =======================================================
/* transCreateObj: allocates header + `size` bytes through mygcmalloc, sets
 * the header's lock word to RW_LOCK_BIAS, records the new object in the
 * per-thread newobjs list and returns the address just past the header. */
151 objheader_t *transCreateObj(void * ptr, unsigned int size) {
152 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
// &tmp[1] is the first byte after the objheader_t, i.e. the object payload
153 objheader_t *retval=&tmp[1];
154 tmp->lock=RW_LOCK_BIAS;
156 //initialize obj lock to the header
158 // don't insert into table
// append to the current newobjs block while it still has room
159 if (newobjs->offset<MAXOBJLIST) {
160 newobjs->objs[newobjs->offset++]=retval;
// NOTE(review): this `tmp` redeclares the name of the header pointer above
// (presumably in a nested scope); no NULL check of malloc is visible
162 struct objlist *tmp=malloc(sizeof(struct objlist));
168 return retval; //want space after object header
171 /* This function inserts random wait delays in the order of msec
172 * Mostly used when a transaction commit is retried */
/* randomdelay: sleeps for a pseudo-random interval derived from the current
 * time of day; `softaborted` widens the range (modulus is 2^softaborted).
 * NOTE(review): the declarations of t/req and any assignment to req.tv_sec
 * are on lines not visible here. */
173 void randomdelay(int softaborted) {
177 gettimeofday(&t,NULL);
// (tv_usec mod 2^softaborted), then doubled, is stored as NANOseconds.
// NOTE(review): the surrounding comments speak of msec / microsec, which does
// not match the tv_nsec unit; also 1<<softaborted is undefined once
// softaborted reaches the bit width of int -- confirm the caller's bound.
180 req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
181 nanosleep(&req, NULL);
185 /* ==============================================
187 * - allocate space in an object store
188 * ==============================================
/* objstrAlloc: hands out `size` bytes from the per-thread object stores.
 * Visible flow: walk the t_cache chain looking for a store with enough free
 * space; failing that, reuse a large-enough store from t_reserve; failing
 * that, calloc a fresh store.
 * NOTE(review): the lines that bump `top`, relink the stores and return the
 * pointer are not visible in this extract. */
190 void *objstrAlloc(unsigned int size) {
193 objstr_t *store=t_cache;
// OSFREE(store) is compared against the request; presumably the free bytes
199 if (OSFREE(store)>=size) {
// move on to the next store in the chain; NULL ends the search
204 if ((store=store->next)==NULL)
// a new store is at least DEFAULT_OBJ_STORE_SIZE, larger if the request is
209 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE ? size : DEFAULT_OBJ_STORE_SIZE;
// otmp addresses the link being inspected, starting with t_reserve itself
210 objstr_t **otmp=&t_reserve;
212 while((ptr=*otmp)!=NULL) {
213 if (ptr->size>=newsize) {
// the reused store's top is set to payload start + size
218 ptr->top=((char *)(&ptr[1]))+size;
// NOTE(review): no NULL check of this calloc is visible
223 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
228 os->top=((char *)nptr)+size;
233 /* =============================================================
235 * -finds the object in the main heap
236 * -copies the object into the transaction cache
237 * =============================================================
/* transRead: oid is the address of an object's payload in the main heap.
 * The visible lines locate its header, allocate room in the transaction
 * cache, copy header + payload there and register the copy under oid.
 * NOTE(review): the function is declared __attribute__((pure)), yet the
 * visible body writes header->accessCount, allocates via objstrAlloc and
 * calls t_chashInsert; a compiler may merge or drop calls to a pure
 * function, so confirm the attribute is really intended. */
239 __attribute__((pure)) void *transRead(void * oid, void *gl) {
240 objheader_t *tmp, *objheader;
241 objheader_t *objcopy;
244 /* Read from the main heap */
// the header sits immediately before the payload
246 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
247 GETSIZE(size, header);
// the copy includes the header as well as the payload
248 size += sizeof(objheader_t);
249 objcopy = (objheader_t *) objstrAlloc(size);
251 header->accessCount++;
// objects flagged risky are locked through needLock before being copied;
// needLock returns the header pointer to continue with
252 if(header->riskyflag) {
253 header=needLock(header,gl);
256 A_memcpy(objcopy, header, size);
257 /* Insert into cache's lookup table */
// the table maps the main-heap payload address to the payload of the copy
259 t_chashInsert(oid, &objcopy[1]);
264 struct objlist *ptr=newobjs;
265 while(ptr->next!=NULL) {
266 struct objlist *tmp=ptr->next;
275 void freelockedobjs() {
276 struct objlist *ptr=lockedobjs;
277 while(ptr->next!=NULL) {
278 struct objlist *tmp=ptr->next;
287 /* ================================================================
289 * - This function initiates the transaction commit process
290 * - goes through the transaction cache and decides
292 * ================================================================
/* transCommit: runs one of the cache-traversal routines and reacts to its
 * verdict (TRANS_ABORT, TRANS_COMMIT or TRANS_SOFT_ABORT); anything else is
 * reported with the printf at the end of the visible lines.
 * NOTE(review): the bodies of the three outcome branches, the retry logic
 * and the return statements are on lines not visible in this extract. */
295 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
301 /* Look through all the objects in the transaction hash table */
// with fewer than c_size/8 entries the c_list walk of alttraverseCache is
// used; the next call scans the c_table bins (presumably the else branch)
303 if (c_numelements<(c_size>>3))
304 finalResponse= alttraverseCache(commitmethod, primitives, locals, params);
306 finalResponse= traverseCache(commitmethod, primitives, locals, params);
307 if(finalResponse == TRANS_ABORT) {
325 if(finalResponse == TRANS_COMMIT) {
343 /* wait a random amount of time before retrying to commit transaction*/
344 if(finalResponse == TRANS_SOFT_ABORT) {
354 //retry if too many soft aborts
366 //randomdelay(softaborted);
368 printf("Error: in %s() Unknown outcome", __func__);
375 #define freearrays if (c_numelements>=200) { \
377 free(oidrdversion); \
379 if (t_numelements>=200) { \
383 #define freearrays if (c_numelements>=200) { \
385 free(oidrdversion); \
389 /* ==================================================
391 * - goes through the transaction cache and
392 * - decides if a transaction should commit or abort
393 * ==================================================
/* traverseCache: commit-time validation over the c_table hash bins.
 * Visible flow: (1) try to write-lock every DIRTY object and remember the
 * version of every read-only object, (2) write-lock the objects on
 * dc_c_list, (3) re-check the remembered read versions, (4) call
 * transCommitProcess. Any mismatch releases the locks taken so far through
 * transAbortProcess.
 * NOTE(review): two signatures and several duplicated statement groups are
 * visible; they are presumably alternatives selected by preprocessor
 * conditionals whose directive lines are missing from this extract. */
396 int traverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
398 int traverseCache() {
400 /* Create info to keep track of objects that can be locked */
401 int numoidrdlocked=0;
402 int numoidwrlocked=0;
// fixed 200-entry local arrays serve small transactions; larger ones use malloc
403 void * rdlocked[200];
405 void * wrlocked[200];
// first sizing variant: the write array is sized for c_numelements+dc_c_numelements
412 int t_numelements=c_numelements+dc_c_numelements;
413 if (t_numelements<200) {
414 oidwrlocked=wrlocked;
// NOTE(review): no NULL check is visible for any malloc in this function
416 oidwrlocked=malloc(t_numelements*sizeof(void *));
418 if (c_numelements<200) {
419 oidrdlocked=rdlocked;
420 oidrdversion=rdversion;
422 int size=c_numelements*sizeof(void*);
423 oidrdlocked=malloc(size);
424 oidrdversion=malloc(size);
// second sizing variant: all three arrays are sized from c_numelements alone
427 if (c_numelements<200) {
428 oidrdlocked=rdlocked;
429 oidrdversion=rdversion;
430 oidwrlocked=wrlocked;
432 int size=c_numelements*sizeof(void*);
433 oidrdlocked=malloc(size);
434 oidrdversion=malloc(size);
435 oidwrlocked=malloc(size);
438 chashlistnode_t *ptr = c_table;
439 /* Represents number of bins in the chash table */
440 unsigned int size = c_size;
441 for(i = 0; i<size; i++) {
442 chashlistnode_t *curr = &ptr[i];
443 /* Inner loop to traverse the linked list of the cache lookupTable */
444 while(curr != NULL) {
445 //if the first bin in hash table is empty
446 if(curr->key == NULL)
// headeraddr: header of the cached copy (just before curr->val);
// header: header of the main-heap original (just before curr->key)
448 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
449 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
450 unsigned int version = headeraddr->version;
452 if(STATUS(headeraddr) & DIRTY) {
453 /* Read from the main heap and compare versions */
454 if(write_trylock(&header->lock)) { //can aquire write lock
455 if (version == header->version) { /* versions match */
456 /* Keep track of objects locked */
457 oidwrlocked[numoidwrlocked++] = header;
// version mismatch: the lock just taken is recorded first so that
// transAbortProcess releases it together with the earlier ones
459 oidwrlocked[numoidwrlocked++] = header;
460 transAbortProcess(oidwrlocked, numoidwrlocked);
463 (typesCausingAbort[TYPE(header)])++;
464 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1);
466 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
467 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
470 return TRANS_SOFT_ABORT;
475 if(version == header->version) {
479 transAbortProcess(oidwrlocked, numoidwrlocked);
482 (typesCausingAbort[TYPE(header)])++;
484 #if defined(STMSTATS)||defined(SOFTABORT)
485 if (getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1))
488 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
489 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
492 return TRANS_SOFT_ABORT;
// not DIRTY: remember header and the version seen, validated further down
497 oidrdversion[numoidrdlocked]=version;
498 oidrdlocked[numoidrdlocked++] = header;
505 //acquire other locks
506 chashlistnode_t *dc_curr = dc_c_list;
507 /* Inner loop to traverse the linked list of the cache lookupTable */
508 while(likely(dc_curr != NULL)) {
509 //if the first bin in hash table is empty
510 objheader_t * headeraddr=&((objheader_t *) dc_curr->val)[-1];
511 objheader_t *header=(objheader_t *)(((char *)dc_curr->key)-sizeof(objheader_t));
512 if(write_trylock(&header->lock)) { //can aquire write lock
513 oidwrlocked[numoidwrlocked++] = header;
515 //maybe we already have lock
// the trylock failed: look the key up in c_table to see whether this
// transaction is itself the holder
516 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
519 if(node->key == key) {
523 } while(node != NULL);
525 //have to abort to avoid deadlock
526 transAbortProcess(oidwrlocked, numoidwrlocked);
529 (typesCausingAbort[TYPE(header)])++;
531 #if defined(STMSTATS)||defined(SOFTABORT)
// NOTE(review): the only visible declaration of `curr` is inside the bin
// loop above, so this use looks out of scope here -- confirm it compiles
// when these macros are defined
532 if (getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1))
535 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
536 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
539 return TRANS_SOFT_ABORT;
544 dc_curr = dc_curr->lnext;
548 //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
550 for(i=0; i<numoidrdlocked; i++) {
551 /* Read from the main heap and compare versions */
552 objheader_t *header=oidrdlocked[i];
553 unsigned int version=oidrdversion[i];
// NOTE(review): this test is `>0`, while the matching loops in
// alttraverseCache and altalttraverseCache test `>=0` -- confirm which is meant
554 if(header->lock>0) { //not write locked
555 if(version != header->version) { /* versions do not match */
// NOTE(review): this appends to oidrdlocked and increments numoidrdlocked,
// the bound of the enclosing loop; the read arrays are sized for
// c_numelements entries and no matching oidrdversion entry is written.
// alttraverseCache has no such statement at this point -- likely a leftover.
556 oidrdlocked[numoidrdlocked++] = header;
557 transAbortProcess(oidwrlocked, numoidwrlocked);
560 (typesCausingAbort[TYPE(header)])++;
561 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 0);
563 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
564 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
568 } else { /* cannot aquire lock */
569 //do increment as we didn't get lock
570 if(version == header->version) {
573 transAbortProcess(oidwrlocked, numoidwrlocked);
576 (typesCausingAbort[TYPE(header)])++;
578 #if defined(STMSTATS)||defined(SOFTABORT)
579 if (getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 0))
582 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
583 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
586 return TRANS_SOFT_ABORT;
592 /* Decide the final response */
// two call forms, matching the two visible transCommitProcess signatures
594 transCommitProcess(oidwrlocked, numoidwrlocked, commitmethod, primitives, locals, params);
596 transCommitProcess(oidwrlocked, numoidwrlocked);
598 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
603 /* ==================================================
605 * - goes through the transaction cache and
606 * - decides if a transaction should commit or abort
607 * ==================================================
611 int alttraverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
613 int alttraverseCache() {
615 /* Create info to keep track of objects that can be locked */
616 int numoidrdlocked=0;
617 int numoidwrlocked=0;
618 void * rdlocked[200];
620 void * wrlocked[200];
627 int t_numelements=c_numelements+dc_c_numelements;
628 if (t_numelements<200) {
629 oidwrlocked=wrlocked;
631 oidwrlocked=malloc(t_numelements*sizeof(void *));
633 if (c_numelements<200) {
634 oidrdlocked=rdlocked;
635 oidrdversion=rdversion;
637 int size=c_numelements*sizeof(void*);
638 oidrdlocked=malloc(size);
639 oidrdversion=malloc(size);
642 if (c_numelements<200) {
643 oidrdlocked=rdlocked;
644 oidrdversion=rdversion;
645 oidwrlocked=wrlocked;
647 int size=c_numelements*sizeof(void*);
648 oidrdlocked=malloc(size);
649 oidrdversion=malloc(size);
650 oidwrlocked=malloc(size);
653 chashlistnode_t *curr = c_list;
654 /* Inner loop to traverse the linked list of the cache lookupTable */
655 while(likely(curr != NULL)) {
656 //if the first bin in hash table is empty
657 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
658 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
659 unsigned int version = headeraddr->version;
661 if(STATUS(headeraddr) & DIRTY) {
662 /* Read from the main heap and compare versions */
663 if(likely(write_trylock(&header->lock))) { //can aquire write lock
664 if (likely(version == header->version)) { /* versions match */
665 /* Keep track of objects locked */
666 oidwrlocked[numoidwrlocked++] = header;
668 oidwrlocked[numoidwrlocked++] = header;
669 transAbortProcess(oidwrlocked, numoidwrlocked);
672 (typesCausingAbort[TYPE(header)])++;
673 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1);
675 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
676 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
680 } else { /* cannot aquire lock */
681 if(version == header->version) {
685 transAbortProcess(oidwrlocked, numoidwrlocked);
688 (typesCausingAbort[TYPE(header)])++;
690 #if defined(STMSTATS)||defined(SOFTABORT)
691 if (getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1))
694 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
695 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
698 return TRANS_SOFT_ABORT;
703 /* Read from the main heap and compare versions */
704 oidrdversion[numoidrdlocked]=version;
705 oidrdlocked[numoidrdlocked++] = header;
711 //acquire other locks
712 chashlistnode_t *dc_curr = dc_c_list;
713 /* Inner loop to traverse the linked list of the cache lookupTable */
714 while(likely(dc_curr != NULL)) {
715 //if the first bin in hash table is empty
716 objheader_t * headeraddr=&((objheader_t *) dc_curr->val)[-1];
717 objheader_t *header=(objheader_t *)(((char *)dc_curr->key)-sizeof(objheader_t));
718 if(write_trylock(&header->lock)) { //can aquire write lock
719 oidwrlocked[numoidwrlocked++] = header;
721 //maybe we already have lock
722 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
725 if(node->key == key) {
729 } while(node != NULL);
731 //have to abort to avoid deadlock
732 transAbortProcess(oidwrlocked, numoidwrlocked);
735 (typesCausingAbort[TYPE(header)])++;
737 #if defined(STMSTATS)||defined(SOFTABORT)
738 if (getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1))
741 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
742 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
745 return TRANS_SOFT_ABORT;
750 dc_curr = dc_curr->lnext;
754 //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
756 for(i=0; i<numoidrdlocked; i++) {
757 objheader_t * header=oidrdlocked[i];
758 unsigned int version=oidrdversion[i];
759 if(header->lock>=0) {
760 if(version != header->version) {
761 transAbortProcess(oidwrlocked, numoidwrlocked);
764 (typesCausingAbort[TYPE(header)])++;
765 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0);
767 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
768 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
772 } else { /* cannot aquire lock */
773 if(version == header->version) {
776 transAbortProcess(oidwrlocked, numoidwrlocked);
779 (typesCausingAbort[TYPE(header)])++;
781 #if defined(STMSTATS)||defined(SOFTABORT)
782 if (getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0))
785 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
786 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
789 return TRANS_SOFT_ABORT;
795 /* Decide the final response */
797 transCommitProcess(oidwrlocked, numoidwrlocked, commitmethod, primitives, locals, params);
799 transCommitProcess(oidwrlocked, numoidwrlocked);
801 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
806 int altalttraverseCache() {
807 /* Create info to keep track of objects that can be locked */
808 int numoidrdlocked=0;
809 int numoidwrlocked=0;
810 void * rdlocked[200];
812 void * wrlocked[200];
818 if (c_numelements<200) {
819 oidrdlocked=rdlocked;
820 oidrdversion=rdversion;
821 oidwrlocked=wrlocked;
823 int size=c_numelements*sizeof(void*);
824 oidrdlocked=malloc(size);
825 oidrdversion=malloc(size);
826 oidwrlocked=malloc(size);
829 objstr_t * curr=t_cache;
831 while(curr != NULL) {
834 for(bottom=(char *)(curr+1);bottom < ((char *)curr->top);) {
835 objheader_t *headeraddr=(objheader_t *)bottom;
836 objheader_t *header=OID(headeraddr)-sizeof(objheader_t);
837 unsigned int version = headeraddr->version;
839 if(STATUS(headeraddr) & DIRTY) {
840 /* Read from the main heap and compare versions */
841 if(likely(write_trylock(&header->lock))) { //can aquire write lock
842 if (likely(version == header->version)) { /* versions match */
843 /* Keep track of objects locked */
844 oidwrlocked[numoidwrlocked++] = header;
846 oidwrlocked[numoidwrlocked++] = header;
847 transAbortProcess(oidwrlocked, numoidwrlocked);
850 (typesCausingAbort[TYPE(header)])++;
851 //getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1);
853 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
854 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
855 if (c_numelements>=200) {
862 } else { /* cannot aquire lock */
863 if(version == header->version) {
867 transAbortProcess(oidwrlocked, numoidwrlocked);
870 (typesCausingAbort[TYPE(header)])++;
872 #if defined(STMSTATS)||defined(SOFTABORT)
873 // if (getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1))
876 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
877 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
878 if (c_numelements>=200) {
884 return TRANS_SOFT_ABORT;
889 /* Read from the main heap and compare versions */
890 oidrdversion[numoidrdlocked]=version;
891 oidrdlocked[numoidrdlocked++] = header;
894 GETSIZE(size, headeraddr);
895 size+=sizeof(objheader_t);
903 //THIS IS THE SERIALIZATION POINT *****
904 for(i=0; i<numoidrdlocked; i++) {
905 objheader_t * header = oidrdlocked[i];
906 unsigned int version=oidrdversion[i];
907 if(header->lock>=0) {
908 if(version != header->version) {
909 transAbortProcess(oidwrlocked, numoidwrlocked);
912 (typesCausingAbort[TYPE(header)])++;
913 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0);
915 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
916 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
917 if (c_numelements>=200) {
924 } else { /* cannot aquire lock */
925 if(version == header->version) {
928 transAbortProcess(oidwrlocked, numoidwrlocked);
931 (typesCausingAbort[TYPE(header)])++;
933 #if defined(STMSTATS)||defined(SOFTABORT)
934 if (getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0))
937 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
938 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
939 if (c_numelements>=200) {
945 return TRANS_SOFT_ABORT;
951 /* Decide the final response */
952 transCommitProcess(oidwrlocked, numoidwrlocked);
953 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
954 if (c_numelements>=200) {
963 /* ==================================
966 * =================================
/* transAbortProcess: undoes the locking done for a commit attempt.
 * oidwrlocked holds numoidwrlocked object headers whose write lock was
 * taken; each is released, newest first. Afterwards the per-thread
 * lockedobjs list is walked and each object's objlock mutex is unlocked.
 * NOTE(review): declarations, the list-advance logic and any trec reset are
 * on lines not visible in this extract. */
968 void transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
// no code follows this comment in the visible lines
971 /* Release read locks */
973 /* Release write locks */
974 for(i=numoidwrlocked-1; i>=0; i--) {
975 /* Read from the main heap */
976 header = (objheader_t *)oidwrlocked[i];
977 write_unlock(&header->lock);
981 /* clear trec and then release objects locked */
982 struct objlist *ptr=lockedobjs;
// entries of the current list block are released from the last one down
985 for(i=max-1; i>=0; i--) {
986 header = (objheader_t *)ptr->objs[i];
988 pthread_mutex_unlock(header->objlock);
995 /* ==================================
998 * =================================
/* transCommitProcess: publishes a validated transaction. Visible steps:
 * clear the status word of every object on newobjs, copy each write-locked
 * object's cached contents back over the main-heap original, invoke
 * commitmethod (longer signature only), release the write locks, then
 * unlock the objects recorded on the locked-object list.
 * NOTE(review): two signatures are visible; they are presumably selected by
 * a preprocessor conditional whose directive lines are missing here. */
1001 void transCommitProcess(void ** oidwrlocked, int numoidwrlocked, void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
1003 void transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
1005 objheader_t *header;
1008 struct objlist *ptr=newobjs;
1010 int max=ptr->offset;
1011 for(i=0; i<max; i++) {
1012 //clear the new flag
1013 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
1018 /* Copy from transaction cache -> main object store */
1019 for (i = numoidwrlocked-1; i >=0; i--) {
1020 /* Read from the main heap */
1021 header = (objheader_t *)oidwrlocked[i];
1023 GETSIZE(tmpsize, header);
// dst: payload of the main-heap object (just past its header)
1024 struct ___Object___ *dst=(struct ___Object___*)(((char *)oidwrlocked[i])+sizeof(objheader_t));
// src: the cached copy registered under that payload address
// NOTE(review): src is dereferenced with no visible NULL check
1025 struct ___Object___ *src=t_chashSearch(dst);
1026 dst->___cachedCode___=src->___cachedCode___;
1027 dst->___cachedHash___=src->___cachedHash___;
// everything after the leading struct ___Object___ part is copied in bulk
1028 A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
// empty asm with a "memory" clobber: keeps the compiler from reordering
// memory accesses across this point (no instruction is emitted)
1029 __asm__ __volatile__("": : :"memory");
1032 __asm__ __volatile__("": : :"memory");
1035 // call commit method
1036 commitmethod(primitives, locals, params);
1039 /* Release write locks */
1040 for(i=numoidwrlocked-1; i>=0; i--) {
1041 header = (objheader_t *)oidwrlocked[i];
1042 write_unlock(&header->lock);
1046 /* clear trec and then release objects locked */
1049 int max=ptr->offset;
1050 for(i=max-1; i>=0; i--) {
1051 header = (objheader_t *)ptr->objs[i];
// ownership is cleared before the mutex is released
1052 header->trec = NULL;
1053 pthread_mutex_unlock(header->objlock);
1060 /** ========================================================================================
1061 * getTotalAbortCount
1062 * params : start: start index of the loop
1063 * : stop: stop index of the loop
1064 * : startptr: pointer that points to where to start looking in the array/ linked list
1065 * 0='r'/1='w' if found when visiting objects read/ objects modified
1066 * =========================================================================================
1068 #if defined(STMSTATS)||defined(SOFTABORT)
1069 int getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, int type) {
1073 int isFirstTime = 0;
1074 chashlistnode_t *curr = (chashlistnode_t *) startptr;
1075 chashlistnode_t *ptr = c_table;
1076 for(i = start; i < stop; i++) {
1079 /* Inner loop to traverse the linked list of the cache lookupTable */
1080 while(curr != NULL) {
1081 if(curr->key == NULL)
1083 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
1084 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
1085 unsigned int version = headeraddr->version;
1086 /* versions do not match */
1087 if(version != header->version) {
1090 (typesCausingAbort[TYPE(header)])++;
1099 /* Go through oids read that are locked */
1100 for(i = start; i < stop; i++) {
1101 objheader_t *header = ((void **)startptr)[i];
1102 unsigned int version = ((int *)checkptr)[i];
1103 if(version != header->version) { /* versions do not match */
1106 (typesCausingAbort[TYPE(header)])++;
1117 * params: Object header
1118 * Locks an object that causes aborts
1120 objheader_t * needLock(objheader_t *header, void *gl) {
1123 while((lockstatus = pthread_mutex_trylock(header->objlock))
1124 && ((ptr = header->trec) == NULL)) { //retry
1127 if(lockstatus==0) { //acquired lock
1129 header->trec = trec;
1130 } else { //failed to get lock
1133 __asm__ __volatile__("":::"memory");
1134 //see if other thread is blocked
1135 if(ptr->blocked == 1) {
1136 //it might be block, so ignore lock and clear our blocked flag
1141 INTPTR ptrarray[]={1, (INTPTR)gl, (INTPTR) header};
1142 void *lockptr=header->objlock;
1143 stopforgc((struct garbagelist *)ptrarray);
1144 //grab lock and wait our turn
1145 pthread_mutex_lock(lockptr);
1147 header=(objheader_t *) ptrarray[2];
1149 pthread_mutex_lock(header->objptr);
1151 /* we have lock, so we are not blocked anymore */
1154 header->trec = trec;
1157 //trec->blocked is zero now
1159 /* Save the locked object */
1160 if (lockedobjs->offset<MAXOBJLIST) {
1161 lockedobjs->objs[lockedobjs->offset++]=header;
1163 struct objlist *tmp=malloc(sizeof(struct objlist));
1164 tmp->next=lockedobjs;
1165 tmp->objs[0]=header;