2c47e2ac932190b66fe62205f0a39e3a7ddbade8
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include<pthread.h>
8 #include<sys/types.h>
9 #include<sys/socket.h>
10 #include<netdb.h>
11 #include<netinet/in.h>
12
/* TCP port that transRequest() and getRemoteObj() connect to on the remote machine. */
13 #define LISTEN_PORT 2156
/* Loopback address; in this file it is referenced only from commented-out code. */
14 #define MACHINE_IP "127.0.0.1"
/* Size of the scratch `buffer` arrays declared in transRequest() and transCommit(). */
15 #define RECEIVE_BUFFER_SIZE 2048
16
/*
 * Table defined elsewhere, indexed by an object's `type` field. This file adds
 * classsize[type] to sizeof(objheader_t) to obtain the full size of an object.
 */
17 extern int classsize[];
18
19 transrecord_t *transStart()
20 {
21         transrecord_t *tmp = malloc(sizeof(transrecord_t));
22         tmp->cache = objstrCreate(1048576);
23         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
24         return tmp;
25 }
26
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
28 {       
29 //      printf("Enter TRANS_READ\n");
30         unsigned int machinenumber;
31         objheader_t *tmp, *objheader;
32         void *objcopy;
33         int size;
34         void *buf;
35                 //check cache
36         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
37                 printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__);
38                 return(objheader);
39         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40                 //Look up in Machine lookup table and found
41
42                 //printf("oid is found in Local machinelookup\n");
43                 tmp = mhashSearch(oid);
44                 size = sizeof(objheader_t)+classsize[tmp->type];
45                 //Copy into cache
46                 objcopy = objstrAlloc(record->cache, size);
47                 memcpy(objcopy, (void *)tmp, size);
48                 //Insert into cache's lookup table
49                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
50                 return(objcopy);
51         } else {
52                 //Get the object from the remote location
53                 //printf("oid is found in remote machine\n");
54                 machinenumber = lhashSearch(oid);
55                 objcopy = getRemoteObj(record, machinenumber, oid);
56                 if(objcopy == NULL) {
57                         //If object is not found in Remote location
58                         printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__);
59                         return NULL;
60                 }
61                 else {
62                         printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
63                         return(objcopy);
64                 }
65         } 
66 }
67
68 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
69 {
70         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
71         tmp->oid = getNewOID();
72         tmp->type = type;
73         tmp->version = 1;
74         tmp->rcount = 0; //? not sure how to handle this yet
75         tmp->status = 0;
76         tmp->status |= NEW;
77         chashInsert(record->lookupTable, tmp->oid, tmp);
78         return tmp;
79 }
80 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
81 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
82         int i, n, N, sum, retval, oidcount = 0;
83         int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
84         char ctrl, control, *ptr;
85         unsigned int *oidnotfound;
86         objheader_t *header;
87
88 //      printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
89         //Check common data structure 
90         for (i = 0 ; i < tdata->pilecount ; i++) {
91                 //Switch case
92                 control = tdata->recvmsg[i].rcv_status;
93                 switch(control) {
94                         case TRANS_DISAGREE:
95 //                              printf("DEBUG-> Inside TRANS_DISAGREE\n");
96                                 transdisagree++;
97                                 //Free transaction records
98                                 objstrDelete(tdata->rec->cache);
99                                 chashDelete(tdata->rec->lookupTable);
100                                 free(tdata->rec);
101                                 //send Abort
102                                 ctrl = TRANS_ABORT;
103                                 for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
104                                         if (write(sd, &ctrl, sizeof(char)) < 0) {
105                                                 perror("Error sending ctrl message for participant\n");
106                                                 return 1;
107                                         }
108                                 }
109                                 return 0;
110
111                         case TRANS_AGREE:
112                                 printf("Inside TRANS_AGREE\n");
113                                 PRINT_TID(tdata);
114                                 transagree++;
115                                 break;
116                                 
117                         case TRANS_SOFT_ABORT:
118                                 printf("Inside TRANS_SOFT_ABORT\n");
119                                 transsoftabort++;
120                                 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
121                                 if ((i == tdata->thread_id) && (val == 0)) {
122                                         //Read list of objects missing
123                                         if(read(sd, &oidcount, sizeof(int)) != 0) {
124                                                 //Break if only objs are locked at the Participant side
125                                                 if (oidcount == 0) {
126                                                         break;
127                                                 }
128                                                 transsoftabortmiss++;
129                                                 N = oidcount * sizeof(unsigned int);
130                                                 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
131                                                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
132                                                 }
133                                                 ptr = (char *) oidnotfound;
134                                                 do {
135                                                         n = read(sd, ptr+sum, N-sum);
136                                                         sum += n;
137                                                 } while(sum < N && n !=0);
138                                         }
139                                 }
140                                 break;
141                         default:
142                                 printf("Participant sent unknown message\n");
143                 }
144         }
145         
146         //Decide what control message to send to Participant    
147         if(transagree == tdata->pilecount){
148                 //Send Commit
149                 ctrl = TRANS_COMMIT;
150                 printf("Sending TRANS_COMMIT accept_fd = %d\n", sd);
151                 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
152                         perror("Error sending ctrl message for participant\n");
153                         return 1;
154                 }
155                 //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
156         }
157
158         if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
159                 //Send abort but retry commit
160                 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
161                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd);
162                 if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
163                         perror("Error sending ctrl message for participant\n");
164                         return 1;
165                 }
166                 //Sleep and the resend the request
167                 sleep(2);
168                 //Read new control message from Participant
169
170                 if((n = read(sd, &control, sizeof(char))) <= 0) {
171                         perror("No bytes are read for participant\n");
172                         return 1;
173                 }
174                 
175                 //Update common data structure and increment count
176                 tdata->recvmsg[tdata->thread_id].rcv_status = control;
177                 val = 1;
178                 decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
179         }
180
181         if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
182                 //Send abort but retry commit after relooking up objects
183                 ctrl = TRANS_ABORT;
184                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
185                 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
186                         perror("Error sending ctrl message for participant\n");
187                         return 1;
188                 }
189                 //TODO
190                 //Relook up objects
191                 //update location table
192                 
193                 //Free pointers
194                 free(oidnotfound);
195         }
196         
197         return 0;
198 }
199
200 void *transRequest(void *threadarg) {
201         int sd, i, n;
202         struct sockaddr_in serv_addr;
203         struct hostent *server;
204         thread_data_array_t *tdata;
205         objheader_t *headeraddr;
206         //unsigned int *oidnotfound;
207         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
208         char machineip[16];
209
210         tdata = (thread_data_array_t *) threadarg;
211         //Send Trans Request
212         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
213                 perror("Error in socket for TRANS_REQUEST");
214                 return NULL;
215         }
216         bzero((char*) &serv_addr, sizeof(serv_addr));
217         serv_addr.sin_family = AF_INET;
218         serv_addr.sin_port = htons(LISTEN_PORT);
219         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
220         midtoIP(tdata->mid,machineip);
221         machineip[15] = '\0';
222         serv_addr.sin_addr.s_addr = inet_addr(machineip);
223
224         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
225                 perror("Error in connect for TRANS_REQUEST");
226                 return NULL;
227         }
228
229         //Multiple writes for sending packets of data 
230         //Send first few fixed bytes of the TRANS_REQUEST protocol
231         printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
232 //      printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
233 //      printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
234         if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
235                 perror("Error sending fixed bytes for thread");
236                 return NULL;
237         }
238         //Send list of machines involved in the transaction
239 //      printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
240         if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
241                 perror("Error sending list of machines for thread");
242                 return NULL;
243         }
244         //Send oids and version number tuples for objects that are read
245 //      printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
246 //      printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
247         if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
248                 perror("Error sending tuples for thread");
249                 return NULL;
250         }
251         //Send objects that are modified
252         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
253                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
254                 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
255                         perror("Error sending obj modified for thread");
256                         return NULL;
257                 }
258         }
259         
260         //Read message  control message from participant side
261         if((n = read(sd, &control, sizeof(char))) <= 0) {
262                 perror("Error in reading control message from Participant\n");
263                 return NULL;
264         }
265         recvcontrol = control;
266         
267         //Update common data structure and increment count
268         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
269         //Lock and update count
270         //Thread sleeps until all messages from pariticipants are received by coordinator
271         pthread_mutex_lock(tdata->lock);
272                 (*(tdata->count))++;
273         
274         if(*(tdata->count) == tdata->pilecount) {
275                 pthread_cond_broadcast(tdata->threshold);
276         } else {
277                 pthread_cond_wait(tdata->threshold, tdata->lock);
278         }       
279
280         //process the participant's request
281         if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
282                 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
283                 pthread_mutex_unlock(tdata->lock);
284                 return NULL;
285         }
286         pthread_mutex_unlock(tdata->lock);
287
288         close(sd);
289         pthread_exit(NULL);
290 }
291
292 int transCommit(transrecord_t *record) {        
293         chashlistnode_t *curr, *ptr, *next;
294         unsigned int size;//Represents number of bins in the chash table
295         unsigned int machinenum, tot_bytes_mod, *listmid;
296         objheader_t *headeraddr;
297         plistnode_t *tmp, *pile = NULL;
298         int i, rc;
299         int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
300         char buffer[RECEIVE_BUFFER_SIZE],control;
301         char transid[TID_LEN];
302         static int newtid = 0;
303         trans_req_data_t *tosend;
304
305         ptr = record->lookupTable->table;
306         size = record->lookupTable->size;
307         //Look through all the objects in the cache and make piles
308         for(i = 0; i < size ;i++) {
309                 curr = &ptr[i];
310                 //Inner loop to traverse the linked list of the cache lookupTable
311                 while(curr != NULL) {
312                         //if the first bin in hash table is empty
313                         if(curr->key == 0) {
314                                 break;
315                         }
316                         next = curr->next;
317                         //Get machine location for object id
318                         
319                         if ((machinenum = lhashSearch(curr->key)) == 0) {
320                                printf("Error: No such machine\n");
321                                return 1;
322                         }
323                                         
324                         //TODO only for debug
325                         //machinenum = 1;
326                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
327                                 printf("Error: No such oid\n");
328                                 return 1;
329                         }
330                         //Make machine groups
331                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
332                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
333                                 return 1;
334                         }
335                         curr = next;
336                 }
337         }
338
339         //Create the packet to be sent in TRANS_REQUEST
340         tmp = pile;
341         pilecount = pCount(pile);               //Keeps track of the number of participants
342         
343         //Thread related variables
344         pthread_t thread[pilecount];            //Create threads for each participant
345         pthread_attr_t attr;                    
346         pthread_cond_t tcond;
347         pthread_mutex_t tlock;
348         pthread_mutex_t tlshrd;
349         //thread_data_array_t thread_data_array[pilecount];
350         thread_data_array_t *thread_data_array;
351
352         thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
353         
354         thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
355         
356         //Initialize and set thread detach attribute
357         pthread_attr_init(&attr);
358         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
359         pthread_mutex_init(&tlock, NULL);
360         pthread_cond_init(&tcond, NULL);
361         
362         //Keep track of list of machine ids per transaction     
363         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
364                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
365                 return 1;
366         }
367                                 
368         pListMid(pile, listmid);
369         //Process each machine group
370         while(tmp != NULL) {
371                 //Create transaction id
372                 newtid++;
373                 //trans_req_data_t *tosend;
374                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
375                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
376                         return 1;
377                 }
378                 tosend->f.control = TRANS_REQUEST;
379                 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
380                 tosend->f.mcount = pilecount;
381                 tosend->f.numread = tmp->numread;
382                 tosend->f.nummod = tmp->nummod;
383                 tosend->f.sum_bytes = tmp->sum_bytes;
384                 tosend->listmid = listmid;
385                 tosend->objread = tmp->objread;
386                 tosend->oidmod = tmp->oidmod;
387                 thread_data_array[numthreads].thread_id = numthreads;
388                 thread_data_array[numthreads].mid = tmp->mid;
389                 thread_data_array[numthreads].pilecount = pilecount;
390                 thread_data_array[numthreads].buffer = tosend;
391                 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
392                 thread_data_array[numthreads].threshold = &tcond;
393                 thread_data_array[numthreads].lock = &tlock;
394                 thread_data_array[numthreads].count = &trecvcount;
395                 thread_data_array[numthreads].rec = record;
396
397                 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
398                 if (rc) {
399                         perror("Error in pthread create");
400                         return 1;
401                 }               
402
403                 numthreads++;           
404                 //TODO frees 
405                 //free(tosend);
406                 tmp = tmp->next;
407         }
408
409         // Free attribute and wait for the other threads
410         pthread_attr_destroy(&attr);
411         for (i = 0 ;i < pilecount ; i++) {
412                 rc = pthread_join(thread[i], NULL);
413                 if (rc)
414                 {
415                         printf("ERROR return code from pthread_join() is %d\n", rc);
416                         return 1;
417                 }
418         }
419         
420         //Free resources        
421         pthread_cond_destroy(&tcond);
422         pthread_mutex_destroy(&tlock);
423 //      for(i = 0 ;i< pilecount ;i++) {
424                 free(tosend);
425 //      }
426         free(listmid);
427         pDelete(pile);
428         return 0;
429 }
430
431 //mnun will be used to represent the machine IP address later
432 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
433         int sd, size, val;
434         struct sockaddr_in serv_addr;
435         struct hostent *server;
436         char control;
437         char machineip[16];
438         objheader_t *h;
439         void *objcopy;
440
441         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
442                 perror("Error in socket\n");
443                 return NULL;
444         }
445         bzero((char*) &serv_addr, sizeof(serv_addr));
446         serv_addr.sin_family = AF_INET;
447         serv_addr.sin_port = htons(LISTEN_PORT);
448         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
449         midtoIP(mnum,machineip);
450         machineip[15] = '\0';
451         serv_addr.sin_addr.s_addr = inet_addr(machineip);
452
453         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
454                 perror("Error in connect\n");
455                 return NULL;
456         }
457         char readrequest[sizeof(char)+sizeof(unsigned int)];
458         readrequest[0] = READ_REQUEST;
459         *((unsigned int *)(&readrequest[1])) = oid;
460         if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
461                 perror("Error sending message\n");
462                 return NULL;
463         }
464
465 #ifdef DEBUG1
466         printf("DEBUG -> ready to rcv ...\n");
467 #endif
468         //Read response from the Participant
469         if((val = read(sd, &control, sizeof(char))) <= 0) {
470                 perror("No control response for getRemoteObj sent\n");
471                 return NULL;
472         }
473         switch(control) {
474                 case OBJECT_NOT_FOUND:
475                         printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
476                         return NULL;
477                 case OBJECT_FOUND:
478                         if((val = read(sd, &size, sizeof(int))) <= 0) {
479                                 perror("No size is read from the participant\n");
480                                 return NULL;
481                         }
482                         objcopy = objstrAlloc(record->cache, size);
483                         if((val = read(sd, objcopy, size)) <= 0) {
484                                 perror("No objects are read from the remote participant\n");
485                                 return NULL;
486                         }
487                         //Insert into cache's lookup table
488                         chashInsert(record->lookupTable, oid, objcopy); 
489                         break;
490                 default:
491                         printf("Error in recv request from participant on a READ_REQUEST\n");
492                         return NULL;
493         }
494         close(sd);
495         return objcopy;
496 }