Fixed bugs..Atomic2.java testcase works fine
authoradash <adash>
Sat, 22 Sep 2007 00:06:08 +0000 (00:06 +0000)
committeradash <adash>
Sat, 22 Sep 2007 00:06:08 +0000 (00:06 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/objstr.c
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/callconventions [new file with mode: 0644]

index 9ef13b5a22d3fe5b785eee26bea953c8f17f984d..ede8fdda9e2da582707a46beda70dc8d9e168c93 100644 (file)
@@ -226,12 +226,14 @@ objheader_t *transRead(transrecord_t *, unsigned int);
 objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
 int transCommit(transrecord_t *record); //return 0 if successful
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
-void *handleLocalReq(void *);  //the C routine that the local m/c thread will execute 
 void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-int transAbortProcess(void *, unsigned int *, int, int);
-int transComProcess(void*, unsigned int *, unsigned int *, unsigned int *, int, int, int);
+void *handleLocalReq(void *);
+int transComProcess(local_thread_data_array_t *);
+int transAbortProcess(local_thread_data_array_t *);
+
 void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
index 144e5e81259bb22be97b55b3b9994631c17119e7..00b54b0be5cd51f740d1b89102902eff30ab7e43 100644 (file)
@@ -203,13 +203,9 @@ void *dstmAccept(void *acceptfd)
                                printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
                                        retval);
                        else
-                       { //TODO: execute run method on this global thread object
-                               printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid);
+                       {
                                objType = getObjType(oid);
-                               printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType);
                                startDSMthread(oid, objType);
-
-
                        }
                        break;
 
@@ -270,16 +266,11 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        }
        
        /* Read modified objects */
-       if(fixed.nummod != 0) { // If pile contains more than one modified object,
-                               // allocate new object store and recv all modified objects
-                               // TODO deallocate this space
-               pthread_mutex_lock(&mainobjstore_mutex);
-               if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
-                       printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&mainobjstore_mutex);
+       if(fixed.nummod != 0) {
+               if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
+                       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
-               pthread_mutex_unlock(&mainobjstore_mutex);
                sum = 0;
                do { // Recv the objs that are modified by the Coordinator
                        n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
@@ -310,7 +301,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
                /* Free resources */
                if(oidmod != NULL) {
                        free(oidmod);
-                       oidmod = NULL;
                }
                return 1;
        }
@@ -318,7 +308,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        /* Free resources */
        if(oidmod != NULL) {
                free(oidmod);
-               oidmod = NULL;
        }
 
        return 0;
@@ -349,15 +338,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        /* Process the new control message */
        switch(control) {
                case TRANS_ABORT:
-                       /* Set all ref counts as 1 and do garbage collection */
-                       ptr = modptr;
-                       for(i = 0; i< fixed->nummod; i++) {
-                         int tmpsize;
-                         tmp_header = (objheader_t *)ptr;
-                         tmp_header->rcount = 0;
-                         GETSIZE(tmpsize, tmp_header);
-                         ptr += sizeof(objheader_t) + tmpsize;
-                       }
+                       if (fixed->nummod > 0)
+                               free(modptr);
                        /* Unlock objects that was locked due to this transaction */
                        for(i = 0; i< transinfo->numlocked; i++) {
                                header = mhashSearch(transinfo->objlocked[i]);// find the header address
@@ -385,8 +367,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                        if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
                                printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
                                /* Free memory */
-                               printf("DEBUG -> Freeing...\n");
-                               fflush(stdout);
                                if (transinfo->objlocked != NULL) {
                                        free(transinfo->objlocked);
                                }
@@ -408,9 +388,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        }
 
        /* Free memory */
-       printf("DEBUG -> Freeing...\n");
-       fflush(stdout);
-
        if (transinfo->objlocked != NULL) {
                free(transinfo->objlocked);
        }
@@ -425,7 +402,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
        int val, i = 0;
-       short version;
+       unsigned short version;
        char control = 0, *ptr;
        unsigned int oid;
        unsigned int *oidnotfound, *oidlocked;
@@ -450,7 +427,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                        incr *= i;
                        oid = *((unsigned int *)(objread + incr));
                        incr += sizeof(unsigned int);
-                       version = *((short *)(objread + incr));
+                       version = *((unsigned short *)(objread + incr));
                } else {//Objs modified
                  int tmpsize;
                  headptr = (objheader_t *) ptr;
@@ -483,12 +460,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                }
                        } else {/* If Obj is not locked then lock object */
                                STATUS(((objheader_t *)mobj)) |= LOCK;
-                              
-                               /*TESTING Add random wait to make transactions run for a long time such that
-                                * we can test for soft abort case */
-                       
-                               //randomdelay();
-
                                /* Save all object oids that are locked on this machine during this transaction request call */
                                oidlocked[objlocked] = OID(((objheader_t *)mobj));
                                objlocked++;
@@ -575,8 +546,10 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
  * Sends an ACK back to Coordinator */
 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
        objheader_t *header;
+       objheader_t *newheader;
        int i = 0, offset = 0;
        char control;
+       int tmpsize;
 
        /* Process each modified object saved in the mainobject store */
        for(i = 0; i < nummod; i++) {
@@ -584,22 +557,17 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
-               /* Change reference count of older address and free space in objstr ?? */
-               header->rcount = 0;
-
-               /* Change ptr address in mhash table */
-               mhashRemove(oidmod[i]);
-               mhashInsert(oidmod[i], (((char *)modptr) + offset));
-               {
-                 int tmpsize;
-                 GETSIZE(tmpsize,header);
-                 offset += sizeof(objheader_t) + tmpsize;
-               }
-               /* Update object version number */
-               header = (objheader_t *) mhashSearch(oidmod[i]);
+               GETSIZE(tmpsize,header);
+               pthread_mutex_lock(&mainobjstore_mutex);
+               memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t));
                header->version += 1; 
+               pthread_mutex_unlock(&mainobjstore_mutex);
+               offset += sizeof(objheader_t) + tmpsize;
        }
 
+       if (nummod > 0)
+               free(modptr);
+
        /* Unlock locked objects */
        for(i = 0; i < numlocked; i++) {
                if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
index 4a66fa54565f9e81d87e310c58395f4dd6aec7a6..08b3c5712ebf07e2e123be0717d6fec836f12b05 100644 (file)
@@ -38,12 +38,16 @@ void *objstrAlloc(objstr_t *store, unsigned int size)
                        if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
                        {
                                store->next = (objstr_t *)malloc(sizeof(objstr_t) + size);
+                               if (store->next == NULL)
+                                       return NULL;
                                store = store->next;
                                store->size = size;
                        }
                        else
                        {
                                store->next = malloc(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE);
+                               if (store->next == NULL)
+                                       return NULL;
                                store = store->next;
                                store->size = DEFAULT_OBJ_STORE_SIZE;
                        }
index cf7f77e707c8a5ebd33f2419854303b8e7e64349..e76fc95c74694ff09550a781035e3dc1f60e8f5c 100644 (file)
@@ -46,6 +46,22 @@ unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
 
+void printhex(unsigned char *, int);
+
+void printhex(unsigned char *ptr, int numBytes)
+{
+       int i;
+       for (i = 0; i < numBytes; i++)
+       {
+               if (ptr[i] < 16)
+                       printf("0%x ", ptr[i]);
+               else
+                       printf("%x ", ptr[i]);
+       }
+       printf("\n");
+       return;
+}
+
 plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
        int i;
@@ -186,7 +202,6 @@ void randomdelay(void)
 /* This function initializes things required in the transaction start*/
 transrecord_t *transStart()
 {
-       printf("Starting transaction\n");
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
        tmp->cache = objstrCreate(1048576);
        tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
@@ -209,7 +224,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
         
        rc = gettimeofday(&tp, NULL);
 
+       /* 1ms delay */
+       tp.tv_usec += 1000;
+       if (tp.tv_usec >= 1000000)
+       {
+               tp.tv_usec -= 1000000;
+               tp.tv_sec += 1;
+       }
        /* Convert from timeval to timespec */
+       ts.tv_sec = tp.tv_sec;
        ts.tv_nsec = tp.tv_usec * 1000;
 
        /* Search local transaction cache */
@@ -223,7 +246,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                /* Look up in machine lookup table  and copy  into cache*/
                GETSIZE(size, objheader);
                size += sizeof(objheader_t);
-               //TODO:Lock the local trans cache while copying the object here
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)objheader, size);
                /* Insert into cache's lookup table */
@@ -234,10 +256,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                return objcopy;
 #endif
        } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
-               found = 1;
                GETSIZE(size, tmp);
                size+=sizeof(objheader_t);
-               //TODO:Lock the local  trans cache while copying the object here
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)tmp, size);
                /* Insert into cache's lookup table */
@@ -252,26 +272,24 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                pthread_mutex_lock(&pflookup.lock);
                while(!found) {
                        rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
-                       if(rc == ETIMEDOUT) {
-                               printf("Wait timed out\n");
-                               /* Check Prefetch cache again */
-                               if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
-                                       found = 1;
-                                       GETSIZE(size,tmp);
-                                       size+=sizeof(objheader_t);
-                                       objcopy = objstrAlloc(record->cache, size);
-                                       memcpy(objcopy, (void *)tmp, size);
-                                       chashInsert(record->lookupTable, OID(tmp), objcopy); 
-                                       pthread_mutex_unlock(&pflookup.lock);
+                       /* Check Prefetch cache again */
+                       if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
+                               found = 1;
+                               GETSIZE(size,tmp);
+                               size+=sizeof(objheader_t);
+                               objcopy = objstrAlloc(record->cache, size);
+                               memcpy(objcopy, (void *)tmp, size);
+                               chashInsert(record->lookupTable, OID(tmp), objcopy); 
+                               pthread_mutex_unlock(&pflookup.lock);
 #ifdef COMPILER
-                                       return &objcopy[1];
+                               return &objcopy[1];
 #else
-                                       return objcopy;
+                               return objcopy;
 #endif
-                               } else {
+                       } else if (rc == ETIMEDOUT) {
+//                                     printf("Wait timed out\n");
                                        pthread_mutex_unlock(&pflookup.lock);
                                        break;
-                               }
                        }
                }
 
@@ -701,6 +719,11 @@ void decideResponse(thread_data_array_t *tdata) {
                /* Send Abort */
                *(tdata->replyctrl) = TRANS_ABORT;
                *(tdata->replyretry) = 0;
+               /* clear objects from prefetch cache */
+               for (i = 0; i < tdata->buffer->f.numread; i++)
+                       prehashRemove(*(unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+               for (i = 0; i < tdata->buffer->f.nummod; i++)
+                       prehashRemove(tdata->buffer->oidmod[i]);
        } else if(transagree == tdata->buffer->f.mcount){
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
@@ -790,9 +813,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                return NULL;
        }
 
-#ifdef DEBUG1
-       printf("DEBUG -> ready to rcv ...\n");
-#endif
        /* Read response from the Participant */
        if((val = read(sd, &control, sizeof(char))) <= 0) {
                perror("No control response for getRemoteObj sent\n");
@@ -831,73 +851,44 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
  * based on common agreement it either commits or aborts the transaction.
  * It also frees the memory resources */
 void *handleLocalReq(void *threadarg) {
-       int val, i = 0, size, offset = 0;
-       short version;
-       char control = 0, *ptr;
-       unsigned int oid;
        unsigned int *oidnotfound = NULL, *oidlocked = NULL;
-       void *mobj, *modptr;
-       objheader_t *headptr, *headeraddr;
        local_thread_data_array_t *localtdata;
+       int objnotfound = 0, objlocked = 0; 
+       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+       int numread, i;
+       unsigned int oid;
+       unsigned short version;
+       void *mobj;
+       objheader_t *headptr;
 
        localtdata = (local_thread_data_array_t *) threadarg;
 
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
        oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-       int objnotfound = 0, objlocked = 0; 
-       int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
-       /* modptr points to the beginning of the object store 
-        * created at the Pariticipant */ 
-       pthread_mutex_lock(&mainobjstore_mutex);
-       if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
-               printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
-               pthread_mutex_unlock(&mainobjstore_mutex);
-               pthread_exit(NULL);
-       }
-       pthread_mutex_unlock(&mainobjstore_mutex);
-       /* Write modified objects into the mainobject store */
-       for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
-               headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
-               GETSIZE(size,headeraddr);
-               size+=sizeof(objheader_t);
-               memcpy((char *)modptr+offset, headeraddr, size);  
-               offset += size;
-       }
-       /* Write new objects into the mainobject store */
-       for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
-               headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
-               GETSIZE(size, headeraddr);
-               size+=sizeof(objheader_t);
-               memcpy((char *)modptr+offset, headeraddr, size);  
-               offset += size;
-       }
-
-       ptr = modptr;
-       offset = 0; //Reset 
 
+       numread = localtdata->tdata->buffer->f.numread;
        /* Process each oid in the machine pile/ group per thread */
        for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
-               if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
+               if (i < localtdata->tdata->buffer->f.numread) {
                        int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
                        incr *= i;
                        oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
-                       incr += sizeof(unsigned int);
-                       version = *((short *)(localtdata->tdata->buffer->objread + incr));
-               } else {//Objs modified
+                       version = *((short *)(localtdata->tdata->buffer->objread + incr + sizeof(unsigned int)));
+               } else { // Objects Modified
                        int tmpsize;
-                       headptr = (objheader_t *)ptr;
+                       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
+                       if (headptr == NULL) {
+                               printf("Error: handleLocalReq() returning NULL\n");
+                               return NULL;
+                       }
                        oid = OID(headptr);
                        version = headptr->version;
-                       GETSIZE(tmpsize, headptr);
-                       ptr += sizeof(objheader_t) + tmpsize;
                }
-
                /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
 
                /* Save the oids not found and number of oids not found for later use */
-               if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+               if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
                        /* Save the oids not found and number of oids not found for later use */
                        oidnotfound[objnotfound] = oid;
                        objnotfound++;
@@ -913,9 +904,6 @@ void *handleLocalReq(void *threadarg) {
                                }
                        } else {/* If Obj is not locked then lock object */
                                STATUS(((objheader_t *)mobj)) |= LOCK;
-                               //TODO Remove this for Testing
-                               //randomdelay(); -- Why is this here.  BCD
-
                                /* Save all object oids that are locked on this machine during this transaction request call */
                                oidlocked[objlocked] = OID(((objheader_t *)mobj));
                                objlocked++;
@@ -928,8 +916,7 @@ void *handleLocalReq(void *threadarg) {
                                }
                        }
                }
-       }
-
+       } // End for
        /* Condition to send TRANS_AGREE */
        if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
@@ -937,25 +924,15 @@ void *handleLocalReq(void *threadarg) {
        /* Condition to send TRANS_SOFT_ABORT */
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
-               //TODO  currently the only soft abort case that is supported is when object locked by previous
-               //transaction => v_matchlock > 0 
-               //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
-               /* Send number of oids not found and the missing oids if objects are missing in the machine */
-               /* TODO Remember to store the oidnotfound for later use
-                  if(objnotfound != 0) {
-                  int size = sizeof(unsigned int)* objnotfound;
-                  }
-                  */
        }
 
        /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
         * if Participant receives a TRANS_COMMIT */
        localtdata->transinfo->objlocked = oidlocked;
        localtdata->transinfo->objnotfound = oidnotfound;
-       localtdata->transinfo->modptr = modptr;
+       localtdata->transinfo->modptr = NULL;
        localtdata->transinfo->numlocked = objlocked;
        localtdata->transinfo->numnotfound = objnotfound;
-
        /* Lock and update count */
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(localtdata->tdata->lock);
@@ -969,51 +946,38 @@ void *handleLocalReq(void *threadarg) {
                pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
        }
        pthread_mutex_unlock(localtdata->tdata->lock);
-
-       /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
        if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
-               if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
+               if(transAbortProcess(localtdata) != 0) {
                        printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
                        pthread_exit(NULL);
                }
-       }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
-               if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
+       } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+               if(transComProcess(localtdata) != 0) {
                        printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
                        pthread_exit(NULL);
                }
        }
-
        /* Free memory */
        if (localtdata->transinfo->objlocked != NULL) {
                free(localtdata->transinfo->objlocked);
-               localtdata->transinfo->objlocked = NULL;
        }
        if (localtdata->transinfo->objnotfound != NULL) {
                free(localtdata->transinfo->objnotfound);
-               localtdata->transinfo->objnotfound = NULL;
        }
 
        pthread_exit(NULL);
 }
-/* This function completes the ABORT process if the transaction is aborting 
-*/
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
-       char *ptr;
-       int i;
-       objheader_t *tmp_header;
+
+/* This function completes the ABORT process if the transaction is aborting */
+int transAbortProcess(local_thread_data_array_t  *localtdata) {
+       int i, numlocked;
+       unsigned int *objlocked;
        void *header;
 
-       /* Set all ref counts as 1 and do garbage collection */
-       ptr = modptr;
-       for(i = 0; i< nummod; i++) {
-               int tmpsize;
-               tmp_header = (objheader_t *)ptr;
-               tmp_header->rcount = 0;
-               GETSIZE(tmpsize, tmp_header);
-               ptr += sizeof(objheader_t) + tmpsize;
-       }
-       /* Unlock objects that was locked due to this transaction */
-       for(i = 0; i< numlocked; i++) {
+       numlocked = localtdata->transinfo->numlocked;
+       objlocked = localtdata->transinfo->objlocked;
+
+       for (i = 0; i < numlocked; i++) {
                if((header = mhashSearch(objlocked[i])) == NULL) {
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                        return 1;
@@ -1021,68 +985,68 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
                STATUS(((objheader_t *)header)) &= ~(LOCK);
        }
 
-       /* Send ack to Coordinator */
+       printf("TRANS_ABORTED at Coordinator end\n");
 
-       /*Free the pointer */
-       ptr = NULL;
        return 0;
 }
 
-/*This function completes the COMMIT process is the transaction is commiting
-*/
-int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
-       objheader_t *header;
-       int i = 0, offset = 0;
-       char control;
-
-       /* Process each modified object saved in the mainobject store */
-       for(i = 0; i < nummod; i++) {
-         int tmpsize;
+/*This function completes the COMMIT process is the transaction is commiting*/
+int transComProcess(local_thread_data_array_t  *localtdata) {
+       objheader_t *header, *tcptr;
+       int i, nummod, tmpsize, numcreated, numlocked;
+       unsigned int *oidmod, *oidcreated, *oidlocked;
+       void *ptrcreate;
+
+       nummod = localtdata->tdata->buffer->f.nummod;
+       oidmod = localtdata->tdata->buffer->oidmod;
+       numcreated = localtdata->tdata->buffer->f.numcreated;
+       oidcreated = localtdata->tdata->buffer->oidcreated;
+       numlocked = localtdata->transinfo->numlocked;
+       oidlocked = localtdata->transinfo->objlocked;
+       for (i = 0; i < nummod; i++) {
                if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
-                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
+               /* Copy from transaction cache -> main object store */
+               if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+                       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
-               /* Change reference count of older address and free space in objstr ?? */
-               header->rcount = 0;
-
-               /* Change ptr address in mhash table */
-               mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
-               mhashInsert(oidmod[i], (((char *)modptr) + offset));
                GETSIZE(tmpsize, header);
-               offset += sizeof(objheader_t) + tmpsize;
 
-               /* Update object version number */
-               header = (objheader_t *) mhashSearch(oidmod[i]);
+               pthread_mutex_lock(&mainobjstore_mutex);
+               memcpy(header, tcptr, tmpsize + sizeof(objheader_t));
                header->version += 1;
+               pthread_mutex_unlock(&mainobjstore_mutex);
        }
-
-       /*If object is in prefetch cache then update it in prefetch cache */ 
-
-
        /* If object is newly created inside transaction then commit it */
-       for (i = 0; i < numcreated; i++)
-       {
-               int tmpsize;
-               header = (objheader_t *)(((char *)modptr) + offset);
-               mhashInsert(oidcreated[i], (((char *)modptr) + offset));
+       for (i = 0; i < numcreated; i++) {
+               if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+                       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }
                GETSIZE(tmpsize, header);
-               offset += sizeof(objheader_t) + tmpsize;
+               pthread_mutex_lock(&mainobjstore_mutex);
+               if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+                       printf("Error: transComProcess() failed objstrAlloc\n");
+                       pthread_mutex_unlock(&mainobjstore_mutex);
+                       return 1;
+               }
+               pthread_mutex_unlock(&mainobjstore_mutex);
+               memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t));
+
+               mhashInsert(oidcreated[i], ptrcreate);
                lhashInsert(oidcreated[i], myIpAddr);
        }
-
        /* Unlock locked objects */
        for(i = 0; i < numlocked; i++) {
-               if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+               if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
                STATUS(header) &= ~(LOCK);
        }
-
-       //TODO Update location lookup table
-
-       /* Send ack to Coordinator */
-       printf("TRANS_SUCCESSFUL\n");
        return 0;
 }
 
@@ -1146,7 +1110,6 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
                                } else {
                                        k = endoffsets[i-1];
                                        index = endoffsets[j-1];
-                                       printf("Value of slength = %d\n", slength);
                                        for(count = 0; count < slength; count++) {
                                                if(arryfields[k] != arryfields[index]) {
                                                        break;
@@ -1418,7 +1381,6 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
        midtoIP(mcpilenode->mid ,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
diff --git a/Robust/src/Runtime/callconventions b/Robust/src/Runtime/callconventions
new file mode 100644 (file)
index 0000000..6eb59c7
--- /dev/null
@@ -0,0 +1,10 @@
+All calling conventions for native methods are #defines
+
+We support two types of garbage collectors
+1. Precise Garbage Collector
+2. Conservative Garbage Collector
+
+CALLXX => CALL, no of additional parameters other than the name, no of parameters for garbage collection
+for e.g. 
+CALL11(x, y, y)
+call, 1= number of parameters i.e. y,  1= No of parameters to be garbage collected i.e. y