From c85d46ac3c6022a4685e8f27723119111cc2fc4d Mon Sep 17 00:00:00 2001 From: bdemsky Date: Wed, 28 Feb 2018 08:53:43 -0800 Subject: [PATCH] edits --- version2/src/C/Table.cc | 78 +++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 344c62d..ce70324 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1743,17 +1743,24 @@ void Table::arbitrateFromServer() { localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator * spit = getKeyIterator(speculativeTableTmp); + while(spit->hasNext()) { + IoTString * string = spit->next(); + KeyValue * kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } - + delete spit; + // create the commit parts newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector * parts = newCommit->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1765,7 +1772,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1789,7 +1799,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return Pair(false, false); @@ -1821,12 +1831,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector * parts = newCommit->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1868,7 +1884,11 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector * parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for(uint i=0; iget(i); processEntry(commitPart); } } @@ -1978,11 +1998,15 @@ bool Table::updateCommittedTable() { } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable *, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts); + while(partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts); + while(pairit->hasNext()) { + Pair * partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -2007,8 +2031,10 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; } - + delete partsit; + // Clear all the new commits parts in preparation for the next time // the server sends slots newCommitParts->clear(); @@ -2017,7 +2043,7 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { + for (int64_t arbitratorId : liveCommitsTable->keySet()) { // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); @@ -2056,10 +2082,8 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2109,7 +2133,9 @@ bool Table::updateCommittedTable() { commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator * commitit = commitsToEdit->iterator(); + while(commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { @@ -2130,9 +2156,10 @@ bool Table::updateCommittedTable() { } } } + delete commitit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2564,7 +2591,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2576,23 +2603,20 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - // Update the last transaction sequence number that the arbitrator - // arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - Hashtable *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable *, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); + CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } -- 2.34.1