// Listener configuration and shared globals for the socket server code below.
// LISTEN_PORT is the TCP port handed to htons() when the listening address is filled in.
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
// Size in bytes of the stack buffer dstmAccept() uses to build and send a reply.
13 #define RECEIVE_BUFFER_SIZE 2048
// Defined elsewhere; indexed by an object header's type field to obtain a byte count.
15 extern int classsize[];
// Object store created during initialisation; readClientReq() allocates space
// for incoming modified objects from it.
17 objstr_t *mainobjstore;
// NOTE(review): the header of the enclosing function and the bodies of the
// two `if` statements below are not visible in this listing.
21 //todo:initialize main object store
22 //do we want this to be a global variable, or provide
23 //separate access functions and hide the structure?
24 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
// NOTE(review): the result of objstrCreate() is not checked on any visible line.
// mhashCreate() and lhashCreate() are called with the same HASH_SIZE and
// LOADFACTOR; a non-zero return enters the branch (presumably the error
// path — confirm against the elided bodies).
25 if (mhashCreate(HASH_SIZE, LOADFACTOR))
28 if (lhashCreate(HASH_SIZE, LOADFACTOR))
// Starting the listener on its own thread is currently commented out.
31 //pthread_t threadListen;
32 //pthread_create(&threadListen, NULL, dstmListen, NULL);
// Listener body: opens a TCP socket, binds it to LISTEN_PORT, listens, and
// hands each accepted connection to a new dstmAccept() thread.
// NOTE(review): the function header, the error-handling bodies and any loop
// around accept() are not visible in this listing (presumably this is
// dstmListen — confirm).
39 int listenfd, acceptfd;
40 struct sockaddr_in my_addr;
41 struct sockaddr_in client_addr;
// Used both as the address length for bind() and as the in/out length for accept().
42 socklen_t addrlength = sizeof(struct sockaddr);
43 pthread_t thread_dstm_accept;
// NOTE(review): no check of the socket() result is visible.
46 listenfd = socket(AF_INET, SOCK_STREAM, 0);
// Fill in the local address: IPv4, LISTEN_PORT in network byte order,
// INADDR_ANY as the local address, and zeroed padding.
53 my_addr.sin_family = AF_INET;
54 my_addr.sin_port = htons(LISTEN_PORT);
55 my_addr.sin_addr.s_addr = INADDR_ANY;
56 memset(&(my_addr.sin_zero), '\0', 8);
58 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
64 if (listen(listenfd, BACKLOG) == -1)
70 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
// The accepted descriptor is passed to the thread by value, packed into the
// void * argument; dstmAccept() casts it back to int.
// NOTE(review): no check of the accept() or pthread_create() result is
// visible, and the int-to-pointer cast is implementation-defined (an
// intptr_t round-trip would be the portable form).
73 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
74 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
/*
 * dstmAccept - thread entry point that services one accepted connection.
 *
 * acceptfd: the connected socket descriptor, passed by value inside the
 *           void * argument; it is cast back to int on every use.
 *
 * Reads a one-byte control code from the peer and handles the request it
 * selects. The visible code answers a single-object read, logs the
 * multi-read / move requests, runs the transaction request followed by the
 * abort / commit exchange, and finally closes the socket.
 *
 * NOTE(review): braces, most switch/case labels, some declarations and the
 * return statement are elided in this listing; the comments below describe
 * only the statements that are visible.
 */
79 void *dstmAccept(void *acceptfd)
83 char buffer[RECEIVE_BUFFER_SIZE], control;
// NOTE(review): fd_flags is fetched but not used on any visible line.
88 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
90 printf("Recieved connection: fd = %d\n", (int)acceptfd);
// The first byte on the wire selects the request type.
// NOTE(review): the recv() results here and for the oid below are ignored,
// so a short read or an error goes unnoticed.
91 recv((int)acceptfd, &control, sizeof(char), 0);
// Single-object read: fetch the requested oid and look it up.
94 recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
95 srcObj = mhashSearch(oid);
96 h = (objheader_t *) srcObj;
// Byte 0 of the reply carries the found / not-found status.
98 buffer[0] = OBJECT_NOT_FOUND;
100 buffer[0] = OBJECT_FOUND;
// Payload length is the header plus the class's byte count taken from the
// classsize table (the table entry itself, not sizeof of the entry), which
// matches how handleTransReq() steps over objects.
101 size = sizeof(objheader_t) + classsize[h->type];
// NOTE(review): size is not checked against RECEIVE_BUFFER_SIZE - 1 before
// the copy on any visible line.
102 memcpy(buffer+1, srcObj, size);
// The whole fixed-size buffer is sent, regardless of how much of it was filled.
104 if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
109 case READ_MULT_REQUEST:
110 printf("DEBUG-> READ_MULT_REQUEST\n");
114 printf("DEBUG -> MOVE_REQUEST\n");
117 case MOVE_MULT_REQUEST:
118 printf("DEBUG -> MOVE_MULT_REQUEST\n");
122 //printf("DEBUG -> TRANS_REQUEST\n");
// readClientReq() pulls the rest of the transaction request off the socket;
// a return value of 1 is treated as failure.
123 if((val = readClientReq((int)acceptfd)) == 1) {
124 printf("Error in readClientReq\n");
129 printf("Error receiving\n");
132 //Read for new control message from Coordinator
133 recv((int)acceptfd, &control, sizeof(char), 0);
// For abort and commit, the control byte just received is written back to the peer.
// NOTE(review): the write() results are ignored.
136 printf("DEBUG -> TRANS_ABORT\n");
137 write((int)acceptfd, &control, sizeof(char));
141 printf("DEBUG -> TRANS_COMMIT\n");
142 write((int)acceptfd, &control, sizeof(char));
// Outstanding commit work, not implemented on the visible lines:
144 //change ptr address in mhash table
146 //update object version
147 //change reference count of older address??
148 //free space in objstr ??
149 //Update location lookup table
153 if (close((int)acceptfd) == -1)
158 printf("Closed connection: fd = %d\n", (int)acceptfd);
/*
 * readClientReq - read the remainder of a transaction request from a socket.
 *
 * acceptfd: connected socket the request arrives on.
 *
 * The control byte has already been consumed by the caller, so it is written
 * into the fixed header locally and the rest is read in four sections:
 *   1. the remaining bytes of the fixed-size header,
 *   2. fixed.mcount machine ids (unsigned int each),
 *   3. fixed.numread (oid, version) tuples of
 *      sizeof(unsigned int) + sizeof(short) bytes each,
 *   4. fixed.sum_bytes bytes of modified objects, stored in space allocated
 *      from mainobjstore.
 * The collected data is then handed to handleTransReq().
 *
 * Returns: the caller treats 1 as failure; the return statements themselves
 * are not visible in this listing.
 *
 * NOTE(review): `do {`, the `sum` resets/increments and some declarations
 * are elided here. recv() returning -1 is not handled on any visible line;
 * the loops only stop on a return of 0 or once enough bytes have arrived.
 */
163 int readClientReq(int acceptfd) {
166 objheader_t *h, tmp_header;
// Section 1: everything in the fixed header except the leading control byte.
171 N = sizeof(fixed) - 1;
172 ptr = (char *)&fixed;
173 fixed.control = TRANS_REQUEST;
// ptr+1 skips the control byte set above; sum is the running byte count.
175 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
176 // printf("DEBUG -> 1. Reading %d bytes \n", n);
178 } while(sum < N && n != 0);
180 //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
// Section 2: the list of machine ids.
// NOTE(review): mcount comes straight off the wire and sizes a stack array.
182 int mcount = fixed.mcount;
183 N = mcount * sizeof(unsigned int);
184 unsigned int listmid[mcount];
185 ptr = (char *) listmid;
188 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
189 // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
191 } while(sum < N && n != 0);
193 // Read oid and version tuples
194 int numread = fixed.numread;
195 N = numread * (sizeof(unsigned int) + sizeof(short));
// Continue after the bytes already received and ask only for what is still
// missing, like the other three read loops; restarting at offset 0 with the
// full length would overwrite earlier data after a short read and swallow
// bytes that belong to the next section.
199 n = recv((int)acceptfd, (void *)(objread + sum), N - sum, 0);
200 // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
202 } while(sum < N && n != 0);
203 //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
205 // Read modified objects
// Section 4: the modified objects are received directly into object-store memory.
206 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
207 // printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
212 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
213 //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
215 } while (sum < fixed.sum_bytes && n != 0);
216 //Send control message as per all votes from the participants
// NOTE(review): the value returned by handleTransReq() is not used on this line.
217 handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
224 //This function runs a decision after all objects are weighed under one of the 4 possibilities
225 //and returns the appropriate control message to the Coordinator
/*
 * acceptfd: socket on which the control message (and any oid list) is written.
 * fixed:    fixed header of the request; numread and nummod give the number
 *           of read and modified objects.
 * listmid:  machine id list from the request (not used on any visible line).
 * objread:  packed (oid, version) tuples for the read objects, each
 *           sizeof(unsigned int) + sizeof(short) bytes.
 * modptr:   the modified objects, each an objheader_t followed by
 *           classsize[type] bytes (presumably the starting value of `ptr`;
 *           the assignment is not visible).
 *
 * For every object the local copy is looked up with mhashSearch(); the
 * outcome is recorded in objinfo[] and objects that were unlocked get
 * locked. A version mismatch writes TRANS_DISAGREE immediately. After the
 * loop the counters select TRANS_AGREE, TRANS_AGREE_BUT_MISSING_OBJECTS or
 * TRANS_SOFT_ABORT, and on TRANS_DISAGREE the locks taken by this call are
 * released again.
 *
 * NOTE(review): braces, counter increments, some declarations and the
 * return statements are elided in this listing. The write() results are
 * ignored on every visible call.
 */
226 int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
230 unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
231 int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
233 objheader_t *headptr;
234 objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
236 //Process each object present in the pile
238 //Process each oid in the machine pile/ group
239 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
240 if (i < fixed->numread) {//Object is read
241 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
// NOTE(review): the line between the declaration above and the read below
// is elided; presumably it scales incr by i so each iteration reads its own
// tuple — confirm.
243 oid = *((unsigned int *)(objread + incr));
244 incr += sizeof(unsigned int);
245 version = *((short *)(objread + incr));
246 } else {//Obj is modified
// Modified objects are walked in place: take the version from the header,
// then step over the header plus the class's byte count.
247 headptr = (objheader_t *) ptr;
249 version = headptr->version;
250 ptr += sizeof(objheader_t) + classsize[headptr->type];
252 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
253 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
254 objinfo[i].poss_val = OBJECT_NOT_FOUND;
255 //Save the oids not found for later use
// mobj is NULL in this branch, so the id has to come from the oid that was
// searched for rather than from the (missing) object header.
256 oidnotfound[objnotfound] = oid;
258 } else { // If obj found in machine (i.e. has not moved)
259 //Check if obj is locked
260 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
261 if (version == ((objheader_t *)mobj)->version) { // If version match
262 objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
264 } else {//If versions don't match ..HARD ABORT
265 objinfo[i].poss_val = VERSION_NO_MATCH;
267 //send TRANS_DISAGREE to Coordinator
268 control = TRANS_DISAGREE;
269 write(acceptfd, &control, sizeof(char));
270 //TODO when TRANS_DISAGREE is sent
271 //Free space allocated in main objstore
272 //Unlock objects that was locked in the trans
275 } else {//Obj is not locked , so lock object
276 ((objheader_t *)mobj)->status |= LOCK;
277 //Save all object oids that are locked on this machine during this transaction request call
278 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
280 if (version == ((objheader_t *)mobj)->version) { //If versions match
281 objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
283 } else { //If versions don't match
284 objinfo[i].poss_val = VERSION_NO_MATCH;
286 //send TRANS_DISAGREE to Coordinator
287 control = TRANS_DISAGREE;
288 write(acceptfd, &control, sizeof(char));
295 //Decide what control message(s) to send
// Agree only when the v_matchnolock count equals the total number of objects.
296 if(v_matchnolock == fixed->numread + fixed->nummod) {
297 //send TRANS_AGREE to Coordinator
298 control = TRANS_AGREE;
299 write(acceptfd, &control, sizeof(char));
302 if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
303 //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
304 control = TRANS_AGREE_BUT_MISSING_OBJECTS;
305 write(acceptfd, &control, sizeof(char));
306 //send missing oids and number of oids not found with it
// The count is written first, followed by that many oids.
307 write(acceptfd, &objnotfound, sizeof(int));
308 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
311 if(v_matchlock > 0 && v_nomatch == 0) {
312 //send TRANS_SOFT_ABORT to Coordinator
313 control = TRANS_SOFT_ABORT;
314 write(acceptfd, &control, sizeof(char));
315 //send missing oids and number of oids not found with it
316 write(acceptfd, &objnotfound, sizeof(int));
317 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
320 //TODO when TRANS_DISAGREE is sent
321 //Free space allocated in main objstore
322 //Unlock objects that was locked in the trans
// Roll back: clear the LOCK bit on every object this call recorded in oidlocked[].
// NOTE(review): the mhashSearch() result is dereferenced without a NULL check.
323 if(control == TRANS_DISAGREE) {
324 for(i = 0; i< objlocked ; i++) {
325 mobj = mhashSearch(oidlocked[i]);// find the header address
326 ((objheader_t *)mobj)->status &= ~(LOCK);