recovery
[IRC.git] / Robust / src / Runtime / DSTM / interface_recovery / trans.c
1 #include "machinepile.h"
2 #include "mlookup.h"
3 #include "llookup.h"
4 #include "plookup.h"
5 #include "prelookup.h"
6 #include "threadnotify.h"
7 #include "queue.h"
8 #include "addUdpEnhance.h"
9 #include "addPrefetchEnhance.h"
10 #include "gCollect.h"
11 #include "dsmlock.h"
12 #include "prefetch.h"
13 #ifdef COMPILER
14 #include "thread.h"
15 #endif
16 #ifdef ABORTREADERS
17 #include "abortreaders.h"
18 #endif
19 #include "trans.h"
20
21 #ifdef RECOVERY
22 #include <unistd.h>
23 #include <signal.h>
24 #include <sys/select.h>
25 #include "tlookup.h"
26 #endif
27
28 #define NUM_THREADS 1
29 #define CONFIG_FILENAME "dstm.conf"
30
31 /* Thread transaction variables */
32
33 __thread objstr_t *t_cache;
34 __thread struct ___Object___ *revertlist;
35 #ifdef ABORTREADERS
36 __thread int t_abort;
37 __thread jmp_buf aborttrans;
38 #endif
39
40 /* Global Variables */
41 extern int classsize[];
42 pfcstats_t *evalPrefetch;
43 extern int numprefetchsites; //Global variable containing number of prefetch sites
44 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
45 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
46 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
47 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
48 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
49 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
50 extern objstr_t *mainobjstore;
51 unsigned int myIpAddr;
52 unsigned int *hostIpAddrs;
53 int sizeOfHostArray;
54 int numHostsInSystem;
55 int myIndexInHostArray;
56 int waitThreadMid;
57 unsigned int waitThreadID; 
58
59 unsigned int oidsPerBlock;
60 unsigned int oidMin;
61 unsigned int oidMax;
62
63 sockPoolHashTable_t *transReadSockPool;
64 sockPoolHashTable_t *transPrefetchSockPool;
65 sockPoolHashTable_t *transRequestSockPool;
66 pthread_mutex_t notifymutex;
67 pthread_mutex_t atomicObjLock;
68
69 /***********************************
70  * Global Variables for statistics
71  **********************************/
72 int numTransCommit = 0;
73 int numTransAbort = 0;
74 int nchashSearch = 0;
75 int nmhashSearch = 0;
76 int nprehashSearch = 0;
77 int nRemoteSend = 0;
78 int nSoftAbort = 0;
79 int bytesSent = 0;
80 int bytesRecv = 0;
81 int totalObjSize = 0;
82
83 #ifdef RECOVERY
84 /***********************************
85  * Global variables for Duplication
86  ***********************************/
87 int *liveHosts;
88 int liveHostsValid;
89 int numLiveHostsInSystem;       
90 int flipBit;                                                            // Used to distribute requests between primary and backup evenly
91 unsigned int *locateObjHosts;
92 #endif
93
94 int transRetryFlag;
95 unsigned int transIDMax;
96 unsigned int transIDMin;
97 unsigned int transIDIndex;
98 char ip[16];
99
100 #ifdef RECOVERY
101 /******************************
102  * Global variables for Paxos
103  ******************************/
104 int n_a;
105 unsigned int v_a;
106 int n_h;
107 int my_n;
108 unsigned int leader;
109 unsigned int origleader;
110 unsigned int temp_v_a;
111 int paxosRound;
112 #endif
113
114 void printhex(unsigned char *, int);
115 plistnode_t *createPiles();
116 plistnode_t *sortPiles(plistnode_t *pileptr);
117
118 /*******************************
119 * Send and Recv function calls
120 *******************************/
121 int send_data(int fd, void *buf, int buflen) {
122   char *buffer = (char *)(buf);
123   int size = buflen;
124   int numbytes;
125
126   while (size > 0) {
127
128 #ifdef GDBDEBUG
129 GDBSEND1:
130 #endif
131     numbytes = send(fd, buffer, size, 0);
132
133     if( numbytes>0) {
134       bytesSent += numbytes;
135       size -= numbytes;
136     }
137 #ifdef RECOVERY
138     else if( numbytes < 0) {
139       // Receive returned an error.
140       // Analyze underlying cause
141 #ifndef DEBUG
142       printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
143       fflush(stdout);
144 #endif
145       if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
146         // machine has failed
147         //
148         // if we see EAGAIN w/o failures, we should record the time
149         // when we start send and finish send see if it is longer
150         // than our threshold
151         //
152 #ifdef DEBUG
153         printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
154 #endif
155         return -1;
156       } else {
157 #ifdef GDBDEBUG
158       if(errno == 4)
159         goto GDBSEND1;    
160 #endif
161
162
163 #ifdef DEBUG
164         printf("%s -> Unexpected ERROR!\n",__func__);
165 #endif
166         return -2;
167       }
168     }
169     else{
170       // Case : numbytes == 0
171       // // machine has failed -- this case probably doesn't occur in reality
172       //
173
174
175       
176 #ifdef DEBUG
177       printf("%s -> SHOULD NOT BE HERE\n",__func__);
178 #endif
179       return -1;
180     }
181 #endif
182   } // close while loop
183 #ifdef DEBUG
184   printf("%s-> Exiting\n", __func__);
185 #endif
186
187   return 0; // completed sending data
188 }
189
190 //Returns negative value if receive cannot be completed because of
191 //timeout or machine failure
192
193 int recv_data(int fd, void *buf, int buflen) {
194   char *buffer = (char *)(buf);
195   int size = buflen;
196   int numbytes;
197   int trycounter = 0;
198   
199   while (size > 0) {
200 #ifdef GDBDEBUG
201 GDBRECV1:
202 #endif
203     numbytes = recv(fd, buffer, size, 0);
204     
205     if (numbytes>0) {
206       buffer += numbytes;
207       size -= numbytes;
208     }
209 #ifdef RECOVERY
210     else if (numbytes<0){ 
211       //Receive returned an error.
212       //Analyze underlying cause
213 #ifdef DEBUG
214       printf("%s-> fd : %d errno = %d %s\n", __func__, fd, errno, strerror(errno));     
215 #endif
216       if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
217         //machine has failed
218         //if we see EAGAIN w/o failures, we should record the time
219         //when we start read and finish read and see if it is longer
220         //than our threshold
221 #ifdef DEBUG
222         printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
223 #endif
224         if(errno == EAGAIN) {
225           if(trycounter < 5) {
226 #ifndef DEBUG
227             printf("%s -> TRYcounter increases\n",__func__);
228 #endif
229             trycounter++;
230             continue;
231           }
232           else
233             return -1;
234         }
235         return -1;
236       } else {
237 #ifdef GDBDEBUG
238         if(errno == 4)
239           goto GDBRECV1;
240 #endif
241
242 #ifdef DEBUG
243         printf("%s -> Unexpected ERROR!\n",__func__);
244         printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
245 #endif
246         return -2;
247       }
248     } else {
249       //Case: numbytes==0
250       //machine has failed -- this case probably doesn't occur in reality
251       //
252 #ifdef DEBUG
253       printf("%s -> SHOULD NOT BE HERE\n",__func__);
254 #endif
255       return -1;
256     }
257 #endif
258   } //close while loop
259 #ifdef DEBUG
260   printf("%s -> fd = %d Exiting\n",__func__,fd);
261 #endif
262   return 0; // got all the data
263 }
264
265 int recv_data_errorcode(int fd, void *buf, int buflen) {
266 #ifdef DEBUG
267   printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
268 #endif
269   char *buffer = (char *)(buf);
270   int size = buflen;
271   int numbytes;
272   while (size > 0) {
273     numbytes = recv(fd, buffer, size, 0);
274 #ifdef DEBUG
275     printf("%s-> numbytes: %d\n", __func__, numbytes);
276 #endif
277     if (numbytes==0)
278       return 0;
279     else if (numbytes == -1) {
280       printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno));
281       perror("recv_data_errorcode");
282       return -1;
283     }
284     
285     buffer += numbytes;
286     size -= numbytes;
287   }
288 #ifdef DEBUG
289   printf("%s-> Exiting\n", __func__);
290 #endif
291   return 1;
292 }
293
294 void printhex(unsigned char *ptr, int numBytes) {
295   int i;
296   for (i = 0; i < numBytes; i++) {
297     if (ptr[i] < 16)
298       printf("0%x ", ptr[i]);
299     else
300       printf("%x ", ptr[i]);
301   }
302   printf("\n");
303   return;
304 }
305
306 inline int arrayLength(int *array) {
307   int i;
308   for(i=0 ; array[i] != -1; i++)
309     ;
310   return i;
311 }
312
313 inline int findmax(int *array, int arraylength) {
314   int max, i;
315   max = array[0];
316   for(i = 0; i < arraylength; i++) {
317     if(array[i] > max) {
318       max = array[i];
319     }
320   }
321   return max;
322 }
323
324 char* midtoIPString(unsigned int mid){
325                 midtoIP(mid, ip);
326                 return ip;
327 }
328 /* This function is a prefetch call generated by the compiler that
329  * populates the shared primary prefetch queue*/
330 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
331   /* Allocate for the queue node*/
332   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
333   int len;
334   char * node= getmemory(qnodesize);
335   int top=endoffsets[ntuples-1];
336
337   if (node==NULL)
338     return;
339   /* Set queue node values */
340
341   /* TODO: Remove this after testing */
342   evalPrefetch[siteid].callcount++;
343
344   *((int *)(node))=siteid;
345   *((int *)(node + sizeof(int))) = ntuples;
346   len = 2*sizeof(int);
347   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
348   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
349   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
350
351   /* Lock and insert into primary prefetch queue */
352   movehead(qnodesize);
353 }
354
355 /* This function starts up the transaction runtime. */
356 int dstmStartup(const char * option) {
357   pthread_t thread_Listen, udp_thread_Listen;
358   pthread_attr_t attr;
359   int master=option!=NULL && strcmp(option, "master")==0;
360   int fd;
361   int udpfd;
362
363   if (processConfigFile() != 0)
364     return 0; //TODO: return error value, cause main program to exit
365 #ifdef COMPILER
366   if (!master)
367     threadcount--;
368 #endif
369
370 #ifdef TRANSSTATS
371   printf("Trans stats is on\n");
372   fflush(stdout);
373 #endif
374 #ifdef ABORTREADERS
375   initreaderlist();
376 #endif
377
378   //Initialize socket pool
379   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
380   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
381   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
382
383   dstmInit();
384   transInit();
385
386   fd=startlistening();
387   pthread_attr_init(&attr);
388   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
389 #ifdef CACHE
390   udpfd = udpInit();
391   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
392 #endif
393   if (master) {
394                 pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
395 #ifdef RECOVERY
396                 updateLiveHosts();
397                 setLocateObjHosts();
398                 updateLiveHostsCommit();
399                 paxos();
400                 if(!allHostsLive()) {
401                         printf("Not all hosts live. Exiting.\n");
402                         exit(-1);
403                 }
404 #endif
405                 return 1;
406         } else {
407                 dstmListen((void *)fd);
408                 return 0;
409   }
410 }
411
412 //TODO Use this later
413 void *pCacheAlloc(objstr_t *store, unsigned int size) {
414   void *tmp;
415   objstr_t *ptr;
416   ptr = store;
417   int success = 0;
418
419   while(ptr->next != NULL) {
420     /* check if store is empty */
421     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
422       tmp = ptr->top;
423       ptr->top += size;
424       success = 1;
425       return tmp;
426     } else {
427       ptr = ptr->next;
428     }
429   }
430
431   if(success == 0) {
432     return NULL;
433   }
434 }
435
436 /* This function initiates the prefetch thread A queue is shared
437  * between the main thread of execution and the prefetch thread to
438  * process the prefetch call Call from compiler populates the shared
439  * queue with prefetch requests while prefetch thread processes the
440  * prefetch requests */
441
442 void transInit() {
443   //Create and initialize prefetch cache structure
444 #ifdef CACHE
445   initializePCache();
446   if((evalPrefetch = initPrefetchStats()) == NULL) {
447     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
448     exit(0);
449   }
450 #endif
451
452   /* Initialize attributes for mutex */
453   pthread_mutexattr_init(&prefetchcache_mutex_attr);
454   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
455
456   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
457   pthread_mutex_init(&notifymutex, NULL);
458   pthread_mutex_init(&atomicObjLock, NULL);
459 #ifdef CACHE
460   //Create prefetch cache lookup table
461   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
462     printf("ERROR\n");
463     return; //Failure
464   }
465
466   //Initialize primary shared queue
467   queueInit();
468   //Initialize machine pile w/prefetch oids and offsets shared queue
469   mcpileqInit();
470
471   //Create the primary prefetch thread
472   int retval;
473 #ifdef RANGEPREFETCH
474   do {
475     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
476   } while(retval!=0);
477 #else
478   do {
479     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
480   } while(retval!=0);
481 #endif
482   pthread_detach(tPrefetch);
483 #endif
484 }
485
486 /* This function stops the threads spawned */
487 void transExit() {
488 #ifdef CACHE
489   int t;
490   pthread_cancel(tPrefetch);
491   for(t = 0; t < NUM_THREADS; t++)
492     pthread_cancel(wthreads[t]);
493 #endif
494
495   return;
496 }
497
498 /* This functions inserts randowm wait delays in the order of msec
499  * Mostly used when transaction commits retry*/
500 void randomdelay() {
501   struct timespec req;
502   time_t t;
503
504   t = time(NULL);
505   req.tv_sec = 0;
506   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
507   nanosleep(&req, NULL);
508   return;
509 }
510
511 /* This function initializes things required in the transaction start*/
512 void transStart() {
513   t_cache = objstrCreate(1048576);
514   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
515   revertlist=NULL;
516 #ifdef ABORTREADERS
517   t_abort=0;
518 #endif
519 }
520
521 /*#define INLINE    inline __attribute__((always_inline))
522
523 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
524   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
525   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
526
527   do {
528     if(node->key == key) {
529       return node->val;
530     }
531     node = node->next;
532   } while(node != NULL);
533
534   return NULL;
535   }*/
536
537
538
539
540 /* This function finds the location of the objects involved in a transaction
541  * and returns the pointer to the object if found in a remote location */
542 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
543   unsigned int machinenumber;
544   objheader_t *tmp, *objheader;
545   objheader_t *objcopy;
546   int size;
547   void *buf;
548   chashlistnode_t *node;
549         
550         if(oid == 0) {
551     return NULL;
552   }
553
554   node= &c_table[(oid & c_mask)>>1];
555   do {
556     if(node->key == oid) {
557 #ifdef TRANSSTATS
558     nchashSearch++;
559 #endif
560 #ifdef COMPILER
561     return &((objheader_t*)node->val)[1];
562 #else
563     return node->val;
564 #endif
565     }
566     node = node->next;
567   } while(node != NULL);
568   
569
570   /*  
571   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
572 #ifdef TRANSSTATS
573     nchashSearch++;
574 #endif
575 #ifdef COMPILER
576     return &objheader[1];
577 #else
578     return objheader;
579 #endif
580   } else 
581   */
582
583 #ifdef ABORTREADERS
584   if (t_abort) {
585     //abort this transaction
586     //printf("ABORTING\n");
587     removetransactionhash();
588     objstrDelete(t_cache);
589     t_chashDelete();
590     _longjmp(aborttrans,1);
591   } else
592     addtransaction(oid);
593 #endif
594
595   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
596 #ifdef TRANSSTATS
597     nmhashSearch++;
598 #endif
599     /* Look up in machine lookup table  and copy  into cache*/
600     GETSIZE(size, objheader);
601     size += sizeof(objheader_t);
602     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
603     memcpy(objcopy, objheader, size);
604     /* Insert into cache's lookup table */
605     STATUS(objcopy)=0;
606     t_chashInsert(OID(objheader), objcopy);
607 #ifdef COMPILER
608     return &objcopy[1];
609 #else
610     return objcopy;
611 #endif
612   } else {
613 #ifdef CACHE
614     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
615 #ifdef TRANSSTATS
616       nprehashSearch++;
617 #endif
618       /* Look up in prefetch cache */
619       GETSIZE(size, tmp);
620       size+=sizeof(objheader_t);
621       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
622       memcpy(objcopy, tmp, size);
623       /* Insert into cache's lookup table */
624       t_chashInsert(OID(tmp), objcopy);
625 #ifdef COMPILER
626       return &objcopy[1];
627 #else
628       return objcopy;
629 #endif
630     }
631 #endif
632     /* Get the object from the remote location */
633     if((machinenumber = lhashSearch(oid)) == 0) {
634       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
635       return NULL;
636     }
637     objcopy = getRemoteObj(machinenumber, oid);
638
639     if(objcopy == NULL) {
640       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
641       return NULL;
642     } else {
643 #ifdef TRANSSTATS
644       nRemoteSend++;
645 #endif
646 #ifdef COMPILER
647       return &objcopy[1];
648 #else
649       return objcopy;
650 #endif
651     }
652   }
653 }
654
655
656 /* This function finds the location of the objects involved in a transaction
657  * and returns the pointer to the object if found in a remote location */
658 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
659   unsigned int machinenumber;
660   objheader_t *tmp, *objheader;
661   objheader_t *objcopy;
662   int size;
663
664 #ifdef DEBUG
665         printf("%s-> Start, oid:%u\n", __func__, oid);
666 #endif
667
668 #ifdef ABORTREADERS
669   if (t_abort) {
670     //abort this transaction
671     //printf("ABORTING\n");
672     removetransactionhash();
673     objstrDelete(t_cache);
674     t_chashDelete();
675     _longjmp(aborttrans,1);
676   } else
677     addtransaction(oid);
678 #endif
679
680     if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
681 #ifdef DEBUG
682                   printf("%s-> Grab from this machine\n", __func__);
683 #endif
684 #ifdef TRANSSTATS
685       nmhashSearch++;
686 #endif
687       /* Look up in machine lookup table  and copy  into cache*/
688       GETSIZE(size, objheader);
689       size += sizeof(objheader_t);
690       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
691       memcpy(objcopy, objheader, size);
692       /* Insert into cache's lookup table */
693       STATUS(objcopy)=0;
694       t_chashInsert(OID(objheader), objcopy);
695 #ifdef COMPILER
696       return &objcopy[1];
697 #else
698       return objcopy;
699 #endif
700     } else {
701 #ifdef CACHE
702       , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
703 #ifdef TRANSSTATS
704       nprehashSearch++;
705 #endif
706       /* Look up in prefetch cache */
707       GETSIZE(size, tmp);
708       size+=sizeof(objheader_t);
709       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
710       memcpy(objcopy, tmp, size);
711       /* Insert into cache's lookup table */
712       t_chashInsert(OID(tmp), objcopy);
713 #ifdef COMPILER
714       return &objcopy[1];
715 #else
716                         return objcopy;
717 #endif
718         } 
719 #endif
720                 /* Get the object from the remote location */
721
722 #ifdef DEBUG
723         printf("%s-> Grab from remote machine\n", __func__);
724 #endif
725 #ifdef RECOVERY
726     transRetryFlag = 0;
727     unsigned int mindex = findHost(lhashSearch(oid));
728     machinenumber = locateObjHosts[2*mindex+flipBit];
729   
730     if(numLiveHostsInSystem > 1)
731       flipBit ^= 1;
732     else
733       flipBit = 0;
734
735 #ifdef DEBUG
736     printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
737 #endif
738 #else
739     if((machinenumber = lhashSearch(oid)) == 0) {
740       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
741             return NULL;
742         }
743 #endif
744           objcopy = getRemoteObj(machinenumber, oid);
745 #ifdef RECOVERY
746     if(transRetryFlag) {
747       restoreDuplicationState(machinenumber);
748 #ifdef DEBUG
749       printf("%s -> Recall transRead2\n",__func__);
750 #endif
751       return transRead2(oid);
752     }
753 #endif
754    }
755
756   if(objcopy == NULL) {
757           printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
758                 return NULL;
759         } else {
760 #ifdef TRANSSTATS
761           nRemoteSend++;
762 #endif
763 #ifdef COMPILER
764                 return &objcopy[1];
765 #else
766                 return objcopy;
767 #endif
768         }
769 #ifdef DEBUG
770   printf("%s -> Finished!!\n",__func__);
771 #endif
772 }
773
774 /* This function creates objects in the transaction record */
775 objheader_t *transCreateObj(unsigned int size) {
776   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
777   OID(tmp) = getNewOID();
778   tmp->version = 1;
779   tmp->rcount = 1;
780         tmp->isBackup = 0;
781   STATUS(tmp) = NEW;
782   t_chashInsert(OID(tmp), tmp);
783
784 #ifdef COMPILER
785   return &tmp[1]; //want space after object header
786 #else
787   return tmp;
788 #endif
789 }
790
791
792 #if 1
793 /* This function creates machine piles based on all machines involved in a
794  * transaction commit request */
795 plistnode_t *createPiles() {
796   int i;
797         unsigned int oid;
798   plistnode_t *pile = NULL;
799   unsigned int machinenum;
800         unsigned int destMachine[2];
801   objheader_t *headeraddr;
802   chashlistnode_t * ptr = c_table;
803   /* Represents number of bins in the chash table */
804   unsigned int size = c_size;
805
806         for(i = 0; i < size ; i++) {
807     chashlistnode_t * curr = &ptr[i];
808                 /* Inner loop to traverse the linked list of the cache lookupTable */
809     while(curr != NULL) {
810       //if the first bin in hash table is empty
811       if(curr->key == 0)
812         break;
813       headeraddr=(objheader_t *) curr->val;
814
815 #if RECOVERY
816       oid = OID(headeraddr);
817 #ifdef DEBUG
818                         printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr));
819
820                         if (STATUS(headeraddr) & NEW) {  // new/local object
821                                 printf("%s-> new/local object\n", __func__);
822                         } 
823       else if ((mhashSearch(curr->key) != NULL)) {      //local/nonnew
824         if(STATUS(headeraddr) & DIRTY) {        // modified
825                                         printf("%s-> old/local/mod object\n", __func__);
826                                 }
827                                 else {  //read
828                                         printf("%s-> old/local/read object\n", __func__);
829                                 }
830                         } 
831       else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
832                                 if(STATUS(headeraddr) & DIRTY) {                //modified
833                                         printf("%s-> remote/local/mod object\n", __func__);
834                                 }
835                                 else {  //read
836                                         printf("%s-> remote/local/read object\n", __func__);
837                                 }
838                         } 
839       else {
840                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
841                                 return NULL;
842                         }
843                         unsigned int pmid = getPrimaryMachine(lhashSearch(oid));
844                         unsigned int bmid = getBackupMachine(lhashSearch(oid));
845                         printf("%s-> Primary Machine: [%s], ", __func__, midtoIPString(pmid));
846                         printf("Backup Machine: [%s]\n", midtoIPString(bmid));
847 #endif  
848                         int makedirty = 0;
849                         if(STATUS(headeraddr) & DIRTY || STATUS(headeraddr) & NEW) {
850                                 makedirty = 1;
851                         }
852                         pile = pInsert(pile, headeraddr, getPrimaryMachine(lhashSearch(oid)), c_numelements);
853 //problem here
854                         if(makedirty) { 
855                                 STATUS(headeraddr) = DIRTY;
856                         }
857                         pile = pInsert(pile, headeraddr, getBackupMachine(lhashSearch(oid)), c_numelements);
858 #else
859                         // Get machine location for object id (and whether local or not)
860                         if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
861                                 machinenum = myIpAddr;
862                         } else if ((machinenum = lhashSearch(curr->key)) == 0) {
863         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
864         return NULL;
865       }
866       //Make machine groups
867       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
868 #endif
869       curr = curr->next;
870     }
871   }
872         return pile;
873 }
874 #else
875 /* This function creates machine piles based on all machines involved in a
876  * transaction commit request */
877 plistnode_t *createPiles() {
878   int i;
879   plistnode_t *pile = NULL;
880   unsigned int machinenum;
881         unsigned int destMachine[2];
882   objheader_t *headeraddr;
883   struct chashentry * ptr = c_table;
884   /* Represents number of bins in the chash table */
885   unsigned int size = c_size;
886
887   for(i = 0; i < size ; i++) {
888     struct chashentry * curr = & ptr[i];
889     /* Inner loop to traverse the linked list of the cache lookupTable */
890     // if the first bin in hash table is empty
891     if(curr->key == 0)
892       continue;
893     headeraddr=(objheader_t *) curr->ptr;
894
895     //Get machine location for object id (and whether local or not)
896     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
897       machinenum = myIpAddr;
898     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
899       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
900       return NULL;
901     }
902
903     //Make machine groups
904     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
905   }
906   return pile;
907 }
908 #endif
909
910 /* This function initiates the transaction commit process
911  * Spawns threads for each of the new connections with Participants
912  * and creates new piles by calling the createPiles(),
913  * Sends a transrequest() to each remote machines for objects found remotely
914  * and calls handleLocalReq() to process objects found locally */
915 int transCommit() {
916   unsigned int tot_bytes_mod, *listmid;
917   plistnode_t *pile, *pile_ptr;
918   char treplyretry; /* keeps track of the common response that needs to be sent */
919   int firsttime=1;
920   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
921   char finalResponse;
922   int deadsd = -1;
923   int deadmid = -1;
924   unsigned int transID = getNewTransID();
925
926 #ifdef DEBUG
927   printf("%s -> Starts transCommit\n",__func__);
928 #endif
929
930 #ifdef ABORTREADERS
931   if (t_abort) {
932     //abort this transaction
933     /* Debug
934      * printf("ABORTING TRANSACTION AT COMMIT\n");
935      */
936     removetransactionhash();
937     objstrDelete(t_cache);
938     t_chashDelete();
939 #ifdef DEBUG
940           printf("%s-> End, line:%d\n\n", __func__, __LINE__);
941 #endif
942     return 1;
943   }
944 #endif
945
946   do {
947     treplyretry = 0;
948
949     /* Look through all the objects in the transaction record and make piles
950      * for each machine involved in the transaction*/
951     if (firsttime) {
952       pile_ptr = pile = createPiles();
953       pile_ptr = pile = sortPiles(pile);
954     } else {
955       pile = pile_ptr;
956     }
957     firsttime = 0;
958     /* Create the packet to be sent in TRANS_REQUEST */
959
960     /* Count the number of participants */
961     int pilecount;
962     pilecount = pCount(pile);
963
964     /* Create a list of machine ids(Participants) involved in transaction   */
965     listmid = calloc(pilecount, sizeof(unsigned int));
966     pListMid(pile, listmid);
967         
968     /* Create a socket and getReplyCtrl array, initialize */
969     int socklist[pilecount];
970     int loopcount;
971     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
972       socklist[loopcount] = 0;
973     char getReplyCtrl[pilecount];
974     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
975       getReplyCtrl[loopcount] = 0;
976
977     /* Process each machine pile */
978     int sockindex = 0;
979                 int localReqsock = -1;
980     trans_req_data_t *tosend;
981     tosend = calloc(pilecount, sizeof(trans_req_data_t));
982     while(pile != NULL) {
983 #ifdef DEBUG
984                         printf("%s-> New pile:[%s],", __func__, midtoIPString(pile->mid));
985                         printf(" myIp:[%s]\n", midtoIPString(myIpAddr));
986 #endif
987       tosend[sockindex].f.control = TRANS_REQUEST;
988                         tosend[sockindex].f.mcount = pilecount;
989                         tosend[sockindex].f.numread = pile->numread;
990                         tosend[sockindex].f.nummod = pile->nummod;
991                         tosend[sockindex].f.numcreated = pile->numcreated;
992 #ifdef DEBUG
993                         printf("%s-> numread:%d, nummod:%d, numcreated:%d\n", __func__, pile->numread, pile->nummod, pile->numcreated);
994 #endif
995                         tosend[sockindex].f.sum_bytes = pile->sum_bytes;
996                         tosend[sockindex].listmid = listmid;
997                         tosend[sockindex].objread = pile->objread;
998                         tosend[sockindex].oidmod = pile->oidmod;
999                         tosend[sockindex].oidcreated = pile->oidcreated;
1000                         int sd = 0;
1001                         if(pile->mid != myIpAddr) {
1002                                 if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
1003                                         printf("\ntransRequest(): socket create error\n");
1004                                         free(listmid);
1005                                         free(tosend);
1006 #ifdef DEBUG
1007                                         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1008 #endif
1009                                         return 1;
1010                                 }
1011                                 socklist[sockindex] = sd;
1012                                 /* Send bytes of data with TRANS_REQUEST control message */
1013                                 send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
1014
1015                                 /* Send list of machines involved in the transaction */
1016                                 {
1017                                         int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
1018                                         send_data(sd, tosend[sockindex].listmid, size);
1019                                 }
1020
1021                                 /* Send oids and version number tuples for objects that are read */
1022                                 {
1023                                         int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
1024                                         send_data(sd, tosend[sockindex].objread, size);
1025                                 }
1026
1027                                 /* Send objects that are modified */
1028                                 void *modptr;
1029                                 if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
1030                                         printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
1031                                         free(listmid);
1032                                         free(tosend);
1033 #ifdef DEBUG
1034                                         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1035 #endif
1036                                         return 1;
1037                                 }
1038                                 int offset = 0;
1039                                 int i;
1040                                 for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
1041                                         int size;
1042                                         objheader_t *headeraddr;
1043                                         if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
1044                                                 printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
1045                                                 free(modptr);
1046                                                 free(listmid);
1047                                                 free(tosend);
1048 #ifdef DEBUG
1049                                                 printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1050 #endif
1051                                                 return 1;
1052                                         }
1053                                         GETSIZE(size,headeraddr);
1054                                         size+=sizeof(objheader_t);
1055                                         memcpy(modptr+offset, headeraddr, size);
1056                                         offset+=size;
1057                                 }
1058                                 send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1059
1060 #ifdef RECOVERY
1061         /* send transaction id, number of machine involved, machine ids */
1062         send_data(sd, &transID, sizeof(unsigned int));
1063 #endif
1064                                 free(modptr);
1065                         } else { //handle request locally
1066                                 localReqsock = sockindex;
1067                                 handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1068                         }
1069                         sockindex++;
1070                         pile = pile->next;
1071                 } //end of pile processing
1072
1073                 /* Recv Ctrl msgs from all machines */
1074 #ifdef DEBUG
1075                 printf("%s-> Finished sending transaction read/mod objects\n",__func__);
1076 #endif
1077                 int i;
1078
1079                 for(i = 0; i < pilecount; i++) {
1080                         if(i == localReqsock)
1081                                 continue;
1082                         int sd = socklist[i]; 
1083                         if(sd != 0) {
1084                                 char control;
1085         int timeout;            // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
1086         timeout = recv_data(sd, &control, sizeof(char));
1087                                 //Update common data structure with new ctrl msg
1088                                 getReplyCtrl[i] = control;
1089                                 /* Recv Objects if participant sends TRANS_DISAGREE */
1090                                 //printf("getReplyCtrl[%d] = %d\n", i, (int)getReplyCtrl[i]);
1091 #ifdef CACHE
1092                                 if(control == TRANS_DISAGREE) {
1093                                         int length;
1094                                         timeout = recv_data(sd, &length, sizeof(int));
1095                                         void *newAddr;
1096                                         pthread_mutex_lock(&prefetchcache_mutex);
1097                                         if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1098                                                 printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1099                                                 free(tosend);
1100                                                 free(listmid);
1101                                                 pthread_mutex_unlock(&prefetchcache_mutex);
1102 #ifdef DEBUG
1103                                                 printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1104 #endif
1105                                                 return 1;
1106                                         }
1107                                         pthread_mutex_unlock(&prefetchcache_mutex);
1108                                         timeout = recv_data(sd, newAddr, length);
1109                                         int offset = 0;
1110                                         while(length != 0) {
1111                                                 unsigned int oidToPrefetch;
1112                                                 objheader_t * header;
1113                                                 header = (objheader_t *)(((char *)newAddr) + offset);
1114                                                 oidToPrefetch = OID(header);
1115                                                 STATUS(header)=0;
1116                                                 int size = 0;
1117                                                 GETSIZE(size, header);
1118                                                 size += sizeof(objheader_t);
1119                                                 //make an entry in prefetch hash table
1120                                                 void *oldptr;
1121                                                 if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
1122                                                         prehashRemove(oidToPrefetch);
1123                                                         prehashInsert(oidToPrefetch, header);
1124                                                 } else {
1125                                                         prehashInsert(oidToPrefetch, header);
1126                                                 }
1127                                                 length = length - size;
1128                                                 offset += size;
1129                                         }
1130                                 } //end of receiving objs
1131 #endif
1132
1133 #ifdef RECOVERY
1134         if(timeout < 0) {
1135 #ifdef DEBUG
1136           printf("%s -> TIMEOUT!!!!!!!\n",__func__);
1137 #endif
1138
1139           deadmid = listmid[i];
1140           deadsd = sd;
1141 #ifdef DEBUG
1142           printf("%s -> Dead Machine ID : %s\n",__func__,midtoIPString(deadmid));  
1143           printf("%s -> Dead SD         : %d\n",__func__,sd);
1144 #endif
1145           getReplyCtrl[i] = TRANS_DISAGREE;
1146         }
1147 #endif
1148                         }
1149                 }
1150
1151 #ifdef DEBUG
1152                 printf("%s-> Decide final response now\n", __func__);
1153 #endif
1154                 /* Decide the final response */
1155                 if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1156                         printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1157                         free(tosend);
1158                         free(listmid);
1159 #ifdef DEBUG
1160                         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1161 #endif
1162                         return 1;
1163                 }
1164 #ifdef DEBUG
1165     printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
1166 #endif
1167     
1168                 /* Send responses to all machines */
1169                 for(i = 0; i < pilecount; i++) {
1170                         int sd = socklist[i];
1171
1172       if(sd != deadsd) {
1173                         if(sd != 0) {
1174 #ifdef CACHE
1175                                 if(finalResponse == TRANS_COMMIT) {
1176                                         int retval;
1177                                         /* Update prefetch cache */
1178                                         if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1179                                                 printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1180                                                   free(tosend);
1181                                                 free(listmid);
1182 #ifdef DEBUG
1183                                                 printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1184 #endif
1185                                                 return 1;
1186                                         }
1187
1188                                         /* Invalidate objects in other machine cache */
1189                                         if(tosend[i].f.nummod > 0) {
1190                                                 if((retval = invalidateObj(&(tosend[i]))) != 0) {
1191                                                         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1192                                                         free(tosend);
1193                                                         free(listmid);
1194 #ifdef DEBUG
1195                                                         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1196 #endif
1197                                                         return 1;
1198                                                 }
1199                                         }
1200 #ifdef ABORTREADERS
1201                                         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1202                                         removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1203 #endif
1204                                   }
1205 #ifdef ABORTREADERS
1206                                 else if (!treplyretry) {
1207                                         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1208                                         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1209                                 }
1210 #endif
1211 #endif
1212           send_data(sd,&finalResponse,sizeof(char));
1213 #ifdef DEBUG
1214           printf("%s -> Decision Sent to %s\n",__func__,midtoIPString(listmid[i]));
1215 #endif
1216
1217       } else {
1218                                 /* Complete local processing */
1219 #ifdef RECOVERY
1220           thashInsert(transID,finalResponse);
1221 #endif
1222           doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1223
1224 #ifdef ABORTREADERS
1225                                   if(finalResponse == TRANS_COMMIT) {
1226                                           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1227                                         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1228                                 } else if (!treplyretry) {
1229                                         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1230                                         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1231                                   }
1232 #endif
1233                         }
1234       } else {
1235 #ifdef ABORTREADERS
1236         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1237         removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1238 #endif
1239       }
1240                 }
1241
1242
1243 #ifdef DEBUG
1244                 printf("%s-> Free sockets\n", __func__);
1245 #endif
1246                 for(i = 0; i < pilecount; i++) {
1247                         if(socklist[i] != 0) {
1248                           freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]);      
1249                         }
1250                 }
1251
1252                         /* Free resources */
1253     free(tosend);
1254     free(listmid);
1255     if (!treplyretry)
1256       pDelete(pile_ptr);
1257     /* wait a random amount of time before retrying to commit transaction*/
1258     if(treplyretry) {
1259       randomdelay();
1260 #ifdef TRANSSTATS
1261                         nSoftAbort++;
1262 #endif
1263                 }
1264         } while (treplyretry && deadmid != -1);
1265
1266         if(finalResponse == TRANS_ABORT) {
1267
1268 #ifdef TRANSSTATS
1269                 numTransAbort++;
1270 #endif
1271     /* Free Resources */
1272     objstrDelete(t_cache);
1273     t_chashDelete();
1274 #ifdef DEBUG
1275           printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1276 #endif
1277 #ifdef RECOVERY
1278     if(deadmid != -1) { /* if deadmid is greater than or equal to 0, 
1279                           then there is dead machine. */
1280 #ifdef DEBUG
1281       printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
1282 #endif
1283       restoreDuplicationState(deadmid);
1284 #ifdef DEBUG
1285       printf("%s -> Duplication completed\n",__func__);
1286 #endif
1287     }
1288 #endif
1289     return TRANS_ABORT;
1290   } else if(finalResponse == TRANS_COMMIT) {
1291 #ifdef TRANSSTATS
1292                 numTransCommit++;
1293 #endif
1294     /* Free Resources */
1295     objstrDelete(t_cache);
1296     t_chashDelete();
1297 #ifdef DEBUG
1298                                         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1299 #endif
1300     return 0;
1301   } else {
1302     //TODO Add other cases
1303     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1304 #ifdef DEBUG
1305         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1306 #endif
1307     exit(-1);
1308   }
1309 #ifdef DEBUG
1310         printf("%s-> End, line:%d\n\n", __func__, __LINE__);
1311 #endif
1312   return 0;
1313 }
1314
1315 /* This function handles the local objects involved in a transaction
1316  * commiting process.  It also makes a decision if this local machine
1317  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1318 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1319   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1320   int numoidnotfound = 0, numoidlocked = 0;
1321   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1322   int numread, i;
1323   unsigned int oid;
1324   unsigned short version;
1325
1326   /* Counters and arrays to formulate decision on control message to be sent */
1327   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1328         oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1329         //setting a divider between read and write locks
1330         numread = tdata->f.numread;
1331         /* Process each oid in the machine pile/ group per thread */
1332         for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1333                 if (i < tdata->f.numread) {
1334                         int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1335                         incr *= i;
1336                         oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1337                         version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1338                         commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1339                 } else { // Objects Modified
1340                         if(i == tdata->f.numread) {
1341                                 oidlocked[numoidlocked++] = -1;
1342                         }
1343                         int tmpsize;
1344                         objheader_t *headptr;
1345                         headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1346                         if (headptr == NULL) {
1347                                 printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1348                                 return;
1349                         }
1350                         oid = OID(headptr);
1351                         version = headptr->version;
1352                         commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1353                 }
1354   }
1355
1356         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1357          * if Participant receives a TRANS_COMMIT */
1358         transinfo->objlocked = oidlocked;
1359         transinfo->objnotfound = oidnotfound;
1360         transinfo->modptr = NULL;
1361         transinfo->numlocked = numoidlocked;
1362         transinfo->numnotfound = numoidnotfound;
1363
1364   /* Condition to send TRANS_AGREE */
1365   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1366 #ifdef DEBUG
1367     printf("%s -> TRANS_AGREE\n",__func__);
1368 #endif
1369     *getReplyCtrl = TRANS_AGREE;
1370   }
1371   /* Condition to send TRANS_SOFT_ABORT */
1372   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1373 #ifdef DEBUG
1374     printf("%s -> TRANS_SOFT_ABORT\n",__func__);
1375 #endif
1376     *getReplyCtrl = TRANS_SOFT_ABORT;
1377   }
1378 }
1379
1380 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1381
1382   if(finalResponse == TRANS_ABORT) {
1383     if(transAbortProcess(transinfo) != 0) {
1384       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1385       fflush(stdout);
1386       return;
1387     }
1388   } else if(finalResponse == TRANS_COMMIT) {
1389 #ifdef CACHE
1390     /* Invalidate objects in other machine cache */
1391     if(tdata->f.nummod > 0) {
1392       int retval;
1393       if((retval = invalidateObj(tdata)) != 0) {
1394         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1395         return;
1396       }
1397     }
1398 #endif
1399     if(transComProcess(tdata, transinfo) != 0) {
1400       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1401       fflush(stdout);
1402       return;
1403     }
1404   } else {
1405     printf("ERROR...No Decision\n");
1406   }
1407
1408   /* Free memory */
1409   if (transinfo->objlocked != NULL) {
1410     free(transinfo->objlocked);
1411   }
1412   if (transinfo->objnotfound != NULL) {
1413     free(transinfo->objnotfound);
1414   }
1415 }
1416
1417 /* This function decides the reponse that needs to be sent to
1418  * all Participant machines after the TRANS_REQUEST protocol */
1419 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1420   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1421                                                                    message to send */
1422   for (i = 0 ; i < pilecount; i++) {
1423     char control;
1424     control = getReplyCtrl[i];
1425     switch(control) {
1426     default:
1427 #ifdef DEBUG
1428       printf("%s-> Participant sent unknown message, i:%d, Control: %d\n", __func__, i, (int)control);
1429 #endif
1430
1431       /* treat as disagree, pass thru */
1432     case TRANS_DISAGREE:
1433       transdisagree++;
1434 #ifdef DEBUG
1435       printf("%s-> Participant sent TRANS_DISAGREE, i:%d, Control: %d\n", __func__, i, (int)control);
1436 #endif
1437       break;
1438
1439     case TRANS_AGREE:
1440       transagree++;
1441 #ifdef DEBUG
1442       printf("%s-> Participant sent TRANS_AGREE, i:%d, Control: %d\n", __func__, i, (int)control);
1443 #endif
1444       break;
1445
1446     case TRANS_SOFT_ABORT:
1447       transsoftabort++;
1448 #ifdef DEBUG
1449       printf("%s-> Participant sent TRANS_SOFT_ABORT, i:%d, Control: %d\n", __func__, i, (int)control);
1450 #endif
1451       break;
1452     }
1453   }
1454
1455   if(transdisagree > 0) {
1456     /* Send Abort */
1457     *treplyretry = 0;
1458     return TRANS_ABORT;
1459 #ifdef CACHE
1460     /* clear objects from prefetch cache */
1461     cleanPCache();
1462 #endif
1463   } else if(transagree == pilecount) {
1464     /* Send Commit */
1465     *treplyretry = 0;
1466     return TRANS_COMMIT;
1467   } else {
1468     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1469     *treplyretry = 1;
1470     return TRANS_ABORT;
1471   }
1472   return 0;
1473 }
1474
1475 /* This function opens a connection, places an object read request to
1476  * the remote machine, reads the control message and object if
1477  * available and copies the object and its header to the local
1478  * cache. */
1479
1480 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1481   int size, val;
1482   struct sockaddr_in serv_addr;
1483   char machineip[16];
1484   char control = 0;
1485   objheader_t *h;
1486   void *objcopy = NULL;
1487
1488   int sd;
1489   int flag;
1490
1491   if((sd = getSock2(transReadSockPool, mnum)) != -1) {
1492     char readrequest[sizeof(char)+sizeof(unsigned int)];
1493     readrequest[0] = READ_REQUEST;
1494     *((unsigned int *)(&readrequest[1])) = oid;
1495     send_data(sd, readrequest, sizeof(readrequest));
1496   }
1497   else {
1498       printf("%s -> creating socket error\n",__func__);
1499   }
1500   
1501   /* Read response from the Participant */
1502   if(recv_data(sd, &control, sizeof(char)) < 0) {
1503       transRetryFlag = 1;
1504       return NULL;
1505   }
1506
1507   if (control==OBJECT_NOT_FOUND) {
1508     objcopy = NULL;
1509   } else if(control==OBJECT_FOUND) {
1510   
1511   /* Read object if found into local cache */
1512
1513   if(recv_data(sd, &size, sizeof(int)) < 0) {
1514     transRetryFlag = 1;
1515     return NULL;
1516   }
1517
1518   objcopy = objstrAlloc(&t_cache, size);
1519
1520   if(recv_data(sd, objcopy, size) < 0) {
1521     transRetryFlag = 1;
1522     return NULL;
1523   }
1524
1525   STATUS(objcopy)=0;
1526   
1527   /* Insert into cache's lookup table */
1528   t_chashInsert(oid, objcopy);
1529 #ifdef TRANSSTATS
1530   totalObjSize += size;
1531 #endif
1532         }
1533         return objcopy;
1534 }
1535
1536 #ifdef RECOVERY
1537 /* ask machines if they received decision */
1538 char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *listmid)
1539 {
1540 #ifdef DEBUG
1541   printf("%s -> Entering\n",__func__);
1542 #endif
1543
1544   int sd; // socket id
1545   int i;
1546   char response;
1547   
1548   for(i = 0; i < nummid; i++) {
1549     if((sd = getSock(transReadSockPool, listmid[i])) < 0) {
1550       printf("%s -> socket Error!!\n");
1551     }
1552     else {
1553       char control = ASK_COMMIT;
1554
1555       send_data(sd,&control, sizeof(char));
1556       send_data(sd,&transID, sizeof(unsigned int));
1557
1558       // return -1 if it didn't receive the response
1559       int timeout = recv_data(sd,&response, sizeof(char));
1560
1561
1562       if(timeout == 0 || response > 0)
1563         break;  // received response
1564
1565       // else check next machine
1566       freeSock(transReadSockPool, listmid[i],sd);
1567      }
1568   }
1569 #ifdef DEBUG
1570   printf("%s -> response : %d\n",__func__,response);
1571 #endif
1572
1573   return (response==-1)?TRANS_ABORT:response;
1574 }
1575 #endif
1576
1577 #ifdef RECOVERY
1578 void restoreDuplicationState(unsigned int deadHost) {
1579         int sd;
1580         char ctrl;
1581
1582         if(!liveHosts[findHost(deadHost)]) {
1583                 sleep(WAIT_TIME);
1584                 return;
1585         }
1586
1587         if(deadHost == leader)
1588                 paxos();
1589         
1590 #ifdef DEBUG
1591         printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
1592 #endif
1593         
1594         if(leader == myIpAddr) {
1595                 pthread_mutex_lock(&leaderFixing_mutex);
1596                 if(!leaderFixing) {
1597                         leaderFixing = 1;
1598                         pthread_mutex_unlock(&leaderFixing_mutex);
1599
1600       if(!liveHosts[findHost(deadHost)]) {
1601 #ifdef DEBUG
1602         printf("%s -> already fixed\n",__func__);
1603 #endif
1604         pthread_mutex_lock(&leaderFixing_mutex);
1605         leaderFixing =0;
1606         pthread_mutex_unlock(&leaderFixing_mutex);
1607       }
1608       else {
1609                         updateLiveHosts();
1610         duplicateLostObjects(deadHost);
1611                         
1612                     if(updateLiveHostsCommit() != 0) {
1613                                 printf("%s -> error updateLiveHostsCommit()\n",__func__);
1614                                   exit(1);
1615                         }
1616                 pthread_mutex_lock(&leaderFixing_mutex);
1617                         leaderFixing = 0;
1618                           pthread_mutex_unlock(&leaderFixing_mutex);
1619       }
1620                 }
1621                 else {
1622                         pthread_mutex_unlock(&leaderFixing_mutex);
1623 #ifdef DEBUG
1624       printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
1625 #endif
1626                         sleep(WAIT_TIME);
1627                 }
1628         }
1629         else {
1630                 if((sd = getSockWithLock(transRequestSockPool, leader)) < 0) {
1631                         printf("%s -> socket create error\n",__func__);
1632                         exit(-1);
1633                 }
1634                 ctrl = REMOTE_RESTORE_DUPLICATED_STATE;
1635                 send_data(sd, &ctrl, sizeof(char));
1636                 send_data(sd, &deadHost, sizeof(unsigned int));
1637     freeSockWithLock(transRequestSockPool,leader,sd);
1638           sleep(WAIT_TIME);
1639         }
1640
1641   printf("%s -> Finished!\n",__func__);
1642 }
1643 #endif
1644
1645
1646 /*  Commit info for objects modified */
1647 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1648                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1649   void *mobj;
1650   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1651   /* Save the oids not found and number of oids not found for later use */
1652   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1653     /* Save the oids not found and number of oids not found for later use */
1654     oidnotfound[*numoidnotfound] = oid;
1655     (*numoidnotfound)++;
1656   } else { /* If Obj found in machine (i.e. has not moved) */
1657     /* Check if Obj is locked by any previous transaction */
1658     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1659       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1660         (*v_matchnolock)++;
1661         //Keep track of what is locked
1662         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1663       } else { /* If versions don't match ...HARD ABORT */
1664         (*v_nomatch)++;
1665         /* Send TRANS_DISAGREE to Coordinator */
1666         *getReplyCtrl = TRANS_DISAGREE;
1667
1668         //Keep track of what is locked
1669         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1670         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1671         return;
1672       }
1673     } else { //A lock is acquired some place else
1674                       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1675         (*v_matchlock)++;
1676       } else { /* If versions don't match ...HARD ABORT */
1677         (*v_nomatch)++;
1678         /* Send TRANS_DISAGREE to Coordinator */
1679         *getReplyCtrl = TRANS_DISAGREE;
1680         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1681         return;
1682       }
1683     }
1684   }
1685 }
1686
1687 /*  Commit info for objects modified */
1688 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1689                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1690   void *mobj;
1691   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1692   /* Save the oids not found and number of oids not found for later use */
1693   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1694                 /* Save the oids not found and number of oids not found for later use */
1695                 oidnotfound[*numoidnotfound] = oid;
1696                 (*numoidnotfound)++;
1697         } else { /* If Obj found in machine (i.e. has not moved) */
1698                 /* Check if Obj is locked by any previous transaction */
1699                 if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1700                         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1701                                 (*v_matchnolock)++;
1702                                 //Keep track of what is locked
1703                                 oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1704                         } else { /* If versions don't match ...HARD ABORT */
1705                                 (*v_nomatch)++;
1706                                 /* Send TRANS_DISAGREE to Coordinator */
1707                                 *getReplyCtrl = TRANS_DISAGREE;
1708                                 //Keep track of what is locked
1709                                 oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1710                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1711                                 return;
1712                         }
1713                 } else { //Has reached max number of readers or some other transaction
1714                         //has acquired a lock on this object
1715                         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1716                                 (*v_matchlock)++;
1717                         } else { /* If versions don't match ...HARD ABORT */
1718                                 (*v_nomatch)++;
1719                                 /* Send TRANS_DISAGREE to Coordinator */
1720                                 *getReplyCtrl = TRANS_DISAGREE;
1721                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1722                                 return;
1723                         }
1724     }
1725   }
1726 }
1727
1728 /* This function completes the ABORT process if the transaction is aborting */
1729 int transAbortProcess(trans_commit_data_t *transinfo) {
1730   int i, numlocked;
1731   unsigned int *objlocked;
1732   void *header;
1733
1734   numlocked = transinfo->numlocked;
1735   objlocked = transinfo->objlocked;
1736
1737   int useWriteUnlock = 0;
1738   for (i = 0; i < numlocked; i++) {
1739     if(objlocked[i] == -1) {
1740       useWriteUnlock = 1;
1741       continue;
1742     }
1743     if((header = mhashSearch(objlocked[i])) == NULL) {
1744       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1745       return 1;
1746     }
1747     if(!useWriteUnlock) {
1748       read_unlock(STATUSPTR(header));
1749     } else {
1750       write_unlock(STATUSPTR(header));
1751     }
1752   }
1753
1754   return 0;
1755 }
1756
1757 /*This function completes the COMMIT process if the transaction is commiting*/
1758 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1759   objheader_t *header, *tcptr;
1760   int i, nummod, tmpsize, numcreated, numlocked;
1761   unsigned int *oidmod, *oidcreated, *oidlocked;
1762   void *ptrcreate;
1763 #ifdef DEBUG
1764         printf("%s-> Entering transComProcess, trans.c\n", __func__);
1765 #endif
1766
1767   nummod = tdata->f.nummod;
1768   oidmod = tdata->oidmod;
1769   numcreated = tdata->f.numcreated;
1770   oidcreated = tdata->oidcreated;
1771   numlocked = transinfo->numlocked;
1772   oidlocked = transinfo->objlocked;
1773
1774 #ifdef DEBUG
1775         printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked);
1776 #endif
1777
1778   for (i = 0; i < nummod; i++) {
1779     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1780       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1781       return 1;
1782     }
1783     /* Copy from transaction cache -> main object store */
1784     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1785       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1786       return 1;
1787     }
1788     GETSIZE(tmpsize, header);
1789     char *tmptcptr = (char *) tcptr;
1790     {
1791       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1792       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1793       dst->___cachedCode___=src->___cachedCode___;
1794       dst->___cachedHash___=src->___cachedHash___;
1795
1796       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1797     }
1798
1799     header->version += 1;
1800     //printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1801     if(header->notifylist != NULL) {
1802 #ifdef DEBUG
1803       printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
1804 #endif
1805 #ifdef RECOVERY
1806       if(header->isBackup != 0)
1807         notifyAll(&header->notifylist, OID(header), header->version);
1808       else
1809         clearNotifyList(OID(header));
1810 #else  
1811       notifyAll(&header->notifylist, OID(header), header->version);
1812 #endif
1813     }
1814   }
1815   /* If object is newly created inside transaction then commit it */
1816   for (i = 0; i < numcreated; i++) {
1817     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1818       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1819       return 1;
1820     }
1821     header->version += 1;
1822     //printf("oid: %u, new header version: %d\n", oidcreated[i], header->version);
1823     GETSIZE(tmpsize, header);
1824     tmpsize += sizeof(objheader_t);
1825     pthread_mutex_lock(&mainobjstore_mutex);
1826     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1827       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1828       pthread_mutex_unlock(&mainobjstore_mutex);
1829       return 1;
1830     }
1831     pthread_mutex_unlock(&mainobjstore_mutex);
1832     /* Initialize read and write locks */
1833     initdsmlocks(STATUSPTR(header));
1834     memcpy(ptrcreate, header, tmpsize);
1835     mhashInsert(oidcreated[i], ptrcreate);
1836     lhashInsert(oidcreated[i], myIpAddr);
1837   }
1838   /* Unlock locked objects */
1839   int useWriteUnlock = 0;
1840   for(i = 0; i < numlocked; i++) {
1841     if(oidlocked[i] == -1) {
1842       useWriteUnlock = 1;
1843       continue;
1844     }
1845     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1846       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1847       return 1;
1848     }
1849     if(!useWriteUnlock) {
1850       read_unlock(STATUSPTR(header));
1851     } else {
1852       write_unlock(STATUSPTR(header));
1853     }
1854   }
1855   return 0;
1856 }
1857
1858 prefetchpile_t *foundLocal(char *ptr) {
1859         int siteid = *(GET_SITEID(ptr));
1860         int ntuples = *(GET_NTUPLES(ptr));
1861         unsigned int * oidarray = GET_PTR_OID(ptr);
1862         unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1863         short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1864         prefetchpile_t * head=NULL;
1865         int numLocal = 0;
1866
1867         int i;
1868         for(i=0; i<ntuples; i++) {
1869                 unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1870                 unsigned short endindex=endoffsets[i];
1871                 unsigned int oid=oidarray[i];
1872                 int newbase;
1873                 int machinenum;
1874                 if (oid==0)
1875                         continue;
1876                 //Look up fields locally
1877                 for(newbase=baseindex; newbase<endindex; newbase++) {
1878                         if (!lookupObject(&oid, arryfields[newbase]))
1879                                 break;
1880                         //Ended in a null pointer...
1881                         if (oid==0)
1882                                 goto tuple;
1883                 }
1884                 //Entire prefetch is local
1885                 if (newbase==endindex&&checkoid(oid)) {
1886                         numLocal++;
1887                         goto tuple;
1888                 }
1889                 //Add to remote requests
1890                 machinenum=lhashSearch(oid);
1891                 insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1892 tuple:
1893                 ;
1894         }
1895
1896         /* handle dynamic prefetching */
1897         handleDynPrefetching(numLocal, ntuples, siteid);
1898         return head;
1899 }
1900
1901 int checkoid(unsigned int oid) {
1902         objheader_t *header;
1903         if ((header=mhashSearch(oid))!=NULL) {
1904                 //Found on machine
1905                 return 1;
1906         } else if ((header=prehashSearch(oid))!=NULL) {
1907                 //Found in cache
1908                 return 1;
1909         } else {
1910                 return 0;
1911         }
1912 }
1913
1914 int lookupObject(unsigned int * oid, short offset) {
1915   objheader_t *header;
1916   if ((header=mhashSearch(*oid))!=NULL) {
1917     //Found on machine
1918     ;
1919   } else if ((header=prehashSearch(*oid))!=NULL) {
1920     //Found in cache
1921     ;
1922   } else {
1923     return 0;
1924   }
1925
1926   if(TYPE(header) >= NUMCLASSES) {
1927     int elementsize = classsize[TYPE(header)];
1928     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1929     int length = ao->___length___;
1930     /* Check if array out of bounds */
1931     if(offset < 0 || offset >= length) {
1932       //if yes treat the object as found
1933       (*oid)=0;
1934       return 1;
1935     }
1936     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1937     return 1;
1938   } else {
1939     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1940     return 1;
1941   }
1942 }
1943
1944
1945 /* This function is called by the thread calling transPrefetch */
1946 void *transPrefetch(void *t) {
1947   while(1) {
1948     /* read from prefetch queue */
1949     void *node=gettail();
1950     /* Check if the tuples are found locally, if yes then reduce them further*/
1951     /* and group requests by remote machine ids by calling the makePreGroups() */
1952     prefetchpile_t *pilehead = foundLocal(node);
1953
1954     if (pilehead!=NULL) {
1955       // Get sock from shared pool
1956
1957       /* Send  Prefetch Request */
1958       prefetchpile_t *ptr = pilehead;
1959       while(ptr != NULL) {
1960         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1961         sendPrefetchReq(ptr, sd);
1962         ptr = ptr->next;
1963       }
1964
1965       /* Release socket */
1966       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1967
1968       /* Deallocated pilehead */
1969       mcdealloc(pilehead);
1970     }
1971     // Deallocate the prefetch queue pile node
1972     inctail();
1973   }
1974 }
1975
1976 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1977   objpile_t *tmp;
1978
1979   int size=sizeof(char)+sizeof(int);
1980   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1981     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1982   }
1983
1984   char buft[size];
1985   char *buf=buft;
1986   *buf=TRANS_PREFETCH;
1987   buf+=sizeof(char);
1988
1989   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1990     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1991     *((int*)buf)=len;
1992     buf+=sizeof(int);
1993     *((unsigned int *)buf)=tmp->oid;
1994     buf+=sizeof(unsigned int);
1995     *((unsigned int *)(buf)) = myIpAddr;
1996     buf+=sizeof(unsigned int);
1997     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1998     buf+=tmp->numoffset*sizeof(short);
1999   }
2000   *((int *)buf)=-1;
2001   send_data(sd, buft, size);
2002   return;
2003 }
2004
2005 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
2006   int len, endpair;
2007   char control;
2008   objpile_t *tmp;
2009
2010   /* Send TRANS_PREFETCH control message */
2011   control = TRANS_PREFETCH;
2012   send_data(sd, &control, sizeof(char));
2013
2014   /* Send Oids and offsets in pairs */
2015   tmp = mcpilenode->objpiles;
2016   while(tmp != NULL) {
2017     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
2018     char oidnoffset[len];
2019     char *buf=oidnoffset;
2020     *((int*)buf) = tmp->numoffset;
2021     buf+=sizeof(int);
2022     *((unsigned int *)buf) = tmp->oid;
2023     buf+=sizeof(unsigned int);
2024     *((unsigned int *)buf) = myIpAddr;
2025     buf += sizeof(unsigned int);
2026     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
2027     send_data(sd, oidnoffset, len);
2028     tmp = tmp->next;
2029   }
2030
2031   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
2032   endpair = -1;
2033   send_data(sd, &endpair, sizeof(int));
2034
2035   return;
2036 }
2037
2038 int getPrefetchResponse(int sd) {
2039   int length = 0, size = 0;
2040   char control;
2041   unsigned int oid;
2042   void *modptr, *oldptr;
2043
2044   recv_data((int)sd, &length, sizeof(int));
2045   size = length - sizeof(int);
2046   char recvbuffer[size];
2047
2048   recv_data((int)sd, recvbuffer, size);
2049   control = *((char *) recvbuffer);
2050   if(control == OBJECT_FOUND) {
2051     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
2052     size = size - (sizeof(char) + sizeof(unsigned int));
2053     pthread_mutex_lock(&prefetchcache_mutex);
2054     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
2055       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
2056       pthread_mutex_unlock(&prefetchcache_mutex);
2057       return -1;
2058     }
2059     pthread_mutex_unlock(&prefetchcache_mutex);
2060     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
2061     STATUS(modptr)=0;
2062
2063     /* Insert the oid and its address into the prefetch hash lookup table */
2064     /* Do a version comparison if the oid exists */
2065     if((oldptr = prehashSearch(oid)) != NULL) {
2066       /* If older version then update with new object ptr */
2067       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
2068         prehashRemove(oid);
2069         prehashInsert(oid, modptr);
2070       }
2071     } else { /* Else add the object ptr to hash table*/
2072       prehashInsert(oid, modptr);
2073     }
2074     /* Lock the Prefetch Cache look up table*/
2075     pthread_mutex_lock(&pflookup.lock);
2076     /* Broadcast signal on prefetch cache condition variable */
2077     pthread_cond_broadcast(&pflookup.cond);
2078     /* Unlock the Prefetch Cache look up table*/
2079     pthread_mutex_unlock(&pflookup.lock);
2080   } else if(control == OBJECT_NOT_FOUND) {
2081     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
2082     /* TODO: For each object not found query DHT for new location and retrieve the object */
2083     /* Throw an error */
2084     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
2085     //    exit(-1);
2086   } else {
2087     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
2088   }
2089
2090   return 0;
2091 }
2092
2093 unsigned short getObjType(unsigned int oid) {
2094   objheader_t *objheader;
2095   unsigned short numoffset[] ={0};
2096   short fieldoffset[] ={};
2097
2098   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
2099 #ifdef CACHE
2100     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
2101 #endif
2102     unsigned int mid = lhashSearch(oid);
2103     int sd = getSock2(transReadSockPool, mid);
2104     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
2105     remotereadrequest[0] = READ_REQUEST;
2106     *((unsigned int *)(&remotereadrequest[1])) = oid;
2107     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
2108
2109     /* Read response from the Participant */
2110     char control;
2111     recv_data(sd, &control, sizeof(char));
2112
2113     if (control==OBJECT_NOT_FOUND) {
2114       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
2115       fflush(stdout);
2116       exit(-1);
2117     } else {
2118       /* Read object if found into local cache */
2119       int size;
2120       recv_data(sd, &size, sizeof(int));
2121 #ifdef CACHE
2122       pthread_mutex_lock(&prefetchcache_mutex);
2123       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
2124         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
2125         pthread_exit(NULL);
2126       }
2127       pthread_mutex_unlock(&prefetchcache_mutex);
2128       recv_data(sd, objheader, size);
2129       prehashInsert(oid, objheader);
2130       return TYPE(objheader);
2131 #else
2132       char *buffer;
2133       if((buffer = calloc(1, size)) == NULL) {
2134         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
2135         fflush(stdout);
2136         return 0;
2137       }
2138       recv_data(sd, buffer, size);
2139       objheader = (objheader_t *)buffer;
2140       unsigned short type = TYPE(objheader);
2141       free(buffer);
2142       return type;
2143 #endif
2144     }
2145 #ifdef CACHE
2146   }
2147 #endif
2148   }
2149   return TYPE(objheader);
2150 }
2151
2152 int startRemoteThread(unsigned int oid, unsigned int mid) {
2153   int sock;
2154   struct sockaddr_in remoteAddr;
2155   char msg[1 + sizeof(unsigned int)];
2156   int bytesSent;
2157   int status;
2158
2159   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2160     perror("startRemoteThread():socket()");
2161     return -1;
2162   }
2163
2164   bzero(&remoteAddr, sizeof(remoteAddr));
2165   remoteAddr.sin_family = AF_INET;
2166   remoteAddr.sin_port = htons(LISTEN_PORT);
2167   remoteAddr.sin_addr.s_addr = htonl(mid);
2168
2169   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2170     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
2171            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2172     status = -1;
2173   } else
2174   {
2175     msg[0] = START_REMOTE_THREAD;
2176     *((unsigned int *) &msg[1]) = oid;
2177     send_data(sock, msg, 1 + sizeof(unsigned int));
2178   }
2179
2180   close(sock);
2181   return status;
2182 }
2183
2184 //TODO: when reusing oids, make sure they are not already in use!
2185 static unsigned int id = 0xFFFFFFFF;
2186 unsigned int getNewOID(void) {
2187   id += 2;
2188   if (id > oidMax || id < oidMin) {
2189     id = (oidMin | 1);
2190   }
2191   return id;
2192 }
2193
2194 static unsigned int tid = 0xFFFFFFFF;
2195 unsigned int getNewTransID(void) {
2196   tid++;
2197   if (tid > transIDMax || tid < transIDMin) {
2198     tid = (transIDMin | 1);
2199   }
2200   return tid;
2201 }
2202
2203 int processConfigFile() {
2204   FILE *configFile;
2205   const int maxLineLength = 200;
2206   char lineBuffer[maxLineLength];
2207   char *token;
2208   const char *delimiters = " \t\n";
2209   char *commentBegin;
2210   unsigned int i;
2211   in_addr_t tmpAddr;
2212
2213   configFile = fopen(CONFIG_FILENAME, "r");
2214   if (configFile == NULL) {
2215     printf("error opening %s:\n", CONFIG_FILENAME);
2216     perror("");
2217     return -1;
2218   }
2219
2220   numHostsInSystem = 0;
2221   sizeOfHostArray = 8;
2222   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
2223 #ifdef RECOVERY 
2224         liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
2225         locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int));
2226
2227   liveHostsValid = 0;
2228 #endif
2229
2230         while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
2231                 commentBegin = strchr(lineBuffer, '#');
2232                 if (commentBegin != NULL)
2233                         *commentBegin = '\0';
2234                 token = strtok(lineBuffer, delimiters);
2235                 while (token != NULL) {
2236                         tmpAddr = inet_addr(token);
2237                         if ((int)tmpAddr == -1) {
2238                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
2239                                 fclose(configFile);
2240                                 return -1;
2241                         } else
2242                                 addHost(htonl(tmpAddr));
2243                         token = strtok(NULL, delimiters);
2244                 }
2245         }
2246
2247         fclose(configFile);
2248
2249   if (numHostsInSystem < 1) {
2250     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
2251     return -1;
2252   }
2253 #ifdef MAC
2254   myIpAddr = getMyIpAddr("en1");
2255 #else
2256   myIpAddr = getMyIpAddr("eth0");
2257 #endif
2258   myIndexInHostArray = findHost(myIpAddr);
2259 #ifdef RECOVERY
2260         liveHosts[myIndexInHostArray] = 1;
2261         //locateObjHosts[myIndexInHostArray] = myIpAddr;
2262 #endif  
2263         if (myIndexInHostArray == -1) {
2264     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
2265     return -1;
2266   }
2267   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
2268   oidMin = oidsPerBlock * myIndexInHostArray;
2269   if (myIndexInHostArray == numHostsInSystem - 1)
2270     oidMax = 0xFFFFFFFF;
2271   else
2272     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
2273
2274         transIDMin = oidMin;
2275         transIDMax = oidMax;
2276
2277   waitThreadID = -1;
2278   waitThreadMid = -1;
2279
2280   return 0;
2281 }
2282
2283 #ifdef RECOVERY
2284 unsigned int getDuplicatedPrimaryMachine(unsigned int mid) {
2285         int i;
2286         for(i = 0; i < numHostsInSystem; i++) {
2287                 if(mid == locateObjHosts[(i*2)+1]) {
2288                         return locateObjHosts[i*2];
2289                 }
2290         }
2291         return -1;
2292 }
2293
2294 unsigned int getPrimaryMachine(unsigned int mid) {
2295         unsigned int pmid;
2296         int pmidindex = 2*findHost(mid);
2297
2298         pthread_mutex_lock(&liveHosts_mutex);
2299         pmid = locateObjHosts[pmidindex];
2300         pthread_mutex_unlock(&liveHosts_mutex);
2301         return pmid;
2302 }
2303
2304 unsigned int getBackupMachine(unsigned int mid) {
2305         unsigned int bmid;
2306         int bmidindex = 2*findHost(mid)+1;
2307
2308         pthread_mutex_lock(&liveHosts_mutex);
2309         bmid = locateObjHosts[bmidindex];
2310         pthread_mutex_unlock(&liveHosts_mutex);
2311         return bmid;
2312 }
2313
2314 int getStatus(int mid) {
2315 #ifdef DEBUG
2316   printf("%s -> host %s : %s\n",__func__,midtoIPString(hostIpAddrs[mid]),(liveHosts[mid] == 1)?"LIVE":"DEAD");
2317 #endif
2318   return liveHosts[mid];
2319 }
2320 #endif
2321
2322 #ifdef RECOVERY
2323 // updates the leader's liveHostArray and locateObj
2324 unsigned int updateLiveHosts() {
2325 #ifdef DEBUG
2326     printf("%s-> Entering updateLiveHosts\n", __func__);        
2327 #endif
2328         // update everyone's list
2329     liveHostsValid = 0;
2330         
2331   //foreach in hostipaddrs, ping -> update list of livemachines 
2332   //socket connection?
2333
2334   int deadhost = -1;
2335         //liveHosts lock here
2336         int sd = 0, i, j, tmpNumLiveHosts = 0;
2337         for(i = 0; i < numHostsInSystem; i++) {
2338     if(i == myIndexInHostArray) 
2339                 {       
2340                         tmpNumLiveHosts++;
2341                         continue;
2342                 }
2343                 for(j = 0; j < 5; j++) {        // hard define num of retries
2344                         if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
2345 #ifdef DEBUG
2346                 printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
2347 #endif
2348                                 usleep(1000);
2349   
2350         if(j == 4) {
2351                           if(liveHosts[i]) {
2352             liveHosts[i] = 0;
2353             deadhost = i;
2354           }
2355         }
2356                         continue;
2357                   }
2358       
2359       char liverequest[sizeof(char)];
2360                         liverequest[0] = RESPOND_LIVE;
2361         
2362                         send_data(sd, &liverequest[0], sizeof(liverequest));
2363       
2364                         char response = 0;
2365                         int timeout = recv_data(sd, &response, sizeof(response));
2366                         
2367                         //try to send msg
2368                         //if timeout, dead host
2369                         if(response == LIVE) {
2370                         liveHosts[i] = 1;
2371                         tmpNumLiveHosts++;
2372                         }
2373                         else {
2374         if(liveHosts[i]) {
2375           liveHosts[i] = 0;
2376           deadhost = i;
2377         }
2378                   }
2379                         break;
2380                 }
2381 #ifdef DEBUG
2382           if(liveHosts[i] == 0)
2383
2384                   printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
2385 #endif
2386         }
2387         numLiveHostsInSystem = tmpNumLiveHosts;
2388 #ifdef DEBUG
2389         printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
2390 #endif
2391         //have updated list of live machines
2392 #ifdef DEBUG    
2393   printf("%s-> Exiting updateLiveHosts\n", __func__);   
2394         printHostsStatus();
2395 #endif
2396
2397   return deadhost;
2398 }
2399
2400 int getNumLiveHostsInSystem() {
2401         int count = 0, i = 0;
2402         for(; i<numHostsInSystem; i++) {
2403                 if(liveHosts[i])
2404                         count++;
2405         }
2406         return count;
2407 }
2408
2409 int updateLiveHostsCommit() {
2410 #ifdef DEBUG      
2411   printf("%s -> Enter\n",__func__);
2412 #endif
2413         int sd = 0, i;
2414         
2415         char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
2416   
2417   updaterequest[0] = UPDATE_LIVE_HOSTS;
2418                 
2419         for(i = 0; i < numHostsInSystem; i++) {
2420                 *((int *)(&updaterequest[i*4+1])) = liveHosts[i];  // clean this up later
2421         }
2422
2423         for(i = 0; i < numHostsInSystem*2; i++) {
2424                 *((unsigned int *)(&updaterequest[i*4+(numHostsInSystem*4)+1])) = locateObjHosts[i];    //ditto
2425         }
2426
2427         //for each machine send data
2428         
2429         for(i = 0; i < numHostsInSystem; i++) {         // hard define num of retries
2430                 if(i == myIndexInHostArray) 
2431                         continue;
2432                 if(liveHosts[i] == 1) {
2433                         if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
2434                         printf("%s -> socket create error, attempt %d\n",__func__, i);
2435                                 return -1;
2436                         }
2437                         send_data(sd, updaterequest, sizeof(updaterequest));
2438                 }
2439         }
2440         liveHostsValid = 1;
2441 #ifdef DEBUG
2442         printHostsStatus();
2443   printf("%s -> Finish\n",__func__);
2444 #endif
2445
2446         return 0;
2447 }
2448
2449 void setLocateObjHosts() {
2450         int i = 0, validIndex = 0;
2451
2452         //check num hosts even valid first
2453         
2454         for(;i < numHostsInSystem; i++) {
2455 #ifdef DEBUG
2456     printf("%s-> i:%d\n", __func__, i);
2457 #endif
2458                 
2459                 while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
2460                         validIndex++;
2461                 }
2462                 locateObjHosts[i*2] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
2463 #ifdef DEBUG
2464                 printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2, midtoIPString(locateObjHosts[(i*2)]));
2465 #endif
2466
2467                 validIndex++;
2468                 while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) {
2469                         validIndex++;
2470                 }
2471 #ifdef DEBUG
2472                 printf("%s-> validIndex:%d, this mid is: [%s]\n", __func__, validIndex, midtoIPString(hostIpAddrs[(i+validIndex)%numHostsInSystem]));
2473 #endif
2474                 locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
2475                 validIndex=0;
2476
2477 #ifdef DEBUG
2478                 printf("%s-> locateObjHosts[%d]:%s\n", __func__, i*2+1, midtoIPString(locateObjHosts[(i*2)+1]));
2479 #endif
2480         }
2481 }
2482
2483 void setReLocateObjHosts(int mid)
2484 {
2485   int mIndex = findHost(mid);
2486   int backupMachine = getBackupMachine(mid);
2487   int newPrimary = getDuplicatedPrimaryMachine(mid);
2488   int newPrimaryIndex = findHost(newPrimary);
2489   int i;
2490
2491   locateObjHosts[2*newPrimaryIndex+1] = backupMachine;
2492   locateObjHosts[2*mIndex] = newPrimary;
2493
2494 /* relocate the objects of the machines already dead */
2495   for(i=0; i<numHostsInSystem *2; i+=2) {
2496     if(locateObjHosts[i] == mid)
2497       locateObjHosts[i] = newPrimary;
2498     if(locateObjHosts[i+1] == mid)
2499       locateObjHosts[i+1] = backupMachine;
2500   }
2501 }
2502
2503
2504 //debug function
2505 void printHostsStatus() {
2506         int i;
2507         printf("%s-> *printing live machines and backups*\n", __func__);
2508         for(i = 0; i < numHostsInSystem; i++) {
2509                 if(liveHosts[i]) {
2510                         printf("%s-> [%s]: LIVE\n", __func__, midtoIPString(hostIpAddrs[i])); 
2511                 }
2512                 else {
2513                         printf("%s-> [%s]: DEAD\n", __func__, midtoIPString(hostIpAddrs[i]));
2514                 }
2515                         printf("%s-> original:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2]));
2516                         printf("%s-> backup:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2+1]));
2517         }
2518 }
2519
2520 int allHostsLive() {
2521         int i;
2522         for(i = 0; i < numHostsInSystem; i++) {
2523                 if(!liveHosts[i])
2524                         return 0;
2525         }
2526         return 1;
2527 }
2528
2529 void duplicateLostObjects(unsigned int mid){
2530
2531         printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));  
2532         
2533         //this needs to be changed.
2534         unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
2535         unsigned int originalMid = getDuplicatedPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
2536
2537 #ifdef DEBUG
2538         printf("%s-> backupMid: [%s], ", __func__, midtoIPString(backupMid));
2539         printf("originalMid: [%s]\n", midtoIPString(originalMid));
2540         printHostsStatus(); 
2541 #endif
2542
2543   setReLocateObjHosts(mid);
2544         
2545         //connect to these machines
2546         //go through their object store copying necessary (in a transaction)
2547         //transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
2548         int sd = 0, i, j, tmpNumLiveHosts = 0;
2549
2550   /* duplicateLostObject example
2551    * Before M24 die,
2552    * MID        21      24      26
2553    * Primary    21      24      26
2554    * Backup     26      21      24
2555    * After M24 die,
2556    * MID        21      26
2557    * Primary   21,24    26
2558    * Backup     26      21,24
2559    */
2560
2561         if(originalMid == myIpAddr) {   // copy local machine's backup data, make it as primary data of backup machine.
2562                 duplicateLocalOriginalObjects(backupMid);       
2563         }
2564         else if((sd = getSockWithLock(transRequestSockPool, originalMid)) < 0) {
2565                 printf("%s -> socket create error, attempt %d\n", __func__,j);
2566     exit(0);
2567                 //usleep(1000);
2568         }
2569         else {      // if original is not local
2570                 char duperequest;
2571                 duperequest = DUPLICATE_ORIGINAL;
2572                 send_data(sd, &duperequest, sizeof(char));
2573 #ifdef DEBUG
2574           printf("%s-> SD : %d  Sent DUPLICATE_ORIGINAL request to %s\n", __func__,sd,midtoIPString(originalMid));      
2575 #endif
2576                 send_data(sd, &backupMid, sizeof(unsigned int));
2577
2578     char response;
2579                 recv_data(sd, &response, sizeof(char));
2580 #ifdef DEBUG
2581                 printf("%s (DUPLICATE_ORIGINAL) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
2582 #endif
2583
2584     freeSockWithLock(transRequestSockPool, originalMid, sd);
2585         }
2586
2587         if(backupMid == myIpAddr) {   // copy local machine's primary data, and make it as backup data of original machine.
2588                 duplicateLocalBackupObjects(originalMid);       
2589         }
2590         else if((sd = getSockWithLock(transRequestSockPool, backupMid)) < 0) {
2591                 printf("updateLiveHosts(): socket create error, attempt %d\n", j);
2592                 exit(1);
2593         }
2594         else {
2595                 char duperequest;
2596                 duperequest = DUPLICATE_BACKUP;
2597                 send_data(sd, &duperequest, sizeof(char));
2598 #ifdef DEBUG
2599           printf("%s-> SD : %d  Sent DUPLICATE_BACKUP request to %s\n", __func__,sd,midtoIPString(backupMid));  
2600 #endif
2601                 send_data(sd, &originalMid, sizeof(unsigned int));
2602
2603                 char response;
2604                 recv_data(sd, &response, sizeof(char));
2605 #ifdef DEBUG
2606                 printf("%s (DUPLICATE_BACKUP) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
2607 #endif
2608
2609     freeSockWithLock(transRequestSockPool, backupMid, sd);
2610         }
2611
2612 #ifndef DEBUG
2613         printf("%s-> End\n", __func__);  
2614 #endif
2615 }
2616
2617 void duplicateLocalBackupObjects(unsigned int mid) {
2618         int tempsize, sd;
2619   int i;
2620         char *dupeptr, ctrl, response;
2621 #ifndef DEBUG
2622         printf("%s-> Start; backup mid:%s\n", __func__, midtoIPString(mid));  
2623 #endif
2624
2625         //copy code from dstmserver here
2626         tempsize = mhashGetDuplicate((void**)&dupeptr, 1);
2627
2628 #ifdef DEBUG
2629         printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr)));
2630 #endif
2631         //send control and dupes after
2632         ctrl = RECEIVE_DUPES;
2633         if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
2634                 printf("duplicatelocalbackup: socket create error\n");
2635                 //usleep(1000);
2636         }
2637 #ifdef DEBUG
2638         printf("%s -> sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", __func__,sd, tempsize, *((unsigned int *)(dupeptr)));
2639 #endif
2640         send_data(sd, &ctrl, sizeof(char));
2641         send_data(sd, dupeptr, tempsize);
2642         
2643   recv_data(sd, &response, sizeof(char));
2644   freeSockWithLock(transRequestSockPool,mid,sd);
2645
2646 #ifdef DEBUG
2647   printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
2648 #endif
2649
2650         if(response != DUPLICATION_COMPLETE) {
2651 #ifndef DEBUG
2652     printf("%s -> DUPLICATION_FAIL\n",__func__);
2653 #endif
2654     exit(-1);
2655         }
2656   free(dupeptr);
2657
2658 #ifndef DEBUG
2659         printf("%s-> End\n", __func__);  
2660 #endif
2661
2662 }
2663
2664 void duplicateLocalOriginalObjects(unsigned int mid) {
2665         int tempsize, sd;
2666         char *dupeptr, ctrl, response;
2667
2668 #ifndef DEBUG
2669         printf("%s-> Start\n", __func__);  
2670 #endif
2671         //copy code fom dstmserver here
2672
2673         tempsize = mhashGetDuplicate((void**)&dupeptr, 0);
2674
2675         //send control and dupes after
2676         ctrl = RECEIVE_DUPES;
2677
2678         if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
2679                 printf("DUPLICATE_ORIGINAL: socket create error\n");
2680                 //usleep(1000);
2681         }
2682 #ifdef DEBUG
2683         printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
2684 #endif
2685
2686         send_data(sd, &ctrl, sizeof(char));
2687         send_data(sd, dupeptr, tempsize);
2688
2689         recv_data(sd, &response, sizeof(char));
2690   freeSockWithLock(transRequestSockPool,mid,sd);
2691
2692 #ifdef DEBUG
2693   printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
2694 #endif
2695
2696         if(response != DUPLICATION_COMPLETE) {
2697                 //fail message
2698 #ifndef DEBUG
2699     printf("%s -> DUPLICATION_FAIL\n",__func__);
2700 #endif
2701     exit(0);
2702         }
2703   
2704   free(dupeptr);
2705
2706 #ifndef DEBUG
2707         printf("%s-> End\n", __func__);  
2708 #endif
2709
2710 }
2711
2712 #endif
2713
2714 void addHost(unsigned int hostIp) {
2715   unsigned int *tmpArray;
2716   int *tmpliveHostsArray;       
2717         unsigned int *tmplocateObjHostsArray;
2718
2719   if (findHost(hostIp) != -1)
2720     return;
2721
2722   if (numHostsInSystem == sizeOfHostArray) {
2723     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
2724     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
2725     free(hostIpAddrs);
2726     hostIpAddrs = tmpArray;
2727
2728 #ifdef RECOVERY
2729                 tmpliveHostsArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
2730                 memcpy(tmpliveHostsArray, liveHosts, sizeof(unsigned int) * numHostsInSystem);
2731     free(liveHosts);
2732     liveHosts = tmpliveHostsArray;
2733                 
2734                 tmplocateObjHostsArray = calloc(sizeOfHostArray * 2 * 2, sizeof(unsigned int));
2735                 memcpy(tmplocateObjHostsArray, locateObjHosts, sizeof(unsigned int) * numHostsInSystem);
2736     free(locateObjHosts);
2737     locateObjHosts = tmplocateObjHostsArray;
2738 #endif
2739                 sizeOfHostArray *= 2;
2740   }
2741
2742   hostIpAddrs[numHostsInSystem] = hostIp;
2743
2744 #ifdef RECOVERY
2745   liveHosts[numHostsInSystem] = 0;
2746   locateObjHosts[numHostsInSystem*2] = hostIp;
2747 #endif
2748
2749         numHostsInSystem++;
2750   return;
2751 }
2752
2753 int findHost(unsigned int hostIp) {
2754   int i;
2755   for (i = 0; i < numHostsInSystem; i++)
2756     if (hostIpAddrs[i] == hostIp)
2757       return i;
2758
2759   //not found
2760   return -1;
2761 }
2762
2763 /* This function sends notification request per thread waiting on object(s) whose version
2764  * changes */
2765 #ifdef RECOVERY
2766 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid, int waitmid) {
2767 #else
2768 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
2769 #endif
2770   int psock,i;
2771   objheader_t *objheader;
2772   struct sockaddr_in premoteAddr;
2773   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
2774   char *ptr;
2775   int status, size;
2776   unsigned short version;
2777   unsigned int oid,mid;
2778   static unsigned int threadid = 0;
2779   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
2780   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2781   notifydata_t *ndata;
2782
2783 #ifdef RECOVERY
2784   int bsock;
2785   struct sockaddr_in bremoteAddr;
2786 #endif
2787
2788   oid = oidarry[0];
2789
2790   if((mid = lhashSearch(oid)) == 0) {
2791     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2792     return;
2793   }
2794 #ifdef RECOVERY
2795   int pmid = getPrimaryMachine(mid);
2796   int bmid = getBackupMachine(mid);
2797 #else
2798   int pmid = mid;
2799 #endif
2800
2801 #ifdef RECOVERY
2802   if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
2803       (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
2804 #else
2805   if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2806 #endif
2807     perror("reqNotify():socket()");
2808     return -1;
2809   }
2810
2811   /* for primary machine */
2812   bzero(&premoteAddr, sizeof(premoteAddr));
2813   premoteAddr.sin_family = AF_INET;
2814   premoteAddr.sin_port = htons(LISTEN_PORT);
2815   premoteAddr.sin_addr.s_addr = htonl(pmid);
2816
2817 #ifdef RECOVERY
2818   /* for backup machine */
2819   bzero(&bremoteAddr, sizeof(bremoteAddr));
2820   bremoteAddr.sin_family = AF_INET;
2821   bremoteAddr.sin_port = htons(LISTEN_PORT);
2822   bremoteAddr.sin_addr.s_addr = htonl(bmid);
2823 #endif
2824   /* Generate unique threadid */
2825   threadid++;
2826   
2827   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2828   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2829     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2830     return -1;
2831   }
2832   ndata->numoid = numoid;
2833   ndata->threadid = threadid;
2834   ndata->oidarry = oidarry;
2835   ndata->versionarry = versionarry;
2836   ndata->threadcond = threadcond;
2837   ndata->threadnotify = threadnotify;
2838   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2839     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2840     free(ndata);
2841     return -1;
2842   }
2843
2844   /* Send  number of oids, oidarry, version array, machine id and threadid */
2845 #ifdef RECOVERY
2846   if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0) || 
2847       (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr))< 0)) {
2848 #else
2849   if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0)) {
2850 #endif
2851     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2852     inet_ntoa(premoteAddr.sin_addr), LISTEN_PORT);
2853     free(ndata);
2854     return -1;
2855   } else {
2856
2857 #ifdef DEBUG
2858     printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
2859     printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
2860 #endif
2861
2862     msg[0] = THREAD_NOTIFY_REQUEST;
2863
2864     *((unsigned int *)(&msg[1])) = numoid;
2865     /* Send array of oids  */
2866     size = sizeof(unsigned int);
2867
2868     for(i = 0;i < numoid; i++) {
2869       oid = oidarry[i];
2870 #ifdef DEBUG
2871       printf("%s -> oid[%d] = %d\n",__func__,i,oidarry[i]);
2872 #endif
2873       *((unsigned int *)(&msg[1] + size)) = oid;
2874       size += sizeof(unsigned int);
2875     }
2876
2877     /* Send array of version  */
2878     for(i = 0;i < numoid; i++) {
2879       version = versionarry[i];
2880       *((unsigned short *)(&msg[1] + size)) = version;
2881       size += sizeof(unsigned short);
2882     }
2883
2884     *((unsigned int *)(&msg[1] + size)) = myIpAddr; 
2885     size += sizeof(unsigned int);
2886     *((unsigned int *)(&msg[1] + size)) = threadid;
2887 #ifdef RECOVERY
2888     waitThreadMid = waitmid;
2889     waitThreadID = threadid;
2890     printf("%s -> This Thread is waiting for %s\n",__func__,midtoIPString(waitmid));
2891 #endif
2892
2893     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2894     pthread_mutex_lock(&(ndata->threadnotify));
2895     send_data(psock, msg, size);
2896 #ifdef RECOVERY
2897     send_data(bsock, msg, size);
2898 #endif
2899     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2900     pthread_mutex_unlock(&(ndata->threadnotify));
2901   }
2902
2903   pthread_cond_destroy(&threadcond);
2904   pthread_mutex_destroy(&threadnotify);
2905   free(ndata);
2906   close(psock);
2907
2908 #ifdef RECOVERY
2909   close(bsock);
2910 #endif
2911
2912   return status;
2913 }
2914
2915 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2916   notifydata_t *ndata;
2917   int i, objIsFound = 0, index = -1;
2918   void *ptr;
2919 #ifdef DEBUG
2920   printf("%s -> oid = %d   vesion = %d    tid = %d\n",__func__,oid,version,tid);
2921 #endif
2922
2923   //Look up the tid and call the corresponding pthread_cond_signal
2924   if((ndata = notifyhashSearch(tid)) == NULL) {
2925     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2926     return;
2927   } else  {
2928     for(i = 0; i < ndata->numoid; i++) {
2929       if(ndata->oidarry[i] == oid) {
2930         objIsFound = 1;
2931         index = i;
2932
2933       }
2934     }
2935     if(objIsFound == 0) {
2936       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2937       return;
2938     } 
2939     else {
2940       if(version <= ndata->versionarry[index] && version >= 0) {
2941               printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2942         return;
2943       } else {
2944 #ifdef CACHE
2945         /* Clear from prefetch cache and free thread related data structure */
2946         if((ptr = prehashSearch(oid)) != NULL) {
2947           prehashRemove(oid);
2948         }
2949 #endif
2950         pthread_mutex_lock(&(ndata->threadnotify));
2951         pthread_cond_signal(&(ndata->threadcond));
2952         pthread_mutex_unlock(&(ndata->threadnotify));
2953       }
2954     }
2955   }
2956
2957 #ifdef DEBUG
2958   printf("%s -> Finished\n",__func__);
2959 #endif
2960   return;
2961 }
2962
2963 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2964   threadlist_t *ptr;
2965   unsigned int mid;
2966   struct sockaddr_in remoteAddr;
2967   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2968   int sock, status, size, bytesSent;
2969 #ifdef DEBUG
2970   printf("%s -> Entering \n",__func__);
2971 #endif
2972
2973   while(*head != NULL) {
2974     ptr = *head;
2975
2976     mid = ptr->mid;
2977 #ifdef DEBUG
2978     printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid));
2979 #endif
2980
2981     //create a socket connection to that machine
2982     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2983       perror("notifyAll():socket()");
2984       return -1;
2985     }
2986
2987     bzero(&remoteAddr, sizeof(remoteAddr));
2988     remoteAddr.sin_family = AF_INET;
2989     remoteAddr.sin_port = htons(LISTEN_PORT);
2990     remoteAddr.sin_addr.s_addr = htonl(mid);
2991     //send Thread Notify response and threadid to that machine
2992     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2993       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2994              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2995       fflush(stdout);
2996       status = -1;
2997     } else {
2998 #ifdef DEBUG
2999       printf("%s -> connected\n",__func__);
3000 #endif
3001       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
3002       msg[0] = THREAD_NOTIFY_RESPONSE;
3003       *((unsigned int *)&msg[1]) = oid;
3004       size = sizeof(unsigned int);
3005       *((unsigned short *)(&msg[1]+ size)) = version;
3006       size+= sizeof(unsigned short);
3007       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
3008
3009       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
3010       send_data(sock, msg, size);
3011     }
3012     //close socket
3013     close(sock);
3014
3015     // Update head
3016     *head = ptr->next;
3017     free(ptr);
3018 #ifdef DEBUG
3019     printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid));  
3020 #endif
3021   }
3022   return status;
3023 }
3024
3025
3026 void transAbort() {
3027 #ifdef ABORTREADERS
3028   removetransactionhash();
3029 #endif
3030   objstrDelete(t_cache);
3031   t_chashDelete();
3032 }
3033
3034 /* This function inserts necessary information into
3035  * a machine pile data structure */
3036 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
3037   plistnode_t *ptr, *tmp;
3038   int found = 0, offset = 0;
3039   char ip[16];
3040   tmp = pile;
3041   //Add oid into a machine that is already present in the pile linked list structure
3042   while(tmp != NULL) {
3043 //    printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid));
3044     if (tmp->mid == mid) {
3045       int tmpsize;
3046
3047                         if (STATUS(headeraddr) & NEW) {
3048                                 tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
3049                                 tmp->numcreated++;
3050                                 GETSIZE(tmpsize, headeraddr);
3051                                 tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
3052                         } else if (STATUS(headeraddr) & DIRTY) {
3053                                 tmp->oidmod[tmp->nummod] = OID(headeraddr);
3054                                 tmp->nummod++;
3055                                 GETSIZE(tmpsize, headeraddr);
3056                                 tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
3057                                 /*      midtoIP(tmp->mid, ip);
3058                                         printf("pp; Redo? pile->mid: %s, oid: %d, header version: %d\n", ip, OID(headeraddr), headeraddr->version);*/
3059                         } else {
3060                                 offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
3061                                 *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
3062                                 offset += sizeof(unsigned int);
3063                                 *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
3064                                 tmp->numread++;
3065       }
3066       found = 1;
3067       break;
3068     }
3069     tmp = tmp->next;
3070   }
3071   //Add oid for any new machine
3072   if (!found) {
3073     int tmpsize;
3074     if((ptr = pCreate(num_objs)) == NULL) {
3075       return NULL;
3076     }
3077     ptr->mid = mid;
3078     if (STATUS(headeraddr) & NEW) {
3079       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
3080       ptr->numcreated++;
3081       GETSIZE(tmpsize, headeraddr);
3082       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
3083           } else if (STATUS(headeraddr) & DIRTY) {
3084       ptr->oidmod[ptr->nummod] = OID(headeraddr);
3085       ptr->nummod++;
3086       GETSIZE(tmpsize, headeraddr);
3087       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
3088     } else {
3089       *((unsigned int *)ptr->objread)=OID(headeraddr);
3090       offset = sizeof(unsigned int);
3091       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
3092       ptr->numread++;
3093     }
3094     ptr->next = pile;
3095     pile = ptr;
3096   }
3097
3098   /* Clear Flags */
3099   STATUS(headeraddr) = 0;
3100
3101   return pile;
3102 }
3103
3104 plistnode_t *sortPiles(plistnode_t *pileptr) {
3105         plistnode_t *head, *ptr, *tail;
3106         head = pileptr;
3107         ptr = pileptr;
3108         /* Get tail pointer */
3109         while(ptr!= NULL) {
3110                 tail = ptr;
3111                 ptr = ptr->next;
3112         }
3113         ptr = pileptr;
3114         plistnode_t *prev = pileptr;
3115         /* Arrange local machine processing at the end of the pile list */
3116         while(ptr != NULL) {
3117                 if(ptr != tail) {
3118                         if(ptr->mid == myIpAddr && (prev != pileptr)) {
3119                                 prev->next = ptr->next;
3120                                 ptr->next = NULL;
3121                                 tail->next = ptr;
3122                                 return pileptr;
3123                         }
3124                         if((ptr->mid == myIpAddr) && (prev == pileptr)) {
3125                                 prev = ptr->next;
3126                                 ptr->next = NULL;
3127                                 tail->next = ptr;
3128                                 return prev;
3129                         }
3130                         prev = ptr;
3131                 }
3132                 ptr = ptr->next;
3133         }
3134         return pileptr;
3135 }
3136
3137 #ifdef RECOVERY
3138 /* Paxo Algorithm: 
3139  * Executes when the known leader has failed.  
3140  * Guarantees consensus on next leader among all live hosts.  */
3141 int paxos()
3142 {
3143         int origRound = paxosRound;
3144         origleader = leader;
3145         int ret = -1;
3146 #ifdef DEBUG
3147         printf(">> Debug : Starting paxos..\n");
3148 #endif
3149
3150         do {
3151                 ret = paxosPrepare();           // phase 1
3152                 if (ret == 1) {
3153                         ret = paxosAccept();    // phase 2
3154                         if (ret == 1) {
3155                                 paxosLearn();                           // phase 3
3156                                 break;
3157                         }
3158                 }
3159                 // Paxos not successful; wait and retry if new leader is not yet slected
3160                 sleep(WAIT_TIME);               
3161                 if(paxosRound != origRound)
3162                         break;
3163         } while (ret == -1);
3164
3165 #ifdef DEBUG
3166         printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader);
3167 #endif
3168
3169         return ret;
3170 }
3171
3172 int paxosPrepare()
3173 {
3174         char control;
3175         //int origleader = leader;
3176         int remote_n;
3177         int remote_v;
3178         int tmp_n = -1;
3179         int cnt = 0;
3180         int sd;
3181         int i;
3182         temp_v_a = v_a;
3183         my_n = n_h + 1;
3184
3185 #ifdef DEBUG
3186         printf("[Prepare]...\n");
3187 #endif
3188
3189         temp_v_a = myIpAddr;    // if no other value is proposed, make this machine the new leader
3190
3191         for (i = 0; i < numHostsInSystem; ++i) {
3192                 control = PAXOS_PREPARE;
3193                 if(!liveHosts[i]) 
3194                         continue;
3195
3196                 if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
3197                         printf("paxosPrepare(): socket create error\n");
3198                         continue;
3199                 }
3200 #ifdef DEBUG
3201                 printf("%s-> Send PAXOS_PREPARE to mid [%s] with my_n=%d\n", __func__, midtoIPString(hostIpAddrs[i]), my_n);
3202 #endif
3203                 send_data(sd, &control, sizeof(char));  
3204                 send_data(sd, &my_n, sizeof(int));
3205                 int timeout = recv_data(sd, &control, sizeof(char));
3206                 if ((sd == -1) || (timeout < 0)) {
3207 #ifdef DEBUG
3208                         printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
3209 #endif
3210                         continue;
3211                 }
3212
3213                 switch (control) {
3214                         case PAXOS_PREPARE_OK:
3215                                 cnt++;
3216                                 recv_data(sd, &remote_n, sizeof(int));
3217                                 recv_data(sd, &remote_v, sizeof(int));
3218 #ifdef DEBUG
3219                                 printf("%s-> Received PAXOS_PREPARE_OK from mindex [%d] with remote_v=%s\n", __func__, i, midtoIPString(remote_v));
3220 #endif
3221                                 if(remote_v != origleader) {
3222                                         if (remote_n > tmp_n) {
3223                                                 tmp_n = remote_n;
3224                                                 temp_v_a = remote_v;
3225                                         }
3226                                 }
3227                                 break;
3228                         case PAXOS_PREPARE_REJECT:
3229                                 break;
3230                 }
3231     
3232     freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
3233         }
3234
3235 #ifdef DEBUG
3236         printf("%s-> cnt:%d, numLiveHostsInSystem:%d\n", __func__, cnt, numLiveHostsInSystem);
3237 #endif
3238
3239         if (cnt >= (numLiveHostsInSystem / 2)) {                // majority of OK replies
3240                 return 1;
3241                 }
3242                 else {
3243                         return -1;
3244                 }
3245 }
3246
3247 int paxosAccept()
3248 {
3249         char control;
3250         int i;
3251         int cnt = 0;
3252         int sd;
3253         int remote_v = temp_v_a;
3254
3255 #ifdef DEBUG
3256         printf("[Accept]...\n");
3257 #endif
3258         for (i = 0; i < numHostsInSystem; ++i) {
3259                 control = PAXOS_ACCEPT;
3260           
3261     if(!liveHosts[i]) 
3262                         continue;
3263
3264         if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
3265                         printf("paxosAccept(): socket create error\n");
3266                         continue;
3267                 }
3268
3269                 send_data(sd, &control, sizeof(char));
3270                 send_data(sd, &my_n, sizeof(int));
3271                 send_data(sd, &remote_v, sizeof(int));
3272
3273                 int timeout = recv_data(sd, &control, sizeof(char));
3274                 if ((sd == -1) || (timeout < 0)) {
3275 #ifdef DEBUG
3276                         printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
3277 #endif
3278                         continue;  
3279                 }
3280
3281                 switch (control) {
3282                         case PAXOS_ACCEPT_OK:
3283                                 cnt++;
3284                                 break;
3285                         case PAXOS_ACCEPT_REJECT:
3286                                 break;
3287                 }
3288 #ifdef DEBUG
3289                 printf(">> Debug : Accept - n_h [%d], n_a [%d], v_a [%s]\n", n_h, n_a, midtoIPString(v_a));
3290 #endif
3291     freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
3292         }
3293
3294         if (cnt >= (numLiveHostsInSystem / 2)) {
3295                 return 1;
3296         }
3297         else {
3298                 return -1;
3299         }
3300 }
3301
3302 void paxosLearn()
3303 {
3304         char control;
3305         int sd;
3306         int i;
3307
3308 #ifdef DEBUG
3309         printf("[Learn]...\n");
3310 #endif
3311
3312         control = PAXOS_LEARN;
3313
3314         for (i = 0; i < numHostsInSystem; ++i) {
3315                 if(!liveHosts[i]) 
3316                         continue;
3317                 if(hostIpAddrs[i] == myIpAddr)
3318                 {
3319                         leader = v_a;
3320                         paxosRound++;
3321 #ifdef DEBUG
3322                         printf("This is my leader!!!: [%s]\n", midtoIPString(leader));
3323 #endif
3324                         continue;
3325                 }
3326                 if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
3327                         continue;
3328                         //                      printf("paxosLearn(): socket create error, attemp\n");
3329                 }
3330
3331                 send_data(sd, &control, sizeof(char));
3332                 send_data(sd, &v_a, sizeof(int));
3333
3334     freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
3335
3336         }
3337         //return v_a;
3338 }
3339
3340
3341 void clearDeadThreadsNotification() 
3342 {
3343
3344 #ifdef DEBUG
3345   printf("%s -> Entered\n",__func__);
3346 #endif
3347 // clear all the threadnotify request first
3348   
3349   if(waitThreadID != -1) {
3350     printf("%s -> I was waitng for %s\n",__func__,midtoIPString(waitThreadMid));
3351     int waitThreadIndex = findHost(waitThreadMid);
3352     int i;
3353     notifydata_t *ndata;
3354
3355     if(liveHosts[waitThreadIndex] == 0) // the thread waiting for is dead
3356     {
3357       if((ndata = (notifydata_t*)notifyhashSearch(waitThreadID)) == NULL) {
3358         return;
3359       }
3360    
3361       for(i =0 ; i < ndata->numoid; i++) {
3362         clearNotifyList(ndata->oidarry[i]);  // clear thread object's notifylist
3363       }
3364
3365       pthread_mutex_lock(&(ndata->threadnotify));
3366       pthread_cond_signal(&(ndata->threadcond));
3367       pthread_mutex_unlock(&(ndata->threadnotify));
3368
3369       waitThreadMid = -1;
3370       waitThreadID = -1;
3371     }
3372   }
3373
3374 #ifdef DEBUG
3375   printf("%s -> Finished\n",__func__);
3376 #endif
3377 }
3378
3379 /* request the primary and the backup machines to clear
3380    thread obj's notify list */
3381 void reqClearNotifyList(unsigned int oid)
3382 {
3383   int psock,bsock,i;
3384   int mid,pmid,bmid;
3385   objheader_t *objheader;
3386   struct sockaddr_in premoteAddr, bremoteAddr;
3387   char msg[1 + sizeof(unsigned int)];
3388
3389   if((mid = lhashSearch(oid)) == 0) {
3390     printf("%s -> No such machine found for oid %x\n",__func__,oid);
3391     return;
3392   }
3393
3394   pmid = getPrimaryMachine(mid);
3395   bmid = getBackupMachine(mid);
3396
3397   if((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
3398      (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
3399         perror("clearNotifyList() : socket()");
3400     return ;
3401   }
3402
3403   /* for primary machine */
3404   bzero(&premoteAddr, sizeof(premoteAddr));
3405   premoteAddr.sin_family = AF_INET;
3406   premoteAddr.sin_port = htons(LISTEN_PORT);
3407   premoteAddr.sin_addr.s_addr = htonl(pmid);
3408
3409   /* for backup machine */
3410   bzero(&bremoteAddr, sizeof(bremoteAddr));
3411   bremoteAddr.sin_family = AF_INET;
3412   bremoteAddr.sin_port = htons(LISTEN_PORT);
3413   bremoteAddr.sin_addr.s_addr = htonl(bmid);
3414
3415   /* send message to both the primary and the backup */
3416   if((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr)) < 0) ||
3417      (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr)) < 0)) {
3418       printf("%s -> error in connecting\n",__func__);
3419       return;
3420   }
3421   else {
3422     printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
3423     printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
3424     
3425     msg[0] = CLEAR_NOTIFY_LIST;
3426     *((unsigned int *)(&msg[1])) = oid;
3427     
3428     send_data(psock, &msg, sizeof(char) + sizeof(unsigned int));
3429     send_data(bsock, &msg, sizeof(char) + sizeof(unsigned int)); 
3430   }
3431   
3432   close(psock);
3433   close(bsock);
3434   
3435 }
3436    
3437
3438 int checkiftheMachineDead(unsigned int mid) {
3439   int mIndex = findHost(mid);
3440   return getStatus(mIndex);
3441 }
3442
3443 #endif