X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.cc;h=d9dbf916156bcff2c151c7f48da9160515470f93;hp=a3083ee61ed99a69cd68560f6730a5860e7b1b4e;hb=9c3fa5cbce287df14626d262bd0179e994338869;hpb=3e54762d57367b1ce049830b42f00950055d8527 diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index a3083ee..d9dbf91 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -6,7 +6,8 @@ #include "TransactionStatus.h" Transaction::Transaction() : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), partsPendingSend(new Vector()), fldisComplete(false), @@ -18,12 +19,14 @@ Transaction::Transaction() : clientLocalSequenceNumber(-1), arbitratorId(-1), machineId(-1), - transactionId(NULL), + transactionId(Pair(0,0)), hadServerFailure(false) { } void Transaction::addPartEncode(TransactionPart *newPart) { - parts->put(newPart->getPartNumber(), newPart); + TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); + if (old == NULL) + partCount++; partsPendingSend->add(newPart->getPartNumber()); sequenceNumber = newPart->getSequenceNumber(); @@ -48,11 +51,13 @@ void Transaction::addPartDecode(TransactionPart *newPart) { clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); machineId = newPart->getMachineId(); - TransactionPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); + TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; - if (previoslySeenPart != NULL) { + if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart->setDead(); + previouslySeenPart->setDead(); } else if (newPart->isLastPart()) { missingParts = new Hashset(); hasLastPart = true; @@ -97,8 +102,10 @@ int64_t Transaction::getSequenceNumber() { void Transaction::setSequenceNumber(int64_t _sequenceNumber) { sequenceNumber = _sequenceNumber; - for (int32_t i : parts->keySet()) { - parts->get(i)->setSequenceNumber(sequenceNumber); + for (int32_t i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + if (tp != NULL) + tp->setSequenceNumber(sequenceNumber); } } @@ -106,7 +113,7 @@ int64_t Transaction::getClientLocalSequenceNumber() { return clientLocalSequenceNumber; } -Hashtable *Transaction::getParts() { +Vector *Transaction::getParts() { return parts; } @@ -152,8 +159,23 @@ TransactionStatus *Transaction::getTransactionStatus() { void Transaction::removeSentParts(Vector *sentParts) { nextPartToSend = 0; - if (partsPendingSend->removeAll(sentParts)) - { + bool changed = false; + uint lastusedindex = 0; + for (uint i = 0; i < partsPendingSend->size(); i++) { + int32_t parti = partsPendingSend->get(i); + for (uint j = 0; j < sentParts->size(); j++) { + int32_t partj = sentParts->get(j); + if (parti == partj) { + changed = true; + goto NextElement; + } + } + partsPendingSend->set(lastusedindex++, parti); +NextElement: + ; + } + if (changed) { + partsPendingSend->setSize(lastusedindex); flddidSendAPartToServer = true; transactionStatus->setTransactionSequenceNumber(sequenceNumber); } @@ -168,7 +190,7 @@ Hashset *Transaction::getKeyValueUpdateSet() { } int Transaction::getNumberOfParts() { - return parts->size(); + return partCount; } int64_t Transaction::getMachineId() { @@ -183,23 +205,20 @@ bool Transaction::isComplete() { return fldisComplete; } -Pair Transaction::getId() { - return transactionId; +Pair * Transaction::getId() { + return & transactionId; } void Transaction::setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts->keySet()) { - TransactionPart *part = parts->get(partNumber); - part->setDead(); + if (!isDead) { + // Set dead + isDead = true; + // Make all the parts of this transaction dead + for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + TransactionPart *part = parts->get(partNumber); + if (part != NULL) + part->setDead(); + } } } @@ -208,10 +227,9 @@ TransactionPart *Transaction::getPart(int index) { } void Transaction::decodeTransactionData() { - // Calculate the size of the data section int dataSize = 0; - for (int i = 0; i < parts->keySet()->size(); i++) { + for (int i = 0; i < parts->size(); i++) { TransactionPart *tp = parts->get(i); dataSize += tp->getDataSize(); } @@ -220,7 +238,7 @@ void Transaction::decodeTransactionData() { int currentPosition = 0; // Stitch all the data sections together - for (int i = 0; i < parts->keySet()->size(); i++) { + for (int i = 0; i < parts->size(); i++) { TransactionPart *tp = parts->get(i); System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); currentPosition += tp->getDataSize(); @@ -247,8 +265,9 @@ void Transaction::decodeTransactionData() { } bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue *kvGuard : keyValueGuardSet) { - + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); // First check if the key is in the speculative table, this is the value of the latest assumption KeyValue *kv = NULL; @@ -277,15 +296,17 @@ bool Transaction::evaluateGuard(Hashtable *committedKey } else { printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray()); } - + delete kvit; return false; } } else { if (kv != NULL) { + delete kvit; return false; } } } + delete kvit; return true; }