changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 char bigarray[16*1024*1024];
83 int bigindex=0;
84 #define LOGEVENT(x) { \
85     int tmp=bigindex++;                         \
86     bigarray[tmp]=x;                            \
87   }
88
89
90 /*******************************
91 * Send and Recv function calls
92 *******************************/
93 void send_data(int fd, void *buf, int buflen) {
94   char *buffer = (char *)(buf);
95   int size = buflen;
96   int numbytes;
97   while (size > 0) {
98     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
99     bytesSent = bytesSent + numbytes;
100     if (numbytes == -1) {
101       perror("send");
102       exit(0);
103     }
104     buffer += numbytes;
105     size -= numbytes;
106   }
107 }
108
109 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
110   char *buf=(char *)buffer;
111
112   int numbytes=readbuffer->head-readbuffer->tail;
113   if (numbytes>buflen)
114     numbytes=buflen;
115   if (numbytes>0) {
116     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
117     readbuffer->tail+=numbytes;
118     buflen-=numbytes;
119     buf+=numbytes;
120     if (buflen==0) {
121       return;
122     }
123   }
124
125   if (buflen>=MAXBUF) {
126     recv_data(fd, buf, buflen);
127     return;
128   }
129   
130   int maxbuf=MAXBUF;
131   int obufflen=buflen;
132   readbuffer->head=0;
133   
134   while (buflen > 0) {
135     int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
136     if (numbytes == -1) {
137       perror("recv");
138       exit(0);
139     }
140     buflen-=numbytes;
141     readbuffer->head+=numbytes;
142     maxbuf-=numbytes;
143   }
144   memcpy(buf,readbuffer->buf,obufflen);
145   readbuffer->tail=obufflen;
146 }
147
148 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
149   char *buf=(char *)buffer;
150   //now tail<=head
151   int numbytes=readbuffer->head-readbuffer->tail;
152   if (numbytes>buflen)
153     numbytes=buflen;
154   if (numbytes>0) {
155     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
156     readbuffer->tail+=numbytes;
157     buflen-=numbytes;
158     buf+=numbytes;
159     if (buflen==0)
160       return 1;
161   }
162
163   if (buflen>=MAXBUF) {
164     return recv_data_errorcode(fd, buf, buflen);
165   }
166
167   int maxbuf=MAXBUF;
168   int obufflen=buflen;
169   readbuffer->head=0;
170   
171   while (buflen > 0) {
172     int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
173     if (numbytes ==0) {
174       return 0;
175     }
176     if (numbytes==-1) {
177       perror("recvbuf");
178       return -1;
179     }
180     buflen-=numbytes;
181     readbuffer->head+=numbytes;
182     maxbuf-=numbytes;
183   }
184   memcpy(buf,readbuffer->buf,obufflen);
185   readbuffer->tail=obufflen;
186   return 1;
187 }
188
189
190 void recv_data(int fd, void *buf, int buflen) {
191   char *buffer = (char *)(buf);
192   int size = buflen;
193   int numbytes;
194   while (size > 0) {
195     numbytes = recv(fd, buffer, size, 0);
196     bytesRecv = bytesRecv + numbytes;
197     if (numbytes == -1) {
198       perror("recv");
199       exit(0);
200     }
201     buffer += numbytes;
202     size -= numbytes;
203   }
204 }
205
206 int recv_data_errorcode(int fd, void *buf, int buflen) {
207   char *buffer = (char *)(buf);
208   int size = buflen;
209   int numbytes;
210   while (size > 0) {
211     numbytes = recv(fd, buffer, size, 0);
212     if (numbytes==0)
213       return 0;
214     if (numbytes == -1) {
215       perror("recv");
216       return -1;
217     }
218     buffer += numbytes;
219     size -= numbytes;
220   }
221   return 1;
222 }
223
224 void printhex(unsigned char *ptr, int numBytes) {
225   int i;
226   for (i = 0; i < numBytes; i++) {
227     if (ptr[i] < 16)
228       printf("0%x ", ptr[i]);
229     else
230       printf("%x ", ptr[i]);
231   }
232   printf("\n");
233   return;
234 }
235
236 inline int arrayLength(int *array) {
237   int i;
238   for(i=0 ; array[i] != -1; i++)
239     ;
240   return i;
241 }
242
243 inline int findmax(int *array, int arraylength) {
244   int max, i;
245   max = array[0];
246   for(i = 0; i < arraylength; i++) {
247     if(array[i] > max) {
248       max = array[i];
249     }
250   }
251   return max;
252 }
253
254 /* This function is a prefetch call generated by the compiler that
255  * populates the shared primary prefetch queue*/
256 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
257   /* Allocate for the queue node*/
258   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
259   int len;
260 #ifdef INLINEPREFETCH
261   char node[qnodesize];
262 #else
263   char *node=getmemory(qnodesize);
264 #endif
265   int top=endoffsets[ntuples-1];
266
267   if (node==NULL) {
268     LOGEVENT('D');
269     return;
270   }
271   /* Set queue node values */
272
273   /* TODO: Remove this after testing */
274   evalPrefetch[siteid].callcount++;
275
276   *((int *)(node))=siteid;
277   *((int *)(node + sizeof(int))) = ntuples;
278   len = 2*sizeof(int);
279   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
280   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
281   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
282
283 #ifdef INLINEPREFETCH
284   prefetchpile_t *pilehead = foundLocal(node);
285
286   if (pilehead!=NULL) {
287     // Get sock from shared pool
288     
289     /* Send  Prefetch Request */
290     prefetchpile_t *ptr = pilehead;
291     while(ptr != NULL) {
292       int sd = getSock2(transPrefetchSockPool, ptr->mid);
293       sendPrefetchReq(ptr, sd);
294       ptr = ptr->next;
295     }
296     
297     /* Release socket */
298     //  freeSock(transPrefetchSockPool, pilehead->mid, sd);
299     
300     /* Deallocated pilehead */
301     mcdealloc(pilehead);
302   }
303 #else
304   /* Lock and insert into primary prefetch queue */
305   movehead(qnodesize);
306 #endif
307 }
308
309 /* This function starts up the transaction runtime. */
310 int dstmStartup(const char * option) {
311   pthread_t thread_Listen, udp_thread_Listen;
312   pthread_attr_t attr;
313   int master=option!=NULL && strcmp(option, "master")==0;
314   int fd;
315   int udpfd;
316
317   if (processConfigFile() != 0)
318     return 0; //TODO: return error value, cause main program to exit
319 #ifdef COMPILER
320   if (!master)
321     threadcount--;
322 #endif
323
324 #ifdef TRANSSTATS
325   printf("Trans stats is on\n");
326   fflush(stdout);
327 #endif
328 #ifdef ABORTREADERS
329   initreaderlist();
330 #endif
331
332   //Initialize socket pool
333   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
334   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
335   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
336
337   dstmInit();
338   transInit();
339
340   fd=startlistening();
341   pthread_attr_init(&attr);
342   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
343 #ifdef CACHE
344   udpfd = udpInit();
345   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
346 #endif
347   if (master) {
348     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
349     return 1;
350   } else {
351     dstmListen((void *)fd);
352     return 0;
353   }
354 }
355
356 //TODO Use this later
357 void *pCacheAlloc(objstr_t *store, unsigned int size) {
358   void *tmp;
359   objstr_t *ptr;
360   ptr = store;
361   int success = 0;
362
363   while(ptr->next != NULL) {
364     /* check if store is empty */
365     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
366       tmp = ptr->top;
367       ptr->top += size;
368       success = 1;
369       return tmp;
370     } else {
371       ptr = ptr->next;
372     }
373   }
374
375   if(success == 0) {
376     return NULL;
377   }
378 }
379
380 /* This function initiates the prefetch thread A queue is shared
381  * between the main thread of execution and the prefetch thread to
382  * process the prefetch call Call from compiler populates the shared
383  * queue with prefetch requests while prefetch thread processes the
384  * prefetch requests */
385
386 void transInit() {
387   //Create and initialize prefetch cache structure
388 #ifdef CACHE
389   initializePCache();
390   if((evalPrefetch = initPrefetchStats()) == NULL) {
391     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
392     exit(0);
393   }
394 #endif
395
396   /* Initialize attributes for mutex */
397   pthread_mutexattr_init(&prefetchcache_mutex_attr);
398   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
399
400   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
401   pthread_mutex_init(&notifymutex, NULL);
402   pthread_mutex_init(&atomicObjLock, NULL);
403 #ifdef CACHE
404   //Create prefetch cache lookup table
405   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
406     printf("ERROR\n");
407     return; //Failure
408   }
409
410   //Initialize primary shared queue
411   queueInit();
412   //Initialize machine pile w/prefetch oids and offsets shared queue
413   mcpileqInit();
414
415   //Create the primary prefetch thread
416   int retval;
417 #ifdef RANGEPREFETCH
418   do {
419     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
420   } while(retval!=0);
421 #else
422 #ifndef INLINEPREFETCH
423   do {
424     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
425   } while(retval!=0);
426 #endif
427 #endif
428   pthread_detach(tPrefetch);
429 #endif
430 }
431
432 /* This function stops the threads spawned */
433 void transExit() {
434 #ifdef CACHE
435   int t;
436   pthread_cancel(tPrefetch);
437   for(t = 0; t < NUM_THREADS; t++)
438     pthread_cancel(wthreads[t]);
439 #endif
440
441   return;
442 }
443
444 /* This functions inserts randowm wait delays in the order of msec
445  * Mostly used when transaction commits retry*/
446 void randomdelay() {
447   struct timespec req;
448   time_t t;
449
450   t = time(NULL);
451   req.tv_sec = 0;
452   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
453   nanosleep(&req, NULL);
454   return;
455 }
456
457 /* This function initializes things required in the transaction start*/
458 void transStart() {
459   t_cache = objstrCreate(1048576);
460   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
461   revertlist=NULL;
462 #ifdef ABORTREADERS
463   t_abort=0;
464 #endif
465 }
466
467 // Search for an address for a given oid                                                                               
468 /*#define INLINE    inline __attribute__((always_inline))
469
470 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
471   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
472   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
473
474   do {
475     if(node->key == key) {
476       return node->val;
477     }
478     node = node->next;
479   } while(node != NULL);
480
481   return NULL;
482   }*/
483
484
485
486
487 /* This function finds the location of the objects involved in a transaction
488  * and returns the pointer to the object if found in a remote location */
489 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
490   unsigned int machinenumber;
491   objheader_t *tmp, *objheader;
492   objheader_t *objcopy;
493   int size;
494   void *buf;
495   chashlistnode_t *node;
496
497   if(oid == 0) {
498     return NULL;
499   }
500   
501
502   node= &c_table[(oid & c_mask)>>1];
503   do {
504     if(node->key == oid) {
505 #ifdef TRANSSTATS
506     nchashSearch++;
507 #endif
508 #ifdef COMPILER
509     return &((objheader_t*)node->val)[1];
510 #else
511     return node->val;
512 #endif
513     }
514     node = node->next;
515   } while(node != NULL);
516   
517
518   /*  
519   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
520 #ifdef TRANSSTATS
521     nchashSearch++;
522 #endif
523 #ifdef COMPILER
524     return &objheader[1];
525 #else
526     return objheader;
527 #endif
528   } else 
529   */
530
531 #ifdef ABORTREADERS
532   if (t_abort) {
533     //abort this transaction
534     //printf("ABORTING\n");
535     removetransactionhash();
536     objstrDelete(t_cache);
537     t_chashDelete();
538     _longjmp(aborttrans,1);
539   } else
540     addtransaction(oid);
541 #endif
542
543   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
544 #ifdef TRANSSTATS
545     nmhashSearch++;
546 #endif
547     /* Look up in machine lookup table  and copy  into cache*/
548     GETSIZE(size, objheader);
549     size += sizeof(objheader_t);
550     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
551     memcpy(objcopy, objheader, size);
552     /* Insert into cache's lookup table */
553     STATUS(objcopy)=0;
554     t_chashInsert(OID(objheader), objcopy);
555 #ifdef COMPILER
556     return &objcopy[1];
557 #else
558     return objcopy;
559 #endif
560   } else {
561 #ifdef CACHE
562     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
563 #ifdef TRANSSTATS
564       nprehashSearch++;
565 #endif
566       /* Look up in prefetch cache */
567       GETSIZE(size, tmp);
568       size+=sizeof(objheader_t);
569       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
570       memcpy(objcopy, tmp, size);
571       /* Insert into cache's lookup table */
572       t_chashInsert(OID(tmp), objcopy);
573 #ifdef COMPILER
574       return &objcopy[1];
575 #else
576       return objcopy;
577 #endif
578     }
579 #endif
580     /* Get the object from the remote location */
581     if((machinenumber = lhashSearch(oid)) == 0) {
582       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
583       return NULL;
584     }
585     objcopy = getRemoteObj(machinenumber, oid);
586
587     if(objcopy == NULL) {
588       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
589       return NULL;
590     } else {
591 #ifdef TRANSSTATS
592       nRemoteSend++;
593 #endif
594 #ifdef COMPILER
595       return &objcopy[1];
596 #else
597       return objcopy;
598 #endif
599     }
600   }
601 }
602
603
604 /* This function finds the location of the objects involved in a transaction
605  * and returns the pointer to the object if found in a remote location */
606 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
607   unsigned int machinenumber;
608   objheader_t *tmp, *objheader;
609   objheader_t *objcopy;
610   int size;
611
612 #ifdef ABORTREADERS
613   if (t_abort) {
614     //abort this transaction
615     //printf("ABORTING\n");
616     removetransactionhash();
617     objstrDelete(t_cache);
618     t_chashDelete();
619     _longjmp(aborttrans,1);
620   } else
621     addtransaction(oid);
622 #endif
623
624   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
625 #ifdef TRANSSTATS
626     nmhashSearch++;
627 #endif
628     /* Look up in machine lookup table  and copy  into cache*/
629     GETSIZE(size, objheader);
630     size += sizeof(objheader_t);
631     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
632     memcpy(objcopy, objheader, size);
633     /* Insert into cache's lookup table */
634     STATUS(objcopy)=0;
635     t_chashInsert(OID(objheader), objcopy);
636 #ifdef COMPILER
637     return &objcopy[1];
638 #else
639     return objcopy;
640 #endif
641   } else {
642 #ifdef CACHE
643     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
644 #ifdef TRANSSTATS
645       LOGEVENT('P')
646       nprehashSearch++;
647 #endif
648       /* Look up in prefetch cache */
649       GETSIZE(size, tmp);
650       size+=sizeof(objheader_t);
651       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
652       memcpy(objcopy, tmp, size);
653       /* Insert into cache's lookup table */
654       t_chashInsert(OID(tmp), objcopy);
655 #ifdef COMPILER
656       return &objcopy[1];
657 #else
658       return objcopy;
659 #endif
660     }
661 #endif
662     /* Get the object from the remote location */
663     if((machinenumber = lhashSearch(oid)) == 0) {
664       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
665       return NULL;
666     }
667     objcopy = getRemoteObj(machinenumber, oid);
668
669     if(objcopy == NULL) {
670       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
671       return NULL;
672     } else {
673 #ifdef TRANSSTATS
674
675       LOGEVENT('R');
676       nRemoteSend++;
677 #endif
678 #ifdef COMPILER
679       return &objcopy[1];
680 #else
681       return objcopy;
682 #endif
683     }
684   }
685 }
686
687 /* This function creates objects in the transaction record */
688 objheader_t *transCreateObj(unsigned int size) {
689   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
690   OID(tmp) = getNewOID();
691   tmp->version = 1;
692   tmp->rcount = 1;
693   STATUS(tmp) = NEW;
694   t_chashInsert(OID(tmp), tmp);
695
696 #ifdef COMPILER
697   return &tmp[1]; //want space after object header
698 #else
699   return tmp;
700 #endif
701 }
702
703 #if 1
704 /* This function creates machine piles based on all machines involved in a
705  * transaction commit request */
706 plistnode_t *createPiles() {
707   int i;
708   plistnode_t *pile = NULL;
709   unsigned int machinenum;
710   objheader_t *headeraddr;
711   chashlistnode_t * ptr = c_table;
712   /* Represents number of bins in the chash table */
713   unsigned int size = c_size;
714
715   for(i = 0; i < size ; i++) {
716     chashlistnode_t * curr = &ptr[i];
717     /* Inner loop to traverse the linked list of the cache lookupTable */
718     while(curr != NULL) {
719       //if the first bin in hash table is empty
720       if(curr->key == 0)
721         break;
722       headeraddr=(objheader_t *) curr->val;
723
724       //Get machine location for object id (and whether local or not)
725       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
726         machinenum = myIpAddr;
727       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
728         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
729         return NULL;
730       }
731
732       //Make machine groups
733       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
734       curr = curr->next;
735     }
736   }
737   return pile;
738 }
739 #else
740 /* This function creates machine piles based on all machines involved in a
741  * transaction commit request */
742 plistnode_t *createPiles() {
743   int i;
744   plistnode_t *pile = NULL;
745   unsigned int machinenum;
746   objheader_t *headeraddr;
747   struct chashentry * ptr = c_table;
748   /* Represents number of bins in the chash table */
749   unsigned int size = c_size;
750
751   for(i = 0; i < size ; i++) {
752     struct chashentry * curr = & ptr[i];
753     /* Inner loop to traverse the linked list of the cache lookupTable */
754     //if the first bin in hash table is empty
755     if(curr->key == 0)
756       continue;
757     headeraddr=(objheader_t *) curr->ptr;
758
759     //Get machine location for object id (and whether local or not)
760     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
761       machinenum = myIpAddr;
762     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
763       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
764       return NULL;
765     }
766
767     //Make machine groups
768     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
769   }
770   return pile;
771 }
772 #endif
773
774 /* This function initiates the transaction commit process
775  * Spawns threads for each of the new connections with Participants
776  * and creates new piles by calling the createPiles(),
777  * Sends a transrequest() to each remote machines for objects found remotely
778  * and calls handleLocalReq() to process objects found locally */
779 int transCommit() {
780   unsigned int tot_bytes_mod, *listmid;
781   plistnode_t *pile, *pile_ptr;
782   char treplyretry; /* keeps track of the common response that needs to be sent */
783   int firsttime=1;
784   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
785   char finalResponse;
786
787 #ifdef TRANSSTATS
788   int iii;
789   for(iii=0;iii<bigindex;iii++) {
790     printf("%c", bigarray[iii]);
791   }
792 #endif
793
794 #ifdef ABORTREADERS
795   if (t_abort) {
796     //abort this transaction
797     /* Debug
798      * printf("ABORTING TRANSACTION AT COMMIT\n");
799      */
800     removetransactionhash();
801     objstrDelete(t_cache);
802     t_chashDelete();
803     return 1;
804   }
805 #endif
806
807
808   do {
809     treplyretry = 0;
810
811     /* Look through all the objects in the transaction record and make piles
812      * for each machine involved in the transaction*/
813     if (firsttime) {
814       pile_ptr = pile = createPiles();
815       pile_ptr = pile = sortPiles(pile);
816     } else {
817       pile = pile_ptr;
818     }
819     firsttime = 0;
820     /* Create the packet to be sent in TRANS_REQUEST */
821
822     /* Count the number of participants */
823     int pilecount;
824     pilecount = pCount(pile);
825
826     /* Create a list of machine ids(Participants) involved in transaction   */
827     listmid = calloc(pilecount, sizeof(unsigned int));
828     pListMid(pile, listmid);
829
830     /* Create a socket and getReplyCtrl array, initialize */
831     int socklist[pilecount];
832     int loopcount;
833     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
834       socklist[loopcount] = 0;
835     char getReplyCtrl[pilecount];
836     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
837       getReplyCtrl[loopcount] = 0;
838
839     /* Process each machine pile */
840     int sockindex = 0;
841     trans_req_data_t *tosend;
842     tosend = calloc(pilecount, sizeof(trans_req_data_t));
843     while(pile != NULL) {
844       tosend[sockindex].f.control = TRANS_REQUEST;
845       tosend[sockindex].f.mcount = pilecount;
846       tosend[sockindex].f.numread = pile->numread;
847       tosend[sockindex].f.nummod = pile->nummod;
848       tosend[sockindex].f.numcreated = pile->numcreated;
849       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
850       tosend[sockindex].listmid = listmid;
851       tosend[sockindex].objread = pile->objread;
852       tosend[sockindex].oidmod = pile->oidmod;
853       tosend[sockindex].oidcreated = pile->oidcreated;
854       int sd = 0;
855       if(pile->mid != myIpAddr) {
856         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
857           printf("transRequest(): socket create error\n");
858           free(listmid);
859           free(tosend);
860           return 1;
861         }
862         socklist[sockindex] = sd;
863         /* Send bytes of data with TRANS_REQUEST control message */
864         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
865
866         /* Send list of machines involved in the transaction */
867         {
868           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
869           send_data(sd, tosend[sockindex].listmid, size);
870         }
871
872         /* Send oids and version number tuples for objects that are read */
873         {
874           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
875           send_data(sd, tosend[sockindex].objread, size);
876         }
877
878         /* Send objects that are modified */
879         void *modptr;
880         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
881           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
882           free(listmid);
883           free(tosend);
884           return 1;
885         }
886         int offset = 0;
887         int i;
888         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
889           int size;
890           objheader_t *headeraddr;
891           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
892             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
893             free(modptr);
894             free(listmid);
895             free(tosend);
896             return 1;
897           }
898           GETSIZE(size,headeraddr);
899           size+=sizeof(objheader_t);
900           memcpy(modptr+offset, headeraddr, size);
901           offset+=size;
902         }
903         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
904         free(modptr);
905       } else { //handle request locally
906         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
907       }
908       sockindex++;
909       pile = pile->next;
910     } //end of pile processing
911       /* Recv Ctrl msgs from all machines */
912     int i;
913     for(i = 0; i < pilecount; i++) {
914       int sd = socklist[i];
915       if(sd != 0) {
916         char control;
917         recv_data(sd, &control, sizeof(char));
918         //Update common data structure with new ctrl msg
919         getReplyCtrl[i] = control;
920         /* Recv Objects if participant sends TRANS_DISAGREE */
921 #ifdef CACHE
922         if(control == TRANS_DISAGREE) {
923           int length;
924           recv_data(sd, &length, sizeof(int));
925           void *newAddr;
926           pthread_mutex_lock(&prefetchcache_mutex);
927           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
928             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
929             free(tosend);
930             free(listmid);
931             pthread_mutex_unlock(&prefetchcache_mutex);
932             return 1;
933           }
934           pthread_mutex_unlock(&prefetchcache_mutex);
935           recv_data(sd, newAddr, length);
936           int offset = 0;
937           while(length != 0) {
938             unsigned int oidToPrefetch;
939             objheader_t * header;
940             header = (objheader_t *)(((char *)newAddr) + offset);
941             oidToPrefetch = OID(header);
942             STATUS(header)=0;
943             int size = 0;
944             GETSIZE(size, header);
945             size += sizeof(objheader_t);
946             //make an entry in prefetch hash table
947             void *oldptr;
948             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
949               prehashRemove(oidToPrefetch);
950               prehashInsert(oidToPrefetch, header);
951             } else {
952               prehashInsert(oidToPrefetch, header);
953             }
954             length = length - size;
955             offset += size;
956           }
957         } //end of receiving objs
958 #endif
959       }
960     }
961     /* Decide the final response */
962     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
963       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
964       free(tosend);
965       free(listmid);
966       return 1;
967     }
968
969     /* Send responses to all machines */
970     for(i = 0; i < pilecount; i++) {
971       int sd = socklist[i];
972       if(sd != 0) {
973 #ifdef CACHE
974         if(finalResponse == TRANS_COMMIT) {
975           int retval;
976           /* Update prefetch cache */
977           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
978             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
979             free(tosend);
980             free(listmid);
981             return 1;
982           }
983
984
985           /* Invalidate objects in other machine cache */
986           if(tosend[i].f.nummod > 0) {
987             if((retval = invalidateObj(&(tosend[i]))) != 0) {
988               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
989               free(tosend);
990               free(listmid);
991               return 1;
992             }
993           }
994 #ifdef ABORTREADERS
995           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
996           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
997 #endif
998         }
999 #ifdef ABORTREADERS
1000         else if (!treplyretry) {
1001           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1002           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1003         }
1004 #endif
1005 #endif
1006         send_data(sd, &finalResponse, sizeof(char));
1007       } else {
1008         /* Complete local processing */
1009         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1010 #ifdef ABORTREADERS
1011         if(finalResponse == TRANS_COMMIT) {
1012           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1013           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1014         } else if (!treplyretry) {
1015           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1016           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1017         }
1018 #endif
1019       }
1020     }
1021
1022     /* Free resources */
1023     free(tosend);
1024     free(listmid);
1025     if (!treplyretry)
1026       pDelete(pile_ptr);
1027     /* wait a random amount of time before retrying to commit transaction*/
1028     if(treplyretry) {
1029       randomdelay();
1030 #ifdef TRANSSTATS
1031       nSoftAbort++;
1032 #endif
1033     }
1034     /* Retry trans commit procedure during soft_abort case */
1035   } while (treplyretry);
1036
1037   if(finalResponse == TRANS_ABORT) {
1038     //printf("Aborting trans\n");
1039 #ifdef TRANSSTATS
1040     numTransAbort++;
1041 #endif
1042     /* Free Resources */
1043     objstrDelete(t_cache);
1044     t_chashDelete();
1045     return TRANS_ABORT;
1046   } else if(finalResponse == TRANS_COMMIT) {
1047 #ifdef TRANSSTATS
1048     numTransCommit++;
1049 #endif
1050     /* Free Resources */
1051     objstrDelete(t_cache);
1052     t_chashDelete();
1053     return 0;
1054   } else {
1055     //TODO Add other cases
1056     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1057     exit(-1);
1058   }
1059   return 0;
1060 }
1061
1062 /* This function handles the local objects involved in a transaction
1063  * commiting process.  It also makes a decision if this local machine
1064  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1065 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1066   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1067   int numoidnotfound = 0, numoidlocked = 0;
1068   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1069   int numread, i;
1070   unsigned int oid;
1071   unsigned short version;
1072
1073   /* Counters and arrays to formulate decision on control message to be sent */
1074   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1075   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1076   //setting a divider between read and write locks
1077   numread = tdata->f.numread;
1078   /* Process each oid in the machine pile/ group per thread */
1079   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1080     if (i < tdata->f.numread) {
1081       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1082       incr *= i;
1083       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1084       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1085       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1086     } else { // Objects Modified
1087       if(i == tdata->f.numread) {
1088         oidlocked[numoidlocked++] = -1;
1089       }
1090       int tmpsize;
1091       objheader_t *headptr;
1092       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1093       if (headptr == NULL) {
1094         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1095         return;
1096       }
1097       oid = OID(headptr);
1098       version = headptr->version;
1099       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1100     }
1101   }
1102
1103 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1104  * if Participant receives a TRANS_COMMIT */
1105   transinfo->objlocked = oidlocked;
1106   transinfo->objnotfound = oidnotfound;
1107   transinfo->modptr = NULL;
1108   transinfo->numlocked = numoidlocked;
1109   transinfo->numnotfound = numoidnotfound;
1110
1111   /* Condition to send TRANS_AGREE */
1112   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1113     *getReplyCtrl = TRANS_AGREE;
1114   }
1115   /* Condition to send TRANS_SOFT_ABORT */
1116   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1117     *getReplyCtrl = TRANS_SOFT_ABORT;
1118   }
1119 }
1120
1121 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1122   if(finalResponse == TRANS_ABORT) {
1123     if(transAbortProcess(transinfo) != 0) {
1124       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1125       fflush(stdout);
1126       return;
1127     }
1128   } else if(finalResponse == TRANS_COMMIT) {
1129 #ifdef CACHE
1130     /* Invalidate objects in other machine cache */
1131     if(tdata->f.nummod > 0) {
1132       int retval;
1133       if((retval = invalidateObj(tdata)) != 0) {
1134         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1135         return;
1136       }
1137     }
1138 #endif
1139     if(transComProcess(tdata, transinfo) != 0) {
1140       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1141       fflush(stdout);
1142       return;
1143     }
1144   } else {
1145     printf("ERROR...No Decision\n");
1146   }
1147
1148   /* Free memory */
1149   if (transinfo->objlocked != NULL) {
1150     free(transinfo->objlocked);
1151   }
1152   if (transinfo->objnotfound != NULL) {
1153     free(transinfo->objnotfound);
1154   }
1155 }
1156
1157 /* This function decides the reponse that needs to be sent to
1158  * all Participant machines after the TRANS_REQUEST protocol */
1159 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1160   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1161                                                                    message to send */
1162   for (i = 0 ; i < pilecount; i++) {
1163     char control;
1164     control = getReplyCtrl[i];
1165     switch(control) {
1166     default:
1167       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1168
1169       /* treat as disagree, pass thru */
1170     case TRANS_DISAGREE:
1171       transdisagree++;
1172       break;
1173
1174     case TRANS_AGREE:
1175       transagree++;
1176       break;
1177
1178     case TRANS_SOFT_ABORT:
1179       transsoftabort++;
1180       break;
1181     }
1182   }
1183
1184   if(transdisagree > 0) {
1185     /* Send Abort */
1186     *treplyretry = 0;
1187     return TRANS_ABORT;
1188 #ifdef CACHE
1189     /* clear objects from prefetch cache */
1190     cleanPCache();
1191 #endif
1192   } else if(transagree == pilecount) {
1193     /* Send Commit */
1194     *treplyretry = 0;
1195     return TRANS_COMMIT;
1196   } else {
1197     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1198     *treplyretry = 1;
1199     return TRANS_ABORT;
1200   }
1201   return 0;
1202 }
1203
1204 /* This function opens a connection, places an object read request to
1205  * the remote machine, reads the control message and object if
1206  * available and copies the object and its header to the local
1207  * cache. */
1208
1209 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1210   int size, val;
1211   struct sockaddr_in serv_addr;
1212   char machineip[16];
1213   char control;
1214   objheader_t *h;
1215   void *objcopy = NULL;
1216
1217   int sd = getSock2(transReadSockPool, mnum);
1218   char readrequest[sizeof(char)+sizeof(unsigned int)];
1219   readrequest[0] = READ_REQUEST;
1220   *((unsigned int *)(&readrequest[1])) = oid;
1221   send_data(sd, readrequest, sizeof(readrequest));
1222
1223   /* Read response from the Participant */
1224   recv_data(sd, &control, sizeof(char));
1225
1226   if (control==OBJECT_NOT_FOUND) {
1227     objcopy = NULL;
1228   } else {
1229     /* Read object if found into local cache */
1230     recv_data(sd, &size, sizeof(int));
1231     objcopy = objstrAlloc(&t_cache, size);
1232     recv_data(sd, objcopy, size);
1233     STATUS(objcopy)=0;
1234     /* Insert into cache's lookup table */
1235     t_chashInsert(oid, objcopy);
1236 #ifdef TRANSSTATS
1237     totalObjSize += size;
1238 #endif
1239   }
1240
1241   return objcopy;
1242 }
1243
1244 /*  Commit info for objects modified */
1245 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1246                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1247   void *mobj;
1248   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1249   /* Save the oids not found and number of oids not found for later use */
1250   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1251     /* Save the oids not found and number of oids not found for later use */
1252     oidnotfound[*numoidnotfound] = oid;
1253     (*numoidnotfound)++;
1254   } else { /* If Obj found in machine (i.e. has not moved) */
1255     /* Check if Obj is locked by any previous transaction */
1256     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1257       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1258         (*v_matchnolock)++;
1259         //Keep track of what is locked
1260         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1261       } else { /* If versions don't match ...HARD ABORT */
1262         (*v_nomatch)++;
1263         /* Send TRANS_DISAGREE to Coordinator */
1264         *getReplyCtrl = TRANS_DISAGREE;
1265
1266         //Keep track of what is locked
1267         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1268         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1269         return;
1270       }
1271     } else { //A lock is acquired some place else
1272       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1273         (*v_matchlock)++;
1274       } else { /* If versions don't match ...HARD ABORT */
1275         (*v_nomatch)++;
1276         /* Send TRANS_DISAGREE to Coordinator */
1277         *getReplyCtrl = TRANS_DISAGREE;
1278         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1279         return;
1280       }
1281     }
1282   }
1283 }
1284
1285 /*  Commit info for objects modified */
1286 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1287                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1288   void *mobj;
1289   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1290   /* Save the oids not found and number of oids not found for later use */
1291   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1292     /* Save the oids not found and number of oids not found for later use */
1293     oidnotfound[*numoidnotfound] = oid;
1294     (*numoidnotfound)++;
1295   } else { /* If Obj found in machine (i.e. has not moved) */
1296     /* Check if Obj is locked by any previous transaction */
1297     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1298       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1299         (*v_matchnolock)++;
1300         //Keep track of what is locked
1301         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1302       } else { /* If versions don't match ...HARD ABORT */
1303         (*v_nomatch)++;
1304         /* Send TRANS_DISAGREE to Coordinator */
1305         *getReplyCtrl = TRANS_DISAGREE;
1306         //Keep track of what is locked
1307         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1308         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1309         return;
1310       }
1311     } else { //Has reached max number of readers or some other transaction
1312       //has acquired a lock on this object
1313       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1314         (*v_matchlock)++;
1315       } else { /* If versions don't match ...HARD ABORT */
1316         (*v_nomatch)++;
1317         /* Send TRANS_DISAGREE to Coordinator */
1318         *getReplyCtrl = TRANS_DISAGREE;
1319         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1320         return;
1321       }
1322     }
1323   }
1324 }
1325
1326 /* This function completes the ABORT process if the transaction is aborting */
1327 int transAbortProcess(trans_commit_data_t *transinfo) {
1328   int i, numlocked;
1329   unsigned int *objlocked;
1330   void *header;
1331
1332   numlocked = transinfo->numlocked;
1333   objlocked = transinfo->objlocked;
1334
1335   int useWriteUnlock = 0;
1336   for (i = 0; i < numlocked; i++) {
1337     if(objlocked[i] == -1) {
1338       useWriteUnlock = 1;
1339       continue;
1340     }
1341     if((header = mhashSearch(objlocked[i])) == NULL) {
1342       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1343       return 1;
1344     }
1345     if(!useWriteUnlock) {
1346       read_unlock(STATUSPTR(header));
1347     } else {
1348       write_unlock(STATUSPTR(header));
1349     }
1350   }
1351
1352   return 0;
1353 }
1354
1355 /*This function completes the COMMIT process if the transaction is commiting*/
1356 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1357   objheader_t *header, *tcptr;
1358   int i, nummod, tmpsize, numcreated, numlocked;
1359   unsigned int *oidmod, *oidcreated, *oidlocked;
1360   void *ptrcreate;
1361
1362   nummod = tdata->f.nummod;
1363   oidmod = tdata->oidmod;
1364   numcreated = tdata->f.numcreated;
1365   oidcreated = tdata->oidcreated;
1366   numlocked = transinfo->numlocked;
1367   oidlocked = transinfo->objlocked;
1368
1369   for (i = 0; i < nummod; i++) {
1370     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1371       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1372       return 1;
1373     }
1374     /* Copy from transaction cache -> main object store */
1375     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1376       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1377       return 1;
1378     }
1379     GETSIZE(tmpsize, header);
1380     char *tmptcptr = (char *) tcptr;
1381     {
1382       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1383       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1384       dst->___cachedCode___=src->___cachedCode___;
1385       dst->___cachedHash___=src->___cachedHash___;
1386
1387       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1388     }
1389
1390     header->version += 1;
1391     if(header->notifylist != NULL) {
1392       notifyAll(&header->notifylist, OID(header), header->version);
1393     }
1394   }
1395   /* If object is newly created inside transaction then commit it */
1396   for (i = 0; i < numcreated; i++) {
1397     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1398       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1399       return 1;
1400     }
1401     GETSIZE(tmpsize, header);
1402     tmpsize += sizeof(objheader_t);
1403     pthread_mutex_lock(&mainobjstore_mutex);
1404     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1405       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1406       pthread_mutex_unlock(&mainobjstore_mutex);
1407       return 1;
1408     }
1409     pthread_mutex_unlock(&mainobjstore_mutex);
1410     /* Initialize read and write locks */
1411     initdsmlocks(STATUSPTR(header));
1412     memcpy(ptrcreate, header, tmpsize);
1413     mhashInsert(oidcreated[i], ptrcreate);
1414     lhashInsert(oidcreated[i], myIpAddr);
1415   }
1416   /* Unlock locked objects */
1417   int useWriteUnlock = 0;
1418   for(i = 0; i < numlocked; i++) {
1419     if(oidlocked[i] == -1) {
1420       useWriteUnlock = 1;
1421       continue;
1422     }
1423     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1424       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1425       return 1;
1426     }
1427     if(!useWriteUnlock) {
1428       read_unlock(STATUSPTR(header));
1429     } else {
1430       write_unlock(STATUSPTR(header));
1431     }
1432   }
1433   return 0;
1434 }
1435
1436 prefetchpile_t *foundLocal(char *ptr) {
1437   int siteid = *(GET_SITEID(ptr));
1438   int ntuples = *(GET_NTUPLES(ptr));
1439   unsigned int * oidarray = GET_PTR_OID(ptr);
1440   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1441   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1442   prefetchpile_t * head=NULL;
1443   int numLocal = 0;
1444
1445   int i;
1446   for(i=0; i<ntuples; i++) {
1447     unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1448     unsigned short endindex=endoffsets[i];
1449     unsigned int oid=oidarray[i];
1450     int newbase;
1451     int machinenum;
1452
1453     if (oid==0)
1454       continue;
1455     //Look up fields locally
1456     for(newbase=baseindex; newbase<endindex; newbase++) {
1457       if (!lookupObject(&oid, arryfields[newbase]))
1458         break;
1459       //Ended in a null pointer...
1460       if (oid==0)
1461         goto tuple;
1462     }
1463     //Entire prefetch is local
1464     if (newbase==endindex&&checkoid(oid)) {
1465       numLocal++;
1466       goto tuple;
1467     }
1468     //Add to remote requests
1469     machinenum=lhashSearch(oid);
1470     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1471 tuple:
1472     ;
1473   }
1474
1475   /* handle dynamic prefetching */
1476   handleDynPrefetching(numLocal, ntuples, siteid);
1477   return head;
1478 }
1479
1480 int checkoid(unsigned int oid) {
1481   objheader_t *header;
1482   if ((header=mhashSearch(oid))!=NULL) {
1483     //Found on machine
1484     return 1;
1485   } else if ((header=prehashSearch(oid))!=NULL) {
1486     //Found in cache
1487     return 1;
1488   } else {
1489     return 0;
1490   }
1491 }
1492
1493 int lookupObject(unsigned int * oid, short offset) {
1494   objheader_t *header;
1495   if ((header=mhashSearch(*oid))!=NULL) {
1496     //Found on machine
1497     ;
1498   } else if ((header=prehashSearch(*oid))!=NULL) {
1499     //Found in cache
1500     ;
1501   } else {
1502     return 0;
1503   }
1504
1505   if(TYPE(header) >= NUMCLASSES) {
1506     int elementsize = classsize[TYPE(header)];
1507     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1508     int length = ao->___length___;
1509     /* Check if array out of bounds */
1510     if(offset < 0 || offset >= length) {
1511       //if yes treat the object as found
1512       (*oid)=0;
1513       return 1;
1514     }
1515     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1516     return 1;
1517   } else {
1518     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1519     return 1;
1520   }
1521 }
1522
1523
1524 /* This function is called by the thread calling transPrefetch */
1525 void *transPrefetch(void *t) {
1526   while(1) {
1527     /* read from prefetch queue */
1528     void *node=gettail();
1529     /* Check if the tuples are found locally, if yes then reduce them further*/
1530     /* and group requests by remote machine ids by calling the makePreGroups() */
1531     prefetchpile_t *pilehead = foundLocal(node);
1532
1533     if (pilehead!=NULL) {
1534       // Get sock from shared pool
1535
1536       /* Send  Prefetch Request */
1537       prefetchpile_t *ptr = pilehead;
1538       while(ptr != NULL) {
1539         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1540         sendPrefetchReq(ptr, sd);
1541         ptr = ptr->next;
1542       }
1543
1544       /* Release socket */
1545       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1546
1547       /* Deallocated pilehead */
1548       mcdealloc(pilehead);
1549     }
1550     // Deallocate the prefetch queue pile node
1551     inctail();
1552   }
1553 }
1554
1555 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1556   objpile_t *tmp;
1557
1558   int size=sizeof(char)+sizeof(int);
1559   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1560     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1561   }
1562
1563   char buft[size];
1564   char *buf=buft;
1565   *buf=TRANS_PREFETCH;
1566   buf+=sizeof(char);
1567
1568   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1569     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1570     *((int*)buf)=len;
1571     buf+=sizeof(int);
1572     *((unsigned int *)buf)=tmp->oid;
1573     buf+=sizeof(unsigned int);
1574     *((unsigned int *)(buf)) = myIpAddr;
1575     buf+=sizeof(unsigned int);
1576     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1577     buf+=tmp->numoffset*sizeof(short);
1578   }
1579   *((int *)buf)=-1;
1580   send_data(sd, buft, size);
1581   return;
1582 }
1583
1584 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1585   int len, endpair;
1586   char control;
1587   objpile_t *tmp;
1588
1589   /* Send TRANS_PREFETCH control message */
1590   int first=1;
1591
1592   /* Send Oids and offsets in pairs */
1593   tmp = mcpilenode->objpiles;
1594   while(tmp != NULL) {
1595     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1596     char oidnoffset[len+5];
1597     char *buf=oidnoffset;
1598     if (first) {
1599       *buf=TRANS_PREFETCH;
1600       buf++;len++;
1601       first=0;
1602     }
1603     *((int*)buf) = tmp->numoffset;
1604     buf+=sizeof(int);
1605     *((unsigned int *)buf) = tmp->oid;
1606 #ifdef TRANSSTATS
1607     sendRemoteReq++;
1608 #endif
1609     buf+=sizeof(unsigned int);
1610     *((unsigned int *)buf) = myIpAddr;
1611     buf += sizeof(unsigned int);
1612     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1613     tmp = tmp->next;
1614     if (tmp==NULL) {
1615       *((int *)(&oidnoffset[len]))=-1;
1616       len+=sizeof(int);
1617     }
1618     send_data(sd, oidnoffset, len);
1619   }
1620
1621   LOGEVENT('S');
1622   return;
1623 }
1624
1625 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1626   int length = 0, size = 0;
1627   char control;
1628   unsigned int oid;
1629   void *modptr, *oldptr;
1630
1631   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1632   size = length - sizeof(int);
1633   char recvbuffer[size];
1634 #ifdef TRANSSTATS
1635     getResponse++;
1636     LOGEVENT('Z');
1637 #endif
1638     recv_data_buf(sd, readbuffer, recvbuffer, size);
1639   control = *((char *) recvbuffer);
1640   if(control == OBJECT_FOUND) {
1641     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1642     size = size - (sizeof(char) + sizeof(unsigned int));
1643     pthread_mutex_lock(&prefetchcache_mutex);
1644     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1645       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1646       pthread_mutex_unlock(&prefetchcache_mutex);
1647       return -1;
1648     }
1649     pthread_mutex_unlock(&prefetchcache_mutex);
1650     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1651     STATUS(modptr)=0;
1652
1653     /* Insert the oid and its address into the prefetch hash lookup table */
1654     /* Do a version comparison if the oid exists */
1655     if((oldptr = prehashSearch(oid)) != NULL) {
1656       /* If older version then update with new object ptr */
1657       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1658         prehashRemove(oid);
1659         prehashInsert(oid, modptr);
1660       }
1661     } else { /* Else add the object ptr to hash table*/
1662       prehashInsert(oid, modptr);
1663     }
1664     /* Lock the Prefetch Cache look up table*/
1665     pthread_mutex_lock(&pflookup.lock);
1666     /* Broadcast signal on prefetch cache condition variable */
1667     pthread_cond_broadcast(&pflookup.cond);
1668     /* Unlock the Prefetch Cache look up table*/
1669     pthread_mutex_unlock(&pflookup.lock);
1670   } else if(control == OBJECT_NOT_FOUND) {
1671     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1672     /* TODO: For each object not found query DHT for new location and retrieve the object */
1673     /* Throw an error */
1674     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1675     //    exit(-1);
1676   } else {
1677     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1678   }
1679
1680   return 0;
1681 }
1682
1683 unsigned short getObjType(unsigned int oid) {
1684   objheader_t *objheader;
1685   unsigned short numoffset[] ={0};
1686   short fieldoffset[] ={};
1687
1688   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1689 #ifdef CACHE
1690     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1691 #endif
1692     unsigned int mid = lhashSearch(oid);
1693     int sd = getSock2(transReadSockPool, mid);
1694     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1695     remotereadrequest[0] = READ_REQUEST;
1696     *((unsigned int *)(&remotereadrequest[1])) = oid;
1697     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1698
1699     /* Read response from the Participant */
1700     char control;
1701     recv_data(sd, &control, sizeof(char));
1702
1703     if (control==OBJECT_NOT_FOUND) {
1704       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1705       fflush(stdout);
1706       exit(-1);
1707     } else {
1708       /* Read object if found into local cache */
1709       int size;
1710       recv_data(sd, &size, sizeof(int));
1711 #ifdef CACHE
1712       pthread_mutex_lock(&prefetchcache_mutex);
1713       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1714         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1715         pthread_exit(NULL);
1716       }
1717       pthread_mutex_unlock(&prefetchcache_mutex);
1718       recv_data(sd, objheader, size);
1719       prehashInsert(oid, objheader);
1720       return TYPE(objheader);
1721 #else
1722       char *buffer;
1723       if((buffer = calloc(1, size)) == NULL) {
1724         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1725         fflush(stdout);
1726         return 0;
1727       }
1728       recv_data(sd, buffer, size);
1729       objheader = (objheader_t *)buffer;
1730       unsigned short type = TYPE(objheader);
1731       free(buffer);
1732       return type;
1733 #endif
1734     }
1735 #ifdef CACHE
1736   }
1737 #endif
1738   }
1739   return TYPE(objheader);
1740 }
1741
1742 int startRemoteThread(unsigned int oid, unsigned int mid) {
1743   int sock;
1744   struct sockaddr_in remoteAddr;
1745   char msg[1 + sizeof(unsigned int)];
1746   int bytesSent;
1747   int status;
1748
1749   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1750     perror("startRemoteThread():socket()");
1751     return -1;
1752   }
1753
1754   bzero(&remoteAddr, sizeof(remoteAddr));
1755   remoteAddr.sin_family = AF_INET;
1756   remoteAddr.sin_port = htons(LISTEN_PORT);
1757   remoteAddr.sin_addr.s_addr = htonl(mid);
1758
1759   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1760     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1761            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1762     status = -1;
1763   } else
1764   {
1765     msg[0] = START_REMOTE_THREAD;
1766     *((unsigned int *) &msg[1]) = oid;
1767     send_data(sock, msg, 1 + sizeof(unsigned int));
1768   }
1769
1770   close(sock);
1771   return status;
1772 }
1773
1774 //TODO: when reusing oids, make sure they are not already in use!
1775 static unsigned int id = 0xFFFFFFFF;
1776 unsigned int getNewOID(void) {
1777   id += 2;
1778   if (id > oidMax || id < oidMin) {
1779     id = (oidMin | 1);
1780   }
1781   return id;
1782 }
1783
1784 int processConfigFile() {
1785   FILE *configFile;
1786   const int maxLineLength = 200;
1787   char lineBuffer[maxLineLength];
1788   char *token;
1789   const char *delimiters = " \t\n";
1790   char *commentBegin;
1791   in_addr_t tmpAddr;
1792
1793   configFile = fopen(CONFIG_FILENAME, "r");
1794   if (configFile == NULL) {
1795     printf("error opening %s:\n", CONFIG_FILENAME);
1796     perror("");
1797     return -1;
1798   }
1799
1800   numHostsInSystem = 0;
1801   sizeOfHostArray = 8;
1802   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1803
1804   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1805     commentBegin = strchr(lineBuffer, '#');
1806     if (commentBegin != NULL)
1807       *commentBegin = '\0';
1808     token = strtok(lineBuffer, delimiters);
1809     while (token != NULL) {
1810       tmpAddr = inet_addr(token);
1811       if ((int)tmpAddr == -1) {
1812         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1813         fclose(configFile);
1814         return -1;
1815       } else
1816         addHost(htonl(tmpAddr));
1817       token = strtok(NULL, delimiters);
1818     }
1819   }
1820
1821   fclose(configFile);
1822
1823   if (numHostsInSystem < 1) {
1824     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1825     return -1;
1826   }
1827 #ifdef MAC
1828   myIpAddr = getMyIpAddr("en1");
1829 #else
1830   myIpAddr = getMyIpAddr("eth0");
1831 #endif
1832   myIndexInHostArray = findHost(myIpAddr);
1833   if (myIndexInHostArray == -1) {
1834     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1835     return -1;
1836   }
1837   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1838   oidMin = oidsPerBlock * myIndexInHostArray;
1839   if (myIndexInHostArray == numHostsInSystem - 1)
1840     oidMax = 0xFFFFFFFF;
1841   else
1842     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1843
1844   return 0;
1845 }
1846
1847 void addHost(unsigned int hostIp) {
1848   unsigned int *tmpArray;
1849
1850   if (findHost(hostIp) != -1)
1851     return;
1852
1853   if (numHostsInSystem == sizeOfHostArray) {
1854     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1855     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1856     free(hostIpAddrs);
1857     hostIpAddrs = tmpArray;
1858   }
1859
1860   hostIpAddrs[numHostsInSystem++] = hostIp;
1861
1862   return;
1863 }
1864
1865 int findHost(unsigned int hostIp) {
1866   int i;
1867   for (i = 0; i < numHostsInSystem; i++)
1868     if (hostIpAddrs[i] == hostIp)
1869       return i;
1870
1871   //not found
1872   return -1;
1873 }
1874
1875 /* This function sends notification request per thread waiting on object(s) whose version
1876  * changes */
1877 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1878   int sock,i;
1879   objheader_t *objheader;
1880   struct sockaddr_in remoteAddr;
1881   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1882   char *ptr;
1883   int bytesSent;
1884   int status, size;
1885   unsigned short version;
1886   unsigned int oid,mid;
1887   static unsigned int threadid = 0;
1888   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1889   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1890   notifydata_t *ndata;
1891
1892   oid = oidarry[0];
1893   if((mid = lhashSearch(oid)) == 0) {
1894     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1895     return;
1896   }
1897
1898   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1899     perror("reqNotify():socket()");
1900     return -1;
1901   }
1902
1903   bzero(&remoteAddr, sizeof(remoteAddr));
1904   remoteAddr.sin_family = AF_INET;
1905   remoteAddr.sin_port = htons(LISTEN_PORT);
1906   remoteAddr.sin_addr.s_addr = htonl(mid);
1907
1908   /* Generate unique threadid */
1909   threadid++;
1910
1911   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1912   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1913     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1914     return -1;
1915   }
1916   ndata->numoid = numoid;
1917   ndata->threadid = threadid;
1918   ndata->oidarry = oidarry;
1919   ndata->versionarry = versionarry;
1920   ndata->threadcond = threadcond;
1921   ndata->threadnotify = threadnotify;
1922   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1923     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1924     free(ndata);
1925     return -1;
1926   }
1927
1928   /* Send  number of oids, oidarry, version array, machine id and threadid */
1929   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1930     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1931            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1932     free(ndata);
1933     return -1;
1934   } else {
1935     msg[0] = THREAD_NOTIFY_REQUEST;
1936     *((unsigned int *)(&msg[1])) = numoid;
1937     /* Send array of oids  */
1938     size = sizeof(unsigned int);
1939
1940     for(i = 0;i < numoid; i++) {
1941       oid = oidarry[i];
1942       *((unsigned int *)(&msg[1] + size)) = oid;
1943       size += sizeof(unsigned int);
1944     }
1945
1946     /* Send array of version  */
1947     for(i = 0;i < numoid; i++) {
1948       version = versionarry[i];
1949       *((unsigned short *)(&msg[1] + size)) = version;
1950       size += sizeof(unsigned short);
1951     }
1952
1953     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
1954     *((unsigned int *)(&msg[1] + size)) = threadid;
1955     pthread_mutex_lock(&(ndata->threadnotify));
1956     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1957     send_data(sock, msg, size);
1958     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1959     pthread_mutex_unlock(&(ndata->threadnotify));
1960   }
1961
1962   pthread_cond_destroy(&threadcond);
1963   pthread_mutex_destroy(&threadnotify);
1964   free(ndata);
1965   close(sock);
1966   return status;
1967 }
1968
1969 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1970   notifydata_t *ndata;
1971   int i, objIsFound = 0, index;
1972   void *ptr;
1973
1974   //Look up the tid and call the corresponding pthread_cond_signal
1975   if((ndata = notifyhashSearch(tid)) == NULL) {
1976     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1977     return;
1978   } else  {
1979     for(i = 0; i < ndata->numoid; i++) {
1980       if(ndata->oidarry[i] == oid) {
1981         objIsFound = 1;
1982         index = i;
1983       }
1984     }
1985     if(objIsFound == 0) {
1986       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1987       return;
1988     } else {
1989       if(version <= ndata->versionarry[index]) {
1990         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
1991         return;
1992       } else {
1993 #ifdef CACHE
1994         /* Clear from prefetch cache and free thread related data structure */
1995         if((ptr = prehashSearch(oid)) != NULL) {
1996           prehashRemove(oid);
1997         }
1998 #endif
1999         pthread_mutex_lock(&(ndata->threadnotify));
2000         pthread_cond_signal(&(ndata->threadcond));
2001         pthread_mutex_unlock(&(ndata->threadnotify));
2002       }
2003     }
2004   }
2005   return;
2006 }
2007
2008 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2009   threadlist_t *ptr;
2010   unsigned int mid;
2011   struct sockaddr_in remoteAddr;
2012   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2013   int sock, status, size, bytesSent;
2014
2015   while(*head != NULL) {
2016     ptr = *head;
2017     mid = ptr->mid;
2018     //create a socket connection to that machine
2019     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2020       perror("notifyAll():socket()");
2021       return -1;
2022     }
2023
2024     bzero(&remoteAddr, sizeof(remoteAddr));
2025     remoteAddr.sin_family = AF_INET;
2026     remoteAddr.sin_port = htons(LISTEN_PORT);
2027     remoteAddr.sin_addr.s_addr = htonl(mid);
2028     //send Thread Notify response and threadid to that machine
2029     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2030       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2031              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2032       fflush(stdout);
2033       status = -1;
2034     } else {
2035       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2036       msg[0] = THREAD_NOTIFY_RESPONSE;
2037       *((unsigned int *)&msg[1]) = oid;
2038       size = sizeof(unsigned int);
2039       *((unsigned short *)(&msg[1]+ size)) = version;
2040       size+= sizeof(unsigned short);
2041       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2042
2043       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2044       send_data(sock, msg, size);
2045     }
2046     //close socket
2047     close(sock);
2048     // Update head
2049     *head = ptr->next;
2050     free(ptr);
2051   }
2052   return status;
2053 }
2054
2055 void transAbort() {
2056 #ifdef ABORTREADERS
2057   removetransactionhash();
2058 #endif
2059   objstrDelete(t_cache);
2060   t_chashDelete();
2061 }
2062
2063 /* This function inserts necessary information into
2064  * a machine pile data structure */
2065 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2066   plistnode_t *ptr, *tmp;
2067   int found = 0, offset = 0;
2068
2069   tmp = pile;
2070   //Add oid into a machine that is already present in the pile linked list structure
2071   while(tmp != NULL) {
2072     if (tmp->mid == mid) {
2073       int tmpsize;
2074
2075       if (STATUS(headeraddr) & NEW) {
2076         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2077         tmp->numcreated++;
2078         GETSIZE(tmpsize, headeraddr);
2079         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2080       } else if (STATUS(headeraddr) & DIRTY) {
2081         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2082         tmp->nummod++;
2083         GETSIZE(tmpsize, headeraddr);
2084         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2085       } else {
2086         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2087         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2088         offset += sizeof(unsigned int);
2089         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2090         tmp->numread++;
2091       }
2092       found = 1;
2093       break;
2094     }
2095     tmp = tmp->next;
2096   }
2097   //Add oid for any new machine
2098   if (!found) {
2099     int tmpsize;
2100     if((ptr = pCreate(num_objs)) == NULL) {
2101       return NULL;
2102     }
2103     ptr->mid = mid;
2104     if (STATUS(headeraddr) & NEW) {
2105       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2106       ptr->numcreated++;
2107       GETSIZE(tmpsize, headeraddr);
2108       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2109     } else if (STATUS(headeraddr) & DIRTY) {
2110       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2111       ptr->nummod++;
2112       GETSIZE(tmpsize, headeraddr);
2113       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2114     } else {
2115       *((unsigned int *)ptr->objread)=OID(headeraddr);
2116       offset = sizeof(unsigned int);
2117       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2118       ptr->numread++;
2119     }
2120     ptr->next = pile;
2121     pile = ptr;
2122   }
2123
2124   /* Clear Flags */
2125   STATUS(headeraddr) =0;
2126
2127
2128   return pile;
2129 }
2130
2131 plistnode_t *sortPiles(plistnode_t *pileptr) {
2132   plistnode_t *head, *ptr, *tail;
2133   head = pileptr;
2134   ptr = pileptr;
2135   /* Get tail pointer */
2136   while(ptr!= NULL) {
2137     tail = ptr;
2138     ptr = ptr->next;
2139   }
2140   ptr = pileptr;
2141   plistnode_t *prev = pileptr;
2142   /* Arrange local machine processing at the end of the pile list */
2143   while(ptr != NULL) {
2144     if(ptr != tail) {
2145       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2146         prev->next = ptr->next;
2147         ptr->next = NULL;
2148         tail->next = ptr;
2149         return pileptr;
2150       }
2151       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2152         prev = ptr->next;
2153         ptr->next = NULL;
2154         tail->next = ptr;
2155         return prev;
2156       }
2157       prev = ptr;
2158     }
2159     ptr = ptr->next;
2160   }
2161   return pileptr;
2162 }