add changes to release locks early on version mismatch and soft abort
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 //#define LOGEVENTS
83 #ifdef LOGEVENTS
84 char bigarray[16*1024*1024];
85 int bigindex=0;
86 #define LOGEVENT(x) { \
87     int tmp=bigindex++;                         \
88     bigarray[tmp]=x;                            \
89   }
90 #else
91 #define LOGEVENT(x)
92 #endif
93
94 /*******************************
95 * Send and Recv function calls
96 *******************************/
97 void send_data(int fd, void *buf, int buflen) {
98   char *buffer = (char *)(buf);
99   int size = buflen;
100   int numbytes;
101   while (size > 0) {
102     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
103     bytesSent = bytesSent + numbytes;
104     if (numbytes == -1) {
105       perror("send");
106       exit(0);
107     }
108     buffer += numbytes;
109     size -= numbytes;
110   }
111 }
112
113 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
114   if (buflen+sendbuffer->offset>WMAXBUF) {
115     send_data(fd, sendbuffer->buf, sendbuffer->offset);
116     sendbuffer->offset=0;
117     send_data(fd, buffer, buflen);
118     return;
119   }
120   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
121   sendbuffer->offset+=buflen;
122   if (sendbuffer->offset>WTOP) {
123     send_data(fd, sendbuffer->buf, sendbuffer->offset);
124     sendbuffer->offset=0;
125   }
126 }
127
128 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
129   if (buflen+sendbuffer->offset>WMAXBUF) {
130     send_data(fd, sendbuffer->buf, sendbuffer->offset);
131     sendbuffer->offset=0;
132     send_data(fd, buffer, buflen);
133     return;
134   }
135   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
136   sendbuffer->offset+=buflen;
137   send_data(fd, sendbuffer->buf, sendbuffer->offset);
138   sendbuffer->offset=0;
139 }
140
141 int recvw(int fd, void *buf, int len, int flags) {
142   return recv(fd, buf, len, flags);
143 }
144  
145 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
146   char *buf=(char *)buffer;
147   int numbytes=readbuffer->head-readbuffer->tail;
148   if (numbytes>buflen)
149     numbytes=buflen;
150   if (numbytes>0) {
151     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
152     readbuffer->tail+=numbytes;
153     buflen-=numbytes;
154     buf+=numbytes;
155   }
156   if (buflen==0) {
157     return;
158   }
159   if (buflen>=MAXBUF) {
160     recv_data(fd, buf, buflen);
161     return;
162   }
163   
164   int maxbuf=MAXBUF;
165   int obufflen=buflen;
166   readbuffer->head=0;
167   
168   while (buflen > 0) {
169     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
170     if (numbytes == -1) {
171       perror("recv");
172       exit(0);
173     }
174     bytesRecv+=numbytes;
175     buflen-=numbytes;
176     readbuffer->head+=numbytes;
177     maxbuf-=numbytes;
178   }
179   memcpy(buf,readbuffer->buf,obufflen);
180   readbuffer->tail=obufflen;
181 }
182
183 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
184   char *buf=(char *)buffer;
185   //now tail<=head
186   int numbytes=readbuffer->head-readbuffer->tail;
187   if (numbytes>buflen)
188     numbytes=buflen;
189   if (numbytes>0) {
190     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
191     readbuffer->tail+=numbytes;
192     buflen-=numbytes;
193     buf+=numbytes;
194   }
195   if (buflen==0)
196     return 1;
197
198   if (buflen>=MAXBUF) {
199     return recv_data_errorcode(fd, buf, buflen);
200   }
201
202   int maxbuf=MAXBUF;
203   int obufflen=buflen;
204   readbuffer->head=0;
205   
206   while (buflen > 0) {
207     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
208     if (numbytes ==0) {
209       return 0;
210     }
211     if (numbytes==-1) {
212       perror("recvbuf");
213       return -1;
214     }
215     bytesRecv+=numbytes;
216     buflen-=numbytes;
217     readbuffer->head+=numbytes;
218     maxbuf-=numbytes;
219   }
220   memcpy(buf,readbuffer->buf,obufflen);
221   readbuffer->tail=obufflen;
222   return 1;
223 }
224
225
226 void recv_data(int fd, void *buf, int buflen) {
227   char *buffer = (char *)(buf);
228   int size = buflen;
229   int numbytes;
230   while (size > 0) {
231     numbytes = recvw(fd, buffer, size, 0);
232     bytesRecv = bytesRecv + numbytes;
233     if (numbytes == -1) {
234       perror("recv");
235       exit(0);
236     }
237     buffer += numbytes;
238     size -= numbytes;
239   }
240 }
241
242 int recv_data_errorcode(int fd, void *buf, int buflen) {
243   char *buffer = (char *)(buf);
244   int size = buflen;
245   int numbytes;
246   while (size > 0) {
247     numbytes = recvw(fd, buffer, size, 0);
248     if (numbytes==0)
249       return 0;
250     if (numbytes == -1) {
251       perror("recv");
252       return -1;
253     }
254     bytesRecv+=numbytes;
255     buffer += numbytes;
256     size -= numbytes;
257   }
258   return 1;
259 }
260
261 void printhex(unsigned char *ptr, int numBytes) {
262   int i;
263   for (i = 0; i < numBytes; i++) {
264     if (ptr[i] < 16)
265       printf("0%x ", ptr[i]);
266     else
267       printf("%x ", ptr[i]);
268   }
269   printf("\n");
270   return;
271 }
272
273 inline int arrayLength(int *array) {
274   int i;
275   for(i=0 ; array[i] != -1; i++)
276     ;
277   return i;
278 }
279
280 inline int findmax(int *array, int arraylength) {
281   int max, i;
282   max = array[0];
283   for(i = 0; i < arraylength; i++) {
284     if(array[i] > max) {
285       max = array[i];
286     }
287   }
288   return max;
289 }
290
291 //#define INLINEPREFETCH
292 #define PREFTHRESHOLD 4
293
294 /* This function is a prefetch call generated by the compiler that
295  * populates the shared primary prefetch queue*/
296 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
297   /* Allocate for the queue node*/
298   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
299   int len;
300 #ifdef INLINEPREFETCH
301   int attempted=0;
302   char *node;
303   do {
304   node=getmemory(qnodesize);
305   if (node==NULL&&attempted)
306     break;
307   if (node!=NULL) {
308 #else
309   char *node=getmemory(qnodesize);
310 #endif
311   int top=endoffsets[ntuples-1];
312
313   if (node==NULL) {
314     LOGEVENT('D');
315     return;
316   }
317   /* Set queue node values */
318
319   /* TODO: Remove this after testing */
320   evalPrefetch[siteid].callcount++;
321
322   *((int *)(node))=siteid;
323   *((int *)(node + sizeof(int))) = ntuples;
324   len = 2*sizeof(int);
325   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
326   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
327   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
328
329 #ifdef INLINEPREFETCH
330   movehead(qnodesize);
331   }
332   int numpref=numavailable();
333   attempted=1;
334
335   if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
336     node=gettail();
337     prefetchpile_t *pilehead = foundLocal(node,numpref);
338     if (pilehead!=NULL) {
339       // Get sock from shared pool
340       
341       /* Send  Prefetch Request */
342       prefetchpile_t *ptr = pilehead;
343       while(ptr != NULL) {
344         int sd = getSock2(transPrefetchSockPool, ptr->mid);
345         sendPrefetchReq(ptr, sd);
346         ptr = ptr->next;
347       }
348       
349       mcdealloc(pilehead);
350       resetqueue();
351     }
352   }//end do prefetch if condition
353   } while(node==NULL);
354 #else
355   /* Lock and insert into primary prefetch queue */
356   movehead(qnodesize);
357 #endif
358 }
359
360 /* This function starts up the transaction runtime. */
361 int dstmStartup(const char * option) {
362   pthread_t thread_Listen, udp_thread_Listen;
363   pthread_attr_t attr;
364   int master=option!=NULL && strcmp(option, "master")==0;
365   int fd;
366   int udpfd;
367
368   if (processConfigFile() != 0)
369     return 0; //TODO: return error value, cause main program to exit
370 #ifdef COMPILER
371   if (!master)
372     threadcount--;
373 #endif
374
375 #ifdef TRANSSTATS
376   printf("Trans stats is on\n");
377   fflush(stdout);
378 #endif
379 #ifdef ABORTREADERS
380   initreaderlist();
381 #endif
382
383   //Initialize socket pool
384   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
385   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
386   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
387
388   dstmInit();
389   transInit();
390
391   fd=startlistening();
392   pthread_attr_init(&attr);
393   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
394 #ifdef CACHE
395   udpfd = udpInit();
396   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
397 #endif
398   if (master) {
399     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
400     return 1;
401   } else {
402     dstmListen((void *)fd);
403     return 0;
404   }
405 }
406
407 //TODO Use this later
408 void *pCacheAlloc(objstr_t *store, unsigned int size) {
409   void *tmp;
410   objstr_t *ptr;
411   ptr = store;
412   int success = 0;
413
414   while(ptr->next != NULL) {
415     /* check if store is empty */
416     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
417       tmp = ptr->top;
418       ptr->top += size;
419       success = 1;
420       return tmp;
421     } else {
422       ptr = ptr->next;
423     }
424   }
425
426   if(success == 0) {
427     return NULL;
428   }
429 }
430
431 /* This function initiates the prefetch thread A queue is shared
432  * between the main thread of execution and the prefetch thread to
433  * process the prefetch call Call from compiler populates the shared
434  * queue with prefetch requests while prefetch thread processes the
435  * prefetch requests */
436
437 void transInit() {
438   //Create and initialize prefetch cache structure
439 #ifdef CACHE
440   initializePCache();
441   if((evalPrefetch = initPrefetchStats()) == NULL) {
442     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
443     exit(0);
444   }
445 #endif
446
447   /* Initialize attributes for mutex */
448   pthread_mutexattr_init(&prefetchcache_mutex_attr);
449   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
450
451   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
452   pthread_mutex_init(&notifymutex, NULL);
453   pthread_mutex_init(&atomicObjLock, NULL);
454 #ifdef CACHE
455   //Create prefetch cache lookup table
456   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
457     printf("ERROR\n");
458     return; //Failure
459   }
460
461   //Initialize primary shared queue
462   queueInit();
463   //Initialize machine pile w/prefetch oids and offsets shared queue
464   mcpileqInit();
465
466   //Create the primary prefetch thread
467   int retval;
468 #ifdef RANGEPREFETCH
469   do {
470     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
471   } while(retval!=0);
472 #else
473 #ifndef INLINEPREFETCH
474   do {
475     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
476   } while(retval!=0);
477 #endif
478 #endif
479 #ifndef INLINEPREFETCH
480   pthread_detach(tPrefetch);
481 #endif
482 #endif
483 }
484
485 /* This function stops the threads spawned */
486 void transExit() {
487 #ifdef CACHE
488   int t;
489   pthread_cancel(tPrefetch);
490   for(t = 0; t < NUM_THREADS; t++)
491     pthread_cancel(wthreads[t]);
492 #endif
493
494   return;
495 }
496
497 /* This functions inserts randowm wait delays in the order of msec
498  * Mostly used when transaction commits retry*/
499 void randomdelay() {
500   struct timespec req;
501   time_t t;
502
503   t = time(NULL);
504   req.tv_sec = 0;
505   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
506   nanosleep(&req, NULL);
507   return;
508 }
509
510 /* This function initializes things required in the transaction start*/
511 void transStart() {
512   t_cache = objstrCreate(1048576);
513   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
514   revertlist=NULL;
515 #ifdef ABORTREADERS
516   t_abort=0;
517 #endif
518 }
519
520 // Search for an address for a given oid                                                                               
521 /*#define INLINE    inline __attribute__((always_inline))
522
523 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
524   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
525   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
526
527   do {
528     if(node->key == key) {
529       return node->val;
530     }
531     node = node->next;
532   } while(node != NULL);
533
534   return NULL;
535   }*/
536
537
538
539
540 /* This function finds the location of the objects involved in a transaction
541  * and returns the pointer to the object if found in a remote location */
542 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
543   unsigned int machinenumber;
544   objheader_t *tmp, *objheader;
545   objheader_t *objcopy;
546   int size;
547   void *buf;
548   chashlistnode_t *node;
549
550   if(oid == 0) {
551     return NULL;
552   }
553   
554
555   node= &c_table[(oid & c_mask)>>1];
556   do {
557     if(node->key == oid) {
558 #ifdef TRANSSTATS
559     nchashSearch++;
560 #endif
561 #ifdef COMPILER
562     return &((objheader_t*)node->val)[1];
563 #else
564     return node->val;
565 #endif
566     }
567     node = node->next;
568   } while(node != NULL);
569   
570
571   /*  
572   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
573 #ifdef TRANSSTATS
574     nchashSearch++;
575 #endif
576 #ifdef COMPILER
577     return &objheader[1];
578 #else
579     return objheader;
580 #endif
581   } else 
582   */
583
584 #ifdef ABORTREADERS
585   if (t_abort) {
586     //abort this transaction
587     //printf("ABORTING\n");
588     removetransactionhash();
589     objstrDelete(t_cache);
590     t_chashDelete();
591     _longjmp(aborttrans,1);
592   } else
593     addtransaction(oid);
594 #endif
595
596   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
597 #ifdef TRANSSTATS
598     nmhashSearch++;
599 #endif
600     /* Look up in machine lookup table  and copy  into cache*/
601     GETSIZE(size, objheader);
602     size += sizeof(objheader_t);
603     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
604     memcpy(objcopy, objheader, size);
605     /* Insert into cache's lookup table */
606     STATUS(objcopy)=0;
607     t_chashInsert(OID(objheader), objcopy);
608 #ifdef COMPILER
609     return &objcopy[1];
610 #else
611     return objcopy;
612 #endif
613   } else {
614 #ifdef CACHE
615     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
616 #ifdef TRANSSTATS
617       nprehashSearch++;
618 #endif
619       /* Look up in prefetch cache */
620       GETSIZE(size, tmp);
621       size+=sizeof(objheader_t);
622       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
623       memcpy(objcopy, tmp, size);
624       /* Insert into cache's lookup table */
625       t_chashInsert(OID(tmp), objcopy);
626 #ifdef COMPILER
627       return &objcopy[1];
628 #else
629       return objcopy;
630 #endif
631     }
632 #endif
633     /* Get the object from the remote location */
634     if((machinenumber = lhashSearch(oid)) == 0) {
635       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
636       return NULL;
637     }
638     objcopy = getRemoteObj(machinenumber, oid);
639
640     if(objcopy == NULL) {
641       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
642       return NULL;
643     } else {
644 #ifdef TRANSSTATS
645       nRemoteSend++;
646 #endif
647 #ifdef COMPILER
648 #ifdef CACHE
649       //Copy object to prefetch cache
650       pthread_mutex_lock(&prefetchcache_mutex);
651       objheader_t *headerObj;
652       int size;
653       GETSIZE(size, objcopy);
654       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
655         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
656             __FILE__, __LINE__);
657         pthread_mutex_unlock(&prefetchcache_mutex);
658         return NULL;
659       }
660       pthread_mutex_unlock(&prefetchcache_mutex);
661       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
662       //make an entry in prefetch lookup hashtable
663       void *oldptr;
664       if((oldptr = prehashSearch(oid)) != NULL) {
665         prehashRemove(oid);
666         prehashInsert(oid, headerObj);
667       } else {
668         prehashInsert(oid, headerObj);
669       }
670       LOGEVENT('B');
671 #endif
672       return &objcopy[1];
673 #else
674       return objcopy;
675 #endif
676     }
677   }
678 }
679
680
681 /* This function finds the location of the objects involved in a transaction
682  * and returns the pointer to the object if found in a remote location */
683 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
684   unsigned int machinenumber;
685   objheader_t *tmp, *objheader;
686   objheader_t *objcopy;
687   int size;
688
689 #ifdef ABORTREADERS
690   if (t_abort) {
691     //abort this transaction
692     //printf("ABORTING\n");
693     removetransactionhash();
694     objstrDelete(t_cache);
695     t_chashDelete();
696     _longjmp(aborttrans,1);
697   } else
698     addtransaction(oid);
699 #endif
700
701   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
702 #ifdef TRANSSTATS
703     nmhashSearch++;
704 #endif
705     /* Look up in machine lookup table  and copy  into cache*/
706     GETSIZE(size, objheader);
707     size += sizeof(objheader_t);
708     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
709     memcpy(objcopy, objheader, size);
710     /* Insert into cache's lookup table */
711     STATUS(objcopy)=0;
712     t_chashInsert(OID(objheader), objcopy);
713 #ifdef COMPILER
714     return &objcopy[1];
715 #else
716     return objcopy;
717 #endif
718   } else {
719 #ifdef CACHE
720     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
721 #ifdef TRANSSTATS
722       LOGEVENT('P')
723       nprehashSearch++;
724 #endif
725       /* Look up in prefetch cache */
726       GETSIZE(size, tmp);
727       size+=sizeof(objheader_t);
728       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
729       memcpy(objcopy, tmp, size);
730       /* Insert into cache's lookup table */
731       t_chashInsert(OID(tmp), objcopy);
732 #ifdef COMPILER
733       return &objcopy[1];
734 #else
735       return objcopy;
736 #endif
737     }
738 #endif
739     /* Get the object from the remote location */
740     if((machinenumber = lhashSearch(oid)) == 0) {
741       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
742       return NULL;
743     }
744     objcopy = getRemoteObj(machinenumber, oid);
745
746     if(objcopy == NULL) {
747       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
748       return NULL;
749     } else {
750 #ifdef TRANSSTATS
751
752       LOGEVENT('R');
753       nRemoteSend++;
754 #endif
755 #ifdef COMPILER
756 #ifdef CACHE
757       //Copy object to prefetch cache
758       pthread_mutex_lock(&prefetchcache_mutex);
759       objheader_t *headerObj;
760       int size;
761       GETSIZE(size, objcopy);
762       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
763         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
764             __FILE__, __LINE__);
765         pthread_mutex_unlock(&prefetchcache_mutex);
766         return NULL;
767       }
768       pthread_mutex_unlock(&prefetchcache_mutex);
769       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
770       //make an entry in prefetch lookup hashtable
771       void *oldptr;
772       if((oldptr = prehashSearch(oid)) != NULL) {
773         prehashRemove(oid);
774         prehashInsert(oid, headerObj);
775       } else {
776         prehashInsert(oid, headerObj);
777       }
778       LOGEVENT('B');
779 #endif
780       return &objcopy[1];
781 #else
782       return objcopy;
783 #endif
784     }
785   }
786 }
787
788 /* This function creates objects in the transaction record */
789 objheader_t *transCreateObj(unsigned int size) {
790   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
791   OID(tmp) = getNewOID();
792   tmp->version = 1;
793   tmp->rcount = 1;
794   STATUS(tmp) = NEW;
795   t_chashInsert(OID(tmp), tmp);
796
797 #ifdef COMPILER
798   return &tmp[1]; //want space after object header
799 #else
800   return tmp;
801 #endif
802 }
803
804 #if 1
805 /* This function creates machine piles based on all machines involved in a
806  * transaction commit request */
807 plistnode_t *createPiles() {
808   int i;
809   plistnode_t *pile = NULL;
810   unsigned int machinenum;
811   objheader_t *headeraddr;
812   chashlistnode_t * ptr = c_table;
813   /* Represents number of bins in the chash table */
814   unsigned int size = c_size;
815
816   for(i = 0; i < size ; i++) {
817     chashlistnode_t * curr = &ptr[i];
818     /* Inner loop to traverse the linked list of the cache lookupTable */
819     while(curr != NULL) {
820       //if the first bin in hash table is empty
821       if(curr->key == 0)
822         break;
823       headeraddr=(objheader_t *) curr->val;
824
825       //Get machine location for object id (and whether local or not)
826       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
827         machinenum = myIpAddr;
828       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
829         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
830         return NULL;
831       }
832
833       //Make machine groups
834       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
835       curr = curr->next;
836     }
837   }
838   return pile;
839 }
840 #else
841 /* This function creates machine piles based on all machines involved in a
842  * transaction commit request */
843 plistnode_t *createPiles() {
844   int i;
845   plistnode_t *pile = NULL;
846   unsigned int machinenum;
847   objheader_t *headeraddr;
848   struct chashentry * ptr = c_table;
849   /* Represents number of bins in the chash table */
850   unsigned int size = c_size;
851
852   for(i = 0; i < size ; i++) {
853     struct chashentry * curr = & ptr[i];
854     /* Inner loop to traverse the linked list of the cache lookupTable */
855     //if the first bin in hash table is empty
856     if(curr->key == 0)
857       continue;
858     headeraddr=(objheader_t *) curr->ptr;
859
860     //Get machine location for object id (and whether local or not)
861     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
862       machinenum = myIpAddr;
863     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
864       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
865       return NULL;
866     }
867
868     //Make machine groups
869     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
870   }
871   return pile;
872 }
873 #endif
874
875 /* This function initiates the transaction commit process
876  * Spawns threads for each of the new connections with Participants
877  * and creates new piles by calling the createPiles(),
878  * Sends a transrequest() to each remote machines for objects found remotely
879  * and calls handleLocalReq() to process objects found locally */
880 int transCommit() {
881   unsigned int tot_bytes_mod, *listmid;
882   plistnode_t *pile, *pile_ptr;
883   char treplyretry; /* keeps track of the common response that needs to be sent */
884   int firsttime=1;
885   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
886   char finalResponse;
887
888 #ifdef LOGEVENTS
889   int iii;
890   for(iii=0;iii<bigindex;iii++) {
891     printf("%c", bigarray[iii]);
892   }
893 #endif
894
895 #ifdef ABORTREADERS
896   if (t_abort) {
897     //abort this transaction
898     /* Debug
899      * printf("ABORTING TRANSACTION AT COMMIT\n");
900      */
901     removetransactionhash();
902     objstrDelete(t_cache);
903     t_chashDelete();
904     return 1;
905   }
906 #endif
907
908
909   do {
910     treplyretry = 0;
911
912     /* Look through all the objects in the transaction record and make piles
913      * for each machine involved in the transaction*/
914     if (firsttime) {
915       pile_ptr = pile = createPiles();
916       pile_ptr = pile = sortPiles(pile);
917     } else {
918       pile = pile_ptr;
919     }
920     firsttime = 0;
921     /* Create the packet to be sent in TRANS_REQUEST */
922
923     /* Count the number of participants */
924     int pilecount;
925     pilecount = pCount(pile);
926
927     /* Create a list of machine ids(Participants) involved in transaction   */
928     listmid = calloc(pilecount, sizeof(unsigned int));
929     pListMid(pile, listmid);
930
931     /* Create a socket and getReplyCtrl array, initialize */
932     int socklist[pilecount];
933     int loopcount;
934     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
935       socklist[loopcount] = 0;
936     char getReplyCtrl[pilecount];
937     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
938       getReplyCtrl[loopcount] = 0;
939
940     /* Process each machine pile */
941     int sockindex = 0;
942     trans_req_data_t *tosend;
943     tosend = calloc(pilecount, sizeof(trans_req_data_t));
944     while(pile != NULL) {
945       tosend[sockindex].f.control = TRANS_REQUEST;
946       tosend[sockindex].f.mcount = pilecount;
947       tosend[sockindex].f.numread = pile->numread;
948       tosend[sockindex].f.nummod = pile->nummod;
949       tosend[sockindex].f.numcreated = pile->numcreated;
950       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
951       tosend[sockindex].listmid = listmid;
952       tosend[sockindex].objread = pile->objread;
953       tosend[sockindex].oidmod = pile->oidmod;
954       tosend[sockindex].oidcreated = pile->oidcreated;
955       int sd = 0;
956       if(pile->mid != myIpAddr) {
957         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
958           printf("transRequest(): socket create error\n");
959           free(listmid);
960           free(tosend);
961           return 1;
962         }
963         socklist[sockindex] = sd;
964         /* Send bytes of data with TRANS_REQUEST control message */
965         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
966
967         /* Send list of machines involved in the transaction */
968         {
969           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
970           send_data(sd, tosend[sockindex].listmid, size);
971         }
972
973         /* Send oids and version number tuples for objects that are read */
974         {
975           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
976           send_data(sd, tosend[sockindex].objread, size);
977         }
978
979         /* Send objects that are modified */
980         void *modptr;
981         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
982           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
983           free(listmid);
984           free(tosend);
985           return 1;
986         }
987         int offset = 0;
988         int i;
989         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
990           int size;
991           objheader_t *headeraddr;
992           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
993             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
994             free(modptr);
995             free(listmid);
996             free(tosend);
997             return 1;
998           }
999           GETSIZE(size,headeraddr);
1000           size+=sizeof(objheader_t);
1001           memcpy(modptr+offset, headeraddr, size);
1002           offset+=size;
1003         }
1004         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1005         free(modptr);
1006       } else { //handle request locally
1007         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1008       }
1009       sockindex++;
1010       pile = pile->next;
1011     } //end of pile processing
1012       /* Recv Ctrl msgs from all machines */
1013     int i;
1014     for(i = 0; i < pilecount; i++) {
1015       int sd = socklist[i];
1016       if(sd != 0) {
1017         char control;
1018         recv_data(sd, &control, sizeof(char));
1019         //Update common data structure with new ctrl msg
1020         getReplyCtrl[i] = control;
1021         /* Recv Objects if participant sends TRANS_DISAGREE */
1022 #ifdef CACHE
1023         if(control == TRANS_DISAGREE) {
1024           int length;
1025           recv_data(sd, &length, sizeof(int));
1026           void *newAddr;
1027           pthread_mutex_lock(&prefetchcache_mutex);
1028           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1029             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1030             free(tosend);
1031             free(listmid);
1032             pthread_mutex_unlock(&prefetchcache_mutex);
1033             return 1;
1034           }
1035           pthread_mutex_unlock(&prefetchcache_mutex);
1036           recv_data(sd, newAddr, length);
1037           int offset = 0;
1038           while(length != 0) {
1039             unsigned int oidToPrefetch;
1040             objheader_t * header;
1041             header = (objheader_t *)(((char *)newAddr) + offset);
1042             oidToPrefetch = OID(header);
1043             STATUS(header)=0;
1044             int size = 0;
1045             GETSIZE(size, header);
1046             size += sizeof(objheader_t);
1047             //make an entry in prefetch hash table
1048             void *oldptr;
1049             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
1050               prehashRemove(oidToPrefetch);
1051               prehashInsert(oidToPrefetch, header);
1052             } else {
1053               prehashInsert(oidToPrefetch, header);
1054             }
1055         LOGEVENT('E');
1056             length = length - size;
1057             offset += size;
1058           }
1059         } //end of receiving objs
1060 #endif
1061       }
1062     }
1063     /* Decide the final response */
1064     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1065       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1066       free(tosend);
1067       free(listmid);
1068       return 1;
1069     }
1070
1071     /* Send responses to all machines */
1072     for(i = 0; i < pilecount; i++) {
1073       int sd = socklist[i];
1074       if(sd != 0) {
1075 #ifdef CACHE
1076         if(finalResponse == TRANS_COMMIT) {
1077           int retval;
1078           /* Update prefetch cache */
1079           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1080             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1081             free(tosend);
1082             free(listmid);
1083             return 1;
1084           }
1085
1086
1087           /* Invalidate objects in other machine cache */
1088           if(tosend[i].f.nummod > 0) {
1089             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1090               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1091               free(tosend);
1092               free(listmid);
1093               return 1;
1094             }
1095           }
1096 #ifdef ABORTREADERS
1097           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1098           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1099 #endif
1100         }
1101 #ifdef ABORTREADERS
1102         else if (!treplyretry) {
1103           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1104           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1105         }
1106 #endif
1107 #endif
1108         send_data(sd, &finalResponse, sizeof(char));
1109       } else {
1110         /* Complete local processing */
1111         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1112 #ifdef ABORTREADERS
1113         if(finalResponse == TRANS_COMMIT) {
1114           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1115           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1116         } else if (!treplyretry) {
1117           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1118           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1119         }
1120 #endif
1121       }
1122     }
1123
1124     /* Free resources */
1125     free(tosend);
1126     free(listmid);
1127     if (!treplyretry)
1128       pDelete(pile_ptr);
1129     /* wait a random amount of time before retrying to commit transaction*/
1130     if(treplyretry) {
1131       randomdelay();
1132 #ifdef TRANSSTATS
1133       nSoftAbort++;
1134 #endif
1135     }
1136     /* Retry trans commit procedure during soft_abort case */
1137   } while (treplyretry);
1138
1139   if(finalResponse == TRANS_ABORT) {
1140     //printf("Aborting trans\n");
1141 #ifdef TRANSSTATS
1142     LOGEVENT('A');
1143     numTransAbort++;
1144 #endif
1145     /* Free Resources */
1146     objstrDelete(t_cache);
1147     t_chashDelete();
1148     return TRANS_ABORT;
1149   } else if(finalResponse == TRANS_COMMIT) {
1150 #ifdef TRANSSTATS
1151     LOGEVENT('C');
1152     numTransCommit++;
1153 #endif
1154     /* Free Resources */
1155     objstrDelete(t_cache);
1156     t_chashDelete();
1157     return 0;
1158   } else {
1159     //TODO Add other cases
1160     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1161     exit(-1);
1162   }
1163   return 0;
1164 }
1165
1166 /* This function handles the local objects involved in a transaction
1167  * commiting process.  It also makes a decision if this local machine
1168  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1169 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1170   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1171   int numoidnotfound = 0, numoidlocked = 0;
1172   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1173   int numread, i;
1174   unsigned int oid;
1175   unsigned short version;
1176
1177   /* Counters and arrays to formulate decision on control message to be sent */
1178   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1179   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1180   //setting a divider between read and write locks
1181   numread = tdata->f.numread;
1182   /* Process each oid in the machine pile/ group per thread */
1183   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1184     if (i < tdata->f.numread) {
1185       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1186       incr *= i;
1187       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1188       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1189       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1190     } else { // Objects Modified
1191       if(i == tdata->f.numread) {
1192         oidlocked[numoidlocked++] = -1;
1193       }
1194       int tmpsize;
1195       objheader_t *headptr;
1196       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1197       if (headptr == NULL) {
1198         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1199         return;
1200       }
1201       oid = OID(headptr);
1202       version = headptr->version;
1203       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1204     }
1205   }
1206
1207 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1208  * if Participant receives a TRANS_COMMIT */
1209   transinfo->objlocked = oidlocked;
1210   transinfo->objnotfound = oidnotfound;
1211   transinfo->modptr = NULL;
1212   transinfo->numlocked = numoidlocked;
1213   transinfo->numnotfound = numoidnotfound;
1214
1215   /* Condition to send TRANS_AGREE */
1216   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1217     *getReplyCtrl = TRANS_AGREE;
1218   }
1219   /* Condition to send TRANS_SOFT_ABORT */
1220   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1221     *getReplyCtrl = TRANS_SOFT_ABORT;
1222   }
1223 }
1224
1225 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1226   if(finalResponse == TRANS_ABORT) {
1227     if(transAbortProcess(transinfo) != 0) {
1228       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1229       fflush(stdout);
1230       return;
1231     }
1232   } else if(finalResponse == TRANS_COMMIT) {
1233 #ifdef CACHE
1234     /* Invalidate objects in other machine cache */
1235     if(tdata->f.nummod > 0) {
1236       int retval;
1237       if((retval = invalidateObj(tdata)) != 0) {
1238         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1239         return;
1240       }
1241     }
1242 #endif
1243     if(transComProcess(tdata, transinfo) != 0) {
1244       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1245       fflush(stdout);
1246       return;
1247     }
1248   } else {
1249     printf("ERROR...No Decision\n");
1250   }
1251
1252   /* Free memory */
1253   if (transinfo->objlocked != NULL) {
1254     free(transinfo->objlocked);
1255   }
1256   if (transinfo->objnotfound != NULL) {
1257     free(transinfo->objnotfound);
1258   }
1259 }
1260
1261 /* This function decides the reponse that needs to be sent to
1262  * all Participant machines after the TRANS_REQUEST protocol */
1263 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1264   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1265                                                                    message to send */
1266   for (i = 0 ; i < pilecount; i++) {
1267     char control;
1268     control = getReplyCtrl[i];
1269     switch(control) {
1270     default:
1271       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1272
1273       /* treat as disagree, pass thru */
1274     case TRANS_DISAGREE:
1275       transdisagree++;
1276       break;
1277
1278     case TRANS_AGREE:
1279       transagree++;
1280       break;
1281
1282     case TRANS_SOFT_ABORT:
1283       transsoftabort++;
1284       break;
1285     }
1286   }
1287
1288   if(transdisagree > 0) {
1289     /* Send Abort */
1290     *treplyretry = 0;
1291     return TRANS_ABORT;
1292 #ifdef CACHE
1293     /* clear objects from prefetch cache */
1294     cleanPCache();
1295 #endif
1296   } else if(transagree == pilecount) {
1297     /* Send Commit */
1298     *treplyretry = 0;
1299     return TRANS_COMMIT;
1300   } else {
1301     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1302     *treplyretry = 1;
1303     return TRANS_ABORT;
1304   }
1305   return 0;
1306 }
1307
1308 /* This function opens a connection, places an object read request to
1309  * the remote machine, reads the control message and object if
1310  * available and copies the object and its header to the local
1311  * cache. */
1312
1313 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1314   int size, val;
1315   struct sockaddr_in serv_addr;
1316   char machineip[16];
1317   char control;
1318   objheader_t *h;
1319   void *objcopy = NULL;
1320
1321   int sd = getSock2(transReadSockPool, mnum);
1322   char readrequest[sizeof(char)+sizeof(unsigned int)];
1323   readrequest[0] = READ_REQUEST;
1324   *((unsigned int *)(&readrequest[1])) = oid;
1325   send_data(sd, readrequest, sizeof(readrequest));
1326
1327   /* Read response from the Participant */
1328   recv_data(sd, &control, sizeof(char));
1329
1330   if (control==OBJECT_NOT_FOUND) {
1331     objcopy = NULL;
1332   } else {
1333     /* Read object if found into local cache */
1334     recv_data(sd, &size, sizeof(int));
1335     objcopy = objstrAlloc(&t_cache, size);
1336     recv_data(sd, objcopy, size);
1337     STATUS(objcopy)=0;
1338     /* Insert into cache's lookup table */
1339     t_chashInsert(oid, objcopy);
1340 #ifdef TRANSSTATS
1341     totalObjSize += size;
1342 #endif
1343   }
1344
1345   return objcopy;
1346 }
1347
1348 /*  Commit info for objects modified */
1349 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1350                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1351   void *mobj;
1352   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1353   /* Save the oids not found and number of oids not found for later use */
1354   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1355     /* Save the oids not found and number of oids not found for later use */
1356     oidnotfound[*numoidnotfound] = oid;
1357     (*numoidnotfound)++;
1358   } else { /* If Obj found in machine (i.e. has not moved) */
1359     /* Check if Obj is locked by any previous transaction */
1360     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1361       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1362         (*v_matchnolock)++;
1363         //Keep track of what is locked
1364         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1365       } else { /* If versions don't match ...HARD ABORT */
1366         (*v_nomatch)++;
1367         /* Send TRANS_DISAGREE to Coordinator */
1368         *getReplyCtrl = TRANS_DISAGREE;
1369
1370         //Keep track of what is locked
1371         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1372         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1373         return;
1374       }
1375     } else { //A lock is acquired some place else
1376       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1377         (*v_matchlock)++;
1378       } else { /* If versions don't match ...HARD ABORT */
1379         (*v_nomatch)++;
1380         /* Send TRANS_DISAGREE to Coordinator */
1381         *getReplyCtrl = TRANS_DISAGREE;
1382         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1383         return;
1384       }
1385     }
1386   }
1387 }
1388
1389 /*  Commit info for objects modified */
1390 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1391                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1392   void *mobj;
1393   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1394   /* Save the oids not found and number of oids not found for later use */
1395   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1396     /* Save the oids not found and number of oids not found for later use */
1397     oidnotfound[*numoidnotfound] = oid;
1398     (*numoidnotfound)++;
1399   } else { /* If Obj found in machine (i.e. has not moved) */
1400     /* Check if Obj is locked by any previous transaction */
1401     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1402       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1403         (*v_matchnolock)++;
1404         //Keep track of what is locked
1405         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1406       } else { /* If versions don't match ...HARD ABORT */
1407         (*v_nomatch)++;
1408         /* Send TRANS_DISAGREE to Coordinator */
1409         *getReplyCtrl = TRANS_DISAGREE;
1410         //Keep track of what is locked
1411         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1412         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1413         return;
1414       }
1415     } else { //Has reached max number of readers or some other transaction
1416       //has acquired a lock on this object
1417       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1418         (*v_matchlock)++;
1419       } else { /* If versions don't match ...HARD ABORT */
1420         (*v_nomatch)++;
1421         /* Send TRANS_DISAGREE to Coordinator */
1422         *getReplyCtrl = TRANS_DISAGREE;
1423         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1424         return;
1425       }
1426     }
1427   }
1428 }
1429
1430 /* This function completes the ABORT process if the transaction is aborting */
1431 int transAbortProcess(trans_commit_data_t *transinfo) {
1432   int i, numlocked;
1433   unsigned int *objlocked;
1434   void *header;
1435
1436   numlocked = transinfo->numlocked;
1437   objlocked = transinfo->objlocked;
1438
1439   int useWriteUnlock = 0;
1440   for (i = 0; i < numlocked; i++) {
1441     if(objlocked[i] == -1) {
1442       useWriteUnlock = 1;
1443       continue;
1444     }
1445     if((header = mhashSearch(objlocked[i])) == NULL) {
1446       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1447       return 1;
1448     }
1449     if(!useWriteUnlock) {
1450       read_unlock(STATUSPTR(header));
1451     } else {
1452       write_unlock(STATUSPTR(header));
1453     }
1454   }
1455
1456   return 0;
1457 }
1458
1459 /*This function completes the COMMIT process if the transaction is commiting*/
1460 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1461   objheader_t *header, *tcptr;
1462   int i, nummod, tmpsize, numcreated, numlocked;
1463   unsigned int *oidmod, *oidcreated, *oidlocked;
1464   void *ptrcreate;
1465
1466   nummod = tdata->f.nummod;
1467   oidmod = tdata->oidmod;
1468   numcreated = tdata->f.numcreated;
1469   oidcreated = tdata->oidcreated;
1470   numlocked = transinfo->numlocked;
1471   oidlocked = transinfo->objlocked;
1472
1473   for (i = 0; i < nummod; i++) {
1474     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1475       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1476       return 1;
1477     }
1478     /* Copy from transaction cache -> main object store */
1479     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1480       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1481       return 1;
1482     }
1483     GETSIZE(tmpsize, header);
1484     char *tmptcptr = (char *) tcptr;
1485     {
1486       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1487       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1488       dst->___cachedCode___=src->___cachedCode___;
1489       dst->___cachedHash___=src->___cachedHash___;
1490
1491       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1492     }
1493
1494     header->version += 1;
1495     if(header->notifylist != NULL) {
1496       notifyAll(&header->notifylist, OID(header), header->version);
1497     }
1498   }
1499   /* If object is newly created inside transaction then commit it */
1500   for (i = 0; i < numcreated; i++) {
1501     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1502       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1503       return 1;
1504     }
1505     GETSIZE(tmpsize, header);
1506     tmpsize += sizeof(objheader_t);
1507     pthread_mutex_lock(&mainobjstore_mutex);
1508     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1509       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1510       pthread_mutex_unlock(&mainobjstore_mutex);
1511       return 1;
1512     }
1513     pthread_mutex_unlock(&mainobjstore_mutex);
1514     /* Initialize read and write locks */
1515     initdsmlocks(STATUSPTR(header));
1516     memcpy(ptrcreate, header, tmpsize);
1517     mhashInsert(oidcreated[i], ptrcreate);
1518     lhashInsert(oidcreated[i], myIpAddr);
1519   }
1520   /* Unlock locked objects */
1521   int useWriteUnlock = 0;
1522   for(i = 0; i < numlocked; i++) {
1523     if(oidlocked[i] == -1) {
1524       useWriteUnlock = 1;
1525       continue;
1526     }
1527     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1528       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1529       return 1;
1530     }
1531     if(!useWriteUnlock) {
1532       read_unlock(STATUSPTR(header));
1533     } else {
1534       write_unlock(STATUSPTR(header));
1535     }
1536   }
1537   return 0;
1538 }
1539
1540 prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
1541   int i;
1542   int j;
1543   prefetchpile_t * head=NULL;
1544
1545   for(j=0;j<numprefetches;j++) {
1546     int siteid = *(GET_SITEID(ptr));
1547     int ntuples = *(GET_NTUPLES(ptr));
1548     unsigned int * oidarray = GET_PTR_OID(ptr);
1549     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1550     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1551     int numLocal = 0;
1552     
1553     for(i=0; i<ntuples; i++) {
1554       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1555       unsigned short endindex=endoffsets[i];
1556       unsigned int oid=oidarray[i];
1557       int newbase;
1558       int machinenum;
1559       
1560       if (oid==0)
1561         continue;
1562       //Look up fields locally
1563       for(newbase=baseindex; newbase<endindex; newbase++) {
1564         if (!lookupObject(&oid, arryfields[newbase]))
1565           break;
1566         //Ended in a null pointer...
1567         if (oid==0)
1568           goto tuple;
1569       }
1570       //Entire prefetch is local
1571       if (newbase==endindex&&checkoid(oid)) {
1572         numLocal++;
1573         goto tuple;
1574       }
1575       //Add to remote requests
1576       machinenum=lhashSearch(oid);
1577       insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1578     tuple:
1579       ;
1580     }
1581     
1582     /* handle dynamic prefetching */
1583     handleDynPrefetching(numLocal, ntuples, siteid);
1584     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1585   }
1586
1587   return head;
1588 }
1589
1590 int checkoid(unsigned int oid) {
1591   objheader_t *header;
1592   if ((header=mhashSearch(oid))!=NULL) {
1593     //Found on machine
1594     return 1;
1595   } else if ((header=prehashSearch(oid))!=NULL) {
1596     //Found in cache
1597     return 1;
1598   } else {
1599     return 0;
1600   }
1601 }
1602
1603 int lookupObject(unsigned int * oid, short offset) {
1604   objheader_t *header;
1605   if ((header=mhashSearch(*oid))!=NULL) {
1606     //Found on machine
1607     ;
1608   } else if ((header=prehashSearch(*oid))!=NULL) {
1609     //Found in cache
1610     ;
1611   } else {
1612     return 0;
1613   }
1614
1615   if(TYPE(header) >= NUMCLASSES) {
1616     int elementsize = classsize[TYPE(header)];
1617     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1618     int length = ao->___length___;
1619     /* Check if array out of bounds */
1620     if(offset < 0 || offset >= length) {
1621       //if yes treat the object as found
1622       (*oid)=0;
1623       return 1;
1624     }
1625     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1626     return 1;
1627   } else {
1628     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1629     return 1;
1630   }
1631 }
1632
1633
1634 /* This function is called by the thread calling transPrefetch */
1635 void *transPrefetch(void *t) {
1636   while(1) {
1637     /* read from prefetch queue */
1638     void *node=gettail();
1639     /* Check if the tuples are found locally, if yes then reduce them further*/
1640     /* and group requests by remote machine ids by calling the makePreGroups() */
1641     int count=numavailable();
1642     prefetchpile_t *pilehead = foundLocal(node, count);
1643
1644     if (pilehead!=NULL) {
1645       // Get sock from shared pool
1646
1647       /* Send  Prefetch Request */
1648       prefetchpile_t *ptr = pilehead;
1649       while(ptr != NULL) {
1650         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1651         sendPrefetchReq(ptr, sd);
1652         ptr = ptr->next;
1653       }
1654
1655       /* Release socket */
1656       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1657
1658       /* Deallocated pilehead */
1659       mcdealloc(pilehead);
1660     }
1661     // Deallocate the prefetch queue pile node
1662     incmulttail(count);
1663   }
1664 }
1665
1666 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1667   objpile_t *tmp;
1668
1669   int size=sizeof(char)+sizeof(int);
1670   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1671     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1672   }
1673
1674   char buft[size];
1675   char *buf=buft;
1676   *buf=TRANS_PREFETCH;
1677   buf+=sizeof(char);
1678
1679   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1680     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1681     *((int*)buf)=len;
1682     buf+=sizeof(int);
1683     *((unsigned int *)buf)=tmp->oid;
1684     buf+=sizeof(unsigned int);
1685     *((unsigned int *)(buf)) = myIpAddr;
1686     buf+=sizeof(unsigned int);
1687     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1688     buf+=tmp->numoffset*sizeof(short);
1689   }
1690   *((int *)buf)=-1;
1691   send_data(sd, buft, size);
1692   return;
1693 }
1694
1695 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1696   int len, endpair;
1697   char control;
1698   objpile_t *tmp;
1699   struct writestruct writebuffer;
1700   writebuffer.offset=0;
1701
1702   /* Send TRANS_PREFETCH control message */
1703   int first=1;
1704
1705   /* Send Oids and offsets in pairs */
1706   tmp = mcpilenode->objpiles;
1707   while(tmp != NULL) {
1708     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1709     char oidnoffset[len+5];
1710     char *buf=oidnoffset;
1711     if (first) {
1712       *buf=TRANS_PREFETCH;
1713       buf++;len++;
1714       first=0;
1715     }
1716     *((int*)buf) = tmp->numoffset;
1717     buf+=sizeof(int);
1718     *((unsigned int *)buf) = tmp->oid;
1719 #ifdef TRANSSTATS
1720     sendRemoteReq++;
1721 #endif
1722     buf+=sizeof(unsigned int);
1723     *((unsigned int *)buf) = myIpAddr;
1724     buf += sizeof(unsigned int);
1725     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1726     tmp = tmp->next;
1727     if (tmp==NULL) {
1728       *((int *)(&oidnoffset[len]))=-1;
1729       len+=sizeof(int);
1730     }
1731     if (tmp!=NULL)
1732       send_buf(sd, & writebuffer, oidnoffset, len);
1733     else
1734       forcesend_buf(sd, & writebuffer, oidnoffset, len);
1735   }
1736
1737   LOGEVENT('S');
1738   return;
1739 }
1740
1741 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1742   int length = 0, size = 0;
1743   char control;
1744   unsigned int oid;
1745   void *modptr, *oldptr;
1746
1747   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1748   size = length - sizeof(int);
1749   char recvbuffer[size];
1750 #ifdef TRANSSTATS
1751     getResponse++;
1752     LOGEVENT('Z');
1753 #endif
1754     recv_data_buf(sd, readbuffer, recvbuffer, size);
1755   control = *((char *) recvbuffer);
1756   if(control == OBJECT_FOUND) {
1757     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1758     size = size - (sizeof(char) + sizeof(unsigned int));
1759     pthread_mutex_lock(&prefetchcache_mutex);
1760     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1761       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1762       pthread_mutex_unlock(&prefetchcache_mutex);
1763       return -1;
1764     }
1765     pthread_mutex_unlock(&prefetchcache_mutex);
1766     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1767     STATUS(modptr)=0;
1768
1769     /* Insert the oid and its address into the prefetch hash lookup table */
1770     /* Do a version comparison if the oid exists */
1771     if((oldptr = prehashSearch(oid)) != NULL) {
1772       /* If older version then update with new object ptr */
1773       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1774         prehashRemove(oid);
1775         prehashInsert(oid, modptr);
1776       }
1777     } else { /* Else add the object ptr to hash table*/
1778       prehashInsert(oid, modptr);
1779     }
1780     /* Lock the Prefetch Cache look up table*/
1781     pthread_mutex_lock(&pflookup.lock);
1782     /* Broadcast signal on prefetch cache condition variable */
1783     pthread_cond_broadcast(&pflookup.cond);
1784     /* Unlock the Prefetch Cache look up table*/
1785     pthread_mutex_unlock(&pflookup.lock);
1786   } else if(control == OBJECT_NOT_FOUND) {
1787     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1788     /* TODO: For each object not found query DHT for new location and retrieve the object */
1789     /* Throw an error */
1790     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1791     //    exit(-1);
1792   } else {
1793     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1794   }
1795
1796   return 0;
1797 }
1798
1799 unsigned short getObjType(unsigned int oid) {
1800   objheader_t *objheader;
1801   unsigned short numoffset[] ={0};
1802   short fieldoffset[] ={};
1803
1804   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1805 #ifdef CACHE
1806     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1807 #endif
1808     unsigned int mid = lhashSearch(oid);
1809     int sd = getSock2(transReadSockPool, mid);
1810     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1811     remotereadrequest[0] = READ_REQUEST;
1812     *((unsigned int *)(&remotereadrequest[1])) = oid;
1813     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1814
1815     /* Read response from the Participant */
1816     char control;
1817     recv_data(sd, &control, sizeof(char));
1818
1819     if (control==OBJECT_NOT_FOUND) {
1820       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1821       fflush(stdout);
1822       exit(-1);
1823     } else {
1824       /* Read object if found into local cache */
1825       int size;
1826       recv_data(sd, &size, sizeof(int));
1827 #ifdef CACHE
1828       pthread_mutex_lock(&prefetchcache_mutex);
1829       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1830         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1831         pthread_exit(NULL);
1832       }
1833       pthread_mutex_unlock(&prefetchcache_mutex);
1834       recv_data(sd, objheader, size);
1835       prehashInsert(oid, objheader);
1836       return TYPE(objheader);
1837 #else
1838       char *buffer;
1839       if((buffer = calloc(1, size)) == NULL) {
1840         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1841         fflush(stdout);
1842         return 0;
1843       }
1844       recv_data(sd, buffer, size);
1845       objheader = (objheader_t *)buffer;
1846       unsigned short type = TYPE(objheader);
1847       free(buffer);
1848       return type;
1849 #endif
1850     }
1851 #ifdef CACHE
1852   }
1853 #endif
1854   }
1855   return TYPE(objheader);
1856 }
1857
1858 int startRemoteThread(unsigned int oid, unsigned int mid) {
1859   int sock;
1860   struct sockaddr_in remoteAddr;
1861   char msg[1 + sizeof(unsigned int)];
1862   int bytesSent;
1863   int status;
1864
1865   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1866     perror("startRemoteThread():socket()");
1867     return -1;
1868   }
1869
1870   bzero(&remoteAddr, sizeof(remoteAddr));
1871   remoteAddr.sin_family = AF_INET;
1872   remoteAddr.sin_port = htons(LISTEN_PORT);
1873   remoteAddr.sin_addr.s_addr = htonl(mid);
1874
1875   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1876     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1877            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1878     status = -1;
1879   } else
1880   {
1881     msg[0] = START_REMOTE_THREAD;
1882     *((unsigned int *) &msg[1]) = oid;
1883     send_data(sock, msg, 1 + sizeof(unsigned int));
1884   }
1885
1886   close(sock);
1887   return status;
1888 }
1889
1890 //TODO: when reusing oids, make sure they are not already in use!
1891 static unsigned int id = 0xFFFFFFFF;
1892 unsigned int getNewOID(void) {
1893   id += 2;
1894   if (id > oidMax || id < oidMin) {
1895     id = (oidMin | 1);
1896   }
1897   return id;
1898 }
1899
1900 int processConfigFile() {
1901   FILE *configFile;
1902   const int maxLineLength = 200;
1903   char lineBuffer[maxLineLength];
1904   char *token;
1905   const char *delimiters = " \t\n";
1906   char *commentBegin;
1907   in_addr_t tmpAddr;
1908
1909   configFile = fopen(CONFIG_FILENAME, "r");
1910   if (configFile == NULL) {
1911     printf("error opening %s:\n", CONFIG_FILENAME);
1912     perror("");
1913     return -1;
1914   }
1915
1916   numHostsInSystem = 0;
1917   sizeOfHostArray = 8;
1918   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1919
1920   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1921     commentBegin = strchr(lineBuffer, '#');
1922     if (commentBegin != NULL)
1923       *commentBegin = '\0';
1924     token = strtok(lineBuffer, delimiters);
1925     while (token != NULL) {
1926       tmpAddr = inet_addr(token);
1927       if ((int)tmpAddr == -1) {
1928         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1929         fclose(configFile);
1930         return -1;
1931       } else
1932         addHost(htonl(tmpAddr));
1933       token = strtok(NULL, delimiters);
1934     }
1935   }
1936
1937   fclose(configFile);
1938
1939   if (numHostsInSystem < 1) {
1940     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1941     return -1;
1942   }
1943 #ifdef MAC
1944   myIpAddr = getMyIpAddr("en1");
1945 #else
1946   myIpAddr = getMyIpAddr("eth0");
1947 #endif
1948   myIndexInHostArray = findHost(myIpAddr);
1949   if (myIndexInHostArray == -1) {
1950     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1951     return -1;
1952   }
1953   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1954   oidMin = oidsPerBlock * myIndexInHostArray;
1955   if (myIndexInHostArray == numHostsInSystem - 1)
1956     oidMax = 0xFFFFFFFF;
1957   else
1958     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1959
1960   return 0;
1961 }
1962
1963 void addHost(unsigned int hostIp) {
1964   unsigned int *tmpArray;
1965
1966   if (findHost(hostIp) != -1)
1967     return;
1968
1969   if (numHostsInSystem == sizeOfHostArray) {
1970     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1971     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1972     free(hostIpAddrs);
1973     hostIpAddrs = tmpArray;
1974   }
1975
1976   hostIpAddrs[numHostsInSystem++] = hostIp;
1977
1978   return;
1979 }
1980
1981 int findHost(unsigned int hostIp) {
1982   int i;
1983   for (i = 0; i < numHostsInSystem; i++)
1984     if (hostIpAddrs[i] == hostIp)
1985       return i;
1986
1987   //not found
1988   return -1;
1989 }
1990
1991 /* This function sends notification request per thread waiting on object(s) whose version
1992  * changes */
1993 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1994   int sock,i;
1995   objheader_t *objheader;
1996   struct sockaddr_in remoteAddr;
1997   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1998   char *ptr;
1999   int bytesSent;
2000   int status, size;
2001   unsigned short version;
2002   unsigned int oid,mid;
2003   static unsigned int threadid = 0;
2004   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
2005   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2006   notifydata_t *ndata;
2007
2008   oid = oidarry[0];
2009   if((mid = lhashSearch(oid)) == 0) {
2010     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2011     return;
2012   }
2013
2014   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2015     perror("reqNotify():socket()");
2016     return -1;
2017   }
2018
2019   bzero(&remoteAddr, sizeof(remoteAddr));
2020   remoteAddr.sin_family = AF_INET;
2021   remoteAddr.sin_port = htons(LISTEN_PORT);
2022   remoteAddr.sin_addr.s_addr = htonl(mid);
2023
2024   /* Generate unique threadid */
2025   threadid++;
2026
2027   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2028   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2029     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2030     return -1;
2031   }
2032   ndata->numoid = numoid;
2033   ndata->threadid = threadid;
2034   ndata->oidarry = oidarry;
2035   ndata->versionarry = versionarry;
2036   ndata->threadcond = threadcond;
2037   ndata->threadnotify = threadnotify;
2038   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2039     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2040     free(ndata);
2041     return -1;
2042   }
2043
2044   /* Send  number of oids, oidarry, version array, machine id and threadid */
2045   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2046     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2047            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2048     free(ndata);
2049     return -1;
2050   } else {
2051     msg[0] = THREAD_NOTIFY_REQUEST;
2052     *((unsigned int *)(&msg[1])) = numoid;
2053     /* Send array of oids  */
2054     size = sizeof(unsigned int);
2055
2056     for(i = 0;i < numoid; i++) {
2057       oid = oidarry[i];
2058       *((unsigned int *)(&msg[1] + size)) = oid;
2059       size += sizeof(unsigned int);
2060     }
2061
2062     /* Send array of version  */
2063     for(i = 0;i < numoid; i++) {
2064       version = versionarry[i];
2065       *((unsigned short *)(&msg[1] + size)) = version;
2066       size += sizeof(unsigned short);
2067     }
2068
2069     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2070     *((unsigned int *)(&msg[1] + size)) = threadid;
2071     pthread_mutex_lock(&(ndata->threadnotify));
2072     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2073     send_data(sock, msg, size);
2074     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2075     pthread_mutex_unlock(&(ndata->threadnotify));
2076   }
2077
2078   pthread_cond_destroy(&threadcond);
2079   pthread_mutex_destroy(&threadnotify);
2080   free(ndata);
2081   close(sock);
2082   return status;
2083 }
2084
2085 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2086   notifydata_t *ndata;
2087   int i, objIsFound = 0, index;
2088   void *ptr;
2089
2090   //Look up the tid and call the corresponding pthread_cond_signal
2091   if((ndata = notifyhashSearch(tid)) == NULL) {
2092     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2093     return;
2094   } else  {
2095     for(i = 0; i < ndata->numoid; i++) {
2096       if(ndata->oidarry[i] == oid) {
2097         objIsFound = 1;
2098         index = i;
2099       }
2100     }
2101     if(objIsFound == 0) {
2102       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2103       return;
2104     } else {
2105       if(version <= ndata->versionarry[index]) {
2106         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2107         return;
2108       } else {
2109 #ifdef CACHE
2110         /* Clear from prefetch cache and free thread related data structure */
2111         if((ptr = prehashSearch(oid)) != NULL) {
2112           prehashRemove(oid);
2113         }
2114 #endif
2115         pthread_mutex_lock(&(ndata->threadnotify));
2116         pthread_cond_signal(&(ndata->threadcond));
2117         pthread_mutex_unlock(&(ndata->threadnotify));
2118       }
2119     }
2120   }
2121   return;
2122 }
2123
2124 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2125   threadlist_t *ptr;
2126   unsigned int mid;
2127   struct sockaddr_in remoteAddr;
2128   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2129   int sock, status, size, bytesSent;
2130
2131   while(*head != NULL) {
2132     ptr = *head;
2133     mid = ptr->mid;
2134     //create a socket connection to that machine
2135     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2136       perror("notifyAll():socket()");
2137       return -1;
2138     }
2139
2140     bzero(&remoteAddr, sizeof(remoteAddr));
2141     remoteAddr.sin_family = AF_INET;
2142     remoteAddr.sin_port = htons(LISTEN_PORT);
2143     remoteAddr.sin_addr.s_addr = htonl(mid);
2144     //send Thread Notify response and threadid to that machine
2145     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2146       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2147              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2148       fflush(stdout);
2149       status = -1;
2150     } else {
2151       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2152       msg[0] = THREAD_NOTIFY_RESPONSE;
2153       *((unsigned int *)&msg[1]) = oid;
2154       size = sizeof(unsigned int);
2155       *((unsigned short *)(&msg[1]+ size)) = version;
2156       size+= sizeof(unsigned short);
2157       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2158
2159       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2160       send_data(sock, msg, size);
2161     }
2162     //close socket
2163     close(sock);
2164     // Update head
2165     *head = ptr->next;
2166     free(ptr);
2167   }
2168   return status;
2169 }
2170
2171 void transAbort() {
2172 #ifdef ABORTREADERS
2173   removetransactionhash();
2174 #endif
2175   objstrDelete(t_cache);
2176   t_chashDelete();
2177 }
2178
2179 /* This function inserts necessary information into
2180  * a machine pile data structure */
2181 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2182   plistnode_t *ptr, *tmp;
2183   int found = 0, offset = 0;
2184
2185   tmp = pile;
2186   //Add oid into a machine that is already present in the pile linked list structure
2187   while(tmp != NULL) {
2188     if (tmp->mid == mid) {
2189       int tmpsize;
2190
2191       if (STATUS(headeraddr) & NEW) {
2192         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2193         tmp->numcreated++;
2194         GETSIZE(tmpsize, headeraddr);
2195         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2196       } else if (STATUS(headeraddr) & DIRTY) {
2197         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2198         tmp->nummod++;
2199         GETSIZE(tmpsize, headeraddr);
2200         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2201       } else {
2202         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2203         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2204         offset += sizeof(unsigned int);
2205         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2206         tmp->numread++;
2207       }
2208       found = 1;
2209       break;
2210     }
2211     tmp = tmp->next;
2212   }
2213   //Add oid for any new machine
2214   if (!found) {
2215     int tmpsize;
2216     if((ptr = pCreate(num_objs)) == NULL) {
2217       return NULL;
2218     }
2219     ptr->mid = mid;
2220     if (STATUS(headeraddr) & NEW) {
2221       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2222       ptr->numcreated++;
2223       GETSIZE(tmpsize, headeraddr);
2224       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2225     } else if (STATUS(headeraddr) & DIRTY) {
2226       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2227       ptr->nummod++;
2228       GETSIZE(tmpsize, headeraddr);
2229       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2230     } else {
2231       *((unsigned int *)ptr->objread)=OID(headeraddr);
2232       offset = sizeof(unsigned int);
2233       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2234       ptr->numread++;
2235     }
2236     ptr->next = pile;
2237     pile = ptr;
2238   }
2239
2240   /* Clear Flags */
2241   STATUS(headeraddr) =0;
2242
2243
2244   return pile;
2245 }
2246
2247 plistnode_t *sortPiles(plistnode_t *pileptr) {
2248   plistnode_t *head, *ptr, *tail;
2249   head = pileptr;
2250   ptr = pileptr;
2251   /* Get tail pointer */
2252   while(ptr!= NULL) {
2253     tail = ptr;
2254     ptr = ptr->next;
2255   }
2256   ptr = pileptr;
2257   plistnode_t *prev = pileptr;
2258   /* Arrange local machine processing at the end of the pile list */
2259   while(ptr != NULL) {
2260     if(ptr != tail) {
2261       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2262         prev->next = ptr->next;
2263         ptr->next = NULL;
2264         tail->next = ptr;
2265         return pileptr;
2266       }
2267       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2268         prev = ptr->next;
2269         ptr->next = NULL;
2270         tail->next = ptr;
2271         return prev;
2272       }
2273       prev = ptr;
2274     }
2275     ptr = ptr->next;
2276   }
2277   return pileptr;
2278 }