implementing sandbox
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "debugmacro.h"
3 #include "ip.h"
4 #include "machinepile.h"
5 #include "altmlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "altprelookup.h"
9 #include "threadnotify.h"
10 #include "queue.h"
11 #include "addUdpEnhance.h"
12 #include "addPrefetchEnhance.h"
13 #include "gCollect.h"
14 #include "dsmlock.h"
15 #include "prefetch.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19 #ifdef ABORTREADERS
20 #include "abortreaders.h"
21 #endif
22 #include "trans.h"
23
24 #define NUM_THREADS 1
25 #define CONFIG_FILENAME "dstm.conf"
26
27 //#define LOGEVENTS //turn on Logging events
28 #ifdef LOGEVENTS
29 char bigarray[16*1024*1024];
30 int bigindex=0;
31 #define LOGEVENT(x) { \
32     int tmp=bigindex++;                         \
33     bigarray[tmp]=x;                            \
34   }
35 #else
36 #define LOGEVENT(x)
37 #endif
38
39 //#define LOGTIMES
40 #ifdef LOGTIMES
41 char bigarray1[6*1024*1024];
42 unsigned int bigarray2[6*1024*1024];
43 unsigned int bigarray3[6*1024*1024];
44 long long bigarray4[6*1024*1024];
45 int bigarray5[6*1024*1024];
46 int bigindex1=0;
47 #define LOGTIME(x,y,z,a,b) {\
48   int tmp=bigindex1; \
49   bigarray1[tmp]=x; \
50   bigarray2[tmp]=y; \
51   bigarray3[tmp]=z; \
52   bigarray4[tmp]=a; \
53   bigarray5[tmp]=b; \
54   bigindex1++; \
55 }
56 #else
57 #define LOGTIME(x,y,z,a,b)
58 #endif
59
60 /* Thread transaction variables */
61
62 __thread objstr_t *t_cache;
63 __thread struct ___Object___ *revertlist;
64 #ifdef ABORTREADERS
65 __thread int t_abort;
66 __thread jmp_buf aborttrans;
67 #endif
68
69 int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
70
71 /* Global Variables */
72 extern int classsize[];
73 pfcstats_t *evalPrefetch;
74 extern int numprefetchsites; //Global variable containing number of prefetch sites
75 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
76 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
77 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
78 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
79 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
80 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
81 extern objstr_t *mainobjstore;
82 unsigned int myIpAddr;
83 unsigned int *hostIpAddrs;
84 int sizeOfHostArray;
85 int numHostsInSystem;
86 int myIndexInHostArray;
87 unsigned int oidsPerBlock;
88 unsigned int oidMin;
89 unsigned int oidMax;
90
91 sockPoolHashTable_t *transReadSockPool;
92 sockPoolHashTable_t *transPrefetchSockPool;
93 sockPoolHashTable_t *transRequestSockPool;
94 pthread_mutex_t notifymutex;
95 pthread_mutex_t atomicObjLock;
96
97 /***********************************
98  * Global Variables for statistics
99  **********************************/
100 int numTransCommit = 0;
101 int numTransAbort = 0;
102 int nchashSearch = 0;
103 int nmhashSearch = 0;
104 int nprehashSearch = 0;
105 int ndirtyCacheObj = 0;
106 int nRemoteSend = 0;
107 int nSoftAbort = 0;
108 int bytesSent = 0;
109 int bytesRecv = 0;
110 int totalObjSize = 0;
111 int sendRemoteReq = 0;
112 int getResponse = 0;
113
114 void printhex(unsigned char *, int);
115 plistnode_t *createPiles();
116 plistnode_t *sortPiles(plistnode_t *pileptr);
117
118
119
120 /*******************************
121 * Send and Recv function calls
122 *******************************/
123 void send_data(int fd, void *buf, int buflen) {
124   char *buffer = (char *)(buf);
125   int size = buflen;
126   int numbytes;
127   while (size > 0) {
128     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
129     bytesSent = bytesSent + numbytes;
130     if (numbytes == -1) {
131       perror("send");
132       exit(0);
133     }
134     buffer += numbytes;
135     size -= numbytes;
136   }
137 }
138
139 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
140   if (buflen+sendbuffer->offset>WMAXBUF) {
141     send_data(fd, sendbuffer->buf, sendbuffer->offset);
142     sendbuffer->offset=0;
143     send_data(fd, buffer, buflen);
144     return;
145   }
146   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
147   sendbuffer->offset+=buflen;
148   if (sendbuffer->offset>WTOP) {
149     send_data(fd, sendbuffer->buf, sendbuffer->offset);
150     sendbuffer->offset=0;
151   }
152 }
153
154 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
155   if (buflen+sendbuffer->offset>WMAXBUF) {
156     send_data(fd, sendbuffer->buf, sendbuffer->offset);
157     sendbuffer->offset=0;
158     send_data(fd, buffer, buflen);
159     return;
160   }
161   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
162   sendbuffer->offset+=buflen;
163   send_data(fd, sendbuffer->buf, sendbuffer->offset);
164   sendbuffer->offset=0;
165 }
166
167 int recvw(int fd, void *buf, int len, int flags) {
168   return recv(fd, buf, len, flags);
169 }
170  
171 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
172   char *buf=(char *)buffer;
173   int numbytes=readbuffer->head-readbuffer->tail;
174   if (numbytes>buflen)
175     numbytes=buflen;
176   if (numbytes>0) {
177     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
178     readbuffer->tail+=numbytes;
179     buflen-=numbytes;
180     buf+=numbytes;
181   }
182   if (buflen==0) {
183     return;
184   }
185   if (buflen>=MAXBUF) {
186     recv_data(fd, buf, buflen);
187     return;
188   }
189   
190   int maxbuf=MAXBUF;
191   int obufflen=buflen;
192   readbuffer->head=0;
193   
194   while (buflen > 0) {
195     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
196     if (numbytes == -1) {
197       perror("recv");
198       exit(0);
199     }
200     bytesRecv+=numbytes;
201     buflen-=numbytes;
202     readbuffer->head+=numbytes;
203     maxbuf-=numbytes;
204   }
205   memcpy(buf,readbuffer->buf,obufflen);
206   readbuffer->tail=obufflen;
207 }
208
209 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
210   char *buf=(char *)buffer;
211   //now tail<=head
212   int numbytes=readbuffer->head-readbuffer->tail;
213   if (numbytes>buflen)
214     numbytes=buflen;
215   if (numbytes>0) {
216     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
217     readbuffer->tail+=numbytes;
218     buflen-=numbytes;
219     buf+=numbytes;
220   }
221   if (buflen==0)
222     return 1;
223
224   if (buflen>=MAXBUF) {
225     return recv_data_errorcode(fd, buf, buflen);
226   }
227
228   int maxbuf=MAXBUF;
229   int obufflen=buflen;
230   readbuffer->head=0;
231   
232   while (buflen > 0) {
233     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
234     if (numbytes ==0) {
235       return 0;
236     }
237     if (numbytes==-1) {
238       perror("recvbuf");
239       return -1;
240     }
241     bytesRecv+=numbytes;
242     buflen-=numbytes;
243     readbuffer->head+=numbytes;
244     maxbuf-=numbytes;
245   }
246   memcpy(buf,readbuffer->buf,obufflen);
247   readbuffer->tail=obufflen;
248   return 1;
249 }
250
251
252 void recv_data(int fd, void *buf, int buflen) {
253   char *buffer = (char *)(buf);
254   int size = buflen;
255   int numbytes;
256   while (size > 0) {
257     numbytes = recvw(fd, buffer, size, 0);
258     bytesRecv = bytesRecv + numbytes;
259     if (numbytes == -1) {
260       perror("recv");
261       exit(0);
262     }
263     buffer += numbytes;
264     size -= numbytes;
265   }
266 }
267
268 int recv_data_errorcode(int fd, void *buf, int buflen) {
269   char *buffer = (char *)(buf);
270   int size = buflen;
271   int numbytes;
272   while (size > 0) {
273     numbytes = recvw(fd, buffer, size, 0);
274     if (numbytes==0)
275       return 0;
276     if (numbytes == -1) {
277       perror("recv");
278       return -1;
279     }
280     bytesRecv+=numbytes;
281     buffer += numbytes;
282     size -= numbytes;
283   }
284   return 1;
285 }
286
287 void printhex(unsigned char *ptr, int numBytes) {
288   int i;
289   for (i = 0; i < numBytes; i++) {
290     if (ptr[i] < 16)
291       printf("0%x ", ptr[i]);
292     else
293       printf("%x ", ptr[i]);
294   }
295   printf("\n");
296   return;
297 }
298
299 inline int arrayLength(int *array) {
300   int i;
301   for(i=0 ; array[i] != -1; i++)
302     ;
303   return i;
304 }
305
306 inline int findmax(int *array, int arraylength) {
307   int max, i;
308   max = array[0];
309   for(i = 0; i < arraylength; i++) {
310     if(array[i] > max) {
311       max = array[i];
312     }
313   }
314   return max;
315 }
316
317 //#define INLINEPREFETCH
318 #define PREFTHRESHOLD 0
319
320 /* This function is a prefetch call generated by the compiler that
321  * populates the shared primary prefetch queue*/
322 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
323   /* Allocate for the queue node*/
324   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
325   int len;
326 #ifdef INLINEPREFETCH
327   int attempted=0;
328   char *node;
329   do {
330   node=getmemory(qnodesize);
331   if (node==NULL&&attempted)
332     break;
333   if (node!=NULL) {
334 #else
335   char *node=getmemory(qnodesize);
336 #endif
337   int top=endoffsets[ntuples-1];
338
339   if (node==NULL) {
340     LOGEVENT('D');
341     return;
342   }
343   /* Set queue node values */
344
345   /* TODO: Remove this after testing */
346   evalPrefetch[siteid].callcount++;
347
348   *((int *)(node))=siteid;
349   *((int *)(node + sizeof(int))) = ntuples;
350   len = 2*sizeof(int);
351   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
352   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
353   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
354
355 #ifdef INLINEPREFETCH
356   movehead(qnodesize);
357   }
358   int numpref=numavailable();
359   attempted=1;
360
361   if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
362     node=gettail();
363     prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
364     if (pilehead!=NULL) {
365       // Get sock from shared pool
366       
367       /* Send  Prefetch Request */
368       prefetchpile_t *ptr = pilehead;
369       while(ptr != NULL) {
370         globalid++;
371         int sd = getSock2(transPrefetchSockPool, ptr->mid);
372         sendPrefetchReq(ptr, sd, globalid);
373         ptr = ptr->next;
374       }
375       
376       mcdealloc(pilehead);
377     }
378     resetqueue();
379   }//end do prefetch if condition
380   } while(node==NULL);
381 #else
382   /* Lock and insert into primary prefetch queue */
383   movehead(qnodesize);
384 #endif
385 }
386
387 /* This function starts up the transaction runtime. */
388 int dstmStartup(const char * option) {
389   pthread_t thread_Listen, udp_thread_Listen;
390   pthread_attr_t attr;
391   int master=option!=NULL && strcmp(option, "master")==0;
392   int fd;
393   int udpfd;
394
395   if (processConfigFile() != 0)
396     return 0; //TODO: return error value, cause main program to exit
397 #ifdef COMPILER
398   if (!master)
399     threadcount--;
400 #endif
401
402 #ifdef TRANSSTATS
403   printf("Trans stats is on\n");
404   fflush(stdout);
405 #endif
406 #ifdef ABORTREADERS
407   initreaderlist();
408 #endif
409
410   //Initialize socket pool
411   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
412   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
413   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
414
415   dstmInit();
416   transInit();
417
418   fd=startlistening();
419   pthread_attr_init(&attr);
420   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
421 #ifdef CACHE
422   udpfd = udpInit();
423   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
424 #endif
425   if (master) {
426     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
427     return 1;
428   } else {
429     dstmListen((void *)fd);
430     return 0;
431   }
432 }
433
434 //TODO Use this later
435 void *pCacheAlloc(objstr_t *store, unsigned int size) {
436   void *tmp;
437   objstr_t *ptr;
438   ptr = store;
439   int success = 0;
440
441   while(ptr->next != NULL) {
442     /* check if store is empty */
443     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
444       tmp = ptr->top;
445       ptr->top += size;
446       success = 1;
447       return tmp;
448     } else {
449       ptr = ptr->next;
450     }
451   }
452
453   if(success == 0) {
454     return NULL;
455   }
456 }
457
458 /* This function initiates the prefetch thread A queue is shared
459  * between the main thread of execution and the prefetch thread to
460  * process the prefetch call Call from compiler populates the shared
461  * queue with prefetch requests while prefetch thread processes the
462  * prefetch requests */
463
464 void transInit() {
465   //Create and initialize prefetch cache structure
466 #ifdef CACHE
467   initializePCache();
468   if((evalPrefetch = initPrefetchStats()) == NULL) {
469     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
470     exit(0);
471   }
472 #endif
473
474   /* Initialize attributes for mutex */
475   pthread_mutexattr_init(&prefetchcache_mutex_attr);
476   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
477
478   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
479   pthread_mutex_init(&notifymutex, NULL);
480   pthread_mutex_init(&atomicObjLock, NULL);
481 #ifdef CACHE
482   //Create prefetch cache lookup table
483   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
484     printf("ERROR\n");
485     return; //Failure
486   }
487
488   //Initialize primary shared queue
489   queueInit();
490   //Initialize machine pile w/prefetch oids and offsets shared queue
491   mcpileqInit();
492
493   //Create the primary prefetch thread
494   int retval;
495 #ifdef RANGEPREFETCH
496   do {
497     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
498   } while(retval!=0);
499 #else
500 #ifndef INLINEPREFETCH
501   do {
502     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
503   } while(retval!=0);
504 #endif
505 #endif
506 #ifndef INLINEPREFETCH
507   pthread_detach(tPrefetch);
508 #endif
509 #endif
510 }
511
512 /* This function stops the threads spawned */
513 void transExit() {
514 #ifdef CACHE
515   int t;
516   pthread_cancel(tPrefetch);
517   for(t = 0; t < NUM_THREADS; t++)
518     pthread_cancel(wthreads[t]);
519 #endif
520
521   return;
522 }
523
524 /* This functions inserts randowm wait delays in the order of msec
525  * Mostly used when transaction commits retry*/
526 void randomdelay() {
527   struct timespec req;
528   time_t t;
529
530   t = time(NULL);
531   req.tv_sec = 0;
532   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
533   nanosleep(&req, NULL);
534   return;
535 }
536
537 /* This function initializes things required in the transaction start*/
538 void transStart() {
539   t_cache = objstrCreate(1048576);
540   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
541   revertlist=NULL;
542 #ifdef ABORTREADERS
543   t_abort=0;
544 #endif
545 }
546
547 // Search for an address for a given oid                                                                               
548 /*#define INLINE    inline __attribute__((always_inline))
549
550 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
551   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
552   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
553
554   do {
555     if(node->key == key) {
556       return node->val;
557     }
558     node = node->next;
559   } while(node != NULL);
560
561   return NULL;
562   }*/
563
564
565
566
567 /* This function finds the location of the objects involved in a transaction
568  * and returns the pointer to the object if found in a remote location */
569 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
570   unsigned int machinenumber;
571   objheader_t *tmp, *objheader;
572   objheader_t *objcopy;
573   int size;
574   void *buf;
575   chashlistnode_t *node;
576
577   if(oid == 0) {
578     return NULL;
579   }
580   
581
582   node= &c_table[(oid & c_mask)>>1];
583   do {
584     if(node->key == oid) {
585 #ifdef TRANSSTATS
586     nchashSearch++;
587 #endif
588 #ifdef COMPILER
589     return &((objheader_t*)node->val)[1];
590 #else
591     return node->val;
592 #endif
593     }
594     node = node->next;
595   } while(node != NULL);
596   
597
598   /*  
599   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
600 #ifdef TRANSSTATS
601     nchashSearch++;
602 #endif
603 #ifdef COMPILER
604     return &objheader[1];
605 #else
606     return objheader;
607 #endif
608   } else 
609   */
610
611 #ifdef ABORTREADERS
612   if (t_abort) {
613     //abort this transaction
614     removetransactionhash();
615     objstrDelete(t_cache);
616     t_chashDelete();
617     _longjmp(aborttrans,1);
618   } else
619     addtransaction(oid);
620 #endif
621
622   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
623 #ifdef TRANSSTATS
624     nmhashSearch++;
625 #endif
626     /* Look up in machine lookup table  and copy  into cache*/
627     GETSIZE(size, objheader);
628     size += sizeof(objheader_t);
629     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
630     memcpy(objcopy, objheader, size);
631     /* Insert into cache's lookup table */
632     STATUS(objcopy)=0;
633     t_chashInsert(OID(objheader), objcopy);
634 #ifdef COMPILER
635     return &objcopy[1];
636 #else
637     return objcopy;
638 #endif
639   } else {
640 #ifdef CACHE
641     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
642       if(STATUS(tmp) & DIRTY) {
643 #ifdef TRANSSTATS
644         ndirtyCacheObj++;
645 #endif
646         goto remoteread;
647       }
648 #ifdef TRANSSTATS
649       nprehashSearch++;
650 #endif
651       /* Look up in prefetch cache */
652       GETSIZE(size, tmp);
653       size+=sizeof(objheader_t);
654       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
655       memcpy(objcopy, tmp, size);
656       /* Insert into cache's lookup table */
657       t_chashInsert(OID(tmp), objcopy);
658 #ifdef COMPILER
659       return &objcopy[1];
660 #else
661       return objcopy;
662 #endif
663     }
664 remoteread:
665 #endif
666     /* Get the object from the remote location */
667     if((machinenumber = lhashSearch(oid)) == 0) {
668       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
669       return NULL;
670     }
671     objcopy = getRemoteObj(machinenumber, oid);
672
673     if(objcopy == NULL) {
674       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
675       return NULL;
676     } else {
677 #ifdef TRANSSTATS
678       nRemoteSend++;
679 #endif
680 #ifdef COMPILER
681 #ifdef CACHE
682       //Copy object to prefetch cache
683       pthread_mutex_lock(&prefetchcache_mutex);
684       objheader_t *headerObj;
685       int size;
686       GETSIZE(size, objcopy);
687       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
688         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
689             __FILE__, __LINE__);
690         pthread_mutex_unlock(&prefetchcache_mutex);
691         return NULL;
692       }
693       pthread_mutex_unlock(&prefetchcache_mutex);
694       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
695       //make an entry in prefetch lookup hashtable
696       prehashInsert(oid, headerObj);
697       LOGEVENT('B');
698 #endif
699       return &objcopy[1];
700 #else
701       return objcopy;
702 #endif
703     }
704   }
705 }
706
707
708 /* This function finds the location of the objects involved in a transaction
709  * and returns the pointer to the object if found in a remote location */
710 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
711 //DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
712   unsigned int machinenumber;
713   objheader_t *tmp, *objheader;
714   objheader_t *objcopy;
715   int size;
716
717 #ifdef ABORTREADERS
718   if (t_abort) {
719     //abort this transaction
720     removetransactionhash();
721     objstrDelete(t_cache);
722     t_chashDelete();
723     _longjmp(aborttrans,1);
724   } else
725     addtransaction(oid);
726 #endif
727
728   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
729 #ifdef TRANSSTATS
730     nmhashSearch++;
731 #endif
732     /* Look up in machine lookup table  and copy  into cache*/
733     GETSIZE(size, objheader);
734     size += sizeof(objheader_t);
735     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
736     memcpy(objcopy, objheader, size);
737     /* Insert into cache's lookup table */
738     STATUS(objcopy)=0;
739     t_chashInsert(OID(objheader), objcopy);
740 #ifdef COMPILER
741     return &objcopy[1];
742 #else
743     return objcopy;
744 #endif
745   } else {
746 #ifdef CACHE
747     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
748       if(STATUS(tmp) & DIRTY) {
749 #ifdef TRANSSTATS
750         ndirtyCacheObj++;
751 #endif
752         goto remoteread;
753       }
754 #ifdef TRANSSTATS
755       LOGEVENT('P')
756       nprehashSearch++;
757 #endif
758       /* Look up in prefetch cache */
759       GETSIZE(size, tmp);
760       size+=sizeof(objheader_t);
761       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
762       memcpy(objcopy, tmp, size);
763       LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
764       /* Insert into cache's lookup table */
765       t_chashInsert(OID(tmp), objcopy);
766 #ifdef COMPILER
767       return &objcopy[1];
768 #else
769       return objcopy;
770 #endif
771     }
772 remoteread:
773 #endif
774     /* Get the object from the remote location */
775     if((machinenumber = lhashSearch(oid)) == 0) {
776       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
777       return NULL;
778     }
779     objcopy = getRemoteObj(machinenumber, oid);
780 #ifdef TRANSSTATS
781       LOGEVENT('R');
782       nRemoteSend++;
783 #endif
784
785     if(objcopy == NULL) {
786       printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
787       return NULL;
788     } else {
789 #ifdef COMPILER
790 #ifdef CACHE
791       LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
792       LOGTIME('r', oid, TYPE(objcopy),myrdtsc(),0);
793       //Copy object to prefetch cache
794       pthread_mutex_lock(&prefetchcache_mutex);
795       objheader_t *headerObj;
796       int size;
797       GETSIZE(size, objcopy);
798       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
799         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
800             __FILE__, __LINE__);
801         pthread_mutex_unlock(&prefetchcache_mutex);
802         return NULL;
803       }
804       pthread_mutex_unlock(&prefetchcache_mutex);
805       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
806       //make an entry in prefetch lookup hashtable
807       prehashInsert(oid, headerObj);
808       LOGEVENT('B');
809 #endif
810       return &objcopy[1];
811 #else
812       return objcopy;
813 #endif
814     }
815   }
816 }
817
818 /* This function creates objects in the transaction record */
819 objheader_t *transCreateObj(unsigned int size) {
820   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
821   OID(tmp) = getNewOID();
822   tmp->version = 1;
823   tmp->rcount = 1;
824   STATUS(tmp) = NEW;
825   t_chashInsert(OID(tmp), tmp);
826
827 #ifdef COMPILER
828   return &tmp[1]; //want space after object header
829 #else
830   return tmp;
831 #endif
832 }
833
834 #if 1
835 /* This function creates machine piles based on all machines involved in a
836  * transaction commit request */
837 plistnode_t *createPiles() {
838   int i;
839   plistnode_t *pile = NULL;
840   unsigned int machinenum;
841   objheader_t *headeraddr;
842   chashlistnode_t * ptr = c_table;
843   /* Represents number of bins in the chash table */
844   unsigned int size = c_size;
845
846   for(i = 0; i < size ; i++) {
847     chashlistnode_t * curr = &ptr[i];
848     /* Inner loop to traverse the linked list of the cache lookupTable */
849     while(curr != NULL) {
850       //if the first bin in hash table is empty
851       if(curr->key == 0)
852         break;
853       headeraddr=(objheader_t *) curr->val;
854
855       //Get machine location for object id (and whether local or not)
856       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
857         machinenum = myIpAddr;
858       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
859         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
860         return NULL;
861       }
862
863       //Make machine groups
864       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
865       curr = curr->next;
866     }
867   }
868   return pile;
869 }
870 #else
871 /* This function creates machine piles based on all machines involved in a
872  * transaction commit request */
873 plistnode_t *createPiles() {
874   int i;
875   plistnode_t *pile = NULL;
876   unsigned int machinenum;
877   objheader_t *headeraddr;
878   struct chashentry * ptr = c_table;
879   /* Represents number of bins in the chash table */
880   unsigned int size = c_size;
881
882   for(i = 0; i < size ; i++) {
883     struct chashentry * curr = & ptr[i];
884     /* Inner loop to traverse the linked list of the cache lookupTable */
885     //if the first bin in hash table is empty
886     if(curr->key == 0)
887       continue;
888     headeraddr=(objheader_t *) curr->ptr;
889
890     //Get machine location for object id (and whether local or not)
891     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
892       machinenum = myIpAddr;
893     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
894       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
895       return NULL;
896     }
897
898     //Make machine groups
899     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
900   }
901   return pile;
902 }
903 #endif
904
905 /* This function initiates the transaction commit process
906  * Spawns threads for each of the new connections with Participants
907  * and creates new piles by calling the createPiles(),
908  * Sends a transrequest() to each remote machines for objects found remotely
909  * and calls handleLocalReq() to process objects found locally */
910 int transCommit() {
911   unsigned int tot_bytes_mod, *listmid;
912   plistnode_t *pile, *pile_ptr;
913   char treplyretry; /* keeps track of the common response that needs to be sent */
914   int firsttime=1;
915   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
916   char finalResponse;
917
918 #ifdef LOGEVENTS
919   int iii;
920   for(iii=0;iii<bigindex;iii++) {
921     printf("%c", bigarray[iii]);
922   }
923 #endif
924
925 #ifdef LOGTIMES
926   int jjj;
927   for(jjj=0; jjj<bigindex1; jjj++) {
928     printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
929   }
930 #endif
931
932 #ifdef ABORTREADERS
933   if (t_abort) {
934     //abort this transaction
935     removetransactionhash();
936     objstrDelete(t_cache);
937     t_chashDelete();
938     return 1;
939   }
940 #endif
941
942
943   do {
944     treplyretry = 0;
945
946     /* Look through all the objects in the transaction record and make piles
947      * for each machine involved in the transaction*/
948     if (firsttime) {
949       pile_ptr = pile = createPiles();
950       pile_ptr = pile = sortPiles(pile);
951     } else {
952       pile = pile_ptr;
953     }
954     firsttime = 0;
955     /* Create the packet to be sent in TRANS_REQUEST */
956
957     /* Count the number of participants */
958     int pilecount;
959     pilecount = pCount(pile);
960
961     /* Create a list of machine ids(Participants) involved in transaction   */
962     listmid = calloc(pilecount, sizeof(unsigned int));
963     pListMid(pile, listmid);
964
965     /* Create a socket and getReplyCtrl array, initialize */
966     int socklist[pilecount];
967     char getReplyCtrl[pilecount];
968     int loopcount;
969     for(loopcount = 0 ; loopcount < pilecount; loopcount++){
970       socklist[loopcount] = 0;
971       getReplyCtrl[loopcount] = 0;
972     }
973
974     /* Process each machine pile */
975     int sockindex = 0;
976     trans_req_data_t *tosend;
977     tosend = calloc(pilecount, sizeof(trans_req_data_t));
978     while(pile != NULL) {
979       tosend[sockindex].f.control = TRANS_REQUEST;
980       tosend[sockindex].f.mcount = pilecount;
981       tosend[sockindex].f.numread = pile->numread;
982       tosend[sockindex].f.nummod = pile->nummod;
983       tosend[sockindex].f.numcreated = pile->numcreated;
984       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
985       tosend[sockindex].listmid = listmid;
986       tosend[sockindex].objread = pile->objread;
987       tosend[sockindex].oidmod = pile->oidmod;
988       tosend[sockindex].oidcreated = pile->oidcreated;
989       int sd = 0;
990       if(pile->mid != myIpAddr) {
991         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
992           printf("transRequest(): socket create error\n");
993           free(listmid);
994           free(tosend);
995           return 1;
996         }
997         socklist[sockindex] = sd;
998         /* Send bytes of data with TRANS_REQUEST control message */
999         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
1000
1001         /* Send list of machines involved in the transaction */
1002         {
1003           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
1004           send_data(sd, tosend[sockindex].listmid, size);
1005         }
1006
1007         /* Send oids and version number tuples for objects that are read */
1008         {
1009           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
1010           send_data(sd, tosend[sockindex].objread, size);
1011         }
1012
1013         /* Send objects that are modified */
1014         void *modptr;
1015         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
1016           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
1017           free(listmid);
1018           free(tosend);
1019           return 1;
1020         }
1021         int offset = 0;
1022         int i;
1023         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
1024           int size;
1025           objheader_t *headeraddr;
1026           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
1027             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
1028             free(modptr);
1029             free(listmid);
1030             free(tosend);
1031             return 1;
1032           }
1033           GETSIZE(size,headeraddr);
1034           size+=sizeof(objheader_t);
1035           memcpy(modptr+offset, headeraddr, size);
1036           offset+=size;
1037         }
1038         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1039         free(modptr);
1040       } else { //handle request locally
1041         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1042       }
1043       sockindex++;
1044       pile = pile->next;
1045     } //end of pile processing
1046       /* Recv Ctrl msgs from all machines */
1047     int i;
1048     for(i = 0; i < pilecount; i++) {
1049       int sd = socklist[i];
1050       if(sd != 0) {
1051         char control;
1052         recv_data(sd, &control, sizeof(char));
1053         //Update common data structure with new ctrl msg
1054         getReplyCtrl[i] = control;
1055         /* Recv Objects if participant sends TRANS_DISAGREE */
1056 #ifdef CACHE
1057         if(control == TRANS_DISAGREE) {
1058           int length;
1059           recv_data(sd, &length, sizeof(int));
1060           void *newAddr;
1061           pthread_mutex_lock(&prefetchcache_mutex);
1062           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1063             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1064             free(tosend);
1065             free(listmid);
1066             pthread_mutex_unlock(&prefetchcache_mutex);
1067             return 1;
1068           }
1069           pthread_mutex_unlock(&prefetchcache_mutex);
1070           recv_data(sd, newAddr, length);
1071           int offset = 0;
1072           while(length != 0) {
1073             unsigned int oidToPrefetch;
1074             objheader_t * header;
1075             header = (objheader_t *)(((char *)newAddr) + offset);
1076             oidToPrefetch = OID(header);
1077             STATUS(header)=0;
1078             int size = 0;
1079             GETSIZE(size, header);
1080             size += sizeof(objheader_t);
1081             //make an entry in prefetch hash table
1082         prehashInsert(oidToPrefetch, header);
1083         LOGEVENT('E');
1084             length = length - size;
1085             offset += size;
1086           }
1087         } //end of receiving objs
1088 #endif
1089       }
1090     }
1091     /* Decide the final response */
1092     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1093       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1094       free(tosend);
1095       free(listmid);
1096       return 1;
1097     }
1098
1099     /* Send responses to all machines */
1100     for(i = 0; i < pilecount; i++) {
1101       int sd = socklist[i];
1102       if(sd != 0) {
1103 #ifdef CACHE
1104         if(finalResponse == TRANS_COMMIT) {
1105           int retval;
1106           /* Update prefetch cache */
1107           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1108             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1109             free(tosend);
1110             free(listmid);
1111             return 1;
1112           }
1113
1114
1115           /* Invalidate objects in other machine cache */
1116           if(tosend[i].f.nummod > 0) {
1117             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1118               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1119               free(tosend);
1120               free(listmid);
1121               return 1;
1122             }
1123           }
1124 #ifdef ABORTREADERS
1125           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1126           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1127 #endif
1128         }
1129 #ifdef ABORTREADERS
1130         else if (!treplyretry) {
1131           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1132           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1133         }
1134 #endif
1135 #endif
1136         send_data(sd, &finalResponse, sizeof(char));
1137       } else {
1138         /* Complete local processing */
1139         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1140 #ifdef ABORTREADERS
1141         if(finalResponse == TRANS_COMMIT) {
1142           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1143           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1144         } else if (!treplyretry) {
1145           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1146           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1147         }
1148 #endif
1149       }
1150     }
1151
1152     /* Free resources */
1153     free(tosend);
1154     free(listmid);
1155     if (!treplyretry)
1156       pDelete(pile_ptr);
1157     /* wait a random amount of time before retrying to commit transaction*/
1158     if(treplyretry) {
1159       randomdelay();
1160 #ifdef TRANSSTATS
1161       nSoftAbort++;
1162 #endif
1163     }
1164     /* Retry trans commit procedure during soft_abort case */
1165   } while (treplyretry);
1166
1167   if(finalResponse == TRANS_ABORT) {
1168 #ifdef TRANSSTATS
1169     LOGEVENT('A');
1170     numTransAbort++;
1171 #endif
1172     /* Free Resources */
1173     objstrDelete(t_cache);
1174     t_chashDelete();
1175     return TRANS_ABORT;
1176   } else if(finalResponse == TRANS_COMMIT) {
1177 #ifdef TRANSSTATS
1178     LOGEVENT('C');
1179     numTransCommit++;
1180 #endif
1181     /* Free Resources */
1182     objstrDelete(t_cache);
1183     t_chashDelete();
1184     return 0;
1185   } else {
1186     //TODO Add other cases
1187     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1188     exit(-1);
1189   }
1190   return 0;
1191 }
1192
1193 /* This function handles the local objects involved in a transaction
1194  * commiting process.  It also makes a decision if this local machine
1195  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1196 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1197   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1198   int numoidnotfound = 0, numoidlocked = 0;
1199   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1200   int numread, i;
1201   unsigned int oid;
1202   unsigned short version;
1203
1204   /* Counters and arrays to formulate decision on control message to be sent */
1205   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1206   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1207   //setting a divider between read and write locks
1208   numread = tdata->f.numread;
1209   /* Process each oid in the machine pile/ group per thread */
1210   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1211     if (i < tdata->f.numread) {
1212       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1213       incr *= i;
1214       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1215       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1216       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1217     } else { // Objects Modified
1218       if(i == tdata->f.numread) {
1219         oidlocked[numoidlocked++] = -1;
1220       }
1221       int tmpsize;
1222       objheader_t *headptr;
1223       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1224       if (headptr == NULL) {
1225         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1226         return;
1227       }
1228       oid = OID(headptr);
1229       version = headptr->version;
1230       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1231     }
1232   }
1233
1234 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1235  * if Participant receives a TRANS_COMMIT */
1236   transinfo->objlocked = oidlocked;
1237   transinfo->objnotfound = oidnotfound;
1238   transinfo->modptr = NULL;
1239   transinfo->numlocked = numoidlocked;
1240   transinfo->numnotfound = numoidnotfound;
1241
1242   /* Condition to send TRANS_AGREE */
1243   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1244     *getReplyCtrl = TRANS_AGREE;
1245   }
1246   /* Condition to send TRANS_SOFT_ABORT */
1247   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1248     *getReplyCtrl = TRANS_SOFT_ABORT;
1249   }
1250 }
1251
1252 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1253   if(finalResponse == TRANS_ABORT) {
1254     if(transAbortProcess(transinfo) != 0) {
1255       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1256       fflush(stdout);
1257       return;
1258     }
1259   } else if(finalResponse == TRANS_COMMIT) {
1260 #ifdef CACHE
1261     /* Invalidate objects in other machine cache */
1262     if(tdata->f.nummod > 0) {
1263       int retval;
1264       if((retval = invalidateObj(tdata)) != 0) {
1265         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1266         return;
1267       }
1268     }
1269 #endif
1270     if(transComProcess(tdata, transinfo) != 0) {
1271       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1272       fflush(stdout);
1273       return;
1274     }
1275   } else {
1276     printf("ERROR...No Decision\n");
1277   }
1278
1279   /* Free memory */
1280   if (transinfo->objlocked != NULL) {
1281     free(transinfo->objlocked);
1282   }
1283   if (transinfo->objnotfound != NULL) {
1284     free(transinfo->objnotfound);
1285   }
1286 }
1287
1288 /* This function decides the reponse that needs to be sent to
1289  * all Participant machines after the TRANS_REQUEST protocol */
1290 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1291   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1292                                                                    message to send */
1293   for (i = 0 ; i < pilecount; i++) {
1294     char control;
1295     control = getReplyCtrl[i];
1296     switch(control) {
1297     default:
1298       printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
1299
1300       /* treat as disagree, pass thru */
1301     case TRANS_DISAGREE:
1302       transdisagree++;
1303       break;
1304
1305     case TRANS_AGREE:
1306       transagree++;
1307       break;
1308
1309     case TRANS_SOFT_ABORT:
1310       transsoftabort++;
1311       break;
1312     }
1313   }
1314
1315   if(transdisagree > 0) {
1316     /* Send Abort */
1317     *treplyretry = 0;
1318     return TRANS_ABORT;
1319 #ifdef CACHE
1320     /* clear objects from prefetch cache */
1321     //cleanPCache();
1322 #endif
1323   } else if(transagree == pilecount) {
1324     /* Send Commit */
1325     *treplyretry = 0;
1326     return TRANS_COMMIT;
1327   } else {
1328     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1329     *treplyretry = 1;
1330     return TRANS_ABORT;
1331   }
1332   return 0;
1333 }
1334
1335 /* This function opens a connection, places an object read request to
1336  * the remote machine, reads the control message and object if
1337  * available and copies the object and its header to the local
1338  * cache. */
1339
1340 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1341   int size, val;
1342   struct sockaddr_in serv_addr;
1343   char machineip[16];
1344   char control;
1345   objheader_t *h;
1346   void *objcopy = NULL;
1347
1348   int sd = getSock2(transReadSockPool, mnum);
1349   char readrequest[sizeof(char)+sizeof(unsigned int)];
1350   readrequest[0] = READ_REQUEST;
1351   *((unsigned int *)(&readrequest[1])) = oid;
1352   send_data(sd, readrequest, sizeof(readrequest));
1353
1354   /* Read response from the Participant */
1355   recv_data(sd, &control, sizeof(char));
1356
1357   if (control==OBJECT_NOT_FOUND) {
1358     objcopy = NULL;
1359   } else {
1360     /* Read object if found into local cache */
1361     recv_data(sd, &size, sizeof(int));
1362     objcopy = objstrAlloc(&t_cache, size);
1363     recv_data(sd, objcopy, size);
1364     STATUS(objcopy)=0;
1365     /* Insert into cache's lookup table */
1366     t_chashInsert(oid, objcopy);
1367 #ifdef TRANSSTATS
1368     totalObjSize += size;
1369 #endif
1370   }
1371
1372   return objcopy;
1373 }
1374
1375 /*  Commit info for objects modified */
1376 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1377                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1378   void *mobj;
1379   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1380   /* Save the oids not found and number of oids not found for later use */
1381   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1382     /* Save the oids not found and number of oids not found for later use */
1383     oidnotfound[*numoidnotfound] = oid;
1384     (*numoidnotfound)++;
1385   } else { /* If Obj found in machine (i.e. has not moved) */
1386     /* Check if Obj is locked by any previous transaction */
1387     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1388       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1389         (*v_matchnolock)++;
1390         //Keep track of what is locked
1391         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1392       } else { /* If versions don't match ...HARD ABORT */
1393         (*v_nomatch)++;
1394         /* Send TRANS_DISAGREE to Coordinator */
1395         *getReplyCtrl = TRANS_DISAGREE;
1396
1397         //Keep track of what is locked
1398         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1399         return;
1400       }
1401     } else { //A lock is acquired some place else
1402       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1403         (*v_matchlock)++;
1404       } else { /* If versions don't match ...HARD ABORT */
1405         (*v_nomatch)++;
1406         /* Send TRANS_DISAGREE to Coordinator */
1407         *getReplyCtrl = TRANS_DISAGREE;
1408         return;
1409       }
1410     }
1411   }
1412 }
1413
1414 /*  Commit info for objects modified */
1415 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1416                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1417   void *mobj;
1418   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1419   /* Save the oids not found and number of oids not found for later use */
1420   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1421     /* Save the oids not found and number of oids not found for later use */
1422     oidnotfound[*numoidnotfound] = oid;
1423     (*numoidnotfound)++;
1424   } else { /* If Obj found in machine (i.e. has not moved) */
1425     /* Check if Obj is locked by any previous transaction */
1426     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1427       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1428         (*v_matchnolock)++;
1429         //Keep track of what is locked
1430         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1431       } else { /* If versions don't match ...HARD ABORT */
1432         (*v_nomatch)++;
1433         /* Send TRANS_DISAGREE to Coordinator */
1434         *getReplyCtrl = TRANS_DISAGREE;
1435         //Keep track of what is locked
1436         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1437         return;
1438       }
1439     } else { //Has reached max number of readers or some other transaction
1440       //has acquired a lock on this object
1441       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1442         (*v_matchlock)++;
1443       } else { /* If versions don't match ...HARD ABORT */
1444         (*v_nomatch)++;
1445         /* Send TRANS_DISAGREE to Coordinator */
1446         *getReplyCtrl = TRANS_DISAGREE;
1447         return;
1448       }
1449     }
1450   }
1451 }
1452
1453 /* This function completes the ABORT process if the transaction is aborting */
1454 int transAbortProcess(trans_commit_data_t *transinfo) {
1455   int i, numlocked;
1456   unsigned int *objlocked;
1457   void *header;
1458
1459   numlocked = transinfo->numlocked;
1460   objlocked = transinfo->objlocked;
1461
1462   int useWriteUnlock = 0;
1463   for (i = 0; i < numlocked; i++) {
1464     if(objlocked[i] == -1) {
1465       useWriteUnlock = 1;
1466       continue;
1467     }
1468     if((header = mhashSearch(objlocked[i])) == NULL) {
1469       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1470       return 1;
1471     }
1472     if(!useWriteUnlock) {
1473       read_unlock(STATUSPTR(header));
1474     } else {
1475       write_unlock(STATUSPTR(header));
1476     }
1477   }
1478
1479   return 0;
1480 }
1481
1482 /*This function completes the COMMIT process if the transaction is commiting*/
1483 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1484   objheader_t *header, *tcptr;
1485   int i, nummod, tmpsize, numcreated, numlocked;
1486   unsigned int *oidmod, *oidcreated, *oidlocked;
1487   void *ptrcreate;
1488
1489   nummod = tdata->f.nummod;
1490   oidmod = tdata->oidmod;
1491   numcreated = tdata->f.numcreated;
1492   oidcreated = tdata->oidcreated;
1493   numlocked = transinfo->numlocked;
1494   oidlocked = transinfo->objlocked;
1495
1496   for (i = 0; i < nummod; i++) {
1497     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1498       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1499       return 1;
1500     }
1501     /* Copy from transaction cache -> main object store */
1502     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1503       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1504       return 1;
1505     }
1506     GETSIZE(tmpsize, header);
1507     char *tmptcptr = (char *) tcptr;
1508     {
1509       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1510       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1511       dst->___cachedCode___=src->___cachedCode___;
1512       dst->___cachedHash___=src->___cachedHash___;
1513
1514       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1515     }
1516
1517     header->version += 1;
1518     if(header->notifylist != NULL) {
1519       notifyAll(&header->notifylist, OID(header), header->version);
1520     }
1521   }
1522   /* If object is newly created inside transaction then commit it */
1523   for (i = 0; i < numcreated; i++) {
1524     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1525       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1526       return 1;
1527     }
1528     GETSIZE(tmpsize, header);
1529     tmpsize += sizeof(objheader_t);
1530     pthread_mutex_lock(&mainobjstore_mutex);
1531     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1532       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1533       pthread_mutex_unlock(&mainobjstore_mutex);
1534       return 1;
1535     }
1536     pthread_mutex_unlock(&mainobjstore_mutex);
1537     /* Initialize read and write locks */
1538     initdsmlocks(STATUSPTR(header));
1539     memcpy(ptrcreate, header, tmpsize);
1540     mhashInsert(oidcreated[i], ptrcreate);
1541     lhashInsert(oidcreated[i], myIpAddr);
1542   }
1543   /* Unlock locked objects */
1544   int useWriteUnlock = 0;
1545   for(i = 0; i < numlocked; i++) {
1546     if(oidlocked[i] == -1) {
1547       useWriteUnlock = 1;
1548       continue;
1549     }
1550     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1551       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1552       return 1;
1553     }
1554     if(!useWriteUnlock) {
1555       read_unlock(STATUSPTR(header));
1556     } else {
1557       write_unlock(STATUSPTR(header));
1558     }
1559   }
1560   return 0;
1561 }
1562
1563 prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
1564   int i;
1565   int j;
1566   prefetchpile_t * head=NULL;
1567
1568   for(j=0;j<numprefetches;j++) {
1569     int siteid = *(GET_SITEID(ptr));
1570     int ntuples = *(GET_NTUPLES(ptr));
1571     unsigned int * oidarray = GET_PTR_OID(ptr);
1572     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1573     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1574     int numLocal = 0;
1575     
1576     for(i=0; i<ntuples; i++) {
1577       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1578       unsigned short endindex=endoffsets[i];
1579       unsigned int oid=oidarray[i];
1580       int newbase;
1581       int machinenum;
1582       int countInvalidObj=0;
1583
1584       if (oid==0) {
1585         numLocal++;
1586         continue;
1587       }
1588       //Look up fields locally
1589       int isLastOffset=0;
1590       if(endindex==0)
1591           isLastOffset=1;
1592       for(newbase=baseindex; newbase<endindex; newbase++) {
1593         if(newbase==(endindex-1))
1594           isLastOffset=1;
1595         if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
1596           break;
1597         }
1598         //Ended in a null pointer...
1599         if (oid==0) {
1600           numLocal++;
1601           goto tuple;
1602         }
1603       }
1604
1605       //Entire prefetch is local
1606       if (newbase==endindex&&checkoid(oid,isLastOffset)) {
1607         numLocal++;
1608         goto tuple;
1609       }
1610
1611       //Add to remote requests
1612       machinenum=lhashSearch(oid);
1613       insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
1614     tuple:
1615       ;
1616     }
1617     
1618     /* handle dynamic prefetching */
1619     handleDynPrefetching(numLocal, ntuples, siteid);
1620     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1621   }
1622
1623   return head;
1624 }
1625
1626 int checkoid(unsigned int oid, int isLastOffset) {
1627   objheader_t *header;
1628   if ((header=mhashSearch(oid))!=NULL) {
1629     //Found on machine
1630     return 1;
1631   } else if ((header=prehashSearch(oid))!=NULL) {
1632     //if the last offset then prefetch object
1633     if((STATUS(header) & DIRTY) && isLastOffset) {
1634       return 0;
1635     }
1636     //Found in cache
1637     return 1;
1638   } else {
1639     return 0;
1640   }
1641 }
1642
1643 int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
1644   objheader_t *header;
1645   if ((header=mhashSearch(*oid))!=NULL) {
1646     //Found on machine
1647     ;
1648   } else if ((header=prehashSearch(*oid))!=NULL) {
1649     //Found in cache
1650     if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
1651       //only once because later old entries may still cause unnecessary roundtrips during prefetching
1652       (*countInvalidObj)+=1;
1653       if(*countInvalidObj > 1) {
1654         return 0;
1655       }
1656     }
1657   } else {
1658     return 0;
1659   }
1660
1661   if(TYPE(header) >= NUMCLASSES) {
1662     int elementsize = classsize[TYPE(header)];
1663     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1664     int length = ao->___length___;
1665     /* Check if array out of bounds */
1666     if(offset < 0 || offset >= length) {
1667       //if yes treat the object as found
1668       (*oid)=0;
1669       return 1;
1670     }
1671     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1672     return 1;
1673   } else {
1674     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1675     return 1;
1676   }
1677 }
1678
1679
1680 /* This function is called by the thread calling transPrefetch */
1681 void *transPrefetch(void *t) {
1682   while(1) {
1683     /* read from prefetch queue */
1684     void *node=gettail();
1685     /* Check if the tuples are found locally, if yes then reduce them further*/
1686     /* and group requests by remote machine ids by calling the makePreGroups() */
1687     int count=numavailable();
1688     prefetchpile_t *pilehead = foundLocal(node, count, 0);
1689
1690     if (pilehead!=NULL) {
1691       // Get sock from shared pool
1692
1693       /* Send  Prefetch Request */
1694       prefetchpile_t *ptr = pilehead;
1695       while(ptr != NULL) {
1696         globalid++;
1697         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1698         sendPrefetchReq(ptr, sd,globalid);
1699         ptr = ptr->next;
1700       }
1701
1702       /* Release socket */
1703       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1704
1705       /* Deallocated pilehead */
1706       mcdealloc(pilehead);
1707     }
1708     // Deallocate the prefetch queue pile node
1709     incmulttail(count);
1710   }
1711 }
1712
1713 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1714   objpile_t *tmp;
1715
1716   int size=sizeof(char)+sizeof(int);
1717   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1718     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1719   }
1720
1721   char buft[size];
1722   char *buf=buft;
1723   *buf=TRANS_PREFETCH;
1724   buf+=sizeof(char);
1725
1726   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1727     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1728     *((int*)buf)=len;
1729     buf+=sizeof(int);
1730     *((unsigned int *)buf)=tmp->oid;
1731     buf+=sizeof(unsigned int);
1732     *((unsigned int *)(buf)) = myIpAddr;
1733     buf+=sizeof(unsigned int);
1734     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1735     buf+=tmp->numoffset*sizeof(short);
1736   }
1737   *((int *)buf)=-1;
1738   send_data(sd, buft, size);
1739   return;
1740 }
1741
1742 /**
1743  * parameters: mcpilenode -> pile node to traverse to assemble pref requests
1744  * sd -> socket id
1745  * gid -> global identifier for each prefetch request sent, starts with 0
1746  **/
1747 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
1748   int len, endpair;
1749   char control;
1750   objpile_t *tmp;
1751   struct writestruct writebuffer;
1752   writebuffer.offset=0;
1753
1754
1755   /* Send TRANS_PREFETCH control message */
1756   int first=1;
1757
1758   /* Send Oids and offsets in pairs */
1759   tmp = mcpilenode->objpiles;
1760   while(tmp != NULL) {
1761     len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1762     char oidnoffset[len+5];
1763     char *buf=oidnoffset;
1764     if (first) {
1765       *buf=TRANS_PREFETCH;
1766       buf++;len++;
1767       first=0;
1768     }
1769     *((int*)buf) = tmp->numoffset;
1770     buf+=sizeof(int);
1771     *((unsigned int *)buf) = tmp->oid;
1772     LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
1773 #ifdef TRANSSTATS
1774     sendRemoteReq++;
1775 #endif
1776     buf+=sizeof(unsigned int);
1777     *((unsigned int *)buf) = myIpAddr;
1778     buf+= sizeof(unsigned int);
1779     *((int*)buf) = gid;
1780     buf+=sizeof(int);
1781     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1782     tmp = tmp->next;
1783     if (tmp==NULL) {
1784       *((int *)(&oidnoffset[len]))=-1;
1785       len+=sizeof(int);
1786     }
1787     if (tmp!=NULL)
1788       send_buf(sd, &writebuffer, oidnoffset, len);
1789     else
1790       forcesend_buf(sd, &writebuffer, oidnoffset, len);
1791   }
1792   LOGOIDTYPE("SREQ",0,0,myrdtsc());
1793   LOGEVENT('S');
1794   LOGTIME('S',0,0,myrdtsc(),gid); //after sending
1795   return;
1796 }
1797
1798 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1799   int gid,length = 0, size = 0;
1800   char control;
1801   unsigned int oid;
1802   void *modptr, *oldptr;
1803
1804   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1805   size = length - sizeof(int);
1806   char recvbuffer[size];
1807 #ifdef TRANSSTATS
1808   getResponse++;
1809   LOGEVENT('Z');
1810   LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
1811 #endif
1812   recv_data_buf(sd, readbuffer, recvbuffer, size);
1813   control = *((char *) recvbuffer);
1814   if(control == OBJECT_FOUND) {
1815     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1816     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1817     LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
1818     size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
1819     pthread_mutex_lock(&prefetchcache_mutex);
1820     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1821       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1822       pthread_mutex_unlock(&prefetchcache_mutex);
1823       return -1;
1824     }
1825     pthread_mutex_unlock(&prefetchcache_mutex);
1826     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
1827     STATUS(modptr)=0;
1828
1829
1830     /* Insert the oid and its address into the prefetch hash lookup table */
1831     /* Do a version comparison if the oid exists */
1832     if((oldptr = prehashSearch(oid)) != NULL) {
1833       /* If older version then update with new object ptr */
1834       if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1835         prehashInsert(oid, modptr);
1836       }
1837     } else { /* Else add the object ptr to hash table*/
1838       prehashInsert(oid, modptr);
1839     }
1840     LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
1841     LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
1842   } else if(control == OBJECT_NOT_FOUND) {
1843     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1844     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1845     LOGOIDTYPE("NF",oid,0,myrdtsc());
1846     LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
1847     /* TODO: For each object not found query DHT for new location and retrieve the object */
1848     /* Throw an error */
1849     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1850     //    exit(-1);
1851   } else {
1852     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1853   }
1854
1855   return 0;
1856 }
1857
1858 unsigned short getObjType(unsigned int oid) {
1859   objheader_t *objheader;
1860   unsigned short numoffset[] ={0};
1861   short fieldoffset[] ={};
1862
1863   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1864 #ifdef CACHE
1865     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1866 #endif
1867     unsigned int mid = lhashSearch(oid);
1868     int sd = getSock2(transReadSockPool, mid);
1869     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1870     remotereadrequest[0] = READ_REQUEST;
1871     *((unsigned int *)(&remotereadrequest[1])) = oid;
1872     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1873
1874     /* Read response from the Participant */
1875     char control;
1876     recv_data(sd, &control, sizeof(char));
1877
1878     if (control==OBJECT_NOT_FOUND) {
1879       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1880       fflush(stdout);
1881       exit(-1);
1882     } else {
1883       /* Read object if found into local cache */
1884       int size;
1885       recv_data(sd, &size, sizeof(int));
1886 #ifdef CACHE
1887       pthread_mutex_lock(&prefetchcache_mutex);
1888       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1889         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1890         pthread_exit(NULL);
1891       }
1892       pthread_mutex_unlock(&prefetchcache_mutex);
1893       recv_data(sd, objheader, size);
1894       prehashInsert(oid, objheader);
1895       return TYPE(objheader);
1896 #else
1897       char *buffer;
1898       if((buffer = calloc(1, size)) == NULL) {
1899         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1900         fflush(stdout);
1901         return 0;
1902       }
1903       recv_data(sd, buffer, size);
1904       objheader = (objheader_t *)buffer;
1905       unsigned short type = TYPE(objheader);
1906       free(buffer);
1907       return type;
1908 #endif
1909     }
1910 #ifdef CACHE
1911   }
1912 #endif
1913   }
1914   return TYPE(objheader);
1915 }
1916
1917 int startRemoteThread(unsigned int oid, unsigned int mid) {
1918   int sock;
1919   struct sockaddr_in remoteAddr;
1920   char msg[1 + sizeof(unsigned int)];
1921   int bytesSent;
1922   int status;
1923
1924   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1925     perror("startRemoteThread():socket()");
1926     return -1;
1927   }
1928
1929   bzero(&remoteAddr, sizeof(remoteAddr));
1930   remoteAddr.sin_family = AF_INET;
1931   remoteAddr.sin_port = htons(LISTEN_PORT);
1932   remoteAddr.sin_addr.s_addr = htonl(mid);
1933
1934   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1935     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1936            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1937     status = -1;
1938   } else
1939   {
1940     msg[0] = START_REMOTE_THREAD;
1941     *((unsigned int *) &msg[1]) = oid;
1942     send_data(sock, msg, 1 + sizeof(unsigned int));
1943   }
1944
1945   close(sock);
1946   return status;
1947 }
1948
1949 //TODO: when reusing oids, make sure they are not already in use!
1950 static unsigned int id = 0xFFFFFFFF;
1951 unsigned int getNewOID(void) {
1952   id += 2;
1953   if (id > oidMax || id < oidMin) {
1954     id = (oidMin | 1);
1955   }
1956   return id;
1957 }
1958
1959 int processConfigFile() {
1960   FILE *configFile;
1961   const int maxLineLength = 200;
1962   char lineBuffer[maxLineLength];
1963   char *token;
1964   const char *delimiters = " \t\n";
1965   char *commentBegin;
1966   in_addr_t tmpAddr;
1967
1968   configFile = fopen(CONFIG_FILENAME, "r");
1969   if (configFile == NULL) {
1970     printf("error opening %s:\n", CONFIG_FILENAME);
1971     perror("");
1972     return -1;
1973   }
1974
1975   numHostsInSystem = 0;
1976   sizeOfHostArray = 8;
1977   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1978
1979   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1980     commentBegin = strchr(lineBuffer, '#');
1981     if (commentBegin != NULL)
1982       *commentBegin = '\0';
1983     token = strtok(lineBuffer, delimiters);
1984     while (token != NULL) {
1985       tmpAddr = inet_addr(token);
1986       if ((int)tmpAddr == -1) {
1987         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1988         fclose(configFile);
1989         return -1;
1990       } else
1991         addHost(htonl(tmpAddr));
1992       token = strtok(NULL, delimiters);
1993     }
1994   }
1995
1996   fclose(configFile);
1997
1998   if (numHostsInSystem < 1) {
1999     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
2000     return -1;
2001   }
2002 #ifdef MAC
2003   myIpAddr = getMyIpAddr("en1");
2004 #else
2005   myIpAddr = getMyIpAddr("eth0");
2006 #endif
2007   myIndexInHostArray = findHost(myIpAddr);
2008   if (myIndexInHostArray == -1) {
2009     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
2010     return -1;
2011   }
2012   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
2013   oidMin = oidsPerBlock * myIndexInHostArray;
2014   if (myIndexInHostArray == numHostsInSystem - 1)
2015     oidMax = 0xFFFFFFFF;
2016   else
2017     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
2018
2019   return 0;
2020 }
2021
2022 void addHost(unsigned int hostIp) {
2023   unsigned int *tmpArray;
2024
2025   if (findHost(hostIp) != -1)
2026     return;
2027
2028   if (numHostsInSystem == sizeOfHostArray) {
2029     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
2030     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
2031     free(hostIpAddrs);
2032     hostIpAddrs = tmpArray;
2033   }
2034
2035   hostIpAddrs[numHostsInSystem++] = hostIp;
2036
2037   return;
2038 }
2039
2040 int findHost(unsigned int hostIp) {
2041   int i;
2042   for (i = 0; i < numHostsInSystem; i++)
2043     if (hostIpAddrs[i] == hostIp)
2044       return i;
2045
2046   //not found
2047   return -1;
2048 }
2049
2050 /* This function sends notification request per thread waiting on object(s) whose version
2051  * changes */
2052 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
2053   int sock,i;
2054   objheader_t *objheader;
2055   struct sockaddr_in remoteAddr;
2056   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
2057   char *ptr;
2058   int bytesSent;
2059   int status, size;
2060   unsigned short version;
2061   unsigned int oid,mid;
2062   static unsigned int threadid = 0;
2063   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
2064   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2065   notifydata_t *ndata;
2066
2067   oid = oidarry[0];
2068   if((mid = lhashSearch(oid)) == 0) {
2069     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2070     return;
2071   }
2072
2073   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2074     perror("reqNotify():socket()");
2075     return -1;
2076   }
2077
2078   bzero(&remoteAddr, sizeof(remoteAddr));
2079   remoteAddr.sin_family = AF_INET;
2080   remoteAddr.sin_port = htons(LISTEN_PORT);
2081   remoteAddr.sin_addr.s_addr = htonl(mid);
2082
2083   /* Generate unique threadid */
2084   threadid++;
2085
2086   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2087   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2088     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2089     return -1;
2090   }
2091   ndata->numoid = numoid;
2092   ndata->threadid = threadid;
2093   ndata->oidarry = oidarry;
2094   ndata->versionarry = versionarry;
2095   ndata->threadcond = threadcond;
2096   ndata->threadnotify = threadnotify;
2097   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2098     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2099     free(ndata);
2100     return -1;
2101   }
2102
2103   /* Send  number of oids, oidarry, version array, machine id and threadid */
2104   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2105     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2106            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2107     free(ndata);
2108     return -1;
2109   } else {
2110     msg[0] = THREAD_NOTIFY_REQUEST;
2111     *((unsigned int *)(&msg[1])) = numoid;
2112     /* Send array of oids  */
2113     size = sizeof(unsigned int);
2114
2115     for(i = 0;i < numoid; i++) {
2116       oid = oidarry[i];
2117       *((unsigned int *)(&msg[1] + size)) = oid;
2118       size += sizeof(unsigned int);
2119     }
2120
2121     /* Send array of version  */
2122     for(i = 0;i < numoid; i++) {
2123       version = versionarry[i];
2124       *((unsigned short *)(&msg[1] + size)) = version;
2125       size += sizeof(unsigned short);
2126     }
2127
2128     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2129     *((unsigned int *)(&msg[1] + size)) = threadid;
2130     pthread_mutex_lock(&(ndata->threadnotify));
2131     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2132     send_data(sock, msg, size);
2133     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2134     pthread_mutex_unlock(&(ndata->threadnotify));
2135   }
2136
2137   pthread_cond_destroy(&threadcond);
2138   pthread_mutex_destroy(&threadnotify);
2139   free(ndata);
2140   close(sock);
2141   return status;
2142 }
2143
2144 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2145   notifydata_t *ndata;
2146   int i, objIsFound = 0, index;
2147   void *ptr;
2148
2149   //Look up the tid and call the corresponding pthread_cond_signal
2150   if((ndata = notifyhashSearch(tid)) == NULL) {
2151     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2152     return;
2153   } else  {
2154     for(i = 0; i < ndata->numoid; i++) {
2155       if(ndata->oidarry[i] == oid) {
2156         objIsFound = 1;
2157         index = i;
2158       }
2159     }
2160     if(objIsFound == 0) {
2161       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2162       return;
2163     } else {
2164       if(version <= ndata->versionarry[index]) {
2165         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2166         return;
2167       } else {
2168 #ifdef CACHE
2169         /* Clear from prefetch cache and free thread related data structure */
2170         if((ptr = prehashSearch(oid)) != NULL) {
2171           prehashRemove(oid);
2172         }
2173 #endif
2174         pthread_mutex_lock(&(ndata->threadnotify));
2175         pthread_cond_signal(&(ndata->threadcond));
2176         pthread_mutex_unlock(&(ndata->threadnotify));
2177       }
2178     }
2179   }
2180   return;
2181 }
2182
2183 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2184   threadlist_t *ptr;
2185   unsigned int mid;
2186   struct sockaddr_in remoteAddr;
2187   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2188   int sock, status, size, bytesSent;
2189
2190   while(*head != NULL) {
2191     ptr = *head;
2192     mid = ptr->mid;
2193     //create a socket connection to that machine
2194     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2195       perror("notifyAll():socket()");
2196       return -1;
2197     }
2198
2199     bzero(&remoteAddr, sizeof(remoteAddr));
2200     remoteAddr.sin_family = AF_INET;
2201     remoteAddr.sin_port = htons(LISTEN_PORT);
2202     remoteAddr.sin_addr.s_addr = htonl(mid);
2203     //send Thread Notify response and threadid to that machine
2204     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2205       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2206              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2207       fflush(stdout);
2208       status = -1;
2209     } else {
2210       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2211       msg[0] = THREAD_NOTIFY_RESPONSE;
2212       *((unsigned int *)&msg[1]) = oid;
2213       size = sizeof(unsigned int);
2214       *((unsigned short *)(&msg[1]+ size)) = version;
2215       size+= sizeof(unsigned short);
2216       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2217
2218       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2219       send_data(sock, msg, size);
2220     }
2221     //close socket
2222     close(sock);
2223     // Update head
2224     *head = ptr->next;
2225     free(ptr);
2226   }
2227   return status;
2228 }
2229
2230 void transAbort() {
2231 #ifdef ABORTREADERS
2232   removetransactionhash();
2233 #endif
2234   objstrDelete(t_cache);
2235   t_chashDelete();
2236 }
2237
2238 /* This function inserts necessary information into
2239  * a machine pile data structure */
2240 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2241   plistnode_t *ptr, *tmp;
2242   int found = 0, offset = 0;
2243
2244   tmp = pile;
2245   //Add oid into a machine that is already present in the pile linked list structure
2246   while(tmp != NULL) {
2247     if (tmp->mid == mid) {
2248       int tmpsize;
2249
2250       if (STATUS(headeraddr) & NEW) {
2251         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2252         tmp->numcreated++;
2253         GETSIZE(tmpsize, headeraddr);
2254         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2255       } else if (STATUS(headeraddr) & DIRTY) {
2256         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2257         tmp->nummod++;
2258         GETSIZE(tmpsize, headeraddr);
2259         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2260       } else {
2261         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2262         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2263         offset += sizeof(unsigned int);
2264         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2265         tmp->numread++;
2266       }
2267       found = 1;
2268       break;
2269     }
2270     tmp = tmp->next;
2271   }
2272   //Add oid for any new machine
2273   if (!found) {
2274     int tmpsize;
2275     if((ptr = pCreate(num_objs)) == NULL) {
2276       return NULL;
2277     }
2278     ptr->mid = mid;
2279     if (STATUS(headeraddr) & NEW) {
2280       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2281       ptr->numcreated++;
2282       GETSIZE(tmpsize, headeraddr);
2283       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2284     } else if (STATUS(headeraddr) & DIRTY) {
2285       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2286       ptr->nummod++;
2287       GETSIZE(tmpsize, headeraddr);
2288       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2289     } else {
2290       *((unsigned int *)ptr->objread)=OID(headeraddr);
2291       offset = sizeof(unsigned int);
2292       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2293       ptr->numread++;
2294     }
2295     ptr->next = pile;
2296     pile = ptr;
2297   }
2298
2299   /* Clear Flags */
2300   STATUS(headeraddr) =0;
2301
2302
2303   return pile;
2304 }
2305
2306 plistnode_t *sortPiles(plistnode_t *pileptr) {
2307   plistnode_t *head, *ptr, *tail;
2308   head = pileptr;
2309   ptr = pileptr;
2310   /* Get tail pointer */
2311   while(ptr!= NULL) {
2312     tail = ptr;
2313     ptr = ptr->next;
2314   }
2315   ptr = pileptr;
2316   plistnode_t *prev = pileptr;
2317   /* Arrange local machine processing at the end of the pile list */
2318   while(ptr != NULL) {
2319     if(ptr != tail) {
2320       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2321         prev->next = ptr->next;
2322         ptr->next = NULL;
2323         tail->next = ptr;
2324         return pileptr;
2325       }
2326       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2327         prev = ptr->next;
2328         ptr->next = NULL;
2329         tail->next = ptr;
2330         return prev;
2331       }
2332       prev = ptr;
2333     }
2334     ptr = ptr->next;
2335   }
2336   return pileptr;
2337 }