Fix tabbing.... Please fix your editors so they do tabbing correctly!!! (Spaces...
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 81e821487d4cebd9c3727fbb01868d76e8d1cc8b..77a6c823021bb641a636e495d77f03873a8a4b79 100644 (file)
@@ -29,9 +29,9 @@
 char bigarray[16*1024*1024];
 int bigindex=0;
 #define LOGEVENT(x) { \
-    int tmp=bigindex++;                                \
-    bigarray[tmp]=x;                           \
-  }
+    int tmp=bigindex++;                         \
+    bigarray[tmp]=x;                            \
+}
 #else
 #define LOGEVENT(x)
 #endif
@@ -44,14 +44,14 @@ unsigned int bigarray3[6*1024*1024];
 long long bigarray4[6*1024*1024];
 int bigarray5[6*1024*1024];
 int bigindex1=0;
-#define LOGTIME(x,y,z,a,b) {\
-  int tmp=bigindex1; \
-  bigarray1[tmp]=x; \
-  bigarray2[tmp]=y; \
-  bigarray3[tmp]=z; \
-  bigarray4[tmp]=a; \
-  bigarray5[tmp]=b; \
-  bigindex1++; \
+#define LOGTIME(x,y,z,a,b) { \
+    int tmp=bigindex1; \
+    bigarray1[tmp]=x; \
+    bigarray2[tmp]=y; \
+    bigarray3[tmp]=z; \
+    bigarray4[tmp]=a; \
+    bigarray5[tmp]=b; \
+    bigindex1++; \
 }
 #else
 #define LOGTIME(x,y,z,a,b)
@@ -171,7 +171,7 @@ void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int bu
 int recvw(int fd, void *buf, int len, int flags) {
   return recv(fd, buf, len, flags);
 }
+
 void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
   char *buf=(char *)buffer;
   int numbytes=readbuffer->head-readbuffer->tail;
@@ -190,11 +190,11 @@ void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buf
     recv_data(fd, buf, buflen);
     return;
   }
-  
+
   int maxbuf=MAXBUF;
   int obufflen=buflen;
   readbuffer->head=0;
-  
+
   while (buflen > 0) {
     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
     if (numbytes == -1) {
@@ -232,7 +232,7 @@ int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer
   int maxbuf=MAXBUF;
   int obufflen=buflen;
   readbuffer->head=0;
-  
+
   while (buflen > 0) {
     int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
     if (numbytes ==0) {
@@ -302,7 +302,7 @@ void printhex(unsigned char *ptr, int numBytes) {
 
 inline int arrayLength(int *array) {
   int i;
-  for(i=0 ; array[i] != -1; i++)
+  for(i=0; array[i] != -1; i++)
     ;
   return i;
 }
@@ -331,60 +331,60 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   int attempted=0;
   char *node;
   do {
-  node=getmemory(qnodesize);
-  if (node==NULL&&attempted)
-    break;
-  if (node!=NULL) {
+    node=getmemory(qnodesize);
+    if (node==NULL&&attempted)
+      break;
+    if (node!=NULL) {
 #else
   char *node=getmemory(qnodesize);
 #endif
-  int top=endoffsets[ntuples-1];
+      int top=endoffsets[ntuples-1];
 
-  if (node==NULL) {
-    LOGEVENT('D');
-    return;
-  }
-  /* Set queue node values */
+      if (node==NULL) {
+       LOGEVENT('D');
+       return;
+      }
+      /* Set queue node values */
 
-  /* TODO: Remove this after testing */
-  evalPrefetch[siteid].callcount++;
+      /* TODO: Remove this after testing */
+      evalPrefetch[siteid].callcount++;
 
-  *((int *)(node))=siteid;
-  *((int *)(node + sizeof(int))) = ntuples;
-  len = 2*sizeof(int);
-  memcpy(node+len, oids, ntuples*sizeof(unsigned int));
-  memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
-  memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+      *((int *)(node))=siteid;
+      *((int *)(node + sizeof(int))) = ntuples;
+      len = 2*sizeof(int);
+      memcpy(node+len, oids, ntuples*sizeof(unsigned int));
+      memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
+      memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
 
 #ifdef INLINEPREFETCH
-  movehead(qnodesize);
-  }
-  int numpref=numavailable();
-  attempted=1;
+      movehead(qnodesize);
+    }
+    int numpref=numavailable();
+    attempted=1;
+
+    if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
+      node=gettail();
+      prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
+      if (pilehead!=NULL) {
+       // Get sock from shared pool
+
+       /* Send  Prefetch Request */
+       prefetchpile_t *ptr = pilehead;
+       while(ptr != NULL) {
+         globalid++;
+         int sd = getSock2(transPrefetchSockPool, ptr->mid);
+         sendPrefetchReq(ptr, sd, globalid);
+         ptr = ptr->next;
+       }
 
-  if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
-    node=gettail();
-    prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
-    if (pilehead!=NULL) {
-      // Get sock from shared pool
-      
-      /* Send  Prefetch Request */
-      prefetchpile_t *ptr = pilehead;
-      while(ptr != NULL) {
-        globalid++;
-        int sd = getSock2(transPrefetchSockPool, ptr->mid);
-        sendPrefetchReq(ptr, sd, globalid);
-        ptr = ptr->next;
+       mcdealloc(pilehead);
       }
-      
-      mcdealloc(pilehead);
-    }
-    resetqueue();
-  }//end do prefetch if condition
+      resetqueue();
+    } //end do prefetch if condition
   } while(node==NULL);
 #else
-  /* Lock and insert into primary prefetch queue */
-  movehead(qnodesize);
+      /* Lock and insert into primary prefetch queue */
+      movehead(qnodesize);
 #endif
 }
 
@@ -397,7 +397,7 @@ int dstmStartup(const char * option) {
   int udpfd;
 
   if (processConfigFile() != 0)
-    return 0; //TODO: return error value, cause main program to exit
+    return 0;  //TODO: return error value, cause main program to exit
 #ifdef COMPILER
   if (!master)
     threadcount--;
@@ -562,22 +562,22 @@ void transStart() {
 #endif
 }
 
-// Search for an address for a given oid                                                                               
+// Search for an address for a given oid
 /*#define INLINE    inline __attribute__((always_inline))
 
-INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
-  //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
-  chashlistnode_t *node = &table->table[(key & table->mask)>>1];
+   INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
+   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE
+   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
 
-  do {
+   do {
     if(node->key == key) {
       return node->val;
     }
     node = node->next;
-  } while(node != NULL);
+   } while(node != NULL);
 
-  return NULL;
-  }*/
+   return NULL;
+   }*/
 
 
 
@@ -595,36 +595,36 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
   if(oid == 0) {
     return NULL;
   }
-  
+
 
   node= &c_table[(oid & c_mask)>>1];
   do {
     if(node->key == oid) {
 #ifdef TRANSSTATS
-    nchashSearch++;
+      nchashSearch++;
 #endif
 #ifdef COMPILER
-    return &((objheader_t*)node->val)[1];
+      return &((objheader_t*)node->val)[1];
 #else
-    return node->val;
+      return node->val;
 #endif
     }
     node = node->next;
   } while(node != NULL);
-  
 
-  /*  
-  if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
-#ifdef TRANSSTATS
-    nchashSearch++;
-#endif
-#ifdef COMPILER
-    return &objheader[1];
-#else
-    return objheader;
-#endif
-  } else 
-  */
+
+  /*
+     if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
+   #ifdef TRANSSTATS
+     nchashSearch++;
+   #endif
+   #ifdef COMPILER
+     return &objheader[1];
+   #else
+     return objheader;
+   #endif
+     } else
+   */
 
 #ifdef ABORTREADERS
   if (t_abort) {
@@ -659,9 +659,9 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
       if(STATUS(tmp) & DIRTY) {
 #ifdef TRANSSTATS
-        ndirtyCacheObj++;
+       ndirtyCacheObj++;
 #endif
-        goto remoteread;
+       goto remoteread;
       }
 #ifdef TRANSSTATS
       nprehashSearch++;
@@ -703,10 +703,10 @@ remoteread:
       int size;
       GETSIZE(size, objcopy);
       if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
-        printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
-            __FILE__, __LINE__);
-        pthread_mutex_unlock(&prefetchcache_mutex);
-        return NULL;
+       printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+              __FILE__, __LINE__);
+       pthread_mutex_unlock(&prefetchcache_mutex);
+       return NULL;
       }
       pthread_mutex_unlock(&prefetchcache_mutex);
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
@@ -765,9 +765,9 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
       if(STATUS(tmp) & DIRTY) {
 #ifdef TRANSSTATS
-        ndirtyCacheObj++;
+       ndirtyCacheObj++;
 #endif
-        goto remoteread;
+       goto remoteread;
       }
 #ifdef TRANSSTATS
       LOGEVENT('P')
@@ -796,8 +796,8 @@ remoteread:
     }
     objcopy = getRemoteObj(machinenumber, oid);
 #ifdef TRANSSTATS
-      LOGEVENT('R');
-      nRemoteSend++;
+    LOGEVENT('R');
+    nRemoteSend++;
 #endif
 
     if(objcopy == NULL) {
@@ -814,10 +814,10 @@ remoteread:
       int size;
       GETSIZE(size, objcopy);
       if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
-        printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
-            __FILE__, __LINE__);
-        pthread_mutex_unlock(&prefetchcache_mutex);
-        return NULL;
+       printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+              __FILE__, __LINE__);
+       pthread_mutex_unlock(&prefetchcache_mutex);
+       return NULL;
       }
       pthread_mutex_unlock(&prefetchcache_mutex);
       memcpy(headerObj, objcopy, size+sizeof(objheader_t));
@@ -868,7 +868,7 @@ plistnode_t *createPiles() {
   /* Represents number of bins in the chash table */
   unsigned int size = c_size;
 
-  for(i = 0; i < size ; i++) {
+  for(i = 0; i < size; i++) {
     chashlistnode_t * curr = &ptr[i];
     /* Inner loop to traverse the linked list of the cache lookupTable */
     while(curr != NULL) {
@@ -903,8 +903,8 @@ plistnode_t *createPiles() {
   /* Represents number of bins in the chash table */
   unsigned int size = c_size;
 
-  for(i = 0; i < size ; i++) {
-    struct chashentry * curr = & ptr[i];
+  for(i = 0; i < size; i++) {
+    struct chashentry * curr = &ptr[i];
     /* Inner loop to traverse the linked list of the cache lookupTable */
     //if the first bin in hash table is empty
     if(curr->key == 0)
@@ -947,7 +947,7 @@ int transCommit() {
 
 #ifdef LOGEVENTS
   int iii;
-  for(iii=0;iii<bigindex;iii++) {
+  for(iii=0; iii<bigindex; iii++) {
     printf("%c", bigarray[iii]);
   }
 #endif
@@ -973,7 +973,7 @@ int transCommit() {
   int treplyretryCount = 0;
   /* Initialize timeout for exponential delay */
   exponential_backoff.tv_sec = 0;
-  exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+  exponential_backoff.tv_nsec = (long)(10000); //10 microsec
   count_exponential_backoff = 0;
   do {
     treplyretry = 0;
@@ -1001,7 +1001,7 @@ int transCommit() {
     int socklist[pilecount];
     char getReplyCtrl[pilecount];
     int loopcount;
-    for(loopcount = 0 ; loopcount < pilecount; loopcount++){
+    for(loopcount = 0; loopcount < pilecount; loopcount++) {
       socklist[loopcount] = 0;
       getReplyCtrl[loopcount] = 0;
     }
@@ -1055,7 +1055,7 @@ int transCommit() {
        }
        int offset = 0;
        int i;
-       for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
+       for(i = 0; i < tosend[sockindex].f.nummod; i++) {
          int size;
          objheader_t *headeraddr;
          if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
@@ -1065,7 +1065,7 @@ int transCommit() {
            free(tosend);
            return 1;
          }
-      GETSIZE(size,headeraddr);
+         GETSIZE(size,headeraddr);
          size+=sizeof(objheader_t);
          memcpy(modptr+offset, headeraddr, size);
          offset+=size;
@@ -1079,7 +1079,7 @@ int transCommit() {
       pile = pile->next;
     } //end of pile processing
 
-      /* Recv Ctrl msgs from all machines */
+    /* Recv Ctrl msgs from all machines */
     int i;
     for(i = 0; i < pilecount; i++) {
       int sd = socklist[i];
@@ -1115,8 +1115,8 @@ int transCommit() {
            GETSIZE(size, header);
            size += sizeof(objheader_t);
            //make an entry in prefetch hash table
-        prehashInsert(oidToPrefetch, header);
-        LOGEVENT('E');
+           prehashInsert(oidToPrefetch, header);
+           LOGEVENT('E');
            length = length - size;
            offset += size;
          }
@@ -1195,9 +1195,9 @@ int transCommit() {
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
       treplyretryCount++;
-     // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
-     //   exponentialdelay();
-     // else 
+      // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+      //   exponentialdelay();
+      // else
       randomdelay();
 #ifdef TRANSSTATS
       nSoftAbort++;
@@ -1215,7 +1215,7 @@ int transCommit() {
     objstrDelete(t_cache);
     t_chashDelete();
 #ifdef SANDBOX
-      abortenabled=1;
+    abortenabled=1;
 #endif
     return TRANS_ABORT;
   } else if(finalResponse == TRANS_COMMIT) {
@@ -1283,7 +1283,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
   transinfo->modptr = NULL;
   transinfo->numlocked = numoidlocked;
   transinfo->numnotfound = numoidnotfound;
-  
+
   /* Condition to send TRANS_AGREE */
   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
     *getReplyCtrl = TRANS_AGREE;
@@ -1325,14 +1325,14 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
 char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                   message to send */
-  for (i = 0 ; i < pilecount; i++) {
+  for (i = 0; i < pilecount; i++) {
     char control;
     control = getReplyCtrl[i];
     switch(control) {
     default:
       printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
 
-      /* treat as disagree, pass thru */
+    /* treat as disagree, pass thru */
     case TRANS_DISAGREE:
       transdisagree++;
       break;
@@ -1603,14 +1603,14 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
   int j;
   prefetchpile_t * head=NULL;
 
-  for(j=0;j<numprefetches;j++) {
+  for(j=0; j<numprefetches; j++) {
     int siteid = *(GET_SITEID(ptr));
     int ntuples = *(GET_NTUPLES(ptr));
     unsigned int * oidarray = GET_PTR_OID(ptr);
     unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
     short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
     int numLocal = 0;
-    
+
     for(i=0; i<ntuples; i++) {
       unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
       unsigned short endindex=endoffsets[i];
@@ -1626,10 +1626,10 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
       //Look up fields locally
       int isLastOffset=0;
       if(endindex==0)
-          isLastOffset=1;
+       isLastOffset=1;
       for(newbase=baseindex; newbase<endindex; newbase++) {
-        if(newbase==(endindex-1))
-          isLastOffset=1;
+       if(newbase==(endindex-1))
+         isLastOffset=1;
        if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
          break;
        }
@@ -1649,10 +1649,10 @@ prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
       //Add to remote requests
       machinenum=lhashSearch(oid);
       insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
-    tuple:
+tuple:
       ;
     }
-    
+
     /* handle dynamic prefetching */
     handleDynPrefetching(numLocal, ntuples, siteid);
     ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
@@ -1685,11 +1685,11 @@ int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
     ;
   } else if ((header=prehashSearch(*oid))!=NULL) {
     //Found in cache
-    if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+    if(STATUS(header) & DIRTY) { //Read an oid that is an old entry in the cache;
       //only once because later old entries may still cause unnecessary roundtrips during prefetching
       (*countInvalidObj)+=1;
       if(*countInvalidObj > 1) {
-        return 0;
+       return 0;
       }
     }
   } else {
@@ -1731,10 +1731,10 @@ void *transPrefetch(void *t) {
       /* Send  Prefetch Request */
       prefetchpile_t *ptr = pilehead;
       while(ptr != NULL) {
-        globalid++;
-        int sd = getSock2(transPrefetchSockPool, ptr->mid);
-        sendPrefetchReq(ptr, sd,globalid);
-        ptr = ptr->next;
+       globalid++;
+       int sd = getSock2(transPrefetchSockPool, ptr->mid);
+       sendPrefetchReq(ptr, sd,globalid);
+       ptr = ptr->next;
       }
 
       /* Release socket */
@@ -1801,7 +1801,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
     char *buf=oidnoffset;
     if (first) {
       *buf=TRANS_PREFETCH;
-      buf++;len++;
+      buf++; len++;
       first=0;
     }
     *((int*)buf) = tmp->numoffset;
@@ -1870,7 +1870,7 @@ int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
     if((oldptr = prehashSearch(oid)) != NULL) {
       /* If older version then update with new object ptr */
       if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
-        prehashInsert(oid, modptr);
+       prehashInsert(oid, modptr);
       }
     } else { /* Else add the object ptr to hash table*/
       prehashInsert(oid, modptr);
@@ -2150,14 +2150,14 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
     /* Send array of oids  */
     size = sizeof(unsigned int);
 
-    for(i = 0;i < numoid; i++) {
+    for(i = 0; i < numoid; i++) {
       oid = oidarry[i];
       *((unsigned int *)(&msg[1] + size)) = oid;
       size += sizeof(unsigned int);
     }
 
     /* Send array of version  */
-    for(i = 0;i < numoid; i++) {
+    for(i = 0; i < numoid; i++) {
       version = versionarry[i];
       *((unsigned short *)(&msg[1] + size)) = version;
       size += sizeof(unsigned short);
@@ -2345,22 +2345,22 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
 
 // relocate the position of myIp pile to end of list
 plistnode_t *sortPiles(plistnode_t *pileptr) {
-       plistnode_t *ptr, *tail;
-       tail = pileptr;
+  plistnode_t *ptr, *tail;
+  tail = pileptr;
   ptr = NULL;
-       /* Get tail pointer and myIp pile ptr */
+  /* Get tail pointer and myIp pile ptr */
   if(pileptr == NULL)
     return pileptr;
 
-       while(tail->next != NULL) {
+  while(tail->next != NULL) {
     if(tail->mid == myIpAddr)
       ptr = tail;
-               tail = tail->next;    
-       }
+    tail = tail->next;
+  }
 
   // if ptr is null, then myIp pile is already at tail
   if(ptr != NULL) {
-       /* Arrange local machine processing at the end of the pile list */
+    /* Arrange local machine processing at the end of the pile list */
     tail->next = pileptr;
     pileptr = ptr->next;
     ptr->next = NULL;