3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
// Constructs a Table that builds its own CloudComm from baseurl, password and
// listeningPort. Every pointer member listed below starts as NULL and every
// counter starts at 0, except oldestLiveSlotSequenceNumver, which starts at 1.
// NOTE(review): the constructor body is not visible in this excerpt; the
// NULL-initialized tables are presumably allocated by the init code further
// down in this file - confirm.
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
18 cloud(new CloudComm(this, baseurl, password, listeningPort)),
20 liveTableStatus(NULL),
21 pendingTransactionBuilder(NULL),
22 lastPendingTransactionSpeculatedOn(NULL),
23 firstPendingTransaction(NULL),
25 bufferResizeThreshold(0),
27 oldestLiveSlotSequenceNumver(1),
28 localMachineId(_localMachineId),
30 localTransactionSequenceNumber(0),
31 lastTransactionSequenceNumberSpeculatedOn(0),
32 oldestTransactionSequenceNumberSpeculatedOn(0),
33 localArbitrationSequenceNumber(0),
34 hadPartialSendToServer(false),
35 attemptedToSendToServer(false),
37 didFindTableStatus(false),
39 lastSlotAttemptedToSend(NULL),
42 lastTransactionPartsSent(NULL),
43 lastPendingSendArbitrationEntriesToDelete(NULL),
45 committedKeyValueTable(NULL),
46 speculatedKeyValueTable(NULL),
47 pendingTransactionSpeculatedKeyValueTable(NULL),
48 liveNewKeyTable(NULL),
49 lastMessageTable(NULL),
50 rejectedMessageWatchVectorTable(NULL),
51 arbitratorTable(NULL),
53 newTransactionParts(NULL),
55 lastArbitratedTransactionNumberByArbitratorTable(NULL),
56 liveTransactionBySequenceNumberTable(NULL),
57 liveTransactionByTransactionIdTable(NULL),
58 liveCommitsTable(NULL),
59 liveCommitsByKeyTable(NULL),
60 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61 rejectedSlotVector(NULL),
62 pendingTransactionQueue(NULL),
63 pendingSendArbitrationRounds(NULL),
64 pendingSendArbitrationEntriesToDelete(NULL),
65 transactionPartsSent(NULL),
66 outstandingTransactionStatus(NULL),
67 liveAbortsGeneratedByLocal(NULL),
68 offlineTransactionsCommittedAndAtServer(NULL),
69 localCommunicationTable(NULL),
70 lastTransactionSeenFromMachineFromServer(NULL),
71 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72 lastInsertedNewKey(false),
// Constructs a Table around a caller-supplied CloudComm instead of creating one.
// As in the other constructor, pointer members start as NULL and counters at 0
// (oldestLiveSlotSequenceNumver at 1).
// NOTE(review): the initializer that stores _cloud and the constructor body are
// not visible in this excerpt - confirm _cloud is assigned to the cloud member.
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82 liveTableStatus(NULL),
83 pendingTransactionBuilder(NULL),
84 lastPendingTransactionSpeculatedOn(NULL),
85 firstPendingTransaction(NULL),
87 bufferResizeThreshold(0),
89 oldestLiveSlotSequenceNumver(1),
90 localMachineId(_localMachineId),
92 localTransactionSequenceNumber(0),
93 lastTransactionSequenceNumberSpeculatedOn(0),
94 oldestTransactionSequenceNumberSpeculatedOn(0),
95 localArbitrationSequenceNumber(0),
96 hadPartialSendToServer(false),
97 attemptedToSendToServer(false),
99 didFindTableStatus(false),
101 lastSlotAttemptedToSend(NULL),
104 lastTransactionPartsSent(NULL),
105 lastPendingSendArbitrationEntriesToDelete(NULL),
107 committedKeyValueTable(NULL),
108 speculatedKeyValueTable(NULL),
109 pendingTransactionSpeculatedKeyValueTable(NULL),
110 liveNewKeyTable(NULL),
111 lastMessageTable(NULL),
112 rejectedMessageWatchVectorTable(NULL),
113 arbitratorTable(NULL),
114 liveAbortTable(NULL),
115 newTransactionParts(NULL),
116 newCommitParts(NULL),
117 lastArbitratedTransactionNumberByArbitratorTable(NULL),
118 liveTransactionBySequenceNumberTable(NULL),
119 liveTransactionByTransactionIdTable(NULL),
120 liveCommitsTable(NULL),
121 liveCommitsByKeyTable(NULL),
122 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123 rejectedSlotVector(NULL),
124 pendingTransactionQueue(NULL),
125 pendingSendArbitrationRounds(NULL),
126 pendingSendArbitrationEntriesToDelete(NULL),
127 transactionPartsSent(NULL),
128 outstandingTransactionStatus(NULL),
129 liveAbortsGeneratedByLocal(NULL),
130 offlineTransactionsCommittedAndAtServer(NULL),
131 localCommunicationTable(NULL),
132 lastTransactionSeenFromMachineFromServer(NULL),
133 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134 lastInsertedNewKey(false),
141 * Init all the stuff needed for table usage
// Allocates the helper objects and every lookup table / queue used by the Table,
// then derives numberOfSlots from the slot buffer and computes the resize threshold.
// NOTE(review): the enclosing function header is not visible in this excerpt.
144 // Init helper objects
145 random = new Random();
146 buffer = new SlotBuffer();
// Key/value state and per-machine bookkeeping tables.
149 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155 arbitratorTable = new Hashtable<IoTString *, int64_t>();
156 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
// NOTE(review): the mapped type here is a Hashtable by value, while acceptDataFromLocal
// reads an entry of this table into a Hashtable<int64_t, Commit *> * - confirm against
// the member declaration in the header.
162 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165 rejectedSlotVector = new Vector<int64_t>();
166 pendingTransactionQueue = new Vector<Transaction *>();
167 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// setResizeThreshold() reads both numberOfSlots and random, so it must run after
// the two assignments it depends on.
179 numberOfSlots = buffer->capacity();
180 setResizeThreshold();
184 * Initialize the table by inserting a table status as the first entry
185 * into the block chain; also initialize the crypto stuff.
// Bootstraps a new table: initializes the cloud security layer, builds slot
// number 1 with a TableStatus sized to numberOfSlots, and pushes that slot to the
// server. The returned slot array is fed to validateAndUpdate(); the remaining
// path throws Error("Error on initialization").
187 void Table::initTable() {
188 cloud->initSecurity();
190 // Create the first insertion into the block chain which is the table status
191 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
// The slot just built consumed the current local sequence number.
192 localSequenceNumber++;
193 TableStatus *status = new TableStatus(s, numberOfSlots);
195 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// NOTE(review): the condition guarding this allocation is not visible here.
// sendSlotsToServer allocates a replacement array when putSlot returns NULL;
// presumably the same test applies - confirm.
198 array = new Array<Slot *>(1);
200 // update local block chain
201 validateAndUpdate(array, true);
202 } else if (array->length() == 1) {
203 // in case we did push the slot BUT we failed to init it
204 validateAndUpdate(array, true);
206 throw new Error("Error on initialization");
211 * Rebuild the table from scratch by pulling the latest block chain
// Fetches every slot after sequenceNumber from the cloud, applies them with
// validateAndUpdate(..., true), then refreshes live transactions and status.
214 void Table::rebuild() {
215 // Just pull the latest slots from the server
216 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217 validateAndUpdate(newslots, true);
219 updateLiveTransactionsAndStatus();
// Records (hostName, portNumber) in localCommunicationTable under the given
// arbitrator machine id, replacing or adding the entry via put().
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the value stored for key in arbitratorTable (the arbitrator machine id).
// NOTE(review): the result for a key that is absent depends on Hashtable::get -
// confirm what callers should expect in that case.
226 int64_t Table::getArbitrator(IoTString *key) {
227 return arbitratorTable->get(key);
// NOTE(review): only the signature is visible in this excerpt; document what is
// released or shut down once the body has been confirmed.
230 void Table::close() {
// Looks key up in committedKeyValueTable and returns the stored value.
// NOTE(review): the handling of a missing key is not visible here - presumably a
// NULL check precedes the return; confirm.
234 IoTString *Table::getCommitted(IoTString *key) {
235 KeyValue *kv = committedKeyValueTable->get(key);
238 return kv->getValue();
// Returns the value for key, consulting three tables in this order:
// pendingTransactionSpeculatedKeyValueTable, speculatedKeyValueTable,
// committedKeyValueTable.
// NOTE(review): the conditions between the lookups are not visible here;
// presumably each later table is consulted only when the previous lookup
// produced nothing - confirm.
244 IoTString *Table::getSpeculative(IoTString *key) {
245 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248 kv = speculatedKeyValueTable->get(key);
252 kv = committedKeyValueTable->get(key);
256 return kv->getValue();
// Reads the committed value for key and registers it as a guard on the pending
// transaction builder, so the transaction records the value that was read.
// Throws Error("Key not Found.") when key has no entry in arbitratorTable, and
// Error("Not all Key Values Match Arbitrator.") when the builder's arbitrator
// check fails for this key's arbitrator.
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263 KeyValue *kv = committedKeyValueTable->get(key);
265 if (!arbitratorTable->contains(key)) {
266 throw new Error("Key not Found.");
269 // Make sure new key value pair matches the current arbitrator
270 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271 // TODO: Maybe not throw an error
272 throw new Error("Not all Key Values Match Arbitrator.");
// Guard on the value that was read, and hand that value back to the caller.
276 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277 return kv->getValue();
// Guard with a NULL value - presumably the path taken when no committed entry
// exists (the branch condition is not visible here; confirm).
279 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Speculative counterpart of getCommittedAtomic: validates the key and its
// arbitrator, looks the key up in the pending-speculated, speculated and
// committed tables (in that order), and registers the value read as a guard on
// the pending transaction builder.
// Throws Error("Key not Found.") or Error("Not all Key Values Match Arbitrator.")
// under the same conditions as getCommittedAtomic.
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285 if (!arbitratorTable->contains(key)) {
286 throw new Error("Key not Found.");
289 // Make sure new key value pair matches the current arbitrator
290 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291 // TODO: Maybe not throw an error
292 throw new Error("Not all Key Values Match Arbitrator.");
// NOTE(review): the conditions between these lookups are not visible here;
// presumably each fallback runs only when the previous lookup found nothing.
295 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298 kv = speculatedKeyValueTable->get(key);
302 kv = committedKeyValueTable->get(key);
306 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307 return kv->getValue();
309 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Pulls slots newer than sequenceNumber from the cloud, applies them with
// validateAndUpdate(..., false) and refreshes live transactions and status.
// If an Exception is caught, iterates over every machine id registered in
// localCommunicationTable.
// NOTE(review): the loop body and the return statements are not visible here;
// presumably the loop falls back to updateFromLocal(m) - confirm.
314 bool Table::update() {
316 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317 validateAndUpdate(newSlots, false);
321 updateLiveTransactionsAndStatus();
324 } catch (Exception *e) {
325 for (int64_t m : localCommunicationTable->keySet()) {
// Attempts to register keyName with machineId as its arbitrator by sending a
// NewKey entry to the server through sendToServer().
333 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
// NOTE(review): this condition is true when the key has NO arbitrator, yet the
// comment below says one already exists - the negation looks inverted; confirm
// against the intended behavior before relying on this branch.
335 if (!arbitratorTable->contains(keyName)) {
336 // There is already an arbitrator
340 NewKey *newKey = new NewKey(NULL, keyName, machineId);
342 if (sendToServer(newKey)) {
343 // If successfully inserted
// Begins a fresh transaction by replacing pendingTransactionBuilder with a new
// PendingTransaction for this machine.
// NOTE(review): the previous builder object is not deleted in the visible code -
// confirm ownership to rule out a leak.
349 void Table::startTransaction() {
350 // Create a new transaction, invalidates any old pending transactions.
351 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds a key/value update to the pending transaction.
// Throws Error("Key not Found.") when key has no entry in arbitratorTable, and
// Error("Not all Key Values Match Arbitrator.") when the builder's arbitrator
// check fails for this key's arbitrator.
354 void Table::addKV(IoTString *key, IoTString *value) {
356 // Make sure it is a valid key
357 if (!arbitratorTable->contains(key)) {
358 throw new Error("Key not Found.");
361 // Make sure new key value pair matches the current arbitrator
362 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
363 // TODO: Maybe not throw an error
364 throw new Error("Not all Key Values Match Arbitrator.");
367 // Add the key value to this transaction
368 KeyValue *kv = new KeyValue(key, value);
369 pendingTransactionBuilder->addKV(kv);
// Finalizes the pending transaction and returns its TransactionStatus.
// - With no key/value updates, returns a new StatusNoEffect status (arbitrator -1).
// - Otherwise stamps the builder with the next local transaction sequence
//   number, creates the Transaction with a StatusPending status, and either
//   queues it (arbitrator is another machine) or arbitrates on it locally.
// - On ServerException, walks pendingTransactionQueue and tries to deliver each
//   transaction over local communication, skipping arbitrators that already
//   failed during this call.
372 TransactionStatus *Table::commitTransaction() {
374 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
375 // transaction with no updates will have no effect on the system
376 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
379 // Set the local transaction sequence number and increment
380 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
381 localTransactionSequenceNumber++;
383 // Create the transaction status
384 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
386 // Create the new transaction
387 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
388 newTransaction->setTransactionStatus(transactionStatus);
390 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
391 // Add it to the queue and invalidate the builder for safety
392 pendingTransactionQueue->add(newTransaction);
// Presumably the else-branch (this machine is the arbitrator) - the branch
// header is not visible here; confirm.
394 arbitrateOnLocalTransaction(newTransaction);
395 updateLiveStateFromLocal();
// The builder is replaced so the committed transaction cannot be modified further.
398 pendingTransactionBuilder = new PendingTransaction(localMachineId);
402 } catch (ServerException *e) {
404 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
405 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
406 Transaction *transaction = iter->next();
408 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
409 // Already contacted this client so ignore all attempts to contact this client
410 // to preserve ordering for arbitrator
// NOTE(review): sendReturn is declared by value but accessed with '->' below;
// confirm Pair provides operator-> or that '.' was intended.
414 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
416 if (sendReturn->getFirst()) {
417 // Failed to contact over local
418 arbitratorTriedAndFailed->add(transaction->getArbitrator());
420 // Successful contact or should not contact
422 if (sendReturn->getSecond()) {
430 updateLiveStateFromLocal();
432 return transactionStatus;
436 * Recalculate the new resize threshold
// Recomputes bufferResizeThreshold from numberOfSlots: the base is
// Table_RESIZE_THRESHOLD * numberOfSlots (truncated to int) minus 1, plus a
// random offset drawn with random->nextInt(numberOfSlots - resizeLower).
// NOTE(review): assumes nextInt(bound) returns a value in [0, bound) - confirm
// against the Random class.
438 void Table::setResizeThreshold() {
439 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
440 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber, the counter this class
// increments each time it builds a slot or sends local data.
443 int64_t Table::getLocalSequenceNumber() {
444 return localSequenceNumber;
// NOTE(review): this file-scope variable has the same name as the
// lastInsertedNewKey member that both constructors initialize to false; confirm
// which one sendToServer is meant to use and remove the other.
448 bool lastInsertedNewKey = false;
// Pushes pending work to the server: queued transactions, pending arbitration
// rounds and, when newKey is non-NULL, a new-key entry.
//
// Phase 1 (only when hadPartialSendToServer is set): re-fetches the slots after
// sequenceNumber and reconciles the outcome of the previous, partially sent slot
// using the saved last* members.
// Phase 2: loops while there is anything left to send, building a slot with
// fillSlot(), sending it with sendSlotsToServer(), and updating transaction and
// arbitration bookkeeping according to whether the slot was inserted.
//
// The visible return statement yields (newKey == NULL).
// NOTE(review): many branch headers and closing braces are not visible in this
// excerpt, so the nesting described by the comments below should be confirmed.
450 bool Table::sendToServer(NewKey *newKey) {
452 bool fromRetry = false;
// A previous call ended in a partial send: ask the server what it holds after
// sequenceNumber before sending anything new.
455 if (hadPartialSendToServer) {
456 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
457 if (newSlots->length() == 0) {
// Nothing newer came back, so the last attempted slot is sent again with the
// size and new-key flag saved from that attempt.
459 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
461 if (sendSlotsReturn->getFirst()) {
462 if (newKey != NULL) {
// NOTE(review): if getKey() returns IoTString *, '==' compares pointers rather
// than string contents - confirm this is intended.
463 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
468 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
469 transaction->resetServerFailure();
471 // Update which transactions parts still need to be sent
472 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
474 // Add the transaction status to the outstanding list
475 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
477 // Update the transaction status
478 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
480 // Check if all the transaction parts were successfully sent and if so then remove it from pending
481 if (transaction->didSendAllParts()) {
482 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
483 pendingTransactionQueue->remove(transaction);
488 newSlots = sendSlotsReturn->getThird();
// Look for the previously attempted slot among the returned slots: first as a
// slot authored by this machine, then via a LastMessage entry naming it.
490 bool isInserted = false;
491 for (uint si = 0; si < newSlots->length(); si++) {
492 Slot *s = newSlots->get(si);
493 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
499 for (uint si = 0; si < newSlots->length(); si++) {
500 Slot *s = newSlots->get(si);
505 // Process each entry in the slot
506 for (Entry *entry : s->getEntries()) {
507 if (entry->getType() == TypeLastMessage) {
508 LastMessage *lastMessage = (LastMessage *)entry;
509 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
518 if (newKey != NULL) {
519 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
524 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
525 transaction->resetServerFailure();
527 // Update which transactions parts still need to be sent
528 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
530 // Add the transaction status to the outstanding list
531 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
533 // Update the transaction status
534 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
536 // Check if all the transaction parts were successfully sent and if so then remove it from pending
537 if (transaction->didSendAllParts()) {
538 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
539 pendingTransactionQueue->remove(transaction);
541 transaction->resetServerFailure();
542 // Set the transaction sequence number back to nothing
543 if (!transaction->didSendAPartToServer()) {
544 transaction->setSequenceNumber(-1);
551 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
552 transaction->resetServerFailure();
553 // Set the transaction sequence number back to nothing
554 if (!transaction->didSendAPartToServer()) {
555 transaction->setSequenceNumber(-1);
559 if (sendSlotsReturn->getThird()->length() != 0) {
560 // insert into the local block chain
561 validateAndUpdate(sendSlotsReturn->getThird(), true);
// Same reconciliation as above, applied to the slots fetched by getSlots()
// (presumably the branch where the server did return newer slots - confirm).
565 bool isInserted = false;
566 for (uint si = 0; si < newSlots->length(); si++) {
567 Slot *s = newSlots->get(si);
568 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
574 for (uint si = 0; si < newSlots->length(); si++) {
575 Slot *s = newSlots->get(si);
580 // Process each entry in the slot
581 for (Entry *entry : s->getEntries()) {
583 if (entry->getType() == TypeLastMessage) {
584 LastMessage *lastMessage = (LastMessage *)entry;
585 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
594 if (newKey != NULL) {
595 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
600 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
601 transaction->resetServerFailure();
603 // Update which transactions parts still need to be sent
604 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
606 // Add the transaction status to the outstanding list
607 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
609 // Update the transaction status
610 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
612 // Check if all the transaction parts were successfully sent and if so then remove it from pending
613 if (transaction->didSendAllParts()) {
614 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
615 pendingTransactionQueue->remove(transaction);
617 transaction->resetServerFailure();
618 // Set the transaction sequence number back to nothing
619 if (!transaction->didSendAPartToServer()) {
620 transaction->setSequenceNumber(-1);
625 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
626 transaction->resetServerFailure();
627 // Set the transaction sequence number back to nothing
628 if (!transaction->didSendAPartToServer()) {
629 transaction->setSequenceNumber(-1);
634 // insert into the local block chain
635 validateAndUpdate(newSlots, true);
638 } catch (ServerException *e) {
645 // While we have stuff that needs inserting into the block chain
646 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
650 if (hadPartialSendToServer) {
651 throw new Error("Should Be error free");
656 // If there is a new key with same name then end
657 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot is chained to the current head: it takes the next sequence
// number and carries the HMAC of the slot stored at sequenceNumber.
662 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
663 localSequenceNumber++;
665 // Try to fill the slot with data
666 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
667 bool needsResize = fillSlotsReturn->getFirst();
668 int newSize = fillSlotsReturn->getSecond();
669 bool insertedNewKey = fillSlotsReturn->getThird();
672 // Reset which transaction to send
673 for (Transaction *transaction : transactionPartsSent->keySet()) {
674 transaction->resetNextPartToSend();
676 // Set the transaction sequence number back to nothing
677 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
678 transaction->setSequenceNumber(-1);
682 // Clear the sent data since we are trying again
683 pendingSendArbitrationEntriesToDelete->clear();
684 transactionPartsSent->clear();
686 // We needed a resize so try again
687 fillSlot(slot, true, newKey);
// Remember this attempt so that a later call can reconcile after a partial send.
690 lastSlotAttemptedToSend = slot;
691 lastIsNewKey = (newKey != NULL);
692 lastInsertedNewKey = insertedNewKey;
693 lastNewSize = newSize;
// Copies are taken because the live containers are cleared further down.
695 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
696 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
699 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
701 if (sendSlotsReturn->getFirst()) {
703 // Did insert into the block chain
705 if (insertedNewKey) {
706 // This slot was what was inserted not a previous slot
708 // New Key was successfully inserted into the block chain so dont want to insert it again
712 // Remove the aborts and commit parts that were sent from the pending to send queue
713 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
714 ArbitrationRound *round = iter->next();
715 round->removeParts(pendingSendArbitrationEntriesToDelete);
717 if (round->isDoneSending()) {
718 // Sent all the parts
723 for (Transaction *transaction : transactionPartsSent->keySet()) {
724 transaction->resetServerFailure();
726 // Update which transactions parts still need to be sent
727 transaction->removeSentParts(transactionPartsSent->get(transaction));
729 // Add the transaction status to the outstanding list
730 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
732 // Update the transaction status
733 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
735 // Check if all the transaction parts were successfully sent and if so then remove it from pending
736 if (transaction->didSendAllParts()) {
737 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
738 pendingTransactionQueue->remove(transaction);
742 // Reset which transaction to send
743 for (Transaction *transaction : transactionPartsSent->keySet()) {
744 transaction->resetNextPartToSend();
746 // Set the transaction sequence number back to nothing
747 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
748 transaction->setSequenceNumber(-1);
753 // Clear the sent data in preparation for next send
754 pendingSendArbitrationEntriesToDelete->clear();
755 transactionPartsSent->clear();
757 if (sendSlotsReturn->getThird()->length() != 0) {
758 // insert into the local block chain
759 validateAndUpdate(sendSlotsReturn->getThird(), true);
763 } catch (ServerException *e) {
// NOTE(review): ServerException is used as a type in the catch clause above, so
// 'ServerException->TypeInputTimeout' applies '->' to a type name; confirm the
// intended constant (e.g. a scoped or prefixed enumerator).
765 if (e->getType() != ServerException->TypeInputTimeout) {
766 // Nothing was able to be sent to the server so just clear these data structures
767 for (Transaction *transaction : transactionPartsSent->keySet()) {
768 transaction->resetNextPartToSend();
770 // Set the transaction sequence number back to nothing
771 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
772 transaction->setSequenceNumber(-1);
776 // There was a partial send to the server
777 hadPartialSendToServer = true;
779 // Nothing was able to be sent to the server so just clear these data structures
780 for (Transaction *transaction : transactionPartsSent->keySet()) {
781 transaction->resetNextPartToSend();
782 transaction->setServerFailure();
786 pendingSendArbitrationEntriesToDelete->clear();
787 transactionPartsSent->clear();
792 return newKey == NULL;
// Asks the machine identified by machineId, over local communication, for
// arbitration data newer than the last local sequence number seen from it, and
// processes the Abort and CommitPart entries found in the reply.
// The request payload is sized for one int32_t plus one int64_t.
// NOTE(review): the return statements are not visible in this excerpt.
795 bool Table::updateFromLocal(int64_t machineId) {
// NOTE(review): the Pair is held by value but compared with NULL and accessed
// with '->' below; confirm the Pair type supports both.
796 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
797 if (localCommunicationInformation == NULL) {
798 // Cant talk to that device locally so do nothing
802 // Get the size of the send data
803 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
// -1 stands for "nothing seen yet" from this arbitrator.
// NOTE(review): the table maps int64_t to int64_t, so the NULL test below
// compares a number with 0 - confirm this is the intended "absent" check.
805 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
806 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
807 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
810 Array<char> *sendData = new Array<char>(sendDataSize);
811 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
814 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
818 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
819 localSequenceNumber++;
821 if (returnData == NULL) {
822 // Could not contact server
// Reply layout as decoded here: an int entry count, then one type byte per
// entry followed by that entry's encoded form.
827 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
828 int numberOfEntries = bbDecode->getInt();
830 for (int i = 0; i < numberOfEntries; i++) {
831 char type = bbDecode->get();
832 if (type == TypeAbort) {
// NOTE(review): the cast targets Abort (by value) while the result is stored in
// an Abort *; the same pattern is used for CommitPart below - confirm that a
// pointer cast was intended.
833 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
835 } else if (type == TypeCommitPart) {
836 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
837 processEntry(commitPart);
841 updateLiveStateFromLocal();
// Sends every part of transaction directly to its arbitrator over local
// communication and applies the arbitrator's reply.
// Returns Pair(true, false) when there is no local communication entry for the
// arbitrator or when no reply data comes back, and Pair(false, true) at the end
// of the normal path. commitTransaction() treats a true first element as
// "failed to contact over local".
846 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
848 // Get the devices local communications
849 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
851 if (localCommunicationInformation == NULL) {
852 // Cant talk to that device locally so do nothing
853 return Pair<bool, bool>(true, false);
856 // Get the size of the send data
// One int64_t (last sequence number seen) + one int32_t (part count) + all parts.
857 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
858 for (TransactionPart *part : transaction->getParts()->values()) {
859 sendDataSize += part->getSize();
// -1 stands for "nothing seen yet" from this arbitrator.
862 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
863 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
864 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
867 // Make the send data size
868 Array<char> *sendData = new Array<char>(sendDataSize);
// NOTE(review): every other call site in this file uses ByteBuffer_wrap(...);
// 'ByteBuffer.wrap' applies '.' to a type name - confirm and align.
869 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
872 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
873 bbEncode->putInt(transaction->getParts()->size());
874 for (TransactionPart *part : transaction->getParts()->values()) {
875 part->encode(bbEncode);
880 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
881 localSequenceNumber++;
883 if (returnData == NULL) {
884 // Could not contact server
885 return Pair<bool, bool>(true, false);
// Reply layout as decoded here: didCommit byte, couldArbitrate byte, int entry
// count, then one type byte per entry followed by the encoded entry.
889 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
890 bool didCommit = bbDecode->get() == 1;
891 bool couldArbitrate = bbDecode->get() == 1;
892 int numberOfEntries = bbDecode->getInt();
893 bool foundAbort = false;
895 for (int i = 0; i < numberOfEntries; i++) {
896 char type = bbDecode->get();
897 if (type == TypeAbort) {
// NOTE(review): cast to Abort (by value) stored into an Abort * - confirm that a
// pointer cast was intended; same for CommitPart below.
898 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
// Matches an abort that refers to this very transaction from this machine.
900 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
905 } else if (type == TypeCommitPart) {
906 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
907 processEntry(commitPart);
911 updateLiveStateFromLocal();
913 if (couldArbitrate) {
// NOTE(review): status is declared by value but used with '->'; elsewhere
// getTransactionStatus() is dereferenced as a pointer - confirm the type.
914 TransactionStatus status = transaction->getTransactionStatus();
916 status->setStatus(TransactionStatus_StatusCommitted);
918 status->setStatus(TransactionStatus_StatusAborted);
921 TransactionStatus status = transaction->getTransactionStatus();
923 status->setStatus(TransactionStatus_StatusAborted);
925 status->setStatus(TransactionStatus_StatusCommitted);
929 return Pair<bool, bool>(false, true);
// Handles a request received from another machine over local communication.
// Request layout as decoded here: an int64 with the last arbitration sequence
// number the sender has seen, an int part count, then that many encoded
// TransactionParts. When parts are present, the transaction is rebuilt and
// arbitrated locally. The reply carries (for requests with parts) a didCommit
// byte and a couldArbitrate byte, followed by the count and encodings of the
// locally generated aborts and the commit parts gathered below.
// NOTE(review): the return statement is not visible in this excerpt.
932 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
935 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
936 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
937 int numberOfParts = bbDecode->getInt();
939 // If we did commit a transaction or not
940 bool didCommit = false;
941 bool couldArbitrate = false;
943 if (numberOfParts != 0) {
945 // decode the transaction
946 Transaction *transaction = new Transaction();
947 for (int i = 0; i < numberOfParts; i++) {
// NOTE(review): 'TransactionPart.decode' applies '.' to a type name and the cast
// targets TransactionPart by value; other decoders in this file are written as
// Abort_decode / CommitPart_decode - confirm and align.
949 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
950 transaction->addPartDecode(newPart);
953 // Arbitrate on transaction and pull relevant return data
954 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
955 couldArbitrate = localArbitrateReturn->getFirst();
956 didCommit = localArbitrateReturn->getSecond();
958 updateLiveStateFromLocal();
960 // Transaction was sent to the server so keep track of it to prevent double commit
961 if (transaction->getSequenceNumber() != -1) {
962 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
966 // The data to send back
967 int returnDataSize = 0;
968 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
970 // Get the aborts to send back
971 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
// NOTE(review): 'Collections->sort' uses '->' on what appears to be a type name;
// confirm the intended sort helper.
972 Collections->sort(abortLocalSequenceNumbers);
// The loop variable shadows the localSequenceNumber member inside this loop.
973 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
// Entries at or below the sender's last-seen number are tested here; the branch
// body is not visible - presumably they are skipped (confirm).
974 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
978 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
979 unseenArbitrations->add(abort);
980 returnDataSize += abort->getSize();
983 // Get the commits to send back
984 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
985 if (commitForClientTable != NULL) {
986 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
987 Collections->sort(commitLocalSequenceNumbers);
989 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
990 Commit *commit = commitForClientTable->get(localSequenceNumber);
992 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
996 unseenArbitrations->addAll(commit->getParts()->values());
// NOTE(review): commitPart is declared by value but used with '->'; confirm a
// pointer element type was intended.
998 for (CommitPart commitPart : commit->getParts()->values()) {
999 returnDataSize += commitPart->getSize();
1004 // Number of arbitration entries to decode
1005 returnDataSize += 2 * sizeof(int32_t);
1007 // bool of did commit or not
1008 if (numberOfParts != 0) {
1009 returnDataSize += sizeof(char);
1012 // Data to send Back
1013 Array<char> *returnData = new Array<char>(returnDataSize);
1014 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
// Flag bytes are only written when the request carried a transaction; the
// first put pair presumably encodes didCommit (its condition is not visible).
1016 if (numberOfParts != 0) {
1018 bbEncode->put((char)1);
1020 bbEncode->put((char)0);
1022 if (couldArbitrate) {
1023 bbEncode->put((char)1);
1025 bbEncode->put((char)0);
1029 bbEncode->putInt(unseenArbitrations->size());
1030 for (Entry *entry : unseenArbitrations) {
1031 entry->encode(bbEncode);
// Outside the loops above, so this increments the member counter.
1035 localSequenceNumber++;
// Sends slot to the server via cloud->putSlot(slot, newSize) and reports the
// outcome as ThreeTuple(inserted, lastTryInserted, array), where array holds
// the slots to apply locally.
// Throws Error("Server Error: Did not send any slots") when the server returns
// an empty array.
// NOTE(review): isNewKey is not referenced by any line visible in this excerpt.
1039 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
// The previous flag value is saved, but within the visible lines it is only
// referenced by the commented-out condition below.
1040 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1041 attemptedToSendToServer = true;
1043 bool inserted = false;
1044 bool lastTryInserted = false;
1046 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1047 if (array == NULL) {
// NOTE(review): the array is created without a size and then written at index 0;
// initTable uses new Array<Slot *>(1) for the same purpose - confirm that the
// default constructor provides room for one element.
1048 array = new Array<Slot *>();
1049 array->set(0, slot);
1050 rejectedSlotVector->clear();
1053 if (array->length() == 0) {
1054 throw new Error("Server Error: Did not send any slots");
1057 // if (attemptedToSendToServerTmp) {
1058 if (hadPartialSendToServer) {
// After a partial send, search the returned slots for evidence that our slot
// made it: either the slot itself or a LastMessage entry that names it.
1060 bool isInserted = false;
1061 for (Slot *s : array) {
1062 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1068 for (Slot *s : array) {
1073 // Process each entry in the slot
1074 for (Entry *entry : s->getEntries()) {
1076 if (entry->getType() == TypeLastMessage) {
1077 LastMessage *lastMessage = (LastMessage *)entry;
1079 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
// A slot that was not inserted is recorded in rejectedSlotVector by its
// sequence number.
1088 rejectedSlotVector->add(slot->getSequenceNumber());
1089 lastTryInserted = false;
1091 lastTryInserted = true;
1094 rejectedSlotVector->add(slot->getSequenceNumber());
1095 lastTryInserted = false;
1099 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1103 * Returns a tuple whose first element is true if a resize was needed but not requested
// Populates `slot` with, in order: a TableStatus entry when resizing, rejected
// message entries, mandatory rescue data, the optional new key entry, pending
// arbitration data, pending transaction parts, and finally optional rescue
// data.
//
// Returns (needsResizeButNotResizing, newSize, inserted). The first element is
// true only when the mandatory rescue needs a resize while `resize` is false;
// in that case the other two elements are 0 and false.
1105 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1107 if (liveSlotCount > bufferResizeThreshold) {
1108 resize = true; //Resize is forced
// When resizing, announce the new size through a TableStatus entry in this slot
1112 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1113 TableStatus *status = new TableStatus(slot, newSize);
1114 slot->addEntry(status);
1117 // Fill with rejected slots first before doing anything else
1118 doRejectedMessages(slot);
1120 // Do mandatory rescue of entries
1121 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1123 // Extract working variables (the tuple is held by value, so use '.')
1124 bool needsResize = mandatoryRescueReturn.getFirst();
1125 bool seenLiveSlot = mandatoryRescueReturn.getSecond();
1126 int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
1128 if (needsResize && !resize) {
1129 // We need to resize but we are not resizing, so report that to the caller
1130 return ThreeTuple<bool, int32_t, bool>(true, 0, false);
1133 bool inserted = false;
1134 if (newKeyEntry != NULL) {
1135 newKeyEntry->setSlot(slot);
1136 if (slot->hasSpace(newKeyEntry)) {
1138 slot->addEntry(newKeyEntry);
1143 // Clear the transactions, aborts and commits that were sent previously
1144 transactionPartsSent->clear();
1145 pendingSendArbitrationEntriesToDelete->clear();
1147 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1148 bool isFull = false;
1149 round->generateParts();
1150 Vector<Entry *> *parts = round->getParts();
1152 // Insert pending arbitration data
1153 for (Entry *arbitrationData : parts) {
1155 // If it is an abort then we need to set some information
// NOTE(review): `instanceof` is not C++; elsewhere this file tests
// entry->getType() against a Type* constant. The constant for aborts is not
// visible here, so the test is left as written.
1156 if (arbitrationData instanceof Abort) {
1157 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1160 if (!slot->hasSpace(arbitrationData)) {
1161 // No space so cant do anything else with these data entries
1166 // Add to this current slot and add it to entries to delete
1167 slot->addEntry(arbitrationData);
1168 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1176 if (pendingTransactionQueue->size() > 0) {
1177 Transaction *transaction = pendingTransactionQueue->get(0);
1178 // Set the transaction sequence number if it has yet to be inserted into the block chain
1179 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1180 transaction->setSequenceNumber(slot->getSequenceNumber());
1184 TransactionPart *part = transaction->getNextPartToSend();
1186 // Ran out of parts to send for this transaction so move on
1190 if (slot->hasSpace(part)) {
1191 slot->addEntry(part);
// Record which part numbers of this transaction went into the slot
1192 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1193 if (partsSent == NULL) {
1194 partsSent = new Vector<int32_t>();
1195 transactionPartsSent->put(transaction, partsSent);
1197 partsSent->add(part->getPartNumber());
1198 transactionPartsSent->put(transaction, partsSent);
1205 // Fill the remainder of the slot with rescue data
1206 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1208 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers stored
// in rejectedSlotVector. Does nothing when the vector is empty.
1211 void Table::doRejectedMessages(Slot *s) {
1212 if (!rejectedSlotVector->isEmpty()) {
1213 /* TODO: We should avoid generating a rejected message entry if
1214 * there is already a sufficient entry in the queue (e.g.,
1215 * equalsto value of true and same sequence number). */
1217 int64_t old_seqn = rejectedSlotVector->firstElement();
// Above the threshold a single entry covers the whole range from the first to
// the last rejected sequence number.
1218 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1219 int64_t new_seqn = rejectedSlotVector->lastElement();
1220 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
// prev_seqn stays -1 until a sequence number has been visited by the loop below
1223 int64_t prev_seqn = -1;
1225 /* Go through list of missing messages */
1226 for (; i < rejectedSlotVector->size(); i++) {
1227 int64_t curr_seqn = rejectedSlotVector->get(i);
1228 Slot *s_msg = buffer->getSlot(curr_seqn);
1231 prev_seqn = curr_seqn;
1233 /* Generate rejected message entry for missing messages */
1234 if (prev_seqn != -1) {
1235 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1238 /* Generate rejected message entries for present messages */
// Continues from wherever the previous loop left `i`; each remaining sequence
// number gets its own entry carrying the machine ID of the buffered slot.
1239 for (; i < rejectedSlotVector->size(); i++) {
1240 int64_t curr_seqn = rejectedSlotVector->get(i);
1241 Slot *s_msg = buffer->getSlot(curr_seqn);
1242 int64_t machineid = s_msg->getMachineID();
1243 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries from the oldest buffered slots into `slot`, scanning
// sequence numbers from oldestLiveSlotSequenceNumver up to (not including) the
// computed threshold.
//
// Returns (needsResize, seenLiveSlot, sequence number where the scan stopped).
// needsResize is true when an entry in the slot at firstIfFull does not fit
// into `slot`.
1250 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1251 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1252 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start scanning below the oldest sequence number the buffer reports
1253 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1254 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1257 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1258 bool seenLiveSlot = false;
1259 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1260 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1264 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
// Held as a pointer: it is initialised from buffer->getSlot() and used with '->' below
1265 Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
1266 // Push slot number forward
1267 if (!seenLiveSlot) {
1268 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1271 if (!previousSlot->isLive()) {
1275 // We have seen a live slot
1276 seenLiveSlot = true;
1278 // Get all the live entries for a slot
1279 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1281 // Iterate over all the live entries and try to rescue them
1282 for (Entry *liveEntry : liveEntries) {
1283 if (slot->hasSpace(liveEntry)) {
1285 // Enough space to rescue the entry
1286 slot->addEntry(liveEntry);
1287 } else if (currentSequenceNumber == firstIfFull) {
1288 //if there's no space but the entry is about to fall off the queue
// NOTE(review): leftover Java-style debug print; replace or remove once the
// project's logging facility for this file is settled.
1289 System->out->println("B"); //?
1290 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1297 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Uses whatever room is left in slot `s` for live entries of buffered slots,
// walking sequence numbers from `seqn` up to the buffer's newest one.
1300 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1301 /* now go through live entries from least to greatest sequence number until
1302 * either all live slots added, or the slot doesn't have enough room
1303 * for SKIP_THRESHOLD consecutive entries*/
1305 int64_t newestseqnum = buffer->getNewestSeqNum();
1307 for (; seqn <= newestseqnum; seqn++) {
1308 Slot *prevslot = buffer->getSlot(seqn);
1309 //Push slot number forward
1311 oldestLiveSlotSequenceNumver = seqn;
1313 if (!prevslot->isLive())
1315 seenliveslot = true;
1316 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
// Entries that fit are copied into `s`
1317 for (Entry *liveentry : liveentries) {
1318 if (s->hasSpace(liveentry))
1319 s->addEntry(liveentry);
// Compared against Table_SKIP_THRESHOLD; presumably this ends the rescue once
// too many entries failed to fit (see the comment at the top) — TODO confirm.
1322 if (skipcount > Table_SKIP_THRESHOLD)
1330 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and, if the checks
// pass, stores them in the local buffer and refreshes the derived state.
//
// newSlots             slots received from the server
// acceptUpdatesToLocal forwarded unchanged to processSlot for every slot
//
// Throws an Error pointer when the first new slot is not newer than
// sequenceNumber, or when, after a gap in sequence numbers, some machine in
// machineSet is still unaccounted for.
1332 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1334 // The cloud communication layer has checked slot HMACs already
1336 if (newSlots->length() == 0) {
1340 // Make sure all slots are newer than the newest slot recorded in sequenceNumber
1342 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1343 if (firstSeqNum <= sequenceNumber) {
1344 throw new Error("Server Error: Sent older slots!");
1347 // Create an object that can access both new slots and slots in our
1348 // local chain without committing slots to our local chain
1349 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1351 // Check that the HMAC chain is not broken
1352 checkHMACChain(indexer, newSlots);
1354 // Set to keep track of messages from clients; starts as the key set of lastMessageTable
1355 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1357 // Process each slots data
1358 for (Slot *slot : newSlots) {
1359 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1361 updateExpectedSize();
1364 // If the first new slot does not directly follow sequenceNumber there is a gap
1366 if (firstSeqNum != (sequenceNumber + 1)) {
1368 // Check the size of the slots that were sent down by the server.
1369 // Can only check the size if there was a gap
1370 checkNumSlots(newSlots->length());
1372 // Since there was a gap every machine must have pushed a slot or
1373 // must have a last message message. If not, the check below throws.
1375 if (!machineSet->isEmpty()) {
// NOTE(review): adding a Hashset pointer to a string literal does not build a
// message in C++; format the set through the project's string type instead.
1376 throw new Error("Missing record for machines: " + machineSet);
1380 // Update the size of our local block chain.
1383 // Commit new to slots to the local block chain.
1384 for (Slot *slot : newSlots) {
1386 // Insert this slot into our local block chain copy.
1387 buffer->putSlot(slot);
1389 // Keep track of how many slots are currently live (have live data
1394 // Get the sequence number of the latest slot in the system
1395 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1396 updateLiveStateFromServer();
1398 // No Need to remember after we pulled from the server
1399 offlineTransactionsCommittedAndAtServer->clear();
1401 // This is invalidated now
1402 hadPartialSendToServer = false;
// Refreshes the derived local state after slots arrived from the server. In
// order: processNewTransactionParts, arbitrateFromServer, updateCommittedTable,
// updateLiveTransactionsAndStatus, then both speculative-table updates.
1405 void Table::updateLiveStateFromServer() {
1406 // Process the new transaction parts
1407 processNewTransactionParts();
1409 // Do arbitration on new transactions that were received
1410 arbitrateFromServer();
1412 // Update all the committed keys
1413 bool didCommitOrSpeculate = updateCommittedTable();
1415 // Delete the transactions that are now dead
1416 updateLiveTransactionsAndStatus();
// The flag accumulates: it is passed to updateSpeculativeTable, OR-ed with its
// result, and then passed on to the pending-transaction speculative update.
1419 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1420 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Local counterpart of updateLiveStateFromServer: runs the same sequence of
// updates but without processNewTransactionParts and arbitrateFromServer.
1423 void Table::updateLiveStateFromLocal() {
1424 // Update all the committed keys
1425 bool didCommitOrSpeculate = updateCommittedTable();
1427 // Delete the transactions that are now dead
1428 updateLiveTransactionsAndStatus();
// The flag is passed to updateSpeculativeTable, OR-ed with its result, and
// then passed on to the pending-transaction speculative update.
1431 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1432 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Sets expectedsize, didFindTableStatus and currMaxSize from a table status.
// NOTE(review): the parameter `numberOfSlots` has the same name as the
// `numberOfSlots` that other methods of this class read and write; inside
// this function the parameter is the one referenced.
1435 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1436 int64_t prevslots = firstSequenceNumber;
1438 if (didFindTableStatus) {
// expectedsize becomes the smaller of firstSequenceNumber and numberOfSlots
1440 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1443 didFindTableStatus = true;
1444 currMaxSize = numberOfSlots;
// Keeps expectedsize from exceeding currMaxSize.
1447 void Table::updateExpectedSize() {
// Clamp to the current maximum
1450 if (expectedsize > currMaxSize) {
1451 expectedsize = currMaxSize;
1457 * Check the size of the block chain to make sure there are enough
1458 * slots sent back by the server. This is only called when we have a
1459 * gap between the slots that we have locally and the slots sent by
1460 * the server; therefore, in the slots sent by the server there will be
1461 * at least one table status message.
// Throws an Error pointer when the number of slots received differs from
// expectedsize.
1463 void Table::checkNumSlots(int numberOfSlots) {
1464 if (numberOfSlots != expectedsize) {
// NOTE(review): `literal + integer + literal` does not concatenate in C++;
// the message needs the project's string type or explicit formatting.
1465 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
// Records `newmaxsize` as the current maximum size (currMaxSize).
1469 void Table::updateCurrMaxSize(int newmaxsize) {
1470 currMaxSize = newmaxsize;
1475 * Update the size of the local buffer if it is needed.
// Applies currMaxSize as the size of the local buffer, resets
// didFindTableStatus and recomputes the resize threshold.
1477 void Table::commitNewMaxSize() {
1478 didFindTableStatus = false;
1480 // Resize the local slot buffer
1481 if (numberOfSlots != currMaxSize) {
1482 buffer->resize((int32_t)currMaxSize);
1485 // Change the number of local slots to the new size
1486 numberOfSlots = (int32_t)currMaxSize;
1488 // Recalculate the resize threshold since the size of the local
1489 // buffer has changed
1490 setResizeThreshold();
1494 * Process the new transaction parts from this latest round of slots
1495 * received from the server
// Moves the transaction parts collected in newTransactionParts into the live
// transaction tables, creating Transaction objects on demand, and then empties
// newTransactionParts.
1497 void Table::processNewTransactionParts() {
1499 if (newTransactionParts->size() == 0) {
1500 // Nothing new to process
1504 // Iterate through all the machine Ids that we received new parts for
1506 for (int64_t machineId : newTransactionParts->keySet()) {
1507 Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1509 // Iterate through all the parts for that machine Id
1510 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1511 TransactionPart *part = parts->get(partId);
// Parts at or below the last transaction number already arbitrated by this
// part's arbitrator are treated as dead.
// NOTE(review): `lastTransactionNumber` is an int64_t compared against NULL,
// so a stored 0 cannot be told apart from "no entry"; confirm how the
// table signals absence.
1513 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1514 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1515 // Set dead the transaction part
1520 // Get the transaction object for that sequence number
1521 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1523 if (transaction == NULL) {
1524 // This is a new transaction that we dont have so make a new one
1525 transaction = new Transaction();
1527 // Insert this new transaction into the live tables
1528 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1529 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1532 // Add that part to the transaction
1533 transaction->addPartDecode(part);
1537 // Clear all the new transaction parts in preparation for the next
1538 // time the server sends slots
1539 newTransactionParts->clear();
// Arbitrates, in ascending sequence-number order, the live transactions for
// which this machine is the arbitrator. Transactions whose guard holds are
// accumulated into a single Commit; the others each produce an Abort. Any
// resulting commit and aborts are queued as one ArbitrationRound in
// pendingSendArbitrationRounds.
1542 void Table::arbitrateFromServer() {
1544 if (liveTransactionBySequenceNumberTable->size() == 0) {
1545 // Nothing to arbitrate on so move on
1549 // Get the transaction sequence numbers and sort from oldest to newest
1550 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
// NOTE(review): `Collections->sort` is a Java-style call; confirm the
// project's sorting helper for Vector.
1551 Collections->sort(transactionSequenceNumbers);
1553 // Key value pairs written by the transactions accepted so far in this round
// (heap-allocated: it is created with `new` and used through '->' below)
1554 Hashtable<IoTString *, KeyValue *> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1556 // The last transaction arbitrated on
1557 int64_t lastTransactionCommitted = -1;
1558 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1560 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1561 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1565 // Check if this machine arbitrates for this transaction if not
1566 // then we cant arbitrate this transaction
1567 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one arbitrated on are checked here
1571 if (transactionSequenceNumber < lastSeqNumArbOn) {
1575 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1576 // We have seen this already locally so dont commit again
1581 if (!transaction->isComplete()) {
1582 // Will arbitrate in incorrect order if we continue so just break
1588 // update the largest transaction seen by arbitrator from server
1589 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1590 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1592 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1593 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1594 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1598 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1599 // Guard evaluated as true
1601 // Update the local changes so we can make the commit
1602 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1603 speculativeTableTmp->put(kv->getKey(), kv);
1606 // Update what the last transaction committed was for use in batch commit
1607 lastTransactionCommitted = transactionSequenceNumber;
1609 // Guard evaluated was false so create abort
1611 Abort *newAbort = new Abort(NULL,
1612 transaction->getClientLocalSequenceNumber(),
1613 transaction->getSequenceNumber(),
1614 transaction->getMachineId(),
1615 transaction->getArbitrator(),
1616 localArbitrationSequenceNumber);
1617 localArbitrationSequenceNumber++;
1618 generatedAborts->add(newAbort);
1620 // Insert the abort so we can process
1621 processEntry(newAbort);
1624 lastSeqNumArbOn = transactionSequenceNumber;
1627 Commit *newCommit = NULL;
1629 // If there is something to commit
1630 if (speculativeTableTmp->size() != 0) {
1631 // Create the commit and increment the commit sequence number
1632 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1633 localArbitrationSequenceNumber++;
1635 // Add all the new keys to the commit
1636 for (KeyValue *kv : speculativeTableTmp->values()) {
1637 newCommit->addKV(kv);
1640 // create the commit parts
1641 newCommit->createCommitParts();
1643 // Append all the commit parts to the end of the pending queue
1644 // waiting for sending to the server
1645 // Insert the commit so we can process it
1646 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1647 processEntry(commitPart);
1651 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1652 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1653 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction the last pending round is re-read, and the
// parts of its commit (if it has one) are processed.
1655 if (compactArbitrationData()) {
1656 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1657 if (newArbitrationRound->getCommit() != NULL) {
1658 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1659 processEntry(commitPart);
// Arbitrates a single transaction locally, without a round trip to the server.
//
// Returns a pair of flags:
//   (false, false)  this machine is not the arbitrator, the transaction is
//                   incomplete, or a newer transaction from that machine was
//                   already seen from the server
//   (true,  true)   the guard held and a commit was queued
//   (true,  false)  the guard failed and an abort was queued
1666 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1668 // Check if this machine arbitrates for this transaction if not then
1669 // we cant arbitrate this transaction
1670 if (transaction->getArbitrator() != localMachineId) {
1671 return Pair<bool, bool>(false, false);
1674 if (!transaction->isComplete()) {
1675 // Will arbitrate in incorrect order if we continue so just break
1677 return Pair<bool, bool>(false, false);
1680 if (transaction->getMachineId() != localMachineId) {
1681 // dont do this check for local transactions
1682 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1683 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1684 // We've have already seen this from the server
1685 return Pair<bool, bool>(false, false);
1690 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1691 // Guard evaluated as true Create the commit and increment the
1692 // commit sequence number
1693 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1694 localArbitrationSequenceNumber++;
1696 // Update the local changes so we can make the commit
1697 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1698 newCommit->addKV(kv);
1701 // create the commit parts
1702 newCommit->createCommitParts();
1704 // Append all the commit parts to the end of the pending queue
1705 // waiting for sending to the server
1706 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1707 pendingSendArbitrationRounds->add(arbitrationRound);
// After a successful compaction, process the parts of the commit held by the
// last pending round.
1709 if (compactArbitrationData()) {
1710 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1711 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1712 processEntry(commitPart);
1715 // Insert the commit so we can process it
1716 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1717 processEntry(commitPart);
// For locally created transactions, record the committed status
1721 if (transaction->getMachineId() == localMachineId) {
1722 TransactionStatus *status = transaction->getTransactionStatus();
1723 if (status != NULL) {
1724 status->setStatus(TransactionStatus_StatusCommitted);
1728 updateLiveStateFromLocal();
1729 return Pair<bool, bool>(true, true);
1731 if (transaction->getMachineId() == localMachineId) {
1732 // For locally created messages update the status
1733 // Guard evaluated was false so create abort
// (pointer, like the commit branch: it is NULL-checked and used with '->')
1734 TransactionStatus *status = transaction->getTransactionStatus();
1735 if (status != NULL) {
1736 status->setStatus(TransactionStatus_StatusAborted);
1739 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1742 Abort *newAbort = new Abort(NULL,
1743 transaction->getClientLocalSequenceNumber(),
1745 transaction->getMachineId(),
1746 transaction->getArbitrator(),
1747 localArbitrationSequenceNumber);
1748 localArbitrationSequenceNumber++;
1749 addAbortSet->add(newAbort);
1751 // Append the abort-only round to the end of the pending queue
1752 // waiting for sending to the server
1753 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1754 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): unlike arbitrateFromServer, getCommit() is dereferenced here
// without a NULL check although this round was created without a commit;
// confirm compactArbitrationData() only returns true when a commit is set.
1756 if (compactArbitrationData()) {
1757 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1758 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1759 processEntry(commitPart);
1764 updateLiveStateFromLocal();
1765 return Pair<bool, bool>(true, false);
1770 * Compacts the arbitration data by merging commits and aggregating
1771 * aborts so that a single large push of commits can be done instead
1772 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one.
// Walks backwards from the second-to-last round, merging aborts (and commits,
// when the earlier round has one) into the last round while the combined size
// stays within the MAX_PARTS limit.
1774 bool Table::compactArbitrationData() {
1775 if (pendingSendArbitrationRounds->size() < 2) {
1776 // Nothing to compact so do nothing
1780 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1781 if (lastRound->didSendPart()) {
// NOTE(review): despite its name, hadCommit is true when the last round has
// NO commit (getCommit() == NULL); confirm whether the test is inverted.
1785 bool hadCommit = (lastRound->getCommit() == NULL);
1786 bool gotNewCommit = false;
// Counts the last round itself plus every earlier round folded into it
1788 int numberToDelete = 1;
1789 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1790 ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1792 if (round->isFull() || round->didSendPart()) {
1793 // Stop since there is a part that cannot be compacted and we
1794 // need to compact in order
1798 if (round->getCommit() == NULL) {
1799 // Try compacting aborts only
1800 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
// NOTE(review): `ArbitrationRound->MAX_PARTS` (here and below) and
// `Commit->merge` still use '->' on a type name; replace with the project's
// spelling for static members once it is known.
1801 if (newSize > ArbitrationRound->MAX_PARTS) {
1802 // Cant compact since it would be too large
1805 lastRound->addAborts(round->getAborts());
1807 // Create a new larger commit
// (pointer: it is used with '->' and handed to setCommit below)
1808 Commit *newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1809 localArbitrationSequenceNumber++;
1811 // Create the commit parts so that we can count them
1812 newCommit->createCommitParts();
1814 // Calculate the new size of the parts
1815 int newSize = newCommit->getNumberOfParts();
1816 newSize += lastRound->getAbortsCount();
1817 newSize += round->getAbortsCount();
1819 if (newSize > ArbitrationRound->MAX_PARTS) {
1820 // Cant compact since it would be too large
1824 // Set the new compacted part
1825 lastRound->setCommit(newCommit);
1826 lastRound->addAborts(round->getAborts());
1827 gotNewCommit = true;
1833 if (numberToDelete != 1) {
1834 // If there is a compaction
1835 // Delete the previous pieces that are now in the new compacted piece
1836 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1837 pendingSendArbitrationRounds->clear();
1839 for (int i = 0; i < numberToDelete; i++) {
1840 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1844 // Add the new compacted into the pending to send list
1845 pendingSendArbitrationRounds->add(lastRound);
1847 // Should reinsert into the commit processor
1848 if (hadCommit && gotNewCommit) {
1857 * Update all the commits and the committed tables, sets dead the dead
// Assembles the commit parts collected in newCommitParts into Commit objects,
// then walks every arbitrator's commits in ascending sequence-number order and
// applies those not seen before to committedKeyValueTable and
// liveCommitsByKeyTable.
//
// Returns didProcessANewCommit, which is set to true once a commit not seen
// before has been applied.
1860 bool Table::updateCommittedTable() {
1862 if (newCommitParts->size() == 0) {
1863 // Nothing new to process
1867 // Iterate through all the machine Ids that we received new parts for
1868 for (int64_t machineId : newCommitParts->keySet()) {
1869 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1871 // Iterate through all the parts for that machine Id
1872 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1873 CommitPart *part = parts->get(partId);
1875 // Get the table of commits for the machine this part belongs to
1876 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1878 if (commitForClientTable == NULL) {
1879 // This is the first commit from this device
1880 commitForClientTable = new Hashtable<int64_t, Commit *>();
1881 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1884 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1886 if (commit == NULL) {
1887 // This is a new commit that we dont have so make a new one
1888 commit = new Commit();
1890 // Insert this new commit into the live tables
1891 commitForClientTable->put(part->getSequenceNumber(), commit);
1894 // Add that part to the commit
1895 commit->addPartDecode(part);
1899 // Clear all the new commits parts in preparation for the next time
1900 // the server sends slots
1901 newCommitParts->clear();
1903 // If we process a new commit keep track of it for future use
1904 bool didProcessANewCommit = false;
1906 // Process the commits one by one
1907 for (int64_t arbitratorId : liveCommitsTable->keySet()) {
1909 // Get all the commits for a specific arbitrator
1910 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1912 // Sort the commits in order
1913 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1914 Collections->sort(commitSequenceNumbers);
1916 // Get the last commit seen from this arbitrator (-1 when none is recorded)
1917 int64_t lastCommitSeenSequenceNumber = -1;
1918 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1919 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1922 // Go through each new commit one by one
1923 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1924 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1925 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1927 // Special processing if a commit is not complete
1928 if (!commit->isComplete()) {
1929 if (i == (commitSequenceNumbers->size() - 1)) {
1930 // If there is an incomplete commit and this commit is the
1931 // latest one seen then this commit cannot be processed and
1932 // there are no other commits
1935 // This is a commit that was already dead but parts of it
1936 // are still in the block chain (not flushed out yet).
1937 // Delete it and move on
1939 commitForClientTable->remove(commit->getSequenceNumber());
1944 // Update the last transaction that was updated if we can
1945 if (commit->getTransactionSequenceNumber() != -1) {
// NOTE(review): int64_t compared against NULL (here and in the repeated
// block below); a stored 0 is indistinguishable from "no entry".
1946 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1948 // Update the last transaction sequence number that the arbitrator arbitrated on
1949 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1950 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1954 // Update the last arbitration data that we have seen so far
1955 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1957 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1958 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1960 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1963 // Never seen any data from this arbitrator so record the first one
1964 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1967 // We have already seen this commit before, so only the bookkeeping
1968 // below is refreshed for it
1969 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1971 // Update the last transaction that was updated if we can
1972 if (commit->getTransactionSequenceNumber() != -1) {
1973 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1975 // Update the last transaction sequence number that the arbitrator arbitrated on
1976 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1977 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1984 // If we got here then this is a brand new commit and needs full processing
1986 // Get what commits should be edited, these are the commits that
1987 // have live values for their keys
1988 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1989 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1990 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1992 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
1994 // Update each previous commit that needs to be updated
1995 for (Commit *previousCommit : commitsToEdit) {
1997 // Only bother with live commits (TODO: Maybe remove this check)
1998 if (previousCommit->isLive()) {
2000 // Update which keys in the old commits are still live
2001 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2002 previousCommit->invalidateKey(kv->getKey());
2005 // if the commit is now dead then remove it
2006 if (!previousCommit->isLive()) {
// NOTE(review): this table is keyed by int64_t sequence number, but a
// Commit pointer is passed as the key; the earlier remove() in this
// function passes getSequenceNumber(). previousCommit may also belong
// to a different arbitrator's table — confirm the intended removal.
2007 commitForClientTable->remove(previousCommit);
2012 // Update the last seen sequence number from this arbitrator
2013 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2014 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2015 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2018 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2021 // We processed a new commit that we havent seen before
2022 didProcessANewCommit = true;
2024 // Update the committed table of keys and which commit is using which key
2025 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2026 committedKeyValueTable->put(kv->getKey(), kv);
2027 liveCommitsByKeyTable->put(kv->getKey(), commit);
2032 return didProcessANewCommit;
2036 * Create the speculative table from transactions that are still live
2037 * and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the live transactions,
// taken in ascending sequence-number order.
//
// didProcessNewCommits  when true, speculation restarts from scratch
// Returns false when there is no transaction left to speculate on past the
// last one already handled.
2039 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2040 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2041 // There is nothing to speculate on
2045 // Create a list of the transaction sequence numbers and sort them
2046 // from oldest to newest
2047 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2048 Collections->sort(transactionSequenceNumbersSorted);
// A "gap" here means the oldest live transaction is no longer the one that
// speculation last started from.
2050 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2053 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2054 // If there is a gap in the transaction sequence numbers then
2055 // there was a commit or an abort of a transaction OR there was a
2056 // new commit (Could be from offline commit) so a redo the
2057 // speculation from scratch
2059 // Start from scratch
2060 speculatedKeyValueTable->clear();
2061 lastTransactionSequenceNumberSpeculatedOn = -1;
2062 oldestTransactionSequenceNumberSpeculatedOn = -1;
2066 // Remember the front of the transaction list
2067 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2069 // Find where to start arbitration from
// Resumes right after the last transaction speculated on.
// NOTE(review): presumably indexOf yields -1 when the value is absent, making
// startIndex 0 after a reset — confirm Vector::indexOf's contract.
2070 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2072 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2073 // Make sure we are not out of bounds
2074 return false; // did not speculate
// Arbitrators that have an incomplete transaction in this pass
2077 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2078 bool didSkip = true;
2080 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2081 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2082 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2084 if (!transaction->isComplete()) {
2085 // If there is an incomplete transaction then there is nothing
2086 // we can do add this transactions arbitrator to the list of
2087 // arbitrators we should ignore
2088 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Transactions from an arbitrator recorded above are checked for here
2093 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2097 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2099 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2100 // Guard evaluated to true so update the speculative table
2101 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2102 speculatedKeyValueTable->put(kv->getKey(), kv);
2108 // Since there was a skip we need to redo the speculation next time around
2109 lastTransactionSequenceNumberSpeculatedOn = -1;
2110 oldestTransactionSequenceNumberSpeculatedOn = -1;
2113 // We did some speculation
2118 * Create the pending transaction speculative table from transactions
2119 * that are still in the pending transaction buffer
// Applies the key/value updates of queued pending transactions to
// pendingTransactionSpeculatedKeyValueTable.
//
// didProcessNewCommitsOrSpeculate: when true, or when the head of
//   pendingTransactionQueue is no longer firstPendingTransaction, the pending
//   speculative table is cleared and the bookmarks are reset.
//
// NOTE(review): the embedded line numbers jump, so the early exits and the
// closing braces of this function are not shown in this listing.
2121 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2122 if (pendingTransactionQueue->size() == 0) {
2123 // There is nothing to speculate on
2127 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2128 // need to reset on the pending speculation
2129 lastPendingTransactionSpeculatedOn = NULL;
2130 firstPendingTransaction = pendingTransactionQueue->get(0);
2131 pendingTransactionSpeculatedKeyValueTable->clear();
2134 // Find where to start arbitration from
// NOTE(review): at this point firstPendingTransaction equals queue element 0
// on every path, so startIndex looks like it is always 1 and element 0 is
// never applied. lastPendingTransactionSpeculatedOn is written in this
// function but never read here; the sibling updateSpeculativeTable resumes
// from its "last speculated on" bookmark. Confirm which bookmark is intended.
2135 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2137 if (startIndex >= pendingTransactionQueue->size()) {
2138 // Make sure we are not out of bounds
2142 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2143 Transaction *transaction = pendingTransactionQueue->get(i);
2145 lastPendingTransactionSpeculatedOn = transaction;
// Unlike updateSpeculativeTable, the guard also sees the pending speculative
// table as its third argument.
2147 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2148 // Guard evaluated to true so update the speculative table
2149 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2150 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2157 * Set dead and remove from the live transaction tables the
2158 * transactions that are dead
// Walks the live transactions and the outstanding transaction statuses and
// retires those whose sequence number is at or below the last transaction
// number recorded for their arbitrator: transactions are set dead and removed
// from liveTransactionByTransactionIdTable, statuses are marked committed.
//
// NOTE(review): the embedded line numbers jump, so some statements (for
// example the one after "Remove the transaction from the live table") and
// the closing braces are not shown in this listing.
2160 void Table::updateLiveTransactionsAndStatus() {
2162 // Go through each of the transactions
// NOTE(review): the entry value type is spelled `Transaction` here while the
// value is read into a `Transaction *`; the second loop spells its value type
// with a `*` - confirm.
2163 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2164 Transaction *transaction = iter->next()->getValue();
2166 // Check if the transaction is dead
// NOTE(review): lastTransactionNumber is an int64_t, so `!= NULL` is a
// comparison with 0; presumably get() yields 0 for an unknown arbitrator -
// confirm, since a genuine value of 0 would look the same.
2167 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2168 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2170 // Set dead the transaction
2171 transaction->setDead();
2173 // Remove the transaction from the live table
2175 liveTransactionByTransactionIdTable->remove(transaction->getId());
2179 // Go through each of the transactions
2180 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2181 TransactionStatus *status = iter->next()->getValue();
2183 // Check if the transaction is dead
2184 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2185 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so report it as committed.
2188 status->setStatus(TransactionStatus_StatusCommitted);
2197 * Process this slot, entry by entry. Also update the latest message sent by slot
// Records the slot as the latest message of its machine and then dispatches
// every entry in the slot to the processEntry overload for its type.
//
// indexer:              forwarded to the RejectedMessage handler.
// slot:                 the slot whose entries are processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage.
// machineSet:           forwarded to updateLastMessage and to the
//                       LastMessage handler.
//
// NOTE(review): the embedded line numbers jump, so some `case` labels, the
// statements between cases and the closing braces are not shown here.
2199 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2201 // Update the last message seen
2202 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2204 // Process each entry in the slot
2205 for (Entry *entry : slot->getEntries()) {
2206 switch (entry->getType()) {
2207 case TypeCommitPart:
2208 processEntry((CommitPart *)entry);
2211 processEntry((Abort *)entry);
2213 case TypeTransactionPart:
2214 processEntry((TransactionPart *)entry);
2217 processEntry((NewKey *)entry);
2219 case TypeLastMessage:
2220 processEntry((LastMessage *)entry, machineSet);
2222 case TypeRejectedMessage:
2223 processEntry((RejectedMessage *)entry, indexer);
2225 case TypeTableStatus:
// The table status handler also needs the sequence number of the slot.
2226 processEntry((TableStatus *)entry, slot->getSequenceNumber());
// NOTE(review): if getType() returns an integral or enum value, adding it to
// a string literal offsets the literal's address instead of appending text -
// confirm how Error expects its message to be built.
2229 throw new Error("Unrecognized type: " + entry->getType());
2235 * Update the last message that was sent for a machine Id
// Handles a LastMessage entry by forwarding its machine id and sequence
// number to updateLastMessage. acceptUpdatesToLocal is passed as false.
//
// entry:      the LastMessage entry; also passed on as the liveness object.
// machineSet: forwarded unchanged to updateLastMessage.
2237 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2238 // Update what the last message received by a machine was
2239 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2243 * Add the new key to the arbitrators table and update the set of live
2244 * new keys (in case of a rescued new key message)
// Handles a NewKey entry: maps the key to the entry's machine id in
// arbitratorTable and makes this entry the live NewKey for that key.
//
// entry: the NewKey entry being processed.
2246 void Table::processEntry(NewKey *entry) {
2247 // Update the arbitrator table with the new key information
2248 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2250 // Update what the latest live new key is
// put() hands back the entry previously stored for this key, if any.
2251 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2252 if (oldNewKey != NULL) {
2253 // Delete the old new key messages
2254 oldNewKey->setDead();
2259 * Process new table status entries and set dead the old ones as new
2260 * ones come in. Keeps track of the largest and smallest table status
2261 * seen in this current round of updating the local copy of the block
// Handles a TableStatus entry: propagates its slot count and makes it the
// live table status, setting the previous live status dead.
//
// entry: the TableStatus entry. Taken by pointer because it is dereferenced
//        with `->`, stored in the pointer member liveTableStatus, and
//        processSlot passes a `TableStatus *`.
// seq:   sequence number of the slot that carried the entry; forwarded to
//        initExpectedSize together with the slot count.
//
// NOTE(review): the embedded line numbers jump, so the closing braces of
// this function are not shown in this listing.
2264 void Table::processEntry(TableStatus *entry, int64_t seq) {
2265 int newNumSlots = entry->getMaxSlots();
2266 updateCurrMaxSize(newNumSlots);
2267 initExpectedSize(seq, newNumSlots);
2269 if (liveTableStatus != NULL) {
2270 // We have a larger table status so the old table status is no
2272 liveTableStatus->setDead();
2275 // Make this new table status the latest alive table status
2276 liveTableStatus = entry;
2280 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry. First checks the locally known slots in
// the range [oldSeqNum, newSeqNum] against the entry's machine id / equality
// flag and throws on a mismatch; then collects the remote machines whose last
// message is older than this entry and registers the entry with them.
//
// entry:   the RejectedMessage entry being processed.
// indexer: used to look up local slots by sequence number.
//
// NOTE(review): the embedded line numbers jump, so some statements (for
// example the handling of a missing slot and the empty-watch-set branch) and
// the closing braces are not shown in this listing.
2283 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2284 int64_t oldSeqNum = entry->getOldSeqNum();
2285 int64_t newSeqNum = entry->getNewSeqNum();
2286 bool isequal = entry->getEqual();
2287 int64_t machineId = entry->getMachineID();
2288 int64_t seq = entry->getSequenceNumber();
2290 // Check if we have messages that were supposed to be rejected in
2291 // our local block chain
2292 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2294 Slot *slot = indexer->getSlot(seqNum);
2297 // If we have this slot make sure that it was not supposed to be
2299 int64_t slotMachineId = slot->getMachineID();
// A violation is a slot whose "same machine" status disagrees with isequal.
2300 if (isequal != (slotMachineId == machineId)) {
// NOTE(review): seqNum is an int64_t, so `"..." + seqNum` offsets the
// literal's address rather than appending the number - confirm how Error
// expects its message to be built.
2301 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2306 // Create a list of clients to watch until they see this rejected
// NOTE(review): allocated with new; when the set stays empty no visible line
// hands it to the entry or frees it - confirm ownership.
2308 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2309 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2310 // Machine ID for the last message entry
2311 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2313 // We've seen it, don't need to continue to watch. Our next
2314 // message will implicitly acknowledge it.
2315 if (lastMessageEntryMachineId == localMachineId) {
// NOTE(review): lastMessageValue is declared as a Pair value but is used with
// `->` on the next line - confirm whether the table stores Pair pointers.
2319 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2320 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2322 if (entrySequenceNumber < seq) {
2323 // Add this rejected message to the set of messages that this
2324 // machine ID did not see yet
2325 addWatchVector(lastMessageEntryMachineId, entry);
2326 // This client did not see this rejected message yet so add it
2327 // to the watch set to monitor
2328 deviceWatchSet->add(lastMessageEntryMachineId);
2331 if (deviceWatchSet->isEmpty()) {
2332 // This rejected message has been seen by all the clients so
2335 // We need to watch this rejected message
2336 entry->setWatchSet(deviceWatchSet);
2341 * Check if this abort is live, if not then save it so we can kill it
2342 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry: marks a matching outstanding transaction status as
// aborted, records the abort as live (dropping it again if the target machine
// has already seen it), updates the per-arbitrator bookkeeping and removes
// the aborted transaction from the live transaction tables.
//
// entry: the Abort entry being processed.
//
// NOTE(review): the embedded line numbers jump, so some statements, `else`
// branches and the closing braces are not shown in this listing.
2344 void Table::processEntry(Abort *entry) {
2345 if (entry->getTransactionSequenceNumber() != -1) {
2346 // update the transaction status if it was sent to the server
2347 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2348 if (status != NULL) {
2349 status->setStatus(TransactionStatus_StatusAborted);
2353 // Abort has not been seen by the client it is for yet so we need to
2355 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2356 if (previouslySeenAbort != NULL) {
2357 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
// Aborts arbitrated by this machine are additionally indexed by the
// arbitrator's local sequence number.
2360 if (entry->getTransactionArbitrator() == localMachineId) {
2361 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get(...) is dereferenced
// without a NULL check - confirm an entry always exists for the transaction's
// machine id at this point.
2364 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2365 // The machine already saw this so it is dead
2367 liveAbortTable->remove(entry->getAbortId());
2369 if (entry->getTransactionArbitrator() == localMachineId) {
2370 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2375 // Update the last arbitration data that we have seen so far
2376 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2377 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2378 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2380 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2383 // Never seen any data from this arbitrator so record the first one
2384 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2387 // Set dead a transaction if we can
// The transaction id is the (machine id, client-local sequence number) pair.
2388 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2389 if (transactionToSetDead != NULL) {
2390 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2393 // Update the last transaction sequence number that the arbitrator
// NOTE(review): lastTransactionNumber is an int64_t, so `== NULL` is a
// comparison with 0; presumably get() yields 0 for an unknown arbitrator -
// confirm.
2395 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2396 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2398 if (entry->getTransactionSequenceNumber() != -1) {
2399 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2405 * Set dead the transaction part if that transaction is dead and keep
2406 * track of all new parts
// Handles a TransactionPart entry. Parts whose sequence number is at or below
// the last transaction number recorded for their arbitrator are treated as
// belonging to a dead transaction; other parts are stored in
// newTransactionParts under the entry's machine id and part id, and an older
// copy of the same part is set dead.
//
// entry: the TransactionPart entry being processed.
//
// NOTE(review): the embedded line numbers jump, so the body of the
// dead-transaction branch and the closing braces are not shown here.
2408 void Table::processEntry(TransactionPart *entry) {
2409 // Check if we have already seen this transaction and set it dead OR
2410 // if it is not alive
// NOTE(review): lastTransactionNumber is an int64_t, so `!= NULL` is a
// comparison with 0; presumably get() yields 0 for an unknown arbitrator -
// confirm.
2411 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2412 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2413 // This transaction is dead, it was already committed or aborted
2418 // This part is still alive
2419 Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2421 if (transactionPart == NULL) {
2422 // Dont have a table for this machine Id yet so make one
2423 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2424 newTransactionParts->put(entry->getMachineId(), transactionPart);
2427 // Update the part and set dead ones we have already seen (got a
2429 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2430 if (previouslySeenPart != NULL) {
2431 previouslySeenPart->setDead();
2436 * Process new commit entries and save them for future use. Delete duplicates
// Handles a CommitPart entry: raises the last transaction number recorded
// for the entry's machine id when the entry carries a newer one, then stores
// the part in newCommitParts under its machine id and part id, setting dead
// any older copy of the same part.
//
// entry: the CommitPart entry being processed.
//
// NOTE(review): the embedded line numbers jump, so the closing braces of
// this function are not shown in this listing.
2438 void Table::processEntry(CommitPart *entry) {
2439 // Update the last transaction that was updated if we can
2440 if (entry->getTransactionSequenceNumber() != -1) {
// NOTE(review): lastTransactionNumber is an int64_t, so `== NULL` below is a
// comparison with 0; presumably get() yields 0 for an unknown machine id -
// confirm.
2441 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2442 // Update the last transaction sequence number that the arbitrator
2444 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2445 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2449 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2450 if (commitPart == NULL) {
2451 // Don't have a table for this machine Id yet so make one
2452 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2453 newCommitParts->put(entry->getMachineId(), commitPart);
2455 // Update the part and set dead ones we have already seen (got a
2457 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2458 if (previouslySeenPart != NULL) {
2459 previouslySeenPart->setDead();
2464 * Update the last message seen table. Update and set dead the
2465 * appropriate RejectedMessages as clients see them. Updates the live
2466 * aborts, removes those that are dead and sets them dead. Check that
2467 * the last message seen is correct and that there is no mismatch of
2468 * our own last message or that other clients have not had a rollback
2469 * on the last message.
// Records (seqNum, liveness) as the last message of machineId and performs
// the bookkeeping that follows from it: the machine is removed from
// machineSet, rejected messages and aborts it has now seen are released, the
// previous last message is retired, and the sequence numbers are checked for
// a server-side mismatch or rollback.
//
// machineId:            machine the message came from.
// seqNum:               sequence number of that message.
// liveness:             the LastMessage or Slot object carrying the message.
// acceptUpdatesToLocal: when true, the mismatch checks for the local machine
//                       do not throw.
// machineSet:           set from which machineId is removed.
// Throws Error on an unrecognized liveness type, on a local sequence number
// mismatch, or on a rollback of a remote machine's sequence number.
//
// NOTE(review): the embedded line numbers jump, so some statements (for
// example the ones after "Remove it from our watchlist" and after the abort
// test), `else` lines and the closing braces are not shown in this listing.
2471 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2472 // We have seen this machine ID
2473 machineSet->remove(machineId);
2475 // Get the set of rejected messages that this machine Id has not seen yet
2476 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2477 // If there is a rejected message that this machine Id has not seen yet
2478 if (watchset != NULL) {
2479 // Go through each rejected message that this machine Id has not
2481 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2482 RejectedMessage *rm = rmit->next();
2483 // If this machine Id has seen this rejected message...
2484 if (rm->getSequenceNumber() <= seqNum) {
2485 // Remove it from our watchlist
2487 // Decrement machines that need to see this notification
2488 rm->removeWatcher(machineId);
2493 // Set dead the abort
// The iterator is held by pointer, like the other iterators in this file,
// because it is used with `->` below.
2494 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > *i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2495 Abort *abort = i->next()->getValue();
2496 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2499 if (abort->getTransactionArbitrator() == localMachineId) {
2500 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2504 if (machineId == localMachineId) {
2505 // Our own messages are immediately dead.
// NOTE(review): `instanceof` is not a C++ operator; this looks like a Java
// leftover - confirm how the runtime type of a Liveness is meant to be
// tested in this port.
2506 if (liveness instanceof LastMessage) {
2507 ((LastMessage *)liveness)->setDead();
2508 } else if (liveness instanceof Slot) {
2509 ((Slot *)liveness)->setDead();
2511 throw new Error("Unrecognized type");
2514 // Get the old last message for this device
// NOTE(review): lastMessageEntry is declared as a Pair value but is compared
// with NULL and used with `->` - confirm whether the table stores Pair
// pointers.
2515 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2516 if (lastMessageEntry == NULL) {
2517 // If no last message then there is nothing else to process
2521 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2522 Liveness *lastEntry = lastMessageEntry->getSecond();
2524 // If it is not our machine Id since we already set ours to dead
2525 if (machineId != localMachineId) {
2526 if (lastEntry instanceof LastMessage) {
2527 ((LastMessage *)lastEntry)->setDead();
2528 } else if (lastEntry instanceof Slot) {
2529 ((Slot *)lastEntry)->setDead();
2531 throw new Error("Unrecognized type");
2534 // Make sure the server is not playing any games
2535 if (machineId == localMachineId) {
2536 if (hadPartialSendToServer) {
2537 // We were not making any updates and we had a machine mismatch
2538 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
// NOTE(review): the operands are int64_t values, so `"..." + value` does not
// append text to the literal - confirm how Error expects its message to be
// built (same for the message a few lines below).
2539 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2542 // We were not making any updates and we had a machine mismatch
2543 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2544 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
// A remote machine must never report an older sequence number than before.
2548 if (lastMessageSeqNum > seqNum) {
2549 throw new Error("Server Error: Rollback on remote machine sequence number");
2555 * Add a rejected message entry to the watch set to keep track of
2556 * which clients have seen that rejected message entry and which have
// Adds a rejected message to the set kept for machineId in
// rejectedMessageWatchVectorTable, creating that set on first use.
//
// machineId: machine whose set receives the entry.
// entry:     the rejected message to add.
//
// NOTE(review): the embedded line numbers jump, so the closing braces of
// this function are not shown in this listing.
2559 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2560 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2561 if (entries == NULL) {
2562 // There is no set for this machine ID yet so create one
2563 entries = new Hashset<RejectedMessage *>();
2564 rejectedMessageWatchVectorTable->put(machineId, entries);
2566 entries->add(entry);
2570 * Check if the HMAC chain is not violated
// Checks each new slot against its predecessor: when the indexer knows the
// slot with the preceding sequence number, that slot's HMAC must equal the
// new slot's previous-HMAC field, otherwise an Error is thrown.
//
// indexer:  used to look up the predecessor of each new slot.
// newSlots: the slots to check.
//
// NOTE(review): the listing ends inside this function, so its remainder and
// closing braces are not visible here.
2572 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2573 for (int i = 0; i < newSlots->length(); i++) {
2574 Slot *currSlot = newSlots->get(i);
2575 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor is not an error; only a known one is compared.
2576 if (prevSlot != NULL &&
2577 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2578 throw new Error("Server Error: Invalid HMAC Chain");