3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
// Constructs a Table that creates its own CloudComm from the given base URL,
// password and listening port. The visible initializers set every pointer
// member to NULL, the counters to 0 (oldestLiveSlotSequenceNumver starts at 1)
// and the flags to false.
// NOTE(review): some initializer lines and the constructor body are not visible
// in this view (liveAbortTable and newCommitParts, for instance, show up only
// in the other constructor) - confirm against the full file.
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
18 cloud(new CloudComm(this, baseurl, password, listeningPort)),
20 liveTableStatus(NULL),
21 pendingTransactionBuilder(NULL),
22 lastPendingTransactionSpeculatedOn(NULL),
23 firstPendingTransaction(NULL),
25 bufferResizeThreshold(0),
27 oldestLiveSlotSequenceNumver(1),
28 localMachineId(_localMachineId),
30 localTransactionSequenceNumber(0),
31 lastTransactionSequenceNumberSpeculatedOn(0),
32 oldestTransactionSequenceNumberSpeculatedOn(0),
33 localArbitrationSequenceNumber(0),
34 hadPartialSendToServer(false),
35 attemptedToSendToServer(false),
37 didFindTableStatus(false),
39 lastSlotAttemptedToSend(NULL),
42 lastTransactionPartsSent(NULL),
43 lastPendingSendArbitrationEntriesToDelete(NULL),
45 committedKeyValueTable(NULL),
46 speculatedKeyValueTable(NULL),
47 pendingTransactionSpeculatedKeyValueTable(NULL),
48 liveNewKeyTable(NULL),
49 lastMessageTable(NULL),
50 rejectedMessageWatchVectorTable(NULL),
51 arbitratorTable(NULL),
53 newTransactionParts(NULL),
55 lastArbitratedTransactionNumberByArbitratorTable(NULL),
56 liveTransactionBySequenceNumberTable(NULL),
57 liveTransactionByTransactionIdTable(NULL),
58 liveCommitsTable(NULL),
59 liveCommitsByKeyTable(NULL),
60 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61 rejectedSlotVector(NULL),
62 pendingTransactionQueue(NULL),
63 pendingSendArbitrationRounds(NULL),
64 pendingSendArbitrationEntriesToDelete(NULL),
65 transactionPartsSent(NULL),
66 outstandingTransactionStatus(NULL),
67 liveAbortsGeneratedByLocal(NULL),
68 offlineTransactionsCommittedAndAtServer(NULL),
69 localCommunicationTable(NULL),
70 lastTransactionSeenFromMachineFromServer(NULL),
71 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72 lastInsertedNewKey(false),
// Constructs a Table around a caller-supplied CloudComm instead of creating
// one. The visible initializers mirror the other constructor: pointer members
// start as NULL, counters as 0 (oldestLiveSlotSequenceNumver as 1) and flags
// as false.
// NOTE(review): the initializer that stores _cloud and the constructor body
// are not visible in this view - confirm against the full file.
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82 liveTableStatus(NULL),
83 pendingTransactionBuilder(NULL),
84 lastPendingTransactionSpeculatedOn(NULL),
85 firstPendingTransaction(NULL),
87 bufferResizeThreshold(0),
89 oldestLiveSlotSequenceNumver(1),
90 localMachineId(_localMachineId),
92 localTransactionSequenceNumber(0),
93 lastTransactionSequenceNumberSpeculatedOn(0),
94 oldestTransactionSequenceNumberSpeculatedOn(0),
95 localArbitrationSequenceNumber(0),
96 hadPartialSendToServer(false),
97 attemptedToSendToServer(false),
99 didFindTableStatus(false),
101 lastSlotAttemptedToSend(NULL),
104 lastTransactionPartsSent(NULL),
105 lastPendingSendArbitrationEntriesToDelete(NULL),
107 committedKeyValueTable(NULL),
108 speculatedKeyValueTable(NULL),
109 pendingTransactionSpeculatedKeyValueTable(NULL),
110 liveNewKeyTable(NULL),
111 lastMessageTable(NULL),
112 rejectedMessageWatchVectorTable(NULL),
113 arbitratorTable(NULL),
114 liveAbortTable(NULL),
115 newTransactionParts(NULL),
116 newCommitParts(NULL),
117 lastArbitratedTransactionNumberByArbitratorTable(NULL),
118 liveTransactionBySequenceNumberTable(NULL),
119 liveTransactionByTransactionIdTable(NULL),
120 liveCommitsTable(NULL),
121 liveCommitsByKeyTable(NULL),
122 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123 rejectedSlotVector(NULL),
124 pendingTransactionQueue(NULL),
125 pendingSendArbitrationRounds(NULL),
126 pendingSendArbitrationEntriesToDelete(NULL),
127 transactionPartsSent(NULL),
128 outstandingTransactionStatus(NULL),
129 liveAbortsGeneratedByLocal(NULL),
130 offlineTransactionsCommittedAndAtServer(NULL),
131 localCommunicationTable(NULL),
132 lastTransactionSeenFromMachineFromServer(NULL),
133 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134 lastInsertedNewKey(false),
141 * Init all the stuff needed for table usage
// NOTE(review): the signature line of this routine is not visible in this
// view; presumably this is the body of the table set-up function.
// Allocates the Random and SlotBuffer helpers and every hashtable, hashset and
// vector member, then records the buffer capacity as numberOfSlots and
// computes the resize threshold.
144 // Init helper objects
145 random = new Random();
146 buffer = new SlotBuffer();
149 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> *>();
154 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155 arbitratorTable = new Hashtable<IoTString *, int64_t>();
156 liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *>();
157 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *>();
158 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *>();
159 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *>();
// NOTE(review): the inner table is stored by value here, while
// acceptDataFromLocal reads the entry into a Hashtable pointer - confirm the
// intended value type.
162 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165 rejectedSlotVector = new Vector<int64_t>();
166 pendingTransactionQueue = new Vector<Transaction *>();
167 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *>();
// NOTE(review): the Pair is stored by value here, but readers of this table
// compare the result with NULL and use -> on it - confirm the value type.
172 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// Size bookkeeping has to follow the buffer allocation above because it reads
// the buffer capacity.
179 numberOfSlots = buffer->capacity();
180 setResizeThreshold();
184 * Initialize the table by inserting a table status as the first entry
185 * into the table; also initialize the crypto stuff.
// Initializes security on the cloud connection, builds slot number 1 holding
// a TableStatus entry, pushes it with putSlot and feeds the resulting slot
// array to validateAndUpdate. Throws Error when initialization fails.
// NOTE(review): the conditions guarding the first branch and the final throw
// are not visible in this view - confirm them in the full file.
187 void Table::initTable() {
188 cloud->initSecurity();
190 // Create the first insertion into the block chain which is the table status
191 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192 localSequenceNumber++;
193 TableStatus *status = new TableStatus(s, numberOfSlots);
195 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A one-element array is allocated here; presumably it is filled with the
// slot that was just sent (the assignment line is not visible).
198 array = new Array<Slot *>(1);
200 // update local block chain
201 validateAndUpdate(array, true);
202 } else if (array->length() == 1) {
203 // in case we did push the slot BUT we failed to init it
204 validateAndUpdate(array, true);
// NOTE(review): this throws a pointer to a heap-allocated Error, so only
// handlers that catch Error * will match and they own the object.
206 throw new Error("Error on initialization");
211 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after the current sequenceNumber from the server, runs
// them through validateAndUpdate and then refreshes the live transactions and
// status.
214 void Table::rebuild() {
215 // Just pull the latest slots from the server
216 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217 validateAndUpdate(newslots, true);
219 updateLiveTransactionsAndStatus();
// Registers the host name and port under which the given arbitrator machine
// can be reached over local communication. The pair is stored in
// localCommunicationTable keyed by the arbitrator id.
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the arbitrator id that arbitratorTable holds for the given key.
// NOTE(review): no containment check is visible, so the result for an unknown
// key depends on what Hashtable::get returns for a missing entry - confirm.
226 int64_t Table::getArbitrator(IoTString *key) {
227 return arbitratorTable->get(key);
// Close hook for the table.
// NOTE(review): the body is not visible in this view, so what it releases
// cannot be stated here.
230 void Table::close() {
// Looks the key up in committedKeyValueTable and returns the value of the
// entry that was found.
// NOTE(review): the branch taken when no entry exists is not visible in this
// view - confirm what is returned in that case.
234 IoTString *Table::getCommitted(IoTString *key) {
235 KeyValue *kv = committedKeyValueTable->get(key);
238 return kv->getValue();
// Returns the value for the key, consulting in order the pending-transaction
// speculative table, the speculative table and the committed table.
// NOTE(review): the conditions between the lookups are not visible here;
// presumably each later table is consulted only when the previous lookup
// found nothing - confirm.
244 IoTString *Table::getSpeculative(IoTString *key) {
245 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248 kv = speculatedKeyValueTable->get(key);
252 kv = committedKeyValueTable->get(key);
256 return kv->getValue();
// Reads the committed value for the key and records it as a guard on the
// transaction being built. A guard with a NULL value is added on the path
// where no value is used (presumably when the key has no committed entry).
// Throws Error when the key has no arbitrator or when its arbitrator does not
// match the one already used by the pending transaction.
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263 KeyValue *kv = committedKeyValueTable->get(key);
265 if (!arbitratorTable->contains(key)) {
266 throw new Error("Key not Found.");
269 // Make sure new key value pair matches the current arbitrator
270 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271 // TODO: Maybe not throw an error
272 throw new Error("Not all Key Values Match Arbitrator.");
276 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277 return kv->getValue();
279 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic: validates the key and its
// arbitrator, then consults the pending-transaction speculative table, the
// speculative table and the committed table, and records the value that was
// read (or NULL) as a guard on the transaction being built.
// Throws Error when the key has no arbitrator or the arbitrator does not match.
// NOTE(review): the conditions between the three lookups are not visible in
// this view - confirm the fall-through rules.
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285 if (!arbitratorTable->contains(key)) {
286 throw new Error("Key not Found.");
289 // Make sure new key value pair matches the current arbitrator
290 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291 // TODO: Maybe not throw an error
292 throw new Error("Not all Key Values Match Arbitrator.");
295 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298 kv = speculatedKeyValueTable->get(key);
302 kv = committedKeyValueTable->get(key);
306 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307 return kv->getValue();
309 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls the slots after the current sequenceNumber from the server, validates
// them and refreshes the live transactions and status. When an Exception is
// caught, the handler iterates over every machine id registered in
// localCommunicationTable.
// NOTE(review): the return statements and the body of the fallback loop are
// not visible in this view - confirm what each path returns.
314 bool Table::update() {
316 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317 validateAndUpdate(newSlots, false);
321 updateLiveTransactionsAndStatus();
324 } catch (Exception *e) {
325 for (int64_t m : localCommunicationTable->keySet()) {
// Attempts to register keyName with machineId as its arbitrator by sending a
// NewKey entry to the server through sendToServer.
// NOTE(review): the return statements and any surrounding loop are not
// visible in this view.
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
// NOTE(review): the condition below is true when the key is absent from
// arbitratorTable, yet the comment under it describes the case where an
// arbitrator already exists - the polarity looks inverted; confirm.
335 if (!arbitratorTable->contains(keyName)) {
336 // There is already an arbitrator
340 NewKey *newKey = new NewKey(NULL, keyName, machineId);
342 if (sendToServer(newKey)) {
343 // If successfully inserted
// Begins a new transaction by replacing pendingTransactionBuilder with a
// fresh PendingTransaction for this machine.
// NOTE(review): the previous builder is overwritten without a visible delete -
// confirm who owns it.
349 void Table::startTransaction() {
350 // Create a new transaction, invalidates any old pending transactions.
351 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key-value update to the transaction being built.
// Throws Error when the key has no entry in arbitratorTable or when its
// arbitrator differs from the one the pending transaction already uses.
354 void Table::addKV(IoTString *key, IoTString *value) {
356 // Make sure it is a valid key
357 if (!arbitratorTable->contains(key)) {
358 throw new Error("Key not Found.");
361 // Make sure new key value pair matches the current arbitrator
362 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363 // TODO: Maybe not throw an error
364 throw new Error("Not all Key Values Match Arbitrator.");
367 // Add the key value to this transaction
368 KeyValue *kv = new KeyValue(key, value);
369 pendingTransactionBuilder->addKV(kv);
// Finalizes the transaction under construction and returns its status object.
// A builder without key-value updates yields a NoEffect status right away.
// Otherwise the builder is stamped with the local transaction sequence
// number, a Transaction with a Pending status is created, and it is queued
// when the arbitrator is another machine. The local arbitration calls that
// follow presumably sit in the else branch. The builder is then replaced with
// a fresh one. When a ServerException is caught, the queued transactions are
// offered to their arbitrators over local communication, skipping arbitrators
// that already failed.
// NOTE(review): the try opener, the else branch and the bodies of several
// conditionals are not visible in this view - confirm the exact structure.
372 TransactionStatus *Table::commitTransaction() {
374 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375 // transaction with no updates will have no effect on the system
376 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
379 // Set the local transaction sequence number and increment
380 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381 localTransactionSequenceNumber++;
383 // Create the transaction status
384 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
386 // Create the new transaction
387 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388 newTransaction->setTransactionStatus(transactionStatus);
390 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391 // Add it to the queue and invalidate the builder for safety
392 pendingTransactionQueue->add(newTransaction);
394 arbitrateOnLocalTransaction(newTransaction);
395 updateLiveStateFromLocal();
398 pendingTransactionBuilder = new PendingTransaction(localMachineId);
402 } catch (ServerException *e) {
// Remembers arbitrators that could not be reached so later transactions for
// the same arbitrator are not attempted out of order.
404 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406 Transaction *transaction = iter->next();
408 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409 // Already contacted this client so ignore all attempts to contact this client
410 // to preserve ordering for arbitrator
414 Pair<bool, bool> *sendReturn = sendTransactionToLocal(transaction);
416 if (sendReturn->getFirst()) {
417 // Failed to contact over local
418 arbitratorTriedAndFailed->add(transaction->getArbitrator());
420 // Successful contact or should not contact
422 if (sendReturn->getSecond()) {
430 updateLiveStateFromLocal();
432 return transactionStatus;
436 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots: the lower bound is
// Table_RESIZE_THRESHOLD times numberOfSlots (truncated to int), and a random
// offset drawn from the remaining slot range is added to it, minus one.
// NOTE(review): the range of Random::nextInt is not visible here; presumably
// it is [0, bound) as in the Java API - confirm.
438 void Table::setResizeThreshold() {
439 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
443 int64_t Table::getLocalSequenceNumber() {
444 return localSequenceNumber;
// NOTE(review): this file-scope variable has the same name as the
// lastInsertedNewKey member that both constructors initialize; inside member
// functions the member takes precedence, so this definition looks like a
// leftover from the port - confirm whether it is still needed.
448 bool lastInsertedNewKey = false;
// Pushes pending data to the server and mirrors the outcome in local state.
// newKey may be NULL when only transactions or arbitration data are sent.
// Visible flow:
//  1. If hadPartialSendToServer is set, re-fetch the slots after
//     sequenceNumber and work out whether lastSlotAttemptedToSend actually
//     landed, updating the transactions in lastTransactionPartsSent.
//  2. Loop while there are pending transactions, pending arbitration rounds
//     or a new key to insert: build a slot with fillSlot, remember it as the
//     last attempt, send it with sendSlotsToServer and update the bookkeeping.
// Returns true when newKey is NULL at the end.
// NOTE(review): many lines (else branches, closing braces, try openers) are
// not visible in this view, so the exact branch structure must be confirmed
// in the full file.
450 bool Table::sendToServer(NewKey *newKey) {
452 bool fromRetry = false;
455 if (hadPartialSendToServer) {
456 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457 if (newSlots->length() == 0) {
// Nothing new on the server: resend the slot from the previous attempt.
459 ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
461 if (sendSlotsReturn->getFirst()) {
462 if (newKey != NULL) {
// NOTE(review): if getKey returns IoTString * this compares addresses rather
// than string contents - confirm. The same comparison recurs further down.
463 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
468 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469 transaction->resetServerFailure();
471 // Update which transactions parts still need to be sent
472 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
474 // Add the transaction status to the outstanding list
475 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
477 // Update the transaction status
478 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
480 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481 if (transaction->didSendAllParts()) {
482 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483 pendingTransactionQueue->remove(transaction);
488 newSlots = sendSlotsReturn->getThird();
// Look for the previously attempted slot, authored by this machine, among the
// slots the server returned.
490 bool isInserted = false;
491 for (uint si = 0; si < newSlots->length(); si++) {
492 Slot *s = newSlots->get(si);
493 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
499 for (uint si = 0; si < newSlots->length(); si++) {
500 Slot *s = newSlots->get(si);
505 // Process each entry in the slot
506 for (Entry *entry : s->getEntries()) {
507 if (entry->getType() == TypeLastMessage) {
508 LastMessage *lastMessage = (LastMessage *)entry;
509 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
518 if (newKey != NULL) {
519 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
524 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525 transaction->resetServerFailure();
527 // Update which transactions parts still need to be sent
528 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
530 // Add the transaction status to the outstanding list
531 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
533 // Update the transaction status
534 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
536 // Check if all the transaction parts were successfully sent and if so then remove it from pending
537 if (transaction->didSendAllParts()) {
538 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539 pendingTransactionQueue->remove(transaction);
541 transaction->resetServerFailure();
542 // Set the transaction sequence number back to nothing
543 if (!transaction->didSendAPartToServer()) {
544 transaction->setSequenceNumber(-1);
551 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552 transaction->resetServerFailure();
553 // Set the transaction sequence number back to nothing
554 if (!transaction->didSendAPartToServer()) {
555 transaction->setSequenceNumber(-1);
559 if (sendSlotsReturn->getThird()->length() != 0) {
560 // insert into the local block chain
561 validateAndUpdate(sendSlotsReturn->getThird(), true);
// The checks below repeat the detection above, presumably for the case where
// the first fetch already returned slots (the else line is not visible).
565 bool isInserted = false;
566 for (uint si = 0; si < newSlots->length(); si++) {
567 Slot *s = newSlots->get(si);
568 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
574 for (uint si = 0; si < newSlots->length(); si++) {
575 Slot *s = newSlots->get(si);
580 // Process each entry in the slot
581 for (Entry *entry : s->getEntries()) {
583 if (entry->getType() == TypeLastMessage) {
584 LastMessage *lastMessage = (LastMessage *)entry;
585 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
594 if (newKey != NULL) {
595 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
600 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601 transaction->resetServerFailure();
603 // Update which transactions parts still need to be sent
604 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
606 // Add the transaction status to the outstanding list
607 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
609 // Update the transaction status
610 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
612 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613 if (transaction->didSendAllParts()) {
614 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615 pendingTransactionQueue->remove(transaction);
617 transaction->resetServerFailure();
618 // Set the transaction sequence number back to nothing
619 if (!transaction->didSendAPartToServer()) {
620 transaction->setSequenceNumber(-1);
625 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626 transaction->resetServerFailure();
627 // Set the transaction sequence number back to nothing
628 if (!transaction->didSendAPartToServer()) {
629 transaction->setSequenceNumber(-1);
634 // insert into the local block chain
635 validateAndUpdate(newSlots, true);
638 } catch (ServerException *e) {
645 // While we have stuff that needs inserting into the block chain
646 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
650 if (hadPartialSendToServer) {
651 throw new Error("Should Be error free");
656 // If there is a new key with same name then end
657 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot follows the current sequenceNumber and is built with the HMAC
// of the slot at sequenceNumber.
662 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663 localSequenceNumber++;
665 // Try to fill the slot with data
// NOTE(review): fillSlot returns a ThreeTuple pointer and the result is used
// with -> below, yet this local is declared by value - confirm the type.
666 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667 bool needsResize = fillSlotsReturn->getFirst();
668 int newSize = fillSlotsReturn->getSecond();
669 bool insertedNewKey = fillSlotsReturn->getThird();
672 // Reset which transaction to send
673 for (Transaction *transaction : transactionPartsSent->keySet()) {
674 transaction->resetNextPartToSend();
676 // Set the transaction sequence number back to nothing
677 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678 transaction->setSequenceNumber(-1);
682 // Clear the sent data since we are trying again
683 pendingSendArbitrationEntriesToDelete->clear();
684 transactionPartsSent->clear();
686 // We needed a resize so try again
687 fillSlot(slot, true, newKey);
// Snapshot this attempt so a later call can recover from a partial send.
690 lastSlotAttemptedToSend = slot;
691 lastIsNewKey = (newKey != NULL);
692 lastInsertedNewKey = insertedNewKey;
693 lastNewSize = newSize;
695 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
// NOTE(review): sendSlotsToServer returns a pointer, but this local is
// declared by value and then used with -> - confirm the type.
699 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
701 if (sendSlotsReturn->getFirst()) {
703 // Did insert into the block chain
705 if (insertedNewKey) {
706 // This slot was what was inserted not a previous slot
708 // New Key was successfully inserted into the block chain so do not want to insert it again
712 // Remove the aborts and commit parts that were sent from the pending to send queue
713 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714 ArbitrationRound *round = iter->next();
715 round->removeParts(pendingSendArbitrationEntriesToDelete);
717 if (round->isDoneSending()) {
718 // Sent all the parts
723 for (Transaction *transaction : transactionPartsSent->keySet()) {
724 transaction->resetServerFailure();
726 // Update which transactions parts still need to be sent
727 transaction->removeSentParts(transactionPartsSent->get(transaction));
729 // Add the transaction status to the outstanding list
730 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
732 // Update the transaction status
733 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
735 // Check if all the transaction parts were successfully sent and if so then remove it from pending
736 if (transaction->didSendAllParts()) {
737 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738 pendingTransactionQueue->remove(transaction);
742 // Reset which transaction to send
743 for (Transaction *transaction : transactionPartsSent->keySet()) {
744 transaction->resetNextPartToSend();
746 // Set the transaction sequence number back to nothing
747 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748 transaction->setSequenceNumber(-1);
753 // Clear the sent data in preparation for next send
754 pendingSendArbitrationEntriesToDelete->clear();
755 transactionPartsSent->clear();
757 if (sendSlotsReturn->getThird()->length() != 0) {
758 // insert into the local block chain
759 validateAndUpdate(sendSlotsReturn->getThird(), true);
763 } catch (ServerException *e) {
// NOTE(review): -> applied to the type name ServerException is not valid C++;
// presumably a scoped constant was intended - confirm.
765 if (e->getType() != ServerException->TypeInputTimeout) {
766 // Nothing was able to be sent to the server so just clear these data structures
767 for (Transaction *transaction : transactionPartsSent->keySet()) {
768 transaction->resetNextPartToSend();
770 // Set the transaction sequence number back to nothing
771 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772 transaction->setSequenceNumber(-1);
776 // There was a partial send to the server
777 hadPartialSendToServer = true;
779 // Nothing was able to be sent to the server so just clear these data structures
780 for (Transaction *transaction : transactionPartsSent->keySet()) {
781 transaction->resetNextPartToSend();
782 transaction->setServerFailure();
786 pendingSendArbitrationEntriesToDelete->clear();
787 transactionPartsSent->clear();
792 return newKey == NULL;
// Asks the given machine, over local communication, for arbitration data
// newer than the last local sequence number seen from it, decodes the
// returned Abort and CommitPart entries and then refreshes the live state.
// NOTE(review): the return statements are not visible in this view - confirm
// what each path returns.
795 bool Table::updateFromLocal(int64_t machineId) {
// NOTE(review): the Pair is declared by value but compared with NULL and used
// with -> below - confirm the intended type.
796 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797 if (localCommunicationInformation == NULL) {
798 // Cannot talk to that device locally so do nothing
802 // Get the size of the send data
803 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 is sent when nothing has been seen from this arbitrator yet.
805 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
810 Array<char> *sendData = new Array<char>(sendDataSize);
811 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
814 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
818 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819 localSequenceNumber++;
821 if (returnData == NULL) {
822 // Could not contact server
827 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828 int numberOfEntries = bbDecode->getInt();
830 for (int i = 0; i < numberOfEntries; i++) {
831 char type = bbDecode->get();
832 if (type == TypeAbort) {
// NOTE(review): the cast names the class type while the target is a pointer;
// presumably a pointer cast was intended here and for CommitPart below.
833 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
835 } else if (type == TypeCommitPart) {
836 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837 processEntry(commitPart);
841 updateLiveStateFromLocal();
// Sends every part of the transaction to its arbitrator over local
// communication and applies the reply to the transaction status.
// Result pair as visible here: (true, false) when the arbitrator has no local
// communication entry or did not answer, (false, true) after a reply has been
// processed.
// NOTE(review): the declared return type is a Pair by value, while every
// return statement yields a pointer from new and the caller stores a Pair
// pointer - confirm the signature.
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
848 // Get the devices local communications
849 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
851 if (localCommunicationInformation == NULL) {
852 // Cannot talk to that device locally so do nothing
853 return new Pair<bool, bool>(true, false);
856 // Get the size of the send data
857 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858 for (TransactionPart *part : transaction->getParts()->values()) {
859 sendDataSize += part->getSize();
// -1 is sent when nothing has been seen from this arbitrator yet.
862 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
867 // Make the send data size
868 Array<char> *sendData = new Array<char>(sendDataSize);
// NOTE(review): other call sites in this file use ByteBuffer_wrap; the dotted
// form below is not valid C++ - confirm.
869 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
// Payload layout: last seen sequence number, part count, then each part.
872 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873 bbEncode->putInt(transaction->getParts()->size());
874 for (TransactionPart *part : transaction->getParts()->values()) {
875 part->encode(bbEncode);
880 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881 localSequenceNumber++;
883 if (returnData == NULL) {
884 // Could not contact server
885 return new Pair<bool, bool>(true, false);
// Reply layout: didCommit byte, couldArbitrate byte, entry count, entries.
889 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890 bool didCommit = bbDecode->get() == 1;
891 bool couldArbitrate = bbDecode->get() == 1;
892 int numberOfEntries = bbDecode->getInt();
893 bool foundAbort = false;
895 for (int i = 0; i < numberOfEntries; i++) {
896 char type = bbDecode->get();
897 if (type == TypeAbort) {
// NOTE(review): the cast names the class type while the target is a pointer;
// presumably a pointer cast was intended here and for CommitPart below.
898 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
// Matches an abort that refers to this very transaction.
900 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
905 } else if (type == TypeCommitPart) {
906 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907 processEntry(commitPart);
911 updateLiveStateFromLocal();
913 if (couldArbitrate) {
// NOTE(review): status is declared by value but used with ->; presumably a
// TransactionStatus pointer was intended - confirm.
914 TransactionStatus status = transaction->getTransactionStatus();
916 status->setStatus(TransactionStatus_StatusCommitted);
918 status->setStatus(TransactionStatus_StatusAborted);
921 TransactionStatus status = transaction->getTransactionStatus();
923 status->setStatus(TransactionStatus_StatusAborted);
925 status->setStatus(TransactionStatus_StatusCommitted);
929 return new Pair<bool, bool>(false, true);
// Handles a request received over local communication. The request carries
// the last arbitrated sequence number the sender has seen, followed by an
// optional transaction as a part count and encoded parts. A transaction that
// is present is arbitrated locally. The reply holds the commit and arbitrate
// flags (only when a transaction was sent), the number of arbitration
// entries, and the locally generated aborts and commit parts that are added
// to unseenArbitrations.
// NOTE(review): the return statement and the bodies of several conditionals
// are not visible in this view - confirm them in the full file.
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
935 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937 int numberOfParts = bbDecode->getInt();
939 // If we did commit a transaction or not
940 bool didCommit = false;
941 bool couldArbitrate = false;
943 if (numberOfParts != 0) {
945 // decode the transaction
946 Transaction *transaction = new Transaction();
947 for (int i = 0; i < numberOfParts; i++) {
// NOTE(review): dotted call on a type name and a cast to the class type are
// not valid C++ here; presumably a TransactionPart_decode style helper with a
// pointer cast was intended - confirm.
949 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950 transaction->addPartDecode(newPart);
953 // Arbitrate on transaction and pull relevant return data
// NOTE(review): declared by value but used with -> below - confirm the type.
954 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955 couldArbitrate = localArbitrateReturn->getFirst();
956 didCommit = localArbitrateReturn->getSecond();
958 updateLiveStateFromLocal();
960 // Transaction was sent to the server so keep track of it to prevent double commit
961 if (transaction->getSequenceNumber() != -1) {
962 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
966 // The data to send back
967 int returnDataSize = 0;
968 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
970 // Get the aborts to send back
971 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
// NOTE(review): Collections->sort is a leftover from the Java original and
// has no visible C++ definition - confirm the replacement.
972 Collections->sort(abortLocalSequenceNumbers);
// The loop variable shadows the localSequenceNumber member inside this loop
// and inside the commit loop below.
973 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
974 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
978 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979 unseenArbitrations->add(abort);
980 returnDataSize += abort->getSize();
983 // Get the commits to send back
984 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985 if (commitForClientTable != NULL) {
986 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987 Collections->sort(commitLocalSequenceNumbers);
989 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990 Commit *commit = commitForClientTable->get(localSequenceNumber);
992 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
996 unseenArbitrations->addAll(commit->getParts()->values());
// NOTE(review): commitPart is declared by value but used with ->; presumably
// a CommitPart pointer was intended - confirm.
998 for (CommitPart commitPart : commit->getParts()->values()) {
999 returnDataSize += commitPart->getSize();
1004 // Number of arbitration entries to decode
1005 returnDataSize += 2 * sizeof(int32_t);
1007 // bool of did commit or not
1008 if (numberOfParts != 0) {
1009 returnDataSize += sizeof(char);
1012 // Data to send Back
1013 Array<char> *returnData = new Array<char>(returnDataSize);
1014 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1016 if (numberOfParts != 0) {
// First flag byte; presumably it reflects didCommit (the condition line is
// not visible). The second flag byte reflects couldArbitrate.
1018 bbEncode->put((char)1);
1020 bbEncode->put((char)0);
1022 if (couldArbitrate) {
1023 bbEncode->put((char)1);
1025 bbEncode->put((char)0);
1029 bbEncode->putInt(unseenArbitrations->size());
1030 for (Entry *entry : unseenArbitrations) {
1031 entry->encode(bbEncode);
// Outside the loops above, so this increments the member.
1035 localSequenceNumber++;
// Sends one slot to the server with putSlot and reports the outcome as a
// tuple of (inserted, lastTryInserted, array). array holds the slot that was
// sent when putSlot returned NULL, otherwise the slots the server returned.
// Throws Error when the server returns an empty array.
// NOTE(review): the assignments to inserted and several branch lines are not
// visible in this view - confirm how the first flag is derived.
1039 ThreeTuple<bool, bool, Array<Slot *> *> *Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// The previous flag value is saved; its only visible use is in the
// commented-out condition further down.
1040 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041 attemptedToSendToServer = true;
1043 bool inserted = false;
1044 bool lastTryInserted = false;
1046 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047 if (array == NULL) {
// NOTE(review): the array is default-constructed and then written at index 0,
// whereas initTable allocates Array<Slot *>(1) for the same purpose - confirm
// that the default constructor reserves room for one element.
1048 array = new Array<Slot *>();
1049 array->set(0, slot);
1050 rejectedSlotVector->clear();
1053 if (array->length() == 0) {
1054 throw new Error("Server Error: Did not send any slots");
1057 // if (attemptedToSendToServerTmp) {
1058 if (hadPartialSendToServer) {
// After an earlier partial send, scan the returned slots for the one just
// sent by this machine.
1060 bool isInserted = false;
1061 for (Slot *s : array) {
1062 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1068 for (Slot *s : array) {
1073 // Process each entry in the slot
1074 for (Entry *entry : s->getEntries()) {
1076 if (entry->getType() == TypeLastMessage) {
1077 LastMessage *lastMessage = (LastMessage *)entry;
1079 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// Slots that were not inserted are recorded in rejectedSlotVector.
1088 rejectedSlotVector->add(slot->getSequenceNumber());
1089 lastTryInserted = false;
1091 lastTryInserted = true;
1094 rejectedSlotVector->add(slot->getSequenceNumber());
1095 lastTryInserted = false;
1099 return new ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1103 * Returns a tuple whose first element is true if a resize was needed but not requested
// Fills `slot` with entries, in the order visible below: optional TableStatus (on resize),
// rejected-message entries, mandatory rescue of old live entries, the new key entry,
// pending arbitration data, parts of the first pending transaction, and optional rescue data.
// Parameters:
//   slot        - the slot being assembled for sending.
//   resize      - caller's request to resize; forced to true when liveSlotCount exceeds bufferResizeThreshold.
//   newKeyEntry - optional NewKey entry to place in the slot (may be NULL).
// Returns a heap-allocated tuple (needsResizeButNotResizing, newSize, inserted).
// NOTE(review): several statements of this function are not visible in this excerpt.
1105 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1109 if (liveSlotCount > bufferResizeThreshold) {
1110 resize = true; //Resize is forced
// Grow the buffer by Table_RESIZE_MULTIPLE and announce the new size via a TableStatus entry.
1115 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1116 TableStatus *status = new TableStatus(slot, newSize);
1117 slot->addEntry(status);
1120 // Fill with rejected slots first before doing anything else
1121 doRejectedMessages(slot);
1123 // Do mandatory rescue of entries
// NOTE(review): declared by value but dereferenced with -> below, and doMandatoryResuce
// returns the result of `new`; the declared types need reconciling with the header.
1124 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1126 // Extract working variables
1127 bool needsResize = mandatoryRescueReturn->getFirst();
1128 bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1129 int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1131 if (needsResize && !resize) {
1132 // We need to resize but we are not resizing, so report that through the first tuple element (true)
// NOTE(review): NULL is passed for the int32_t and bool elements here.
1133 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1136 bool inserted = false;
1137 if (newKeyEntry != NULL) {
1138 newKeyEntry->setSlot(slot);
1139 if (slot->hasSpace(newKeyEntry)) {
1141 slot->addEntry(newKeyEntry);
1146 // Clear the transactions, aborts and commits that were sent previously
1147 transactionPartsSent->clear();
1148 pendingSendArbitrationEntriesToDelete->clear();
1150 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1151 bool isFull = false;
1152 round->generateParts();
1153 Vector<Entry *> *parts = round->getParts();
1155 // Insert pending arbitration data
1156 for (Entry *arbitrationData : parts) {
1158 // If it is an abort then we need to set some information
// NOTE(review): `instanceof` is a Java operator left over from the port; a C++ type check is needed here.
1159 if (arbitrationData instanceof Abort) {
1160 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1163 if (!slot->hasSpace(arbitrationData)) {
1164 // No space so cant do anything else with these data entries
1169 // Add to this current slot and add it to entries to delete
1170 slot->addEntry(arbitrationData);
1171 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1179 if (pendingTransactionQueue->size() > 0) {
// Only the transaction at the head of the pending queue is considered on the lines shown here.
1181 Transaction *transaction = pendingTransactionQueue->get(0);
1183 // Set the transaction sequence number if it has yet to be inserted into the block chain
1184 // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) {
1185 // transaction->setSequenceNumber(slot->getSequenceNumber());
1188 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1189 transaction->setSequenceNumber(slot->getSequenceNumber());
1194 TransactionPart *part = transaction->getNextPartToSend();
1197 // Ran out of parts to send for this transaction so move on
1201 if (slot->hasSpace(part)) {
1202 slot->addEntry(part);
// Track which part numbers of this transaction went into the slot.
1203 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1204 if (partsSent == NULL) {
1205 partsSent = new Vector<int32_t>();
1206 transactionPartsSent->put(transaction, partsSent);
1208 partsSent->add(part->getPartNumber());
1209 transactionPartsSent->put(transaction, partsSent);
1216 // Fill the remainder of the slot with rescue data
1217 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1219 return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers held in rejectedSlotVector.
// Does nothing when rejectedSlotVector is empty.
// NOTE(review): the declaration of `i` and the statements that consume each `rm` are not
// visible in this excerpt.
1222 void Table::doRejectedMessages(Slot *s) {
1223 if (!rejectedSlotVector->isEmpty()) {
1224 /* TODO: We should avoid generating a rejected message entry if
1225 * there is already a sufficient entry in the queue (e.g.,
1226 * equalsto value of true and same sequence number). */
1228 int64_t old_seqn = rejectedSlotVector->firstElement();
// Above the threshold, a single entry covers the whole range from the first to the last rejected number.
1229 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1230 int64_t new_seqn = rejectedSlotVector->lastElement();
1231 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// -1 marks "no missing message seen yet".
1234 int64_t prev_seqn = -1;
1236 /* Go through list of missing messages */
1237 for (; i < rejectedSlotVector->size(); i++) {
1238 int64_t curr_seqn = rejectedSlotVector->get(i);
1239 Slot *s_msg = buffer->getSlot(curr_seqn);
1242 prev_seqn = curr_seqn;
1244 /* Generate rejected message entry for missing messages */
1245 if (prev_seqn != -1) {
1246 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1249 /* Generate rejected message entries for present messages */
1250 for (; i < rejectedSlotVector->size(); i++) {
1251 int64_t curr_seqn = rejectedSlotVector->get(i);
1252 Slot *s_msg = buffer->getSlot(curr_seqn);
// Each present slot gets its own entry, attributed to the machine that owns the slot.
1253 int64_t machineid = s_msg->getMachineID();
1254 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies ("rescues") live entries from the oldest buffered slots into `slot` so that the
// first Table_FREE_SLOTS positions of a full buffer hold no live data.
// Parameters:
//   slot   - the slot being assembled; rescued entries are appended to it.
//   resize - forwarded to Slot::getLiveEntries.
// Returns a tuple (needsResize, seenLiveSlot, currentSequenceNumber); needsResize is true when
// an entry in the slot that is about to fall off the queue did not fit.
// NOTE(review): the declared return type is a by-value ThreeTuple while the body returns the
// result of `new`; reconcile with the header declaration.
1261 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1262 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1263 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning before the oldest slot still held in the buffer.
1264 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1265 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1268 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1269 bool seenLiveSlot = false;
1270 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1271 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1275 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
// Fix: getSlot() yields a Slot pointer (see its other call sites) and the variable is used
// with ->, so it must be declared as a pointer rather than by value.
1276 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1277 // Push slot number forward
1278 if (!seenLiveSlot) {
1279 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1282 if (!previousSlot->isLive()) {
1286 // We have seen a live slot
1287 seenLiveSlot = true;
1289 // Get all the live entries for a slot
1290 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1292 // Iterate over all the live entries and try to rescue them
1293 for (Entry *liveEntry : liveEntries) {
1294 if (slot->hasSpace(liveEntry)) {
1296 // Enough space to rescue the entry
1297 slot->addEntry(liveEntry);
1298 } else if (currentSequenceNumber == firstIfFull) {
1299 //if there's no space but the entry is about to fall off the queue
// NOTE(review): Java debug print left over from the port; it is not valid C++.
1300 System->out->println("B"); //?
1301 return new ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1308 return new ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses whatever room is left in slot `s` to copy live entries from buffered slots, starting at
// sequence number `seqn` and walking up to the newest slot in the buffer.
// Parameters:
//   s            - the slot being assembled.
//   seenliveslot - whether a live slot was already encountered by the caller.
//   seqn         - sequence number at which to start scanning.
//   resize       - forwarded to Slot::getLiveEntries.
// NOTE(review): the declaration and updates of `skipcount` are not visible in this excerpt.
1311 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1312 /* now go through live entries from least to greatest sequence number until
1313 * either all live slots added, or the slot doesn't have enough room
1314 * for SKIP_THRESHOLD consecutive entries */
1316 int64_t newestseqnum = buffer->getNewestSeqNum();
1318 for (; seqn <= newestseqnum; seqn++) {
1319 Slot *prevslot = buffer->getSlot(seqn);
1320 //Push slot number forward
1322 oldestLiveSlotSequenceNumver = seqn;
1324 if (!prevslot->isLive())
1326 seenliveslot = true;
1327 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1328 for (Entry *liveentry : liveentries) {
1329 if (s->hasSpace(liveentry))
1330 s->addEntry(liveentry);
1333 if (skipcount > Table_SKIP_THRESHOLD)
1341 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and, if they pass, appends them to the
// local buffer and refreshes the derived live state.
// Parameters:
//   newSlots             - slots received from the server, oldest first (the first element is
//                          checked against sequenceNumber, the last one becomes the new value).
//   acceptUpdatesToLocal - forwarded unchanged to processSlot.
// Throws Error when the server sent slots that are not newer than sequenceNumber, or when a
// gap exists and some machine in machineSet is left without a record.
// NOTE(review): several statements of this function are not visible in this excerpt.
1343 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1345 // The cloud communication layer has checked slot HMACs already before decoding
1346 if (newSlots->length() == 0) {
1350 // Make sure all slots are newer than the last largest slot this client has seen
// NOTE(review): newSlots is a pointer to Array, so newSlots[0] indexes the pointer rather than
// the array contents; this likely needs an element accessor on Array — confirm its API.
1351 int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1352 if (firstSeqNum <= sequenceNumber) {
1353 throw new Error("Server Error: Sent older slots!");
1356 // Create an object that can access both new slots and slots in our local chain
1357 // without committing slots to our local chain
1358 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1360 // Check that the HMAC chain is not broken
1361 checkHMACChain(indexer, newSlots);
1363 // Set to keep track of messages from clients
1364 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1366 // Process each slots data
1367 for (Slot *slot : newSlots) {
1368 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1370 updateExpectedSize();
1373 // If there is a gap, check to see if the server sent us everything.
1374 if (firstSeqNum != (sequenceNumber + 1)) {
1376 // Check the size of the slots that were sent down by the server.
1377 // Can only check the size if there was a gap
// Fix: length is a member function (called as length() elsewhere in this function); the
// original passed the bare member name without calling it.
1378 checkNumSlots(newSlots->length());
1380 // Since there was a gap every machine must have pushed a slot or must have
1381 // a last message entry. If not then the server is hiding slots
1382 if (!machineSet->isEmpty()) {
// NOTE(review): string literal + pointer is not string concatenation in C++; the message needs explicit formatting.
1383 throw new Error("Missing record for machines: " + machineSet);
1387 // Update the size of our local block chain.
1390 // Commit new slots to the local block chain.
1391 for (Slot *slot : newSlots) {
1393 // Insert this slot into our local block chain copy.
1394 buffer->putSlot(slot);
1396 // Keep track of how many slots are currently live (have live data in them).
1400 // Get the sequence number of the latest slot in the system
// NOTE(review): same pointer-indexing concern as for firstSeqNum above.
1401 sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1403 updateLiveStateFromServer();
1405 // No Need to remember after we pulled from the server
1406 offlineTransactionsCommittedAndAtServer->clear();
1408 // This is invalidated now
1409 hadPartialSendToServer = false;
// Refreshes all derived state after new slots arrived from the server: assembles new
// transaction parts, arbitrates, applies commits, prunes dead transactions, and then
// rebuilds the speculative tables.
1412 void Table::updateLiveStateFromServer() {
1413 // Process the new transaction parts
1414 processNewTransactionParts();
1416 // Do arbitration on new transactions that were received
1417 arbitrateFromServer();
1419 // Update all the committed keys
1420 bool didCommitOrSpeculate = updateCommittedTable();
1422 // Delete the transactions that are now dead
1423 updateLiveTransactionsAndStatus();
// The flag accumulates: the pending-transaction table is told whether either a commit or a new speculation happened.
1426 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1427 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes derived state after a locally produced change: applies commits, prunes dead
// transactions, and rebuilds the speculative tables. Unlike updateLiveStateFromServer, the
// visible lines do not process new transaction parts or arbitrate.
1430 void Table::updateLiveStateFromLocal() {
1431 // Update all the committed keys
1432 bool didCommitOrSpeculate = updateCommittedTable();
1434 // Delete the transactions that are now dead
1435 updateLiveTransactionsAndStatus();
// The flag accumulates: the pending-transaction table is told whether either a commit or a new speculation happened.
1438 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1439 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Initializes the expected number of slots from a table status: expectedsize becomes the
// smaller of firstSequenceNumber and numberOfSlots, currMaxSize becomes numberOfSlots, and
// didFindTableStatus is set.
// NOTE(review): the branch structure around `if (didFindTableStatus)` is only partly visible
// in this excerpt, so which assignments are conditional cannot be stated from here.
1442 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1443 // if (didFindTableStatus) {
1446 int64_t prevslots = firstSequenceNumber;
1449 if (didFindTableStatus) {
1450 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1451 // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
// Cap the expectation at the table size: fewer slots than numberOfSlots may exist so far.
1454 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1455 // System->out->println("Here: " + expectedsize);
1458 // System->out->println(numberOfSlots);
1460 didFindTableStatus = true;
1461 currMaxSize = numberOfSlots;
// Clamps expectedsize so that it never exceeds currMaxSize.
// NOTE(review): statements before the clamp are not visible in this excerpt.
1464 void Table::updateExpectedSize() {
1467 if (expectedsize > currMaxSize) {
1468 expectedsize = currMaxSize;
1474 * Check the size of the block chain to make sure there are enough slots sent back by the server.
1475 * This is only called when we have a gap between the slots that we have locally and the slots
1476 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
// Verifies that the server sent exactly expectedsize slots.
// Parameters:
//   numberOfSlots - number of slots actually received.
// Throws Error when the count differs from expectedsize.
1479 void Table::checkNumSlots(int numberOfSlots) {
1480 if (numberOfSlots != expectedsize) {
// NOTE(review): in C++ adding an integer to a string literal is pointer arithmetic, not
// concatenation, so this message will not contain the two numbers; it needs explicit formatting.
1481 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
// Records `newmaxsize` as the current maximum size (currMaxSize); the local buffer itself is
// only resized later, in commitNewMaxSize.
1485 void Table::updateCurrMaxSize(int newmaxsize) {
1486 currMaxSize = newmaxsize;
1491 * Update the size of the local buffer if it is needed.
// Applies currMaxSize to the local state: resizes the slot buffer when the size changed,
// updates numberOfSlots, and recomputes the resize threshold. Also resets didFindTableStatus.
1493 void Table::commitNewMaxSize() {
1494 didFindTableStatus = false;
1496 // Resize the local slot buffer
1497 if (numberOfSlots != currMaxSize) {
1498 buffer->resize((int32_t)currMaxSize);
1501 // Change the number of local slots to the new size
1502 numberOfSlots = (int32_t)currMaxSize;
1505 // Recalculate the resize threshold since the size of the local buffer has changed
1506 setResizeThreshold();
1510 * Process the new transaction parts from this latest round of slots received from the server
// Folds the transaction parts collected in newTransactionParts into Transaction objects held
// in liveTransactionBySequenceNumberTable / liveTransactionByTransactionIdTable, creating a
// Transaction the first time one of its parts is seen, then empties newTransactionParts.
// NOTE(review): several statements of this function are not visible in this excerpt.
1512 void Table::processNewTransactionParts() {
1514 if (newTransactionParts->size() == 0) {
1515 // Nothing new to process
1519 // Iterate through all the machine Ids that we received new parts for
1520 for (int64_t machineId : newTransactionParts->keySet()) {
// Fix: the template argument list of Pair was missing the comma between its two types.
1521 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *parts = newTransactionParts->get(machineId);
1523 // Iterate through all the parts for that machine Id
1524 for (Pair<int64_t, int32_t> *partId : parts->keySet()) {
1525 TransactionPart *part = parts->get(partId);
// Parts of transactions the arbitrator has already arbitrated past are stale.
// NOTE(review): comparing an int64_t with NULL is a leftover of Java's nullable Long; a stored
// value of 0 is indistinguishable from "absent" here — confirm the Hashtable lookup contract.
1527 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1528 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1529 // Set dead the transaction part
1534 // Get the transaction object for that sequence number
1535 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1537 if (transaction == NULL) {
1538 // This is a new transaction that we dont have so make a new one
1539 transaction = new Transaction();
1541 // Insert this new transaction into the live tables
1542 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1543 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1546 // Add that part to the transaction
1547 transaction->addPartDecode(part);
1551 // Clear all the new transaction parts in preparation for the next time the server sends slots
1552 newTransactionParts->clear();
// Arbitrates, oldest first, the live transactions for which this machine is the arbitrator.
// Transactions whose guard passes contribute their key/value updates to one batch Commit;
// the others produce an Abort. The resulting commit/aborts are queued as an ArbitrationRound
// in pendingSendArbitrationRounds and also fed through processEntry.
// NOTE(review): several statements (the skip/break paths) are not visible in this excerpt.
1555 void Table::arbitrateFromServer() {
1557 if (liveTransactionBySequenceNumberTable->size() == 0) {
1558 // Nothing to arbitrate on so move on
1562 // Get the transaction sequence numbers and sort from oldest to newest
1563 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
// NOTE(review): `Collections->sort` is a Java call left over from the port.
1564 Collections->sort(transactionSequenceNumbers);
1566 // Key/value updates accumulated from the transactions whose guard passed in this round
// Fix: the table is allocated with `new` and used with -> throughout, so it must be declared
// as a pointer rather than by value.
1567 Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1569 // The last transaction arbitrated on
1570 int64_t lastTransactionCommitted = -1;
1571 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1573 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1574 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1578 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1579 if (transaction->getArbitrator() != localMachineId) {
1583 if (transactionSequenceNumber < lastSeqNumArbOn) {
1587 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1588 // We have seen this already locally so dont commit again
1593 if (!transaction->isComplete()) {
1594 // Will arbitrate in incorrect order if we continue so just break
1600 // update the largest transaction seen by arbitrator from server
1601 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1602 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1604 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1605 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1606 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates already accepted in this round.
1610 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1611 // Guard evaluated as true
1613 // Update the local changes so we can make the commit
1614 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1615 speculativeTableTmp->put(kv->getKey(), kv);
1618 // Update what the last transaction committed was for use in batch commit
1619 lastTransactionCommitted = transactionSequenceNumber;
1621 // Guard evaluated was false so create abort
1624 Abort *newAbort = new Abort(NULL,
1625 transaction->getClientLocalSequenceNumber(),
1626 transaction->getSequenceNumber(),
1627 transaction->getMachineId(),
1628 transaction->getArbitrator(),
1629 localArbitrationSequenceNumber);
1630 localArbitrationSequenceNumber++;
1632 generatedAborts->add(newAbort);
1634 // Insert the abort so we can process
1635 processEntry(newAbort);
1638 lastSeqNumArbOn = transactionSequenceNumber;
1640 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1643 Commit *newCommit = NULL;
1645 // If there is something to commit
1646 if (speculativeTableTmp->size() != 0) {
1648 // Create the commit and increment the commit sequence number
1649 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1650 localArbitrationSequenceNumber++;
1652 // Add all the new keys to the commit
1653 for (KeyValue *kv : speculativeTableTmp->values()) {
1654 newCommit->addKV(kv);
1657 // create the commit parts
1658 newCommit->createCommitParts();
1660 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1662 // Insert the commit so we can process it
1663 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1664 processEntry(commitPart);
1668 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1669 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1670 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction produced a new merged commit, its parts have to be processed as well.
1672 if (compactArbitrationData()) {
1673 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1674 if (newArbitrationRound->getCommit() != NULL) {
1675 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1676 processEntry(commitPart);
// Arbitrates a single transaction locally, without waiting for it to come back from the server.
// Parameters:
//   transaction - the transaction to arbitrate.
// Returns a Pair (didArbitrate, didCommit): (false, false) when this machine is not the
// arbitrator, the transaction is incomplete, or a newer one from the same machine was already
// seen from the server; (true, true) when the guard passed and a commit was generated;
// (true, false) when an abort was generated.
// NOTE(review): the declared return type is a by-value Pair while the body returns the result
// of `new`; reconcile with the header declaration. Several statements are also not visible in
// this excerpt.
1683 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1685 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1686 if (transaction->getArbitrator() != localMachineId) {
1687 return new Pair<bool, bool>(false, false);
1690 if (!transaction->isComplete()) {
1691 // Will arbitrate in incorrect order if we continue so just break
1693 return new Pair<bool, bool>(false, false);
1696 if (transaction->getMachineId() != localMachineId) {
1697 // dont do this check for local transactions
1698 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1699 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1700 // We have already seen this from the server
1701 return new Pair<bool, bool>(false, false);
1706 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1707 // Guard evaluated as true
1709 // Create the commit and increment the commit sequence number
1710 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1711 localArbitrationSequenceNumber++;
1713 // Update the local changes so we can make the commit
1714 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1715 newCommit->addKV(kv);
1718 // create the commit parts
1719 newCommit->createCommitParts();
1721 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1722 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1723 pendingSendArbitrationRounds->add(arbitrationRound);
// If compaction merged rounds, process the merged commit's parts; the visible lines below
// process the parts of the commit created above.
1725 if (compactArbitrationData()) {
1726 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1727 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1728 processEntry(commitPart);
1731 // Insert the commit so we can process it
1732 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1733 processEntry(commitPart);
1737 if (transaction->getMachineId() == localMachineId) {
1738 TransactionStatus *status = transaction->getTransactionStatus();
1739 if (status != NULL) {
1740 status->setStatus(TransactionStatus_StatusCommitted);
1744 updateLiveStateFromLocal();
1745 return new Pair<bool, bool>(true, true);
1748 if (transaction->getMachineId() == localMachineId) {
1749 // For locally created messages update the status
1751 // Guard evaluated was false so create abort
// Fix: getTransactionStatus() is stored in a pointer in the commit branch above, and this
// variable is NULL-checked and used with ->, so it must be a pointer rather than a value.
1752 TransactionStatus *status = transaction->getTransactionStatus();
1753 if (status != NULL) {
1754 status->setStatus(TransactionStatus_StatusAborted);
1757 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1761 Abort *newAbort = new Abort(NULL,
1762 transaction->getClientLocalSequenceNumber(),
1764 transaction->getMachineId(),
1765 transaction->getArbitrator(),
1766 localArbitrationSequenceNumber);
1767 localArbitrationSequenceNumber++;
1769 addAbortSet->add(newAbort);
1772 // Append the abort round to the end of the pending queue waiting for sending to the server
1773 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1774 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): unlike arbitrateFromServer, getCommit() is not NULL-checked here; this relies
// on compactArbitrationData() returning true only when the last round received a commit — confirm.
1776 if (compactArbitrationData()) {
1777 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1778 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1779 processEntry(commitPart);
1784 updateLiveStateFromLocal();
1785 return new Pair<bool, bool>(true, false);
1790 * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
// Tries to merge the most recent pending ArbitrationRound with the rounds queued before it,
// walking backwards until a round is full, has already been partly sent, or the merged
// result would exceed the maximum number of parts.
// NOTE(review): the return statements are not visible in this excerpt; the final visible
// condition suggests true is returned when the last round had no commit before and gained
// one through merging — confirm.
1792 bool Table::compactArbitrationData() {
1794 if (pendingSendArbitrationRounds->size() < 2) {
1795 // Nothing to compact so do nothing
1799 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1800 if (lastRound->didSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the last round has NO commit.
1804 bool hadCommit = (lastRound->getCommit() == NULL);
1805 bool gotNewCommit = false;
// Counts the last round plus every earlier round merged into it.
1807 int numberToDelete = 1;
1808 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// Fix: removed a stray `xs` token between the type and the variable name.
1809 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1811 if (round->isFull() || round->didSendPart()) {
1812 // Stop since there is a part that cannot be compacted and we need to compact in order
1816 if (round->getCommit() == NULL) {
1818 // Try compacting aborts only
1819 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
// NOTE(review): `ArbitrationRound->MAX_PARTS` is Java-style static access left over from the port.
1820 if (newSize > ArbitrationRound->MAX_PARTS) {
1821 // Cant compact since it would be too large
1824 lastRound->addAborts(round->getAborts());
1827 // Create a new larger commit
// Fix: the merged commit is used with -> and handed to setCommit, so it must be a pointer.
// NOTE(review): `Commit->merge` is Java-style static access left over from the port.
1828 Commit *newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1829 localArbitrationSequenceNumber++;
1831 // Create the commit parts so that we can count them
1832 newCommit->createCommitParts();
1834 // Calculate the new size of the parts
1835 int newSize = newCommit->getNumberOfParts();
1836 newSize += lastRound->getAbortsCount();
1837 newSize += round->getAbortsCount();
1839 if (newSize > ArbitrationRound->MAX_PARTS) {
1840 // Cant compact since it would be too large
1844 // Set the new compacted part
1845 lastRound->setCommit(newCommit);
1846 lastRound->addAborts(round->getAborts());
1847 gotNewCommit = true;
1853 if (numberToDelete != 1) {
1854 // If there is a compaction
1856 // Delete the previous pieces that are now in the new compacted piece
1857 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1858 pendingSendArbitrationRounds->clear();
1860 for (int i = 0; i < numberToDelete; i++) {
1861 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1865 // Add the new compacted into the pending to send list
1866 pendingSendArbitrationRounds->add(lastRound);
1868 // Should reinsert into the commit processor
1869 if (hadCommit && gotNewCommit) {
1876 // bool compactArbitrationData() {
1881 * Update all the commits and the committed tables, sets dead the dead transactions
// Assembles the commit parts collected in newCommitParts into Commit objects (stored per
// arbitrator in liveCommitsTable), then processes every arbitrator's commits in sequence
// order: records the latest arbitrated transaction / arbitration sequence numbers, invalidates
// keys in older commits that the new commit overwrites, and updates committedKeyValueTable
// and liveCommitsByKeyTable.
// Returns true when at least one commit that had not been seen before was processed.
// NOTE(review): several statements (early returns, continue/break paths) are not visible in
// this excerpt.
1883 bool Table::updateCommittedTable() {
1885 if (newCommitParts->size() == 0) {
1886 // Nothing new to process
1890 // Iterate through all the machine Ids that we received new parts for
1891 for (int64_t machineId : newCommitParts->keySet()) {
1892 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *parts = newCommitParts->get(machineId);
1894 // Iterate through all the parts for that machine Id
1895 for (Pair<int64_t, int32_t> *partId : parts->keySet()) {
1896 CommitPart *part = parts->get(partId);
1898 // Get the table of commits for the machine that produced this part
1899 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1901 if (commitForClientTable == NULL) {
1902 // This is the first commit from this device
1903 commitForClientTable = new Hashtable<int64_t, Commit *>();
1904 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1907 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1909 if (commit == NULL) {
1910 // This is a new commit that we dont have so make a new one
1911 commit = new Commit();
1913 // Insert this new commit into the live tables
1914 commitForClientTable->put(part->getSequenceNumber(), commit);
1917 // Add that part to the commit
1918 commit->addPartDecode(part);
1922 // Clear all the new commits parts in preparation for the next time the server sends slots
1923 newCommitParts->clear();
1925 // If we process a new commit keep track of it for future use
1926 bool didProcessANewCommit = false;
1928 // Process the commits one by one
// Fix: the loop variable type was spelled `int64_T`.
1929 for (int64_t arbitratorId : liveCommitsTable->keySet()) {
1931 // Get all the commits for a specific arbitrator
1932 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1934 // Sort the commits in order
1935 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
// NOTE(review): `Collections->sort` is a Java call left over from the port.
1936 Collections->sort(commitSequenceNumbers);
1938 // Get the last commit seen from this arbitrator
1939 int64_t lastCommitSeenSequenceNumber = -1;
1940 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1941 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1944 // Go through each new commit one by one
1945 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1946 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1947 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1949 // Special processing if a commit is not complete
1950 if (!commit->isComplete()) {
1951 if (i == (commitSequenceNumbers->size() - 1)) {
1952 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1955 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet).
1956 // Delete it and move on
1958 commitForClientTable->remove(commit->getSequenceNumber());
1963 // Update the last transaction that was updated if we can
1964 if (commit->getTransactionSequenceNumber() != -1) {
1965 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1967 // Update the last transaction sequence number that the arbitrator arbitrated on
1968 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1969 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1973 // Update the last arbitration data that we have seen so far
1974 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1976 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1977 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1979 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1982 // Never seen any data from this arbitrator so record the first one
1983 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1986 // We have already seen this commit before so there is no need to do the full processing on this commit
1987 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1989 // Update the last transaction that was updated if we can
1990 if (commit->getTransactionSequenceNumber() != -1) {
1991 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1993 // Update the last transaction sequence number that the arbitrator arbitrated on
1994 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1995 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
2002 // If we got here then this is a brand new commit and needs full processing
2004 // Get what commits should be edited, these are the commits that have live values for their keys
2005 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2006 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2007 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2009 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2011 // Update each previous commit that needs to be updated
2012 for (Commit *previousCommit : commitsToEdit) {
2014 // Only bother with live commits (TODO: Maybe remove this check)
2015 if (previousCommit->isLive()) {
2017 // Update which keys in the old commits are still live
2018 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2019 previousCommit->invalidateKey(kv->getKey());
2022 // if the commit is now dead then remove it
2023 if (!previousCommit->isLive()) {
// Fix: the commit tables are keyed by sequence number, so passing the Commit pointer itself
// could never match a key. liveCommitsByKeyTable spans all arbitrators, so the dead commit is
// removed from the table of the machine that produced it, not blindly from the table of the
// arbitrator currently being processed.
Hashtable<int64_t, Commit *> *previousCommitTable = liveCommitsTable->get(previousCommit->getMachineId());
if (previousCommitTable != NULL) {
previousCommitTable->remove(previousCommit->getSequenceNumber());
}
2029 // Update the last seen sequence number from this arbitrator
2030 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2031 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2032 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2035 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2038 // We processed a new commit that we havent seen before
2039 didProcessANewCommit = true;
2041 // Update the committed table of keys and which commit is using which key
2042 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2043 committedKeyValueTable->put(kv->getKey(), kv);
2044 liveCommitsByKeyTable->put(kv->getKey(), commit);
2049 return didProcessANewCommit;
2053 * Create the speculative table from transactions that are still live and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable by replaying the transactions held in
// liveTransactionBySequenceNumberTable in ascending sequence-number order.
//
// didProcessNewCommits: when true the speculative table is cleared and the replay
//                       restarts from the oldest live transaction.
// Returns a bool; the one return visible here yields false when there is no
// transaction after the last one already speculated on.
// NOTE(review): the Vector and Hashset created with `new` below have no visible
// matching delete -- confirm who owns them.
2055 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2056 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2057 // There is nothing to speculate on
2061 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2062 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2063 Collections->sort(transactionSequenceNumbersSorted);
// A "gap" means the oldest live transaction is no longer the one recorded by the previous call
2065 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2068 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2069 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2070 // OR there was a new commit (could be from an offline commit), so redo the speculation from scratch
2072 // Start from scratch
2073 speculatedKeyValueTable->clear();
2074 lastTransactionSequenceNumberSpeculatedOn = -1;
2075 oldestTransactionSequenceNumberSpeculatedOn = -1;
2079 // Remember the front of the transaction list
2080 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2082 // Find where to start speculating from: the element after the last one already processed.
// NOTE(review): after a reset the lookup is for -1; presumably indexOf reports "not found"
// as -1 so that startIndex becomes 0 -- confirm against Vector::indexOf.
2083 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2085 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2086 // Make sure we are not out of bounds
2087 return false; // did not speculate
// Arbitrators that own at least one incomplete transaction seen during this pass
2090 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2091 bool didSkip = true;
2093 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2094 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2095 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2097 if (!transaction->isComplete()) {
2098 // If there is an incomplete transaction then there is nothing we can do;
2099 // add this transaction's arbitrator to the list of arbitrators we should ignore
2100 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Checks whether this transaction's arbitrator already has an incomplete transaction
2105 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2109 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
// The guard is evaluated against the committed and speculated tables; the third argument is NULL here
2111 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2112 // Guard evaluated to true so update the speculative table
2113 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2114 speculatedKeyValueTable->put(kv->getKey(), kv);
2120 // Since there was a skip we need to redo the speculation next time around
2121 lastTransactionSequenceNumberSpeculatedOn = -1;
2122 oldestTransactionSequenceNumberSpeculatedOn = -1;
2125 // We did some speculation
2130 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
// Replays the transactions in pendingTransactionQueue and records the updates of those
// whose guard holds in pendingTransactionSpeculatedKeyValueTable.
//
// didProcessNewCommitsOrSpeculate: when true (or when the head of the queue has changed)
//                                  the pending speculation state is reset before replaying.
2132 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2133 if (pendingTransactionQueue->size() == 0) {
2134 // There is nothing to speculate on
2138 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2139 // Need to reset the pending speculation
2140 lastPendingTransactionSpeculatedOn = NULL;
2141 firstPendingTransaction = pendingTransactionQueue->get(0);
2142 pendingTransactionSpeculatedKeyValueTable->clear();
2145 // Find where to start speculating from.
// NOTE(review): this indexes from firstPendingTransaction rather than
// lastPendingTransactionSpeculatedOn, so the replay begins at the element after the
// queue head -- confirm this is intended.
2146 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2148 if (startIndex >= pendingTransactionQueue->size()) {
2149 // Make sure we are not out of bounds
2153 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2154 Transaction *transaction = pendingTransactionQueue->get(i);
2156 lastPendingTransactionSpeculatedOn = transaction;
// The guard sees the committed table, the cloud speculative table and the pending speculative table
2158 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2159 // Guard evaluated to true so update the speculative table
2160 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2161 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2168 * Set dead and remove from the live transaction tables the transactions that are dead
// Walks the live transactions and the outstanding transaction statuses and compares each
// against the last transaction number arbitrated by its arbitrator
// (lastArbitratedTransactionNumberByArbitratorTable). Transactions at or below that number
// are set dead and removed from liveTransactionByTransactionIdTable; statuses at or below
// it are marked TransactionStatus_StatusCommitted.
//
// NOTE(review): `lastTransactionNumber` is an int64_t compared against NULL, so a stored
// value of 0 is indistinguishable from "no entry" -- confirm what the table's get()
// returns for a missing key.
// NOTE(review): `Map->Entry` in the iterator types looks like a leftover from a
// "." -> "->" text replacement of `Map.Entry`.
2170 void Table::updateLiveTransactionsAndStatus() {
2172 // Go through each of the live transactions
2173 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2174 Transaction *transaction = iter->next()->getValue();
2176 // Check if the transaction is dead
2177 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2178 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2180 // Set dead the transaction
2181 transaction->setDead();
2183 // Remove the transaction from the live table
2185 liveTransactionByTransactionIdTable->remove(transaction->getId());
2189 // Go through each of the outstanding transaction statuses
2190 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2191 TransactionStatus *status = iter->next()->getValue();
2193 // Check if the transaction this status belongs to has already been arbitrated
2194 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2195 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2198 status->setStatus(TransactionStatus_StatusCommitted);
2207 * Process this slot, entry by entry. Also update the latest message sent by slot
// Records the slot as the latest message from its machine, then dispatches every entry
// in the slot to the processEntry overload matching the entry's type.
//
// indexer:              forwarded to the RejectedMessage handler.
// slot:                 the slot being processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage.
// machineSet:           forwarded to updateLastMessage and to the LastMessage handler.
// Throws (via `throw new Error`) on an entry type that matches no handled case.
2209 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2211 // Update the last message seen
2212 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2214 // Process each entry in the slot
2215 for (Entry *entry : slot->getEntries()) {
// Each case downcasts the generic Entry to the concrete type selected by getType()
2216 switch (entry->getType()) {
2218 case TypeCommitPart:
2219 processEntry((CommitPart *)entry);
2223 processEntry((Abort *)entry);
2226 case TypeTransactionPart:
2227 processEntry((TransactionPart *)entry);
2231 processEntry((NewKey *)entry);
2234 case TypeLastMessage:
2235 processEntry((LastMessage *)entry, machineSet);
2238 case TypeRejectedMessage:
2239 processEntry((RejectedMessage *)entry, indexer);
2242 case TypeTableStatus:
// Table status entries also need the sequence number of the slot that carried them
2243 processEntry((TableStatus *)entry, slot->getSequenceNumber());
// NOTE(review): if getType() returns an integer, `literal + value` is pointer arithmetic
// on the string literal rather than string concatenation -- confirm and build the
// message explicitly.
2247 throw new Error("Unrecognized type: " + entry->getType());
2253 * Update the last message that was sent for a machine Id
// Handles a LastMessage entry by recording it as the latest message from the machine it
// names. Local updates are never accepted on this path (acceptUpdatesToLocal is passed
// as false).
//
// entry:      the LastMessage entry; also passed on as the liveness object.
// machineSet: forwarded unchanged to updateLastMessage.
2255 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2256 // Update what the last message received by a machine was
2257 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2261 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
// Handles a NewKey entry: records the entry's machine id against the key in
// arbitratorTable and makes this entry the live NewKey for that key, setting dead any
// entry it replaces.
//
// entry: the NewKey entry to record.
2263 void Table::processEntry(NewKey *entry) {
2265 // Update the arbitrator table with the new key information
2266 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2268 // Update what the latest live new key is; put() hands back the entry previously stored for this key
2269 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2270 if (oldNewKey != NULL) {
2271 // Set dead the old new key message
2272 oldNewKey->setDead();
2277 * Process new table status entries and set dead the old ones as new ones come in.
2278 * Keeps track of the largest and smallest table status seen in this current round
2279 * of updating the local copy of the block chain
// Handles a TableStatus entry: propagates the advertised slot count, then makes this
// entry the live table status, setting dead the one it replaces.
//
// entry: the TableStatus entry. Taken by pointer: the body dereferences it with `->`,
//        stores it in the pointer member liveTableStatus, and processSlot passes a
//        `TableStatus *`.
// seq:   sequence number of the slot that carried the entry; forwarded to initExpectedSize.
2281 void Table::processEntry(TableStatus *entry, int64_t seq) {
2282 int newNumSlots = entry->getMaxSlots();
2283 updateCurrMaxSize(newNumSlots);
2285 initExpectedSize(seq, newNumSlots);
2287 if (liveTableStatus != NULL) {
2288 // We have a newer table status so the old table status is no longer alive
2289 liveTableStatus->setDead();
2292 // Make this new table status the latest alive table status
2293 liveTableStatus = entry;
2297 * Check old messages to see if there is a block chain violation. Also
// Handles a RejectedMessage entry in two steps:
//   1. For every sequence number in [getOldSeqNum(), getNewSeqNum()], checks the slot
//      obtained from the indexer against the rejection and throws on a contradiction.
//   2. Builds the set of machines whose last recorded message is older than this entry
//      and registers the entry on each machine's watch vector.
//
// entry:   the rejected-message entry.
// indexer: used to look up locally known slots by sequence number.
// NOTE(review): deviceWatchSet is allocated with `new`; the only visible hand-off is
// setWatchSet() -- confirm ownership on the path where the set is empty.
2299 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2300 int64_t oldSeqNum = entry->getOldSeqNum();
2301 int64_t newSeqNum = entry->getNewSeqNum();
2302 bool isequal = entry->getEqual();
2303 int64_t machineId = entry->getMachineID();
2304 int64_t seq = entry->getSequenceNumber();
2307 // Check if we have messages that were supposed to be rejected in our local block chain
2308 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2311 Slot *slot = indexer->getSlot(seqNum);
2314 // If we have this slot make sure that it was not supposed to be a rejected slot
2316 int64_t slotMachineId = slot->getMachineID();
// Violation when the entry's "equal" flag disagrees with whether the slot came from machineId
2317 if (isequal != (slotMachineId == machineId)) {
// NOTE(review): seqNum is an int64_t, so `literal + seqNum` offsets the literal's
// pointer instead of appending the number -- the message needs explicit formatting.
2318 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2324 // Create a list of clients to watch until they see this rejected message entry.
2325 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2326 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> *> *lastMessageEntry : lastMessageTable->entrySet()) {
2328 // Machine ID for the last message entry
2329 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2331 // We've seen it, don't need to continue to watch. Our next
2332 // message will implicitly acknowledge it.
2333 if (lastMessageEntryMachineId == localMachineId) {
// The first element of the pair is the sequence number of that machine's last message
2337 Pair<int64_t, Liveness *> *lastMessageValue = lastMessageEntry->getValue();
2338 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2340 if (entrySequenceNumber < seq) {
2342 // Add this rejected message to the set of messages that this machine ID did not see yet
2343 addWatchVector(lastMessageEntryMachineId, entry);
2345 // This client did not see this rejected message yet so add it to the watch set to monitor
2346 deviceWatchSet->add(lastMessageEntryMachineId);
2350 if (deviceWatchSet->isEmpty()) {
2351 // This rejected message has been seen by all the clients so
2354 // We need to watch this rejected message
2355 entry->setWatchSet(deviceWatchSet);
2360 * Check if this abort is live, if not then save it so we can kill it later.
2361 * Update the last transaction number that was arbitrated on.
// Handles an Abort entry:
//   - marks the matching outstanding transaction status as aborted,
//   - tracks the abort in liveAbortTable (and liveAbortsGeneratedByLocal when this
//     machine is the arbitrator), and drops it again if the target machine's last
//     message is already at or past the abort's sequence number,
//   - updates the last arbitration sequence number seen from the arbitrator,
//   - removes the aborted transaction from the live transaction tables,
//   - advances the last transaction number arbitrated by the arbitrator.
//
// entry: the Abort entry to process.
// NOTE(review): int64_t values fetched from the tables are compared against NULL, so a
// stored 0 cannot be told apart from a missing key -- confirm the tables' get() contract.
2363 void Table::processEntry(Abort *entry) {
// -1 is the value this function treats as "no transaction sequence number"
2366 if (entry->getTransactionSequenceNumber() != -1) {
2367 // Update the transaction status if it was sent to the server
2368 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2369 if (status != NULL) {
2370 status->setStatus(TransactionStatus_StatusAborted);
2374 // Abort has not been seen by the client it is for yet so we need to keep track of it
2375 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2376 if (previouslySeenAbort != NULL) {
2377 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2380 if (entry->getTransactionArbitrator() == localMachineId) {
2381 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get() is dereferenced without a NULL
// check -- confirm an entry always exists for the transaction's machine id.
2384 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2386 // The machine already saw this so it is dead
2388 liveAbortTable->remove(entry->getAbortId());
2390 if (entry->getTransactionArbitrator() == localMachineId) {
2391 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2397 // Update the last arbitration data that we have seen so far
2398 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2400 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2401 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2403 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2406 // Never seen any data from this arbitrator so record the first one
2407 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2411 // Set dead a transaction if we can; it is looked up by (machine id, client-local sequence number).
// NOTE(review): the lookup key is a heap-allocated Pair with no visible delete -- confirm
// whether remove() takes ownership.
2412 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(new Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2413 if (transactionToSetDead != NULL) {
2414 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2417 // Update the last transaction sequence number that the arbitrator arbitrated on
2418 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2419 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
// Only record a real transaction sequence number, never the -1 placeholder
2422 if (entry->getTransactionSequenceNumber() != -1) {
2423 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2429 * Set dead the transaction part if that transaction is dead and keep track of all new parts
// Handles a TransactionPart entry. The part is first checked against the last transaction
// number arbitrated by its arbitrator; parts that are still alive are stored in
// newTransactionParts, keyed first by machine id and then by part id. A previously
// stored part with the same id is set dead.
//
// entry: the transaction part to record.
// NOTE(review): lastTransactionNumber is an int64_t compared against NULL, so a stored 0
// reads as "no entry" -- confirm the table's get() contract.
2431 void Table::processEntry(TransactionPart *entry) {
2432 // Check if we have already seen this transaction and set it dead OR if it is not alive
2433 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2434 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2435 // This transaction is dead, it was already committed or aborted
2440 // This part is still alive
2441 Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2443 if (transactionPart == NULL) {
2444 // Don't have a table for this machine Id yet so make one
2445 transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *>();
2446 newTransactionParts->put(entry->getMachineId(), transactionPart);
2449 // Update the part and set dead ones we have already seen (got a rescued version)
2450 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2451 if (previouslySeenPart != NULL) {
2452 previouslySeenPart->setDead();
2457 * Process new commit entries and save them for future use. Delete duplicates
// Handles a CommitPart entry: advances the last transaction number arbitrated by the
// entry's machine when the part carries a newer one, then stores the part in
// newCommitParts, keyed first by machine id and then by part id. A previously stored
// part with the same id is set dead.
//
// entry: the commit part to record.
// NOTE(review): lastTransactionNumber is an int64_t compared against NULL, so a stored 0
// reads as "no entry" -- confirm the table's get() contract.
2459 void Table::processEntry(CommitPart *entry) {
2460 // Update the last transaction that was updated if we can (-1 is treated as "no transaction sequence number")
2461 if (entry->getTransactionSequenceNumber() != -1) {
2462 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2464 // Update the last transaction sequence number that the arbitrator arbitrated on
2465 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2466 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
// Per-machine table of commit parts, keyed by part id
2473 Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2475 if (commitPart == NULL) {
2476 // Don't have a table for this machine Id yet so make one
2477 commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *>();
2478 newCommitParts->put(entry->getMachineId(), commitPart);
2481 // Update the part and set dead ones we have already seen (got a rescued version)
2482 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2483 if (previouslySeenPart != NULL) {
2484 previouslySeenPart->setDead();
2489 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
2490 * Updates the live aborts, removes those that are dead and sets them dead.
2491 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2492 * other clients have not had a rollback on the last message.
// Records (seqNum, liveness) as the latest message seen from machineId and performs the
// bookkeeping that follows from it:
//   - removes machineId from machineSet,
//   - lets rejected messages with sequence number <= seqNum drop machineId as a watcher,
//   - retires live aborts addressed to machineId with sequence number <= seqNum,
//   - sets dead the liveness object that is superseded,
//   - throws when the sequence numbers indicate a mismatch (local machine) or a
//     rollback (remote machine).
//
// machineId:            machine the message came from.
// seqNum:               sequence number of that message.
// liveness:             the LastMessage or Slot carrying the message.
// acceptUpdatesToLocal: when true, sequence-number mismatches for the local machine are
//                       tolerated.
// machineSet:           set of machine ids; machineId is removed from it.
//
// NOTE(review): `instanceof` is not C++; these checks still need porting to whatever
// type discrimination Liveness provides.
// NOTE(review): the two "Mismatch" messages add int64_t values and further literals to a
// string literal with `+`, which is not string concatenation in C++ -- they need
// explicit formatting.
2494 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2496 // We have seen this machine ID
2497 machineSet->remove(machineId);
2499 // Get the set of rejected messages that this machine Id has not seen yet
2500 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2502 // If there is a rejected message that this machine Id has not seen yet
2503 if (watchset != NULL) {
2505 // Go through each rejected message that this machine Id has not seen yet
2506 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2508 RejectedMessage *rm = rmit->next();
2510 // If this machine Id has seen this rejected message...
2511 if (rm->getSequenceNumber() <= seqNum) {
2513 // Remove it from our watchlist
2516 // Decrement machines that need to see this notification
2517 rm->removeWatcher(machineId);
2522 // Set dead the abort
// The iterator is held by pointer, matching the other iterator() call sites in this file
2523 for (Iterator<Map->Entry<Pair<int64_t, int64_t> *, Abort *> > *i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2524 Abort *abort = i->next()->getValue();
// The abort is addressed to this machine and the machine has now seen past it
2526 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2530 if (abort->getTransactionArbitrator() == localMachineId) {
2531 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2538 if (machineId == localMachineId) {
2539 // Our own messages are immediately dead.
2540 if (liveness instanceof LastMessage) {
2541 ((LastMessage *)liveness)->setDead();
2542 } else if (liveness instanceof Slot) {
2543 ((Slot *)liveness)->setDead();
2545 throw new Error("Unrecognized type");
2549 // Store the new last message and get the old last message for this device
2550 Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
2551 if (lastMessageEntry == NULL) {
2552 // If no last message then there is nothing else to process
2556 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2557 Liveness *lastEntry = lastMessageEntry->getSecond();
2559 // Only for other machines' entries, since we already set ours to dead
2560 if (machineId != localMachineId) {
2561 if (lastEntry instanceof LastMessage) {
2562 ((LastMessage *)lastEntry)->setDead();
2563 } else if (lastEntry instanceof Slot) {
2564 ((Slot *)lastEntry)->setDead();
2566 throw new Error("Unrecognized type");
2570 // Make sure the server is not playing any games
2571 if (machineId == localMachineId) {
2573 if (hadPartialSendToServer) {
2574 // We were not making any updates and we had a machine mismatch
2575 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2576 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2580 // We were not making any updates and we had a machine mismatch
2581 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2582 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
// The previously recorded sequence number is newer than the one just received
2586 if (lastMessageSeqNum > seqNum) {
2587 throw new Error("Server Error: Rollback on remote machine sequence number");
2593 * Add a rejected message entry to the watch set to keep track of which clients have seen that
2594 * rejected message entry and which have not.
// Adds `entry` to the set of rejected messages stored for machineId in
// rejectedMessageWatchVectorTable, creating that set on first use.
//
// machineId: machine whose watch set is updated.
// entry:     rejected message to add to the set.
2596 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2597 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2598 if (entries == NULL) {
2599 // There is no set for this machine ID yet so create one and register it in the table
2600 entries = new Hashset<RejectedMessage *>();
2601 rejectedMessageWatchVectorTable->put(machineId, entries);
2603 entries->add(entry);
2607 * Check if the HMAC chain is not violated
// Verifies that each new slot's previous-HMAC field equals the HMAC of the slot with the
// preceding sequence number, when the indexer has that slot. Throws (via
// `throw new Error`) on the first mismatch.
//
// indexer:  used to look up the predecessor slot by sequence number.
// newSlots: the slots to verify.
// NOTE(review): newSlots is a pointer to Array, so `newSlots[i]` indexes the pointer
// itself rather than the array's elements -- confirm the intended element accessor
// on Array.
2609 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2610 for (int i = 0; i < newSlots->length; i++) {
2611 Slot *currSlot = newSlots[i];
// Predecessor in the chain: the slot whose sequence number is one lower
2612 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor is not treated as an error; only a present, non-matching one is
2613 if (prevSlot != NULL &&
2614 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2615 throw new Error("Server Error: Invalid HMAC Chain");