various bug fixes.
authorerubow <erubow>
Wed, 19 Sep 2007 02:10:04 +0000 (02:10 +0000)
committererubow <erubow>
Wed, 19 Sep 2007 02:10:04 +0000 (02:10 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index d4ea42b7da4ec67116a7b7d34e12aeb5d7e8a815..9ef13b5a22d3fe5b785eee26bea953c8f17f984d 100644 (file)
@@ -227,7 +227,7 @@ objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
 int transCommit(transrecord_t *record); //return 0 if successful
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 void *handleLocalReq(void *);  //the C routine that the local m/c thread will execute 
-int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
+void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
 int transAbortProcess(void *, unsigned int *, int, int);
index 2d2935779bd7617dfab20172796f6e8248b7db94..2cea115cd032679706d0266c1cecca8c681710d1 100644 (file)
@@ -94,7 +94,6 @@ void *dstmListen()
                acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
                pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
        }
-       pthread_exit(NULL);
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
  * and accordingly calls other functions to process new requests */
@@ -109,15 +108,21 @@ void *dstmAccept(void *acceptfd)
        trans_commit_data_t transinfo;
        unsigned short objType;
        
+       transinfo.objlocked = NULL;
+       transinfo.objnotfound = NULL;
+       transinfo.modptr = NULL;
+       transinfo.numlocked = 0;
+       transinfo.numnotfound = 0;
+
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
        /* Receive control messages from other machines */
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
                if (retval == 0) {
-                       return; // Testing connection
+                       pthread_exit(NULL); // Testing connection
                }
                perror("Error in receiving control from coordinator\n");
-               return;
+               pthread_exit(NULL);
        }
        
        switch(control) {
@@ -125,7 +130,7 @@ void *dstmAccept(void *acceptfd)
                        /* Read oid requested and search if available */
                        if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
                                perror("Error receiving object from cooridnator\n");
-                               return NULL;
+                               pthread_exit(NULL);
                        }
                        if((srcObj = mhashSearch(oid)) == NULL) {
                                printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
@@ -138,7 +143,7 @@ void *dstmAccept(void *acceptfd)
                                ctrl = OBJECT_NOT_FOUND;
                                if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                        perror("Error sending control msg to coordinator\n");
-                                       return NULL;
+                                       pthread_exit(NULL);
                                }
                        } else {
                                /* Type */
@@ -146,11 +151,11 @@ void *dstmAccept(void *acceptfd)
                                *((int *)&msg[1])=size;
                                if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
                                        perror("Error sending size of object to coordinator\n");
-                                       return NULL;
+                                       pthread_exit(NULL);
                                }
                                if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
                                        perror("Error in sending object\n");
-                                       return NULL;
+                                       pthread_exit(NULL);
                                }
                        }
                        break;
@@ -172,14 +177,14 @@ void *dstmAccept(void *acceptfd)
                        printf("DEBUG -> Recv TRANS_REQUEST\n");
                        if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
                                printf("Error in readClientReq\n");
-                               return;
+                               pthread_exit(NULL);
                        }
                        break;
                case TRANS_PREFETCH:
                        printf("DEBUG -> Recv TRANS_PREFETCH\n");
                        if((val = prefetchReq((int)acceptfd)) != 0) {
                                printf("Error in readClientReq\n");
-                               return;
+                               pthread_exit(NULL);
                        }
                        break;
                case START_REMOTE_THREAD:
@@ -221,6 +226,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        objheader_t *headaddr;
        int sum = 0, i, N, n, val;
 
+       oidmod = NULL;
+
        /* Read fixed_data_t data structure */ 
        N = sizeof(fixed) - 1;
        ptr = (char *)&fixed;;
@@ -274,6 +281,11 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
 
        /* Create an array of oids for modified objects */
        oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
+       if (oidmod == NULL)
+       {
+               printf("calloc error %s, %d\n", __FILE__, __LINE__);
+               return 1;
+       }
        ptr = (char *) modptr;
        for(i = 0 ; i < fixed.nummod; i++) {
          int tmpsize;
@@ -416,7 +428,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        short version;
        char control = 0, *ptr;
        unsigned int oid;
-       unsigned int *oidnotfound, *oidlocked, *oidmod;
+       unsigned int *oidnotfound, *oidlocked;
        void *mobj;
        objheader_t *headptr;
 
@@ -452,7 +464,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found and number of oids not found for later use */
-                       //oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
                        oidnotfound[objnotfound] = oid;
                        objnotfound++;
                } else { /* If Obj found in machine (i.e. has not moved) */
index 0555e8c451e81aad07b993ec9dc64e26af416dee..38941ef5289dd3b9ea317bd5d3405844df4e098a 100644 (file)
@@ -27,12 +27,6 @@ void mcpileenqueue(prefetchpile_t *node) {
        prefetchpile_t *tmp, *prev;
        if(mcqueue.front == NULL && mcqueue.rear == NULL) {
                mcqueue.front = mcqueue.rear = node;
-               /*tmp = mcqueue.front = node;
-               while(tmp != NULL) {
-                       prev = tmp;
-                       tmp = tmp->next;
-               }
-               mcqueue.rear = prev;*/
        } else {
                tmp = mcqueue.rear->next = node;
                while(tmp != NULL) {
@@ -59,24 +53,6 @@ prefetchpile_t *mcpiledequeue(void) {
        return retnode;
 }
 
-/* Delete the node pointed to by the front ptr of the queue */
-void delnode() {
-       prefetchpile_t *delnode;
-       if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
-               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
-               return;
-       } else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) {
-               printf("TEST1\n");
-               free(mcqueue.front);
-               mcqueue.front = mcqueue.rear = NULL;
-       } else {
-               delnode = mcqueue.front;
-               mcqueue.front = mcqueue.front->next;
-               printf("TEST2\n");
-               free(delnode);
-       }
-}
-
 void mcpiledelete(void) {
        /* Remove each element */
        while(mcqueue.front != NULL)
index 93e206301e7c3a9925628db8eb07e60fcc400117..8291c5919e56e3f9c9407cc31d9fcc0050efa3e6 100644 (file)
@@ -30,7 +30,6 @@ typedef struct mcpileq {
 void mcpileqInit(void);
 void mcpileenqueue(prefetchpile_t *);
 prefetchpile_t *mcpiledequeue(void);
-void delnode();
 void mcpiledelete();
 void mcpiledisplay();
 void mcdealloc(prefetchpile_t *);
index 373de7492dfb8730f682987ffc5eb3793e96e076..7a8fbba392ade522054a244ba793079e28d7257a 100644 (file)
@@ -74,7 +74,6 @@ void queueDisplay() {
 
 void predealloc(prefetchqelem_t *node) {
        free(node);
-       node->next = NULL;
 }
 
 
index 2b0b5fbcaabe91aab98091984076db6bb75a42fb..c50a10b07ac1a381815df6b65b9b9bf6a1b4b6f0 100644 (file)
@@ -351,8 +351,8 @@ plistnode_t *createPiles(transrecord_t *record) {
 int transCommit(transrecord_t *record) {       
        unsigned int tot_bytes_mod, *listmid;
        plistnode_t *pile, *pile_ptr;
-       int i, rc, val;
-       int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
+       int i, j, rc, val;
+       int pilecount, offset, threadnum, trecvcount;
        char buffer[RECEIVE_BUFFER_SIZE],control;
        char transid[TID_LEN];
        trans_req_data_t *tosend;
@@ -362,6 +362,8 @@ int transCommit(transrecord_t *record) {
        char localstat = 0;
 
        do {
+               trecvcount = 0;
+               threadnum = 0;
 
                /* Look through all the objects in the transaction record and make piles 
                 * for each machine involved in the transaction*/
@@ -375,6 +377,7 @@ int transCommit(transrecord_t *record) {
                /* Create a list of machine ids(Participants) involved in transaction   */
                if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       free(record);
                        return 1;
                }               
                pListMid(pile, listmid);
@@ -395,6 +398,7 @@ int transCommit(transrecord_t *record) {
                        pthread_mutex_destroy(&tlock);
                        pDelete(pile_ptr);
                        free(listmid);
+                       free(record);
                        return 1;
                }
 
@@ -406,6 +410,7 @@ int transCommit(transrecord_t *record) {
                        pDelete(pile_ptr);
                        free(listmid);
                        free(thread_data_array);
+                       free(record);
                        return 1;
                }
 
@@ -429,6 +434,7 @@ int transCommit(transrecord_t *record) {
                                free(listmid);
                                free(thread_data_array);
                                free(ltdata);
+                               free(record);
                                return 1;
                        }
                        tosend->f.control = TRANS_REQUEST;
@@ -454,31 +460,35 @@ int transCommit(transrecord_t *record) {
                        thread_data_array[threadnum].rec = record;
                        /* If local do not create any extra connection */
                        if(pile->mid != myIpAddr) { /* Not local */
-                               rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
+                               rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
                                if(rc) {
                                        perror("Error in pthread create\n");
                                        pthread_cond_destroy(&tcond);
                                        pthread_mutex_destroy(&tlock);
                                        pDelete(pile_ptr);
                                        free(listmid);
+                                       for (i = 0; i < threadnum; i++)
+                                               free(thread_data_array[i].buffer);
                                        free(thread_data_array);
                                        free(ltdata);
-                                       free(tosend);
+                                       free(record);
                                        return 1;
                                }
                        } else { /*Local*/
                                ltdata->tdata = &thread_data_array[threadnum];
                                ltdata->transinfo = &transinfo;
-                               val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
+                               val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
                                if(val) {
                                        perror("Error in pthread create\n");
                                        pthread_cond_destroy(&tcond);
                                        pthread_mutex_destroy(&tlock);
                                        pDelete(pile_ptr);
                                        free(listmid);
+                                       for (i = 0; i < threadnum; i++)
+                                               free(thread_data_array[i].buffer);
                                        free(thread_data_array);
                                        free(ltdata);
-                                       free(tosend);
+                                       free(record);
                                        return 1;
                                }
                        }
@@ -499,9 +509,11 @@ int transCommit(transrecord_t *record) {
                                pthread_mutex_destroy(&tlock);
                                pDelete(pile_ptr);
                                free(listmid);
+                               for (j = i; j < pilecount; j++)
+                                       free(thread_data_array[j].buffer);
                                free(thread_data_array);
                                free(ltdata);
-                               free(tosend);
+                               free(record);
                                return 1;
                        }
                        free(thread_data_array[i].buffer);
@@ -511,13 +523,10 @@ int transCommit(transrecord_t *record) {
                /* Free resources */    
                pthread_cond_destroy(&tcond);
                pthread_mutex_destroy(&tlock);
-               if(listmid != NULL)
-                       free(listmid);
+               free(listmid);
                pDelete(pile_ptr);
-               if(thread_data_array != NULL)
-                       free(thread_data_array);
-               if(ltdata != NULL)
-                       free(ltdata);
+               free(thread_data_array);
+               free(ltdata);
 
                /* wait a random amount of time */
                if (treplyretry == 1)
@@ -525,7 +534,8 @@ int transCommit(transrecord_t *record) {
 
        /* Retry trans commit procedure if not sucessful in the first try */
        } while (treplyretry == 1);
-
+       
+       free(record);
        return 0;
 }
 
@@ -548,7 +558,6 @@ void *transRequest(void *threadarg) {
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST\n");
                pthread_exit(NULL);
-               return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
@@ -559,24 +568,24 @@ void *transRequest(void *threadarg) {
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST\n");
+               close(sd);
                pthread_exit(NULL);
-               return NULL;
        }
 
        printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
        /* Send bytes of data with TRANS_REQUEST control message */
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
                perror("Error sending fixed bytes for thread\n");
+               close(sd);
                pthread_exit(NULL);
-               return NULL;
        }
        /* Send list of machines involved in the transaction */
        {
                int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
                if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending list of machines for thread\n");
+                       close(sd);
                        pthread_exit(NULL);
-                       return NULL;
                }
        }
        /* Send oids and version number tuples for objects that are read */
@@ -584,8 +593,8 @@ void *transRequest(void *threadarg) {
                int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
                if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
                        perror("Error sending tuples for thread\n");
+                       close(sd);
                        pthread_exit(NULL);
-                       return NULL;
                }
        }
        /* Send objects that are modified */
@@ -596,16 +605,16 @@ void *transRequest(void *threadarg) {
                size+=sizeof(objheader_t);
                if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
                        perror("Error sending obj modified for thread\n");
+                       close(sd);
                        pthread_exit(NULL);
-                       return NULL;
                }
        }
 
        /* Read control message from Participant */
        if((n = read(sd, &control, sizeof(char))) <= 0) {
                perror("Error in reading control message from Participant\n");
+               close(sd);
                pthread_exit(NULL);
-               return NULL;
        }
        recvcontrol = control;
 
@@ -620,12 +629,7 @@ void *transRequest(void *threadarg) {
 
        /* Wake up the threads and invoke decideResponse (once) */
        if(*(tdata->count) == tdata->buffer->f.mcount) {
-               if (decideResponse(tdata) != 0) { 
-                       printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(tdata->lock);
-                       pthread_exit(NULL);
-                       return NULL;
-               }
+               decideResponse(tdata); 
                pthread_cond_broadcast(tdata->threshold);
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
@@ -636,9 +640,8 @@ void *transRequest(void *threadarg) {
         * to all participants in their respective socket */
        if (sendResponse(tdata, sd) == 0) { 
                printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-               pthread_mutex_unlock(tdata->lock);
+               close(sd);
                pthread_exit(NULL);
-               return NULL;
        }
 
        /* Close connection */
@@ -648,7 +651,7 @@ void *transRequest(void *threadarg) {
 
 /* This function decides the reponse that needs to be sent to 
  * all Participant machines after the TRANS_REQUEST protocol */
-int decideResponse(thread_data_array_t *tdata) {
+void decideResponse(thread_data_array_t *tdata) {
        char control;
        int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                         message to send */
@@ -657,6 +660,9 @@ int decideResponse(thread_data_array_t *tdata) {
                control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
                                                           written onto the shared array */
                switch(control) {
+                       default:
+                               printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+                               /* treat as disagree, pass thru */
                        case TRANS_DISAGREE:
                                transdisagree++;
                                break;
@@ -668,9 +674,6 @@ int decideResponse(thread_data_array_t *tdata) {
                        case TRANS_SOFT_ABORT:
                                transsoftabort++;
                                break;
-                       default:
-                               printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
-                               return -1;
                }
        }
 
@@ -680,23 +683,19 @@ int decideResponse(thread_data_array_t *tdata) {
                /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
-               free(tdata->rec);
        } else if(transagree == tdata->buffer->f.mcount){
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
                /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
-               free(tdata->rec);
-       } else if(transsoftabort > 0 && transdisagree == 0) {
+       } else { /* (transsoftabort > 0 && transdisagree == 0) */
                /* Send Abort in soft abort case followed by retry commiting transaction again*/
                *(tdata->replyctrl) = TRANS_ABORT;
                *(tdata->replyretry) = 1;
-       } else {
-               return -1;
        }
 
-       return 0;
+       return;
 }
 /* This function sends the final response to remote machines per thread in their respective socket id */
 char sendResponse(thread_data_array_t *tdata, int sd) {
@@ -837,7 +836,7 @@ void *handleLocalReq(void *threadarg) {
        if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
                printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                pthread_mutex_unlock(&mainobjstore_mutex);
-               return NULL;
+               pthread_exit(NULL);
        }
        pthread_mutex_unlock(&mainobjstore_mutex);
        /* Write modified objects into the mainobject store */
@@ -946,11 +945,7 @@ void *handleLocalReq(void *threadarg) {
 
        /* Wake up the threads and invoke decideResponse (once) */
        if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
-               if (decideResponse(localtdata->tdata) != 0) { 
-                       printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(localtdata->tdata->lock);
-                       return NULL;
-               }
+               decideResponse(localtdata->tdata); 
                pthread_cond_broadcast(localtdata->tdata->threshold);
        } else {
                pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
@@ -961,12 +956,12 @@ void *handleLocalReq(void *threadarg) {
        if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
                if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
                        printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
-                       return NULL;
+                       pthread_exit(NULL);
                }
        }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
                if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
                        printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
-                       return NULL;
+                       pthread_exit(NULL);
                }
        }
 
@@ -1328,7 +1323,7 @@ void *transPrefetch(void *t) {
                /* dequeue node to create a machine piles and  finally unlock mutex */
                if((qnode = pre_dequeue()) == NULL) {
                        printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
-                       return NULL;
+                       pthread_exit(NULL);
                }
                pthread_mutex_unlock(&pqueue.qlock);
                /* Reduce redundant prefetch requests */
@@ -1371,7 +1366,7 @@ void *mcqProcess(void *threadid) {
                /* Dequeue node to send remote machine connections*/
                if((mcpilenode = mcpiledequeue()) == NULL) {
                        printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
-                       return NULL;
+                       pthread_exit(NULL);
                }
                /* Unlock mutex */
                pthread_mutex_unlock(&mcqueue.qlock);
@@ -1409,6 +1404,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        /* Open Connection */
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST\n");
+               close(sd);
                return;
        }
 
@@ -1416,6 +1412,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        control = TRANS_PREFETCH;
        if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                perror("Error in sending prefetch control\n");
+               close(sd);
                return;
        }
 
@@ -1436,6 +1433,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
                }
                if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
                        perror("Error sending fixed bytes for thread\n");
+                       close(sd);
                        return;
                }
                tmp = tmp->next;
@@ -1445,6 +1443,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        endpair = -1;
        if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
                perror("Error sending fixed bytes for thread\n");
+               close(sd);
                return;
        }