3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
18 cloud(new CloudComm(this, baseurl, password, listeningPort)),
20 liveTableStatus(NULL),
21 pendingTransactionBuilder(NULL),
22 lastPendingTransactionSpeculatedOn(NULL),
23 firstPendingTransaction(NULL),
25 bufferResizeThreshold(0),
27 oldestLiveSlotSequenceNumver(1),
28 localMachineId(_localMachineId),
30 localTransactionSequenceNumber(0),
31 lastTransactionSequenceNumberSpeculatedOn(0),
32 oldestTransactionSequenceNumberSpeculatedOn(0),
33 localArbitrationSequenceNumber(0),
34 hadPartialSendToServer(false),
35 attemptedToSendToServer(false),
37 didFindTableStatus(false),
39 lastSlotAttemptedToSend(NULL),
42 lastTransactionPartsSent(NULL),
43 lastPendingSendArbitrationEntriesToDelete(NULL),
45 committedKeyValueTable(NULL),
46 speculatedKeyValueTable(NULL),
47 pendingTransactionSpeculatedKeyValueTable(NULL),
48 liveNewKeyTable(NULL),
49 lastMessageTable(NULL),
50 rejectedMessageWatchVectorTable(NULL),
51 arbitratorTable(NULL),
53 newTransactionParts(NULL),
55 lastArbitratedTransactionNumberByArbitratorTable(NULL),
56 liveTransactionBySequenceNumberTable(NULL),
57 liveTransactionByTransactionIdTable(NULL),
58 liveCommitsTable(NULL),
59 liveCommitsByKeyTable(NULL),
60 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61 rejectedSlotVector(NULL),
62 pendingTransactionQueue(NULL),
63 pendingSendArbitrationRounds(NULL),
64 pendingSendArbitrationEntriesToDelete(NULL),
65 transactionPartsSent(NULL),
66 outstandingTransactionStatus(NULL),
67 liveAbortsGeneratedByLocal(NULL),
68 offlineTransactionsCommittedAndAtServer(NULL),
69 localCommunicationTable(NULL),
70 lastTransactionSeenFromMachineFromServer(NULL),
71 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72 lastInsertedNewKey(false),
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82 liveTableStatus(NULL),
83 pendingTransactionBuilder(NULL),
84 lastPendingTransactionSpeculatedOn(NULL),
85 firstPendingTransaction(NULL),
87 bufferResizeThreshold(0),
89 oldestLiveSlotSequenceNumver(1),
90 localMachineId(_localMachineId),
92 localTransactionSequenceNumber(0),
93 lastTransactionSequenceNumberSpeculatedOn(0),
94 oldestTransactionSequenceNumberSpeculatedOn(0),
95 localArbitrationSequenceNumber(0),
96 hadPartialSendToServer(false),
97 attemptedToSendToServer(false),
99 didFindTableStatus(false),
101 lastSlotAttemptedToSend(NULL),
104 lastTransactionPartsSent(NULL),
105 lastPendingSendArbitrationEntriesToDelete(NULL),
107 committedKeyValueTable(NULL),
108 speculatedKeyValueTable(NULL),
109 pendingTransactionSpeculatedKeyValueTable(NULL),
110 liveNewKeyTable(NULL),
111 lastMessageTable(NULL),
112 rejectedMessageWatchVectorTable(NULL),
113 arbitratorTable(NULL),
114 liveAbortTable(NULL),
115 newTransactionParts(NULL),
116 newCommitParts(NULL),
117 lastArbitratedTransactionNumberByArbitratorTable(NULL),
118 liveTransactionBySequenceNumberTable(NULL),
119 liveTransactionByTransactionIdTable(NULL),
120 liveCommitsTable(NULL),
121 liveCommitsByKeyTable(NULL),
122 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123 rejectedSlotVector(NULL),
124 pendingTransactionQueue(NULL),
125 pendingSendArbitrationRounds(NULL),
126 pendingSendArbitrationEntriesToDelete(NULL),
127 transactionPartsSent(NULL),
128 outstandingTransactionStatus(NULL),
129 liveAbortsGeneratedByLocal(NULL),
130 offlineTransactionsCommittedAndAtServer(NULL),
131 localCommunicationTable(NULL),
132 lastTransactionSeenFromMachineFromServer(NULL),
133 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134 lastInsertedNewKey(false),
141 * Init all the stuff needed for for table usage
144 // Init helper objects
145 random = new Random();
146 buffer = new SlotBuffer();
149 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155 arbitratorTable = new Hashtable<IoTString *, int64_t>();
156 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
162 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165 rejectedSlotVector = new Vector<int64_t>();
166 pendingTransactionQueue = new Vector<Transaction *>();
167 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
178 numberOfSlots = buffer->capacity();
179 setResizeThreshold();
183 * Initialize the table by inserting a table status as the first entry
184 * into the table status also initialize the crypto stuff.
186 void Table::initTable() {
187 cloud->initSecurity();
189 // Create the first insertion into the block chain which is the table status
190 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
191 localSequenceNumber++;
192 TableStatus *status = new TableStatus(s, numberOfSlots);
194 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
197 array = new Array<Slot *>(1);
199 // update local block chain
200 validateAndUpdate(array, true);
201 } else if (array->length() == 1) {
202 // in case we did push the slot BUT we failed to init it
203 validateAndUpdate(array, true);
205 throw new Error("Error on initialization");
210 * Rebuild the table from scratch by pulling the latest block chain
213 void Table::rebuild() {
214 // Just pull the latest slots from the server
215 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
216 validateAndUpdate(newslots, true);
218 updateLiveTransactionsAndStatus();
221 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
222 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
225 int64_t Table::getArbitrator(IoTString *key) {
226 return arbitratorTable->get(key);
229 void Table::close() {
233 IoTString *Table::getCommitted(IoTString *key) {
234 KeyValue *kv = committedKeyValueTable->get(key);
237 return kv->getValue();
243 IoTString *Table::getSpeculative(IoTString *key) {
244 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
247 kv = speculatedKeyValueTable->get(key);
251 kv = committedKeyValueTable->get(key);
255 return kv->getValue();
261 IoTString *Table::getCommittedAtomic(IoTString *key) {
262 KeyValue *kv = committedKeyValueTable->get(key);
264 if (!arbitratorTable->contains(key)) {
265 throw new Error("Key not Found.");
268 // Make sure new key value pair matches the current arbitrator
269 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
270 // TODO: Maybe not throw en error
271 throw new Error("Not all Key Values Match Arbitrator.");
275 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
276 return kv->getValue();
278 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
283 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
284 if (!arbitratorTable->contains(key)) {
285 throw new Error("Key not Found.");
288 // Make sure new key value pair matches the current arbitrator
289 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
290 // TODO: Maybe not throw en error
291 throw new Error("Not all Key Values Match Arbitrator.");
294 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
297 kv = speculatedKeyValueTable->get(key);
301 kv = committedKeyValueTable->get(key);
305 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
306 return kv->getValue();
308 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
313 bool Table::update() {
315 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
316 validateAndUpdate(newSlots, false);
318 updateLiveTransactionsAndStatus();
320 } catch (Exception *e) {
321 for (int64_t m : localCommunicationTable->keySet()) {
329 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
331 if (!arbitratorTable->contains(keyName)) {
332 // There is already an arbitrator
335 NewKey *newKey = new NewKey(NULL, keyName, machineId);
337 if (sendToServer(newKey)) {
338 // If successfully inserted
344 void Table::startTransaction() {
345 // Create a new transaction, invalidates any old pending transactions.
346 pendingTransactionBuilder = new PendingTransaction(localMachineId);
349 void Table::addKV(IoTString *key, IoTString *value) {
351 // Make sure it is a valid key
352 if (!arbitratorTable->contains(key)) {
353 throw new Error("Key not Found.");
356 // Make sure new key value pair matches the current arbitrator
357 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
358 // TODO: Maybe not throw en error
359 throw new Error("Not all Key Values Match Arbitrator.");
362 // Add the key value to this transaction
363 KeyValue *kv = new KeyValue(key, value);
364 pendingTransactionBuilder->addKV(kv);
367 TransactionStatus *Table::commitTransaction() {
368 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
369 // transaction with no updates will have no effect on the system
370 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
373 // Set the local transaction sequence number and increment
374 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
375 localTransactionSequenceNumber++;
377 // Create the transaction status
378 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
380 // Create the new transaction
381 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
382 newTransaction->setTransactionStatus(transactionStatus);
384 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
385 // Add it to the queue and invalidate the builder for safety
386 pendingTransactionQueue->add(newTransaction);
388 arbitrateOnLocalTransaction(newTransaction);
389 updateLiveStateFromLocal();
392 pendingTransactionBuilder = new PendingTransaction(localMachineId);
396 } catch (ServerException *e) {
398 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
399 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
400 Transaction *transaction = iter->next();
402 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
403 // Already contacted this client so ignore all attempts to contact this client
404 // to preserve ordering for arbitrator
408 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
410 if (sendReturn->getFirst()) {
411 // Failed to contact over local
412 arbitratorTriedAndFailed->add(transaction->getArbitrator());
414 // Successful contact or should not contact
416 if (sendReturn->getSecond()) {
424 updateLiveStateFromLocal();
426 return transactionStatus;
430 * Recalculate the new resize threshold
432 void Table::setResizeThreshold() {
433 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
434 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
437 int64_t Table::getLocalSequenceNumber() {
438 return localSequenceNumber;
441 bool Table::sendToServer(NewKey *newKey) {
442 bool fromRetry = false;
444 if (hadPartialSendToServer) {
445 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
446 if (newSlots->length() == 0) {
448 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
450 if (sendSlotsReturn->getFirst()) {
451 if (newKey != NULL) {
452 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
457 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
458 transaction->resetServerFailure();
459 // Update which transactions parts still need to be sent
460 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
461 // Add the transaction status to the outstanding list
462 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
464 // Update the transaction status
465 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
467 // Check if all the transaction parts were successfully
468 // sent and if so then remove it from pending
469 if (transaction->didSendAllParts()) {
470 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
471 pendingTransactionQueue->remove(transaction);
475 newSlots = sendSlotsReturn->getThird();
476 bool isInserted = false;
477 for (uint si = 0; si < newSlots->length(); si++) {
478 Slot *s = newSlots->get(si);
479 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
485 for (uint si = 0; si < newSlots->length(); si++) {
486 Slot *s = newSlots->get(si);
491 // Process each entry in the slot
492 for (Entry *entry : s->getEntries()) {
493 if (entry->getType() == TypeLastMessage) {
494 LastMessage *lastMessage = (LastMessage *)entry;
495 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
504 if (newKey != NULL) {
505 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
510 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
511 transaction->resetServerFailure();
513 // Update which transactions parts still need to be sent
514 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
516 // Add the transaction status to the outstanding list
517 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
519 // Update the transaction status
520 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
522 // Check if all the transaction parts were successfully sent and if so then remove it from pending
523 if (transaction->didSendAllParts()) {
524 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
525 pendingTransactionQueue->remove(transaction);
527 transaction->resetServerFailure();
528 // Set the transaction sequence number back to nothing
529 if (!transaction->didSendAPartToServer()) {
530 transaction->setSequenceNumber(-1);
537 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
538 transaction->resetServerFailure();
539 // Set the transaction sequence number back to nothing
540 if (!transaction->didSendAPartToServer()) {
541 transaction->setSequenceNumber(-1);
545 if (sendSlotsReturn->getThird()->length() != 0) {
546 // insert into the local block chain
547 validateAndUpdate(sendSlotsReturn->getThird(), true);
551 bool isInserted = false;
552 for (uint si = 0; si < newSlots->length(); si++) {
553 Slot *s = newSlots->get(si);
554 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
560 for (uint si = 0; si < newSlots->length(); si++) {
561 Slot *s = newSlots->get(si);
566 // Process each entry in the slot
567 for (Entry *entry : s->getEntries()) {
569 if (entry->getType() == TypeLastMessage) {
570 LastMessage *lastMessage = (LastMessage *)entry;
571 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
580 if (newKey != NULL) {
581 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
586 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
587 transaction->resetServerFailure();
589 // Update which transactions parts still need to be sent
590 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
592 // Add the transaction status to the outstanding list
593 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
595 // Update the transaction status
596 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
598 // Check if all the transaction parts were successfully sent and if so then remove it from pending
599 if (transaction->didSendAllParts()) {
600 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
601 pendingTransactionQueue->remove(transaction);
603 transaction->resetServerFailure();
604 // Set the transaction sequence number back to nothing
605 if (!transaction->didSendAPartToServer()) {
606 transaction->setSequenceNumber(-1);
611 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
612 transaction->resetServerFailure();
613 // Set the transaction sequence number back to nothing
614 if (!transaction->didSendAPartToServer()) {
615 transaction->setSequenceNumber(-1);
620 // insert into the local block chain
621 validateAndUpdate(newSlots, true);
624 } catch (ServerException *e) {
631 // While we have stuff that needs inserting into the block chain
632 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
636 if (hadPartialSendToServer) {
637 throw new Error("Should Be error free");
642 // If there is a new key with same name then end
643 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
648 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
649 localSequenceNumber++;
651 // Try to fill the slot with data
652 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
653 bool needsResize = fillSlotsReturn->getFirst();
654 int newSize = fillSlotsReturn->getSecond();
655 bool insertedNewKey = fillSlotsReturn->getThird();
658 // Reset which transaction to send
659 for (Transaction *transaction : transactionPartsSent->keySet()) {
660 transaction->resetNextPartToSend();
662 // Set the transaction sequence number back to nothing
663 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
664 transaction->setSequenceNumber(-1);
668 // Clear the sent data since we are trying again
669 pendingSendArbitrationEntriesToDelete->clear();
670 transactionPartsSent->clear();
672 // We needed a resize so try again
673 fillSlot(slot, true, newKey);
676 lastSlotAttemptedToSend = slot;
677 lastIsNewKey = (newKey != NULL);
678 lastInsertedNewKey = insertedNewKey;
679 lastNewSize = newSize;
681 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
682 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
685 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
687 if (sendSlotsReturn->getFirst()) {
689 // Did insert into the block chain
691 if (insertedNewKey) {
692 // This slot was what was inserted not a previous slot
694 // New Key was successfully inserted into the block chain so dont want to insert it again
698 // Remove the aborts and commit parts that were sent from the pending to send queue
699 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
700 ArbitrationRound *round = iter->next();
701 round->removeParts(pendingSendArbitrationEntriesToDelete);
703 if (round->isDoneSending()) {
704 // Sent all the parts
709 for (Transaction *transaction : transactionPartsSent->keySet()) {
710 transaction->resetServerFailure();
712 // Update which transactions parts still need to be sent
713 transaction->removeSentParts(transactionPartsSent->get(transaction));
715 // Add the transaction status to the outstanding list
716 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
718 // Update the transaction status
719 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
721 // Check if all the transaction parts were successfully sent and if so then remove it from pending
722 if (transaction->didSendAllParts()) {
723 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
724 pendingTransactionQueue->remove(transaction);
728 // Reset which transaction to send
729 for (Transaction *transaction : transactionPartsSent->keySet()) {
730 transaction->resetNextPartToSend();
732 // Set the transaction sequence number back to nothing
733 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
734 transaction->setSequenceNumber(-1);
739 // Clear the sent data in preparation for next send
740 pendingSendArbitrationEntriesToDelete->clear();
741 transactionPartsSent->clear();
743 if (sendSlotsReturn->getThird()->length() != 0) {
744 // insert into the local block chain
745 validateAndUpdate(sendSlotsReturn->getThird(), true);
749 } catch (ServerException *e) {
751 if (e->getType() != ServerException->TypeInputTimeout) {
752 // Nothing was able to be sent to the server so just clear these data structures
753 for (Transaction *transaction : transactionPartsSent->keySet()) {
754 transaction->resetNextPartToSend();
756 // Set the transaction sequence number back to nothing
757 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
758 transaction->setSequenceNumber(-1);
762 // There was a partial send to the server
763 hadPartialSendToServer = true;
765 // Nothing was able to be sent to the server so just clear these data structures
766 for (Transaction *transaction : transactionPartsSent->keySet()) {
767 transaction->resetNextPartToSend();
768 transaction->setServerFailure();
772 pendingSendArbitrationEntriesToDelete->clear();
773 transactionPartsSent->clear();
778 return newKey == NULL;
781 bool Table::updateFromLocal(int64_t machineId) {
782 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
783 if (localCommunicationInformation == NULL) {
784 // Cant talk to that device locally so do nothing
788 // Get the size of the send data
789 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
791 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
792 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
793 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
796 Array<char> *sendData = new Array<char>(sendDataSize);
797 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
800 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
804 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
805 localSequenceNumber++;
807 if (returnData == NULL) {
808 // Could not contact server
813 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
814 int numberOfEntries = bbDecode->getInt();
816 for (int i = 0; i < numberOfEntries; i++) {
817 char type = bbDecode->get();
818 if (type == TypeAbort) {
819 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
821 } else if (type == TypeCommitPart) {
822 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
823 processEntry(commitPart);
827 updateLiveStateFromLocal();
832 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
834 // Get the devices local communications
835 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
837 if (localCommunicationInformation == NULL) {
838 // Cant talk to that device locally so do nothing
839 return Pair<bool, bool>(true, false);
842 // Get the size of the send data
843 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
844 for (TransactionPart *part : transaction->getParts()->values()) {
845 sendDataSize += part->getSize();
848 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
849 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
850 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
853 // Make the send data size
854 Array<char> *sendData = new Array<char>(sendDataSize);
855 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
858 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
859 bbEncode->putInt(transaction->getParts()->size());
860 for (TransactionPart *part : transaction->getParts()->values()) {
861 part->encode(bbEncode);
866 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
867 localSequenceNumber++;
869 if (returnData == NULL) {
870 // Could not contact server
871 return Pair<bool, bool>(true, false);
875 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
876 bool didCommit = bbDecode->get() == 1;
877 bool couldArbitrate = bbDecode->get() == 1;
878 int numberOfEntries = bbDecode->getInt();
879 bool foundAbort = false;
881 for (int i = 0; i < numberOfEntries; i++) {
882 char type = bbDecode->get();
883 if (type == TypeAbort) {
884 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
886 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
891 } else if (type == TypeCommitPart) {
892 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
893 processEntry(commitPart);
897 updateLiveStateFromLocal();
899 if (couldArbitrate) {
900 TransactionStatus status = transaction->getTransactionStatus();
902 status->setStatus(TransactionStatus_StatusCommitted);
904 status->setStatus(TransactionStatus_StatusAborted);
907 TransactionStatus status = transaction->getTransactionStatus();
909 status->setStatus(TransactionStatus_StatusAborted);
911 status->setStatus(TransactionStatus_StatusCommitted);
915 return Pair<bool, bool>(false, true);
918 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
921 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
922 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
923 int numberOfParts = bbDecode->getInt();
925 // If we did commit a transaction or not
926 bool didCommit = false;
927 bool couldArbitrate = false;
929 if (numberOfParts != 0) {
931 // decode the transaction
932 Transaction *transaction = new Transaction();
933 for (int i = 0; i < numberOfParts; i++) {
935 TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
936 transaction->addPartDecode(newPart);
939 // Arbitrate on transaction and pull relevant return data
940 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
941 couldArbitrate = localArbitrateReturn->getFirst();
942 didCommit = localArbitrateReturn->getSecond();
944 updateLiveStateFromLocal();
946 // Transaction was sent to the server so keep track of it to prevent double commit
947 if (transaction->getSequenceNumber() != -1) {
948 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
952 // The data to send back
953 int returnDataSize = 0;
954 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
956 // Get the aborts to send back
957 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
958 Collections->sort(abortLocalSequenceNumbers);
959 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
960 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
964 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
965 unseenArbitrations->add(abort);
966 returnDataSize += abort->getSize();
969 // Get the commits to send back
970 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
971 if (commitForClientTable != NULL) {
972 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
973 Collections->sort(commitLocalSequenceNumbers);
975 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
976 Commit *commit = commitForClientTable->get(localSequenceNumber);
978 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
982 unseenArbitrations->addAll(commit->getParts()->values());
984 for (CommitPart commitPart : commit->getParts()->values()) {
985 returnDataSize += commitPart->getSize();
990 // Number of arbitration entries to decode
991 returnDataSize += 2 * sizeof(int32_t);
993 // bool of did commit or not
994 if (numberOfParts != 0) {
995 returnDataSize += sizeof(char);
999 Array<char> *returnData = new Array<char>(returnDataSize);
1000 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1002 if (numberOfParts != 0) {
1004 bbEncode->put((char)1);
1006 bbEncode->put((char)0);
1008 if (couldArbitrate) {
1009 bbEncode->put((char)1);
1011 bbEncode->put((char)0);
1015 bbEncode->putInt(unseenArbitrations->size());
1016 for (Entry *entry : unseenArbitrations) {
1017 entry->encode(bbEncode);
1021 localSequenceNumber++;
1025 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1026 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1027 attemptedToSendToServer = true;
1029 bool inserted = false;
1030 bool lastTryInserted = false;
1032 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1033 if (array == NULL) {
1034 array = new Array<Slot *>();
1035 array->set(0, slot);
1036 rejectedSlotVector->clear();
1039 if (array->length() == 0) {
1040 throw new Error("Server Error: Did not send any slots");
1043 // if (attemptedToSendToServerTmp) {
1044 if (hadPartialSendToServer) {
1046 bool isInserted = false;
1047 for (Slot *s : array) {
1048 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1054 for (Slot *s : array) {
1059 // Process each entry in the slot
1060 for (Entry *entry : s->getEntries()) {
1062 if (entry->getType() == TypeLastMessage) {
1063 LastMessage *lastMessage = (LastMessage *)entry;
1065 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1074 rejectedSlotVector->add(slot->getSequenceNumber());
1075 lastTryInserted = false;
1077 lastTryInserted = true;
1080 rejectedSlotVector->add(slot->getSequenceNumber());
1081 lastTryInserted = false;
1085 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1089 * Returns false if a resize was needed
1091 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1093 if (liveSlotCount > bufferResizeThreshold) {
1094 resize = true; //Resize is forced
1098 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1099 TableStatus *status = new TableStatus(slot, newSize);
1100 slot->addEntry(status);
1103 // Fill with rejected slots first before doing anything else
1104 doRejectedMessages(slot);
1106 // Do mandatory rescue of entries
1107 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1109 // Extract working variables
1110 bool needsResize = mandatoryRescueReturn->getFirst();
1111 bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1112 int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1114 if (needsResize && !resize) {
1115 // We need to resize but we are not resizing so return false
1116 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1119 bool inserted = false;
1120 if (newKeyEntry != NULL) {
1121 newKeyEntry->setSlot(slot);
1122 if (slot->hasSpace(newKeyEntry)) {
1123 slot->addEntry(newKeyEntry);
1128 // Clear the transactions, aborts and commits that were sent previously
1129 transactionPartsSent->clear();
1130 pendingSendArbitrationEntriesToDelete->clear();
1132 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1133 bool isFull = false;
1134 round->generateParts();
1135 Vector<Entry *> *parts = round->getParts();
1137 // Insert pending arbitration data
1138 for (Entry *arbitrationData : parts) {
1140 // If it is an abort then we need to set some information
1141 if (arbitrationData instanceof Abort) {
1142 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1145 if (!slot->hasSpace(arbitrationData)) {
1146 // No space so cant do anything else with these data entries
1151 // Add to this current slot and add it to entries to delete
1152 slot->addEntry(arbitrationData);
1153 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1161 if (pendingTransactionQueue->size() > 0) {
1162 Transaction *transaction = pendingTransactionQueue->get(0);
1163 // Set the transaction sequence number if it has yet to be inserted into the block chain
1164 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1165 transaction->setSequenceNumber(slot->getSequenceNumber());
1169 TransactionPart *part = transaction->getNextPartToSend();
1171 // Ran out of parts to send for this transaction so move on
1175 if (slot->hasSpace(part)) {
1176 slot->addEntry(part);
1177 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1178 if (partsSent == NULL) {
1179 partsSent = new Vector<int32_t>();
1180 transactionPartsSent->put(transaction, partsSent);
1182 partsSent->add(part->getPartNumber());
1183 transactionPartsSent->put(transaction, partsSent);
1190 // Fill the remainder of the slot with rescue data
1191 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1193 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1196 void Table::doRejectedMessages(Slot *s) {
1197 if (!rejectedSlotVector->isEmpty()) {
1198 /* TODO: We should avoid generating a rejected message entry if
1199 * there is already a sufficient entry in the queue (e->g->,
1200 * equalsto value of true and same sequence number)-> */
1202 int64_t old_seqn = rejectedSlotVector->firstElement();
1203 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1204 int64_t new_seqn = rejectedSlotVector->lastElement();
1205 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1208 int64_t prev_seqn = -1;
1210 /* Go through list of missing messages */
1211 for (; i < rejectedSlotVector->size(); i++) {
1212 int64_t curr_seqn = rejectedSlotVector->get(i);
1213 Slot *s_msg = buffer->getSlot(curr_seqn);
1216 prev_seqn = curr_seqn;
1218 /* Generate rejected message entry for missing messages */
1219 if (prev_seqn != -1) {
1220 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1223 /* Generate rejected message entries for present messages */
1224 for (; i < rejectedSlotVector->size(); i++) {
1225 int64_t curr_seqn = rejectedSlotVector->get(i);
1226 Slot *s_msg = buffer->getSlot(curr_seqn);
1227 int64_t machineid = s_msg->getMachineID();
1228 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1235 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1236 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1237 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1238 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1239 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1242 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1243 bool seenLiveSlot = false;
1244 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1245 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1249 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1250 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1251 // Push slot number forward
1252 if (!seenLiveSlot) {
1253 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1256 if (!previousSlot->isLive()) {
1260 // We have seen a live slot
1261 seenLiveSlot = true;
1263 // Get all the live entries for a slot
1264 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1266 // Iterate over all the live entries and try to rescue them
1267 for (Entry *liveEntry : liveEntries) {
1268 if (slot->hasSpace(liveEntry)) {
1270 // Enough space to rescue the entry
1271 slot->addEntry(liveEntry);
1272 } else if (currentSequenceNumber == firstIfFull) {
1273 //if there's no space but the entry is about to fall off the queue
1274 System->out->println("B"); //?
1275 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1282 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1285 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1286 /* now go through live entries from least to greatest sequence number until
1287 * either all live slots added, or the slot doesn't have enough room
1288 * for SKIP_THRESHOLD consecutive entries*/
1290 int64_t newestseqnum = buffer->getNewestSeqNum();
1292 for (; seqn <= newestseqnum; seqn++) {
1293 Slot *prevslot = buffer->getSlot(seqn);
1294 //Push slot number forward
1296 oldestLiveSlotSequenceNumver = seqn;
1298 if (!prevslot->isLive())
1300 seenliveslot = true;
1301 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
1302 for (Entry *liveentry : liveentries) {
1303 if (s->hasSpace(liveentry))
1304 s->addEntry(liveentry);
1307 if (skipcount > Table_SKIP_THRESHOLD)
1315 * Checks for malicious activity and updates the local copy of the block chain->
1317 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1318 // The cloud communication layer has checked slot HMACs already
1320 if (newSlots->length() == 0) {
1324 // Make sure all slots are newer than the last largest slot this
1326 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1327 if (firstSeqNum <= sequenceNumber) {
1328 throw new Error("Server Error: Sent older slots!");
1331 // Create an object that can access both new slots and slots in our
1332 // local chain without committing slots to our local chain
1333 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1335 // Check that the HMAC chain is not broken
1336 checkHMACChain(indexer, newSlots);
1338 // Set to keep track of messages from clients
1339 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1341 // Process each slots data
1342 for (Slot *slot : newSlots) {
1343 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1345 updateExpectedSize();
1348 // If there is a gap, check to see if the server sent us
1350 if (firstSeqNum != (sequenceNumber + 1)) {
1352 // Check the size of the slots that were sent down by the server->
1353 // Can only check the size if there was a gap
1354 checkNumSlots(newSlots->length);
1356 // Since there was a gap every machine must have pushed a slot or
1357 // must have a last message message-> If not then the server is
1359 if (!machineSet->isEmpty()) {
1360 throw new Error("Missing record for machines: " + machineSet);
1364 // Update the size of our local block chain->
1367 // Commit new to slots to the local block chain->
1368 for (Slot *slot : newSlots) {
1370 // Insert this slot into our local block chain copy->
1371 buffer->putSlot(slot);
1373 // Keep track of how many slots are currently live (have live data
1378 // Get the sequence number of the latest slot in the system
1379 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1380 updateLiveStateFromServer();
1382 // No Need to remember after we pulled from the server
1383 offlineTransactionsCommittedAndAtServer->clear();
1385 // This is invalidated now
1386 hadPartialSendToServer = false;
1389 void Table::updateLiveStateFromServer() {
1390 // Process the new transaction parts
1391 processNewTransactionParts();
1393 // Do arbitration on new transactions that were received
1394 arbitrateFromServer();
1396 // Update all the committed keys
1397 bool didCommitOrSpeculate = updateCommittedTable();
1399 // Delete the transactions that are now dead
1400 updateLiveTransactionsAndStatus();
1403 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1404 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1407 void Table::updateLiveStateFromLocal() {
1408 // Update all the committed keys
1409 bool didCommitOrSpeculate = updateCommittedTable();
1411 // Delete the transactions that are now dead
1412 updateLiveTransactionsAndStatus();
1415 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1416 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1419 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1420 int64_t prevslots = firstSequenceNumber;
1422 if (didFindTableStatus) {
1424 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1427 didFindTableStatus = true;
1428 currMaxSize = numberOfSlots;
1431 void Table::updateExpectedSize() {
1434 if (expectedsize > currMaxSize) {
1435 expectedsize = currMaxSize;
1441 * Check the size of the block chain to make sure there are enough
1442 * slots sent back by the server-> This is only called when we have a
1443 * gap between the slots that we have locally and the slots sent by
1444 * the server therefore in the slots sent by the server there will be
1445 * at least 1 Table status message
1447 void Table::checkNumSlots(int numberOfSlots) {
1448 if (numberOfSlots != expectedsize) {
1449 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
1453 void Table::updateCurrMaxSize(int newmaxsize) {
1454 currMaxSize = newmaxsize;
1459 * Update the size of of the local buffer if it is needed->
1461 void Table::commitNewMaxSize() {
1462 didFindTableStatus = false;
1464 // Resize the local slot buffer
1465 if (numberOfSlots != currMaxSize) {
1466 buffer->resize((int32_t)currMaxSize);
1469 // Change the number of local slots to the new size
1470 numberOfSlots = (int32_t)currMaxSize;
1472 // Recalculate the resize threshold since the size of the local
1473 // buffer has changed
1474 setResizeThreshold();
1478 * Process the new transaction parts from this latest round of slots
1479 * received from the server
1481 void Table::processNewTransactionParts() {
1483 if (newTransactionParts->size() == 0) {
1484 // Nothing new to process
1488 // Iterate through all the machine Ids that we received new parts
1490 for (int64_t machineId : newTransactionParts->keySet()) {
1491 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1493 // Iterate through all the parts for that machine Id
1494 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1495 TransactionPart *part = parts->get(partId);
1497 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1498 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1499 // Set dead the transaction part
1504 // Get the transaction object for that sequence number
1505 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1507 if (transaction == NULL) {
1508 // This is a new transaction that we dont have so make a new one
1509 transaction = new Transaction();
1511 // Insert this new transaction into the live tables
1512 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1513 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1516 // Add that part to the transaction
1517 transaction->addPartDecode(part);
1521 // Clear all the new transaction parts in preparation for the next
1522 // time the server sends slots
1523 newTransactionParts->clear();
1526 void Table::arbitrateFromServer() {
1528 if (liveTransactionBySequenceNumberTable->size() == 0) {
1529 // Nothing to arbitrate on so move on
1533 // Get the transaction sequence numbers and sort from oldest to newest
1534 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1535 Collections->sort(transactionSequenceNumbers);
1537 // Collection of key value pairs that are
1538 Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1540 // The last transaction arbitrated on
1541 int64_t lastTransactionCommitted = -1;
1542 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1544 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1545 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1549 // Check if this machine arbitrates for this transaction if not
1550 // then we cant arbitrate this transaction
1551 if (transaction->getArbitrator() != localMachineId) {
1555 if (transactionSequenceNumber < lastSeqNumArbOn) {
1559 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1560 // We have seen this already locally so dont commit again
1565 if (!transaction->isComplete()) {
1566 // Will arbitrate in incorrect order if we continue so just break
1572 // update the largest transaction seen by arbitrator from server
1573 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1574 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1576 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1577 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1578 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1582 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1583 // Guard evaluated as true
1585 // Update the local changes so we can make the commit
1586 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1587 speculativeTableTmp->put(kv->getKey(), kv);
1590 // Update what the last transaction committed was for use in batch commit
1591 lastTransactionCommitted = transactionSequenceNumber;
1593 // Guard evaluated was false so create abort
1595 Abort *newAbort = new Abort(NULL,
1596 transaction->getClientLocalSequenceNumber(),
1597 transaction->getSequenceNumber(),
1598 transaction->getMachineId(),
1599 transaction->getArbitrator(),
1600 localArbitrationSequenceNumber);
1601 localArbitrationSequenceNumber++;
1602 generatedAborts->add(newAbort);
1604 // Insert the abort so we can process
1605 processEntry(newAbort);
1608 lastSeqNumArbOn = transactionSequenceNumber;
1611 Commit *newCommit = NULL;
1613 // If there is something to commit
1614 if (speculativeTableTmp->size() != 0) {
1615 // Create the commit and increment the commit sequence number
1616 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1617 localArbitrationSequenceNumber++;
1619 // Add all the new keys to the commit
1620 for (KeyValue *kv : speculativeTableTmp->values()) {
1621 newCommit->addKV(kv);
1624 // create the commit parts
1625 newCommit->createCommitParts();
1627 // Append all the commit parts to the end of the pending queue
1628 // waiting for sending to the server
1629 // Insert the commit so we can process it
1630 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1631 processEntry(commitPart);
1635 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1636 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1637 pendingSendArbitrationRounds->add(arbitrationRound);
1639 if (compactArbitrationData()) {
1640 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1641 if (newArbitrationRound->getCommit() != NULL) {
1642 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1643 processEntry(commitPart);
1650 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1652 // Check if this machine arbitrates for this transaction if not then
1653 // we cant arbitrate this transaction
1654 if (transaction->getArbitrator() != localMachineId) {
1655 return Pair<bool, bool>(false, false);
1658 if (!transaction->isComplete()) {
1659 // Will arbitrate in incorrect order if we continue so just break
1661 return Pair<bool, bool>(false, false);
1664 if (transaction->getMachineId() != localMachineId) {
1665 // dont do this check for local transactions
1666 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1667 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1668 // We've have already seen this from the server
1669 return Pair<bool, bool>(false, false);
1674 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1675 // Guard evaluated as true Create the commit and increment the
1676 // commit sequence number
1677 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1678 localArbitrationSequenceNumber++;
1680 // Update the local changes so we can make the commit
1681 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1682 newCommit->addKV(kv);
1685 // create the commit parts
1686 newCommit->createCommitParts();
1688 // Append all the commit parts to the end of the pending queue
1689 // waiting for sending to the server
1690 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1691 pendingSendArbitrationRounds->add(arbitrationRound);
1693 if (compactArbitrationData()) {
1694 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1695 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1696 processEntry(commitPart);
1699 // Insert the commit so we can process it
1700 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1701 processEntry(commitPart);
1705 if (transaction->getMachineId() == localMachineId) {
1706 TransactionStatus *status = transaction->getTransactionStatus();
1707 if (status != NULL) {
1708 status->setStatus(TransactionStatus_StatusCommitted);
1712 updateLiveStateFromLocal();
1713 return Pair<bool, bool>(true, true);
1715 if (transaction->getMachineId() == localMachineId) {
1716 // For locally created messages update the status
1717 // Guard evaluated was false so create abort
1718 TransactionStatus status = transaction->getTransactionStatus();
1719 if (status != NULL) {
1720 status->setStatus(TransactionStatus_StatusAborted);
1723 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1726 Abort *newAbort = new Abort(NULL,
1727 transaction->getClientLocalSequenceNumber(),
1729 transaction->getMachineId(),
1730 transaction->getArbitrator(),
1731 localArbitrationSequenceNumber);
1732 localArbitrationSequenceNumber++;
1733 addAbortSet->add(newAbort);
1735 // Append all the commit parts to the end of the pending queue
1736 // waiting for sending to the server
1737 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1738 pendingSendArbitrationRounds->add(arbitrationRound);
1740 if (compactArbitrationData()) {
1741 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1742 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1743 processEntry(commitPart);
1748 updateLiveStateFromLocal();
1749 return Pair<bool, bool>(true, false);
1754 * Compacts the arbitration data my merging commits and aggregating
1755 * aborts so that a single large push of commits can be done instead
1756 * of many small updates
1758 bool Table::compactArbitrationData() {
1759 if (pendingSendArbitrationRounds->size() < 2) {
1760 // Nothing to compact so do nothing
1764 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1765 if (lastRound->didSendPart()) {
1769 bool hadCommit = (lastRound->getCommit() == NULL);
1770 bool gotNewCommit = false;
1772 int numberToDelete = 1;
1773 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1774 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1776 if (round->isFull() || round->didSendPart()) {
1777 // Stop since there is a part that cannot be compacted and we
1778 // need to compact in order
1782 if (round->getCommit() == NULL) {
1783 // Try compacting aborts only
1784 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1785 if (newSize > ArbitrationRound->MAX_PARTS) {
1786 // Cant compact since it would be too large
1789 lastRound->addAborts(round->getAborts());
1791 // Create a new larger commit
1792 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1793 localArbitrationSequenceNumber++;
1795 // Create the commit parts so that we can count them
1796 newCommit->createCommitParts();
1798 // Calculate the new size of the parts
1799 int newSize = newCommit->getNumberOfParts();
1800 newSize += lastRound->getAbortsCount();
1801 newSize += round->getAbortsCount();
1803 if (newSize > ArbitrationRound->MAX_PARTS) {
1804 // Cant compact since it would be too large
1808 // Set the new compacted part
1809 lastRound->setCommit(newCommit);
1810 lastRound->addAborts(round->getAborts());
1811 gotNewCommit = true;
1817 if (numberToDelete != 1) {
1818 // If there is a compaction
1819 // Delete the previous pieces that are now in the new compacted piece
1820 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1821 pendingSendArbitrationRounds->clear();
1823 for (int i = 0; i < numberToDelete; i++) {
1824 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1828 // Add the new compacted into the pending to send list
1829 pendingSendArbitrationRounds->add(lastRound);
1831 // Should reinsert into the commit processor
1832 if (hadCommit && gotNewCommit) {
1841 * Update all the commits and the committed tables, sets dead the dead
1844 bool Table::updateCommittedTable() {
1846 if (newCommitParts->size() == 0) {
1847 // Nothing new to process
1851 // Iterate through all the machine Ids that we received new parts for
1852 for (int64_t machineId : newCommitParts->keySet()) {
1853 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1855 // Iterate through all the parts for that machine Id
1856 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1857 CommitPart *part = parts->get(partId);
1859 // Get the transaction object for that sequence number
1860 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1862 if (commitForClientTable == NULL) {
1863 // This is the first commit from this device
1864 commitForClientTable = new Hashtable<int64_t, Commit *>();
1865 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1868 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1870 if (commit == NULL) {
1871 // This is a new commit that we dont have so make a new one
1872 commit = new Commit();
1874 // Insert this new commit into the live tables
1875 commitForClientTable->put(part->getSequenceNumber(), commit);
1878 // Add that part to the commit
1879 commit->addPartDecode(part);
1883 // Clear all the new commits parts in preparation for the next time
1884 // the server sends slots
1885 newCommitParts->clear();
1887 // If we process a new commit keep track of it for future use
1888 bool didProcessANewCommit = false;
1890 // Process the commits one by one
1891 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1893 // Get all the commits for a specific arbitrator
1894 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1896 // Sort the commits in order
1897 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1898 Collections->sort(commitSequenceNumbers);
1900 // Get the last commit seen from this arbitrator
1901 int64_t lastCommitSeenSequenceNumber = -1;
1902 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1903 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1906 // Go through each new commit one by one
1907 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1908 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1909 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1911 // Special processing if a commit is not complete
1912 if (!commit->isComplete()) {
1913 if (i == (commitSequenceNumbers->size() - 1)) {
1914 // If there is an incomplete commit and this commit is the
1915 // latest one seen then this commit cannot be processed and
1916 // there are no other commits
1919 // This is a commit that was already dead but parts of it
1920 // are still in the block chain (not flushed out yet)->
1921 // Delete it and move on
1923 commitForClientTable->remove(commit->getSequenceNumber());
1928 // Update the last transaction that was updated if we can
1929 if (commit->getTransactionSequenceNumber() != -1) {
1930 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1932 // Update the last transaction sequence number that the arbitrator arbitrated on
1933 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1934 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1938 // Update the last arbitration data that we have seen so far
1939 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1941 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1942 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1944 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1947 // Never seen any data from this arbitrator so record the first one
1948 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1951 // We have already seen this commit before so need to do the
1952 // full processing on this commit
1953 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1955 // Update the last transaction that was updated if we can
1956 if (commit->getTransactionSequenceNumber() != -1) {
1957 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1959 // Update the last transaction sequence number that the arbitrator arbitrated on
1960 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1961 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1968 // If we got here then this is a brand new commit and needs full
1970 // Get what commits should be edited, these are the commits that
1971 // have live values for their keys
1972 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1973 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1974 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1976 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
1978 // Update each previous commit that needs to be updated
1979 for (Commit *previousCommit : commitsToEdit) {
1981 // Only bother with live commits (TODO: Maybe remove this check)
1982 if (previousCommit->isLive()) {
1984 // Update which keys in the old commits are still live
1985 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1986 previousCommit->invalidateKey(kv->getKey());
1989 // if the commit is now dead then remove it
1990 if (!previousCommit->isLive()) {
1991 commitForClientTable->remove(previousCommit);
1996 // Update the last seen sequence number from this arbitrator
1997 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
1998 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
1999 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2002 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2005 // We processed a new commit that we havent seen before
2006 didProcessANewCommit = true;
2008 // Update the committed table of keys and which commit is using which key
2009 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2010 committedKeyValueTable->put(kv->getKey(), kv);
2011 liveCommitsByKeyTable->put(kv->getKey(), commit);
2016 return didProcessANewCommit;
2020 * Create the speculative table from transactions that are still live
2021 * and have come from the cloud
2023 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2024 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2025 // There is nothing to speculate on
2029 // Create a list of the transaction sequence numbers and sort them
2030 // from oldest to newest
2031 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2032 Collections->sort(transactionSequenceNumbersSorted);
2034 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2037 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2038 // If there is a gap in the transaction sequence numbers then
2039 // there was a commit or an abort of a transaction OR there was a
2040 // new commit (Could be from offline commit) so a redo the
2041 // speculation from scratch
2043 // Start from scratch
2044 speculatedKeyValueTable->clear();
2045 lastTransactionSequenceNumberSpeculatedOn = -1;
2046 oldestTransactionSequenceNumberSpeculatedOn = -1;
2050 // Remember the front of the transaction list
2051 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2053 // Find where to start arbitration from
2054 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2056 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2057 // Make sure we are not out of bounds
2058 return false; // did not speculate
2061 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2062 bool didSkip = true;
2064 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2065 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2066 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2068 if (!transaction->isComplete()) {
2069 // If there is an incomplete transaction then there is nothing
2070 // we can do add this transactions arbitrator to the list of
2071 // arbitrators we should ignore
2072 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2077 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2081 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2083 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2084 // Guard evaluated to true so update the speculative table
2085 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2086 speculatedKeyValueTable->put(kv->getKey(), kv);
2092 // Since there was a skip we need to redo the speculation next time around
2093 lastTransactionSequenceNumberSpeculatedOn = -1;
2094 oldestTransactionSequenceNumberSpeculatedOn = -1;
2097 // We did some speculation
2102 * Create the pending transaction speculative table from transactions
2103 * that are still in the pending transaction buffer
2105 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2106 if (pendingTransactionQueue->size() == 0) {
2107 // There is nothing to speculate on
2111 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2112 // need to reset on the pending speculation
2113 lastPendingTransactionSpeculatedOn = NULL;
2114 firstPendingTransaction = pendingTransactionQueue->get(0);
2115 pendingTransactionSpeculatedKeyValueTable->clear();
2118 // Find where to start arbitration from
2119 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2121 if (startIndex >= pendingTransactionQueue->size()) {
2122 // Make sure we are not out of bounds
2126 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2127 Transaction *transaction = pendingTransactionQueue->get(i);
2129 lastPendingTransactionSpeculatedOn = transaction;
2131 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2132 // Guard evaluated to true so update the speculative table
2133 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2134 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2141 * Set dead and remove from the live transaction tables the
2142 * transactions that are dead
2144 void Table::updateLiveTransactionsAndStatus() {
2146 // Go through each of the transactions
2147 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2148 Transaction *transaction = iter->next()->getValue();
2150 // Check if the transaction is dead
2151 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2152 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2154 // Set dead the transaction
2155 transaction->setDead();
2157 // Remove the transaction from the live table
2159 liveTransactionByTransactionIdTable->remove(transaction->getId());
2163 // Go through each of the transactions
2164 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2165 TransactionStatus *status = iter->next()->getValue();
2167 // Check if the transaction is dead
2168 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2169 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2172 status->setStatus(TransactionStatus_StatusCommitted);
2181 * Process this slot, entry by entry-> Also update the latest message sent by slot
2183 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2185 // Update the last message seen
2186 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2188 // Process each entry in the slot
2189 for (Entry *entry : slot->getEntries()) {
2190 switch (entry->getType()) {
2191 case TypeCommitPart:
2192 processEntry((CommitPart *)entry);
2195 processEntry((Abort *)entry);
2197 case TypeTransactionPart:
2198 processEntry((TransactionPart *)entry);
2201 processEntry((NewKey *)entry);
2203 case TypeLastMessage:
2204 processEntry((LastMessage *)entry, machineSet);
2206 case TypeRejectedMessage:
2207 processEntry((RejectedMessage *)entry, indexer);
2209 case TypeTableStatus:
2210 processEntry((TableStatus *)entry, slot->getSequenceNumber());
2213 throw new Error("Unrecognized type: " + entry->getType());
2219 * Update the last message that was sent for a machine Id
2221 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2222 // Update what the last message received by a machine was
2223 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2227 * Add the new key to the arbitrators table and update the set of live
2228 * new keys (in case of a rescued new key message)
2230 void Table::processEntry(NewKey *entry) {
2231 // Update the arbitrator table with the new key information
2232 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2234 // Update what the latest live new key is
2235 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2236 if (oldNewKey != NULL) {
2237 // Delete the old new key messages
2238 oldNewKey->setDead();
2243 * Process new table status entries and set dead the old ones as new
2244 * ones come in-> keeps track of the largest and smallest table status
2245 * seen in this current round of updating the local copy of the block
2248 void Table::processEntry(TableStatus entry, int64_t seq) {
2249 int newNumSlots = entry->getMaxSlots();
2250 updateCurrMaxSize(newNumSlots);
2251 initExpectedSize(seq, newNumSlots);
2253 if (liveTableStatus != NULL) {
2254 // We have a larger table status so the old table status is no
2256 liveTableStatus->setDead();
2259 // Make this new table status the latest alive table status
2260 liveTableStatus = entry;
2264 * Check old messages to see if there is a block chain violation->
2267 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2268 int64_t oldSeqNum = entry->getOldSeqNum();
2269 int64_t newSeqNum = entry->getNewSeqNum();
2270 bool isequal = entry->getEqual();
2271 int64_t machineId = entry->getMachineID();
2272 int64_t seq = entry->getSequenceNumber();
2274 // Check if we have messages that were supposed to be rejected in
2275 // our local block chain
2276 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2278 Slot *slot = indexer->getSlot(seqNum);
2281 // If we have this slot make sure that it was not supposed to be
2283 int64_t slotMachineId = slot->getMachineID();
2284 if (isequal != (slotMachineId == machineId)) {
2285 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2290 // Create a list of clients to watch until they see this rejected
2292 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2293 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2294 // Machine ID for the last message entry
2295 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2297 // We've seen it, don't need to continue to watch-> Our next
2298 // message will implicitly acknowledge it->
2299 if (lastMessageEntryMachineId == localMachineId) {
2303 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2304 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2306 if (entrySequenceNumber < seq) {
2307 // Add this rejected message to the set of messages that this
2308 // machine ID did not see yet
2309 addWatchVector(lastMessageEntryMachineId, entry);
2310 // This client did not see this rejected message yet so add it
2311 // to the watch set to monitor
2312 deviceWatchSet->add(lastMessageEntryMachineId);
2315 if (deviceWatchSet->isEmpty()) {
2316 // This rejected message has been seen by all the clients so
2319 // We need to watch this rejected message
2320 entry->setWatchSet(deviceWatchSet);
2325 * Check if this abort is live, if not then save it so we can kill it
2326 * later-> update the last transaction number that was arbitrated on->
2328 void Table::processEntry(Abort *entry) {
2329 if (entry->getTransactionSequenceNumber() != -1) {
2330 // update the transaction status if it was sent to the server
2331 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2332 if (status != NULL) {
2333 status->setStatus(TransactionStatus_StatusAborted);
2337 // Abort has not been seen by the client it is for yet so we need to
2339 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2340 if (previouslySeenAbort != NULL) {
2341 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2344 if (entry->getTransactionArbitrator() == localMachineId) {
2345 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2348 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2349 // The machine already saw this so it is dead
2351 liveAbortTable->remove(entry->getAbortId());
2353 if (entry->getTransactionArbitrator() == localMachineId) {
2354 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2359 // Update the last arbitration data that we have seen so far
2360 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2361 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2362 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2364 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2367 // Never seen any data from this arbitrator so record the first one
2368 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2371 // Set dead a transaction if we can
2372 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2373 if (transactionToSetDead != NULL) {
2374 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2377 // Update the last transaction sequence number that the arbitrator
2379 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2380 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2382 if (entry->getTransactionSequenceNumber() != -1) {
2383 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2389 * Set dead the transaction part if that transaction is dead and keep
2390 * track of all new parts
2392 void Table::processEntry(TransactionPart *entry) {
2393 // Check if we have already seen this transaction and set it dead OR
2394 // if it is not alive
2395 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2396 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2397 // This transaction is dead, it was already committed or aborted
2402 // This part is still alive
2403 Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2405 if (transactionPart == NULL) {
2406 // Dont have a table for this machine Id yet so make one
2407 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2408 newTransactionParts->put(entry->getMachineId(), transactionPart);
2411 // Update the part and set dead ones we have already seen (got a
2413 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2414 if (previouslySeenPart != NULL) {
2415 previouslySeenPart->setDead();
2420 * Process new commit entries and save them for future use-> Delete duplicates
2422 void Table::processEntry(CommitPart *entry) {
2423 // Update the last transaction that was updated if we can
2424 if (entry->getTransactionSequenceNumber() != -1) {
2425 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2426 // Update the last transaction sequence number that the arbitrator
2428 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2429 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2433 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2434 if (commitPart == NULL) {
2435 // Don't have a table for this machine Id yet so make one
2436 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2437 newCommitParts->put(entry->getMachineId(), commitPart);
2439 // Update the part and set dead ones we have already seen (got a
2441 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2442 if (previouslySeenPart != NULL) {
2443 previouslySeenPart->setDead();
2448 * Update the last message seen table-> Update and set dead the
2449 * appropriate RejectedMessages as clients see them-> Updates the live
2450 * aborts, removes those that are dead and sets them dead-> Check that
2451 * the last message seen is correct and that there is no mismatch of
2452 * our own last message or that other clients have not had a rollback
2453 * on the last message->
2455 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2456 // We have seen this machine ID
2457 machineSet->remove(machineId);
2459 // Get the set of rejected messages that this machine Id is has not seen yet
2460 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2461 // If there is a rejected message that this machine Id has not seen yet
2462 if (watchset != NULL) {
2463 // Go through each rejected message that this machine Id has not
2465 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2466 RejectedMessage *rm = rmit->next();
2467 // If this machine Id has seen this rejected message->->->
2468 if (rm->getSequenceNumber() <= seqNum) {
2469 // Remove it from our watchlist
2471 // Decrement machines that need to see this notification
2472 rm->removeWatcher(machineId);
2477 // Set dead the abort
2478 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2479 Abort *abort = i->next()->getValue();
2480 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2483 if (abort->getTransactionArbitrator() == localMachineId) {
2484 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2488 if (machineId == localMachineId) {
2489 // Our own messages are immediately dead->
2490 if (liveness instanceof LastMessage) {
2491 ((LastMessage *)liveness)->setDead();
2492 } else if (liveness instanceof Slot) {
2493 ((Slot *)liveness)->setDead();
2495 throw new Error("Unrecognized type");
2498 // Get the old last message for this device
2499 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2500 if (lastMessageEntry == NULL) {
2501 // If no last message then there is nothing else to process
2505 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2506 Liveness *lastEntry = lastMessageEntry->getSecond();
2508 // If it is not our machine Id since we already set ours to dead
2509 if (machineId != localMachineId) {
2510 if (lastEntry instanceof LastMessage) {
2511 ((LastMessage *)lastEntry)->setDead();
2512 } else if (lastEntry instanceof Slot) {
2513 ((Slot *)lastEntry)->setDead();
2515 throw new Error("Unrecognized type");
2518 // Make sure the server is not playing any games
2519 if (machineId == localMachineId) {
2520 if (hadPartialSendToServer) {
2521 // We were not making any updates and we had a machine mismatch
2522 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2523 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2526 // We were not making any updates and we had a machine mismatch
2527 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2528 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
2532 if (lastMessageSeqNum > seqNum) {
2533 throw new Error("Server Error: Rollback on remote machine sequence number");
2539 * Add a rejected message entry to the watch set to keep track of
2540 * which clients have seen that rejected message entry and which have
2543 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2544 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2545 if (entries == NULL) {
2546 // There is no set for this machine ID yet so create one
2547 entries = new Hashset<RejectedMessage *>();
2548 rejectedMessageWatchVectorTable->put(machineId, entries);
2550 entries->add(entry);
2554 * Check if the HMAC chain is not violated
2556 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2557 for (int i = 0; i < newSlots->length(); i++) {
2558 Slot *currSlot = newSlots->get(i);
2559 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2560 if (prevSlot != NULL &&
2561 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2562 throw new Error("Server Error: Invalid HMAC Chain");