X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=1bdc259945f940e1ec05bd43c9a482c597de0816;hp=424e105690043f40fa83beae1b5d67b39863c2e3;hb=9c3fa5cbce287df14626d262bd0179e994338869;hpb=3e80b7cd423be3b7961a5f8d22bb6c274f1bc83a diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 424e105..1bdc259 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -19,6 +19,18 @@ #include "Commit.h" #include "RejectedMessage.h" #include "SlotIndexer.h" +#include + +int compareInt64(const void * a, const void *b) { + const int64_t * pa = (const int64_t *) a; + const int64_t * pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), @@ -166,7 +178,7 @@ void Table::init() { lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); - liveCommitsTable = new Hashtable >(); + liveCommitsTable = new Hashtable * >(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); @@ -974,7 +986,6 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { } Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); @@ -1012,8 +1023,16 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); - Collections->sort(abortLocalSequenceNumbers); + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while(abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + uint asize = abortLocalSequenceNumbers->size(); for(uint i=0; iget(i); @@ -1029,8 +1048,14 @@ Array *Table::acceptDataFromLocal(Array *data) { // Get the commits to send back Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitLocalSequenceNumbers); + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while(commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint clsSize = commitLocalSequenceNumbers->size(); for(uint clsi = 0; clsi < clsSize; clsi++) { @@ -1041,10 +1066,14 @@ Array *Table::acceptDataFromLocal(Array *data) { continue; } - unseenArbitrations->addAll(commit->getParts()->values()); - - for (CommitPart *commitPart : commit->getParts()->values()) { - returnDataSize += commitPart->getSize(); + { + Vector * parts = commit->getParts(); + uint nParts = parts->size(); + for(uint i=0; iget(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } } } } @@ -1336,7 +1365,9 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry *liveEntry : liveEntries) { + uint lESize = liveEntries->size(); + for (uint i=0; i< lESize; i++) { + Entry * liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry slot->addEntry(liveEntry); @@ -1368,7 +1399,9 @@ search: continue; seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); - for (Entry *liveentry : liveentries) { + uint lESize = liveentries->size(); + for (uint i=0; i< lESize; i++) { + Entry * liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { @@ -1407,13 +1440,22 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(lastMessageTable->keySet()); + Hashset *machineSet = new Hashset(); + { + SetIterator *> * lmit=getKeyIterator(lastMessageTable); + while(lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; + } // Process each slots data - for (Slot *slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - - updateExpectedSize(); + { + uint numSlots = newSlots->length(); + for(uint i=0; iget(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } } // If there is a gap, check to see if the server sent us @@ -1422,7 +1464,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots->length); + checkNumSlots(newSlots->length()); // Since there was a gap every machine must have pushed a slot or // must have a last message message-> If not then the server is @@ -1436,16 +1478,19 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal commitNewMaxSize(); // Commit new to slots to the local block chain-> - for (Slot *slot : newSlots) { - - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); - - // Keep track of how many slots are currently live (have live data - // in them)-> - liveSlotCount++; + { + uint numSlots = newSlots->length(); + for(uint i=0; iget(i); + + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); + + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } } - // Get the sequence number of the latest slot in the system sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); @@ -1521,11 +1566,6 @@ void Table::checkNumSlots(int numberOfSlots) { } } -void Table::updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; -} - - /** * Update the size of of the local buffer if it is needed-> */ @@ -1558,20 +1598,26 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts // for - for (int64_t machineId : newTransactionParts->keySet()) { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts); + while(tpit->hasNext()) { + int64_t machineId = tpit->next(); Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + while(ptit->hasNext()) { + Pair * partId = ptit->next(); TransactionPart *part = parts->get(partId); - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { - // Set dead the transaction part - part->setDead(); - continue; + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + continue; + } } - + // Get the transaction object for that sequence number Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); @@ -1581,14 +1627,15 @@ void Table::processNewTransactionParts() { // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); + liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); } // Add that part to the transaction transaction->addPartDecode(part); } + delete ptit; } - + delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots newTransactionParts->clear(); @@ -1603,7 +1650,7 @@ void Table::arbitrateFromServer() { // Get the transaction sequence numbers and sort from oldest to newest Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbers); + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are Hashtable * speculativeTableTmp = new Hashtable(); @@ -1848,7 +1895,7 @@ bool Table::compactArbitrationData() { int numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); if (round->isFull() || round->getDidSendPart()) { // Stop since there is a part that cannot be compacted and we @@ -1859,14 +1906,14 @@ bool Table::compactArbitrationData() { if (round->getCommit() == NULL) { // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1877,7 +1924,7 @@ bool Table::compactArbitrationData() { newSize += lastRound->getAbortsCount(); newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } @@ -1898,7 +1945,7 @@ bool Table::compactArbitrationData() { pendingSendArbitrationRounds->clear(); } else { for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); + pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); } } @@ -1972,7 +2019,7 @@ bool Table::updateCommittedTable() { // Sort the commits in order Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitSequenceNumbers); + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; @@ -2120,7 +2167,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { // Create a list of the transaction sequence numbers and sort them // from oldest to newest Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbersSorted); + qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2347,7 +2394,7 @@ void Table::processEntry(NewKey *entry) { * seen in this current round of updating the local copy of the block * chain */ -void Table::processEntry(TableStatus entry, int64_t seq) { +void Table::processEntry(TableStatus * entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots);