Minor bug fix for trans abort case
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index fd0cd0d6599253b8dab33ac0a3ba0936880dc4b4..6e6caba5399a5c0a73ec0880ce05501f32a7b8e7 100644 (file)
@@ -173,7 +173,8 @@ void randomdelay(void)
        t = time(NULL);
        req.tv_sec = 0;
        req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
-       nanosleep(&req, &rem);
+       //nanosleep(&req, &rem);
+       nanosleep(&req, NULL);
        return;
 }
 
@@ -203,7 +204,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
        rc = gettimeofday(&tp, NULL);
 
        /* Convert from timeval to timespec */
-       ts.tv_nsec = tp.tv_usec * 10;
+       ts.tv_nsec = tp.tv_usec * 1000;
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
@@ -535,7 +536,7 @@ int transCommit(transrecord_t *record) {
                free(ltdata);
 
                /* wait a random amount of time */
-               if (treplyretry == 1)
+               if (treplyretry == 1) 
                        randomdelay();
 
        /* Retry trans commit procedure if not sucessful in the first try */
@@ -581,7 +582,6 @@ void *transRequest(void *threadarg) {
                pthread_exit(NULL);
        }
 
-       printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
        /* Send bytes of data with TRANS_REQUEST control message */
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
                perror("Error sending fixed bytes for thread\n");
@@ -700,7 +700,9 @@ void decideResponse(thread_data_array_t *tdata) {
 
        return;
 }
-/* This function sends the final response to remote machines per thread in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id 
+ * It returns a char that is only needed to check the correctness of execution of this function inside
+ * transRequest()*/
 char sendResponse(thread_data_array_t *tdata, int sd) {
        int n, N, sum, oidcount = 0;
        char *ptr, retval = 0;
@@ -1007,7 +1009,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
 
        /* Send ack to Coordinator */
-       printf("TRANS_SUCCESSFUL\n");
 
        /*Free the pointer */
        ptr = NULL;
@@ -1329,6 +1330,7 @@ void *transPrefetch(void *t) {
                /* dequeue node to create a machine piles and  finally unlock mutex */
                if((qnode = pre_dequeue()) == NULL) {
                        printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&pqueue.qlock);
                        pthread_exit(NULL);
                }
                pthread_mutex_unlock(&pqueue.qlock);
@@ -1372,6 +1374,7 @@ void *mcqProcess(void *threadid) {
                /* Dequeue node to send remote machine connections*/
                if((mcpilenode = mcpiledequeue()) == NULL) {
                        printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&mcqueue.qlock);
                        pthread_exit(NULL);
                }
                /* Unlock mutex */
@@ -1518,19 +1521,20 @@ void getPrefetchResponse(int count, int sd) {
                                                        prehashInsert(oid, modptr);
                                                } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
                                                        /* Add the new object ptr to hash table */
+                                                       prehashRemove(oid);
                                                        prehashInsert(oid, modptr);
-                                               } else { /* Do nothing */
+                                               } else { /* Do nothing: TODO modptr should be reference counted */
                                                        ;
                                                }
                                        } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
                                                prehashInsert(oid, modptr);
                                        }
                                        /* Lock the Prefetch Cache look up table*/
-                                       pthread_mutex_lock(&pflookup.lock);
+                                       //pthread_mutex_lock(&pflookup.lock);
                                        /* Broadcast signal on prefetch cache condition variable */ 
                                        pthread_cond_broadcast(&pflookup.cond);
                                        /* Unlock the Prefetch Cache look up table*/
-                                       pthread_mutex_unlock(&pflookup.lock);
+                                       //pthread_mutex_unlock(&pflookup.lock);
                                } else if(buffer[index] == OBJECT_NOT_FOUND) {
                                        /* Increment it to get the object */
                                        /* TODO: For each object not found query DHT for new location and retrieve the object */