X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTransaction.cc;h=d316aa10985723556b2c7c2ccc3c9ab10696d854;hp=b5b8e5074ad6a5b5a682a41dd933d30264bf24f3;hb=793c445e17d24c146e7e820a5ecb5f02a5e00ea3;hpb=18c08ca8d6f4aed6fa24003826304cd5700f7b7d diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index b5b8e50..d316aa1 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -6,7 +6,8 @@ #include "TransactionStatus.h" Transaction::Transaction() : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), partsPendingSend(new Vector()), fldisComplete(false), @@ -18,12 +19,17 @@ Transaction::Transaction() : clientLocalSequenceNumber(-1), arbitratorId(-1), machineId(-1), - transactionId(NULL), + transactionId(Pair(0,0)), + nextPartToSend(0), + flddidSendAPartToServer(false), + transactionStatus(NULL), hadServerFailure(false) { } void Transaction::addPartEncode(TransactionPart *newPart) { - parts->put(newPart->getPartNumber(), newPart); + TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); + if (old == NULL) + partCount++; partsPendingSend->add(newPart->getPartNumber()); sequenceNumber = newPart->getSequenceNumber(); @@ -31,7 +37,6 @@ void Transaction::addPartEncode(TransactionPart *newPart) { transactionId = newPart->getTransactionId(); clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); machineId = newPart->getMachineId(); - fldisComplete = true; } @@ -48,11 +53,13 @@ void Transaction::addPartDecode(TransactionPart *newPart) { clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); machineId = newPart->getMachineId(); - TransactionPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); + TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if (previouslySeenPart == NULL) + partCount++; - if (previoslySeenPart != NULL) { + if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart->setDead(); + previouslySeenPart->setDead(); } else if (newPart->isLastPart()) { missingParts = new Hashset(); hasLastPart = true; @@ -97,8 +104,10 @@ int64_t Transaction::getSequenceNumber() { void Transaction::setSequenceNumber(int64_t _sequenceNumber) { sequenceNumber = _sequenceNumber; - for (int32_t i : parts->keySet()) { - parts->get(i)->setSequenceNumber(sequenceNumber); + for (uint32_t i = 0; i < parts->size(); i++) { + TransactionPart *tp = parts->get(i); + if (tp != NULL) + tp->setSequenceNumber(sequenceNumber); } } @@ -106,7 +115,7 @@ int64_t Transaction::getClientLocalSequenceNumber() { return clientLocalSequenceNumber; } -Hashtable *Transaction::getParts() { +Vector *Transaction::getParts() { return parts; } @@ -152,8 +161,23 @@ TransactionStatus *Transaction::getTransactionStatus() { void Transaction::removeSentParts(Vector *sentParts) { nextPartToSend = 0; - if (partsPendingSend->removeAll(sentParts)) - { + bool changed = false; + uint lastusedindex = 0; + for (uint i = 0; i < partsPendingSend->size(); i++) { + int32_t parti = partsPendingSend->get(i); + for (uint j = 0; j < sentParts->size(); j++) { + int32_t partj = sentParts->get(j); + if (parti == partj) { + changed = true; + goto NextElement; + } + } + partsPendingSend->set(lastusedindex++, parti); +NextElement: + ; + } + if (changed) { + partsPendingSend->setSize(lastusedindex); flddidSendAPartToServer = true; transactionStatus->setTransactionSequenceNumber(sequenceNumber); } @@ -168,7 +192,7 @@ Hashset *Transaction::getKeyValueUpdateSet() { } int Transaction::getNumberOfParts() { - return parts->size(); + return partCount; } int64_t Transaction::getMachineId() { @@ -184,22 +208,19 @@ bool Transaction::isComplete() { } Pair *Transaction::getId() { - return transactionId; + return &transactionId; } void Transaction::setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber : parts->keySet()) { - TransactionPart *part = parts->get(partNumber); - part->setDead(); + if (!isDead) { + // Set dead + isDead = true; + // Make all the parts of this transaction dead + for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) { + TransactionPart *part = parts->get(partNumber); + if (part != NULL) + part->setDead(); + } } } @@ -208,10 +229,9 @@ TransactionPart *Transaction::getPart(int index) { } void Transaction::decodeTransactionData() { - // Calculate the size of the data section int dataSize = 0; - for (int i = 0; i < parts->keySet()->size(); i++) { + for (uint i = 0; i < parts->size(); i++) { TransactionPart *tp = parts->get(i); dataSize += tp->getDataSize(); } @@ -220,7 +240,7 @@ void Transaction::decodeTransactionData() { int currentPosition = 0; // Stitch all the data sections together - for (int i = 0; i < parts->keySet()->size(); i++) { + for (uint i = 0; i < parts->size(); i++) { TransactionPart *tp = parts->get(i); System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); currentPosition += tp->getDataSize(); @@ -246,9 +266,10 @@ void Transaction::decodeTransactionData() { } } -bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { - for (KeyValue *kvGuard : keyValueGuardSet) { - +bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { + SetIterator *kvit = keyValueGuardSet->iterator(); + while (kvit->hasNext()) { + KeyValue *kvGuard = kvit->next(); // First check if the key is in the speculative table, this is the value of the latest assumption KeyValue *kv = NULL; @@ -277,15 +298,17 @@ bool Transaction::evaluateGuard(Hashtable *committedKey } else { printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray()); } - + delete kvit; return false; } } else { if (kv != NULL) { + delete kvit; return false; } } } + delete kvit; return true; }