#include "mlookup.h"
#include "llookup.h"
#include "threadnotify.h"
+#include "prefetch.h"
+#include <sched.h>
#ifdef COMPILER
#include "thread.h"
#endif
+#include "gCollect.h"
+#include "readstruct.h"
#define BACKLOG 10 //max pending connections
#define RECEIVE_BUFFER_SIZE 2048
sockPoolHashTable_t *transPResponseSocketPool;
-
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
pthread_mutex_init(&lockObjHeader,NULL);
- if (mhashCreate(HASH_SIZE, LOADFACTOR))
+ if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
return 1; //failure
if (lhashCreate(HASH_SIZE, LOADFACTOR))
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
+ struct readstruct readbuffer;
+ readbuffer.head=0;
+ readbuffer.tail=0;
/* Receive control messages from other machines */
while(1) {
- int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
if (ret==0)
break;
if (ret==-1) {
switch(control) {
case READ_REQUEST:
/* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
- if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
- break;
+ recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
+ while((srcObj = mhashSearch(oid)) == NULL) {
+ int ret;
+ if((ret = sched_yield()) != 0) {
+ printf("%s(): error no %d in thread yield\n", __func__, errno);
+ }
}
h = (objheader_t *) srcObj;
GETSIZE(size, h);
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
- if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+ if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
break;
case TRANS_PREFETCH:
- if((val = prefetchReq((int)acceptfd)) != 0) {
+#ifdef RANGEPREFETCH
+ if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
+ printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
+#else
+ if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
break;
}
+#endif
break;
case TRANS_PREFETCH_RESPONSE:
- if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+#ifdef RANGEPREFETCH
+ if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
+ printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
+ break;
+ }
+#else
+ if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
break;
}
+#endif
break;
case START_REMOTE_THREAD:
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
objType = getObjType(oid);
startDSMthread(oid, objType);
break;
case THREAD_NOTIFY_REQUEST:
- recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
pthread_exit(NULL);
}
- recv_data((int)acceptfd, buffer, size);
+ recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
oidarry = calloc(numoid, sizeof(unsigned int));
memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
threadid = *((unsigned int *)(buffer+size));
processReqNotify(numoid, oidarry, versionarry, mid, threadid);
free(buffer);
-
break;
case THREAD_NOTIFY_RESPONSE:
pthread_exit(NULL);
}
- recv_data((int)acceptfd, buffer, size);
+ recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
/* This function reads the information available in a transaction request
* and makes a function call to process the request */
-int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
+int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
char *ptr;
void *modptr;
unsigned int *oidmod, oid;
size = sizeof(fixed) - 1;
ptr = (char *)&fixed;;
fixed.control = TRANS_REQUEST;
- recv_data((int)acceptfd, ptr+1, size);
+ recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
/* Read list of mids */
int mcount = fixed.mcount;
size = mcount * sizeof(unsigned int);
unsigned int listmid[mcount];
ptr = (char *) listmid;
- recv_data((int)acceptfd, ptr, size);
+ recv_data_buf((int)acceptfd, readbuffer, ptr, size);
/* Read oid and version tuples for those objects that are not modified in the transaction */
int numread = fixed.numread;
char objread[size];
if(numread != 0) { //If pile contains more than one object to be read,
// keep reading all objects
- recv_data((int)acceptfd, objread, size);
+ recv_data_buf((int)acceptfd, readbuffer, objread, size);
}
/* Read modified objects */
return 1;
}
size = fixed.sum_bytes;
- recv_data((int)acceptfd, modptr, size);
+ recv_data_buf((int)acceptfd, readbuffer, modptr, size);
}
/* Create an array of oids for modified objects */
}
/*Process the information read */
- if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
+ if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
/* Free resources */
if(oidmod != NULL) {
* function and sends a reply to the co-ordinator.
* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
- unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
+ unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
+
char control, sendctrl, retval;
objheader_t *tmp_header;
void *header;
printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
return 1;
}
- recv_data((int)acceptfd, &control, sizeof(char));
+
+ recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
/* Process the new control message */
switch(control) {
case TRANS_ABORT:
//TODO Use fixed.trans_id TID since Client may have died
break;
}
+
/* Free memory */
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
if (transinfo->objnotfound != NULL) {
free(transinfo->objnotfound);
}
+
return 0;
}
&v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
} else { //Objs modified
if(i == fixed->numread) {
- oidlocked[objlocked] = -1;
- objlocked++;
+ oidlocked[objlocked++] = -1;
}
int tmpsize;
headptr = (objheader_t *) ptr;
*numBytes += size;
/* Send TRANS_DISAGREE to Coordinator */
*control = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
//Keep track of oid locked
- oidlocked[*objlocked] = OID(((objheader_t *)mobj));
- (*objlocked)++;
+ oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
} else { //we are locked
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
size += sizeof(objheader_t);
*numBytes += size;
*control = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
}
}
(*v_matchnolock)++;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
- oidvernotmatch[*objvernotmatch] = oid;
- (*objvernotmatch)++;
+ oidvernotmatch[(*objvernotmatch)++] = oid;
int size;
GETSIZE(size, mobj);
size += sizeof(objheader_t);
*numBytes += size;
/* Send TRANS_DISAGREE to Coordinator */
*control = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
//Keep track of oid locked
- oidlocked[*objlocked] = OID(((objheader_t *)mobj));
- (*objlocked)++;
+ oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
} else { /* Some other transaction has aquired a write lock on this object */
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
size += sizeof(objheader_t);
*numBytes += size;
*control = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
}
}
}
transinfo->modptr = modptr;
transinfo->numlocked = *(objlocked);
transinfo->numnotfound = *(objnotfound);
-
return control;
}
return 1;
}
GETSIZE(tmpsize,header);
- memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+
+ {
+ struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+ struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
+ dst->type=src->type;
+ dst->___cachedCode___=src->___cachedCode___;
+ dst->___cachedHash___=src->___cachedHash___;
+ memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+ }
header->version += 1;
/* If threads are waiting on this object to be updated, notify them */
if(header->notifylist != NULL) {
* If objects are not found then record those and if objects are found
* then use offset values to prefetch references to other objects */
-int prefetchReq(int acceptfd) {
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
int i, size, objsize, numoffset = 0;
int length;
char *recvbuffer, control;
objheader_t *header;
oidmidpair_t oidmid;
int sd = -1;
-
while(1) {
- recv_data((int)acceptfd, &numoffset, sizeof(int));
+ recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
if(numoffset == -1)
break;
- recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
oid = oidmid.oid;
if (mid != oidmid.mid) {
if (mid!=-1) {
sd = getSockWithLock(transPResponseSocketPool, mid);
}
short offsetarry[numoffset];
- recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
+ recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found in buffer for later use */
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- char sendbuffer[size];
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+sizeof(char))) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+ send_data(sd, sendbuffer, size+1);
} else { /* Object Found */
- int incr = 0;
+ int incr = 1;
GETSIZE(objsize, header);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- char sendbuffer[size];
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(int);
*((char *)(sendbuffer + incr)) = OBJECT_FOUND;
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
/* Calculate the oid corresponding to the offset value */
for(i = 0 ; i< numoffset ; i++) {
/* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
+ if(TYPE(header) >= NUMCLASSES) {
int elementsize = classsize[TYPE(header)];
struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
unsigned short length = ao->___length___;
if((header = mhashSearch(oid)) == NULL) {
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- char sendbuffer[size];
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+1)) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
break;
} else { /* Obj Found */
- int incr = 0;
+ int incr = 1;
GETSIZE(objsize, header);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- char sendbuffer[size];
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(int);
*((char *)(sendbuffer + incr)) = OBJECT_FOUND;
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
}
- }
+ } //end of for
}
- }
- //Release socket
+ } //end of while
+ //Release socket
if (mid!=-1)
freeSockWithLock(transPResponseSocketPool, mid, sd);
-
return 0;
}