bool didProcessANewCommit = false;
// Process the commits one by one
- for (int64_t arbitratorId : liveCommitsTable->keySet()) {
+ SetIterator<int64_t, Hashtable<int64_t, Commit *> *> * liveit = getKeyIterator(liveCommitsTable);
+ while (liveit->hasNext()) {
+ int64_t arbitratorId = liveit->next();
// Get all the commits for a specific arbitrator
Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
// Sort the commits in order
- Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
+ Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Commit *> * clientit = getKeyIterator(commitForClientTable);
+ while(clientit->hasNext())
+ commitSequenceNumbers->add(clientit->next());
+ delete clientit;
+ }
+
qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
// Get the last commit seen from this arbitrator
int64_t lastCommitSeenSequenceNumber = -1;
- if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
+ if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
}
// Update the last transaction that was updated if we can
if (commit->getTransactionSequenceNumber() != -1) {
// Update the last transaction sequence number that the arbitrator arbitrated on1
- if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber())) {
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
}
}
// Update the last transaction that was updated if we can
if (commit->getTransactionSequenceNumber() != -1) {
int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
-
- // Update the last transaction sequence number that the arbitrator arbitrated on
- if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
+ lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
}
}
SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
- commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
+ Commit * commit = liveCommitsByKeyTable->get(kv->getKey());
+ if (commit != NULL)
+ commitsToEdit->add(commit);
}
delete kvit;
}
- commitsToEdit->remove(NULL); // remove NULL since it could be in this set
// Update each previous commit that needs to be updated
SetIterator<Commit *, Commit *> * commitit = commitsToEdit->iterator();
// if the commit is now dead then remove it
if (!previousCommit->isLive()) {
- commitForClientTable->remove(previousCommit);
+ commitForClientTable->remove(previousCommit->getSequenceNumber());
}
}
}
}
delete kvit;
}
- }
- }
+ }
+}
+delete liveit;
return didProcessANewCommit;
}
* and have come from the cloud
*/
bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
- if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
+ if (liveTransactionBySequenceNumberTable->size() == 0) {
// There is nothing to speculate on
return false;
}
// Create a list of the transaction sequence numbers and sort them
// from oldest to newest
- Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
- qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
+ Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Transaction *> * trit = getKeyIterator(liveTransactionBySequenceNumberTable);
+ while(trit->hasNext())
+ transactionSequenceNumbersSorted->add(trit->next());
+ delete trit;
+ }
+
+ qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
speculatedKeyValueTable->clear();
lastTransactionSequenceNumberSpeculatedOn = -1;
oldestTransactionSequenceNumberSpeculatedOn = -1;
-
}
// Remember the front of the transaction list
oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
// Find where to start arbitration from
- int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
+ int startIndex = 0;
+ for(; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
+ if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
+ break;
+ startIndex++;
+
if (startIndex >= transactionSequenceNumbersSorted->size()) {
// Make sure we are not out of bounds
return false; // did not speculate
}
// Find where to start arbitration from
- int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
+ int startIndex = 0;
+ for(; startIndex < pendingTransactionQueue->size(); startIndex++)
+ if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
+ break;
+
if (startIndex >= pendingTransactionQueue->size()) {
// Make sure we are not out of bounds
return;
* transactions that are dead
*/
void Table::updateLiveTransactionsAndStatus() {
-
// Go through each of the transactions
- for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
- Transaction *transaction = iter->next()->getValue();
-
- // Check if the transaction is dead
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
- if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
-
- // Set dead the transaction
- transaction->setDead();
-
- // Remove the transaction from the live table
- iter->remove();
- liveTransactionByTransactionIdTable->remove(transaction->getId());
+ {
+ SetIterator<int64_t, Transaction *> * iter = getKeyIterator(liveTransactionBySequenceNumberTable);
+ while(iter->hasNext()) {
+ int64_t key = iter->next();
+ Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
+
+ // Check if the transaction is dead
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
+ // Set dead the transaction
+ transaction->setDead();
+
+ // Remove the transaction from the live table
+ iter->remove();
+ liveTransactionByTransactionIdTable->remove(transaction->getId());
+ }
}
+ delete iter;
}
-
+
// Go through each of the transactions
- for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
- TransactionStatus *status = iter->next()->getValue();
-
- // Check if the transaction is dead
- int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
- if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
-
- // Set committed
- status->setStatus(TransactionStatus_StatusCommitted);
-
- // Remove
- iter->remove();
+ {
+ SetIterator<int64_t, TransactionStatus *> * iter = getKeyIterator(outstandingTransactionStatus);
+ while(iter->hasNext()) {
+ int64_t key = iter->next();
+ TransactionStatus *status = outstandingTransactionStatus->get(key);
+
+ // Check if the transaction is dead
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
+
+ // Set committed
+ status->setStatus(TransactionStatus_StatusCommitted);
+
+ // Remove
+ iter->remove();
+ }
}
+ delete iter;
}
}
// Create a list of clients to watch until they see this rejected
// message entry->
Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
- for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
+ SetIterator<int64_t, Pair<int64_t, Liveness*> *> * iter = getKeyIterator(lastMessageTable);
+ while(iter->hasNext()) {
// Machine ID for the last message entry
- int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
+ int64_t lastMessageEntryMachineId = iter->next();
// We've seen it, don't need to continue to watch-> Our next
// message will implicitly acknowledge it->
continue;
}
- Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
- int64_t entrySequenceNumber = lastMessageValue.getFirst();
+ Pair<int64_t, Liveness *> * lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
+ int64_t entrySequenceNumber = lastMessageValue->getFirst();
if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this
deviceWatchSet->add(lastMessageEntryMachineId);
}
}
+ delete iter;
+
if (deviceWatchSet->isEmpty()) {
// This rejected message has been seen by all the clients so
entry->setDead();
// Abort has not been seen by the client it is for yet so we need to
// keep track of it
- Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
+ Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
if (previouslySeenAbort != NULL) {
previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
}
liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
}
- if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
+ if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
// The machine already saw this so it is dead
entry->setDead();
liveAbortTable->remove(&entry->getAbortId());