X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=Robust%2Fsrc%2FRuntime%2FDSTM%2Finterface%2Fdstmserver.c;h=0cb3490d2e74c5342e3252120fa73a8ca11fda52;hb=09700dd8510a62b141e53df26b5b1c88f01dd1db;hp=79351126fac1d48a0a1fc556d223a69ced07e067;hpb=1f51fd64a9d725ed875d89a6ee50dc1116f19e7e;p=IRC.git diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 79351126..0cb3490d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -7,9 +7,12 @@ #include "llookup.h" #include "threadnotify.h" #include "prefetch.h" +#include #ifdef COMPILER #include "thread.h" #endif +#include "gCollect.h" +#include "readstruct.h" #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 @@ -35,7 +38,7 @@ int dstmInit(void) { pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); pthread_mutex_init(&lockObjHeader,NULL); - if (mhashCreate(HASH_SIZE, LOADFACTOR)) + if (mhashCreate(MHASH_SIZE, MLOADFACTOR)) return 1; //failure if (lhashCreate(HASH_SIZE, LOADFACTOR)) @@ -128,10 +131,13 @@ void *dstmAccept(void *acceptfd) { trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; + struct readstruct readbuffer; + readbuffer.head=0; + readbuffer.tail=0; /* Receive control messages from other machines */ while(1) { - int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char)); if (ret==0) break; if (ret==-1) { @@ -141,10 +147,12 @@ void *dstmAccept(void *acceptfd) { switch(control) { case READ_REQUEST: /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - break; + recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int)); + while((srcObj = mhashSearch(oid)) == NULL) { + int ret; + if((ret = sched_yield()) != 0) { + printf("%s(): error no %d in thread yield\n", __func__, errno); + } } h = (objheader_t *) srcObj; GETSIZE(size, h); @@ -178,7 +186,7 @@ void *dstmAccept(void *acceptfd) { transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; - if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { + if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); } @@ -186,12 +194,12 @@ void *dstmAccept(void *acceptfd) { case TRANS_PREFETCH: #ifdef RANGEPREFETCH - if((val = rangePrefetchReq((int)acceptfd)) != 0) { + if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) { printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__); break; } #else - if((val = prefetchReq((int)acceptfd)) != 0) { + if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) { printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); break; } @@ -200,12 +208,12 @@ void *dstmAccept(void *acceptfd) { case TRANS_PREFETCH_RESPONSE: #ifdef RANGEPREFETCH - if((val = getRangePrefetchResponse((int)acceptfd)) != 0) { + if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) { printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__); break; } #else - if((val = getPrefetchResponse((int) acceptfd)) != 0) { + if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) { printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); break; } @@ -213,20 +221,20 @@ void *dstmAccept(void *acceptfd) { break; case START_REMOTE_THREAD: - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int)); objType = getObjType(oid); startDSMthread(oid, objType); break; case THREAD_NOTIFY_REQUEST: - recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); + recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); pthread_exit(NULL); } - recv_data((int)acceptfd, buffer, size); + recv_data_buf((int)acceptfd, &readbuffer, buffer, size); oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); @@ -239,7 +247,6 @@ void *dstmAccept(void *acceptfd) { threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); free(buffer); - break; case THREAD_NOTIFY_RESPONSE: @@ -249,7 +256,7 @@ void *dstmAccept(void *acceptfd) { pthread_exit(NULL); } - recv_data((int)acceptfd, buffer, size); + recv_data_buf((int)acceptfd, &readbuffer, buffer, size); oid = *((unsigned int *)buffer); size = sizeof(unsigned int); @@ -277,7 +284,7 @@ closeconnection: /* This function reads the information available in a transaction request * and makes a function call to process the request */ -int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { +int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) { char *ptr; void *modptr; unsigned int *oidmod, oid; @@ -291,14 +298,14 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { size = sizeof(fixed) - 1; ptr = (char *)&fixed;; fixed.control = TRANS_REQUEST; - recv_data((int)acceptfd, ptr+1, size); + recv_data_buf((int)acceptfd, readbuffer, ptr+1, size); /* Read list of mids */ int mcount = fixed.mcount; size = mcount * sizeof(unsigned int); unsigned int listmid[mcount]; ptr = (char *) listmid; - recv_data((int)acceptfd, ptr, size); + recv_data_buf((int)acceptfd, readbuffer, ptr, size); /* Read oid and version tuples for those objects that are not modified in the transaction */ int numread = fixed.numread; @@ -306,7 +313,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { char objread[size]; if(numread != 0) { //If pile contains more than one object to be read, // keep reading all objects - recv_data((int)acceptfd, objread, size); + recv_data_buf((int)acceptfd, readbuffer, objread, size); } /* Read modified objects */ @@ -316,7 +323,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { return 1; } size = fixed.sum_bytes; - recv_data((int)acceptfd, modptr, size); + recv_data_buf((int)acceptfd, readbuffer, modptr, size); } /* Create an array of oids for modified objects */ @@ -336,7 +343,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } /*Process the information read */ - if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) { + if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) { printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__); /* Free resources */ if(oidmod != NULL) { @@ -357,7 +364,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { * function and sends a reply to the co-ordinator. * Following this it also receives a new control message from the co-ordinator and processes this message*/ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, - unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) { + unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) { char control, sendctrl, retval; objheader_t *tmp_header; @@ -370,7 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } - recv_data((int)acceptfd, &control, sizeof(char)); + recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char)); /* Process the new control message */ switch(control) { case TRANS_ABORT: @@ -464,8 +471,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); } else { //Objs modified if(i == fixed->numread) { - oidlocked[objlocked] = -1; - objlocked++; + oidlocked[objlocked++] = -1; } int tmpsize; headptr = (objheader_t *) ptr; @@ -563,8 +569,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked - oidlocked[*objlocked] = OID(((objheader_t *)mobj)); - (*objlocked)++; + oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj)); } else { //we are locked if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; @@ -600,8 +605,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked (*v_matchnolock)++; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; - oidvernotmatch[*objvernotmatch] = oid; - (*objvernotmatch)++; + oidvernotmatch[(*objvernotmatch)++] = oid; int size; GETSIZE(size, mobj); size += sizeof(objheader_t); @@ -611,8 +615,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); } //Keep track of oid locked - oidlocked[*objlocked] = OID(((objheader_t *)mobj)); - (*objlocked)++; + oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj)); } else { /* Some other transaction has aquired a write lock on this object */ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; @@ -689,7 +692,15 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock return 1; } GETSIZE(tmpsize,header); - memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize); + + { + struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t)); + struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset); + dst->type=src->type; + dst->___cachedCode___=src->___cachedCode___; + dst->___cachedHash___=src->___cachedHash___; + memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___)); + } header->version += 1; /* If threads are waiting on this object to be updated, notify them */ if(header->notifylist != NULL) { @@ -728,7 +739,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * If objects are not found then record those and if objects are found * then use offset values to prefetch references to other objects */ -int prefetchReq(int acceptfd) { +int prefetchReq(int acceptfd, struct readstruct * readbuffer) { int i, size, objsize, numoffset = 0; int length; char *recvbuffer, control; @@ -736,12 +747,11 @@ int prefetchReq(int acceptfd) { objheader_t *header; oidmidpair_t oidmid; int sd = -1; - while(1) { - recv_data((int)acceptfd, &numoffset, sizeof(int)); + recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int)); if(numoffset == -1) break; - recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); + recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int)); oid = oidmid.oid; if (mid != oidmid.mid) { if (mid!=-1) { @@ -751,23 +761,24 @@ int prefetchReq(int acceptfd) { sd = getSockWithLock(transPResponseSocketPool, mid); } short offsetarry[numoffset]; - recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short)); + recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short)); /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found in buffer for later use */ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - char sendbuffer[size]; - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; + *((int *) (sendbuffer+sizeof(char))) = size; + *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid; + send_data(sd, sendbuffer, size+1); } else { /* Object Found */ - int incr = 0; + int incr = 1; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - char sendbuffer[size]; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -775,14 +786,12 @@ int prefetchReq(int acceptfd) { *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { + if(TYPE(header) >= NUMCLASSES) { int elementsize = classsize[TYPE(header)]; struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); unsigned short length = ao->___length___; @@ -801,19 +810,20 @@ int prefetchReq(int acceptfd) { if((header = mhashSearch(oid)) == NULL) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - char sendbuffer[size]; - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; + *((int *) (sendbuffer+1)) = size; + *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid; - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); break; } else { /* Obj Found */ - int incr = 0; + int incr = 1; GETSIZE(objsize, header); size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - char sendbuffer[size]; + char sendbuffer[size+1]; + sendbuffer[0]=TRANS_PREFETCH_RESPONSE; *((int *)(sendbuffer + incr)) = size; incr += sizeof(int); *((char *)(sendbuffer + incr)) = OBJECT_FOUND; @@ -821,17 +831,14 @@ int prefetchReq(int acceptfd) { *((unsigned int *)(sendbuffer+incr)) = oid; incr += sizeof(unsigned int); memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - sendPrefetchResponse(sd, &control, sendbuffer, &size); + send_data(sd, sendbuffer, size+1); } } //end of for } } //end of while - //Release socket + //Release socket if (mid!=-1) freeSockWithLock(transPResponseSocketPool, mid, sd); - return 0; }