modifies getTypeObj()
[IRC.git] / Robust / src / Runtime / DSTM / interface_recovery / dstmserver.c
/* Coordinator => Machine that initiates the transaction request call for committing a transaction
 * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <netinet/tcp.h>
5 #include <ip.h>
6 #include "dstm.h"
7 #include "mlookup.h"
8 #include "llookup.h"
9 #include "threadnotify.h"
10 #include "prefetch.h"
11 #include <sched.h>
12 #ifdef COMPILER
13 #include "thread.h"
14 #endif
15 #include "gCollect.h"
16
17 #ifdef RECOVERY
18 #include <unistd.h>
19 #include <signal.h>
20 #include "tlookup.h"
21 #endif
22
23 #define BACKLOG 10 //max pending connections
24 #define RECEIVE_BUFFER_SIZE 2048
25
26 extern int classsize[];
27 extern int numHostsInSystem;
28 extern pthread_mutex_t notifymutex;
29
30 extern unsigned int myIpAddr;
31 extern unsigned int *hostIpAddrs;
32
33 #ifdef RECOVERY
34 extern unsigned int *locateObjHosts;
35 extern int *liveHosts;
36 extern int numLiveHostsInSystem;
37 int clearNotifyListFlag;
38 #endif
39
40 objstr_t *mainobjstore;
41 pthread_mutex_t mainobjstore_mutex;
42 pthread_mutex_t lockObjHeader;
43 pthread_mutex_t clearNotifyList_mutex;
44 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
45
46 sockPoolHashTable_t *transPResponseSocketPool;
47 extern sockPoolHashTable_t *transRequestSockPool;
48 extern sockPoolHashTable_t *transReadSockPool;
49
50 int failFlag = 0; //debug
51
52 #ifdef RECOVERY
53 /******************************
54  * Global variables for Paxos
55  ******************************/
56 extern int n_a;
57 extern unsigned int v_a;
58 extern int n_h;
59 extern int my_n;
60 extern int leader;
61 extern int paxosRound;
62 /* This function initializes the main objects store and creates the
63  * global machine and location lookup table */
64 #endif
65
66 int dstmInit(void) {
67   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
68   /* Initialize attribute for mutex */
69   pthread_mutexattr_init(&mainobjstore_mutex_attr);
70   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
71   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
72   pthread_mutex_init(&lockObjHeader,NULL);
73
74 #ifdef RECOVERY
75         pthread_mutex_init(&liveHosts_mutex, NULL);
76         pthread_mutex_init(&leaderFixing_mutex, NULL);
77   pthread_mutex_init(&clearNotifyList_mutex,NULL);
78 #endif
79
80   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
81     return 1;             //failure
82
83   if (lhashCreate(HASH_SIZE, LOADFACTOR))
84     return 1;             //failure
85
86 #ifdef RECOVERY
87   if (thashCreate(THASH_SIZE, LOADFACTOR))
88     return 1;
89 #endif
90
91   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
92     return 1;             //failure
93
94   //Initialize socket pool
95   if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
96     printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
97     return 0;
98   }
99
100   return 0;
101 }
102
103 int startlistening() {
104   int listenfd;
105   struct sockaddr_in my_addr;
106   socklen_t addrlength = sizeof(struct sockaddr);
107   int setsockflag=1;
108
109   listenfd = socket(AF_INET, SOCK_STREAM, 0);
110   if (listenfd == -1) {
111     perror("socket");
112     exit(1);
113   }
114
115   if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
116     perror("socket");
117     exit(1);
118   }
119 #ifdef MAC
120   if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
121     perror("socket");
122     exit(1);
123   }
124 #endif
125
126   my_addr.sin_family = AF_INET;
127   my_addr.sin_port = htons(LISTEN_PORT);
128   my_addr.sin_addr.s_addr = INADDR_ANY;
129   memset(&(my_addr.sin_zero), '\0', 8);
130
131   if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
132     perror("bind");
133     exit(1);
134   }
135
136   if (listen(listenfd, BACKLOG) == -1) {
137     perror("listen");
138     exit(1);
139   }
140   return listenfd;
141 }
142
143 /* This function starts the thread to listen on a socket
144  * for tranaction calls */
145 void *dstmListen(void *lfd) {
146   int listenfd=(int)lfd;
147   int acceptfd;
148   struct sockaddr_in client_addr;
149   socklen_t addrlength = sizeof(struct sockaddr);
150   pthread_t thread_dstm_accept;
151
152 #ifdef RECOVERY
153   int firsttime = 1;
154   pthread_t thread_dstm_asking;
155 #endif
156 #ifdef DEBUG
157   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
158 #endif
159   while(1) {
160     int retval;
161     int flag=1;
162     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
163
164 #ifdef RECOVERY
165     if(firsttime) {
166       do {
167         retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
168       }while(retval!=0);
169       firsttime=0;
170       pthread_detach(thread_dstm_asking);
171     }
172 #endif
173 #ifdef debug
174     printf("%s -> fd accepted\n",__func__);
175 #endif
176
177     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
178     do {
179         retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
180     } while(retval!=0);
181     pthread_detach(thread_dstm_accept);
182   }
183 }
184
185 #ifdef RECOVERY
186 void* startAsking()
187 {
188   unsigned int deadMachineIndex = -1;
189   int i;
190   int validHost;
191   int *socklist;
192   int sd;
193 #ifdef DEBUG
194   printf("%s -> Entering\n",__func__);
195 #endif
196
197     socklist = (int*) calloc(numHostsInSystem,sizeof(int)); 
198
199     for(i = 0; i< numHostsInSystem;i++) { // for 1
200         if((sd = getSockWithLock(transPResponseSocketPool,hostIpAddrs[i])) < 0) {
201           printf("%s -> Cannot create socket connection to [%s]\n",__func__,midtoIPString(hostIpAddrs[i]));
202           socklist[i] = -1;
203         }
204         else { // else 1
205           socklist[i] = sd;
206         }   // end of else 1
207     }
208   
209     while(1) {
210
211      deadMachineIndex = checkIfAnyMachineDead(socklist);
212
213       // free socket of dead machine
214       if(deadMachineIndex >= 0) { // if 2
215 #ifdef DEBUG
216         printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex]));
217 #endif
218         restoreDuplicationState(hostIpAddrs[deadMachineIndex]);
219         freeSockWithLock(transPResponseSocketPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]);
220         socklist[deadMachineIndex] = -1;
221       } // end of if 2
222     } // end of while 1
223 #ifdef DEBUG
224    printf("%s -> Exiting\n",__func__);
225 #endif
226 }
227
228
229 unsigned int checkIfAnyMachineDead(int* socklist)
230 {
231   int timeout = 0;
232   int i;
233   char control = RESPOND_LIVE;
234   char response;
235 #ifdef DEBUG
236   printf("%s -> Entering\n",__func__);
237 #endif
238   
239   while(1){
240     for(i = 0; i< numHostsInSystem;i++) {
241 #ifdef DEBUG
242       printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]);
243 #endif
244       if(socklist[i] > 0) {
245         send_data(socklist[i], &control,sizeof(char));
246
247         if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
248           // if machine is dead, returns index of socket
249 #ifdef DEBUG
250           printf("%s -> Machine dead detecteed\n",__func__);
251 #endif
252           return i;
253         }
254         else {
255           // machine responded
256           if(response != LIVE) {
257 #ifdef DEBUG
258             printf("%s -> Machine dead detected\n",__func__);
259 #endif
260             return i;
261           }
262         } // end else
263       }// end if(socklist[i]
264     } // end for()
265
266     clearDeadThreadsNotification();
267
268     sleep(numLiveHostsInSystem);  // wait for seconds for next checking
269   } // end while(1)
270 }
271 #endif
272
273
274 /* This function accepts a new connection request, decodes the control message in the connection
275  * and accordingly calls other functions to process new requests */
276 void *dstmAccept(void *acceptfd) {
277   int val, retval, size, sum, sockid, sd = 0;
278   unsigned int oid;
279   char *buffer;
280         char control,ctrl, response;
281         char *ptr;
282         void *srcObj;
283         void *dupeptr;
284         int i, tempsize;
285         objheader_t *h;
286         trans_commit_data_t transinfo;
287   unsigned short objType, *versionarry, version;
288         unsigned int *oidarry, numoid, mid, threadid;
289   int n, v;
290   unsigned int transIDreceived;
291   char decision;
292   struct sockaddr_in remoteAddr;
293
294 #ifdef DEBUG
295         printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
296 #endif
297         /* Receive control messages from other machines */
298         while(1) {
299                 int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
300     dupeptr = NULL;
301
302                 if (ret==0)
303                         break;
304                 if (ret==-1) {
305 #ifdef DEBUG
306                         printf("DEBUG -> RECV Error!.. retrying\n");
307 #endif
308         //              exit(0);
309                         break;
310                 }
311 #ifdef DEBUG
312                 printf("%s-> dstmAccept control = %d\n", __func__, (int)control);
313 #endif
314                 switch(control) {
315                         case READ_REQUEST:
316 #ifdef DEBUG
317         printf("control -> READ_REQUEST\n");
318 #endif
319                                 /* Read oid requested and search if available */
320                                 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
321                                 while((srcObj = mhashSearch(oid)) == NULL) {
322                                         int ret;
323 //          printf("HERE!!\n");
324                                         if((ret = sched_yield()) != 0) {
325                                                 printf("%s(): error no %d in thread yield\n", __func__, errno);
326                                         }
327                                 }
328                                 h = (objheader_t *) srcObj;
329                                 GETSIZE(size, h);
330                                 size += sizeof(objheader_t);
331                                 sockid = (int) acceptfd;
332                                 if (h == NULL) {
333                                         ctrl = OBJECT_NOT_FOUND;
334                                         send_data(sockid, &ctrl, sizeof(char));
335                                 } else {
336                                         // Type
337                                         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
338                                         *((int *)&msg[1])=size;
339                                         send_data(sockid, &msg, sizeof(msg));
340                                         send_data(sockid, h, size);
341                                 }
342                                 break;
343
344                         case READ_MULT_REQUEST:
345                                 break;
346
347                         case MOVE_REQUEST:
348                                 break;
349
350                         case MOVE_MULT_REQUEST:
351                                 break;
352
353                         case TRANS_REQUEST:
354 #ifdef DEBUG
355         printf("control -> TRANS_REQUEST\n");
356 #endif
357                                 /* Read transaction request */
358                                 transinfo.objlocked = NULL;
359                                 transinfo.objnotfound = NULL;
360                                 transinfo.modptr = NULL;
361                                 transinfo.numlocked = 0;
362                                 transinfo.numnotfound = 0;
363                                 if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
364                                         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
365                                         pthread_exit(NULL);
366                                 }
367                                 break;
368 #ifdef RECOVERY
369       case ASK_COMMIT :
370
371         recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int));
372
373         decision = checkDecision(transIDreceived);
374
375         send_data((int)acceptfd,&decision,sizeof(char));
376
377         break;
378 #endif
379                         case TRANS_PREFETCH:
380 #ifdef DEBUG
381         printf("control -> TRANS_PREFETCH\n");
382 #endif
383 #ifdef RANGEPREFETCH
384                                 if((val = rangePrefetchReq((int)acceptfd)) != 0) {
385                                         printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
386                                         break;
387                                 }
388 #else
389                                 if((val = prefetchReq((int)acceptfd)) != 0) {
390                                         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
391                                         break;
392                                 }
393 #endif
394                                 break;
395
396                         case TRANS_PREFETCH_RESPONSE:
397 #ifdef DEBUG
398                 printf("control -> TRANS_PREFETCH_RESPONSE\n");
399 #endif
400 #ifdef RANGEPREFETCH
401                                 if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
402                                         printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
403                                         break;
404                                 }
405 #else
406                                 if((val = getPrefetchResponse((int) acceptfd)) != 0) {
407                                         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
408                                         break;
409                                 }
410 #endif
411                                 break;
412
413                         case START_REMOTE_THREAD:
414 #ifdef DEBUG
415         printf("control -> START_REMOTE_THREAD\n");
416 #endif
417                                 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
418                                 objType = getObjType(oid);
419                                 startDSMthread(oid, objType);
420                                 break;
421
422       case THREAD_NOTIFY_REQUEST:
423 #ifdef DEBUG
424         printf("control -> THREAD_NOTIFY_REQUEST FD : %d\n",acceptfd);
425 #endif
426         numoid = 0;
427                                 recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
428       
429                                 size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
430                                 if((buffer = calloc(1,size)) == NULL) {
431                                         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
432                                         pthread_exit(NULL);
433                                 }
434                 
435                                 recv_data((int)acceptfd, buffer, size);
436
437                                 oidarry = calloc(numoid, sizeof(unsigned int));
438                                 memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
439                                 size = sizeof(unsigned int) * numoid;
440                                 versionarry = calloc(numoid, sizeof(unsigned short));
441                                 memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
442                                 size += sizeof(unsigned short) * numoid;
443                                 mid = *((unsigned int *)(buffer+size));
444                                 size += sizeof(unsigned int);
445                                 threadid = *((unsigned int *)(buffer+size));
446                                 processReqNotify(numoid, oidarry, versionarry, mid, threadid);
447                                 free(buffer);
448
449                                 break;
450
451                         case THREAD_NOTIFY_RESPONSE:
452 #ifdef DEBUG
453         printf("control -> THREAD_NOTIFY_RESPONSE\n");
454 #endif
455                                 size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
456                                 if((buffer = calloc(1,size)) == NULL) {
457                                         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
458                                         pthread_exit(NULL);
459                                 }
460
461                                 recv_data((int)acceptfd, buffer, size);
462
463
464                                 oid = *((unsigned int *)buffer);
465                                 size = sizeof(unsigned int);
466                                 version = *((unsigned short *)(buffer+size));
467                                 size += sizeof(unsigned short);
468                                 threadid = *((unsigned int *)(buffer+size));
469                                 threadNotify(oid,version,threadid);
470                                 free(buffer);
471                                 break;
472 #ifdef RECOVERY
473       case CLEAR_NOTIFY_LIST:
474 #ifdef DEBUG
475         printf("control -> CLEAR_NOTIFY_LIST\n");
476 #endif  
477         size = sizeof(unsigned int);
478         if((buffer = calloc(1,size)) == NULL) {
479           printf("%s() Caclloc error at CLEAR_NOTIFY_LIST\n");
480           pthread_exit(NULL);
481         }
482
483         recv_data((int)acceptfd,buffer, size);
484
485         oid = *((unsigned int *)buffer);
486
487         pthread_mutex_lock(&clearNotifyList_mutex);
488         if(clearNotifyListFlag == 0) {
489           clearNotifyListFlag = 1;
490           pthread_mutex_unlock(&clearNotifyList_mutex);
491           clearNotifyList(oid);
492         }
493         else {
494           pthread_mutex_unlock(&clearNotifyList_mutex);
495         }
496         free(buffer);
497         break;
498 #endif
499                         case CLOSE_CONNECTION:
500 #ifdef DEBUG
501         printf("control -> CLOSE_CONNECTION\n");
502 #endif
503                                 goto closeconnection;
504
505 #ifdef RECOVERY
506                         case RESPOND_LIVE:
507 #ifdef DEBUG
508         printf("control -> RESPOND_LIVE\n");
509 #endif
510                                 ctrl = LIVE;
511                                 send_data((int)acceptfd, &ctrl, sizeof(ctrl));
512 #ifdef DEBUG
513                                 printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__);
514 #endif
515                                 break;
516 #endif
517 #ifdef RECOVERY
518                         case REMOTE_RESTORE_DUPLICATED_STATE:
519 #ifdef DEBUG
520         printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n");
521 #endif
522                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
523                                 if(!liveHosts[findHost(mid)]) {
524 #ifdef DEBUG
525           printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__);
526 #endif
527                                         break;
528         }
529                                 pthread_mutex_lock(&leaderFixing_mutex);
530                                 if(!leaderFixing) {
531                                         leaderFixing = 1;
532                                         pthread_mutex_unlock(&leaderFixing_mutex);
533                                         // begin fixing
534                                         updateLiveHosts();
535                                         duplicateLostObjects(mid);
536                                 if(updateLiveHostsCommit() != 0) {
537                                         printf("error updateLiveHostsCommit()\n");
538                                         exit(1);
539                                 }
540
541         // finish fixing
542                                 pthread_mutex_lock(&leaderFixing_mutex);
543                                 leaderFixing = 0;
544                                 pthread_mutex_unlock(&leaderFixing_mutex);
545                                 }
546                                 else {
547                                         pthread_mutex_unlock(&leaderFixing_mutex);
548 #ifdef DEBUG
549           printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
550 #endif
551           sleep(WAIT_TIME);
552                                 }
553                                 break;
554 #endif
555 #ifdef RECOVERY
556                         case UPDATE_LIVE_HOSTS:
557 #ifdef DEBUG
558         printf("control -> UPDATE_LIVE_HOSTS\n");
559 #endif
560                                 // copy back
561                                 pthread_mutex_lock(&liveHosts_mutex);
562                           recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
563                                 recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
564                                 pthread_mutex_unlock(&liveHosts_mutex);
565                                 numLiveHostsInSystem = getNumLiveHostsInSystem();
566 #ifdef DEBUG
567                                 printHostsStatus();
568                           printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);      
569 #endif
570                                 //exit(0);
571                                 break;
572 #endif
573
574 #ifdef RECOVERY
575                         case DUPLICATE_ORIGINAL:
576 #ifdef DEBUG
577         printf("control -> DUPLICATE_ORIGINAL\n");
578                                 printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);  
579 #endif
580                                 //object store stuffffff
581                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
582                                 tempsize = mhashGetDuplicate(&dupeptr, 0);
583
584                                 //send control and dupes after
585                                 ctrl = RECEIVE_DUPES;
586
587         if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
588           perror("ORIGINAL : ");
589           exit(0);
590         }
591
592         bzero(&remoteAddr, sizeof(remoteAddr));
593         remoteAddr.sin_family = AF_INET;
594         remoteAddr.sin_port = htons(LISTEN_PORT);
595         remoteAddr.sin_addr.s_addr = htonl(mid);
596
597         if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
598           printf("ORIGINAL ERROR : %s\n",strerror(errno));
599           exit(0);
600         }
601         else {
602                                 send_data(sd, &ctrl, sizeof(char));
603                                 send_data(sd, dupeptr, tempsize);
604
605                                 recv_data(sd, &response, sizeof(char));
606 #ifdef DEBUG
607           printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
608 #endif
609                                 if(response != DUPLICATION_COMPLETE) {
610 #ifdef DEBUG
611            printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
612 #endif
613                                   //fail message
614            exit(0);
615                                 }
616
617           close(sd);
618         }
619         free(dupeptr);
620
621         ctrl = DUPLICATION_COMPLETE;
622                                 send_data((int)acceptfd, &ctrl, sizeof(char));
623 #ifndef DEBUG
624                                 printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);       
625 #endif
626                                 break;
627
628                         case DUPLICATE_BACKUP:
629 #ifndef DEBUG
630         printf("control -> DUPLICATE_BACKUP\n");
631                                 printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
632 #endif
633                                 //object store stuffffff
634                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
635
636
637                                 tempsize = mhashGetDuplicate(&dupeptr, 1);
638
639                                 //send control and dupes after
640                                 ctrl = RECEIVE_DUPES;
641
642         if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
643           perror("BACKUP : ");
644           exit(0);
645         }
646
647          bzero(&remoteAddr, sizeof(remoteAddr));                                       
648          remoteAddr.sin_family = AF_INET;                                              
649          remoteAddr.sin_port = htons(LISTEN_PORT);                                     
650          remoteAddr.sin_addr.s_addr = htonl(mid);                                      
651                                                                                        
652          if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {       
653            printf("BACKUP ERROR : %s\n",strerror(errno));
654            exit(0);
655          }                                                                             
656          else {                                                                        
657           send_data(sd, &ctrl, sizeof(char));
658                                 send_data(sd, dupeptr, tempsize);
659           
660           recv_data(sd, &response, sizeof(char));
661 #ifdef DEBUG
662           printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
663 #endif
664                                 if(response != DUPLICATION_COMPLETE) {
665 #ifndef DEBUG
666             printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
667 #endif
668             exit(0);
669           }
670
671           close(sd);
672          }
673
674         free(dupeptr);
675
676                                 ctrl = DUPLICATION_COMPLETE;
677                                 send_data((int)acceptfd, &ctrl, sizeof(char));
678 #ifndef DEBUG
679                                 printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); 
680 #endif
681                                 
682                                 break;
683
684                         case RECEIVE_DUPES:
685 #ifndef DEBUG
686         printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
687 #endif
688                                 if((readDuplicateObjs((int)acceptfd)) != 0) {
689                                         printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
690                                         pthread_exit(NULL);
691                                 }
692
693                                 ctrl = DUPLICATION_COMPLETE;
694                                 send_data((int)acceptfd, &ctrl, sizeof(char));
695 #ifndef DEBUG
696         printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
697 #endif
698                                 break;
699 #endif
700 #ifdef RECOVERY
701                         case PAXOS_PREPARE:
702 #ifdef DEBUG
703         printf("control -> PAXOS_PREPARE\n");
704 #endif
705                                 recv_data((int)acceptfd, &val, sizeof(int));
706                                 if (val <= n_h) {
707                                         control = PAXOS_PREPARE_REJECT;
708                                         send_data((int)acceptfd, &control, sizeof(char));
709                                 }
710                                 else {
711                                         n_h = val;
712                                         control = PAXOS_PREPARE_OK;
713                     
714                                         send_data((int)acceptfd, &control, sizeof(char));
715                                         send_data((int)acceptfd, &n_a, sizeof(int));
716                                         send_data((int)acceptfd, &v_a, sizeof(int));
717                                 }
718                                 break;
719
720                         case PAXOS_ACCEPT:
721 #ifdef DEBUG
722         printf("control -> PAXOS_ACCEPT\n");
723 #endif
724                                 recv_data((int)acceptfd, &n, sizeof(int));
725                                 recv_data((int)acceptfd, &v, sizeof(int));
726                                 if (n < n_h) {
727                                         control = PAXOS_ACCEPT_REJECT;
728                                         send_data((int)acceptfd, &control, sizeof(char));
729                                 }
730                                 else {
731                                         n_a = n;
732                                         v_a = v;
733                                         n_h = n;
734                                         control = PAXOS_ACCEPT_OK;
735                                         send_data((int)acceptfd, &control, sizeof(char));
736                                 }
737                                 break;
738
739                         case PAXOS_LEARN:
740 #ifdef DEBUG
741         printf("control -> PAXOS_LEARN\n");
742 #endif
743                                 recv_data((int)acceptfd, &v, sizeof(int));
744                                 leader = v_a;
745                                 paxosRound++;
746 #ifdef DEBUG
747                                 printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader));
748 #endif
749                                 break;
750
751                         case DELETE_LEADER:
752 #ifdef DEBUG
753         printf("control -> DELETE_LEADER\n");
754 #endif
755                                 v_a = 0;
756                                 break;
757 #endif
758                         default:
759                                 printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
760                 }
761         }
762 #ifdef DEBUG
763         printf("%s-> Exiting\n", __func__); fflush(stdout);
764 #endif
765 closeconnection:
766         /* Close connection */
767         if (close((int)acceptfd) == -1)
768                 perror("close");
769         pthread_exit(NULL);
770 }
771
772 int readDuplicateObjs(int acceptfd) {
773         int numoid, i, size, tmpsize;
774         unsigned int oid;
775         void *dupeptr, *ptrcreate, *ptr;
776         objheader_t *header;
777
778 #ifdef DEBUG
779         printf("%s-> Start\n", __func__);
780 #endif
781         recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
782         recv_data((int)acceptfd, &size, sizeof(int));   
783         // do i need array of oids?
784         // answer: no! now get to work
785         if(numoid != 0) {
786                 if ((dupeptr = calloc(1, size)) == NULL) {
787                         printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
788                         return 1;
789                 }
790
791                 recv_data((int)acceptfd, dupeptr, size);
792                 ptr = dupeptr;
793                 for(i = 0; i < numoid; i++) {
794                         header = (objheader_t *)ptr;
795                         oid = OID(header);
796                         GETSIZE(tmpsize, header);
797                         tmpsize += sizeof(objheader_t);
798
799 #ifdef DEBUG
800                         printf("%s-> oid being received/backed:%u, version:%d, type:%d\n", __func__, oid, header->version, TYPE(header));
801                         printf("STATUSPTR(header):%u, STATUS:%d\n", STATUSPTR(header), STATUS(header));
802 #endif
803
804       if(mhashSearch(oid) != NULL) {
805 #ifdef DEBUG
806         printf("%s -> oid : %d is already in there\n",__func__,oid);
807 #endif
808
809         if(header->notifylist != NULL) {
810           unsigned int *listSize = (ptr + tmpsize);
811           tmpsize += sizeof(unsigned int);
812           tmpsize += sizeof(threadlist_t) * (*listSize);
813         }
814       }
815       else {
816                         pthread_mutex_lock(&mainobjstore_mutex);
817                         if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
818                                 printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
819                                 pthread_mutex_unlock(&mainobjstore_mutex);
820                                   return 1;
821                         }
822                         pthread_mutex_unlock(&mainobjstore_mutex);
823               memcpy(ptrcreate, header, tmpsize);
824
825         objheader_t* oPtr = (objheader_t*)ptrcreate;
826
827         if(oPtr->notifylist != NULL) {
828           oPtr->notifylist = NULL;  // reset for new list
829           threadlist_t *listNode;
830           unsigned int* listSize = (ptr + tmpsize);  // get number of notifylist
831           unsigned int j;
832
833           tmpsize += sizeof(unsigned int);   // skip number of notifylist 
834           listNode = (threadlist_t*)(ptr + tmpsize); // get first element of address
835           for(j = 0; j< *listSize; j++) {      // retreive all threadlist
836             oPtr->notifylist = insNode(oPtr->notifylist,listNode[j].threadid,listNode[j].mid);
837           
838           }
839           tmpsize += sizeof(threadlist_t) * (*listSize);
840
841         }
842                 mhashInsert(oid, ptrcreate);
843       }
844                 ptr += tmpsize;
845                 }
846 #ifdef DEBUG
847                 printf("%s-> End\n", __func__);
848 #endif
849     free(dupeptr);
850                 return 0;
851         }
852         else {
853 #ifdef DEBUG
854                 printf("%s-> No objects duplicated\n", __func__);
855 #endif
856                 return 0;
857         }
858 }
859
860 /* This function reads the information available in a transaction request
861  * and makes a function call to process the request */
862 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
863   char *ptr;
864   void *modptr;
865   unsigned int *oidmod, oid;
866   fixed_data_t fixed;
867   objheader_t *headaddr;
868   int sum, i, size, n, val;
869   int timeout;
870
871   oidmod = NULL;
872 #ifdef DEBUG
873         printf("%s-> Entering\n", __func__);
874 #endif
875
876   /* Read fixed_data_t data structure */
877   size = sizeof(fixed) - 1;
878   ptr = (char *)&fixed;
879   fixed.control = TRANS_REQUEST;
880   timeout = recv_data((int)acceptfd, ptr+1, size);
881
882   /* Read list of mids */
883   int mcount = fixed.mcount;
884   size = mcount * sizeof(unsigned int);
885   unsigned int listmid[mcount];
886   ptr = (char *) listmid;
887   timeout = recv_data((int)acceptfd, ptr, size);
888
889   if(timeout < 0)   // coordinator failed
890     return 0;
891
892   /* Read oid and version tuples for those objects that are not modified in the transaction */
893   int numread = fixed.numread;
894   size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
895   char objread[size];
896   if(numread != 0) { //If pile contains more than one object to be read,
897     // keep reading all objects
898     timeout = recv_data((int)acceptfd, objread, size);
899   }
900
901   /* Read modified objects */
902   if(fixed.nummod != 0) {
903     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
904       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
905       return 1;
906     }
907     size = fixed.sum_bytes;
908     timeout = recv_data((int)acceptfd, modptr, size);
909   }
910
911   if(timeout < 0) // coordinator failed
912     return 0;
913
914   /* Create an array of oids for modified objects */
915   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
916   if (oidmod == NULL) {
917     printf("calloc error %s, %d\n", __FILE__, __LINE__);
918     return 1;
919   }
920   ptr = (char *) modptr;
921   for(i = 0 ; i < fixed.nummod; i++) {
922     int tmpsize;
923     headaddr = (objheader_t *) ptr;
924     oid = OID(headaddr);
925     oidmod[i] = oid;
926     GETSIZE(tmpsize, headaddr);
927     ptr += sizeof(objheader_t) + tmpsize;
928   }
929 #ifdef DEBUG
930         printf("%s-> num oid read = %d, oids modified = %d, size = %d\n", __func__, fixed.numread,  fixed.nummod, size);
931 #endif
932   /*Process the information read */
933   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
934     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
935     /* Free resources */
936     if(oidmod != NULL) {
937       free(oidmod);
938     }
939     return 1;
940   }
941
942   /* Free resources */
943   if(oidmod != NULL) {
944     free(oidmod);
945   }
946 #ifdef DEBUG
947         printf("%s-> Exiting\n", __func__);
948 #endif
949
950   return 0;
951 }
952
953 /* This function processes the Coordinator's transaction request using "handleTransReq"
954  * function and sends a reply to the co-ordinator.
955  * Following this it also receives a new control message from the co-ordinator and processes this message*/
956 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
957                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
958
959   char control, sendctrl, retval;
960   
961   objheader_t *tmp_header;
962   void *header;
963   int i = 0, val;
964   unsigned int transID;
965 #ifdef DEBUG
966         printf("%s-> Entering\n", __func__);
967 #endif
968
969   /* receives transaction id */
970   recv_data((int)acceptfd, &transID, sizeof(unsigned int));
971
972   /* Send reply to the Coordinator */
973   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
974     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
975           printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__);
976     return 1;
977   }
978
979         int timeout = recv_data((int)acceptfd, &control, sizeof(char));
980         /* Process the new control message */
981 #ifdef DEBUG
982   printf("%s -> timeout = %d   control = %d\n",__func__,timeout,control); 
983 #endif
984   
985 #ifdef RECOVERY
986   if(timeout < 0) {  // timeout. failed to receiving data from coordinator
987 #ifdef DEBUG
988     printf("%s -> timeout!! assumes coordinator is dead\n",__func__);
989 #endif
990     control = receiveDecisionFromBackup(transID,fixed->mcount,listmid);
991 #ifdef DEBUG
992     printf("%s -> received Decision %d\n",__func__,control);
993 #endif
994   }    
995   
996   /* insert received control into thash for another transaction*/
997   thashInsert(transID, control);
998 #endif
999   switch(control) {
1000                 case TRANS_ABORT:
1001                         if (fixed->nummod > 0)
1002                                 free(modptr);
1003                         /* Unlock objects that was locked due to this transaction */
1004                         int useWriteUnlock = 0;
1005                         for(i = 0; i< transinfo->numlocked; i++) {
1006                                 if(transinfo->objlocked[i] == -1) {
1007                                         useWriteUnlock = 1;
1008                                         continue;
1009                                 }
1010                                 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
1011                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
1012                                         printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1013                                         return 1;
1014                                 }
1015                                 if(useWriteUnlock) {
1016                                         write_unlock(STATUSPTR(header));
1017                                 } else {
1018                                         read_unlock(STATUSPTR(header));
1019                                 }
1020                         }
1021                         break;
1022
1023                 case TRANS_COMMIT:
1024                         /* Invoke the transCommit process() */
1025                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
1026                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
1027                                 /* Free memory */
1028                                 if (transinfo->objlocked != NULL) {
1029                                         free(transinfo->objlocked);
1030                                 }
1031                                 if (transinfo->objnotfound != NULL) {
1032                                         free(transinfo->objnotfound);
1033                                 }
1034                                 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1035                                 return 1;
1036                         }
1037
1038
1039                         break;
1040
1041                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
1042                         break;
1043
1044                 default:
1045                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
1046                         //TODO Use fixed.trans_id  TID since Client may have died
1047                         break;
1048         }
1049
1050   /* Free memory */
1051   if (transinfo->objlocked != NULL) {
1052     free(transinfo->objlocked);
1053   }
1054   if (transinfo->objnotfound != NULL) {
1055     free(transinfo->objnotfound);
1056   }
1057 #ifdef DEBUG
1058         printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1059 #endif
1060
1061   return 0;
1062 }
1063
#ifdef RECOVERY
/* Returns the value thashSearch() holds for transID, or -1 when that
 * lookup yields 0. */
char checkDecision(unsigned int transID)
{
#ifdef DEBUG
  printf("%s -> transID :  %u\n",__func__,transID);
#endif

  char decision = thashSearch(transID);

  return (decision != 0) ? decision : -1;
}
#endif
1079
1080 /* This function increments counters while running a voting decision on all objects involved
1081  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
1082 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
1083   int val, i = 0, j;
1084   unsigned short version;
1085   char control = 0, *ptr;
1086   unsigned int oid;
1087   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
1088   objheader_t *headptr;
1089
1090   /* Counters and arrays to formulate decision on control message to be sent */
1091   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1092   oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
1093   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1094   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
1095   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1096   int numBytes = 0;
1097   /* modptr points to the beginning of the object store
1098    * created at the Pariticipant.
1099    * Object store holds the modified objects involved in the transaction request */
1100   ptr = (char *) modptr;
1101
1102   /* Process each oid in the machine pile/ group per thread */
1103   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
1104     if (i < fixed->numread) { //Objs only read and not modified
1105       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1106       incr *= i;
1107       oid = *((unsigned int *)(objread + incr));
1108       incr += sizeof(unsigned int);
1109       version = *((unsigned short *)(objread + incr));
1110 #ifdef DEBUG
1111       printf("%s -> oid : %u    version : %d\n",__func__,oid,version);
1112 #endif
1113       getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
1114                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
1115     } else {  //Objs modified
1116       if(i == fixed->numread) {
1117         oidlocked[objlocked++] = -1;
1118       }
1119       int tmpsize;
1120       headptr = (objheader_t *) ptr;
1121       oid = OID(headptr);
1122       version = headptr->version;
1123       GETSIZE(tmpsize, headptr);
1124       ptr += sizeof(objheader_t) + tmpsize;
1125
1126       getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
1127                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
1128                               &numBytes, &control, oid, version);
1129     }
1130   }
1131
1132   /* send TRANS_DISAGREE and objs*/
1133         if(v_nomatch > 0) {
1134 #ifdef CACHE
1135                 char *objs = calloc(1, numBytes);
1136                 int j, offset = 0;
1137                 for(j = 0; j<objvernotmatch; j++) {
1138                         objheader_t *header = mhashSearch(oidvernotmatch[j]);
1139                         int size = 0;
1140                         GETSIZE(size, header);
1141                         size += sizeof(objheader_t);
1142                         memcpy(objs+offset, header, size);
1143                         offset += size;
1144                 }
1145 #endif
1146                 if (objlocked > 0) {
1147                         int useWriteUnlock = 0;
1148                         for(j = 0; j < objlocked; j++) {
1149                                 if(oidlocked[j] == -1) {
1150                                         useWriteUnlock = 1;
1151                                         continue;
1152                                 }
1153                                 if((headptr = mhashSearch(oidlocked[j])) == NULL) {
1154                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1155                                         return 0;
1156                                 }
1157                                 if(useWriteUnlock) {
1158                                         write_unlock(STATUSPTR(headptr));
1159                                 } else {
1160                                         read_unlock(STATUSPTR(headptr));
1161                                 }
1162                         }
1163                         free(oidlocked);
1164                 }
1165     
1166
1167 #ifdef DEBUG
1168                 printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
1169 #endif
1170
1171                 send_data(acceptfd, &control, sizeof(char));
1172 #ifdef CACHE
1173                 send_data(acceptfd, &numBytes, sizeof(int));
1174                 send_data(acceptfd, objs, numBytes);
1175
1176                 transinfo->objvernotmatch = oidvernotmatch;
1177                 transinfo->numvernotmatch = objvernotmatch;
1178                 free(objs);
1179                 free(transinfo->objvernotmatch);
1180 #endif
1181                 return control;
1182   }
1183
1184   /* Decide what control message to send to Coordinator */
1185   if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
1186                                    modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
1187     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
1188     return 0;
1189   }
1190   return control;
1191 }
1192
1193 /* Update Commit info for objects that are modified */
1194 void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
1195                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
1196                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
1197                              char *control, unsigned int oid, unsigned short version) {
1198   void *mobj;
1199   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1200         //printf("version number: %d\n", version);
1201 #ifdef RECOVERY
1202   if(version == 1) {
1203                 (*v_matchnolock)++;
1204 #ifdef DEBUG
1205                 printf("%s -> *backup object* oid:%u\n", __func__,oid);
1206 #endif
1207                 return;
1208         }
1209 #endif
1210
1211         if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
1212 #ifdef DEBUG
1213                 printf("Obj not found: %s() oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1214                 fflush(stdout);
1215 #endif
1216                 /* Save the oids not found and number of oids not found for later use */
1217                 oidnotfound[*objnotfound] = oid;
1218                 (*objnotfound)++;
1219         } else {     /* If Obj found in machine (i.e. has not moved) */
1220                 /* Check if Obj is locked by any previous transaction */
1221                 if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1222 #ifdef DEBUG
1223                   printf("****%s->Trying to acquire 'remote' writelock for oid:%d, version:%d\n", __func__, oid, version);
1224                         printf("this version: %d, mlookup version: %d\n", version, ((objheader_t *)mobj)->version);
1225 #endif
1226                         if (version == ((objheader_t *)mobj)->version) { /* match versions */
1227                                 (*v_matchnolock)++;
1228                         } else { /* If versions don't match ...HARD ABORT */
1229                                 (*v_nomatch)++;
1230                                 oidvernotmatch[*objvernotmatch] = oid;
1231                                 (*objvernotmatch)++;
1232                                 int size;
1233                                 GETSIZE(size, mobj);
1234                                 size += sizeof(objheader_t);
1235                                 *numBytes += size;
1236                                 /* Send TRANS_DISAGREE to Coordinator */
1237                                 *control = TRANS_DISAGREE;
1238                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1239                         }
1240                         //Keep track of oid locked
1241                         oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1242                 } else {  //we are locked
1243                         if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
1244                                 (*v_matchlock)++;
1245                         } else { /* If versions don't match ...HARD ABORT */
1246                                 (*v_nomatch)++;
1247                                 oidvernotmatch[*objvernotmatch] = oid;
1248                                 (*objvernotmatch)++;
1249                                 int size;
1250                                 GETSIZE(size, mobj);
1251                                 size += sizeof(objheader_t);
1252                                 *numBytes += size;
1253                                 *control = TRANS_DISAGREE;
1254                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1255                         }
1256                 }
1257   }
1258 #ifdef DEBUG
1259         printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1260 #endif
1261 }
1262
1263 /* Update Commit info for objects that are read */
1264 void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
1265                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
1266                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
1267   void *mobj;
1268   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1269   //printf("version number: %d\n", version);
1270 #ifdef DEBUG
1271   printf("%s -> Entering\n",__func__);
1272 #endif
1273 #ifdef RECOVERY
1274   if(version == 1) {
1275                 (*v_matchnolock)++;
1276                 printf("*backup object* oid:%u\n", oid);
1277                 return;
1278         }
1279 #endif
1280
1281         if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
1282 #ifdef DEBUG
1283     printf("%s -> Obj not found!\n",__func__);
1284           printf("%s -> Obj not found: oid = %d, type = %d\t\n", __func__,OID(mobj), TYPE((objheader_t *)mobj));
1285         fflush(stdout);
1286 #endif
1287     /* Save the oids not found and number of oids not found for later use */
1288     oidnotfound[*objnotfound] = oid;
1289     (*objnotfound)++;
1290   } else {     /* If Obj found in machine (i.e. has not moved) */
1291 #ifdef DEBUG
1292     printf("%s -> Obj found!!\n",__func__);
1293         printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1294           fflush(stdout);
1295 #endif
1296     
1297     /* Check if Obj is locked by any previous transaction */
1298     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
1299       if (version == ((objheader_t *)mobj)->version) { /* match versions */
1300         (*v_matchnolock)++;
1301       } else { /* If versions don't match ...HARD ABORT */
1302         (*v_nomatch)++;
1303         oidvernotmatch[(*objvernotmatch)++] = oid;
1304         int size;
1305         GETSIZE(size, mobj);
1306         size += sizeof(objheader_t);
1307         *numBytes += size;
1308       
1309         /* Send TRANS_DISAGREE to Coordinator */
1310         *control = TRANS_DISAGREE;
1311       }
1312
1313       //Keep track of oid locked
1314       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1315     } else { /* Some other transaction has aquired a write lock on this object */
1316       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1317         (*v_matchlock)++;
1318       } else { /* If versions don't match ...HARD ABORT */
1319         (*v_nomatch)++;
1320         oidvernotmatch[*objvernotmatch] = oid;
1321         (*objvernotmatch)++;
1322         int size;
1323         GETSIZE(size, mobj);
1324         size += sizeof(objheader_t);
1325         *numBytes += size;
1326         *control = TRANS_DISAGREE;
1327       }
1328     }
1329   }
1330 #ifdef DEBUG
1331         printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1332 #endif
1333 }
1334
1335 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
1336  * to send to Coordinator based on the votes of oids involved in the transaction */
1337 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
1338                        int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
1339                        unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
1340   int val;
1341   char control = 0;
1342
1343   /* Condition to send TRANS_AGREE */
1344   if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
1345     control = TRANS_AGREE;
1346     /* Send control message */
1347 #ifdef DEBUG
1348                 printf("%s -> control = %s\n", __func__,"TRANS_AGREE");
1349 #endif
1350     send_data(acceptfd, &control, sizeof(char));
1351     
1352 #ifdef DEBUG
1353                 printf("%s -> finished sending control\n",__func__);
1354 #endif
1355   }
1356   /* Condition to send TRANS_SOFT_ABORT */
1357   else if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
1358     control = TRANS_SOFT_ABORT;
1359 #ifdef DEBUG
1360                 printf("%s -> control = %s\n", __func__,"TRANS_SOFT_ABORT");
1361 #endif
1362     /* Send control message */
1363     send_data(acceptfd, &control, sizeof(char));
1364
1365     /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
1366     if(*(objnotfound) != 0) {
1367       int msg[1];
1368       msg[0] = *(objnotfound);
1369       send_data(acceptfd, &msg, sizeof(int));
1370       int size = sizeof(unsigned int)* *(objnotfound);
1371       send_data(acceptfd, oidnotfound, size);
1372     }
1373   }
1374
1375   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1376    * if Participant receives a TRANS_COMMIT */
1377   transinfo->objlocked = oidlocked;
1378   transinfo->objnotfound = oidnotfound;
1379   transinfo->modptr = modptr;
1380   transinfo->numlocked = *(objlocked);
1381   transinfo->numnotfound = *(objnotfound);
1382   return control;
1383 }
1384
1385 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
1386  * addresses in lookup table and also changes version number
1387  * Sends an ACK back to Coordinator */
1388 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
1389   objheader_t *header;
1390   objheader_t *newheader;
1391   int i = 0, offset = 0;
1392   char control;
1393   int tmpsize;
1394   void *ptrcreate;
1395 #ifdef DEBUG
1396         printf("DEBUG-> Entering transCommitProcess, dstmserver.c\n");
1397         printf("nummod: %d, numlocked: %d\n", nummod, numlocked);
1398 #endif
1399
1400   /* Process each modified object saved in the mainobject store */
1401   for(i = 0; i < nummod; i++) {
1402     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1403 #ifndef RECOVERY
1404       printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__);
1405                         return 1;
1406 #else
1407                         header = (objheader_t *)(modptr+offset);
1408                         header->version += 1;
1409                         header->isBackup = 1;
1410 #ifdef DEBUG
1411       printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1412 #endif
1413                         GETSIZE(tmpsize, header);
1414                         tmpsize += sizeof(objheader_t);
1415                         pthread_mutex_lock(&mainobjstore_mutex);
1416                         if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1417                                 printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1418                                 pthread_mutex_unlock(&mainobjstore_mutex);
1419                                 return 1;
1420                         }
1421                         pthread_mutex_unlock(&mainobjstore_mutex);
1422                         /* Initialize read and write locks  */
1423                         initdsmlocks(STATUSPTR(header));
1424                         memcpy(ptrcreate, header, tmpsize);
1425                         mhashInsert(oidmod[i], ptrcreate);
1426
1427                         offset += tmpsize;
1428 #endif
1429     }
1430                 else{
1431
1432     GETSIZE(tmpsize,header);
1433
1434     {
1435       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1436       struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
1437       dst->type=src->type;
1438       dst->___cachedCode___=src->___cachedCode___;
1439       dst->___cachedHash___=src->___cachedHash___;
1440       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1441     }
1442     header->version += 1;
1443 #ifdef DEBUG
1444     printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1445 #endif
1446     /* If threads are waiting on this object to be updated, notify them */
1447     if(header->notifylist != NULL) {
1448 #ifdef DEBUG
1449       printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
1450 #endif
1451
1452 #ifdef RECOVERY
1453       if(header->isBackup != 0)
1454         notifyAll(&header->notifylist, OID(header), header->version);
1455       else
1456         clearNotifyList(OID(header));
1457 #else
1458         notifyAll(&header->notifylist, OID(header), header->version);
1459 #endif
1460
1461     }
1462     offset += sizeof(objheader_t) + tmpsize;
1463   }
1464 }
1465   if (nummod > 0)
1466     free(modptr);
1467
1468   /* Unlock locked objects */
1469   int useWriteUnlock = 0;
1470   for(i = 0; i < numlocked; i++) {
1471     if(oidlocked[i] == -1) {
1472       useWriteUnlock = 1;
1473       continue;
1474     }
1475     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1476       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1477       return 1;
1478     }
1479                 
1480 #ifdef DEBUG
1481                 printf("header oid:%d, version:%d, useWriteUnlock:%d\n", OID(header), header->version, useWriteUnlock);
1482 #endif
1483     if(useWriteUnlock) {
1484       write_unlock(STATUSPTR(header));
1485     } else {
1486       read_unlock(STATUSPTR(header));
1487     }
1488   }
1489   //TODO Update location lookup table
1490   return 0;
1491 }
1492
1493 /* This function receives the oid and offset tuples from the Coordinator's prefetch call.
1494  * Looks for the objects to be prefetched in the main object store.
1495  * If objects are not found then record those and if objects are found
1496  * then use offset values to prefetch references to other objects */
1497
1498 int prefetchReq(int acceptfd) {
1499   int i, size, objsize, numoffset = 0;
1500   int length;
1501   char *recvbuffer, control;
1502   unsigned int oid, mid=-1;
1503   objheader_t *header;
1504   oidmidpair_t oidmid;
1505   int sd = -1;
1506
1507   while(1) {
1508     recv_data((int)acceptfd, &numoffset, sizeof(int));
1509     if(numoffset == -1)
1510       break;
1511     recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
1512     oid = oidmid.oid;
1513     if (mid != oidmid.mid) {
1514       if (mid!=-1) {
1515         freeSockWithLock(transPResponseSocketPool, mid, sd);
1516       }
1517       mid=oidmid.mid;
1518       sd = getSockWithLock(transPResponseSocketPool, mid);
1519     }
1520     short offsetarry[numoffset];
1521     recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
1522
1523     /*Process each oid */
1524     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
1525       /* Save the oids not found in buffer for later use */
1526       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1527       char sendbuffer[size];
1528       *((int *) sendbuffer) = size;
1529       *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1530       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1531       control = TRANS_PREFETCH_RESPONSE;
1532       sendPrefetchResponse(sd, &control, sendbuffer, &size);
1533     } else { /* Object Found */
1534       int incr = 0;
1535       GETSIZE(objsize, header);
1536       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1537       char sendbuffer[size];
1538       *((int *)(sendbuffer + incr)) = size;
1539       incr += sizeof(int);
1540       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1541       incr += sizeof(char);
1542       *((unsigned int *)(sendbuffer+incr)) = oid;
1543       incr += sizeof(unsigned int);
1544       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1545
1546       control = TRANS_PREFETCH_RESPONSE;
1547       sendPrefetchResponse(sd, &control, sendbuffer, &size);
1548
1549       /* Calculate the oid corresponding to the offset value */
1550       for(i = 0 ; i< numoffset ; i++) {
1551         /* Check for arrays  */
1552         if(TYPE(header) >= NUMCLASSES) {
1553           int elementsize = classsize[TYPE(header)];
1554           struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1555           unsigned short length = ao->___length___;
1556           /* Check if array out of bounds */
1557           if(offsetarry[i]< 0 || offsetarry[i] >= length) {
1558             break;
1559           }
1560           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
1561         } else {
1562           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
1563         }
1564
1565         /* Don't continue if we hit a NULL pointer */
1566         if (oid==0)
1567           break;
1568
1569         if((header = mhashSearch(oid)) == NULL) {
1570           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1571           char sendbuffer[size];
1572           *((int *) sendbuffer) = size;
1573           *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1574           *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1575
1576           control = TRANS_PREFETCH_RESPONSE;
1577           sendPrefetchResponse(sd, &control, sendbuffer, &size);
1578           break;
1579         } else { /* Obj Found */
1580           int incr = 0;
1581           GETSIZE(objsize, header);
1582           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1583           char sendbuffer[size];
1584           *((int *)(sendbuffer + incr)) = size;
1585           incr += sizeof(int);
1586           *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1587           incr += sizeof(char);
1588           *((unsigned int *)(sendbuffer+incr)) = oid;
1589           incr += sizeof(unsigned int);
1590           memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1591
1592           control = TRANS_PREFETCH_RESPONSE;
1593           sendPrefetchResponse(sd, &control, sendbuffer, &size);
1594         }
1595       } //end of for
1596     }
1597   } //end of while
1598     //Release socket
1599   if (mid!=-1)
1600     freeSockWithLock(transPResponseSocketPool, mid, sd);
1601
1602   return 0;
1603 }
1604
/* Sends one prefetch response on socket sd: first the single control byte
 * pointed to by 'control', then *size bytes taken from 'sendbuffer'.
 * 'size' is only read, never modified. */
void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
#ifdef DEBUG
  /* Print the control byte itself, not the address it is stored at */
  printf("control = %d, file = %s, line = %d\n", (int)*control, __FILE__, __LINE__);
#endif
  send_data(sd, control, sizeof(char));
  /* Send the buffer with its size */
  int length = *size;
  send_data(sd, sendbuffer, length);
}
1614
1615 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
1616   objheader_t *header;
1617   unsigned int oid;
1618   unsigned short newversion;
1619   char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
1620   int sd;
1621   struct sockaddr_in remoteAddr;
1622   int bytesSent;
1623   int size;
1624   int i = 0;
1625
1626   while(i < numoid) {
1627     oid = *(oidarry + i);
1628     if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1629       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1630       return;
1631     } else {
1632       /* Check to see if versions are same */
1633 checkversion:
1634       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
1635         newversion = header->version;
1636           
1637         if(newversion == *(versionarry + i)) {
1638                 //Add to the notify list
1639               if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
1640                   printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
1641                 return;
1642               }
1643             write_unlock(STATUSPTR(header));
1644         } 
1645         else {
1646           write_unlock(STATUSPTR(header));
1647           if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1648                   perror("processReqNotify():socket()");
1649                   return;
1650             }
1651               bzero(&remoteAddr, sizeof(remoteAddr));
1652             remoteAddr.sin_family = AF_INET;
1653               remoteAddr.sin_port = htons(LISTEN_PORT);
1654             remoteAddr.sin_addr.s_addr = htonl(mid);
1655   
1656           if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1657                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
1658             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1659               close(sd);
1660               return;
1661               } else {
1662                 //Send Update notification
1663               msg[0] = THREAD_NOTIFY_RESPONSE;
1664               *((unsigned int *)&msg[1]) = oid;
1665             size = sizeof(unsigned int);
1666                   *((unsigned short *)(&msg[1]+size)) = newversion;
1667             size += sizeof(unsigned short);
1668               *((unsigned int *)(&msg[1]+size)) = threadid;
1669               size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
1670             send_data(sd, msg, size);
1671                   }
1672           close(sd);
1673               }
1674       } else {
1675             randomdelay();
1676           goto checkversion;
1677       }
1678     }
1679     i++;
1680   }
1681   free(oidarry);
1682   free(versionarry);
1683 }
1684
1685 #ifdef RECOVERY
1686 /* go through oid's notifylist and clear them */
1687 void clearNotifyList(unsigned int oid)
1688 {
1689 #ifdef DEBUG
1690   printf("%s -> Entering\n",__func__);
1691 #endif
1692
1693   objheader_t* header;
1694   threadlist_t* t;
1695   threadlist_t* tmp;
1696   
1697   if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1698     printf("%s -> mhashSearch returned NULL!!\n",__func__);
1699   }
1700
1701   if(header->notifylist != NULL) {
1702       t = header->notifylist;
1703        
1704       while(t) {
1705         tmp = t;
1706         t = t->next;
1707
1708         free(tmp);
1709       }
1710       header->notifylist = NULL;
1711   }
1712   
1713   pthread_mutex_lock(&clearNotifyList_mutex);
1714   clearNotifyListFlag = 0;
1715   pthread_mutex_unlock(&clearNotifyList_mutex);
1716 #ifdef DEBUG
1717   printf("%s -> finished\n",__func__);
1718 #endif
1719 }
1720 #endif