X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=d70a590890b170fac9417fc6ab049437304de286;hp=d598f15cfda3939f9d65c9f6fbf6876530c18622;hb=8981ac1037b9381ff64428f9236cbf2d4954d1e2;hpb=3e54762d57367b1ce049830b42f00950055d8527 diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index d598f15..d70a590 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -10,8 +10,27 @@ #include "TransactionStatus.h" #include "Transaction.h" #include "LastMessage.h" -#include "Random.h" - +#include "SecureRandom.h" +#include "ByteBuffer.h" +#include "Abort.h" +#include "CommitPart.h" +#include "ArbitrationRound.h" +#include "TransactionPart.h" +#include "Commit.h" +#include "RejectedMessage.h" +#include "SlotIndexer.h" +#include + +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), @@ -142,7 +161,7 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : */ void Table::init() { // Init helper objects - random = new Random(); + random = new SecureRandom(); buffer = new SlotBuffer(); // init data structs @@ -150,16 +169,16 @@ void Table::init() { speculatedKeyValueTable = new Hashtable(); pendingTransactionSpeculatedKeyValueTable = new Hashtable(); liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable >(); + lastMessageTable = new Hashtable * >(); rejectedMessageWatchVectorTable = new Hashtable * >(); arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable, Abort *>(); - newTransactionParts = new Hashtable, TransactionPart *> *>(); - newCommitParts = new Hashtable, CommitPart *> *>(); + liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable, Transaction *>(); - liveCommitsTable = new Hashtable >(); + liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); + liveCommitsTable = new Hashtable * >(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); @@ -168,13 +187,12 @@ void Table::init() { transactionPartsSent = new Hashtable *>(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset >(); - localCommunicationTable = new Hashtable >(); + offlineTransactionsCommittedAndAtServer = new Hashset *, uintptr_t, 0, pairHashFunction, pairEquals>(); + localCommunicationTable = new Hashtable *>(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); - // Other init stuff numberOfSlots = buffer->capacity(); setResizeThreshold(); @@ -220,7 +238,7 @@ void Table::rebuild() { } void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { - localCommunicationTable->put(arbitrator, Pair(hostName, portNumber)); + localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); } int64_t Table::getArbitrator(IoTString *key) { @@ -228,7 +246,7 @@ int64_t Table::getArbitrator(IoTString *key) { } void Table::close() { - cloud->close(); + cloud->closeCloud(); } IoTString *Table::getCommitted(IoTString *key) { @@ -316,15 +334,15 @@ bool Table::update() { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); - - updateLiveTransactionsAndStatus(); - return true; } catch (Exception *e) { - for (int64_t m : localCommunicationTable->keySet()) { + SetIterator *> *kit = getKeyIterator(localCommunicationTable); + while (kit->hasNext()) { + int64_t m = kit->next(); updateFromLocal(m); } + delete kit; } return false; @@ -336,7 +354,6 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { // There is already an arbitrator return false; } - NewKey *newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { @@ -370,7 +387,6 @@ void Table::addKV(IoTString *key, IoTString *value) { } TransactionStatus *Table::commitTransaction() { - if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { // transaction with no updates will have no effect on the system return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); @@ -402,8 +418,11 @@ TransactionStatus *Table::commitTransaction() { } catch (ServerException *e) { Hashset *arbitratorTriedAndFailed = new Hashset(); - for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { - Transaction *transaction = iter->next(); + uint size = pendingTransactionQueue->size(); + uint oldindex = 0; + for (int iter = 0; iter < size; iter++) { + Transaction *transaction = pendingTransactionQueue->get(iter); + pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client @@ -413,18 +432,19 @@ TransactionStatus *Table::commitTransaction() { Pair sendReturn = sendTransactionToLocal(transaction); - if (sendReturn->getFirst()) { + if (sendReturn.getFirst()) { // Failed to contact over local arbitratorTriedAndFailed->add(transaction->getArbitrator()); } else { // Successful contact or should not contact - if (sendReturn->getSecond()) { + if (sendReturn.getSecond()) { // did arbitrate - iter->remove(); + oldindex--; } } } + pendingTransactionQueue->setSize(oldindex); } updateLiveStateFromLocal(); @@ -444,13 +464,8 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } - -bool lastInsertedNewKey = false; - bool Table::sendToServer(NewKey *newKey) { - bool fromRetry = false; - try { if (hadPartialSendToServer) { Array *newSlots = cloud->getSlots(sequenceNumber + 1); @@ -458,35 +473,35 @@ bool Table::sendToServer(NewKey *newKey) { fromRetry = true; ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - if (sendSlotsReturn->getFirst()) { + if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; } } - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); - // Update which transactions parts still need to be sent transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - // Check if all the transaction parts were successfully sent and if so then remove it from pending + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending if (transaction->didSendAllParts()) { transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); pendingTransactionQueue->remove(transaction); } } + delete trit; } else { - - newSlots = sendSlotsReturn->getThird(); - + newSlots = sendSlotsReturn.getThird(); bool isInserted = false; for (uint si = 0; si < newSlots->length(); si++) { Slot *s = newSlots->get(si); @@ -503,7 +518,10 @@ bool Table::sendToServer(NewKey *newKey) { } // Process each entry in the slot - for (Entry *entry : s->getEntries()) { + Vector *ventries = s->getEntries(); + uint vesize = ventries->size(); + for (uint vei = 0; vei < vesize; vei++) { + Entry *entry = ventries->get(vei); if (entry->getType() == TypeLastMessage) { LastMessage *lastMessage = (LastMessage *)entry; if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { @@ -521,7 +539,9 @@ bool Table::sendToServer(NewKey *newKey) { } } - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Update which transactions parts still need to be sent @@ -545,20 +565,24 @@ bool Table::sendToServer(NewKey *newKey) { } } } + delete trit; } } - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction->didSendAPartToServer()) { transaction->setSequenceNumber(-1); } } + delete trit; - if (sendSlotsReturn->getThird()->length() != 0) { + if (sendSlotsReturn.getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } // continue; } else { @@ -578,7 +602,10 @@ bool Table::sendToServer(NewKey *newKey) { } // Process each entry in the slot - for (Entry *entry : s->getEntries()) { + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); if (entry->getType() == TypeLastMessage) { LastMessage *lastMessage = (LastMessage *)entry; @@ -597,7 +624,9 @@ bool Table::sendToServer(NewKey *newKey) { } } - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Update which transactions parts still need to be sent @@ -621,14 +650,18 @@ bool Table::sendToServer(NewKey *newKey) { } } } + delete trit; } else { - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction->didSendAPartToServer()) { transaction->setSequenceNumber(-1); } } + delete trit; } // insert into the local block chain @@ -664,13 +697,15 @@ bool Table::sendToServer(NewKey *newKey) { // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn->getFirst(); - int newSize = fillSlotsReturn->getSecond(); - bool insertedNewKey = fillSlotsReturn->getThird(); + bool needsResize = fillSlotsReturn.getFirst(); + int newSize = fillSlotsReturn.getSecond(); + bool insertedNewKey = fillSlotsReturn.getThird(); if (needsResize) { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -678,6 +713,7 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; // Clear the sent data since we are trying again pendingSendArbitrationEntriesToDelete->clear(); @@ -692,13 +728,12 @@ bool Table::sendToServer(NewKey *newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable * >(transactionPartsSent); + lastTransactionPartsSent = transactionPartsSent->clone(); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - if (sendSlotsReturn->getFirst()) { + if (sendSlotsReturn.getFirst()) { // Did insert into the block chain @@ -710,17 +745,23 @@ bool Table::sendToServer(NewKey *newKey) { } // Remove the aborts and commit parts that were sent from the pending to send queue - for (Iterator *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) { - ArbitrationRound *round = iter->next(); + uint size = pendingSendArbitrationRounds->size(); + uint oldcount = 0; + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); round->removeParts(pendingSendArbitrationEntriesToDelete); - if (round->isDoneSending()) { + if (!round->isDoneSending()) { // Sent all the parts - iter->remove(); + pendingSendArbitrationRounds->set(oldcount++, + pendingSendArbitrationRounds->get(i)); } } + pendingSendArbitrationRounds->setSize(oldcount); - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Update which transactions parts still need to be sent @@ -738,9 +779,12 @@ bool Table::sendToServer(NewKey *newKey) { pendingTransactionQueue->remove(transaction); } } + delete trit; } else { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -748,23 +792,25 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } // Clear the sent data in preparation for next send pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); - if (sendSlotsReturn->getThird()->length() != 0) { + if (sendSlotsReturn.getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } } } catch (ServerException *e) { - - if (e->getType() != ServerException->TypeInputTimeout) { + if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -772,15 +818,19 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } else { // There was a partial send to the server hadPartialSendToServer = true; // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); transaction->setServerFailure(); } + delete trit; } pendingSendArbitrationEntriesToDelete->clear(); @@ -793,17 +843,16 @@ bool Table::sendToServer(NewKey *newKey) { } bool Table::updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable->get(machineId); - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing + if (!localCommunicationTable->contains(machineId)) return false; - } + + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); } @@ -830,10 +879,10 @@ bool Table::updateFromLocal(int64_t machineId) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -846,36 +895,43 @@ bool Table::updateFromLocal(int64_t machineId) { Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); - - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing + if (!localCommunicationTable->contains(transaction->getArbitrator())) return Pair(true, false); - } + + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart *part : transaction->getParts()->values()) { - sendDataSize += part->getSize(); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + sendDataSize += part->getSize(); + } } int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); } // Make the send data size Array *sendData = new Array(sendDataSize); - ByteBuffer *bbEncode = ByteBuffer.wrap(sendData); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); // Encode the data bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); - for (TransactionPart *part : transaction->getParts()->values()) { - part->encode(bbEncode); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + part->encode(bbEncode); + } } - // Send by local Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; @@ -895,7 +951,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; @@ -903,7 +959,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -911,14 +967,14 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (didCommit) { status->setStatus(TransactionStatus_StatusCommitted); } else { status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (foundAbort) { status->setStatus(TransactionStatus_StatusAborted); } else { @@ -930,7 +986,6 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { } Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); @@ -946,20 +1001,20 @@ Array *Table::acceptDataFromLocal(Array *data) { Transaction *transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { bbDecode->get(); - TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); + TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode); transaction->addPartDecode(newPart); } // Arbitrate on transaction and pull relevant return data Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn->getFirst(); - didCommit = localArbitrateReturn->getSecond(); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); updateLiveStateFromLocal(); // Transaction was sent to the server so keep track of it to prevent double commit if (transaction->getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer->add(transaction->getId()); + offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); } } @@ -968,9 +1023,19 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); - Collections->sort(abortLocalSequenceNumbers); - for (int64_t localSequenceNumber : abortLocalSequenceNumbers) { + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while (abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint asize = abortLocalSequenceNumbers->size(); + for (uint i = 0; i < asize; i++) { + int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } @@ -983,20 +1048,32 @@ Array *Table::acceptDataFromLocal(Array *data) { // Get the commits to send back Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitLocalSequenceNumbers); - - for (int64_t localSequenceNumber : commitLocalSequenceNumbers) { + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while (commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint clsSize = commitLocalSequenceNumbers->size(); + for (uint clsi = 0; clsi < clsSize; clsi++) { + int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); Commit *commit = commitForClientTable->get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - unseenArbitrations->addAll(commit->getParts()->values()); - - for (CommitPart commitPart : commit->getParts()->values()) { - returnDataSize += commitPart->getSize(); + { + Vector *parts = commit->getParts(); + uint nParts = parts->size(); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } } } } @@ -1027,11 +1104,12 @@ Array *Table::acceptDataFromLocal(Array *data) { } bbEncode->putInt(unseenArbitrations->size()); - for (Entry *entry : unseenArbitrations) { + uint size = unseenArbitrations->size(); + for (uint i = 0; i < size; i++) { + Entry *entry = unseenArbitrations->get(i); entry->encode(bbEncode); } - localSequenceNumber++; return returnData; } @@ -1058,20 +1136,26 @@ ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int if (hadPartialSendToServer) { bool isInserted = false; - for (Slot *s : array) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot *s : array) { + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); if (isInserted) { break; } // Process each entry in the slot - for (Entry *entry : s->getEntries()) { + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); if (entry->getType() == TypeLastMessage) { LastMessage *lastMessage = (LastMessage *)entry; @@ -1103,12 +1187,9 @@ ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int * Returns false if a resize was needed */ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { - - int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { - resize = true; //Resize is forced - + resize = true;//Resize is forced } if (resize) { @@ -1124,9 +1205,9 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables - bool needsResize = mandatoryRescueReturn->getFirst(); - bool seenLiveSlot = mandatoryRescueReturn->getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird(); + bool needsResize = mandatoryRescueReturn.getFirst(); + bool seenLiveSlot = mandatoryRescueReturn.getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { // We need to resize but we are not resizing so return false @@ -1137,7 +1218,6 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { - slot->addEntry(newKeyEntry); inserted = true; } @@ -1146,17 +1226,20 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey // Clear the transactions, aborts and commits that were sent previously transactionPartsSent->clear(); pendingSendArbitrationEntriesToDelete->clear(); - - for (ArbitrationRound *round : pendingSendArbitrationRounds) { + uint size = pendingSendArbitrationRounds->size(); + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); bool isFull = false; round->generateParts(); Vector *parts = round->getParts(); // Insert pending arbitration data - for (Entry *arbitrationData : parts) { + uint vsize = parts->size(); + for (uint vi = 0; vi < vsize; vi++) { + Entry *arbitrationData = parts->get(vi); // If it is an abort then we need to set some information - if (arbitrationData instanceof Abort) { + if (arbitrationData->getType() == TypeAbort) { ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); } @@ -1177,22 +1260,14 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey } if (pendingTransactionQueue->size() > 0) { - Transaction *transaction = pendingTransactionQueue->get(0); - // Set the transaction sequence number if it has yet to be inserted into the block chain - // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) { - // transaction->setSequenceNumber(slot->getSequenceNumber()); - // } - if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) { transaction->setSequenceNumber(slot->getSequenceNumber()); } - while (true) { TransactionPart *part = transaction->getNextPartToSend(); - if (part == NULL) { // Ran out of parts to send for this transaction so move on break; @@ -1225,7 +1300,7 @@ void Table::doRejectedMessages(Slot *s) { * there is already a sufficient entry in the queue (e->g->, * equalsto value of true and same sequence number)-> */ - int64_t old_seqn = rejectedSlotVector->firstElement(); + int64_t old_seqn = rejectedSlotVector->get(0); if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); @@ -1273,7 +1348,7 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot previousSlot = buffer->getSlot(currentSequenceNumber); + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; @@ -1290,16 +1365,15 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry *liveEntry : liveEntries) { + uint lESize = liveEntries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { - // Enough space to rescue the entry slot->addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue - System->out->println("B"); //? return ThreeTuple(true, seenLiveSlot, currentSequenceNumber); - } } } @@ -1314,7 +1388,6 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer->getNewestSeqNum(); -search: for (; seqn <= newestseqnum; seqn++) { Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward @@ -1325,62 +1398,78 @@ search: continue; seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); - for (Entry *liveentry : liveentries) { + uint lESize = liveentries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { skipcount++; if (skipcount > Table_SKIP_THRESHOLD) - break search; + goto donesearch; } } } +donesearch: + ; } /** * Checks for malicious activity and updates the local copy of the block chain-> */ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { - - // The cloud communication layer has checked slot HMACs already before decoding + // The cloud communication layer has checked slot HMACs already + // before decoding if (newSlots->length() == 0) { return; } - // Make sure all slots are newer than the last largest slot this client has seen - int64_t firstSeqNum = newSlots[0]->getSequenceNumber(); + // Make sure all slots are newer than the last largest slot this + // client has seen + int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { throw new Error("Server Error: Sent older slots!"); } - // Create an object that can access both new slots and slots in our local chain - // without committing slots to our local chain + // Create an object that can access both new slots and slots in our + // local chain without committing slots to our local chain SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); // Check that the HMAC chain is not broken checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(lastMessageTable->keySet()); + Hashset *machineSet = new Hashset(); + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; + } // Process each slots data - for (Slot *slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - - updateExpectedSize(); + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } } - // If there is a gap, check to see if the server sent us everything-> + // If there is a gap, check to see if the server sent us + // everything-> if (firstSeqNum != (sequenceNumber + 1)) { // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots->length); + checkNumSlots(newSlots->length()); - // Since there was a gap every machine must have pushed a slot or must have - // a last message message-> If not then the server is hiding slots + // Since there was a gap every machine must have pushed a slot or + // must have a last message message-> If not then the server is + // hiding slots if (!machineSet->isEmpty()) { - throw new Error("Missing record for machines: " + machineSet); + throw new Error("Missing record for machines: "); } } @@ -1388,18 +1477,21 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal commitNewMaxSize(); // Commit new to slots to the local block chain-> - for (Slot *slot : newSlots) { + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); - // Keep track of how many slots are currently live (have live data in them)-> - liveSlotCount++; + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } } - // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber(); - + sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); // No Need to remember after we pulled from the server @@ -1440,23 +1532,13 @@ void Table::updateLiveStateFromLocal() { } void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { - // if (didFindTableStatus) { - // return; - // } int64_t prevslots = firstSequenceNumber; - if (didFindTableStatus) { - // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; - // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); - } else { expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - // System->out->println("Here: " + expectedsize); } - // System->out->println(numberOfSlots); - didFindTableStatus = true; currMaxSize = numberOfSlots; } @@ -1471,22 +1553,18 @@ void Table::updateExpectedSize() { /** - * Check the size of the block chain to make sure there are enough slots sent back by the server-> - * This is only called when we have a gap between the slots that we have locally and the slots - * sent by the server therefore in the slots sent by the server there will be at least 1 Table - * status message + * Check the size of the block chain to make sure there are enough + * slots sent back by the server-> This is only called when we have a + * gap between the slots that we have locally and the slots sent by + * the server therefore in the slots sent by the server there will be + * at least 1 Table status message */ void Table::checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots); + throw new Error("Server Error: Server did not send all slots-> Expected: "); } } -void Table::updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; -} - - /** * Update the size of of the local buffer if it is needed-> */ @@ -1501,13 +1579,14 @@ void Table::commitNewMaxSize() { // Change the number of local slots to the new size numberOfSlots = (int32_t)currMaxSize; - - // Recalculate the resize threshold since the size of the local buffer has changed + // Recalculate the resize threshold since the size of the local + // buffer has changed setResizeThreshold(); } /** - * Process the new transaction parts from this latest round of slots received from the server + * Process the new transaction parts from this latest round of slots + * received from the server */ void Table::processNewTransactionParts() { @@ -1516,19 +1595,26 @@ void Table::processNewTransactionParts() { return; } - // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newTransactionParts->keySet()) { - Hashtable, TransactionPart *> *parts = newTransactionParts->get(machineId); + // Iterate through all the machine Ids that we received new parts + // for + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { + int64_t machineId = tpit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + while (ptit->hasNext()) { + Pair *partId = ptit->next(); TransactionPart *part = parts->get(partId); - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { - // Set dead the transaction part - part->setDead(); - continue; + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + continue; + } } // Get the transaction object for that sequence number @@ -1540,15 +1626,17 @@ void Table::processNewTransactionParts() { // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); + liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); } // Add that part to the transaction transaction->addPartDecode(part); } + delete ptit; } - - // Clear all the new transaction parts in preparation for the next time the server sends slots + delete tpit; + // Clear all the new transaction parts in preparation for the next + // time the server sends slots newTransactionParts->clear(); } @@ -1560,22 +1648,28 @@ void Table::arbitrateFromServer() { } // Get the transaction sequence numbers and sort from oldest to newest - Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbers); + Vector *transactionSequenceNumbers = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbers->add(trit->next()); + delete trit; + } + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + Hashtable *speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Hashset *generatedAborts = new Hashset(); - - for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { + uint tsnSize = transactionSequenceNumbers->size(); + for (uint i = 0; i < tsnSize; i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not + // then we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { continue; } @@ -1598,7 +1692,7 @@ void Table::arbitrateFromServer() { // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) { + if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } else { int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); @@ -1611,15 +1705,17 @@ void Table::arbitrateFromServer() { // Guard evaluated as true // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); speculativeTableTmp->put(kv->getKey(), kv); } + delete kvit; // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; } else { // Guard evaluated was false so create abort - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1628,7 +1724,6 @@ void Table::arbitrateFromServer() { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - generatedAborts->add(newAbort); // Insert the abort so we can process @@ -1636,31 +1731,35 @@ void Table::arbitrateFromServer() { } lastSeqNumArbOn = transactionSequenceNumber; - - // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber); } Commit *newCommit = NULL; // If there is something to commit if (speculativeTableTmp->size() != 0) { - // Create the commit and increment the commit sequence number newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } + delete spit; // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server - + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1672,7 +1771,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1682,7 +1784,8 @@ void Table::arbitrateFromServer() { Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not then + // we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { return Pair(false, false); } @@ -1695,7 +1798,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return Pair(false, false); @@ -1704,32 +1807,41 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { - // Guard evaluated as true - - // Create the commit and increment the commit sequence number + // Guard evaluated as true Create the commit and increment the + // commit sequence number Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); localArbitrationSequenceNumber++; // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); newCommit->addKV(kv); } + delete kvit; // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1744,19 +1856,16 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { updateLiveStateFromLocal(); return Pair(true, true); } else { - if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status - // Guard evaluated was false so create abort - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } } else { Hashset *addAbortSet = new Hashset(); - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1765,17 +1874,20 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - addAbortSet->add(newAbort); - - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1787,17 +1899,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + * Compacts the arbitration data my merging commits and aggregating + * aborts so that a single large push of commits can be done instead + * of many small updates */ bool Table::compactArbitrationData() { - if (pendingSendArbitrationRounds->size() < 2) { // Nothing to compact so do nothing return false; } ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - if (lastRound->didSendPart()) { + if (lastRound->getDidSendPart()) { return false; } @@ -1806,26 +1919,25 @@ bool Table::compactArbitrationData() { int numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); - if (round->isFull() || round->didSendPart()) { - // Stop since there is a part that cannot be compacted and we need to compact in order + if (round->isFull() || round->getDidSendPart()) { + // Stop since there is a part that cannot be compacted and we + // need to compact in order break; } if (round->getCommit() == NULL) { - // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } lastRound->addAborts(round->getAborts()); } else { - // Create a new larger commit - Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1836,7 +1948,7 @@ bool Table::compactArbitrationData() { newSize += lastRound->getAbortsCount(); newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } @@ -1852,13 +1964,12 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction - // Delete the previous pieces that are now in the new compacted piece if (numberToDelete == pendingSendArbitrationRounds->size()) { pendingSendArbitrationRounds->clear(); } else { for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); + pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); } } @@ -1873,12 +1984,10 @@ bool Table::compactArbitrationData() { return false; } -// bool compactArbitrationData() { -// return false; -// } /** - * Update all the commits and the committed tables, sets dead the dead transactions + * Update all the commits and the committed tables, sets dead the dead + * transactions */ bool Table::updateCommittedTable() { @@ -1888,11 +1997,15 @@ bool Table::updateCommittedTable() { } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -1917,27 +2030,39 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; } + delete partsit; - // Clear all the new commits parts in preparation for the next time the server sends slots + // Clear all the new commits parts in preparation for the next time + // the server sends slots newCommitParts->clear(); // If we process a new commit keep track of it for future use bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitSequenceNumbers); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } @@ -1949,10 +2074,13 @@ bool Table::updateCommittedTable() { // Special processing if a commit is not complete if (!commit->isComplete()) { if (i == (commitSequenceNumbers->size() - 1)) { - // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits + // If there is an incomplete commit and this commit is the + // latest one seen then this commit cannot be processed and + // there are no other commits break; } else { - // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)-> + // This is a commit that was already dead but parts of it + // are still in the block chain (not flushed out yet)-> // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); @@ -1962,17 +2090,14 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) { - + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -1983,15 +2108,15 @@ bool Table::updateCommittedTable() { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); } - // We have already seen this commit before so need to do the full processing on this commit + // We have already seen this commit before so need to do the + // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -1999,35 +2124,50 @@ bool Table::updateCommittedTable() { continue; } - // If we got here then this is a brand new commit and needs full processing - - // Get what commits should be edited, these are the commits that have live values for their keys + // If we got here then this is a brand new commit and needs full + // processing + // Get what commits should be edited, these are the commits that + // have live values for their keys Hashset *commitsToEdit = new Hashset(); - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); + } + delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { // Update which keys in the old commits are still live - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - previousCommit->invalidateKey(kv->getKey()); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + previousCommit->invalidateKey(kv->getKey()); + } + delete kvit; } // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); } } } + delete commitit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2039,48 +2179,69 @@ bool Table::updateCommittedTable() { didProcessANewCommit = true; // Update the committed table of keys and which commit is using which key - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - committedKeyValueTable->put(kv->getKey(), kv); - liveCommitsByKeyTable->put(kv->getKey(), commit); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); + } + delete kvit; } } } + delete liveit; return didProcessANewCommit; } /** - * Create the speculative table from transactions that are still live and have come from the cloud + * Create the speculative table from transactions that are still live + * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } - // Create a list of the transaction sequence numbers and sort them from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbersSorted); + // Create a list of the transaction sequence numbers and sort them + // from oldest to newest + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { - // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction - // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch + // If there is a gap in the transaction sequence numbers then + // there was a commit or an abort of a transaction OR there was a + // new commit (Could be from offline commit) so a redo the + // speculation from scratch // Start from scratch speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + int startIndex = 0; + + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds @@ -2095,8 +2256,9 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); if (!transaction->isComplete()) { - // If there is an incomplete transaction then there is nothing we can do - // add this transactions arbitrator to the list of arbitrators we should ignore + // If there is an incomplete transaction then there is nothing + // we can do add this transactions arbitrator to the list of + // arbitrators we should ignore incompleteTransactionArbitrator->add(transaction->getArbitrator()); didSkip = true; continue; @@ -2110,8 +2272,13 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { - speculatedKeyValueTable->put(kv->getKey(), kv); + { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; } } } @@ -2127,7 +2294,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } /** - * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer + * Create the pending transaction speculative table from transactions + * that are still in the pending transaction buffer */ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { if (pendingTransactionQueue->size() == 0) { @@ -2143,7 +2311,11 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + int startIndex = 0; + + for (; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds @@ -2157,49 +2329,59 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); } + delete kvit; } } } /** - * Set dead and remove from the live transaction tables the transactions that are dead + * Set dead and remove from the live transaction tables the + * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + } } + delete iter; } // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); + { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); - // Remove - iter->remove(); + // Remove + iter->remove(); + } } + delete iter; } } @@ -2212,39 +2394,34 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); // Process each entry in the slot - for (Entry *entry : slot->getEntries()) { + Vector *entries = slot->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); switch (entry->getType()) { - case TypeCommitPart: processEntry((CommitPart *)entry); break; - case TypeAbort: processEntry((Abort *)entry); break; - case TypeTransactionPart: processEntry((TransactionPart *)entry); break; - case TypeNewKey: processEntry((NewKey *)entry); break; - case TypeLastMessage: processEntry((LastMessage *)entry, machineSet); break; - case TypeRejectedMessage: processEntry((RejectedMessage *)entry, indexer); break; - case TypeTableStatus: processEntry((TableStatus *)entry, slot->getSequenceNumber()); break; - default: - throw new Error("Unrecognized type: " + entry->getType()); + throw new Error("Unrecognized type: "); } } } @@ -2258,10 +2435,10 @@ void Table::processEntry(LastMessage *entry, Hashset *machineSet) { } /** - * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) + * Add the new key to the arbitrators table and update the set of live + * new keys (in case of a rescued new key message) */ void Table::processEntry(NewKey *entry) { - // Update the arbitrator table with the new key information arbitratorTable->put(entry->getKey(), entry->getMachineID()); @@ -2274,18 +2451,19 @@ void Table::processEntry(NewKey *entry) { } /** - * Process new table status entries and set dead the old ones as new ones come in-> - * keeps track of the largest and smallest table status seen in this current round - * of updating the local copy of the block chain + * Process new table status entries and set dead the old ones as new + * ones come in-> keeps track of the largest and smallest table status + * seen in this current round of updating the local copy of the block + * chain */ -void Table::processEntry(TableStatus entry, int64_t seq) { +void Table::processEntry(TableStatus *entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); - initExpectedSize(seq, newNumSlots); if (liveTableStatus != NULL) { - // We have a larger table status so the old table status is no int64_ter alive + // We have a larger table status so the old table status is no + // int64_ter alive liveTableStatus->setDead(); } @@ -2294,7 +2472,8 @@ void Table::processEntry(TableStatus entry, int64_t seq) { } /** - * Check old messages to see if there is a block chain violation-> Also + * Check old messages to see if there is a block chain violation-> + * Also */ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t oldSeqNum = entry->getOldSeqNum(); @@ -2303,30 +2482,29 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t machineId = entry->getMachineID(); int64_t seq = entry->getSequenceNumber(); - - // Check if we have messages that were supposed to be rejected in our local block chain + // Check if we have messages that were supposed to be rejected in + // our local block chain for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - // Get the slot Slot *slot = indexer->getSlot(seqNum); if (slot != NULL) { - // If we have this slot make sure that it was not supposed to be a rejected slot - + // If we have this slot make sure that it was not supposed to be + // a rejected slot int64_t slotMachineId = slot->getMachineID(); if (isequal != (slotMachineId == machineId)) { - throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); + throw new Error("Server Error: Trying to insert rejected message for slot "); } } } - - // Create a list of clients to watch until they see this rejected message entry-> + // Create a list of clients to watch until they see this rejected + // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { - + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2334,18 +2512,19 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair lastMessageValue = lastMessageEntry->getValue(); + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { - - // Add this rejected message to the set of messages that this machine ID did not see yet + // Add this rejected message to the set of messages that this + // machine ID did not see yet addWatchVector(lastMessageEntryMachineId, entry); - - // This client did not see this rejected message yet so add it to the watch set to monitor + // This client did not see this rejected message yet so add it + // to the watch set to monitor deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so @@ -2357,12 +2536,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } /** - * Check if this abort is live, if not then save it so we can kill it later-> - * update the last transaction number that was arbitrated on-> + * Check if this abort is live, if not then save it so we can kill it + * later-> update the last transaction number that was arbitrated on-> */ void Table::processEntry(Abort *entry) { - - if (entry->getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); @@ -2371,10 +2548,12 @@ void Table::processEntry(Abort *entry) { } } - // Abort has not been seen by the client it is for yet so we need to keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + // Abort has not been seen by the client it is for yet so we need to + // keep track of it + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { - previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version + previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } if (entry->getTransactionArbitrator() == localMachineId) { @@ -2382,21 +2561,19 @@ void Table::processEntry(Abort *entry) { } if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { - // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(entry->getAbortId()); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); } - return; } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { - + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -2407,17 +2584,18 @@ void Table::processEntry(Abort *entry) { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); } - // Set dead a transaction if we can - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } - // Update the last transaction sequence number that the arbitrator arbitrated on - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { - + // Update the last transaction sequence number that the arbitrator + // arbitrated on + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2426,28 +2604,30 @@ void Table::processEntry(Abort *entry) { } /** - * Set dead the transaction part if that transaction is dead and keep track of all new parts + * Set dead the transaction part if that transaction is dead and keep + * track of all new parts */ void Table::processEntry(TransactionPart *entry) { - // Check if we have already seen this transaction and set it dead OR if it is not alive - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { + // Check if we have already seen this transaction and set it dead OR + // if it is not alive + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted entry->setDead(); return; } // This part is still alive - Hashtable, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId()); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable, TransactionPart *>(); + transactionPart = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newTransactionParts->put(entry->getMachineId(), transactionPart); } - // Update the part and set dead ones we have already seen (got a rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + // Update the part and set dead ones we have already seen (got a + // rescued version) + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2459,95 +2639,86 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - - - - Hashtable, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); - + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } - - // Update the part and set dead ones we have already seen (got a rescued version) - CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); + // Update the part and set dead ones we have already seen (got a + // rescued version) + CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } } /** - * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them-> - * Updates the live aborts, removes those that are dead and sets them dead-> - * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message-> + * Update the last message seen table-> Update and set dead the + * appropriate RejectedMessages as clients see them-> Updates the live + * aborts, removes those that are dead and sets them dead-> Check that + * the last message seen is correct and that there is no mismatch of + * our own last message or that other clients have not had a rollback + * on the last message-> */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { - +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet->remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); - // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { + // Go through each rejected message that this machine Id has not + // seen yet - // Go through each rejected message that this machine Id has not seen yet - for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { - + SetIterator *rmit = watchset->iterator(); + while (rmit->hasNext()) { RejectedMessage *rm = rmit->next(); - // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { - // Remove it from our watchlist rmit->remove(); - // Decrement machines that need to see this notification rm->removeWatcher(machineId); } } + delete rmit; } // Set dead the abort - for (IteratorEntry, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort *abort = i->next()->getValue(); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); + while (abortit->hasNext()) { + Pair *key = abortit->next(); + Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); - i->remove(); - + abortit->remove(); if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } - - - + delete abortit; if (machineId == localMachineId) { // Our own messages are immediately dead-> - if (liveness instanceof LastMessage) { + char livenessType = liveness->getType(); + if (livenessType == TypeLastMessage) { ((LastMessage *)liveness)->setDead(); - } else if (liveness instanceof Slot) { + } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } - // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable->put(machineId, Pair(seqNum, liveness)); + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; @@ -2555,31 +2726,31 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); Liveness *lastEntry = lastMessageEntry->getSecond(); + delete lastMessageEntry; // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { - if (lastEntry instanceof LastMessage) { + char lastEntryType = lastEntry->getType(); + + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); - } else if (lastEntry instanceof Slot) { + } else if (lastEntryType == TypeSlot) { ((Slot *)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); } } - // Make sure the server is not playing any games if (machineId == localMachineId) { - if (hadPartialSendToServer) { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: "); } - } else { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed: "); } } } else { @@ -2590,8 +2761,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } /** - * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not-> + * Add a rejected message entry to the watch set to keep track of + * which clients have seen that rejected message entry and which have + * not. */ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); @@ -2607,12 +2779,11 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { * Check if the HMAC chain is not violated */ void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (int i = 0; i < newSlots->length; i++) { - Slot *currSlot = newSlots[i]; + for (int i = 0; i < newSlots->length(); i++) { + Slot *currSlot = newSlots->get(i); Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL && !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) throw new Error("Server Error: Invalid HMAC Chain"); } } -