several bug fixes. prefetching works so far, as well as starting remote threads.
authorerubow <erubow>
Sat, 15 Sep 2007 00:23:17 +0000 (00:23 +0000)
committererubow <erubow>
Sat, 15 Sep 2007 00:23:17 +0000 (00:23 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/machinepile.h
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index 72997705db93fb8d73b85e8fe6edf0cb11f314a6..4c3758f806871892636d4b7f418244eb358ff46b 100644 (file)
 extern int classsize[];
 
 objstr_t *mainobjstore;
+pthread_mutex_t mainobjstore_mutex;
 
 /* This function initializes the main objects store and creates the 
  * global machine and location lookup table */
 
 int dstmInit(void)
 {
-       mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
+       mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
+       pthread_mutex_init(&mainobjstore_mutex, NULL);
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
@@ -252,10 +254,13 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        if(fixed.nummod != 0) { // If pile contains more than one modified object,
                                // allocate new object store and recv all modified objects
                                // TODO deallocate this space
+               pthread_mutex_lock(&mainobjstore_mutex);
                if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
                        printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&mainobjstore_mutex);
                        return 1;
                }
+               pthread_mutex_unlock(&mainobjstore_mutex);
                sum = 0;
                do { // Recv the objs that are modified by the Coordinator
                        n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
@@ -575,6 +580,7 @@ int prefetchReq(int acceptfd) {
        unsigned int objoid;
        char *header, control;
        objheader_t * head;
+       int bytesRecvd;
        
        /* Repeatedly recv the oid and offset pairs sent for prefetch */
        while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
@@ -585,7 +591,11 @@ int prefetchReq(int acceptfd) {
                index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
                                              // first 4 bytes are saved to send the
                                              // size of the buffer (that is computed at the end of the loop)
-               oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+               bytesRecvd = 0;
+               do {
+                       bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
+                                       sizeof(unsigned int) - bytesRecvd, 0);
+               } while (bytesRecvd < sizeof(unsigned int));
                numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
                N = numoffset * sizeof(short);
                short offset[numoffset];
@@ -657,9 +667,9 @@ int prefetchReq(int acceptfd) {
                }
 
                /* Add the buffer size into buffer as a parameter */
-               memcpy(buffer, &index, sizeof(unsigned int));
+               *((unsigned int *)buffer)=index;
                /* Send the entire buffer with its size and oids found and not found */
-               if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
+               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
                        perror("Error sending oids found\n");
                        return 1;
                }
index 9b5c416f4975d0b5720bfabff8a39ac5b778177b..ec5e9ba9cbff19c1bebd2d0ad6b92b8a808ab274 100644 (file)
@@ -1,7 +1,7 @@
 #include "machinepile.h"
 
-int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
-       prefetchpile_t *tmp = head;
+int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) {
+       prefetchpile_t *tmp = *head;
        objpile_t *objnode;
        unsigned int *oidarray;
        int ntuples;
@@ -40,14 +40,10 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet
                objnode->offset = offset;
                objnode->next = tmp->objpiles; // i.e., objnode->next = NULL;
                tmp->objpiles = objnode;
-               tmp->next = head;
-               head = tmp;
+               tmp->next = *head;
+               *head = tmp;
        }
        return 0;
 }
 
-//TODO
-int deletePile() {
 
-       return 0;
-}
index 70fd47fb331083f2f2f14fa86b44cc11cf0ac690..7d98b2144233d640faf10dd2e11bec3145f0ff32 100644 (file)
@@ -5,7 +5,6 @@
 #include <stdio.h>
 #include <stdlib.h>
 
-int insertPile(int, unsigned int, short, short *, prefetchpile_t *);
-int deletePile();
+int insertPile(int, unsigned int, short, short *, prefetchpile_t **);
 
 #endif
index 27081a0890fa302f89492ba384e2e23753e87c08..0555e8c451e81aad07b993ec9dc64e26af416dee 100644 (file)
@@ -26,12 +26,13 @@ void mcpileenqueue(prefetchpile_t *node) {
 void mcpileenqueue(prefetchpile_t *node) {
        prefetchpile_t *tmp, *prev;
        if(mcqueue.front == NULL && mcqueue.rear == NULL) {
-               tmp = mcqueue.front = node;
+               mcqueue.front = mcqueue.rear = node;
+               /*tmp = mcqueue.front = node;
                while(tmp != NULL) {
                        prev = tmp;
                        tmp = tmp->next;
                }
-               mcqueue.rear = prev;
+               mcqueue.rear = prev;*/
        } else {
                tmp = mcqueue.rear->next = node;
                while(tmp != NULL) {
@@ -51,6 +52,8 @@ prefetchpile_t *mcpiledequeue(void) {
        }
        retnode = mcqueue.front;
        mcqueue.front = mcqueue.front->next;
+       if (mcqueue.front == NULL)
+               mcqueue.rear = NULL;
        retnode->next = NULL;
 
        return retnode;
@@ -92,16 +95,30 @@ void mcpiledisplay() {
        }
 }
 
+/* Delete prefetchpile_t and everything it points to */
 void mcdealloc(prefetchpile_t *node) {
-       /* Remove the offset ptr and linked lists of objpile_t */
-       objpile_t *delnode;
-       while(node->objpiles != NULL) {
-               node->objpiles->offset = NULL;
-               delnode = node->objpiles;
-               node->objpiles = node->objpiles->next;
-               free(delnode);
-               node->objpiles->next = NULL;
+       prefetchpile_t *prefetchpile_ptr;
+       prefetchpile_t *prefetchpile_next_ptr;
+       objpile_t *objpile_ptr;
+       objpile_t *objpile_next_ptr;
+
+       prefetchpile_ptr = node;
+
+       while (prefetchpile_ptr != NULL)
+       {
+               objpile_ptr = prefetchpile_ptr->objpiles;
+               while (objpile_ptr != NULL)
+               {
+                       if (objpile_ptr->numoffset > 0)
+                               free(objpile_ptr->offset);
+                       objpile_next_ptr = objpile_ptr->next;
+                       free(objpile_ptr);
+                       objpile_ptr = objpile_next_ptr;
+               }
+               prefetchpile_next_ptr = prefetchpile_ptr->next;
+               free(prefetchpile_ptr);
+               prefetchpile_ptr = prefetchpile_next_ptr;
        }
-       free(node);
-       node->next = NULL;
 }
+
+
index 5ff02329df6d183b7da891fb8a97e722cddcfb73..373de7492dfb8730f682987ffc5eb3793e96e076 100644 (file)
@@ -16,13 +16,11 @@ void delqnode() {
                printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
                return;
        } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) {
-               printf("TEST1\n");
                free(pqueue.front);
                pqueue.front = pqueue.rear = NULL;
        } else {
                delnode = pqueue.front;
                pqueue.front = pqueue.front->next;
-               printf("TEST2\n");
                free(delnode);
        }
 }
@@ -54,6 +52,8 @@ prefetchqelem_t *pre_dequeue(void) {
        }
        retnode = pqueue.front;
        pqueue.front = pqueue.front->next;
+       if (pqueue.front == NULL)
+               pqueue.rear = NULL;
 
        return retnode;
 }
@@ -68,7 +68,6 @@ void queueDisplay() {
                ptr1 = (char *) tmp;
                ptr = (int *)(ptr1 + offset);
                ntuples = *ptr;
-               printf("Number of tuples = %d\n", ntuples);
                tmp = tmp->next;
        }
 }
index 3083210ef9651434826dd1d7c163837a819d06af..e8e16c9e6d8e61d2b3025520ca2e4d377a23a1f8 100644 (file)
@@ -29,7 +29,9 @@
 extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
-objstr_t *prefetchcache; //Global Prefetch cache 
+objstr_t *prefetchcache; //Global Prefetch cache
+pthread_mutex_t prefetchcache_mutex;
+extern pthread_mutex_t mainobjstore_mutex;
 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
@@ -123,6 +125,7 @@ void transInit() {
        int t, rc;
        //Create and initialize prefetch cache structure
        prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
+       pthread_mutex_init(&prefetchcache_mutex, NULL);
        //Create prefetch cache lookup table
        if(prehashCreate(HASH_SIZE, LOADFACTOR))
                return; //Failure
@@ -798,10 +801,13 @@ void *handleLocalReq(void *threadarg) {
 
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant */ 
+       pthread_mutex_lock(&mainobjstore_mutex);
        if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
                printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+               pthread_mutex_unlock(&mainobjstore_mutex);
                return NULL;
        }
+       pthread_mutex_unlock(&mainobjstore_mutex);
        /* Write modified objects into the mainobject store */
        for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
                headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
@@ -1186,9 +1192,8 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
                }
                /* Insert into machine pile */
                offset = &arryfields[endoffsets[i-1]];
-               insertPile(machinenum, oid[i], numoffset[i], offset, head);
+               insertPile(machinenum, oid[i], numoffset[i], offset, &head);
        }
-
        return head;
 }
 
@@ -1338,7 +1343,7 @@ void *mcqProcess(void *threadid) {
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
-       int sd, i, offset, off, len, endpair, numoffsets, count = 0;
+       int sd, i, offset, off, len, endpair, count = 0;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        char machineip[16], control;
@@ -1382,9 +1387,9 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
                off = sizeof(int);
                memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
                off += sizeof(unsigned int);
-               for(i = 0; i < numoffsets; i++) {
-                       offset = off +  (i * sizeof(short));
-                       memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
+               for(i = 0; i < tmp->numoffset; i++) {
+                       memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
+                       off+=sizeof(short);
                }
                if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
                        perror("Error sending fixed bytes for thread\n");
@@ -1446,16 +1451,17 @@ void getPrefetchResponse(int count, int sd) {
                                        index += sizeof(char);
                                        memcpy(&oid, buffer + index, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
-                                       /* Lock the Prefetch Cache look up table*/
-                                       pthread_mutex_lock(&pflookup.lock);
                                        /* For each object found add to Prefetch Cache */
                                        memcpy(&objsize, buffer + index, sizeof(int));
+                                       pthread_mutex_lock(&prefetchcache_mutex);
                                        if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
                                                printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+                                               pthread_mutex_unlock(&prefetchcache_mutex);
                                                return;
                                        }
+                                       pthread_mutex_unlock(&prefetchcache_mutex);
                                        memcpy(modptr, buffer+index, objsize);
-                                       index += sizeof(int);
+                                       index += objsize;
                                        /* Insert the oid and its address into the prefetch hash lookup table */
                                        /* Do a version comparison if the oid exists */
                                        if((oldptr = prehashSearch(oid)) != NULL) {
@@ -1472,6 +1478,8 @@ void getPrefetchResponse(int count, int sd) {
                                        } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
                                                prehashInsert(oid, modptr);
                                        }
+                                       /* Lock the Prefetch Cache look up table*/
+                                       pthread_mutex_lock(&pflookup.lock);
                                        /* Broadcast signal on prefetch cache condition variable */ 
                                        pthread_cond_broadcast(&pflookup.cond);
                                        /* Unlock the Prefetch Cache look up table*/
@@ -1485,8 +1493,10 @@ void getPrefetchResponse(int count, int sd) {
                                        /* Throw an error */
                                        printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
                                        exit(-1);
-                               } else 
+                               } else {
                                        printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
+                                       return;
+                               }
                        }
 
                        i++;
@@ -1506,10 +1516,12 @@ unsigned short getObjType(unsigned int oid)
                if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
                {
                        prefetch(1, &oid, &numoffsets, NULL);
-                       pthread_mutex_lock(&pflookup.lock);
                        while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+                       {
+                               pthread_mutex_lock(&pflookup.lock);
                                pthread_cond_wait(&pflookup.cond, &pflookup.lock);
-                       pthread_mutex_unlock(&pflookup.lock);
+                               pthread_mutex_unlock(&pflookup.lock);
+                       }
                }
        }