binary exponential backoff for soft abort
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "debugmacro.h"
3 #include "ip.h"
4 #include "machinepile.h"
5 #include "altmlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "altprelookup.h"
9 #include "threadnotify.h"
10 #include "queue.h"
11 #include "addUdpEnhance.h"
12 #include "addPrefetchEnhance.h"
13 #include "gCollect.h"
14 #include "dsmlock.h"
15 #include "prefetch.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19 #ifdef ABORTREADERS
20 #include "abortreaders.h"
21 #endif
22 #include "trans.h"
23
24 #define NUM_THREADS 1
25 #define CONFIG_FILENAME "dstm.conf"
26
27 //#define LOGEVENTS //turn on Logging events
28 #ifdef LOGEVENTS
29 char bigarray[16*1024*1024];
30 int bigindex=0;
31 #define LOGEVENT(x) { \
32     int tmp=bigindex++;                         \
33     bigarray[tmp]=x;                            \
34   }
35 #else
36 #define LOGEVENT(x)
37 #endif
38
39 //#define LOGTIMES
40 #ifdef LOGTIMES
41 char bigarray1[6*1024*1024];
42 unsigned int bigarray2[6*1024*1024];
43 unsigned int bigarray3[6*1024*1024];
44 long long bigarray4[6*1024*1024];
45 int bigarray5[6*1024*1024];
46 int bigindex1=0;
47 #define LOGTIME(x,y,z,a,b) {\
48   int tmp=bigindex1; \
49   bigarray1[tmp]=x; \
50   bigarray2[tmp]=y; \
51   bigarray3[tmp]=z; \
52   bigarray4[tmp]=a; \
53   bigarray5[tmp]=b; \
54   bigindex1++; \
55 }
56 #else
57 #define LOGTIME(x,y,z,a,b)
58 #endif
59
60 /* Thread transaction variables */
61
62 __thread objstr_t *t_cache;
63 __thread struct ___Object___ *revertlist;
64 #ifdef ABORTREADERS
65 __thread int t_abort;
66 __thread jmp_buf aborttrans;
67 #endif
68
69 int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
70
71 /* Global Variables */
72 extern int classsize[];
73 pfcstats_t *evalPrefetch;
74 extern int numprefetchsites; //Global variable containing number of prefetch sites
75 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
76 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
77 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
78 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
79 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
80 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
81 extern objstr_t *mainobjstore;
82 unsigned int myIpAddr;
83 unsigned int *hostIpAddrs;
84 int sizeOfHostArray;
85 int numHostsInSystem;
86 int myIndexInHostArray;
87 unsigned int oidsPerBlock;
88 unsigned int oidMin;
89 unsigned int oidMax;
90 sockPoolHashTable_t *transReadSockPool;
91 sockPoolHashTable_t *transPrefetchSockPool;
92 sockPoolHashTable_t *transRequestSockPool;
93 pthread_mutex_t notifymutex;
94 pthread_mutex_t atomicObjLock;
95 struct timespec exponential_backoff;
96 static int count_exponential_backoff = 0;
97 static const int max_exponential_backoff = 1000; // safety limit
98
99
100 /***********************************
101  * Global Variables for statistics
102  **********************************/
103 int numTransCommit = 0;
104 int numTransAbort = 0;
105 int nchashSearch = 0;
106 int nmhashSearch = 0;
107 int nprehashSearch = 0;
108 int ndirtyCacheObj = 0;
109 int nRemoteSend = 0;
110 int nSoftAbort = 0;
111 int bytesSent = 0;
112 int bytesRecv = 0;
113 int totalObjSize = 0;
114 int sendRemoteReq = 0;
115 int getResponse = 0;
116
117 void printhex(unsigned char *, int);
118 plistnode_t *createPiles();
119 plistnode_t *sortPiles(plistnode_t *pileptr);
120
121
122
123 /*******************************
124 * Send and Recv function calls
125 *******************************/
126 void send_data(int fd, void *buf, int buflen) {
127   char *buffer = (char *)(buf);
128   int size = buflen;
129   int numbytes;
130   while (size > 0) {
131     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
132     bytesSent = bytesSent + numbytes;
133     if (numbytes == -1) {
134       perror("send");
135       exit(0);
136     }
137     buffer += numbytes;
138     size -= numbytes;
139   }
140 }
141
142 void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
143   if (buflen+sendbuffer->offset>WMAXBUF) {
144     send_data(fd, sendbuffer->buf, sendbuffer->offset);
145     sendbuffer->offset=0;
146     send_data(fd, buffer, buflen);
147     return;
148   }
149   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
150   sendbuffer->offset+=buflen;
151   if (sendbuffer->offset>WTOP) {
152     send_data(fd, sendbuffer->buf, sendbuffer->offset);
153     sendbuffer->offset=0;
154   }
155 }
156
157 void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
158   if (buflen+sendbuffer->offset>WMAXBUF) {
159     send_data(fd, sendbuffer->buf, sendbuffer->offset);
160     sendbuffer->offset=0;
161     send_data(fd, buffer, buflen);
162     return;
163   }
164   memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
165   sendbuffer->offset+=buflen;
166   send_data(fd, sendbuffer->buf, sendbuffer->offset);
167   sendbuffer->offset=0;
168 }
169
170 int recvw(int fd, void *buf, int len, int flags) {
171   return recv(fd, buf, len, flags);
172 }
173  
174 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
175   char *buf=(char *)buffer;
176   int numbytes=readbuffer->head-readbuffer->tail;
177   if (numbytes>buflen)
178     numbytes=buflen;
179   if (numbytes>0) {
180     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
181     readbuffer->tail+=numbytes;
182     buflen-=numbytes;
183     buf+=numbytes;
184   }
185   if (buflen==0) {
186     return;
187   }
188   if (buflen>=MAXBUF) {
189     recv_data(fd, buf, buflen);
190     return;
191   }
192   
193   int maxbuf=MAXBUF;
194   int obufflen=buflen;
195   readbuffer->head=0;
196   
197   while (buflen > 0) {
198     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
199     if (numbytes == -1) {
200       perror("recv");
201       exit(0);
202     }
203     bytesRecv+=numbytes;
204     buflen-=numbytes;
205     readbuffer->head+=numbytes;
206     maxbuf-=numbytes;
207   }
208   memcpy(buf,readbuffer->buf,obufflen);
209   readbuffer->tail=obufflen;
210 }
211
212 int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
213   char *buf=(char *)buffer;
214   //now tail<=head
215   int numbytes=readbuffer->head-readbuffer->tail;
216   if (numbytes>buflen)
217     numbytes=buflen;
218   if (numbytes>0) {
219     memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
220     readbuffer->tail+=numbytes;
221     buflen-=numbytes;
222     buf+=numbytes;
223   }
224   if (buflen==0)
225     return 1;
226
227   if (buflen>=MAXBUF) {
228     return recv_data_errorcode(fd, buf, buflen);
229   }
230
231   int maxbuf=MAXBUF;
232   int obufflen=buflen;
233   readbuffer->head=0;
234   
235   while (buflen > 0) {
236     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
237     if (numbytes ==0) {
238       return 0;
239     }
240     if (numbytes==-1) {
241       perror("recvbuf");
242       return -1;
243     }
244     bytesRecv+=numbytes;
245     buflen-=numbytes;
246     readbuffer->head+=numbytes;
247     maxbuf-=numbytes;
248   }
249   memcpy(buf,readbuffer->buf,obufflen);
250   readbuffer->tail=obufflen;
251   return 1;
252 }
253
254
255 void recv_data(int fd, void *buf, int buflen) {
256   char *buffer = (char *)(buf);
257   int size = buflen;
258   int numbytes;
259   while (size > 0) {
260     numbytes = recvw(fd, buffer, size, 0);
261     bytesRecv = bytesRecv + numbytes;
262     if (numbytes == -1) {
263       perror("recv");
264       exit(0);
265     }
266     buffer += numbytes;
267     size -= numbytes;
268   }
269 }
270
271 int recv_data_errorcode(int fd, void *buf, int buflen) {
272   char *buffer = (char *)(buf);
273   int size = buflen;
274   int numbytes;
275   while (size > 0) {
276     numbytes = recvw(fd, buffer, size, 0);
277     if (numbytes==0)
278       return 0;
279     if (numbytes == -1) {
280       perror("recv");
281       return -1;
282     }
283     bytesRecv+=numbytes;
284     buffer += numbytes;
285     size -= numbytes;
286   }
287   return 1;
288 }
289
290 void printhex(unsigned char *ptr, int numBytes) {
291   int i;
292   for (i = 0; i < numBytes; i++) {
293     if (ptr[i] < 16)
294       printf("0%x ", ptr[i]);
295     else
296       printf("%x ", ptr[i]);
297   }
298   printf("\n");
299   return;
300 }
301
302 inline int arrayLength(int *array) {
303   int i;
304   for(i=0 ; array[i] != -1; i++)
305     ;
306   return i;
307 }
308
309 inline int findmax(int *array, int arraylength) {
310   int max, i;
311   max = array[0];
312   for(i = 0; i < arraylength; i++) {
313     if(array[i] > max) {
314       max = array[i];
315     }
316   }
317   return max;
318 }
319
320 //#define INLINEPREFETCH
321 #define PREFTHRESHOLD 0
322
323 /* This function is a prefetch call generated by the compiler that
324  * populates the shared primary prefetch queue*/
325 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
326   /* Allocate for the queue node*/
327   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
328   int len;
329 #ifdef INLINEPREFETCH
330   int attempted=0;
331   char *node;
332   do {
333   node=getmemory(qnodesize);
334   if (node==NULL&&attempted)
335     break;
336   if (node!=NULL) {
337 #else
338   char *node=getmemory(qnodesize);
339 #endif
340   int top=endoffsets[ntuples-1];
341
342   if (node==NULL) {
343     LOGEVENT('D');
344     return;
345   }
346   /* Set queue node values */
347
348   /* TODO: Remove this after testing */
349   evalPrefetch[siteid].callcount++;
350
351   *((int *)(node))=siteid;
352   *((int *)(node + sizeof(int))) = ntuples;
353   len = 2*sizeof(int);
354   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
355   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
356   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
357
358 #ifdef INLINEPREFETCH
359   movehead(qnodesize);
360   }
361   int numpref=numavailable();
362   attempted=1;
363
364   if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
365     node=gettail();
366     prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
367     if (pilehead!=NULL) {
368       // Get sock from shared pool
369       
370       /* Send  Prefetch Request */
371       prefetchpile_t *ptr = pilehead;
372       while(ptr != NULL) {
373         globalid++;
374         int sd = getSock2(transPrefetchSockPool, ptr->mid);
375         sendPrefetchReq(ptr, sd, globalid);
376         ptr = ptr->next;
377       }
378       
379       mcdealloc(pilehead);
380     }
381     resetqueue();
382   }//end do prefetch if condition
383   } while(node==NULL);
384 #else
385   /* Lock and insert into primary prefetch queue */
386   movehead(qnodesize);
387 #endif
388 }
389
390 /* This function starts up the transaction runtime. */
391 int dstmStartup(const char * option) {
392   pthread_t thread_Listen, udp_thread_Listen;
393   pthread_attr_t attr;
394   int master=option!=NULL && strcmp(option, "master")==0;
395   int fd;
396   int udpfd;
397
398   if (processConfigFile() != 0)
399     return 0; //TODO: return error value, cause main program to exit
400 #ifdef COMPILER
401   if (!master)
402     threadcount--;
403 #endif
404
405 #ifdef TRANSSTATS
406   printf("Trans stats is on\n");
407   fflush(stdout);
408 #endif
409 #ifdef ABORTREADERS
410   initreaderlist();
411 #endif
412
413   //Initialize socket pool
414   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
415   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
416   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
417
418   dstmInit();
419   transInit();
420
421   fd=startlistening();
422   pthread_attr_init(&attr);
423   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
424 #ifdef CACHE
425   udpfd = udpInit();
426   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
427 #endif
428   if (master) {
429     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
430     return 1;
431   } else {
432     dstmListen((void *)fd);
433     return 0;
434   }
435 }
436
437 //TODO Use this later
438 void *pCacheAlloc(objstr_t *store, unsigned int size) {
439   void *tmp;
440   objstr_t *ptr;
441   ptr = store;
442   int success = 0;
443
444   while(ptr->next != NULL) {
445     /* check if store is empty */
446     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
447       tmp = ptr->top;
448       ptr->top += size;
449       success = 1;
450       return tmp;
451     } else {
452       ptr = ptr->next;
453     }
454   }
455
456   if(success == 0) {
457     return NULL;
458   }
459 }
460
461 /* This function initiates the prefetch thread A queue is shared
462  * between the main thread of execution and the prefetch thread to
463  * process the prefetch call Call from compiler populates the shared
464  * queue with prefetch requests while prefetch thread processes the
465  * prefetch requests */
466
467 void transInit() {
468   //Create and initialize prefetch cache structure
469 #ifdef CACHE
470   initializePCache();
471   if((evalPrefetch = initPrefetchStats()) == NULL) {
472     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
473     exit(0);
474   }
475 #endif
476
477   /* Initialize attributes for mutex */
478   pthread_mutexattr_init(&prefetchcache_mutex_attr);
479   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
480
481   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
482   pthread_mutex_init(&notifymutex, NULL);
483   pthread_mutex_init(&atomicObjLock, NULL);
484 #ifdef CACHE
485   //Create prefetch cache lookup table
486   if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
487     printf("ERROR\n");
488     return; //Failure
489   }
490
491   //Initialize primary shared queue
492   queueInit();
493   //Initialize machine pile w/prefetch oids and offsets shared queue
494   mcpileqInit();
495
496   //Create the primary prefetch thread
497   int retval;
498 #ifdef RANGEPREFETCH
499   do {
500     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
501   } while(retval!=0);
502 #else
503 #ifndef INLINEPREFETCH
504   do {
505     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
506   } while(retval!=0);
507 #endif
508 #endif
509 #ifndef INLINEPREFETCH
510   pthread_detach(tPrefetch);
511 #endif
512 #endif
513 }
514
515 /* This function stops the threads spawned */
516 void transExit() {
517 #ifdef CACHE
518   int t;
519   pthread_cancel(tPrefetch);
520   for(t = 0; t < NUM_THREADS; t++)
521     pthread_cancel(wthreads[t]);
522 #endif
523
524   return;
525 }
526
527 /* This functions inserts randowm wait delays in the order of msec
528  * Mostly used when transaction commits retry*/
529 void randomdelay() {
530   struct timespec req;
531   time_t t;
532
533   t = time(NULL);
534   req.tv_sec = 0;
535   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
536   nanosleep(&req, NULL);
537   return;
538 }
539
540 void exponentialdelay() {
541   exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2;
542   nanosleep(&exponential_backoff, NULL);
543   ++count_exponential_backoff;
544   if (count_exponential_backoff >= max_exponential_backoff) {
545     printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__);
546     exit(-1);
547   }
548   return;
549 }
550
551 /* This function initializes things required in the transaction start*/
552 void transStart() {
553   t_cache = objstrCreate(1048576);
554   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
555   revertlist=NULL;
556 #ifdef ABORTREADERS
557   t_abort=0;
558 #endif
559 }
560
561 // Search for an address for a given oid                                                                               
562 /*#define INLINE    inline __attribute__((always_inline))
563
564 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
565   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
566   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
567
568   do {
569     if(node->key == key) {
570       return node->val;
571     }
572     node = node->next;
573   } while(node != NULL);
574
575   return NULL;
576   }*/
577
578
579
580
581 /* This function finds the location of the objects involved in a transaction
582  * and returns the pointer to the object if found in a remote location */
583 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
584   unsigned int machinenumber;
585   objheader_t *tmp, *objheader;
586   objheader_t *objcopy;
587   int size;
588   void *buf;
589   chashlistnode_t *node;
590
591   if(oid == 0) {
592     return NULL;
593   }
594   
595
596   node= &c_table[(oid & c_mask)>>1];
597   do {
598     if(node->key == oid) {
599 #ifdef TRANSSTATS
600     nchashSearch++;
601 #endif
602 #ifdef COMPILER
603     return &((objheader_t*)node->val)[1];
604 #else
605     return node->val;
606 #endif
607     }
608     node = node->next;
609   } while(node != NULL);
610   
611
612   /*  
613   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
614 #ifdef TRANSSTATS
615     nchashSearch++;
616 #endif
617 #ifdef COMPILER
618     return &objheader[1];
619 #else
620     return objheader;
621 #endif
622   } else 
623   */
624
625 #ifdef ABORTREADERS
626   if (t_abort) {
627     //abort this transaction
628     removetransactionhash();
629     objstrDelete(t_cache);
630     t_chashDelete();
631     _longjmp(aborttrans,1);
632   } else
633     addtransaction(oid);
634 #endif
635
636   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
637 #ifdef TRANSSTATS
638     nmhashSearch++;
639 #endif
640     /* Look up in machine lookup table  and copy  into cache*/
641     GETSIZE(size, objheader);
642     size += sizeof(objheader_t);
643     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
644     memcpy(objcopy, objheader, size);
645     /* Insert into cache's lookup table */
646     STATUS(objcopy)=0;
647     t_chashInsert(OID(objheader), objcopy);
648 #ifdef COMPILER
649     return &objcopy[1];
650 #else
651     return objcopy;
652 #endif
653   } else {
654 #ifdef CACHE
655     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
656       if(STATUS(tmp) & DIRTY) {
657 #ifdef TRANSSTATS
658         ndirtyCacheObj++;
659 #endif
660         goto remoteread;
661       }
662 #ifdef TRANSSTATS
663       nprehashSearch++;
664 #endif
665       /* Look up in prefetch cache */
666       GETSIZE(size, tmp);
667       size+=sizeof(objheader_t);
668       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
669       memcpy(objcopy, tmp, size);
670       /* Insert into cache's lookup table */
671       t_chashInsert(OID(tmp), objcopy);
672 #ifdef COMPILER
673       return &objcopy[1];
674 #else
675       return objcopy;
676 #endif
677     }
678 remoteread:
679 #endif
680     /* Get the object from the remote location */
681     if((machinenumber = lhashSearch(oid)) == 0) {
682       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
683       return NULL;
684     }
685     objcopy = getRemoteObj(machinenumber, oid);
686
687     if(objcopy == NULL) {
688       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
689       return NULL;
690     } else {
691 #ifdef TRANSSTATS
692       nRemoteSend++;
693 #endif
694 #ifdef COMPILER
695 #ifdef CACHE
696       //Copy object to prefetch cache
697       pthread_mutex_lock(&prefetchcache_mutex);
698       objheader_t *headerObj;
699       int size;
700       GETSIZE(size, objcopy);
701       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
702         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
703             __FILE__, __LINE__);
704         pthread_mutex_unlock(&prefetchcache_mutex);
705         return NULL;
706       }
707       pthread_mutex_unlock(&prefetchcache_mutex);
708       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
709       //make an entry in prefetch lookup hashtable
710       prehashInsert(oid, headerObj);
711       LOGEVENT('B');
712 #endif
713       return &objcopy[1];
714 #else
715       return objcopy;
716 #endif
717     }
718   }
719 }
720
721
722 /* This function finds the location of the objects involved in a transaction
723  * and returns the pointer to the object if found in a remote location */
724 __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
725 //DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
726   unsigned int machinenumber;
727   objheader_t *tmp, *objheader;
728   objheader_t *objcopy;
729   int size;
730
731 #ifdef ABORTREADERS
732   if (t_abort) {
733     //abort this transaction
734     removetransactionhash();
735     objstrDelete(t_cache);
736     t_chashDelete();
737     _longjmp(aborttrans,1);
738   } else
739     addtransaction(oid);
740 #endif
741
742   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
743 #ifdef TRANSSTATS
744     nmhashSearch++;
745 #endif
746     /* Look up in machine lookup table  and copy  into cache*/
747     GETSIZE(size, objheader);
748     size += sizeof(objheader_t);
749     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
750     memcpy(objcopy, objheader, size);
751     /* Insert into cache's lookup table */
752     STATUS(objcopy)=0;
753     t_chashInsert(OID(objheader), objcopy);
754 #ifdef COMPILER
755     return &objcopy[1];
756 #else
757     return objcopy;
758 #endif
759   } else {
760 #ifdef CACHE
761     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
762       if(STATUS(tmp) & DIRTY) {
763 #ifdef TRANSSTATS
764         ndirtyCacheObj++;
765 #endif
766         goto remoteread;
767       }
768 #ifdef TRANSSTATS
769       LOGEVENT('P')
770       nprehashSearch++;
771 #endif
772       /* Look up in prefetch cache */
773       GETSIZE(size, tmp);
774       size+=sizeof(objheader_t);
775       objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
776       memcpy(objcopy, tmp, size);
777       LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
778       /* Insert into cache's lookup table */
779       t_chashInsert(OID(tmp), objcopy);
780 #ifdef COMPILER
781       return &objcopy[1];
782 #else
783       return objcopy;
784 #endif
785     }
786 remoteread:
787 #endif
788     /* Get the object from the remote location */
789     if((machinenumber = lhashSearch(oid)) == 0) {
790       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
791       return NULL;
792     }
793     objcopy = getRemoteObj(machinenumber, oid);
794 #ifdef TRANSSTATS
795       LOGEVENT('R');
796       nRemoteSend++;
797 #endif
798
799     if(objcopy == NULL) {
800       printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
801       return NULL;
802     } else {
803 #ifdef COMPILER
804 #ifdef CACHE
805       LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
806       LOGTIME('r', oid, TYPE(objcopy),myrdtsc(),0);
807       //Copy object to prefetch cache
808       pthread_mutex_lock(&prefetchcache_mutex);
809       objheader_t *headerObj;
810       int size;
811       GETSIZE(size, objcopy);
812       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
813         printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
814             __FILE__, __LINE__);
815         pthread_mutex_unlock(&prefetchcache_mutex);
816         return NULL;
817       }
818       pthread_mutex_unlock(&prefetchcache_mutex);
819       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
820       //make an entry in prefetch lookup hashtable
821       prehashInsert(oid, headerObj);
822       LOGEVENT('B');
823 #endif
824       return &objcopy[1];
825 #else
826       return objcopy;
827 #endif
828     }
829   }
830 }
831
832 /* This function creates objects in the transaction record */
833 objheader_t *transCreateObj(unsigned int size) {
834   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
835   OID(tmp) = getNewOID();
836   tmp->version = 1;
837   tmp->rcount = 1;
838   STATUS(tmp) = NEW;
839   t_chashInsert(OID(tmp), tmp);
840
841 #ifdef COMPILER
842   return &tmp[1]; //want space after object header
843 #else
844   return tmp;
845 #endif
846 }
847
848 #if 1
849 /* This function creates machine piles based on all machines involved in a
850  * transaction commit request */
851 plistnode_t *createPiles() {
852   int i;
853   plistnode_t *pile = NULL;
854   unsigned int machinenum;
855   objheader_t *headeraddr;
856   chashlistnode_t * ptr = c_table;
857   /* Represents number of bins in the chash table */
858   unsigned int size = c_size;
859
860   for(i = 0; i < size ; i++) {
861     chashlistnode_t * curr = &ptr[i];
862     /* Inner loop to traverse the linked list of the cache lookupTable */
863     while(curr != NULL) {
864       //if the first bin in hash table is empty
865       if(curr->key == 0)
866         break;
867       headeraddr=(objheader_t *) curr->val;
868
869       //Get machine location for object id (and whether local or not)
870       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
871         machinenum = myIpAddr;
872       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
873         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
874         return NULL;
875       }
876
877       //Make machine groups
878       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
879       curr = curr->next;
880     }
881   }
882   return pile;
883 }
884 #else
885 /* This function creates machine piles based on all machines involved in a
886  * transaction commit request */
887 plistnode_t *createPiles() {
888   int i;
889   plistnode_t *pile = NULL;
890   unsigned int machinenum;
891   objheader_t *headeraddr;
892   struct chashentry * ptr = c_table;
893   /* Represents number of bins in the chash table */
894   unsigned int size = c_size;
895
896   for(i = 0; i < size ; i++) {
897     struct chashentry * curr = & ptr[i];
898     /* Inner loop to traverse the linked list of the cache lookupTable */
899     //if the first bin in hash table is empty
900     if(curr->key == 0)
901       continue;
902     headeraddr=(objheader_t *) curr->ptr;
903
904     //Get machine location for object id (and whether local or not)
905     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
906       machinenum = myIpAddr;
907     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
908       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
909       return NULL;
910     }
911
912     //Make machine groups
913     pile = pInsert(pile, headeraddr, machinenum, c_numelements);
914   }
915   return pile;
916 }
917 #endif
918
919 /* This function initiates the transaction commit process
920  * Spawns threads for each of the new connections with Participants
921  * and creates new piles by calling the createPiles(),
922  * Sends a transrequest() to each remote machines for objects found remotely
923  * and calls handleLocalReq() to process objects found locally */
924 int transCommit() {
925   unsigned int tot_bytes_mod, *listmid;
926   plistnode_t *pile, *pile_ptr;
927   char treplyretry; /* keeps track of the common response that needs to be sent */
928   int firsttime=1;
929   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
930   char finalResponse;
931 #ifdef SANDBOX
932   abortenabled=0;
933 #endif
934
935 #ifdef LOGEVENTS
936   int iii;
937   for(iii=0;iii<bigindex;iii++) {
938     printf("%c", bigarray[iii]);
939   }
940 #endif
941
942 #ifdef LOGTIMES
943   int jjj;
944   for(jjj=0; jjj<bigindex1; jjj++) {
945     printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
946   }
947 #endif
948
949 #ifdef ABORTREADERS
950   if (t_abort) {
951     //abort this transaction
952     removetransactionhash();
953     objstrDelete(t_cache);
954     t_chashDelete();
955     return 1;
956   }
957 #endif
958
959
960   int treplyretryCount = 0;
961   exponential_backoff.tv_sec = 0;
962   exponential_backoff.tv_nsec = (long)(10000);//10 microsec
963   do {
964     treplyretry = 0;
965
966     /* Look through all the objects in the transaction record and make piles
967      * for each machine involved in the transaction*/
968     if (firsttime) {
969       pile_ptr = pile = createPiles();
970       pile_ptr = pile = sortPiles(pile);
971     } else {
972       pile = pile_ptr;
973     }
974     firsttime = 0;
975     /* Create the packet to be sent in TRANS_REQUEST */
976
977     /* Count the number of participants */
978     int pilecount;
979     pilecount = pCount(pile);
980
981     /* Create a list of machine ids(Participants) involved in transaction   */
982     listmid = calloc(pilecount, sizeof(unsigned int));
983     pListMid(pile, listmid);
984
985     /* Create a socket and getReplyCtrl array, initialize */
986     int socklist[pilecount];
987     char getReplyCtrl[pilecount];
988     int loopcount;
989     for(loopcount = 0 ; loopcount < pilecount; loopcount++){
990       socklist[loopcount] = 0;
991       getReplyCtrl[loopcount] = 0;
992     }
993
994     /* Process each machine pile */
995     int sockindex = 0;
996     trans_req_data_t *tosend;
997     tosend = calloc(pilecount, sizeof(trans_req_data_t));
998     while(pile != NULL) {
999       tosend[sockindex].f.control = TRANS_REQUEST;
1000       tosend[sockindex].f.mcount = pilecount;
1001       tosend[sockindex].f.numread = pile->numread;
1002       tosend[sockindex].f.nummod = pile->nummod;
1003       tosend[sockindex].f.numcreated = pile->numcreated;
1004       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
1005       tosend[sockindex].listmid = listmid;
1006       tosend[sockindex].objread = pile->objread;
1007       tosend[sockindex].oidmod = pile->oidmod;
1008       tosend[sockindex].oidcreated = pile->oidcreated;
1009       int sd = 0;
1010       if(pile->mid != myIpAddr) {
1011         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
1012           printf("transRequest(): socket create error\n");
1013           free(listmid);
1014           free(tosend);
1015           return 1;
1016         }
1017         socklist[sockindex] = sd;
1018         /* Send bytes of data with TRANS_REQUEST control message */
1019         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
1020
1021         /* Send list of machines involved in the transaction */
1022         {
1023           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
1024           send_data(sd, tosend[sockindex].listmid, size);
1025         }
1026
1027         /* Send oids and version number tuples for objects that are read */
1028         {
1029           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
1030           send_data(sd, tosend[sockindex].objread, size);
1031         }
1032
1033         /* Send objects that are modified */
1034         void *modptr;
1035         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
1036           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
1037           free(listmid);
1038           free(tosend);
1039           return 1;
1040         }
1041         int offset = 0;
1042         int i;
1043         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
1044           int size;
1045           objheader_t *headeraddr;
1046           if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
1047             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
1048             free(modptr);
1049             free(listmid);
1050             free(tosend);
1051             return 1;
1052           }
1053           GETSIZE(size,headeraddr);
1054           size+=sizeof(objheader_t);
1055           memcpy(modptr+offset, headeraddr, size);
1056           offset+=size;
1057         }
1058         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
1059         free(modptr);
1060       } else { //handle request locally
1061         handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
1062       }
1063       sockindex++;
1064       pile = pile->next;
1065     } //end of pile processing
1066       /* Recv Ctrl msgs from all machines */
1067     int i;
1068     for(i = 0; i < pilecount; i++) {
1069       int sd = socklist[i];
1070       if(sd != 0) {
1071         char control;
1072         recv_data(sd, &control, sizeof(char));
1073         //Update common data structure with new ctrl msg
1074         getReplyCtrl[i] = control;
1075         /* Recv Objects if participant sends TRANS_DISAGREE */
1076 #ifdef CACHE
1077         if(control == TRANS_DISAGREE) {
1078           int length;
1079           recv_data(sd, &length, sizeof(int));
1080           void *newAddr;
1081           pthread_mutex_lock(&prefetchcache_mutex);
1082           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
1083             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1084             free(tosend);
1085             free(listmid);
1086             pthread_mutex_unlock(&prefetchcache_mutex);
1087             return 1;
1088           }
1089           pthread_mutex_unlock(&prefetchcache_mutex);
1090           recv_data(sd, newAddr, length);
1091           int offset = 0;
1092           while(length != 0) {
1093             unsigned int oidToPrefetch;
1094             objheader_t * header;
1095             header = (objheader_t *)(((char *)newAddr) + offset);
1096             oidToPrefetch = OID(header);
1097             STATUS(header)=0;
1098             int size = 0;
1099             GETSIZE(size, header);
1100             size += sizeof(objheader_t);
1101             //make an entry in prefetch hash table
1102         prehashInsert(oidToPrefetch, header);
1103         LOGEVENT('E');
1104             length = length - size;
1105             offset += size;
1106           }
1107         } //end of receiving objs
1108 #endif
1109       }
1110     }
1111     /* Decide the final response */
1112     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
1113       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1114       free(tosend);
1115       free(listmid);
1116       return 1;
1117     }
1118
1119     /* Send responses to all machines */
1120     for(i = 0; i < pilecount; i++) {
1121       int sd = socklist[i];
1122       if(sd != 0) {
1123 #ifdef CACHE
1124         if(finalResponse == TRANS_COMMIT) {
1125           int retval;
1126           /* Update prefetch cache */
1127           if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
1128             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1129             free(tosend);
1130             free(listmid);
1131             return 1;
1132           }
1133
1134
1135           /* Invalidate objects in other machine cache */
1136           if(tosend[i].f.nummod > 0) {
1137             if((retval = invalidateObj(&(tosend[i]))) != 0) {
1138               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1139               free(tosend);
1140               free(listmid);
1141               return 1;
1142             }
1143           }
1144 #ifdef ABORTREADERS
1145           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1146           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
1147 #endif
1148         }
1149 #ifdef ABORTREADERS
1150         else if (!treplyretry) {
1151           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1152           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1153         }
1154 #endif
1155 #endif
1156         send_data(sd, &finalResponse, sizeof(char));
1157       } else {
1158         /* Complete local processing */
1159         doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
1160 #ifdef ABORTREADERS
1161         if(finalResponse == TRANS_COMMIT) {
1162           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
1163           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1164         } else if (!treplyretry) {
1165           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
1166           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
1167         }
1168 #endif
1169       }
1170     }
1171
1172     /* Free resources */
1173     free(tosend);
1174     free(listmid);
1175     if (!treplyretry)
1176       pDelete(pile_ptr);
1177     /* wait a random amount of time before retrying to commit transaction*/
1178     if(treplyretry) {
1179       treplyretryCount++;
1180       if(treplyretryCount >= NUM_TRY_TO_COMMIT)
1181         exponentialdelay();
1182       else 
1183         randomdelay();
1184 #ifdef TRANSSTATS
1185       nSoftAbort++;
1186 #endif
1187     }
1188     /* Retry trans commit procedure during soft_abort case */
1189   } while (treplyretry);
1190
1191   /* Reset to initial timeout for exponential delay */
1192   exponential_backoff.tv_sec = 0;
1193   exponential_backoff.tv_nsec = (long)(10000);//10 microsec_
1194   count_exponential_backoff = 0;
1195
1196   if(finalResponse == TRANS_ABORT) {
1197 #ifdef TRANSSTATS
1198     LOGEVENT('A');
1199     numTransAbort++;
1200 #endif
1201     /* Free Resources */
1202     objstrDelete(t_cache);
1203     t_chashDelete();
1204 #ifdef SANDBOX
1205       abortenabled=1;
1206 #endif
1207     return TRANS_ABORT;
1208   } else if(finalResponse == TRANS_COMMIT) {
1209 #ifdef TRANSSTATS
1210     LOGEVENT('C');
1211     numTransCommit++;
1212 #endif
1213     /* Free Resources */
1214     objstrDelete(t_cache);
1215     t_chashDelete();
1216     return 0;
1217   } else {
1218     //TODO Add other cases
1219     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1220     exit(-1);
1221   }
1222   return 0;
1223 }
1224
1225 /* This function handles the local objects involved in a transaction
1226  * commiting process.  It also makes a decision if this local machine
1227  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
1228 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
1229   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
1230   int numoidnotfound = 0, numoidlocked = 0;
1231   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1232   int numread, i;
1233   unsigned int oid;
1234   unsigned short version;
1235
1236   /* Counters and arrays to formulate decision on control message to be sent */
1237   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
1238   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
1239   //setting a divider between read and write locks
1240   numread = tdata->f.numread;
1241   /* Process each oid in the machine pile/ group per thread */
1242   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
1243     if (i < tdata->f.numread) {
1244       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1245       incr *= i;
1246       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
1247       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
1248       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1249     } else { // Objects Modified
1250       if(i == tdata->f.numread) {
1251         oidlocked[numoidlocked++] = -1;
1252       }
1253       int tmpsize;
1254       objheader_t *headptr;
1255       headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
1256       if (headptr == NULL) {
1257         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
1258         return;
1259       }
1260       oid = OID(headptr);
1261       version = headptr->version;
1262       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
1263     }
1264   }
1265
1266 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1267  * if Participant receives a TRANS_COMMIT */
1268   transinfo->objlocked = oidlocked;
1269   transinfo->objnotfound = oidnotfound;
1270   transinfo->modptr = NULL;
1271   transinfo->numlocked = numoidlocked;
1272   transinfo->numnotfound = numoidnotfound;
1273
1274   /* Condition to send TRANS_AGREE */
1275   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
1276     *getReplyCtrl = TRANS_AGREE;
1277   }
1278   /* Condition to send TRANS_SOFT_ABORT */
1279   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1280     *getReplyCtrl = TRANS_SOFT_ABORT;
1281   }
1282 }
1283
1284 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1285   if(finalResponse == TRANS_ABORT) {
1286     if(transAbortProcess(transinfo) != 0) {
1287       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1288       fflush(stdout);
1289       return;
1290     }
1291   } else if(finalResponse == TRANS_COMMIT) {
1292 #ifdef CACHE
1293     /* Invalidate objects in other machine cache */
1294     if(tdata->f.nummod > 0) {
1295       int retval;
1296       if((retval = invalidateObj(tdata)) != 0) {
1297         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1298         return;
1299       }
1300     }
1301 #endif
1302     if(transComProcess(tdata, transinfo) != 0) {
1303       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1304       fflush(stdout);
1305       return;
1306     }
1307   } else {
1308     printf("ERROR...No Decision\n");
1309   }
1310
1311   /* Free memory */
1312   if (transinfo->objlocked != NULL) {
1313     free(transinfo->objlocked);
1314   }
1315   if (transinfo->objnotfound != NULL) {
1316     free(transinfo->objnotfound);
1317   }
1318 }
1319
1320 /* This function decides the reponse that needs to be sent to
1321  * all Participant machines after the TRANS_REQUEST protocol */
1322 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
1323   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
1324                                                                    message to send */
1325   for (i = 0 ; i < pilecount; i++) {
1326     char control;
1327     control = getReplyCtrl[i];
1328     switch(control) {
1329     default:
1330       printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
1331
1332       /* treat as disagree, pass thru */
1333     case TRANS_DISAGREE:
1334       transdisagree++;
1335       break;
1336
1337     case TRANS_AGREE:
1338       transagree++;
1339       break;
1340
1341     case TRANS_SOFT_ABORT:
1342       transsoftabort++;
1343       break;
1344     }
1345   }
1346
1347   if(transdisagree > 0) {
1348     /* Send Abort */
1349     *treplyretry = 0;
1350     return TRANS_ABORT;
1351 #ifdef CACHE
1352     /* clear objects from prefetch cache */
1353     //cleanPCache();
1354 #endif
1355   } else if(transagree == pilecount) {
1356     /* Send Commit */
1357     *treplyretry = 0;
1358     return TRANS_COMMIT;
1359   } else {
1360     /* Send Abort in soft abort case followed by retry commiting transaction again*/
1361     *treplyretry = 1;
1362     return TRANS_ABORT;
1363   }
1364   return 0;
1365 }
1366
1367 /* This function opens a connection, places an object read request to
1368  * the remote machine, reads the control message and object if
1369  * available and copies the object and its header to the local
1370  * cache. */
1371
1372 void *getRemoteObj(unsigned int mnum, unsigned int oid) {
1373   int size, val;
1374   struct sockaddr_in serv_addr;
1375   char machineip[16];
1376   char control;
1377   objheader_t *h;
1378   void *objcopy = NULL;
1379
1380   int sd = getSock2(transReadSockPool, mnum);
1381   char readrequest[sizeof(char)+sizeof(unsigned int)];
1382   readrequest[0] = READ_REQUEST;
1383   *((unsigned int *)(&readrequest[1])) = oid;
1384   send_data(sd, readrequest, sizeof(readrequest));
1385
1386   /* Read response from the Participant */
1387   recv_data(sd, &control, sizeof(char));
1388
1389   if (control==OBJECT_NOT_FOUND) {
1390     objcopy = NULL;
1391   } else {
1392     /* Read object if found into local cache */
1393     recv_data(sd, &size, sizeof(int));
1394     objcopy = objstrAlloc(&t_cache, size);
1395     recv_data(sd, objcopy, size);
1396     STATUS(objcopy)=0;
1397     /* Insert into cache's lookup table */
1398     t_chashInsert(oid, objcopy);
1399 #ifdef TRANSSTATS
1400     totalObjSize += size;
1401 #endif
1402   }
1403
1404   return objcopy;
1405 }
1406
1407 /*  Commit info for objects modified */
1408 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1409                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1410   void *mobj;
1411   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1412   /* Save the oids not found and number of oids not found for later use */
1413   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1414     /* Save the oids not found and number of oids not found for later use */
1415     oidnotfound[*numoidnotfound] = oid;
1416     (*numoidnotfound)++;
1417   } else { /* If Obj found in machine (i.e. has not moved) */
1418     /* Check if Obj is locked by any previous transaction */
1419     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1420       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1421         (*v_matchnolock)++;
1422         //Keep track of what is locked
1423         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1424       } else { /* If versions don't match ...HARD ABORT */
1425         (*v_nomatch)++;
1426         /* Send TRANS_DISAGREE to Coordinator */
1427         *getReplyCtrl = TRANS_DISAGREE;
1428
1429         //Keep track of what is locked
1430         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1431         return;
1432       }
1433     } else { //A lock is acquired some place else
1434       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1435         (*v_matchlock)++;
1436       } else { /* If versions don't match ...HARD ABORT */
1437         (*v_nomatch)++;
1438         /* Send TRANS_DISAGREE to Coordinator */
1439         *getReplyCtrl = TRANS_DISAGREE;
1440         return;
1441       }
1442     }
1443   }
1444 }
1445
1446 /*  Commit info for objects modified */
1447 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1448                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1449   void *mobj;
1450   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1451   /* Save the oids not found and number of oids not found for later use */
1452   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1453     /* Save the oids not found and number of oids not found for later use */
1454     oidnotfound[*numoidnotfound] = oid;
1455     (*numoidnotfound)++;
1456   } else { /* If Obj found in machine (i.e. has not moved) */
1457     /* Check if Obj is locked by any previous transaction */
1458     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1459       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1460         (*v_matchnolock)++;
1461         //Keep track of what is locked
1462         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1463       } else { /* If versions don't match ...HARD ABORT */
1464         (*v_nomatch)++;
1465         /* Send TRANS_DISAGREE to Coordinator */
1466         *getReplyCtrl = TRANS_DISAGREE;
1467         //Keep track of what is locked
1468         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1469         return;
1470       }
1471     } else { //Has reached max number of readers or some other transaction
1472       //has acquired a lock on this object
1473       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1474         (*v_matchlock)++;
1475       } else { /* If versions don't match ...HARD ABORT */
1476         (*v_nomatch)++;
1477         /* Send TRANS_DISAGREE to Coordinator */
1478         *getReplyCtrl = TRANS_DISAGREE;
1479         return;
1480       }
1481     }
1482   }
1483 }
1484
1485 /* This function completes the ABORT process if the transaction is aborting */
1486 int transAbortProcess(trans_commit_data_t *transinfo) {
1487   int i, numlocked;
1488   unsigned int *objlocked;
1489   void *header;
1490
1491   numlocked = transinfo->numlocked;
1492   objlocked = transinfo->objlocked;
1493
1494   int useWriteUnlock = 0;
1495   for (i = 0; i < numlocked; i++) {
1496     if(objlocked[i] == -1) {
1497       useWriteUnlock = 1;
1498       continue;
1499     }
1500     if((header = mhashSearch(objlocked[i])) == NULL) {
1501       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1502       return 1;
1503     }
1504     if(!useWriteUnlock) {
1505       read_unlock(STATUSPTR(header));
1506     } else {
1507       write_unlock(STATUSPTR(header));
1508     }
1509   }
1510
1511   return 0;
1512 }
1513
1514 /*This function completes the COMMIT process if the transaction is commiting*/
1515 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
1516   objheader_t *header, *tcptr;
1517   int i, nummod, tmpsize, numcreated, numlocked;
1518   unsigned int *oidmod, *oidcreated, *oidlocked;
1519   void *ptrcreate;
1520
1521   nummod = tdata->f.nummod;
1522   oidmod = tdata->oidmod;
1523   numcreated = tdata->f.numcreated;
1524   oidcreated = tdata->oidcreated;
1525   numlocked = transinfo->numlocked;
1526   oidlocked = transinfo->objlocked;
1527
1528   for (i = 0; i < nummod; i++) {
1529     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1530       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1531       return 1;
1532     }
1533     /* Copy from transaction cache -> main object store */
1534     if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
1535       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1536       return 1;
1537     }
1538     GETSIZE(tmpsize, header);
1539     char *tmptcptr = (char *) tcptr;
1540     {
1541       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1542       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1543       dst->___cachedCode___=src->___cachedCode___;
1544       dst->___cachedHash___=src->___cachedHash___;
1545
1546       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1547     }
1548
1549     header->version += 1;
1550     if(header->notifylist != NULL) {
1551       notifyAll(&header->notifylist, OID(header), header->version);
1552     }
1553   }
1554   /* If object is newly created inside transaction then commit it */
1555   for (i = 0; i < numcreated; i++) {
1556     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
1557       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1558       return 1;
1559     }
1560     GETSIZE(tmpsize, header);
1561     tmpsize += sizeof(objheader_t);
1562     pthread_mutex_lock(&mainobjstore_mutex);
1563     if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1564       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1565       pthread_mutex_unlock(&mainobjstore_mutex);
1566       return 1;
1567     }
1568     pthread_mutex_unlock(&mainobjstore_mutex);
1569     /* Initialize read and write locks */
1570     initdsmlocks(STATUSPTR(header));
1571     memcpy(ptrcreate, header, tmpsize);
1572     mhashInsert(oidcreated[i], ptrcreate);
1573     lhashInsert(oidcreated[i], myIpAddr);
1574   }
1575   /* Unlock locked objects */
1576   int useWriteUnlock = 0;
1577   for(i = 0; i < numlocked; i++) {
1578     if(oidlocked[i] == -1) {
1579       useWriteUnlock = 1;
1580       continue;
1581     }
1582     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1583       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1584       return 1;
1585     }
1586     if(!useWriteUnlock) {
1587       read_unlock(STATUSPTR(header));
1588     } else {
1589       write_unlock(STATUSPTR(header));
1590     }
1591   }
1592   return 0;
1593 }
1594
1595 prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
1596   int i;
1597   int j;
1598   prefetchpile_t * head=NULL;
1599
1600   for(j=0;j<numprefetches;j++) {
1601     int siteid = *(GET_SITEID(ptr));
1602     int ntuples = *(GET_NTUPLES(ptr));
1603     unsigned int * oidarray = GET_PTR_OID(ptr);
1604     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1605     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1606     int numLocal = 0;
1607     
1608     for(i=0; i<ntuples; i++) {
1609       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1610       unsigned short endindex=endoffsets[i];
1611       unsigned int oid=oidarray[i];
1612       int newbase;
1613       int machinenum;
1614       int countInvalidObj=0;
1615
1616       if (oid==0) {
1617         numLocal++;
1618         continue;
1619       }
1620       //Look up fields locally
1621       int isLastOffset=0;
1622       if(endindex==0)
1623           isLastOffset=1;
1624       for(newbase=baseindex; newbase<endindex; newbase++) {
1625         if(newbase==(endindex-1))
1626           isLastOffset=1;
1627         if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
1628           break;
1629         }
1630         //Ended in a null pointer...
1631         if (oid==0) {
1632           numLocal++;
1633           goto tuple;
1634         }
1635       }
1636
1637       //Entire prefetch is local
1638       if (newbase==endindex&&checkoid(oid,isLastOffset)) {
1639         numLocal++;
1640         goto tuple;
1641       }
1642
1643       //Add to remote requests
1644       machinenum=lhashSearch(oid);
1645       insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
1646     tuple:
1647       ;
1648     }
1649     
1650     /* handle dynamic prefetching */
1651     handleDynPrefetching(numLocal, ntuples, siteid);
1652     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
1653   }
1654
1655   return head;
1656 }
1657
1658 int checkoid(unsigned int oid, int isLastOffset) {
1659   objheader_t *header;
1660   if ((header=mhashSearch(oid))!=NULL) {
1661     //Found on machine
1662     return 1;
1663   } else if ((header=prehashSearch(oid))!=NULL) {
1664     //if the last offset then prefetch object
1665     if((STATUS(header) & DIRTY) && isLastOffset) {
1666       return 0;
1667     }
1668     //Found in cache
1669     return 1;
1670   } else {
1671     return 0;
1672   }
1673 }
1674
1675 int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
1676   objheader_t *header;
1677   if ((header=mhashSearch(*oid))!=NULL) {
1678     //Found on machine
1679     ;
1680   } else if ((header=prehashSearch(*oid))!=NULL) {
1681     //Found in cache
1682     if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
1683       //only once because later old entries may still cause unnecessary roundtrips during prefetching
1684       (*countInvalidObj)+=1;
1685       if(*countInvalidObj > 1) {
1686         return 0;
1687       }
1688     }
1689   } else {
1690     return 0;
1691   }
1692
1693   if(TYPE(header) >= NUMCLASSES) {
1694     int elementsize = classsize[TYPE(header)];
1695     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1696     int length = ao->___length___;
1697     /* Check if array out of bounds */
1698     if(offset < 0 || offset >= length) {
1699       //if yes treat the object as found
1700       (*oid)=0;
1701       return 1;
1702     }
1703     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1704     return 1;
1705   } else {
1706     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1707     return 1;
1708   }
1709 }
1710
1711
1712 /* This function is called by the thread calling transPrefetch */
1713 void *transPrefetch(void *t) {
1714   while(1) {
1715     /* read from prefetch queue */
1716     void *node=gettail();
1717     /* Check if the tuples are found locally, if yes then reduce them further*/
1718     /* and group requests by remote machine ids by calling the makePreGroups() */
1719     int count=numavailable();
1720     prefetchpile_t *pilehead = foundLocal(node, count, 0);
1721
1722     if (pilehead!=NULL) {
1723       // Get sock from shared pool
1724
1725       /* Send  Prefetch Request */
1726       prefetchpile_t *ptr = pilehead;
1727       while(ptr != NULL) {
1728         globalid++;
1729         int sd = getSock2(transPrefetchSockPool, ptr->mid);
1730         sendPrefetchReq(ptr, sd,globalid);
1731         ptr = ptr->next;
1732       }
1733
1734       /* Release socket */
1735       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1736
1737       /* Deallocated pilehead */
1738       mcdealloc(pilehead);
1739     }
1740     // Deallocate the prefetch queue pile node
1741     incmulttail(count);
1742   }
1743 }
1744
1745 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1746   objpile_t *tmp;
1747
1748   int size=sizeof(char)+sizeof(int);
1749   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1750     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1751   }
1752
1753   char buft[size];
1754   char *buf=buft;
1755   *buf=TRANS_PREFETCH;
1756   buf+=sizeof(char);
1757
1758   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1759     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1760     *((int*)buf)=len;
1761     buf+=sizeof(int);
1762     *((unsigned int *)buf)=tmp->oid;
1763     buf+=sizeof(unsigned int);
1764     *((unsigned int *)(buf)) = myIpAddr;
1765     buf+=sizeof(unsigned int);
1766     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1767     buf+=tmp->numoffset*sizeof(short);
1768   }
1769   *((int *)buf)=-1;
1770   send_data(sd, buft, size);
1771   return;
1772 }
1773
1774 /**
1775  * parameters: mcpilenode -> pile node to traverse to assemble pref requests
1776  * sd -> socket id
1777  * gid -> global identifier for each prefetch request sent, starts with 0
1778  **/
1779 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
1780   int len, endpair;
1781   char control;
1782   objpile_t *tmp;
1783   struct writestruct writebuffer;
1784   writebuffer.offset=0;
1785
1786
1787   /* Send TRANS_PREFETCH control message */
1788   int first=1;
1789
1790   /* Send Oids and offsets in pairs */
1791   tmp = mcpilenode->objpiles;
1792   while(tmp != NULL) {
1793     len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1794     char oidnoffset[len+5];
1795     char *buf=oidnoffset;
1796     if (first) {
1797       *buf=TRANS_PREFETCH;
1798       buf++;len++;
1799       first=0;
1800     }
1801     *((int*)buf) = tmp->numoffset;
1802     buf+=sizeof(int);
1803     *((unsigned int *)buf) = tmp->oid;
1804     LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
1805 #ifdef TRANSSTATS
1806     sendRemoteReq++;
1807 #endif
1808     buf+=sizeof(unsigned int);
1809     *((unsigned int *)buf) = myIpAddr;
1810     buf+= sizeof(unsigned int);
1811     *((int*)buf) = gid;
1812     buf+=sizeof(int);
1813     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1814     tmp = tmp->next;
1815     if (tmp==NULL) {
1816       *((int *)(&oidnoffset[len]))=-1;
1817       len+=sizeof(int);
1818     }
1819     if (tmp!=NULL)
1820       send_buf(sd, &writebuffer, oidnoffset, len);
1821     else
1822       forcesend_buf(sd, &writebuffer, oidnoffset, len);
1823   }
1824   LOGOIDTYPE("SREQ",0,0,myrdtsc());
1825   LOGEVENT('S');
1826   LOGTIME('S',0,0,myrdtsc(),gid); //after sending
1827   return;
1828 }
1829
1830 int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
1831   int gid,length = 0, size = 0;
1832   char control;
1833   unsigned int oid;
1834   void *modptr, *oldptr;
1835
1836   recv_data_buf(sd, readbuffer, &length, sizeof(int));
1837   size = length - sizeof(int);
1838   char recvbuffer[size];
1839 #ifdef TRANSSTATS
1840   getResponse++;
1841   LOGEVENT('Z');
1842   LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
1843 #endif
1844   recv_data_buf(sd, readbuffer, recvbuffer, size);
1845   control = *((char *) recvbuffer);
1846   if(control == OBJECT_FOUND) {
1847     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1848     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1849     LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
1850     size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
1851     pthread_mutex_lock(&prefetchcache_mutex);
1852     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1853       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1854       pthread_mutex_unlock(&prefetchcache_mutex);
1855       return -1;
1856     }
1857     pthread_mutex_unlock(&prefetchcache_mutex);
1858     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
1859     STATUS(modptr)=0;
1860
1861
1862     /* Insert the oid and its address into the prefetch hash lookup table */
1863     /* Do a version comparison if the oid exists */
1864     if((oldptr = prehashSearch(oid)) != NULL) {
1865       /* If older version then update with new object ptr */
1866       if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1867         prehashInsert(oid, modptr);
1868       }
1869     } else { /* Else add the object ptr to hash table*/
1870       prehashInsert(oid, modptr);
1871     }
1872     LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
1873     LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
1874   } else if(control == OBJECT_NOT_FOUND) {
1875     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1876     gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
1877     LOGOIDTYPE("NF",oid,0,myrdtsc());
1878     LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
1879     /* TODO: For each object not found query DHT for new location and retrieve the object */
1880     /* Throw an error */
1881     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1882     //    exit(-1);
1883   } else {
1884     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1885   }
1886
1887   return 0;
1888 }
1889
1890 unsigned short getObjType(unsigned int oid) {
1891   objheader_t *objheader;
1892   unsigned short numoffset[] ={0};
1893   short fieldoffset[] ={};
1894
1895   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1896 #ifdef CACHE
1897     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1898 #endif
1899     unsigned int mid = lhashSearch(oid);
1900     int sd = getSock2(transReadSockPool, mid);
1901     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1902     remotereadrequest[0] = READ_REQUEST;
1903     *((unsigned int *)(&remotereadrequest[1])) = oid;
1904     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1905
1906     /* Read response from the Participant */
1907     char control;
1908     recv_data(sd, &control, sizeof(char));
1909
1910     if (control==OBJECT_NOT_FOUND) {
1911       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1912       fflush(stdout);
1913       exit(-1);
1914     } else {
1915       /* Read object if found into local cache */
1916       int size;
1917       recv_data(sd, &size, sizeof(int));
1918 #ifdef CACHE
1919       pthread_mutex_lock(&prefetchcache_mutex);
1920       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1921         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1922         pthread_exit(NULL);
1923       }
1924       pthread_mutex_unlock(&prefetchcache_mutex);
1925       recv_data(sd, objheader, size);
1926       prehashInsert(oid, objheader);
1927       return TYPE(objheader);
1928 #else
1929       char *buffer;
1930       if((buffer = calloc(1, size)) == NULL) {
1931         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1932         fflush(stdout);
1933         return 0;
1934       }
1935       recv_data(sd, buffer, size);
1936       objheader = (objheader_t *)buffer;
1937       unsigned short type = TYPE(objheader);
1938       free(buffer);
1939       return type;
1940 #endif
1941     }
1942 #ifdef CACHE
1943   }
1944 #endif
1945   }
1946   return TYPE(objheader);
1947 }
1948
1949 int startRemoteThread(unsigned int oid, unsigned int mid) {
1950   int sock;
1951   struct sockaddr_in remoteAddr;
1952   char msg[1 + sizeof(unsigned int)];
1953   int bytesSent;
1954   int status;
1955
1956   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1957     perror("startRemoteThread():socket()");
1958     return -1;
1959   }
1960
1961   bzero(&remoteAddr, sizeof(remoteAddr));
1962   remoteAddr.sin_family = AF_INET;
1963   remoteAddr.sin_port = htons(LISTEN_PORT);
1964   remoteAddr.sin_addr.s_addr = htonl(mid);
1965
1966   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1967     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1968            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1969     status = -1;
1970   } else
1971   {
1972     msg[0] = START_REMOTE_THREAD;
1973     *((unsigned int *) &msg[1]) = oid;
1974     send_data(sock, msg, 1 + sizeof(unsigned int));
1975   }
1976
1977   close(sock);
1978   return status;
1979 }
1980
1981 //TODO: when reusing oids, make sure they are not already in use!
1982 static unsigned int id = 0xFFFFFFFF;
1983 unsigned int getNewOID(void) {
1984   id += 2;
1985   if (id > oidMax || id < oidMin) {
1986     id = (oidMin | 1);
1987   }
1988   return id;
1989 }
1990
1991 int processConfigFile() {
1992   FILE *configFile;
1993   const int maxLineLength = 200;
1994   char lineBuffer[maxLineLength];
1995   char *token;
1996   const char *delimiters = " \t\n";
1997   char *commentBegin;
1998   in_addr_t tmpAddr;
1999
2000   configFile = fopen(CONFIG_FILENAME, "r");
2001   if (configFile == NULL) {
2002     printf("error opening %s:\n", CONFIG_FILENAME);
2003     perror("");
2004     return -1;
2005   }
2006
2007   numHostsInSystem = 0;
2008   sizeOfHostArray = 8;
2009   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
2010
2011   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
2012     commentBegin = strchr(lineBuffer, '#');
2013     if (commentBegin != NULL)
2014       *commentBegin = '\0';
2015     token = strtok(lineBuffer, delimiters);
2016     while (token != NULL) {
2017       tmpAddr = inet_addr(token);
2018       if ((int)tmpAddr == -1) {
2019         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
2020         fclose(configFile);
2021         return -1;
2022       } else
2023         addHost(htonl(tmpAddr));
2024       token = strtok(NULL, delimiters);
2025     }
2026   }
2027
2028   fclose(configFile);
2029
2030   if (numHostsInSystem < 1) {
2031     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
2032     return -1;
2033   }
2034 #ifdef MAC
2035   myIpAddr = getMyIpAddr("en1");
2036 #else
2037   myIpAddr = getMyIpAddr("eth0");
2038 #endif
2039   myIndexInHostArray = findHost(myIpAddr);
2040   if (myIndexInHostArray == -1) {
2041     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
2042     return -1;
2043   }
2044   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
2045   oidMin = oidsPerBlock * myIndexInHostArray;
2046   if (myIndexInHostArray == numHostsInSystem - 1)
2047     oidMax = 0xFFFFFFFF;
2048   else
2049     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
2050
2051   return 0;
2052 }
2053
2054 void addHost(unsigned int hostIp) {
2055   unsigned int *tmpArray;
2056
2057   if (findHost(hostIp) != -1)
2058     return;
2059
2060   if (numHostsInSystem == sizeOfHostArray) {
2061     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
2062     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
2063     free(hostIpAddrs);
2064     hostIpAddrs = tmpArray;
2065   }
2066
2067   hostIpAddrs[numHostsInSystem++] = hostIp;
2068
2069   return;
2070 }
2071
2072 int findHost(unsigned int hostIp) {
2073   int i;
2074   for (i = 0; i < numHostsInSystem; i++)
2075     if (hostIpAddrs[i] == hostIp)
2076       return i;
2077
2078   //not found
2079   return -1;
2080 }
2081
2082 /* This function sends notification request per thread waiting on object(s) whose version
2083  * changes */
2084 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
2085   int sock,i;
2086   objheader_t *objheader;
2087   struct sockaddr_in remoteAddr;
2088   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
2089   char *ptr;
2090   int bytesSent;
2091   int status, size;
2092   unsigned short version;
2093   unsigned int oid,mid;
2094   static unsigned int threadid = 0;
2095   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
2096   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
2097   notifydata_t *ndata;
2098
2099   oid = oidarry[0];
2100   if((mid = lhashSearch(oid)) == 0) {
2101     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
2102     return;
2103   }
2104
2105   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2106     perror("reqNotify():socket()");
2107     return -1;
2108   }
2109
2110   bzero(&remoteAddr, sizeof(remoteAddr));
2111   remoteAddr.sin_family = AF_INET;
2112   remoteAddr.sin_port = htons(LISTEN_PORT);
2113   remoteAddr.sin_addr.s_addr = htonl(mid);
2114
2115   /* Generate unique threadid */
2116   threadid++;
2117
2118   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
2119   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
2120     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
2121     return -1;
2122   }
2123   ndata->numoid = numoid;
2124   ndata->threadid = threadid;
2125   ndata->oidarry = oidarry;
2126   ndata->versionarry = versionarry;
2127   ndata->threadcond = threadcond;
2128   ndata->threadnotify = threadnotify;
2129   if((status = notifyhashInsert(threadid, ndata)) != 0) {
2130     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
2131     free(ndata);
2132     return -1;
2133   }
2134
2135   /* Send  number of oids, oidarry, version array, machine id and threadid */
2136   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2137     printf("reqNotify():error %d connecting to %s:%d\n", errno,
2138            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2139     free(ndata);
2140     return -1;
2141   } else {
2142     msg[0] = THREAD_NOTIFY_REQUEST;
2143     *((unsigned int *)(&msg[1])) = numoid;
2144     /* Send array of oids  */
2145     size = sizeof(unsigned int);
2146
2147     for(i = 0;i < numoid; i++) {
2148       oid = oidarry[i];
2149       *((unsigned int *)(&msg[1] + size)) = oid;
2150       size += sizeof(unsigned int);
2151     }
2152
2153     /* Send array of version  */
2154     for(i = 0;i < numoid; i++) {
2155       version = versionarry[i];
2156       *((unsigned short *)(&msg[1] + size)) = version;
2157       size += sizeof(unsigned short);
2158     }
2159
2160     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
2161     *((unsigned int *)(&msg[1] + size)) = threadid;
2162     pthread_mutex_lock(&(ndata->threadnotify));
2163     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
2164     send_data(sock, msg, size);
2165     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2166     pthread_mutex_unlock(&(ndata->threadnotify));
2167   }
2168
2169   pthread_cond_destroy(&threadcond);
2170   pthread_mutex_destroy(&threadnotify);
2171   free(ndata);
2172   close(sock);
2173   return status;
2174 }
2175
2176 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2177   notifydata_t *ndata;
2178   int i, objIsFound = 0, index;
2179   void *ptr;
2180
2181   //Look up the tid and call the corresponding pthread_cond_signal
2182   if((ndata = notifyhashSearch(tid)) == NULL) {
2183     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2184     return;
2185   } else  {
2186     for(i = 0; i < ndata->numoid; i++) {
2187       if(ndata->oidarry[i] == oid) {
2188         objIsFound = 1;
2189         index = i;
2190       }
2191     }
2192     if(objIsFound == 0) {
2193       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2194       return;
2195     } else {
2196       if(version <= ndata->versionarry[index]) {
2197         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
2198         return;
2199       } else {
2200 #ifdef CACHE
2201         /* Clear from prefetch cache and free thread related data structure */
2202         if((ptr = prehashSearch(oid)) != NULL) {
2203           prehashRemove(oid);
2204         }
2205 #endif
2206         pthread_mutex_lock(&(ndata->threadnotify));
2207         pthread_cond_signal(&(ndata->threadcond));
2208         pthread_mutex_unlock(&(ndata->threadnotify));
2209       }
2210     }
2211   }
2212   return;
2213 }
2214
2215 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2216   threadlist_t *ptr;
2217   unsigned int mid;
2218   struct sockaddr_in remoteAddr;
2219   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2220   int sock, status, size, bytesSent;
2221
2222   while(*head != NULL) {
2223     ptr = *head;
2224     mid = ptr->mid;
2225     //create a socket connection to that machine
2226     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
2227       perror("notifyAll():socket()");
2228       return -1;
2229     }
2230
2231     bzero(&remoteAddr, sizeof(remoteAddr));
2232     remoteAddr.sin_family = AF_INET;
2233     remoteAddr.sin_port = htons(LISTEN_PORT);
2234     remoteAddr.sin_addr.s_addr = htonl(mid);
2235     //send Thread Notify response and threadid to that machine
2236     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
2237       printf("notifyAll():error %d connecting to %s:%d\n", errno,
2238              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2239       fflush(stdout);
2240       status = -1;
2241     } else {
2242       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2243       msg[0] = THREAD_NOTIFY_RESPONSE;
2244       *((unsigned int *)&msg[1]) = oid;
2245       size = sizeof(unsigned int);
2246       *((unsigned short *)(&msg[1]+ size)) = version;
2247       size+= sizeof(unsigned short);
2248       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2249
2250       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
2251       send_data(sock, msg, size);
2252     }
2253     //close socket
2254     close(sock);
2255     // Update head
2256     *head = ptr->next;
2257     free(ptr);
2258   }
2259   return status;
2260 }
2261
2262 void transAbort() {
2263 #ifdef ABORTREADERS
2264   removetransactionhash();
2265 #endif
2266   objstrDelete(t_cache);
2267   t_chashDelete();
2268 }
2269
2270 /* This function inserts necessary information into
2271  * a machine pile data structure */
2272 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
2273   plistnode_t *ptr, *tmp;
2274   int found = 0, offset = 0;
2275
2276   tmp = pile;
2277   //Add oid into a machine that is already present in the pile linked list structure
2278   while(tmp != NULL) {
2279     if (tmp->mid == mid) {
2280       int tmpsize;
2281
2282       if (STATUS(headeraddr) & NEW) {
2283         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
2284         tmp->numcreated++;
2285         GETSIZE(tmpsize, headeraddr);
2286         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2287       } else if (STATUS(headeraddr) & DIRTY) {
2288         tmp->oidmod[tmp->nummod] = OID(headeraddr);
2289         tmp->nummod++;
2290         GETSIZE(tmpsize, headeraddr);
2291         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
2292       } else {
2293         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
2294         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
2295         offset += sizeof(unsigned int);
2296         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
2297         tmp->numread++;
2298       }
2299       found = 1;
2300       break;
2301     }
2302     tmp = tmp->next;
2303   }
2304   //Add oid for any new machine
2305   if (!found) {
2306     int tmpsize;
2307     if((ptr = pCreate(num_objs)) == NULL) {
2308       return NULL;
2309     }
2310     ptr->mid = mid;
2311     if (STATUS(headeraddr) & NEW) {
2312       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
2313       ptr->numcreated++;
2314       GETSIZE(tmpsize, headeraddr);
2315       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2316     } else if (STATUS(headeraddr) & DIRTY) {
2317       ptr->oidmod[ptr->nummod] = OID(headeraddr);
2318       ptr->nummod++;
2319       GETSIZE(tmpsize, headeraddr);
2320       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
2321     } else {
2322       *((unsigned int *)ptr->objread)=OID(headeraddr);
2323       offset = sizeof(unsigned int);
2324       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
2325       ptr->numread++;
2326     }
2327     ptr->next = pile;
2328     pile = ptr;
2329   }
2330
2331   /* Clear Flags */
2332   STATUS(headeraddr) =0;
2333
2334
2335   return pile;
2336 }
2337
2338 plistnode_t *sortPiles(plistnode_t *pileptr) {
2339   plistnode_t *head, *ptr, *tail;
2340   head = pileptr;
2341   ptr = pileptr;
2342   /* Get tail pointer */
2343   while(ptr!= NULL) {
2344     tail = ptr;
2345     ptr = ptr->next;
2346   }
2347   ptr = pileptr;
2348   plistnode_t *prev = pileptr;
2349   /* Arrange local machine processing at the end of the pile list */
2350   while(ptr != NULL) {
2351     if(ptr != tail) {
2352       if(ptr->mid == myIpAddr && (prev != pileptr)) {
2353         prev->next = ptr->next;
2354         ptr->next = NULL;
2355         tail->next = ptr;
2356         return pileptr;
2357       }
2358       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
2359         prev = ptr->next;
2360         ptr->next = NULL;
2361         tail->next = ptr;
2362         return prev;
2363       }
2364       prev = ptr;
2365     }
2366     ptr = ptr->next;
2367   }
2368   return pileptr;
2369 }