Minor bug fix for trans abort case
authoradash <adash>
Thu, 20 Sep 2007 08:09:59 +0000 (08:09 +0000)
committeradash <adash>
Thu, 20 Sep 2007 08:09:59 +0000 (08:09 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 2cea115cd032679706d0266c1cecca8c681710d1..7b27ce8dfb3ee9b76f1b032e36c625884bf483c7 100644 (file)
@@ -24,6 +24,7 @@ extern int classsize[];
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
+pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
 
 /* This function initializes the main objects store and creates the 
  * global machine and location lookup table */
@@ -31,7 +32,11 @@ pthread_mutex_t mainobjstore_mutex;
 int dstmInit(void)
 {
        mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
-       pthread_mutex_init(&mainobjstore_mutex, NULL);
+       /* Initialize attribute for mutex */
+       pthread_mutexattr_init(&mainobjstore_mutex_attr);
+       pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+       //pthread_mutex_init(&mainobjstore_mutex, NULL);
+       pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
@@ -357,7 +362,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                        }
 
                        /* Send ack to Coordinator */
-                       printf("DEBUG -> Recv TRANS_ABORT\n");
                        sendctrl = TRANS_SUCESSFUL;
                        if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                perror("Error sending ACK to coordinator\n");
@@ -377,7 +381,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 
                case TRANS_COMMIT:
                        /* Invoke the transCommit process() */
-                       printf("DEBUG -> Recv TRANS_COMMIT \n");
                        if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
                                printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
                                /* Free memory */
@@ -479,7 +482,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                        return control;
                                }
                        } else {/* If Obj is not locked then lock object */
@@ -503,7 +505,10 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                                       if (objlocked > 0) {
+                                               STATUS(((objheader_t *)mobj)) &= ~(LOCK);
+                                               free(oidlocked);
+                                       }
                                        return control;
                                }
                        }
@@ -535,7 +540,6 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
                        perror("Error in sending control to Coordinator\n");
                        return 0;
                }
-               printf("DEBUG -> Sending TRANS_AGREE\n");
        }
        /* Condition to send TRANS_SOFT_ABORT */
        if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
@@ -543,7 +547,6 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *
                char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
                *((int*)&msg[1])= *(objnotfound);
 
-               printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
                /* Send control message */
                if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
                        perror("Error in sending no of objects that are not found\n");
index fd0cd0d6599253b8dab33ac0a3ba0936880dc4b4..6e6caba5399a5c0a73ec0880ce05501f32a7b8e7 100644 (file)
@@ -173,7 +173,8 @@ void randomdelay(void)
        t = time(NULL);
        req.tv_sec = 0;
        req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
-       nanosleep(&req, &rem);
+       //nanosleep(&req, &rem);
+       nanosleep(&req, NULL);
        return;
 }
 
@@ -203,7 +204,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
        rc = gettimeofday(&tp, NULL);
 
        /* Convert from timeval to timespec */
-       ts.tv_nsec = tp.tv_usec * 10;
+       ts.tv_nsec = tp.tv_usec * 1000;
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
@@ -535,7 +536,7 @@ int transCommit(transrecord_t *record) {
                free(ltdata);
 
                /* wait a random amount of time */
-               if (treplyretry == 1)
+               if (treplyretry == 1) 
                        randomdelay();
 
        /* Retry trans commit procedure if not sucessful in the first try */
@@ -581,7 +582,6 @@ void *transRequest(void *threadarg) {
                pthread_exit(NULL);
        }
 
-       printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
        /* Send bytes of data with TRANS_REQUEST control message */
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
                perror("Error sending fixed bytes for thread\n");
@@ -700,7 +700,9 @@ void decideResponse(thread_data_array_t *tdata) {
 
        return;
 }
-/* This function sends the final response to remote machines per thread in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id 
+ * It returns a char that is only needed to check the correctness of execution of this function inside
+ * transRequest()*/
 char sendResponse(thread_data_array_t *tdata, int sd) {
        int n, N, sum, oidcount = 0;
        char *ptr, retval = 0;
@@ -1007,7 +1009,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
 
        /* Send ack to Coordinator */
-       printf("TRANS_SUCCESSFUL\n");
 
        /*Free the pointer */
        ptr = NULL;
@@ -1329,6 +1330,7 @@ void *transPrefetch(void *t) {
                /* dequeue node to create a machine piles and  finally unlock mutex */
                if((qnode = pre_dequeue()) == NULL) {
                        printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&pqueue.qlock);
                        pthread_exit(NULL);
                }
                pthread_mutex_unlock(&pqueue.qlock);
@@ -1372,6 +1374,7 @@ void *mcqProcess(void *threadid) {
                /* Dequeue node to send remote machine connections*/
                if((mcpilenode = mcpiledequeue()) == NULL) {
                        printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&mcqueue.qlock);
                        pthread_exit(NULL);
                }
                /* Unlock mutex */
@@ -1518,19 +1521,20 @@ void getPrefetchResponse(int count, int sd) {
                                                        prehashInsert(oid, modptr);
                                                } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
                                                        /* Add the new object ptr to hash table */
+                                                       prehashRemove(oid);
                                                        prehashInsert(oid, modptr);
-                                               } else { /* Do nothing */
+                                               } else { /* Do nothing: TODO modptr should be reference counted */
                                                        ;
                                                }
                                        } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
                                                prehashInsert(oid, modptr);
                                        }
                                        /* Lock the Prefetch Cache look up table*/
-                                       pthread_mutex_lock(&pflookup.lock);
+                                       //pthread_mutex_lock(&pflookup.lock);
                                        /* Broadcast signal on prefetch cache condition variable */ 
                                        pthread_cond_broadcast(&pflookup.cond);
                                        /* Unlock the Prefetch Cache look up table*/
-                                       pthread_mutex_unlock(&pflookup.lock);
+                                       //pthread_mutex_unlock(&pflookup.lock);
                                } else if(buffer[index] == OBJECT_NOT_FOUND) {
                                        /* Increment it to get the object */
                                        /* TODO: For each object not found query DHT for new location and retrieve the object */