3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
14 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
16 cloud(new CloudComm(this, baseurl, password, listeningPort)),
18 liveTableStatus(NULL),
19 pendingTransactionBuilder(NULL),
20 lastPendingTransactionSpeculatedOn(NULL),
21 firstPendingTransaction(NULL),
23 bufferResizeThreshold(0),
25 oldestLiveSlotSequenceNumver(1),
26 localMachineId(_localMachineId),
28 localTransactionSequenceNumber(0),
29 lastTransactionSequenceNumberSpeculatedOn(0),
30 oldestTransactionSequenceNumberSpeculatedOn(0),
31 localArbitrationSequenceNumber(0),
32 hadPartialSendToServer(false),
33 attemptedToSendToServer(false),
35 didFindTableStatus(false),
37 lastSlotAttemptedToSend(NULL),
40 lastTransactionPartsSent(NULL),
41 lastPendingSendArbitrationEntriesToDelete(NULL),
43 committedKeyValueTable(NULL),
44 speculatedKeyValueTable(NULL),
45 pendingTransactionSpeculatedKeyValueTable(NULL),
46 liveNewKeyTable(NULL),
47 lastMessageTable(NULL),
48 rejectedMessageWatchVectorTable(NULL),
49 arbitratorTable(NULL),
51 newTransactionParts(NULL),
53 lastArbitratedTransactionNumberByArbitratorTable(NULL),
54 liveTransactionBySequenceNumberTable(NULL),
55 liveTransactionByTransactionIdTable(NULL),
56 liveCommitsTable(NULL),
57 liveCommitsByKeyTable(NULL),
58 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
59 rejectedSlotVector(NULL),
60 pendingTransactionQueue(NULL),
61 pendingSendArbitrationRounds(NULL),
62 pendingSendArbitrationEntriesToDelete(NULL),
63 transactionPartsSent(NULL),
64 outstandingTransactionStatus(NULL),
65 liveAbortsGeneratedByLocal(NULL),
66 offlineTransactionsCommittedAndAtServer(NULL),
67 localCommunicationTable(NULL),
68 lastTransactionSeenFromMachineFromServer(NULL),
69 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
70 lastInsertedNewKey(false),
76 Table::Table(CloudComm * _cloud, int64_t _localMachineId) :
80 liveTableStatus(NULL),
81 pendingTransactionBuilder(NULL),
82 lastPendingTransactionSpeculatedOn(NULL),
83 firstPendingTransaction(NULL),
85 bufferResizeThreshold(0),
87 oldestLiveSlotSequenceNumver(1),
88 localMachineId(_localMachineId),
90 localTransactionSequenceNumber(0),
91 lastTransactionSequenceNumberSpeculatedOn(0),
92 oldestTransactionSequenceNumberSpeculatedOn(0),
93 localArbitrationSequenceNumber(0),
94 hadPartialSendToServer(false),
95 attemptedToSendToServer(false),
97 didFindTableStatus(false),
99 lastSlotAttemptedToSend(NULL),
102 lastTransactionPartsSent(NULL),
103 lastPendingSendArbitrationEntriesToDelete(NULL),
105 committedKeyValueTable(NULL),
106 speculatedKeyValueTable(NULL),
107 pendingTransactionSpeculatedKeyValueTable(NULL),
108 liveNewKeyTable(NULL),
109 lastMessageTable(NULL),
110 rejectedMessageWatchVectorTable(NULL),
111 arbitratorTable(NULL),
112 liveAbortTable(NULL),
113 newTransactionParts(NULL),
114 newCommitParts(NULL),
115 lastArbitratedTransactionNumberByArbitratorTable(NULL),
116 liveTransactionBySequenceNumberTable(NULL),
117 liveTransactionByTransactionIdTable(NULL),
118 liveCommitsTable(NULL),
119 liveCommitsByKeyTable(NULL),
120 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
121 rejectedSlotVector(NULL),
122 pendingTransactionQueue(NULL),
123 pendingSendArbitrationRounds(NULL),
124 pendingSendArbitrationEntriesToDelete(NULL),
125 transactionPartsSent(NULL),
126 outstandingTransactionStatus(NULL),
127 liveAbortsGeneratedByLocal(NULL),
128 offlineTransactionsCommittedAndAtServer(NULL),
129 localCommunicationTable(NULL),
130 lastTransactionSeenFromMachineFromServer(NULL),
131 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
132 lastInsertedNewKey(false),
139 * Init all the stuff needed for for table usage
142 // Init helper objects
143 random = new Random();
144 buffer = new SlotBuffer();
147 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
148 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
149 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 liveNewKeyTable = new Hashtable<IoTString *, NewKey*>();
151 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness*> *>();
152 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage*>* >();
153 arbitratorTable = new Hashtable<IoTString *, int64_t>();
154 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>*, Abort*>();
155 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, TransactionPart*> *>();
156 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>*, CommitPart*> *>();
157 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
158 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction*>();
159 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>*, Transaction*>();
160 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit*> >();
161 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit*>();
162 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
163 rejectedSlotVector = new Vector<int64_t>();
164 pendingTransactionQueue = new Vector<Transaction*>();
165 pendingSendArbitrationEntriesToDelete = new Vector<Entry*>();
166 transactionPartsSent = new Hashtable<Transaction*, Vector<int32_t> *>();
167 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus*>();
168 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort*>();
169 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>*>();
170 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString*, int32_t>*>();
171 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
172 pendingSendArbitrationRounds = new Vector<ArbitrationRound*>();
173 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
177 numberOfSlots = buffer->capacity();
178 setResizeThreshold();
182 * Initialize the table by inserting a table status as the first entry into the table status
183 * also initialize the crypto stuff.
185 void Table::initTable() {
186 cloud->initSecurity();
188 // Create the first insertion into the block chain which is the table status
189 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
190 localSequenceNumber++;
191 TableStatus *status = new TableStatus(s, numberOfSlots);
193 Array<Slot*> *array = cloud->putSlot(s, numberOfSlots);
196 array = new Array<Slot*>(1);
198 // update local block chain
199 validateAndUpdate(array, true);
200 } else if (array->length() == 1) {
201 // in case we did push the slot BUT we failed to init it
202 validateAndUpdate(array, true);
204 throw new Error("Error on initialization");
209 * Rebuild the table from scratch by pulling the latest block chain from the server.
211 void Table::rebuild() {
212 // Just pull the latest slots from the server
213 Array<Slot*> *newslots = cloud->getSlots(sequenceNumber + 1);
214 validateAndUpdate(newslots, true);
216 updateLiveTransactionsAndStatus();
219 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
220 localCommunicationTable->put(arbitrator, new Pair<IoTString*, int32_t>(hostName, portNumber));
223 int64_t Table::getArbitrator(IoTString *key) {
224 return arbitratorTable->get(key);
227 void Table::close() {
231 IoTString * Table::getCommitted(IoTString *key) {
232 KeyValue *kv = committedKeyValueTable->get(key);
235 return kv->getValue();
241 IoTString * Table::getSpeculative(IoTString *key) {
242 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
245 kv = speculatedKeyValueTable->get(key);
249 kv = committedKeyValueTable->get(key);
253 return kv->getValue();
259 IoTString * Table::getCommittedAtomic(IoTString *key) {
260 KeyValue *kv = committedKeyValueTable->get(key);
262 if (arbitratorTable->get(key) == NULL) {
263 throw new Error("Key not Found.");
266 // Make sure new key value pair matches the current arbitrator
267 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
268 // TODO: Maybe not throw en error
269 throw new Error("Not all Key Values Match Arbitrator.");
273 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
274 return kv->getValue();
276 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
281 IoTString * Table::getSpeculativeAtomic(IoTString *key) {
282 if (arbitratorTable->get(key) == NULL) {
283 throw new Error("Key not Found.");
286 // Make sure new key value pair matches the current arbitrator
287 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
288 // TODO: Maybe not throw en error
289 throw new Error("Not all Key Values Match Arbitrator.");
292 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
295 kv = speculatedKeyValueTable->get(key);
299 kv = committedKeyValueTable->get(key);
303 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
304 return kv->getValue();
306 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
311 bool Table::update() {
313 Array<Slot*> *newSlots = cloud->getSlots(sequenceNumber + 1);
314 validateAndUpdate(newSlots, false);
318 updateLiveTransactionsAndStatus();
321 } catch (Exception *e) {
322 // e->printStackTrace();
324 for (int64_t m : localCommunicationTable->keySet()) {
332 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
334 if (arbitratorTable->get(keyName) != NULL) {
335 // There is already an arbitrator
339 NewKey * newKey = new NewKey(NULL, keyName, machineId);
341 if (sendToServer(newKey)) {
342 // If successfully inserted
348 void Table::startTransaction() {
349 // Create a new transaction, invalidates any old pending transactions.
350 pendingTransactionBuilder = new PendingTransaction(localMachineId);
353 void Table::addKV(IoTString *key, IoTString *value) {
355 // Make sure it is a valid key
356 if (arbitratorTable->get(key) == NULL) {
357 throw new Error("Key not Found.");
360 // Make sure new key value pair matches the current arbitrator
361 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
362 // TODO: Maybe not throw en error
363 throw new Error("Not all Key Values Match Arbitrator.");
366 // Add the key value to this transaction
367 KeyValue *kv = new KeyValue(key, value);
368 pendingTransactionBuilder->addKV(kv);
371 TransactionStatus Table::commitTransaction() {
373 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
374 // transaction with no updates will have no effect on the system
375 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
378 // Set the local transaction sequence number and increment
379 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
380 localTransactionSequenceNumber++;
382 // Create the transaction status
383 TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
385 // Create the new transaction
386 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
387 newTransaction->setTransactionStatus(transactionStatus);
389 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
390 // Add it to the queue and invalidate the builder for safety
391 pendingTransactionQueue->add(newTransaction);
393 arbitrateOnLocalTransaction(newTransaction);
394 updateLiveStateFromLocal();
397 pendingTransactionBuilder = new PendingTransaction(localMachineId);
401 } catch (ServerException *e) {
403 Hashset<int64_t>* arbitratorTriedAndFailed = new Hashset<int64_t>();
404 for (Iterator<Transaction*> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
405 Transaction * transaction = iter->next();
407 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
408 // Already contacted this client so ignore all attempts to contact this client
409 // to preserve ordering for arbitrator
413 Pair<bool, bool> * sendReturn = sendTransactionToLocal(transaction);
415 if (sendReturn->getFirst()) {
416 // Failed to contact over local
417 arbitratorTriedAndFailed->add(transaction->getArbitrator());
419 // Successful contact or should not contact
421 if (sendReturn->getSecond()) {
429 updateLiveStateFromLocal();
431 return transactionStatus;
435 * Recalculate the new resize threshold
437 void Table::setResizeThreshold() {
438 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
439 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
442 int64_t Table::getLocalSequenceNumber() {
443 return localSequenceNumber;
447 bool lastInsertedNewKey = false;
449 bool Table::sendToServer(NewKey* newKey) {
451 bool fromRetry = false;
454 if (hadPartialSendToServer) {
455 Array<Slot*> *newSlots = cloud->getSlots(sequenceNumber + 1);
456 if (newSlots->length() == 0) {
458 ThreeTuple<bool, bool, Array<Slot*> *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
460 if (sendSlotsReturn->getFirst()) {
461 if (newKey != NULL) {
462 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
467 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
468 transaction->resetServerFailure();
470 // Update which transactions parts still need to be sent
471 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
473 // Add the transaction status to the outstanding list
474 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
476 // Update the transaction status
477 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
479 // Check if all the transaction parts were successfully sent and if so then remove it from pending
480 if (transaction->didSendAllParts()) {
481 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
482 pendingTransactionQueue->remove(transaction);
487 newSlots = sendSlotsReturn->getThird();
489 bool isInserted = false;
490 for (Slot *s : newSlots) {
491 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
497 for (Slot *s : newSlots) {
502 // Process each entry in the slot
503 for (Entry *entry : s->getEntries()) {
505 if (entry->getType() == TypeLastMessage) {
506 LastMessage lastMessage = (LastMessage)entry;
507 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
516 if (newKey != NULL) {
517 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
522 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
523 transaction->resetServerFailure();
525 // Update which transactions parts still need to be sent
526 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
528 // Add the transaction status to the outstanding list
529 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
531 // Update the transaction status
532 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
534 // Check if all the transaction parts were successfully sent and if so then remove it from pending
535 if (transaction->didSendAllParts()) {
536 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
537 pendingTransactionQueue->remove(transaction);
539 transaction->resetServerFailure();
540 // Set the transaction sequence number back to nothing
541 if (!transaction->didSendAPartToServer()) {
542 transaction->setSequenceNumber(-1);
549 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
550 transaction->resetServerFailure();
551 // Set the transaction sequence number back to nothing
552 if (!transaction->didSendAPartToServer()) {
553 transaction->setSequenceNumber(-1);
557 if (sendSlotsReturn->getThird()->length() != 0) {
558 // insert into the local block chain
559 validateAndUpdate(sendSlotsReturn->getThird(), true);
563 bool isInserted = false;
564 for (Slot *s : newSlots) {
565 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
571 for (Slot *s : newSlots) {
576 // Process each entry in the slot
577 for (Entry *entry : s->getEntries()) {
579 if (entry->getType() == TypeLastMessage) {
580 LastMessage lastMessage = (LastMessage)entry;
581 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
590 if (newKey != NULL) {
591 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
596 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
597 transaction->resetServerFailure();
599 // Update which transactions parts still need to be sent
600 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
602 // Add the transaction status to the outstanding list
603 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
605 // Update the transaction status
606 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
608 // Check if all the transaction parts were successfully sent and if so then remove it from pending
609 if (transaction->didSendAllParts()) {
610 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
611 pendingTransactionQueue->remove(transaction);
613 transaction->resetServerFailure();
614 // Set the transaction sequence number back to nothing
615 if (!transaction->didSendAPartToServer()) {
616 transaction->setSequenceNumber(-1);
621 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
622 transaction->resetServerFailure();
623 // Set the transaction sequence number back to nothing
624 if (!transaction->didSendAPartToServer()) {
625 transaction->setSequenceNumber(-1);
630 // insert into the local block chain
631 validateAndUpdate(newSlots, true);
634 } catch (ServerException *e) {
641 // While we have stuff that needs inserting into the block chain
642 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
646 if (hadPartialSendToServer) {
647 throw new Error("Should Be error free");
652 // If there is a new key with same name then end
653 if ((newKey != NULL) && (arbitratorTable->get(newKey->getKey()) != NULL)) {
658 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
659 localSequenceNumber++;
661 // Try to fill the slot with data
662 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
663 bool needsResize = fillSlotsReturn->getFirst();
664 int newSize = fillSlotsReturn->getSecond();
665 bool insertedNewKey = fillSlotsReturn->getThird();
668 // Reset which transaction to send
669 for (Transaction *transaction : transactionPartsSent->keySet()) {
670 transaction->resetNextPartToSend();
672 // Set the transaction sequence number back to nothing
673 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
674 transaction->setSequenceNumber(-1);
678 // Clear the sent data since we are trying again
679 pendingSendArbitrationEntriesToDelete->clear();
680 transactionPartsSent->clear();
682 // We needed a resize so try again
683 fillSlot(slot, true, newKey);
686 lastSlotAttemptedToSend = slot;
687 lastIsNewKey = (newKey != NULL);
688 lastInsertedNewKey = insertedNewKey;
689 lastNewSize = newSize;
691 lastTransactionPartsSent = new Hashtable<Transaction*, Vector<int32_t>* >(transactionPartsSent);
692 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry*>(pendingSendArbitrationEntriesToDelete);
695 ThreeTuple<bool, bool, Array<Slot*> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
697 if (sendSlotsReturn->getFirst()) {
699 // Did insert into the block chain
701 if (insertedNewKey) {
702 // This slot was what was inserted not a previous slot
704 // New Key was successfully inserted into the block chain so dont want to insert it again
708 // Remove the aborts and commit parts that were sent from the pending to send queue
709 for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
710 ArbitrationRound round = iter->next();
711 round->removeParts(pendingSendArbitrationEntriesToDelete);
713 if (round->isDoneSending()) {
714 // Sent all the parts
719 for (Transaction *transaction : transactionPartsSent->keySet()) {
720 transaction->resetServerFailure();
722 // Update which transactions parts still need to be sent
723 transaction->removeSentParts(transactionPartsSent->get(transaction));
725 // Add the transaction status to the outstanding list
726 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
728 // Update the transaction status
729 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
731 // Check if all the transaction parts were successfully sent and if so then remove it from pending
732 if (transaction->didSendAllParts()) {
733 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
734 pendingTransactionQueue->remove(transaction);
738 // Reset which transaction to send
739 for (Transaction *transaction : transactionPartsSent->keySet()) {
740 transaction->resetNextPartToSend();
742 // Set the transaction sequence number back to nothing
743 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
744 transaction->setSequenceNumber(-1);
749 // Clear the sent data in preparation for next send
750 pendingSendArbitrationEntriesToDelete->clear();
751 transactionPartsSent->clear();
753 if (sendSlotsReturn->getThird()->length() != 0) {
754 // insert into the local block chain
755 validateAndUpdate(sendSlotsReturn->getThird(), true);
759 } catch (ServerException *e) {
761 if (e->getType() != ServerException->TypeInputTimeout) {
762 // Nothing was able to be sent to the server so just clear these data structures
763 for (Transaction *transaction : transactionPartsSent->keySet()) {
764 transaction->resetNextPartToSend();
766 // Set the transaction sequence number back to nothing
767 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
768 transaction->setSequenceNumber(-1);
772 // There was a partial send to the server
773 hadPartialSendToServer = true;
775 // Nothing was able to be sent to the server so just clear these data structures
776 for (Transaction *transaction : transactionPartsSent->keySet()) {
777 transaction->resetNextPartToSend();
778 transaction->setServerFailure();
782 pendingSendArbitrationEntriesToDelete->clear();
783 transactionPartsSent->clear();
788 return newKey == NULL;
791 bool Table::updateFromLocal(int64_t machineId) {
792 Pair<IoTString*, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
793 if (localCommunicationInformation == NULL) {
794 // Cant talk to that device locally so do nothing
798 // Get the size of the send data
799 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
801 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
802 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
803 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
806 Array<char> *sendData = new Array<char>(sendDataSize);
807 ByteBuffer * bbEncode = ByteBuffer_wrap(sendData);
810 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
814 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
815 localSequenceNumber++;
817 if (returnData == NULL) {
818 // Could not contact server
823 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
824 int numberOfEntries = bbDecode->getInt();
826 for (int i = 0; i < numberOfEntries; i++) {
827 char type = bbDecode->get();
828 if (type == TypeAbort) {
829 Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
831 } else if (type == TypeCommitPart) {
832 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
833 processEntry(commitPart);
837 updateLiveStateFromLocal();
842 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
844 // Get the devices local communications
845 Pair<IoTString*, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
847 if (localCommunicationInformation == NULL) {
848 // Cant talk to that device locally so do nothing
849 return new Pair<bool, bool>(true, false);
852 // Get the size of the send data
853 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
854 for (TransactionPart *part : transaction->getParts()->values()) {
855 sendDataSize += part->getSize();
858 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
859 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
860 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
863 // Make the send data size
864 Array<char> *sendData = new Array<char>(sendDataSize);
865 ByteBuffer *bbEncode = ByteBuffer.wrap(sendData);
868 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
869 bbEncode->putInt(transaction->getParts()->size());
870 for (TransactionPart *part : transaction->getParts()->values()) {
871 part->encode(bbEncode);
876 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
877 localSequenceNumber++;
879 if (returnData == NULL) {
880 // Could not contact server
881 return new Pair<bool, bool>(true, false);
885 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
886 bool didCommit = bbDecode->get() == 1;
887 bool couldArbitrate = bbDecode->get() == 1;
888 int numberOfEntries = bbDecode->getInt();
889 bool foundAbort = false;
891 for (int i = 0; i < numberOfEntries; i++) {
892 char type = bbDecode->get();
893 if (type == TypeAbort) {
894 Abort abort = (Abort)Abort_decode(NULL, bbDecode);
896 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
901 } else if (type == TypeCommitPart) {
902 CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
903 processEntry(commitPart);
907 updateLiveStateFromLocal();
909 if (couldArbitrate) {
910 TransactionStatus status = transaction->getTransactionStatus();
912 status->setStatus(TransactionStatus_StatusCommitted);
914 status->setStatus(TransactionStatus_StatusAborted);
917 TransactionStatus status = transaction->getTransactionStatus();
919 status->setStatus(TransactionStatus_StatusAborted);
921 status->setStatus(TransactionStatus_StatusCommitted);
925 return new Pair<bool, bool>(false, true);
928 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
931 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
932 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
933 int numberOfParts = bbDecode->getInt();
935 // If we did commit a transaction or not
936 bool didCommit = false;
937 bool couldArbitrate = false;
939 if (numberOfParts != 0) {
941 // decode the transaction
942 Transaction *transaction = new Transaction();
943 for (int i = 0; i < numberOfParts; i++) {
945 TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode);
946 transaction->addPartDecode(newPart);
949 // Arbitrate on transaction and pull relevant return data
950 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
951 couldArbitrate = localArbitrateReturn->getFirst();
952 didCommit = localArbitrateReturn->getSecond();
954 updateLiveStateFromLocal();
956 // Transaction was sent to the server so keep track of it to prevent double commit
957 if (transaction->getSequenceNumber() != -1) {
958 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
962 // The data to send back
963 int returnDataSize = 0;
964 Vector<Entry*> *unseenArbitrations = new Vector<Entry*>();
966 // Get the aborts to send back
967 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
968 Collections->sort(abortLocalSequenceNumbers);
969 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
970 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
974 Abort abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
975 unseenArbitrations->add(abort);
976 returnDataSize += abort->getSize();
979 // Get the commits to send back
980 Hashtable<int64_t, Commit*>* commitForClientTable = liveCommitsTable->get(localMachineId);
981 if (commitForClientTable != NULL) {
982 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
983 Collections->sort(commitLocalSequenceNumbers);
985 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
986 Commit commit = commitForClientTable->get(localSequenceNumber);
988 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
992 unseenArbitrations->addAll(commit->getParts()->values());
994 for (CommitPart commitPart : commit->getParts()->values()) {
995 returnDataSize += commitPart->getSize();
1000 // Number of arbitration entries to decode
1001 returnDataSize += 2 * sizeof(int32_t);
1003 // bool of did commit or not
1004 if (numberOfParts != 0) {
1005 returnDataSize += sizeof(char);
1008 // Data to send Back
1009 Array<char> *returnData = new Array<char>(returnDataSize);
1010 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1012 if (numberOfParts != 0) {
1014 bbEncode->put((char)1);
1016 bbEncode->put((char)0);
1018 if (couldArbitrate) {
1019 bbEncode->put((char)1);
1021 bbEncode->put((char)0);
1025 bbEncode->putInt(unseenArbitrations->size());
1026 for (Entry *entry : unseenArbitrations) {
1027 entry->encode(bbEncode);
1031 localSequenceNumber++;
1035 ThreeTuple<bool, bool, Array<Slot*> *> * Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1036 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1037 attemptedToSendToServer = true;
1039 bool inserted = false;
1040 bool lastTryInserted = false;
1042 Array<Slot*> *array = cloud->putSlot(slot, newSize);
1043 if (array == NULL) {
1044 array = new Array<Slot*>();
1045 array->set(0, slot);
1046 rejectedSlotVector->clear();
1049 if (array->length() == 0) {
1050 throw new Error("Server Error: Did not send any slots");
1053 // if (attemptedToSendToServerTmp) {
1054 if (hadPartialSendToServer) {
1056 bool isInserted = false;
1057 for (Slot *s : array) {
1058 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1064 for (Slot *s : array) {
1069 // Process each entry in the slot
1070 for (Entry *entry : s->getEntries()) {
1072 if (entry->getType() == TypeLastMessage) {
1073 LastMessage lastMessage = (LastMessage)entry;
1075 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1084 rejectedSlotVector->add(slot->getSequenceNumber());
1085 lastTryInserted = false;
1087 lastTryInserted = true;
1090 rejectedSlotVector->add(slot->getSequenceNumber());
1091 lastTryInserted = false;
1095 return new ThreeTuple<bool, bool, Array<Slot*> *>(inserted, lastTryInserted, array);
1099 * Returns false if a resize was needed
1101 ThreeTuple<bool, int32_t, bool> *Table::fillSlot(Slot *slot, bool resize, NewKey * newKeyEntry) {
1105 if (liveSlotCount > bufferResizeThreshold) {
1106 resize = true; //Resize is forced
1111 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1112 TableStatus *status = new TableStatus(slot, newSize);
1113 slot->addEntry(status);
1116 // Fill with rejected slots first before doing anything else
1117 doRejectedMessages(slot);
1119 // Do mandatory rescue of entries
1120 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1122 // Extract working variables
1123 bool needsResize = mandatoryRescueReturn->getFirst();
1124 bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1125 int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1127 if (needsResize && !resize) {
1128 // We need to resize but we are not resizing so return false
1129 return new ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1132 bool inserted = false;
1133 if (newKeyEntry != NULL) {
1134 newKeyEntry->setSlot(slot);
1135 if (slot->hasSpace(newKeyEntry)) {
1137 slot->addEntry(newKeyEntry);
1142 // Clear the transactions, aborts and commits that were sent previously
1143 transactionPartsSent->clear();
1144 pendingSendArbitrationEntriesToDelete->clear();
1146 for (ArbitrationRound round : pendingSendArbitrationRounds) {
1147 bool isFull = false;
1148 round->generateParts();
1149 Vector<Entry*>* parts = round->getParts();
1151 // Insert pending arbitration data
1152 for (Entry arbitrationData : parts) {
1154 // If it is an abort then we need to set some information
1155 if (arbitrationData instanceof Abort) {
1156 ((Abort)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1159 if (!slot->hasSpace(arbitrationData)) {
1160 // No space so cant do anything else with these data entries
1165 // Add to this current slot and add it to entries to delete
1166 slot->addEntry(arbitrationData);
1167 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
1175 if (pendingTransactionQueue->size() > 0) {
1177 Transaction *transaction = pendingTransactionQueue->get(0);
1179 // Set the transaction sequence number if it has yet to be inserted into the block chain
1180 // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) {
1181 // transaction->setSequenceNumber(slot->getSequenceNumber());
1184 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1185 transaction->setSequenceNumber(slot->getSequenceNumber());
1190 TransactionPart *part = transaction->getNextPartToSend();
1193 // Ran out of parts to send for this transaction so move on
1197 if (slot->hasSpace(part)) {
1198 slot->addEntry(part);
1199 Vector<int32_t>* partsSent = transactionPartsSent->get(transaction);
1200 if (partsSent == NULL) {
1201 partsSent = new Vector<int32_t>();
1202 transactionPartsSent->put(transaction, partsSent);
1204 partsSent->add(part->getPartNumber());
1205 transactionPartsSent->put(transaction, partsSent);
1212 // Fill the remainder of the slot with rescue data
1213 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1215 return new ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
1218 void Table::doRejectedMessages(Slot *s) {
1219 if (!rejectedSlotVector->isEmpty()) {
1220 /* TODO: We should avoid generating a rejected message entry if
1221 * there is already a sufficient entry in the queue (e->g->,
1222 * equalsto value of true and same sequence number)-> */
1224 int64_t old_seqn = rejectedSlotVector->firstElement();
1225 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1226 int64_t new_seqn = rejectedSlotVector->lastElement();
1227 RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1230 int64_t prev_seqn = -1;
1232 /* Go through list of missing messages */
1233 for (; i < rejectedSlotVector->size(); i++) {
1234 int64_t curr_seqn = rejectedSlotVector->get(i);
1235 Slot *s_msg = buffer->getSlot(curr_seqn);
1238 prev_seqn = curr_seqn;
1240 /* Generate rejected message entry for missing messages */
1241 if (prev_seqn != -1) {
1242 RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1245 /* Generate rejected message entries for present messages */
1246 for (; i < rejectedSlotVector->size(); i++) {
1247 int64_t curr_seqn = rejectedSlotVector->get(i);
1248 Slot *s_msg = buffer->getSlot(curr_seqn);
1249 int64_t machineid = s_msg->getMachineID();
1250 RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
1257 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1258 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1259 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
1260 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1261 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1264 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1265 bool seenLiveSlot = false;
1266 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1267 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1271 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
1272 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1273 // Push slot number forward
1274 if (!seenLiveSlot) {
1275 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1278 if (!previousSlot->isLive()) {
1282 // We have seen a live slot
1283 seenLiveSlot = true;
1285 // Get all the live entries for a slot
1286 Vector<Entry*>* liveEntries = previousSlot->getLiveEntries(resize);
1288 // Iterate over all the live entries and try to rescue them
1289 for (Entry liveEntry : liveEntries) {
1290 if (slot->hasSpace(liveEntry)) {
1292 // Enough space to rescue the entry
1293 slot->addEntry(liveEntry);
1294 } else if (currentSequenceNumber == firstIfFull) {
1295 //if there's no space but the entry is about to fall off the queue
1296 System->out->println("B"); //?
1297 return new ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1304 return new ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
1307 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1308 /* now go through live entries from least to greatest sequence number until
1309 * either all live slots added, or the slot doesn't have enough room
1310 * for SKIP_THRESHOLD consecutive entries*/
1312 int64_t newestseqnum = buffer->getNewestSeqNum();
1314 for (; seqn <= newestseqnum; seqn++) {
1315 Slot prevslot = buffer->getSlot(seqn);
1316 //Push slot number forward
1318 oldestLiveSlotSequenceNumver = seqn;
1320 if (!prevslot->isLive())
1322 seenliveslot = true;
1323 Vector<Entry*>* liveentries = prevslot->getLiveEntries(resize);
1324 for (Entry *liveentry : liveentries) {
1325 if (s->hasSpace(liveentry))
1326 s->addEntry(liveentry);
1329 if (skipcount > Table_SKIP_THRESHOLD)
1337 * Checks for malicious activity and updates the local copy of the block chain->
1339 void Table::validateAndUpdate(Array<Slot*> *newSlots, bool acceptUpdatesToLocal) {
1341 // The cloud communication layer has checked slot HMACs already before decoding
1342 if (newSlots->length() == 0) {
1346 // Make sure all slots are newer than the last largest slot this client has seen
1347 int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
1348 if (firstSeqNum <= sequenceNumber) {
1349 throw new Error("Server Error: Sent older slots!");
1352 // Create an object that can access both new slots and slots in our local chain
1353 // without committing slots to our local chain
1354 SlotIndexer indexer = new SlotIndexer(newSlots, buffer);
1356 // Check that the HMAC chain is not broken
1357 checkHMACChain(indexer, newSlots);
1359 // Set to keep track of messages from clients
1360 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1362 // Process each slots data
1363 for (Slot *slot : newSlots) {
1364 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1366 updateExpectedSize();
1369 // If there is a gap, check to see if the server sent us everything->
1370 if (firstSeqNum != (sequenceNumber + 1)) {
1372 // Check the size of the slots that were sent down by the server->
1373 // Can only check the size if there was a gap
1374 checkNumSlots(newSlots->length);
1376 // Since there was a gap every machine must have pushed a slot or must have
1377 // a last message message-> If not then the server is hiding slots
1378 if (!machineSet->isEmpty()) {
1379 throw new Error("Missing record for machines: " + machineSet);
1383 // Update the size of our local block chain->
1386 // Commit new to slots to the local block chain->
1387 for (Slot *slot : newSlots) {
1389 // Insert this slot into our local block chain copy->
1390 buffer->putSlot(slot);
1392 // Keep track of how many slots are currently live (have live data in them)->
1396 // Get the sequence number of the latest slot in the system
1397 sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
1399 updateLiveStateFromServer();
1401 // No Need to remember after we pulled from the server
1402 offlineTransactionsCommittedAndAtServer->clear();
1404 // This is invalidated now
1405 hadPartialSendToServer = false;
1408 void Table::updateLiveStateFromServer() {
1409 // Process the new transaction parts
1410 processNewTransactionParts();
1412 // Do arbitration on new transactions that were received
1413 arbitrateFromServer();
1415 // Update all the committed keys
1416 bool didCommitOrSpeculate = updateCommittedTable();
1418 // Delete the transactions that are now dead
1419 updateLiveTransactionsAndStatus();
1422 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1423 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1426 void Table::updateLiveStateFromLocal() {
1427 // Update all the committed keys
1428 bool didCommitOrSpeculate = updateCommittedTable();
1430 // Delete the transactions that are now dead
1431 updateLiveTransactionsAndStatus();
1434 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1435 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
1438 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1439 // if (didFindTableStatus) {
1442 int64_t prevslots = firstSequenceNumber;
1445 if (didFindTableStatus) {
1446 // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
1447 // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots);
1450 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1451 // System->out->println("Here: " + expectedsize);
1454 // System->out->println(numberOfSlots);
1456 didFindTableStatus = true;
1457 currMaxSize = numberOfSlots;
1460 void Table::updateExpectedSize() {
1463 if (expectedsize > currMaxSize) {
1464 expectedsize = currMaxSize;
1470 * Check the size of the block chain to make sure there are enough slots sent back by the server->
1471 * This is only called when we have a gap between the slots that we have locally and the slots
1472 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
1475 void Table::checkNumSlots(int numberOfSlots) {
1476 if (numberOfSlots != expectedsize) {
1477 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
1481 void Table::updateCurrMaxSize(int newmaxsize) {
1482 currMaxSize = newmaxsize;
1487 * Update the size of of the local buffer if it is needed->
1489 void Table::commitNewMaxSize() {
1490 didFindTableStatus = false;
1492 // Resize the local slot buffer
1493 if (numberOfSlots != currMaxSize) {
1494 buffer->resize((int)currMaxSize);
1497 // Change the number of local slots to the new size
1498 numberOfSlots = (int)currMaxSize;
1501 // Recalculate the resize threshold since the size of the local buffer has changed
1502 setResizeThreshold();
1506 * Process the new transaction parts from this latest round of slots received from the server
1508 void Table::processNewTransactionParts() {
1510 if (newTransactionParts->size() == 0) {
1511 // Nothing new to process
1515 // Iterate through all the machine Ids that we received new parts for
1516 for (int64_t machineId : newTransactionParts->keySet()) {
1517 Hashtable<Pair<int64_t int32_t>*, TransactionPart*> * parts = newTransactionParts->get(machineId);
1519 // Iterate through all the parts for that machine Id
1520 for (Pair<int64_t, int32_t>* partId : parts->keySet()) {
1521 TransactionPart *part = parts->get(partId);
1523 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1524 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1525 // Set dead the transaction part
1530 // Get the transaction object for that sequence number
1531 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1533 if (transaction == NULL) {
1534 // This is a new transaction that we dont have so make a new one
1535 transaction = new Transaction();
1537 // Insert this new transaction into the live tables
1538 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1539 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1542 // Add that part to the transaction
1543 transaction->addPartDecode(part);
1547 // Clear all the new transaction parts in preparation for the next time the server sends slots
1548 newTransactionParts->clear();
1551 void Table::arbitrateFromServer() {
1553 if (liveTransactionBySequenceNumberTable->size() == 0) {
1554 // Nothing to arbitrate on so move on
1558 // Get the transaction sequence numbers and sort from oldest to newest
1559 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
1560 Collections->sort(transactionSequenceNumbers);
1562 // Collection of key value pairs that are
1563 Hashtable<IoTString *, KeyValue*> speculativeTableTmp = new Hashtable<IoTString *, KeyValue*>();
1565 // The last transaction arbitrated on
1566 int64_t lastTransactionCommitted = -1;
1567 Hashset<Abort*>* generatedAborts = new Hashset<Abort*>();
1569 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1570 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1574 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1575 if (transaction->getArbitrator() != localMachineId) {
1579 if (transactionSequenceNumber < lastSeqNumArbOn) {
1583 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1584 // We have seen this already locally so dont commit again
1589 if (!transaction->isComplete()) {
1590 // Will arbitrate in incorrect order if we continue so just break
1596 // update the largest transaction seen by arbitrator from server
1597 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1598 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1600 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1601 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1602 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1606 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1607 // Guard evaluated as true
1609 // Update the local changes so we can make the commit
1610 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1611 speculativeTableTmp->put(kv->getKey(), kv);
1614 // Update what the last transaction committed was for use in batch commit
1615 lastTransactionCommitted = transactionSequenceNumber;
1617 // Guard evaluated was false so create abort
1620 Abort newAbort = new Abort(NULL,
1621 transaction->getClientLocalSequenceNumber(),
1622 transaction->getSequenceNumber(),
1623 transaction->getMachineId(),
1624 transaction->getArbitrator(),
1625 localArbitrationSequenceNumber);
1626 localArbitrationSequenceNumber++;
1628 generatedAborts->add(newAbort);
1630 // Insert the abort so we can process
1631 processEntry(newAbort);
1634 lastSeqNumArbOn = transactionSequenceNumber;
1636 // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
1639 Commit newCommit = NULL;
1641 // If there is something to commit
1642 if (speculativeTableTmp->size() != 0) {
1644 // Create the commit and increment the commit sequence number
1645 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1646 localArbitrationSequenceNumber++;
1648 // Add all the new keys to the commit
1649 for (KeyValue *kv : speculativeTableTmp->values()) {
1650 newCommit->addKV(kv);
1653 // create the commit parts
1654 newCommit->createCommitParts();
1656 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1658 // Insert the commit so we can process it
1659 for (CommitPart commitPart : newCommit->getParts()->values()) {
1660 processEntry(commitPart);
1664 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1665 ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1666 pendingSendArbitrationRounds->add(arbitrationRound);
1668 if (compactArbitrationData()) {
1669 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1670 if (newArbitrationRound->getCommit() != NULL) {
1671 for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1672 processEntry(commitPart);
1679 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1681 // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
1682 if (transaction->getArbitrator() != localMachineId) {
1683 return new Pair<bool, bool>(false, false);
1686 if (!transaction->isComplete()) {
1687 // Will arbitrate in incorrect order if we continue so just break
1689 return new Pair<bool, bool>(false, false);
1692 if (transaction->getMachineId() != localMachineId) {
1693 // dont do this check for local transactions
1694 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1695 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1696 // We've have already seen this from the server
1697 return new Pair<bool, bool>(false, false);
1702 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1703 // Guard evaluated as true
1705 // Create the commit and increment the commit sequence number
1706 Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1707 localArbitrationSequenceNumber++;
1709 // Update the local changes so we can make the commit
1710 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1711 newCommit->addKV(kv);
1714 // create the commit parts
1715 newCommit->createCommitParts();
1717 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1718 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1719 pendingSendArbitrationRounds->add(arbitrationRound);
1721 if (compactArbitrationData()) {
1722 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1723 for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1724 processEntry(commitPart);
1727 // Insert the commit so we can process it
1728 for (CommitPart commitPart : newCommit->getParts()->values()) {
1729 processEntry(commitPart);
1733 if (transaction->getMachineId() == localMachineId) {
1734 TransactionStatus status = transaction->getTransactionStatus();
1735 if (status != NULL) {
1736 status->setStatus(TransactionStatus_StatusCommitted);
1740 updateLiveStateFromLocal();
1741 return new Pair<bool, bool>(true, true);
1744 if (transaction->getMachineId() == localMachineId) {
1745 // For locally created messages update the status
1747 // Guard evaluated was false so create abort
1748 TransactionStatus status = transaction->getTransactionStatus();
1749 if (status != NULL) {
1750 status->setStatus(TransactionStatus_StatusAborted);
1753 Hashset<Abort *> addAbortSet = new Hashset<Abort * >();
1757 Abort newAbort = new Abort(NULL,
1758 transaction->getClientLocalSequenceNumber(),
1760 transaction->getMachineId(),
1761 transaction->getArbitrator(),
1762 localArbitrationSequenceNumber);
1763 localArbitrationSequenceNumber++;
1765 addAbortSet->add(newAbort);
1768 // Append all the commit parts to the end of the pending queue waiting for sending to the server
1769 ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1770 pendingSendArbitrationRounds->add(arbitrationRound);
1772 if (compactArbitrationData()) {
1773 ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1774 for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1775 processEntry(commitPart);
1780 updateLiveStateFromLocal();
1781 return new Pair<bool, bool>(true, false);
1786 * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
1788 bool Table::compactArbitrationData() {
1790 if (pendingSendArbitrationRounds->size() < 2) {
1791 // Nothing to compact so do nothing
1795 ArbitrationRound lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1796 if (lastRound->didSendPart()) {
1800 bool hadCommit = (lastRound->getCommit() == NULL);
1801 bool gotNewCommit = false;
1803 int numberToDelete = 1;
1804 while (numberToDelete < pendingSendArbitrationRounds->size()) {
1805 ArbitrationRound round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1807 if (round->isFull() || round->didSendPart()) {
1808 // Stop since there is a part that cannot be compacted and we need to compact in order
1812 if (round->getCommit() == NULL) {
1814 // Try compacting aborts only
1815 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
1816 if (newSize > ArbitrationRound->MAX_PARTS) {
1817 // Cant compact since it would be too large
1820 lastRound->addAborts(round->getAborts());
1823 // Create a new larger commit
1824 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1825 localArbitrationSequenceNumber++;
1827 // Create the commit parts so that we can count them
1828 newCommit->createCommitParts();
1830 // Calculate the new size of the parts
1831 int newSize = newCommit->getNumberOfParts();
1832 newSize += lastRound->getAbortsCount();
1833 newSize += round->getAbortsCount();
1835 if (newSize > ArbitrationRound->MAX_PARTS) {
1836 // Cant compact since it would be too large
1840 // Set the new compacted part
1841 lastRound->setCommit(newCommit);
1842 lastRound->addAborts(round->getAborts());
1843 gotNewCommit = true;
1849 if (numberToDelete != 1) {
1850 // If there is a compaction
1852 // Delete the previous pieces that are now in the new compacted piece
1853 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1854 pendingSendArbitrationRounds->clear();
1856 for (int i = 0; i < numberToDelete; i++) {
1857 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1861 // Add the new compacted into the pending to send list
1862 pendingSendArbitrationRounds->add(lastRound);
1864 // Should reinsert into the commit processor
1865 if (hadCommit && gotNewCommit) {
1872 // bool compactArbitrationData() {
1877 * Update all the commits and the committed tables, sets dead the dead transactions
1879 bool Table::updateCommittedTable() {
1881 if (newCommitParts->size() == 0) {
1882 // Nothing new to process
1886 // Iterate through all the machine Ids that we received new parts for
1887 for (int64_t machineId : newCommitParts->keySet()) {
1888 Hashtable<Pair<int64_t, int32_t>*, CommitPart*>* parts = newCommitParts->get(machineId);
1890 // Iterate through all the parts for that machine Id
1891 for (Pair<int64_t, int32_t>* partId : parts->keySet()) {
1892 CommitPart part = parts->get(partId);
1894 // Get the transaction object for that sequence number
1895 Hashtable<int64_t, Commit*>* commitForClientTable = liveCommitsTable->get(part->getMachineId());
1897 if (commitForClientTable == NULL) {
1898 // This is the first commit from this device
1899 commitForClientTable = new Hashtable<int64_t, Commit*>();
1900 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1903 Commit commit = commitForClientTable->get(part->getSequenceNumber());
1905 if (commit == NULL) {
1906 // This is a new commit that we dont have so make a new one
1907 commit = new Commit();
1909 // Insert this new commit into the live tables
1910 commitForClientTable->put(part->getSequenceNumber(), commit);
1913 // Add that part to the commit
1914 commit->addPartDecode(part);
1918 // Clear all the new commits parts in preparation for the next time the server sends slots
1919 newCommitParts->clear();
1921 // If we process a new commit keep track of it for future use
1922 bool didProcessANewCommit = false;
1924 // Process the commits one by one
1925 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1927 // Get all the commits for a specific arbitrator
1928 Hashtable<int64_t, Commit*> commitForClientTable = liveCommitsTable->get(arbitratorId);
1930 // Sort the commits in order
1931 Vector<int64_t>* commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1932 Collections->sort(commitSequenceNumbers);
1934 // Get the last commit seen from this arbitrator
1935 int64_t lastCommitSeenSequenceNumber = -1;
1936 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1937 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1940 // Go through each new commit one by one
1941 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1942 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1943 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1945 // Special processing if a commit is not complete
1946 if (!commit->isComplete()) {
1947 if (i == (commitSequenceNumbers->size() - 1)) {
1948 // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
1951 // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
1952 // Delete it and move on
1954 commitForClientTable->remove(commit->getSequenceNumber());
1959 // Update the last transaction that was updated if we can
1960 if (commit->getTransactionSequenceNumber() != -1) {
1961 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1963 // Update the last transaction sequence number that the arbitrator arbitrated on
1964 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1965 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1969 // Update the last arbitration data that we have seen so far
1970 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1972 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1973 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1975 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1978 // Never seen any data from this arbitrator so record the first one
1979 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1982 // We have already seen this commit before so need to do the full processing on this commit
1983 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1985 // Update the last transaction that was updated if we can
1986 if (commit->getTransactionSequenceNumber() != -1) {
1987 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1989 // Update the last transaction sequence number that the arbitrator arbitrated on
1990 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1991 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1998 // If we got here then this is a brand new commit and needs full processing
2000 // Get what commits should be edited, these are the commits that have live values for their keys
2001 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
2002 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2003 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
2005 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
2007 // Update each previous commit that needs to be updated
2008 for (Commit * previousCommit : commitsToEdit) {
2010 // Only bother with live commits (TODO: Maybe remove this check)
2011 if (previousCommit->isLive()) {
2013 // Update which keys in the old commits are still live
2014 for (KeyValue * kv : commit->getKeyValueUpdateSet()) {
2015 previousCommit->invalidateKey(kv->getKey());
2018 // if the commit is now dead then remove it
2019 if (!previousCommit->isLive()) {
2020 commitForClientTable->remove(previousCommit);
2025 // Update the last seen sequence number from this arbitrator
2026 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2027 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2028 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2031 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2034 // We processed a new commit that we havent seen before
2035 didProcessANewCommit = true;
2037 // Update the committed table of keys and which commit is using which key
2038 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2039 committedKeyValueTable->put(kv->getKey(), kv);
2040 liveCommitsByKeyTable->put(kv->getKey(), commit);
2045 return didProcessANewCommit;
2049 * Create the speculative table from transactions that are still live and have come from the cloud
2051 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2052 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2053 // There is nothing to speculate on
2057 // Create a list of the transaction sequence numbers and sort them from oldest to newest
2058 Vector<int64_t>* transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2059 Collections->sort(transactionSequenceNumbersSorted);
2061 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2064 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2065 // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
2066 // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
2068 // Start from scratch
2069 speculatedKeyValueTable->clear();
2070 lastTransactionSequenceNumberSpeculatedOn = -1;
2071 oldestTransactionSequenceNumberSpeculatedOn = -1;
2075 // Remember the front of the transaction list
2076 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2078 // Find where to start arbitration from
2079 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2081 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2082 // Make sure we are not out of bounds
2083 return false; // did not speculate
2086 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2087 bool didSkip = true;
2089 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2090 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2091 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2093 if (!transaction->isComplete()) {
2094 // If there is an incomplete transaction then there is nothing we can do
2095 // add this transactions arbitrator to the list of arbitrators we should ignore
2096 incompleteTransactionArbitrator->add(transaction->getArbitrator());
2101 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2105 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2107 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2108 // Guard evaluated to true so update the speculative table
2109 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2110 speculatedKeyValueTable->put(kv->getKey(), kv);
2116 // Since there was a skip we need to redo the speculation next time around
2117 lastTransactionSequenceNumberSpeculatedOn = -1;
2118 oldestTransactionSequenceNumberSpeculatedOn = -1;
2121 // We did some speculation
2126 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
2128 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2129 if (pendingTransactionQueue->size() == 0) {
2130 // There is nothing to speculate on
2135 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2136 // need to reset on the pending speculation
2137 lastPendingTransactionSpeculatedOn = NULL;
2138 firstPendingTransaction = pendingTransactionQueue->get(0);
2139 pendingTransactionSpeculatedKeyValueTable->clear();
2142 // Find where to start arbitration from
2143 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2145 if (startIndex >= pendingTransactionQueue->size()) {
2146 // Make sure we are not out of bounds
2150 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2151 Transaction *transaction = pendingTransactionQueue->get(i);
2153 lastPendingTransactionSpeculatedOn = transaction;
2155 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2156 // Guard evaluated to true so update the speculative table
2157 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2158 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2165 * Set dead and remove from the live transaction tables the transactions that are dead
2167 void Table::updateLiveTransactionsAndStatus() {
2169 // Go through each of the transactions
2170 for (Iterator<Map->Entry<int64_t, Transaction> >* iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2171 Transaction *transaction = iter->next()->getValue();
2173 // Check if the transaction is dead
2174 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
2175 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2177 // Set dead the transaction
2178 transaction->setDead();
2180 // Remove the transaction from the live table
2182 liveTransactionByTransactionIdTable->remove(transaction->getId());
2186 // Go through each of the transactions
2187 for (Iterator<Map->Entry<int64_t, TransactionStatus*> >* iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2188 TransactionStatus status = iter->next()->getValue();
2190 // Check if the transaction is dead
2191 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2192 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
2195 status->setStatus(TransactionStatus_StatusCommitted);
2204 * Process this slot, entry by entry-> Also update the latest message sent by slot
2206 void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2208 // Update the last message seen
2209 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2211 // Process each entry in the slot
2212 for (Entry *entry : slot->getEntries()) {
2213 switch (entry->getType()) {
2215 case TypeCommitPart:
2216 processEntry((CommitPart)entry);
2220 processEntry((Abort)entry);
2223 case TypeTransactionPart:
2224 processEntry((TransactionPart)entry);
2228 processEntry((NewKey)entry);
2231 case TypeLastMessage:
2232 processEntry((LastMessage)entry, machineSet);
2235 case TypeRejectedMessage:
2236 processEntry((RejectedMessage)entry, indexer);
2239 case TypeTableStatus:
2240 processEntry((TableStatus)entry, slot->getSequenceNumber());
2244 throw new Error("Unrecognized type: " + entry->getType());
2250 * Update the last message that was sent for a machine Id
2252 void Table::processEntry(LastMessage entry, Hashset<int64_t> *machineSet) {
2253 // Update what the last message received by a machine was
2254 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2258 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
2260 void Table::processEntry(NewKey* entry) {
2262 // Update the arbitrator table with the new key information
2263 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2265 // Update what the latest live new key is
2266 NewKey oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2267 if (oldNewKey != NULL) {
2268 // Delete the old new key messages
2269 oldNewKey->setDead();
2274 * Process new table status entries and set dead the old ones as new ones come in->
2275 * keeps track of the largest and smallest table status seen in this current round
2276 * of updating the local copy of the block chain
2278 void Table::processEntry(TableStatus entry, int64_t seq) {
2279 int newNumSlots = entry->getMaxSlots();
2280 updateCurrMaxSize(newNumSlots);
2282 initExpectedSize(seq, newNumSlots);
2284 if (liveTableStatus != NULL) {
2285 // We have a larger table status so the old table status is no int64_ter alive
2286 liveTableStatus->setDead();
2289 // Make this new table status the latest alive table status
2290 liveTableStatus = entry;
2294 * Check old messages to see if there is a block chain violation-> Also
2296 void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
2297 int64_t oldSeqNum = entry->getOldSeqNum();
2298 int64_t newSeqNum = entry->getNewSeqNum();
2299 bool isequal = entry->getEqual();
2300 int64_t machineId = entry->getMachineID();
2301 int64_t seq = entry->getSequenceNumber();
2304 // Check if we have messages that were supposed to be rejected in our local block chain
2305 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
2308 Slot *slot = indexer->getSlot(seqNum);
2311 // If we have this slot make sure that it was not supposed to be a rejected slot
2313 int64_t slotMachineId = slot->getMachineID();
2314 if (isequal != (slotMachineId == machineId)) {
2315 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2321 // Create a list of clients to watch until they see this rejected message entry->
2322 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2323 for (Map->Entry<int64_t, Pair<int64_t, Liveness>*>* lastMessageEntry : lastMessageTable->entrySet()) {
2325 // Machine ID for the last message entry
2326 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2328 // We've seen it, don't need to continue to watch-> Our next
2329 // message will implicitly acknowledge it->
2330 if (lastMessageEntryMachineId == localMachineId) {
2334 Pair<int64_t, Liveness> *lastMessageValue = lastMessageEntry->getValue();
2335 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2337 if (entrySequenceNumber < seq) {
2339 // Add this rejected message to the set of messages that this machine ID did not see yet
2340 addWatchVector(lastMessageEntryMachineId, entry);
2342 // This client did not see this rejected message yet so add it to the watch set to monitor
2343 deviceWatchSet->add(lastMessageEntryMachineId);
2347 if (deviceWatchSet->isEmpty()) {
2348 // This rejected message has been seen by all the clients so
2351 // We need to watch this rejected message
2352 entry->setWatchSet(deviceWatchSet);
2357 * Check if this abort is live, if not then save it so we can kill it later->
2358 * update the last transaction number that was arbitrated on->
2360 void Table::processEntry(Abort entry) {
2363 if (entry->getTransactionSequenceNumber() != -1) {
2364 // update the transaction status if it was sent to the server
2365 TransactionStatus status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2366 if (status != NULL) {
2367 status->setStatus(TransactionStatus_StatusAborted);
2371 // Abort has not been seen by the client it is for yet so we need to keep track of it
2372 Abort previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2373 if (previouslySeenAbort != NULL) {
2374 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2377 if (entry->getTransactionArbitrator() == localMachineId) {
2378 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
2381 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2383 // The machine already saw this so it is dead
2385 liveAbortTable->remove(entry->getAbortId());
2387 if (entry->getTransactionArbitrator() == localMachineId) {
2388 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2397 // Update the last arbitration data that we have seen so far
2398 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2400 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2401 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2403 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2406 // Never seen any data from this arbitrator so record the first one
2407 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2411 // Set dead a transaction if we can
2412 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(new Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2413 if (transactionToSetDead != NULL) {
2414 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2417 // Update the last transaction sequence number that the arbitrator arbitrated on
2418 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
2419 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2422 if (entry->getTransactionSequenceNumber() != -1) {
2423 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2429 * Set dead the transaction part if that transaction is dead and keep track of all new parts
2431 void Table::processEntry(TransactionPart entry) {
2432 // Check if we have already seen this transaction and set it dead OR if it is not alive
2433 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
2434 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2435 // This transaction is dead, it was already committed or aborted
2440 // This part is still alive
2441 Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>* transactionPart = newTransactionParts->get(entry->getMachineId());
2443 if (transactionPart == NULL) {
2444 // Dont have a table for this machine Id yet so make one
2445 transactionPart = new Hashtable<Pair<int64_t, int32_t>*, TransactionPart*>();
2446 newTransactionParts->put(entry->getMachineId(), transactionPart);
2449 // Update the part and set dead ones we have already seen (got a rescued version)
2450 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2451 if (previouslySeenPart != NULL) {
2452 previouslySeenPart->setDead();
2457 * Process new commit entries and save them for future use-> Delete duplicates
2459 void Table::processEntry(CommitPart entry) {
2462 // Update the last transaction that was updated if we can
2463 if (entry->getTransactionSequenceNumber() != -1) {
2464 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2466 // Update the last transaction sequence number that the arbitrator arbitrated on
2467 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2468 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2475 Hashtable<Pair<int64_t, int32_t>*, CommitPart*>* commitPart = newCommitParts->get(entry->getMachineId());
2477 if (commitPart == NULL) {
2478 // Don't have a table for this machine Id yet so make one
2479 commitPart = new Hashtable<Pair<int64_t, int32_t>*, CommitPart*>();
2480 newCommitParts->put(entry->getMachineId(), commitPart);
2483 // Update the part and set dead ones we have already seen (got a rescued version)
2484 CommitPart previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2485 if (previouslySeenPart != NULL) {
2486 previouslySeenPart->setDead();
2491 * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them->
2492 * Updates the live aborts, removes those that are dead and sets them dead->
2493 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
2494 * other clients have not had a rollback on the last message->
2496 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2498 // We have seen this machine ID
2499 machineSet->remove(machineId);
2501 // Get the set of rejected messages that this machine Id is has not seen yet
2502 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2504 // If there is a rejected message that this machine Id has not seen yet
2505 if (watchset != NULL) {
2507 // Go through each rejected message that this machine Id has not seen yet
2508 for (Iterator<RejectedMessage> rmit = watchset->iterator(); rmit->hasNext(); ) {
2510 RejectedMessage rm = rmit->next();
2512 // If this machine Id has seen this rejected message->->->
2513 if (rm->getSequenceNumber() <= seqNum) {
2515 // Remove it from our watchlist
2518 // Decrement machines that need to see this notification
2519 rm->removeWatcher(machineId);
2524 // Set dead the abort
2525 for (Iterator<Map->Entry<Pair<int64_t, int64_t>*, Abort*> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2526 Abort abort = i->next()->getValue();
2528 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2532 if (abort->getTransactionArbitrator() == localMachineId) {
2533 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2540 if (machineId == localMachineId) {
2541 // Our own messages are immediately dead->
2542 if (liveness instanceof LastMessage) {
2543 ((LastMessage)liveness)->setDead();
2544 } else if (liveness instanceof Slot) {
2545 ((Slot)liveness)->setDead();
2547 throw new Error("Unrecognized type");
2551 // Get the old last message for this device
2552 Pair<int64_t, Liveness*> lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness*>(seqNum, liveness));
2553 if (lastMessageEntry == NULL) {
2554 // If no last message then there is nothing else to process
2558 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2559 Liveness lastEntry = lastMessageEntry->getSecond();
2561 // If it is not our machine Id since we already set ours to dead
2562 if (machineId != localMachineId) {
2563 if (lastEntry instanceof LastMessage) {
2564 ((LastMessage)lastEntry)->setDead();
2565 } else if (lastEntry instanceof Slot) {
2566 ((Slot)lastEntry)->setDead();
2568 throw new Error("Unrecognized type");
2572 // Make sure the server is not playing any games
2573 if (machineId == localMachineId) {
2575 if (hadPartialSendToServer) {
2576 // We were not making any updates and we had a machine mismatch
2577 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
2578 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2582 // We were not making any updates and we had a machine mismatch
2583 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2584 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
2588 if (lastMessageSeqNum > seqNum) {
2589 throw new Error("Server Error: Rollback on remote machine sequence number");
2595 * Add a rejected message entry to the watch set to keep track of which clients have seen that
2596 * rejected message entry and which have not->
2598 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
2599 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2600 if (entries == NULL) {
2601 // There is no set for this machine ID yet so create one
2602 entries = new Hashset<RejectedMessage *>();
2603 rejectedMessageWatchVectorTable->put(machineId, entries);
2605 entries->add(entry);
2609 * Check if the HMAC chain is not violated
2611 void Table::checkHMACChain(SlotIndexer indexer, Array<Slot*> *newSlots) {
2612 for (int i = 0; i < newSlots->length; i++) {
2613 Slot currSlot = newSlots[i];
2614 Slot prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
2615 if (prevSlot != NULL &&
2616 !Arrays->equals(prevSlot->getHMAC(), currSlot->getPrevHMAC()))
2617 throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);