get some more improvements...now at 12x speedup for remote array accesses
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21 #include "trans.h"
22
23 #define NUM_THREADS 1
24 #define CONFIG_FILENAME "dstm.conf"
25
26 /* Thread transaction variables */
27
28 __thread objstr_t *t_cache;
29 __thread struct ___Object___ *revertlist;
30 #ifdef ABORTREADERS
31 __thread int t_abort;
32 __thread jmp_buf aborttrans;
33 #endif
34
35
36 /* Global Variables */
37 extern int classsize[];
38 pfcstats_t *evalPrefetch;
39 extern int numprefetchsites; //Global variable containing number of prefetch sites
40 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
41 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
42 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
43 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
44 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
45 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
46 extern objstr_t *mainobjstore;
47 unsigned int myIpAddr;
48 unsigned int *hostIpAddrs;
49 int sizeOfHostArray;
50 int numHostsInSystem;
51 int myIndexInHostArray;
52 unsigned int oidsPerBlock;
53 unsigned int oidMin;
54 unsigned int oidMax;
55
56 sockPoolHashTable_t *transReadSockPool;
57 sockPoolHashTable_t *transPrefetchSockPool;
58 sockPoolHashTable_t *transRequestSockPool;
59 pthread_mutex_t notifymutex;
60 pthread_mutex_t atomicObjLock;
61
62 /***********************************
63  * Global Variables for statistics
64  **********************************/
65 int numTransCommit = 0;
66 int numTransAbort = 0;
67 int nchashSearch = 0;
68 int nmhashSearch = 0;
69 int nprehashSearch = 0;
70 int nRemoteSend = 0;
71 int nSoftAbort = 0;
72 int bytesSent = 0;
73 int bytesRecv = 0;
74 int totalObjSize = 0;
75 int sendRemoteReq = 0;
76 int getResponse = 0;
77
78 void printhex(unsigned char *, int);
79 plistnode_t *createPiles();
80 plistnode_t *sortPiles(plistnode_t *pileptr);
81
82 #ifdef LOGEVENTS
83 char bigarray[16*1024*1024];
84 int bigindex=0;
85 #define LOGEVENT(x) { \
86     int tmp=bigindex++;                         \
87     bigarray[tmp]=x;                            \
88   }
89 #else
90 #define LOGEVENT(x)
91 #endif
92
93 /*******************************
94 * Send and Recv function calls
95 *******************************/
96 void send_data(int fd, void *buf, int buflen) {
97   char *buffer = (char *)(buf);
98   int size = buflen;
99   int numbytes;
100   while (size > 0) {
101     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
102     bytesSent = bytesSent + numbytes;
103     if (numbytes == -1) {
104       perror("send");
105       exit(0);
106     }
107     buffer += numbytes;
108     size -= numbytes;
109   }
110 }
111
112 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
113   if (buflen+sendbuffer->offset>WMAXBUF) {
114     send_data(fd, sendbuffer->buf, sendbuffer->offset);
115     sendbuffer->offset=0;
116     send_data(fd, buffer, buflen);
117     return;
118   }
119   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
120   sendbuffer->offset+=buflen;
121   if (sendbuffer->offset>WTOP) {
122     send_data(fd, sendbuffer->buf, sendbuffer->offset);
123     sendbuffer->offset=0;
124   }
125 }
126
127 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
128   if (buflen+sendbuffer->offset>WMAXBUF) {
129     send_data(fd, sendbuffer->buf, sendbuffer->offset);
130     sendbuffer->offset=0;
131     send_data(fd, buffer, buflen);
132     return;
133   }
134   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
135   sendbuffer->offset+=buflen;
136   send_data(fd, sendbuffer->buf, sendbuffer->offset);
137   sendbuffer->offset=0;
138 }
139
140 int recvw(int fd, void *buf, int len, int flags) {
141   return recv(fd, buf, len, flags);
142 }
143  
144 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
145   char *buf=(char *)buffer;
146   int numbytes=readbuffer->head-readbuffer->tail;
147   if (numbytes>buflen)
148     numbytes=buflen;
149   if (numbytes>0) {
150     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
151     readbuffer->tail+=numbytes;
152     buflen-=numbytes;
153     buf+=numbytes;
154   }
155   if (buflen==0) {
156     return;
157   }
158   if (buflen>=MAXBUF) {
159     recv_data(fd, buf, buflen);
160     return;
161   }
162   
163   int maxbuf=MAXBUF;
164   int obufflen=buflen;
165   readbuffer->head=0;
166   
167   while (buflen > 0) {
168     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
169     if (numbytes == -1) {
170       perror("recv");
171       exit(0);
172     }
173     buflen-=numbytes;
174     readbuffer->head+=numbytes;
175     maxbuf-=numbytes;
176   }
177   memcpy(buf,readbuffer->buf,obufflen);
178   readbuffer->tail=obufflen;
179 }
180
181 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
182   char *buf=(char *)buffer;
183   //now tail<=head
184   int numbytes=readbuffer->head-readbuffer->tail;
185   if (numbytes>buflen)
186     numbytes=buflen;
187   if (numbytes>0) {
188     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
189     readbuffer->tail+=numbytes;
190     buflen-=numbytes;
191     buf+=numbytes;
192   }
193   if (buflen==0)
194     return 1;
195
196   if (buflen>=MAXBUF) {
197     return recv_data_errorcode(fd, buf, buflen);
198   }
199
200   int maxbuf=MAXBUF;
201   int obufflen=buflen;
202   readbuffer->head=0;
203   
204   while (buflen > 0) {
205     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
206     if (numbytes ==0) {
207       return 0;
208     }
209     if (numbytes==-1) {
210       perror("recvbuf");
211       return -1;
212     }
213     buflen-=numbytes;
214     readbuffer->head+=numbytes;
215     maxbuf-=numbytes;
216   }
217   memcpy(buf,readbuffer->buf,obufflen);
218   readbuffer->tail=obufflen;
219   return 1;
220 }
221
222
223 void recv_data(int fd, void *buf, int buflen) {
224   char *buffer = (char *)(buf);
225   int size = buflen;
226   int numbytes;
227   while (size > 0) {
228     numbytes = recvw(fd, buffer, size, 0);
229     bytesRecv = bytesRecv + numbytes;
230     if (numbytes == -1) {
231       perror("recv");
232       exit(0);
233     }
234     buffer += numbytes;
235     size -= numbytes;
236   }
237 }
238
239 int recv_data_errorcode(int fd, void *buf, int buflen) {
240   char *buffer = (char *)(buf);
241   int size = buflen;
242   int numbytes;
243   while (size > 0) {
244     numbytes = recvw(fd, buffer, size, 0);
245     if (numbytes==0)
246       return 0;
247     if (numbytes == -1) {
248       perror("recv");
249       return -1;
250     }
251     buffer += numbytes;
252     size -= numbytes;
253   }
254   return 1;
255 }
256
257 void printhex(unsigned char *ptr, int numBytes) {
258   int i;
259   for (i = 0; i < numBytes; i++) {
260     if (ptr[i] < 16)
261       printf("0%x ", ptr[i]);
262     else
263       printf("%x ", ptr[i]);
264   }
265   printf("\n");
266   return;
267 }
268
269 inline int arrayLength(int *array) {
270   int i;
271   for(i=0 ; array[i] != -1; i++)
272     ;
273   return i;
274 }
275
276 inline int findmax(int *array, int arraylength) {
277   int max, i;
278   max = array[0];
279   for(i = 0; i < arraylength; i++) {
280     if(array[i] > max) {
281       max = array[i];
282     }
283   }
284   return max;
285 }
286
287 //#define INLINEPREFETCH
288 #define PREFTHRESHOLD 4
289
290 /* This function is a prefetch call generated by the compiler that
291  * populates the shared primary prefetch queue*/
292 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
293   /* Allocate for the queue node*/
294   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
295   int len;
296 #ifdef INLINEPREFETCH
297   int attempted=0;
298   char *node;
299   do {
300   node=getmemory(qnodesize);
301   if (node==NULL&&attempted)
302     break;
303   if (node!=NULL) {
304 #else
305   char *node=getmemory(qnodesize);
306 #endif
307   int top=endoffsets[ntuples-1];
308
309   if (node==NULL) {
310     LOGEVENT('D');
311     return;
312   }
313   /* Set queue node values */
314
315   /* TODO: Remove this after testing */
316   evalPrefetch[siteid].callcount++;
317
318   *((int *)(node))=siteid;
319   *((int *)(node + sizeof(int))) = ntuples;
320   len = 2*sizeof(int);
321   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
322   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
323   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
324
325 #ifdef INLINEPREFETCH
326   movehead(qnodesize);
327   }
328   int numpref=numavailable();
329   attempted=1;
330
331   if (node==NULL && numpref!=0 || numpref==PREFTHRESHOLD) {
332     node=gettail();
333     prefetchpile_t *pilehead = foundLocal(node,numpref);
334     if (pilehead!=NULL) {
335       // Get sock from shared pool
336       
337       /* Send  Prefetch Request */
338       prefetchpile_t *ptr = pilehead;
339       while(ptr != NULL) {
340         int sd = getSock2(transPrefetchSockPool, ptr->mid);
341         sendPrefetchReq(ptr, sd);
342         ptr = ptr->next;
343       }
344       
345       mcdealloc(pilehead);
346       resetqueue();
347     }
348   }//end do prefetch if condition
349   } while(node==NULL);
350 #else
351   /* Lock and insert into primary prefetch queue */
352   movehead(qnodesize);
353 #endif
354 }
355
356 /* This function starts up the transaction runtime. */
357 int dstmStartup(const char * option) {
358   pthread_t thread_Listen, udp_thread_Listen;
359   pthread_attr_t attr;
360   int master=option!=NULL && strcmp(option, "master")==0;
361   int fd;
362   int udpfd;
363
364   if (processConfigFile() != 0)
365     return 0; //TODO: return error value, cause main program to exit
366 #ifdef COMPILER
367   if (!master)
368     threadcount--;
369 #endif
370
371 #ifdef TRANSSTATS
372   printf("Trans stats is on\n");
373   fflush(stdout);
374 #endif
375 #ifdef ABORTREADERS
376   initreaderlist();
377 #endif
378
379   //Initialize socket pool
380   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
381   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
382   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
383
384   dstmInit();
385   transInit();
386
387   fd=startlistening();
388   pthread_attr_init(&attr);
389   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
390 #ifdef CACHE
391   udpfd = udpInit();
392   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
393 #endif
394   if (master) {
395     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
396     return 1;
397   } else {
398     dstmListen((void *)fd);
399     return 0;
400   }
401 }
402
403 //TODO Use this later
404 void *pCacheAlloc(objstr_t *store, unsigned int size) {
405   void *tmp;
406   objstr_t *ptr;
407   ptr = store;
408   int success = 0;
409
410   while(ptr->next != NULL) {
411     /* check if store is empty */
412     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
413       tmp = ptr->top;
414       ptr->top += size;
415       success = 1;
416       return tmp;
417     } else {
418       ptr = ptr->next;
419     }
420   }
421
422   if(success == 0) {
423     return NULL;
424   }
425 }
426
427 /* This function initiates the prefetch thread A queue is shared
428  * between the main thread of execution and the prefetch thread to
429  * process the prefetch call Call from compiler populates the shared
430  * queue with prefetch requests while prefetch thread processes the
431  * prefetch requests */
432
433 void transInit() {
434   //Create and initialize prefetch cache structure
435 #ifdef CACHE
436   initializePCache();
437   if((evalPrefetch = initPrefetchStats()) == NULL) {
438     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
439     exit(0);
440   }
441 #endif
442
443   /* Initialize attributes for mutex */
444   pthread_mutexattr_init(&prefetchcache_mutex_attr);
445   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
446
447   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
448   pthread_mutex_init(&notifymutex, NULL);
449   pthread_mutex_init(&atomicObjLock, NULL);
450 #ifdef CACHE
451   //Create prefetch cache lookup table
452   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
453     printf("ERROR\n");
454     return; //Failure
455   }
456
457   //Initialize primary shared queue
458   queueInit();
459   //Initialize machine pile w/prefetch oids and offsets shared queue
460   mcpileqInit();
461
462   //Create the primary prefetch thread
463   int retval;
464 #ifdef RANGEPREFETCH
465   do {
466     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
467   } while(retval!=0);
468 #else
469 #ifndef INLINEPREFETCH
470   do {
471     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
472   } while(retval!=0);
473 #endif
474 #endif
475 #ifndef INLINEPREFETCH
476   pthread_detach(tPrefetch);
477 #endif
478 #endif
479 }
480
481 /* This function stops the threads spawned */
482 void transExit() {
483 #ifdef CACHE
484   int t;
485   pthread_cancel(tPrefetch);
486   for(t = 0; t < NUM_THREADS; t++)
487     pthread_cancel(wthreads[t]);
488 #endif
489
490   return;
491 }
492
493 /* This functions inserts randowm wait delays in the order of msec
494  * Mostly used when transaction commits retry*/
495 void randomdelay() {
496   struct timespec req;
497   time_t t;
498
499   t = time(NULL);
500   req.tv_sec = 0;
501   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
502   nanosleep(&req, NULL);
503   return;
504 }
505
506 /* This function initializes things required in the transaction start*/
507 void transStart() {
508   t_cache = objstrCreate(1048576);
509   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
510   revertlist=NULL;
511 #ifdef ABORTREADERS
512   t_abort=0;
513 #endif
514 }
515
516 // Search for an address for a given oid                                                                               
517 /*#define INLINE    inline __attribute__((always_inline))
518
519 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
520   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
521   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
522
523   do {
524     if(node->key == key) {
525       return node->val;
526     }
527     node = node->next;
528   } while(node != NULL);
529
530   return NULL;
531   }*/
532
533
534
535
536 /* This function finds the location of the objects involved in a transaction
537  * and returns the pointer to the object if found in a remote location */
538 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
539   unsigned int machinenumber;
540   objheader_t *tmp, *objheader;
541   objheader_t *objcopy;
542   int size;
543   void *buf;
544   chashlistnode_t *node;
545
546   if(oid == 0) {
547     return NULL;
548   }
549   
550
551   node= &c_table[(oid & c_mask)>>1];
552   do {
553     if(node->key == oid) {
554 #ifdef TRANSSTATS
555     nchashSearch++;
556 #endif
557 #ifdef COMPILER
558     return &((objheader_t*)node->val)[1];
559 #else
560     return node->val;
561 #endif
562     }
563     node = node->next;
564   } while(node != NULL);
565   
566
567   /*  
568   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
569 #ifdef TRANSSTATS
570     nchashSearch++;
571 #endif
572 #ifdef COMPILER
573     return &objheader[1];
574 #else
575     return objheader;
576 #endif
577   } else 
578   */
579
580 #ifdef ABORTREADERS
581   if (t_abort) {
582     //abort this transaction
583     //printf("ABORTING\n");
584     removetransactionhash();
585     objstrDelete(t_cache);
586     t_chashDelete();
587     _longjmp(aborttrans,1);
588   } else
589     addtransaction(oid);
590 #endif
591
592   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
593 #ifdef TRANSSTATS
594     nmhashSearch++;
595 #endif
596     /* Look up in machine lookup table  and copy  into cache*/
597     GETSIZE(size, objheader);
598     size += sizeof(objheader_t);
599     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
600     memcpy(objcopy, objheader, size);
601     /* Insert into cache's lookup table */
602     STATUS(objcopy)=0;
603     t_chashInsert(OID(objheader), objcopy);
604 #ifdef COMPILER
605     return &objcopy[1];
606 #else
607     return objcopy;
608 #endif
609   } else {
610 #ifdef CACHE
611     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
612 #ifdef TRANSSTATS
613       nprehashSearch++;
614 #endif
615       /* Look up in prefetch cache */
616       GETSIZE(size, tmp);
617       size+=sizeof(objheader_t);
618       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
619       memcpy(objcopy, tmp, size);
620       /* Insert into cache's lookup table */
621       t_chashInsert(OID(tmp), objcopy);
622 #ifdef COMPILER
623       return &objcopy[1];
624 #else
625       return objcopy;
626 #endif
627     }
628 #endif
629     /* Get the object from the remote location */
630     if((machinenumber = lhashSearch(oid)) == 0) {
631       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
632       return NULL;
633     }
634     objcopy = getRemoteObj(machinenumber, oid);
635
636     if(objcopy == NULL) {
637       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
638       return NULL;
639     } else {
640 #ifdef TRANSSTATS
641       nRemoteSend++;
642 #endif
643 #ifdef COMPILER
644       return &objcopy[1];
645 #else
646       return objcopy;
647 #endif
648     }
649   }
650 }
651
652
653 /* This function finds the location of the objects involved in a transaction
654  * and returns the pointer to the object if found in a remote location */
655 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
656   unsigned int machinenumber;
657   objheader_t *tmp, *objheader;
658   objheader_t *objcopy;
659   int size;
660
661 #ifdef ABORTREADERS
662   if (t_abort) {
663     //abort this transaction
664     //printf("ABORTING\n");
665     removetransactionhash();
666     objstrDelete(t_cache);
667     t_chashDelete();
668     _longjmp(aborttrans,1);
669   } else
670     addtransaction(oid);
671 #endif
672
673   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
674 #ifdef TRANSSTATS
675     nmhashSearch++;
676 #endif
677     /* Look up in machine lookup table  and copy  into cache*/
678     GETSIZE(size, objheader);
679     size += sizeof(objheader_t);
680     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
681     memcpy(objcopy, objheader, size);
682     /* Insert into cache's lookup table */
683     STATUS(objcopy)=0;
684     t_chashInsert(OID(objheader), objcopy);
685 #ifdef COMPILER
686     return &objcopy[1];
687 #else
688     return objcopy;
689 #endif
690   } else {
691 #ifdef CACHE
692     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
693 #ifdef TRANSSTATS
694       LOGEVENT('P')
695       nprehashSearch++;
696 #endif
697       /* Look up in prefetch cache */
698       GETSIZE(size, tmp);
699       size+=sizeof(objheader_t);
700       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
701       memcpy(objcopy, tmp, size);
702       /* Insert into cache's lookup table */
703       t_chashInsert(OID(tmp), objcopy);
704 #ifdef COMPILER
705       return &objcopy[1];
706 #else
707       return objcopy;
708 #endif
709     }
710 #endif
711     /* Get the object from the remote location */
712     if((machinenumber = lhashSearch(oid)) == 0) {
713       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
714       return NULL;
715     }
716     objcopy = getRemoteObj(machinenumber, oid);
717
718     if(objcopy == NULL) {
719       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
720       return NULL;
721     } else {
722 #ifdef TRANSSTATS
723
724       LOGEVENT('R');
725       nRemoteSend++;
726 #endif
727 #ifdef COMPILER
728       return &objcopy[1];
729 #else
730       return objcopy;
731 #endif
732     }
733   }
734 }
735
736 /* This function creates objects in the transaction record */
737 objheader_t *transCreateObj(unsigned int size) {
738   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
739   OID(tmp) = getNewOID();
740   tmp->version = 1;
741   tmp->rcount = 1;
742   STATUS(tmp) = NEW;
743   t_chashInsert(OID(tmp), tmp);
744
745 #ifdef COMPILER
746   return &tmp[1]; //want space after object header
747 #else
748   return tmp;
749 #endif
750 }
751
752 #if 1
753 /* This function creates machine piles based on all machines involved in a
754  * transaction commit request */
755 plistnode_t *createPiles() {
756   int i;
757   plistnode_t *pile = NULL;
758   unsigned int machinenum;
759   objheader_t *headeraddr;
760   chashlistnode_t * ptr = c_table;
761   /* Represents number of bins in the chash table */
762   unsigned int size = c_size;
763
764   for(i = 0; i < size ; i++) {
765     chashlistnode_t * curr = &ptr[i];
766     /* Inner loop to traverse the linked list of the cache lookupTable */
767     while(curr != NULL) {
768       //if the first bin in hash table is empty
769       if(curr->key == 0)
770         break;
771       headeraddr=(objheader_t *) curr->val;
772
773       //Get machine location for object id (and whether local or not)
774       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
775         machinenum = myIpAddr;
776       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
777         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
778         return NULL;
779       }
780
781       //Make machine groups
782       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
783       curr = curr->next;
784     }
785   }
786   return pile;
787 }
788 #else
789 /* This function creates machine piles based on all machines involved in a
790  * transaction commit request */
791 plistnode_t *createPiles() {
792   int i;
793   plistnode_t *pile = NULL;
794   unsigned int machinenum;
795   objheader_t *headeraddr;
796   struct chashentry * ptr = c_table;
797   /* Represents number of bins in the chash table */
798   unsigned int size = c_size;
799
800   for(i = 0; i < size ; i++) {
801     struct chashentry * curr = & ptr[i];
802     /* Inner loop to traverse the linked list of the cache lookupTable */
803     //if the first bin in hash table is empty
804     if(curr->key == 0)
805       continue;
806     headeraddr=(objheader_t *) curr->ptr;
807
808     //Get machine location for object id (and whether local or not)
809     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
810       machinenum = myIpAddr;
811     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
812       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
813       return NULL;
814     }
815
816     //Make machine groups
817     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
818   }
819   return pile;
820 }
821 #endif
822
823 /* This function initiates the transaction commit process
824  * Spawns threads for each of the new connections with Participants
825  * and creates new piles by calling the createPiles(),
826  * Sends a transrequest() to each remote machines for objects found remotely
827  * and calls handleLocalReq() to process objects found locally */
828 int transCommit() {
829   unsigned int tot_bytes_mod, *listmid;
830   plistnode_t *pile, *pile_ptr;
831   char treplyretry; /* keeps track of the common response that needs to be sent */
832   int firsttime=1;
833   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
834   char finalResponse;
835
836 #ifdef LOGEVENTS
837   int iii;
838   for(iii=0;iii<bigindex;iii++) {
839     printf("%c", bigarray[iii]);
840   }
841 #endif
842
843 #ifdef ABORTREADERS
844   if (t_abort) {
845     //abort this transaction
846     /* Debug
847      * printf("ABORTING TRANSACTION AT COMMIT\n");
848      */
849     removetransactionhash();
850     objstrDelete(t_cache);
851     t_chashDelete();
852     return 1;
853   }
854 #endif
855
856
857   do {
858     treplyretry = 0;
859
860     /* Look through all the objects in the transaction record and make piles
861      * for each machine involved in the transaction*/
862     if (firsttime) {
863       pile_ptr = pile = createPiles();
864       pile_ptr = pile = sortPiles(pile);
865     } else {
866       pile = pile_ptr;
867     }
868     firsttime = 0;
869     /* Create the packet to be sent in TRANS_REQUEST */
870
871     /* Count the number of participants */
872     int pilecount;
873     pilecount = pCount(pile);
874
875     /* Create a list of machine ids(Participants) involved in transaction   */
876     listmid = calloc(pilecount, sizeof(unsigned int));
877     pListMid(pile, listmid);
878
879     /* Create a socket and getReplyCtrl array, initialize */
880     int socklist[pilecount];
881     int loopcount;
882     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
883       socklist[loopcount] = 0;
884     char getReplyCtrl[pilecount];
885     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
886       getReplyCtrl[loopcount] = 0;
887
888     /* Process each machine pile */
889     int sockindex = 0;
890     trans_req_data_t *tosend;
891     tosend = calloc(pilecount, sizeof(trans_req_data_t));
892     while(pile != NULL) {
893       tosend[sockindex].f.control = TRANS_REQUEST;
894       tosend[sockindex].f.mcount = pilecount;
895       tosend[sockindex].f.numread = pile->numread;
896       tosend[sockindex].f.nummod = pile->nummod;
897       tosend[sockindex].f.numcreated = pile->numcreated;
898       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
899       tosend[sockindex].listmid = listmid;
900       tosend[sockindex].objread = pile->objread;
901       tosend[sockindex].oidmod = pile->oidmod;
902       tosend[sockindex].oidcreated = pile->oidcreated;
903       int sd = 0;
904       if(pile->mid != myIpAddr) {
905         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
906           printf("transRequest(): socket create error\n");
907           free(listmid);
908           free(tosend);
909           return 1;
910         }
911         socklist[sockindex] = sd;
912         /* Send bytes of data with TRANS_REQUEST control message */
913         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
914
915         /* Send list of machines involved in the transaction */
916         {
917           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
918           send_data(sd, tosend[sockindex].listmid, size);
919         }
920
921         /* Send oids and version number tuples for objects that are read */
922         {
923           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
924           send_data(sd, tosend[sockindex].objread, size);
925         }
926
927         /* Send objects that are modified */
928         void *modptr;
929         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
930           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
931           free(listmid);
932           free(tosend);
933           return 1;
934         }
935         int offset = 0;
936         int i;
937         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
938           int size;
939           objheader_t *headeraddr;
940           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
941             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
942             free(modptr);
943             free(listmid);
944             free(tosend);
945             return 1;
946           }
947           GETSIZE(size,headeraddr);
948           size+=sizeof(objheader_t);
949           memcpy(modptr+offset, headeraddr, size);
950           offset+=size;
951         }
952         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
953         free(modptr);
954       } else { //handle request locally
955         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
956       }
957       sockindex++;
958       pile = pile->next;
959     } //end of pile processing
960       /* Recv Ctrl msgs from all machines */
961     int i;
962     for(i = 0; i < pilecount; i++) {
963       int sd = socklist[i];
964       if(sd != 0) {
965         char control;
966         recv_data(sd, &control, sizeof(char));
967         //Update common data structure with new ctrl msg
968         getReplyCtrl[i] = control;
969         /* Recv Objects if participant sends TRANS_DISAGREE */
970 #ifdef CACHE
971         if(control == TRANS_DISAGREE) {
972           int length;
973           recv_data(sd, &length, sizeof(int));
974           void *newAddr;
975           pthread_mutex_lock(&prefetchcache_mutex);
976           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
977             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
978             free(tosend);
979             free(listmid);
980             pthread_mutex_unlock(&prefetchcache_mutex);
981             return 1;
982           }
983           pthread_mutex_unlock(&prefetchcache_mutex);
984           recv_data(sd, newAddr, length);
985           int offset = 0;
986           while(length != 0) {
987             unsigned int oidToPrefetch;
988             objheader_t * header;
989             header = (objheader_t *)(((char *)newAddr) + offset);
990             oidToPrefetch = OID(header);
991             STATUS(header)=0;
992             int size = 0;
993             GETSIZE(size, header);
994             size += sizeof(objheader_t);
995             //make an entry in prefetch hash table
996             void *oldptr;
997             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
998               prehashRemove(oidToPrefetch);
999               prehashInsert(oidToPrefetch, header);
1000             } else {
1001               prehashInsert(oidToPrefetch, header);
1002             }
1003             length = length - size;
1004             offset += size;
1005           }
1006         } //end of receiving objs
1007 #endif
1008       }
1009     }
1010     /* Decide the final response */
1011     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1012       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1013       free(tosend);
1014       free(listmid);
1015       return 1;
1016     }
1017
1018     /* Send responses to all machines */
1019     for(i = 0; i < pilecount; i++) {
1020       int sd = socklist[i];
1021       if(sd != 0) {
1022 #ifdef CACHE
1023         if(finalResponse == TRANS_COMMIT) {
1024           int retval;
1025           /* Update prefetch cache */
1026           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1027             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1028             free(tosend);
1029             free(listmid);
1030             return 1;
1031           }
1032
1033
1034           /* Invalidate objects in other machine cache */
1035           if(tosend[i].f.nummod > 0) {
1036             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1037               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1038               free(tosend);
1039               free(listmid);
1040               return 1;
1041             }
1042           }
1043 #ifdef ABORTREADERS
1044           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1045           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1046 #endif
1047         }
1048 #ifdef ABORTREADERS
1049         else if (!treplyretry) {
1050           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1051           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1052         }
1053 #endif
1054 #endif
1055         send_data(sd, &finalResponse, sizeof(char));
1056       } else {
1057         /* Complete local processing */
1058         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1059 #ifdef ABORTREADERS
1060         if(finalResponse == TRANS_COMMIT) {
1061           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1062           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1063         } else if (!treplyretry) {
1064           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1065           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1066         }
1067 #endif
1068       }
1069     }
1070
1071     /* Free resources */
1072     free(tosend);
1073     free(listmid);
1074     if (!treplyretry)
1075       pDelete(pile_ptr);
1076     /* wait a random amount of time before retrying to commit transaction*/
1077     if(treplyretry) {
1078       randomdelay();
1079 #ifdef TRANSSTATS
1080       nSoftAbort++;
1081 #endif
1082     }
1083     /* Retry trans commit procedure during soft_abort case */
1084   } while (treplyretry);
1085
1086   if(finalResponse == TRANS_ABORT) {
1087     //printf("Aborting trans\n");
1088 #ifdef TRANSSTATS
1089     numTransAbort++;
1090 #endif
1091     /* Free Resources */
1092     objstrDelete(t_cache);
1093     t_chashDelete();
1094     return TRANS_ABORT;
1095   } else if(finalResponse == TRANS_COMMIT) {
1096 #ifdef TRANSSTATS
1097     numTransCommit++;
1098 #endif
1099     /* Free Resources */
1100     objstrDelete(t_cache);
1101     t_chashDelete();
1102     return 0;
1103   } else {
1104     //TODO Add other cases
1105     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1106     exit(-1);
1107   }
1108   return 0;
1109 }
1110
1111 /* This function handles the local objects involved in a transaction
1112  * commiting process.  It also makes a decision if this local machine
1113  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1114 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1115   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1116   int numoidnotfound = 0, numoidlocked = 0;
1117   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1118   int numread, i;
1119   unsigned int oid;
1120   unsigned short version;
1121
1122   /* Counters and arrays to formulate decision on control message to be sent */
1123   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1124   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1125   //setting a divider between read and write locks
1126   numread = tdata->f.numread;
1127   /* Process each oid in the machine pile/ group per thread */
1128   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1129     if (i < tdata->f.numread) {
1130       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1131       incr *= i;
1132       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1133       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1134       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1135     } else { // Objects Modified
1136       if(i == tdata->f.numread) {
1137         oidlocked[numoidlocked++] = -1;
1138       }
1139       int tmpsize;
1140       objheader_t *headptr;
1141       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1142       if (headptr == NULL) {
1143         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1144         return;
1145       }
1146       oid = OID(headptr);
1147       version = headptr->version;
1148       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1149     }
1150   }
1151
1152 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1153  * if Participant receives a TRANS_COMMIT */
1154   transinfo->objlocked = oidlocked;
1155   transinfo->objnotfound = oidnotfound;
1156   transinfo->modptr = NULL;
1157   transinfo->numlocked = numoidlocked;
1158   transinfo->numnotfound = numoidnotfound;
1159
1160   /* Condition to send TRANS_AGREE */
1161   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1162     *getReplyCtrl = TRANS_AGREE;
1163   }
1164   /* Condition to send TRANS_SOFT_ABORT */
1165   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1166     *getReplyCtrl = TRANS_SOFT_ABORT;
1167   }
1168 }
1169
1170 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1171   if(finalResponse == TRANS_ABORT) {
1172     if(transAbortProcess(transinfo) != 0) {
1173       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1174       fflush(stdout);
1175       return;
1176     }
1177   } else if(finalResponse == TRANS_COMMIT) {
1178 #ifdef CACHE
1179     /* Invalidate objects in other machine cache */
1180     if(tdata->f.nummod > 0) {
1181       int retval;
1182       if((retval = invalidateObj(tdata)) != 0) {
1183         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1184         return;
1185       }
1186     }
1187 #endif
1188     if(transComProcess(tdata, transinfo) != 0) {
1189       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1190       fflush(stdout);
1191       return;
1192     }
1193   } else {
1194     printf("ERROR...No Decision\n");
1195   }
1196
1197   /* Free memory */
1198   if (transinfo->objlocked != NULL) {
1199     free(transinfo->objlocked);
1200   }
1201   if (transinfo->objnotfound != NULL) {
1202     free(transinfo->objnotfound);
1203   }
1204 }
1205
1206 /* This function decides the reponse that needs to be sent to
1207  * all Participant machines after the TRANS_REQUEST protocol */
1208 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1209   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1210                                                                    message to send */
1211   for (i = 0 ; i < pilecount; i++) {
1212     char control;
1213     control = getReplyCtrl[i];
1214     switch(control) {
1215     default:
1216       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
1217
1218       /* treat as disagree, pass thru */
1219     case TRANS_DISAGREE:
1220       transdisagree++;
1221       break;
1222
1223     case TRANS_AGREE:
1224       transagree++;
1225       break;
1226
1227     case TRANS_SOFT_ABORT:
1228       transsoftabort++;
1229       break;
1230     }
1231   }
1232
1233   if(transdisagree > 0) {
1234     /* Send Abort */
1235     *treplyretry = 0;
1236     return TRANS_ABORT;
1237 #ifdef CACHE
1238     /* clear objects from prefetch cache */
1239     cleanPCache();
1240 #endif
1241   } else if(transagree == pilecount) {
1242     /* Send Commit */
1243     *treplyretry = 0;
1244     return TRANS_COMMIT;
1245   } else {
1246     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1247     *treplyretry = 1;
1248     return TRANS_ABORT;
1249   }
1250   return 0;
1251 }
1252
1253 /* This function opens a connection, places an object read request to
1254  * the remote machine, reads the control message and object if
1255  * available and copies the object and its header to the local
1256  * cache. */
1257
1258 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1259   int size, val;
1260   struct sockaddr_in serv_addr;
1261   char machineip[16];
1262   char control;
1263   objheader_t *h;
1264   void *objcopy = NULL;
1265
1266   int sd = getSock2(transReadSockPool, mnum);
1267   char readrequest[sizeof(char)+sizeof(unsigned int)];
1268   readrequest[0] = READ_REQUEST;
1269   *((unsigned int *)(&readrequest[1])) = oid;
1270   send_data(sd, readrequest, sizeof(readrequest));
1271
1272   /* Read response from the Participant */
1273   recv_data(sd, &control, sizeof(char));
1274
1275   if (control==OBJECT_NOT_FOUND) {
1276     objcopy = NULL;
1277   } else {
1278     /* Read object if found into local cache */
1279     recv_data(sd, &size, sizeof(int));
1280     objcopy = objstrAlloc(&t_cache, size);
1281     recv_data(sd, objcopy, size);
1282     STATUS(objcopy)=0;
1283     /* Insert into cache's lookup table */
1284     t_chashInsert(oid, objcopy);
1285 #ifdef TRANSSTATS
1286     totalObjSize += size;
1287 #endif
1288   }
1289
1290   return objcopy;
1291 }
1292
1293 /*  Commit info for objects modified */
1294 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1295                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1296   void *mobj;
1297   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1298   /* Save the oids not found and number of oids not found for later use */
1299   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1300     /* Save the oids not found and number of oids not found for later use */
1301     oidnotfound[*numoidnotfound] = oid;
1302     (*numoidnotfound)++;
1303   } else { /* If Obj found in machine (i.e. has not moved) */
1304     /* Check if Obj is locked by any previous transaction */
1305     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1306       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1307         (*v_matchnolock)++;
1308         //Keep track of what is locked
1309         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1310       } else { /* If versions don't match ...HARD ABORT */
1311         (*v_nomatch)++;
1312         /* Send TRANS_DISAGREE to Coordinator */
1313         *getReplyCtrl = TRANS_DISAGREE;
1314
1315         //Keep track of what is locked
1316         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1317         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1318         return;
1319       }
1320     } else { //A lock is acquired some place else
1321       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1322         (*v_matchlock)++;
1323       } else { /* If versions don't match ...HARD ABORT */
1324         (*v_nomatch)++;
1325         /* Send TRANS_DISAGREE to Coordinator */
1326         *getReplyCtrl = TRANS_DISAGREE;
1327         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1328         return;
1329       }
1330     }
1331   }
1332 }
1333
1334 /*  Commit info for objects modified */
1335 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1336                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1337   void *mobj;
1338   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1339   /* Save the oids not found and number of oids not found for later use */
1340   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1341     /* Save the oids not found and number of oids not found for later use */
1342     oidnotfound[*numoidnotfound] = oid;
1343     (*numoidnotfound)++;
1344   } else { /* If Obj found in machine (i.e. has not moved) */
1345     /* Check if Obj is locked by any previous transaction */
1346     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1347       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1348         (*v_matchnolock)++;
1349         //Keep track of what is locked
1350         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1351       } else { /* If versions don't match ...HARD ABORT */
1352         (*v_nomatch)++;
1353         /* Send TRANS_DISAGREE to Coordinator */
1354         *getReplyCtrl = TRANS_DISAGREE;
1355         //Keep track of what is locked
1356         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1357         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1358         return;
1359       }
1360     } else { //Has reached max number of readers or some other transaction
1361       //has acquired a lock on this object
1362       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1363         (*v_matchlock)++;
1364       } else { /* If versions don't match ...HARD ABORT */
1365         (*v_nomatch)++;
1366         /* Send TRANS_DISAGREE to Coordinator */
1367         *getReplyCtrl = TRANS_DISAGREE;
1368         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1369         return;
1370       }
1371     }
1372   }
1373 }
1374
1375 /* This function completes the ABORT process if the transaction is aborting */
1376 int transAbortProcess(trans_commit_data_t *transinfo) {
1377   int i, numlocked;
1378   unsigned int *objlocked;
1379   void *header;
1380
1381   numlocked = transinfo->numlocked;
1382   objlocked = transinfo->objlocked;
1383
1384   int useWriteUnlock = 0;
1385   for (i = 0; i < numlocked; i++) {
1386     if(objlocked[i] == -1) {
1387       useWriteUnlock = 1;
1388       continue;
1389     }
1390     if((header = mhashSearch(objlocked[i])) == NULL) {
1391       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1392       return 1;
1393     }
1394     if(!useWriteUnlock) {
1395       read_unlock(STATUSPTR(header));
1396     } else {
1397       write_unlock(STATUSPTR(header));
1398     }
1399   }
1400
1401   return 0;
1402 }
1403
1404 /*This function completes the COMMIT process if the transaction is commiting*/
1405 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1406   objheader_t *header, *tcptr;
1407   int i, nummod, tmpsize, numcreated, numlocked;
1408   unsigned int *oidmod, *oidcreated, *oidlocked;
1409   void *ptrcreate;
1410
1411   nummod = tdata->f.nummod;
1412   oidmod = tdata->oidmod;
1413   numcreated = tdata->f.numcreated;
1414   oidcreated = tdata->oidcreated;
1415   numlocked = transinfo->numlocked;
1416   oidlocked = transinfo->objlocked;
1417
1418   for (i = 0; i < nummod; i++) {
1419     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1420       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1421       return 1;
1422     }
1423     /* Copy from transaction cache -> main object store */
1424     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1425       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1426       return 1;
1427     }
1428     GETSIZE(tmpsize, header);
1429     char *tmptcptr = (char *) tcptr;
1430     {
1431       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1432       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1433       dst->___cachedCode___=src->___cachedCode___;
1434       dst->___cachedHash___=src->___cachedHash___;
1435
1436       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1437     }
1438
1439     header->version += 1;
1440     if(header->notifylist != NULL) {
1441       notifyAll(&header->notifylist, OID(header), header->version);
1442     }
1443   }
1444   /* If object is newly created inside transaction then commit it */
1445   for (i = 0; i < numcreated; i++) {
1446     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1447       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1448       return 1;
1449     }
1450     GETSIZE(tmpsize, header);
1451     tmpsize += sizeof(objheader_t);
1452     pthread_mutex_lock(&mainobjstore_mutex);
1453     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1454       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1455       pthread_mutex_unlock(&mainobjstore_mutex);
1456       return 1;
1457     }
1458     pthread_mutex_unlock(&mainobjstore_mutex);
1459     /* Initialize read and write locks */
1460     initdsmlocks(STATUSPTR(header));
1461     memcpy(ptrcreate, header, tmpsize);
1462     mhashInsert(oidcreated[i], ptrcreate);
1463     lhashInsert(oidcreated[i], myIpAddr);
1464   }
1465   /* Unlock locked objects */
1466   int useWriteUnlock = 0;
1467   for(i = 0; i < numlocked; i++) {
1468     if(oidlocked[i] == -1) {
1469       useWriteUnlock = 1;
1470       continue;
1471     }
1472     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1473       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1474       return 1;
1475     }
1476     if(!useWriteUnlock) {
1477       read_unlock(STATUSPTR(header));
1478     } else {
1479       write_unlock(STATUSPTR(header));
1480     }
1481   }
1482   return 0;
1483 }
1484
1485 prefetchpile_t *foundLocal(char *ptr, int numprefetches) {
1486   int i;
1487   int j;
1488   prefetchpile_t * head=NULL;
1489
1490   for(j=0;j<numprefetches;j++) {
1491     int siteid = *(GET_SITEID(ptr));
1492     int ntuples = *(GET_NTUPLES(ptr));
1493     unsigned int * oidarray = GET_PTR_OID(ptr);
1494     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1495     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1496     int numLocal = 0;
1497     
1498     for(i=0; i<ntuples; i++) {
1499       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1500       unsigned short endindex=endoffsets[i];
1501       unsigned int oid=oidarray[i];
1502       int newbase;
1503       int machinenum;
1504       
1505       if (oid==0)
1506         continue;
1507       //Look up fields locally
1508       for(newbase=baseindex; newbase<endindex; newbase++) {
1509         if (!lookupObject(&oid, arryfields[newbase]))
1510           break;
1511         //Ended in a null pointer...
1512         if (oid==0)
1513           goto tuple;
1514       }
1515       //Entire prefetch is local
1516       if (newbase==endindex&&checkoid(oid)) {
1517         numLocal++;
1518         goto tuple;
1519       }
1520       //Add to remote requests
1521       machinenum=lhashSearch(oid);
1522       insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1523     tuple:
1524       ;
1525     }
1526     
1527     /* handle dynamic prefetching */
1528     handleDynPrefetching(numLocal, ntuples, siteid);
1529     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1530   }
1531
1532   return head;
1533 }
1534
1535 int checkoid(unsigned int oid) {
1536   objheader_t *header;
1537   if ((header=mhashSearch(oid))!=NULL) {
1538     //Found on machine
1539     return 1;
1540   } else if ((header=prehashSearch(oid))!=NULL) {
1541     //Found in cache
1542     return 1;
1543   } else {
1544     return 0;
1545   }
1546 }
1547
1548 int lookupObject(unsigned int * oid, short offset) {
1549   objheader_t *header;
1550   if ((header=mhashSearch(*oid))!=NULL) {
1551     //Found on machine
1552     ;
1553   } else if ((header=prehashSearch(*oid))!=NULL) {
1554     //Found in cache
1555     ;
1556   } else {
1557     return 0;
1558   }
1559
1560   if(TYPE(header) >= NUMCLASSES) {
1561     int elementsize = classsize[TYPE(header)];
1562     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1563     int length = ao->___length___;
1564     /* Check if array out of bounds */
1565     if(offset < 0 || offset >= length) {
1566       //if yes treat the object as found
1567       (*oid)=0;
1568       return 1;
1569     }
1570     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1571     return 1;
1572   } else {
1573     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1574     return 1;
1575   }
1576 }
1577
1578
1579 /* This function is called by the thread calling transPrefetch */
1580 void *transPrefetch(void *t) {
1581   while(1) {
1582     /* read from prefetch queue */
1583     void *node=gettail();
1584     /* Check if the tuples are found locally, if yes then reduce them further*/
1585     /* and group requests by remote machine ids by calling the makePreGroups() */
1586     int count=numavailable();
1587     prefetchpile_t *pilehead = foundLocal(node, count);
1588
1589     if (pilehead!=NULL) {
1590       // Get sock from shared pool
1591
1592       /* Send  Prefetch Request */
1593       prefetchpile_t *ptr = pilehead;
1594       while(ptr != NULL) {
1595         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1596         sendPrefetchReq(ptr, sd);
1597         ptr = ptr->next;
1598       }
1599
1600       /* Release socket */
1601       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1602
1603       /* Deallocated pilehead */
1604       mcdealloc(pilehead);
1605     }
1606     // Deallocate the prefetch queue pile node
1607     incmulttail(count);
1608   }
1609 }
1610
1611 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1612   objpile_t *tmp;
1613
1614   int size=sizeof(char)+sizeof(int);
1615   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1616     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1617   }
1618
1619   char buft[size];
1620   char *buf=buft;
1621   *buf=TRANS_PREFETCH;
1622   buf+=sizeof(char);
1623
1624   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1625     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1626     *((int*)buf)=len;
1627     buf+=sizeof(int);
1628     *((unsigned int *)buf)=tmp->oid;
1629     buf+=sizeof(unsigned int);
1630     *((unsigned int *)(buf)) = myIpAddr;
1631     buf+=sizeof(unsigned int);
1632     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1633     buf+=tmp->numoffset*sizeof(short);
1634   }
1635   *((int *)buf)=-1;
1636   send_data(sd, buft, size);
1637   return;
1638 }
1639
1640 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1641   int len, endpair;
1642   char control;
1643   objpile_t *tmp;
1644   struct writestruct writebuffer;
1645   writebuffer.offset=0;
1646
1647   /* Send TRANS_PREFETCH control message */
1648   int first=1;
1649
1650   /* Send Oids and offsets in pairs */
1651   tmp = mcpilenode->objpiles;
1652   while(tmp != NULL) {
1653     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1654     char oidnoffset[len+5];
1655     char *buf=oidnoffset;
1656     if (first) {
1657       *buf=TRANS_PREFETCH;
1658       buf++;len++;
1659       first=0;
1660     }
1661     *((int*)buf) = tmp->numoffset;
1662     buf+=sizeof(int);
1663     *((unsigned int *)buf) = tmp->oid;
1664 #ifdef TRANSSTATS
1665     sendRemoteReq++;
1666 #endif
1667     buf+=sizeof(unsigned int);
1668     *((unsigned int *)buf) = myIpAddr;
1669     buf += sizeof(unsigned int);
1670     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1671     tmp = tmp->next;
1672     if (tmp==NULL) {
1673       *((int *)(&oidnoffset[len]))=-1;
1674       len+=sizeof(int);
1675     }
1676     if (tmp!=NULL)
1677       send_buf(sd, & writebuffer, oidnoffset, len);
1678     else
1679       forcesend_buf(sd, & writebuffer, oidnoffset, len);
1680   }
1681
1682   LOGEVENT('S');
1683   return;
1684 }
1685
1686 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1687   int length = 0, size = 0;
1688   char control;
1689   unsigned int oid;
1690   void *modptr, *oldptr;
1691
1692   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1693   size = length - sizeof(int);
1694   char recvbuffer[size];
1695 #ifdef TRANSSTATS
1696     getResponse++;
1697     LOGEVENT('Z');
1698 #endif
1699     recv_data_buf(sd, readbuffer, recvbuffer, size);
1700   control = *((char *) recvbuffer);
1701   if(control == OBJECT_FOUND) {
1702     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1703     size = size - (sizeof(char) + sizeof(unsigned int));
1704     pthread_mutex_lock(&prefetchcache_mutex);
1705     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1706       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1707       pthread_mutex_unlock(&prefetchcache_mutex);
1708       return -1;
1709     }
1710     pthread_mutex_unlock(&prefetchcache_mutex);
1711     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1712     STATUS(modptr)=0;
1713
1714     /* Insert the oid and its address into the prefetch hash lookup table */
1715     /* Do a version comparison if the oid exists */
1716     if((oldptr = prehashSearch(oid)) != NULL) {
1717       /* If older version then update with new object ptr */
1718       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1719         prehashRemove(oid);
1720         prehashInsert(oid, modptr);
1721       }
1722     } else { /* Else add the object ptr to hash table*/
1723       prehashInsert(oid, modptr);
1724     }
1725     /* Lock the Prefetch Cache look up table*/
1726     pthread_mutex_lock(&pflookup.lock);
1727     /* Broadcast signal on prefetch cache condition variable */
1728     pthread_cond_broadcast(&pflookup.cond);
1729     /* Unlock the Prefetch Cache look up table*/
1730     pthread_mutex_unlock(&pflookup.lock);
1731   } else if(control == OBJECT_NOT_FOUND) {
1732     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1733     /* TODO: For each object not found query DHT for new location and retrieve the object */
1734     /* Throw an error */
1735     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1736     //    exit(-1);
1737   } else {
1738     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1739   }
1740
1741   return 0;
1742 }
1743
1744 unsigned short getObjType(unsigned int oid) {
1745   objheader_t *objheader;
1746   unsigned short numoffset[] ={0};
1747   short fieldoffset[] ={};
1748
1749   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1750 #ifdef CACHE
1751     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1752 #endif
1753     unsigned int mid = lhashSearch(oid);
1754     int sd = getSock2(transReadSockPool, mid);
1755     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1756     remotereadrequest[0] = READ_REQUEST;
1757     *((unsigned int *)(&remotereadrequest[1])) = oid;
1758     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1759
1760     /* Read response from the Participant */
1761     char control;
1762     recv_data(sd, &control, sizeof(char));
1763
1764     if (control==OBJECT_NOT_FOUND) {
1765       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1766       fflush(stdout);
1767       exit(-1);
1768     } else {
1769       /* Read object if found into local cache */
1770       int size;
1771       recv_data(sd, &size, sizeof(int));
1772 #ifdef CACHE
1773       pthread_mutex_lock(&prefetchcache_mutex);
1774       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1775         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1776         pthread_exit(NULL);
1777       }
1778       pthread_mutex_unlock(&prefetchcache_mutex);
1779       recv_data(sd, objheader, size);
1780       prehashInsert(oid, objheader);
1781       return TYPE(objheader);
1782 #else
1783       char *buffer;
1784       if((buffer = calloc(1, size)) == NULL) {
1785         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1786         fflush(stdout);
1787         return 0;
1788       }
1789       recv_data(sd, buffer, size);
1790       objheader = (objheader_t *)buffer;
1791       unsigned short type = TYPE(objheader);
1792       free(buffer);
1793       return type;
1794 #endif
1795     }
1796 #ifdef CACHE
1797   }
1798 #endif
1799   }
1800   return TYPE(objheader);
1801 }
1802
1803 int startRemoteThread(unsigned int oid, unsigned int mid) {
1804   int sock;
1805   struct sockaddr_in remoteAddr;
1806   char msg[1 + sizeof(unsigned int)];
1807   int bytesSent;
1808   int status;
1809
1810   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1811     perror("startRemoteThread():socket()");
1812     return -1;
1813   }
1814
1815   bzero(&remoteAddr, sizeof(remoteAddr));
1816   remoteAddr.sin_family = AF_INET;
1817   remoteAddr.sin_port = htons(LISTEN_PORT);
1818   remoteAddr.sin_addr.s_addr = htonl(mid);
1819
1820   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1821     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1822            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1823     status = -1;
1824   } else
1825   {
1826     msg[0] = START_REMOTE_THREAD;
1827     *((unsigned int *) &msg[1]) = oid;
1828     send_data(sock, msg, 1 + sizeof(unsigned int));
1829   }
1830
1831   close(sock);
1832   return status;
1833 }
1834
1835 //TODO: when reusing oids, make sure they are not already in use!
1836 static unsigned int id = 0xFFFFFFFF;
1837 unsigned int getNewOID(void) {
1838   id += 2;
1839   if (id > oidMax || id < oidMin) {
1840     id = (oidMin | 1);
1841   }
1842   return id;
1843 }
1844
1845 int processConfigFile() {
1846   FILE *configFile;
1847   const int maxLineLength = 200;
1848   char lineBuffer[maxLineLength];
1849   char *token;
1850   const char *delimiters = " \t\n";
1851   char *commentBegin;
1852   in_addr_t tmpAddr;
1853
1854   configFile = fopen(CONFIG_FILENAME, "r");
1855   if (configFile == NULL) {
1856     printf("error opening %s:\n", CONFIG_FILENAME);
1857     perror("");
1858     return -1;
1859   }
1860
1861   numHostsInSystem = 0;
1862   sizeOfHostArray = 8;
1863   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1864
1865   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1866     commentBegin = strchr(lineBuffer, '#');
1867     if (commentBegin != NULL)
1868       *commentBegin = '\0';
1869     token = strtok(lineBuffer, delimiters);
1870     while (token != NULL) {
1871       tmpAddr = inet_addr(token);
1872       if ((int)tmpAddr == -1) {
1873         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1874         fclose(configFile);
1875         return -1;
1876       } else
1877         addHost(htonl(tmpAddr));
1878       token = strtok(NULL, delimiters);
1879     }
1880   }
1881
1882   fclose(configFile);
1883
1884   if (numHostsInSystem < 1) {
1885     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1886     return -1;
1887   }
1888 #ifdef MAC
1889   myIpAddr = getMyIpAddr("en1");
1890 #else
1891   myIpAddr = getMyIpAddr("eth0");
1892 #endif
1893   myIndexInHostArray = findHost(myIpAddr);
1894   if (myIndexInHostArray == -1) {
1895     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1896     return -1;
1897   }
1898   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1899   oidMin = oidsPerBlock * myIndexInHostArray;
1900   if (myIndexInHostArray == numHostsInSystem - 1)
1901     oidMax = 0xFFFFFFFF;
1902   else
1903     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1904
1905   return 0;
1906 }
1907
1908 void addHost(unsigned int hostIp) {
1909   unsigned int *tmpArray;
1910
1911   if (findHost(hostIp) != -1)
1912     return;
1913
1914   if (numHostsInSystem == sizeOfHostArray) {
1915     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1916     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1917     free(hostIpAddrs);
1918     hostIpAddrs = tmpArray;
1919   }
1920
1921   hostIpAddrs[numHostsInSystem++] = hostIp;
1922
1923   return;
1924 }
1925
1926 int findHost(unsigned int hostIp) {
1927   int i;
1928   for (i = 0; i < numHostsInSystem; i++)
1929     if (hostIpAddrs[i] == hostIp)
1930       return i;
1931
1932   //not found
1933   return -1;
1934 }
1935
1936 /* This function sends notification request per thread waiting on object(s) whose version
1937  * changes */
1938 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1939   int sock,i;
1940   objheader_t *objheader;
1941   struct sockaddr_in remoteAddr;
1942   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1943   char *ptr;
1944   int bytesSent;
1945   int status, size;
1946   unsigned short version;
1947   unsigned int oid,mid;
1948   static unsigned int threadid = 0;
1949   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1950   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1951   notifydata_t *ndata;
1952
1953   oid = oidarry[0];
1954   if((mid = lhashSearch(oid)) == 0) {
1955     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1956     return;
1957   }
1958
1959   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1960     perror("reqNotify():socket()");
1961     return -1;
1962   }
1963
1964   bzero(&remoteAddr, sizeof(remoteAddr));
1965   remoteAddr.sin_family = AF_INET;
1966   remoteAddr.sin_port = htons(LISTEN_PORT);
1967   remoteAddr.sin_addr.s_addr = htonl(mid);
1968
1969   /* Generate unique threadid */
1970   threadid++;
1971
1972   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1973   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1974     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1975     return -1;
1976   }
1977   ndata->numoid = numoid;
1978   ndata->threadid = threadid;
1979   ndata->oidarry = oidarry;
1980   ndata->versionarry = versionarry;
1981   ndata->threadcond = threadcond;
1982   ndata->threadnotify = threadnotify;
1983   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1984     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1985     free(ndata);
1986     return -1;
1987   }
1988
1989   /* Send  number of oids, oidarry, version array, machine id and threadid */
1990   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1991     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1992            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1993     free(ndata);
1994     return -1;
1995   } else {
1996     msg[0] = THREAD_NOTIFY_REQUEST;
1997     *((unsigned int *)(&msg[1])) = numoid;
1998     /* Send array of oids  */
1999     size = sizeof(unsigned int);
2000
2001     for(i = 0;i < numoid; i++) {
2002       oid = oidarry[i];
2003       *((unsigned int *)(&msg[1] + size)) = oid;
2004       size += sizeof(unsigned int);
2005     }
2006
2007     /* Send array of version  */
2008     for(i = 0;i < numoid; i++) {
2009       version = versionarry[i];
2010       *((unsigned short *)(&msg[1] + size)) = version;
2011       size += sizeof(unsigned short);
2012     }
2013
2014     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2015     *((unsigned int *)(&msg[1] + size)) = threadid;
2016     pthread_mutex_lock(&(ndata->threadnotify));
2017     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2018     send_data(sock, msg, size);
2019     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2020     pthread_mutex_unlock(&(ndata->threadnotify));
2021   }
2022
2023   pthread_cond_destroy(&threadcond);
2024   pthread_mutex_destroy(&threadnotify);
2025   free(ndata);
2026   close(sock);
2027   return status;
2028 }
2029
2030 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2031   notifydata_t *ndata;
2032   int i, objIsFound = 0, index;
2033   void *ptr;
2034
2035   //Look up the tid and call the corresponding pthread_cond_signal
2036   if((ndata = notifyhashSearch(tid)) == NULL) {
2037     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2038     return;
2039   } else  {
2040     for(i = 0; i < ndata->numoid; i++) {
2041       if(ndata->oidarry[i] == oid) {
2042         objIsFound = 1;
2043         index = i;
2044       }
2045     }
2046     if(objIsFound == 0) {
2047       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2048       return;
2049     } else {
2050       if(version <= ndata->versionarry[index]) {
2051         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2052         return;
2053       } else {
2054 #ifdef CACHE
2055         /* Clear from prefetch cache and free thread related data structure */
2056         if((ptr = prehashSearch(oid)) != NULL) {
2057           prehashRemove(oid);
2058         }
2059 #endif
2060         pthread_mutex_lock(&(ndata->threadnotify));
2061         pthread_cond_signal(&(ndata->threadcond));
2062         pthread_mutex_unlock(&(ndata->threadnotify));
2063       }
2064     }
2065   }
2066   return;
2067 }
2068
2069 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2070   threadlist_t *ptr;
2071   unsigned int mid;
2072   struct sockaddr_in remoteAddr;
2073   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2074   int sock, status, size, bytesSent;
2075
2076   while(*head != NULL) {
2077     ptr = *head;
2078     mid = ptr->mid;
2079     //create a socket connection to that machine
2080     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2081       perror("notifyAll():socket()");
2082       return -1;
2083     }
2084
2085     bzero(&remoteAddr, sizeof(remoteAddr));
2086     remoteAddr.sin_family = AF_INET;
2087     remoteAddr.sin_port = htons(LISTEN_PORT);
2088     remoteAddr.sin_addr.s_addr = htonl(mid);
2089     //send Thread Notify response and threadid to that machine
2090     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2091       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2092              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2093       fflush(stdout);
2094       status = -1;
2095     } else {
2096       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2097       msg[0] = THREAD_NOTIFY_RESPONSE;
2098       *((unsigned int *)&msg[1]) = oid;
2099       size = sizeof(unsigned int);
2100       *((unsigned short *)(&msg[1]+ size)) = version;
2101       size+= sizeof(unsigned short);
2102       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2103
2104       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2105       send_data(sock, msg, size);
2106     }
2107     //close socket
2108     close(sock);
2109     // Update head
2110     *head = ptr->next;
2111     free(ptr);
2112   }
2113   return status;
2114 }
2115
2116 void transAbort() {
2117 #ifdef ABORTREADERS
2118   removetransactionhash();
2119 #endif
2120   objstrDelete(t_cache);
2121   t_chashDelete();
2122 }
2123
2124 /* This function inserts necessary information into
2125  * a machine pile data structure */
2126 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2127   plistnode_t *ptr, *tmp;
2128   int found = 0, offset = 0;
2129
2130   tmp = pile;
2131   //Add oid into a machine that is already present in the pile linked list structure
2132   while(tmp != NULL) {
2133     if (tmp->mid == mid) {
2134       int tmpsize;
2135
2136       if (STATUS(headeraddr) & NEW) {
2137         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2138         tmp->numcreated++;
2139         GETSIZE(tmpsize, headeraddr);
2140         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2141       } else if (STATUS(headeraddr) & DIRTY) {
2142         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2143         tmp->nummod++;
2144         GETSIZE(tmpsize, headeraddr);
2145         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2146       } else {
2147         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2148         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2149         offset += sizeof(unsigned int);
2150         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2151         tmp->numread++;
2152       }
2153       found = 1;
2154       break;
2155     }
2156     tmp = tmp->next;
2157   }
2158   //Add oid for any new machine
2159   if (!found) {
2160     int tmpsize;
2161     if((ptr = pCreate(num_objs)) == NULL) {
2162       return NULL;
2163     }
2164     ptr->mid = mid;
2165     if (STATUS(headeraddr) & NEW) {
2166       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2167       ptr->numcreated++;
2168       GETSIZE(tmpsize, headeraddr);
2169       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2170     } else if (STATUS(headeraddr) & DIRTY) {
2171       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2172       ptr->nummod++;
2173       GETSIZE(tmpsize, headeraddr);
2174       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2175     } else {
2176       *((unsigned int *)ptr->objread)=OID(headeraddr);
2177       offset = sizeof(unsigned int);
2178       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2179       ptr->numread++;
2180     }
2181     ptr->next = pile;
2182     pile = ptr;
2183   }
2184
2185   /* Clear Flags */
2186   STATUS(headeraddr) =0;
2187
2188
2189   return pile;
2190 }
2191
2192 plistnode_t *sortPiles(plistnode_t *pileptr) {
2193   plistnode_t *head, *ptr, *tail;
2194   head = pileptr;
2195   ptr = pileptr;
2196   /* Get tail pointer */
2197   while(ptr!= NULL) {
2198     tail = ptr;
2199     ptr = ptr->next;
2200   }
2201   ptr = pileptr;
2202   plistnode_t *prev = pileptr;
2203   /* Arrange local machine processing at the end of the pile list */
2204   while(ptr != NULL) {
2205     if(ptr != tail) {
2206       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2207         prev->next = ptr->next;
2208         ptr->next = NULL;
2209         tail->next = ptr;
2210         return pileptr;
2211       }
2212       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2213         prev = ptr->next;
2214         ptr->next = NULL;
2215         tail->next = ptr;
2216         return prev;
2217       }
2218       prev = ptr;
2219     }
2220     ptr = ptr->next;
2221   }
2222   return pileptr;
2223 }