Various bug fixes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"

#include <sys/types.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <unistd.h>
12
/* TCP port on which a machine's participant side accepts connections;
 * both TRANS_REQUEST (transRequest) and READ_REQUEST (getRemoteObj)
 * connect to it. */
#define LISTEN_PORT 2156
/* Loopback address. Not used by live code in this file: peer addresses
 * are derived from machine ids with midtoIP(). */
#define MACHINE_IP "127.0.0.1"
/* Sizing for receive scratch buffers. */
#define RECEIVE_BUFFER_SIZE 2048

/* Payload size in bytes of each object class, indexed by objheader_t.type;
 * an object occupies sizeof(objheader_t) + classsize[type] bytes.
 * Defined in another translation unit. */
extern int classsize[];
18
19 transrecord_t *transStart()
20 {
21         transrecord_t *tmp = malloc(sizeof(transrecord_t));
22         tmp->cache = objstrCreate(1048576);
23         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
24         return tmp;
25 }
26
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
28 {       
29         printf("Enter TRANS_READ\n");
30         unsigned int machinenumber;
31         objheader_t *tmp, *objheader;
32         void *objcopy;
33         int size;
34         void *buf;
35                 //check cache
36         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
37                 printf("DEBUG -> transRead oid %d found local\n", oid);
38                 return(objheader);
39         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40                 //Look up in Machine lookup table and found
41                 printf("oid is found in Local mlookup\n");
42                 tmp = mhashSearch(oid);
43                 size = sizeof(objheader_t)+classsize[tmp->type];
44                 //Copy into cache
45                 objcopy = objstrAlloc(record->cache, size);
46                 memcpy(objcopy, (void *)tmp, size);
47                 //Insert into cache's lookup table
48                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
49                 return(objcopy);
50         } else {
51                 //Get the object from the remote location
52                 printf("oid is found in remote machine\n");
53                 machinenumber = lhashSearch(oid);
54                 objcopy = getRemoteObj(record, machinenumber, oid);
55                 if(objcopy == NULL) {
56                         //If object is not found in Remote location
57                         printf("Object not found in Machine %d\n", machinenumber);
58                         return NULL;
59                 }
60                 else
61                         return(objcopy);
62         } 
63 }
64
65 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
66 {
67         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
68         tmp->oid = getNewOID();
69         tmp->type = type;
70         tmp->version = 1;
71         tmp->rcount = 0; //? not sure how to handle this yet
72         tmp->status = 0;
73         tmp->status |= NEW;
74         chashInsert(record->lookupTable, tmp->oid, tmp);
75         return tmp;
76 }
77 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
78 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
79         int i, n, N, sum, retval, oidcount = 0;
80         int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
81         char ctrl, control, *ptr;
82         unsigned int *oidnotfound;
83         objheader_t *header;
84         
85
86         //Check common data structure 
87         for (i = 0 ; i < tdata->pilecount ; i++) {
88                 //Switch case
89                 control = tdata->recvmsg[i].rcv_status;
90                 switch(control) {
91                         case TRANS_DISAGREE:
92                                 printf("DEBUG-> Inside TRANS_DISAGREE\n");
93                                 transdisagree++;
94                                 //Free transaction records
95                                 objstrDelete(tdata->rec->cache);
96                                 chashDelete(tdata->rec->lookupTable);
97                                 free(tdata->rec);
98                                 //send Abort
99                                 ctrl = TRANS_ABORT;
100                                 for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
101                                         if (write(sd, &ctrl, sizeof(char)) < 0) {
102                                                 perror("Error sending ctrl message for participant\n");
103                                                 return 1;
104                                         }
105                                 }
106                                 return 0;
107
108                         case TRANS_AGREE:
109                                 printf("DEBUG-> Inside TRANS_AGREE\n");
110                                 transagree++;
111                                 break;
112                                 
113                         case TRANS_SOFT_ABORT:
114                                 printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
115                                 transsoftabort++;
116                                 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
117                                 if ((i == tdata->thread_id) && (val == 0)) {
118                                         //Read list of objects missing
119                                         if(read(sd, &oidcount, sizeof(int)) != 0) {
120                                                 //Break if only objs are locked at the Participant side
121                                                 if (oidcount == 0) {
122                                                         break;
123                                                 }
124                                                 transsoftabortmiss++;
125                                                 N = oidcount * sizeof(unsigned int);
126                                                 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
127                                                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
128                                                 }
129                                                 ptr = (char *) oidnotfound;
130                                                 do {
131                                                         n = read(sd, ptr+sum, N-sum);
132                                                         sum += n;
133                                                 } while(sum < N && n !=0);
134                                         }
135                                 }
136
137                                 break;
138                         default:
139                                 printf("Participant sent unknown message\n");
140                 }
141         }
142         
143         //Decide what control message to send to Participant    
144         if(transagree == tdata->pilecount){
145                 //Send Commit
146                 ctrl = TRANS_COMMIT;
147                 printf("Sending TRANS_COMMIT\n");
148                 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
149                         perror("Error sending ctrl message for participant\n");
150                         return 1;
151                 }
152                 //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
153         }
154
155         if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
156                 //Send abort but retry commit
157                 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
158                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
159                 if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
160                         perror("Error sending ctrl message for participant\n");
161                         return 1;
162                 }
163                 //Sleep and the resend the request
164                 sleep(5);
165                 //Read new control message from Participant
166
167                 if((n = read(sd, &control, sizeof(char))) <= 0) {
168                         perror("No bytes are read for participant\n");
169                         return 1;
170                 }
171                 
172                 //Update common data structure and increment count
173                 tdata->recvmsg[tdata->thread_id].rcv_status = control;
174                 val = 1;
175                 decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
176         }
177
178         if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
179                 //Send abort but retry commit after relooking up objects
180                 ctrl = TRANS_ABORT;
181                 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
182                 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
183                         perror("Error sending ctrl message for participant\n");
184                         return 1;
185                 }
186                 //TODO
187                 //Relook up objects
188                 //update location table
189                 
190                 //Free pointers
191                 free(oidnotfound);
192         }
193         
194         return 0;
195 }
196
197 void *transRequest(void *threadarg) {
198         int sd, i, n;
199         struct sockaddr_in serv_addr;
200         struct hostent *server;
201         thread_data_array_t *tdata;
202         objheader_t *headeraddr;
203         //unsigned int *oidnotfound;
204         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
205         char machineip[16];
206
207         tdata = (thread_data_array_t *) threadarg;
208         //Send Trans Request
209         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
210                 perror("Error in socket for TRANS_REQUEST");
211                 return NULL;
212         }
213         bzero((char*) &serv_addr, sizeof(serv_addr));
214         serv_addr.sin_family = AF_INET;
215         serv_addr.sin_port = htons(LISTEN_PORT);
216         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
217         midtoIP(tdata->mid,machineip);
218         machineip[15] = '\0';
219         serv_addr.sin_addr.s_addr = inet_addr(machineip);
220
221         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
222                 perror("Error in connect for TRANS_REQUEST");
223                 return NULL;
224         }
225
226         //Multiple writes for sending packets of data 
227         //Send first few fixed bytes of the TRANS_REQUEST protocol
228         printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
229 //      printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
230 //      printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
231         if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
232                 perror("Error sending fixed bytes for thread");
233                 return NULL;
234         }
235         //Send list of machines involved in the transaction
236 //      printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
237         if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
238                 perror("Error sending list of machines for thread");
239                 return NULL;
240         }
241         //Send oids and version number tuples for objects that are read
242 //      printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
243 //      printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
244         if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
245                 perror("Error sending tuples for thread");
246                 return NULL;
247         }
248         //Send objects that are modified
249         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
250                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
251                 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
252                         perror("Error sending obj modified for thread");
253                         return NULL;
254                 }
255         }
256         
257         //Read message  control message from participant side
258         if((n = read(sd, &control, sizeof(char))) <= 0) {
259                 perror("Error in reading control message from Participant\n");
260                 return NULL;
261         }
262         recvcontrol = control;
263         
264         //Update common data structure and increment count
265         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
266         //Lock and update count
267         //Thread sleeps until all messages from pariticipants are received by coordinator
268         pthread_mutex_lock(tdata->lock);
269                 (*(tdata->count))++;
270         
271         if(*(tdata->count) == tdata->pilecount) {
272                 pthread_cond_broadcast(tdata->threshold);
273         } else {
274                 pthread_cond_wait(tdata->threshold, tdata->lock);
275         }       
276
277         //process the participant's request
278         if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
279                 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
280                 pthread_mutex_unlock(tdata->lock);
281                 return NULL;
282         }
283         pthread_mutex_unlock(tdata->lock);
284
285         close(sd);
286         pthread_exit(NULL);
287 }
288
289 int transCommit(transrecord_t *record) {        
290         chashlistnode_t *curr, *ptr, *next;
291         unsigned int size;//Represents number of bins in the chash table
292         unsigned int machinenum, tot_bytes_mod, *listmid;
293         objheader_t *headeraddr;
294         plistnode_t *tmp, *pile = NULL;
295         int i, rc;
296         int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
297         char buffer[RECEIVE_BUFFER_SIZE],control;
298         char transid[TID_LEN];
299         static int newtid = 0;
300         trans_req_data_t *tosend;
301
302         ptr = record->lookupTable->table;
303         size = record->lookupTable->size;
304         //Look through all the objects in the cache and make piles
305         for(i = 0; i < size ;i++) {
306                 curr = &ptr[i];
307                 //Inner loop to traverse the linked list of the cache lookupTable
308                 while(curr != NULL) {
309                         //if the first bin in hash table is empty
310                         if(curr->key == 0) {
311                                 break;
312                         }
313                         next = curr->next;
314                         //Get machine location for object id
315                         
316                         if ((machinenum = lhashSearch(curr->key)) == 0) {
317                                printf("Error: No such machine\n");
318                                return 1;
319                         }
320                                         
321                         //TODO only for debug
322                         //machinenum = 1;
323                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
324                                 printf("Error: No such oid\n");
325                                 return 1;
326                         }
327                         //Make machine groups
328                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
329                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
330                                 return 1;
331                         }
332                         curr = next;
333                 }
334         }
335
336         //Create the packet to be sent in TRANS_REQUEST
337         tmp = pile;
338         pilecount = pCount(pile);               //Keeps track of the number of participants
339         
340         //Thread related variables
341         pthread_t thread[pilecount];            //Create threads for each participant
342         pthread_attr_t attr;                    
343         pthread_cond_t tcond;
344         pthread_mutex_t tlock;
345         pthread_mutex_t tlshrd;
346         //thread_data_array_t thread_data_array[pilecount];
347         thread_data_array_t *thread_data_array;
348
349         thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
350         
351         thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
352         
353         //Initialize and set thread detach attribute
354         pthread_attr_init(&attr);
355         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
356         pthread_mutex_init(&tlock, NULL);
357         pthread_cond_init(&tcond, NULL);
358         
359         //Keep track of list of machine ids per transaction     
360         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
361                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
362                 return 1;
363         }
364                                 
365         pListMid(pile, listmid);
366         //Process each machine group
367         while(tmp != NULL) {
368                 //Create transaction id
369                 newtid++;
370                 //trans_req_data_t *tosend;
371                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
372                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
373                         return 1;
374                 }
375                 tosend->f.control = TRANS_REQUEST;
376                 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
377                 tosend->f.mcount = pilecount;
378                 tosend->f.numread = tmp->numread;
379                 tosend->f.nummod = tmp->nummod;
380                 tosend->f.sum_bytes = tmp->sum_bytes;
381                 tosend->listmid = listmid;
382                 tosend->objread = tmp->objread;
383                 tosend->oidmod = tmp->oidmod;
384                 thread_data_array[numthreads].thread_id = numthreads;
385                 thread_data_array[numthreads].mid = tmp->mid;
386                 thread_data_array[numthreads].pilecount = pilecount;
387                 thread_data_array[numthreads].buffer = tosend;
388                 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
389                 thread_data_array[numthreads].threshold = &tcond;
390                 thread_data_array[numthreads].lock = &tlock;
391                 thread_data_array[numthreads].count = &trecvcount;
392                 thread_data_array[numthreads].rec = record;
393
394                 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
395                 if (rc) {
396                         perror("Error in pthread create");
397                         return 1;
398                 }               
399
400                 numthreads++;           
401                 //TODO frees 
402                 //free(tosend);
403                 tmp = tmp->next;
404         }
405
406         // Free attribute and wait for the other threads
407         pthread_attr_destroy(&attr);
408         for (i = 0 ;i < pilecount ; i++) {
409                 rc = pthread_join(thread[i], NULL);
410                 if (rc)
411                 {
412                         printf("ERROR return code from pthread_join() is %d\n", rc);
413                         return 1;
414                 }
415         }
416         
417         //Free resources        
418         pthread_cond_destroy(&tcond);
419         pthread_mutex_destroy(&tlock);
420 //      for(i = 0 ;i< pilecount ;i++) {
421                 free(tosend);
422 //      }
423         free(listmid);
424         pDelete(pile);
425         return 0;
426 }
427
428 //mnun will be used to represent the machine IP address later
429 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
430         int sd, size, val;
431         struct sockaddr_in serv_addr;
432         struct hostent *server;
433         char control;
434         char machineip[16];
435         objheader_t *h;
436         void *objcopy;
437
438         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
439                 perror("Error in socket\n");
440                 return NULL;
441         }
442         bzero((char*) &serv_addr, sizeof(serv_addr));
443         serv_addr.sin_family = AF_INET;
444         serv_addr.sin_port = htons(LISTEN_PORT);
445         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
446         midtoIP(mnum,machineip);
447         machineip[15] = '\0';
448         serv_addr.sin_addr.s_addr = inet_addr(machineip);
449
450         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
451                 perror("Error in connect\n");
452                 return NULL;
453         }
454         char readrequest[sizeof(char)+sizeof(unsigned int)];
455         readrequest[0] = READ_REQUEST;
456         *((unsigned int *)(&readrequest[1])) = oid;
457         if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
458                 perror("Error sending message\n");
459                 return NULL;
460         }
461
462 #ifdef DEBUG1
463         printf("DEBUG -> ready to rcv ...\n");
464 #endif
465         //Read response from the Participant
466         if((val = read(sd, &control, sizeof(char))) <= 0) {
467                 perror("No control response for getRemoteObj sent\n");
468                 return NULL;
469         }
470         switch(control) {
471                 case OBJECT_NOT_FOUND:
472                         return NULL;
473                         break;
474                 case OBJECT_FOUND:
475                         if((val = read(sd, &size, sizeof(int))) <= 0) {
476                                 perror("No size is read from the participant\n");
477                                 return NULL;
478                         }
479                         objcopy = objstrAlloc(record->cache, size);
480                         if((val = read(sd, objcopy, size)) <= 0) {
481                                 perror("No objects are read from the remote participant\n");
482                                 return NULL;
483                         }
484                         //Insert into cache's lookup table
485                         chashInsert(record->lookupTable, oid, objcopy); 
486                         break;
487                 default:
488                         printf("Error in recv request from participant on a READ_REQUEST\n");
489                         return NULL;
490         }
491         close(sd);
492         return objcopy;
493 }