annoying bug fixed; works well with current recovery system
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <netinet/tcp.h>
5 #include "dstm.h"
6 #include "altmlookup.h"
7 #include "llookup.h"
8 #include "threadnotify.h"
9 #include "prefetch.h"
10 #include <sched.h>
11 #ifdef COMPILER
12 #include "thread.h"
13 #endif
14 #include "gCollect.h"
15 #include "readstruct.h"
16 #include "debugmacro.h"
17 #ifdef SANDBOX
18 #include "sandbox.h"
19 #endif
20
21 #define BACKLOG 10 //max pending connections
22 #define RECEIVE_BUFFER_SIZE 2048
23
24 extern int classsize[];
25 extern int numHostsInSystem;
26 extern pthread_mutex_t notifymutex;
27 extern unsigned long long clockoffset;
28 long long startreq, endreq, diff;
29
30 //#define LOGTIMES
31 #ifdef LOGTIMES
32 extern char bigarray1[6*1024*1024];
33 extern unsigned int bigarray2[6*1024*1024];
34 extern unsigned int bigarray3[6*1024*1024];
35 extern long long bigarray4[6*1024*1024];
36 extern int bigarray5[6*1024*1024];
37 extern int bigindex1;
38 #define LOGTIME(x,y,z,a,b) {\
39   int tmp=bigindex1; \
40   bigarray1[tmp]=x; \
41   bigarray2[tmp]=y; \
42   bigarray3[tmp]=z; \
43   bigarray4[tmp]=a; \
44   bigarray5[tmp]=b; \
45   bigindex1++; \
46 }
47 #else
48 #define LOGTIME(x,y,z,a,b)
49 #endif
50
51
52 long long myrdtsc(void)
53 {
54   unsigned hi, lo; 
55   __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi));
56   return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
57 }
58
59 objstr_t *mainobjstore;
60 pthread_mutex_t mainobjstore_mutex;
61 pthread_mutex_t lockObjHeader;
62 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
63
64 sockPoolHashTable_t *transPResponseSocketPool;
65
66 /* This function initializes the main objects store and creates the
67  * global machine and location lookup table */
68
69 int dstmInit(void) {
70   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
71   /* Initialize attribute for mutex */
72   pthread_mutexattr_init(&mainobjstore_mutex_attr);
73   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
74   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
75   pthread_mutex_init(&lockObjHeader,NULL);
76   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
77     return 1;             //failure
78
79   if (lhashCreate(HASH_SIZE, LOADFACTOR))
80     return 1;             //failure
81
82   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
83     return 1;             //failure
84
85   //Initialize socket pool
86   if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
87     printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
88     return 0;
89   }
90
91   return 0;
92 }
93
94
95 int startlistening() {
96   int listenfd;
97   struct sockaddr_in my_addr;
98   socklen_t addrlength = sizeof(struct sockaddr);
99   int setsockflag=1;
100
101   listenfd = socket(AF_INET, SOCK_STREAM, 0);
102   if (listenfd == -1) {
103     perror("socket");
104     exit(1);
105   }
106
107   if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
108     perror("socket");
109     exit(1);
110   }
111 #ifdef MAC
112   if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
113     perror("socket");
114     exit(1);
115   }
116 #endif
117
118   my_addr.sin_family = AF_INET;
119   my_addr.sin_port = htons(LISTEN_PORT);
120   my_addr.sin_addr.s_addr = INADDR_ANY;
121   memset(&(my_addr.sin_zero), '\0', 8);
122
123   if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
124     perror("bind");
125     exit(1);
126   }
127
128   if (listen(listenfd, BACKLOG) == -1) {
129     perror("listen");
130     exit(1);
131   }
132   return listenfd;
133 }
134
135 /* This function starts the thread to listen on a socket
136  * for tranaction calls */
137 void *dstmListen(void *lfd) {
138   int listenfd=(int)lfd;
139   int acceptfd;
140   struct sockaddr_in client_addr;
141   socklen_t addrlength = sizeof(struct sockaddr);
142   pthread_t thread_dstm_accept;
143
144   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
145   while(1) {
146     int retval;
147     int flag=1;
148     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
149     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
150     do {
151       retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
152     } while(retval!=0);
153     pthread_detach(thread_dstm_accept);
154   }
155 }
156 /* This function accepts a new connection request, decodes the control message in the connection
157  * and accordingly calls other functions to process new requests */
158 void *dstmAccept(void *acceptfd) {
159   int val, retval, size, sum, sockid;
160   unsigned int oid;
161   char *buffer;
162   char control,ctrl;
163   void *srcObj;
164   objheader_t *h;
165   trans_commit_data_t transinfo;
166   unsigned short objType, *versionarry, version;
167   unsigned int *oidarry, numoid, mid, threadid;
168   struct readstruct readbuffer;
169   readbuffer.head=0;
170   readbuffer.tail=0;
171   unsigned int numread=0, nummod=0;
172 #ifdef SANDBOX
173   objData_t odata;
174   char *ptr;
175 #endif
176
177   /* Receive control messages from other machines */
178   while(1) {
179     int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
180     if (ret==0)
181       break;
182     if (ret==-1) {
183       printf("DEBUG -> RECV Error!.. retrying\n");
184       break;
185     }
186     switch(control) {
187     case READ_REQUEST:
188       /* Read oid requested and search if available */
189       recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
190       while((srcObj = mhashSearch(oid)) == NULL) {
191         int ret;
192         if((ret = sched_yield()) != 0) {
193           printf("%s(): error no %d in thread yield\n", __func__, errno);
194         }
195       }
196       h = (objheader_t *) srcObj;
197       /* If object is write locked, just wait */
198       /* May want to remove at some point */
199       while((*(volatile int *)STATUSPTR(h))<=0)
200         sched_yield();
201       GETSIZE(size, h);
202       size += sizeof(objheader_t);
203       sockid = (int) acceptfd;
204       if (h == NULL) {
205         ctrl = OBJECT_NOT_FOUND;
206         send_data(sockid, &ctrl, sizeof(char));
207       } else {
208         // Type
209         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
210         *((int *)&msg[1])=size;
211         send_data(sockid, &msg, sizeof(msg));
212         send_data(sockid, h, size);
213       }
214       break;
215
216     case READ_MULT_REQUEST:
217       break;
218
219     case MOVE_REQUEST:
220       break;
221
222     case MOVE_MULT_REQUEST:
223       break;
224
225     case TRANS_REQUEST:
226       /* Read transaction request */
227       transinfo.objlocked = NULL;
228       transinfo.objnotfound = NULL;
229       transinfo.modptr = NULL;
230       transinfo.numlocked = 0;
231       transinfo.numnotfound = 0;
232       if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
233         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
234         pthread_exit(NULL);
235       }
236       break;
237
238     case TRANS_PREFETCH:
239 #ifdef RANGEPREFETCH
240       if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
241         printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
242         break;
243       }
244 #else
245       LOGTIME('X',0,0,myrdtsc(),0);
246       if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
247         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
248         break;
249       }
250 #endif
251       break;
252
253     case TRANS_PREFETCH_RESPONSE:
254 #ifdef RANGEPREFETCH
255       if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
256         printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
257         break;
258       }
259 #else
260       if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
261         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
262         break;
263       }
264 #endif
265       break;
266
267     case START_REMOTE_THREAD:
268       recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
269       objType = getObjType(oid);
270       startDSMthread(oid, objType);
271       break;
272
273     case THREAD_NOTIFY_REQUEST:
274       recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
275       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
276       if((buffer = calloc(1,size)) == NULL) {
277         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
278         pthread_exit(NULL);
279       }
280
281       recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
282
283       oidarry = calloc(numoid, sizeof(unsigned int));
284       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
285       size = sizeof(unsigned int) * numoid;
286       versionarry = calloc(numoid, sizeof(unsigned short));
287       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
288       size += sizeof(unsigned short) * numoid;
289       mid = *((unsigned int *)(buffer+size));
290       size += sizeof(unsigned int);
291       threadid = *((unsigned int *)(buffer+size));
292       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
293       free(buffer);
294       break;
295
296     case THREAD_NOTIFY_RESPONSE:
297       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
298       if((buffer = calloc(1,size)) == NULL) {
299         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
300         pthread_exit(NULL);
301       }
302
303       recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
304
305       oid = *((unsigned int *)buffer);
306       size = sizeof(unsigned int);
307       version = *((unsigned short *)(buffer+size));
308       size += sizeof(unsigned short);
309       threadid = *((unsigned int *)(buffer+size));
310       threadNotify(oid,version,threadid);
311       free(buffer);
312       break;
313
314 #ifdef SANDBOX
315     case CHECK_OBJECTS: // check if versions of objects match
316       size = sizeof(odata) - 1;
317       ptr = (char*)&odata;
318       recv_data_buf((int)acceptfd, &readbuffer, ptr+1, size);
319       numread = odata.numread;
320       nummod = odata.nummod;
321       checkObjVersion(&readbuffer, (int) acceptfd, numread, nummod);
322       break;
323 #endif
324
325     case CLOSE_CONNECTION:
326       goto closeconnection;
327
328     default:
329       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
330     }
331   }
332
333 closeconnection:
334   /* Close connection */
335   if (close((int)acceptfd) == -1)
336     perror("close");
337   pthread_exit(NULL);
338 }
339
340 /* This function reads the information available in a transaction request
341  * and makes a function call to process the request */
342 int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
343   char *ptr;
344   void *modptr;
345   unsigned int *oidmod, oid;
346   fixed_data_t fixed;
347   objheader_t *headaddr;
348   int sum, i, size, n, val;
349
350   oidmod = NULL;
351
352   /* Read fixed_data_t data structure */
353   size = sizeof(fixed) - 1;
354   ptr = (char *)&fixed;
355   fixed.control = TRANS_REQUEST;
356   recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
357
358   /* Read list of mids */
359   int mcount = fixed.mcount;
360   size = mcount * sizeof(unsigned int);
361   unsigned int listmid[mcount];
362   ptr = (char *) listmid;
363   recv_data_buf((int)acceptfd, readbuffer, ptr, size);
364
365   /* Read oid and version tuples for those objects that are not modified in the transaction */
366   int numread = fixed.numread;
367   size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
368   char objread[size];
369   if(numread != 0) { //If pile contains more than one object to be read,
370     // keep reading all objects
371     recv_data_buf((int)acceptfd, readbuffer, objread, size);
372   }
373
374   /* Read modified objects */
375   if(fixed.nummod != 0) {
376     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
377       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
378       return 1;
379     }
380     size = fixed.sum_bytes;
381     recv_data_buf((int)acceptfd, readbuffer, modptr, size);
382   }
383
384   /* Create an array of oids for modified objects */
385   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
386   if (oidmod == NULL) {
387     printf("calloc error %s, %d\n", __FILE__, __LINE__);
388     return 1;
389   }
390   ptr = (char *) modptr;
391   for(i = 0 ; i < fixed.nummod; i++) {
392     headaddr = (objheader_t *) ptr;
393     oid = OID(headaddr);
394     oidmod[i] = oid;
395     int tmpsize=0;
396     GETSIZE(tmpsize, headaddr);
397     ptr += sizeof(objheader_t) + tmpsize;
398   }
399
400   /*Process the information read */
401   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
402     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
403     /* Free resources */
404     if(oidmod != NULL) {
405       free(oidmod);
406     }
407     return 1;
408   }
409
410   /* Free resources */
411   if(oidmod != NULL) {
412     free(oidmod);
413   }
414
415   return 0;
416 }
417
418 /* This function processes the Coordinator's transaction request using "handleTransReq"
419  * function and sends a reply to the co-ordinator.
420  * Following this it also receives a new control message from the co-ordinator and processes this message*/
421 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
422                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
423
424   char control, sendctrl, retval;
425   objheader_t *tmp_header;
426   void *header;
427   int i = 0, val;
428
429   /* Send reply to the Coordinator */
430   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
431     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
432     return 1;
433   }
434
435   recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
436   /* Process the new control message */
437   switch(control) {
438   case TRANS_ABORT:
439     if (fixed->nummod > 0)
440       free(modptr);
441     /* Unlock objects that was locked due to this transaction */
442     int useWriteUnlock = 0; //TODO verify is this piece of unlocking code ever used
443     for(i = 0; i< transinfo->numlocked; i++) {
444       if(transinfo->objlocked[i] == -1) {
445         useWriteUnlock = 1;
446         continue;
447       }
448       if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
449         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
450         return 1;
451       }
452       if(useWriteUnlock) {
453         write_unlock(STATUSPTR(header));
454       } else {
455         read_unlock(STATUSPTR(header));
456       }
457     }
458     break;
459
460   case TRANS_COMMIT:
461     /* Invoke the transCommit process() */
462     if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
463       printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
464       /* Free memory */
465       if (transinfo->objlocked != NULL) {
466         free(transinfo->objlocked);
467       }
468       if (transinfo->objnotfound != NULL) {
469         free(transinfo->objnotfound);
470       }
471       return 1;
472     }
473     break;
474
475   case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
476     break;
477
478   default:
479     printf("Error: No response to TRANS_AGREE OR DISAGREE protocol  control = %d %s, %d\n", control, __FILE__, __LINE__);
480     //TODO Use fixed.trans_id  TID since Client may have died
481     break;
482   }
483
484   /* Free memory */
485   if (transinfo->objlocked != NULL) {
486     free(transinfo->objlocked);
487   }
488   if (transinfo->objnotfound != NULL) {
489     free(transinfo->objnotfound);
490   }
491
492   return 0;
493 }
494
495 /* This function increments counters while running a voting decision on all objects involved
496  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
497 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
498   int val, i = 0, j;
499   unsigned short version;
500   char control = 0, *ptr;
501   unsigned int oid;
502   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
503   objheader_t *headptr;
504
505   /* Counters and arrays to formulate decision on control message to be sent */
506   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
507   oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
508   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
509   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
510   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
511   int numBytes = 0;
512   /* modptr points to the beginning of the object store
513    * created at the Pariticipant.
514    * Object store holds the modified objects involved in the transaction request */
515   ptr = (char *) modptr;
516
517   char retval;
518
519   /* Process each oid in the machine pile/ group per thread */
520   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
521     if (i < fixed->numread) { //Objs only read and not modified
522       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
523       incr *= i;
524       oid = *((unsigned int *)(objread + incr));
525
526       incr += sizeof(unsigned int);
527       version = *((unsigned short *)(objread + incr));
528       retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
529                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
530     } else {  //Objs modified
531       if(i == fixed->numread) {
532         oidlocked[objlocked++] = -1;
533       }
534       int tmpsize;
535       headptr = (objheader_t *) ptr;
536       oid = OID(headptr);
537       version = headptr->version;
538       GETSIZE(tmpsize, headptr);
539       ptr += sizeof(objheader_t) + tmpsize;
540       retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
541                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
542                               &numBytes, &control, oid, version);
543     }
544     if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
545       //unlock objects as soon versions mismatch or locks cannot be acquired)
546       if (objlocked > 0) {
547         int useWriteUnlock = 0;
548         for(j = 0; j < objlocked; j++) {
549           if(oidlocked[j] == -1) {
550             useWriteUnlock = 1;
551             continue;
552           }
553           if((headptr = mhashSearch(oidlocked[j])) == NULL) {
554             printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
555             return 0;
556           }
557           if(useWriteUnlock) {
558             write_unlock(STATUSPTR(headptr));
559           } else {
560             read_unlock(STATUSPTR(headptr));
561           }
562         }
563         if(v_nomatch > 0)
564           free(oidlocked);
565       }
566       objlocked=0;
567       break;
568     }
569   }
570   //go through rest of the objects for version mismatches
571   if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
572     i++;
573     procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
574   }
575
576   /* send TRANS_DISAGREE and objs*/
577   if(v_nomatch > 0) {
578 #ifdef CACHE
579     char *objs = calloc(1, numBytes);
580     int j, offset = 0;
581     for(j = 0; j<objvernotmatch; j++) {
582       objheader_t *header = mhashSearch(oidvernotmatch[j]);
583       int size = 0;
584       GETSIZE(size, header);
585       size += sizeof(objheader_t);
586       memcpy(objs+offset, header, size);
587       offset += size;
588     }
589 #endif
590     /*
591     if (objlocked > 0) {
592       int useWriteUnlock = 0;
593       for(j = 0; j < objlocked; j++) {
594         if(oidlocked[j] == -1) {
595           useWriteUnlock = 1;
596           continue;
597         }
598         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
599           printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
600           return 0;
601         }
602         if(useWriteUnlock) {
603           write_unlock(STATUSPTR(headptr));
604         } else {
605           read_unlock(STATUSPTR(headptr));
606         }
607       }
608       free(oidlocked);
609     }
610     */
611     control=TRANS_DISAGREE;
612     send_data(acceptfd, &control, sizeof(char));
613 #ifdef CACHE
614     send_data(acceptfd, &numBytes, sizeof(int));
615     send_data(acceptfd, objs, numBytes);
616     transinfo->objvernotmatch = oidvernotmatch;
617     transinfo->numvernotmatch = objvernotmatch;
618     free(objs);
619     free(transinfo->objvernotmatch);
620 #endif
621     return control;
622   }
623
624   /* Decide what control message to send to Coordinator */
625   if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
626                                    modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
627     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
628     return 0;
629   }
630   return control;
631 }
632
633 /* Update Commit info for objects that are read */
634 char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
635                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
636                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
637                              char *control, unsigned int oid, unsigned short version) {
638   void *mobj;
639   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
640
641   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
642     /* Save the oids not found and number of oids not found for later use */
643     oidnotfound[*objnotfound] = oid;
644     (*objnotfound)++;
645     *control = TRANS_DISAGREE;
646   } else {     /* If Obj found in machine (i.e. has not moved) */
647     /* Check if Obj is locked by any previous transaction */
648     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
649       if (version == ((objheader_t *)mobj)->version) { /* match versions */
650         (*v_matchnolock)++;
651     *control = TRANS_AGREE;
652       } else { /* If versions don't match ...HARD ABORT */
653         (*v_nomatch)++;
654         oidvernotmatch[*objvernotmatch] = oid;
655         (*objvernotmatch)++;
656         int size;
657         GETSIZE(size, mobj);
658         size += sizeof(objheader_t);
659         *numBytes += size;
660         /* Send TRANS_DISAGREE to Coordinator */
661         *control = TRANS_DISAGREE;
662       }
663       //Keep track of oid locked
664       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
665     } else {  //we are locked
666       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
667         (*v_matchlock)++;
668       *control=TRANS_SOFT_ABORT;
669       } else { /* If versions don't match ...HARD ABORT */
670         (*v_nomatch)++;
671         oidvernotmatch[*objvernotmatch] = oid;
672         (*objvernotmatch)++;
673         int size;
674         GETSIZE(size, mobj);
675         size += sizeof(objheader_t);
676         *numBytes += size;
677         *control = TRANS_DISAGREE;
678       }
679     }
680   }
681   return *control;
682 }
683
684 /* Update Commit info for objects that are read */
685 char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
686                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
687                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
688   void *mobj;
689   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
690   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
691     /* Save the oids not found and number of oids not found for later use */
692     oidnotfound[*objnotfound] = oid;
693     (*objnotfound)++;
694         *control = TRANS_DISAGREE;
695   } else {     /* If Obj found in machine (i.e. has not moved) */
696     /* Check if Obj is locked by any previous transaction */
697     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
698       if (version == ((objheader_t *)mobj)->version) { /* match versions */
699         (*v_matchnolock)++;
700       *control=TRANS_AGREE;
701       } else { /* If versions don't match ...HARD ABORT */
702         (*v_nomatch)++;
703         oidvernotmatch[(*objvernotmatch)++] = oid;
704         int size;
705         GETSIZE(size, mobj);
706         size += sizeof(objheader_t);
707         *numBytes += size;
708         /* Send TRANS_DISAGREE to Coordinator */
709         *control = TRANS_DISAGREE;
710       }
711       //Keep track of oid locked
712       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
713     } else { /* Some other transaction has aquired a write lock on this object */
714       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
715         (*v_matchlock)++;
716        *control=TRANS_SOFT_ABORT;
717       } else { /* If versions don't match ...HARD ABORT */
718         (*v_nomatch)++;
719         oidvernotmatch[*objvernotmatch] = oid;
720         (*objvernotmatch)++;
721         int size;
722         GETSIZE(size, mobj);
723         size += sizeof(objheader_t);
724         *numBytes += size;
725         *control = TRANS_DISAGREE;
726       }
727     }
728   }
729   return *control;
730 }
731
732 void procRestObjs(char *objread, 
733                   char *objmod, 
734                   int index, 
735                   int numread, 
736                   int nummod, 
737                   unsigned int *oidnotfound, 
738                   unsigned int *oidvernotmatch,
739                   int *objnotfound, 
740                   int *objvernotmatch, 
741                   int *v_nomatch, 
742                   int *numBytes) {
743   int i;
744   unsigned int oid;
745   unsigned short version;
746
747   /* Process each oid in the machine pile/ group per thread */
748   for (i = index; i < numread+nummod; i++) {
749     if (i < numread) { //Objs only read and not modified
750       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
751       incr *= i;
752       oid = *((unsigned int *)(objread + incr));
753       incr += sizeof(unsigned int);
754       version = *((unsigned short *)(objread + incr));
755     } else {  //Objs modified
756       objheader_t *headptr;
757       headptr = (objheader_t *) objmod;
758       oid = OID(headptr);
759       version = headptr->version;
760       int tmpsize;
761       GETSIZE(tmpsize, headptr);
762       objmod += sizeof(objheader_t) + tmpsize;
763     }
764     processVerNoMatch(oidnotfound,
765         oidvernotmatch,
766         objnotfound,
767         objvernotmatch,
768         v_nomatch,
769         numBytes,
770         oid, 
771         version);
772   }
773   return;
774 }
775
776 void processVerNoMatch(unsigned int *oidnotfound, 
777                       unsigned int *oidvernotmatch, 
778                       int *objnotfound, 
779                       int *objvernotmatch, 
780                       int *v_nomatch, 
781                       int *numBytes,
782                       unsigned int oid, 
783                       unsigned short version) {
784   void *mobj;
785   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
786
787   if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
788     /* Save the oids not found and number of oids not found for later use */
789     oidnotfound[*objnotfound] = oid;
790     (*objnotfound)++;
791   } else {     /* If Obj found in machine (i.e. has not moved) */
792     /* Check if Obj is locked by any previous transaction */
793     //if (!write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
794     if (version != ((objheader_t *)mobj)->version) { /* match versions */
795       (*v_nomatch)++;
796       oidvernotmatch[*objvernotmatch] = oid;
797           (*objvernotmatch)++;
798           int size;
799       GETSIZE(size, mobj);
800       size += sizeof(objheader_t);
801       *numBytes += size;
802     }
803   }
804 }
805
806   /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
807    * to send to Coordinator based on the votes of oids involved in the transaction */
808   char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
809       int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
810       unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
811     int val;
812     char control = 0;
813
814     /* Condition to send TRANS_AGREE */
815     if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
816       control = TRANS_AGREE;
817       /* Send control message */
818       send_data(acceptfd, &control, sizeof(char));
819     }
820     /* Condition to send TRANS_SOFT_ABORT */
821     if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
822       control = TRANS_SOFT_ABORT;
823
824       /* Send control message */
825       send_data(acceptfd, &control, sizeof(char));
826
827       /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
828       if(*(objnotfound) != 0) {
829         int msg[1];
830         msg[0] = *(objnotfound);
831         send_data(acceptfd, &msg, sizeof(int));
832         int size = sizeof(unsigned int)* *(objnotfound);
833         send_data(acceptfd, oidnotfound, size);
834       }
835     }
836
837     /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
838      * if Participant receives a TRANS_COMMIT */
839     transinfo->objlocked = oidlocked;
840     transinfo->objnotfound = oidnotfound;
841     transinfo->modptr = modptr;
842     transinfo->numlocked = *(objlocked);
843     transinfo->numnotfound = *(objnotfound);
844     return control;
845   }
846
847   /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
848    * addresses in lookup table and also changes version number
849    * Sends an ACK back to Coordinator */
850   int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
851     objheader_t *header;
852     objheader_t *newheader;
853     int i = 0, offset = 0;
854     char control;
855     int tmpsize;
856
857     /* Process each modified object saved in the mainobject store */
858     for(i = 0; i < nummod; i++) {
859       if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
860         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
861         return 1;
862       }
863       GETSIZE(tmpsize,header);
864
865       {
866         struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
867         struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
868         dst->type=src->type;
869         dst->___cachedCode___=src->___cachedCode___;
870         dst->___cachedHash___=src->___cachedHash___;
871         memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
872       }
873
874       //memory barrier
875       CFENCE;
876
877       header->version += 1;
878       /* If threads are waiting on this object to be updated, notify them */
879       if(header->notifylist != NULL) {
880         notifyAll(&header->notifylist, OID(header), header->version);
881       }
882       offset += sizeof(objheader_t) + tmpsize;
883     }
884
885   if (nummod > 0)
886     free(modptr);
887
888   /* Unlock locked objects */
889   int useWriteUnlock = 0;
890   for(i = 0; i < numlocked; i++) {
891     if(oidlocked[i] == -1) {
892       useWriteUnlock = 1;
893       continue;
894     }
895     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
896       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
897       return 1;
898     }
899
900     if(useWriteUnlock) {
901       write_unlock(STATUSPTR(header));
902     } else {
903       read_unlock(STATUSPTR(header));
904     }
905   }
906   //TODO Update location lookup table
907   return 0;
908 }
909
910 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
911  * Looks for the objects to be prefetched in the main object store.
912  * If objects are not found then record those and if objects are found
913  * then use offset values to prefetch references to other objects */
914 int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
915   int i, size, objsize, numoffset = 0, gid=0;
916   int length;
917   char *recvbuffer, control;
918   unsigned int oid, mid=-1;
919   objheader_t *header;
920   oidmidpair_t oidmid;
921   struct writestruct writebuffer;
922   int sd = -1;
923
924   while(1) {
925     recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
926     if(numoffset == -1)
927       break;
928     recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
929     oid = oidmid.oid;
930     if (mid != oidmid.mid) {
931       if (mid!=-1) {
932         forcesend_buf(sd, &writebuffer, NULL, 0);
933         freeSockWithLock(transPResponseSocketPool, mid, sd);
934       }
935       mid=oidmid.mid;
936       sd = getSockWithLock(transPResponseSocketPool, mid);
937       writebuffer.offset=0;
938     }
939     short offsetarry[numoffset];
940     recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int));
941     recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
942     LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request 
943
944     /*Process each oid */
945     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
946       /* Save the oids not found in buffer for later use */
947       size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
948       char sendbuffer[size+1];
949       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
950       *((int *) (sendbuffer+sizeof(char))) = size;
951       *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
952       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
953       *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
954       send_buf(sd, &writebuffer, sendbuffer, size+1);
955       LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
956     } else { /* Object Found */
957       int incr = 1;
958       GETSIZE(objsize, header);
959       size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
960       char sendbuffer[size+1];
961       sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
962       *((int *)(sendbuffer + incr)) = size;
963       incr += sizeof(int);
964       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
965       incr += sizeof(char);
966       *((unsigned int *)(sendbuffer+incr)) = oid;
967       incr += sizeof(unsigned int);
968       *((int *)(sendbuffer+incr)) = gid;
969       incr += sizeof(int);
970       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
971       send_buf(sd, &writebuffer, sendbuffer, size+1);
972       LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
973       LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
974
975       /* Calculate the oid corresponding to the offset value */
976       for(i = 0 ; i< numoffset ; i++) {
977         /* Check for arrays  */
978         if(TYPE(header) >= NUMCLASSES) {
979           int elementsize = classsize[TYPE(header)];
980           struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
981           unsigned short length = ao->___length___;
982           /* Check if array out of bounds */
983           if(offsetarry[i]< 0 || offsetarry[i] >= length) {
984             break;
985           }
986           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
987         } else {
988           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
989         }
990
991         /* Don't continue if we hit a NULL pointer */
992         if (oid==0)
993           break;
994
995     LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
996
997         if((header = mhashSearch(oid)) == NULL) {
998           size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
999           char sendbuffer[size+1];
1000           sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
1001           *((int *) (sendbuffer+1)) = size;
1002           *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
1003           *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
1004       *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
1005
1006           send_buf(sd, &writebuffer, sendbuffer, size+1);
1007       LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
1008           break;
1009         } else { /* Obj Found */
1010           int incr = 1;
1011           GETSIZE(objsize, header);
1012           size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1013           char sendbuffer[size+1];
1014           sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
1015           *((int *)(sendbuffer + incr)) = size;
1016           incr += sizeof(int);
1017           *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1018           incr += sizeof(char);
1019           *((unsigned int *)(sendbuffer+incr)) = oid;
1020           incr += sizeof(unsigned int);
1021       *((int *)(sendbuffer+incr)) = gid;
1022       incr += sizeof(int);
1023           memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1024           send_buf(sd, &writebuffer, sendbuffer, size+1);
1025       LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
1026       LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
1027         }
1028       } //end of for
1029     }
1030   } //end of while
1031
1032     //Release socket
1033   if (mid!=-1) {
1034     forcesend_buf(sd, &writebuffer, NULL, 0);
1035     freeSockWithLock(transPResponseSocketPool, mid, sd);
1036   }
1037   return 0;
1038 }
1039
1040 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
1041   send_data(sd, control, sizeof(char));
1042   /* Send the buffer with its size */
1043   int length = *(size);
1044   send_data(sd, sendbuffer, length);
1045 }
1046
1047 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
1048   objheader_t *header;
1049   unsigned int oid;
1050   unsigned short newversion;
1051   char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
1052   int sd;
1053   struct sockaddr_in remoteAddr;
1054   //int bytesSent;
1055   int size;
1056   int i = 0;
1057
1058   while(i < numoid) {
1059     oid = *(oidarry + i);
1060     if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1061       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1062       return;
1063     } else {
1064       /* Check to see if versions are same */
1065 checkversion:
1066       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
1067         newversion = header->version;
1068         if(newversion == *(versionarry + i)) {
1069           //Add to the notify list
1070           if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
1071             printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
1072             return;
1073           }
1074           write_unlock(STATUSPTR(header));
1075         } else {
1076           write_unlock(STATUSPTR(header));
1077           if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1078             perror("processReqNotify():socket()");
1079             return;
1080           }
1081           bzero(&remoteAddr, sizeof(remoteAddr));
1082           remoteAddr.sin_family = AF_INET;
1083           remoteAddr.sin_port = htons(LISTEN_PORT);
1084           remoteAddr.sin_addr.s_addr = htonl(mid);
1085
1086           if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1087             printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
1088                    inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1089             close(sd);
1090             return;
1091           } else {
1092       
1093             //Send Update notification
1094             msg[0] = THREAD_NOTIFY_RESPONSE;
1095             *((unsigned int *)&msg[1]) = oid;
1096             size = sizeof(unsigned int);
1097             *((unsigned short *)(&msg[1]+size)) = newversion;
1098             size += sizeof(unsigned short);
1099             *((unsigned int *)(&msg[1]+size)) = threadid;
1100             size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
1101             send_data(sd, msg, size);
1102           }
1103           close(sd);
1104         }
1105       } else {
1106         randomdelay();
1107         goto checkversion;
1108       }
1109     }
1110     i++;
1111   }
1112   free(oidarry);
1113   free(versionarry);
1114 }