From: erubow Date: Sat, 15 Sep 2007 00:23:17 +0000 (+0000) Subject: several bug fixes. prefetching works so far, as well as starting remote threads. X-Git-Tag: preEdgeChange~440 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=816d114debd2cf1ab71c5cf39d4b18941c968bfa;p=IRC.git several bug fixes. prefetching works so far, as well as starting remote threads. --- diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 72997705..4c3758f8 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -23,13 +23,15 @@ extern int classsize[]; objstr_t *mainobjstore; +pthread_mutex_t mainobjstore_mutex; /* This function initializes the main objects store and creates the * global machine and location lookup table */ int dstmInit(void) { - mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); + mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); + pthread_mutex_init(&mainobjstore_mutex, NULL); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -252,10 +254,13 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { if(fixed.nummod != 0) { // If pile contains more than one modified object, // allocate new object store and recv all modified objects // TODO deallocate this space + pthread_mutex_lock(&mainobjstore_mutex); if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mainobjstore_mutex); return 1; } + pthread_mutex_unlock(&mainobjstore_mutex); sum = 0; do { // Recv the objs that are modified by the Coordinator n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0); @@ -575,6 +580,7 @@ int prefetchReq(int acceptfd) { unsigned int objoid; char *header, control; objheader_t * head; + int bytesRecvd; /* Repeatedly recv the oid and offset pairs sent for prefetch */ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { @@ -585,7 +591,11 @@ int prefetchReq(int acceptfd) { index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the // first 4 bytes are saved to send the // size of the buffer (that is computed at the end of the loop) - oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); + bytesRecvd = 0; + do { + bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd, + sizeof(unsigned int) - bytesRecvd, 0); + } while (bytesRecvd < sizeof(unsigned int)); numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); N = numoffset * sizeof(short); short offset[numoffset]; @@ -657,9 +667,9 @@ int prefetchReq(int acceptfd) { } /* Add the buffer size into buffer as a parameter */ - memcpy(buffer, &index, sizeof(unsigned int)); + *((unsigned int *)buffer)=index; /* Send the entire buffer with its size and oids found and not found */ - if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) { + if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) { perror("Error sending oids found\n"); return 1; } diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index 9b5c416f..ec5e9ba9 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -1,7 +1,7 @@ #include "machinepile.h" -int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) { - prefetchpile_t *tmp = head; +int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) { + prefetchpile_t *tmp = *head; objpile_t *objnode; unsigned int *oidarray; int ntuples; @@ -40,14 +40,10 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet objnode->offset = offset; objnode->next = tmp->objpiles; // i.e., objnode->next = NULL; tmp->objpiles = objnode; - tmp->next = head; - head = tmp; + tmp->next = *head; + *head = tmp; } return 0; } -//TODO -int deletePile() { - return 0; -} diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h index 70fd47fb..7d98b214 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.h +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -5,7 +5,6 @@ #include #include -int insertPile(int, unsigned int, short, short *, prefetchpile_t *); -int deletePile(); +int insertPile(int, unsigned int, short, short *, prefetchpile_t **); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index 27081a08..0555e8c4 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -26,12 +26,13 @@ void mcpileenqueue(prefetchpile_t *node) { void mcpileenqueue(prefetchpile_t *node) { prefetchpile_t *tmp, *prev; if(mcqueue.front == NULL && mcqueue.rear == NULL) { - tmp = mcqueue.front = node; + mcqueue.front = mcqueue.rear = node; + /*tmp = mcqueue.front = node; while(tmp != NULL) { prev = tmp; tmp = tmp->next; } - mcqueue.rear = prev; + mcqueue.rear = prev;*/ } else { tmp = mcqueue.rear->next = node; while(tmp != NULL) { @@ -51,6 +52,8 @@ prefetchpile_t *mcpiledequeue(void) { } retnode = mcqueue.front; mcqueue.front = mcqueue.front->next; + if (mcqueue.front == NULL) + mcqueue.rear = NULL; retnode->next = NULL; return retnode; @@ -92,16 +95,30 @@ void mcpiledisplay() { } } +/* Delete prefetchpile_t and everything it points to */ void mcdealloc(prefetchpile_t *node) { - /* Remove the offset ptr and linked lists of objpile_t */ - objpile_t *delnode; - while(node->objpiles != NULL) { - node->objpiles->offset = NULL; - delnode = node->objpiles; - node->objpiles = node->objpiles->next; - free(delnode); - node->objpiles->next = NULL; + prefetchpile_t *prefetchpile_ptr; + prefetchpile_t *prefetchpile_next_ptr; + objpile_t *objpile_ptr; + objpile_t *objpile_next_ptr; + + prefetchpile_ptr = node; + + while (prefetchpile_ptr != NULL) + { + objpile_ptr = prefetchpile_ptr->objpiles; + while (objpile_ptr != NULL) + { + if (objpile_ptr->numoffset > 0) + free(objpile_ptr->offset); + objpile_next_ptr = objpile_ptr->next; + free(objpile_ptr); + objpile_ptr = objpile_next_ptr; + } + prefetchpile_next_ptr = prefetchpile_ptr->next; + free(prefetchpile_ptr); + prefetchpile_ptr = prefetchpile_next_ptr; } - free(node); - node->next = NULL; } + + diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 5ff02329..373de749 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -16,13 +16,11 @@ void delqnode() { printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); return; } else if ((pqueue.front == pqueue.rear) && pqueue.front != NULL && pqueue.rear != NULL) { - printf("TEST1\n"); free(pqueue.front); pqueue.front = pqueue.rear = NULL; } else { delnode = pqueue.front; pqueue.front = pqueue.front->next; - printf("TEST2\n"); free(delnode); } } @@ -54,6 +52,8 @@ prefetchqelem_t *pre_dequeue(void) { } retnode = pqueue.front; pqueue.front = pqueue.front->next; + if (pqueue.front == NULL) + pqueue.rear = NULL; return retnode; } @@ -68,7 +68,6 @@ void queueDisplay() { ptr1 = (char *) tmp; ptr = (int *)(ptr1 + offset); ntuples = *ptr; - printf("Number of tuples = %d\n", ntuples); tmp = tmp->next; } } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 3083210e..e8e16c9e 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -29,7 +29,9 @@ extern int classsize[]; extern primarypfq_t pqueue; // shared prefetch queue extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids -objstr_t *prefetchcache; //Global Prefetch cache +objstr_t *prefetchcache; //Global Prefetch cache +pthread_mutex_t prefetchcache_mutex; +extern pthread_mutex_t mainobjstore_mutex; extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; @@ -123,6 +125,7 @@ void transInit() { int t, rc; //Create and initialize prefetch cache structure prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + pthread_mutex_init(&prefetchcache_mutex, NULL); //Create prefetch cache lookup table if(prehashCreate(HASH_SIZE, LOADFACTOR)) return; //Failure @@ -798,10 +801,13 @@ void *handleLocalReq(void *threadarg) { /* modptr points to the beginning of the object store * created at the Pariticipant */ + pthread_mutex_lock(&mainobjstore_mutex); if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mainobjstore_mutex); return NULL; } + pthread_mutex_unlock(&mainobjstore_mutex); /* Write modified objects into the mainobject store */ for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) { headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]); @@ -1186,9 +1192,8 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { } /* Insert into machine pile */ offset = &arryfields[endoffsets[i-1]]; - insertPile(machinenum, oid[i], numoffset[i], offset, head); + insertPile(machinenum, oid[i], numoffset[i], offset, &head); } - return head; } @@ -1338,7 +1343,7 @@ void *mcqProcess(void *threadid) { } void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { - int sd, i, offset, off, len, endpair, numoffsets, count = 0; + int sd, i, offset, off, len, endpair, count = 0; struct sockaddr_in serv_addr; struct hostent *server; char machineip[16], control; @@ -1382,9 +1387,9 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { off = sizeof(int); memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); off += sizeof(unsigned int); - for(i = 0; i < numoffsets; i++) { - offset = off + (i * sizeof(short)); - memcpy(oidnoffset + offset, tmp->offset, sizeof(short)); + for(i = 0; i < tmp->numoffset; i++) { + memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short)); + off+=sizeof(short); } if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { perror("Error sending fixed bytes for thread\n"); @@ -1446,16 +1451,17 @@ void getPrefetchResponse(int count, int sd) { index += sizeof(char); memcpy(&oid, buffer + index, sizeof(unsigned int)); index += sizeof(unsigned int); - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); /* For each object found add to Prefetch Cache */ memcpy(&objsize, buffer + index, sizeof(int)); + pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); return; } + pthread_mutex_unlock(&prefetchcache_mutex); memcpy(modptr, buffer+index, objsize); - index += sizeof(int); + index += objsize; /* Insert the oid and its address into the prefetch hash lookup table */ /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { @@ -1472,6 +1478,8 @@ void getPrefetchResponse(int count, int sd) { } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/ prehashInsert(oid, modptr); } + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); /* Unlock the Prefetch Cache look up table*/ @@ -1485,8 +1493,10 @@ void getPrefetchResponse(int count, int sd) { /* Throw an error */ printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); exit(-1); - } else + } else { printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__); + return; + } } i++; @@ -1506,10 +1516,12 @@ unsigned short getObjType(unsigned int oid) if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { prefetch(1, &oid, &numoffsets, NULL); - pthread_mutex_lock(&pflookup.lock); while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) + { + pthread_mutex_lock(&pflookup.lock); pthread_cond_wait(&pflookup.cond, &pflookup.lock); - pthread_mutex_unlock(&pflookup.lock); + pthread_mutex_unlock(&pflookup.lock); + } } }