From 1b1739855b2131672e6f2fceed1a4cc384de4845 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Mon, 5 May 2008 07:28:51 +0000 Subject: [PATCH] alternate fix for fd's.... issue is that ==0 indicates socket is closed...but we need to close our end, not return... --- .../src/Runtime/DSTM/interface/dstmserver.c | 271 +++++++++--------- 1 file changed, 128 insertions(+), 143 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index a5e2016b..d28eb808 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -16,7 +16,6 @@ extern int classsize[]; extern int numHostsInSystem; extern pthread_mutex_t notifymutex; -extern unsigned int myIpAddr; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; @@ -30,92 +29,89 @@ sockPoolHashTable_t *transPResponseSocketPool; int dstmInit(void) { - mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); - /* Initialize attribute for mutex */ - pthread_mutexattr_init(&mainobjstore_mutex_attr); - pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); - pthread_mutex_init(&lockObjHeader,NULL); - if (mhashCreate(HASH_SIZE, LOADFACTOR)) - return 1; //failure - - if (lhashCreate(HASH_SIZE, LOADFACTOR)) - return 1; //failure - - if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) - return 1; //failure - - //Initialize socket pool - if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { - printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); - return 0; - } - return 0; + mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); + /* Initialize attribute for mutex */ + pthread_mutexattr_init(&mainobjstore_mutex_attr); + pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); + pthread_mutex_init(&lockObjHeader,NULL); + if (mhashCreate(HASH_SIZE, LOADFACTOR)) + return 1; //failure + + if (lhashCreate(HASH_SIZE, LOADFACTOR)) + return 1; //failure + + if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) + return 1; //failure + + //Initialize socket pool + if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } + + return 0; } /* This function starts the thread to listen on a socket * for tranaction calls */ void *dstmListen() { - int listenfd, acceptfd; - struct sockaddr_in my_addr; - struct sockaddr_in client_addr; - socklen_t addrlength = sizeof(struct sockaddr); - pthread_t thread_dstm_accept; - int i; - int setsockflag=1; - - listenfd = socket(AF_INET, SOCK_STREAM, 0); - if (listenfd == -1) - { - perror("socket"); - exit(1); - } + int listenfd, acceptfd; + struct sockaddr_in my_addr; + struct sockaddr_in client_addr; + socklen_t addrlength = sizeof(struct sockaddr); + pthread_t thread_dstm_accept; + int i; + int setsockflag=1; + + listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenfd == -1) + { + perror("socket"); + exit(1); + } - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #ifdef MAC - if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { - perror("socket"); - exit(1); - } + if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) { + perror("socket"); + exit(1); + } #endif - my_addr.sin_family = AF_INET; - my_addr.sin_port = htons(LISTEN_PORT); - my_addr.sin_addr.s_addr = INADDR_ANY; - memset(&(my_addr.sin_zero), '\0', 8); - - if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) - { - perror("bind"); - exit(1); - } + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(LISTEN_PORT); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); - if (listen(listenfd, BACKLOG) == -1) - { - perror("listen"); - exit(1); - } + if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) + { + perror("bind"); + exit(1); + } + + if (listen(listenfd, BACKLOG) == -1) + { + perror("listen"); + exit(1); + } - printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); - while(1) - { - int retval; - int flag=1; - acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); - if (acceptfd < 0) { - perror("Error in accept: "); - printf("error %x", acceptfd); fflush(stdout); - } - setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); - do { - retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); - } while(retval!=0); - pthread_detach(thread_dstm_accept); - } + printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd); + while(1) + { + int retval; + int flag=1; + acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); + do { + retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); + } while(retval!=0); + pthread_detach(thread_dstm_accept); + } } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ @@ -130,52 +126,50 @@ void *dstmAccept(void *acceptfd) { trans_commit_data_t transinfo; unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - + /* Receive control messages from other machines */ - if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { - printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__); - pthread_exit(NULL); - } - - switch(control) { + while(1) { + int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + if (ret==0) + break; + if (ret==-1) { + printf("DEBUG -> RECV Error!.. retrying\n"); + break; + } + switch(control) { case READ_REQUEST: - do { - /* Read oid requested and search if available */ - recv_data((int)acceptfd, &oid, sizeof(unsigned int)); - if((srcObj = mhashSearch(oid)) == NULL) { - printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); - break; - } - h = (objheader_t *) srcObj; - GETSIZE(size, h); - size += sizeof(objheader_t); - sockid = (int) acceptfd; - - if (h == NULL) { - ctrl = OBJECT_NOT_FOUND; - send_data(sockid, &ctrl, sizeof(char)); - } else { - /* Type */ - char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; - *((int *)&msg[1])=size; - send_data(sockid, &msg, sizeof(msg)); - send_data(sockid, h, size); - } - if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { - pthread_exit(NULL); - } - } while(control == READ_REQUEST); + /* Read oid requested and search if available */ + recv_data((int)acceptfd, &oid, sizeof(unsigned int)); + if((srcObj = mhashSearch(oid)) == NULL) { + printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__); + break; + } + h = (objheader_t *) srcObj; + GETSIZE(size, h); + size += sizeof(objheader_t); + sockid = (int) acceptfd; + + if (h == NULL) { + ctrl = OBJECT_NOT_FOUND; + send_data(sockid, &ctrl, sizeof(char)); + } else { + /* Type */ + char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; + *((int *)&msg[1])=size; + send_data(sockid, &msg, sizeof(msg)); + send_data(sockid, h, size); + } break; - + case READ_MULT_REQUEST: break; - + case MOVE_REQUEST: break; - + case MOVE_MULT_REQUEST: break; - + case TRANS_REQUEST: /* Read transaction request */ transinfo.objlocked = NULL; @@ -184,48 +178,38 @@ void *dstmAccept(void *acceptfd) { transinfo.numlocked = 0; transinfo.numnotfound = 0; if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { - printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); - break; + printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); } break; case TRANS_PREFETCH: - do { - if((val = prefetchReq((int)acceptfd)) != 0) { - printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); - break; - } - if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { - pthread_exit(NULL); - } - } while (control == TRANS_PREFETCH); + if((val = prefetchReq((int)acceptfd)) != 0) { + printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__); + break; + } break; case TRANS_PREFETCH_RESPONSE: - do { - if((val = getPrefetchResponse((int) acceptfd)) != 0) { - printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); - break; - } - if((retval = recv_data_errorcode((int)acceptfd, &control, sizeof(char))) <= 0) { - pthread_exit(NULL); - } - } while (control == TRANS_PREFETCH_RESPONSE); + if((val = getPrefetchResponse((int) acceptfd)) != 0) { + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + break; + } break; case START_REMOTE_THREAD: recv_data((int)acceptfd, &oid, sizeof(unsigned int)); objType = getObjType(oid); startDSMthread(oid, objType); break; - + case THREAD_NOTIFY_REQUEST: recv_data((int)acceptfd, &numoid, sizeof(unsigned int)); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oidarry = calloc(numoid, sizeof(unsigned int)); memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); size = sizeof(unsigned int) * numoid; @@ -237,18 +221,18 @@ void *dstmAccept(void *acceptfd) { threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); free(buffer); - + break; case THREAD_NOTIFY_RESPONSE: size = sizeof(unsigned short) + 2 * sizeof(unsigned int); if((buffer = calloc(1,size)) == NULL) { - printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); - pthread_exit(NULL); + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - + recv_data((int)acceptfd, buffer, size); - + oid = *((unsigned int *)buffer); size = sizeof(unsigned int); version = *((unsigned short *)(buffer+size)); @@ -263,9 +247,10 @@ void *dstmAccept(void *acceptfd) { default: printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); + } } -closeconnection: + closeconnection: /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); -- 2.34.1