complete sandbox implementation
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "debugmacro.h"
3 #include "ip.h"
4 #include "machinepile.h"
5 #include "altmlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "altprelookup.h"
9 #include "threadnotify.h"
10 #include "queue.h"
11 #include "addUdpEnhance.h"
12 #include "addPrefetchEnhance.h"
13 #include "gCollect.h"
14 #include "dsmlock.h"
15 #include "prefetch.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19 #ifdef ABORTREADERS
20 #include "abortreaders.h"
21 #endif
22 #include "trans.h"
23
24 #define NUM_THREADS 1
25 #define CONFIG_FILENAME "dstm.conf"
26
27 //#define LOGEVENTS //turn on Logging events
28 #ifdef LOGEVENTS
29 char bigarray[16*1024*1024];
30 int bigindex=0;
31 #define LOGEVENT(x) { \
32     int tmp=bigindex++;                         \
33     bigarray[tmp]=x;                            \
34   }
35 #else
36 #define LOGEVENT(x)
37 #endif
38
39 //#define LOGTIMES
40 #ifdef LOGTIMES
41 char bigarray1[6*1024*1024];
42 unsigned int bigarray2[6*1024*1024];
43 unsigned int bigarray3[6*1024*1024];
44 long long bigarray4[6*1024*1024];
45 int bigarray5[6*1024*1024];
46 int bigindex1=0;
47 #define LOGTIME(x,y,z,a,b) {\
48   int tmp=bigindex1; \
49   bigarray1[tmp]=x; \
50   bigarray2[tmp]=y; \
51   bigarray3[tmp]=z; \
52   bigarray4[tmp]=a; \
53   bigarray5[tmp]=b; \
54   bigindex1++; \
55 }
56 #else
57 #define LOGTIME(x,y,z,a,b)
58 #endif
59
60 /* Thread transaction variables */
61
62 __thread objstr_t *t_cache;
63 __thread struct ___Object___ *revertlist;
64 #ifdef ABORTREADERS
65 __thread int t_abort;
66 __thread jmp_buf aborttrans;
67 #endif
68
69 int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
70
71 /* Global Variables */
72 extern int classsize[];
73 pfcstats_t *evalPrefetch;
74 extern int numprefetchsites; //Global variable containing number of prefetch sites
75 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
76 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
77 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
78 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
79 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
80 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
81 extern objstr_t *mainobjstore;
82 unsigned int myIpAddr;
83 unsigned int *hostIpAddrs;
84 int sizeOfHostArray;
85 int numHostsInSystem;
86 int myIndexInHostArray;
87 unsigned int oidsPerBlock;
88 unsigned int oidMin;
89 unsigned int oidMax;
90
91 sockPoolHashTable_t *transReadSockPool;
92 sockPoolHashTable_t *transPrefetchSockPool;
93 sockPoolHashTable_t *transRequestSockPool;
94 pthread_mutex_t notifymutex;
95 pthread_mutex_t atomicObjLock;
96
97 /***********************************
98  * Global Variables for statistics
99  **********************************/
100 int numTransCommit = 0;
101 int numTransAbort = 0;
102 int nchashSearch = 0;
103 int nmhashSearch = 0;
104 int nprehashSearch = 0;
105 int ndirtyCacheObj = 0;
106 int nRemoteSend = 0;
107 int nSoftAbort = 0;
108 int bytesSent = 0;
109 int bytesRecv = 0;
110 int totalObjSize = 0;
111 int sendRemoteReq = 0;
112 int getResponse = 0;
113
114 void printhex(unsigned char *, int);
115 plistnode_t *createPiles();
116 plistnode_t *sortPiles(plistnode_t *pileptr);
117
118
119
120 /*******************************
121 * Send and Recv function calls
122 *******************************/
123 void send_data(int fd, void *buf, int buflen) {
124   char *buffer = (char *)(buf);
125   int size = buflen;
126   int numbytes;
127   while (size > 0) {
128     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
129     bytesSent = bytesSent + numbytes;
130     if (numbytes == -1) {
131       perror("send");
132       exit(0);
133     }
134     buffer += numbytes;
135     size -= numbytes;
136   }
137 }
138
139 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
140   if (buflen+sendbuffer->offset>WMAXBUF) {
141     send_data(fd, sendbuffer->buf, sendbuffer->offset);
142     sendbuffer->offset=0;
143     send_data(fd, buffer, buflen);
144     return;
145   }
146   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
147   sendbuffer->offset+=buflen;
148   if (sendbuffer->offset>WTOP) {
149     send_data(fd, sendbuffer->buf, sendbuffer->offset);
150     sendbuffer->offset=0;
151   }
152 }
153
154 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
155   if (buflen+sendbuffer->offset>WMAXBUF) {
156     send_data(fd, sendbuffer->buf, sendbuffer->offset);
157     sendbuffer->offset=0;
158     send_data(fd, buffer, buflen);
159     return;
160   }
161   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
162   sendbuffer->offset+=buflen;
163   send_data(fd, sendbuffer->buf, sendbuffer->offset);
164   sendbuffer->offset=0;
165 }
166
167 int recvw(int fd, void *buf, int len, int flags) {
168   return recv(fd, buf, len, flags);
169 }
170  
171 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
172   char *buf=(char *)buffer;
173   int numbytes=readbuffer->head-readbuffer->tail;
174   if (numbytes>buflen)
175     numbytes=buflen;
176   if (numbytes>0) {
177     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
178     readbuffer->tail+=numbytes;
179     buflen-=numbytes;
180     buf+=numbytes;
181   }
182   if (buflen==0) {
183     return;
184   }
185   if (buflen>=MAXBUF) {
186     recv_data(fd, buf, buflen);
187     return;
188   }
189   
190   int maxbuf=MAXBUF;
191   int obufflen=buflen;
192   readbuffer->head=0;
193   
194   while (buflen > 0) {
195     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
196     if (numbytes == -1) {
197       perror("recv");
198       exit(0);
199     }
200     bytesRecv+=numbytes;
201     buflen-=numbytes;
202     readbuffer->head+=numbytes;
203     maxbuf-=numbytes;
204   }
205   memcpy(buf,readbuffer->buf,obufflen);
206   readbuffer->tail=obufflen;
207 }
208
209 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
210   char *buf=(char *)buffer;
211   //now tail<=head
212   int numbytes=readbuffer->head-readbuffer->tail;
213   if (numbytes>buflen)
214     numbytes=buflen;
215   if (numbytes>0) {
216     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
217     readbuffer->tail+=numbytes;
218     buflen-=numbytes;
219     buf+=numbytes;
220   }
221   if (buflen==0)
222     return 1;
223
224   if (buflen>=MAXBUF) {
225     return recv_data_errorcode(fd, buf, buflen);
226   }
227
228   int maxbuf=MAXBUF;
229   int obufflen=buflen;
230   readbuffer->head=0;
231   
232   while (buflen > 0) {
233     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
234     if (numbytes ==0) {
235       return 0;
236     }
237     if (numbytes==-1) {
238       perror("recvbuf");
239       return -1;
240     }
241     bytesRecv+=numbytes;
242     buflen-=numbytes;
243     readbuffer->head+=numbytes;
244     maxbuf-=numbytes;
245   }
246   memcpy(buf,readbuffer->buf,obufflen);
247   readbuffer->tail=obufflen;
248   return 1;
249 }
250
251
252 void recv_data(int fd, void *buf, int buflen) {
253   char *buffer = (char *)(buf);
254   int size = buflen;
255   int numbytes;
256   while (size > 0) {
257     numbytes = recvw(fd, buffer, size, 0);
258     bytesRecv = bytesRecv + numbytes;
259     if (numbytes == -1) {
260       perror("recv");
261       exit(0);
262     }
263     buffer += numbytes;
264     size -= numbytes;
265   }
266 }
267
268 int recv_data_errorcode(int fd, void *buf, int buflen) {
269   char *buffer = (char *)(buf);
270   int size = buflen;
271   int numbytes;
272   while (size > 0) {
273     numbytes = recvw(fd, buffer, size, 0);
274     if (numbytes==0)
275       return 0;
276     if (numbytes == -1) {
277       perror("recv");
278       return -1;
279     }
280     bytesRecv+=numbytes;
281     buffer += numbytes;
282     size -= numbytes;
283   }
284   return 1;
285 }
286
287 void printhex(unsigned char *ptr, int numBytes) {
288   int i;
289   for (i = 0; i < numBytes; i++) {
290     if (ptr[i] < 16)
291       printf("0%x ", ptr[i]);
292     else
293       printf("%x ", ptr[i]);
294   }
295   printf("\n");
296   return;
297 }
298
299 inline int arrayLength(int *array) {
300   int i;
301   for(i=0 ; array[i] != -1; i++)
302     ;
303   return i;
304 }
305
306 inline int findmax(int *array, int arraylength) {
307   int max, i;
308   max = array[0];
309   for(i = 0; i < arraylength; i++) {
310     if(array[i] > max) {
311       max = array[i];
312     }
313   }
314   return max;
315 }
316
317 //#define INLINEPREFETCH
318 #define PREFTHRESHOLD 0
319
320 /* This function is a prefetch call generated by the compiler that
321  * populates the shared primary prefetch queue*/
322 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
323   /* Allocate for the queue node*/
324   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
325   int len;
326 #ifdef INLINEPREFETCH
327   int attempted=0;
328   char *node;
329   do {
330   node=getmemory(qnodesize);
331   if (node==NULL&&attempted)
332     break;
333   if (node!=NULL) {
334 #else
335   char *node=getmemory(qnodesize);
336 #endif
337   int top=endoffsets[ntuples-1];
338
339   if (node==NULL) {
340     LOGEVENT('D');
341     return;
342   }
343   /* Set queue node values */
344
345   /* TODO: Remove this after testing */
346   evalPrefetch[siteid].callcount++;
347
348   *((int *)(node))=siteid;
349   *((int *)(node + sizeof(int))) = ntuples;
350   len = 2*sizeof(int);
351   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
352   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
353   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
354
355 #ifdef INLINEPREFETCH
356   movehead(qnodesize);
357   }
358   int numpref=numavailable();
359   attempted=1;
360
361   if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
362     node=gettail();
363     prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
364     if (pilehead!=NULL) {
365       // Get sock from shared pool
366       
367       /* Send  Prefetch Request */
368       prefetchpile_t *ptr = pilehead;
369       while(ptr != NULL) {
370         globalid++;
371         int sd = getSock2(transPrefetchSockPool, ptr->mid);
372         sendPrefetchReq(ptr, sd, globalid);
373         ptr = ptr->next;
374       }
375       
376       mcdealloc(pilehead);
377     }
378     resetqueue();
379   }//end do prefetch if condition
380   } while(node==NULL);
381 #else
382   /* Lock and insert into primary prefetch queue */
383   movehead(qnodesize);
384 #endif
385 }
386
387 /* This function starts up the transaction runtime. */
388 int dstmStartup(const char * option) {
389   pthread_t thread_Listen, udp_thread_Listen;
390   pthread_attr_t attr;
391   int master=option!=NULL && strcmp(option, "master")==0;
392   int fd;
393   int udpfd;
394
395   if (processConfigFile() != 0)
396     return 0; //TODO: return error value, cause main program to exit
397 #ifdef COMPILER
398   if (!master)
399     threadcount--;
400 #endif
401
402 #ifdef TRANSSTATS
403   printf("Trans stats is on\n");
404   fflush(stdout);
405 #endif
406 #ifdef ABORTREADERS
407   initreaderlist();
408 #endif
409
410   //Initialize socket pool
411   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
412   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
413   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
414
415   dstmInit();
416   transInit();
417
418   fd=startlistening();
419   pthread_attr_init(&attr);
420   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
421 #ifdef CACHE
422   udpfd = udpInit();
423   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
424 #endif
425   if (master) {
426     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
427     return 1;
428   } else {
429     dstmListen((void *)fd);
430     return 0;
431   }
432 }
433
434 //TODO Use this later
435 void *pCacheAlloc(objstr_t *store, unsigned int size) {
436   void *tmp;
437   objstr_t *ptr;
438   ptr = store;
439   int success = 0;
440
441   while(ptr->next != NULL) {
442     /* check if store is empty */
443     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
444       tmp = ptr->top;
445       ptr->top += size;
446       success = 1;
447       return tmp;
448     } else {
449       ptr = ptr->next;
450     }
451   }
452
453   if(success == 0) {
454     return NULL;
455   }
456 }
457
458 /* This function initiates the prefetch thread A queue is shared
459  * between the main thread of execution and the prefetch thread to
460  * process the prefetch call Call from compiler populates the shared
461  * queue with prefetch requests while prefetch thread processes the
462  * prefetch requests */
463
464 void transInit() {
465   //Create and initialize prefetch cache structure
466 #ifdef CACHE
467   initializePCache();
468   if((evalPrefetch = initPrefetchStats()) == NULL) {
469     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
470     exit(0);
471   }
472 #endif
473
474   /* Initialize attributes for mutex */
475   pthread_mutexattr_init(&prefetchcache_mutex_attr);
476   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
477
478   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
479   pthread_mutex_init(&notifymutex, NULL);
480   pthread_mutex_init(&atomicObjLock, NULL);
481 #ifdef CACHE
482   //Create prefetch cache lookup table
483   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
484     printf("ERROR\n");
485     return; //Failure
486   }
487
488   //Initialize primary shared queue
489   queueInit();
490   //Initialize machine pile w/prefetch oids and offsets shared queue
491   mcpileqInit();
492
493   //Create the primary prefetch thread
494   int retval;
495 #ifdef RANGEPREFETCH
496   do {
497     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
498   } while(retval!=0);
499 #else
500 #ifndef INLINEPREFETCH
501   do {
502     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
503   } while(retval!=0);
504 #endif
505 #endif
506 #ifndef INLINEPREFETCH
507   pthread_detach(tPrefetch);
508 #endif
509 #endif
510 }
511
512 /* This function stops the threads spawned */
513 void transExit() {
514 #ifdef CACHE
515   int t;
516   pthread_cancel(tPrefetch);
517   for(t = 0; t < NUM_THREADS; t++)
518     pthread_cancel(wthreads[t]);
519 #endif
520
521   return;
522 }
523
524 /* This functions inserts randowm wait delays in the order of msec
525  * Mostly used when transaction commits retry*/
526 void randomdelay() {
527   struct timespec req;
528   time_t t;
529
530   t = time(NULL);
531   req.tv_sec = 0;
532   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
533   nanosleep(&req, NULL);
534   return;
535 }
536
537 /* This function initializes things required in the transaction start*/
538 void transStart() {
539   t_cache = objstrCreate(1048576);
540   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
541   revertlist=NULL;
542 #ifdef ABORTREADERS
543   t_abort=0;
544 #endif
545 }
546
547 // Search for an address for a given oid                                                                               
548 /*#define INLINE    inline __attribute__((always_inline))
549
550 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
551   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
552   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
553
554   do {
555     if(node->key == key) {
556       return node->val;
557     }
558     node = node->next;
559   } while(node != NULL);
560
561   return NULL;
562   }*/
563
564
565
566
567 /* This function finds the location of the objects involved in a transaction
568  * and returns the pointer to the object if found in a remote location */
569 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
570   unsigned int machinenumber;
571   objheader_t *tmp, *objheader;
572   objheader_t *objcopy;
573   int size;
574   void *buf;
575   chashlistnode_t *node;
576
577   if(oid == 0) {
578     return NULL;
579   }
580   
581
582   node= &c_table[(oid & c_mask)>>1];
583   do {
584     if(node->key == oid) {
585 #ifdef TRANSSTATS
586     nchashSearch++;
587 #endif
588 #ifdef COMPILER
589     return &((objheader_t*)node->val)[1];
590 #else
591     return node->val;
592 #endif
593     }
594     node = node->next;
595   } while(node != NULL);
596   
597
598   /*  
599   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
600 #ifdef TRANSSTATS
601     nchashSearch++;
602 #endif
603 #ifdef COMPILER
604     return &objheader[1];
605 #else
606     return objheader;
607 #endif
608   } else 
609   */
610
611 #ifdef ABORTREADERS
612   if (t_abort) {
613     //abort this transaction
614     removetransactionhash();
615     objstrDelete(t_cache);
616     t_chashDelete();
617     _longjmp(aborttrans,1);
618   } else
619     addtransaction(oid);
620 #endif
621
622   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
623 #ifdef TRANSSTATS
624     nmhashSearch++;
625 #endif
626     /* Look up in machine lookup table  and copy  into cache*/
627     GETSIZE(size, objheader);
628     size += sizeof(objheader_t);
629     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
630     memcpy(objcopy, objheader, size);
631     /* Insert into cache's lookup table */
632     STATUS(objcopy)=0;
633     t_chashInsert(OID(objheader), objcopy);
634 #ifdef COMPILER
635     return &objcopy[1];
636 #else
637     return objcopy;
638 #endif
639   } else {
640 #ifdef CACHE
641     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
642       if(STATUS(tmp) & DIRTY) {
643 #ifdef TRANSSTATS
644         ndirtyCacheObj++;
645 #endif
646         goto remoteread;
647       }
648 #ifdef TRANSSTATS
649       nprehashSearch++;
650 #endif
651       /* Look up in prefetch cache */
652       GETSIZE(size, tmp);
653       size+=sizeof(objheader_t);
654       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
655       memcpy(objcopy, tmp, size);
656       /* Insert into cache's lookup table */
657       t_chashInsert(OID(tmp), objcopy);
658 #ifdef COMPILER
659       return &objcopy[1];
660 #else
661       return objcopy;
662 #endif
663     }
664 remoteread:
665 #endif
666     /* Get the object from the remote location */
667     if((machinenumber = lhashSearch(oid)) == 0) {
668       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
669       return NULL;
670     }
671     objcopy = getRemoteObj(machinenumber, oid);
672
673     if(objcopy == NULL) {
674       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
675       return NULL;
676     } else {
677 #ifdef TRANSSTATS
678       nRemoteSend++;
679 #endif
680 #ifdef COMPILER
681 #ifdef CACHE
682       //Copy object to prefetch cache
683       pthread_mutex_lock(&prefetchcache_mutex);
684       objheader_t *headerObj;
685       int size;
686       GETSIZE(size, objcopy);
687       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
688         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
689             __FILE__, __LINE__);
690         pthread_mutex_unlock(&prefetchcache_mutex);
691         return NULL;
692       }
693       pthread_mutex_unlock(&prefetchcache_mutex);
694       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
695       //make an entry in prefetch lookup hashtable
696       prehashInsert(oid, headerObj);
697       LOGEVENT('B');
698 #endif
699       return &objcopy[1];
700 #else
701       return objcopy;
702 #endif
703     }
704   }
705 }
706
707
708 /* This function finds the location of the objects involved in a transaction
709  * and returns the pointer to the object if found in a remote location */
710 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
711 //DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
712   unsigned int machinenumber;
713   objheader_t *tmp, *objheader;
714   objheader_t *objcopy;
715   int size;
716
717 #ifdef ABORTREADERS
718   if (t_abort) {
719     //abort this transaction
720     removetransactionhash();
721     objstrDelete(t_cache);
722     t_chashDelete();
723     _longjmp(aborttrans,1);
724   } else
725     addtransaction(oid);
726 #endif
727
728   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
729 #ifdef TRANSSTATS
730     nmhashSearch++;
731 #endif
732     /* Look up in machine lookup table  and copy  into cache*/
733     GETSIZE(size, objheader);
734     size += sizeof(objheader_t);
735     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
736     memcpy(objcopy, objheader, size);
737     /* Insert into cache's lookup table */
738     STATUS(objcopy)=0;
739     t_chashInsert(OID(objheader), objcopy);
740 #ifdef COMPILER
741     return &objcopy[1];
742 #else
743     return objcopy;
744 #endif
745   } else {
746 #ifdef CACHE
747     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
748       if(STATUS(tmp) & DIRTY) {
749 #ifdef TRANSSTATS
750         ndirtyCacheObj++;
751 #endif
752         goto remoteread;
753       }
754 #ifdef TRANSSTATS
755       LOGEVENT('P')
756       nprehashSearch++;
757 #endif
758       /* Look up in prefetch cache */
759       GETSIZE(size, tmp);
760       size+=sizeof(objheader_t);
761       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
762       memcpy(objcopy, tmp, size);
763       LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
764       /* Insert into cache's lookup table */
765       t_chashInsert(OID(tmp), objcopy);
766 #ifdef COMPILER
767       return &objcopy[1];
768 #else
769       return objcopy;
770 #endif
771     }
772 remoteread:
773 #endif
774     /* Get the object from the remote location */
775     if((machinenumber = lhashSearch(oid)) == 0) {
776       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
777       return NULL;
778     }
779     objcopy = getRemoteObj(machinenumber, oid);
780 #ifdef TRANSSTATS
781       LOGEVENT('R');
782       nRemoteSend++;
783 #endif
784
785     if(objcopy == NULL) {
786       printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
787       return NULL;
788     } else {
789 #ifdef COMPILER
790 #ifdef CACHE
791       LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
792       LOGTIME('r', oid, TYPE(objcopy),myrdtsc(),0);
793       //Copy object to prefetch cache
794       pthread_mutex_lock(&prefetchcache_mutex);
795       objheader_t *headerObj;
796       int size;
797       GETSIZE(size, objcopy);
798       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
799         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
800             __FILE__, __LINE__);
801         pthread_mutex_unlock(&prefetchcache_mutex);
802         return NULL;
803       }
804       pthread_mutex_unlock(&prefetchcache_mutex);
805       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
806       //make an entry in prefetch lookup hashtable
807       prehashInsert(oid, headerObj);
808       LOGEVENT('B');
809 #endif
810       return &objcopy[1];
811 #else
812       return objcopy;
813 #endif
814     }
815   }
816 }
817
818 /* This function creates objects in the transaction record */
819 objheader_t *transCreateObj(unsigned int size) {
820   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
821   OID(tmp) = getNewOID();
822   tmp->version = 1;
823   tmp->rcount = 1;
824   STATUS(tmp) = NEW;
825   t_chashInsert(OID(tmp), tmp);
826
827 #ifdef COMPILER
828   return &tmp[1]; //want space after object header
829 #else
830   return tmp;
831 #endif
832 }
833
834 #if 1
835 /* This function creates machine piles based on all machines involved in a
836  * transaction commit request */
837 plistnode_t *createPiles() {
838   int i;
839   plistnode_t *pile = NULL;
840   unsigned int machinenum;
841   objheader_t *headeraddr;
842   chashlistnode_t * ptr = c_table;
843   /* Represents number of bins in the chash table */
844   unsigned int size = c_size;
845
846   for(i = 0; i < size ; i++) {
847     chashlistnode_t * curr = &ptr[i];
848     /* Inner loop to traverse the linked list of the cache lookupTable */
849     while(curr != NULL) {
850       //if the first bin in hash table is empty
851       if(curr->key == 0)
852         break;
853       headeraddr=(objheader_t *) curr->val;
854
855       //Get machine location for object id (and whether local or not)
856       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
857         machinenum = myIpAddr;
858       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
859         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
860         return NULL;
861       }
862
863       //Make machine groups
864       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
865       curr = curr->next;
866     }
867   }
868   return pile;
869 }
870 #else
871 /* This function creates machine piles based on all machines involved in a
872  * transaction commit request */
873 plistnode_t *createPiles() {
874   int i;
875   plistnode_t *pile = NULL;
876   unsigned int machinenum;
877   objheader_t *headeraddr;
878   struct chashentry * ptr = c_table;
879   /* Represents number of bins in the chash table */
880   unsigned int size = c_size;
881
882   for(i = 0; i < size ; i++) {
883     struct chashentry * curr = & ptr[i];
884     /* Inner loop to traverse the linked list of the cache lookupTable */
885     //if the first bin in hash table is empty
886     if(curr->key == 0)
887       continue;
888     headeraddr=(objheader_t *) curr->ptr;
889
890     //Get machine location for object id (and whether local or not)
891     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
892       machinenum = myIpAddr;
893     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
894       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
895       return NULL;
896     }
897
898     //Make machine groups
899     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
900   }
901   return pile;
902 }
903 #endif
904
905 /* This function initiates the transaction commit process
906  * Spawns threads for each of the new connections with Participants
907  * and creates new piles by calling the createPiles(),
908  * Sends a transrequest() to each remote machines for objects found remotely
909  * and calls handleLocalReq() to process objects found locally */
910 int transCommit() {
911   unsigned int tot_bytes_mod, *listmid;
912   plistnode_t *pile, *pile_ptr;
913   char treplyretry; /* keeps track of the common response that needs to be sent */
914   int firsttime=1;
915   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
916   char finalResponse;
917 #ifdef SANDBOX
918   abortenabled=0;
919 #endif
920
921 #ifdef LOGEVENTS
922   int iii;
923   for(iii=0;iii<bigindex;iii++) {
924     printf("%c", bigarray[iii]);
925   }
926 #endif
927
928 #ifdef LOGTIMES
929   int jjj;
930   for(jjj=0; jjj<bigindex1; jjj++) {
931     printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
932   }
933 #endif
934
935 #ifdef ABORTREADERS
936   if (t_abort) {
937     //abort this transaction
938     removetransactionhash();
939     objstrDelete(t_cache);
940     t_chashDelete();
941     return 1;
942   }
943 #endif
944
945
946   do {
947     treplyretry = 0;
948
949     /* Look through all the objects in the transaction record and make piles
950      * for each machine involved in the transaction*/
951     if (firsttime) {
952       pile_ptr = pile = createPiles();
953       pile_ptr = pile = sortPiles(pile);
954     } else {
955       pile = pile_ptr;
956     }
957     firsttime = 0;
958     /* Create the packet to be sent in TRANS_REQUEST */
959
960     /* Count the number of participants */
961     int pilecount;
962     pilecount = pCount(pile);
963
964     /* Create a list of machine ids(Participants) involved in transaction   */
965     listmid = calloc(pilecount, sizeof(unsigned int));
966     pListMid(pile, listmid);
967
968     /* Create a socket and getReplyCtrl array, initialize */
969     int socklist[pilecount];
970     char getReplyCtrl[pilecount];
971     int loopcount;
972     for(loopcount = 0 ; loopcount < pilecount; loopcount++){
973       socklist[loopcount] = 0;
974       getReplyCtrl[loopcount] = 0;
975     }
976
977     /* Process each machine pile */
978     int sockindex = 0;
979     trans_req_data_t *tosend;
980     tosend = calloc(pilecount, sizeof(trans_req_data_t));
981     while(pile != NULL) {
982       tosend[sockindex].f.control = TRANS_REQUEST;
983       tosend[sockindex].f.mcount = pilecount;
984       tosend[sockindex].f.numread = pile->numread;
985       tosend[sockindex].f.nummod = pile->nummod;
986       tosend[sockindex].f.numcreated = pile->numcreated;
987       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
988       tosend[sockindex].listmid = listmid;
989       tosend[sockindex].objread = pile->objread;
990       tosend[sockindex].oidmod = pile->oidmod;
991       tosend[sockindex].oidcreated = pile->oidcreated;
992       int sd = 0;
993       if(pile->mid != myIpAddr) {
994         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
995           printf("transRequest(): socket create error\n");
996           free(listmid);
997           free(tosend);
998           return 1;
999         }
1000         socklist[sockindex] = sd;
1001         /* Send bytes of data with TRANS_REQUEST control message */
1002         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
1003
1004         /* Send list of machines involved in the transaction */
1005         {
1006           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
1007           send_data(sd, tosend[sockindex].listmid, size);
1008         }
1009
1010         /* Send oids and version number tuples for objects that are read */
1011         {
1012           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
1013           send_data(sd, tosend[sockindex].objread, size);
1014         }
1015
1016         /* Send objects that are modified */
1017         void *modptr;
1018         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
1019           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
1020           free(listmid);
1021           free(tosend);
1022           return 1;
1023         }
1024         int offset = 0;
1025         int i;
1026         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
1027           int size;
1028           objheader_t *headeraddr;
1029           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
1030             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
1031             free(modptr);
1032             free(listmid);
1033             free(tosend);
1034             return 1;
1035           }
1036           GETSIZE(size,headeraddr);
1037           size+=sizeof(objheader_t);
1038           memcpy(modptr+offset, headeraddr, size);
1039           offset+=size;
1040         }
1041         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1042         free(modptr);
1043       } else { //handle request locally
1044         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1045       }
1046       sockindex++;
1047       pile = pile->next;
1048     } //end of pile processing
1049       /* Recv Ctrl msgs from all machines */
1050     int i;
1051     for(i = 0; i < pilecount; i++) {
1052       int sd = socklist[i];
1053       if(sd != 0) {
1054         char control;
1055         recv_data(sd, &control, sizeof(char));
1056         //Update common data structure with new ctrl msg
1057         getReplyCtrl[i] = control;
1058         /* Recv Objects if participant sends TRANS_DISAGREE */
1059 #ifdef CACHE
1060         if(control == TRANS_DISAGREE) {
1061           int length;
1062           recv_data(sd, &length, sizeof(int));
1063           void *newAddr;
1064           pthread_mutex_lock(&prefetchcache_mutex);
1065           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1066             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1067             free(tosend);
1068             free(listmid);
1069             pthread_mutex_unlock(&prefetchcache_mutex);
1070             return 1;
1071           }
1072           pthread_mutex_unlock(&prefetchcache_mutex);
1073           recv_data(sd, newAddr, length);
1074           int offset = 0;
1075           while(length != 0) {
1076             unsigned int oidToPrefetch;
1077             objheader_t * header;
1078             header = (objheader_t *)(((char *)newAddr) + offset);
1079             oidToPrefetch = OID(header);
1080             STATUS(header)=0;
1081             int size = 0;
1082             GETSIZE(size, header);
1083             size += sizeof(objheader_t);
1084             //make an entry in prefetch hash table
1085         prehashInsert(oidToPrefetch, header);
1086         LOGEVENT('E');
1087             length = length - size;
1088             offset += size;
1089           }
1090         } //end of receiving objs
1091 #endif
1092       }
1093     }
1094     /* Decide the final response */
1095     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1096       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1097       free(tosend);
1098       free(listmid);
1099       return 1;
1100     }
1101
1102     /* Send responses to all machines */
1103     for(i = 0; i < pilecount; i++) {
1104       int sd = socklist[i];
1105       if(sd != 0) {
1106 #ifdef CACHE
1107         if(finalResponse == TRANS_COMMIT) {
1108           int retval;
1109           /* Update prefetch cache */
1110           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1111             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1112             free(tosend);
1113             free(listmid);
1114             return 1;
1115           }
1116
1117
1118           /* Invalidate objects in other machine cache */
1119           if(tosend[i].f.nummod > 0) {
1120             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1121               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1122               free(tosend);
1123               free(listmid);
1124               return 1;
1125             }
1126           }
1127 #ifdef ABORTREADERS
1128           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1129           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1130 #endif
1131         }
1132 #ifdef ABORTREADERS
1133         else if (!treplyretry) {
1134           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1135           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1136         }
1137 #endif
1138 #endif
1139         send_data(sd, &finalResponse, sizeof(char));
1140       } else {
1141         /* Complete local processing */
1142         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1143 #ifdef ABORTREADERS
1144         if(finalResponse == TRANS_COMMIT) {
1145           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1146           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1147         } else if (!treplyretry) {
1148           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1149           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1150         }
1151 #endif
1152       }
1153     }
1154
1155     /* Free resources */
1156     free(tosend);
1157     free(listmid);
1158     if (!treplyretry)
1159       pDelete(pile_ptr);
1160     /* wait a random amount of time before retrying to commit transaction*/
1161     if(treplyretry) {
1162       randomdelay();
1163 #ifdef TRANSSTATS
1164       nSoftAbort++;
1165 #endif
1166     }
1167     /* Retry trans commit procedure during soft_abort case */
1168   } while (treplyretry);
1169
1170   if(finalResponse == TRANS_ABORT) {
1171 #ifdef TRANSSTATS
1172     LOGEVENT('A');
1173     numTransAbort++;
1174 #endif
1175     /* Free Resources */
1176     objstrDelete(t_cache);
1177     t_chashDelete();
1178 #ifdef SANDBOX
1179       abortenabled=1;
1180 #endif
1181     return TRANS_ABORT;
1182   } else if(finalResponse == TRANS_COMMIT) {
1183 #ifdef TRANSSTATS
1184     LOGEVENT('C');
1185     numTransCommit++;
1186 #endif
1187     /* Free Resources */
1188     objstrDelete(t_cache);
1189     t_chashDelete();
1190     return 0;
1191   } else {
1192     //TODO Add other cases
1193     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1194     exit(-1);
1195   }
1196   return 0;
1197 }
1198
1199 /* This function handles the local objects involved in a transaction
1200  * commiting process.  It also makes a decision if this local machine
1201  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1202 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1203   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1204   int numoidnotfound = 0, numoidlocked = 0;
1205   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1206   int numread, i;
1207   unsigned int oid;
1208   unsigned short version;
1209
1210   /* Counters and arrays to formulate decision on control message to be sent */
1211   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1212   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1213   //setting a divider between read and write locks
1214   numread = tdata->f.numread;
1215   /* Process each oid in the machine pile/ group per thread */
1216   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1217     if (i < tdata->f.numread) {
1218       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1219       incr *= i;
1220       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1221       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1222       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1223     } else { // Objects Modified
1224       if(i == tdata->f.numread) {
1225         oidlocked[numoidlocked++] = -1;
1226       }
1227       int tmpsize;
1228       objheader_t *headptr;
1229       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1230       if (headptr == NULL) {
1231         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1232         return;
1233       }
1234       oid = OID(headptr);
1235       version = headptr->version;
1236       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1237     }
1238   }
1239
1240 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1241  * if Participant receives a TRANS_COMMIT */
1242   transinfo->objlocked = oidlocked;
1243   transinfo->objnotfound = oidnotfound;
1244   transinfo->modptr = NULL;
1245   transinfo->numlocked = numoidlocked;
1246   transinfo->numnotfound = numoidnotfound;
1247
1248   /* Condition to send TRANS_AGREE */
1249   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1250     *getReplyCtrl = TRANS_AGREE;
1251   }
1252   /* Condition to send TRANS_SOFT_ABORT */
1253   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1254     *getReplyCtrl = TRANS_SOFT_ABORT;
1255   }
1256 }
1257
1258 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1259   if(finalResponse == TRANS_ABORT) {
1260     if(transAbortProcess(transinfo) != 0) {
1261       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1262       fflush(stdout);
1263       return;
1264     }
1265   } else if(finalResponse == TRANS_COMMIT) {
1266 #ifdef CACHE
1267     /* Invalidate objects in other machine cache */
1268     if(tdata->f.nummod > 0) {
1269       int retval;
1270       if((retval = invalidateObj(tdata)) != 0) {
1271         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1272         return;
1273       }
1274     }
1275 #endif
1276     if(transComProcess(tdata, transinfo) != 0) {
1277       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1278       fflush(stdout);
1279       return;
1280     }
1281   } else {
1282     printf("ERROR...No Decision\n");
1283   }
1284
1285   /* Free memory */
1286   if (transinfo->objlocked != NULL) {
1287     free(transinfo->objlocked);
1288   }
1289   if (transinfo->objnotfound != NULL) {
1290     free(transinfo->objnotfound);
1291   }
1292 }
1293
1294 /* This function decides the reponse that needs to be sent to
1295  * all Participant machines after the TRANS_REQUEST protocol */
1296 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1297   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1298                                                                    message to send */
1299   for (i = 0 ; i < pilecount; i++) {
1300     char control;
1301     control = getReplyCtrl[i];
1302     switch(control) {
1303     default:
1304       printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
1305
1306       /* treat as disagree, pass thru */
1307     case TRANS_DISAGREE:
1308       transdisagree++;
1309       break;
1310
1311     case TRANS_AGREE:
1312       transagree++;
1313       break;
1314
1315     case TRANS_SOFT_ABORT:
1316       transsoftabort++;
1317       break;
1318     }
1319   }
1320
1321   if(transdisagree > 0) {
1322     /* Send Abort */
1323     *treplyretry = 0;
1324     return TRANS_ABORT;
1325 #ifdef CACHE
1326     /* clear objects from prefetch cache */
1327     //cleanPCache();
1328 #endif
1329   } else if(transagree == pilecount) {
1330     /* Send Commit */
1331     *treplyretry = 0;
1332     return TRANS_COMMIT;
1333   } else {
1334     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1335     *treplyretry = 1;
1336     return TRANS_ABORT;
1337   }
1338   return 0;
1339 }
1340
1341 /* This function opens a connection, places an object read request to
1342  * the remote machine, reads the control message and object if
1343  * available and copies the object and its header to the local
1344  * cache. */
1345
1346 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1347   int size, val;
1348   struct sockaddr_in serv_addr;
1349   char machineip[16];
1350   char control;
1351   objheader_t *h;
1352   void *objcopy = NULL;
1353
1354   int sd = getSock2(transReadSockPool, mnum);
1355   char readrequest[sizeof(char)+sizeof(unsigned int)];
1356   readrequest[0] = READ_REQUEST;
1357   *((unsigned int *)(&readrequest[1])) = oid;
1358   send_data(sd, readrequest, sizeof(readrequest));
1359
1360   /* Read response from the Participant */
1361   recv_data(sd, &control, sizeof(char));
1362
1363   if (control==OBJECT_NOT_FOUND) {
1364     objcopy = NULL;
1365   } else {
1366     /* Read object if found into local cache */
1367     recv_data(sd, &size, sizeof(int));
1368     objcopy = objstrAlloc(&t_cache, size);
1369     recv_data(sd, objcopy, size);
1370     STATUS(objcopy)=0;
1371     /* Insert into cache's lookup table */
1372     t_chashInsert(oid, objcopy);
1373 #ifdef TRANSSTATS
1374     totalObjSize += size;
1375 #endif
1376   }
1377
1378   return objcopy;
1379 }
1380
1381 /*  Commit info for objects modified */
1382 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1383                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1384   void *mobj;
1385   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1386   /* Save the oids not found and number of oids not found for later use */
1387   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1388     /* Save the oids not found and number of oids not found for later use */
1389     oidnotfound[*numoidnotfound] = oid;
1390     (*numoidnotfound)++;
1391   } else { /* If Obj found in machine (i.e. has not moved) */
1392     /* Check if Obj is locked by any previous transaction */
1393     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1394       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1395         (*v_matchnolock)++;
1396         //Keep track of what is locked
1397         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1398       } else { /* If versions don't match ...HARD ABORT */
1399         (*v_nomatch)++;
1400         /* Send TRANS_DISAGREE to Coordinator */
1401         *getReplyCtrl = TRANS_DISAGREE;
1402
1403         //Keep track of what is locked
1404         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1405         return;
1406       }
1407     } else { //A lock is acquired some place else
1408       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1409         (*v_matchlock)++;
1410       } else { /* If versions don't match ...HARD ABORT */
1411         (*v_nomatch)++;
1412         /* Send TRANS_DISAGREE to Coordinator */
1413         *getReplyCtrl = TRANS_DISAGREE;
1414         return;
1415       }
1416     }
1417   }
1418 }
1419
1420 /*  Commit info for objects modified */
1421 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1422                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1423   void *mobj;
1424   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1425   /* Save the oids not found and number of oids not found for later use */
1426   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1427     /* Save the oids not found and number of oids not found for later use */
1428     oidnotfound[*numoidnotfound] = oid;
1429     (*numoidnotfound)++;
1430   } else { /* If Obj found in machine (i.e. has not moved) */
1431     /* Check if Obj is locked by any previous transaction */
1432     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1433       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1434         (*v_matchnolock)++;
1435         //Keep track of what is locked
1436         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1437       } else { /* If versions don't match ...HARD ABORT */
1438         (*v_nomatch)++;
1439         /* Send TRANS_DISAGREE to Coordinator */
1440         *getReplyCtrl = TRANS_DISAGREE;
1441         //Keep track of what is locked
1442         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1443         return;
1444       }
1445     } else { //Has reached max number of readers or some other transaction
1446       //has acquired a lock on this object
1447       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1448         (*v_matchlock)++;
1449       } else { /* If versions don't match ...HARD ABORT */
1450         (*v_nomatch)++;
1451         /* Send TRANS_DISAGREE to Coordinator */
1452         *getReplyCtrl = TRANS_DISAGREE;
1453         return;
1454       }
1455     }
1456   }
1457 }
1458
1459 /* This function completes the ABORT process if the transaction is aborting */
1460 int transAbortProcess(trans_commit_data_t *transinfo) {
1461   int i, numlocked;
1462   unsigned int *objlocked;
1463   void *header;
1464
1465   numlocked = transinfo->numlocked;
1466   objlocked = transinfo->objlocked;
1467
1468   int useWriteUnlock = 0;
1469   for (i = 0; i < numlocked; i++) {
1470     if(objlocked[i] == -1) {
1471       useWriteUnlock = 1;
1472       continue;
1473     }
1474     if((header = mhashSearch(objlocked[i])) == NULL) {
1475       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1476       return 1;
1477     }
1478     if(!useWriteUnlock) {
1479       read_unlock(STATUSPTR(header));
1480     } else {
1481       write_unlock(STATUSPTR(header));
1482     }
1483   }
1484
1485   return 0;
1486 }
1487
1488 /*This function completes the COMMIT process if the transaction is commiting*/
1489 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1490   objheader_t *header, *tcptr;
1491   int i, nummod, tmpsize, numcreated, numlocked;
1492   unsigned int *oidmod, *oidcreated, *oidlocked;
1493   void *ptrcreate;
1494
1495   nummod = tdata->f.nummod;
1496   oidmod = tdata->oidmod;
1497   numcreated = tdata->f.numcreated;
1498   oidcreated = tdata->oidcreated;
1499   numlocked = transinfo->numlocked;
1500   oidlocked = transinfo->objlocked;
1501
1502   for (i = 0; i < nummod; i++) {
1503     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1504       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1505       return 1;
1506     }
1507     /* Copy from transaction cache -> main object store */
1508     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1509       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1510       return 1;
1511     }
1512     GETSIZE(tmpsize, header);
1513     char *tmptcptr = (char *) tcptr;
1514     {
1515       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1516       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1517       dst->___cachedCode___=src->___cachedCode___;
1518       dst->___cachedHash___=src->___cachedHash___;
1519
1520       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1521     }
1522
1523     header->version += 1;
1524     if(header->notifylist != NULL) {
1525       notifyAll(&header->notifylist, OID(header), header->version);
1526     }
1527   }
1528   /* If object is newly created inside transaction then commit it */
1529   for (i = 0; i < numcreated; i++) {
1530     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1531       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1532       return 1;
1533     }
1534     GETSIZE(tmpsize, header);
1535     tmpsize += sizeof(objheader_t);
1536     pthread_mutex_lock(&mainobjstore_mutex);
1537     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1538       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1539       pthread_mutex_unlock(&mainobjstore_mutex);
1540       return 1;
1541     }
1542     pthread_mutex_unlock(&mainobjstore_mutex);
1543     /* Initialize read and write locks */
1544     initdsmlocks(STATUSPTR(header));
1545     memcpy(ptrcreate, header, tmpsize);
1546     mhashInsert(oidcreated[i], ptrcreate);
1547     lhashInsert(oidcreated[i], myIpAddr);
1548   }
1549   /* Unlock locked objects */
1550   int useWriteUnlock = 0;
1551   for(i = 0; i < numlocked; i++) {
1552     if(oidlocked[i] == -1) {
1553       useWriteUnlock = 1;
1554       continue;
1555     }
1556     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1557       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1558       return 1;
1559     }
1560     if(!useWriteUnlock) {
1561       read_unlock(STATUSPTR(header));
1562     } else {
1563       write_unlock(STATUSPTR(header));
1564     }
1565   }
1566   return 0;
1567 }
1568
1569 prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
1570   int i;
1571   int j;
1572   prefetchpile_t * head=NULL;
1573
1574   for(j=0;j<numprefetches;j++) {
1575     int siteid = *(GET_SITEID(ptr));
1576     int ntuples = *(GET_NTUPLES(ptr));
1577     unsigned int * oidarray = GET_PTR_OID(ptr);
1578     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1579     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1580     int numLocal = 0;
1581     
1582     for(i=0; i<ntuples; i++) {
1583       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1584       unsigned short endindex=endoffsets[i];
1585       unsigned int oid=oidarray[i];
1586       int newbase;
1587       int machinenum;
1588       int countInvalidObj=0;
1589
1590       if (oid==0) {
1591         numLocal++;
1592         continue;
1593       }
1594       //Look up fields locally
1595       int isLastOffset=0;
1596       if(endindex==0)
1597           isLastOffset=1;
1598       for(newbase=baseindex; newbase<endindex; newbase++) {
1599         if(newbase==(endindex-1))
1600           isLastOffset=1;
1601         if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
1602           break;
1603         }
1604         //Ended in a null pointer...
1605         if (oid==0) {
1606           numLocal++;
1607           goto tuple;
1608         }
1609       }
1610
1611       //Entire prefetch is local
1612       if (newbase==endindex&&checkoid(oid,isLastOffset)) {
1613         numLocal++;
1614         goto tuple;
1615       }
1616
1617       //Add to remote requests
1618       machinenum=lhashSearch(oid);
1619       insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
1620     tuple:
1621       ;
1622     }
1623     
1624     /* handle dynamic prefetching */
1625     handleDynPrefetching(numLocal, ntuples, siteid);
1626     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1627   }
1628
1629   return head;
1630 }
1631
1632 int checkoid(unsigned int oid, int isLastOffset) {
1633   objheader_t *header;
1634   if ((header=mhashSearch(oid))!=NULL) {
1635     //Found on machine
1636     return 1;
1637   } else if ((header=prehashSearch(oid))!=NULL) {
1638     //if the last offset then prefetch object
1639     if((STATUS(header) & DIRTY) && isLastOffset) {
1640       return 0;
1641     }
1642     //Found in cache
1643     return 1;
1644   } else {
1645     return 0;
1646   }
1647 }
1648
1649 int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
1650   objheader_t *header;
1651   if ((header=mhashSearch(*oid))!=NULL) {
1652     //Found on machine
1653     ;
1654   } else if ((header=prehashSearch(*oid))!=NULL) {
1655     //Found in cache
1656     if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
1657       //only once because later old entries may still cause unnecessary roundtrips during prefetching
1658       (*countInvalidObj)+=1;
1659       if(*countInvalidObj > 1) {
1660         return 0;
1661       }
1662     }
1663   } else {
1664     return 0;
1665   }
1666
1667   if(TYPE(header) >= NUMCLASSES) {
1668     int elementsize = classsize[TYPE(header)];
1669     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1670     int length = ao->___length___;
1671     /* Check if array out of bounds */
1672     if(offset < 0 || offset >= length) {
1673       //if yes treat the object as found
1674       (*oid)=0;
1675       return 1;
1676     }
1677     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1678     return 1;
1679   } else {
1680     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1681     return 1;
1682   }
1683 }
1684
1685
1686 /* This function is called by the thread calling transPrefetch */
1687 void *transPrefetch(void *t) {
1688   while(1) {
1689     /* read from prefetch queue */
1690     void *node=gettail();
1691     /* Check if the tuples are found locally, if yes then reduce them further*/
1692     /* and group requests by remote machine ids by calling the makePreGroups() */
1693     int count=numavailable();
1694     prefetchpile_t *pilehead = foundLocal(node, count, 0);
1695
1696     if (pilehead!=NULL) {
1697       // Get sock from shared pool
1698
1699       /* Send  Prefetch Request */
1700       prefetchpile_t *ptr = pilehead;
1701       while(ptr != NULL) {
1702         globalid++;
1703         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1704         sendPrefetchReq(ptr, sd,globalid);
1705         ptr = ptr->next;
1706       }
1707
1708       /* Release socket */
1709       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1710
1711       /* Deallocated pilehead */
1712       mcdealloc(pilehead);
1713     }
1714     // Deallocate the prefetch queue pile node
1715     incmulttail(count);
1716   }
1717 }
1718
1719 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1720   objpile_t *tmp;
1721
1722   int size=sizeof(char)+sizeof(int);
1723   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1724     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1725   }
1726
1727   char buft[size];
1728   char *buf=buft;
1729   *buf=TRANS_PREFETCH;
1730   buf+=sizeof(char);
1731
1732   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1733     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1734     *((int*)buf)=len;
1735     buf+=sizeof(int);
1736     *((unsigned int *)buf)=tmp->oid;
1737     buf+=sizeof(unsigned int);
1738     *((unsigned int *)(buf)) = myIpAddr;
1739     buf+=sizeof(unsigned int);
1740     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1741     buf+=tmp->numoffset*sizeof(short);
1742   }
1743   *((int *)buf)=-1;
1744   send_data(sd, buft, size);
1745   return;
1746 }
1747
1748 /**
1749  * parameters: mcpilenode -> pile node to traverse to assemble pref requests
1750  * sd -> socket id
1751  * gid -> global identifier for each prefetch request sent, starts with 0
1752  **/
1753 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
1754   int len, endpair;
1755   char control;
1756   objpile_t *tmp;
1757   struct writestruct writebuffer;
1758   writebuffer.offset=0;
1759
1760
1761   /* Send TRANS_PREFETCH control message */
1762   int first=1;
1763
1764   /* Send Oids and offsets in pairs */
1765   tmp = mcpilenode->objpiles;
1766   while(tmp != NULL) {
1767     len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1768     char oidnoffset[len+5];
1769     char *buf=oidnoffset;
1770     if (first) {
1771       *buf=TRANS_PREFETCH;
1772       buf++;len++;
1773       first=0;
1774     }
1775     *((int*)buf) = tmp->numoffset;
1776     buf+=sizeof(int);
1777     *((unsigned int *)buf) = tmp->oid;
1778     LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
1779 #ifdef TRANSSTATS
1780     sendRemoteReq++;
1781 #endif
1782     buf+=sizeof(unsigned int);
1783     *((unsigned int *)buf) = myIpAddr;
1784     buf+= sizeof(unsigned int);
1785     *((int*)buf) = gid;
1786     buf+=sizeof(int);
1787     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1788     tmp = tmp->next;
1789     if (tmp==NULL) {
1790       *((int *)(&oidnoffset[len]))=-1;
1791       len+=sizeof(int);
1792     }
1793     if (tmp!=NULL)
1794       send_buf(sd, &writebuffer, oidnoffset, len);
1795     else
1796       forcesend_buf(sd, &writebuffer, oidnoffset, len);
1797   }
1798   LOGOIDTYPE("SREQ",0,0,myrdtsc());
1799   LOGEVENT('S');
1800   LOGTIME('S',0,0,myrdtsc(),gid); //after sending
1801   return;
1802 }
1803
1804 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1805   int gid,length = 0, size = 0;
1806   char control;
1807   unsigned int oid;
1808   void *modptr, *oldptr;
1809
1810   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1811   size = length - sizeof(int);
1812   char recvbuffer[size];
1813 #ifdef TRANSSTATS
1814   getResponse++;
1815   LOGEVENT('Z');
1816   LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
1817 #endif
1818   recv_data_buf(sd, readbuffer, recvbuffer, size);
1819   control = *((char *) recvbuffer);
1820   if(control == OBJECT_FOUND) {
1821     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1822     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1823     LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
1824     size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
1825     pthread_mutex_lock(&prefetchcache_mutex);
1826     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1827       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1828       pthread_mutex_unlock(&prefetchcache_mutex);
1829       return -1;
1830     }
1831     pthread_mutex_unlock(&prefetchcache_mutex);
1832     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
1833     STATUS(modptr)=0;
1834
1835
1836     /* Insert the oid and its address into the prefetch hash lookup table */
1837     /* Do a version comparison if the oid exists */
1838     if((oldptr = prehashSearch(oid)) != NULL) {
1839       /* If older version then update with new object ptr */
1840       if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1841         prehashInsert(oid, modptr);
1842       }
1843     } else { /* Else add the object ptr to hash table*/
1844       prehashInsert(oid, modptr);
1845     }
1846     LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
1847     LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
1848   } else if(control == OBJECT_NOT_FOUND) {
1849     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1850     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1851     LOGOIDTYPE("NF",oid,0,myrdtsc());
1852     LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
1853     /* TODO: For each object not found query DHT for new location and retrieve the object */
1854     /* Throw an error */
1855     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1856     //    exit(-1);
1857   } else {
1858     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1859   }
1860
1861   return 0;
1862 }
1863
1864 unsigned short getObjType(unsigned int oid) {
1865   objheader_t *objheader;
1866   unsigned short numoffset[] ={0};
1867   short fieldoffset[] ={};
1868
1869   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1870 #ifdef CACHE
1871     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1872 #endif
1873     unsigned int mid = lhashSearch(oid);
1874     int sd = getSock2(transReadSockPool, mid);
1875     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1876     remotereadrequest[0] = READ_REQUEST;
1877     *((unsigned int *)(&remotereadrequest[1])) = oid;
1878     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1879
1880     /* Read response from the Participant */
1881     char control;
1882     recv_data(sd, &control, sizeof(char));
1883
1884     if (control==OBJECT_NOT_FOUND) {
1885       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1886       fflush(stdout);
1887       exit(-1);
1888     } else {
1889       /* Read object if found into local cache */
1890       int size;
1891       recv_data(sd, &size, sizeof(int));
1892 #ifdef CACHE
1893       pthread_mutex_lock(&prefetchcache_mutex);
1894       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1895         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1896         pthread_exit(NULL);
1897       }
1898       pthread_mutex_unlock(&prefetchcache_mutex);
1899       recv_data(sd, objheader, size);
1900       prehashInsert(oid, objheader);
1901       return TYPE(objheader);
1902 #else
1903       char *buffer;
1904       if((buffer = calloc(1, size)) == NULL) {
1905         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1906         fflush(stdout);
1907         return 0;
1908       }
1909       recv_data(sd, buffer, size);
1910       objheader = (objheader_t *)buffer;
1911       unsigned short type = TYPE(objheader);
1912       free(buffer);
1913       return type;
1914 #endif
1915     }
1916 #ifdef CACHE
1917   }
1918 #endif
1919   }
1920   return TYPE(objheader);
1921 }
1922
1923 int startRemoteThread(unsigned int oid, unsigned int mid) {
1924   int sock;
1925   struct sockaddr_in remoteAddr;
1926   char msg[1 + sizeof(unsigned int)];
1927   int bytesSent;
1928   int status;
1929
1930   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1931     perror("startRemoteThread():socket()");
1932     return -1;
1933   }
1934
1935   bzero(&remoteAddr, sizeof(remoteAddr));
1936   remoteAddr.sin_family = AF_INET;
1937   remoteAddr.sin_port = htons(LISTEN_PORT);
1938   remoteAddr.sin_addr.s_addr = htonl(mid);
1939
1940   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1941     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1942            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1943     status = -1;
1944   } else
1945   {
1946     msg[0] = START_REMOTE_THREAD;
1947     *((unsigned int *) &msg[1]) = oid;
1948     send_data(sock, msg, 1 + sizeof(unsigned int));
1949   }
1950
1951   close(sock);
1952   return status;
1953 }
1954
1955 //TODO: when reusing oids, make sure they are not already in use!
1956 static unsigned int id = 0xFFFFFFFF;
1957 unsigned int getNewOID(void) {
1958   id += 2;
1959   if (id > oidMax || id < oidMin) {
1960     id = (oidMin | 1);
1961   }
1962   return id;
1963 }
1964
1965 int processConfigFile() {
1966   FILE *configFile;
1967   const int maxLineLength = 200;
1968   char lineBuffer[maxLineLength];
1969   char *token;
1970   const char *delimiters = " \t\n";
1971   char *commentBegin;
1972   in_addr_t tmpAddr;
1973
1974   configFile = fopen(CONFIG_FILENAME, "r");
1975   if (configFile == NULL) {
1976     printf("error opening %s:\n", CONFIG_FILENAME);
1977     perror("");
1978     return -1;
1979   }
1980
1981   numHostsInSystem = 0;
1982   sizeOfHostArray = 8;
1983   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1984
1985   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1986     commentBegin = strchr(lineBuffer, '#');
1987     if (commentBegin != NULL)
1988       *commentBegin = '\0';
1989     token = strtok(lineBuffer, delimiters);
1990     while (token != NULL) {
1991       tmpAddr = inet_addr(token);
1992       if ((int)tmpAddr == -1) {
1993         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1994         fclose(configFile);
1995         return -1;
1996       } else
1997         addHost(htonl(tmpAddr));
1998       token = strtok(NULL, delimiters);
1999     }
2000   }
2001
2002   fclose(configFile);
2003
2004   if (numHostsInSystem < 1) {
2005     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
2006     return -1;
2007   }
2008 #ifdef MAC
2009   myIpAddr = getMyIpAddr("en1");
2010 #else
2011   myIpAddr = getMyIpAddr("eth0");
2012 #endif
2013   myIndexInHostArray = findHost(myIpAddr);
2014   if (myIndexInHostArray == -1) {
2015     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
2016     return -1;
2017   }
2018   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
2019   oidMin = oidsPerBlock * myIndexInHostArray;
2020   if (myIndexInHostArray == numHostsInSystem - 1)
2021     oidMax = 0xFFFFFFFF;
2022   else
2023     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
2024
2025   return 0;
2026 }
2027
2028 void addHost(unsigned int hostIp) {
2029   unsigned int *tmpArray;
2030
2031   if (findHost(hostIp) != -1)
2032     return;
2033
2034   if (numHostsInSystem == sizeOfHostArray) {
2035     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
2036     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
2037     free(hostIpAddrs);
2038     hostIpAddrs = tmpArray;
2039   }
2040
2041   hostIpAddrs[numHostsInSystem++] = hostIp;
2042
2043   return;
2044 }
2045
2046 int findHost(unsigned int hostIp) {
2047   int i;
2048   for (i = 0; i < numHostsInSystem; i++)
2049     if (hostIpAddrs[i] == hostIp)
2050       return i;
2051
2052   //not found
2053   return -1;
2054 }
2055
2056 /* This function sends notification request per thread waiting on object(s) whose version
2057  * changes */
2058 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
2059   int sock,i;
2060   objheader_t *objheader;
2061   struct sockaddr_in remoteAddr;
2062   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
2063   char *ptr;
2064   int bytesSent;
2065   int status, size;
2066   unsigned short version;
2067   unsigned int oid,mid;
2068   static unsigned int threadid = 0;
2069   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
2070   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2071   notifydata_t *ndata;
2072
2073   oid = oidarry[0];
2074   if((mid = lhashSearch(oid)) == 0) {
2075     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2076     return;
2077   }
2078
2079   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2080     perror("reqNotify():socket()");
2081     return -1;
2082   }
2083
2084   bzero(&remoteAddr, sizeof(remoteAddr));
2085   remoteAddr.sin_family = AF_INET;
2086   remoteAddr.sin_port = htons(LISTEN_PORT);
2087   remoteAddr.sin_addr.s_addr = htonl(mid);
2088
2089   /* Generate unique threadid */
2090   threadid++;
2091
2092   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2093   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2094     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2095     return -1;
2096   }
2097   ndata->numoid = numoid;
2098   ndata->threadid = threadid;
2099   ndata->oidarry = oidarry;
2100   ndata->versionarry = versionarry;
2101   ndata->threadcond = threadcond;
2102   ndata->threadnotify = threadnotify;
2103   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2104     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2105     free(ndata);
2106     return -1;
2107   }
2108
2109   /* Send  number of oids, oidarry, version array, machine id and threadid */
2110   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2111     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2112            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2113     free(ndata);
2114     return -1;
2115   } else {
2116     msg[0] = THREAD_NOTIFY_REQUEST;
2117     *((unsigned int *)(&msg[1])) = numoid;
2118     /* Send array of oids  */
2119     size = sizeof(unsigned int);
2120
2121     for(i = 0;i < numoid; i++) {
2122       oid = oidarry[i];
2123       *((unsigned int *)(&msg[1] + size)) = oid;
2124       size += sizeof(unsigned int);
2125     }
2126
2127     /* Send array of version  */
2128     for(i = 0;i < numoid; i++) {
2129       version = versionarry[i];
2130       *((unsigned short *)(&msg[1] + size)) = version;
2131       size += sizeof(unsigned short);
2132     }
2133
2134     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2135     *((unsigned int *)(&msg[1] + size)) = threadid;
2136     pthread_mutex_lock(&(ndata->threadnotify));
2137     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2138     send_data(sock, msg, size);
2139     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2140     pthread_mutex_unlock(&(ndata->threadnotify));
2141   }
2142
2143   pthread_cond_destroy(&threadcond);
2144   pthread_mutex_destroy(&threadnotify);
2145   free(ndata);
2146   close(sock);
2147   return status;
2148 }
2149
2150 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2151   notifydata_t *ndata;
2152   int i, objIsFound = 0, index;
2153   void *ptr;
2154
2155   //Look up the tid and call the corresponding pthread_cond_signal
2156   if((ndata = notifyhashSearch(tid)) == NULL) {
2157     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2158     return;
2159   } else  {
2160     for(i = 0; i < ndata->numoid; i++) {
2161       if(ndata->oidarry[i] == oid) {
2162         objIsFound = 1;
2163         index = i;
2164       }
2165     }
2166     if(objIsFound == 0) {
2167       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2168       return;
2169     } else {
2170       if(version <= ndata->versionarry[index]) {
2171         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2172         return;
2173       } else {
2174 #ifdef CACHE
2175         /* Clear from prefetch cache and free thread related data structure */
2176         if((ptr = prehashSearch(oid)) != NULL) {
2177           prehashRemove(oid);
2178         }
2179 #endif
2180         pthread_mutex_lock(&(ndata->threadnotify));
2181         pthread_cond_signal(&(ndata->threadcond));
2182         pthread_mutex_unlock(&(ndata->threadnotify));
2183       }
2184     }
2185   }
2186   return;
2187 }
2188
2189 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2190   threadlist_t *ptr;
2191   unsigned int mid;
2192   struct sockaddr_in remoteAddr;
2193   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2194   int sock, status, size, bytesSent;
2195
2196   while(*head != NULL) {
2197     ptr = *head;
2198     mid = ptr->mid;
2199     //create a socket connection to that machine
2200     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2201       perror("notifyAll():socket()");
2202       return -1;
2203     }
2204
2205     bzero(&remoteAddr, sizeof(remoteAddr));
2206     remoteAddr.sin_family = AF_INET;
2207     remoteAddr.sin_port = htons(LISTEN_PORT);
2208     remoteAddr.sin_addr.s_addr = htonl(mid);
2209     //send Thread Notify response and threadid to that machine
2210     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2211       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2212              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2213       fflush(stdout);
2214       status = -1;
2215     } else {
2216       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2217       msg[0] = THREAD_NOTIFY_RESPONSE;
2218       *((unsigned int *)&msg[1]) = oid;
2219       size = sizeof(unsigned int);
2220       *((unsigned short *)(&msg[1]+ size)) = version;
2221       size+= sizeof(unsigned short);
2222       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2223
2224       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2225       send_data(sock, msg, size);
2226     }
2227     //close socket
2228     close(sock);
2229     // Update head
2230     *head = ptr->next;
2231     free(ptr);
2232   }
2233   return status;
2234 }
2235
2236 void transAbort() {
2237 #ifdef ABORTREADERS
2238   removetransactionhash();
2239 #endif
2240   objstrDelete(t_cache);
2241   t_chashDelete();
2242 }
2243
2244 /* This function inserts necessary information into
2245  * a machine pile data structure */
2246 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2247   plistnode_t *ptr, *tmp;
2248   int found = 0, offset = 0;
2249
2250   tmp = pile;
2251   //Add oid into a machine that is already present in the pile linked list structure
2252   while(tmp != NULL) {
2253     if (tmp->mid == mid) {
2254       int tmpsize;
2255
2256       if (STATUS(headeraddr) & NEW) {
2257         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2258         tmp->numcreated++;
2259         GETSIZE(tmpsize, headeraddr);
2260         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2261       } else if (STATUS(headeraddr) & DIRTY) {
2262         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2263         tmp->nummod++;
2264         GETSIZE(tmpsize, headeraddr);
2265         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2266       } else {
2267         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2268         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2269         offset += sizeof(unsigned int);
2270         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2271         tmp->numread++;
2272       }
2273       found = 1;
2274       break;
2275     }
2276     tmp = tmp->next;
2277   }
2278   //Add oid for any new machine
2279   if (!found) {
2280     int tmpsize;
2281     if((ptr = pCreate(num_objs)) == NULL) {
2282       return NULL;
2283     }
2284     ptr->mid = mid;
2285     if (STATUS(headeraddr) & NEW) {
2286       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2287       ptr->numcreated++;
2288       GETSIZE(tmpsize, headeraddr);
2289       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2290     } else if (STATUS(headeraddr) & DIRTY) {
2291       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2292       ptr->nummod++;
2293       GETSIZE(tmpsize, headeraddr);
2294       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2295     } else {
2296       *((unsigned int *)ptr->objread)=OID(headeraddr);
2297       offset = sizeof(unsigned int);
2298       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2299       ptr->numread++;
2300     }
2301     ptr->next = pile;
2302     pile = ptr;
2303   }
2304
2305   /* Clear Flags */
2306   STATUS(headeraddr) =0;
2307
2308
2309   return pile;
2310 }
2311
2312 plistnode_t *sortPiles(plistnode_t *pileptr) {
2313   plistnode_t *head, *ptr, *tail;
2314   head = pileptr;
2315   ptr = pileptr;
2316   /* Get tail pointer */
2317   while(ptr!= NULL) {
2318     tail = ptr;
2319     ptr = ptr->next;
2320   }
2321   ptr = pileptr;
2322   plistnode_t *prev = pileptr;
2323   /* Arrange local machine processing at the end of the pile list */
2324   while(ptr != NULL) {
2325     if(ptr != tail) {
2326       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2327         prev->next = ptr->next;
2328         ptr->next = NULL;
2329         tail->next = ptr;
2330         return pileptr;
2331       }
2332       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2333         prev = ptr->next;
2334         ptr->next = NULL;
2335         tail->next = ptr;
2336         return prev;
2337       }
2338       prev = ptr;
2339     }
2340     ptr = ptr->next;
2341   }
2342   return pileptr;
2343 }