From 9c3fa5cbce287df14626d262bd0179e994338869 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 15 Feb 2018 15:59:57 -0800 Subject: [PATCH] edits --- version2/src/C/Commit.cc | 6 +- version2/src/C/PendingTransaction.cc | 8 +- version2/src/C/Table.cc | 149 +++++++++++++++++--------- version2/src/C/Table.h | 2 +- version2/src/C/Transaction.cc | 6 +- version2/src/C/Transaction.h | 2 +- version2/src/C/vector.h | 15 +++ version2/src/java/iotcloud/Table.java | 4 +- 8 files changed, 127 insertions(+), 65 deletions(-) diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index ec4b39a..f159f71 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -198,7 +198,7 @@ void Commit::decodeCommitData() { Array *Commit::convertDataToBytes() { // Calculate the size of the data int sizeOfData = sizeof(int32_t); // Number of Update KV's - SetIterator *kvit = keyValueUpdateSet->iterator(); + SetIterator *kvit = keyValueUpdateSet->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); sizeOfData += kv->getSize(); @@ -227,7 +227,7 @@ void Commit::setKVsMap(Hashsetclear(); keyValueUpdateSet->addAll(newKVs); liveKeys->clear(); - SetIterator *kvit = newKVs->iterator(); + SetIterator *kvit = newKVs->iterator(); while (kvit->hasNext()) { liveKeys->add(kvit->next()->getKey()); } @@ -241,7 +241,7 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { return older; } Hashset *kvSet = new Hashset(); - SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); kvSet->add(kv); diff --git a/version2/src/C/PendingTransaction.cc b/version2/src/C/PendingTransaction.cc index b1f6db3..f453219 100644 --- a/version2/src/C/PendingTransaction.cc +++ b/version2/src/C/PendingTransaction.cc @@ -27,7 +27,7 @@ void PendingTransaction::addKV(KeyValue *newKV) { KeyValue *rmKV = NULL; // Make sure there are no duplicates - SetIterator *kvit = keyValueUpdateSet->iterator(); + SetIterator *kvit = keyValueUpdateSet->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); if (kv->getKey()->equals(newKV->getKey())) { @@ -71,7 +71,7 @@ bool PendingTransaction::checkArbitrator(int64_t arb) { } bool PendingTransaction::evaluateGuard(Hashtable *keyValTableCommitted, Hashtable *keyValTableSpeculative, Hashtable *keyValTablePendingTransSpeculative) { - SetIterator *kvit = keyValueGuardSet->iterator(); + SetIterator *kvit = keyValueGuardSet->iterator(); while (kvit->hasNext()) { KeyValue *kvGuard = kvit->next(); // First check if the key is in the speculative table, this is the @@ -139,7 +139,7 @@ Transaction *PendingTransaction::createTransaction() { } // Add the Guard Conditions - SetIterator *kvit = keyValueGuardSet->iterator(); + SetIterator *kvit = keyValueGuardSet->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); newTransaction->addGuardKV(kv); @@ -170,7 +170,7 @@ Array *PendingTransaction::convertDataToBytes() { bbEncode->putInt(keyValueUpdateSet->size()); // Encode all the guard conditions - SetIterator *kvit = keyValueGuardSet->iterator(); + SetIterator *kvit = keyValueGuardSet->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); kv->encode(bbEncode); diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 424e105..1bdc259 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -19,6 +19,18 @@ #include "Commit.h" #include "RejectedMessage.h" #include "SlotIndexer.h" +#include + +int compareInt64(const void * a, const void *b) { + const int64_t * pa = (const int64_t *) a; + const int64_t * pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), @@ -166,7 +178,7 @@ void Table::init() { lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); - liveCommitsTable = new Hashtable >(); + liveCommitsTable = new Hashtable * >(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); @@ -974,7 +986,6 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { } Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); @@ -1012,8 +1023,16 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); - Collections->sort(abortLocalSequenceNumbers); + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while(abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + uint asize = abortLocalSequenceNumbers->size(); for(uint i=0; iget(i); @@ -1029,8 +1048,14 @@ Array *Table::acceptDataFromLocal(Array *data) { // Get the commits to send back Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitLocalSequenceNumbers); + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while(commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint clsSize = commitLocalSequenceNumbers->size(); for(uint clsi = 0; clsi < clsSize; clsi++) { @@ -1041,10 +1066,14 @@ Array *Table::acceptDataFromLocal(Array *data) { continue; } - unseenArbitrations->addAll(commit->getParts()->values()); - - for (CommitPart *commitPart : commit->getParts()->values()) { - returnDataSize += commitPart->getSize(); + { + Vector * parts = commit->getParts(); + uint nParts = parts->size(); + for(uint i=0; iget(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } } } } @@ -1336,7 +1365,9 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry *liveEntry : liveEntries) { + uint lESize = liveEntries->size(); + for (uint i=0; i< lESize; i++) { + Entry * liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry slot->addEntry(liveEntry); @@ -1368,7 +1399,9 @@ search: continue; seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); - for (Entry *liveentry : liveentries) { + uint lESize = liveentries->size(); + for (uint i=0; i< lESize; i++) { + Entry * liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { @@ -1407,13 +1440,22 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(lastMessageTable->keySet()); + Hashset *machineSet = new Hashset(); + { + SetIterator *> * lmit=getKeyIterator(lastMessageTable); + while(lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; + } // Process each slots data - for (Slot *slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - - updateExpectedSize(); + { + uint numSlots = newSlots->length(); + for(uint i=0; iget(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } } // If there is a gap, check to see if the server sent us @@ -1422,7 +1464,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots->length); + checkNumSlots(newSlots->length()); // Since there was a gap every machine must have pushed a slot or // must have a last message message-> If not then the server is @@ -1436,16 +1478,19 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal commitNewMaxSize(); // Commit new to slots to the local block chain-> - for (Slot *slot : newSlots) { - - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); - - // Keep track of how many slots are currently live (have live data - // in them)-> - liveSlotCount++; + { + uint numSlots = newSlots->length(); + for(uint i=0; iget(i); + + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); + + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } } - // Get the sequence number of the latest slot in the system sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); @@ -1521,11 +1566,6 @@ void Table::checkNumSlots(int numberOfSlots) { } } -void Table::updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; -} - - /** * Update the size of of the local buffer if it is needed-> */ @@ -1558,20 +1598,26 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts // for - for (int64_t machineId : newTransactionParts->keySet()) { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts); + while(tpit->hasNext()) { + int64_t machineId = tpit->next(); Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + while(ptit->hasNext()) { + Pair * partId = ptit->next(); TransactionPart *part = parts->get(partId); - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { - // Set dead the transaction part - part->setDead(); - continue; + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + continue; + } } - + // Get the transaction object for that sequence number Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); @@ -1581,14 +1627,15 @@ void Table::processNewTransactionParts() { // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); + liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); } // Add that part to the transaction transaction->addPartDecode(part); } + delete ptit; } - + delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots newTransactionParts->clear(); @@ -1603,7 +1650,7 @@ void Table::arbitrateFromServer() { // Get the transaction sequence numbers and sort from oldest to newest Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbers); + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are Hashtable * speculativeTableTmp = new Hashtable(); @@ -1848,7 +1895,7 @@ bool Table::compactArbitrationData() { int numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); if (round->isFull() || round->getDidSendPart()) { // Stop since there is a part that cannot be compacted and we @@ -1859,14 +1906,14 @@ bool Table::compactArbitrationData() { if (round->getCommit() == NULL) { // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1877,7 +1924,7 @@ bool Table::compactArbitrationData() { newSize += lastRound->getAbortsCount(); newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } @@ -1898,7 +1945,7 @@ bool Table::compactArbitrationData() { pendingSendArbitrationRounds->clear(); } else { for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); + pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); } } @@ -1972,7 +2019,7 @@ bool Table::updateCommittedTable() { // Sort the commits in order Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitSequenceNumbers); + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; @@ -2120,7 +2167,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { // Create a list of the transaction sequence numbers and sort them // from oldest to newest Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbersSorted); + qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2347,7 +2394,7 @@ void Table::processEntry(NewKey *entry) { * seen in this current round of updating the local copy of the block * chain */ -void Table::processEntry(TableStatus entry, int64_t seq) { +void Table::processEntry(TableStatus * entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index eb9faca..394e6d6 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -71,7 +71,7 @@ private: Hashtable *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on Hashtable *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID - Hashtable > *liveCommitsTable; + Hashtable *> *liveCommitsTable; Hashtable *liveCommitsByKeyTable; Hashtable *lastCommitSeenSequenceNumberByArbitratorTable; Vector *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 77c3590..d9dbf91 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -205,8 +205,8 @@ bool Transaction::isComplete() { return fldisComplete; } -Pair Transaction::getId() { - return transactionId; +Pair * Transaction::getId() { + return & transactionId; } void Transaction::setDead() { @@ -265,7 +265,7 @@ void Transaction::decodeTransactionData() { } bool Transaction::evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable) { - SetIterator *kvit = keyValueGuardSet->iterator(); + SetIterator *kvit = keyValueGuardSet->iterator(); while (kvit->hasNext()) { KeyValue *kvGuard = kvit->next(); // First check if the key is in the speculative table, this is the value of the latest assumption diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index f635a7a..240c535 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -50,7 +50,7 @@ public: int64_t getMachineId(); int64_t getArbitrator(); bool isComplete(); - Pair getId(); + Pair * getId(); void setDead(); TransactionPart *getPart(int32_t index); bool evaluateGuard(Hashtable *committedKeyValueTable, Hashtable *speculatedKeyValueTable, Hashtable *pendingTransactionSpeculatedKeyValueTable); diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index ccd8649..1d468a7 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -42,6 +42,13 @@ public: } } + void removeIndex(uint i) { + for (i++; i