edits
[iotcloud.git] / version2 / src / C / Table.cc
index 424e105..1bdc259 100644 (file)
 #include "Commit.h"
 #include "RejectedMessage.h"
 #include "SlotIndexer.h"
+#include <stdlib.h>
+
+int compareInt64(const void * a, const void *b) {
+       const int64_t * pa = (const int64_t *) a;
+       const int64_t * pb = (const int64_t *) b;
+       if (*pa < *pb)
+               return -1;
+       else if (*pa > *pb)
+               return 1;
+       else
+               return 0;
+}
 
 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
        buffer(NULL),
@@ -166,7 +178,7 @@ void Table::init() {
        lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
        liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
        liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
-       liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
+       liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
        liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
        lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
        rejectedSlotVector = new Vector<int64_t>();
@@ -974,7 +986,6 @@ Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
 }
 
 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
-
        // Decode the data
        ByteBuffer *bbDecode = ByteBuffer_wrap(data);
        int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
@@ -1012,8 +1023,16 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
 
        // Get the aborts to send back
-       Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
-       Collections->sort(abortLocalSequenceNumbers);
+       Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
+       {
+               SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
+               while(abortit->hasNext())
+                       abortLocalSequenceNumbers->add(abortit->next());
+               delete abortit;
+       }
+       
+       qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
+
        uint asize = abortLocalSequenceNumbers->size();
        for(uint i=0; i<asize; i++) {
                int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
@@ -1029,8 +1048,14 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        // Get the commits to send back
        Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
        if (commitForClientTable != NULL) {
-               Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
-               Collections->sort(commitLocalSequenceNumbers);
+               Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
+               {
+                       SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
+                       while(commitit->hasNext())
+                               commitLocalSequenceNumbers->add(commitit->next());
+                       delete commitit;
+               }
+               qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
 
                uint clsSize = commitLocalSequenceNumbers->size();
                for(uint clsi = 0; clsi < clsSize; clsi++) {
@@ -1041,10 +1066,14 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
                                continue;
                        }
 
-                       unseenArbitrations->addAll(commit->getParts()->values());
-
-                       for (CommitPart *commitPart : commit->getParts()->values()) {
-                               returnDataSize += commitPart->getSize();
+                       {
+                               Vector<CommitPart *> * parts = commit->getParts();
+                               uint nParts = parts->size();
+                               for(uint i=0; i<nParts; i++) {
+                                       CommitPart * commitPart = parts->get(i);
+                                       unseenArbitrations->add(commitPart);
+                                       returnDataSize += commitPart->getSize();
+                               }
                        }
                }
        }
@@ -1336,7 +1365,9 @@ ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize
                Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
 
                // Iterate over all the live entries and try to rescue them
-               for (Entry *liveEntry : liveEntries) {
+               uint lESize = liveEntries->size();
+               for (uint i=0; i< lESize; i++) {
+                       Entry * liveEntry = liveEntries->get(i);
                        if (slot->hasSpace(liveEntry)) {
                                // Enough space to rescue the entry
                                slot->addEntry(liveEntry);
@@ -1368,7 +1399,9 @@ search:
                        continue;
                seenliveslot = true;
                Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
-               for (Entry *liveentry : liveentries) {
+               uint lESize = liveentries->size();
+               for (uint i=0; i< lESize; i++) {
+                       Entry * liveentry = liveentries->get(i);
                        if (s->hasSpace(liveentry))
                                s->addEntry(liveentry);
                        else {
@@ -1407,13 +1440,22 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
        checkHMACChain(indexer, newSlots);
 
        // Set to keep track of messages from clients
-       Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
+       Hashset<int64_t> *machineSet = new Hashset<int64_t>();
+       {
+               SetIterator<int64_t, Pair<int64_t, Liveness *> *> * lmit=getKeyIterator(lastMessageTable);
+               while(lmit->hasNext())
+                       machineSet->add(lmit->next());
+               delete lmit;
+       }
 
        // Process each slots data
-       for (Slot *slot : newSlots) {
-               processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
-
-               updateExpectedSize();
+       {
+               uint numSlots = newSlots->length();
+               for(uint i=0; i<numSlots; i++) {
+                       Slot *slot = newSlots->get(i);
+                       processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+                       updateExpectedSize();
+               }
        }
 
        // If there is a gap, check to see if the server sent us
@@ -1422,7 +1464,7 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
 
                // Check the size of the slots that were sent down by the server->
                // Can only check the size if there was a gap
-               checkNumSlots(newSlots->length);
+               checkNumSlots(newSlots->length());
 
                // Since there was a gap every machine must have pushed a slot or
                // must have a last message message-> If not then the server is
@@ -1436,16 +1478,19 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
        commitNewMaxSize();
 
        // Commit new to slots to the local block chain->
-       for (Slot *slot : newSlots) {
-
-               // Insert this slot into our local block chain copy->
-               buffer->putSlot(slot);
-
-               // Keep track of how many slots are currently live (have live data
-               // in them)->
-               liveSlotCount++;
+       {
+               uint numSlots = newSlots->length();
+               for(uint i=0; i<numSlots; i++) {
+                       Slot *slot = newSlots->get(i);
+                       
+                       // Insert this slot into our local block chain copy->
+                       buffer->putSlot(slot);
+                       
+                       // Keep track of how many slots are currently live (have live data
+                       // in them)->
+                       liveSlotCount++;
+               }
        }
-
        // Get the sequence number of the latest slot in the system
        sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
        updateLiveStateFromServer();
@@ -1521,11 +1566,6 @@ void Table::checkNumSlots(int numberOfSlots) {
        }
 }
 
-void Table::updateCurrMaxSize(int newmaxsize) {
-       currMaxSize = newmaxsize;
-}
-
-
 /**
  * Update the size of of the local buffer if it is needed->
  */
@@ -1558,20 +1598,26 @@ void Table::processNewTransactionParts() {
 
        // Iterate through all the machine Ids that we received new parts
        // for
-       for (int64_t machineId : newTransactionParts->keySet()) {
+       SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts);
+       while(tpit->hasNext()) {
+               int64_t machineId = tpit->next();
                Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
 
+               SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
                // Iterate through all the parts for that machine Id
-               for (Pair<int64_t, int32_t> partId : parts->keySet()) {
+               while(ptit->hasNext()) {
+                       Pair<int64_t, int32_t> * partId = ptit->next();
                        TransactionPart *part = parts->get(partId);
 
-                       int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
-                       if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
-                               // Set dead the transaction part
-                               part->setDead();
-                               continue;
+                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
+                               int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
+                               if (lastTransactionNumber >= part->getSequenceNumber()) {
+                                       // Set dead the transaction part
+                                       part->setDead();
+                                       continue;
+                               }
                        }
-
+                       
                        // Get the transaction object for that sequence number
                        Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
 
@@ -1581,14 +1627,15 @@ void Table::processNewTransactionParts() {
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
-                               liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
+                               liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
                        }
 
                        // Add that part to the transaction
                        transaction->addPartDecode(part);
                }
+               delete ptit;
        }
-
+       delete tpit;
        // Clear all the new transaction parts in preparation for the next
        // time the server sends slots
        newTransactionParts->clear();
@@ -1603,7 +1650,7 @@ void Table::arbitrateFromServer() {
 
        // Get the transaction sequence numbers and sort from oldest to newest
        Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
-       Collections->sort(transactionSequenceNumbers);
+       qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
 
        // Collection of key value pairs that are
        Hashtable<IoTString *, KeyValue *> * speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
@@ -1848,7 +1895,7 @@ bool Table::compactArbitrationData() {
 
        int numberToDelete = 1;
        while (numberToDelete < pendingSendArbitrationRounds->size()) {
-               ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
+               ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
                if (round->isFull() || round->getDidSendPart()) {
                        // Stop since there is a part that cannot be compacted and we
@@ -1859,14 +1906,14 @@ bool Table::compactArbitrationData() {
                if (round->getCommit() == NULL) {
                        // Try compacting aborts only
                        int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
-                       if (newSize > ArbitrationRound->MAX_PARTS) {
+                       if (newSize > ArbitrationRound_MAX_PARTS) {
                                // Cant compact since it would be too large
                                break;
                        }
                        lastRound->addAborts(round->getAborts());
                } else {
                        // Create a new larger commit
-                       Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
+                       Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
                        localArbitrationSequenceNumber++;
 
                        // Create the commit parts so that we can count them
@@ -1877,7 +1924,7 @@ bool Table::compactArbitrationData() {
                        newSize += lastRound->getAbortsCount();
                        newSize += round->getAbortsCount();
 
-                       if (newSize > ArbitrationRound->MAX_PARTS) {
+                       if (newSize > ArbitrationRound_MAX_PARTS) {
                                // Cant compact since it would be too large
                                break;
                        }
@@ -1898,7 +1945,7 @@ bool Table::compactArbitrationData() {
                        pendingSendArbitrationRounds->clear();
                } else {
                        for (int i = 0; i < numberToDelete; i++) {
-                               pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
+                               pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
                        }
                }
 
@@ -1972,7 +2019,7 @@ bool Table::updateCommittedTable() {
 
                // Sort the commits in order
                Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
-               Collections->sort(commitSequenceNumbers);
+               qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
 
                // Get the last commit seen from this arbitrator
                int64_t lastCommitSeenSequenceNumber = -1;
@@ -2120,7 +2167,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
        // Create a list of the transaction sequence numbers and sort them
        // from oldest to newest
        Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
-       Collections->sort(transactionSequenceNumbersSorted);
+       qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
 
        bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
 
@@ -2347,7 +2394,7 @@ void Table::processEntry(NewKey *entry) {
  * seen in this current round of updating the local copy of the block
  * chain
  */
-void Table::processEntry(TableStatus entry, int64_t seq) {
+void Table::processEntry(TableStatus entry, int64_t seq) {
        int newNumSlots = entry->getMaxSlots();
        updateCurrMaxSize(newNumSlots);
        initExpectedSize(seq, newNumSlots);