From 3e13e92e323d8bb88021721ebc27765227fe7107 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Fri, 19 Jan 2018 19:58:45 -0800 Subject: [PATCH] edits --- version2/src/C/Table.cc | 1613 ++++++++++++++++++--------------------- version2/src/C/Table.h | 45 +- 2 files changed, 759 insertions(+), 899 deletions(-) diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 76a7f16..2687db1 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1,6 +1,17 @@ #include "Table.h" - -Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) : +#include "CloudComm.h" +#include "SlotBuffer.h" +#include "NewKey.h" +#include "Slot.h" +#include "KeyValue.h" +#include "Error.h" +#include "PendingTransaction.h" +#include "TableStatus.h" +#include "TransactionStatus.h" +#include "Transaction.h" +#include "Random.h" + +Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), cloud(new CloudComm(this, baseurl, password, listeningPort)), random(NULL), @@ -11,7 +22,7 @@ Table::Table(String baseurl, String password, int64_t _localMachineId, int liste numberOfSlots(0), bufferResizeThreshold(0), liveSlotCount(0), - oldestLiveSlotSequenceNumber(0), + oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), localTransactionSequenceNumber(0), @@ -20,7 +31,7 @@ Table::Table(String baseurl, String password, int64_t _localMachineId, int liste localArbitrationSequenceNumber(0), hadPartialSendToServer(false), attemptedToSendToServer(false), - expectedSize(0), + expectedsize(0), didFindTableStatus(false), currMaxSize(0), lastSlotAttemptedToSend(NULL), @@ -62,7 +73,7 @@ Table::Table(String baseurl, String password, int64_t _localMachineId, int liste init(); } -Table::Table(CloudComm _cloud, int64_t _localMachineId) : +Table::Table(CloudComm * _cloud, int64_t _localMachineId) : buffer(NULL), cloud(_cloud), random(NULL), @@ -73,7 +84,7 @@ Table::Table(CloudComm _cloud, int64_t _localMachineId) : numberOfSlots(0), bufferResizeThreshold(0), liveSlotCount(0), - oldestLiveSlotSequenceNumber(0), + oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), localTransactionSequenceNumber(0), @@ -82,7 +93,7 @@ Table::Table(CloudComm _cloud, int64_t _localMachineId) : localArbitrationSequenceNumber(0), hadPartialSendToServer(false), attemptedToSendToServer(false), - expectedSize(0), + expectedsize(0), didFindTableStatus(false), currMaxSize(0), lastSlotAttemptedToSend(NULL), @@ -128,148 +139,65 @@ Table::Table(CloudComm _cloud, int64_t _localMachineId) : * Init all the stuff needed for for table usage */ void Table::init() { - // Init helper objects random = new Random(); buffer = new SlotBuffer(); - // Set Variables - oldestLiveSlotSequenceNumver = 1; - // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable >(); - rejectedMessageWatchVectorTable = new Hashtable >(); - arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable, Abort>(); - newTransactionParts = new Hashtable, TransactionPart> >(); - newCommitParts = new Hashtable, CommitPart> >(); + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable *>(); + rejectedMessageWatchVectorTable = new Hashtable* >(); + arbitratorTable = new Hashtable(); + liveAbortTable = new Hashtable*, Abort*>(); + newTransactionParts = new Hashtable*, TransactionPart*> *>(); + newCommitParts = new Hashtable*, CommitPart*> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); - liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable, Transaction>(); - liveCommitsTable = new Hashtable >(); - liveCommitsByKeyTable = new Hashtable(); + liveTransactionBySequenceNumberTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable*, Transaction*>(); + liveCommitsTable = new Hashtable >(); + liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); - rejectedSlotVector = new Vector(); - pendingTransactionQueue = new Vector(); - pendingSendArbitrationEntriesToDelete = new Vector(); - transactionPartsSent = new Hashtable >(); - outstandingTransactionStatus = new Hashtable(); - liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset >(); - localCommunicationTable = new Hashtable >(); + rejectedSlotVector = new Vector(); + pendingTransactionQueue = new Vector(); + pendingSendArbitrationEntriesToDelete = new Vector(); + transactionPartsSent = new Hashtable *>(); + outstandingTransactionStatus = new Hashtable(); + liveAbortsGeneratedByLocal = new Hashtable(); + offlineTransactionsCommittedAndAtServer = new Hashset*>(); + localCommunicationTable = new Hashtable*>(); lastTransactionSeenFromMachineFromServer = new Hashtable(); - pendingSendArbitrationRounds = new Vector(); + pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); // Other init stuff - numberOfSlots = buffer.capacity(); + numberOfSlots = buffer->capacity(); setResizeThreshold(); } -// TODO: delete method -synchronized void Table::printSlots() { - int64_t o = buffer.getOldestSeqNum(); - int64_t n = buffer.getNewestSeqNum(); - - Array *types = new Array(10); - - int num = 0; - - int livec = 0; - int deadc = 0; - - int casdasd = 0; - - int liveslo = 0; - - for (int64_t i = o; i < (n + 1); i++) { - Slot s = buffer.getSlot(i); - - - if (s.isLive()) { - liveslo++; - } - - Vector entries = s.getEntries(); - - for (Entry e : entries) { - if (e.isLive()) { - int type = e.getType(); - - - if (type == 6) { - RejectedMessage rej = (RejectedMessage)e; - casdasd++; - - System.out.println(rej.getMachineID()); - } - - - types[type] = types[type] + 1; - num++; - livec++; - } else { - deadc++; - } - } - } - - for (int i = 0; i < 10; i++) { - System.out.println(i + " " + types[i]); - } - System.out.println("Live count: " + livec); - System.out.println("Live Slot count: " + liveslo); - - System.out.println("Dead count: " + deadc); - System.out.println("Old: " + o); - System.out.println("New: " + n); - System.out.println("Size: " + buffer.size()); - // System.out.println("Commits: " + liveCommitsTable.size()); - System.out.println("pendingTrans: " + pendingTransactionQueue.size()); - System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); - - for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { - System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); - } - - - for (Long a : liveCommitsTable.keySet()) { - for (Long b : liveCommitsTable.get(a).keySet()) { - for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) { - System.out.print(kv + " "); - } - System.out.print("|| "); - } - System.out.println(); - } - -} - /** * Initialize the table by inserting a table status as the first entry into the table status * also initialize the crypto stuff. */ -synchronized void Table::initTable() { - cloud.initSecurity(); + void Table::initTable() { + cloud->initSecurity(); // Create the first insertion into the block chain which is the table status - Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); + Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); localSequenceNumber++; - TableStatus status = new TableStatus(s, numberOfSlots); - s.addEntry(status); - Array *array = cloud.putSlot(s, numberOfSlots); + TableStatus *status = new TableStatus(s, numberOfSlots); + s->addEntry(status); + Array *array = cloud->putSlot(s, numberOfSlots); if (array == NULL) { - array = new Array(1); + array = new Array(1); array->set(0, s); // update local block chain validateAndUpdate(array, true); - } else if (array.length == 1) { + } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); } else { @@ -280,124 +208,109 @@ synchronized void Table::initTable() { /** * Rebuild the table from scratch by pulling the latest block chain from the server. */ -synchronized void Table::rebuild() { + void Table::rebuild() { // Just pull the latest slots from the server - Array *newslots = cloud.getSlots(sequenceNumber + 1); + Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); sendToServer(NULL); updateLiveTransactionsAndStatus(); - } -// String toString() { -// String retString = " Committed Table: \n"; -// retString += "---------------------------\n"; -// retString += commitedTable.toString(); - -// retString += "\n\n"; - -// retString += " Speculative Table: \n"; -// retString += "---------------------------\n"; -// retString += speculativeTable.toString(); - -// return retString; -// } - -synchronized void Table::addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) { - localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); + void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { + localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); } -synchronized Long Table::getArbitrator(IoTString key) { - return arbitratorTable.get(key); + int64_t Table::getArbitrator(IoTString *key) { + return arbitratorTable->get(key); } -synchronized void Table::close() { - cloud.close(); + void Table::close() { + cloud->close(); } -synchronized IoTString Table::getCommitted(IoTString key) { - KeyValue kv = committedKeyValueTable.get(key); + IoTString * Table::getCommitted(IoTString *key) { + KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { - return kv.getValue(); + return kv->getValue(); } else { return NULL; } } -synchronized IoTString Table::getSpeculative(IoTString key) { - KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); + IoTString * Table::getSpeculative(IoTString *key) { + KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); if (kv == NULL) { - kv = speculatedKeyValueTable.get(key); + kv = speculatedKeyValueTable->get(key); } if (kv == NULL) { - kv = committedKeyValueTable.get(key); + kv = committedKeyValueTable->get(key); } if (kv != NULL) { - return kv.getValue(); + return kv->getValue(); } else { return NULL; } } -synchronized IoTString Table::getCommittedAtomic(IoTString key) { - KeyValue kv = committedKeyValueTable.get(key); + IoTString * Table::getCommittedAtomic(IoTString *key) { + KeyValue *kv = committedKeyValueTable->get(key); - if (arbitratorTable.get(key) == NULL) { + if (arbitratorTable->get(key) == NULL) { throw new Error("Key not Found."); } // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { // TODO: Maybe not throw en error throw new Error("Not all Key Values Match Arbitrator."); } if (kv != NULL) { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); - return kv.getValue(); + pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); + return kv->getValue(); } else { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); + pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; } } -synchronized IoTString Table::getSpeculativeAtomic(IoTString key) { - if (arbitratorTable.get(key) == NULL) { + IoTString * Table::getSpeculativeAtomic(IoTString *key) { + if (arbitratorTable->get(key) == NULL) { throw new Error("Key not Found."); } // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { // TODO: Maybe not throw en error throw new Error("Not all Key Values Match Arbitrator."); } - KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key); + KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); if (kv == NULL) { - kv = speculatedKeyValueTable.get(key); + kv = speculatedKeyValueTable->get(key); } if (kv == NULL) { - kv = committedKeyValueTable.get(key); + kv = committedKeyValueTable->get(key); } if (kv != NULL) { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, kv.getValue())); - return kv.getValue(); + pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); + return kv->getValue(); } else { - pendingTransactionBuilder.addKVGuard(new KeyValue(key, NULL)); + pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; } } -synchronized bool Table::update() { + bool Table::update() { try { - Array *newSlots = cloud.getSlots(sequenceNumber + 1); + Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); @@ -405,10 +318,10 @@ synchronized bool Table::update() { updateLiveTransactionsAndStatus(); return true; - } catch (Exception e) { - // e.printStackTrace(); + } catch (Exception *e) { + // e->printStackTrace(); - for (Long m : localCommunicationTable.keySet()) { + for (int64_t m : localCommunicationTable->keySet()) { updateFromLocal(m); } } @@ -416,14 +329,14 @@ synchronized bool Table::update() { return false; } -synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) { + bool Table::createNewKey(IoTString *keyName, int64_t machineId) { while (true) { - if (arbitratorTable.get(keyName) != NULL) { + if (arbitratorTable->get(keyName) != NULL) { // There is already an arbitrator return false; } - NewKey newKey = new NewKey(NULL, keyName, machineId); + NewKey * newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { // If successfully inserted @@ -432,50 +345,50 @@ synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) { } } -synchronized void Table::startTransaction() { + void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransactionBuilder = new PendingTransaction(localMachineId); } -synchronized void Table::addKV(IoTString key, IoTString value) { + void Table::addKV(IoTString *key, IoTString *value) { // Make sure it is a valid key - if (arbitratorTable.get(key) == NULL) { + if (arbitratorTable->get(key) == NULL) { throw new Error("Key not Found."); } // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { // TODO: Maybe not throw en error throw new Error("Not all Key Values Match Arbitrator."); } // Add the key value to this transaction - KeyValue kv = new KeyValue(key, value); - pendingTransactionBuilder.addKV(kv); + KeyValue *kv = new KeyValue(key, value); + pendingTransactionBuilder->addKV(kv); } -synchronized TransactionStatus Table::commitTransaction() { + TransactionStatus Table::commitTransaction() { - if (pendingTransactionBuilder.getKVUpdates().size() == 0) { + if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { // transaction with no updates will have no effect on the system - return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); + return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); } // Set the local transaction sequence number and increment - pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber); + pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber); localTransactionSequenceNumber++; // Create the transaction status - TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator()); + TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator()); // Create the new transaction - Transaction newTransaction = pendingTransactionBuilder.createTransaction(); - newTransaction.setTransactionStatus(transactionStatus); + Transaction *newTransaction = pendingTransactionBuilder->createTransaction(); + newTransaction->setTransactionStatus(transactionStatus); - if (pendingTransactionBuilder.getArbitrator() != localMachineId) { + if (pendingTransactionBuilder->getArbitrator() != localMachineId) { // Add it to the queue and invalidate the builder for safety - pendingTransactionQueue.add(newTransaction); + pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); updateLiveStateFromLocal(); @@ -485,29 +398,29 @@ synchronized TransactionStatus Table::commitTransaction() { try { sendToServer(NULL); - } catch (ServerException e) { + } catch (ServerException *e) { - Set arbitratorTriedAndFailed = new Hashset(); - for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { - Transaction transaction = iter.next(); + Hashset* arbitratorTriedAndFailed = new Hashset(); + for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { + Transaction * transaction = iter->next(); - if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { + if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client // to preserve ordering for arbitrator continue; } - Pair sendReturn = sendTransactionToLocal(transaction); + Pair * sendReturn = sendTransactionToLocal(transaction); - if (sendReturn.getFirst()) { + if (sendReturn->getFirst()) { // Failed to contact over local - arbitratorTriedAndFailed.add(transaction.getArbitrator()); + arbitratorTriedAndFailed->add(transaction->getArbitrator()); } else { // Successful contact or should not contact - if (sendReturn.getSecond()) { + if (sendReturn->getSecond()) { // did arbitrate - iter.remove(); + iter->remove(); } } } @@ -518,26 +431,12 @@ synchronized TransactionStatus Table::commitTransaction() { return transactionStatus; } -/** - * Get the machine ID for this client - */ -int64_t Table::getMachineId() { - return localMachineId; -} - -/** - * Decrement the number of live slots that we currently have - */ -void Table::decrementLiveCount() { - liveSlotCount--; -} - /** * Recalculate the new resize threshold */ void Table::setResizeThreshold() { - int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots); - bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); + int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots); + bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower); } int64_t Table::getLocalSequenceNumber() { @@ -547,65 +446,65 @@ int64_t Table::getLocalSequenceNumber() { bool lastInsertedNewKey = false; -bool Table::sendToServer(NewKey newKey) { +bool Table::sendToServer(NewKey* newKey) { bool fromRetry = false; try { if (hadPartialSendToServer) { - Array *newSlots = cloud.getSlots(sequenceNumber + 1); - if (newSlots.length == 0) { + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + if (newSlots->length() == 0) { fromRetry = true; - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + ThreeTuple *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - if (sendSlotsReturn.getFirst()) { + if (sendSlotsReturn->getFirst()) { if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; } } - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Update which transactions parts still need to be sent - transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); } } } else { - newSlots = sendSlotsReturn.getThird(); + newSlots = sendSlotsReturn->getThird(); bool isInserted = false; - for (Slot s : newSlots) { - if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + for (Slot *s : newSlots) { + if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot s : newSlots) { + for (Slot *s : newSlots) { if (isInserted) { break; } // Process each entry in the slot - for (Entry entry : s.getEntries()) { + for (Entry *entry : s->getEntries()) { - if (entry.getType() == Entry.TypeLastMessage) { + if (entry->getType() == TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; - if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; } @@ -615,71 +514,71 @@ bool Table::sendToServer(NewKey newKey) { if (isInserted) { if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; } } - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Update which transactions parts still need to be sent - transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); } else { - transaction.resetServerFailure(); + transaction->resetServerFailure(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } } } } - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } - if (sendSlotsReturn.getThird().length != 0) { + if (sendSlotsReturn->getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); + validateAndUpdate(sendSlotsReturn->getThird(), true); } // continue; } else { bool isInserted = false; - for (Slot s : newSlots) { - if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + for (Slot *s : newSlots) { + if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot s : newSlots) { + for (Slot *s : newSlots) { if (isInserted) { break; } // Process each entry in the slot - for (Entry entry : s.getEntries()) { + for (Entry *entry : s->getEntries()) { - if (entry.getType() == Entry.TypeLastMessage) { + if (entry->getType() == TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; - if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; } @@ -689,41 +588,41 @@ bool Table::sendToServer(NewKey newKey) { if (isInserted) { if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; } } - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Update which transactions parts still need to be sent - transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); } else { - transaction.resetServerFailure(); + transaction->resetServerFailure(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } } } else { - for (Transaction transaction : lastTransactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } } @@ -732,7 +631,7 @@ bool Table::sendToServer(NewKey newKey) { validateAndUpdate(newSlots, true); } } - } catch (ServerException e) { + } catch (ServerException *e) { throw e; } @@ -740,7 +639,7 @@ bool Table::sendToServer(NewKey newKey) { try { // While we have stuff that needs inserting into the block chain - while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) { + while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { fromRetry = false; @@ -751,34 +650,34 @@ bool Table::sendToServer(NewKey newKey) { // If there is a new key with same name then end - if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) { + if ((newKey != NULL) && (arbitratorTable->get(newKey->getKey()) != NULL)) { return false; } // Create the slot - Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn.getFirst(); - int newSize = fillSlotsReturn.getSecond(); - bool insertedNewKey = fillSlotsReturn.getThird(); + bool needsResize = fillSlotsReturn->getFirst(); + int newSize = fillSlotsReturn->getSecond(); + bool insertedNewKey = fillSlotsReturn->getThird(); if (needsResize) { // Reset which transaction to send - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); + for (Transaction *transaction : transactionPartsSent->keySet()) { + transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); } } // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); + pendingSendArbitrationEntriesToDelete->clear(); + transactionPartsSent->clear(); // We needed a resize so try again fillSlot(slot, true, newKey); @@ -789,13 +688,13 @@ bool Table::sendToServer(NewKey newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable >(transactionPartsSent); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + lastTransactionPartsSent = new Hashtable* >(transactionPartsSent); + lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - if (sendSlotsReturn.getFirst()) { + if (sendSlotsReturn->getFirst()) { // Did insert into the block chain @@ -807,120 +706,81 @@ bool Table::sendToServer(NewKey newKey) { } // Remove the aborts and commit parts that were sent from the pending to send queue - for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { - ArbitrationRound round = iter.next(); - round.removeParts(pendingSendArbitrationEntriesToDelete); + for (Iterator iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) { + ArbitrationRound round = iter->next(); + round->removeParts(pendingSendArbitrationEntriesToDelete); - if (round.isDoneSending()) { + if (round->isDoneSending()) { // Sent all the parts - iter.remove(); + iter->remove(); } } - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetServerFailure(); + for (Transaction *transaction : transactionPartsSent->keySet()) { + transaction->resetServerFailure(); // Update which transactions parts still need to be sent - transaction.removeSentParts(transactionPartsSent.get(transaction)); + transaction->removeSentParts(transactionPartsSent->get(transaction)); // Add the transaction status to the outstanding list - outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); // Update the transaction status - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction.didSendAllParts()) { - transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - pendingTransactionQueue.remove(transaction); + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); } } } else { - - // if (!sendSlotsReturn.getSecond()) { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); - // } - // } else { - // for (Transaction transaction : lastTransactionPartsSent.keySet()) { - // transaction.resetServerFailure(); - - // // Update which transactions parts still need to be sent - // transaction.removeSentParts(transactionPartsSent.get(transaction)); - - // // Add the transaction status to the outstanding list - // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); - - // // Update the transaction status - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); - - // // Check if all the transaction parts were successfully sent and if so then remove it from pending - // if (transaction.didSendAllParts()) { - // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); - // pendingTransactionQueue.remove(transaction); - - // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); - // } - // } - // } - // } - // Reset which transaction to send - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); - // transaction.resetNextPartToSend(); + for (Transaction *transaction : transactionPartsSent->keySet()) { + transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); } } } // Clear the sent data in preparation for next send - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); + pendingSendArbitrationEntriesToDelete->clear(); + transactionPartsSent->clear(); - if (sendSlotsReturn.getThird().length != 0) { + if (sendSlotsReturn->getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); + validateAndUpdate(sendSlotsReturn->getThird(), true); } } - } catch (ServerException e) { - - if (e.getType() != ServerException.TypeInputTimeout) { - // e.printStackTrace(); + } catch (ServerException *e) { + if (e->getType() != ServerException->TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); + for (Transaction *transaction : transactionPartsSent->keySet()) { + transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { - transaction.setSequenceNumber(-1); + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); } } } else { // There was a partial send to the server hadPartialSendToServer = true; - - // if (!fromRetry) { - // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); - // lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - // } - // Nothing was able to be sent to the server so just clear these data structures - for (Transaction transaction : transactionPartsSent.keySet()) { - transaction.resetNextPartToSend(); - transaction.setServerFailure(); + for (Transaction *transaction : transactionPartsSent->keySet()) { + transaction->resetNextPartToSend(); + transaction->setServerFailure(); } } - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); + pendingSendArbitrationEntriesToDelete->clear(); + transactionPartsSent->clear(); throw e; } @@ -928,8 +788,8 @@ bool Table::sendToServer(NewKey newKey) { return newKey == NULL; } -synchronized bool Table::updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable.get(machineId); + bool Table::updateFromLocal(int64_t machineId) { + Pair localCommunicationInformation = localCommunicationTable->get(machineId); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing return false; @@ -938,20 +798,20 @@ synchronized bool Table::updateFromLocal(int64_t machineId) { // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); + int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); } - Array *sendData = new char[sendDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + Array *sendData = new Array(sendDataSize); + ByteBuffer * bbEncode = ByteBuffer_wrap(sendData); // Encode the data - bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode.putInt(0); + bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode->putInt(0); // Send by local - Array *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -960,16 +820,16 @@ synchronized bool Table::updateFromLocal(int64_t machineId) { } // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(returnData); - int numberOfEntries = bbDecode.getInt(); + ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); + int numberOfEntries = bbDecode->getInt(); for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode.get(); - if (type == Entry.TypeAbort) { - Abort abort = (Abort)Abort.decode(NULL, bbDecode); + char type = bbDecode->get(); + if (type == TypeAbort) { + Abort *abort = (Abort)Abort_decode(NULL, bbDecode); processEntry(abort); - } else if (type == Entry.TypeCommitPart) { - CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); + } else if (type == TypeCommitPart) { + CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -979,10 +839,10 @@ synchronized bool Table::updateFromLocal(int64_t machineId) { return true; } -Pair Table::sendTransactionToLocal(Transaction transaction) { +Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); + Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing @@ -991,29 +851,29 @@ Pair Table::sendTransactionToLocal(Transaction transaction) { // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart part : transaction.getParts().values()) { - sendDataSize += part.getSize(); + for (TransactionPart *part : transaction->getParts()->values()) { + sendDataSize += part->getSize(); } - Long lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); + int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); } // Make the send data size - Array *sendData = new char[sendDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + Array *sendData = new Array(sendDataSize); + ByteBuffer *bbEncode = ByteBuffer.wrap(sendData); // Encode the data - bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode.putInt(transaction.getParts().size()); - for (TransactionPart part : transaction.getParts().values()) { - part.encode(bbEncode); + bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode->putInt(transaction->getParts()->size()); + for (TransactionPart *part : transaction->getParts()->values()) { + part->encode(bbEncode); } // Send by local - Array *returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -1022,24 +882,24 @@ Pair Table::sendTransactionToLocal(Transaction transaction) { } // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(returnData); - bool didCommit = bbDecode.get() == 1; - bool couldArbitrate = bbDecode.get() == 1; - int numberOfEntries = bbDecode.getInt(); + ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); + bool didCommit = bbDecode->get() == 1; + bool couldArbitrate = bbDecode->get() == 1; + int numberOfEntries = bbDecode->getInt(); bool foundAbort = false; for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode.get(); - if (type == Entry.TypeAbort) { - Abort abort = (Abort)Abort.decode(NULL, bbDecode); + char type = bbDecode->get(); + if (type == TypeAbort) { + Abort abort = (Abort)Abort_decode(NULL, bbDecode); - if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { + if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; } processEntry(abort); - } else if (type == Entry.TypeCommitPart) { - CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); + } else if (type == TypeCommitPart) { + CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -1047,30 +907,30 @@ Pair Table::sendTransactionToLocal(Transaction transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus status = transaction.getTransactionStatus(); + TransactionStatus status = transaction->getTransactionStatus(); if (didCommit) { - status.setStatus(TransactionStatus.StatusCommitted); + status->setStatus(TransactionStatus_StatusCommitted); } else { - status.setStatus(TransactionStatus.StatusAborted); + status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus status = transaction.getTransactionStatus(); + TransactionStatus status = transaction->getTransactionStatus(); if (foundAbort) { - status.setStatus(TransactionStatus.StatusAborted); + status->setStatus(TransactionStatus_StatusAborted); } else { - status.setStatus(TransactionStatus.StatusCommitted); + status->setStatus(TransactionStatus_StatusCommitted); } } return new Pair(false, true); } -synchronized Array *Table::acceptDataFromLocal(Array *data) { + Array *Table::acceptDataFromLocal(Array *data) { // Decode the data - ByteBuffer bbDecode = ByteBuffer.wrap(data); - int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong(); - int numberOfParts = bbDecode.getInt(); + ByteBuffer *bbDecode = ByteBuffer_wrap(data); + int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); + int numberOfParts = bbDecode->getInt(); // If we did commit a transaction or not bool didCommit = false; @@ -1079,60 +939,60 @@ synchronized Array *Table::acceptDataFromLocal(Array *data) { if (numberOfParts != 0) { // decode the transaction - Transaction transaction = new Transaction(); + Transaction *transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { - bbDecode.get(); + bbDecode->get(); TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); - transaction.addPartDecode(newPart); + transaction->addPartDecode(newPart); } // Arbitrate on transaction and pull relevant return data Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn.getFirst(); - didCommit = localArbitrateReturn.getSecond(); + couldArbitrate = localArbitrateReturn->getFirst(); + didCommit = localArbitrateReturn->getSecond(); updateLiveStateFromLocal(); // Transaction was sent to the server so keep track of it to prevent double commit - if (transaction.getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer.add(transaction.getId()); + if (transaction->getSequenceNumber() != -1) { + offlineTransactionsCommittedAndAtServer->add(transaction->getId()); } } // The data to send back int returnDataSize = 0; - Vector unseenArbitrations = new Vector(); + Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal.keySet()); - Collections.sort(abortLocalSequenceNumbers); - for (Long localSequenceNumber : abortLocalSequenceNumbers) { + Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); + Collections->sort(abortLocalSequenceNumbers); + for (int64_t localSequenceNumber : abortLocalSequenceNumbers) { if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); - unseenArbitrations.add(abort); - returnDataSize += abort.getSize(); + Abort abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); + unseenArbitrations->add(abort); + returnDataSize += abort->getSize(); } // Get the commits to send back - Hashtable commitForClientTable = liveCommitsTable.get(localMachineId); + Hashtable* commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector commitLocalSequenceNumbers = new Vector(commitForClientTable.keySet()); - Collections.sort(commitLocalSequenceNumbers); + Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); + Collections->sort(commitLocalSequenceNumbers); - for (Long localSequenceNumber : commitLocalSequenceNumbers) { - Commit commit = commitForClientTable.get(localSequenceNumber); + for (int64_t localSequenceNumber : commitLocalSequenceNumbers) { + Commit commit = commitForClientTable->get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - unseenArbitrations.addAll(commit.getParts().values()); + unseenArbitrations->addAll(commit->getParts()->values()); - for (CommitPart commitPart : commit.getParts().values()) { - returnDataSize += commitPart.getSize(); + for (CommitPart commitPart : commit->getParts()->values()) { + returnDataSize += commitPart->getSize(); } } } @@ -1146,25 +1006,25 @@ synchronized Array *Table::acceptDataFromLocal(Array *data) { } // Data to send Back - Array *returnData = new char[returnDataSize]; - ByteBuffer bbEncode = ByteBuffer.wrap(returnData); + Array *returnData = new Array(returnDataSize); + ByteBuffer *bbEncode = ByteBuffer_wrap(returnData); if (numberOfParts != 0) { if (didCommit) { - bbEncode.put((char)1); + bbEncode->put((char)1); } else { - bbEncode.put((char)0); + bbEncode->put((char)0); } if (couldArbitrate) { - bbEncode.put((char)1); + bbEncode->put((char)1); } else { - bbEncode.put((char)0); + bbEncode->put((char)0); } } - bbEncode.putInt(unseenArbitrations.size()); - for (Entry entry : unseenArbitrations) { - entry.encode(bbEncode); + bbEncode->putInt(unseenArbitrations->size()); + for (Entry *entry : unseenArbitrations) { + entry->encode(bbEncode); } @@ -1172,21 +1032,21 @@ synchronized Array *Table::acceptDataFromLocal(Array *data) { return returnData; } -ThreeTuple *> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) { +ThreeTuple *> * Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; bool inserted = false; bool lastTryInserted = false; - Array *array = cloud.putSlot(slot, newSize); + Array *array = cloud->putSlot(slot, newSize); if (array == NULL) { - array = new Array(); + array = new Array(); array->set(0, slot); - rejectedSlotVector.clear(); + rejectedSlotVector->clear(); inserted = true; } else { - if (array.length == 0) { + if (array->length() == 0) { throw new Error("Server Error: Did not send any slots"); } @@ -1194,25 +1054,25 @@ ThreeTuple *> Table::sendSlotsToServer(Slot slot, int ne if (hadPartialSendToServer) { bool isInserted = false; - for (Slot s : array) { - if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + for (Slot *s : array) { + if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot s : array) { + for (Slot *s : array) { if (isInserted) { break; } // Process each entry in the slot - for (Entry entry : s.getEntries()) { + for (Entry *entry : s->getEntries()) { - if (entry.getType() == Entry.TypeLastMessage) { + if (entry->getType() == TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; - if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { isInserted = true; break; } @@ -1221,24 +1081,24 @@ ThreeTuple *> Table::sendSlotsToServer(Slot slot, int ne } if (!isInserted) { - rejectedSlotVector.add(slot.getSequenceNumber()); + rejectedSlotVector->add(slot->getSequenceNumber()); lastTryInserted = false; } else { lastTryInserted = true; } } else { - rejectedSlotVector.add(slot.getSequenceNumber()); + rejectedSlotVector->add(slot->getSequenceNumber()); lastTryInserted = false; } } - return new ThreeTuple *>(inserted, lastTryInserted, array); + return new ThreeTuple *>(inserted, lastTryInserted, array); } /** * Returns false if a resize was needed */ -ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { +ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey * newKeyEntry) { int newSize = 0; @@ -1248,21 +1108,21 @@ ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey } if (resize) { - newSize = (int) (numberOfSlots * RESIZE_MULTIPLE); - TableStatus status = new TableStatus(slot, newSize); - slot.addEntry(status); + newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); + TableStatus *status = new TableStatus(slot, newSize); + slot->addEntry(status); } // Fill with rejected slots first before doing anything else doRejectedMessages(slot); // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables - bool needsResize = mandatoryRescueReturn.getFirst(); - bool seenLiveSlot = mandatoryRescueReturn.getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); + bool needsResize = mandatoryRescueReturn->getFirst(); + bool seenLiveSlot = mandatoryRescueReturn->getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird(); if (needsResize && !resize) { // We need to resize but we are not resizing so return false @@ -1271,40 +1131,40 @@ ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey bool inserted = false; if (newKeyEntry != NULL) { - newKeyEntry.setSlot(slot); - if (slot.hasSpace(newKeyEntry)) { + newKeyEntry->setSlot(slot); + if (slot->hasSpace(newKeyEntry)) { - slot.addEntry(newKeyEntry); + slot->addEntry(newKeyEntry); inserted = true; } } // Clear the transactions, aborts and commits that were sent previously - transactionPartsSent.clear(); - pendingSendArbitrationEntriesToDelete.clear(); + transactionPartsSent->clear(); + pendingSendArbitrationEntriesToDelete->clear(); for (ArbitrationRound round : pendingSendArbitrationRounds) { bool isFull = false; - round.generateParts(); - Vector parts = round.getParts(); + round->generateParts(); + Vector* parts = round->getParts(); // Insert pending arbitration data for (Entry arbitrationData : parts) { // If it is an abort then we need to set some information if (arbitrationData instanceof Abort) { - ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); + ((Abort)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); } - if (!slot.hasSpace(arbitrationData)) { + if (!slot->hasSpace(arbitrationData)) { // No space so cant do anything else with these data entries isFull = true; break; } // Add to this current slot and add it to entries to delete - slot.addEntry(arbitrationData); - pendingSendArbitrationEntriesToDelete.add(arbitrationData); + slot->addEntry(arbitrationData); + pendingSendArbitrationEntriesToDelete->add(arbitrationData); } if (isFull) { @@ -1312,37 +1172,37 @@ ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey } } - if (pendingTransactionQueue.size() > 0) { + if (pendingTransactionQueue->size() > 0) { - Transaction transaction = pendingTransactionQueue.get(0); + Transaction *transaction = pendingTransactionQueue->get(0); // Set the transaction sequence number if it has yet to be inserted into the block chain - // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { - // transaction.setSequenceNumber(slot.getSequenceNumber()); + // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) { + // transaction->setSequenceNumber(slot->getSequenceNumber()); // } - if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { - transaction.setSequenceNumber(slot.getSequenceNumber()); + if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) { + transaction->setSequenceNumber(slot->getSequenceNumber()); } while (true) { - TransactionPart part = transaction.getNextPartToSend(); + TransactionPart *part = transaction->getNextPartToSend(); if (part == NULL) { // Ran out of parts to send for this transaction so move on break; } - if (slot.hasSpace(part)) { - slot.addEntry(part); - Vector partsSent = transactionPartsSent.get(transaction); + if (slot->hasSpace(part)) { + slot->addEntry(part); + Vector* partsSent = transactionPartsSent->get(transaction); if (partsSent == NULL) { partsSent = new Vector(); - transactionPartsSent.put(transaction, partsSent); + transactionPartsSent->put(transaction, partsSent); } - partsSent.add(part.getPartNumber()); - transactionPartsSent.put(transaction, partsSent); + partsSent->add(part->getPartNumber()); + transactionPartsSent->put(transaction, partsSent); } else { break; } @@ -1355,48 +1215,48 @@ ThreeTuple *Table::fillSlot(Slot slot, bool resize, NewKey return new ThreeTuple(false, newSize, inserted); } -void Table::doRejectedMessages(Slot s) { - if (!rejectedSlotVector.isEmpty()) { +void Table::doRejectedMessages(Slot *s) { + if (!rejectedSlotVector->isEmpty()) { /* TODO: We should avoid generating a rejected message entry if - * there is already a sufficient entry in the queue (e.g., - * equalsto value of true and same sequence number). */ - - int64_t old_seqn = rejectedSlotVector.firstElement(); - if (rejectedSlotVector.size() > REJECTED_THRESHOLD) { - int64_t new_seqn = rejectedSlotVector.lastElement(); - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); - s.addEntry(rm); + * there is already a sufficient entry in the queue (e->g->, + * equalsto value of true and same sequence number)-> */ + + int64_t old_seqn = rejectedSlotVector->firstElement(); + if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { + int64_t new_seqn = rejectedSlotVector->lastElement(); + RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); + s->addEntry(rm); } else { int64_t prev_seqn = -1; int i = 0; /* Go through list of missing messages */ - for (; i < rejectedSlotVector.size(); i++) { - int64_t curr_seqn = rejectedSlotVector.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); + for (; i < rejectedSlotVector->size(); i++) { + int64_t curr_seqn = rejectedSlotVector->get(i); + Slot *s_msg = buffer->getSlot(curr_seqn); if (s_msg != NULL) break; prev_seqn = curr_seqn; } /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s.addEntry(rm); + RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); + s->addEntry(rm); } /* Generate rejected message entries for present messages */ - for (; i < rejectedSlotVector.size(); i++) { - int64_t curr_seqn = rejectedSlotVector.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - int64_t machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s.addEntry(rm); + for (; i < rejectedSlotVector->size(); i++) { + int64_t curr_seqn = rejectedSlotVector->get(i); + Slot *s_msg = buffer->getSlot(curr_seqn); + int64_t machineid = s_msg->getMachineID(); + RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); + s->addEntry(rm); } } } } -ThreeTuple Table::doMandatoryResuce(Slot slot, bool resize) { - int64_t newestSequenceNumber = buffer.getNewestSeqNum(); - int64_t oldestSequenceNumber = buffer.getOldestSeqNum(); +ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize) { + int64_t newestSequenceNumber = buffer->getNewestSeqNum(); + int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { oldestLiveSlotSequenceNumver = oldestSequenceNumber; } @@ -1404,18 +1264,18 @@ ThreeTuple Table::doMandatoryResuce(Slot slot, bool resize) { int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; bool seenLiveSlot = false; int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full - int64_t threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point + int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot previousSlot = buffer.getSlot(currentSequenceNumber); + Slot previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; } - if (!previousSlot.isLive()) { + if (!previousSlot->isLive()) { continue; } @@ -1423,50 +1283,50 @@ ThreeTuple Table::doMandatoryResuce(Slot slot, bool resize) { seenLiveSlot = true; // Get all the live entries for a slot - Vector liveEntries = previousSlot.getLiveEntries(resize); + Vector* liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them for (Entry liveEntry : liveEntries) { - if (slot.hasSpace(liveEntry)) { + if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry - slot.addEntry(liveEntry); + slot->addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue - System.out.println("B"); //? - return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); + System->out->println("B"); //? + return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); } } } // Did not resize - return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); + return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); } -void Table::doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) { +void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) { /* now go through live entries from least to greatest sequence number until * either all live slots added, or the slot doesn't have enough room * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; - int64_t newestseqnum = buffer.getNewestSeqNum(); + int64_t newestseqnum = buffer->getNewestSeqNum(); search: for (; seqn <= newestseqnum; seqn++) { - Slot prevslot = buffer.getSlot(seqn); + Slot prevslot = buffer->getSlot(seqn); //Push slot number forward if (!seenliveslot) oldestLiveSlotSequenceNumver = seqn; - if (!prevslot.isLive()) + if (!prevslot->isLive()) continue; seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); + Vector* liveentries = prevslot->getLiveEntries(resize); + for (Entry *liveentry : liveentries) { + if (s->hasSpace(liveentry)) + s->addEntry(liveentry); else { skipcount++; - if (skipcount > SKIP_THRESHOLD) + if (skipcount > Table_SKIP_THRESHOLD) break search; } } @@ -1474,17 +1334,17 @@ search: } /** - * Checks for malicious activity and updates the local copy of the block chain. + * Checks for malicious activity and updates the local copy of the block chain-> */ -void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { +void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { // The cloud communication layer has checked slot HMACs already before decoding - if (newSlots.length == 0) { + if (newSlots->length() == 0) { return; } // Make sure all slots are newer than the last largest slot this client has seen - int64_t firstSeqNum = newSlots[0].getSequenceNumber(); + int64_t firstSeqNum = newSlots[0]->getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { throw new Error("Server Error: Sent older slots!"); } @@ -1497,49 +1357,49 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset machineSet = new Hashset(lastMessageTable.keySet()); + Hashset *machineSet = new Hashset(lastMessageTable->keySet()); // Process each slots data - for (Slot slot : newSlots) { + for (Slot *slot : newSlots) { processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); updateExpectedSize(); } - // If there is a gap, check to see if the server sent us everything. + // If there is a gap, check to see if the server sent us everything-> if (firstSeqNum != (sequenceNumber + 1)) { - // Check the size of the slots that were sent down by the server. + // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots.length); + checkNumSlots(newSlots->length); // Since there was a gap every machine must have pushed a slot or must have - // a last message message. If not then the server is hiding slots - if (!machineSet.isEmpty()) { + // a last message message-> If not then the server is hiding slots + if (!machineSet->isEmpty()) { throw new Error("Missing record for machines: " + machineSet); } } - // Update the size of our local block chain. + // Update the size of our local block chain-> commitNewMaxSize(); - // Commit new to slots to the local block chain. - for (Slot slot : newSlots) { + // Commit new to slots to the local block chain-> + for (Slot *slot : newSlots) { - // Insert this slot into our local block chain copy. - buffer.putSlot(slot); + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); - // Keep track of how many slots are currently live (have live data in them). + // Keep track of how many slots are currently live (have live data in them)-> liveSlotCount++; } // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); + sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber(); updateLiveStateFromServer(); // No Need to remember after we pulled from the server - offlineTransactionsCommittedAndAtServer.clear(); + offlineTransactionsCommittedAndAtServer->clear(); // This is invalidated now hadPartialSendToServer = false; @@ -1584,14 +1444,14 @@ void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) if (didFindTableStatus) { // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; - // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); + // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); } else { expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - // System.out.println("Here: " + expectedsize); + // System->out->println("Here: " + expectedsize); } - // System.out.println(numberOfSlots); + // System->out->println(numberOfSlots); didFindTableStatus = true; currMaxSize = numberOfSlots; @@ -1607,14 +1467,14 @@ void Table::updateExpectedSize() { /** - * Check the size of the block chain to make sure there are enough slots sent back by the server. + * Check the size of the block chain to make sure there are enough slots sent back by the server-> * This is only called when we have a gap between the slots that we have locally and the slots * sent by the server therefore in the slots sent by the server there will be at least 1 Table * status message */ void Table::checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); + throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots); } } @@ -1624,14 +1484,14 @@ void Table::updateCurrMaxSize(int newmaxsize) { /** - * Update the size of of the local buffer if it is needed. + * Update the size of of the local buffer if it is needed-> */ void Table::commitNewMaxSize() { didFindTableStatus = false; // Resize the local slot buffer if (numberOfSlots != currMaxSize) { - buffer.resize((int)currMaxSize); + buffer->resize((int)currMaxSize); } // Change the number of local slots to the new size @@ -1647,75 +1507,72 @@ void Table::commitNewMaxSize() { */ void Table::processNewTransactionParts() { - if (newTransactionParts.size() == 0) { + if (newTransactionParts->size() == 0) { // Nothing new to process return; } // Iterate through all the machine Ids that we received new parts for - for (Long machineId : newTransactionParts.keySet()) { - Hashtable, TransactionPart> parts = newTransactionParts.get(machineId); + for (int64_t machineId : newTransactionParts->keySet()) { + Hashtable*, TransactionPart*> * parts = newTransactionParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts.keySet()) { - TransactionPart part = parts.get(partId); + for (Pair* partId : parts->keySet()) { + TransactionPart *part = parts->get(partId); - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part.getSequenceNumber())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { // Set dead the transaction part - part.setDead(); + part->setDead(); continue; } // Get the transaction object for that sequence number - Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber()); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); // Insert this new transaction into the live tables - liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction); + liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); + liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); } // Add that part to the transaction - transaction.addPartDecode(part); + transaction->addPartDecode(part); } } // Clear all the new transaction parts in preparation for the next time the server sends slots - newTransactionParts.clear(); + newTransactionParts->clear(); } - -int64_t lastSeqNumArbOn = 0; - void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable.size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // Nothing to arbitrate on so move on return; } // Get the transaction sequence numbers and sort from oldest to newest - Vector transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable.keySet()); - Collections.sort(transactionSequenceNumbers); + Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); + Collections->sort(transactionSequenceNumbers); // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + Hashtable speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; - Set generatedAborts = new Hashset(); + Hashset* generatedAborts = new Hashset(); - for (Long transactionSequenceNumber : transactionSequenceNumbers) { - Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { + Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction - if (transaction.getArbitrator() != localMachineId) { + if (transaction->getArbitrator() != localMachineId) { continue; } @@ -1723,13 +1580,13 @@ void Table::arbitrateFromServer() { continue; } - if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { + if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) { // We have seen this already locally so dont commit again continue; } - if (!transaction.isComplete()) { + if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; @@ -1737,21 +1594,21 @@ void Table::arbitrateFromServer() { // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) { - lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) { + lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } else { - Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); - if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { - lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); + if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { + lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } } - if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { + if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true // Update the local changes so we can make the commit - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - speculativeTableTmp.put(kv.getKey(), kv); + for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + speculativeTableTmp->put(kv->getKey(), kv); } // Update what the last transaction committed was for use in batch commit @@ -1761,14 +1618,14 @@ void Table::arbitrateFromServer() { // Create the abort Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), - transaction.getSequenceNumber(), - transaction.getMachineId(), - transaction.getArbitrator(), + transaction->getClientLocalSequenceNumber(), + transaction->getSequenceNumber(), + transaction->getMachineId(), + transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - generatedAborts.add(newAbort); + generatedAborts->add(newAbort); // Insert the abort so we can process processEntry(newAbort); @@ -1776,42 +1633,42 @@ void Table::arbitrateFromServer() { lastSeqNumArbOn = transactionSequenceNumber; - // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); + // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber); } Commit newCommit = NULL; // If there is something to commit - if (speculativeTableTmp.size() != 0) { + if (speculativeTableTmp->size() != 0) { // Create the commit and increment the commit sequence number newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue kv : speculativeTableTmp.values()) { - newCommit.addKV(kv); + for (KeyValue *kv : speculativeTableTmp->values()) { + newCommit->addKV(kv); } // create the commit parts - newCommit.createCommitParts(); + newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { + for (CommitPart commitPart : newCommit->getParts()->values()) { processEntry(commitPart); } } - if ((newCommit != NULL) || (generatedAborts.size() > 0)) { + if ((newCommit != NULL) || (generatedAborts->size() > 0)) { ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); - pendingSendArbitrationRounds.add(arbitrationRound); + pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - if (newArbitrationRound.getCommit() != NULL) { - for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + if (newArbitrationRound->getCommit() != NULL) { + for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } @@ -1819,30 +1676,30 @@ void Table::arbitrateFromServer() { } } -Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { +Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction - if (transaction.getArbitrator() != localMachineId) { + if (transaction->getArbitrator() != localMachineId) { return new Pair(false, false); } - if (!transaction.isComplete()) { + if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this return new Pair(false, false); } - if (transaction.getMachineId() != localMachineId) { + if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) { - if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { + if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return new Pair(false, false); } } } - if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) { + if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { // Guard evaluated as true // Create the commit and increment the commit sequence number @@ -1850,33 +1707,33 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { localArbitrationSequenceNumber++; // Update the local changes so we can make the commit - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - newCommit.addKV(kv); + for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + newCommit->addKV(kv); } // create the commit parts - newCommit.createCommitParts(); + newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); - pendingSendArbitrationRounds.add(arbitrationRound); + pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { + for (CommitPart commitPart : newCommit->getParts()->values()) { processEntry(commitPart); } } - if (transaction.getMachineId() == localMachineId) { - TransactionStatus status = transaction.getTransactionStatus(); + if (transaction->getMachineId() == localMachineId) { + TransactionStatus status = transaction->getTransactionStatus(); if (status != NULL) { - status.setStatus(TransactionStatus.StatusCommitted); + status->setStatus(TransactionStatus_StatusCommitted); } } @@ -1884,13 +1741,13 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { return new Pair(true, true); } else { - if (transaction.getMachineId() == localMachineId) { + if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status // Guard evaluated was false so create abort - TransactionStatus status = transaction.getTransactionStatus(); + TransactionStatus status = transaction->getTransactionStatus(); if (status != NULL) { - status.setStatus(TransactionStatus.StatusAborted); + status->setStatus(TransactionStatus_StatusAborted); } } else { Hashset addAbortSet = new Hashset(); @@ -1898,23 +1755,23 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { // Create the abort Abort newAbort = new Abort(NULL, - transaction.getClientLocalSequenceNumber(), + transaction->getClientLocalSequenceNumber(), -1, - transaction.getMachineId(), - transaction.getArbitrator(), + transaction->getMachineId(), + transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - addAbortSet.add(newAbort); + addAbortSet->add(newAbort); // Append all the commit parts to the end of the pending queue waiting for sending to the server ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet); - pendingSendArbitrationRounds.add(arbitrationRound); + pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } @@ -1930,59 +1787,59 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { */ bool Table::compactArbitrationData() { - if (pendingSendArbitrationRounds.size() < 2) { + if (pendingSendArbitrationRounds->size() < 2) { // Nothing to compact so do nothing return false; } - ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); - if (lastRound.didSendPart()) { + ArbitrationRound lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + if (lastRound->didSendPart()) { return false; } - bool hadCommit = (lastRound.getCommit() == NULL); + bool hadCommit = (lastRound->getCommit() == NULL); bool gotNewCommit = false; int numberToDelete = 1; - while (numberToDelete < pendingSendArbitrationRounds.size()) { - ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); + while (numberToDelete < pendingSendArbitrationRounds->size()) { + ArbitrationRound round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); - if (round.isFull() || round.didSendPart()) { + if (round->isFull() || round->didSendPart()) { // Stop since there is a part that cannot be compacted and we need to compact in order break; } - if (round.getCommit() == NULL) { + if (round->getCommit() == NULL) { // Try compacting aborts only - int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); - if (newSize > ArbitrationRound.MAX_PARTS) { + int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); + if (newSize > ArbitrationRound->MAX_PARTS) { // Cant compact since it would be too large break; } - lastRound.addAborts(round.getAborts()); + lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); + Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them - newCommit.createCommitParts(); + newCommit->createCommitParts(); // Calculate the new size of the parts - int newSize = newCommit.getNumberOfParts(); - newSize += lastRound.getAbortsCount(); - newSize += round.getAbortsCount(); + int newSize = newCommit->getNumberOfParts(); + newSize += lastRound->getAbortsCount(); + newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound.MAX_PARTS) { + if (newSize > ArbitrationRound->MAX_PARTS) { // Cant compact since it would be too large break; } // Set the new compacted part - lastRound.setCommit(newCommit); - lastRound.addAborts(round.getAborts()); + lastRound->setCommit(newCommit); + lastRound->addAborts(round->getAborts()); gotNewCommit = true; } @@ -1993,16 +1850,16 @@ bool Table::compactArbitrationData() { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds.size()) { - pendingSendArbitrationRounds.clear(); + if (numberToDelete == pendingSendArbitrationRounds->size()) { + pendingSendArbitrationRounds->clear(); } else { for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1); + pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); } } // Add the new compacted into the pending to send list - pendingSendArbitrationRounds.add(lastRound); + pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor if (hadCommit && gotNewCommit) { @@ -2021,117 +1878,117 @@ bool Table::compactArbitrationData() { */ bool Table::updateCommittedTable() { - if (newCommitParts.size() == 0) { + if (newCommitParts->size() == 0) { // Nothing new to process return false; } // Iterate through all the machine Ids that we received new parts for - for (Long machineId : newCommitParts.keySet()) { - Hashtable, CommitPart> parts = newCommitParts.get(machineId); + for (int64_t machineId : newCommitParts->keySet()) { + Hashtable*, CommitPart*>* parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts.keySet()) { - CommitPart part = parts.get(partId); + for (Pair* partId : parts->keySet()) { + CommitPart part = parts->get(partId); // Get the transaction object for that sequence number - Hashtable commitForClientTable = liveCommitsTable.get(part.getMachineId()); + Hashtable* commitForClientTable = liveCommitsTable->get(part->getMachineId()); if (commitForClientTable == NULL) { // This is the first commit from this device - commitForClientTable = new Hashtable(); - liveCommitsTable.put(part.getMachineId(), commitForClientTable); + commitForClientTable = new Hashtable(); + liveCommitsTable->put(part->getMachineId(), commitForClientTable); } - Commit commit = commitForClientTable.get(part.getSequenceNumber()); + Commit commit = commitForClientTable->get(part->getSequenceNumber()); if (commit == NULL) { // This is a new commit that we dont have so make a new one commit = new Commit(); // Insert this new commit into the live tables - commitForClientTable.put(part.getSequenceNumber(), commit); + commitForClientTable->put(part->getSequenceNumber(), commit); } // Add that part to the commit - commit.addPartDecode(part); + commit->addPartDecode(part); } } // Clear all the new commits parts in preparation for the next time the server sends slots - newCommitParts.clear(); + newCommitParts->clear(); // If we process a new commit keep track of it for future use bool didProcessANewCommit = false; // Process the commits one by one - for (Long arbitratorId : liveCommitsTable.keySet()) { + for (int64_T arbitratorId : liveCommitsTable->keySet()) { // Get all the commits for a specific arbitrator - Hashtable commitForClientTable = liveCommitsTable.get(arbitratorId); + Hashtable commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector commitSequenceNumbers = new Vector(commitForClientTable.keySet()); - Collections.sort(commitSequenceNumbers); + Vector* commitSequenceNumbers = new Vector(commitForClientTable->keySet()); + Collections->sort(commitSequenceNumbers); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) { - lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId); + if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } // Go through each new commit one by one - for (int i = 0; i < commitSequenceNumbers.size(); i++) { - Long commitSequenceNumber = commitSequenceNumbers.get(i); - Commit commit = commitForClientTable.get(commitSequenceNumber); + for (int i = 0; i < commitSequenceNumbers->size(); i++) { + int64_t commitSequenceNumber = commitSequenceNumbers->get(i); + Commit *commit = commitForClientTable->get(commitSequenceNumber); // Special processing if a commit is not complete - if (!commit.isComplete()) { - if (i == (commitSequenceNumbers.size() - 1)) { + if (!commit->isComplete()) { + if (i == (commitSequenceNumbers->size() - 1)) { // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits break; } else { - // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet). + // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)-> // Delete it and move on - commit.setDead(); - commitForClientTable.remove(commit.getSequenceNumber()); + commit->setDead(); + commitForClientTable->remove(commit->getSequenceNumber()); continue; } } // Update the last transaction that was updated if we can - if (commit.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); + if (commit->getTransactionSequenceNumber() != -1) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); - if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); + if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); } } else { // Never seen any data from this arbitrator so record the first one - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); } // We have already seen this commit before so need to do the full processing on this commit - if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { + if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { // Update the last transaction that was updated if we can - if (commit.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); + if (commit->getTransactionSequenceNumber() != -1) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2142,45 +1999,45 @@ bool Table::updateCommittedTable() { // Get what commits should be edited, these are the commits that have live values for their keys Hashset *commitsToEdit = new Hashset(); - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); + for (KeyValue *kv : commit->getKeyValueUpdateSet()) { + commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); } - commitsToEdit.remove(NULL); // remove NULL since it could be in this set + commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit previousCommit : commitsToEdit) { + for (Commit * previousCommit : commitsToEdit) { // Only bother with live commits (TODO: Maybe remove this check) - if (previousCommit.isLive()) { + if (previousCommit->isLive()) { // Update which keys in the old commits are still live - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - previousCommit.invalidateKey(kv.getKey()); + for (KeyValue * kv : commit->getKeyValueUpdateSet()) { + previousCommit->invalidateKey(kv->getKey()); } // if the commit is now dead then remove it - if (!previousCommit.isLive()) { - commitForClientTable.remove(previousCommit); + if (!previousCommit->isLive()) { + commitForClientTable->remove(previousCommit); } } } // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) { - if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) { - lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); + if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { + lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } } else { - lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); + lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } // We processed a new commit that we havent seen before didProcessANewCommit = true; // Update the committed table of keys and which commit is using which key - for (KeyValue kv : commit.getKeyValueUpdateSet()) { - committedKeyValueTable.put(kv.getKey(), kv); - liveCommitsByKeyTable.put(kv.getKey(), commit); + for (KeyValue *kv : commit->getKeyValueUpdateSet()) { + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); } } } @@ -2192,16 +2049,16 @@ bool Table::updateCommittedTable() { * Create the speculative table from transactions that are still live and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable.keySet().size() == 0) { + if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { // There is nothing to speculate on return false; } // Create a list of the transaction sequence numbers and sort them from oldest to newest - Vector transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable.keySet()); - Collections.sort(transactionSequenceNumbersSorted); + Vector* transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); + Collections->sort(transactionSequenceNumbersSorted); - bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn; + bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { @@ -2209,19 +2066,19 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch // Start from scratch - speculatedKeyValueTable.clear(); + speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; } // Remember the front of the transaction list - oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0); + oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; - if (startIndex >= transactionSequenceNumbersSorted.size()) { + if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds return false; // did not speculate } @@ -2229,28 +2086,28 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Hashset *incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; - for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) { - int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i); - Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - if (!transaction.isComplete()) { + if (!transaction->isComplete()) { // If there is an incomplete transaction then there is nothing we can do // add this transactions arbitrator to the list of arbitrators we should ignore - incompleteTransactionArbitrator.add(transaction.getArbitrator()); + incompleteTransactionArbitrator->add(transaction->getArbitrator()); didSkip = true; continue; } - if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) { + if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) { continue; } lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber; - if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { + if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { // Guard evaluated to true so update the speculative table - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - speculatedKeyValueTable.put(kv.getKey(), kv); + for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + speculatedKeyValueTable->put(kv->getKey(), kv); } } } @@ -2269,36 +2126,36 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer */ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { - if (pendingTransactionQueue.size() == 0) { + if (pendingTransactionQueue->size() == 0) { // There is nothing to speculate on return; } - if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) { + if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) { // need to reset on the pending speculation lastPendingTransactionSpeculatedOn = NULL; - firstPendingTransaction = pendingTransactionQueue.get(0); - pendingTransactionSpeculatedKeyValueTable.clear(); + firstPendingTransaction = pendingTransactionQueue->get(0); + pendingTransactionSpeculatedKeyValueTable->clear(); } // Find where to start arbitration from - int startIndex = pendingTransactionQueue.indexOf(firstPendingTransaction) + 1; + int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; - if (startIndex >= pendingTransactionQueue.size()) { + if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; } - for (int i = startIndex; i < pendingTransactionQueue.size(); i++) { - Transaction transaction = pendingTransactionQueue.get(i); + for (int i = startIndex; i < pendingTransactionQueue->size(); i++) { + Transaction *transaction = pendingTransactionQueue->get(i); lastPendingTransactionSpeculatedOn = transaction; - if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { + if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { // Guard evaluated to true so update the speculative table - for (KeyValue kv : transaction.getKeyValueUpdateSet()) { - pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv); + for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); } } } @@ -2310,81 +2167,81 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr void Table::updateLiveTransactionsAndStatus() { // Go through each of the transactions - for (Iterator > iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) { - Transaction transaction = iter.next().getValue(); + for (IteratorEntry >* iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { + Transaction *transaction = iter->next()->getValue(); // Check if the transaction is dead - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { // Set dead the transaction - transaction.setDead(); + transaction->setDead(); // Remove the transaction from the live table - iter.remove(); - liveTransactionByTransactionIdTable.remove(transaction.getId()); + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); } } // Go through each of the transactions - for (Iterator > iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) { - TransactionStatus status = iter.next().getValue(); + for (IteratorEntry >* iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { + TransactionStatus status = iter->next()->getValue(); // Check if the transaction is dead - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { // Set committed - status.setStatus(TransactionStatus.StatusCommitted); + status->setStatus(TransactionStatus_StatusCommitted); // Remove - iter.remove(); + iter->remove(); } } } /** - * Process this slot, entry by entry. Also update the latest message sent by slot + * Process this slot, entry by entry-> Also update the latest message sent by slot */ -void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset *machineSet) { +void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet) { // Update the last message seen - updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); + updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); // Process each entry in the slot - for (Entry entry : slot.getEntries()) { - switch (entry.getType()) { + for (Entry *entry : slot->getEntries()) { + switch (entry->getType()) { - case Entry.TypeCommitPart: + case TypeCommitPart: processEntry((CommitPart)entry); break; - case Entry.TypeAbort: + case TypeAbort: processEntry((Abort)entry); break; - case Entry.TypeTransactionPart: + case TypeTransactionPart: processEntry((TransactionPart)entry); break; - case Entry.TypeNewKey: + case TypeNewKey: processEntry((NewKey)entry); break; - case Entry.TypeLastMessage: + case TypeLastMessage: processEntry((LastMessage)entry, machineSet); break; - case Entry.TypeRejectedMessage: + case TypeRejectedMessage: processEntry((RejectedMessage)entry, indexer); break; - case Entry.TypeTableStatus: - processEntry((TableStatus)entry, slot.getSequenceNumber()); + case TypeTableStatus: + processEntry((TableStatus)entry, slot->getSequenceNumber()); break; default: - throw new Error("Unrecognized type: " + entry.getType()); + throw new Error("Unrecognized type: " + entry->getType()); } } } @@ -2394,39 +2251,39 @@ void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLoca */ void Table::processEntry(LastMessage entry, Hashset *machineSet) { // Update what the last message received by a machine was - updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); + updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet); } /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) */ -void Table::processEntry(NewKey entry) { +void Table::processEntry(NewKey* entry) { // Update the arbitrator table with the new key information - arbitratorTable.put(entry.getKey(), entry.getMachineID()); + arbitratorTable->put(entry->getKey(), entry->getMachineID()); // Update what the latest live new key is - NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry); + NewKey oldNewKey = liveNewKeyTable->put(entry->getKey(), entry); if (oldNewKey != NULL) { // Delete the old new key messages - oldNewKey.setDead(); + oldNewKey->setDead(); } } /** - * Process new table status entries and set dead the old ones as new ones come in. + * Process new table status entries and set dead the old ones as new ones come in-> * keeps track of the largest and smallest table status seen in this current round * of updating the local copy of the block chain */ void Table::processEntry(TableStatus entry, int64_t seq) { - int newNumSlots = entry.getMaxSlots(); + int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); if (liveTableStatus != NULL) { // We have a larger table status so the old table status is no int64_ter alive - liveTableStatus.setDead(); + liveTableStatus->setDead(); } // Make this new table status the latest alive table status @@ -2434,26 +2291,26 @@ void Table::processEntry(TableStatus entry, int64_t seq) { } /** - * Check old messages to see if there is a block chain violation. Also + * Check old messages to see if there is a block chain violation-> Also */ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { - int64_t oldSeqNum = entry.getOldSeqNum(); - int64_t newSeqNum = entry.getNewSeqNum(); - bool isequal = entry.getEqual(); - int64_t machineId = entry.getMachineID(); - int64_t seq = entry.getSequenceNumber(); + int64_t oldSeqNum = entry->getOldSeqNum(); + int64_t newSeqNum = entry->getNewSeqNum(); + bool isequal = entry->getEqual(); + int64_t machineId = entry->getMachineID(); + int64_t seq = entry->getSequenceNumber(); // Check if we have messages that were supposed to be rejected in our local block chain for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { // Get the slot - Slot slot = indexer.getSlot(seqNum); + Slot *slot = indexer->getSlot(seqNum); if (slot != NULL) { // If we have this slot make sure that it was not supposed to be a rejected slot - int64_t slotMachineId = slot.getMachineID(); + int64_t slotMachineId = slot->getMachineID(); if (isequal != (slotMachineId == machineId)) { throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); } @@ -2461,21 +2318,21 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { } - // Create a list of clients to watch until they see this rejected message entry. + // Create a list of clients to watch until they see this rejected message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { + for (Map->Entry*>* lastMessageEntry : lastMessageTable->entrySet()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry.getKey(); + int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); - // We've seen it, don't need to continue to watch. Our next - // message will implicitly acknowledge it. + // We've seen it, don't need to continue to watch-> Our next + // message will implicitly acknowledge it-> if (lastMessageEntryMachineId == localMachineId) { continue; } - Pair lastMessageValue = lastMessageEntry.getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + Pair *lastMessageValue = lastMessageEntry->getValue(); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { @@ -2483,52 +2340,52 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { addWatchVector(lastMessageEntryMachineId, entry); // This client did not see this rejected message yet so add it to the watch set to monitor - deviceWatchSet.add(lastMessageEntryMachineId); + deviceWatchSet->add(lastMessageEntryMachineId); } } - if (deviceWatchSet.isEmpty()) { + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so - entry.setDead(); + entry->setDead(); } else { // We need to watch this rejected message - entry.setWatchSet(deviceWatchSet); + entry->setWatchSet(deviceWatchSet); } } /** - * Check if this abort is live, if not then save it so we can kill it later. - * update the last transaction number that was arbitrated on. + * Check if this abort is live, if not then save it so we can kill it later-> + * update the last transaction number that was arbitrated on-> */ void Table::processEntry(Abort entry) { - if (entry.getTransactionSequenceNumber() != -1) { + if (entry->getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server - TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); + TransactionStatus status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); if (status != NULL) { - status.setStatus(TransactionStatus.StatusAborted); + status->setStatus(TransactionStatus_StatusAborted); } } // Abort has not been seen by the client it is for yet so we need to keep track of it - Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); + Abort previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); if (previouslySeenAbort != NULL) { - previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version + previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } - if (entry.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); + if (entry->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead - entry.setDead(); - liveAbortTable.remove(entry.getAbortId()); + entry->setDead(); + liveAbortTable->remove(entry->getAbortId()); - if (entry.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); + if (entry->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); } return; @@ -2538,32 +2395,32 @@ void Table::processEntry(Abort entry) { // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); - if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); + if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); } } else { // Never seen any data from this arbitrator so record the first one - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); } // Set dead a transaction if we can - Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber())); + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(new Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); if (transactionToSetDead != NULL) { - liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber()); + liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } // Update the last transaction sequence number that the arbitrator arbitrated on - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { // Is a valid one - if (entry.getTransactionSequenceNumber() != -1) { - lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber()); + if (entry->getTransactionSequenceNumber() != -1) { + lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); } } } @@ -2573,107 +2430,107 @@ void Table::processEntry(Abort entry) { */ void Table::processEntry(TransactionPart entry) { // Check if we have already seen this transaction and set it dead OR if it is not alive - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); + if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted - entry.setDead(); + entry->setDead(); return; } // This part is still alive - Hashtable, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId()); + Hashtable*, TransactionPart*>* transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable, TransactionPart>(); - newTransactionParts.put(entry.getMachineId(), transactionPart); + transactionPart = new Hashtable*, TransactionPart*>(); + newTransactionParts->put(entry->getMachineId(), transactionPart); } // Update the part and set dead ones we have already seen (got a rescued version) - TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry); + TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { - previouslySeenPart.setDead(); + previouslySeenPart->setDead(); } } /** - * Process new commit entries and save them for future use. Delete duplicates + * Process new commit entries and save them for future use-> Delete duplicates */ void Table::processEntry(CommitPart entry) { // Update the last transaction that was updated if we can - if (entry.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); + if (entry->getTransactionSequenceNumber() != -1) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); + if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - Hashtable, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); + Hashtable*, CommitPart*>* commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable, CommitPart>(); - newCommitParts.put(entry.getMachineId(), commitPart); + commitPart = new Hashtable*, CommitPart*>(); + newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a rescued version) - CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry); + CommitPart previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { - previouslySeenPart.setDead(); + previouslySeenPart->setDead(); } } /** - * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. - * Updates the live aborts, removes those that are dead and sets them dead. + * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them-> + * Updates the live aborts, removes those that are dead and sets them dead-> * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message. + * other clients have not had a rollback on the last message-> */ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID - machineSet.remove(machineId); + machineSet->remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet - Hashset *watchset = rejectedMessageWatchVectorTable.get(machineId); + Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { // Go through each rejected message that this machine Id has not seen yet - for (Iterator rmit = watchset.iterator(); rmit.hasNext(); ) { + for (Iterator rmit = watchset->iterator(); rmit->hasNext(); ) { - RejectedMessage rm = rmit.next(); + RejectedMessage rm = rmit->next(); - // If this machine Id has seen this rejected message... - if (rm.getSequenceNumber() <= seqNum) { + // If this machine Id has seen this rejected message->->-> + if (rm->getSequenceNumber() <= seqNum) { // Remove it from our watchlist - rmit.remove(); + rmit->remove(); // Decrement machines that need to see this notification - rm.removeWatcher(machineId); + rm->removeWatcher(machineId); } } } // Set dead the abort - for (Iterator, Abort> > i = liveAbortTable.entrySet().iterator(); i.hasNext();) { - Abort abort = i.next().getValue(); + for (IteratorEntry*, Abort*> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { + Abort abort = i->next()->getValue(); - if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { - abort.setDead(); - i.remove(); + if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { + abort->setDead(); + i->remove(); - if (abort.getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); + if (abort->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } @@ -2681,32 +2538,32 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene if (machineId == localMachineId) { - // Our own messages are immediately dead. + // Our own messages are immediately dead-> if (liveness instanceof LastMessage) { - ((LastMessage)liveness).setDead(); + ((LastMessage)liveness)->setDead(); } else if (liveness instanceof Slot) { - ((Slot)liveness).setDead(); + ((Slot)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable.put(machineId, new Pair(seqNum, liveness)); + Pair lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; } - int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); - Liveness lastEntry = lastMessageEntry.getSecond(); + int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); + Liveness lastEntry = lastMessageEntry->getSecond(); // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { if (lastEntry instanceof LastMessage) { - ((LastMessage)lastEntry).setDead(); + ((LastMessage)lastEntry)->setDead(); } else if (lastEntry instanceof Slot) { - ((Slot)lastEntry).setDead(); + ((Slot)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); } @@ -2736,27 +2593,27 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene /** * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not. + * rejected message entry and which have not-> */ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { - Hashset *entries = rejectedMessageWatchVectorTable.get(machineId); + Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); if (entries == NULL) { // There is no set for this machine ID yet so create one entries = new Hashset(); - rejectedMessageWatchVectorTable.put(machineId, entries); + rejectedMessageWatchVectorTable->put(machineId, entries); } - entries.add(entry); + entries->add(entry); } /** * Check if the HMAC chain is not violated */ -void Table::checkHMACChain(SlotIndexer indexer, Array *newSlots) { - for (int i = 0; i < newSlots.length; i++) { +void Table::checkHMACChain(SlotIndexer indexer, Array *newSlots) { + for (int i = 0; i < newSlots->length; i++) { Slot currSlot = newSlots[i]; - Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1); + Slot prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL && - !Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) + !Arrays->equals(prevSlot->getHMAC(), currSlot->getPrevHMAC())) throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); } } diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index 763ea90..1d58776 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -52,7 +52,7 @@ private: Slot *lastSlotAttemptedToSend; bool lastIsNewKey; int lastNewSize; - Hashtable *> lastTransactionPartsSent; + Hashtable *> *lastTransactionPartsSent; Vector *lastPendingSendArbitrationEntriesToDelete; NewKey *lastNewKey; @@ -78,7 +78,7 @@ private: Vector *pendingTransactionQueue; Vector *pendingSendArbitrationRounds; Vector *pendingSendArbitrationEntriesToDelete; - Hashtable *> *transactionPartsSent; + Hashtable *> *transactionPartsSent; Hashtable *outstandingTransactionStatus; Hashtable *liveAbortsGeneratedByLocal; Hashset *> *offlineTransactionsCommittedAndAtServer; @@ -101,16 +101,16 @@ private: /** * Returns false if a resize was needed */ - ThreeTuple *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry); - void doRejectedMessages(Slot s); + ThreeTuple *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry); + void doRejectedMessages(Slot *s); - ThreeTuple doMandatoryResuce(Slot slot, bool resize); + ThreeTuple doMandatoryResuce(Slot *slot, bool resize); - void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize); + void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize); /** * Checks for malicious activity and updates the local copy of the block chain. */ - void validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal); + void validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal); void updateLiveStateFromServer(); @@ -146,7 +146,7 @@ private: void arbitrateFromServer(); - Pair arbitrateOnLocalTransaction(Transaction transaction); + Pair arbitrateOnLocalTransaction(Transaction * transaction); /** * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates @@ -176,45 +176,45 @@ private: /** * Process this slot, entry by entry. Also update the latest message sent by slot */ - void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset machineSet); + void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet); /** * Update the last message that was sent for a machine Id */ - void processEntry(LastMessage entry, Hashset machineSet); + void processEntry(LastMessage *entry, Hashset *machineSet); /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) */ - void processEntry(NewKey entry); + void processEntry(NewKey * entry); /** * Process new table status entries and set dead the old ones as new ones come in. * keeps track of the largest and smallest table status seen in this current round * of updating the local copy of the block chain */ - void processEntry(TableStatus entry, int64_t seq); + void processEntry(TableStatus *entry, int64_t seq); /** * Check old messages to see if there is a block chain violation. Also */ - void processEntry(RejectedMessage entry, SlotIndexer indexer); + void processEntry(RejectedMessage *entry, SlotIndexer *indexer); /** * Check if this abort is live, if not then save it so we can kill it later. * update the last transaction number that was arbitrated on. */ - void processEntry(Abort entry); + void processEntry(Abort *entry); /** * Set dead the transaction part if that transaction is dead and keep track of all new parts */ - void processEntry(TransactionPart entry); + void processEntry(TransactionPart *entry); /** * Process new commit entries and save them for future use. Delete duplicates */ - void processEntry(CommitPart entry); + void processEntry(CommitPart *entry); /** * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. @@ -222,23 +222,23 @@ private: * Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message. */ - void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset machineSet); + void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet); /** * Add a rejected message entry to the watch set to keep track of which clients have seen that * rejected message entry and which have not. */ - void addWatchVector(int64_t machineId, RejectedMessage entry); + void addWatchVector(int64_t machineId, RejectedMessage * entry); /** * Check if the HMAC chain is not violated */ - void checkHMACChain(SlotIndexer indexer, Array *newSlots); + void checkHMACChain(SlotIndexer * indexer, Array *newSlots); public: Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort); - Table(CloudComm _cloud, int64_t _localMachineId); + Table(CloudComm *_cloud, int64_t _localMachineId); /** * Initialize the table by inserting a table status as the first entry into the table status @@ -249,14 +249,17 @@ public: /** * Rebuild the table from scratch by pulling the latest block chain from the server. */ + bool update(); void rebuild(); void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber); - uint64_t getArbitrator(IoTString *key); + int64_t getArbitrator(IoTString *key); void close(); IoTString *getCommitted(IoTString *key); IoTString *getSpeculative(IoTString *key); IoTString *getCommittedAtomic(IoTString *key); bool createNewKey(IoTString *keyName, int64_t machineId); + void startTransaction(); + void addKV(IoTString *key, IoTString *value); TransactionStatus *commitTransaction(); /** -- 2.34.1