X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=400ecc344bf4e4bed0932d556b7222c1c15cb759;hb=e8add21403bd6e29530a399f1ec8209f14e03623;hp=835b9db7b4e4a44fcd6ee52571af0073fe3f4781;hpb=25a3da33ee1648fa13408c12564a54eb7c0ee3a7;p=iotcloud.git diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 835b9db..400ecc3 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -21,9 +21,9 @@ #include "SlotIndexer.h" #include -int compareInt64(const void * a, const void *b) { - const int64_t * pa = (const int64_t *) a; - const int64_t * pb = (const int64_t *) b; +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; if (*pa < *pb) return -1; else if (*pa > *pb) @@ -604,9 +604,9 @@ bool Table::sendToServer(NewKey *newKey) { // Process each entry in the slot Vector *entries = s->getEntries(); uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); - + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + if (entry->getType() == TypeLastMessage) { LastMessage *lastMessage = (LastMessage *)entry; if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { @@ -846,7 +846,7 @@ bool Table::updateFromLocal(int64_t machineId) { if (!localCommunicationTable->contains(machineId)) return false; - Pair * localCommunicationInformation = localCommunicationTable->get(machineId); + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); @@ -897,16 +897,16 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications if (!localCommunicationTable->contains(transaction->getArbitrator())) return Pair(true, false); - - Pair * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); { - Vector * tParts = transaction->getParts(); + Vector *tParts = transaction->getParts(); uint tPartsSize = tParts->size(); for (uint i = 0; i < tPartsSize; i++) { - TransactionPart * part = tParts->get(i); + TransactionPart *part = tParts->get(i); sendDataSize += part->getSize(); } } @@ -924,10 +924,10 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); { - Vector * tParts = transaction->getParts(); + Vector *tParts = transaction->getParts(); uint tPartsSize = tParts->size(); for (uint i = 0; i < tPartsSize; i++) { - TransactionPart * part = tParts->get(i); + TransactionPart *part = tParts->get(i); part->encode(bbEncode); } } @@ -967,14 +967,14 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (didCommit) { status->setStatus(TransactionStatus_StatusCommitted); } else { status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (foundAbort) { status->setStatus(TransactionStatus_StatusAborted); } else { @@ -1026,20 +1026,20 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *abortLocalSequenceNumbers = new Vector(); { SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); - while(abortit->hasNext()) + while (abortit->hasNext()) abortLocalSequenceNumbers->add(abortit->next()); delete abortit; } - + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint asize = abortLocalSequenceNumbers->size(); - for(uint i=0; iget(i); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - + Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); unseenArbitrations->add(abort); returnDataSize += abort->getSize(); @@ -1051,14 +1051,14 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *commitLocalSequenceNumbers = new Vector(); { SetIterator *commitit = getKeyIterator(commitForClientTable); - while(commitit->hasNext()) + while (commitit->hasNext()) commitLocalSequenceNumbers->add(commitit->next()); delete commitit; } qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint clsSize = commitLocalSequenceNumbers->size(); - for(uint clsi = 0; clsi < clsSize; clsi++) { + for (uint clsi = 0; clsi < clsSize; clsi++) { int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); Commit *commit = commitForClientTable->get(localSequenceNumber); @@ -1067,10 +1067,10 @@ Array *Table::acceptDataFromLocal(Array *data) { } { - Vector * parts = commit->getParts(); + Vector *parts = commit->getParts(); uint nParts = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); unseenArbitrations->add(commitPart); returnDataSize += commitPart->getSize(); } @@ -1154,8 +1154,8 @@ ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int // Process each entry in the slot Vector *entries = s->getEntries(); uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); if (entry->getType() == TypeLastMessage) { LastMessage *lastMessage = (LastMessage *)entry; @@ -1348,7 +1348,7 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot * previousSlot = buffer->getSlot(currentSequenceNumber); + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; @@ -1366,8 +1366,8 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Iterate over all the live entries and try to rescue them uint lESize = liveEntries->size(); - for (uint i=0; i< lESize; i++) { - Entry * liveEntry = liveEntries->get(i); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry slot->addEntry(liveEntry); @@ -1399,8 +1399,8 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); uint lESize = liveentries->size(); - for (uint i=0; i< lESize; i++) { - Entry * liveentry = liveentries->get(i); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { @@ -1410,7 +1410,7 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi } } } - donesearch: +donesearch: ; } @@ -1441,8 +1441,8 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Set to keep track of messages from clients Hashset *machineSet = new Hashset(); { - SetIterator *> * lmit=getKeyIterator(lastMessageTable); - while(lmit->hasNext()) + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) machineSet->add(lmit->next()); delete lmit; } @@ -1450,7 +1450,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Process each slots data { uint numSlots = newSlots->length(); - for(uint i=0; iget(i); processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); updateExpectedSize(); @@ -1479,12 +1479,12 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Commit new to slots to the local block chain-> { uint numSlots = newSlots->length(); - for(uint i=0; iget(i); - + // Insert this slot into our local block chain copy-> buffer->putSlot(slot); - + // Keep track of how many slots are currently live (have live data // in them)-> liveSlotCount++; @@ -1597,15 +1597,15 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts // for - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts); - while(tpit->hasNext()) { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { int64_t machineId = tpit->next(); Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - while(ptit->hasNext()) { - Pair * partId = ptit->next(); + while (ptit->hasNext()) { + Pair *partId = ptit->next(); TransactionPart *part = parts->get(partId); if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { @@ -1616,7 +1616,7 @@ void Table::processNewTransactionParts() { continue; } } - + // Get the transaction object for that sequence number Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); @@ -1650,21 +1650,21 @@ void Table::arbitrateFromServer() { // Get the transaction sequence numbers and sort from oldest to newest Vector *transactionSequenceNumbers = new Vector(); { - SetIterator * trit = getKeyIterator(liveTransactionBySequenceNumberTable); - while(trit->hasNext()) + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) transactionSequenceNumbers->add(trit->next()); delete trit; } qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are - Hashtable * speculativeTableTmp = new Hashtable(); + Hashtable *speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Hashset *generatedAborts = new Hashset(); uint tsnSize = transactionSequenceNumbers->size(); - for(uint i=0; iget(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); @@ -1711,7 +1711,7 @@ void Table::arbitrateFromServer() { speculativeTableTmp->put(kv->getKey(), kv); } delete kvit; - + // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; } else { @@ -1742,24 +1742,24 @@ void Table::arbitrateFromServer() { localArbitrationSequenceNumber++; // Add all the new keys to the commit - SetIterator * spit = getKeyIterator(speculativeTableTmp); - while(spit->hasNext()) { - IoTString * string = spit->next(); - KeyValue * kv = speculativeTableTmp->get(string); + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } delete spit; - + // create the commit parts newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue // waiting for sending to the server // Insert the commit so we can process it - Vector * parts = newCommit->getParts(); + Vector *parts = newCommit->getParts(); uint partsSize = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1771,10 +1771,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - Vector * parts = newArbitrationRound->getCommit()->getParts(); + Vector *parts = newArbitrationRound->getCommit()->getParts(); uint partsSize = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1819,7 +1819,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { newCommit->addKV(kv); } delete kvit; - + // create the commit parts newCommit->createCommitParts(); @@ -1830,18 +1830,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - Vector * parts = newArbitrationRound->getCommit()->getParts(); + Vector *parts = newArbitrationRound->getCommit()->getParts(); uint partsSize = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - Vector * parts = newCommit->getParts(); + Vector *parts = newCommit->getParts(); uint partsSize = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1859,7 +1859,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status // Guard evaluated was false so create abort - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } @@ -1884,10 +1884,10 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - Vector * parts = newArbitrationRound->getCommit()->getParts(); + Vector *parts = newArbitrationRound->getCommit()->getParts(); uint partsSize = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1937,7 +1937,7 @@ bool Table::compactArbitrationData() { lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1997,15 +1997,15 @@ bool Table::updateCommittedTable() { } // Iterate through all the machine Ids that we received new parts for - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * partsit=getKeyIterator(newCommitParts); - while(partsit->hasNext()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { int64_t machineId = partsit->next(); Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> * pairit=getKeyIterator(parts); - while(pairit->hasNext()) { - Pair * partId = pairit->next(); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -2033,7 +2033,7 @@ bool Table::updateCommittedTable() { delete pairit; } delete partsit; - + // Clear all the new commits parts in preparation for the next time // the server sends slots newCommitParts->clear(); @@ -2042,7 +2042,7 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - SetIterator *> * liveit = getKeyIterator(liveCommitsTable); + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); while (liveit->hasNext()) { int64_t arbitratorId = liveit->next(); @@ -2052,8 +2052,8 @@ bool Table::updateCommittedTable() { // Sort the commits in order Vector *commitSequenceNumbers = new Vector(); { - SetIterator * clientit = getKeyIterator(commitForClientTable); - while(clientit->hasNext()) + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) commitSequenceNumbers->add(clientit->next()); delete clientit; } @@ -2133,7 +2133,7 @@ bool Table::updateCommittedTable() { SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - Commit * commit = liveCommitsByKeyTable->get(kv->getKey()); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); if (commit != NULL) commitsToEdit->add(commit); } @@ -2141,8 +2141,8 @@ bool Table::updateCommittedTable() { } // Update each previous commit that needs to be updated - SetIterator * commitit = commitsToEdit->iterator(); - while(commitit->hasNext()) { + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) @@ -2157,7 +2157,7 @@ bool Table::updateCommittedTable() { } delete kvit; } - + // if the commit is now dead then remove it if (!previousCommit->isLive()) { commitForClientTable->remove(previousCommit->getSequenceNumber()); @@ -2188,9 +2188,9 @@ bool Table::updateCommittedTable() { } delete kvit; } - } -} -delete liveit; + } + } + delete liveit; return didProcessANewCommit; } @@ -2209,12 +2209,12 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { // from oldest to newest Vector *transactionSequenceNumbersSorted = new Vector(); { - SetIterator * trit = getKeyIterator(liveTransactionBySequenceNumberTable); - while(trit->hasNext()) + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) transactionSequenceNumbersSorted->add(trit->next()); delete trit; } - + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2238,11 +2238,11 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { // Find where to start arbitration from int startIndex = 0; - for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) break; startIndex++; - + if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds return false; // did not speculate @@ -2313,10 +2313,10 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr // Find where to start arbitration from int startIndex = 0; - for(; startIndex < pendingTransactionQueue->size(); startIndex++) + for (; startIndex < pendingTransactionQueue->size(); startIndex++) if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) break; - + if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; @@ -2346,16 +2346,16 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr void Table::updateLiveTransactionsAndStatus() { // Go through each of the transactions { - SetIterator * iter = getKeyIterator(liveTransactionBySequenceNumberTable); - while(iter->hasNext()) { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { int64_t key = iter->next(); Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); - + // Check if the transaction is dead if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { // Set dead the transaction transaction->setDead(); - + // Remove the transaction from the live table iter->remove(); liveTransactionByTransactionIdTable->remove(transaction->getId()); @@ -2363,20 +2363,20 @@ void Table::updateLiveTransactionsAndStatus() { } delete iter; } - + // Go through each of the transactions { - SetIterator * iter = getKeyIterator(outstandingTransactionStatus); - while(iter->hasNext()) { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { int64_t key = iter->next(); TransactionStatus *status = outstandingTransactionStatus->get(key); // Check if the transaction is dead if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - + // Set committed status->setStatus(TransactionStatus_StatusCommitted); - + // Remove iter->remove(); } @@ -2396,8 +2396,8 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo // Process each entry in the slot Vector *entries = slot->getEntries(); uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); switch (entry->getType()) { case TypeCommitPart: processEntry((CommitPart *)entry); @@ -2456,7 +2456,7 @@ void Table::processEntry(NewKey *entry) { * seen in this current round of updating the local copy of the block * chain */ -void Table::processEntry(TableStatus * entry, int64_t seq) { +void Table::processEntry(TableStatus *entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); @@ -2501,8 +2501,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // Create a list of clients to watch until they see this rejected // message entry-> Hashset *deviceWatchSet = new Hashset(); - SetIterator *> * iter = getKeyIterator(lastMessageTable); - while(iter->hasNext()) { + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { // Machine ID for the last message entry int64_t lastMessageEntryMachineId = iter->next(); @@ -2512,7 +2512,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { @@ -2525,7 +2525,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } } delete iter; - + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); @@ -2550,7 +2550,7 @@ void Table::processEntry(Abort *entry) { // Abort has not been seen by the client it is for yet so we need to // keep track of it - + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version @@ -2586,7 +2586,7 @@ void Table::processEntry(Abort *entry) { // Set dead a transaction if we can Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); - + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); @@ -2639,7 +2639,7 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } @@ -2678,7 +2678,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven // seen yet SetIterator *rmit = watchset->iterator(); - while(rmit->hasNext()) { + while (rmit->hasNext()) { RejectedMessage *rm = rmit->next(); // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { @@ -2692,10 +2692,10 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } // Set dead the abort - SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> * abortit = getKeyIterator(liveAbortTable); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); - while(abortit->hasNext()) { - Pair * key = abortit->next(); + while (abortit->hasNext()) { + Pair *key = abortit->next(); Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); @@ -2709,7 +2709,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven if (machineId == localMachineId) { // Our own messages are immediately dead-> char livenessType = liveness->getType(); - if (livenessType==TypeLastMessage) { + if (livenessType == TypeLastMessage) { ((LastMessage *)liveness)->setDead(); } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); @@ -2718,7 +2718,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } } // Get the old last message for this device - Pair * lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; @@ -2727,11 +2727,11 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); Liveness *lastEntry = lastMessageEntry->getSecond(); delete lastMessageEntry; - + // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { char lastEntryType = lastEntry->getType(); - + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); } else if (lastEntryType == TypeSlot) {