3 #include "SlotBuffer.h"
8 #include "PendingTransaction.h"
9 #include "TableStatus.h"
10 #include "TransactionStatus.h"
11 #include "Transaction.h"
12 #include "LastMessage.h"
// Constructs a Table that owns a freshly created CloudComm built from
// baseurl, password and listeningPort. The visible initializers set pointer
// members to NULL, counters to 0 (oldestLiveSlotSequenceNumver starts at 1),
// flags to false, and localMachineId to _localMachineId.
// NOTE(review): the embedded line numbers in this listing have gaps, so some
// initializers and the constructor body are not visible here.
// NOTE(review): liveAbortTable and newCommitParts appear in the CloudComm*
// overload's initializer list but not in the visible lines of this one --
// confirm they are initialized on the elided lines.
16 Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
18 cloud(new CloudComm(this, baseurl, password, listeningPort)),
20 liveTableStatus(NULL),
21 pendingTransactionBuilder(NULL),
22 lastPendingTransactionSpeculatedOn(NULL),
23 firstPendingTransaction(NULL),
25 bufferResizeThreshold(0),
27 oldestLiveSlotSequenceNumver(1),
28 localMachineId(_localMachineId),
30 localTransactionSequenceNumber(0),
31 lastTransactionSequenceNumberSpeculatedOn(0),
32 oldestTransactionSequenceNumberSpeculatedOn(0),
33 localArbitrationSequenceNumber(0),
34 hadPartialSendToServer(false),
35 attemptedToSendToServer(false),
37 didFindTableStatus(false),
39 lastSlotAttemptedToSend(NULL),
42 lastTransactionPartsSent(NULL),
43 lastPendingSendArbitrationEntriesToDelete(NULL),
45 committedKeyValueTable(NULL),
46 speculatedKeyValueTable(NULL),
47 pendingTransactionSpeculatedKeyValueTable(NULL),
48 liveNewKeyTable(NULL),
49 lastMessageTable(NULL),
50 rejectedMessageWatchVectorTable(NULL),
51 arbitratorTable(NULL),
53 newTransactionParts(NULL),
55 lastArbitratedTransactionNumberByArbitratorTable(NULL),
56 liveTransactionBySequenceNumberTable(NULL),
57 liveTransactionByTransactionIdTable(NULL),
58 liveCommitsTable(NULL),
59 liveCommitsByKeyTable(NULL),
60 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
61 rejectedSlotVector(NULL),
62 pendingTransactionQueue(NULL),
63 pendingSendArbitrationRounds(NULL),
64 pendingSendArbitrationEntriesToDelete(NULL),
65 transactionPartsSent(NULL),
66 outstandingTransactionStatus(NULL),
67 liveAbortsGeneratedByLocal(NULL),
68 offlineTransactionsCommittedAndAtServer(NULL),
69 localCommunicationTable(NULL),
70 lastTransactionSeenFromMachineFromServer(NULL),
71 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
72 lastInsertedNewKey(false),
// Constructs a Table around an externally supplied CloudComm (_cloud) for the
// machine identified by _localMachineId. The visible initializers set pointer
// members to NULL, counters to 0 (oldestLiveSlotSequenceNumver starts at 1)
// and flags to false.
// NOTE(review): the embedded line numbers in this listing have gaps, so some
// initializers (including the one that stores _cloud) and the constructor
// body are not visible here.
78 Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
82 liveTableStatus(NULL),
83 pendingTransactionBuilder(NULL),
84 lastPendingTransactionSpeculatedOn(NULL),
85 firstPendingTransaction(NULL),
87 bufferResizeThreshold(0),
89 oldestLiveSlotSequenceNumver(1),
90 localMachineId(_localMachineId),
92 localTransactionSequenceNumber(0),
93 lastTransactionSequenceNumberSpeculatedOn(0),
94 oldestTransactionSequenceNumberSpeculatedOn(0),
95 localArbitrationSequenceNumber(0),
96 hadPartialSendToServer(false),
97 attemptedToSendToServer(false),
99 didFindTableStatus(false),
101 lastSlotAttemptedToSend(NULL),
104 lastTransactionPartsSent(NULL),
105 lastPendingSendArbitrationEntriesToDelete(NULL),
107 committedKeyValueTable(NULL),
108 speculatedKeyValueTable(NULL),
109 pendingTransactionSpeculatedKeyValueTable(NULL),
110 liveNewKeyTable(NULL),
111 lastMessageTable(NULL),
112 rejectedMessageWatchVectorTable(NULL),
113 arbitratorTable(NULL),
114 liveAbortTable(NULL),
115 newTransactionParts(NULL),
116 newCommitParts(NULL),
117 lastArbitratedTransactionNumberByArbitratorTable(NULL),
118 liveTransactionBySequenceNumberTable(NULL),
119 liveTransactionByTransactionIdTable(NULL),
120 liveCommitsTable(NULL),
121 liveCommitsByKeyTable(NULL),
122 lastCommitSeenSequenceNumberByArbitratorTable(NULL),
123 rejectedSlotVector(NULL),
124 pendingTransactionQueue(NULL),
125 pendingSendArbitrationRounds(NULL),
126 pendingSendArbitrationEntriesToDelete(NULL),
127 transactionPartsSent(NULL),
128 outstandingTransactionStatus(NULL),
129 liveAbortsGeneratedByLocal(NULL),
130 offlineTransactionsCommittedAndAtServer(NULL),
131 localCommunicationTable(NULL),
132 lastTransactionSeenFromMachineFromServer(NULL),
133 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
134 lastInsertedNewKey(false),
141 * Init all the stuff needed for table usage
// NOTE(review): the signature line of this routine is not visible in this
// listing; the statements below are its visible body.
// Allocates the Random and SlotBuffer helpers, creates an empty container for
// each table-level map, set and vector, then reads the slot count from the
// buffer's capacity and computes the initial resize threshold.
144 // Init helper objects
145 random = new Random();
146 buffer = new SlotBuffer();
149 committedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
150 speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
151 pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *>();
152 liveNewKeyTable = new Hashtable<IoTString *, NewKey *>();
153 lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
154 rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
155 arbitratorTable = new Hashtable<IoTString *, int64_t>();
156 liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
157 newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
158 newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
159 lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
160 liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
161 liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
// NOTE(review): the mapped type here is a Hashtable by value, whereas
// acceptDataFromLocal stores liveCommitsTable->get(...) into a
// Hashtable<int64_t, Commit *> * -- confirm the declared value type.
162 liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
163 liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
164 lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
165 rejectedSlotVector = new Vector<int64_t>();
166 pendingTransactionQueue = new Vector<Transaction *>();
167 pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
168 transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
169 outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
170 liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
171 offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
172 localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
173 lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
174 pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
175 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
// The slot count comes from the freshly created buffer; the resize threshold
// is derived from it.
179 numberOfSlots = buffer->capacity();
180 setResizeThreshold();
184 * Initialize the table by inserting a table status as the first entry
185 * into the table; this also initializes the crypto stuff.
// Initializes security on the cloud connection, builds the slot with sequence
// number 1 together with a TableStatus for numberOfSlots, and pushes that slot
// to the server with putSlot. The resulting slot array is handed to
// validateAndUpdate; the remaining case throws Error("Error on initialization").
// NOTE(review): the condition guarding the first branch and the statements
// that attach `status` to the slot and fill `array` are on elided lines.
187 void Table::initTable() {
188 cloud->initSecurity();
190 // Create the first insertion into the block chain which is the table status
191 Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
192 localSequenceNumber++;
193 TableStatus *status = new TableStatus(s, numberOfSlots);
195 Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
// A one-element array is allocated locally on this branch.
198 array = new Array<Slot *>(1);
200 // update local block chain
201 validateAndUpdate(array, true);
202 } else if (array->length() == 1) {
203 // in case we did push the slot BUT we failed to init it
204 validateAndUpdate(array, true);
206 throw new Error("Error on initialization");
211 * Rebuild the table from scratch by pulling the latest block chain
// Fetches the slots starting at sequenceNumber + 1 from the server, passes
// them to validateAndUpdate (second argument true), then calls
// updateLiveTransactionsAndStatus().
214 void Table::rebuild() {
215 // Just pull the latest slots from the server
216 Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
217 validateAndUpdate(newslots, true);
219 updateLiveTransactionsAndStatus();
// Records in localCommunicationTable the (hostName, portNumber) pair to use
// for the machine id `arbitrator`.
222 void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
223 localCommunicationTable->put(arbitrator, Pair<IoTString *, int32_t>(hostName, portNumber));
// Returns the arbitratorTable entry stored for `key`.
// NOTE(review): the result for a key that is not in the table depends on
// Hashtable::get, which is not visible here.
226 int64_t Table::getArbitrator(IoTString *key) {
227 return arbitratorTable->get(key);
// NOTE(review): the body of close() is not visible in this listing.
230 void Table::close() {
// Looks up `key` in committedKeyValueTable and returns the value of the
// KeyValue found.
// NOTE(review): the handling of a missing entry is on elided lines.
234 IoTString *Table::getCommitted(IoTString *key) {
235 KeyValue *kv = committedKeyValueTable->get(key);
238 return kv->getValue();
// Looks up `key` in pendingTransactionSpeculatedKeyValueTable, then
// speculatedKeyValueTable, then committedKeyValueTable, and returns the value
// of the KeyValue found.
// NOTE(review): the conditions between the lookups are elided -- presumably
// each later table is consulted only when the earlier lookup found nothing.
244 IoTString *Table::getSpeculative(IoTString *key) {
245 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
248 kv = speculatedKeyValueTable->get(key);
252 kv = committedKeyValueTable->get(key);
256 return kv->getValue();
// Reads the committed value for `key` and records a guard for it on the
// pending transaction builder.
// Throws Error("Key not Found.") when arbitratorTable has no entry for the key
// and Error("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator rejects the key's arbitrator.
// One branch adds a guard carrying the value read and returns that value; the
// other adds a guard with a NULL value.
// NOTE(review): the condition selecting between those branches is elided.
262 IoTString *Table::getCommittedAtomic(IoTString *key) {
263 KeyValue *kv = committedKeyValueTable->get(key);
265 if (!arbitratorTable->contains(key)) {
266 throw new Error("Key not Found.");
269 // Make sure new key value pair matches the current arbitrator
270 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
271 // TODO: Maybe not throw an error
272 throw new Error("Not all Key Values Match Arbitrator.");
276 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
277 return kv->getValue();
279 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Reads the speculative value for `key` and records a guard for it on the
// pending transaction builder.
// Throws Error("Key not Found.") when arbitratorTable has no entry for the key
// and Error("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator rejects the key's arbitrator.
// The lookup goes through pendingTransactionSpeculatedKeyValueTable,
// speculatedKeyValueTable and committedKeyValueTable in that order.
// One branch adds a guard carrying the value found and returns it; the other
// adds a guard with a NULL value.
// NOTE(review): the conditions between lookups and branches are elided.
284 IoTString *Table::getSpeculativeAtomic(IoTString *key) {
285 if (!arbitratorTable->contains(key)) {
286 throw new Error("Key not Found.");
289 // Make sure new key value pair matches the current arbitrator
290 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
291 // TODO: Maybe not throw an error
292 throw new Error("Not all Key Values Match Arbitrator.");
295 KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
298 kv = speculatedKeyValueTable->get(key);
302 kv = committedKeyValueTable->get(key);
306 pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
307 return kv->getValue();
309 pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
// Fetches the slots starting at sequenceNumber + 1 from the server, passes
// them to validateAndUpdate (second argument false) and calls
// updateLiveTransactionsAndStatus(). If an Exception is caught, it iterates
// over the machine ids that have an entry in localCommunicationTable.
// NOTE(review): the try line, the loop body and the return statements are
// elided in this listing.
314 bool Table::update() {
316 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
317 validateAndUpdate(newSlots, false);
319 updateLiveTransactionsAndStatus();
321 } catch (Exception *e) {
322 for (int64_t m : localCommunicationTable->keySet()) {
// Tries to create the key `keyName` by sending a NewKey entry built from
// (keyName, machineId) to the server via sendToServer.
// The first check handles the case where the key already has an entry in
// arbitratorTable.
// NOTE(review): the statements inside both branches (presumably the return
// values) and any enclosing loop are on lines elided from this listing.
330 bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
// Fixed: this test was negated (!contains), which ran the "already an
// arbitrator" branch for keys that do NOT exist yet. sendToServer performs
// the same existence test un-negated.
332 if (arbitratorTable->contains(keyName)) {
333 // There is already an arbitrator
336 NewKey *newKey = new NewKey(NULL, keyName, machineId);
338 if (sendToServer(newKey)) {
339 // If successfully inserted
// Replaces pendingTransactionBuilder with a new PendingTransaction for
// localMachineId. The previous builder pointer is overwritten without being
// released in the visible code.
345 void Table::startTransaction() {
346 // Create a new transaction, invalidates any old pending transactions.
347 pendingTransactionBuilder = new PendingTransaction(localMachineId);
// Adds the update (key, value) to the pending transaction builder.
// Throws Error("Key not Found.") when arbitratorTable has no entry for the key
// and Error("Not all Key Values Match Arbitrator.") when the builder's
// checkArbitrator rejects the key's arbitrator.
350 void Table::addKV(IoTString *key, IoTString *value) {
352 // Make sure it is a valid key
353 if (!arbitratorTable->contains(key)) {
354 throw new Error("Key not Found.");
357 // Make sure new key value pair matches the current arbitrator
358 if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
359 // TODO: Maybe not throw an error
360 throw new Error("Not all Key Values Match Arbitrator.");
363 // Add the key value to this transaction
364 KeyValue *kv = new KeyValue(key, value);
365 pendingTransactionBuilder->addKV(kv);
// Turns the pending transaction builder into a Transaction and submits it.
// Visible flow:
//  - A builder with no key-value updates yields a new TransactionStatus with
//    TransactionStatus_StatusNoEffect and arbitrator -1.
//  - Otherwise the builder gets localTransactionSequenceNumber (which is then
//    incremented), a pending TransactionStatus is created and attached to the
//    new Transaction.
//  - If the arbitrator is not this machine, the transaction is appended to
//    pendingTransactionQueue; the other visible path arbitrates locally and
//    calls updateLiveStateFromLocal().
//  - The builder is replaced with a fresh PendingTransaction.
//  - In the ServerException handler, each queued transaction is offered to
//    sendTransactionToLocal, and arbitrators for which the first element of
//    the returned pair is true are remembered in arbitratorTriedAndFailed.
// Returns the TransactionStatus created for the transaction.
// NOTE(review): the try line, else lines, loop control statements and closing
// braces are elided in this listing.
368 TransactionStatus *Table::commitTransaction() {
369 if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
370 // transaction with no updates will have no effect on the system
371 return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
374 // Set the local transaction sequence number and increment
375 pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
376 localTransactionSequenceNumber++;
378 // Create the transaction status
379 TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
381 // Create the new transaction
382 Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
383 newTransaction->setTransactionStatus(transactionStatus);
385 if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
386 // Add it to the queue and invalidate the builder for safety
387 pendingTransactionQueue->add(newTransaction);
389 arbitrateOnLocalTransaction(newTransaction);
390 updateLiveStateFromLocal();
393 pendingTransactionBuilder = new PendingTransaction(localMachineId);
397 } catch (ServerException *e) {
399 Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
400 for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
401 Transaction *transaction = iter->next();
403 if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
404 // Already contacted this client so ignore all attempts to contact this client
405 // to preserve ordering for arbitrator
// sendTransactionToLocal returns (true, false) when the arbitrator cannot be
// reached locally and (false, true) on its visible final return.
409 Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
411 if (sendReturn->getFirst()) {
412 // Failed to contact over local
413 arbitratorTriedAndFailed->add(transaction->getArbitrator());
415 // Successful contact or should not contact
417 if (sendReturn->getSecond()) {
425 updateLiveStateFromLocal();
427 return transactionStatus;
431 * Recalculate the new resize threshold
// Sets bufferResizeThreshold to (resizeLower - 1) plus a random offset drawn
// with random->nextInt(numberOfSlots - resizeLower), where resizeLower is
// Table_RESIZE_THRESHOLD * numberOfSlots truncated to int.
// NOTE(review): assumes Table_RESIZE_THRESHOLD is below 1 so the nextInt bound
// is positive -- confirm against its definition.
433 void Table::setResizeThreshold() {
434 int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
435 bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
// Returns the current value of localSequenceNumber.
438 int64_t Table::getLocalSequenceNumber() {
439 return localSequenceNumber;
// Pushes pending work to the server: queued transactions, pending arbitration
// rounds and, when newKey is non-NULL, a new-key entry.
// Visible structure:
//  1. Recovery: if hadPartialSendToServer is set, slots after sequenceNumber
//     are fetched. When none come back, lastSlotAttemptedToSend is resent;
//     otherwise the fetched slots are scanned for a slot or LastMessage entry
//     matching lastSlotAttemptedToSend. Transactions recorded in
//     lastTransactionPartsSent then have their sent parts removed and their
//     status advanced, or their sequence number reset to -1.
//  2. Main loop: while transactions, arbitration rounds or the new key remain,
//     a new slot is filled with fillSlot (retried with resize forced when
//     needed), sent with sendSlotsToServer, and the bookkeeping in
//     transactionPartsSent / pendingSendArbitrationEntriesToDelete is applied
//     or rolled back.
// Returns newKey == NULL at the visible final return.
// NOTE(review): many conditions, else branches, returns, try lines and closing
// braces are elided in this listing, so the exact branch structure cannot be
// confirmed from here.
442 bool Table::sendToServer(NewKey *newKey) {
443 bool fromRetry = false;
// Recovery path for a previous send that only partially reached the server.
445 if (hadPartialSendToServer) {
446 Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
447 if (newSlots->length() == 0) {
449 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
451 if (sendSlotsReturn->getFirst()) {
452 if (newKey != NULL) {
453 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
458 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
459 transaction->resetServerFailure();
460 // Update which transactions parts still need to be sent
461 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
462 // Add the transaction status to the outstanding list
463 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
465 // Update the transaction status
466 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
468 // Check if all the transaction parts were successfully
469 // sent and if so then remove it from pending
470 if (transaction->didSendAllParts()) {
471 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
472 pendingTransactionQueue->remove(transaction);
476 newSlots = sendSlotsReturn->getThird();
477 bool isInserted = false;
478 for (uint si = 0; si < newSlots->length(); si++) {
479 Slot *s = newSlots->get(si);
480 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
486 for (uint si = 0; si < newSlots->length(); si++) {
487 Slot *s = newSlots->get(si);
492 // Process each entry in the slot
493 for (Entry *entry : s->getEntries()) {
494 if (entry->getType() == TypeLastMessage) {
495 LastMessage *lastMessage = (LastMessage *)entry;
496 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
505 if (newKey != NULL) {
506 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
511 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
512 transaction->resetServerFailure();
514 // Update which transactions parts still need to be sent
515 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
517 // Add the transaction status to the outstanding list
518 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
520 // Update the transaction status
521 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
523 // Check if all the transaction parts were successfully sent and if so then remove it from pending
524 if (transaction->didSendAllParts()) {
525 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
526 pendingTransactionQueue->remove(transaction);
528 transaction->resetServerFailure();
529 // Set the transaction sequence number back to nothing
530 if (!transaction->didSendAPartToServer()) {
531 transaction->setSequenceNumber(-1);
538 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
539 transaction->resetServerFailure();
540 // Set the transaction sequence number back to nothing
541 if (!transaction->didSendAPartToServer()) {
542 transaction->setSequenceNumber(-1);
546 if (sendSlotsReturn->getThird()->length() != 0) {
547 // insert into the local block chain
548 validateAndUpdate(sendSlotsReturn->getThird(), true);
// From here the slots fetched by getSlots are scanned instead of a resend
// result; presumably this is the branch where the fetch returned slots.
552 bool isInserted = false;
553 for (uint si = 0; si < newSlots->length(); si++) {
554 Slot *s = newSlots->get(si);
555 if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
561 for (uint si = 0; si < newSlots->length(); si++) {
562 Slot *s = newSlots->get(si);
567 // Process each entry in the slot
568 for (Entry *entry : s->getEntries()) {
570 if (entry->getType() == TypeLastMessage) {
571 LastMessage *lastMessage = (LastMessage *)entry;
572 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
581 if (newKey != NULL) {
582 if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
587 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
588 transaction->resetServerFailure();
590 // Update which transactions parts still need to be sent
591 transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
593 // Add the transaction status to the outstanding list
594 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
596 // Update the transaction status
597 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
599 // Check if all the transaction parts were successfully sent and if so then remove it from pending
600 if (transaction->didSendAllParts()) {
601 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
602 pendingTransactionQueue->remove(transaction);
604 transaction->resetServerFailure();
605 // Set the transaction sequence number back to nothing
606 if (!transaction->didSendAPartToServer()) {
607 transaction->setSequenceNumber(-1);
612 for (Transaction *transaction : lastTransactionPartsSent->keySet()) {
613 transaction->resetServerFailure();
614 // Set the transaction sequence number back to nothing
615 if (!transaction->didSendAPartToServer()) {
616 transaction->setSequenceNumber(-1);
621 // insert into the local block chain
622 validateAndUpdate(newSlots, true);
625 } catch (ServerException *e) {
632 // While we have stuff that needs inserting into the block chain
633 while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
637 if (hadPartialSendToServer) {
638 throw new Error("Should Be error free");
643 // If there is a new key with same name then end
644 if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
// The new slot follows the current sequenceNumber and carries the HMAC of the
// slot currently stored at sequenceNumber in the buffer.
649 Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
650 localSequenceNumber++;
652 // Try to fill the slot with data
653 ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
654 bool needsResize = fillSlotsReturn->getFirst();
655 int newSize = fillSlotsReturn->getSecond();
656 bool insertedNewKey = fillSlotsReturn->getThird();
659 // Reset which transaction to send
660 for (Transaction *transaction : transactionPartsSent->keySet()) {
661 transaction->resetNextPartToSend();
663 // Set the transaction sequence number back to nothing
664 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
665 transaction->setSequenceNumber(-1);
669 // Clear the sent data since we are trying again
670 pendingSendArbitrationEntriesToDelete->clear();
671 transactionPartsSent->clear();
673 // We needed a resize so try again
674 fillSlot(slot, true, newKey);
// Remember this attempt so the recovery path above can reconcile it after a
// partial send.
677 lastSlotAttemptedToSend = slot;
678 lastIsNewKey = (newKey != NULL);
679 lastInsertedNewKey = insertedNewKey;
680 lastNewSize = newSize;
682 lastTransactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> * >(transactionPartsSent);
683 lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
686 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
688 if (sendSlotsReturn->getFirst()) {
690 // Did insert into the block chain
692 if (insertedNewKey) {
693 // This slot was what was inserted not a previous slot
695 // New Key was successfully inserted into the block chain so dont want to insert it again
699 // Remove the aborts and commit parts that were sent from the pending to send queue
700 for (Iterator<ArbitrationRound *> *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) {
701 ArbitrationRound *round = iter->next();
702 round->removeParts(pendingSendArbitrationEntriesToDelete);
704 if (round->isDoneSending()) {
705 // Sent all the parts
710 for (Transaction *transaction : transactionPartsSent->keySet()) {
711 transaction->resetServerFailure();
713 // Update which transactions parts still need to be sent
714 transaction->removeSentParts(transactionPartsSent->get(transaction));
716 // Add the transaction status to the outstanding list
717 outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
719 // Update the transaction status
720 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
722 // Check if all the transaction parts were successfully sent and if so then remove it from pending
723 if (transaction->didSendAllParts()) {
724 transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
725 pendingTransactionQueue->remove(transaction);
729 // Reset which transaction to send
730 for (Transaction *transaction : transactionPartsSent->keySet()) {
731 transaction->resetNextPartToSend();
733 // Set the transaction sequence number back to nothing
734 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
735 transaction->setSequenceNumber(-1);
740 // Clear the sent data in preparation for next send
741 pendingSendArbitrationEntriesToDelete->clear();
742 transactionPartsSent->clear();
744 if (sendSlotsReturn->getThird()->length() != 0) {
745 // insert into the local block chain
746 validateAndUpdate(sendSlotsReturn->getThird(), true);
750 } catch (ServerException *e) {
// NOTE(review): `ServerException->TypeInputTimeout` applies -> to a type name,
// which is not valid C++. Other constants in this file are spelled like
// TransactionStatus_StatusPending / TypeLastMessage -- confirm the intended
// constant name and fix.
752 if (e->getType() != ServerException->TypeInputTimeout) {
753 // Nothing was able to be sent to the server so just clear these data structures
754 for (Transaction *transaction : transactionPartsSent->keySet()) {
755 transaction->resetNextPartToSend();
757 // Set the transaction sequence number back to nothing
758 if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
759 transaction->setSequenceNumber(-1);
763 // There was a partial send to the server
764 hadPartialSendToServer = true;
766 // Nothing was able to be sent to the server so just clear these data structures
767 for (Transaction *transaction : transactionPartsSent->keySet()) {
768 transaction->resetNextPartToSend();
769 transaction->setServerFailure();
773 pendingSendArbitrationEntriesToDelete->clear();
774 transactionPartsSent->clear();
779 return newKey == NULL;
// Asks the machine `machineId` over the local channel for arbitration data
// this table has not seen yet.
// The request carries the last arbitration sequence number seen from that
// machine (-1 when none is recorded) and is sent with cloud->sendLocalData
// using the host/port stored in localCommunicationTable. Each entry in the
// reply is decoded as an Abort or a CommitPart according to its type byte,
// then updateLiveStateFromLocal() is called.
// NOTE(review): return statements, the handling of decoded aborts and closing
// braces are on lines elided from this listing.
// NOTE(review): the `== NULL` / `!= NULL` tests below are applied to a Pair
// value and an int64_t lookup result; whether those express "missing" depends
// on Hashtable::get, which is not visible here.
782 bool Table::updateFromLocal(int64_t machineId) {
783 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(machineId);
784 if (localCommunicationInformation == NULL) {
785 // Cant talk to that device locally so do nothing
789 // Get the size of the send data
790 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
792 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
793 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) {
794 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
797 Array<char> *sendData = new Array<char>(sendDataSize);
798 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
801 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
805 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
806 localSequenceNumber++;
808 if (returnData == NULL) {
809 // Could not contact server
814 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
815 int numberOfEntries = bbDecode->getInt();
817 for (int i = 0; i < numberOfEntries; i++) {
818 char type = bbDecode->get();
819 if (type == TypeAbort) {
// Fixed: the cast target was the class type `Abort` although the result is
// stored in an Abort *; cast to the pointer type instead.
820 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
822 } else if (type == TypeCommitPart) {
// Fixed: same value-vs-pointer cast problem as for Abort above.
823 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
824 processEntry(commitPart);
828 updateLiveStateFromLocal();
// Sends `transaction` directly to its arbitrator over the local channel and
// applies the arbitration result carried in the reply.
// Returns (true, false) when no local contact information exists for the
// arbitrator or when sendLocalData yields NULL, and (false, true) on the
// visible final return.
// The request holds the last arbitration sequence number seen from the
// arbitrator (-1 when none is recorded), the part count and every encoded
// part. The reply starts with a did-commit byte and a could-arbitrate byte,
// followed by a count and that many Abort / CommitPart entries.
// NOTE(review): the conditions selecting between the committed/aborted status
// updates, and several other lines, are elided from this listing.
833 Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
835 // Get the devices local communications
836 Pair<IoTString *, int32_t> localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
838 if (localCommunicationInformation == NULL) {
839 // Cant talk to that device locally so do nothing
840 return Pair<bool, bool>(true, false);
843 // Get the size of the send data
844 int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
845 for (TransactionPart *part : transaction->getParts()->values()) {
846 sendDataSize += part->getSize();
849 int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
850 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) {
851 lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
854 // Make the send data size
855 Array<char> *sendData = new Array<char>(sendDataSize);
// Fixed: `ByteBuffer.wrap` is Java syntax; every other call site in this file
// uses the free function ByteBuffer_wrap.
856 ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
859 bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
860 bbEncode->putInt(transaction->getParts()->size());
861 for (TransactionPart *part : transaction->getParts()->values()) {
862 part->encode(bbEncode);
867 Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
868 localSequenceNumber++;
870 if (returnData == NULL) {
871 // Could not contact server
872 return Pair<bool, bool>(true, false);
876 ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
877 bool didCommit = bbDecode->get() == 1;
878 bool couldArbitrate = bbDecode->get() == 1;
879 int numberOfEntries = bbDecode->getInt();
880 bool foundAbort = false;
882 for (int i = 0; i < numberOfEntries; i++) {
883 char type = bbDecode->get();
884 if (type == TypeAbort) {
// Fixed: cast to the pointer type; the result is stored in an Abort *.
885 Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
887 if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
892 } else if (type == TypeCommitPart) {
// Fixed: cast to the pointer type; the result is stored in a CommitPart *.
893 CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
894 processEntry(commitPart);
898 updateLiveStateFromLocal();
900 if (couldArbitrate) {
// Fixed: getTransactionStatus() is dereferenced with -> everywhere in this
// file, so the local must be a pointer, not a TransactionStatus value.
901 TransactionStatus *status = transaction->getTransactionStatus();
903 status->setStatus(TransactionStatus_StatusCommitted);
905 status->setStatus(TransactionStatus_StatusAborted);
// Fixed: pointer local, as above.
908 TransactionStatus *status = transaction->getTransactionStatus();
910 status->setStatus(TransactionStatus_StatusAborted);
912 status->setStatus(TransactionStatus_StatusCommitted);
916 return Pair<bool, bool>(false, true);
// Handles a request received from another machine over the local channel and
// builds the reply buffer.
// The request holds the last arbitration sequence number the sender has seen
// and a part count. When the count is non-zero the parts are decoded into a
// Transaction, arbitrated with arbitrateOnLocalTransaction, and the
// transaction id is remembered in offlineTransactionsCommittedAndAtServer if
// it already has a sequence number.
// The reply carries the did-commit and could-arbitrate bytes (only when parts
// were supplied), then the number of arbitration entries followed by the
// encoded entries gathered from liveAbortsGeneratedByLocal and from this
// machine's entry in liveCommitsTable.
// NOTE(review): the skip statements inside the `<= lastArbitratedSequenceNumberSeen`
// checks, the else lines, the final return and closing braces are on lines
// elided from this listing.
// NOTE(review): `Collections->sort(...)` and range-for over Vector pointers
// look like leftovers from Java and need a project-level replacement.
919 Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
922 ByteBuffer *bbDecode = ByteBuffer_wrap(data);
923 int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
924 int numberOfParts = bbDecode->getInt();
926 // If we did commit a transaction or not
927 bool didCommit = false;
928 bool couldArbitrate = false;
930 if (numberOfParts != 0) {
932 // decode the transaction
933 Transaction *transaction = new Transaction();
934 for (int i = 0; i < numberOfParts; i++) {
// Fixed: `(TransactionPart)TransactionPart.decode(...)` was Java syntax with a
// value cast. Uses a pointer cast and the <Type>_decode free-function naming
// that Abort_decode / CommitPart_decode follow in this file.
// NOTE(review): confirm TransactionPart_decode is the declared name.
936 TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
937 transaction->addPartDecode(newPart);
940 // Arbitrate on transaction and pull relevant return data
941 Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
942 couldArbitrate = localArbitrateReturn->getFirst();
943 didCommit = localArbitrateReturn->getSecond();
945 updateLiveStateFromLocal();
947 // Transaction was sent to the server so keep track of it to prevent double commit
948 if (transaction->getSequenceNumber() != -1) {
949 offlineTransactionsCommittedAndAtServer->add(transaction->getId());
953 // The data to send back
954 int returnDataSize = 0;
955 Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
957 // Get the aborts to send back
958 Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>(liveAbortsGeneratedByLocal->keySet());
959 Collections->sort(abortLocalSequenceNumbers);
// Note: the loop variable below shadows the member localSequenceNumber.
960 for (int64_t localSequenceNumber : abortLocalSequenceNumbers) {
961 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
965 Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
966 unseenArbitrations->add(abort);
967 returnDataSize += abort->getSize();
970 // Get the commits to send back
971 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
972 if (commitForClientTable != NULL) {
973 Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
974 Collections->sort(commitLocalSequenceNumbers);
976 for (int64_t localSequenceNumber : commitLocalSequenceNumbers) {
977 Commit *commit = commitForClientTable->get(localSequenceNumber);
979 if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
983 unseenArbitrations->addAll(commit->getParts()->values());
// Fixed: the element was declared by value (`CommitPart commitPart`) but is
// dereferenced with ->; iterate over pointers as the other loops do.
985 for (CommitPart *commitPart : commit->getParts()->values()) {
986 returnDataSize += commitPart->getSize();
991 // Number of arbitration entries to decode
992 returnDataSize += 2 * sizeof(int32_t);
994 // bool of did commit or not
995 if (numberOfParts != 0) {
996 returnDataSize += sizeof(char);
1000 Array<char> *returnData = new Array<char>(returnDataSize);
1001 ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
1003 if (numberOfParts != 0) {
1005 bbEncode->put((char)1);
1007 bbEncode->put((char)0);
1009 if (couldArbitrate) {
1010 bbEncode->put((char)1);
1012 bbEncode->put((char)0);
1016 bbEncode->putInt(unseenArbitrations->size());
1017 for (Entry *entry : unseenArbitrations) {
1018 entry->encode(bbEncode);
1022 localSequenceNumber++;
// Pushes `slot` to the server with cloud->putSlot(slot, newSize) and reports
// the outcome as (inserted, lastTryInserted, slots).
// When putSlot returns NULL, a one-element array holding `slot` is used and
// rejectedSlotVector is cleared. An empty array from the server throws
// Error("Server Error: Did not send any slots"). After a partial send
// (hadPartialSendToServer) the returned slots are scanned for the slot itself
// or for a LastMessage entry from this machine carrying its sequence number;
// paths that add the slot's sequence number to rejectedSlotVector also set
// lastTryInserted to false.
// NOTE(review): else lines, assignments to `inserted` and closing braces are
// on lines elided from this listing.
// NOTE(review): `for (Slot *s : array)` iterates over an Array pointer, while
// sendToServer walks Array<Slot *> * with an index loop -- confirm this
// compiles against the project's Array type.
1026 ThreeTuple<bool, bool, Array<Slot *> *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) {
1027 bool attemptedToSendToServerTmp = attemptedToSendToServer;
1028 attemptedToSendToServer = true;
1030 bool inserted = false;
1031 bool lastTryInserted = false;
1033 Array<Slot *> *array = cloud->putSlot(slot, newSize);
1034 if (array == NULL) {
// Fixed: the array was created without a size and then written at index 0;
// allocate one element, as initTable does for the same pattern.
1035 array = new Array<Slot *>(1);
1036 array->set(0, slot);
1037 rejectedSlotVector->clear();
1040 if (array->length() == 0) {
1041 throw new Error("Server Error: Did not send any slots");
1044 // if (attemptedToSendToServerTmp) {
1045 if (hadPartialSendToServer) {
1047 bool isInserted = false;
1048 for (Slot *s : array) {
1049 if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
1055 for (Slot *s : array) {
1060 // Process each entry in the slot
1061 for (Entry *entry : s->getEntries()) {
1063 if (entry->getType() == TypeLastMessage) {
1064 LastMessage *lastMessage = (LastMessage *)entry;
1066 if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
1075 rejectedSlotVector->add(slot->getSequenceNumber());
1076 lastTryInserted = false;
1078 lastTryInserted = true;
1081 rejectedSlotVector->add(slot->getSequenceNumber());
1082 lastTryInserted = false;
1086 return ThreeTuple<bool, bool, Array<Slot *> *>(inserted, lastTryInserted, array);
1090 * Returns false if a resize was needed
// Populates `slot` with, in the order visible below: a TableStatus entry
// carrying newSize, rejected-message entries, mandatory rescue data, the new
// key entry (if it fits), pending arbitration data, the next part of the first
// pending transaction, and optional rescue data.
// Returns ThreeTuple(true, NULL, NULL) when the mandatory rescue reports that
// a resize is needed while `resize` is false; otherwise
// ThreeTuple(false, newSize, inserted).
// NOTE(review): the first tuple element is `true` on the resize-needed return
// and `false` on the normal return -- make sure callers read it that way.
1092 ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
1094 if (liveSlotCount > bufferResizeThreshold) {
1095 resize = true; //Resize is forced
// New size is the current slot count scaled by Table_RESIZE_MULTIPLE; it is
// announced through a TableStatus entry placed in the slot.
1099 newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
1100 TableStatus *status = new TableStatus(slot, newSize);
1101 slot->addEntry(status);
1104 // Fill with rejected slots first before doing anything else
1105 doRejectedMessages(slot);
1107 // Do mandatory rescue of entries
1108 ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
1110 // Extract working variables
// NOTE(review): mandatoryRescueReturn is declared by value but accessed
// with `->` -- confirm ThreeTuple's accessor syntax.
1111 bool needsResize = mandatoryRescueReturn->getFirst();
1112 bool seenLiveSlot = mandatoryRescueReturn->getSecond();
1113 int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
1115 if (needsResize && !resize) {
1116 // We need to resize but we are not resizing so return false
// NOTE(review): NULL is passed for the int32_t and bool fields here.
1117 return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
1120 bool inserted = false;
// Attach the new key entry to this slot and add it when there is room.
1121 if (newKeyEntry != NULL) {
1122 newKeyEntry->setSlot(slot);
1123 if (slot->hasSpace(newKeyEntry)) {
1124 slot->addEntry(newKeyEntry);
1129 // Clear the transactions, aborts and commits that were sent previously
1130 transactionPartsSent->clear();
1131 pendingSendArbitrationEntriesToDelete->clear();
1133 for (ArbitrationRound *round : pendingSendArbitrationRounds) {
1134 bool isFull = false;
1135 round->generateParts();
1136 Vector<Entry *> *parts = round->getParts();
1138 // Insert pending arbitration data
1139 for (Entry *arbitrationData : parts) {
1141 // If it is an abort then we need to set some information
// NOTE(review): `instanceof` is a Java operator and is not valid C++;
// Entry::getType() is used for type tests elsewhere in this file.
1142 if (arbitrationData instanceof Abort) {
1143 ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
1146 if (!slot->hasSpace(arbitrationData)) {
1147 // No space so cant do anything else with these data entries
1152 // Add to this current slot and add it to entries to delete
1153 slot->addEntry(arbitrationData);
1154 pendingSendArbitrationEntriesToDelete->add(arbitrationData);
// Only the transaction at the head of the pending queue is considered here.
1162 if (pendingTransactionQueue->size() > 0) {
1163 Transaction *transaction = pendingTransactionQueue->get(0);
1164 // Set the transaction sequence number if it has yet to be inserted into the block chain
1165 if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
1166 transaction->setSequenceNumber(slot->getSequenceNumber());
1170 TransactionPart *part = transaction->getNextPartToSend();
1172 // Ran out of parts to send for this transaction so move on
// When the part fits, add it and record its part number under this
// transaction in transactionPartsSent.
1176 if (slot->hasSpace(part)) {
1177 slot->addEntry(part);
1178 Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
1179 if (partsSent == NULL) {
1180 partsSent = new Vector<int32_t>();
1181 transactionPartsSent->put(transaction, partsSent);
1183 partsSent->add(part->getPartNumber());
// NOTE(review): the table stores a pointer to partsSent, so this second
// put looks redundant -- confirm before removing.
1184 transactionPartsSent->put(transaction, partsSent);
1191 // Fill the remainder of the slot with rescue data
1192 doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
1194 return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
// Builds RejectedMessage entries for slot `s` from the sequence numbers
// recorded in rejectedSlotVector. Nothing visible happens when that vector is
// empty.
1197 void Table::doRejectedMessages(Slot *s) {
1198 if (!rejectedSlotVector->isEmpty()) {
1199 /* TODO: We should avoid generating a rejected message entry if
1200 * there is already a sufficient entry in the queue (e.g.,
1201 * equalsto value of true and same sequence number). */
// Oldest rejected sequence number; used as the lower bound of the ranges below.
1203 int64_t old_seqn = rejectedSlotVector->firstElement();
// More than Table_REJECTED_THRESHOLD rejections: cover the whole span from
// the first to the last rejected sequence number with one entry (last
// constructor argument false).
1204 if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
1205 int64_t new_seqn = rejectedSlotVector->lastElement();
1206 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
1209 int64_t prev_seqn = -1;
1211 /* Go through list of missing messages */
1212 for (; i < rejectedSlotVector->size(); i++) {
1213 int64_t curr_seqn = rejectedSlotVector->get(i);
1214 Slot *s_msg = buffer->getSlot(curr_seqn);
1217 prev_seqn = curr_seqn;
1219 /* Generate rejected message entry for missing messages */
// prev_seqn stays -1 unless the loop above recorded a sequence number.
1220 if (prev_seqn != -1) {
1221 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
1224 /* Generate rejected message entries for present messages */
// One entry per remaining sequence number, attributed to the machine that
// wrote the buffered slot (last constructor argument true).
1225 for (; i < rejectedSlotVector->size(); i++) {
1226 int64_t curr_seqn = rejectedSlotVector->get(i);
1227 Slot *s_msg = buffer->getSlot(curr_seqn);
1228 int64_t machineid = s_msg->getMachineID();
1229 RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
// Copies live entries out of the oldest buffered slots into `slot`, walking
// sequence numbers from oldestLiveSlotSequenceNumver up to (but excluding)
// `threshold`.
// Returns ThreeTuple(needsResize, seenLiveSlot, currentSequenceNumber); the
// first element is true when a live entry of the slot at `firstIfFull` did
// not fit into `slot`.
1236 ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
1237 int64_t newestSequenceNumber = buffer->getNewestSeqNum();
1238 int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
// Never start before the oldest slot still held by the buffer.
1239 if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
1240 oldestLiveSlotSequenceNumver = oldestSequenceNumber;
1243 int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
1244 bool seenLiveSlot = false;
1245 int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
1246 int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
1250 for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
// NOTE(review): previousSlot is declared by value but used with `->` below;
// doOptionalRescue stores the same getSlot() result in a `Slot *`.
1251 Slot previousSlot = buffer->getSlot(currentSequenceNumber);
1252 // Push slot number forward
1253 if (!seenLiveSlot) {
1254 oldestLiveSlotSequenceNumver = currentSequenceNumber;
1257 if (!previousSlot->isLive()) {
1261 // We have seen a live slot
1262 seenLiveSlot = true;
1264 // Get all the live entries for a slot
1265 Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
1267 // Iterate over all the live entries and try to rescue them
1268 for (Entry *liveEntry : liveEntries) {
1269 if (slot->hasSpace(liveEntry)) {
1271 // Enough space to rescue the entry
1272 slot->addEntry(liveEntry);
1273 } else if (currentSequenceNumber == firstIfFull) {
1274 //if there's no space but the entry is about to fall off the queue
// NOTE(review): `System->out->println` is a leftover from Java and is not
// a C++ facility -- replace or remove this debug print.
1275 System->out->println("B"); //?
1276 return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
1283 return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
// Continues rescuing live entries into slot `s`, starting at sequence number
// `seqn` and walking up to the newest sequence number in the buffer.
// `seenliveslot` carries over whether an earlier pass already met a live slot;
// `resize` is forwarded to Slot::getLiveEntries().
1286 void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
1287 /* now go through live entries from least to greatest sequence number until
1288 * either all live slots added, or the slot doesn't have enough room
1289 * for SKIP_THRESHOLD consecutive entries*/
1291 int64_t newestseqnum = buffer->getNewestSeqNum();
1293 for (; seqn <= newestseqnum; seqn++) {
1294 Slot *prevslot = buffer->getSlot(seqn);
1295 //Push slot number forward
1297 oldestLiveSlotSequenceNumver = seqn;
1299 if (!prevslot->isLive())
1301 seenliveslot = true;
1302 Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
// Add every live entry that still fits into `s`.
1303 for (Entry *liveentry : liveentries) {
1304 if (s->hasSpace(liveentry))
1305 s->addEntry(liveentry);
// Compared against Table_SKIP_THRESHOLD, the limit named in the comment at
// the top of this function.
1308 if (skipcount > Table_SKIP_THRESHOLD)
1316 * Checks for malicious activity and updates the local copy of the block chain.
// Validates a batch of slots received from the server and then commits them
// to the local buffer.
// Visible steps: reject slots that are not newer than `sequenceNumber`, check
// the HMAC chain, process every slot, verify slot count and per-machine
// records when there is a gap, store the slots, advance `sequenceNumber`, and
// refresh the live state.
// Throws an Error on the server-misbehaviour conditions checked below.
1318 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
1320 // The cloud communication layer has checked slot HMACs already
1322 if (newSlots->length() == 0) {
1326 // Make sure all slots are newer than the last largest slot this
1328 int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
1329 if (firstSeqNum <= sequenceNumber) {
1330 throw new Error("Server Error: Sent older slots!");
1333 // Create an object that can access both new slots and slots in our
1334 // local chain without committing slots to our local chain
1335 SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
1337 // Check that the HMAC chain is not broken
1338 checkHMACChain(indexer, newSlots);
1340 // Set to keep track of messages from clients
// Starts out as a copy of the machine IDs known to lastMessageTable.
1341 Hashset<int64_t> *machineSet = new Hashset<int64_t>(lastMessageTable->keySet());
1343 // Process each slots data
1344 for (Slot *slot : newSlots) {
1345 processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
1347 updateExpectedSize();
1350 // If there is a gap, check to see if the server sent us
1352 if (firstSeqNum != (sequenceNumber + 1)) {
1354 // Check the size of the slots that were sent down by the server.
1355 // Can only check the size if there was a gap
// NOTE(review): `length` is written without call parentheses here, while
// the other uses in this function call newSlots->length().
1356 checkNumSlots(newSlots->length);
1358 // Since there was a gap every machine must have pushed a slot or
1359 // must have a last message message. If not then the server is
1361 if (!machineSet->isEmpty()) {
// NOTE(review): a string literal plus a pointer does not build a message in
// C++ -- the error text needs explicit formatting.
1362 throw new Error("Missing record for machines: " + machineSet);
1366 // Update the size of our local block chain.
1369 // Commit new to slots to the local block chain.
1370 for (Slot *slot : newSlots) {
1372 // Insert this slot into our local block chain copy.
1373 buffer->putSlot(slot);
1375 // Keep track of how many slots are currently live (have live data
1380 // Get the sequence number of the latest slot in the system
1381 sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
1382 updateLiveStateFromServer();
1384 // No Need to remember after we pulled from the server
1385 offlineTransactionsCommittedAndAtServer->clear();
1387 // This is invalidated now
1388 hadPartialSendToServer = false;
// Refreshes the derived state after slots arrived from the server: processes
// new transaction parts, arbitrates, updates the committed table, updates the
// live transactions, then updates both speculative tables.
1391 void Table::updateLiveStateFromServer() {
1392 // Process the new transaction parts
1393 processNewTransactionParts();
1395 // Do arbitration on new transactions that were received
1396 arbitrateFromServer();
1398 // Update all the committed keys
1399 bool didCommitOrSpeculate = updateCommittedTable();
1401 // Delete the transactions that are now dead
1402 updateLiveTransactionsAndStatus();
// The flag accumulates: it is true if a new commit was processed or the
// speculative table update reported speculation.
1405 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1406 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Refreshes the derived state after a local change. Runs the same steps as
// updateLiveStateFromServer() except that it neither processes new transaction
// parts nor arbitrates.
1409 void Table::updateLiveStateFromLocal() {
1410 // Update all the committed keys
1411 bool didCommitOrSpeculate = updateCommittedTable();
1413 // Delete the transactions that are now dead
1414 updateLiveTransactionsAndStatus();
// The flag accumulates across the commit and speculation steps.
1417 didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
1418 updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
// Records the maximum table size and marks that a table status was found.
// `expectedsize` is assigned the smaller of firstSequenceNumber and
// numberOfSlots on the line below.
// NOTE(review): the parameter numberOfSlots has the same name as the
// `numberOfSlots` used elsewhere in this class -- inside this function the
// parameter is the one being read.
1421 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
1422 int64_t prevslots = firstSequenceNumber;
1424 if (didFindTableStatus) {
// min(prevslots, numberOfSlots), with the prevslots arm narrowed to int.
1426 expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
1429 didFindTableStatus = true;
1430 currMaxSize = numberOfSlots;
// Keeps `expectedsize` from exceeding `currMaxSize`.
1433 void Table::updateExpectedSize() {
// Clamp to the current maximum size.
1436 if (expectedsize > currMaxSize) {
1437 expectedsize = currMaxSize;
1443 * Check the size of the block chain to make sure there are enough
1444 * slots sent back by the server. This is only called when we have a
1445 * gap between the slots that we have locally and the slots sent by
1446 * the server therefore in the slots sent by the server there will be
1447 * at least 1 Table status message
// Throws an Error when the number of slots received differs from
// `expectedsize`.
// NOTE(review): the parameter has the same name as the `numberOfSlots` used
// elsewhere in this class; here it is the received slot count.
1449 void Table::checkNumSlots(int numberOfSlots) {
1450 if (numberOfSlots != expectedsize) {
// NOTE(review): if expectedsize is an integer, `"literal" + expectedsize` is
// pointer arithmetic in C++, not string concatenation -- the message needs
// explicit formatting.
1451 throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
// Stores `newmaxsize` as the current maximum size (currMaxSize). The local
// buffer itself is resized from currMaxSize in commitNewMaxSize().
1455 void Table::updateCurrMaxSize(int newmaxsize) {
1456 currMaxSize = newmaxsize;
1461 * Update the size of the local buffer if it is needed.
// Applies currMaxSize to the local state: clears didFindTableStatus, resizes
// the slot buffer when the size changed, updates numberOfSlots and recomputes
// the resize threshold.
1463 void Table::commitNewMaxSize() {
1464 didFindTableStatus = false;
1466 // Resize the local slot buffer
1467 if (numberOfSlots != currMaxSize) {
1468 buffer->resize((int32_t)currMaxSize);
1471 // Change the number of local slots to the new size
1472 numberOfSlots = (int32_t)currMaxSize;
1474 // Recalculate the resize threshold since the size of the local
1475 // buffer has changed
1476 setResizeThreshold();
1480 * Process the new transaction parts from this latest round of slots
1481 * received from the server
// Folds the transaction parts collected in newTransactionParts into the live
// transaction tables, creating a Transaction when none exists yet for a part's
// sequence number, and then empties newTransactionParts.
1483 void Table::processNewTransactionParts() {
1485 if (newTransactionParts->size() == 0) {
1486 // Nothing new to process
1490 // Iterate through all the machine Ids that we received new parts
1492 for (int64_t machineId : newTransactionParts->keySet()) {
// NOTE(review): `Pair<int64_t int32_t>` is missing the comma between its
// template arguments (compare the Pair<int64_t, int32_t> used just below).
1493 Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
1495 // Iterate through all the parts for that machine Id
1496 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1497 TransactionPart *part = parts->get(partId);
// NOTE(review): lastTransactionNumber is an int64_t compared against NULL;
// a stored 0 would look the same as a missing entry if NULL is 0 -- confirm
// how the table reports a missing key.
1499 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
1500 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) {
1501 // Set dead the transaction part
1506 // Get the transaction object for that sequence number
1507 Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
1509 if (transaction == NULL) {
1510 // This is a new transaction that we dont have so make a new one
1511 transaction = new Transaction();
1513 // Insert this new transaction into the live tables
1514 liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
1515 liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction);
1518 // Add that part to the transaction
1519 transaction->addPartDecode(part);
1523 // Clear all the new transaction parts in preparation for the next
1524 // time the server sends slots
1525 newTransactionParts->clear();
// Arbitrates, in ascending sequence-number order, the live transactions for
// which this machine is the arbitrator.
// Transactions whose guard holds contribute their key/value updates to one
// batch Commit; the others each produce an Abort. The resulting commit parts
// and aborts are fed to processEntry() and queued as an ArbitrationRound in
// pendingSendArbitrationRounds.
1528 void Table::arbitrateFromServer() {
1530 if (liveTransactionBySequenceNumberTable->size() == 0) {
1531 // Nothing to arbitrate on so move on
1535 // Get the transaction sequence numbers and sort from oldest to newest
1536 Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
// NOTE(review): `Collections->sort` is a leftover from Java -- confirm the
// C++ sorting helper intended here.
1537 Collections->sort(transactionSequenceNumbers);
1539 // Collection of key value pairs that are
// NOTE(review): declared as a non-pointer but initialised from `new` and
// used with `->` below.
1540 Hashtable<IoTString *, KeyValue *> speculativeTableTmp = new Hashtable<IoTString *, KeyValue *>();
1542 // The last transaction arbitrated on
1543 int64_t lastTransactionCommitted = -1;
1544 Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
1546 for (int64_t transactionSequenceNumber : transactionSequenceNumbers) {
1547 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
1551 // Check if this machine arbitrates for this transaction if not
1552 // then we cant arbitrate this transaction
1553 if (transaction->getArbitrator() != localMachineId) {
// Sequence numbers below the last one arbitrated on (lastSeqNumArbOn).
1557 if (transactionSequenceNumber < lastSeqNumArbOn) {
1561 if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
1562 // We have seen this already locally so dont commit again
1567 if (!transaction->isComplete()) {
1568 // Will arbitrate in incorrect order if we continue so just break
1574 // update the largest transaction seen by arbitrator from server
1575 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) {
1576 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
1578 int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
1579 if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
1580 lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
// The guard is evaluated against the committed table plus the updates
// accepted earlier in this same pass.
1584 if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
1585 // Guard evaluated as true
1587 // Update the local changes so we can make the commit
1588 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1589 speculativeTableTmp->put(kv->getKey(), kv);
1592 // Update what the last transaction committed was for use in batch commit
1593 lastTransactionCommitted = transactionSequenceNumber;
1595 // Guard evaluated was false so create abort
1597 Abort *newAbort = new Abort(NULL,
1598 transaction->getClientLocalSequenceNumber(),
1599 transaction->getSequenceNumber(),
1600 transaction->getMachineId(),
1601 transaction->getArbitrator(),
1602 localArbitrationSequenceNumber);
1603 localArbitrationSequenceNumber++;
1604 generatedAborts->add(newAbort);
1606 // Insert the abort so we can process
1607 processEntry(newAbort);
1610 lastSeqNumArbOn = transactionSequenceNumber;
1613 Commit *newCommit = NULL;
1615 // If there is something to commit
1616 if (speculativeTableTmp->size() != 0) {
1617 // Create the commit and increment the commit sequence number
1618 newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
1619 localArbitrationSequenceNumber++;
1621 // Add all the new keys to the commit
1622 for (KeyValue *kv : speculativeTableTmp->values()) {
1623 newCommit->addKV(kv);
1626 // create the commit parts
1627 newCommit->createCommitParts();
1629 // Append all the commit parts to the end of the pending queue
1630 // waiting for sending to the server
1631 // Insert the commit so we can process it
1632 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1633 processEntry(commitPart);
// Queue a round only when this pass produced a commit or at least one abort.
1637 if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
1638 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
1639 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the commit parts of the round now
// at the end of the pending list.
1641 if (compactArbitrationData()) {
1642 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1643 if (newArbitrationRound->getCommit() != NULL) {
1644 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1645 processEntry(commitPart);
// Arbitrates a single transaction locally.
// Returns Pair(false, false) when this machine is not the arbitrator, the
// transaction is incomplete, or a newer transaction from that machine was
// already seen from the server. Returns Pair(true, true) after queuing a
// commit and Pair(true, false) after queuing an abort.
1652 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
1654 // Check if this machine arbitrates for this transaction if not then
1655 // we cant arbitrate this transaction
1656 if (transaction->getArbitrator() != localMachineId) {
1657 return Pair<bool, bool>(false, false);
1660 if (!transaction->isComplete()) {
1661 // Will arbitrate in incorrect order if we continue so just break
1663 return Pair<bool, bool>(false, false);
1666 if (transaction->getMachineId() != localMachineId) {
1667 // dont do this check for local transactions
1668 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) {
1669 if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
1670 // We've have already seen this from the server
1671 return Pair<bool, bool>(false, false);
// The guard is evaluated against the committed table only.
1676 if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
1677 // Guard evaluated as true Create the commit and increment the
1678 // commit sequence number
1679 Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
1680 localArbitrationSequenceNumber++;
1682 // Update the local changes so we can make the commit
1683 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
1684 newCommit->addKV(kv);
1687 // create the commit parts
1688 newCommit->createCommitParts();
1690 // Append all the commit parts to the end of the pending queue
1691 // waiting for sending to the server
1692 ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
1693 pendingSendArbitrationRounds->add(arbitrationRound);
// When compaction reports true, process the parts of the commit held by the
// round now at the end of the pending list.
1695 if (compactArbitrationData()) {
1696 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1697 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1698 processEntry(commitPart);
1701 // Insert the commit so we can process it
1702 for (CommitPart *commitPart : newCommit->getParts()->values()) {
1703 processEntry(commitPart);
// Locally created transaction: mark its status as committed.
1707 if (transaction->getMachineId() == localMachineId) {
1708 TransactionStatus *status = transaction->getTransactionStatus();
1709 if (status != NULL) {
1710 status->setStatus(TransactionStatus_StatusCommitted);
1714 updateLiveStateFromLocal();
1715 return Pair<bool, bool>(true, true);
1717 if (transaction->getMachineId() == localMachineId) {
1718 // For locally created messages update the status
1719 // Guard evaluated was false so create abort
// NOTE(review): `status` is declared by value here but as a pointer in the
// commit branch above, and it is compared with NULL and used with `->`.
1720 TransactionStatus status = transaction->getTransactionStatus();
1721 if (status != NULL) {
1722 status->setStatus(TransactionStatus_StatusAborted);
1725 Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
1728 Abort *newAbort = new Abort(NULL,
1729 transaction->getClientLocalSequenceNumber(),
1731 transaction->getMachineId(),
1732 transaction->getArbitrator(),
1733 localArbitrationSequenceNumber);
1734 localArbitrationSequenceNumber++;
1735 addAbortSet->add(newAbort);
1737 // Append all the commit parts to the end of the pending queue
1738 // waiting for sending to the server
1739 ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
1740 pendingSendArbitrationRounds->add(arbitrationRound);
// NOTE(review): the round queued above was built with a NULL commit, and
// getCommit() is dereferenced below without a visible NULL check
// (arbitrateFromServer checks for NULL in the same situation) -- confirm
// that the compacted round always carries a commit here.
1742 if (compactArbitrationData()) {
1743 ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1744 for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) {
1745 processEntry(commitPart);
1750 updateLiveStateFromLocal();
1751 return Pair<bool, bool>(true, false);
1756 * Compacts the arbitration data by merging commits and aggregating
1757 * aborts so that a single large push of commits can be done instead
1758 * of many small updates
// Tries to fold earlier pending arbitration rounds into the most recent one,
// walking backwards from the end of pendingSendArbitrationRounds.
// Aborts are appended to the last round and commits are merged into a new
// Commit, as long as the combined part count stays within
// ArbitrationRound MAX_PARTS. Rounds that were absorbed are removed and the
// compacted round is re-added at the end of the list.
1760 bool Table::compactArbitrationData() {
1761 if (pendingSendArbitrationRounds->size() < 2) {
1762 // Nothing to compact so do nothing
1766 ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
1767 if (lastRound->didSendPart()) {
// NOTE(review): hadCommit is true when the last round has NO commit
// (== NULL) -- confirm the name matches the intended meaning.
1771 bool hadCommit = (lastRound->getCommit() == NULL);
1772 bool gotNewCommit = false;
// Counts the last round itself plus every earlier round absorbed so far.
1774 int numberToDelete = 1;
1775 while (numberToDelete < pendingSendArbitrationRounds->size()) {
// NOTE(review): stray token `xs` between the type and the variable name.
1776 ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
1778 if (round->isFull() || round->didSendPart()) {
1779 // Stop since there is a part that cannot be compacted and we
1780 // need to compact in order
1784 if (round->getCommit() == NULL) {
1785 // Try compacting aborts only
1786 int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
// NOTE(review): `ArbitrationRound->MAX_PARTS` applies `->` to a type name;
// the constant needs C++ scope syntax or a project-style constant name.
1787 if (newSize > ArbitrationRound->MAX_PARTS) {
1788 // Cant compact since it would be too large
1791 lastRound->addAborts(round->getAborts());
1793 // Create a new larger commit
// NOTE(review): newCommit is declared by value, initialised through
// `Commit->merge` and then used with `->` and passed to setCommit().
1794 Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
1795 localArbitrationSequenceNumber++;
1797 // Create the commit parts so that we can count them
1798 newCommit->createCommitParts();
1800 // Calculate the new size of the parts
1801 int newSize = newCommit->getNumberOfParts();
1802 newSize += lastRound->getAbortsCount();
1803 newSize += round->getAbortsCount();
1805 if (newSize > ArbitrationRound->MAX_PARTS) {
1806 // Cant compact since it would be too large
1810 // Set the new compacted part
1811 lastRound->setCommit(newCommit);
1812 lastRound->addAborts(round->getAborts());
1813 gotNewCommit = true;
1819 if (numberToDelete != 1) {
1820 // If there is a compaction
1821 // Delete the previous pieces that are now in the new compacted piece
1822 if (numberToDelete == pendingSendArbitrationRounds->size()) {
1823 pendingSendArbitrationRounds->clear();
// Drop the absorbed rounds from the tail of the list, one per iteration.
1825 for (int i = 0; i < numberToDelete; i++) {
1826 pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1);
1830 // Add the new compacted into the pending to send list
1831 pendingSendArbitrationRounds->add(lastRound);
1833 // Should reinsert into the commit processor
1834 if (hadCommit && gotNewCommit) {
1843 * Update all the commits and the committed tables, sets dead the dead
// Folds the parts collected in newCommitParts into the live commit tables and
// then processes the commits of each arbitrator in ascending sequence-number
// order. A commit not seen before invalidates the overlapping keys of earlier
// commits and updates committedKeyValueTable and liveCommitsByKeyTable.
// Returns true when at least one commit not seen before was processed.
1846 bool Table::updateCommittedTable() {
1848 if (newCommitParts->size() == 0) {
1849 // Nothing new to process
1853 // Iterate through all the machine Ids that we received new parts for
1854 for (int64_t machineId : newCommitParts->keySet()) {
1855 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *parts = newCommitParts->get(machineId);
1857 // Iterate through all the parts for that machine Id
1858 for (Pair<int64_t, int32_t> partId : parts->keySet()) {
1859 CommitPart *part = parts->get(partId);
1861 // Get the transaction object for that sequence number
1862 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
1864 if (commitForClientTable == NULL) {
1865 // This is the first commit from this device
1866 commitForClientTable = new Hashtable<int64_t, Commit *>();
1867 liveCommitsTable->put(part->getMachineId(), commitForClientTable);
1870 Commit *commit = commitForClientTable->get(part->getSequenceNumber());
1872 if (commit == NULL) {
1873 // This is a new commit that we dont have so make a new one
1874 commit = new Commit();
1876 // Insert this new commit into the live tables
1877 commitForClientTable->put(part->getSequenceNumber(), commit);
1880 // Add that part to the commit
1881 commit->addPartDecode(part);
1885 // Clear all the new commits parts in preparation for the next time
1886 // the server sends slots
1887 newCommitParts->clear();
1889 // If we process a new commit keep track of it for future use
1890 bool didProcessANewCommit = false;
1892 // Process the commits one by one
// NOTE(review): `int64_T` is spelled `int64_t` everywhere else in this file.
1893 for (int64_T arbitratorId : liveCommitsTable->keySet()) {
1895 // Get all the commits for a specific arbitrator
1896 Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
1898 // Sort the commits in order
1899 Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>(commitForClientTable->keySet());
1900 Collections->sort(commitSequenceNumbers);
1902 // Get the last commit seen from this arbitrator
1903 int64_t lastCommitSeenSequenceNumber = -1;
1904 if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) {
1905 lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
1908 // Go through each new commit one by one
1909 for (int i = 0; i < commitSequenceNumbers->size(); i++) {
1910 int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
1911 Commit *commit = commitForClientTable->get(commitSequenceNumber);
1913 // Special processing if a commit is not complete
1914 if (!commit->isComplete()) {
1915 if (i == (commitSequenceNumbers->size() - 1)) {
1916 // If there is an incomplete commit and this commit is the
1917 // latest one seen then this commit cannot be processed and
1918 // there are no other commits
1921 // This is a commit that was already dead but parts of it
1922 // are still in the block chain (not flushed out yet).
1923 // Delete it and move on
1925 commitForClientTable->remove(commit->getSequenceNumber());
1930 // Update the last transaction that was updated if we can
1931 if (commit->getTransactionSequenceNumber() != -1) {
1932 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1934 // Update the last transaction sequence number that the arbitrator arbitrated on
1935 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1936 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1940 // Update the last arbitration data that we have seen so far
1941 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) {
1943 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
1944 if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
1946 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1949 // Never seen any data from this arbitrator so record the first one
1950 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
1953 // We have already seen this commit before so need to do the
1954 // full processing on this commit
1955 if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
1957 // Update the last transaction that was updated if we can
1958 if (commit->getTransactionSequenceNumber() != -1) {
1959 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
1961 // Update the last transaction sequence number that the arbitrator arbitrated on
1962 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) {
1963 lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
1970 // If we got here then this is a brand new commit and needs full
1972 // Get what commits should be edited, these are the commits that
1973 // have live values for their keys
1974 Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
1975 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1976 commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
1978 commitsToEdit->remove(NULL); // remove NULL since it could be in this set
1980 // Update each previous commit that needs to be updated
1981 for (Commit *previousCommit : commitsToEdit) {
1983 // Only bother with live commits (TODO: Maybe remove this check)
1984 if (previousCommit->isLive()) {
1986 // Update which keys in the old commits are still live
1987 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
1988 previousCommit->invalidateKey(kv->getKey());
1991 // if the commit is now dead then remove it
1992 if (!previousCommit->isLive()) {
// NOTE(review): commitForClientTable is keyed by int64_t sequence number
// (see the remove() on an incomplete commit above), but a Commit * is
// passed here -- confirm whether previousCommit->getSequenceNumber() was
// intended.
1993 commitForClientTable->remove(previousCommit);
1998 // Update the last seen sequence number from this arbitrator
1999 if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) {
2000 if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
2001 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2004 lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
2007 // We processed a new commit that we havent seen before
2008 didProcessANewCommit = true;
2010 // Update the committed table of keys and which commit is using which key
2011 for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
2012 committedKeyValueTable->put(kv->getKey(), kv);
2013 liveCommitsByKeyTable->put(kv->getKey(), commit);
2018 return didProcessANewCommit;
2022 * Create the speculative table from transactions that are still live
2023 * and have come from the cloud
// Rebuilds or extends speculatedKeyValueTable from the live transactions,
// taken in ascending sequence-number order.
// `didProcessNewCommits` forces the speculation to restart from scratch, as
// does a change of the oldest live transaction sequence number.
// Returns false when the start index falls past the end of the sorted list.
2025 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
2026 if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
2027 // There is nothing to speculate on
2031 // Create a list of the transaction sequence numbers and sort them
2032 // from oldest to newest
2033 Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
2034 Collections->sort(transactionSequenceNumbersSorted);
// True when the oldest live transaction is no longer the one remembered
// from the previous speculation pass.
2036 bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
2039 if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
2040 // If there is a gap in the transaction sequence numbers then
2041 // there was a commit or an abort of a transaction OR there was a
2042 // new commit (Could be from offline commit) so a redo the
2043 // speculation from scratch
2045 // Start from scratch
2046 speculatedKeyValueTable->clear();
2047 lastTransactionSequenceNumberSpeculatedOn = -1;
2048 oldestTransactionSequenceNumberSpeculatedOn = -1;
2052 // Remember the front of the transaction list
2053 oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
2055 // Find where to start arbitration from
// Resume right after the last transaction speculated on.
// NOTE(review): this yields index 0 only if indexOf() returns -1 for a value
// that is not in the list -- confirm Vector::indexOf's contract.
2056 int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;
2058 if (startIndex >= transactionSequenceNumbersSorted->size()) {
2059 // Make sure we are not out of bounds
2060 return false; // did not speculate
2063 Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
2064 bool didSkip = true;
2066 for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
2067 int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
2068 Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
2070 if (!transaction->isComplete()) {
2071 // If there is an incomplete transaction then there is nothing
2072 // we can do add this transactions arbitrator to the list of
2073 // arbitrators we should ignore
2074 incompleteTransactionArbitrator->add(transaction->getArbitrator());
// Checks whether this transaction's arbitrator was put on the ignore list
// above.
2079 if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
2083 lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
2085 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
2086 // Guard evaluated to true so update the speculative table
2087 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2088 speculatedKeyValueTable->put(kv->getKey(), kv);
2094 // Since there was a skip we need to redo the speculation next time around
2095 lastTransactionSequenceNumberSpeculatedOn = -1;
2096 oldestTransactionSequenceNumberSpeculatedOn = -1;
2099 // We did some speculation
2104 * Create the pending transaction speculative table from transactions
2105 * that are still in the pending transaction buffer
// Builds pendingTransactionSpeculatedKeyValueTable from the transactions in
// pendingTransactionQueue.
//
// didProcessNewCommitsOrSpeculate: when true, or when the head of the queue is
//   no longer firstPendingTransaction, the pending speculation state is reset
//   (table cleared, lastPendingTransactionSpeculatedOn set to NULL,
//   firstPendingTransaction re-pointed at the queue head).
//
// NOTE(review): the embedded line numbers skip values and braces are left
// open, so short statements (most likely the early exits for the two guard
// checks) appear to be missing from this listing - confirm against the full
// file.
2107 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
2108 if (pendingTransactionQueue->size() == 0) {
2109 // There is nothing to speculate on
2113 if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
2114 // need to reset on the pending speculation
2115 lastPendingTransactionSpeculatedOn = NULL;
2116 firstPendingTransaction = pendingTransactionQueue->get(0);
2117 pendingTransactionSpeculatedKeyValueTable->clear();
2120 // Find where to start arbitration from
// NOTE(review): the start index is derived from firstPendingTransaction, not
// from lastPendingTransactionSpeculatedOn (which the loop below maintains).
// As written the loop begins at the element after firstPendingTransaction -
// confirm that this is the intended resume point.
2121 int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1;
2123 if (startIndex >= pendingTransactionQueue->size()) {
2124 // Make sure we are not out of bounds
2128 for (int i = startIndex; i < pendingTransactionQueue->size(); i++) {
2129 Transaction *transaction = pendingTransactionQueue->get(i);
2131 lastPendingTransactionSpeculatedOn = transaction;
// Unlike the cloud speculation pass, the guard here also sees the pending
// speculative table as its third argument.
2133 if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
2134 // Guard evaluated to true so update the speculative table
2135 for (KeyValue *kv : transaction->getKeyValueUpdateSet()) {
2136 pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
2143 * Set dead and remove from the live transaction tables the
2144 * transactions that are dead
// Two passes driven by lastArbitratedTransactionNumberByArbitratorTable:
//  1. every live transaction whose sequence number is at or below the last
//     number arbitrated by its arbitrator is marked dead and removed from
//     liveTransactionByTransactionIdTable;
//  2. every outstanding transaction status that satisfies the same test is
//     set to TransactionStatus_StatusCommitted.
//
// NOTE(review): the embedded line numbers skip values inside both loops, so
// statements (for instance the removal from the table being iterated) appear
// to be missing from this listing - confirm against the full file.
2146 void Table::updateLiveTransactionsAndStatus() {
2148 // Go through each of the transactions
// NOTE(review): `Map->Entry` looks like a mechanical "." to "->" substitution
// of a Java `Map.Entry`; the entry value type here is `Transaction` while the
// second loop uses `TransactionStatus *` - confirm both against the container
// declarations.
2149 for (Iterator<Map->Entry<int64_t, Transaction> > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) {
2150 Transaction *transaction = iter->next()->getValue();
2152 // Check if the transaction is dead
2153 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator());
// NOTE(review): lastTransactionNumber is an int64_t, so `!= NULL` cannot
// express "no entry" the way a Java null check would; depending on how NULL is
// defined it either compares against 0 or fails to compile. Presumably a
// stored value of 0 and a missing entry are indistinguishable - confirm.
2154 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) {
2156 // Set dead the transaction
2157 transaction->setDead();
2159 // Remove the transaction from the live table
2161 liveTransactionByTransactionIdTable->remove(transaction->getId());
2165 // Go through each of the outstanding transaction statuses
2166 for (Iterator<Map->Entry<int64_t, TransactionStatus *> > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) {
2167 TransactionStatus *status = iter->next()->getValue();
2169 // Check if the transaction is dead
2170 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator());
2171 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) {
// The arbitrator has moved past this transaction, so it is reported as
// committed.
2174 status->setStatus(TransactionStatus_StatusCommitted);
2183 * Process this slot, entry by entry. Also update the latest message sent by slot
// Records the slot itself as the latest message from its machine (via
// updateLastMessage) and then dispatches every entry in the slot to the
// processEntry overload that matches the entry's type tag.
//
// indexer:              forwarded to the RejectedMessage handler.
// slot:                 slot whose entries are processed.
// acceptUpdatesToLocal: forwarded to updateLastMessage for the slot itself.
// machineSet:           forwarded to updateLastMessage and to the LastMessage
//                       handler.
// Throws a heap-allocated Error (`throw new Error`) on an unknown entry type.
//
// NOTE(review): two of the processEntry calls below (Abort and NewKey) have no
// visible case label and no break statements are visible at all; the embedded
// line numbers skip at those points, so those lines appear to be missing from
// this listing - confirm against the full file.
2185 void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
2187 // Update the last message seen
2188 updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
2190 // Process each entry in the slot
2191 for (Entry *entry : slot->getEntries()) {
2192 switch (entry->getType()) {
2193 case TypeCommitPart:
2194 processEntry((CommitPart *)entry);
2197 processEntry((Abort *)entry);
2199 case TypeTransactionPart:
2200 processEntry((TransactionPart *)entry);
2203 processEntry((NewKey *)entry);
2205 case TypeLastMessage:
2206 processEntry((LastMessage *)entry, machineSet);
2208 case TypeRejectedMessage:
2209 processEntry((RejectedMessage *)entry, indexer);
2211 case TypeTableStatus:
// The table status handler also needs the sequence number of the enclosing
// slot.
2212 processEntry((TableStatus *)entry, slot->getSequenceNumber());
// NOTE(review): if getType() returns an integer or enum, `+` on a string
// literal is pointer arithmetic in C++, not concatenation - confirm how the
// message is meant to be built.
2215 throw new Error("Unrecognized type: " + entry->getType());
2221 * Update the last message that was sent for a machine Id
2223 void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
2224 // Update what the last message received by a machine was
2225 updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
2229 * Add the new key to the arbitrators table and update the set of live
2230 * new keys (in case of a rescued new key message)
2232 void Table::processEntry(NewKey *entry) {
2233 // Update the arbitrator table with the new key information
2234 arbitratorTable->put(entry->getKey(), entry->getMachineID());
2236 // Update what the latest live new key is
2237 NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
2238 if (oldNewKey != NULL) {
2239 // Delete the old new key messages
2240 oldNewKey->setDead();
2245 * Process new table status entries and set dead the old ones as new
2246 * ones come in-> keeps track of the largest and smallest table status
2247 * seen in this current round of updating the local copy of the block
2250 void Table::processEntry(TableStatus entry, int64_t seq) {
2251 int newNumSlots = entry->getMaxSlots();
2252 updateCurrMaxSize(newNumSlots);
2253 initExpectedSize(seq, newNumSlots);
2255 if (liveTableStatus != NULL) {
2256 // We have a larger table status so the old table status is no
2258 liveTableStatus->setDead();
2261 // Make this new table status the latest alive table status
2262 liveTableStatus = entry;
2266 * Check old messages to see if there is a block chain violation.
// Handles a RejectedMessage entry in two steps:
//  1. for every sequence number in [oldSeqNum, newSeqNum] the local slot is
//     fetched from the indexer and its machine id is checked against the
//     entry; a disagreement with the entry's "equal" flag raises an Error;
//  2. every machine in lastMessageTable (other than the local one) whose last
//     seen sequence number is below this entry's sequence number is added to
//     the entry's watch set and to the per-machine watch vector.
//
// entry:   the rejected-message entry read from a slot.
// indexer: used to look up locally held slots by sequence number.
// Throws a heap-allocated Error (`throw new Error`) on a violation.
//
// NOTE(review): the embedded line numbers skip values and braces are left
// open, so short statements (a null guard on the slot, the skip for the local
// machine, the branch for an empty watch set) appear to be missing from this
// listing - confirm against the full file.
2269 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
2270 int64_t oldSeqNum = entry->getOldSeqNum();
2271 int64_t newSeqNum = entry->getNewSeqNum();
2272 bool isequal = entry->getEqual();
2273 int64_t machineId = entry->getMachineID();
2274 int64_t seq = entry->getSequenceNumber();
2276 // Check if we have messages that were supposed to be rejected in
2277 // our local block chain
2278 for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
// NOTE(review): no null check on `slot` is visible before it is dereferenced
// below; the comment that follows suggests one exists on an omitted line.
2280 Slot *slot = indexer->getSlot(seqNum);
2283 // If we have this slot make sure that it was not supposed to be
2285 int64_t slotMachineId = slot->getMachineID();
// The slot is acceptable only when "slot came from the entry's machine"
// matches the entry's equal flag.
2286 if (isequal != (slotMachineId == machineId)) {
// NOTE(review): seqNum is an int64_t, so `+` here offsets the string literal
// pointer in C++ instead of appending the number - confirm how the message is
// meant to be built.
2287 throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
2292 // Create a list of clients to watch until they see this rejected
2294 Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
2295 for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
2296 // Machine ID for the last message entry
2297 int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
2299 // We've seen it, don't need to continue to watch. Our next
2300 // message will implicitly acknowledge it.
2301 if (lastMessageEntryMachineId == localMachineId) {
// NOTE(review): lastMessageValue is declared as a Pair value but is used with
// `->` on the next line - confirm the intended type.
2305 Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
2306 int64_t entrySequenceNumber = lastMessageValue->getFirst();
2308 if (entrySequenceNumber < seq) {
2309 // Add this rejected message to the set of messages that this
2310 // machine ID did not see yet
2311 addWatchVector(lastMessageEntryMachineId, entry);
2312 // This client did not see this rejected message yet so add it
2313 // to the watch set to monitor
2314 deviceWatchSet->add(lastMessageEntryMachineId);
2317 if (deviceWatchSet->isEmpty()) {
2318 // This rejected message has been seen by all the clients so
2321 // We need to watch this rejected message
// The entry keeps the watch set that was built above.
2322 entry->setWatchSet(deviceWatchSet);
2327 * Check if this abort is live, if not then save it so we can kill it
2328 * later. Update the last transaction number that was arbitrated on.
// Handles an Abort entry. In the order the visible statements appear it:
//  - marks the matching outstanding transaction status as aborted (when the
//    abort carries a transaction sequence number other than -1);
//  - stores the abort in liveAbortTable (setting dead any abort it replaces)
//    and, for aborts arbitrated locally, in liveAbortsGeneratedByLocal;
//  - drops the abort again from both tables when the target machine's last
//    message already covers it;
//  - advances lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
//  - removes the aborted transaction from the live transaction tables;
//  - advances lastArbitratedTransactionNumberByArbitratorTable.
//
// entry: the Abort entry read from a slot.
//
// NOTE(review): the embedded line numbers skip values and braces are left
// open, so short statements (setDead calls, early exits, else branches)
// appear to be missing from this listing - confirm the exact control flow
// against the full file.
2330 void Table::processEntry(Abort *entry) {
2331 if (entry->getTransactionSequenceNumber() != -1) {
2332 // update the transaction status if it was sent to the server
2333 TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
2334 if (status != NULL) {
2335 status->setStatus(TransactionStatus_StatusAborted);
2339 // Abort has not been seen by the client it is for yet so we need to
2341 Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
2342 if (previouslySeenAbort != NULL) {
2343 previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
2346 if (entry->getTransactionArbitrator() == localMachineId) {
2347 liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
// NOTE(review): the result of lastMessageTable->get(...) is dereferenced
// without a visible check that the machine has an entry - confirm that a
// missing machine id cannot reach this point.
2350 if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
2351 // The machine already saw this so it is dead
2353 liveAbortTable->remove(entry->getAbortId());
2355 if (entry->getTransactionArbitrator() == localMachineId) {
2356 liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
2361 // Update the last arbitration data that we have seen so far
// NOTE(review): this tests the table's stored value against NULL; confirm what
// get() returns for a missing key given the table's value type.
2362 if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
2363 int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
2364 if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
2366 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2369 // Never seen any data from this arbitrator so record the first one
2370 lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
2373 // Set dead a transaction if we can
// The live-transaction key is the pair (transaction machine id, client-local
// sequence number).
2374 Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
2375 if (transactionToSetDead != NULL) {
2376 liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
2379 // Update the last transaction sequence number that the arbitrator
2381 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
// NOTE(review): lastTransactionNumber is an int64_t; `== NULL` cannot tell a
// missing entry from a stored 0 - confirm.
2382 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
// Aborts that carry no transaction sequence number (-1) never advance the
// table.
2384 if (entry->getTransactionSequenceNumber() != -1) {
2385 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
2391 * Set dead the transaction part if that transaction is dead and keep
2392 * track of all new parts
// Handles a TransactionPart entry: checks whether the part's arbitrator has
// already arbitrated past the part's sequence number and, for parts that are
// still alive, files the part under its machine id in newTransactionParts.
// A part that replaces an earlier copy with the same part id sets that
// earlier copy dead.
//
// entry: the TransactionPart entry read from a slot.
//
// NOTE(review): the body of the "transaction is dead" branch is not visible
// (the embedded line numbers skip from 2399 to 2404); presumably it marks the
// entry dead and leaves the function - confirm against the full file.
2394 void Table::processEntry(TransactionPart *entry) {
2395 // Check if we have already seen this transaction and set it dead OR
2396 // if it is not alive
2397 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
// NOTE(review): lastTransactionNumber is an int64_t; `!= NULL` cannot tell a
// missing entry from a stored 0 - confirm.
2398 if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
2399 // This transaction is dead, it was already committed or aborted
2404 // This part is still alive
2405 Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
2407 if (transactionPart == NULL) {
2408 // Don't have a table for this machine Id yet so make one
2409 transactionPart = new Hashtable<Pair<int64_t, int32_t>, TransactionPart *>();
2410 newTransactionParts->put(entry->getMachineId(), transactionPart);
2413 // Update the part and set dead ones we have already seen (got a
// put() hands back whatever part was previously stored under this part id.
2415 TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
2416 if (previouslySeenPart != NULL) {
2417 previouslySeenPart->setDead();
2422 * Process new commit entries and save them for future use-> Delete duplicates
2424 void Table::processEntry(CommitPart *entry) {
2425 // Update the last transaction that was updated if we can
2426 if (entry->getTransactionSequenceNumber() != -1) {
2427 int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
2428 // Update the last transaction sequence number that the arbitrator
2430 if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
2431 lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
2435 Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
2436 if (commitPart == NULL) {
2437 // Don't have a table for this machine Id yet so make one
2438 commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
2439 newCommitParts->put(entry->getMachineId(), commitPart);
2441 // Update the part and set dead ones we have already seen (got a
2443 CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
2444 if (previouslySeenPart != NULL) {
2445 previouslySeenPart->setDead();
2450 * Update the last message seen table. Update and set dead the
2451 * appropriate RejectedMessages as clients see them. Updates the live
2452 * aborts, removes those that are dead and sets them dead. Check that
2453 * the last message seen is correct and that there is no mismatch of
2454 * our own last message or that other clients have not had a rollback
2455 * on the last message.
// Records (seqNum, liveness) as the latest message seen from machineId and
// performs the bookkeeping that follows from it:
//  - removes machineId from machineSet;
//  - walks the rejected messages this machine was being watched for and
//    un-watches those whose sequence number is at or below seqNum;
//  - walks liveAbortTable for aborts addressed to this machine that are at or
//    below seqNum;
//  - for the local machine, sets the new liveness object dead immediately;
//  - replaces the machine's entry in lastMessageTable and, for remote
//    machines, sets the previous entry dead;
//  - validates the sequence number against the previous one and throws on a
//    mismatch (local machine) or rollback (remote machine).
//
// machineId:            machine the message came from.
// seqNum:               sequence number of that message.
// liveness:             the Slot or LastMessage that carried it.
// acceptUpdatesToLocal: suppresses the local-machine mismatch errors.
// machineSet:           set from which machineId is removed.
// Throws a heap-allocated Error (`throw new Error`).
//
// NOTE(review): `Hashset<intr64_t>` in the signature looks like a typo for
// `Hashset<int64_t>` (the type every caller in this file passes) - confirm
// and correct.
// NOTE(review): the embedded line numbers skip values and braces are left
// open, so short statements (iterator removals, setDead calls, else branches,
// an early return) appear to be missing from this listing - confirm the exact
// control flow against the full file.
2457 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
2458 // We have seen this machine ID
2459 machineSet->remove(machineId);
2461 // Get the set of rejected messages that this machine Id has not seen yet
2462 Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
2463 // If there is a rejected message that this machine Id has not seen yet
2464 if (watchset != NULL) {
2465 // Go through each rejected message that this machine Id has not
2467 for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
2468 RejectedMessage *rm = rmit->next();
2469 // If this machine Id has seen this rejected message...
2470 if (rm->getSequenceNumber() <= seqNum) {
2471 // Remove it from our watchlist
2473 // Decrement machines that need to see this notification
2474 rm->removeWatcher(machineId);
2479 // Set dead the abort
// NOTE(review): `i` is declared as an Iterator value but used with `->`
// (the iterator above is a pointer); `Map->Entry` also looks like a mechanical
// "." to "->" substitution - confirm the intended declaration.
2480 for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
2481 Abort *abort = i->next()->getValue();
2482 if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
2485 if (abort->getTransactionArbitrator() == localMachineId) {
2486 liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
2490 if (machineId == localMachineId) {
2491 // Our own messages are immediately dead.
// NOTE(review): `instanceof` is not a C++ operator; this reads like a leftover
// from a Java original and needs a C++ type test - confirm the approach used
// elsewhere in the port.
2492 if (liveness instanceof LastMessage) {
2493 ((LastMessage *)liveness)->setDead();
2494 } else if (liveness instanceof Slot) {
2495 ((Slot *)liveness)->setDead();
2497 throw new Error("Unrecognized type");
2500 // Get the old last message for this device
// put() hands back the pair previously stored for this machine.
// NOTE(review): lastMessageEntry is declared as a Pair value yet is compared
// with NULL and used with `->` below - confirm the intended type.
2501 Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
2502 if (lastMessageEntry == NULL) {
2503 // If no last message then there is nothing else to process
2507 int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
2508 Liveness *lastEntry = lastMessageEntry->getSecond();
2510 // If it is not our machine Id since we already set ours to dead
2511 if (machineId != localMachineId) {
2512 if (lastEntry instanceof LastMessage) {
2513 ((LastMessage *)lastEntry)->setDead();
2514 } else if (lastEntry instanceof Slot) {
2515 ((Slot *)lastEntry)->setDead();
2517 throw new Error("Unrecognized type");
2520 // Make sure the server is not playing any games
2521 if (machineId == localMachineId) {
// After a partial send only a sequence number going backwards is an error;
// otherwise the sequence numbers must match exactly.
2522 if (hadPartialSendToServer) {
2523 // We were not making any updates and we had a machine mismatch
2524 if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
// NOTE(review): the operands are int64_t values and string literals, so `+`
// does not build the intended message in C++ - confirm how Error messages are
// meant to be composed.
2525 throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
2528 // We were not making any updates and we had a machine mismatch
2529 if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
2530 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
// A remote machine's sequence number must never move backwards.
2534 if (lastMessageSeqNum > seqNum) {
2535 throw new Error("Server Error: Rollback on remote machine sequence number");
2541 * Add a rejected message entry to the watch set to keep track of
2542 * which clients have seen that rejected message entry and which have
2545 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
2546 Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
2547 if (entries == NULL) {
2548 // There is no set for this machine ID yet so create one
2549 entries = new Hashset<RejectedMessage *>();
2550 rejectedMessageWatchVectorTable->put(machineId, entries);
2552 entries->add(entry);
2556 * Check if the HMAC chain is not violated
// Verifies that each newly received slot links to its predecessor: for every
// slot in newSlots, the slot with the preceding sequence number is looked up
// through the indexer and, when one is found, its HMAC must equal the new
// slot's previous-HMAC value.
//
// indexer:  used to look up the predecessor slot by sequence number.
// newSlots: the slots to validate.
// Throws a heap-allocated Error (`throw new Error`) on the first broken link.
//
// NOTE(review): the end of this function (closing braces and anything that
// may follow the loop) is not visible in this listing.
2558 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
2559 for (int i = 0; i < newSlots->length(); i++) {
2560 Slot *currSlot = newSlots->get(i);
2561 Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
// A missing predecessor is not treated as an error; only a present
// predecessor with a different HMAC is.
2562 if (prevSlot != NULL &&
2563 !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
2564 throw new Error("Server Error: Invalid HMAC Chain");