11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
15 extern int classsize[];
17 objstr_t *mainobjstore;
21 //Initialize main object store
22 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
23 if (mhashCreate(HASH_SIZE, LOADFACTOR))
26 if (lhashCreate(HASH_SIZE, LOADFACTOR))
29 //pthread_t threadListen;
30 //pthread_create(&threadListen, NULL, dstmListen, NULL);
37 int listenfd, acceptfd;
38 struct sockaddr_in my_addr;
39 struct sockaddr_in client_addr;
40 socklen_t addrlength = sizeof(struct sockaddr);
41 pthread_t thread_dstm_accept;
44 listenfd = socket(AF_INET, SOCK_STREAM, 0);
51 my_addr.sin_family = AF_INET;
52 my_addr.sin_port = htons(LISTEN_PORT);
53 my_addr.sin_addr.s_addr = INADDR_ANY;
54 memset(&(my_addr.sin_zero), '\0', 8);
56 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
62 if (listen(listenfd, BACKLOG) == -1)
68 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
77 void *dstmAccept(void *acceptfd)
81 char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
85 trans_commit_data_t transinfo;
87 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
89 printf("Recieved connection: fd = %d\n", (int)acceptfd);
90 recv((int)acceptfd, &control, sizeof(char), 0);
93 printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
94 recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
95 srcObj = mhashSearch(oid);
96 h = (objheader_t *) srcObj;
98 ctrl = OBJECT_NOT_FOUND;
101 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
102 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
103 perror("Error sending control msg to coordinator\n");
105 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
106 perror("Error sending size of object to coordinator\n");
108 if(send((int)acceptfd, h, size, 0) < 0) {
109 perror("Error in sending object\n");
114 case READ_MULT_REQUEST:
115 printf("DEBUG-> READ_MULT_REQUEST\n");
119 printf("DEBUG -> MOVE_REQUEST\n");
122 case MOVE_MULT_REQUEST:
123 printf("DEBUG -> MOVE_MULT_REQUEST\n");
127 printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
128 if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
129 printf("Error in readClientReq\n");
134 printf("Error receiving\n");
136 if (close((int)acceptfd) == -1)
141 printf("Closed connection: fd = %d\n", (int)acceptfd);
146 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
147 char *ptr, control, prevctrl, sendctrl;
149 objheader_t *h, tmp_header;
151 int sum = 0, N, n, val;
153 //Reads to process the TRANS_REQUEST protocol further
155 N = sizeof(fixed) - 1;
156 ptr = (char *)&fixed;;
157 fixed.control = TRANS_REQUEST;
159 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
160 // printf("DEBUG -> 1. Reading %d bytes \n", n);
162 } while(sum < N && n != 0);
164 //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
166 int mcount = fixed.mcount;
167 N = mcount * sizeof(unsigned int);
168 unsigned int listmid[mcount];
169 ptr = (char *) listmid;
172 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
173 // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
175 } while(sum < N && n != 0);
177 // Read oid and version tuples
178 int numread = fixed.numread;
179 N = numread * (sizeof(unsigned int) + sizeof(short));
183 n = recv((int)acceptfd, (void *) objread, N, 0);
184 // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
186 } while(sum < N && n != 0);
187 //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
189 // Read modified objects
190 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
191 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
196 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
197 //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
199 } while (sum < fixed.sum_bytes && n != 0);
201 //Send control message as per all votes from the particpants
202 if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
203 printf("Handle req error\n");
206 //Read for new control message from Coordiator
207 recv((int)acceptfd, &control, sizeof(char), 0);
210 printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
211 //send ack to coordinator
212 sendctrl = TRANS_SUCESSFUL;
213 if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
214 perror("Error sending ACK to coordinator\n");
219 printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
220 if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
221 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
224 case TRANS_ABORT_BUT_RETRY_COMMIT:
225 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
226 //Process again after waiting for sometime and on prev control message sent
231 case TRANS_AGREE_BUT_MISSING_OBJECTS:
233 case TRANS_SOFT_ABORT:
236 //Try sending either agree or disagree after sometime
238 //Wait in a blocking thread or something
239 //Recv from client new listmid, mcount and pilecount
240 //call 2 new functions that are similar to readClientReq and handleRequest
242 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
243 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
246 printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
253 //This function runs a decision after all objects are weighed under one of the 4 possibilities
254 //and returns the appropriate control message to the Ccordinator
255 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
257 char control = 0, ctrlmissoid, *ptr, *oidmodnotfound;
260 unsigned int *oidnotfound, *oidlocked, *oidmod;
262 oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
263 oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
264 oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
265 oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char));
267 // Counters and arrays to formulate decision on control message to be sent
268 int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
269 int objmodnotfound = 0, nummodfound = 0;
271 objheader_t *headptr;
272 //TODO remove this deadcode from here
273 objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
275 //Process each object present in the pile
277 //Process each oid in the machine pile/ group
278 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
279 if (i < fixed->numread) {//Object is read
280 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
282 oid = *((unsigned int *)(objread + incr));
283 incr += sizeof(unsigned int);
284 version = *((short *)(objread + incr));
285 } else {//Obj is modified
286 headptr = (objheader_t *) ptr;
288 oidmod[objmod] = oid;//Array containing modified oids
290 version = headptr->version;
291 ptr += sizeof(objheader_t) + classsize[headptr->type];
293 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
294 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
295 objinfo[i].poss_val = OBJECT_NOT_FOUND;
296 if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) {
297 oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine
300 //Save the oids not found for later use
301 oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
303 } else { // If obj found in machine (i.e. has not moved)
304 //Check if obj is locked
305 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
306 if (version == ((objheader_t *)mobj)->version) { // If version match
307 objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
309 } else {//If versions don't match ..HARD ABORT
310 objinfo[i].poss_val = VERSION_NO_MATCH;
312 //send TRANS_DISAGREE to Coordinator
313 control = TRANS_DISAGREE;
314 write(acceptfd, &control, sizeof(char));
315 printf("DEBUG -> Sending TRANS_DISAGREE\n");
318 } else {//Obj is not locked , so lock object
319 ((objheader_t *)mobj)->status |= LOCK;
320 //Save all object oids that are locked on this machine during this transaction request call
321 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
323 if (version == ((objheader_t *)mobj)->version) { //If versions match
324 objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
326 } else { //If versions don't match
327 objinfo[i].poss_val = VERSION_NO_MATCH;
329 //send TRANS_DISAGREE to Coordinator
330 control = TRANS_DISAGREE;
331 write(acceptfd, &control, sizeof(char));
332 printf("DEBUG -> Sending TRANS_DISAGREE\n");
339 printf("No of objs locked = %d\n", objlocked);
340 printf("No of v_nomatch = %d\n", v_nomatch);
341 printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
342 printf("No of objs v_match but had locks before = %d\n", v_matchlock);
343 printf("No of objs not found = %d\n", objnotfound);
344 printf("No of objs modified but not found = %d\n", objmodnotfound);
346 //Decide what control message(s) to send
347 if(v_matchnolock == fixed->numread + fixed->nummod) {
348 //send TRANS_AGREE to Coordinator
349 control = TRANS_AGREE;
350 write(acceptfd, &control, sizeof(char));
351 printf("DEBUG -> Sending TRANS_AGREE\n");
354 if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
355 //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
356 control = TRANS_AGREE_BUT_MISSING_OBJECTS;
357 write(acceptfd, &control, sizeof(char));
358 printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n");
359 //send number of oids not found and the missing oids
360 write(acceptfd, &objnotfound, sizeof(int));
361 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
364 if(v_matchlock > 0 && v_nomatch == 0) {
365 //send TRANS_SOFT_ABORT to Coordinator
366 control = TRANS_SOFT_ABORT;
367 write(acceptfd, &control, sizeof(char));
368 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
369 //send number of oids not found and the missing oids
370 write(acceptfd, &objnotfound, sizeof(int));
372 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
375 //Do the following when TRANS_DISAGREE is sent
376 if(control == TRANS_DISAGREE) {
377 //Set the reference count of the object to 1 in mainstore for garbage collection
379 for(i = 0; i< fixed->nummod; i++) {
380 headptr = (objheader_t *) ptr;
382 ptr += sizeof(objheader_t) + classsize[headptr->type];
384 //Unlock objects that was locked in the trans
385 for(i = 0; i< objlocked ; i++) {
386 mobj = mhashSearch(oidlocked[i]);// find the header address
387 ((objheader_t *)mobj)->status &= ~(LOCK);
391 // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine
392 nummodfound = fixed->nummod - objmodnotfound;
393 unsigned int oidmodfound[nummodfound];
394 for(i = 0; i< fixed->nummod; i++) {
395 if(oidmodnotfound[i] == 0) {
396 oidmodfound[j] = oidmod[i];
400 //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
401 transinfo->objmod = oidmod;
402 transinfo->objlocked = oidlocked;
403 transinfo->modptr = modptr;
404 transinfo->nummod = fixed->nummod;
405 transinfo->numlocked = objlocked;
410 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
411 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
413 int i = 0, offset = 0;
415 //Process each modified object saved in the mainobject store
416 for(i=0; i<transinfo->nummod; i++) {
417 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
418 printf("mhashserach returns NULL\n");
420 //change reference count of older address and free space in objstr ??
421 header->rcount = 1; //Not sure what would be th val
422 //change ptr address in mhash table
423 mhashRemove(transinfo->objmod[i]);
424 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
425 offset += sizeof(objheader_t) + classsize[header->type];
426 //update object version
427 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
428 header->version += 1;
430 for(i=0; i<transinfo->numlocked; i++) {
432 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
433 header->status &= ~(LOCK);
436 //TODO Update location lookup table
438 //send ack to coordinator
439 control = TRANS_SUCESSFUL;
440 if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
441 perror("Error sending ACK to coordinator\n");