Fixed : crashing due to pile creation
authoradash <adash>
Wed, 19 Sep 2007 21:15:03 +0000 (21:15 +0000)
committeradash <adash>
Wed, 19 Sep 2007 21:15:03 +0000 (21:15 +0000)
Changed locks to have recursive attributes

Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/prelookup.c
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/queue.h
Robust/src/Runtime/DSTM/interface/trans.c

index 38941ef5289dd3b9ea317bd5d3405844df4e098a..d6da34f318683f5487512b62bdc243b9d73fdd6a 100644 (file)
@@ -5,23 +5,14 @@ mcpileq_t mcqueue;
 void mcpileqInit(void) {
        /* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
        mcqueue.front = mcqueue.rear = NULL;
-       pthread_mutex_init(&mcqueue.qlock, NULL); 
+       //Intiliaze and set machile pile queue's mutex attribute
+       pthread_mutexattr_init(&mcqueue.qlockattr);
+       pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+       //pthread_mutex_init(&mcqueue.qlock, NULL); 
+       pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr); 
        pthread_cond_init(&mcqueue.qcond, NULL); 
 }
 
-/* Insert to the rear of machine pile queue */
-/*
-void mcpileenqueue(prefetchpile_t *node) {
-       if(mcqueue.front == NULL && mcqueue.rear == NULL) {
-               mcqueue.front = mcqueue.rear = node;
-       } else {
-               node->next = NULL;
-               mcqueue.rear->next = node;
-               mcqueue.rear = node;
-       }
-}
-*/
-
 /* Insert to the rear of machine pile queue */
 void mcpileenqueue(prefetchpile_t *node) {
        prefetchpile_t *tmp, *prev;
index 8291c5919e56e3f9c9407cc31d9fcc0050efa3e6..8c570d7f0428c26d34f5d8af82a2128a2b94f63d 100644 (file)
@@ -24,6 +24,7 @@ typedef struct prefetchpile {
 typedef struct mcpileq {
        prefetchpile_t *front, *rear;
        pthread_mutex_t qlock;
+       pthread_mutexattr_t qlockattr;
        pthread_cond_t qcond;
 }mcpileq_t;
 
index 64f756ba298760bcb1a4aec234321d11f538b66f..6eda49cf3e256f1cd4587cef622a27536c7bbaa3 100644 (file)
@@ -27,7 +27,7 @@ unsigned int prehashCreate(unsigned int size, float loadfactor) {
        //Initialize mutex var
        pthread_mutex_init(&pflookup.lock, &pflookup.prefetchmutexattr);
        //pthread_mutex_init(&pflookup.lock, NULL);
-        
+       pthread_cond_init(&pflookup.cond, NULL); 
        return 0;
 }
 
index 7a8fbba392ade522054a244ba793079e28d7257a..954a52b5443b063e4f3b82c2f2fc9f5d2aa71002 100644 (file)
@@ -5,7 +5,10 @@ primarypfq_t pqueue; //Global queue
 void queueInit(void) {
        /* Intitialize primary queue */
        pqueue.front = pqueue.rear = NULL;
-       pthread_mutex_init(&pqueue.qlock, NULL);
+       pthread_mutexattr_init(&pqueue.qlockattr);
+       pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+       pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr);
+       //pthread_mutex_init(&pqueue.qlock, NULL);
        pthread_cond_init(&pqueue.qcond, NULL);
 }
 
index 2a3754dd85dc0afe91394a05bae8568649e8a020..d315135bc0e6fe301e2f78fdb0e5dab034e0574f 100644 (file)
@@ -14,6 +14,7 @@ typedef struct prefetchqelem {
 typedef struct primarypfq {
        prefetchqelem_t *front, *rear;
        pthread_mutex_t qlock;
+       pthread_mutexattr_t qlockattr;
        pthread_cond_t qcond;
 } primarypfq_t; 
 
index c50a10b07ac1a381815df6b65b9b9bf6a1b4b6f0..fd0cd0d6599253b8dab33ac0a3ba0936880dc4b4 100644 (file)
 
 /* Global Variables */
 extern int classsize[];
-extern primarypfq_t pqueue; // shared prefetch queue
+extern primarypfq_t pqueue; //Shared prefetch queue
 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
 objstr_t *prefetchcache; //Global Prefetch cache
 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
+pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
@@ -125,7 +126,13 @@ void transInit() {
        int t, rc;
        //Create and initialize prefetch cache structure
        prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
-       pthread_mutex_init(&prefetchcache_mutex, NULL);
+
+       /* Initialize attributes for mutex */
+       pthread_mutexattr_init(&prefetchcache_mutex_attr);
+       pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+       
+       pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
+
        //Create prefetch cache lookup table
        if(prehashCreate(HASH_SIZE, LOADFACTOR))
                return; //Failure
@@ -207,8 +214,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #endif
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
-               tmp = mhashSearch(oid);
-               GETSIZE(size, tmp);
+               GETSIZE(size, objheader);
                size += sizeof(objheader_t);
                //TODO:Lock the local trans cache while copying the object here
                objcopy = objstrAlloc(record->cache, size);
@@ -236,7 +242,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #endif
        } else {
                /*If object not found in prefetch cache then block until object appears in the prefetch cache */
-               pthread_mutex_lock(&prefetchcache_mutex);
+               pthread_mutex_lock(&pflookup.lock);
                while(!found) {
                        rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
                        if(rc == ETIMEDOUT) {
@@ -249,14 +255,14 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                                        objcopy = objstrAlloc(record->cache, size);
                                        memcpy(objcopy, (void *)tmp, size);
                                        chashInsert(record->lookupTable, OID(tmp), objcopy); 
-                                       pthread_mutex_unlock(&prefetchcache_mutex);
+                                       pthread_mutex_unlock(&pflookup.lock);
 #ifdef COMPILER
                                        return &objcopy[1];
 #else
                                        return objcopy;
 #endif
                                } else {
-                                       pthread_mutex_unlock(&prefetchcache_mutex);
+                                       pthread_mutex_unlock(&pflookup.lock);
                                        break;
                                }
                        }
@@ -535,6 +541,9 @@ int transCommit(transrecord_t *record) {
        /* Retry trans commit procedure if not sucessful in the first try */
        } while (treplyretry == 1);
        
+       /* Free Resources */
+       objstrDelete(record->cache);
+       chashDelete(record->lookupTable);
        free(record);
        return 0;
 }
@@ -677,19 +686,13 @@ void decideResponse(thread_data_array_t *tdata) {
                }
        }
 
-       /* Send Abort */
        if(transdisagree > 0) {
+               /* Send Abort */
                *(tdata->replyctrl) = TRANS_ABORT;
-               /* Free resources */
-               objstrDelete(tdata->rec->cache);
-               chashDelete(tdata->rec->lookupTable);
        } else if(transagree == tdata->buffer->f.mcount){
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
-               /* Free resources */
-               objstrDelete(tdata->rec->cache);
-               chashDelete(tdata->rec->lookupTable);
-       } else { /* (transsoftabort > 0 && transdisagree == 0) */
+       } else { 
                /* Send Abort in soft abort case followed by retry commiting transaction again*/
                *(tdata->replyctrl) = TRANS_ABORT;
                *(tdata->replyretry) = 1;
@@ -1039,6 +1042,10 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated
                header->version += 1;
        }
 
+       /*If object is in prefetch cache then update it in prefetch cache */ 
+
+
+       /* If object is newly created inside transaction then commit it */
        for (i = 0; i < numcreated; i++)
        {
                int tmpsize;
@@ -1046,7 +1053,6 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated
                mhashInsert(oidcreated[i], (((char *)modptr) + offset));
                GETSIZE(tmpsize, header);
                offset += sizeof(objheader_t) + tmpsize;
-
                lhashInsert(oidcreated[i], myIpAddr);
        }