edits
authorbdemsky <bdemsky@uci.edu>
Mon, 12 Mar 2018 16:03:25 +0000 (09:03 -0700)
committerbdemsky <bdemsky@uci.edu>
Mon, 12 Mar 2018 16:03:25 +0000 (09:03 -0700)
version2/src/C/Table.cc
version2/src/C/Table.h
version2/src/C/hashtable.h

index 7c3fde0..88f43ee 100644 (file)
@@ -538,37 +538,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                        }
                        delete trit;
                } else {
-                       bool isInserted = false;
-                       for (uint si = 0; si < newSlots->length(); si++) {
-                               Slot *s = newSlots->get(si);
-                               if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                                       isInserted = true;
-                                       break;
-                               }
-                       }
-                       
-                       for (uint si = 0; si < newSlots->length(); si++) {
-                               Slot *s = newSlots->get(si);
-                               if (isInserted) {
-                                       break;
-                               }
-                               
-                               // Process each entry in the slot
-                               Vector<Entry *> *ventries = s->getEntries();
-                               uint vesize = ventries->size();
-                               for (uint vei = 0; vei < vesize; vei++) {
-                                       Entry *entry = ventries->get(vei);
-                                       if (entry->getType() == TypeLastMessage) {
-                                               LastMessage *lastMessage = (LastMessage *)entry;
-                                               if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
-                                                       isInserted = true;
-                                                       break;
-                                               }
-                                       }
-                               }
-                       }
-                       
-                       if (isInserted) {
+                       if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                                if (newKey != NULL) {
                                        if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                                newKey = NULL;
@@ -622,38 +592,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                }
                // continue;
        } else {
-               bool isInserted = false;
-               for (uint si = 0; si < newSlots->length(); si++) {
-                       Slot *s = newSlots->get(si);
-                       if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                               isInserted = true;
-                               break;
-                       }
-               }
-               
-               for (uint si = 0; si < newSlots->length(); si++) {
-                       Slot *s = newSlots->get(si);
-                       if (isInserted) {
-                               break;
-                       }
-                       
-                       // Process each entry in the slot
-                       Vector<Entry *> *entries = s->getEntries();
-                       uint eSize = entries->size();
-                       for (uint ei = 0; ei < eSize; ei++) {
-                               Entry *entry = entries->get(ei);
-                               
-                               if (entry->getType() == TypeLastMessage) {
-                                       LastMessage *lastMessage = (LastMessage *)entry;
-                                       if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
-                                               isInserted = true;
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               
-               if (isInserted) {
+               if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
                                        newKey = NULL;
@@ -728,10 +667,9 @@ bool Table::sendToServer(NewKey *newKey) {
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
-                       ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
-                       bool needsResize = fillSlotsReturn.getFirst();
-                       int newSize = fillSlotsReturn.getSecond();
-                       bool insertedNewKey = fillSlotsReturn.getThird();
+                       int newSize = 0;
+                       bool insertedNewKey = false;
+                       bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
 
                        if (needsResize) {
                                // Reset which transaction to send
@@ -752,7 +690,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                transactionPartsSent->clear();
 
                                // We needed a resize so try again
-                               fillSlot(slot, true, newKey);
+                               fillSlot(slot, true, newKey, newSize, insertedNewKey);
                        }
 
                        lastSlotAttemptedToSend = slot;
@@ -769,10 +707,8 @@ bool Table::sendToServer(NewKey *newKey) {
 
                        if (sendSlotsReturn) {
                                // Did insert into the block chain
-
                                if (insertedNewKey) {
                                        // This slot was what was inserted not a previous slot
-
                                        // New Key was successfully inserted into the block chain so dont want to insert it again
                                        newKey = NULL;
                                }
@@ -785,7 +721,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                        round->removeParts(pendingSendArbitrationEntriesToDelete);
 
                                        if (!round->isDoneSending()) {
-                                               // Sent all the parts
+                                               //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
                                        }
@@ -837,7 +773,6 @@ bool Table::sendToServer(NewKey *newKey) {
                                validateAndUpdate(newSlots, true);
                        }
                }
-
        } catch (ServerException *e) {
                if (e->getType() != ServerException_TypeInputTimeout) {
                        // Nothing was able to be sent to the server so just clear these data structures
@@ -1147,6 +1082,40 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        return returnData;
 }
 
+/** Checks whether a given slot was sent using new slots in
+               array. Returns true if sent and false otherwise.  */
+
+bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
+       uint size = array->length();
+       for (uint i = 0; i < size; i++) {
+               Slot *s = array->get(i);
+               if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+                       return true;
+               }
+       }
+       
+       //Also need to see if other machines acknowledged our message
+       for (uint i = 0; i < size; i++) {
+               Slot *s = array->get(i);
+               
+               // Process each entry in the slot
+               Vector<Entry *> *entries = s->getEntries();
+               uint eSize = entries->size();
+               for (uint ei = 0; ei < eSize; ei++) {
+                       Entry *entry = entries->get(ei);
+                       
+                       if (entry->getType() == TypeLastMessage) {
+                               LastMessage *lastMessage = (LastMessage *)entry;
+                               
+                               if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
+                                       return true;
+                               }
+                       }
+               }
+       }
+       //Not found
+       return false;
+}
 
 /** Method tries to send slot to server.  Returns status in tuple.
                isInserted returns whether last un-acked send (if any) was
@@ -1169,39 +1138,8 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn
                }
 
                if (hadPartialSendToServer) {
-                       *isInserted = false;
-                       uint size = (*array)->length();
-                       for (uint i = 0; i < size; i++) {
-                               Slot *s = (*array)->get(i);
-                               if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                                       *isInserted = true;
-                                       break;
-                               }
-                       }
+                       *isInserted = checkSend(*array, slot);
 
-                       //Also need to see if other machines acknowledged our message
-                       if (!(*isInserted)) {
-                               for (uint i = 0; i < size; i++) {
-                                       Slot *s = (*array)->get(i);
-                                       
-                                       // Process each entry in the slot
-                                       Vector<Entry *> *entries = s->getEntries();
-                                       uint eSize = entries->size();
-                                       for (uint ei = 0; ei < eSize; ei++) {
-                                               Entry *entry = entries->get(ei);
-                                               
-                                               if (entry->getType() == TypeLastMessage) {
-                                                       LastMessage *lastMessage = (LastMessage *)entry;
-                                                       
-                                                       if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
-                                                               *isInserted = true;
-                                                               goto done;
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               done:
                        if (!(*isInserted)) {
                                rejectedSlotVector->add(slot->getSequenceNumber());
                        }
@@ -1216,10 +1154,10 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn
 }
 
 /**
- * Returns false if a resize was needed
+ * Returns true if a resize was needed but not done.
  */
-ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
-       int newSize = 0;
+bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
+       newSize = 0;//special value to indicate no resize
        if (liveSlotCount > bufferResizeThreshold) {
                resize = true;//Resize is forced
        }
@@ -1242,16 +1180,16 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
 
        if (needsResize && !resize) {
-               // We need to resize but we are not resizing so return false
-               return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
+               // We need to resize but we are not resizing so return true to force on retry
+               return true;
        }
 
-       bool inserted = false;
+       insertedKey = false;
        if (newKeyEntry != NULL) {
                newKeyEntry->setSlot(slot);
                if (slot->hasSpace(newKeyEntry)) {
                        slot->addEntry(newKeyEntry);
-                       inserted = true;
+                       insertedKey = true;
                }
        }
 
@@ -1323,7 +1261,7 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        // Fill the remainder of the slot with rescue data
        doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
 
-       return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
+       return false;
 }
 
 void Table::doRejectedMessages(Slot *s) {
@@ -1673,7 +1611,6 @@ void Table::processNewTransactionParts() {
 }
 
 void Table::arbitrateFromServer() {
-
        if (liveTransactionBySequenceNumberTable->size() == 0) {
                // Nothing to arbitrate on so move on
                return;
@@ -2022,7 +1959,6 @@ bool Table::compactArbitrationData() {
  * transactions
  */
 bool Table::updateCommittedTable() {
-
        if (newCommitParts->size() == 0) {
                // Nothing new to process
                return false;
@@ -2384,7 +2320,8 @@ void Table::updateLiveTransactionsAndStatus() {
                        Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
 
                        // Check if the transaction is dead
-                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
+                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
+                                       && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
                                // Set dead the transaction
                                transaction->setDead();
 
@@ -2404,8 +2341,8 @@ void Table::updateLiveTransactionsAndStatus() {
                        TransactionStatus *status = outstandingTransactionStatus->get(key);
 
                        // Check if the transaction is dead
-                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
-
+                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
+                                       && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
                                // Set committed
                                status->setStatus(TransactionStatus_StatusCommitted);
 
index 99abb55..59ba472 100644 (file)
@@ -98,13 +98,15 @@ private:
        void setResizeThreshold();
        bool sendToServer(NewKey *newKey);
        NewKey * handlePartialSend(NewKey * newKey);
+       bool checkSend(Array<Slot *> * array, Slot *checkSlot);
+
        bool updateFromLocal(int64_t machineId);
        Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
        bool sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool * wasInserted, Array<Slot *> **array);
        /**
         * Returns false if a resize was needed
         */
-       ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
+       bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey);
        void doRejectedMessages(Slot *s);
 
        ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
index 91d1e5a..868060c 100644 (file)
@@ -261,6 +261,7 @@ public:
                        tail = search;
                else
                        list->prev = search;
+               list = search;
                Size++;
                return (_Val) 0;
        }