X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=76a7f16adfc7b6a7431859fa3742ccf1f31db7dd;hp=1148e9343aa931eff202757b1724b23699f3dfce;hb=2d567b75be4055f6a40ffe1cedb5cdc9be262d86;hpb=bee76cd147ce3f14198f601123154500da5e4f1a diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 1148e93..76a7f16 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1,16 +1,126 @@ #include "Table.h" -Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) { - localMachineId = _localMachineId; - cloud = new CloudComm(this, baseurl, password, listeningPort); - +Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) : + buffer(NULL), + cloud(new CloudComm(this, baseurl, password, listeningPort)), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumber(0), + localMachineId(_localMachineId), + sequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedSize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastPendingSendArbitrationEntriesToDelete(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ init(); } -Table::Table(CloudComm _cloud, int64_t _localMachineId) { - localMachineId = _localMachineId; - cloud = _cloud; - +Table::Table(CloudComm _cloud, int64_t _localMachineId) : + buffer(NULL), + cloud(_cloud), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumber(0), + localMachineId(_localMachineId), + sequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedSize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastPendingSendArbitrationEntriesToDelete(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ init(); } @@ -32,7 +142,7 @@ void Table::init() { pendingTransactionSpeculatedKeyValueTable = new Hashtable(); liveNewKeyTable = new Hashtable(); lastMessageTable = new Hashtable >(); - rejectedMessageWatchVectorTable = new Hashtable >(); + rejectedMessageWatchVectorTable = new Hashtable >(); arbitratorTable = new Hashtable(); liveAbortTable = new Hashtable, Abort>(); newTransactionParts = new Hashtable, TransactionPart> >(); @@ -49,7 +159,7 @@ void Table::init() { transactionPartsSent = new Hashtable >(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new HashSet >(); + offlineTransactionsCommittedAndAtServer = new Hashset >(); localCommunicationTable = new Hashtable >(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); @@ -66,7 +176,7 @@ synchronized void Table::printSlots() { int64_t o = buffer.getOldestSeqNum(); int64_t n = buffer.getNewestSeqNum(); - int[] types = new int[10]; + Array *types = new Array(10); int num = 0; @@ -152,10 +262,11 @@ synchronized void Table::initTable() { localSequenceNumber++; TableStatus status = new TableStatus(s, numberOfSlots); s.addEntry(status); - Slot[] array = cloud.putSlot(s, numberOfSlots); + Array *array = cloud.putSlot(s, numberOfSlots); if (array == NULL) { - array = new Slot[] {s}; + array = new Array(1); + array->set(0, s); // update local block chain validateAndUpdate(array, true); } else if (array.length == 1) { @@ -171,7 +282,7 @@ synchronized void Table::initTable() { */ synchronized void Table::rebuild() { // Just pull the latest slots from the server - Slot[] newslots = cloud.getSlots(sequenceNumber + 1); + Array *newslots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); sendToServer(NULL); updateLiveTransactionsAndStatus(); @@ -286,7 +397,7 @@ synchronized IoTString Table::getSpeculativeAtomic(IoTString key) { synchronized bool Table::update() { try { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + Array *newSlots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); @@ -376,7 +487,7 @@ synchronized TransactionStatus Table::commitTransaction() { sendToServer(NULL); } catch (ServerException e) { - Set arbitratorTriedAndFailed = new HashSet(); + Set arbitratorTriedAndFailed = new Hashset(); for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { Transaction transaction = iter.next(); @@ -442,10 +553,10 @@ bool Table::sendToServer(NewKey newKey) { try { if (hadPartialSendToServer) { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + Array *newSlots = cloud.getSlots(sequenceNumber + 1); if (newSlots.length == 0) { fromRetry = true; - ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { @@ -682,7 +793,7 @@ bool Table::sendToServer(NewKey newKey) { lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); if (sendSlotsReturn.getFirst()) { @@ -1061,16 +1172,17 @@ synchronized Array *Table::acceptDataFromLocal(Array *data) { return returnData; } -ThreeTuple Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) { +ThreeTuple *> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; bool inserted = false; bool lastTryInserted = false; - Slot[] array = cloud.putSlot(slot, newSize); + Array *array = cloud.putSlot(slot, newSize); if (array == NULL) { - array = new Slot[] {slot}; + array = new Array(); + array->set(0, slot); rejectedSlotVector.clear(); inserted = true; } else { @@ -1120,7 +1232,7 @@ ThreeTuple Table::sendSlotsToServer(Slot slot, int newSize, } } - return new ThreeTuple(inserted, lastTryInserted, array); + return new ThreeTuple *>(inserted, lastTryInserted, array); } /** @@ -1364,7 +1476,7 @@ search: /** * Checks for malicious activity and updates the local copy of the block chain. */ -void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { +void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { // The cloud communication layer has checked slot HMACs already before decoding if (newSlots.length == 0) { @@ -1385,7 +1497,7 @@ void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - HashSet machineSet = new HashSet(lastMessageTable.keySet()); + Hashset machineSet = new Hashset(lastMessageTable.keySet()); // Process each slots data for (Slot slot : newSlots) { @@ -1595,7 +1707,7 @@ void Table::arbitrateFromServer() { // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; - Set generatedAborts = new HashSet(); + Set generatedAborts = new Hashset(); for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); @@ -1746,7 +1858,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { @@ -1781,7 +1893,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { status.setStatus(TransactionStatus.StatusAborted); } } else { - Set addAbortSet = new HashSet(); + Hashset addAbortSet = new Hashset(); // Create the abort @@ -2029,7 +2141,7 @@ bool Table::updateCommittedTable() { // If we got here then this is a brand new commit and needs full processing // Get what commits should be edited, these are the commits that have live values for their keys - Set commitsToEdit = new HashSet(); + Hashset *commitsToEdit = new Hashset(); for (KeyValue kv : commit.getKeyValueUpdateSet()) { commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); } @@ -2114,7 +2226,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { return false; // did not speculate } - Set incompleteTransactionArbitrator = new HashSet(); + Hashset *incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) { @@ -2234,7 +2346,7 @@ void Table::updateLiveTransactionsAndStatus() { /** * Process this slot, entry by entry. Also update the latest message sent by slot */ -void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet) { +void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset *machineSet) { // Update the last message seen updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); @@ -2280,7 +2392,7 @@ void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLoca /** * Update the last message that was sent for a machine Id */ -void Table::processEntry(LastMessage entry, HashSet machineSet) { +void Table::processEntry(LastMessage entry, Hashset *machineSet) { // Update what the last message received by a machine was updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } @@ -2350,7 +2462,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { // Create a list of clients to watch until they see this rejected message entry. - HashSet deviceWatchSet = new HashSet(); + Hashset *deviceWatchSet = new Hashset(); for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { // Machine ID for the last message entry @@ -2524,13 +2636,13 @@ void Table::processEntry(CommitPart entry) { * Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message. */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet) { +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet.remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet - HashSet watchset = rejectedMessageWatchVectorTable.get(machineId); + Hashset *watchset = rejectedMessageWatchVectorTable.get(machineId); // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { @@ -2627,10 +2739,10 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene * rejected message entry and which have not. */ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { - HashSet entries = rejectedMessageWatchVectorTable.get(machineId); + Hashset *entries = rejectedMessageWatchVectorTable.get(machineId); if (entries == NULL) { // There is no set for this machine ID yet so create one - entries = new HashSet(); + entries = new Hashset(); rejectedMessageWatchVectorTable.put(machineId, entries); } entries.add(entry); @@ -2639,7 +2751,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { /** * Check if the HMAC chain is not violated */ -void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) { +void Table::checkHMACChain(SlotIndexer indexer, Array *newSlots) { for (int i = 0; i < newSlots.length; i++) { Slot currSlot = newSlots[i]; Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);