X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=835b9db7b4e4a44fcd6ee52571af0073fe3f4781;hb=25a3da33ee1648fa13408c12564a54eb7c0ee3a7;hp=344c62ddbcf07ffe09c25d75b53f6718e31783e7;hpb=672eac8848e8211fb5103815fa7a2a2d7713bb7b;p=iotcloud.git diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 344c62d..835b9db 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -246,7 +246,7 @@ int64_t Table::getArbitrator(IoTString *key) { } void Table::close() { - cloud->close(); + cloud->closeCloud(); } IoTString *Table::getCommitted(IoTString *key) { @@ -1388,7 +1388,6 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer->getNewestSeqNum(); -search: for (; seqn <= newestseqnum; seqn++) { Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward @@ -1743,17 +1742,24 @@ void Table::arbitrateFromServer() { localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator * spit = getKeyIterator(speculativeTableTmp); + while(spit->hasNext()) { + IoTString * string = spit->next(); + KeyValue * kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } - + delete spit; + // create the commit parts newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector * parts = newCommit->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1765,7 +1771,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1789,7 +1798,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return Pair(false, false); @@ -1821,12 +1830,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector * parts = newCommit->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1868,7 +1883,11 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1978,11 +1997,15 @@ bool Table::updateCommittedTable() { } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable *, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts); + while(partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts); + while(pairit->hasNext()) { + Pair * partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -2007,8 +2030,10 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; } - + delete partsit; + // Clear all the new commits parts in preparation for the next time // the server sends slots newCommitParts->clear(); @@ -2017,18 +2042,27 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { + SetIterator *> * liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator * clientit = getKeyIterator(commitForClientTable); + while(clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } @@ -2056,10 +2090,8 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2083,9 +2115,8 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2102,14 +2133,17 @@ bool Table::updateCommittedTable() { SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + Commit * commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); } delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator * commitit = commitsToEdit->iterator(); + while(commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { @@ -2126,13 +2160,14 @@ bool Table::updateCommittedTable() { // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); } } } + delete commitit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2153,8 +2188,9 @@ bool Table::updateCommittedTable() { } delete kvit; } - } - } + } +} +delete liveit; return didProcessANewCommit; } @@ -2164,15 +2200,22 @@ bool Table::updateCommittedTable() { * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } // Create a list of the transaction sequence numbers and sort them // from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator * trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while(trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2187,15 +2230,19 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + int startIndex = 0; + for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; + if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds return false; // did not speculate @@ -2264,8 +2311,12 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + int startIndex = 0; + for(; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; + if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; @@ -2293,38 +2344,44 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator * iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while(iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + } } + delete iter; } - + // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { - - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); - - // Remove - iter->remove(); + { + SetIterator * iter = getKeyIterator(outstandingTransactionStatus); + while(iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { + + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); + + // Remove + iter->remove(); + } } + delete iter; } } @@ -2444,9 +2501,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // Create a list of clients to watch until they see this rejected // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { + SetIterator *> * iter = getKeyIterator(lastMessageTable); + while(iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2454,8 +2512,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair lastMessageValue = lastMessageEntry->getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + Pair * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this @@ -2466,6 +2524,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); @@ -2490,7 +2550,8 @@ void Table::processEntry(Abort *entry) { // Abort has not been seen by the client it is for yet so we need to // keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } @@ -2499,10 +2560,11 @@ void Table::processEntry(Abort *entry) { liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(&entry->getAbortId()); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); @@ -2523,15 +2585,17 @@ void Table::processEntry(Abort *entry) { } // Set dead a transaction if we can - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } // Update the last transaction sequence number that the arbitrator // arbitrated on - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2546,8 +2610,7 @@ void Table::processEntry(Abort *entry) { void Table::processEntry(TransactionPart *entry) { // Check if we have already seen this transaction and set it dead OR // if it is not alive - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted entry->setDead(); return; @@ -2564,7 +2627,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2576,23 +2639,20 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - // Update the last transaction sequence number that the arbitrator - // arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - Hashtable *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable *, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); + CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2632,16 +2692,20 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } // Set dead the abort - for (IteratorEntry, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort *abort = i->next()->getValue(); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> * abortit = getKeyIterator(liveAbortTable); + + while(abortit->hasNext()) { + Pair * key = abortit->next(); + Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); - i->remove(); + abortit->remove(); if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } + delete abortit; if (machineId == localMachineId) { // Our own messages are immediately dead-> char livenessType = liveness->getType();