changes to copy remote objs read into prefetch cache
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 #ifdef LOGEVENTS
83 char bigarray[16*1024*1024];
84 int bigindex=0;
85 #define LOGEVENT(x) { \
86     int tmp=bigindex++;                         \
87     bigarray[tmp]=x;                            \
88   }
89 #else
90 #define LOGEVENT(x)
91 #endif
92
93 /*******************************
94 * Send and Recv function calls
95 *******************************/
96 void send_data(int fd, void *buf, int buflen) {
97   char *buffer = (char *)(buf);
98   int size = buflen;
99   int numbytes;
100   while (size > 0) {
101     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
102     bytesSent = bytesSent + numbytes;
103     if (numbytes == -1) {
104       perror("send");
105       exit(0);
106     }
107     buffer += numbytes;
108     size -= numbytes;
109   }
110 }
111
112 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
113   if (buflen+sendbuffer->offset>WMAXBUF) {
114     send_data(fd, sendbuffer->buf, sendbuffer->offset);
115     sendbuffer->offset=0;
116     send_data(fd, buffer, buflen);
117     return;
118   }
119   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
120   sendbuffer->offset+=buflen;
121   if (sendbuffer->offset>WTOP) {
122     send_data(fd, sendbuffer->buf, sendbuffer->offset);
123     sendbuffer->offset=0;
124   }
125 }
126
127 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
128   if (buflen+sendbuffer->offset>WMAXBUF) {
129     send_data(fd, sendbuffer->buf, sendbuffer->offset);
130     sendbuffer->offset=0;
131     send_data(fd, buffer, buflen);
132     return;
133   }
134   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
135   sendbuffer->offset+=buflen;
136   send_data(fd, sendbuffer->buf, sendbuffer->offset);
137   sendbuffer->offset=0;
138 }
139
140 int recvw(int fd, void *buf, int len, int flags) {
141   return recv(fd, buf, len, flags);
142 }
143  
144 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
145   char *buf=(char *)buffer;
146   int numbytes=readbuffer->head-readbuffer->tail;
147   if (numbytes>buflen)
148     numbytes=buflen;
149   if (numbytes>0) {
150     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
151     readbuffer->tail+=numbytes;
152     buflen-=numbytes;
153     buf+=numbytes;
154   }
155   if (buflen==0) {
156     return;
157   }
158   if (buflen>=MAXBUF) {
159     recv_data(fd, buf, buflen);
160     return;
161   }
162   
163   int maxbuf=MAXBUF;
164   int obufflen=buflen;
165   readbuffer->head=0;
166   
167   while (buflen > 0) {
168     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
169     if (numbytes == -1) {
170       perror("recv");
171       exit(0);
172     }
173     buflen-=numbytes;
174     readbuffer->head+=numbytes;
175     maxbuf-=numbytes;
176   }
177   memcpy(buf,readbuffer->buf,obufflen);
178   readbuffer->tail=obufflen;
179 }
180
181 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
182   char *buf=(char *)buffer;
183   //now tail<=head
184   int numbytes=readbuffer->head-readbuffer->tail;
185   if (numbytes>buflen)
186     numbytes=buflen;
187   if (numbytes>0) {
188     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
189     readbuffer->tail+=numbytes;
190     buflen-=numbytes;
191     buf+=numbytes;
192   }
193   if (buflen==0)
194     return 1;
195
196   if (buflen>=MAXBUF) {
197     return recv_data_errorcode(fd, buf, buflen);
198   }
199
200   int maxbuf=MAXBUF;
201   int obufflen=buflen;
202   readbuffer->head=0;
203   
204   while (buflen > 0) {
205     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
206     if (numbytes ==0) {
207       return 0;
208     }
209     if (numbytes==-1) {
210       perror("recvbuf");
211       return -1;
212     }
213     buflen-=numbytes;
214     readbuffer->head+=numbytes;
215     maxbuf-=numbytes;
216   }
217   memcpy(buf,readbuffer->buf,obufflen);
218   readbuffer->tail=obufflen;
219   return 1;
220 }
221
222
223 void recv_data(int fd, void *buf, int buflen) {
224   char *buffer = (char *)(buf);
225   int size = buflen;
226   int numbytes;
227   while (size > 0) {
228     numbytes = recvw(fd, buffer, size, 0);
229     bytesRecv = bytesRecv + numbytes;
230     if (numbytes == -1) {
231       perror("recv");
232       exit(0);
233     }
234     buffer += numbytes;
235     size -= numbytes;
236   }
237 }
238
239 int recv_data_errorcode(int fd, void *buf, int buflen) {
240   char *buffer = (char *)(buf);
241   int size = buflen;
242   int numbytes;
243   while (size > 0) {
244     numbytes = recvw(fd, buffer, size, 0);
245     if (numbytes==0)
246       return 0;
247     if (numbytes == -1) {
248       perror("recv");
249       return -1;
250     }
251     buffer += numbytes;
252     size -= numbytes;
253   }
254   return 1;
255 }
256
257 void printhex(unsigned char *ptr, int numBytes) {
258   int i;
259   for (i = 0; i < numBytes; i++) {
260     if (ptr[i] < 16)
261       printf("0%x ", ptr[i]);
262     else
263       printf("%x ", ptr[i]);
264   }
265   printf("\n");
266   return;
267 }
268
269 inline int arrayLength(int *array) {
270   int i;
271   for(i=0 ; array[i] != -1; i++)
272     ;
273   return i;
274 }
275
276 inline int findmax(int *array, int arraylength) {
277   int max, i;
278   max = array[0];
279   for(i = 0; i < arraylength; i++) {
280     if(array[i] > max) {
281       max = array[i];
282     }
283   }
284   return max;
285 }
286
287 //#define INLINEPREFETCH
288 #define PREFTHRESHOLD 4
289
290 /* This function is a prefetch call generated by the compiler that
291  * populates the shared primary prefetch queue*/
292 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
293   /* Allocate for the queue node*/
294   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
295   int len;
296 #ifdef INLINEPREFETCH
297   int attempted=0;
298   char *node;
299   do {
300   node=getmemory(qnodesize);
301   if (node==NULL&&attempted)
302     break;
303   if (node!=NULL) {
304 #else
305   char *node=getmemory(qnodesize);
306 #endif
307   int top=endoffsets[ntuples-1];
308
309   if (node==NULL) {
310     LOGEVENT('D');
311     return;
312   }
313   /* Set queue node values */
314
315   /* TODO: Remove this after testing */
316   evalPrefetch[siteid].callcount++;
317
318   *((int *)(node))=siteid;
319   *((int *)(node + sizeof(int))) = ntuples;
320   len = 2*sizeof(int);
321   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
322   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
323   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
324
325 #ifdef INLINEPREFETCH
326   movehead(qnodesize);
327   }
328   int numpref=numavailable();
329   attempted=1;
330
331   if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
332     node=gettail();
333     prefetchpile_t *pilehead = foundLocal(node,numpref);
334     if (pilehead!=NULL) {
335       // Get sock from shared pool
336       
337       /* Send  Prefetch Request */
338       prefetchpile_t *ptr = pilehead;
339       while(ptr != NULL) {
340         int sd = getSock2(transPrefetchSockPool, ptr->mid);
341         sendPrefetchReq(ptr, sd);
342         ptr = ptr->next;
343       }
344       
345       mcdealloc(pilehead);
346       resetqueue();
347     }
348   }//end do prefetch if condition
349   } while(node==NULL);
350 #else
351   /* Lock and insert into primary prefetch queue */
352   movehead(qnodesize);
353 #endif
354 }
355
356 /* This function starts up the transaction runtime. */
357 int dstmStartup(const char * option) {
358   pthread_t thread_Listen, udp_thread_Listen;
359   pthread_attr_t attr;
360   int master=option!=NULL && strcmp(option, "master")==0;
361   int fd;
362   int udpfd;
363
364   if (processConfigFile() != 0)
365     return 0; //TODO: return error value, cause main program to exit
366 #ifdef COMPILER
367   if (!master)
368     threadcount--;
369 #endif
370
371 #ifdef TRANSSTATS
372   printf("Trans stats is on\n");
373   fflush(stdout);
374 #endif
375 #ifdef ABORTREADERS
376   initreaderlist();
377 #endif
378
379   //Initialize socket pool
380   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
381   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
382   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
383
384   dstmInit();
385   transInit();
386
387   fd=startlistening();
388   pthread_attr_init(&attr);
389   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
390 #ifdef CACHE
391   udpfd = udpInit();
392   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
393 #endif
394   if (master) {
395     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
396     return 1;
397   } else {
398     dstmListen((void *)fd);
399     return 0;
400   }
401 }
402
403 //TODO Use this later
404 void *pCacheAlloc(objstr_t *store, unsigned int size) {
405   void *tmp;
406   objstr_t *ptr;
407   ptr = store;
408   int success = 0;
409
410   while(ptr->next != NULL) {
411     /* check if store is empty */
412     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
413       tmp = ptr->top;
414       ptr->top += size;
415       success = 1;
416       return tmp;
417     } else {
418       ptr = ptr->next;
419     }
420   }
421
422   if(success == 0) {
423     return NULL;
424   }
425 }
426
427 /* This function initiates the prefetch thread A queue is shared
428  * between the main thread of execution and the prefetch thread to
429  * process the prefetch call Call from compiler populates the shared
430  * queue with prefetch requests while prefetch thread processes the
431  * prefetch requests */
432
433 void transInit() {
434   //Create and initialize prefetch cache structure
435 #ifdef CACHE
436   initializePCache();
437   if((evalPrefetch = initPrefetchStats()) == NULL) {
438     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
439     exit(0);
440   }
441 #endif
442
443   /* Initialize attributes for mutex */
444   pthread_mutexattr_init(&prefetchcache_mutex_attr);
445   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
446
447   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
448   pthread_mutex_init(&notifymutex, NULL);
449   pthread_mutex_init(&atomicObjLock, NULL);
450 #ifdef CACHE
451   //Create prefetch cache lookup table
452   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
453     printf("ERROR\n");
454     return; //Failure
455   }
456
457   //Initialize primary shared queue
458   queueInit();
459   //Initialize machine pile w/prefetch oids and offsets shared queue
460   mcpileqInit();
461
462   //Create the primary prefetch thread
463   int retval;
464 #ifdef RANGEPREFETCH
465   do {
466     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
467   } while(retval!=0);
468 #else
469 #ifndef INLINEPREFETCH
470   do {
471     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
472   } while(retval!=0);
473 #endif
474 #endif
475 #ifndef INLINEPREFETCH
476   pthread_detach(tPrefetch);
477 #endif
478 #endif
479 }
480
481 /* This function stops the threads spawned */
482 void transExit() {
483 #ifdef CACHE
484   int t;
485   pthread_cancel(tPrefetch);
486   for(t = 0; t < NUM_THREADS; t++)
487     pthread_cancel(wthreads[t]);
488 #endif
489
490   return;
491 }
492
493 /* This functions inserts randowm wait delays in the order of msec
494  * Mostly used when transaction commits retry*/
495 void randomdelay() {
496   struct timespec req;
497   time_t t;
498
499   t = time(NULL);
500   req.tv_sec = 0;
501   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
502   nanosleep(&req, NULL);
503   return;
504 }
505
506 /* This function initializes things required in the transaction start*/
507 void transStart() {
508   t_cache = objstrCreate(1048576);
509   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
510   revertlist=NULL;
511 #ifdef ABORTREADERS
512   t_abort=0;
513 #endif
514 }
515
516 // Search for an address for a given oid                                                                               
517 /*#define INLINE    inline __attribute__((always_inline))
518
519 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
520   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
521   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
522
523   do {
524     if(node->key == key) {
525       return node->val;
526     }
527     node = node->next;
528   } while(node != NULL);
529
530   return NULL;
531   }*/
532
533
534
535
536 /* This function finds the location of the objects involved in a transaction
537  * and returns the pointer to the object if found in a remote location */
538 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
539   unsigned int machinenumber;
540   objheader_t *tmp, *objheader;
541   objheader_t *objcopy;
542   int size;
543   void *buf;
544   chashlistnode_t *node;
545
546   if(oid == 0) {
547     return NULL;
548   }
549   
550
551   node= &c_table[(oid & c_mask)>>1];
552   do {
553     if(node->key == oid) {
554 #ifdef TRANSSTATS
555     nchashSearch++;
556 #endif
557 #ifdef COMPILER
558     return &((objheader_t*)node->val)[1];
559 #else
560     return node->val;
561 #endif
562     }
563     node = node->next;
564   } while(node != NULL);
565   
566
567   /*  
568   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
569 #ifdef TRANSSTATS
570     nchashSearch++;
571 #endif
572 #ifdef COMPILER
573     return &objheader[1];
574 #else
575     return objheader;
576 #endif
577   } else 
578   */
579
580 #ifdef ABORTREADERS
581   if (t_abort) {
582     //abort this transaction
583     //printf("ABORTING\n");
584     removetransactionhash();
585     objstrDelete(t_cache);
586     t_chashDelete();
587     _longjmp(aborttrans,1);
588   } else
589     addtransaction(oid);
590 #endif
591
592   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
593 #ifdef TRANSSTATS
594     nmhashSearch++;
595 #endif
596     /* Look up in machine lookup table  and copy  into cache*/
597     GETSIZE(size, objheader);
598     size += sizeof(objheader_t);
599     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
600     memcpy(objcopy, objheader, size);
601     /* Insert into cache's lookup table */
602     STATUS(objcopy)=0;
603     t_chashInsert(OID(objheader), objcopy);
604 #ifdef COMPILER
605     return &objcopy[1];
606 #else
607     return objcopy;
608 #endif
609   } else {
610 #ifdef CACHE
611     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
612 #ifdef TRANSSTATS
613       nprehashSearch++;
614 #endif
615       /* Look up in prefetch cache */
616       GETSIZE(size, tmp);
617       size+=sizeof(objheader_t);
618       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
619       memcpy(objcopy, tmp, size);
620       /* Insert into cache's lookup table */
621       t_chashInsert(OID(tmp), objcopy);
622 #ifdef COMPILER
623       return &objcopy[1];
624 #else
625       return objcopy;
626 #endif
627     }
628 #endif
629     /* Get the object from the remote location */
630     if((machinenumber = lhashSearch(oid)) == 0) {
631       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
632       return NULL;
633     }
634     objcopy = getRemoteObj(machinenumber, oid);
635
636     if(objcopy == NULL) {
637       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
638       return NULL;
639     } else {
640 #ifdef TRANSSTATS
641       nRemoteSend++;
642 #endif
643 #ifdef COMPILER
644 #ifdef CACHE
645       //Copy object to prefetch cache
646       pthread_mutex_lock(&prefetchcache_mutex);
647       objheader_t *headerObj;
648       int size;
649       GETSIZE(size, objcopy);
650       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
651         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
652             __FILE__, __LINE__);
653         pthread_mutex_unlock(&prefetchcache_mutex);
654         return NULL;
655       }
656       pthread_mutex_unlock(&prefetchcache_mutex);
657       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
658       //make an entry in prefetch lookup hashtable
659       void *oldptr;
660       if((oldptr = prehashSearch(oid)) != NULL) {
661         prehashRemove(oid);
662         prehashInsert(oid, headerObj);
663       } else {
664         prehashInsert(oid, headerObj);
665       }
666 #endif
667       return &objcopy[1];
668 #else
669       return objcopy;
670 #endif
671     }
672   }
673 }
674
675
676 /* This function finds the location of the objects involved in a transaction
677  * and returns the pointer to the object if found in a remote location */
678 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
679   unsigned int machinenumber;
680   objheader_t *tmp, *objheader;
681   objheader_t *objcopy;
682   int size;
683
684 #ifdef ABORTREADERS
685   if (t_abort) {
686     //abort this transaction
687     //printf("ABORTING\n");
688     removetransactionhash();
689     objstrDelete(t_cache);
690     t_chashDelete();
691     _longjmp(aborttrans,1);
692   } else
693     addtransaction(oid);
694 #endif
695
696   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
697 #ifdef TRANSSTATS
698     nmhashSearch++;
699 #endif
700     /* Look up in machine lookup table  and copy  into cache*/
701     GETSIZE(size, objheader);
702     size += sizeof(objheader_t);
703     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
704     memcpy(objcopy, objheader, size);
705     /* Insert into cache's lookup table */
706     STATUS(objcopy)=0;
707     t_chashInsert(OID(objheader), objcopy);
708 #ifdef COMPILER
709     return &objcopy[1];
710 #else
711     return objcopy;
712 #endif
713   } else {
714 #ifdef CACHE
715     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
716 #ifdef TRANSSTATS
717       LOGEVENT('P')
718       nprehashSearch++;
719 #endif
720       /* Look up in prefetch cache */
721       GETSIZE(size, tmp);
722       size+=sizeof(objheader_t);
723       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
724       memcpy(objcopy, tmp, size);
725       /* Insert into cache's lookup table */
726       t_chashInsert(OID(tmp), objcopy);
727 #ifdef COMPILER
728       return &objcopy[1];
729 #else
730       return objcopy;
731 #endif
732     }
733 #endif
734     /* Get the object from the remote location */
735     if((machinenumber = lhashSearch(oid)) == 0) {
736       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
737       return NULL;
738     }
739     objcopy = getRemoteObj(machinenumber, oid);
740
741     if(objcopy == NULL) {
742       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
743       return NULL;
744     } else {
745 #ifdef TRANSSTATS
746
747       LOGEVENT('R');
748       nRemoteSend++;
749 #endif
750 #ifdef COMPILER
751 #ifdef CACHE
752       //Copy object to prefetch cache
753       pthread_mutex_lock(&prefetchcache_mutex);
754       objheader_t *headerObj;
755       int size;
756       GETSIZE(size, objcopy);
757       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
758         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
759             __FILE__, __LINE__);
760         pthread_mutex_unlock(&prefetchcache_mutex);
761         return NULL;
762       }
763       pthread_mutex_unlock(&prefetchcache_mutex);
764       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
765       //make an entry in prefetch lookup hashtable
766       void *oldptr;
767       if((oldptr = prehashSearch(oid)) != NULL) {
768         prehashRemove(oid);
769         prehashInsert(oid, headerObj);
770       } else {
771         prehashInsert(oid, headerObj);
772       }
773 #endif
774
775       return &objcopy[1];
776 #else
777       return objcopy;
778 #endif
779     }
780   }
781 }
782
783 /* This function creates objects in the transaction record */
784 objheader_t *transCreateObj(unsigned int size) {
785   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
786   OID(tmp) = getNewOID();
787   tmp->version = 1;
788   tmp->rcount = 1;
789   STATUS(tmp) = NEW;
790   t_chashInsert(OID(tmp), tmp);
791
792 #ifdef COMPILER
793   return &tmp[1]; //want space after object header
794 #else
795   return tmp;
796 #endif
797 }
798
799 #if 1
800 /* This function creates machine piles based on all machines involved in a
801  * transaction commit request */
802 plistnode_t *createPiles() {
803   int i;
804   plistnode_t *pile = NULL;
805   unsigned int machinenum;
806   objheader_t *headeraddr;
807   chashlistnode_t * ptr = c_table;
808   /* Represents number of bins in the chash table */
809   unsigned int size = c_size;
810
811   for(i = 0; i < size ; i++) {
812     chashlistnode_t * curr = &ptr[i];
813     /* Inner loop to traverse the linked list of the cache lookupTable */
814     while(curr != NULL) {
815       //if the first bin in hash table is empty
816       if(curr->key == 0)
817         break;
818       headeraddr=(objheader_t *) curr->val;
819
820       //Get machine location for object id (and whether local or not)
821       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
822         machinenum = myIpAddr;
823       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
824         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
825         return NULL;
826       }
827
828       //Make machine groups
829       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
830       curr = curr->next;
831     }
832   }
833   return pile;
834 }
835 #else
836 /* This function creates machine piles based on all machines involved in a
837  * transaction commit request */
838 plistnode_t *createPiles() {
839   int i;
840   plistnode_t *pile = NULL;
841   unsigned int machinenum;
842   objheader_t *headeraddr;
843   struct chashentry * ptr = c_table;
844   /* Represents number of bins in the chash table */
845   unsigned int size = c_size;
846
847   for(i = 0; i < size ; i++) {
848     struct chashentry * curr = & ptr[i];
849     /* Inner loop to traverse the linked list of the cache lookupTable */
850     //if the first bin in hash table is empty
851     if(curr->key == 0)
852       continue;
853     headeraddr=(objheader_t *) curr->ptr;
854
855     //Get machine location for object id (and whether local or not)
856     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
857       machinenum = myIpAddr;
858     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
859       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
860       return NULL;
861     }
862
863     //Make machine groups
864     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
865   }
866   return pile;
867 }
868 #endif
869
870 /* This function initiates the transaction commit process
871  * Spawns threads for each of the new connections with Participants
872  * and creates new piles by calling the createPiles(),
873  * Sends a transrequest() to each remote machines for objects found remotely
874  * and calls handleLocalReq() to process objects found locally */
875 int transCommit() {
876   unsigned int tot_bytes_mod, *listmid;
877   plistnode_t *pile, *pile_ptr;
878   char treplyretry; /* keeps track of the common response that needs to be sent */
879   int firsttime=1;
880   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
881   char finalResponse;
882
883 #ifdef LOGEVENTS
884   int iii;
885   for(iii=0;iii<bigindex;iii++) {
886     printf("%c", bigarray[iii]);
887   }
888 #endif
889
890 #ifdef ABORTREADERS
891   if (t_abort) {
892     //abort this transaction
893     /* Debug
894      * printf("ABORTING TRANSACTION AT COMMIT\n");
895      */
896     removetransactionhash();
897     objstrDelete(t_cache);
898     t_chashDelete();
899     return 1;
900   }
901 #endif
902
903
904   do {
905     treplyretry = 0;
906
907     /* Look through all the objects in the transaction record and make piles
908      * for each machine involved in the transaction*/
909     if (firsttime) {
910       pile_ptr = pile = createPiles();
911       pile_ptr = pile = sortPiles(pile);
912     } else {
913       pile = pile_ptr;
914     }
915     firsttime = 0;
916     /* Create the packet to be sent in TRANS_REQUEST */
917
918     /* Count the number of participants */
919     int pilecount;
920     pilecount = pCount(pile);
921
922     /* Create a list of machine ids(Participants) involved in transaction   */
923     listmid = calloc(pilecount, sizeof(unsigned int));
924     pListMid(pile, listmid);
925
926     /* Create a socket and getReplyCtrl array, initialize */
927     int socklist[pilecount];
928     int loopcount;
929     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
930       socklist[loopcount] = 0;
931     char getReplyCtrl[pilecount];
932     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
933       getReplyCtrl[loopcount] = 0;
934
935     /* Process each machine pile */
936     int sockindex = 0;
937     trans_req_data_t *tosend;
938     tosend = calloc(pilecount, sizeof(trans_req_data_t));
939     while(pile != NULL) {
940       tosend[sockindex].f.control = TRANS_REQUEST;
941       tosend[sockindex].f.mcount = pilecount;
942       tosend[sockindex].f.numread = pile->numread;
943       tosend[sockindex].f.nummod = pile->nummod;
944       tosend[sockindex].f.numcreated = pile->numcreated;
945       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
946       tosend[sockindex].listmid = listmid;
947       tosend[sockindex].objread = pile->objread;
948       tosend[sockindex].oidmod = pile->oidmod;
949       tosend[sockindex].oidcreated = pile->oidcreated;
950       int sd = 0;
951       if(pile->mid != myIpAddr) {
952         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
953           printf("transRequest(): socket create error\n");
954           free(listmid);
955           free(tosend);
956           return 1;
957         }
958         socklist[sockindex] = sd;
959         /* Send bytes of data with TRANS_REQUEST control message */
960         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
961
962         /* Send list of machines involved in the transaction */
963         {
964           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
965           send_data(sd, tosend[sockindex].listmid, size);
966         }
967
968         /* Send oids and version number tuples for objects that are read */
969         {
970           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
971           send_data(sd, tosend[sockindex].objread, size);
972         }
973
974         /* Send objects that are modified */
975         void *modptr;
976         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
977           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
978           free(listmid);
979           free(tosend);
980           return 1;
981         }
982         int offset = 0;
983         int i;
984         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
985           int size;
986           objheader_t *headeraddr;
987           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
988             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
989             free(modptr);
990             free(listmid);
991             free(tosend);
992             return 1;
993           }
994           GETSIZE(size,headeraddr);
995           size+=sizeof(objheader_t);
996           memcpy(modptr+offset, headeraddr, size);
997           offset+=size;
998         }
999         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1000         free(modptr);
1001       } else { //handle request locally
1002         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1003       }
1004       sockindex++;
1005       pile = pile->next;
1006     } //end of pile processing
1007       /* Recv Ctrl msgs from all machines */
1008     int i;
1009     for(i = 0; i < pilecount; i++) {
1010       int sd = socklist[i];
1011       if(sd != 0) {
1012         char control;
1013         recv_data(sd, &control, sizeof(char));
1014         //Update common data structure with new ctrl msg
1015         getReplyCtrl[i] = control;
1016         /* Recv Objects if participant sends TRANS_DISAGREE */
1017 #ifdef CACHE
1018         if(control == TRANS_DISAGREE) {
1019           int length;
1020           recv_data(sd, &length, sizeof(int));
1021           void *newAddr;
1022           pthread_mutex_lock(&prefetchcache_mutex);
1023           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1024             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1025             free(tosend);
1026             free(listmid);
1027             pthread_mutex_unlock(&prefetchcache_mutex);
1028             return 1;
1029           }
1030           pthread_mutex_unlock(&prefetchcache_mutex);
1031           recv_data(sd, newAddr, length);
1032           int offset = 0;
1033           while(length != 0) {
1034             unsigned int oidToPrefetch;
1035             objheader_t * header;
1036             header = (objheader_t *)(((char *)newAddr) + offset);
1037             oidToPrefetch = OID(header);
1038             STATUS(header)=0;
1039             int size = 0;
1040             GETSIZE(size, header);
1041             size += sizeof(objheader_t);
1042             //make an entry in prefetch hash table
1043             void *oldptr;
1044             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
1045               prehashRemove(oidToPrefetch);
1046               prehashInsert(oidToPrefetch, header);
1047             } else {
1048               prehashInsert(oidToPrefetch, header);
1049             }
1050             length = length - size;
1051             offset += size;
1052           }
1053         } //end of receiving objs
1054 #endif
1055       }
1056     }
1057     /* Decide the final response */
1058     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1059       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1060       free(tosend);
1061       free(listmid);
1062       return 1;
1063     }
1064
1065     /* Send responses to all machines */
1066     for(i = 0; i < pilecount; i++) {
1067       int sd = socklist[i];
1068       if(sd != 0) {
1069 #ifdef CACHE
1070         if(finalResponse == TRANS_COMMIT) {
1071           int retval;
1072           /* Update prefetch cache */
1073           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1074             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1075             free(tosend);
1076             free(listmid);
1077             return 1;
1078           }
1079
1080
1081           /* Invalidate objects in other machine cache */
1082           if(tosend[i].f.nummod > 0) {
1083             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1084               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1085               free(tosend);
1086               free(listmid);
1087               return 1;
1088             }
1089           }
1090 #ifdef ABORTREADERS
1091           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1092           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1093 #endif
1094         }
1095 #ifdef ABORTREADERS
1096         else if (!treplyretry) {
1097           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1098           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1099         }
1100 #endif
1101 #endif
1102         send_data(sd, &finalResponse, sizeof(char));
1103       } else {
1104         /* Complete local processing */
1105         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1106 #ifdef ABORTREADERS
1107         if(finalResponse == TRANS_COMMIT) {
1108           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1109           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1110         } else if (!treplyretry) {
1111           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1112           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1113         }
1114 #endif
1115       }
1116     }
1117
1118     /* Free resources */
1119     free(tosend);
1120     free(listmid);
1121     if (!treplyretry)
1122       pDelete(pile_ptr);
1123     /* wait a random amount of time before retrying to commit transaction*/
1124     if(treplyretry) {
1125       randomdelay();
1126 #ifdef TRANSSTATS
1127       nSoftAbort++;
1128 #endif
1129     }
1130     /* Retry trans commit procedure during soft_abort case */
1131   } while (treplyretry);
1132
1133   if(finalResponse == TRANS_ABORT) {
1134     //printf("Aborting trans\n");
1135 #ifdef TRANSSTATS
1136     numTransAbort++;
1137 #endif
1138     /* Free Resources */
1139     objstrDelete(t_cache);
1140     t_chashDelete();
1141     return TRANS_ABORT;
1142   } else if(finalResponse == TRANS_COMMIT) {
1143 #ifdef TRANSSTATS
1144     numTransCommit++;
1145 #endif
1146     /* Free Resources */
1147     objstrDelete(t_cache);
1148     t_chashDelete();
1149     return 0;
1150   } else {
1151     //TODO Add other cases
1152     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1153     exit(-1);
1154   }
1155   return 0;
1156 }
1157
1158 /* This function handles the local objects involved in a transaction
1159  * commiting process.  It also makes a decision if this local machine
1160  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1161 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1162   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1163   int numoidnotfound = 0, numoidlocked = 0;
1164   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1165   int numread, i;
1166   unsigned int oid;
1167   unsigned short version;
1168
1169   /* Counters and arrays to formulate decision on control message to be sent */
1170   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1171   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1172   //setting a divider between read and write locks
1173   numread = tdata->f.numread;
1174   /* Process each oid in the machine pile/ group per thread */
1175   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1176     if (i < tdata->f.numread) {
1177       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1178       incr *= i;
1179       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1180       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1181       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1182     } else { // Objects Modified
1183       if(i == tdata->f.numread) {
1184         oidlocked[numoidlocked++] = -1;
1185       }
1186       int tmpsize;
1187       objheader_t *headptr;
1188       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1189       if (headptr == NULL) {
1190         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1191         return;
1192       }
1193       oid = OID(headptr);
1194       version = headptr->version;
1195       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1196     }
1197   }
1198
1199 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1200  * if Participant receives a TRANS_COMMIT */
1201   transinfo->objlocked = oidlocked;
1202   transinfo->objnotfound = oidnotfound;
1203   transinfo->modptr = NULL;
1204   transinfo->numlocked = numoidlocked;
1205   transinfo->numnotfound = numoidnotfound;
1206
1207   /* Condition to send TRANS_AGREE */
1208   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1209     *getReplyCtrl = TRANS_AGREE;
1210   }
1211   /* Condition to send TRANS_SOFT_ABORT */
1212   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1213     *getReplyCtrl = TRANS_SOFT_ABORT;
1214   }
1215 }
1216
1217 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1218   if(finalResponse == TRANS_ABORT) {
1219     if(transAbortProcess(transinfo) != 0) {
1220       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1221       fflush(stdout);
1222       return;
1223     }
1224   } else if(finalResponse == TRANS_COMMIT) {
1225 #ifdef CACHE
1226     /* Invalidate objects in other machine cache */
1227     if(tdata->f.nummod > 0) {
1228       int retval;
1229       if((retval = invalidateObj(tdata)) != 0) {
1230         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1231         return;
1232       }
1233     }
1234 #endif
1235     if(transComProcess(tdata, transinfo) != 0) {
1236       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1237       fflush(stdout);
1238       return;
1239     }
1240   } else {
1241     printf("ERROR...No Decision\n");
1242   }
1243
1244   /* Free memory */
1245   if (transinfo->objlocked != NULL) {
1246     free(transinfo->objlocked);
1247   }
1248   if (transinfo->objnotfound != NULL) {
1249     free(transinfo->objnotfound);
1250   }
1251 }
1252
1253 /* This function decides the reponse that needs to be sent to
1254  * all Participant machines after the TRANS_REQUEST protocol */
1255 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1256   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1257                                                                    message to send */
1258   for (i = 0 ; i < pilecount; i++) {
1259     char control;
1260     control = getReplyCtrl[i];
1261     switch(control) {
1262     default:
1263       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1264
1265       /* treat as disagree, pass thru */
1266     case TRANS_DISAGREE:
1267       transdisagree++;
1268       break;
1269
1270     case TRANS_AGREE:
1271       transagree++;
1272       break;
1273
1274     case TRANS_SOFT_ABORT:
1275       transsoftabort++;
1276       break;
1277     }
1278   }
1279
1280   if(transdisagree > 0) {
1281     /* Send Abort */
1282     *treplyretry = 0;
1283     return TRANS_ABORT;
1284 #ifdef CACHE
1285     /* clear objects from prefetch cache */
1286     cleanPCache();
1287 #endif
1288   } else if(transagree == pilecount) {
1289     /* Send Commit */
1290     *treplyretry = 0;
1291     return TRANS_COMMIT;
1292   } else {
1293     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1294     *treplyretry = 1;
1295     return TRANS_ABORT;
1296   }
1297   return 0;
1298 }
1299
1300 /* This function opens a connection, places an object read request to
1301  * the remote machine, reads the control message and object if
1302  * available and copies the object and its header to the local
1303  * cache. */
1304
1305 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1306   int size, val;
1307   struct sockaddr_in serv_addr;
1308   char machineip[16];
1309   char control;
1310   objheader_t *h;
1311   void *objcopy = NULL;
1312
1313   int sd = getSock2(transReadSockPool, mnum);
1314   char readrequest[sizeof(char)+sizeof(unsigned int)];
1315   readrequest[0] = READ_REQUEST;
1316   *((unsigned int *)(&readrequest[1])) = oid;
1317   send_data(sd, readrequest, sizeof(readrequest));
1318
1319   /* Read response from the Participant */
1320   recv_data(sd, &control, sizeof(char));
1321
1322   if (control==OBJECT_NOT_FOUND) {
1323     objcopy = NULL;
1324   } else {
1325     /* Read object if found into local cache */
1326     recv_data(sd, &size, sizeof(int));
1327     objcopy = objstrAlloc(&t_cache, size);
1328     recv_data(sd, objcopy, size);
1329     STATUS(objcopy)=0;
1330     /* Insert into cache's lookup table */
1331     t_chashInsert(oid, objcopy);
1332 #ifdef TRANSSTATS
1333     totalObjSize += size;
1334 #endif
1335   }
1336
1337   return objcopy;
1338 }
1339
1340 /*  Commit info for objects modified */
1341 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1342                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1343   void *mobj;
1344   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1345   /* Save the oids not found and number of oids not found for later use */
1346   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1347     /* Save the oids not found and number of oids not found for later use */
1348     oidnotfound[*numoidnotfound] = oid;
1349     (*numoidnotfound)++;
1350   } else { /* If Obj found in machine (i.e. has not moved) */
1351     /* Check if Obj is locked by any previous transaction */
1352     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1353       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1354         (*v_matchnolock)++;
1355         //Keep track of what is locked
1356         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1357       } else { /* If versions don't match ...HARD ABORT */
1358         (*v_nomatch)++;
1359         /* Send TRANS_DISAGREE to Coordinator */
1360         *getReplyCtrl = TRANS_DISAGREE;
1361
1362         //Keep track of what is locked
1363         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1364         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1365         return;
1366       }
1367     } else { //A lock is acquired some place else
1368       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1369         (*v_matchlock)++;
1370       } else { /* If versions don't match ...HARD ABORT */
1371         (*v_nomatch)++;
1372         /* Send TRANS_DISAGREE to Coordinator */
1373         *getReplyCtrl = TRANS_DISAGREE;
1374         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1375         return;
1376       }
1377     }
1378   }
1379 }
1380
1381 /*  Commit info for objects modified */
1382 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1383                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1384   void *mobj;
1385   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1386   /* Save the oids not found and number of oids not found for later use */
1387   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1388     /* Save the oids not found and number of oids not found for later use */
1389     oidnotfound[*numoidnotfound] = oid;
1390     (*numoidnotfound)++;
1391   } else { /* If Obj found in machine (i.e. has not moved) */
1392     /* Check if Obj is locked by any previous transaction */
1393     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1394       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1395         (*v_matchnolock)++;
1396         //Keep track of what is locked
1397         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1398       } else { /* If versions don't match ...HARD ABORT */
1399         (*v_nomatch)++;
1400         /* Send TRANS_DISAGREE to Coordinator */
1401         *getReplyCtrl = TRANS_DISAGREE;
1402         //Keep track of what is locked
1403         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1404         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1405         return;
1406       }
1407     } else { //Has reached max number of readers or some other transaction
1408       //has acquired a lock on this object
1409       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1410         (*v_matchlock)++;
1411       } else { /* If versions don't match ...HARD ABORT */
1412         (*v_nomatch)++;
1413         /* Send TRANS_DISAGREE to Coordinator */
1414         *getReplyCtrl = TRANS_DISAGREE;
1415         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1416         return;
1417       }
1418     }
1419   }
1420 }
1421
1422 /* This function completes the ABORT process if the transaction is aborting */
1423 int transAbortProcess(trans_commit_data_t *transinfo) {
1424   int i, numlocked;
1425   unsigned int *objlocked;
1426   void *header;
1427
1428   numlocked = transinfo->numlocked;
1429   objlocked = transinfo->objlocked;
1430
1431   int useWriteUnlock = 0;
1432   for (i = 0; i < numlocked; i++) {
1433     if(objlocked[i] == -1) {
1434       useWriteUnlock = 1;
1435       continue;
1436     }
1437     if((header = mhashSearch(objlocked[i])) == NULL) {
1438       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1439       return 1;
1440     }
1441     if(!useWriteUnlock) {
1442       read_unlock(STATUSPTR(header));
1443     } else {
1444       write_unlock(STATUSPTR(header));
1445     }
1446   }
1447
1448   return 0;
1449 }
1450
1451 /*This function completes the COMMIT process if the transaction is commiting*/
1452 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1453   objheader_t *header, *tcptr;
1454   int i, nummod, tmpsize, numcreated, numlocked;
1455   unsigned int *oidmod, *oidcreated, *oidlocked;
1456   void *ptrcreate;
1457
1458   nummod = tdata->f.nummod;
1459   oidmod = tdata->oidmod;
1460   numcreated = tdata->f.numcreated;
1461   oidcreated = tdata->oidcreated;
1462   numlocked = transinfo->numlocked;
1463   oidlocked = transinfo->objlocked;
1464
1465   for (i = 0; i < nummod; i++) {
1466     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1467       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1468       return 1;
1469     }
1470     /* Copy from transaction cache -> main object store */
1471     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1472       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1473       return 1;
1474     }
1475     GETSIZE(tmpsize, header);
1476     char *tmptcptr = (char *) tcptr;
1477     {
1478       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1479       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1480       dst->___cachedCode___=src->___cachedCode___;
1481       dst->___cachedHash___=src->___cachedHash___;
1482
1483       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1484     }
1485
1486     header->version += 1;
1487     if(header->notifylist != NULL) {
1488       notifyAll(&header->notifylist, OID(header), header->version);
1489     }
1490   }
1491   /* If object is newly created inside transaction then commit it */
1492   for (i = 0; i < numcreated; i++) {
1493     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1494       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1495       return 1;
1496     }
1497     GETSIZE(tmpsize, header);
1498     tmpsize += sizeof(objheader_t);
1499     pthread_mutex_lock(&mainobjstore_mutex);
1500     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1501       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1502       pthread_mutex_unlock(&mainobjstore_mutex);
1503       return 1;
1504     }
1505     pthread_mutex_unlock(&mainobjstore_mutex);
1506     /* Initialize read and write locks */
1507     initdsmlocks(STATUSPTR(header));
1508     memcpy(ptrcreate, header, tmpsize);
1509     mhashInsert(oidcreated[i], ptrcreate);
1510     lhashInsert(oidcreated[i], myIpAddr);
1511   }
1512   /* Unlock locked objects */
1513   int useWriteUnlock = 0;
1514   for(i = 0; i < numlocked; i++) {
1515     if(oidlocked[i] == -1) {
1516       useWriteUnlock = 1;
1517       continue;
1518     }
1519     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1520       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1521       return 1;
1522     }
1523     if(!useWriteUnlock) {
1524       read_unlock(STATUSPTR(header));
1525     } else {
1526       write_unlock(STATUSPTR(header));
1527     }
1528   }
1529   return 0;
1530 }
1531
1532 prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
1533   int i;
1534   int j;
1535   prefetchpile_t * head=NULL;
1536
1537   for(j=0;j<numprefetches;j++) {
1538     int siteid = *(GET_SITEID(ptr));
1539     int ntuples = *(GET_NTUPLES(ptr));
1540     unsigned int * oidarray = GET_PTR_OID(ptr);
1541     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1542     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1543     int numLocal = 0;
1544     
1545     for(i=0; i<ntuples; i++) {
1546       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1547       unsigned short endindex=endoffsets[i];
1548       unsigned int oid=oidarray[i];
1549       int newbase;
1550       int machinenum;
1551       
1552       if (oid==0)
1553         continue;
1554       //Look up fields locally
1555       for(newbase=baseindex; newbase<endindex; newbase++) {
1556         if (!lookupObject(&oid, arryfields[newbase]))
1557           break;
1558         //Ended in a null pointer...
1559         if (oid==0)
1560           goto tuple;
1561       }
1562       //Entire prefetch is local
1563       if (newbase==endindex&&checkoid(oid)) {
1564         numLocal++;
1565         goto tuple;
1566       }
1567       //Add to remote requests
1568       machinenum=lhashSearch(oid);
1569       insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1570     tuple:
1571       ;
1572     }
1573     
1574     /* handle dynamic prefetching */
1575     handleDynPrefetching(numLocal, ntuples, siteid);
1576     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1577   }
1578
1579   return head;
1580 }
1581
1582 int checkoid(unsigned int oid) {
1583   objheader_t *header;
1584   if ((header=mhashSearch(oid))!=NULL) {
1585     //Found on machine
1586     return 1;
1587   } else if ((header=prehashSearch(oid))!=NULL) {
1588     //Found in cache
1589     return 1;
1590   } else {
1591     return 0;
1592   }
1593 }
1594
1595 int lookupObject(unsigned int * oid, short offset) {
1596   objheader_t *header;
1597   if ((header=mhashSearch(*oid))!=NULL) {
1598     //Found on machine
1599     ;
1600   } else if ((header=prehashSearch(*oid))!=NULL) {
1601     //Found in cache
1602     ;
1603   } else {
1604     return 0;
1605   }
1606
1607   if(TYPE(header) >= NUMCLASSES) {
1608     int elementsize = classsize[TYPE(header)];
1609     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1610     int length = ao->___length___;
1611     /* Check if array out of bounds */
1612     if(offset < 0 || offset >= length) {
1613       //if yes treat the object as found
1614       (*oid)=0;
1615       return 1;
1616     }
1617     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1618     return 1;
1619   } else {
1620     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1621     return 1;
1622   }
1623 }
1624
1625
1626 /* This function is called by the thread calling transPrefetch */
1627 void *transPrefetch(void *t) {
1628   while(1) {
1629     /* read from prefetch queue */
1630     void *node=gettail();
1631     /* Check if the tuples are found locally, if yes then reduce them further*/
1632     /* and group requests by remote machine ids by calling the makePreGroups() */
1633     int count=numavailable();
1634     prefetchpile_t *pilehead = foundLocal(node, count);
1635
1636     if (pilehead!=NULL) {
1637       // Get sock from shared pool
1638
1639       /* Send  Prefetch Request */
1640       prefetchpile_t *ptr = pilehead;
1641       while(ptr != NULL) {
1642         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1643         sendPrefetchReq(ptr, sd);
1644         ptr = ptr->next;
1645       }
1646
1647       /* Release socket */
1648       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1649
1650       /* Deallocated pilehead */
1651       mcdealloc(pilehead);
1652     }
1653     // Deallocate the prefetch queue pile node
1654     incmulttail(count);
1655   }
1656 }
1657
1658 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1659   objpile_t *tmp;
1660
1661   int size=sizeof(char)+sizeof(int);
1662   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1663     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1664   }
1665
1666   char buft[size];
1667   char *buf=buft;
1668   *buf=TRANS_PREFETCH;
1669   buf+=sizeof(char);
1670
1671   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1672     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1673     *((int*)buf)=len;
1674     buf+=sizeof(int);
1675     *((unsigned int *)buf)=tmp->oid;
1676     buf+=sizeof(unsigned int);
1677     *((unsigned int *)(buf)) = myIpAddr;
1678     buf+=sizeof(unsigned int);
1679     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1680     buf+=tmp->numoffset*sizeof(short);
1681   }
1682   *((int *)buf)=-1;
1683   send_data(sd, buft, size);
1684   return;
1685 }
1686
1687 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1688   int len, endpair;
1689   char control;
1690   objpile_t *tmp;
1691   struct writestruct writebuffer;
1692   writebuffer.offset=0;
1693
1694   /* Send TRANS_PREFETCH control message */
1695   int first=1;
1696
1697   /* Send Oids and offsets in pairs */
1698   tmp = mcpilenode->objpiles;
1699   while(tmp != NULL) {
1700     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1701     char oidnoffset[len+5];
1702     char *buf=oidnoffset;
1703     if (first) {
1704       *buf=TRANS_PREFETCH;
1705       buf++;len++;
1706       first=0;
1707     }
1708     *((int*)buf) = tmp->numoffset;
1709     buf+=sizeof(int);
1710     *((unsigned int *)buf) = tmp->oid;
1711 #ifdef TRANSSTATS
1712     sendRemoteReq++;
1713 #endif
1714     buf+=sizeof(unsigned int);
1715     *((unsigned int *)buf) = myIpAddr;
1716     buf += sizeof(unsigned int);
1717     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1718     tmp = tmp->next;
1719     if (tmp==NULL) {
1720       *((int *)(&oidnoffset[len]))=-1;
1721       len+=sizeof(int);
1722     }
1723     if (tmp!=NULL)
1724       send_buf(sd, & writebuffer, oidnoffset, len);
1725     else
1726       forcesend_buf(sd, & writebuffer, oidnoffset, len);
1727   }
1728
1729   LOGEVENT('S');
1730   return;
1731 }
1732
1733 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1734   int length = 0, size = 0;
1735   char control;
1736   unsigned int oid;
1737   void *modptr, *oldptr;
1738
1739   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1740   size = length - sizeof(int);
1741   char recvbuffer[size];
1742 #ifdef TRANSSTATS
1743     getResponse++;
1744     LOGEVENT('Z');
1745 #endif
1746     recv_data_buf(sd, readbuffer, recvbuffer, size);
1747   control = *((char *) recvbuffer);
1748   if(control == OBJECT_FOUND) {
1749     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1750     size = size - (sizeof(char) + sizeof(unsigned int));
1751     pthread_mutex_lock(&prefetchcache_mutex);
1752     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1753       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1754       pthread_mutex_unlock(&prefetchcache_mutex);
1755       return -1;
1756     }
1757     pthread_mutex_unlock(&prefetchcache_mutex);
1758     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1759     STATUS(modptr)=0;
1760
1761     /* Insert the oid and its address into the prefetch hash lookup table */
1762     /* Do a version comparison if the oid exists */
1763     if((oldptr = prehashSearch(oid)) != NULL) {
1764       /* If older version then update with new object ptr */
1765       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1766         prehashRemove(oid);
1767         prehashInsert(oid, modptr);
1768       }
1769     } else { /* Else add the object ptr to hash table*/
1770       prehashInsert(oid, modptr);
1771     }
1772     /* Lock the Prefetch Cache look up table*/
1773     pthread_mutex_lock(&pflookup.lock);
1774     /* Broadcast signal on prefetch cache condition variable */
1775     pthread_cond_broadcast(&pflookup.cond);
1776     /* Unlock the Prefetch Cache look up table*/
1777     pthread_mutex_unlock(&pflookup.lock);
1778   } else if(control == OBJECT_NOT_FOUND) {
1779     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1780     /* TODO: For each object not found query DHT for new location and retrieve the object */
1781     /* Throw an error */
1782     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1783     //    exit(-1);
1784   } else {
1785     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1786   }
1787
1788   return 0;
1789 }
1790
1791 unsigned short getObjType(unsigned int oid) {
1792   objheader_t *objheader;
1793   unsigned short numoffset[] ={0};
1794   short fieldoffset[] ={};
1795
1796   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1797 #ifdef CACHE
1798     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1799 #endif
1800     unsigned int mid = lhashSearch(oid);
1801     int sd = getSock2(transReadSockPool, mid);
1802     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1803     remotereadrequest[0] = READ_REQUEST;
1804     *((unsigned int *)(&remotereadrequest[1])) = oid;
1805     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1806
1807     /* Read response from the Participant */
1808     char control;
1809     recv_data(sd, &control, sizeof(char));
1810
1811     if (control==OBJECT_NOT_FOUND) {
1812       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1813       fflush(stdout);
1814       exit(-1);
1815     } else {
1816       /* Read object if found into local cache */
1817       int size;
1818       recv_data(sd, &size, sizeof(int));
1819 #ifdef CACHE
1820       pthread_mutex_lock(&prefetchcache_mutex);
1821       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1822         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1823         pthread_exit(NULL);
1824       }
1825       pthread_mutex_unlock(&prefetchcache_mutex);
1826       recv_data(sd, objheader, size);
1827       prehashInsert(oid, objheader);
1828       return TYPE(objheader);
1829 #else
1830       char *buffer;
1831       if((buffer = calloc(1, size)) == NULL) {
1832         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1833         fflush(stdout);
1834         return 0;
1835       }
1836       recv_data(sd, buffer, size);
1837       objheader = (objheader_t *)buffer;
1838       unsigned short type = TYPE(objheader);
1839       free(buffer);
1840       return type;
1841 #endif
1842     }
1843 #ifdef CACHE
1844   }
1845 #endif
1846   }
1847   return TYPE(objheader);
1848 }
1849
1850 int startRemoteThread(unsigned int oid, unsigned int mid) {
1851   int sock;
1852   struct sockaddr_in remoteAddr;
1853   char msg[1 + sizeof(unsigned int)];
1854   int bytesSent;
1855   int status;
1856
1857   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1858     perror("startRemoteThread():socket()");
1859     return -1;
1860   }
1861
1862   bzero(&remoteAddr, sizeof(remoteAddr));
1863   remoteAddr.sin_family = AF_INET;
1864   remoteAddr.sin_port = htons(LISTEN_PORT);
1865   remoteAddr.sin_addr.s_addr = htonl(mid);
1866
1867   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1868     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1869            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1870     status = -1;
1871   } else
1872   {
1873     msg[0] = START_REMOTE_THREAD;
1874     *((unsigned int *) &msg[1]) = oid;
1875     send_data(sock, msg, 1 + sizeof(unsigned int));
1876   }
1877
1878   close(sock);
1879   return status;
1880 }
1881
1882 //TODO: when reusing oids, make sure they are not already in use!
1883 static unsigned int id = 0xFFFFFFFF;
1884 unsigned int getNewOID(void) {
1885   id += 2;
1886   if (id > oidMax || id < oidMin) {
1887     id = (oidMin | 1);
1888   }
1889   return id;
1890 }
1891
1892 int processConfigFile() {
1893   FILE *configFile;
1894   const int maxLineLength = 200;
1895   char lineBuffer[maxLineLength];
1896   char *token;
1897   const char *delimiters = " \t\n";
1898   char *commentBegin;
1899   in_addr_t tmpAddr;
1900
1901   configFile = fopen(CONFIG_FILENAME, "r");
1902   if (configFile == NULL) {
1903     printf("error opening %s:\n", CONFIG_FILENAME);
1904     perror("");
1905     return -1;
1906   }
1907
1908   numHostsInSystem = 0;
1909   sizeOfHostArray = 8;
1910   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1911
1912   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1913     commentBegin = strchr(lineBuffer, '#');
1914     if (commentBegin != NULL)
1915       *commentBegin = '\0';
1916     token = strtok(lineBuffer, delimiters);
1917     while (token != NULL) {
1918       tmpAddr = inet_addr(token);
1919       if ((int)tmpAddr == -1) {
1920         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1921         fclose(configFile);
1922         return -1;
1923       } else
1924         addHost(htonl(tmpAddr));
1925       token = strtok(NULL, delimiters);
1926     }
1927   }
1928
1929   fclose(configFile);
1930
1931   if (numHostsInSystem < 1) {
1932     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1933     return -1;
1934   }
1935 #ifdef MAC
1936   myIpAddr = getMyIpAddr("en1");
1937 #else
1938   myIpAddr = getMyIpAddr("eth0");
1939 #endif
1940   myIndexInHostArray = findHost(myIpAddr);
1941   if (myIndexInHostArray == -1) {
1942     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1943     return -1;
1944   }
1945   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1946   oidMin = oidsPerBlock * myIndexInHostArray;
1947   if (myIndexInHostArray == numHostsInSystem - 1)
1948     oidMax = 0xFFFFFFFF;
1949   else
1950     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1951
1952   return 0;
1953 }
1954
1955 void addHost(unsigned int hostIp) {
1956   unsigned int *tmpArray;
1957
1958   if (findHost(hostIp) != -1)
1959     return;
1960
1961   if (numHostsInSystem == sizeOfHostArray) {
1962     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1963     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1964     free(hostIpAddrs);
1965     hostIpAddrs = tmpArray;
1966   }
1967
1968   hostIpAddrs[numHostsInSystem++] = hostIp;
1969
1970   return;
1971 }
1972
1973 int findHost(unsigned int hostIp) {
1974   int i;
1975   for (i = 0; i < numHostsInSystem; i++)
1976     if (hostIpAddrs[i] == hostIp)
1977       return i;
1978
1979   //not found
1980   return -1;
1981 }
1982
1983 /* This function sends notification request per thread waiting on object(s) whose version
1984  * changes */
1985 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1986   int sock,i;
1987   objheader_t *objheader;
1988   struct sockaddr_in remoteAddr;
1989   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1990   char *ptr;
1991   int bytesSent;
1992   int status, size;
1993   unsigned short version;
1994   unsigned int oid,mid;
1995   static unsigned int threadid = 0;
1996   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1997   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1998   notifydata_t *ndata;
1999
2000   oid = oidarry[0];
2001   if((mid = lhashSearch(oid)) == 0) {
2002     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2003     return;
2004   }
2005
2006   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2007     perror("reqNotify():socket()");
2008     return -1;
2009   }
2010
2011   bzero(&remoteAddr, sizeof(remoteAddr));
2012   remoteAddr.sin_family = AF_INET;
2013   remoteAddr.sin_port = htons(LISTEN_PORT);
2014   remoteAddr.sin_addr.s_addr = htonl(mid);
2015
2016   /* Generate unique threadid */
2017   threadid++;
2018
2019   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2020   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2021     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2022     return -1;
2023   }
2024   ndata->numoid = numoid;
2025   ndata->threadid = threadid;
2026   ndata->oidarry = oidarry;
2027   ndata->versionarry = versionarry;
2028   ndata->threadcond = threadcond;
2029   ndata->threadnotify = threadnotify;
2030   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2031     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2032     free(ndata);
2033     return -1;
2034   }
2035
2036   /* Send  number of oids, oidarry, version array, machine id and threadid */
2037   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2038     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2039            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2040     free(ndata);
2041     return -1;
2042   } else {
2043     msg[0] = THREAD_NOTIFY_REQUEST;
2044     *((unsigned int *)(&msg[1])) = numoid;
2045     /* Send array of oids  */
2046     size = sizeof(unsigned int);
2047
2048     for(i = 0;i < numoid; i++) {
2049       oid = oidarry[i];
2050       *((unsigned int *)(&msg[1] + size)) = oid;
2051       size += sizeof(unsigned int);
2052     }
2053
2054     /* Send array of version  */
2055     for(i = 0;i < numoid; i++) {
2056       version = versionarry[i];
2057       *((unsigned short *)(&msg[1] + size)) = version;
2058       size += sizeof(unsigned short);
2059     }
2060
2061     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2062     *((unsigned int *)(&msg[1] + size)) = threadid;
2063     pthread_mutex_lock(&(ndata->threadnotify));
2064     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2065     send_data(sock, msg, size);
2066     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2067     pthread_mutex_unlock(&(ndata->threadnotify));
2068   }
2069
2070   pthread_cond_destroy(&threadcond);
2071   pthread_mutex_destroy(&threadnotify);
2072   free(ndata);
2073   close(sock);
2074   return status;
2075 }
2076
2077 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2078   notifydata_t *ndata;
2079   int i, objIsFound = 0, index;
2080   void *ptr;
2081
2082   //Look up the tid and call the corresponding pthread_cond_signal
2083   if((ndata = notifyhashSearch(tid)) == NULL) {
2084     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2085     return;
2086   } else  {
2087     for(i = 0; i < ndata->numoid; i++) {
2088       if(ndata->oidarry[i] == oid) {
2089         objIsFound = 1;
2090         index = i;
2091       }
2092     }
2093     if(objIsFound == 0) {
2094       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2095       return;
2096     } else {
2097       if(version <= ndata->versionarry[index]) {
2098         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2099         return;
2100       } else {
2101 #ifdef CACHE
2102         /* Clear from prefetch cache and free thread related data structure */
2103         if((ptr = prehashSearch(oid)) != NULL) {
2104           prehashRemove(oid);
2105         }
2106 #endif
2107         pthread_mutex_lock(&(ndata->threadnotify));
2108         pthread_cond_signal(&(ndata->threadcond));
2109         pthread_mutex_unlock(&(ndata->threadnotify));
2110       }
2111     }
2112   }
2113   return;
2114 }
2115
2116 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2117   threadlist_t *ptr;
2118   unsigned int mid;
2119   struct sockaddr_in remoteAddr;
2120   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2121   int sock, status, size, bytesSent;
2122
2123   while(*head != NULL) {
2124     ptr = *head;
2125     mid = ptr->mid;
2126     //create a socket connection to that machine
2127     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2128       perror("notifyAll():socket()");
2129       return -1;
2130     }
2131
2132     bzero(&remoteAddr, sizeof(remoteAddr));
2133     remoteAddr.sin_family = AF_INET;
2134     remoteAddr.sin_port = htons(LISTEN_PORT);
2135     remoteAddr.sin_addr.s_addr = htonl(mid);
2136     //send Thread Notify response and threadid to that machine
2137     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2138       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2139              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2140       fflush(stdout);
2141       status = -1;
2142     } else {
2143       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2144       msg[0] = THREAD_NOTIFY_RESPONSE;
2145       *((unsigned int *)&msg[1]) = oid;
2146       size = sizeof(unsigned int);
2147       *((unsigned short *)(&msg[1]+ size)) = version;
2148       size+= sizeof(unsigned short);
2149       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2150
2151       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2152       send_data(sock, msg, size);
2153     }
2154     //close socket
2155     close(sock);
2156     // Update head
2157     *head = ptr->next;
2158     free(ptr);
2159   }
2160   return status;
2161 }
2162
2163 void transAbort() {
2164 #ifdef ABORTREADERS
2165   removetransactionhash();
2166 #endif
2167   objstrDelete(t_cache);
2168   t_chashDelete();
2169 }
2170
2171 /* This function inserts necessary information into
2172  * a machine pile data structure */
2173 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2174   plistnode_t *ptr, *tmp;
2175   int found = 0, offset = 0;
2176
2177   tmp = pile;
2178   //Add oid into a machine that is already present in the pile linked list structure
2179   while(tmp != NULL) {
2180     if (tmp->mid == mid) {
2181       int tmpsize;
2182
2183       if (STATUS(headeraddr) & NEW) {
2184         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2185         tmp->numcreated++;
2186         GETSIZE(tmpsize, headeraddr);
2187         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2188       } else if (STATUS(headeraddr) & DIRTY) {
2189         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2190         tmp->nummod++;
2191         GETSIZE(tmpsize, headeraddr);
2192         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2193       } else {
2194         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2195         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2196         offset += sizeof(unsigned int);
2197         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2198         tmp->numread++;
2199       }
2200       found = 1;
2201       break;
2202     }
2203     tmp = tmp->next;
2204   }
2205   //Add oid for any new machine
2206   if (!found) {
2207     int tmpsize;
2208     if((ptr = pCreate(num_objs)) == NULL) {
2209       return NULL;
2210     }
2211     ptr->mid = mid;
2212     if (STATUS(headeraddr) & NEW) {
2213       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2214       ptr->numcreated++;
2215       GETSIZE(tmpsize, headeraddr);
2216       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2217     } else if (STATUS(headeraddr) & DIRTY) {
2218       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2219       ptr->nummod++;
2220       GETSIZE(tmpsize, headeraddr);
2221       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2222     } else {
2223       *((unsigned int *)ptr->objread)=OID(headeraddr);
2224       offset = sizeof(unsigned int);
2225       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2226       ptr->numread++;
2227     }
2228     ptr->next = pile;
2229     pile = ptr;
2230   }
2231
2232   /* Clear Flags */
2233   STATUS(headeraddr) =0;
2234
2235
2236   return pile;
2237 }
2238
2239 plistnode_t *sortPiles(plistnode_t *pileptr) {
2240   plistnode_t *head, *ptr, *tail;
2241   head = pileptr;
2242   ptr = pileptr;
2243   /* Get tail pointer */
2244   while(ptr!= NULL) {
2245     tail = ptr;
2246     ptr = ptr->next;
2247   }
2248   ptr = pileptr;
2249   plistnode_t *prev = pileptr;
2250   /* Arrange local machine processing at the end of the pile list */
2251   while(ptr != NULL) {
2252     if(ptr != tail) {
2253       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2254         prev->next = ptr->next;
2255         ptr->next = NULL;
2256         tail->next = ptr;
2257         return pileptr;
2258       }
2259       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2260         prev = ptr->next;
2261         ptr->next = NULL;
2262         tail->next = ptr;
2263         return prev;
2264       }
2265       prev = ptr;
2266     }
2267     ptr = ptr->next;
2268   }
2269   return pileptr;
2270 }