3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
18 cloud(new CloudComm(this, baseurl, password, listeningPort)),
20 liveTableStatus(NULL),
21 pendingTransactionBuilder(NULL),
22 lastPendingTransactionSpeculatedOn(NULL),
23 firstPendingTransaction(NULL),
25 bufferResizeThreshold(0),
27 oldestLiveSlotSequenceNumver(1),
28 localMachineId(_localMachineId),
30 localTransactionSequenceNumber(0),
31 lastTransactionSequenceNumberSpeculatedOn(0),
32 oldestTransactionSequenceNumberSpeculatedOn(0),
33 localArbitrationSequenceNumber(0),
34 hadPartialSendToServer(false),
35 attemptedToSendToServer(false),
37 didFindTableStatus(false),
39 lastSlotAttemptedToSend(NULL),
42 lastTransactionPartsSent(NULL),
43 lastPendingSendArbitrationEntriesToDelete(NULL),
45 committedKeyValueTable(NULL),
46 speculatedKeyValueTable(NULL),
47 pendingTransactionSpeculatedKeyValueTable(NULL),
48 liveNewKeyTable(NULL),
49 lastMessageTable(NULL),
50 rejectedMessageWatchVectorTable(NULL),
51 arbitratorTable(NULL),
53 newTransactionParts(NULL),
55 lastArbitratedTransactionNumberByArbitratorTable(NULL),
56 liveTransactionBySequenceNumberTable(NULL),
57 liveTransactionByTransactionIdTable(NULL),
58 liveCommitsTable(NULL),
59 liveCommitsByKeyTable(NULL),
60 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61 rejectedSlotVector(NULL),
62 pendingTransactionQueue(NULL),
63 pendingSendArbitrationRounds(NULL),
64 pendingSendArbitrationEntriesToDelete(NULL),
65 transactionPartsSent(NULL),
66 outstandingTransactionStatus(NULL),
67 liveAbortsGeneratedByLocal(NULL),
68 offlineTransactionsCommittedAndAtServer(NULL),
69 localCommunicationTable(NULL),
70 lastTransactionSeenFromMachineFromServer(NULL),
71 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72 lastInsertedNewKey(false),
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82 liveTableStatus(NULL),
83 pendingTransactionBuilder(NULL),
84 lastPendingTransactionSpeculatedOn(NULL),
85 firstPendingTransaction(NULL),
87 bufferResizeThreshold(0),
89 oldestLiveSlotSequenceNumver(1),
90 localMachineId(_localMachineId),
92 localTransactionSequenceNumber(0),
93 lastTransactionSequenceNumberSpeculatedOn(0),
94 oldestTransactionSequenceNumberSpeculatedOn(0),
95 localArbitrationSequenceNumber(0),
96 hadPartialSendToServer(false),
97 attemptedToSendToServer(false),
99 didFindTableStatus(false),
101 lastSlotAttemptedToSend(NULL),
104 lastTransactionPartsSent(NULL),
105 lastPendingSendArbitrationEntriesToDelete(NULL),
107 committedKeyValueTable(NULL),
108 speculatedKeyValueTable(NULL),
109 pendingTransactionSpeculatedKeyValueTable(NULL),
110 liveNewKeyTable(NULL),
111 lastMessageTable(NULL),
112 rejectedMessageWatchVectorTable(NULL),
113 arbitratorTable(NULL),
114 liveAbortTable(NULL),
115 newTransactionParts(NULL),
116 newCommitParts(NULL),
117 lastArbitratedTransactionNumberByArbitratorTable(NULL),
118 liveTransactionBySequenceNumberTable(NULL),
119 liveTransactionByTransactionIdTable(NULL),
120 liveCommitsTable(NULL),
121 liveCommitsByKeyTable(NULL),
122 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123 rejectedSlotVector(NULL),
124 pendingTransactionQueue(NULL),
125 pendingSendArbitrationRounds(NULL),
126 pendingSendArbitrationEntriesToDelete(NULL),
127 transactionPartsSent(NULL),
128 outstandingTransactionStatus(NULL),
129 liveAbortsGeneratedByLocal(NULL),
130 offlineTransactionsCommittedAndAtServer(NULL),
131 localCommunicationTable(NULL),
132 lastTransactionSeenFromMachineFromServer(NULL),
133 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134 lastInsertedNewKey(false),
141 * Init all the stuff needed for for table usage
144 // Init helper objects
145 random = new Random();
146 buffer = new SlotBuffer();
149 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155 arbitratorTable = new Hashtable<IoTString *, int64_t>();
156 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165 rejectedSlotVector = new Vector<int64_t>();
166 pendingTransactionQueue = new Vector<Transaction *>();
167 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
179 numberOfSlots = buffer->capacity();
180 setResizeThreshold();
184 * Initialize the table by inserting a table status as the first entry
185 * into the table status also initialize the crypto stuff.
187 void Table::initTable() {
188 cloud->initSecurity();
190 // Create the first insertion into the block chain which is the table status
191 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192 localSequenceNumber++;
193 TableStatus *status = new TableStatus(s, numberOfSlots);
195 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
198 array = new Array<Slot *>(1);
200 // update local block chain
201 validateAndUpdate(array, true);
202 } else if (array->length() == 1) {
203 // in case we did push the slot BUT we failed to init it
204 validateAndUpdate(array, true);
206 throw new Error("Error on initialization");
211 * Rebuild the table from scratch by pulling the latest block chain
214 void Table::rebuild() {
215 // Just pull the latest slots from the server
216 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217 validateAndUpdate(newslots, true);
219 updateLiveTransactionsAndStatus();
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
226 int64_t Table::getArbitrator(IoTString *key) {
227 return arbitratorTable->get(key);
230 void Table::close() {
234 IoTString *Table::getCommitted(IoTString *key) {
235 KeyValue *kv = committedKeyValueTable->get(key);
238 return kv->getValue();
244 IoTString *Table::getSpeculative(IoTString *key) {
245 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248 kv = speculatedKeyValueTable->get(key);
252 kv = committedKeyValueTable->get(key);
256 return kv->getValue();
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263 KeyValue *kv = committedKeyValueTable->get(key);
265 if (!arbitratorTable->contains(key)) {
266 throw new Error("Key not Found.");
269 // Make sure new key value pair matches the current arbitrator
270 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271 // TODO: Maybe not throw en error
272 throw new Error("Not all Key Values Match Arbitrator.");
276 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277 return kv->getValue();
279 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285 if (!arbitratorTable->contains(key)) {
286 throw new Error("Key not Found.");
289 // Make sure new key value pair matches the current arbitrator
290 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291 // TODO: Maybe not throw en error
292 throw new Error("Not all Key Values Match Arbitrator.");
295 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298 kv = speculatedKeyValueTable->get(key);
302 kv = committedKeyValueTable->get(key);
306 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307 return kv->getValue();
309 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
314 bool Table::update() {
316 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317 validateAndUpdate(newSlots, false);
321 updateLiveTransactionsAndStatus();
324 } catch (Exception *e) {
325 for (int64_t m : localCommunicationTable->keySet()) {
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
335 if (!arbitratorTable->contains(keyName)) {
336 // There is already an arbitrator
340 NewKey *newKey = new NewKey(NULL, keyName, machineId);
342 if (sendToServer(newKey)) {
343 // If successfully inserted
349 void Table::startTransaction() {
350 // Create a new transaction, invalidates any old pending transactions.
351 pendingTransactionBuilder = new PendingTransaction(localMachineId);
354 void Table::addKV(IoTString *key, IoTString *value) {
356 // Make sure it is a valid key
357 if (!arbitratorTable->contains(key)) {
358 throw new Error("Key not Found.");
361 // Make sure new key value pair matches the current arbitrator
362 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363 // TODO: Maybe not throw en error
364 throw new Error("Not all Key Values Match Arbitrator.");
367 // Add the key value to this transaction
368 KeyValue *kv = new KeyValue(key, value);
369 pendingTransactionBuilder->addKV(kv);
372 TransactionStatus *Table::commitTransaction() {
374 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375 // transaction with no updates will have no effect on the system
376 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
379 // Set the local transaction sequence number and increment
380 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381 localTransactionSequenceNumber++;
383 // Create the transaction status
384 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
386 // Create the new transaction
387 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388 newTransaction->setTransactionStatus(transactionStatus);
390 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391 // Add it to the queue and invalidate the builder for safety
392 pendingTransactionQueue->add(newTransaction);
394 arbitrateOnLocalTransaction(newTransaction);
395 updateLiveStateFromLocal();
398 pendingTransactionBuilder = new PendingTransaction(localMachineId);
402 } catch (ServerException *e) {
404 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406 Transaction *transaction = iter->next();
408 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409 // Already contacted this client so ignore all attempts to contact this client
410 // to preserve ordering for arbitrator
414 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
416 if (sendReturn->getFirst()) {
417 // Failed to contact over local
418 arbitratorTriedAndFailed->add(transaction->getArbitrator());
420 // Successful contact or should not contact
422 if (sendReturn->getSecond()) {
430 updateLiveStateFromLocal();
432 return transactionStatus;
436 * Recalculate the new resize threshold
438 void Table::setResizeThreshold() {
439 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
443 int64_t Table::getLocalSequenceNumber() {
444 return localSequenceNumber;
448 bool lastInsertedNewKey = false;
450 bool Table::sendToServer(NewKey *newKey) {
452 bool fromRetry = false;
455 if (hadPartialSendToServer) {
456 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457 if (newSlots->length() == 0) {
459 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
461 if (sendSlotsReturn->getFirst()) {
462 if (newKey != NULL) {
463 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
468 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469 transaction->resetServerFailure();
471 // Update which transactions parts still need to be sent
472 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
474 // Add the transaction status to the outstanding list
475 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
477 // Update the transaction status
478 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
480 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481 if (transaction->didSendAllParts()) {
482 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483 pendingTransactionQueue->remove(transaction);
488 newSlots = sendSlotsReturn->getThird();
490 bool isInserted = false;
491 for (uint si = 0; si < newSlots->length(); si++) {
492 Slot *s = newSlots->get(si);
493 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
499 for (uint si = 0; si < newSlots->length(); si++) {
500 Slot *s = newSlots->get(si);
505 // Process each entry in the slot
506 for (Entry *entry : s->getEntries()) {
507 if (entry->getType() == TypeLastMessage) {
508 LastMessage *lastMessage = (LastMessage *)entry;
509 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
518 if (newKey != NULL) {
519 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
524 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525 transaction->resetServerFailure();
527 // Update which transactions parts still need to be sent
528 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
530 // Add the transaction status to the outstanding list
531 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
533 // Update the transaction status
534 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
536 // Check if all the transaction parts were successfully sent and if so then remove it from pending
537 if (transaction->didSendAllParts()) {
538 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539 pendingTransactionQueue->remove(transaction);
541 transaction->resetServerFailure();
542 // Set the transaction sequence number back to nothing
543 if (!transaction->didSendAPartToServer()) {
544 transaction->setSequenceNumber(-1);
551 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552 transaction->resetServerFailure();
553 // Set the transaction sequence number back to nothing
554 if (!transaction->didSendAPartToServer()) {
555 transaction->setSequenceNumber(-1);
559 if (sendSlotsReturn->getThird()->length() != 0) {
560 // insert into the local block chain
561 validateAndUpdate(sendSlotsReturn->getThird(), true);
565 bool isInserted = false;
566 for (uint si = 0; si < newSlots->length(); si++) {
567 Slot *s = newSlots->get(si);
568 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
574 for (uint si = 0; si < newSlots->length(); si++) {
575 Slot *s = newSlots->get(si);
580 // Process each entry in the slot
581 for (Entry *entry : s->getEntries()) {
583 if (entry->getType() == TypeLastMessage) {
584 LastMessage *lastMessage = (LastMessage *)entry;
585 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
594 if (newKey != NULL) {
595 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
600 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601 transaction->resetServerFailure();
603 // Update which transactions parts still need to be sent
604 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
606 // Add the transaction status to the outstanding list
607 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
609 // Update the transaction status
610 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
612 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613 if (transaction->didSendAllParts()) {
614 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615 pendingTransactionQueue->remove(transaction);
617 transaction->resetServerFailure();
618 // Set the transaction sequence number back to nothing
619 if (!transaction->didSendAPartToServer()) {
620 transaction->setSequenceNumber(-1);
625 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626 transaction->resetServerFailure();
627 // Set the transaction sequence number back to nothing
628 if (!transaction->didSendAPartToServer()) {
629 transaction->setSequenceNumber(-1);
634 // insert into the local block chain
635 validateAndUpdate(newSlots, true);
638 } catch (ServerException *e) {
645 // While we have stuff that needs inserting into the block chain
646 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
650 if (hadPartialSendToServer) {
651 throw new Error("Should Be error free");
656 // If there is a new key with same name then end
657 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
662 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663 localSequenceNumber++;
665 // Try to fill the slot with data
666 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667 bool needsResize = fillSlotsReturn->getFirst();
668 int newSize = fillSlotsReturn->getSecond();
669 bool insertedNewKey = fillSlotsReturn->getThird();
672 // Reset which transaction to send
673 for (Transaction *transaction : transactionPartsSent->keySet()) {
674 transaction->resetNextPartToSend();
676 // Set the transaction sequence number back to nothing
677 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678 transaction->setSequenceNumber(-1);
682 // Clear the sent data since we are trying again
683 pendingSendArbitrationEntriesToDelete->clear();
684 transactionPartsSent->clear();
686 // We needed a resize so try again
687 fillSlot(slot, true, newKey);
690 lastSlotAttemptedToSend = slot;
691 lastIsNewKey = (newKey != NULL);
692 lastInsertedNewKey = insertedNewKey;
693 lastNewSize = newSize;
695 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
699 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
701 if (sendSlotsReturn->getFirst()) {
703 // Did insert into the block chain
705 if (insertedNewKey) {
706 // This slot was what was inserted not a previous slot
708 // New Key was successfully inserted into the block chain so dont want to insert it again
712 // Remove the aborts and commit parts that were sent from the pending to send queue
713 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714 ArbitrationRound *round = iter->next();
715 round->removeParts(pendingSendArbitrationEntriesToDelete);
717 if (round->isDoneSending()) {
718 // Sent all the parts
723 for (Transaction *transaction : transactionPartsSent->keySet()) {
724 transaction->resetServerFailure();
726 // Update which transactions parts still need to be sent
727 transaction->removeSentParts(transactionPartsSent->get(transaction));
729 // Add the transaction status to the outstanding list
730 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
732 // Update the transaction status
733 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
735 // Check if all the transaction parts were successfully sent and if so then remove it from pending
736 if (transaction->didSendAllParts()) {
737 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738 pendingTransactionQueue->remove(transaction);
742 // Reset which transaction to send
743 for (Transaction *transaction : transactionPartsSent->keySet()) {
744 transaction->resetNextPartToSend();
746 // Set the transaction sequence number back to nothing
747 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748 transaction->setSequenceNumber(-1);
753 // Clear the sent data in preparation for next send
754 pendingSendArbitrationEntriesToDelete->clear();
755 transactionPartsSent->clear();
757 if (sendSlotsReturn->getThird()->length() != 0) {
758 // insert into the local block chain
759 validateAndUpdate(sendSlotsReturn->getThird(), true);
763 } catch (ServerException *e) {
765 if (e->getType() != ServerException->TypeInputTimeout) {
766 // Nothing was able to be sent to the server so just clear these data structures
767 for (Transaction *transaction : transactionPartsSent->keySet()) {
768 transaction->resetNextPartToSend();
770 // Set the transaction sequence number back to nothing
771 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772 transaction->setSequenceNumber(-1);
776 // There was a partial send to the server
777 hadPartialSendToServer = true;
779 // Nothing was able to be sent to the server so just clear these data structures
780 for (Transaction *transaction : transactionPartsSent->keySet()) {
781 transaction->resetNextPartToSend();
782 transaction->setServerFailure();
786 pendingSendArbitrationEntriesToDelete->clear();
787 transactionPartsSent->clear();
792 return newKey == NULL;
795 bool Table::updateFromLocal(int64_t machineId) {
796 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797 if (localCommunicationInformation == NULL) {
798 // Cant talk to that device locally so do nothing
802 // Get the size of the send data
803 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
805 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
810 Array<char> *sendData = new Array<char>(sendDataSize);
811 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
814 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
818 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819 localSequenceNumber++;
821 if (returnData == NULL) {
822 // Could not contact server
827 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828 int numberOfEntries = bbDecode->getInt();
830 for (int i = 0; i < numberOfEntries; i++) {
831 char type = bbDecode->get();
832 if (type == TypeAbort) {
833 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
835 } else if (type == TypeCommitPart) {
836 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837 processEntry(commitPart);
841 updateLiveStateFromLocal();
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
848 // Get the devices local communications
849 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
851 if (localCommunicationInformation == NULL) {
852 // Cant talk to that device locally so do nothing
853 return Pair<bool, bool>(true, false);
856 // Get the size of the send data
857 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858 for (TransactionPart *part : transaction->getParts()->values()) {
859 sendDataSize += part->getSize();
862 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
867 // Make the send data size
868 Array<char> *sendData = new Array<char>(sendDataSize);
869 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
872 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873 bbEncode->putInt(transaction->getParts()->size());
874 for (TransactionPart *part : transaction->getParts()->values()) {
875 part->encode(bbEncode);
880 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881 localSequenceNumber++;
883 if (returnData == NULL) {
884 // Could not contact server
885 return Pair<bool, bool>(true, false);
889 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890 bool didCommit = bbDecode->get() == 1;
891 bool couldArbitrate = bbDecode->get() == 1;
892 int numberOfEntries = bbDecode->getInt();
893 bool foundAbort = false;
895 for (int i = 0; i < numberOfEntries; i++) {
896 char type = bbDecode->get();
897 if (type == TypeAbort) {
898 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
900 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
905 } else if (type == TypeCommitPart) {
906 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907 processEntry(commitPart);
911 updateLiveStateFromLocal();
913 if (couldArbitrate) {
914 TransactionStatus status = transaction->getTransactionStatus();
916 status->setStatus(TransactionStatus_StatusCommitted);
918 status->setStatus(TransactionStatus_StatusAborted);
921 TransactionStatus status = transaction->getTransactionStatus();
923 status->setStatus(TransactionStatus_StatusAborted);
925 status->setStatus(TransactionStatus_StatusCommitted);
929 return Pair<bool, bool>(false, true);
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
935 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937 int numberOfParts = bbDecode->getInt();
939 // If we did commit a transaction or not
940 bool didCommit = false;
941 bool couldArbitrate = false;
943 if (numberOfParts != 0) {
945 // decode the transaction
946 Transaction *transaction = new Transaction();
947 for (int i = 0; i < numberOfParts; i++) {
949 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950 transaction->addPartDecode(newPart);
953 // Arbitrate on transaction and pull relevant return data
954 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955 couldArbitrate = localArbitrateReturn->getFirst();
956 didCommit = localArbitrateReturn->getSecond();
958 updateLiveStateFromLocal();
960 // Transaction was sent to the server so keep track of it to prevent double commit
961 if (transaction->getSequenceNumber() != -1) {
962 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
966 // The data to send back
967 int returnDataSize = 0;
968 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
970 // Get the aborts to send back
971 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
972 Collections->sort(abortLocalSequenceNumbers);
973 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
974 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
978 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979 unseenArbitrations->add(abort);
980 returnDataSize += abort->getSize();
983 // Get the commits to send back
984 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985 if (commitForClientTable != NULL) {
986 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987 Collections->sort(commitLocalSequenceNumbers);
989 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990 Commit *commit = commitForClientTable->get(localSequenceNumber);
992 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
996 unseenArbitrations->addAll(commit->getParts()->values());
998 for (CommitPart commitPart : commit->getParts()->values()) {
999 returnDataSize += commitPart->getSize();
1004 // Number of arbitration entries to decode
1005 returnDataSize += 2 * sizeof(int32_t);
1007 // bool of did commit or not
1008 if (numberOfParts != 0) {
1009 returnDataSize += sizeof(char);
1012 // Data to send Back
1013 Array<char> *returnData = new Array<char>(returnDataSize);
1014 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1016 if (numberOfParts != 0) {
1018 bbEncode->put((char)1);
1020 bbEncode->put((char)0);
1022 if (couldArbitrate) {
1023 bbEncode->put((char)1);
1025 bbEncode->put((char)0);
1029 bbEncode->putInt(unseenArbitrations->size());
1030 for (Entry *entry : unseenArbitrations) {
1031 entry->encode(bbEncode);
1035 localSequenceNumber++;
1039 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1040 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041 attemptedToSendToServer = true;
1043 bool inserted = false;
1044 bool lastTryInserted = false;
1046 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047 if (array == NULL) {
1048 array = new Array<Slot *>();
1049 array->set(0, slot);
1050 rejectedSlotVector->clear();
1053 if (array->length() == 0) {
1054 throw new Error("Server Error: Did not send any slots");
1057 // if (attemptedToSendToServerTmp) {
1058 if (hadPartialSendToServer) {
1060 bool isInserted = false;
1061 for (Slot *s : array) {
1062 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1068 for (Slot *s : array) {
1073 // Process each entry in the slot
1074 for (Entry *entry : s->getEntries()) {
1076 if (entry->getType() == TypeLastMessage) {
1077 LastMessage *lastMessage = (LastMessage *)entry;
1079 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1088 rejectedSlotVector->add(slot->getSequenceNumber());
1089 lastTryInserted = false;
1091 lastTryInserted = true;
1094 rejectedSlotVector->add(slot->getSequenceNumber());
1095 lastTryInserted = false;
1099 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1103 * Returns false if a resize was needed
1105 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1107 if (liveSlotCount > bufferResizeThreshold) {
1108 resize = true; //Resize is forced
1112 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1113 TableStatus *status = new TableStatus(slot, newSize);
1114 slot->addEntry(status);
1117 // Fill with rejected slots first before doing anything else
1118 doRejectedMessages(slot);
1120 // Do mandatory rescue of entries
1121 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1123 // Extract working variables
1124 bool needsResize = mandatoryRescueReturn->getFirst();
1125 bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1126 int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1128 if (needsResize && !resize) {
1129 // We need to resize but we are not resizing so return false
1130 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1133 bool inserted = false;
1134 if (newKeyEntry != NULL) {
1135 newKeyEntry->setSlot(slot);
1136 if (slot->hasSpace(newKeyEntry)) {
1138 slot->addEntry(newKeyEntry);
1143 // Clear the transactions, aborts and commits that were sent previously
1144 transactionPartsSent->clear();
1145 pendingSendArbitrationEntriesToDelete->clear();
1147 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1148 bool isFull = false;
1149 round->generateParts();
1150 Vector<Entry *> *parts = round->getParts();
1152 // Insert pending arbitration data
1153 for (Entry *arbitrationData : parts) {
1155 // If it is an abort then we need to set some information
1156 if (arbitrationData instanceof Abort) {
1157 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1160 if (!slot->hasSpace(arbitrationData)) {
1161 // No space so cant do anything else with these data entries
1166 // Add to this current slot and add it to entries to delete
1167 slot->addEntry(arbitrationData);
1168 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1176 if (pendingTransactionQueue->size() > 0) {
1177 Transaction *transaction = pendingTransactionQueue->get(0);
1178 // Set the transaction sequence number if it has yet to be inserted into the block chain
1179 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1180 transaction->setSequenceNumber(slot->getSequenceNumber());
1184 TransactionPart *part = transaction->getNextPartToSend();
1186 // Ran out of parts to send for this transaction so move on
1190 if (slot->hasSpace(part)) {
1191 slot->addEntry(part);
1192 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1193 if (partsSent == NULL) {
1194 partsSent = new Vector<int32_t>();
1195 transactionPartsSent->put(transaction, partsSent);
1197 partsSent->add(part->getPartNumber());
1198 transactionPartsSent->put(transaction, partsSent);
1205 // Fill the remainder of the slot with rescue data
1206 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1208 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1211 void Table::doRejectedMessages(Slot *s) {
1212 if (!rejectedSlotVector->isEmpty()) {
1213 /* TODO: We should avoid generating a rejected message entry if
1214 * there is already a sufficient entry in the queue (e->g->,
1215 * equalsto value of true and same sequence number)-> */
1217 int64_t old_seqn = rejectedSlotVector->firstElement();
1218 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1219 int64_t new_seqn = rejectedSlotVector->lastElement();
1220 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1223 int64_t prev_seqn = -1;
1225 /* Go through list of missing messages */
1226 for (; i < rejectedSlotVector->size(); i++) {
1227 int64_t curr_seqn = rejectedSlotVector->get(i);
1228 Slot *s_msg = buffer->getSlot(curr_seqn);
1231 prev_seqn = curr_seqn;
1233 /* Generate rejected message entry for missing messages */
1234 if (prev_seqn != -1) {
1235 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1238 /* Generate rejected message entries for present messages */
1239 for (; i < rejectedSlotVector->size(); i++) {
1240 int64_t curr_seqn = rejectedSlotVector->get(i);
1241 Slot *s_msg = buffer->getSlot(curr_seqn);
1242 int64_t machineid = s_msg->getMachineID();
1243 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1250 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1251 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1252 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1253 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1254 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1257 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1258 bool seenLiveSlot = false;
1259 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1260 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1264 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1265 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1266 // Push slot number forward
1267 if (!seenLiveSlot) {
1268 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1271 if (!previousSlot->isLive()) {
1275 // We have seen a live slot
1276 seenLiveSlot = true;
1278 // Get all the live entries for a slot
1279 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1281 // Iterate over all the live entries and try to rescue them
1282 for (Entry *liveEntry : liveEntries) {
1283 if (slot->hasSpace(liveEntry)) {
1285 // Enough space to rescue the entry
1286 slot->addEntry(liveEntry);
1287 } else if (currentSequenceNumber == firstIfFull) {
1288 //if there's no space but the entry is about to fall off the queue
1289 System->out->println("B"); //?
1290 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1297 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1300 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1301 /* now go through live entries from least to greatest sequence number until
1302 * either all live slots added, or the slot doesn't have enough room
1303 * for SKIP_THRESHOLD consecutive entries*/
1305 int64_t newestseqnum = buffer->getNewestSeqNum();
1307 for (; seqn <= newestseqnum; seqn++) {
1308 Slot *prevslot = buffer->getSlot(seqn);
1309 //Push slot number forward
1311 oldestLiveSlotSequenceNumver = seqn;
1313 if (!prevslot->isLive())
1315 seenliveslot = true;
1316 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1317 for (Entry *liveentry : liveentries) {
1318 if (s->hasSpace(liveentry))
1319 s->addEntry(liveentry);
1322 if (skipcount > Table_SKIP_THRESHOLD)
1330 * Checks for malicious activity and updates the local copy of the block chain->
1332 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1334 // The cloud communication layer has checked slot HMACs already before decoding
1335 if (newSlots->length() == 0) {
1339 // Make sure all slots are newer than the last largest slot this client has seen
1340 int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1341 if (firstSeqNum <= sequenceNumber) {
1342 throw new Error("Server Error: Sent older slots!");
1345 // Create an object that can access both new slots and slots in our local chain
1346 // without committing slots to our local chain
1347 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1349 // Check that the HMAC chain is not broken
1350 checkHMACChain(indexer, newSlots);
1352 // Set to keep track of messages from clients
1353 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1355 // Process each slots data
1356 for (Slot *slot : newSlots) {
1357 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1359 updateExpectedSize();
1362 // If there is a gap, check to see if the server sent us everything->
1363 if (firstSeqNum != (sequenceNumber + 1)) {
1365 // Check the size of the slots that were sent down by the server->
1366 // Can only check the size if there was a gap
1367 checkNumSlots(newSlots->length);
1369 // Since there was a gap every machine must have pushed a slot or must have
1370 // a last message message-> If not then the server is hiding slots
1371 if (!machineSet->isEmpty()) {
1372 throw new Error("Missing record for machines: " + machineSet);
1376 // Update the size of our local block chain->
1379 // Commit new to slots to the local block chain->
1380 for (Slot *slot : newSlots) {
1382 // Insert this slot into our local block chain copy->
1383 buffer->putSlot(slot);
1385 // Keep track of how many slots are currently live (have live data in them)->
1389 // Get the sequence number of the latest slot in the system
1390 sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1392 updateLiveStateFromServer();
1394 // No Need to remember after we pulled from the server
1395 offlineTransactionsCommittedAndAtServer->clear();
1397 // This is invalidated now
1398 hadPartialSendToServer = false;
1401 void Table::updateLiveStateFromServer() {
1402 // Process the new transaction parts
1403 processNewTransactionParts();
1405 // Do arbitration on new transactions that were received
1406 arbitrateFromServer();
1408 // Update all the committed keys
1409 bool didCommitOrSpeculate = updateCommittedTable();
1411 // Delete the transactions that are now dead
1412 updateLiveTransactionsAndStatus();
1415 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1416 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1419 void Table::updateLiveStateFromLocal() {
1420 // Update all the committed keys
1421 bool didCommitOrSpeculate = updateCommittedTable();
1423 // Delete the transactions that are now dead
1424 updateLiveTransactionsAndStatus();
1427 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1428 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1431 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1432 // if (didFindTableStatus) {
1435 int64_t prevslots = firstSequenceNumber;
1438 if (didFindTableStatus) {
1439 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1440 // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
1443 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1444 // System->out->println("Here: " + expectedsize);
1447 // System->out->println(numberOfSlots);
1449 didFindTableStatus = true;
1450 currMaxSize = numberOfSlots;
1453 void Table::updateExpectedSize() {
1456 if (expectedsize > currMaxSize) {
1457 expectedsize = currMaxSize;
1463 * Check the size of the block chain to make sure there are enough slots sent back by the server->
1464 * This is only called when we have a gap between the slots that we have locally and the slots
1465 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1468 void Table::checkNumSlots(int numberOfSlots) {
1469 if (numberOfSlots != expectedsize) {
1470 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
1474 void Table::updateCurrMaxSize(int newmaxsize) {
1475 currMaxSize = newmaxsize;
1480 * Update the size of of the local buffer if it is needed->
1482 void Table::commitNewMaxSize() {
1483 didFindTableStatus = false;
1485 // Resize the local slot buffer
1486 if (numberOfSlots != currMaxSize) {
1487 buffer->resize((int32_t)currMaxSize);
1490 // Change the number of local slots to the new size
1491 numberOfSlots = (int32_t)currMaxSize;
1494 // Recalculate the resize threshold since the size of the local buffer has changed
1495 setResizeThreshold();
1499 * Process the new transaction parts from this latest round of slots received from the server
1501 void Table::processNewTransactionParts() {
1503 if (newTransactionParts->size() == 0) {
1504 // Nothing new to process
1508 // Iterate through all the machine Ids that we received new parts for
1509 for (int64_t machineId : newTransactionParts->keySet()) {
1510 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1512 // Iterate through all the parts for that machine Id
1513 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1514 TransactionPart *part = parts->get(partId);
1516 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1517 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1518 // Set dead the transaction part
1523 // Get the transaction object for that sequence number
1524 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1526 if (transaction == NULL) {
1527 // This is a new transaction that we dont have so make a new one
1528 transaction = new Transaction();
1530 // Insert this new transaction into the live tables
1531 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1532 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1535 // Add that part to the transaction
1536 transaction->addPartDecode(part);
1540 // Clear all the new transaction parts in preparation for the next time the server sends slots
1541 newTransactionParts->clear();
1544 void Table::arbitrateFromServer() {
1546 if (liveTransactionBySequenceNumberTable->size() == 0) {
1547 // Nothing to arbitrate on so move on
1551 // Get the transaction sequence numbers and sort from oldest to newest
1552 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1553 Collections->sort(transactionSequenceNumbers);
1555 // Collection of key value pairs that are
1556 Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1558 // The last transaction arbitrated on
1559 int64_t lastTransactionCommitted = -1;
1560 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1562 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1563 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1567 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1568 if (transaction->getArbitrator() != localMachineId) {
1572 if (transactionSequenceNumber < lastSeqNumArbOn) {
1576 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1577 // We have seen this already locally so dont commit again
1582 if (!transaction->isComplete()) {
1583 // Will arbitrate in incorrect order if we continue so just break
1589 // update the largest transaction seen by arbitrator from server
1590 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1591 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1593 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1594 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1595 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1599 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1600 // Guard evaluated as true
1602 // Update the local changes so we can make the commit
1603 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1604 speculativeTableTmp->put(kv->getKey(), kv);
1607 // Update what the last transaction committed was for use in batch commit
1608 lastTransactionCommitted = transactionSequenceNumber;
1610 // Guard evaluated was false so create abort
1613 Abort *newAbort = new Abort(NULL,
1614 transaction->getClientLocalSequenceNumber(),
1615 transaction->getSequenceNumber(),
1616 transaction->getMachineId(),
1617 transaction->getArbitrator(),
1618 localArbitrationSequenceNumber);
1619 localArbitrationSequenceNumber++;
1621 generatedAborts->add(newAbort);
1623 // Insert the abort so we can process
1624 processEntry(newAbort);
1627 lastSeqNumArbOn = transactionSequenceNumber;
1629 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1632 Commit *newCommit = NULL;
1634 // If there is something to commit
1635 if (speculativeTableTmp->size() != 0) {
1637 // Create the commit and increment the commit sequence number
1638 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1639 localArbitrationSequenceNumber++;
1641 // Add all the new keys to the commit
1642 for (KeyValue *kv : speculativeTableTmp->values()) {
1643 newCommit->addKV(kv);
1646 // create the commit parts
1647 newCommit->createCommitParts();
1649 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1651 // Insert the commit so we can process it
1652 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1653 processEntry(commitPart);
1657 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1658 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1659 pendingSendArbitrationRounds->add(arbitrationRound);
1661 if (compactArbitrationData()) {
1662 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1663 if (newArbitrationRound->getCommit() != NULL) {
1664 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1665 processEntry(commitPart);
1672 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1674 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1675 if (transaction->getArbitrator() != localMachineId) {
1676 return Pair<bool, bool>(false, false);
1679 if (!transaction->isComplete()) {
1680 // Will arbitrate in incorrect order if we continue so just break
1682 return Pair<bool, bool>(false, false);
1685 if (transaction->getMachineId() != localMachineId) {
1686 // dont do this check for local transactions
1687 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1688 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1689 // We've have already seen this from the server
1690 return Pair<bool, bool>(false, false);
1695 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1696 // Guard evaluated as true
1698 // Create the commit and increment the commit sequence number
1699 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1700 localArbitrationSequenceNumber++;
1702 // Update the local changes so we can make the commit
1703 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1704 newCommit->addKV(kv);
1707 // create the commit parts
1708 newCommit->createCommitParts();
1710 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1711 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1712 pendingSendArbitrationRounds->add(arbitrationRound);
1714 if (compactArbitrationData()) {
1715 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1716 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1717 processEntry(commitPart);
1720 // Insert the commit so we can process it
1721 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1722 processEntry(commitPart);
1726 if (transaction->getMachineId() == localMachineId) {
1727 TransactionStatus *status = transaction->getTransactionStatus();
1728 if (status != NULL) {
1729 status->setStatus(TransactionStatus_StatusCommitted);
1733 updateLiveStateFromLocal();
1734 return Pair<bool, bool>(true, true);
1737 if (transaction->getMachineId() == localMachineId) {
1738 // For locally created messages update the status
1740 // Guard evaluated was false so create abort
1741 TransactionStatus status = transaction->getTransactionStatus();
1742 if (status != NULL) {
1743 status->setStatus(TransactionStatus_StatusAborted);
1746 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1750 Abort *newAbort = new Abort(NULL,
1751 transaction->getClientLocalSequenceNumber(),
1753 transaction->getMachineId(),
1754 transaction->getArbitrator(),
1755 localArbitrationSequenceNumber);
1756 localArbitrationSequenceNumber++;
1758 addAbortSet->add(newAbort);
1761 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1762 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1763 pendingSendArbitrationRounds->add(arbitrationRound);
1765 if (compactArbitrationData()) {
1766 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1768 processEntry(commitPart);
1773 updateLiveStateFromLocal();
1774 return Pair<bool, bool>(true, false);
1779 * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1781 bool Table::compactArbitrationData() {
1783 if (pendingSendArbitrationRounds->size() < 2) {
1784 // Nothing to compact so do nothing
1788 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1789 if (lastRound->didSendPart()) {
1793 bool hadCommit = (lastRound->getCommit() == NULL);
1794 bool gotNewCommit = false;
1796 int numberToDelete = 1;
1797 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1798 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1800 if (round->isFull() || round->didSendPart()) {
1801 // Stop since there is a part that cannot be compacted and we need to compact in order
1805 if (round->getCommit() == NULL) {
1807 // Try compacting aborts only
1808 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1809 if (newSize > ArbitrationRound->MAX_PARTS) {
1810 // Cant compact since it would be too large
1813 lastRound->addAborts(round->getAborts());
1816 // Create a new larger commit
1817 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1818 localArbitrationSequenceNumber++;
1820 // Create the commit parts so that we can count them
1821 newCommit->createCommitParts();
1823 // Calculate the new size of the parts
1824 int newSize = newCommit->getNumberOfParts();
1825 newSize += lastRound->getAbortsCount();
1826 newSize += round->getAbortsCount();
1828 if (newSize > ArbitrationRound->MAX_PARTS) {
1829 // Cant compact since it would be too large
1833 // Set the new compacted part
1834 lastRound->setCommit(newCommit);
1835 lastRound->addAborts(round->getAborts());
1836 gotNewCommit = true;
1842 if (numberToDelete != 1) {
1843 // If there is a compaction
1845 // Delete the previous pieces that are now in the new compacted piece
1846 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1847 pendingSendArbitrationRounds->clear();
1849 for (int i = 0; i < numberToDelete; i++) {
1850 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1854 // Add the new compacted into the pending to send list
1855 pendingSendArbitrationRounds->add(lastRound);
1857 // Should reinsert into the commit processor
1858 if (hadCommit && gotNewCommit) {
1865 // bool compactArbitrationData() {
1870 * Update all the commits and the committed tables, sets dead the dead transactions
1872 bool Table::updateCommittedTable() {
1874 if (newCommitParts->size() == 0) {
1875 // Nothing new to process
1879 // Iterate through all the machine Ids that we received new parts for
1880 for (int64_t machineId : newCommitParts->keySet()) {
1881 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1883 // Iterate through all the parts for that machine Id
1884 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1885 CommitPart *part = parts->get(partId);
1887 // Get the transaction object for that sequence number
1888 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1890 if (commitForClientTable == NULL) {
1891 // This is the first commit from this device
1892 commitForClientTable = new Hashtable<int64_t, Commit *>();
1893 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1896 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1898 if (commit == NULL) {
1899 // This is a new commit that we dont have so make a new one
1900 commit = new Commit();
1902 // Insert this new commit into the live tables
1903 commitForClientTable->put(part->getSequenceNumber(), commit);
1906 // Add that part to the commit
1907 commit->addPartDecode(part);
1911 // Clear all the new commits parts in preparation for the next time the server sends slots
1912 newCommitParts->clear();
1914 // If we process a new commit keep track of it for future use
1915 bool didProcessANewCommit = false;
1917 // Process the commits one by one
1918 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1920 // Get all the commits for a specific arbitrator
1921 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1923 // Sort the commits in order
1924 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1925 Collections->sort(commitSequenceNumbers);
1927 // Get the last commit seen from this arbitrator
1928 int64_t lastCommitSeenSequenceNumber = -1;
1929 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1930 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1933 // Go through each new commit one by one
1934 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1935 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1936 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1938 // Special processing if a commit is not complete
1939 if (!commit->isComplete()) {
1940 if (i == (commitSequenceNumbers->size() - 1)) {
1941 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1944 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
1945 // Delete it and move on
1947 commitForClientTable->remove(commit->getSequenceNumber());
1952 // Update the last transaction that was updated if we can
1953 if (commit->getTransactionSequenceNumber() != -1) {
1954 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1956 // Update the last transaction sequence number that the arbitrator arbitrated on
1957 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1958 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1962 // Update the last arbitration data that we have seen so far
1963 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1965 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1966 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1968 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1971 // Never seen any data from this arbitrator so record the first one
1972 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1975 // We have already seen this commit before so need to do the full processing on this commit
1976 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1978 // Update the last transaction that was updated if we can
1979 if (commit->getTransactionSequenceNumber() != -1) {
1980 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1982 // Update the last transaction sequence number that the arbitrator arbitrated on
1983 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1984 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1991 // If we got here then this is a brand new commit and needs full processing
1993 // Get what commits should be edited, these are the commits that have live values for their keys
1994 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1995 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1996 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1998 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2000 // Update each previous commit that needs to be updated
2001 for (Commit *previousCommit : commitsToEdit) {
2003 // Only bother with live commits (TODO: Maybe remove this check)
2004 if (previousCommit->isLive()) {
2006 // Update which keys in the old commits are still live
2007 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2008 previousCommit->invalidateKey(kv->getKey());
2011 // if the commit is now dead then remove it
2012 if (!previousCommit->isLive()) {
2013 commitForClientTable->remove(previousCommit);
2018 // Update the last seen sequence number from this arbitrator
2019 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2020 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2021 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2024 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2027 // We processed a new commit that we havent seen before
2028 didProcessANewCommit = true;
2030 // Update the committed table of keys and which commit is using which key
2031 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2032 committedKeyValueTable->put(kv->getKey(), kv);
2033 liveCommitsByKeyTable->put(kv->getKey(), commit);
2038 return didProcessANewCommit;
2042 * Create the speculative table from transactions that are still live and have come from the cloud
2044 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2045 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2046 // There is nothing to speculate on
2050 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2051 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2052 Collections->sort(transactionSequenceNumbersSorted);
2054 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2057 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2058 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2059 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2061 // Start from scratch
2062 speculatedKeyValueTable->clear();
2063 lastTransactionSequenceNumberSpeculatedOn = -1;
2064 oldestTransactionSequenceNumberSpeculatedOn = -1;
2068 // Remember the front of the transaction list
2069 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2071 // Find where to start arbitration from
2072 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2074 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2075 // Make sure we are not out of bounds
2076 return false; // did not speculate
2079 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2080 bool didSkip = true;
2082 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2083 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2084 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2086 if (!transaction->isComplete()) {
2087 // If there is an incomplete transaction then there is nothing we can do
2088 // add this transactions arbitrator to the list of arbitrators we should ignore
2089 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2094 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2098 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2100 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2101 // Guard evaluated to true so update the speculative table
2102 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2103 speculatedKeyValueTable->put(kv->getKey(), kv);
2109 // Since there was a skip we need to redo the speculation next time around
2110 lastTransactionSequenceNumberSpeculatedOn = -1;
2111 oldestTransactionSequenceNumberSpeculatedOn = -1;
2114 // We did some speculation
2119 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2121 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2122 if (pendingTransactionQueue->size() == 0) {
2123 // There is nothing to speculate on
2127 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2128 // need to reset on the pending speculation
2129 lastPendingTransactionSpeculatedOn = NULL;
2130 firstPendingTransaction = pendingTransactionQueue->get(0);
2131 pendingTransactionSpeculatedKeyValueTable->clear();
2134 // Find where to start arbitration from
2135 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2137 if (startIndex >= pendingTransactionQueue->size()) {
2138 // Make sure we are not out of bounds
2142 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2143 Transaction *transaction = pendingTransactionQueue->get(i);
2145 lastPendingTransactionSpeculatedOn = transaction;
2147 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2148 // Guard evaluated to true so update the speculative table
2149 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2150 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2157 * Set dead and remove from the live transaction tables the transactions that are dead
2159 void Table::updateLiveTransactionsAndStatus() {
2161 // Go through each of the transactions
2162 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2163 Transaction *transaction = iter->next()->getValue();
2165 // Check if the transaction is dead
2166 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2167 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2169 // Set dead the transaction
2170 transaction->setDead();
2172 // Remove the transaction from the live table
2174 liveTransactionByTransactionIdTable->remove(transaction->getId());
2178 // Go through each of the transactions
2179 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2180 TransactionStatus *status = iter->next()->getValue();
2182 // Check if the transaction is dead
2183 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2184 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2187 status->setStatus(TransactionStatus_StatusCommitted);
2196 * Process this slot, entry by entry-> Also update the latest message sent by slot
2198 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2200 // Update the last message seen
2201 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2203 // Process each entry in the slot
2204 for (Entry *entry : slot->getEntries()) {
2205 switch (entry->getType()) {
2207 case TypeCommitPart:
2208 processEntry((CommitPart *)entry);
2212 processEntry((Abort *)entry);
2215 case TypeTransactionPart:
2216 processEntry((TransactionPart *)entry);
2220 processEntry((NewKey *)entry);
2223 case TypeLastMessage:
2224 processEntry((LastMessage *)entry, machineSet);
2227 case TypeRejectedMessage:
2228 processEntry((RejectedMessage *)entry, indexer);
2231 case TypeTableStatus:
2232 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2236 throw new Error("Unrecognized type: " + entry->getType());
2242 * Update the last message that was sent for a machine Id
2244 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2245 // Update what the last message received by a machine was
2246 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2250 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2252 void Table::processEntry(NewKey *entry) {
2254 // Update the arbitrator table with the new key information
2255 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2257 // Update what the latest live new key is
2258 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2259 if (oldNewKey != NULL) {
2260 // Delete the old new key messages
2261 oldNewKey->setDead();
2266 * Process new table status entries and set dead the old ones as new ones come in->
2267 * keeps track of the largest and smallest table status seen in this current round
2268 * of updating the local copy of the block chain
2270 void Table::processEntry(TableStatus entry, int64_t seq) {
2271 int newNumSlots = entry->getMaxSlots();
2272 updateCurrMaxSize(newNumSlots);
2274 initExpectedSize(seq, newNumSlots);
2276 if (liveTableStatus != NULL) {
2277 // We have a larger table status so the old table status is no int64_ter alive
2278 liveTableStatus->setDead();
2281 // Make this new table status the latest alive table status
2282 liveTableStatus = entry;
2286 * Check old messages to see if there is a block chain violation-> Also
2288 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2289 int64_t oldSeqNum = entry->getOldSeqNum();
2290 int64_t newSeqNum = entry->getNewSeqNum();
2291 bool isequal = entry->getEqual();
2292 int64_t machineId = entry->getMachineID();
2293 int64_t seq = entry->getSequenceNumber();
2296 // Check if we have messages that were supposed to be rejected in our local block chain
2297 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2300 Slot *slot = indexer->getSlot(seqNum);
2303 // If we have this slot make sure that it was not supposed to be a rejected slot
2305 int64_t slotMachineId = slot->getMachineID();
2306 if (isequal != (slotMachineId == machineId)) {
2307 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2313 // Create a list of clients to watch until they see this rejected message entry->
2314 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2315 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2317 // Machine ID for the last message entry
2318 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2320 // We've seen it, don't need to continue to watch-> Our next
2321 // message will implicitly acknowledge it->
2322 if (lastMessageEntryMachineId == localMachineId) {
2326 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2327 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2329 if (entrySequenceNumber < seq) {
2331 // Add this rejected message to the set of messages that this machine ID did not see yet
2332 addWatchVector(lastMessageEntryMachineId, entry);
2334 // This client did not see this rejected message yet so add it to the watch set to monitor
2335 deviceWatchSet->add(lastMessageEntryMachineId);
2339 if (deviceWatchSet->isEmpty()) {
2340 // This rejected message has been seen by all the clients so
2343 // We need to watch this rejected message
2344 entry->setWatchSet(deviceWatchSet);
2349 * Check if this abort is live, if not then save it so we can kill it later->
2350 * update the last transaction number that was arbitrated on->
2352 void Table::processEntry(Abort *entry) {
2355 if (entry->getTransactionSequenceNumber() != -1) {
2356 // update the transaction status if it was sent to the server
2357 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2358 if (status != NULL) {
2359 status->setStatus(TransactionStatus_StatusAborted);
2363 // Abort has not been seen by the client it is for yet so we need to keep track of it
2364 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2365 if (previouslySeenAbort != NULL) {
2366 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2369 if (entry->getTransactionArbitrator() == localMachineId) {
2370 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2373 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2375 // The machine already saw this so it is dead
2377 liveAbortTable->remove(entry->getAbortId());
2379 if (entry->getTransactionArbitrator() == localMachineId) {
2380 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2386 // Update the last arbitration data that we have seen so far
2387 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2389 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2390 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2392 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2395 // Never seen any data from this arbitrator so record the first one
2396 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2400 // Set dead a transaction if we can
2401 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2402 if (transactionToSetDead != NULL) {
2403 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2406 // Update the last transaction sequence number that the arbitrator arbitrated on
2407 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2408 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2411 if (entry->getTransactionSequenceNumber() != -1) {
2412 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2418 * Set dead the transaction part if that transaction is dead and keep track of all new parts
2420 void Table::processEntry(TransactionPart *entry) {
2421 // Check if we have already seen this transaction and set it dead OR if it is not alive
2422 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2423 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2424 // This transaction is dead, it was already committed or aborted
2429 // This part is still alive
2430 Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2432 if (transactionPart == NULL) {
2433 // Dont have a table for this machine Id yet so make one
2434 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2435 newTransactionParts->put(entry->getMachineId(), transactionPart);
2438 // Update the part and set dead ones we have already seen (got a rescued version)
2439 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2440 if (previouslySeenPart != NULL) {
2441 previouslySeenPart->setDead();
2446 * Process new commit entries and save them for future use-> Delete duplicates
2448 void Table::processEntry(CommitPart *entry) {
2449 // Update the last transaction that was updated if we can
2450 if (entry->getTransactionSequenceNumber() != -1) {
2451 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2453 // Update the last transaction sequence number that the arbitrator arbitrated on
2454 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2455 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2462 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2464 if (commitPart == NULL) {
2465 // Don't have a table for this machine Id yet so make one
2466 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2467 newCommitParts->put(entry->getMachineId(), commitPart);
2470 // Update the part and set dead ones we have already seen (got a rescued version)
2471 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2472 if (previouslySeenPart != NULL) {
2473 previouslySeenPart->setDead();
2478 * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them->
2479 * Updates the live aborts, removes those that are dead and sets them dead->
2480 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2481 * other clients have not had a rollback on the last message->
2483 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2485 // We have seen this machine ID
2486 machineSet->remove(machineId);
2488 // Get the set of rejected messages that this machine Id is has not seen yet
2489 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2491 // If there is a rejected message that this machine Id has not seen yet
2492 if (watchset != NULL) {
2494 // Go through each rejected message that this machine Id has not seen yet
2495 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2497 RejectedMessage *rm = rmit->next();
2499 // If this machine Id has seen this rejected message->->->
2500 if (rm->getSequenceNumber() <= seqNum) {
2502 // Remove it from our watchlist
2505 // Decrement machines that need to see this notification
2506 rm->removeWatcher(machineId);
2511 // Set dead the abort
2512 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2513 Abort *abort = i->next()->getValue();
2515 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2519 if (abort->getTransactionArbitrator() == localMachineId) {
2520 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2527 if (machineId == localMachineId) {
2528 // Our own messages are immediately dead->
2529 if (liveness instanceof LastMessage) {
2530 ((LastMessage *)liveness)->setDead();
2531 } else if (liveness instanceof Slot) {
2532 ((Slot *)liveness)->setDead();
2534 throw new Error("Unrecognized type");
2538 // Get the old last message for this device
2539 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2540 if (lastMessageEntry == NULL) {
2541 // If no last message then there is nothing else to process
2545 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2546 Liveness *lastEntry = lastMessageEntry->getSecond();
2548 // If it is not our machine Id since we already set ours to dead
2549 if (machineId != localMachineId) {
2550 if (lastEntry instanceof LastMessage) {
2551 ((LastMessage *)lastEntry)->setDead();
2552 } else if (lastEntry instanceof Slot) {
2553 ((Slot *)lastEntry)->setDead();
2555 throw new Error("Unrecognized type");
2559 // Make sure the server is not playing any games
2560 if (machineId == localMachineId) {
2562 if (hadPartialSendToServer) {
2563 // We were not making any updates and we had a machine mismatch
2564 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2565 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2569 // We were not making any updates and we had a machine mismatch
2570 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2571 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
2575 if (lastMessageSeqNum > seqNum) {
2576 throw new Error("Server Error: Rollback on remote machine sequence number");
2582 * Add a rejected message entry to the watch set to keep track of which clients have seen that
2583 * rejected message entry and which have not->
2585 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2586 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2587 if (entries == NULL) {
2588 // There is no set for this machine ID yet so create one
2589 entries = new Hashset<RejectedMessage *>();
2590 rejectedMessageWatchVectorTable->put(machineId, entries);
2592 entries->add(entry);
2596 * Check if the HMAC chain is not violated
2598 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2599 for (int i = 0; i < newSlots->length; i++) {
2600 Slot *currSlot = newSlots[i];
2601 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2602 if (prevSlot != NULL &&
2603 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2604 throw new Error("Server Error: Invalid HMAC Chain");