bug fix
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 #ifdef LOGEVENTS
83 char bigarray[16*1024*1024];
84 int bigindex=0;
85 #define LOGEVENT(x) { \
86     int tmp=bigindex++;                         \
87     bigarray[tmp]=x;                            \
88   }
89 #else
90 #define LOGEVENT(x)
91 #endif
92
93 /*******************************
94 * Send and Recv function calls
95 *******************************/
96 void send_data(int fd, void *buf, int buflen) {
97   char *buffer = (char *)(buf);
98   int size = buflen;
99   int numbytes;
100   while (size > 0) {
101     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
102     bytesSent = bytesSent + numbytes;
103     if (numbytes == -1) {
104       perror("send");
105       exit(0);
106     }
107     buffer += numbytes;
108     size -= numbytes;
109   }
110 }
111
112 void recv_dataz(int fd, void *buf, int buflen) {
113   char *buffer = (char *)(buf);
114   int size = buflen;
115   int numbytes;
116   while (size > 0) {
117     numbytes = recv(fd, buffer, size, 0);
118     bytesRecv = bytesRecv + numbytes;
119     if (numbytes == -1) {
120       perror("recv");
121       exit(0);
122     }
123     buffer += numbytes;
124     size -= numbytes;
125   }
126 }
127
128 int recv_data_errorcodez(int fd, void *buf, int buflen) {
129   char *buffer = (char *)(buf);
130   int size = buflen;
131   int numbytes;
132   while (size > 0) {
133     numbytes = recv(fd, buffer, size, 0);
134     if (numbytes==0)
135       return 0;
136     if (numbytes == -1) {
137       perror("recv");
138       return -1;
139     }
140     buffer += numbytes;
141     size -= numbytes;
142   }
143   return 1;
144 }
145
146
147 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
148   char *buf=(char *)buffer;
149   int numbytes=readbuffer->head-readbuffer->tail;
150   if (numbytes>buflen)
151     numbytes=buflen;
152   if (numbytes>0) {
153     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
154     readbuffer->tail+=numbytes;
155     buflen-=numbytes;
156     buf+=numbytes;
157   }
158   if (buflen==0) {
159     return;
160   }
161   if (buflen>=MAXBUF) {
162     recv_dataz(fd, buf, buflen);
163     return;
164   }
165   
166   int maxbuf=MAXBUF;
167   int obufflen=buflen;
168   readbuffer->head=0;
169   
170   while (buflen > 0) {
171     int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
172     if (numbytes == -1) {
173       perror("recv");
174       exit(0);
175     }
176     buflen-=numbytes;
177     readbuffer->head+=numbytes;
178     maxbuf-=numbytes;
179   }
180   memcpy(buf,readbuffer->buf,obufflen);
181   readbuffer->tail=obufflen;
182 }
183
184 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
185   char *buf=(char *)buffer;
186   //now tail<=head
187   int numbytes=readbuffer->head-readbuffer->tail;
188   if (numbytes>buflen)
189     numbytes=buflen;
190   if (numbytes>0) {
191     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
192     readbuffer->tail+=numbytes;
193     buflen-=numbytes;
194     buf+=numbytes;
195   }
196   if (buflen==0)
197     return 1;
198
199   if (buflen>=MAXBUF) {
200     return recv_data_errorcodez(fd, buf, buflen);
201   }
202
203   int maxbuf=MAXBUF;
204   int obufflen=buflen;
205   readbuffer->head=0;
206   
207   while (buflen > 0) {
208     int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
209     if (numbytes ==0) {
210       return 0;
211     }
212     if (numbytes==-1) {
213       perror("recvbuf");
214       return -1;
215     }
216     buflen-=numbytes;
217     readbuffer->head+=numbytes;
218     maxbuf-=numbytes;
219   }
220   memcpy(buf,readbuffer->buf,obufflen);
221   readbuffer->tail=obufflen;
222   return 1;
223 }
224
225
226 void recv_data(int fd, void *buf, int buflen) {
227   char *buffer = (char *)(buf);
228   int size = buflen;
229   int numbytes;
230   while (size > 0) {
231     numbytes = recv(fd, buffer, size, 0);
232     bytesRecv = bytesRecv + numbytes;
233     if (numbytes == -1) {
234       perror("recv");
235       exit(0);
236     }
237     buffer += numbytes;
238     size -= numbytes;
239   }
240 }
241
242 int recv_data_errorcode(int fd, void *buf, int buflen) {
243   char *buffer = (char *)(buf);
244   int size = buflen;
245   int numbytes;
246   while (size > 0) {
247     numbytes = recv(fd, buffer, size, 0);
248     if (numbytes==0)
249       return 0;
250     if (numbytes == -1) {
251       perror("recv");
252       return -1;
253     }
254     buffer += numbytes;
255     size -= numbytes;
256   }
257   return 1;
258 }
259
260 void printhex(unsigned char *ptr, int numBytes) {
261   int i;
262   for (i = 0; i < numBytes; i++) {
263     if (ptr[i] < 16)
264       printf("0%x ", ptr[i]);
265     else
266       printf("%x ", ptr[i]);
267   }
268   printf("\n");
269   return;
270 }
271
272 inline int arrayLength(int *array) {
273   int i;
274   for(i=0 ; array[i] != -1; i++)
275     ;
276   return i;
277 }
278
279 inline int findmax(int *array, int arraylength) {
280   int max, i;
281   max = array[0];
282   for(i = 0; i < arraylength; i++) {
283     if(array[i] > max) {
284       max = array[i];
285     }
286   }
287   return max;
288 }
289
290 /* This function is a prefetch call generated by the compiler that
291  * populates the shared primary prefetch queue*/
292 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
293   /* Allocate for the queue node*/
294   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
295   int len;
296 #ifdef INLINEPREFETCH
297   char node[qnodesize];
298 #else
299   char *node=getmemory(qnodesize);
300 #endif
301   int top=endoffsets[ntuples-1];
302
303   if (node==NULL) {
304     LOGEVENT('D');
305     return;
306   }
307   /* Set queue node values */
308
309   /* TODO: Remove this after testing */
310   evalPrefetch[siteid].callcount++;
311
312   *((int *)(node))=siteid;
313   *((int *)(node + sizeof(int))) = ntuples;
314   len = 2*sizeof(int);
315   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
316   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
317   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
318
319 #ifdef INLINEPREFETCH
320   prefetchpile_t *pilehead = foundLocal(node);
321
322   if (pilehead!=NULL) {
323     // Get sock from shared pool
324     
325     /* Send  Prefetch Request */
326     prefetchpile_t *ptr = pilehead;
327     while(ptr != NULL) {
328       int sd = getSock2(transPrefetchSockPool, ptr->mid);
329       sendPrefetchReq(ptr, sd);
330       ptr = ptr->next;
331     }
332     
333     /* Release socket */
334     //  freeSock(transPrefetchSockPool, pilehead->mid, sd);
335     
336     /* Deallocated pilehead */
337     mcdealloc(pilehead);
338   }
339 #else
340   /* Lock and insert into primary prefetch queue */
341   movehead(qnodesize);
342 #endif
343 }
344
345 /* This function starts up the transaction runtime. */
346 int dstmStartup(const char * option) {
347   pthread_t thread_Listen, udp_thread_Listen;
348   pthread_attr_t attr;
349   int master=option!=NULL && strcmp(option, "master")==0;
350   int fd;
351   int udpfd;
352
353   if (processConfigFile() != 0)
354     return 0; //TODO: return error value, cause main program to exit
355 #ifdef COMPILER
356   if (!master)
357     threadcount--;
358 #endif
359
360 #ifdef TRANSSTATS
361   printf("Trans stats is on\n");
362   fflush(stdout);
363 #endif
364 #ifdef ABORTREADERS
365   initreaderlist();
366 #endif
367
368   //Initialize socket pool
369   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
370   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
371   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
372
373   dstmInit();
374   transInit();
375
376   fd=startlistening();
377   pthread_attr_init(&attr);
378   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
379 #ifdef CACHE
380   udpfd = udpInit();
381   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
382 #endif
383   if (master) {
384     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
385     return 1;
386   } else {
387     dstmListen((void *)fd);
388     return 0;
389   }
390 }
391
392 //TODO Use this later
393 void *pCacheAlloc(objstr_t *store, unsigned int size) {
394   void *tmp;
395   objstr_t *ptr;
396   ptr = store;
397   int success = 0;
398
399   while(ptr->next != NULL) {
400     /* check if store is empty */
401     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
402       tmp = ptr->top;
403       ptr->top += size;
404       success = 1;
405       return tmp;
406     } else {
407       ptr = ptr->next;
408     }
409   }
410
411   if(success == 0) {
412     return NULL;
413   }
414 }
415
416 /* This function initiates the prefetch thread A queue is shared
417  * between the main thread of execution and the prefetch thread to
418  * process the prefetch call Call from compiler populates the shared
419  * queue with prefetch requests while prefetch thread processes the
420  * prefetch requests */
421
422 void transInit() {
423   //Create and initialize prefetch cache structure
424 #ifdef CACHE
425   initializePCache();
426   if((evalPrefetch = initPrefetchStats()) == NULL) {
427     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
428     exit(0);
429   }
430 #endif
431
432   /* Initialize attributes for mutex */
433   pthread_mutexattr_init(&prefetchcache_mutex_attr);
434   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
435
436   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
437   pthread_mutex_init(&notifymutex, NULL);
438   pthread_mutex_init(&atomicObjLock, NULL);
439 #ifdef CACHE
440   //Create prefetch cache lookup table
441   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
442     printf("ERROR\n");
443     return; //Failure
444   }
445
446   //Initialize primary shared queue
447   queueInit();
448   //Initialize machine pile w/prefetch oids and offsets shared queue
449   mcpileqInit();
450
451   //Create the primary prefetch thread
452   int retval;
453 #ifdef RANGEPREFETCH
454   do {
455     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
456   } while(retval!=0);
457 #else
458 #ifndef INLINEPREFETCH
459   do {
460     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
461   } while(retval!=0);
462 #endif
463 #endif
464   pthread_detach(tPrefetch);
465 #endif
466 }
467
468 /* This function stops the threads spawned */
469 void transExit() {
470 #ifdef CACHE
471   int t;
472   pthread_cancel(tPrefetch);
473   for(t = 0; t < NUM_THREADS; t++)
474     pthread_cancel(wthreads[t]);
475 #endif
476
477   return;
478 }
479
480 /* This functions inserts randowm wait delays in the order of msec
481  * Mostly used when transaction commits retry*/
482 void randomdelay() {
483   struct timespec req;
484   time_t t;
485
486   t = time(NULL);
487   req.tv_sec = 0;
488   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
489   nanosleep(&req, NULL);
490   return;
491 }
492
493 /* This function initializes things required in the transaction start*/
494 void transStart() {
495   t_cache = objstrCreate(1048576);
496   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
497   revertlist=NULL;
498 #ifdef ABORTREADERS
499   t_abort=0;
500 #endif
501 }
502
503 // Search for an address for a given oid                                                                               
504 /*#define INLINE    inline __attribute__((always_inline))
505
506 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
507   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
508   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
509
510   do {
511     if(node->key == key) {
512       return node->val;
513     }
514     node = node->next;
515   } while(node != NULL);
516
517   return NULL;
518   }*/
519
520
521
522
523 /* This function finds the location of the objects involved in a transaction
524  * and returns the pointer to the object if found in a remote location */
525 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
526   unsigned int machinenumber;
527   objheader_t *tmp, *objheader;
528   objheader_t *objcopy;
529   int size;
530   void *buf;
531   chashlistnode_t *node;
532
533   if(oid == 0) {
534     return NULL;
535   }
536   
537
538   node= &c_table[(oid & c_mask)>>1];
539   do {
540     if(node->key == oid) {
541 #ifdef TRANSSTATS
542     nchashSearch++;
543 #endif
544 #ifdef COMPILER
545     return &((objheader_t*)node->val)[1];
546 #else
547     return node->val;
548 #endif
549     }
550     node = node->next;
551   } while(node != NULL);
552   
553
554   /*  
555   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
556 #ifdef TRANSSTATS
557     nchashSearch++;
558 #endif
559 #ifdef COMPILER
560     return &objheader[1];
561 #else
562     return objheader;
563 #endif
564   } else 
565   */
566
567 #ifdef ABORTREADERS
568   if (t_abort) {
569     //abort this transaction
570     //printf("ABORTING\n");
571     removetransactionhash();
572     objstrDelete(t_cache);
573     t_chashDelete();
574     _longjmp(aborttrans,1);
575   } else
576     addtransaction(oid);
577 #endif
578
579   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
580 #ifdef TRANSSTATS
581     nmhashSearch++;
582 #endif
583     /* Look up in machine lookup table  and copy  into cache*/
584     GETSIZE(size, objheader);
585     size += sizeof(objheader_t);
586     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
587     memcpy(objcopy, objheader, size);
588     /* Insert into cache's lookup table */
589     STATUS(objcopy)=0;
590     t_chashInsert(OID(objheader), objcopy);
591 #ifdef COMPILER
592     return &objcopy[1];
593 #else
594     return objcopy;
595 #endif
596   } else {
597 #ifdef CACHE
598     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
599 #ifdef TRANSSTATS
600       nprehashSearch++;
601 #endif
602       /* Look up in prefetch cache */
603       GETSIZE(size, tmp);
604       size+=sizeof(objheader_t);
605       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
606       memcpy(objcopy, tmp, size);
607       /* Insert into cache's lookup table */
608       t_chashInsert(OID(tmp), objcopy);
609 #ifdef COMPILER
610       return &objcopy[1];
611 #else
612       return objcopy;
613 #endif
614     }
615 #endif
616     /* Get the object from the remote location */
617     if((machinenumber = lhashSearch(oid)) == 0) {
618       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
619       return NULL;
620     }
621     objcopy = getRemoteObj(machinenumber, oid);
622
623     if(objcopy == NULL) {
624       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
625       return NULL;
626     } else {
627 #ifdef TRANSSTATS
628       nRemoteSend++;
629 #endif
630 #ifdef COMPILER
631       return &objcopy[1];
632 #else
633       return objcopy;
634 #endif
635     }
636   }
637 }
638
639
640 /* This function finds the location of the objects involved in a transaction
641  * and returns the pointer to the object if found in a remote location */
642 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
643   unsigned int machinenumber;
644   objheader_t *tmp, *objheader;
645   objheader_t *objcopy;
646   int size;
647
648 #ifdef ABORTREADERS
649   if (t_abort) {
650     //abort this transaction
651     //printf("ABORTING\n");
652     removetransactionhash();
653     objstrDelete(t_cache);
654     t_chashDelete();
655     _longjmp(aborttrans,1);
656   } else
657     addtransaction(oid);
658 #endif
659
660   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
661 #ifdef TRANSSTATS
662     nmhashSearch++;
663 #endif
664     /* Look up in machine lookup table  and copy  into cache*/
665     GETSIZE(size, objheader);
666     size += sizeof(objheader_t);
667     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
668     memcpy(objcopy, objheader, size);
669     /* Insert into cache's lookup table */
670     STATUS(objcopy)=0;
671     t_chashInsert(OID(objheader), objcopy);
672 #ifdef COMPILER
673     return &objcopy[1];
674 #else
675     return objcopy;
676 #endif
677   } else {
678 #ifdef CACHE
679     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
680 #ifdef TRANSSTATS
681       LOGEVENT('P')
682       nprehashSearch++;
683 #endif
684       /* Look up in prefetch cache */
685       GETSIZE(size, tmp);
686       size+=sizeof(objheader_t);
687       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
688       memcpy(objcopy, tmp, size);
689       /* Insert into cache's lookup table */
690       t_chashInsert(OID(tmp), objcopy);
691 #ifdef COMPILER
692       return &objcopy[1];
693 #else
694       return objcopy;
695 #endif
696     }
697 #endif
698     /* Get the object from the remote location */
699     if((machinenumber = lhashSearch(oid)) == 0) {
700       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
701       return NULL;
702     }
703     objcopy = getRemoteObj(machinenumber, oid);
704
705     if(objcopy == NULL) {
706       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
707       return NULL;
708     } else {
709 #ifdef TRANSSTATS
710
711       LOGEVENT('R');
712       nRemoteSend++;
713 #endif
714 #ifdef COMPILER
715       return &objcopy[1];
716 #else
717       return objcopy;
718 #endif
719     }
720   }
721 }
722
723 /* This function creates objects in the transaction record */
724 objheader_t *transCreateObj(unsigned int size) {
725   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
726   OID(tmp) = getNewOID();
727   tmp->version = 1;
728   tmp->rcount = 1;
729   STATUS(tmp) = NEW;
730   t_chashInsert(OID(tmp), tmp);
731
732 #ifdef COMPILER
733   return &tmp[1]; //want space after object header
734 #else
735   return tmp;
736 #endif
737 }
738
739 #if 1
740 /* This function creates machine piles based on all machines involved in a
741  * transaction commit request */
742 plistnode_t *createPiles() {
743   int i;
744   plistnode_t *pile = NULL;
745   unsigned int machinenum;
746   objheader_t *headeraddr;
747   chashlistnode_t * ptr = c_table;
748   /* Represents number of bins in the chash table */
749   unsigned int size = c_size;
750
751   for(i = 0; i < size ; i++) {
752     chashlistnode_t * curr = &ptr[i];
753     /* Inner loop to traverse the linked list of the cache lookupTable */
754     while(curr != NULL) {
755       //if the first bin in hash table is empty
756       if(curr->key == 0)
757         break;
758       headeraddr=(objheader_t *) curr->val;
759
760       //Get machine location for object id (and whether local or not)
761       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
762         machinenum = myIpAddr;
763       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
764         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
765         return NULL;
766       }
767
768       //Make machine groups
769       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
770       curr = curr->next;
771     }
772   }
773   return pile;
774 }
775 #else
776 /* This function creates machine piles based on all machines involved in a
777  * transaction commit request */
778 plistnode_t *createPiles() {
779   int i;
780   plistnode_t *pile = NULL;
781   unsigned int machinenum;
782   objheader_t *headeraddr;
783   struct chashentry * ptr = c_table;
784   /* Represents number of bins in the chash table */
785   unsigned int size = c_size;
786
787   for(i = 0; i < size ; i++) {
788     struct chashentry * curr = & ptr[i];
789     /* Inner loop to traverse the linked list of the cache lookupTable */
790     //if the first bin in hash table is empty
791     if(curr->key == 0)
792       continue;
793     headeraddr=(objheader_t *) curr->ptr;
794
795     //Get machine location for object id (and whether local or not)
796     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
797       machinenum = myIpAddr;
798     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
799       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
800       return NULL;
801     }
802
803     //Make machine groups
804     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
805   }
806   return pile;
807 }
808 #endif
809
810 /* This function initiates the transaction commit process
811  * Spawns threads for each of the new connections with Participants
812  * and creates new piles by calling the createPiles(),
813  * Sends a transrequest() to each remote machines for objects found remotely
814  * and calls handleLocalReq() to process objects found locally */
815 int transCommit() {
816   unsigned int tot_bytes_mod, *listmid;
817   plistnode_t *pile, *pile_ptr;
818   char treplyretry; /* keeps track of the common response that needs to be sent */
819   int firsttime=1;
820   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
821   char finalResponse;
822
823 #ifdef LOGEVENTS
824   int iii;
825   for(iii=0;iii<bigindex;iii++) {
826     printf("%c", bigarray[iii]);
827   }
828 #endif
829
830 #ifdef ABORTREADERS
831   if (t_abort) {
832     //abort this transaction
833     /* Debug
834      * printf("ABORTING TRANSACTION AT COMMIT\n");
835      */
836     removetransactionhash();
837     objstrDelete(t_cache);
838     t_chashDelete();
839     return 1;
840   }
841 #endif
842
843
844   do {
845     treplyretry = 0;
846
847     /* Look through all the objects in the transaction record and make piles
848      * for each machine involved in the transaction*/
849     if (firsttime) {
850       pile_ptr = pile = createPiles();
851       pile_ptr = pile = sortPiles(pile);
852     } else {
853       pile = pile_ptr;
854     }
855     firsttime = 0;
856     /* Create the packet to be sent in TRANS_REQUEST */
857
858     /* Count the number of participants */
859     int pilecount;
860     pilecount = pCount(pile);
861
862     /* Create a list of machine ids(Participants) involved in transaction   */
863     listmid = calloc(pilecount, sizeof(unsigned int));
864     pListMid(pile, listmid);
865
866     /* Create a socket and getReplyCtrl array, initialize */
867     int socklist[pilecount];
868     int loopcount;
869     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
870       socklist[loopcount] = 0;
871     char getReplyCtrl[pilecount];
872     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
873       getReplyCtrl[loopcount] = 0;
874
875     /* Process each machine pile */
876     int sockindex = 0;
877     trans_req_data_t *tosend;
878     tosend = calloc(pilecount, sizeof(trans_req_data_t));
879     while(pile != NULL) {
880       tosend[sockindex].f.control = TRANS_REQUEST;
881       tosend[sockindex].f.mcount = pilecount;
882       tosend[sockindex].f.numread = pile->numread;
883       tosend[sockindex].f.nummod = pile->nummod;
884       tosend[sockindex].f.numcreated = pile->numcreated;
885       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
886       tosend[sockindex].listmid = listmid;
887       tosend[sockindex].objread = pile->objread;
888       tosend[sockindex].oidmod = pile->oidmod;
889       tosend[sockindex].oidcreated = pile->oidcreated;
890       int sd = 0;
891       if(pile->mid != myIpAddr) {
892         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
893           printf("transRequest(): socket create error\n");
894           free(listmid);
895           free(tosend);
896           return 1;
897         }
898         socklist[sockindex] = sd;
899         /* Send bytes of data with TRANS_REQUEST control message */
900         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
901
902         /* Send list of machines involved in the transaction */
903         {
904           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
905           send_data(sd, tosend[sockindex].listmid, size);
906         }
907
908         /* Send oids and version number tuples for objects that are read */
909         {
910           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
911           send_data(sd, tosend[sockindex].objread, size);
912         }
913
914         /* Send objects that are modified */
915         void *modptr;
916         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
917           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
918           free(listmid);
919           free(tosend);
920           return 1;
921         }
922         int offset = 0;
923         int i;
924         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
925           int size;
926           objheader_t *headeraddr;
927           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
928             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
929             free(modptr);
930             free(listmid);
931             free(tosend);
932             return 1;
933           }
934           GETSIZE(size,headeraddr);
935           size+=sizeof(objheader_t);
936           memcpy(modptr+offset, headeraddr, size);
937           offset+=size;
938         }
939         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
940         free(modptr);
941       } else { //handle request locally
942         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
943       }
944       sockindex++;
945       pile = pile->next;
946     } //end of pile processing
947       /* Recv Ctrl msgs from all machines */
948     int i;
949     for(i = 0; i < pilecount; i++) {
950       int sd = socklist[i];
951       if(sd != 0) {
952         char control;
953         recv_data(sd, &control, sizeof(char));
954         //Update common data structure with new ctrl msg
955         getReplyCtrl[i] = control;
956         /* Recv Objects if participant sends TRANS_DISAGREE */
957 #ifdef CACHE
958         if(control == TRANS_DISAGREE) {
959           int length;
960           recv_data(sd, &length, sizeof(int));
961           void *newAddr;
962           pthread_mutex_lock(&prefetchcache_mutex);
963           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
964             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
965             free(tosend);
966             free(listmid);
967             pthread_mutex_unlock(&prefetchcache_mutex);
968             return 1;
969           }
970           pthread_mutex_unlock(&prefetchcache_mutex);
971           recv_data(sd, newAddr, length);
972           int offset = 0;
973           while(length != 0) {
974             unsigned int oidToPrefetch;
975             objheader_t * header;
976             header = (objheader_t *)(((char *)newAddr) + offset);
977             oidToPrefetch = OID(header);
978             STATUS(header)=0;
979             int size = 0;
980             GETSIZE(size, header);
981             size += sizeof(objheader_t);
982             //make an entry in prefetch hash table
983             void *oldptr;
984             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
985               prehashRemove(oidToPrefetch);
986               prehashInsert(oidToPrefetch, header);
987             } else {
988               prehashInsert(oidToPrefetch, header);
989             }
990             length = length - size;
991             offset += size;
992           }
993         } //end of receiving objs
994 #endif
995       }
996     }
997     /* Decide the final response */
998     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
999       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1000       free(tosend);
1001       free(listmid);
1002       return 1;
1003     }
1004
1005     /* Send responses to all machines */
1006     for(i = 0; i < pilecount; i++) {
1007       int sd = socklist[i];
1008       if(sd != 0) {
1009 #ifdef CACHE
1010         if(finalResponse == TRANS_COMMIT) {
1011           int retval;
1012           /* Update prefetch cache */
1013           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1014             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1015             free(tosend);
1016             free(listmid);
1017             return 1;
1018           }
1019
1020
1021           /* Invalidate objects in other machine cache */
1022           if(tosend[i].f.nummod > 0) {
1023             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1024               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1025               free(tosend);
1026               free(listmid);
1027               return 1;
1028             }
1029           }
1030 #ifdef ABORTREADERS
1031           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1032           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1033 #endif
1034         }
1035 #ifdef ABORTREADERS
1036         else if (!treplyretry) {
1037           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1038           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1039         }
1040 #endif
1041 #endif
1042         send_data(sd, &finalResponse, sizeof(char));
1043       } else {
1044         /* Complete local processing */
1045         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1046 #ifdef ABORTREADERS
1047         if(finalResponse == TRANS_COMMIT) {
1048           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1049           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1050         } else if (!treplyretry) {
1051           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1052           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1053         }
1054 #endif
1055       }
1056     }
1057
1058     /* Free resources */
1059     free(tosend);
1060     free(listmid);
1061     if (!treplyretry)
1062       pDelete(pile_ptr);
1063     /* wait a random amount of time before retrying to commit transaction*/
1064     if(treplyretry) {
1065       randomdelay();
1066 #ifdef TRANSSTATS
1067       nSoftAbort++;
1068 #endif
1069     }
1070     /* Retry trans commit procedure during soft_abort case */
1071   } while (treplyretry);
1072
1073   if(finalResponse == TRANS_ABORT) {
1074     //printf("Aborting trans\n");
1075 #ifdef TRANSSTATS
1076     numTransAbort++;
1077 #endif
1078     /* Free Resources */
1079     objstrDelete(t_cache);
1080     t_chashDelete();
1081     return TRANS_ABORT;
1082   } else if(finalResponse == TRANS_COMMIT) {
1083 #ifdef TRANSSTATS
1084     numTransCommit++;
1085 #endif
1086     /* Free Resources */
1087     objstrDelete(t_cache);
1088     t_chashDelete();
1089     return 0;
1090   } else {
1091     //TODO Add other cases
1092     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1093     exit(-1);
1094   }
1095   return 0;
1096 }
1097
1098 /* This function handles the local objects involved in a transaction
1099  * commiting process.  It also makes a decision if this local machine
1100  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1101 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1102   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1103   int numoidnotfound = 0, numoidlocked = 0;
1104   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1105   int numread, i;
1106   unsigned int oid;
1107   unsigned short version;
1108
1109   /* Counters and arrays to formulate decision on control message to be sent */
1110   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1111   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1112   //setting a divider between read and write locks
1113   numread = tdata->f.numread;
1114   /* Process each oid in the machine pile/ group per thread */
1115   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1116     if (i < tdata->f.numread) {
1117       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1118       incr *= i;
1119       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1120       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1121       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1122     } else { // Objects Modified
1123       if(i == tdata->f.numread) {
1124         oidlocked[numoidlocked++] = -1;
1125       }
1126       int tmpsize;
1127       objheader_t *headptr;
1128       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1129       if (headptr == NULL) {
1130         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1131         return;
1132       }
1133       oid = OID(headptr);
1134       version = headptr->version;
1135       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1136     }
1137   }
1138
1139 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1140  * if Participant receives a TRANS_COMMIT */
1141   transinfo->objlocked = oidlocked;
1142   transinfo->objnotfound = oidnotfound;
1143   transinfo->modptr = NULL;
1144   transinfo->numlocked = numoidlocked;
1145   transinfo->numnotfound = numoidnotfound;
1146
1147   /* Condition to send TRANS_AGREE */
1148   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1149     *getReplyCtrl = TRANS_AGREE;
1150   }
1151   /* Condition to send TRANS_SOFT_ABORT */
1152   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1153     *getReplyCtrl = TRANS_SOFT_ABORT;
1154   }
1155 }
1156
1157 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1158   if(finalResponse == TRANS_ABORT) {
1159     if(transAbortProcess(transinfo) != 0) {
1160       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1161       fflush(stdout);
1162       return;
1163     }
1164   } else if(finalResponse == TRANS_COMMIT) {
1165 #ifdef CACHE
1166     /* Invalidate objects in other machine cache */
1167     if(tdata->f.nummod > 0) {
1168       int retval;
1169       if((retval = invalidateObj(tdata)) != 0) {
1170         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1171         return;
1172       }
1173     }
1174 #endif
1175     if(transComProcess(tdata, transinfo) != 0) {
1176       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1177       fflush(stdout);
1178       return;
1179     }
1180   } else {
1181     printf("ERROR...No Decision\n");
1182   }
1183
1184   /* Free memory */
1185   if (transinfo->objlocked != NULL) {
1186     free(transinfo->objlocked);
1187   }
1188   if (transinfo->objnotfound != NULL) {
1189     free(transinfo->objnotfound);
1190   }
1191 }
1192
1193 /* This function decides the reponse that needs to be sent to
1194  * all Participant machines after the TRANS_REQUEST protocol */
1195 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1196   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1197                                                                    message to send */
1198   for (i = 0 ; i < pilecount; i++) {
1199     char control;
1200     control = getReplyCtrl[i];
1201     switch(control) {
1202     default:
1203       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1204
1205       /* treat as disagree, pass thru */
1206     case TRANS_DISAGREE:
1207       transdisagree++;
1208       break;
1209
1210     case TRANS_AGREE:
1211       transagree++;
1212       break;
1213
1214     case TRANS_SOFT_ABORT:
1215       transsoftabort++;
1216       break;
1217     }
1218   }
1219
1220   if(transdisagree > 0) {
1221     /* Send Abort */
1222     *treplyretry = 0;
1223     return TRANS_ABORT;
1224 #ifdef CACHE
1225     /* clear objects from prefetch cache */
1226     cleanPCache();
1227 #endif
1228   } else if(transagree == pilecount) {
1229     /* Send Commit */
1230     *treplyretry = 0;
1231     return TRANS_COMMIT;
1232   } else {
1233     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1234     *treplyretry = 1;
1235     return TRANS_ABORT;
1236   }
1237   return 0;
1238 }
1239
1240 /* This function opens a connection, places an object read request to
1241  * the remote machine, reads the control message and object if
1242  * available and copies the object and its header to the local
1243  * cache. */
1244
1245 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1246   int size, val;
1247   struct sockaddr_in serv_addr;
1248   char machineip[16];
1249   char control;
1250   objheader_t *h;
1251   void *objcopy = NULL;
1252
1253   int sd = getSock2(transReadSockPool, mnum);
1254   char readrequest[sizeof(char)+sizeof(unsigned int)];
1255   readrequest[0] = READ_REQUEST;
1256   *((unsigned int *)(&readrequest[1])) = oid;
1257   send_data(sd, readrequest, sizeof(readrequest));
1258
1259   /* Read response from the Participant */
1260   recv_data(sd, &control, sizeof(char));
1261
1262   if (control==OBJECT_NOT_FOUND) {
1263     objcopy = NULL;
1264   } else {
1265     /* Read object if found into local cache */
1266     recv_data(sd, &size, sizeof(int));
1267     objcopy = objstrAlloc(&t_cache, size);
1268     recv_data(sd, objcopy, size);
1269     STATUS(objcopy)=0;
1270     /* Insert into cache's lookup table */
1271     t_chashInsert(oid, objcopy);
1272 #ifdef TRANSSTATS
1273     totalObjSize += size;
1274 #endif
1275   }
1276
1277   return objcopy;
1278 }
1279
1280 /*  Commit info for objects modified */
1281 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1282                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1283   void *mobj;
1284   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1285   /* Save the oids not found and number of oids not found for later use */
1286   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1287     /* Save the oids not found and number of oids not found for later use */
1288     oidnotfound[*numoidnotfound] = oid;
1289     (*numoidnotfound)++;
1290   } else { /* If Obj found in machine (i.e. has not moved) */
1291     /* Check if Obj is locked by any previous transaction */
1292     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1293       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1294         (*v_matchnolock)++;
1295         //Keep track of what is locked
1296         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1297       } else { /* If versions don't match ...HARD ABORT */
1298         (*v_nomatch)++;
1299         /* Send TRANS_DISAGREE to Coordinator */
1300         *getReplyCtrl = TRANS_DISAGREE;
1301
1302         //Keep track of what is locked
1303         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1304         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1305         return;
1306       }
1307     } else { //A lock is acquired some place else
1308       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1309         (*v_matchlock)++;
1310       } else { /* If versions don't match ...HARD ABORT */
1311         (*v_nomatch)++;
1312         /* Send TRANS_DISAGREE to Coordinator */
1313         *getReplyCtrl = TRANS_DISAGREE;
1314         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1315         return;
1316       }
1317     }
1318   }
1319 }
1320
1321 /*  Commit info for objects modified */
1322 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1323                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1324   void *mobj;
1325   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1326   /* Save the oids not found and number of oids not found for later use */
1327   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1328     /* Save the oids not found and number of oids not found for later use */
1329     oidnotfound[*numoidnotfound] = oid;
1330     (*numoidnotfound)++;
1331   } else { /* If Obj found in machine (i.e. has not moved) */
1332     /* Check if Obj is locked by any previous transaction */
1333     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1334       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1335         (*v_matchnolock)++;
1336         //Keep track of what is locked
1337         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1338       } else { /* If versions don't match ...HARD ABORT */
1339         (*v_nomatch)++;
1340         /* Send TRANS_DISAGREE to Coordinator */
1341         *getReplyCtrl = TRANS_DISAGREE;
1342         //Keep track of what is locked
1343         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1344         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1345         return;
1346       }
1347     } else { //Has reached max number of readers or some other transaction
1348       //has acquired a lock on this object
1349       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1350         (*v_matchlock)++;
1351       } else { /* If versions don't match ...HARD ABORT */
1352         (*v_nomatch)++;
1353         /* Send TRANS_DISAGREE to Coordinator */
1354         *getReplyCtrl = TRANS_DISAGREE;
1355         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1356         return;
1357       }
1358     }
1359   }
1360 }
1361
1362 /* This function completes the ABORT process if the transaction is aborting */
1363 int transAbortProcess(trans_commit_data_t *transinfo) {
1364   int i, numlocked;
1365   unsigned int *objlocked;
1366   void *header;
1367
1368   numlocked = transinfo->numlocked;
1369   objlocked = transinfo->objlocked;
1370
1371   int useWriteUnlock = 0;
1372   for (i = 0; i < numlocked; i++) {
1373     if(objlocked[i] == -1) {
1374       useWriteUnlock = 1;
1375       continue;
1376     }
1377     if((header = mhashSearch(objlocked[i])) == NULL) {
1378       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1379       return 1;
1380     }
1381     if(!useWriteUnlock) {
1382       read_unlock(STATUSPTR(header));
1383     } else {
1384       write_unlock(STATUSPTR(header));
1385     }
1386   }
1387
1388   return 0;
1389 }
1390
1391 /*This function completes the COMMIT process if the transaction is commiting*/
1392 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1393   objheader_t *header, *tcptr;
1394   int i, nummod, tmpsize, numcreated, numlocked;
1395   unsigned int *oidmod, *oidcreated, *oidlocked;
1396   void *ptrcreate;
1397
1398   nummod = tdata->f.nummod;
1399   oidmod = tdata->oidmod;
1400   numcreated = tdata->f.numcreated;
1401   oidcreated = tdata->oidcreated;
1402   numlocked = transinfo->numlocked;
1403   oidlocked = transinfo->objlocked;
1404
1405   for (i = 0; i < nummod; i++) {
1406     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1407       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1408       return 1;
1409     }
1410     /* Copy from transaction cache -> main object store */
1411     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1412       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1413       return 1;
1414     }
1415     GETSIZE(tmpsize, header);
1416     char *tmptcptr = (char *) tcptr;
1417     {
1418       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1419       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1420       dst->___cachedCode___=src->___cachedCode___;
1421       dst->___cachedHash___=src->___cachedHash___;
1422
1423       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1424     }
1425
1426     header->version += 1;
1427     if(header->notifylist != NULL) {
1428       notifyAll(&header->notifylist, OID(header), header->version);
1429     }
1430   }
1431   /* If object is newly created inside transaction then commit it */
1432   for (i = 0; i < numcreated; i++) {
1433     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1434       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1435       return 1;
1436     }
1437     GETSIZE(tmpsize, header);
1438     tmpsize += sizeof(objheader_t);
1439     pthread_mutex_lock(&mainobjstore_mutex);
1440     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1441       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1442       pthread_mutex_unlock(&mainobjstore_mutex);
1443       return 1;
1444     }
1445     pthread_mutex_unlock(&mainobjstore_mutex);
1446     /* Initialize read and write locks */
1447     initdsmlocks(STATUSPTR(header));
1448     memcpy(ptrcreate, header, tmpsize);
1449     mhashInsert(oidcreated[i], ptrcreate);
1450     lhashInsert(oidcreated[i], myIpAddr);
1451   }
1452   /* Unlock locked objects */
1453   int useWriteUnlock = 0;
1454   for(i = 0; i < numlocked; i++) {
1455     if(oidlocked[i] == -1) {
1456       useWriteUnlock = 1;
1457       continue;
1458     }
1459     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1460       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1461       return 1;
1462     }
1463     if(!useWriteUnlock) {
1464       read_unlock(STATUSPTR(header));
1465     } else {
1466       write_unlock(STATUSPTR(header));
1467     }
1468   }
1469   return 0;
1470 }
1471
1472 prefetchpile_t *foundLocal(char *ptr) {
1473   int siteid = *(GET_SITEID(ptr));
1474   int ntuples = *(GET_NTUPLES(ptr));
1475   unsigned int * oidarray = GET_PTR_OID(ptr);
1476   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1477   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1478   prefetchpile_t * head=NULL;
1479   int numLocal = 0;
1480
1481   int i;
1482   for(i=0; i<ntuples; i++) {
1483     unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1484     unsigned short endindex=endoffsets[i];
1485     unsigned int oid=oidarray[i];
1486     int newbase;
1487     int machinenum;
1488
1489     if (oid==0)
1490       continue;
1491     //Look up fields locally
1492     for(newbase=baseindex; newbase<endindex; newbase++) {
1493       if (!lookupObject(&oid, arryfields[newbase]))
1494         break;
1495       //Ended in a null pointer...
1496       if (oid==0)
1497         goto tuple;
1498     }
1499     //Entire prefetch is local
1500     if (newbase==endindex&&checkoid(oid)) {
1501       numLocal++;
1502       goto tuple;
1503     }
1504     //Add to remote requests
1505     machinenum=lhashSearch(oid);
1506     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1507 tuple:
1508     ;
1509   }
1510
1511   /* handle dynamic prefetching */
1512   handleDynPrefetching(numLocal, ntuples, siteid);
1513   return head;
1514 }
1515
1516 int checkoid(unsigned int oid) {
1517   objheader_t *header;
1518   if ((header=mhashSearch(oid))!=NULL) {
1519     //Found on machine
1520     return 1;
1521   } else if ((header=prehashSearch(oid))!=NULL) {
1522     //Found in cache
1523     return 1;
1524   } else {
1525     return 0;
1526   }
1527 }
1528
1529 int lookupObject(unsigned int * oid, short offset) {
1530   objheader_t *header;
1531   if ((header=mhashSearch(*oid))!=NULL) {
1532     //Found on machine
1533     ;
1534   } else if ((header=prehashSearch(*oid))!=NULL) {
1535     //Found in cache
1536     ;
1537   } else {
1538     return 0;
1539   }
1540
1541   if(TYPE(header) >= NUMCLASSES) {
1542     int elementsize = classsize[TYPE(header)];
1543     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1544     int length = ao->___length___;
1545     /* Check if array out of bounds */
1546     if(offset < 0 || offset >= length) {
1547       //if yes treat the object as found
1548       (*oid)=0;
1549       return 1;
1550     }
1551     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1552     return 1;
1553   } else {
1554     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1555     return 1;
1556   }
1557 }
1558
1559
1560 /* This function is called by the thread calling transPrefetch */
1561 void *transPrefetch(void *t) {
1562   while(1) {
1563     /* read from prefetch queue */
1564     void *node=gettail();
1565     /* Check if the tuples are found locally, if yes then reduce them further*/
1566     /* and group requests by remote machine ids by calling the makePreGroups() */
1567     prefetchpile_t *pilehead = foundLocal(node);
1568
1569     if (pilehead!=NULL) {
1570       // Get sock from shared pool
1571
1572       /* Send  Prefetch Request */
1573       prefetchpile_t *ptr = pilehead;
1574       while(ptr != NULL) {
1575         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1576         sendPrefetchReq(ptr, sd);
1577         ptr = ptr->next;
1578       }
1579
1580       /* Release socket */
1581       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1582
1583       /* Deallocated pilehead */
1584       mcdealloc(pilehead);
1585     }
1586     // Deallocate the prefetch queue pile node
1587     inctail();
1588   }
1589 }
1590
1591 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1592   objpile_t *tmp;
1593
1594   int size=sizeof(char)+sizeof(int);
1595   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1596     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1597   }
1598
1599   char buft[size];
1600   char *buf=buft;
1601   *buf=TRANS_PREFETCH;
1602   buf+=sizeof(char);
1603
1604   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1605     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1606     *((int*)buf)=len;
1607     buf+=sizeof(int);
1608     *((unsigned int *)buf)=tmp->oid;
1609     buf+=sizeof(unsigned int);
1610     *((unsigned int *)(buf)) = myIpAddr;
1611     buf+=sizeof(unsigned int);
1612     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1613     buf+=tmp->numoffset*sizeof(short);
1614   }
1615   *((int *)buf)=-1;
1616   send_data(sd, buft, size);
1617   return;
1618 }
1619
1620 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1621   int len, endpair;
1622   char control;
1623   objpile_t *tmp;
1624
1625   /* Send TRANS_PREFETCH control message */
1626   int first=1;
1627
1628   /* Send Oids and offsets in pairs */
1629   tmp = mcpilenode->objpiles;
1630   while(tmp != NULL) {
1631     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1632     char oidnoffset[len+5];
1633     char *buf=oidnoffset;
1634     if (first) {
1635       *buf=TRANS_PREFETCH;
1636       buf++;len++;
1637       first=0;
1638     }
1639     *((int*)buf) = tmp->numoffset;
1640     buf+=sizeof(int);
1641     *((unsigned int *)buf) = tmp->oid;
1642 #ifdef TRANSSTATS
1643     sendRemoteReq++;
1644 #endif
1645     buf+=sizeof(unsigned int);
1646     *((unsigned int *)buf) = myIpAddr;
1647     buf += sizeof(unsigned int);
1648     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1649     tmp = tmp->next;
1650     if (tmp==NULL) {
1651       *((int *)(&oidnoffset[len]))=-1;
1652       len+=sizeof(int);
1653     }
1654     send_data(sd, oidnoffset, len);
1655   }
1656
1657   LOGEVENT('S');
1658   return;
1659 }
1660
1661 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1662   int length = 0, size = 0;
1663   char control;
1664   unsigned int oid;
1665   void *modptr, *oldptr;
1666
1667   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1668   size = length - sizeof(int);
1669   char recvbuffer[size];
1670 #ifdef TRANSSTATS
1671     getResponse++;
1672     LOGEVENT('Z');
1673 #endif
1674     recv_data_buf(sd, readbuffer, recvbuffer, size);
1675   control = *((char *) recvbuffer);
1676   if(control == OBJECT_FOUND) {
1677     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1678     size = size - (sizeof(char) + sizeof(unsigned int));
1679     pthread_mutex_lock(&prefetchcache_mutex);
1680     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1681       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1682       pthread_mutex_unlock(&prefetchcache_mutex);
1683       return -1;
1684     }
1685     pthread_mutex_unlock(&prefetchcache_mutex);
1686     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1687     STATUS(modptr)=0;
1688
1689     /* Insert the oid and its address into the prefetch hash lookup table */
1690     /* Do a version comparison if the oid exists */
1691     if((oldptr = prehashSearch(oid)) != NULL) {
1692       /* If older version then update with new object ptr */
1693       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1694         prehashRemove(oid);
1695         prehashInsert(oid, modptr);
1696       }
1697     } else { /* Else add the object ptr to hash table*/
1698       prehashInsert(oid, modptr);
1699     }
1700     /* Lock the Prefetch Cache look up table*/
1701     pthread_mutex_lock(&pflookup.lock);
1702     /* Broadcast signal on prefetch cache condition variable */
1703     pthread_cond_broadcast(&pflookup.cond);
1704     /* Unlock the Prefetch Cache look up table*/
1705     pthread_mutex_unlock(&pflookup.lock);
1706   } else if(control == OBJECT_NOT_FOUND) {
1707     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1708     /* TODO: For each object not found query DHT for new location and retrieve the object */
1709     /* Throw an error */
1710     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1711     //    exit(-1);
1712   } else {
1713     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1714   }
1715
1716   return 0;
1717 }
1718
1719 unsigned short getObjType(unsigned int oid) {
1720   objheader_t *objheader;
1721   unsigned short numoffset[] ={0};
1722   short fieldoffset[] ={};
1723
1724   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1725 #ifdef CACHE
1726     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1727 #endif
1728     unsigned int mid = lhashSearch(oid);
1729     int sd = getSock2(transReadSockPool, mid);
1730     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1731     remotereadrequest[0] = READ_REQUEST;
1732     *((unsigned int *)(&remotereadrequest[1])) = oid;
1733     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1734
1735     /* Read response from the Participant */
1736     char control;
1737     recv_data(sd, &control, sizeof(char));
1738
1739     if (control==OBJECT_NOT_FOUND) {
1740       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1741       fflush(stdout);
1742       exit(-1);
1743     } else {
1744       /* Read object if found into local cache */
1745       int size;
1746       recv_data(sd, &size, sizeof(int));
1747 #ifdef CACHE
1748       pthread_mutex_lock(&prefetchcache_mutex);
1749       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1750         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1751         pthread_exit(NULL);
1752       }
1753       pthread_mutex_unlock(&prefetchcache_mutex);
1754       recv_data(sd, objheader, size);
1755       prehashInsert(oid, objheader);
1756       return TYPE(objheader);
1757 #else
1758       char *buffer;
1759       if((buffer = calloc(1, size)) == NULL) {
1760         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1761         fflush(stdout);
1762         return 0;
1763       }
1764       recv_data(sd, buffer, size);
1765       objheader = (objheader_t *)buffer;
1766       unsigned short type = TYPE(objheader);
1767       free(buffer);
1768       return type;
1769 #endif
1770     }
1771 #ifdef CACHE
1772   }
1773 #endif
1774   }
1775   return TYPE(objheader);
1776 }
1777
1778 int startRemoteThread(unsigned int oid, unsigned int mid) {
1779   int sock;
1780   struct sockaddr_in remoteAddr;
1781   char msg[1 + sizeof(unsigned int)];
1782   int bytesSent;
1783   int status;
1784
1785   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1786     perror("startRemoteThread():socket()");
1787     return -1;
1788   }
1789
1790   bzero(&remoteAddr, sizeof(remoteAddr));
1791   remoteAddr.sin_family = AF_INET;
1792   remoteAddr.sin_port = htons(LISTEN_PORT);
1793   remoteAddr.sin_addr.s_addr = htonl(mid);
1794
1795   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1796     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1797            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1798     status = -1;
1799   } else
1800   {
1801     msg[0] = START_REMOTE_THREAD;
1802     *((unsigned int *) &msg[1]) = oid;
1803     send_data(sock, msg, 1 + sizeof(unsigned int));
1804   }
1805
1806   close(sock);
1807   return status;
1808 }
1809
1810 //TODO: when reusing oids, make sure they are not already in use!
1811 static unsigned int id = 0xFFFFFFFF;
1812 unsigned int getNewOID(void) {
1813   id += 2;
1814   if (id > oidMax || id < oidMin) {
1815     id = (oidMin | 1);
1816   }
1817   return id;
1818 }
1819
1820 int processConfigFile() {
1821   FILE *configFile;
1822   const int maxLineLength = 200;
1823   char lineBuffer[maxLineLength];
1824   char *token;
1825   const char *delimiters = " \t\n";
1826   char *commentBegin;
1827   in_addr_t tmpAddr;
1828
1829   configFile = fopen(CONFIG_FILENAME, "r");
1830   if (configFile == NULL) {
1831     printf("error opening %s:\n", CONFIG_FILENAME);
1832     perror("");
1833     return -1;
1834   }
1835
1836   numHostsInSystem = 0;
1837   sizeOfHostArray = 8;
1838   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1839
1840   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1841     commentBegin = strchr(lineBuffer, '#');
1842     if (commentBegin != NULL)
1843       *commentBegin = '\0';
1844     token = strtok(lineBuffer, delimiters);
1845     while (token != NULL) {
1846       tmpAddr = inet_addr(token);
1847       if ((int)tmpAddr == -1) {
1848         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1849         fclose(configFile);
1850         return -1;
1851       } else
1852         addHost(htonl(tmpAddr));
1853       token = strtok(NULL, delimiters);
1854     }
1855   }
1856
1857   fclose(configFile);
1858
1859   if (numHostsInSystem < 1) {
1860     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1861     return -1;
1862   }
1863 #ifdef MAC
1864   myIpAddr = getMyIpAddr("en1");
1865 #else
1866   myIpAddr = getMyIpAddr("eth0");
1867 #endif
1868   myIndexInHostArray = findHost(myIpAddr);
1869   if (myIndexInHostArray == -1) {
1870     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1871     return -1;
1872   }
1873   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1874   oidMin = oidsPerBlock * myIndexInHostArray;
1875   if (myIndexInHostArray == numHostsInSystem - 1)
1876     oidMax = 0xFFFFFFFF;
1877   else
1878     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1879
1880   return 0;
1881 }
1882
1883 void addHost(unsigned int hostIp) {
1884   unsigned int *tmpArray;
1885
1886   if (findHost(hostIp) != -1)
1887     return;
1888
1889   if (numHostsInSystem == sizeOfHostArray) {
1890     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1891     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1892     free(hostIpAddrs);
1893     hostIpAddrs = tmpArray;
1894   }
1895
1896   hostIpAddrs[numHostsInSystem++] = hostIp;
1897
1898   return;
1899 }
1900
1901 int findHost(unsigned int hostIp) {
1902   int i;
1903   for (i = 0; i < numHostsInSystem; i++)
1904     if (hostIpAddrs[i] == hostIp)
1905       return i;
1906
1907   //not found
1908   return -1;
1909 }
1910
1911 /* This function sends notification request per thread waiting on object(s) whose version
1912  * changes */
1913 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1914   int sock,i;
1915   objheader_t *objheader;
1916   struct sockaddr_in remoteAddr;
1917   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1918   char *ptr;
1919   int bytesSent;
1920   int status, size;
1921   unsigned short version;
1922   unsigned int oid,mid;
1923   static unsigned int threadid = 0;
1924   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1925   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1926   notifydata_t *ndata;
1927
1928   oid = oidarry[0];
1929   if((mid = lhashSearch(oid)) == 0) {
1930     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1931     return;
1932   }
1933
1934   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1935     perror("reqNotify():socket()");
1936     return -1;
1937   }
1938
1939   bzero(&remoteAddr, sizeof(remoteAddr));
1940   remoteAddr.sin_family = AF_INET;
1941   remoteAddr.sin_port = htons(LISTEN_PORT);
1942   remoteAddr.sin_addr.s_addr = htonl(mid);
1943
1944   /* Generate unique threadid */
1945   threadid++;
1946
1947   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1948   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1949     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1950     return -1;
1951   }
1952   ndata->numoid = numoid;
1953   ndata->threadid = threadid;
1954   ndata->oidarry = oidarry;
1955   ndata->versionarry = versionarry;
1956   ndata->threadcond = threadcond;
1957   ndata->threadnotify = threadnotify;
1958   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1959     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1960     free(ndata);
1961     return -1;
1962   }
1963
1964   /* Send  number of oids, oidarry, version array, machine id and threadid */
1965   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1966     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1967            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1968     free(ndata);
1969     return -1;
1970   } else {
1971     msg[0] = THREAD_NOTIFY_REQUEST;
1972     *((unsigned int *)(&msg[1])) = numoid;
1973     /* Send array of oids  */
1974     size = sizeof(unsigned int);
1975
1976     for(i = 0;i < numoid; i++) {
1977       oid = oidarry[i];
1978       *((unsigned int *)(&msg[1] + size)) = oid;
1979       size += sizeof(unsigned int);
1980     }
1981
1982     /* Send array of version  */
1983     for(i = 0;i < numoid; i++) {
1984       version = versionarry[i];
1985       *((unsigned short *)(&msg[1] + size)) = version;
1986       size += sizeof(unsigned short);
1987     }
1988
1989     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
1990     *((unsigned int *)(&msg[1] + size)) = threadid;
1991     pthread_mutex_lock(&(ndata->threadnotify));
1992     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1993     send_data(sock, msg, size);
1994     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1995     pthread_mutex_unlock(&(ndata->threadnotify));
1996   }
1997
1998   pthread_cond_destroy(&threadcond);
1999   pthread_mutex_destroy(&threadnotify);
2000   free(ndata);
2001   close(sock);
2002   return status;
2003 }
2004
2005 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2006   notifydata_t *ndata;
2007   int i, objIsFound = 0, index;
2008   void *ptr;
2009
2010   //Look up the tid and call the corresponding pthread_cond_signal
2011   if((ndata = notifyhashSearch(tid)) == NULL) {
2012     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2013     return;
2014   } else  {
2015     for(i = 0; i < ndata->numoid; i++) {
2016       if(ndata->oidarry[i] == oid) {
2017         objIsFound = 1;
2018         index = i;
2019       }
2020     }
2021     if(objIsFound == 0) {
2022       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2023       return;
2024     } else {
2025       if(version <= ndata->versionarry[index]) {
2026         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2027         return;
2028       } else {
2029 #ifdef CACHE
2030         /* Clear from prefetch cache and free thread related data structure */
2031         if((ptr = prehashSearch(oid)) != NULL) {
2032           prehashRemove(oid);
2033         }
2034 #endif
2035         pthread_mutex_lock(&(ndata->threadnotify));
2036         pthread_cond_signal(&(ndata->threadcond));
2037         pthread_mutex_unlock(&(ndata->threadnotify));
2038       }
2039     }
2040   }
2041   return;
2042 }
2043
2044 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2045   threadlist_t *ptr;
2046   unsigned int mid;
2047   struct sockaddr_in remoteAddr;
2048   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2049   int sock, status, size, bytesSent;
2050
2051   while(*head != NULL) {
2052     ptr = *head;
2053     mid = ptr->mid;
2054     //create a socket connection to that machine
2055     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2056       perror("notifyAll():socket()");
2057       return -1;
2058     }
2059
2060     bzero(&remoteAddr, sizeof(remoteAddr));
2061     remoteAddr.sin_family = AF_INET;
2062     remoteAddr.sin_port = htons(LISTEN_PORT);
2063     remoteAddr.sin_addr.s_addr = htonl(mid);
2064     //send Thread Notify response and threadid to that machine
2065     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2066       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2067              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2068       fflush(stdout);
2069       status = -1;
2070     } else {
2071       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2072       msg[0] = THREAD_NOTIFY_RESPONSE;
2073       *((unsigned int *)&msg[1]) = oid;
2074       size = sizeof(unsigned int);
2075       *((unsigned short *)(&msg[1]+ size)) = version;
2076       size+= sizeof(unsigned short);
2077       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2078
2079       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2080       send_data(sock, msg, size);
2081     }
2082     //close socket
2083     close(sock);
2084     // Update head
2085     *head = ptr->next;
2086     free(ptr);
2087   }
2088   return status;
2089 }
2090
2091 void transAbort() {
2092 #ifdef ABORTREADERS
2093   removetransactionhash();
2094 #endif
2095   objstrDelete(t_cache);
2096   t_chashDelete();
2097 }
2098
2099 /* This function inserts necessary information into
2100  * a machine pile data structure */
2101 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2102   plistnode_t *ptr, *tmp;
2103   int found = 0, offset = 0;
2104
2105   tmp = pile;
2106   //Add oid into a machine that is already present in the pile linked list structure
2107   while(tmp != NULL) {
2108     if (tmp->mid == mid) {
2109       int tmpsize;
2110
2111       if (STATUS(headeraddr) & NEW) {
2112         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2113         tmp->numcreated++;
2114         GETSIZE(tmpsize, headeraddr);
2115         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2116       } else if (STATUS(headeraddr) & DIRTY) {
2117         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2118         tmp->nummod++;
2119         GETSIZE(tmpsize, headeraddr);
2120         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2121       } else {
2122         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2123         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2124         offset += sizeof(unsigned int);
2125         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2126         tmp->numread++;
2127       }
2128       found = 1;
2129       break;
2130     }
2131     tmp = tmp->next;
2132   }
2133   //Add oid for any new machine
2134   if (!found) {
2135     int tmpsize;
2136     if((ptr = pCreate(num_objs)) == NULL) {
2137       return NULL;
2138     }
2139     ptr->mid = mid;
2140     if (STATUS(headeraddr) & NEW) {
2141       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2142       ptr->numcreated++;
2143       GETSIZE(tmpsize, headeraddr);
2144       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2145     } else if (STATUS(headeraddr) & DIRTY) {
2146       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2147       ptr->nummod++;
2148       GETSIZE(tmpsize, headeraddr);
2149       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2150     } else {
2151       *((unsigned int *)ptr->objread)=OID(headeraddr);
2152       offset = sizeof(unsigned int);
2153       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2154       ptr->numread++;
2155     }
2156     ptr->next = pile;
2157     pile = ptr;
2158   }
2159
2160   /* Clear Flags */
2161   STATUS(headeraddr) =0;
2162
2163
2164   return pile;
2165 }
2166
2167 plistnode_t *sortPiles(plistnode_t *pileptr) {
2168   plistnode_t *head, *ptr, *tail;
2169   head = pileptr;
2170   ptr = pileptr;
2171   /* Get tail pointer */
2172   while(ptr!= NULL) {
2173     tail = ptr;
2174     ptr = ptr->next;
2175   }
2176   ptr = pileptr;
2177   plistnode_t *prev = pileptr;
2178   /* Arrange local machine processing at the end of the pile list */
2179   while(ptr != NULL) {
2180     if(ptr != tail) {
2181       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2182         prev->next = ptr->next;
2183         ptr->next = NULL;
2184         tail->next = ptr;
2185         return pileptr;
2186       }
2187       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2188         prev = ptr->next;
2189         ptr->next = NULL;
2190         tail->next = ptr;
2191         return prev;
2192       }
2193       prev = ptr;
2194     }
2195     ptr = ptr->next;
2196   }
2197   return pileptr;
2198 }