buffer = new SlotBuffer();
// init data structs
- committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
- speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
- pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
- liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
+ committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
- liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
+ liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
rejectedSlotVector = new Vector<int64_t>();
pendingTransactionQueue = new Vector<Transaction *>();
return localSequenceNumber;
}
-bool Table::sendToServer(NewKey *newKey) {
- bool fromRetry = false;
- try {
- if (hadPartialSendToServer) {
- Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
- if (newSlots->length() == 0) {
- fromRetry = true;
- ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
-
- if (sendSlotsReturn.getFirst()) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- newKey = NULL;
- }
- }
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully
- // sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- }
- }
- delete trit;
- } else {
- newSlots = sendSlotsReturn.getThird();
- bool isInserted = false;
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+NewKey * Table::handlePartialSend(NewKey * newKey) {
+ //Didn't receive acknowledgement for last send
+ //See if the server has received a newer slot
+
+ Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
+ if (newSlots->length() == 0) {
+ //Retry sending old slot
+ bool wasInserted = false;
+ bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
+
+ if (sendSlotsReturn) {
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ newKey = NULL;
+ }
+ }
+
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Update which transactions parts still need to be sent
+ transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+
+ // Update the transaction status
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+
+ // Check if all the transaction parts were successfully
+ // sent and if so then remove it from pending
+ if (transaction->didSendAllParts()) {
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+ pendingTransactionQueue->remove(transaction);
+ }
+ }
+ delete trit;
+ } else {
+ bool isInserted = false;
+ for (uint si = 0; si < newSlots->length(); si++) {
+ Slot *s = newSlots->get(si);
+ if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (uint si = 0; si < newSlots->length(); si++) {
+ Slot *s = newSlots->get(si);
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ Vector<Entry *> *ventries = s->getEntries();
+ uint vesize = ventries->size();
+ for (uint vei = 0; vei < vesize; vei++) {
+ Entry *entry = ventries->get(vei);
+ if (entry->getType() == TypeLastMessage) {
+ LastMessage *lastMessage = (LastMessage *)entry;
+ if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
isInserted = true;
break;
}
}
-
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- Vector<Entry *> *ventries = s->getEntries();
- uint vesize = ventries->size();
- for (uint vei = 0; vei < vesize; vei++) {
- Entry *entry = ventries->get(vei);
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
- }
-
- if (isInserted) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- newKey = NULL;
- }
- }
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- } else {
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- }
- delete trit;
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ newKey = NULL;
}
}
-
+
SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
while (trit->hasNext()) {
Transaction *transaction = trit->next();
transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
+
+ // Update which transactions parts still need to be sent
+ transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+
+ // Update the transaction status
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction->didSendAllParts()) {
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+ pendingTransactionQueue->remove(transaction);
+ } else {
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
}
}
delete trit;
-
- if (sendSlotsReturn.getThird()->length() != 0) {
- // insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getThird(), true);
- }
- // continue;
- } else {
- bool isInserted = false;
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+ }
+ }
+
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+
+ if (newSlots->length() != 0) {
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ // continue;
+ } else {
+ bool isInserted = false;
+ for (uint si = 0; si < newSlots->length(); si++) {
+ Slot *s = newSlots->get(si);
+ if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (uint si = 0; si < newSlots->length(); si++) {
+ Slot *s = newSlots->get(si);
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ Vector<Entry *> *entries = s->getEntries();
+ uint eSize = entries->size();
+ for (uint ei = 0; ei < eSize; ei++) {
+ Entry *entry = entries->get(ei);
+
+ if (entry->getType() == TypeLastMessage) {
+ LastMessage *lastMessage = (LastMessage *)entry;
+ if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
isInserted = true;
break;
}
}
-
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- Vector<Entry *> *entries = s->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
-
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
+ }
+ }
+
+ if (isInserted) {
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ newKey = NULL;
}
-
- if (isInserted) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
- newKey = NULL;
- }
- }
-
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
- // Update the transaction status
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction->didSendAllParts()) {
- transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
- pendingTransactionQueue->remove(transaction);
- } else {
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
- }
- }
- delete trit;
+ }
+
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+
+ // Update which transactions parts still need to be sent
+ transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+
+ // Update the transaction status
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+
+ // Check if all the transaction parts were successfully sent and if so then remove it from pending
+ if (transaction->didSendAllParts()) {
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+ pendingTransactionQueue->remove(transaction);
} else {
- SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
- while (trit->hasNext()) {
- Transaction *transaction = trit->next();
- transaction->resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction->didSendAPartToServer()) {
- transaction->setSequenceNumber(-1);
- }
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
}
- delete trit;
}
-
- // insert into the local block chain
- validateAndUpdate(newSlots, true);
}
+ delete trit;
+ } else {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
}
- } catch (ServerException *e) {
- throw e;
+
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
}
+ return newKey;
+}
-
+bool Table::sendToServer(NewKey *newKey) {
+ if (hadPartialSendToServer) {
+ newKey = handlePartialSend(newKey);
+ }
try {
// While we have stuff that needs inserting into the block chain
while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
-
- fromRetry = false;
-
if (hadPartialSendToServer) {
throw new Error("Should Be error free");
}
-
-
-
+
// If there is a new key with same name then end
if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
return false;
lastTransactionPartsSent = transactionPartsSent->clone();
lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
-
- if (sendSlotsReturn.getFirst()) {
+ Array<Slot *> * newSlots = NULL;
+ bool wasInserted = false;
+ bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
+ if (sendSlotsReturn) {
// Did insert into the block chain
if (insertedNewKey) {
pendingSendArbitrationEntriesToDelete->clear();
transactionPartsSent->clear();
- if (sendSlotsReturn.getThird()->length() != 0) {
+ if (newSlots->length() != 0) {
// insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getThird(), true);
+ validateAndUpdate(newSlots, true);
}
}
return returnData;
}
-ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
- bool attemptedToSendToServerTmp = attemptedToSendToServer;
- attemptedToSendToServer = true;
- bool inserted = false;
- bool lastTryInserted = false;
+/** Method tries to send slot to server. Returns status in tuple.
+ isInserted returns whether last un-acked send (if any) was
+ successful. Returns whether send was confirmed.x
+ */
- Array<Slot *> *array = cloud->putSlot(slot, newSize);
- if (array == NULL) {
- array = new Array<Slot *>(1);
- array->set(0, slot);
+bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
+ attemptedToSendToServer = true;
+
+ *array = cloud->putSlot(slot, newSize);
+ if (*array == NULL) {
+ *array = new Array<Slot *>(1);
+ (*array)->set(0, slot);
rejectedSlotVector->clear();
- inserted = true;
+ *isInserted = false;
+ return true;
} else {
- if (array->length() == 0) {
+ if ((*array)->length() == 0) {
throw new Error("Server Error: Did not send any slots");
}
- // if (attemptedToSendToServerTmp) {
if (hadPartialSendToServer) {
-
- bool isInserted = false;
- uint size = array->length();
+ *isInserted = false;
+ uint size = (*array)->length();
for (uint i = 0; i < size; i++) {
- Slot *s = array->get(i);
+ Slot *s = (*array)->get(i);
if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
- isInserted = true;
+ *isInserted = true;
break;
}
}
- for (uint i = 0; i < size; i++) {
- Slot *s = array->get(i);
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- Vector<Entry *> *entries = s->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
-
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
-
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
- isInserted = true;
- break;
+ //Also need to see if other machines acknowledged our message
+ if (!(*isInserted)) {
+ for (uint i = 0; i < size; i++) {
+ Slot *s = (*array)->get(i);
+
+ // Process each entry in the slot
+ Vector<Entry *> *entries = s->getEntries();
+ uint eSize = entries->size();
+ for (uint ei = 0; ei < eSize; ei++) {
+ Entry *entry = entries->get(ei);
+
+ if (entry->getType() == TypeLastMessage) {
+ LastMessage *lastMessage = (LastMessage *)entry;
+
+ if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
+ *isInserted = true;
+ goto done;
+ }
}
}
}
}
-
- if (!isInserted) {
+ done:
+ if (!(*isInserted)) {
rejectedSlotVector->add(slot->getSequenceNumber());
- lastTryInserted = false;
- } else {
- lastTryInserted = true;
}
+
+ return false;
} else {
rejectedSlotVector->add(slot->getSequenceNumber());
- lastTryInserted = false;
+ *isInserted = false;
+ return false;
}
}
-
- return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
}
/**
qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
// Collection of key value pairs that are
- Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
+ Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
// The last transaction arbitrated on
int64_t lastTransactionCommitted = -1;
localArbitrationSequenceNumber++;
// Add all the new keys to the commit
- SetIterator<IoTString *, KeyValue *> *spit = getKeyIterator(speculativeTableTmp);
+ SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
while (spit->hasNext()) {
IoTString *string = spit->next();
KeyValue *kv = speculativeTableTmp->get(string);