__thread struct timespec exponential_backoff;
__thread int count_exponential_backoff;
__thread const int max_exponential_backoff = 1000; // safety limit
+#ifdef SANDBOX
__thread int trans_allocation_bytes;
+#endif
#ifdef ABORTREADERS
plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
-
-
/*******************************
* Send and Recv function calls
*******************************/
return max;
}
-//#define INLINEPREFETCH
+#define INLINEPREFETCH
#define PREFTHRESHOLD 0
/* This function is a prefetch call generated by the compiler that
t_cache = objstrCreate(1048576);
t_chashCreate(CHASH_SIZE, CLOADFACTOR);
revertlist=NULL;
+#ifdef SANDBOX
trans_allocation_bytes = 0;
+#endif
#ifdef ABORTREADERS
t_abort=0;
#endif
tmp->rcount = 1;
STATUS(tmp) = NEW;
t_chashInsert(OID(tmp), tmp);
+#ifdef SANDBOX
trans_allocation_bytes += size;
/* Validate the read set if allocation is exceeds threshold */
if(trans_allocation_bytes > MEM_ALLOC_THRESHOLD) {
check_mem_alloc();
}
+#endif
#ifdef COMPILER
return &tmp[1]; //want space after object header
* Sends a transrequest() to each remote machines for objects found remotely
* and calls handleLocalReq() to process objects found locally */
int transCommit() {
+ //char buffer[30];
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
char treplyretry; /* keeps track of the common response that needs to be sent */
#ifdef SANDBOX
abortenabled=0;
#endif
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
#ifdef LOGEVENTS
int iii;
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
- send_data(sd, tosend[sockindex].listmid, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
- send_data(sd, tosend[sockindex].objread, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
}
/* Send objects that are modified */
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
free(modptr);
} else { //handle request locally
handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
sockindex++;
pile = pile->next;
} //end of pile processing
+
/* Recv Ctrl msgs from all machines */
int i;
for(i = 0; i < pilecount; i++) {
#endif
}
}
+
/* Decide the final response */
if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
return 1;
}
+#ifdef CACHE
+ if (finalResponse == TRANS_COMMIT) {
+ /* Invalidate objects in other machine cache */
+ int retval;
+ if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
+#endif
/* Send responses to all machines */
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
free(listmid);
return 1;
}
-
-
- /* Invalidate objects in other machine cache */
- if(tosend[i].f.nummod > 0) {
- if((retval = invalidateObj(&(tosend[i]))) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
- return 1;
- }
- }
#ifdef ABORTREADERS
removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
treplyretryCount++;
- if(treplyretryCount >= NUM_TRY_TO_COMMIT)
- exponentialdelay();
- else
- randomdelay();
+ // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+ // exponentialdelay();
+ // else
+ randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
#endif
/* Retry trans commit procedure during soft_abort case */
} while (treplyretry);
- exponential_backoff.tv_sec = 0;
- exponential_backoff.tv_nsec = (long)(10000);//10 microsec_
-
if(finalResponse == TRANS_ABORT) {
#ifdef TRANSSTATS
LOGEVENT('A');
transinfo->modptr = NULL;
transinfo->numlocked = numoidlocked;
transinfo->numnotfound = numoidnotfound;
-
+
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
*getReplyCtrl = TRANS_AGREE;
return;
}
} else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(tdata->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
if(transComProcess(tdata, transinfo) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
fflush(stdout);
int sock;
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned int)];
- int bytesSent;
+ //int bytesSent;
int status;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
struct sockaddr_in remoteAddr;
char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
char *ptr;
- int bytesSent;
+ //int bytesSent;
int status, size;
unsigned short version;
unsigned int oid,mid;
unsigned int mid;
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
- int sock, status, size, bytesSent;
+ int sock, status, size;
+ //int bytesSent;
while(*head != NULL) {
ptr = *head;