From: adash Date: Fri, 21 Mar 2008 21:20:16 +0000 (+0000) Subject: Added send_data() and recv_data() methods for send() and recv() X-Git-Tag: preEdgeChange~221 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=b6e00731883c92ffe4c126b9e14877ca76320ef6;p=IRC.git Added send_data() and recv_data() methods for send() and recv() --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 09684f9c..6c893cfc 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -5,13 +5,16 @@ #define MSG_NOSIGNAL 0 #endif +/*********************************************************** + * Macros + **********************************************************/ #define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) #define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) - - -//Coordinator Messages +/***************************************** + * Coordinator Messages + ***************************************/ #define READ_REQUEST 1 #define READ_MULT_REQUEST 2 #define MOVE_REQUEST 3 @@ -22,7 +25,9 @@ #define TRANS_PREFETCH 8 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9 -//Participant Messages +/********************************* + * Participant Messages + *******************************/ #define OBJECT_FOUND 10 #define OBJECT_NOT_FOUND 11 #define OBJECTS_FOUND 12 @@ -38,13 +43,13 @@ #define THREAD_NOTIFY_RESPONSE 25 #define TRANS_UNSUCESSFUL 26 -//Control bits for status of objects in Machine pile -#define OBJ_LOCKED_BUT_VERSION_MATCH 14 -#define OBJ_UNLOCK_BUT_VERSION_MATCH 15 -#define VERSION_NO_MATCH 16 - //Max number of objects #define MAX_OBJECTS 20 +//Max remote-machine connections +#define NUM_MACHINES 2 +#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB +//Transaction id per machine +#define TID_LEN 20 #include @@ -57,8 +62,6 @@ #include "threadnotify.h" -#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB -#define TID_LEN 20 //bit designations for status field of objheader #define DIRTY 0x01 #define NEW 0x02 @@ -127,6 +130,7 @@ typedef struct transrecord { struct ___Object___ * revertlist; #endif } transrecord_t; + // Structure is a shared structure that keeps track of responses from the participants typedef struct thread_response { char rcv_status; @@ -188,12 +192,14 @@ typedef struct local_thread_data_array { //Structure to store mid and socketid information typedef struct midSocketInfo { - unsigned int mid; + unsigned int mid; /* To communicate with mid use sockid in this data structure*/ int sockid; } midSocketInfo_t; /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); +void send_data(int fd, void *buf, int buflen); +void recv_data(int fd, void *buf, int buflen); /* Prototypes for object header */ unsigned int getNewOID(void); @@ -229,13 +235,13 @@ void mapObjMethod(unsigned short); void randomdelay(); transrecord_t *transStart(); objheader_t *transRead(transrecord_t *, unsigned int); -objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid +objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header int transCommit(transrecord_t *record); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants -void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); -void *handleLocalReq(void *); +void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);// returns object header from main object store after object is copied into it from remote machine +void *handleLocalReq(void *);//handles Local requests int transComProcess(local_thread_data_array_t *); int transAbortProcess(local_thread_data_array_t *); void transAbort(transrecord_t *trans); @@ -244,8 +250,8 @@ void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); void checkPrefetchTuples(prefetchqelem_t *); -prefetchpile_t *foundLocal(prefetchqelem_t *); -prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); +prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets) +prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets) void checkPreCache(prefetchqelem_t *, int *, unsigned int, int); int transPrefetchProcess(transrecord_t *, int **, short); void sendPrefetchReq(prefetchpile_t*, int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index e76ab19f..672a955c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -27,6 +27,14 @@ extern int classsize[]; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ +/********************************************************** + * Global variables to map socketid and remote mid + * to resuse sockets + **************************************************/ +midSocketInfo_t sockArray[NUM_MACHINES]; +int sockCount; //number of connections with all remote machines(one socket per mc) +int sockIdFound; //track if socket file descriptor is already established +pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -47,6 +55,14 @@ int dstmInit(void) if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) return 1; //failure + //Initialize mid to socketid mapping array + int t; + sockCount = 0; + for(t = 0; t < NUM_MACHINES; t++) { + sockArray[t].mid = 0; + sockArray[t].sockid = 0; + } + return 0; } @@ -112,7 +128,7 @@ void *dstmListen() * and accordingly calls other functions to process new requests */ void *dstmAccept(void *acceptfd) { - int val, retval, size, sum; + int val, retval, size, sum, sockid; unsigned int oid; char *buffer; char control,ctrl; @@ -130,18 +146,12 @@ void *dstmAccept(void *acceptfd) transinfo.numnotfound = 0; /* Receive control messages from other machines */ - if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { - printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__); - pthread_exit(NULL); - } - + recv_data((int)acceptfd, &control, sizeof(char)); + switch(control) { case READ_REQUEST: /* Read oid requested and search if available */ - if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { - perror("Error: receiving 0x0 object from cooridnator\n"); - pthread_exit(NULL); - } + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); if((srcObj = mhashSearch(oid)) == NULL) { printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); pthread_exit(NULL); @@ -149,25 +159,17 @@ void *dstmAccept(void *acceptfd) h = (objheader_t *) srcObj; GETSIZE(size, h); size += sizeof(objheader_t); + sockid = (int) acceptfd; if (h == NULL) { ctrl = OBJECT_NOT_FOUND; - if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending control msg to coordinator\n"); - pthread_exit(NULL); - } + send_data(sockid, &ctrl, sizeof(char)); } else { /* Type */ char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; *((int *)&msg[1])=size; - if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { - perror("Error sending size of object to coordinator\n"); - pthread_exit(NULL); - } - if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { - perror("Error in sending object\n"); - pthread_exit(NULL); - } + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); } break; @@ -191,51 +193,34 @@ void *dstmAccept(void *acceptfd) do { if((val = prefetchReq((int)acceptfd)) != 0) { printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } - - if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) < 0) { - printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__); - pthread_exit(NULL); - } else if(retval == 0) { - printf("%s() Error: socket closed at the requesting side\n"); - pthread_exit(NULL); + break; } - + recv_data((int)acceptfd, &control, sizeof(char)); } while (control == TRANS_PREFETCH); - break; case TRANS_PREFETCH_RESPONSE: - if((val = getPrefetchResponse((int) acceptfd)) != 0) { - printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - pthread_exit(NULL); - } + //do { + if((val = getPrefetchResponse((int) acceptfd)) != 0) { + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + //} while (control == TRANS_PREFETCH_RESPONSE); break; case START_REMOTE_THREAD: - retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); - if (retval <= 0) - perror("dstmAccept(): error receiving START_REMOTE_THREAD msg"); - else if (retval != sizeof(unsigned int)) - printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n", - retval, __FILE__, __LINE__); - else - { - objType = getObjType(oid); - startDSMthread(oid, objType); - } + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + objType = getObjType(oid); + startDSMthread(oid, objType); break; case THREAD_NOTIFY_REQUEST: - retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0); + recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); pthread_exit(NULL); } - sum = 0; - do { - sum += recv((int)acceptfd, buffer+sum, size-sum, 0); - } while(sum < size); + + recv_data((int)acceptfd, buffer, size); oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); @@ -258,10 +243,7 @@ void *dstmAccept(void *acceptfd) pthread_exit(NULL); } - sum = 0; - do { - sum += recv((int)acceptfd, buffer+sum, size-sum, 0); - } while(sum < size); + recv_data((int)acceptfd, buffer, size); oid = *((unsigned int *)buffer); size = sizeof(unsigned int); @@ -290,41 +272,30 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { unsigned int *oidmod, oid; fixed_data_t fixed; objheader_t *headaddr; - int sum = 0, i, N, n, val; + int sum, i, size, n, val; oidmod = NULL; /* Read fixed_data_t data structure */ - N = sizeof(fixed) - 1; + size = sizeof(fixed) - 1; ptr = (char *)&fixed;; fixed.control = TRANS_REQUEST; - do { - n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0); - sum += n; - } while(sum < N && n != 0); + recv_data((int)acceptfd, ptr+1, size); /* Read list of mids */ int mcount = fixed.mcount; - N = mcount * sizeof(unsigned int); + size = mcount * sizeof(unsigned int); unsigned int listmid[mcount]; ptr = (char *) listmid; - sum = 0; - do { - n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0); - sum += n; - } while(sum < N && n != 0); - + recv_data((int)acceptfd, ptr, size); + /* Read oid and version tuples for those objects that are not modified in the transaction */ int numread = fixed.numread; - N = numread * (sizeof(unsigned int) + sizeof(unsigned short)); - char objread[N]; + size = numread * (sizeof(unsigned int) + sizeof(unsigned short)); + char objread[size]; if(numread != 0) { //If pile contains more than one object to be read, // keep reading all objects - sum = 0; - do { - n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0); - sum += n; - } while(sum < N && n != 0); + recv_data((int)acceptfd, objread, size); } /* Read modified objects */ @@ -333,11 +304,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); return 1; } - sum = 0; - do { // Recv the objs that are modified by the Coordinator - n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0); - sum += n; - } while (sum < fixed.sum_bytes && n != 0); + size = fixed.sum_bytes; + recv_data((int)acceptfd, modptr, size); } /* Create an array of oids for modified objects */ @@ -391,10 +359,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, return 1; } - do { - retval = recv((int)acceptfd, &control, sizeof(char), 0); - } while(retval < sizeof(char)); - + recv_data((int)acceptfd, &control, sizeof(char)); + /* Process the new control message */ switch(control) { case TRANS_ABORT: @@ -411,17 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, /* Send ack to Coordinator */ sendctrl = TRANS_UNSUCESSFUL; - if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error: In sending ACK to coordinator\n"); - if (transinfo->objlocked != NULL) { - free(transinfo->objlocked); - } - if (transinfo->objnotfound != NULL) { - free(transinfo->objnotfound); - } - - return 1; - } + send_data((int)acceptfd, &sendctrl, sizeof(char)); break; case TRANS_COMMIT: @@ -522,10 +478,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } free(oidlocked); } - if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending control to the Coordinator\n"); - return 0; - } + send_data(acceptfd, &control, sizeof(char)); return control; } } else {/* If Obj is not locked then lock object */ @@ -550,11 +503,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } /* Send TRANS_DISAGREE to Coordinator */ - if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending control to the Coordinator\n"); - return 0; - } - + send_data(acceptfd, &control, sizeof(char)); return control; } } @@ -583,34 +532,22 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int if(*(v_matchnolock) == fixed->numread + fixed->nummod) { control = TRANS_AGREE; /* Send control message */ - if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending control to Coordinator\n"); - return 0; - } + send_data(acceptfd, &control, sizeof(char)); } /* Condition to send TRANS_SOFT_ABORT */ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { control = TRANS_SOFT_ABORT; /* Send control message */ - if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending TRANS_SOFT_ABORT control\n"); - return 0; - } - + send_data(acceptfd, &control, sizeof(char)); + /* Send number of oids not found and the missing oids if objects are missing in the machine */ if(*(objnotfound) != 0) { int msg[1]; msg[0] = *(objnotfound); - if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) { - perror("Error in sending objects that are not found\n"); - return 0; - } + send_data(acceptfd, &msg, sizeof(int)); int size = sizeof(unsigned int)* *(objnotfound); - if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { - perror("Error in sending objects that are not found\n"); - return 0; - } + send_data(acceptfd, oidnotfound, size); } } @@ -668,11 +605,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock /* Send ack to coordinator */ control = TRANS_SUCESSFUL; - if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); - return 1; - } - + send_data((int)acceptfd, &control, sizeof(char)); return 0; } @@ -683,27 +616,22 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock int prefetchReq(int acceptfd) { int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0; - int length, sd; + int length, sd = -1; char *recvbuffer, *sendbuffer, control; unsigned int oid, mid; short *offsetarry; objheader_t *header; struct sockaddr_in remoteAddr; - while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) { - if(length == -1) { //-1 is special character to represent end of sending oids and offsets - break; - } else { - numbytes = 0; + do { + recv_data((int)acceptfd, &length, sizeof(int)); + if(length != -1) { size = length - sizeof(int); if((recvbuffer = calloc(1, size)) == NULL) { printf("Calloc error at %s,%d\n", __FILE__, __LINE__); return -1; } - while(numbytes < size) { - numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0); - } - + recv_data((int)acceptfd, recvbuffer, size); oid = *((unsigned int *) recvbuffer); mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int))); size = size - (2 * sizeof(unsigned int)); @@ -715,23 +643,54 @@ int prefetchReq(int acceptfd) { } memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size); free(recvbuffer); - - /* Create socket to send information */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("prefetchReq():socket()"); - return -1; +#if 0 + pthread_mutex_lock(&sockLock); + sockIdFound = 0; + pthread_mutex_unlock(&sockLock); + for(i = 0; i < NUM_MACHINES; i++) { + if(sockArray[i].mid == mid) { + sd = sockArray[i].sockid; + pthread_mutex_lock(&sockLock); + sockIdFound = 1; + pthread_mutex_unlock(&sockLock); + break; + } } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return -1; + + if(sockIdFound == 0) { + if(sockCount < NUM_MACHINES) { + +#endif + /* Create socket to send information */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("prefetchReq():socket()"); + return -1; + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); + return -1; + } + +#if 0 + sockArray[sockCount].mid = mid; + sockArray[sockCount].sockid = sd; + pthread_mutex_lock(&sockLock); + sockCount++; + pthread_mutex_unlock(&sockLock); + } else { + //TODO Fix for connecting to more than 2 machines && close socket + printf("%s(): Error: Currently works for only 2 machines\n", __func__); + return -1; + } } +#endif /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ @@ -781,6 +740,7 @@ int prefetchReq(int acceptfd) { close(sd); return -1; } + /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { /* Check for arrays */ @@ -852,29 +812,19 @@ int prefetchReq(int acceptfd) { } free(offsetarry); } + close(sd); } - } - close(sd); + } while (length != -1); return 0; } int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { int numbytes = 0; - if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__); - free(sendbuffer); - return -1; - } - + send_data(sd, control, sizeof(char)); /* Send the buffer with its size */ - if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) { - printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n", - __func__, __FILE__, __LINE__, numbytes, *(size)); - free(sendbuffer); - return -1; - } - + int length = *(size); + send_data(sd, sendbuffer, length); free(sendbuffer); return 0; } @@ -933,21 +883,8 @@ checkversion: *((unsigned short *)(&msg[1]+size)) = newversion; size += sizeof(unsigned short); *((unsigned int *)(&msg[1]+size)) = threadid; - bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0); - if (bytesSent < 0){ - perror("processReqNotify():send()"); - close(sd); - return; - } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){ - printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", - bytesSent, __FILE__, __LINE__); - close(sd); - return; - } else { - close(sd); - return; - } - + size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sd, msg, size); } close(sd); } @@ -961,4 +898,3 @@ checkversion: free(oidarry); free(versionarry); } - diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index e9d2d90b..485e7121 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -25,7 +25,6 @@ #define LISTEN_PORT 2156 #define NUM_THREADS 1 #define PREFETCH_CACHE_SIZE 1048576 //1MB -#define NUM_MACHINES 2 #define CONFIG_FILENAME "dstm.conf" /* Global Variables */ @@ -49,14 +48,52 @@ unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; -/* Global variables to track mapping of socketid and remote mid */ +/************************************************************ + * Global variables to map socketid and remote mid to + * reuse sockets + ***********************************************************/ midSocketInfo_t midSocketArray[NUM_MACHINES]; -int sockCount = 0; -int sockIdFound; +int sockCount; //number of connections with all remote machines(one socket per mc) +int sockIdFound; //track if socket file descriptor is already established void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); +/******************************* + * Send and Recv function calls + *******************************/ +void send_data(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = send(fd, buffer, size, MSG_NOSIGNAL); + if (numbytes == -1) { + perror("send"); + printf("error: at %s, %d\n", __FILE__, __LINE__); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } +} + +void recv_data(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = recv(fd, buffer, size, 0); + if (numbytes == -1) { + perror("recv"); + printf("error: at %s, %d\n", __FILE__, __LINE__); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } +} + void printhex(unsigned char *ptr, int numBytes) { int i; @@ -208,6 +245,7 @@ void transInit() { pthread_detach(tPrefetch); //Initialize mid to socketid mapping array + sockCount = 0; for(t = 0; t < NUM_MACHINES; t++) { midSocketArray[t].mid = 0; midSocketArray[t].sockid = 0; @@ -685,31 +723,18 @@ void *transRequest(void *threadarg) { } /* Send bytes of data with TRANS_REQUEST control message */ - if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { - perror("Error sending fixed bytes for thread\n"); - close(sd); - pthread_exit(NULL); - } - + send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t)); + /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*tdata->buffer->f.mcount; - if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { - perror("Error sending list of machines for thread\n"); - close(sd); - pthread_exit(NULL); - } + send_data(sd, tdata->buffer->listmid, size); } /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread; - - if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { - perror("Error sending tuples for thread\n"); - close(sd); - pthread_exit(NULL); - } + send_data(sd, tdata->buffer->objread, size); } /* Send objects that are modified */ @@ -718,20 +743,11 @@ void *transRequest(void *threadarg) { headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); GETSIZE(size,headeraddr); size+=sizeof(objheader_t); - if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { - perror("Error sending obj modified for thread\n"); - close(sd); - pthread_exit(NULL); - } + send_data(sd, headeraddr, size); } /* Read control message from Participant */ - if((n = read(sd, &control, sizeof(char))) <= 0) { - perror("Error in reading control message from Participant\n"); - close(sd); - pthread_exit(NULL); - } - + recv_data(sd, &control, sizeof(char)); recvcontrol = control; /* Update common data structure and increment count */ @@ -760,9 +776,7 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } - do { - retval = recv((int)sd, &control, sizeof(char), 0); - } while (retval < sizeof(char)); + recv_data((int)sd, &control, sizeof(char)); if(control == TRANS_UNSUCESSFUL) { //printf("DEBUG-> TRANS_ABORTED\n"); @@ -830,32 +844,28 @@ void decideResponse(thread_data_array_t *tdata) { * It returns a char that is only needed to check the correctness of execution of this function inside * transRequest()*/ char sendResponse(thread_data_array_t *tdata, int sd) { - int n, N, sum, oidcount = 0, control; + int n, size, sum, oidcount = 0, control; char *ptr, retval = 0; unsigned int *oidnotfound; control = *(tdata->replyctrl); - if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 0; - } + send_data(sd, &control, sizeof(char)); - //FIXME read missing objects + //TODO read missing objects to be used during object migration /* If the decided response is due to a soft abort and missing objects at the Participant's side */ /* if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { // Read list of objects missing - if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { - N = oidcount * sizeof(unsigned int); + recv_data(sd, &oidcount, sizeof(int)); + //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { + if(oidcount != 0) { + size = oidcount * sizeof(unsigned int); if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 0; } ptr = (char *) oidnotfound; - do { - n = read(sd, ptr+sum, N-sum); - sum += n; - } while(sum < N && n !=0); + recv_data(sd, ptr, size); } retval = TRANS_SOFT_ABORT; } @@ -905,31 +915,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; - if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { - perror("getRemoteObj(): error sending message\n"); - return NULL; - } + send_data(sd, readrequest, sizeof(readrequest)); /* Read response from the Participant */ - if((val = read(sd, &control, sizeof(char))) <= 0) { - printf("getRemoteObj(): error no response, %d\n", val); - return NULL; - } + recv_data(sd, &control, sizeof(char)); switch(control) { case OBJECT_NOT_FOUND: return NULL; case OBJECT_FOUND: /* Read object if found into local cache */ - if((val = read(sd, &size, sizeof(int))) <= 0) { - perror("getRemoteObj(): error in reading size\n"); - return NULL; - } + recv_data(sd, &size, sizeof(int)); objcopy = objstrAlloc(record->cache, size); - int sum = 0; - while (sum < size) { - sum += read(sd, (char *)objcopy+sum, size-sum); - } + recv_data(sd, objcopy, size); + /* Insert into cache's lookup table */ chashInsert(record->lookupTable, oid, objcopy); break; @@ -1599,10 +1598,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { /* Send TRANS_PREFETCH control message */ control = TRANS_PREFETCH; - if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("sendPrefetchReq() Sending TRANS_PREFETCH"); - return; - } + send_data(sd, &control, sizeof(char)); /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; @@ -1622,21 +1618,14 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { *((short*)(oidnoffset + off)) = tmp->offset[i]; off+=sizeof(short); } - if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) { - perror("Sending oids and offsets"); - return; - } - + send_data(sd, oidnoffset, len); tmp = tmp->next; } /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ endpair = -1; - if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) { - perror("Error sending endpair\n"); - return; - } - + send_data(sd, &endpair, sizeof(int)); + return; } @@ -1646,67 +1635,61 @@ int getPrefetchResponse(int sd) { unsigned int oid; void *modptr, *oldptr; - if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) { - printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__); + recv_data((int)sd, &length, sizeof(int)); + size = length - sizeof(int); + if((recvbuffer = calloc(1, size)) == NULL) { + printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); return -1; - } else { + } + recv_data((int)sd, recvbuffer, size); + + control = *((char *) recvbuffer); + if(control == OBJECT_FOUND) { numbytes = 0; - size = length - sizeof(int); - if((recvbuffer = calloc(1, size)) == NULL) { - printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + size = size - (sizeof(char) + sizeof(unsigned int)); + pthread_mutex_lock(&prefetchcache_mutex); + if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + free(recvbuffer); return -1; } - while(numbytes < size) { - numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0); - } - - control = *((char *) recvbuffer); - if(control == OBJECT_FOUND) { - numbytes = 0; - oid = *((unsigned int *)(recvbuffer + sizeof(char))); - size = size - (sizeof(char) + sizeof(unsigned int)); - pthread_mutex_lock(&prefetchcache_mutex); - if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { - printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&prefetchcache_mutex); - free(recvbuffer); - return -1; - } - pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); - - /* Insert the oid and its address into the prefetch hash lookup table */ - /* Do a version comparison if the oid exists */ - if((oldptr = prehashSearch(oid)) != NULL) { - /* If older version then update with new object ptr */ - if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { - prehashRemove(oid); - prehashInsert(oid, modptr); - } else { - /* TODO modptr should be reference counted */ - } - } else {/* Else add the object ptr to hash table*/ + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + + /* Insert the oid and its address into the prefetch hash lookup table */ + /* Do a version comparison if the oid exists */ + if((oldptr = prehashSearch(oid)) != NULL) { + /* If older version then update with new object ptr */ + if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { + prehashRemove(oid); prehashInsert(oid, modptr); + } else { + /* TODO modptr should be reference counted */ } - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.cond); - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); - } else if(control == OBJECT_NOT_FOUND) { - oid = *((unsigned int *)(recvbuffer + sizeof(char))); - /* TODO: For each object not found query DHT for new location and retrieve the object */ - /* Throw an error */ - printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); - free(recvbuffer); - exit(-1); - } else { - printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); + } else {/* Else add the object ptr to hash table*/ + prehashInsert(oid, modptr); } + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.cond); + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + } else if(control == OBJECT_NOT_FOUND) { + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + /* TODO: For each object not found query DHT for new location and retrieve the object */ + /* Throw an error */ + printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); free(recvbuffer); + exit(-1); + } else { + printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); } + free(recvbuffer); + return 0; } @@ -1762,22 +1745,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid) { msg[0] = START_REMOTE_THREAD; memcpy(&msg[1], &oid, sizeof(unsigned int)); - - bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0); - if (bytesSent < 0) - { - perror("startRemoteThread():send()"); - status = -1; - } - else if (bytesSent != 1 + sizeof(unsigned int)) - { - printf("startRemoteThread(): error, sent %d bytes\n", bytesSent); - status = -1; - } - else - { - status = 0; - } + send_data(sock, msg, 1 + sizeof(unsigned int)); } close(sock); @@ -1989,17 +1957,8 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n *((unsigned int *)(&msg[1] + size)) = threadid; pthread_mutex_lock(&(ndata->threadnotify)); - bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0); - if (bytesSent < 0){ - perror("reqNotify():send()"); - status = -1; - } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){ - printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__); - status = -1; - } else { - status = 0; - } - + size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int); + send_data(sock, msg, size); pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); pthread_mutex_unlock(&(ndata->threadnotify)); } @@ -2080,17 +2039,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { size+= sizeof(unsigned short); *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; - bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0); - if (bytesSent < 0){ - perror("notifyAll():send()"); - status = -1; - } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){ - printf("notifyAll(): error, sent %d bytes %s, %d\n", - bytesSent, __FILE__, __LINE__); - status = -1; - } else { - status = 0; - } + size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sock, msg, size); } //close socket close(sock);