From: bdemsky Date: Mon, 12 Mar 2018 16:03:25 +0000 (-0700) Subject: edits X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=5849d63d12e23059d5245f142e4aad2ab8d08f4e edits --- diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 7c3fde0..88f43ee 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -538,37 +538,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { } delete trit; } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *ventries = s->getEntries(); - uint vesize = ventries->size(); - for (uint vei = 0; vei < vesize; vei++) { - Entry *entry = ventries->get(vei); - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; @@ -622,38 +592,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { } // continue; } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; @@ -728,10 +667,9 @@ bool Table::sendToServer(NewKey *newKey) { localSequenceNumber++; // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn.getFirst(); - int newSize = fillSlotsReturn.getSecond(); - bool insertedNewKey = fillSlotsReturn.getThird(); + int newSize = 0; + bool insertedNewKey = false; + bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); if (needsResize) { // Reset which transaction to send @@ -752,7 +690,7 @@ bool Table::sendToServer(NewKey *newKey) { transactionPartsSent->clear(); // We needed a resize so try again - fillSlot(slot, true, newKey); + fillSlot(slot, true, newKey, newSize, insertedNewKey); } lastSlotAttemptedToSend = slot; @@ -769,10 +707,8 @@ bool Table::sendToServer(NewKey *newKey) { if (sendSlotsReturn) { // Did insert into the block chain - if (insertedNewKey) { // This slot was what was inserted not a previous slot - // New Key was successfully inserted into the block chain so dont want to insert it again newKey = NULL; } @@ -785,7 +721,7 @@ bool Table::sendToServer(NewKey *newKey) { round->removeParts(pendingSendArbitrationEntriesToDelete); if (!round->isDoneSending()) { - // Sent all the parts + //Add part back in pendingSendArbitrationRounds->set(oldcount++, pendingSendArbitrationRounds->get(i)); } @@ -837,7 +773,6 @@ bool Table::sendToServer(NewKey *newKey) { validateAndUpdate(newSlots, true); } } - } catch (ServerException *e) { if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures @@ -1147,6 +1082,40 @@ Array *Table::acceptDataFromLocal(Array *data) { return returnData; } +/** Checks whether a given slot was sent using new slots in + array. Returns true if sent and false otherwise. */ + +bool Table::checkSend(Array * array, Slot *checkSlot) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + return true; + } + } + + //Also need to see if other machines acknowledged our message + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { + return true; + } + } + } + } + //Not found + return false; +} /** Method tries to send slot to server. Returns status in tuple. isInserted returns whether last un-acked send (if any) was @@ -1169,39 +1138,8 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn } if (hadPartialSendToServer) { - *isInserted = false; - uint size = (*array)->length(); - for (uint i = 0; i < size; i++) { - Slot *s = (*array)->get(i); - if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - *isInserted = true; - break; - } - } + *isInserted = checkSend(*array, slot); - //Also need to see if other machines acknowledged our message - if (!(*isInserted)) { - for (uint i = 0; i < size; i++) { - Slot *s = (*array)->get(i); - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { - *isInserted = true; - goto done; - } - } - } - } - } - done: if (!(*isInserted)) { rejectedSlotVector->add(slot->getSequenceNumber()); } @@ -1216,10 +1154,10 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn } /** - * Returns false if a resize was needed + * Returns true if a resize was needed but not done. */ -ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { - int newSize = 0; +bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { + newSize = 0;//special value to indicate no resize if (liveSlotCount > bufferResizeThreshold) { resize = true;//Resize is forced } @@ -1242,16 +1180,16 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { - // We need to resize but we are not resizing so return false - return ThreeTuple(true, NULL, NULL); + // We need to resize but we are not resizing so return true to force on retry + return true; } - bool inserted = false; + insertedKey = false; if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { slot->addEntry(newKeyEntry); - inserted = true; + insertedKey = true; } } @@ -1323,7 +1261,7 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - return ThreeTuple(false, newSize, inserted); + return false; } void Table::doRejectedMessages(Slot *s) { @@ -1673,7 +1611,6 @@ void Table::processNewTransactionParts() { } void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable->size() == 0) { // Nothing to arbitrate on so move on return; @@ -2022,7 +1959,6 @@ bool Table::compactArbitrationData() { * transactions */ bool Table::updateCommittedTable() { - if (newCommitParts->size() == 0) { // Nothing new to process return false; @@ -2384,7 +2320,8 @@ void Table::updateLiveTransactionsAndStatus() { Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) + && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { // Set dead the transaction transaction->setDead(); @@ -2404,8 +2341,8 @@ void Table::updateLiveTransactionsAndStatus() { TransactionStatus *status = outstandingTransactionStatus->get(key); // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) + && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { // Set committed status->setStatus(TransactionStatus_StatusCommitted); diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index 99abb55..59ba472 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -98,13 +98,15 @@ private: void setResizeThreshold(); bool sendToServer(NewKey *newKey); NewKey * handlePartialSend(NewKey * newKey); + bool checkSend(Array * array, Slot *checkSlot); + bool updateFromLocal(int64_t machineId); Pair sendTransactionToLocal(Transaction *transaction); bool sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool * wasInserted, Array **array); /** * Returns false if a resize was needed */ - ThreeTuple fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry); + bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey); void doRejectedMessages(Slot *s); ThreeTuple doMandatoryResuce(Slot *slot, bool resize); diff --git a/version2/src/C/hashtable.h b/version2/src/C/hashtable.h index 91d1e5a..868060c 100644 --- a/version2/src/C/hashtable.h +++ b/version2/src/C/hashtable.h @@ -261,6 +261,7 @@ public: tail = search; else list->prev = search; + list = search; Size++; return (_Val) 0; }