1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2 * Participant => Machines that host the objects involved in a transaction commit */
4 #include <netinet/tcp.h>
9 #include "threadnotify.h"
23 #define BACKLOG 10 //max pending connections
24 #define RECEIVE_BUFFER_SIZE 2048
26 extern int classsize[];
27 extern int numHostsInSystem;
28 extern pthread_mutex_t notifymutex;
30 extern unsigned int myIpAddr;
31 extern unsigned int *hostIpAddrs;
34 extern unsigned int *locateObjHosts;
35 extern int *liveHosts;
36 extern int numLiveHostsInSystem;
37 int clearNotifyListFlag;
40 objstr_t *mainobjstore;
41 pthread_mutex_t mainobjstore_mutex;
42 pthread_mutex_t lockObjHeader;
43 pthread_mutex_t clearNotifyList_mutex;
44 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
46 sockPoolHashTable_t *transPResponseSocketPool;
47 extern sockPoolHashTable_t *transRequestSockPool;
48 extern sockPoolHashTable_t *transReadSockPool;
50 int failFlag = 0; //debug
53 /******************************
54 * Global variables for Paxos
55 ******************************/
57 extern unsigned int v_a;
61 extern int paxosRound;
62 /* This function initializes the main objects store and creates the
63 * global machine and location lookup table */
67 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
68 /* Initialize attribute for mutex */
69 pthread_mutexattr_init(&mainobjstore_mutex_attr);
70 pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
71 pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
72 pthread_mutex_init(&lockObjHeader,NULL);
75 pthread_mutex_init(&liveHosts_mutex, NULL);
76 pthread_mutex_init(&leaderFixing_mutex, NULL);
77 pthread_mutex_init(&clearNotifyList_mutex,NULL);
80 if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
83 if (lhashCreate(HASH_SIZE, LOADFACTOR))
87 if (thashCreate(THASH_SIZE, LOADFACTOR))
91 if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
94 //Initialize socket pool
95 if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
96 printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
103 int startlistening() {
105 struct sockaddr_in my_addr;
106 socklen_t addrlength = sizeof(struct sockaddr);
109 listenfd = socket(AF_INET, SOCK_STREAM, 0);
110 if (listenfd == -1) {
115 if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
120 if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
126 my_addr.sin_family = AF_INET;
127 my_addr.sin_port = htons(LISTEN_PORT);
128 my_addr.sin_addr.s_addr = INADDR_ANY;
129 memset(&(my_addr.sin_zero), '\0', 8);
131 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
136 if (listen(listenfd, BACKLOG) == -1) {
143 /* This function starts the thread to listen on a socket
144 * for tranaction calls */
145 void *dstmListen(void *lfd) {
146 int listenfd=(int)lfd;
148 struct sockaddr_in client_addr;
149 socklen_t addrlength = sizeof(struct sockaddr);
150 pthread_t thread_dstm_accept;
154 pthread_t thread_dstm_asking;
157 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
162 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
167 retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
170 pthread_detach(thread_dstm_asking);
174 printf("%s -> fd accepted\n",__func__);
177 setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
179 retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
181 pthread_detach(thread_dstm_accept);
188 unsigned int deadMachineIndex = -1;
194 printf("%s -> Entering\n",__func__);
197 socklist = (int*) calloc(numHostsInSystem,sizeof(int));
199 for(i = 0; i< numHostsInSystem;i++) { // for 1
200 if((sd = getSockWithLock(transPResponseSocketPool,hostIpAddrs[i])) < 0) {
201 printf("%s -> Cannot create socket connection to [%s]\n",__func__,midtoIPString(hostIpAddrs[i]));
211 deadMachineIndex = checkIfAnyMachineDead(socklist);
213 // free socket of dead machine
214 if(deadMachineIndex >= 0) { // if 2
216 printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex]));
218 restoreDuplicationState(hostIpAddrs[deadMachineIndex]);
219 freeSockWithLock(transPResponseSocketPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]);
220 socklist[deadMachineIndex] = -1;
224 printf("%s -> Exiting\n",__func__);
229 unsigned int checkIfAnyMachineDead(int* socklist)
233 char control = RESPOND_LIVE;
236 printf("%s -> Entering\n",__func__);
240 for(i = 0; i< numHostsInSystem;i++) {
242 printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]);
244 if(socklist[i] > 0) {
245 send_data(socklist[i], &control,sizeof(char));
247 if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
248 // if machine is dead, returns index of socket
250 printf("%s -> Machine dead detecteed\n",__func__);
256 if(response != LIVE) {
258 printf("%s -> Machine dead detected\n",__func__);
263 }// end if(socklist[i]
266 clearDeadThreadsNotification();
268 sleep(numLiveHostsInSystem); // wait for seconds for next checking
274 /* This function accepts a new connection request, decodes the control message in the connection
275 * and accordingly calls other functions to process new requests */
276 void *dstmAccept(void *acceptfd) {
277 int val, retval, size, sum, sockid, sd = 0;
280 char control,ctrl, response;
286 trans_commit_data_t transinfo;
287 unsigned short objType, *versionarry, version;
288 unsigned int *oidarry, numoid, mid, threadid;
290 unsigned int transIDreceived;
292 struct sockaddr_in remoteAddr;
295 printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
297 /* Receive control messages from other machines */
299 int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
306 printf("DEBUG -> RECV Error!.. retrying\n");
312 printf("%s-> dstmAccept control = %d\n", __func__, (int)control);
317 printf("control -> READ_REQUEST\n");
319 /* Read oid requested and search if available */
320 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
321 while((srcObj = mhashSearch(oid)) == NULL) {
323 // printf("HERE!!\n");
324 if((ret = sched_yield()) != 0) {
325 printf("%s(): error no %d in thread yield\n", __func__, errno);
328 h = (objheader_t *) srcObj;
330 size += sizeof(objheader_t);
331 sockid = (int) acceptfd;
333 ctrl = OBJECT_NOT_FOUND;
334 send_data(sockid, &ctrl, sizeof(char));
337 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
338 *((int *)&msg[1])=size;
339 send_data(sockid, &msg, sizeof(msg));
340 send_data(sockid, h, size);
344 case READ_MULT_REQUEST:
350 case MOVE_MULT_REQUEST:
355 printf("control -> TRANS_REQUEST\n");
357 /* Read transaction request */
358 transinfo.objlocked = NULL;
359 transinfo.objnotfound = NULL;
360 transinfo.modptr = NULL;
361 transinfo.numlocked = 0;
362 transinfo.numnotfound = 0;
363 if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
364 printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
371 recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int));
373 decision = checkDecision(transIDreceived);
375 send_data((int)acceptfd,&decision,sizeof(char));
381 printf("control -> TRANS_PREFETCH\n");
384 if((val = rangePrefetchReq((int)acceptfd)) != 0) {
385 printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
389 if((val = prefetchReq((int)acceptfd)) != 0) {
390 printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
396 case TRANS_PREFETCH_RESPONSE:
398 printf("control -> TRANS_PREFETCH_RESPONSE\n");
401 if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
402 printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
406 if((val = getPrefetchResponse((int) acceptfd)) != 0) {
407 printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
413 case START_REMOTE_THREAD:
415 printf("control -> START_REMOTE_THREAD\n");
417 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
418 objType = getObjType(oid);
419 startDSMthread(oid, objType);
422 case THREAD_NOTIFY_REQUEST:
424 printf("control -> THREAD_NOTIFY_REQUEST FD : %d\n",acceptfd);
427 recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
429 size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
430 if((buffer = calloc(1,size)) == NULL) {
431 printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
435 recv_data((int)acceptfd, buffer, size);
437 oidarry = calloc(numoid, sizeof(unsigned int));
438 memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
439 size = sizeof(unsigned int) * numoid;
440 versionarry = calloc(numoid, sizeof(unsigned short));
441 memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
442 size += sizeof(unsigned short) * numoid;
443 mid = *((unsigned int *)(buffer+size));
444 size += sizeof(unsigned int);
445 threadid = *((unsigned int *)(buffer+size));
446 processReqNotify(numoid, oidarry, versionarry, mid, threadid);
451 case THREAD_NOTIFY_RESPONSE:
453 printf("control -> THREAD_NOTIFY_RESPONSE\n");
455 size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
456 if((buffer = calloc(1,size)) == NULL) {
457 printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
461 recv_data((int)acceptfd, buffer, size);
464 oid = *((unsigned int *)buffer);
465 size = sizeof(unsigned int);
466 version = *((unsigned short *)(buffer+size));
467 size += sizeof(unsigned short);
468 threadid = *((unsigned int *)(buffer+size));
469 threadNotify(oid,version,threadid);
473 case CLEAR_NOTIFY_LIST:
475 printf("control -> CLEAR_NOTIFY_LIST\n");
477 size = sizeof(unsigned int);
478 if((buffer = calloc(1,size)) == NULL) {
479 printf("%s() Caclloc error at CLEAR_NOTIFY_LIST\n");
483 recv_data((int)acceptfd,buffer, size);
485 oid = *((unsigned int *)buffer);
487 pthread_mutex_lock(&clearNotifyList_mutex);
488 if(clearNotifyListFlag == 0) {
489 clearNotifyListFlag = 1;
490 pthread_mutex_unlock(&clearNotifyList_mutex);
491 clearNotifyList(oid);
494 pthread_mutex_unlock(&clearNotifyList_mutex);
499 case CLOSE_CONNECTION:
501 printf("control -> CLOSE_CONNECTION\n");
503 goto closeconnection;
508 printf("control -> RESPOND_LIVE\n");
511 send_data((int)acceptfd, &ctrl, sizeof(ctrl));
513 printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__);
518 case REMOTE_RESTORE_DUPLICATED_STATE:
520 printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n");
522 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
523 if(!liveHosts[findHost(mid)]) {
525 printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__);
529 pthread_mutex_lock(&leaderFixing_mutex);
532 pthread_mutex_unlock(&leaderFixing_mutex);
535 duplicateLostObjects(mid);
536 if(updateLiveHostsCommit() != 0) {
537 printf("error updateLiveHostsCommit()\n");
542 pthread_mutex_lock(&leaderFixing_mutex);
544 pthread_mutex_unlock(&leaderFixing_mutex);
547 pthread_mutex_unlock(&leaderFixing_mutex);
549 printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
556 case UPDATE_LIVE_HOSTS:
558 printf("control -> UPDATE_LIVE_HOSTS\n");
561 pthread_mutex_lock(&liveHosts_mutex);
562 recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
563 recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
564 pthread_mutex_unlock(&liveHosts_mutex);
565 numLiveHostsInSystem = getNumLiveHostsInSystem();
568 printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);
575 case DUPLICATE_ORIGINAL:
577 printf("control -> DUPLICATE_ORIGINAL\n");
578 printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);
580 //object store stuffffff
581 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
582 tempsize = mhashGetDuplicate(&dupeptr, 0);
584 //send control and dupes after
585 ctrl = RECEIVE_DUPES;
587 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
588 perror("ORIGINAL : ");
592 bzero(&remoteAddr, sizeof(remoteAddr));
593 remoteAddr.sin_family = AF_INET;
594 remoteAddr.sin_port = htons(LISTEN_PORT);
595 remoteAddr.sin_addr.s_addr = htonl(mid);
597 if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
598 printf("ORIGINAL ERROR : %s\n",strerror(errno));
602 send_data(sd, &ctrl, sizeof(char));
603 send_data(sd, dupeptr, tempsize);
605 recv_data(sd, &response, sizeof(char));
607 printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
609 if(response != DUPLICATION_COMPLETE) {
611 printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
621 ctrl = DUPLICATION_COMPLETE;
622 send_data((int)acceptfd, &ctrl, sizeof(char));
624 printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);
628 case DUPLICATE_BACKUP:
630 printf("control -> DUPLICATE_BACKUP\n");
631 printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
633 //object store stuffffff
634 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
637 tempsize = mhashGetDuplicate(&dupeptr, 1);
639 //send control and dupes after
640 ctrl = RECEIVE_DUPES;
642 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
647 bzero(&remoteAddr, sizeof(remoteAddr));
648 remoteAddr.sin_family = AF_INET;
649 remoteAddr.sin_port = htons(LISTEN_PORT);
650 remoteAddr.sin_addr.s_addr = htonl(mid);
652 if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
653 printf("BACKUP ERROR : %s\n",strerror(errno));
657 send_data(sd, &ctrl, sizeof(char));
658 send_data(sd, dupeptr, tempsize);
660 recv_data(sd, &response, sizeof(char));
662 printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
664 if(response != DUPLICATION_COMPLETE) {
666 printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
676 ctrl = DUPLICATION_COMPLETE;
677 send_data((int)acceptfd, &ctrl, sizeof(char));
679 printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__);
686 printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
688 if((readDuplicateObjs((int)acceptfd)) != 0) {
689 printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
693 ctrl = DUPLICATION_COMPLETE;
694 send_data((int)acceptfd, &ctrl, sizeof(char));
696 printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
703 printf("control -> PAXOS_PREPARE\n");
705 recv_data((int)acceptfd, &val, sizeof(int));
707 control = PAXOS_PREPARE_REJECT;
708 send_data((int)acceptfd, &control, sizeof(char));
712 control = PAXOS_PREPARE_OK;
714 send_data((int)acceptfd, &control, sizeof(char));
715 send_data((int)acceptfd, &n_a, sizeof(int));
716 send_data((int)acceptfd, &v_a, sizeof(int));
722 printf("control -> PAXOS_ACCEPT\n");
724 recv_data((int)acceptfd, &n, sizeof(int));
725 recv_data((int)acceptfd, &v, sizeof(int));
727 control = PAXOS_ACCEPT_REJECT;
728 send_data((int)acceptfd, &control, sizeof(char));
734 control = PAXOS_ACCEPT_OK;
735 send_data((int)acceptfd, &control, sizeof(char));
741 printf("control -> PAXOS_LEARN\n");
743 recv_data((int)acceptfd, &v, sizeof(int));
747 printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader));
753 printf("control -> DELETE_LEADER\n");
759 printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
763 printf("%s-> Exiting\n", __func__); fflush(stdout);
766 /* Close connection */
767 if (close((int)acceptfd) == -1)
772 int readDuplicateObjs(int acceptfd) {
773 int numoid, i, size, tmpsize;
775 void *dupeptr, *ptrcreate, *ptr;
779 printf("%s-> Start\n", __func__);
781 recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
782 recv_data((int)acceptfd, &size, sizeof(int));
783 // do i need array of oids?
784 // answer: no! now get to work
786 if ((dupeptr = calloc(1, size)) == NULL) {
787 printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
791 recv_data((int)acceptfd, dupeptr, size);
793 for(i = 0; i < numoid; i++) {
794 header = (objheader_t *)ptr;
796 GETSIZE(tmpsize, header);
797 tmpsize += sizeof(objheader_t);
800 printf("%s-> oid being received/backed:%u, version:%d, type:%d\n", __func__, oid, header->version, TYPE(header));
801 printf("STATUSPTR(header):%u, STATUS:%d\n", STATUSPTR(header), STATUS(header));
804 if(mhashSearch(oid) != NULL) {
806 printf("%s -> oid : %d is already in there\n",__func__,oid);
809 if(header->notifylist != NULL) {
810 unsigned int *listSize = (ptr + tmpsize);
811 tmpsize += sizeof(unsigned int);
812 tmpsize += sizeof(threadlist_t) * (*listSize);
816 pthread_mutex_lock(&mainobjstore_mutex);
817 if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
818 printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
819 pthread_mutex_unlock(&mainobjstore_mutex);
822 pthread_mutex_unlock(&mainobjstore_mutex);
823 memcpy(ptrcreate, header, tmpsize);
825 objheader_t* oPtr = (objheader_t*)ptrcreate;
827 if(oPtr->notifylist != NULL) {
828 oPtr->notifylist = NULL; // reset for new list
829 threadlist_t *listNode;
830 unsigned int* listSize = (ptr + tmpsize); // get number of notifylist
833 tmpsize += sizeof(unsigned int); // skip number of notifylist
834 listNode = (threadlist_t*)(ptr + tmpsize); // get first element of address
835 for(j = 0; j< *listSize; j++) { // retreive all threadlist
836 oPtr->notifylist = insNode(oPtr->notifylist,listNode[j].threadid,listNode[j].mid);
839 tmpsize += sizeof(threadlist_t) * (*listSize);
842 mhashInsert(oid, ptrcreate);
847 printf("%s-> End\n", __func__);
854 printf("%s-> No objects duplicated\n", __func__);
860 /* This function reads the information available in a transaction request
861 * and makes a function call to process the request */
862 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
865 unsigned int *oidmod, oid;
867 objheader_t *headaddr;
868 int sum, i, size, n, val;
873 printf("%s-> Entering\n", __func__);
876 /* Read fixed_data_t data structure */
877 size = sizeof(fixed) - 1;
878 ptr = (char *)&fixed;
879 fixed.control = TRANS_REQUEST;
880 timeout = recv_data((int)acceptfd, ptr+1, size);
882 /* Read list of mids */
883 int mcount = fixed.mcount;
884 size = mcount * sizeof(unsigned int);
885 unsigned int listmid[mcount];
886 ptr = (char *) listmid;
887 timeout = recv_data((int)acceptfd, ptr, size);
889 if(timeout < 0) // coordinator failed
892 /* Read oid and version tuples for those objects that are not modified in the transaction */
893 int numread = fixed.numread;
894 size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
896 if(numread != 0) { //If pile contains more than one object to be read,
897 // keep reading all objects
898 timeout = recv_data((int)acceptfd, objread, size);
901 /* Read modified objects */
902 if(fixed.nummod != 0) {
903 if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
904 printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
907 size = fixed.sum_bytes;
908 timeout = recv_data((int)acceptfd, modptr, size);
911 if(timeout < 0) // coordinator failed
914 /* Create an array of oids for modified objects */
915 oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
916 if (oidmod == NULL) {
917 printf("calloc error %s, %d\n", __FILE__, __LINE__);
920 ptr = (char *) modptr;
921 for(i = 0 ; i < fixed.nummod; i++) {
923 headaddr = (objheader_t *) ptr;
926 GETSIZE(tmpsize, headaddr);
927 ptr += sizeof(objheader_t) + tmpsize;
930 printf("%s-> num oid read = %d, oids modified = %d, size = %d\n", __func__, fixed.numread, fixed.nummod, size);
932 /*Process the information read */
933 if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
934 printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
947 printf("%s-> Exiting\n", __func__);
953 /* This function processes the Coordinator's transaction request using "handleTransReq"
954 * function and sends a reply to the co-ordinator.
955 * Following this it also receives a new control message from the co-ordinator and processes this message*/
956 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
957 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
959 char control, sendctrl, retval;
961 objheader_t *tmp_header;
964 unsigned int transID;
966 printf("%s-> Entering\n", __func__);
969 /* receives transaction id */
970 recv_data((int)acceptfd, &transID, sizeof(unsigned int));
972 /* Send reply to the Coordinator */
973 if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
974 printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
975 printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__);
979 int timeout = recv_data((int)acceptfd, &control, sizeof(char));
980 /* Process the new control message */
982 printf("%s -> timeout = %d control = %d\n",__func__,timeout,control);
986 if(timeout < 0) { // timeout. failed to receiving data from coordinator
988 printf("%s -> timeout!! assumes coordinator is dead\n",__func__);
990 control = receiveDecisionFromBackup(transID,fixed->mcount,listmid);
992 printf("%s -> received Decision %d\n",__func__,control);
996 /* insert received control into thash for another transaction*/
997 thashInsert(transID, control);
1001 if (fixed->nummod > 0)
1003 /* Unlock objects that was locked due to this transaction */
1004 int useWriteUnlock = 0;
1005 for(i = 0; i< transinfo->numlocked; i++) {
1006 if(transinfo->objlocked[i] == -1) {
1010 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
1011 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
1012 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1015 if(useWriteUnlock) {
1016 write_unlock(STATUSPTR(header));
1018 read_unlock(STATUSPTR(header));
1024 /* Invoke the transCommit process() */
1025 if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
1026 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
1028 if (transinfo->objlocked != NULL) {
1029 free(transinfo->objlocked);
1031 if (transinfo->objnotfound != NULL) {
1032 free(transinfo->objnotfound);
1034 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1041 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
1045 printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
1046 //TODO Use fixed.trans_id TID since Client may have died
1051 if (transinfo->objlocked != NULL) {
1052 free(transinfo->objlocked);
1054 if (transinfo->objnotfound != NULL) {
1055 free(transinfo->objnotfound);
1058 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1065 char checkDecision(unsigned int transID)
1068 printf("%s -> transID : %u\n",__func__,transID);
1071 char response = thashSearch(transID);
1080 /* This function increments counters while running a voting decision on all objects involved
1081 * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
1082 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
1084 unsigned short version;
1085 char control = 0, *ptr;
1087 unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
1088 objheader_t *headptr;
1090 /* Counters and arrays to formulate decision on control message to be sent */
1091 oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1092 oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
1093 oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1094 int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
1095 int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1097 /* modptr points to the beginning of the object store
1098 * created at the Pariticipant.
1099 * Object store holds the modified objects involved in the transaction request */
1100 ptr = (char *) modptr;
1102 /* Process each oid in the machine pile/ group per thread */
1103 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
1104 if (i < fixed->numread) { //Objs only read and not modified
1105 int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1107 oid = *((unsigned int *)(objread + incr));
1108 incr += sizeof(unsigned int);
1109 version = *((unsigned short *)(objread + incr));
1111 printf("%s -> oid : %u version : %d\n",__func__,oid,version);
1113 getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
1114 &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
1115 } else { //Objs modified
1116 if(i == fixed->numread) {
1117 oidlocked[objlocked++] = -1;
1120 headptr = (objheader_t *) ptr;
1122 version = headptr->version;
1123 GETSIZE(tmpsize, headptr);
1124 ptr += sizeof(objheader_t) + tmpsize;
1126 getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
1127 &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
1128 &numBytes, &control, oid, version);
1132 /* send TRANS_DISAGREE and objs*/
1135 char *objs = calloc(1, numBytes);
1137 for(j = 0; j<objvernotmatch; j++) {
1138 objheader_t *header = mhashSearch(oidvernotmatch[j]);
1140 GETSIZE(size, header);
1141 size += sizeof(objheader_t);
1142 memcpy(objs+offset, header, size);
1146 if (objlocked > 0) {
1147 int useWriteUnlock = 0;
1148 for(j = 0; j < objlocked; j++) {
1149 if(oidlocked[j] == -1) {
1153 if((headptr = mhashSearch(oidlocked[j])) == NULL) {
1154 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1157 if(useWriteUnlock) {
1158 write_unlock(STATUSPTR(headptr));
1160 read_unlock(STATUSPTR(headptr));
1168 printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
1171 send_data(acceptfd, &control, sizeof(char));
1173 send_data(acceptfd, &numBytes, sizeof(int));
1174 send_data(acceptfd, objs, numBytes);
1176 transinfo->objvernotmatch = oidvernotmatch;
1177 transinfo->numvernotmatch = objvernotmatch;
1179 free(transinfo->objvernotmatch);
1184 /* Decide what control message to send to Coordinator */
1185 if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
1186 modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
1187 printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
1193 /* Update Commit info for objects that are modified */
1194 void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
1195 unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
1196 int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
1197 char *control, unsigned int oid, unsigned short version) {
1199 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1200 //printf("version number: %d\n", version);
1205 printf("%s -> *backup object* oid:%u\n", __func__,oid);
1211 if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1213 printf("Obj not found: %s() oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1216 /* Save the oids not found and number of oids not found for later use */
1217 oidnotfound[*objnotfound] = oid;
1219 } else { /* If Obj found in machine (i.e. has not moved) */
1220 /* Check if Obj is locked by any previous transaction */
1221 if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1223 printf("****%s->Trying to acquire 'remote' writelock for oid:%d, version:%d\n", __func__, oid, version);
1224 printf("this version: %d, mlookup version: %d\n", version, ((objheader_t *)mobj)->version);
1226 if (version == ((objheader_t *)mobj)->version) { /* match versions */
1228 } else { /* If versions don't match ...HARD ABORT */
1230 oidvernotmatch[*objvernotmatch] = oid;
1231 (*objvernotmatch)++;
1233 GETSIZE(size, mobj);
1234 size += sizeof(objheader_t);
1236 /* Send TRANS_DISAGREE to Coordinator */
1237 *control = TRANS_DISAGREE;
1238 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1240 //Keep track of oid locked
1241 oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1242 } else { //we are locked
1243 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1245 } else { /* If versions don't match ...HARD ABORT */
1247 oidvernotmatch[*objvernotmatch] = oid;
1248 (*objvernotmatch)++;
1250 GETSIZE(size, mobj);
1251 size += sizeof(objheader_t);
1253 *control = TRANS_DISAGREE;
1254 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1259 printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1263 /* Update Commit info for objects that are read */
1264 void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
1265 int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
1266 int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
1268 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1269 //printf("version number: %d\n", version);
1271 printf("%s -> Entering\n",__func__);
1276 printf("*backup object* oid:%u\n", oid);
1281 if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1283 printf("%s -> Obj not found!\n",__func__);
1284 printf("%s -> Obj not found: oid = %d, type = %d\t\n", __func__,OID(mobj), TYPE((objheader_t *)mobj));
1287 /* Save the oids not found and number of oids not found for later use */
1288 oidnotfound[*objnotfound] = oid;
1290 } else { /* If Obj found in machine (i.e. has not moved) */
1292 printf("%s -> Obj found!!\n",__func__);
1293 printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1297 /* Check if Obj is locked by any previous transaction */
1298 if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
1299 if (version == ((objheader_t *)mobj)->version) { /* match versions */
1301 } else { /* If versions don't match ...HARD ABORT */
1303 oidvernotmatch[(*objvernotmatch)++] = oid;
1305 GETSIZE(size, mobj);
1306 size += sizeof(objheader_t);
1309 /* Send TRANS_DISAGREE to Coordinator */
1310 *control = TRANS_DISAGREE;
1313 //Keep track of oid locked
1314 oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1315 } else { /* Some other transaction has aquired a write lock on this object */
1316 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1318 } else { /* If versions don't match ...HARD ABORT */
1320 oidvernotmatch[*objvernotmatch] = oid;
1321 (*objvernotmatch)++;
1323 GETSIZE(size, mobj);
1324 size += sizeof(objheader_t);
1326 *control = TRANS_DISAGREE;
1331 printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1335 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
1336 * to send to Coordinator based on the votes of oids involved in the transaction */
1337 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
1338 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
1339 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
1343 /* Condition to send TRANS_AGREE */
1344 if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
1345 control = TRANS_AGREE;
1346 /* Send control message */
1348 printf("%s -> control = %s\n", __func__,"TRANS_AGREE");
1350 send_data(acceptfd, &control, sizeof(char));
1353 printf("%s -> finished sending control\n",__func__);
1356 /* Condition to send TRANS_SOFT_ABORT */
1357 else if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
1358 control = TRANS_SOFT_ABORT;
1360 printf("%s -> control = %s\n", __func__,"TRANS_SOFT_ABORT");
1362 /* Send control message */
1363 send_data(acceptfd, &control, sizeof(char));
1365 /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
1366 if(*(objnotfound) != 0) {
1368 msg[0] = *(objnotfound);
1369 send_data(acceptfd, &msg, sizeof(int));
1370 int size = sizeof(unsigned int)* *(objnotfound);
1371 send_data(acceptfd, oidnotfound, size);
1375 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1376 * if Participant receives a TRANS_COMMIT */
1377 transinfo->objlocked = oidlocked;
1378 transinfo->objnotfound = oidnotfound;
1379 transinfo->modptr = modptr;
1380 transinfo->numlocked = *(objlocked);
1381 transinfo->numnotfound = *(objnotfound);
1385 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
1386 * addresses in lookup table and also changes version number
1387 * Sends an ACK back to Coordinator */
1388 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
1389 objheader_t *header;
1390 objheader_t *newheader;
1391 int i = 0, offset = 0;
1396 printf("DEBUG-> Entering transCommitProcess, dstmserver.c\n");
1397 printf("nummod: %d, numlocked: %d\n", nummod, numlocked);
1400 /* Process each modified object saved in the mainobject store */
1401 for(i = 0; i < nummod; i++) {
1402 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1404 printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__);
1407 header = (objheader_t *)(modptr+offset);
1408 header->version += 1;
1409 header->isBackup = 1;
1411 printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1413 GETSIZE(tmpsize, header);
1414 tmpsize += sizeof(objheader_t);
1415 pthread_mutex_lock(&mainobjstore_mutex);
1416 if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1417 printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1418 pthread_mutex_unlock(&mainobjstore_mutex);
1421 pthread_mutex_unlock(&mainobjstore_mutex);
1422 /* Initialize read and write locks */
1423 initdsmlocks(STATUSPTR(header));
1424 memcpy(ptrcreate, header, tmpsize);
1425 mhashInsert(oidmod[i], ptrcreate);
1432 GETSIZE(tmpsize,header);
1435 struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1436 struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
1437 dst->type=src->type;
1438 dst->___cachedCode___=src->___cachedCode___;
1439 dst->___cachedHash___=src->___cachedHash___;
1440 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1442 header->version += 1;
1444 printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1446 /* If threads are waiting on this object to be updated, notify them */
1447 if(header->notifylist != NULL) {
1449 printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
1453 if(header->isBackup != 0)
1454 notifyAll(&header->notifylist, OID(header), header->version);
1456 clearNotifyList(OID(header));
1458 notifyAll(&header->notifylist, OID(header), header->version);
1462 offset += sizeof(objheader_t) + tmpsize;
1468 /* Unlock locked objects */
1469 int useWriteUnlock = 0;
1470 for(i = 0; i < numlocked; i++) {
1471 if(oidlocked[i] == -1) {
1475 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1476 printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1481 printf("header oid:%d, version:%d, useWriteUnlock:%d\n", OID(header), header->version, useWriteUnlock);
1483 if(useWriteUnlock) {
1484 write_unlock(STATUSPTR(header));
1486 read_unlock(STATUSPTR(header));
1489 //TODO Update location lookup table
1493 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
1494 * Looks for the objects to be prefetched in the main object store.
1495 * If objects are not found then record those and if objects are found
1496 * then use offset values to prefetch references to other objects */
1498 int prefetchReq(int acceptfd) {
1499 int i, size, objsize, numoffset = 0;
1501 char *recvbuffer, control;
1502 unsigned int oid, mid=-1;
1503 objheader_t *header;
1504 oidmidpair_t oidmid;
1508 recv_data((int)acceptfd, &numoffset, sizeof(int));
1511 recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
1513 if (mid != oidmid.mid) {
1515 freeSockWithLock(transPResponseSocketPool, mid, sd);
1518 sd = getSockWithLock(transPResponseSocketPool, mid);
1520 short offsetarry[numoffset];
1521 recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
1523 /*Process each oid */
1524 if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
1525 /* Save the oids not found in buffer for later use */
1526 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1527 char sendbuffer[size];
1528 *((int *) sendbuffer) = size;
1529 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1530 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1531 control = TRANS_PREFETCH_RESPONSE;
1532 sendPrefetchResponse(sd, &control, sendbuffer, &size);
1533 } else { /* Object Found */
1535 GETSIZE(objsize, header);
1536 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1537 char sendbuffer[size];
1538 *((int *)(sendbuffer + incr)) = size;
1539 incr += sizeof(int);
1540 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1541 incr += sizeof(char);
1542 *((unsigned int *)(sendbuffer+incr)) = oid;
1543 incr += sizeof(unsigned int);
1544 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1546 control = TRANS_PREFETCH_RESPONSE;
1547 sendPrefetchResponse(sd, &control, sendbuffer, &size);
1549 /* Calculate the oid corresponding to the offset value */
1550 for(i = 0 ; i< numoffset ; i++) {
1551 /* Check for arrays */
1552 if(TYPE(header) >= NUMCLASSES) {
1553 int elementsize = classsize[TYPE(header)];
1554 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1555 unsigned short length = ao->___length___;
1556 /* Check if array out of bounds */
1557 if(offsetarry[i]< 0 || offsetarry[i] >= length) {
1560 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
1562 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
1565 /* Don't continue if we hit a NULL pointer */
1569 if((header = mhashSearch(oid)) == NULL) {
1570 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1571 char sendbuffer[size];
1572 *((int *) sendbuffer) = size;
1573 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1574 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1576 control = TRANS_PREFETCH_RESPONSE;
1577 sendPrefetchResponse(sd, &control, sendbuffer, &size);
1579 } else { /* Obj Found */
1581 GETSIZE(objsize, header);
1582 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1583 char sendbuffer[size];
1584 *((int *)(sendbuffer + incr)) = size;
1585 incr += sizeof(int);
1586 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1587 incr += sizeof(char);
1588 *((unsigned int *)(sendbuffer+incr)) = oid;
1589 incr += sizeof(unsigned int);
1590 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1592 control = TRANS_PREFETCH_RESPONSE;
1593 sendPrefetchResponse(sd, &control, sendbuffer, &size);
1600 freeSockWithLock(transPResponseSocketPool, mid, sd);
1605 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
1607 printf("control = %d, file = %s, line = %d\n", (int)control, __FILE__, __LINE__);
1609 send_data(sd, control, sizeof(char));
1610 /* Send the buffer with its size */
1611 int length = *(size);
1612 send_data(sd, sendbuffer, length);
1615 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
1616 objheader_t *header;
1618 unsigned short newversion;
1619 char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
1621 struct sockaddr_in remoteAddr;
1627 oid = *(oidarry + i);
1628 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1629 printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1632 /* Check to see if versions are same */
1634 if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
1635 newversion = header->version;
1637 if(newversion == *(versionarry + i)) {
1638 //Add to the notify list
1639 if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
1640 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
1643 write_unlock(STATUSPTR(header));
1646 write_unlock(STATUSPTR(header));
1647 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1648 perror("processReqNotify():socket()");
1651 bzero(&remoteAddr, sizeof(remoteAddr));
1652 remoteAddr.sin_family = AF_INET;
1653 remoteAddr.sin_port = htons(LISTEN_PORT);
1654 remoteAddr.sin_addr.s_addr = htonl(mid);
1656 if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1657 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
1658 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1662 //Send Update notification
1663 msg[0] = THREAD_NOTIFY_RESPONSE;
1664 *((unsigned int *)&msg[1]) = oid;
1665 size = sizeof(unsigned int);
1666 *((unsigned short *)(&msg[1]+size)) = newversion;
1667 size += sizeof(unsigned short);
1668 *((unsigned int *)(&msg[1]+size)) = threadid;
1669 size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
1670 send_data(sd, msg, size);
1686 /* go through oid's notifylist and clear them */
1687 void clearNotifyList(unsigned int oid)
1690 printf("%s -> Entering\n",__func__);
1693 objheader_t* header;
1697 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1698 printf("%s -> mhashSearch returned NULL!!\n",__func__);
1701 if(header->notifylist != NULL) {
1702 t = header->notifylist;
1710 header->notifylist = NULL;
1713 pthread_mutex_lock(&clearNotifyList_mutex);
1714 clearNotifyListFlag = 0;
1715 pthread_mutex_unlock(&clearNotifyList_mutex);
1717 printf("%s -> finished\n",__func__);